tsdbMergeTree.c 12.2 KB
Newer Older
H
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "tsdb.h"

18
// SLDataIter =================================================
H
Hongze Cheng 已提交
19
struct SLDataIter {
20
  SRBTreeNode   node;
H
Hongze Cheng 已提交
21
  SSttBlk      *pSttBlk;
H
Hongze Cheng 已提交
22
  SDataFReader *pReader;
H
Hongze Cheng 已提交
23
  int32_t       iStt;
H
Hongze Cheng 已提交
24
  int8_t        backward;
H
Hongze Cheng 已提交
25
  int32_t       iSttBlk;
H
Hongze Cheng 已提交
26 27
  int32_t       iRow;
  SRowInfo      rInfo;
28 29 30
  uint64_t      uid;
  STimeWindow   timeWindow;
  SVersionRange verRange;
31 32

  SSttBlockLoadInfo* pBlockLoadInfo;
H
Hongze Cheng 已提交
33
};
H
Hongze Cheng 已提交
34

35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67
SSttBlockLoadInfo* tCreateLastBlockLoadInfo() {
  SSttBlockLoadInfo* pLoadInfo = taosMemoryCalloc(TSDB_DEFAULT_STT_FILE, sizeof(SSttBlockLoadInfo));
  if (pLoadInfo == NULL) {
    terrno =  TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  for(int32_t i = 0; i < TSDB_DEFAULT_STT_FILE; ++i) {
    pLoadInfo[i].blockIndex[0] = -1;
    pLoadInfo[i].blockIndex[1] = -1;
    pLoadInfo[i].currentLoadBlockIndex = 1;

    int32_t code = tBlockDataCreate(&pLoadInfo[i].blockData[0]);
    if (code) {
      terrno = code;
    }

    code = tBlockDataCreate(&pLoadInfo[i].blockData[1]);
    if (code) {
      terrno = code;
    }

    pLoadInfo[i].aSttBlk = taosArrayInit(4, sizeof(SSttBlk));
  }

  return pLoadInfo;
}

// Invalidate every slot of an array created by tCreateLastBlockLoadInfo().
// Both cached block indexes go back to -1 and the stt block index array is
// emptied, so the next tLDataIterOpen() re-reads it. Block buffers are kept
// allocated for reuse.
void resetLastBlockLoadInfo(SSttBlockLoadInfo* pLoadInfo) {
  SSttBlockLoadInfo* pEnd = pLoadInfo + TSDB_DEFAULT_STT_FILE;
  for (SSttBlockLoadInfo* pInfo = pLoadInfo; pInfo < pEnd; ++pInfo) {
    pInfo->blockIndex[0] = -1;
    pInfo->blockIndex[1] = -1;
    pInfo->currentLoadBlockIndex = 1;
    taosArrayClear(pInfo->aSttBlk);
  }
}

// Release an array created by tCreateLastBlockLoadInfo(). For every slot this
// frees both block buffers and the stt block index; then it frees the array itself.
//
// Accepts NULL as a no-op. Always returns NULL so callers can write
// `p = destroyLastBlockLoadInfo(p);` to clear their pointer.
void* destroyLastBlockLoadInfo(SSttBlockLoadInfo* pLoadInfo) {
  if (pLoadInfo == NULL) {
    return NULL;
  }

  for (int32_t i = 0; i < TSDB_DEFAULT_STT_FILE; ++i) {
    tBlockDataDestroy(&pLoadInfo[i].blockData[0], true);
    tBlockDataDestroy(&pLoadInfo[i].blockData[1], true);
    taosArrayDestroy(pLoadInfo[i].aSttBlk);
  }

  taosMemoryFree(pLoadInfo);
  return NULL;
}

// Return the buffer holding stt block pIter->iSttBlk. The block is read from
// disk only when neither of the two cached buffers in pIter->pBlockLoadInfo
// already holds it.
//
// A fresh read overwrites the buffer that was not loaded most recently. It also
// sets pIter->iRow just before the first row to visit: -1 when ascending, nRow
// when descending. A cache hit leaves pIter->iRow untouched.
//
// Returns NULL with terrno set when the read fails.
static SBlockData* loadBlockIfMissing(SLDataIter *pIter) {
  SSttBlockLoadInfo* pInfo = pIter->pBlockLoadInfo;

  if (pInfo->blockIndex[0] == pIter->iSttBlk) {
    return &pInfo->blockData[0];
  }

  if (pInfo->blockIndex[1] == pIter->iSttBlk) {
    return &pInfo->blockData[1];
  }

  if (pIter->pSttBlk == NULL) {
    // There is nothing to load. Callers only reach this point with a valid
    // block; keep returning a non-NULL buffer without rotating the cache.
    return &pInfo->blockData[pInfo->currentLoadBlockIndex];
  }

  int32_t     slot = pInfo->currentLoadBlockIndex ^ 1;
  SBlockData *pBlock = &pInfo->blockData[slot];

  // This buffer is about to be overwritten. Forget what it held first, so a
  // failed read cannot leave a cache entry that points at half-loaded data.
  pInfo->blockIndex[slot] = -1;
  pInfo->currentLoadBlockIndex = slot;

  int32_t code = tsdbReadSttBlock(pIter->pReader, pIter->iStt, pIter->pSttBlk, pBlock);
  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
    return NULL;
  }

  tsdbDebug("read last block, index:%d, last file index:%d", pIter->iSttBlk, pIter->iStt);

  pInfo->blockIndex[slot] = pIter->iSttBlk;
  pIter->iRow = (pIter->backward) ? pBlock->nRow : -1;
  return pBlock;
}

// Open an iterator over the rows of table `uid` (super table `suid`) stored in
// stt file `iStt` of pReader's file set. Rows are filtered by *pTimeWindow and
// *pRange, both copied into the iterator.
//
// pBlockLoadInfo caches this stt file's block index and loaded blocks. Its
// aSttBlk is read from disk only when empty, and it is not released by
// tLDataIterClose().
//
// On success *pIter receives a new iterator, to be freed with tLDataIterClose().
// Its pSttBlk is NULL when no block of the requested suid has a uid range
// containing `uid`. On failure an error code is returned, *pIter is NULL and
// nothing is leaked.
int32_t tLDataIterOpen(struct SLDataIter **pIter, SDataFReader *pReader, int32_t iStt, int8_t backward, uint64_t suid,
                       uint64_t uid, STimeWindow *pTimeWindow, SVersionRange *pRange, SSttBlockLoadInfo* pBlockLoadInfo) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t size = 0;
  int32_t index = -1;
  int32_t step = backward ? -1 : 1;

  *pIter = NULL;

  SLDataIter *pNew = taosMemoryCalloc(1, sizeof(SLDataIter));
  if (pNew == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  pNew->uid = uid;
  pNew->pReader = pReader;
  pNew->iStt = iStt;
  pNew->backward = backward;
  pNew->verRange = *pRange;
  pNew->timeWindow = *pTimeWindow;
  pNew->pBlockLoadInfo = pBlockLoadInfo;

  // The stt block index is cached in the load info and read on first use only.
  if (taosArrayGetSize(pBlockLoadInfo->aSttBlk) == 0) {
    code = tsdbReadSttBlk(pReader, iStt, pBlockLoadInfo->aSttBlk);
    if (code != TSDB_CODE_SUCCESS) {
      taosMemoryFree(pNew);  // previously leaked on this path
      return code;
    }
  }

  // Find the start block: the first block in iteration order whose suid matches
  // and whose uid range contains uid.
  size = (int32_t)taosArrayGetSize(pBlockLoadInfo->aSttBlk);
  for (int32_t i = backward ? size - 1 : 0; i >= 0 && i < size; i += step) {
    SSttBlk *p = taosArrayGet(pBlockLoadInfo->aSttBlk, i);
    if (p->suid != suid) {
      continue;
    }

    if (p->minUid <= uid && p->maxUid >= uid) {
      index = i;
      break;
    }
  }

  pNew->iSttBlk = index;
  if (index != -1) {
    pNew->pSttBlk = taosArrayGet(pBlockLoadInfo->aSttBlk, index);
    // Position just before the first row to visit. A cache hit in
    // loadBlockIfMissing() does not reset iRow. Without this, a block already
    // cached from an earlier iterator would lose its first row (ascending) or
    // be skipped entirely (descending).
    pNew->iRow = backward ? pNew->pSttBlk->nRow : -1;
  }

  *pIter = pNew;
  return code;
}

// Free an iterator returned by tLDataIterOpen(). Only the iterator itself is
// released; the SSttBlockLoadInfo it points to is left untouched.
void tLDataIterClose(SLDataIter *pIter) {
  SLDataIter *pDoomed = pIter;
  taosMemoryFree(pDoomed);
}

// Advance pIter to the next stt block, in iteration direction, whose uid range,
// key range and version range overlap pIter's uid, time window and version range.
//
// The scan starts right after the current block. It stops early at the first
// block that, in iteration order, lies beyond the uid or, for a uid-matching
// block, beyond the time window or version range. These early exits assume the
// block index is ordered accordingly — TODO confirm against the stt writer.
//
// On success pIter->iSttBlk and pIter->pSttBlk both refer to the block found.
// Otherwise pIter->pSttBlk is NULL.
void tLDataIterNextBlock(SLDataIter *pIter) {
  int32_t step = pIter->backward ? -1 : 1;
  int32_t size = (int32_t)taosArrayGetSize(pIter->pBlockLoadInfo->aSttBlk);
  int32_t index = -1;

  pIter->iSttBlk += step;

  for (int32_t i = pIter->iSttBlk; i >= 0 && i < size; i += step) {
    SSttBlk *p = taosArrayGet(pIter->pBlockLoadInfo->aSttBlk, i);
    if ((!pIter->backward) && p->minUid > pIter->uid) {
      break;
    }

    if (pIter->backward && p->maxUid < pIter->uid) {
      break;
    }

    // check uid firstly
    if (p->minUid <= pIter->uid && p->maxUid >= pIter->uid) {
      if ((!pIter->backward) && p->minKey > pIter->timeWindow.ekey) {
        break;
      }

      if (pIter->backward && p->maxKey < pIter->timeWindow.skey) {
        break;
      }

      // check time range secondly
      if (p->minKey <= pIter->timeWindow.ekey && p->maxKey >= pIter->timeWindow.skey) {
        if ((!pIter->backward) && p->minVer > pIter->verRange.maxVer) {
          break;
        }

        if (pIter->backward && p->maxVer < pIter->verRange.minVer) {
          break;
        }

        if (p->minVer <= pIter->verRange.maxVer && p->maxVer >= pIter->verRange.minVer) {
          index = i;
          break;
        }
      }
    }
  }

  if (index == -1) {
    pIter->pSttBlk = NULL;
  } else {
    // The match may lie several blocks past the immediate neighbour. Both the
    // position and the descriptor must refer to it; previously pSttBlk was taken
    // from the stale iSttBlk, and iSttBlk was never updated.
    pIter->iSttBlk = index;
    pIter->pSttBlk = (SSttBlk *)taosArrayGet(pIter->pBlockLoadInfo->aSttBlk, index);
  }
}

// Starting at pIter->iRow (inclusive) in block pIter->iSttBlk, walk in iteration
// direction to the first row that satisfies all of the following:
//   - it belongs to pIter->uid;
//   - it lies inside pIter->timeWindow (bounds inclusive);
//   - it lies inside pIter->verRange (bounds inclusive).
//
// On return pIter->iRow is that row. It is -1 when the block holds no further
// qualifying row, or when the block could not be loaded; in that case
// loadBlockIfMissing() has set terrno.
static void findNextValidRow(SLDataIter *pIter) {
  int32_t step = pIter->backward ? -1 : 1;
  bool    hasVal = false;

  // Capture the start position before loading: a fresh read inside
  // loadBlockIfMissing() rewrites pIter->iRow.
  int32_t i = pIter->iRow;

  SBlockData *pBlockData = loadBlockIfMissing(pIter);
  if (pBlockData == NULL) {  // read failed; previously dereferenced NULL here
    pIter->iRow = -1;
    return;
  }

  for (; i < pBlockData->nRow && i >= 0; i += step) {
    // aUid is present when the block holds rows of several tables: skip other
    // uids and stop once past ours. The early stop assumes rows are ordered by uid.
    if (pBlockData->aUid != NULL) {
      if (!pIter->backward) {
        if (pBlockData->aUid[i] < pIter->uid) {
          continue;
        } else if (pBlockData->aUid[i] > pIter->uid) {
          break;
        }
      } else {
        if (pBlockData->aUid[i] > pIter->uid) {
          continue;
        } else if (pBlockData->aUid[i] < pIter->uid) {
          break;
        }
      }
    }

    int64_t ts = pBlockData->aTSKEY[i];
    if (!pIter->backward) {               // asc
      if (ts > pIter->timeWindow.ekey) {  // no more data
        break;
      } else if (ts < pIter->timeWindow.skey) {
        continue;
      }
    } else {
      if (ts < pIter->timeWindow.skey) {
        break;
      } else if (ts > pIter->timeWindow.ekey) {
        continue;
      }
    }

    int64_t ver = pBlockData->aVersion[i];
    if (ver < pIter->verRange.minVer) {
      continue;
    }

    // todo opt handle desc case
    if (ver > pIter->verRange.maxVer) {
      continue;
    }

    hasVal = true;
    break;
  }

  pIter->iRow = (hasVal) ? i : -1;
}

// Advance pIter to its next qualifying row, moving across stt blocks as needed,
// and publish it in pIter->rInfo.
//
// Returns true when a row is available through tLDataIterGet(). Returns false
// when the file holds no further qualifying row, or when a block read failed;
// in the failure case terrno is set.
bool tLDataIterNextRow(SLDataIter *pIter) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t step = pIter->backward ? -1 : 1;

  // no qualified last file block in current file, no need to fetch row
  if (pIter->pSttBlk == NULL) {
    return false;
  }

  SBlockData *pBlockData = loadBlockIfMissing(pIter);
  if (pBlockData == NULL) {  // read failed, terrno already set
    code = terrno;
    goto _exit;
  }

  pIter->iRow += step;

  while (1) {
    findNextValidRow(pIter);

    if (pIter->iRow >= 0 && pIter->iRow < pBlockData->nRow) {
      break;  // qualifying row found in the current block
    }

    // Current block exhausted: move on to the next qualifying block.
    tLDataIterNextBlock(pIter);
    if (pIter->pSttBlk == NULL) {  // no more data
      goto _exit;
    }

    pBlockData = loadBlockIfMissing(pIter);
    if (pBlockData == NULL) {
      code = terrno;
      goto _exit;
    }

    // Start scanning at the new block's first row to visit. A fresh read leaves
    // iRow just *before* it (-1 / nRow), and a cache hit leaves the previous
    // block's position. Either way, findNextValidRow() would otherwise return
    // immediately and the whole block would be skipped.
    pIter->iRow = pIter->backward ? pBlockData->nRow - 1 : 0;
  }

  pIter->rInfo.suid = pBlockData->suid;
  pIter->rInfo.uid = pBlockData->uid;
  pIter->rInfo.row = tsdbRowFromBlockData(pBlockData, pIter->iRow);

_exit:
  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
  }

  return (code == TSDB_CODE_SUCCESS) && (pIter->pSttBlk != NULL);
}

// Return the row most recently produced by tLDataIterNextRow(). The pointer
// refers to storage inside pIter; its contents are overwritten by the next
// tLDataIterNextRow() call.
SRowInfo *tLDataIterGet(SLDataIter *pIter) {
  SRowInfo *pRowInfo = &pIter->rInfo;
  return pRowInfo;
}

// SMergeTree =================================================
H
Hongze Cheng 已提交
344
static FORCE_INLINE int32_t tLDataIterCmprFn(const void *p1, const void *p2) {
H
Hongze Cheng 已提交
345 346
  SLDataIter *pIter1 = (SLDataIter *)(((uint8_t *)p1) - sizeof(SRBTreeNode));
  SLDataIter *pIter2 = (SLDataIter *)(((uint8_t *)p2) - sizeof(SRBTreeNode));
H
Hongze Cheng 已提交
347

348 349
  TSDBKEY key1 = TSDBROW_KEY(&pIter1->rInfo.row);
  TSDBKEY key2 = TSDBROW_KEY(&pIter2->rInfo.row);
H
Hongze Cheng 已提交
350

351 352 353 354 355 356 357 358 359 360 361 362 363
  if (key1.ts < key2.ts) {
    return -1;
  } else if (key1.ts > key2.ts) {
    return 1;
  } else {
    if (key1.version < key2.version) {
      return -1;
    } else if (key1.version > key2.version) {
      return 1;
    } else {
      return 0;
    }
  }
H
Hongze Cheng 已提交
364 365
}

366
int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFReader, uint64_t suid, uint64_t uid,
367
                       STimeWindow *pTimeWindow, SVersionRange *pVerRange, void* pBlockLoadInfo) {
H
Hongze Cheng 已提交
368
  pMTree->backward = backward;
369
  pMTree->pIter = NULL;
370
  pMTree->pIterList = taosArrayInit(4, POINTER_BYTES);
371 372 373
  if (pMTree->pIterList == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
374

H
Hongze Cheng 已提交
375
  tRBTreeCreate(&pMTree->rbt, tLDataIterCmprFn);
376 377 378 379 380 381 382 383 384 385 386 387 388
  int32_t code = TSDB_CODE_SUCCESS;

  SSttBlockLoadInfo* pLoadInfo = NULL;
  if (pBlockLoadInfo == NULL) {
    if (pMTree->pLoadInfo == NULL) {
      pMTree->destroyLoadInfo = true;
      pMTree->pLoadInfo = tCreateLastBlockLoadInfo();
    }

    pLoadInfo = pMTree->pLoadInfo;
  } else {
    pLoadInfo = pBlockLoadInfo;
  }
389

H
Hongze Cheng 已提交
390
  for (int32_t i = 0; i < pFReader->pSet->nSttF; ++i) {  // open all last file
391 392
    struct SLDataIter* pIter = NULL;
    code = tLDataIterOpen(&pIter, pFReader, i, pMTree->backward, suid, uid, pTimeWindow, pVerRange, &pLoadInfo[i]);
393 394 395 396
    if (code != TSDB_CODE_SUCCESS) {
      goto _end;
    }

397
    bool hasVal = tLDataIterNextRow(pIter);
398
    if (hasVal) {
399 400
      taosArrayPush(pMTree->pIterList, &pIter);
      tMergeTreeAddIter(pMTree, pIter);
401
    } else {
402
      tLDataIterClose(pIter);
403 404
    }
  }
405 406 407

  return code;

H
Hongze Cheng 已提交
408
_end:
409 410
  tMergeTreeClose(pMTree);
  return code;
H
Hongze Cheng 已提交
411
}
H
Hongze Cheng 已提交
412

413 414
// Insert pIter into the merge tree's red-black tree, ordered by its current row
// through the tree's comparator.
void tMergeTreeAddIter(SMergeTree *pMTree, SLDataIter *pIter) {
  SRBTreeNode *pNode = (SRBTreeNode *)pIter;  // valid: `node` is SLDataIter's first member
  tRBTreePut(&pMTree->rbt, pNode);
}

// Advance the merge to its next row in (timestamp, version) order.
//
// The iterator that produced the previous row (pMTree->pIter) is kept outside
// the tree. After it is advanced, it stays current only while its row sorts
// before the tree minimum. Otherwise it goes back into the tree and the minimum
// is popped. An exhausted iterator simply stops being current; it remains in
// pIterList until tMergeTreeClose().
//
// Returns true when a row is available through tMergeTreeGetRow().
bool tMergeTreeNext(SMergeTree *pMTree) {
  SLDataIter *pCur = pMTree->pIter;

  if (pCur != NULL) {
    if (!tLDataIterNextRow(pCur)) {
      pCur = NULL;
    }
    pMTree->pIter = pCur;

    // compare with min in RB Tree
    SLDataIter *pMin = (SLDataIter *)tRBTreeMin(&pMTree->rbt);
    if (pCur != NULL && pMin != NULL) {
      int32_t c = pMTree->rbt.cmprFn(RBTREE_NODE_PAYLOAD(&pCur->node), RBTREE_NODE_PAYLOAD(&pMin->node));
      if (c > 0) {
        tRBTreePut(&pMTree->rbt, (SRBTreeNode *)pCur);
        pMTree->pIter = NULL;
      } else {
        ASSERT(c);  // two iterators with an identical (ts, version) key are not expected
      }
    }
  }

  if (pMTree->pIter == NULL) {
    SLDataIter *pNext = (SLDataIter *)tRBTreeMin(&pMTree->rbt);
    if (pNext != NULL) {
      tRBTreeDrop(&pMTree->rbt, (SRBTreeNode *)pNext);
    }
    pMTree->pIter = pNext;
  }

  return pMTree->pIter != NULL;
}

// Return the current row of the merge. Call only after tMergeTreeNext()
// returned true: pMTree->pIter is dereferenced without a NULL check.
TSDBROW tMergeTreeGetRow(SMergeTree *pMTree) {
  SLDataIter *pCur = pMTree->pIter;
  return pCur->rInfo.row;
}

// Release what tMergeTreeOpen() acquired:
//   - every iterator in pIterList;
//   - the list itself;
//   - the block-load cache, but only when the merge tree created it
//     (destroyLoadInfo). Caller-supplied load info is left alone.
// The red-black tree is not reset, so do not call tMergeTreeNext() afterwards.
void tMergeTreeClose(SMergeTree *pMTree) {
  for (size_t n = taosArrayGetSize(pMTree->pIterList); n > 0; --n) {
    SLDataIter *pIter = taosArrayGetP(pMTree->pIterList, n - 1);
    tLDataIterClose(pIter);
  }

  pMTree->pIterList = taosArrayDestroy(pMTree->pIterList);
  pMTree->pIter = NULL;

  if (!pMTree->destroyLoadInfo) {
    return;
  }

  pMTree->pLoadInfo = destroyLastBlockLoadInfo(pMTree->pLoadInfo);
  pMTree->destroyLoadInfo = false;
}