tsdbMergeTree.c 10.5 KB
Newer Older
H
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "tsdb.h"

18 19 20
// SLDataIter =================================================
typedef struct SLDataIter {
  SRBTreeNode   node;
H
Hongze Cheng 已提交
21
  SSstBlk      *pSstBlk;
H
Hongze Cheng 已提交
22
  SDataFReader *pReader;
H
Hongze Cheng 已提交
23
  int32_t       iSst;
H
Hongze Cheng 已提交
24
  int8_t        backward;
H
Hongze Cheng 已提交
25 26
  SArray       *aSstBlk;
  int32_t       iSstBlk;
27 28
  SBlockData    bData[2];
  int32_t       loadIndex;
H
Hongze Cheng 已提交
29 30
  int32_t       iRow;
  SRowInfo      rInfo;
31 32 33
  uint64_t      uid;
  STimeWindow   timeWindow;
  SVersionRange verRange;
H
Hongze Cheng 已提交
34 35
} SLDataIter;

36 37 38 39 40 41 42 43 44
// Return the block buffer currently selected by pIter->loadIndex.
static SBlockData *getCurrentBlock(SLDataIter *pIter) {
  SBlockData *pActive = pIter->bData + pIter->loadIndex;
  return pActive;
}

// Flip loadIndex between its two values and return the buffer that becomes current.
static SBlockData *getNextBlock(SLDataIter *pIter) {
  pIter->loadIndex = pIter->loadIndex ^ 1;
  return &pIter->bData[pIter->loadIndex];
}

45 46
// Create an iterator over sst file `iSst` for table `uid` of super table `suid`.
//
// The iterator copies *pTimeWindow and *pRange, reads the SSstBlk index of the
// file through tsdbReadSstBlk, and positions itself on the first block (last
// block when `backward` is non-zero) whose suid matches and whose
// [minUid, maxUid] range contains `uid`. If no such block exists the call still
// succeeds, with iSstBlk == -1 and pSstBlk == NULL.
//
// Returns 0 on success and stores the new iterator in *pIter (release it with
// tLDataIterClose). On failure returns a non-zero code, releases everything
// allocated so far and sets *pIter to NULL, so the caller has nothing to free.
int32_t tLDataIterOpen(struct SLDataIter **pIter, SDataFReader *pReader, int32_t iSst, int8_t backward, uint64_t suid,
                       uint64_t uid, STimeWindow *pTimeWindow, SVersionRange *pRange) {
  int32_t code = 0;
  int32_t nBlockData = 0;  // number of bData entries successfully created so far

  SLDataIter *pNew = taosMemoryCalloc(1, sizeof(SLDataIter));
  *pIter = pNew;
  if (pNew == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  pNew->uid = uid;
  pNew->iSst = iSst;
  pNew->pReader = pReader;
  pNew->verRange = *pRange;
  pNew->backward = backward;
  pNew->timeWindow = *pTimeWindow;

  pNew->aSstBlk = taosArrayInit(0, sizeof(SSstBlk));
  if (pNew->aSstBlk == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }

  for (; nBlockData < 2; ++nBlockData) {
    code = tBlockDataCreate(&pNew->bData[nBlockData]);
    if (code) {
      goto _err;
    }
  }

  code = tsdbReadSstBlk(pReader, iSst, pNew->aSstBlk);
  if (code) {
    goto _err;
  }

  // Find the start block: scan from the front (asc) or from the back (desc).
  // The size is narrowed to int32_t once so the index comparison stays signed.
  int32_t nSstBlk = (int32_t)taosArrayGetSize(pNew->aSstBlk);
  int32_t step = backward ? -1 : 1;

  pNew->iSstBlk = -1;
  for (int32_t i = backward ? nSstBlk - 1 : 0; i >= 0 && i < nSstBlk; i += step) {
    SSstBlk *p = taosArrayGet(pNew->aSstBlk, i);
    if (p->suid != suid) {
      continue;
    }

    if (p->minUid <= uid && p->maxUid >= uid) {
      pNew->iSstBlk = i;
      pNew->pSstBlk = p;
      break;
    }
  }

  return code;

_err:
  // Only destroy the block buffers whose creation succeeded.
  for (int32_t i = 0; i < nBlockData; ++i) {
    tBlockDataDestroy(&pNew->bData[i], 1);
  }
  taosArrayDestroy(pNew->aSstBlk);
  taosMemoryFree(pNew);
  *pIter = NULL;
  return code;
}

// Release an iterator created by tLDataIterOpen: both block buffers, the
// SSstBlk array and the iterator itself. Passing NULL is a no-op, so error
// paths can call this unconditionally.
void tLDataIterClose(SLDataIter *pIter) {
  if (pIter == NULL) {
    return;
  }

  tBlockDataDestroy(&pIter->bData[0], 1);
  tBlockDataDestroy(&pIter->bData[1], 1);
  taosArrayDestroy(pIter->aSstBlk);
  taosMemoryFree(pIter);
}

H
Hongze Cheng 已提交
127
// Declared locally with extern; its definition is not visible in this file.
// Presumably loads the rows described by pSstBlk (from sst file iSst) into
// pBlockData -- TODO confirm against the definition. Callers below treat a
// non-zero return value as an error.
extern int32_t tsdbReadSstBlockEx(SDataFReader *pReader, int32_t iSst, SSstBlk *pSstBlk, SBlockData *pBlockData);
H
Hongze Cheng 已提交
128 129

// Advance the iterator to the next SSstBlk, in scan direction, that may hold
// rows for pIter->uid inside the configured time window and version range.
//
// On success pIter->iSstBlk and pIter->pSstBlk both refer to the block found.
// When no further block qualifies, pIter->pSstBlk is set to NULL.
//
// The scan stops early as soon as a block lies entirely past the wanted uid,
// and for a block covering the uid, past the time window or version range.
void tLDataIterNextBlock(SLDataIter *pIter) {
  int32_t step = pIter->backward ? -1 : 1;
  pIter->iSstBlk += step;

  int32_t index = -1;
  // Narrowed once so the index comparison below stays signed.
  int32_t nSstBlk = (int32_t)taosArrayGetSize(pIter->aSstBlk);
  for (int32_t i = pIter->iSstBlk; i >= 0 && i < nSstBlk; i += step) {
    SSstBlk *p = taosArrayGet(pIter->aSstBlk, i);
    if ((!pIter->backward) && p->minUid > pIter->uid) {
      break;
    }

    if (pIter->backward && p->maxUid < pIter->uid) {
      break;
    }

    // check uid firstly
    if (p->minUid <= pIter->uid && p->maxUid >= pIter->uid) {
      if ((!pIter->backward) && p->minKey > pIter->timeWindow.ekey) {
        break;
      }

      if (pIter->backward && p->maxKey < pIter->timeWindow.skey) {
        break;
      }

      // check time range secondly
      if (p->minKey <= pIter->timeWindow.ekey && p->maxKey >= pIter->timeWindow.skey) {
        if ((!pIter->backward) && p->minVer > pIter->verRange.maxVer) {
          break;
        }

        if (pIter->backward && p->maxVer < pIter->verRange.minVer) {
          break;
        }

        if (p->minVer <= pIter->verRange.maxVer && p->maxVer >= pIter->verRange.minVer) {
          index = i;
          break;
        }
      }
    }
  }

  if (index == -1) {
    pIter->pSstBlk = NULL;
  } else {
    // The loop may have skipped non-matching blocks, so the cursor must be
    // moved to the block actually found, not left one step past the old one.
    pIter->iSstBlk = index;
    pIter->pSstBlk = (SSstBlk *)taosArrayGet(pIter->aSstBlk, index);
  }
}

H
Hongze Cheng 已提交
180 181
// Starting at pIter->iRow, walk the current block in scan direction until a row
// is found that matches pIter->uid (when the block carries a uid column), lies
// inside the time window and inside the version range. Stores the index of
// that row in pIter->iRow, or -1 when the block has no such row.
static void findNextValidRow(SLDataIter *pIter) {
  const bool    asc = !pIter->backward;
  const int32_t delta = asc ? 1 : -1;
  SBlockData   *pBlk = getCurrentBlock(pIter);
  int32_t       found = -1;

  for (int32_t r = pIter->iRow; r >= 0 && r < pBlk->nRow; r += delta) {
    // uid filter: skip rows before the wanted uid, stop once past it
    if (pBlk->aUid != NULL) {
      if (asc) {
        if (pBlk->aUid[r] < pIter->uid) continue;
        if (pBlk->aUid[r] > pIter->uid) break;
      } else {
        if (pBlk->aUid[r] > pIter->uid) continue;
        if (pBlk->aUid[r] < pIter->uid) break;
      }
    }

    // time window filter: stop once past the far edge, skip rows before the near edge
    int64_t ts = pBlk->aTSKEY[r];
    if (asc) {
      if (ts > pIter->timeWindow.ekey) break;  // no more data
      if (ts < pIter->timeWindow.skey) continue;
    } else {
      if (ts < pIter->timeWindow.skey) break;
      if (ts > pIter->timeWindow.ekey) continue;
    }

    // version filter: out-of-range rows are skipped, never a reason to stop
    // todo opt handle desc case
    int64_t ver = pBlk->aVersion[r];
    if (ver < pIter->verRange.minVer || ver > pIter->verRange.maxVer) {
      continue;
    }

    found = r;
    break;
  }

  pIter->iRow = found;
}

236
// Move the iterator to its next qualifying row, loading further blocks through
// tsdbReadSstBlockEx as needed, and publish that row in pIter->rInfo.
//
// Returns true when a row is available. Returns false when the iterator is
// exhausted (pIter->pSstBlk becomes NULL) or when reading a block fails; in the
// failure case terrno is set to the error code.
bool tLDataIterNextRow(SLDataIter *pIter) {
  // no qualified last file block in current file, no need to fetch row
  if (pIter->pSstBlk == NULL) {
    return false;
  }

  int32_t       code = 0;
  const int32_t delta = pIter->backward ? -1 : 1;
  const int32_t startBlk = pIter->iSstBlk;
  SBlockData   *pData = getCurrentBlock(pIter);

  if (pData->nRow == 0) {  // current block not loaded yet
    pData = getNextBlock(pIter);
    code = tsdbReadSstBlockEx(pIter->pReader, pIter->iSst, pIter->pSstBlk, pData);
    if (code != TSDB_CODE_SUCCESS) {
      goto _exit;
    }

    // park the cursor one step before the first row in scan direction
    pIter->iRow = (pIter->backward) ? pData->nRow : -1;
  }

  pIter->iRow += delta;

  for (;;) {
    findNextValidRow(pIter);

    if (pIter->iRow >= 0 && pIter->iRow < pData->nRow) {
      break;  // cursor rests on a valid row
    }

    // current block exhausted: move on to the next candidate block
    tLDataIterNextBlock(pIter);
    if (pIter->pSstBlk == NULL) {  // no more data
      goto _exit;
    }

    if (startBlk != pIter->iSstBlk) {
      pData = getNextBlock(pIter);
      code = tsdbReadSstBlockEx(pIter->pReader, pIter->iSst, pIter->pSstBlk, pData);
      if (code) {
        goto _exit;
      }
      pIter->iRow = pIter->backward ? (pData->nRow - 1) : 0;
    }
  }

  pIter->rInfo.suid = pData->suid;
  pIter->rInfo.uid = pData->uid;
  pIter->rInfo.row = tsdbRowFromBlockData(pData, pIter->iRow);

_exit:
  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
  }

  return (code == TSDB_CODE_SUCCESS) && (pIter->pSstBlk != NULL);
}

H
Hongze Cheng 已提交
294
// Return a pointer to the iterator's row info slot (filled by tLDataIterNextRow).
SRowInfo *tLDataIterGet(SLDataIter *pIter) {
  SRowInfo *pInfo = &pIter->rInfo;
  return pInfo;
}
H
Hongze Cheng 已提交
295 296

// SMergeTree =================================================
H
Hongze Cheng 已提交
297
// Red-black tree comparator for iterators. p1 and p2 are node payload
// pointers; the owning SLDataIter is recovered by stepping back over the
// SRBTreeNode header. Orders by the current row's timestamp, then by its
// version. Returns -1, 0 or 1.
static FORCE_INLINE int32_t tLDataIterCmprFn(const void *p1, const void *p2) {
  SLDataIter *pLhs = (SLDataIter *)(((uint8_t *)p1) - sizeof(SRBTreeNode));
  SLDataIter *pRhs = (SLDataIter *)(((uint8_t *)p2) - sizeof(SRBTreeNode));

  TSDBKEY lhs = TSDBROW_KEY(&pLhs->rInfo.row);
  TSDBKEY rhs = TSDBROW_KEY(&pRhs->rInfo.row);

  if (lhs.ts != rhs.ts) {
    return (lhs.ts < rhs.ts) ? -1 : 1;
  }

  if (lhs.version != rhs.version) {
    return (lhs.version < rhs.version) ? -1 : 1;
  }

  return 0;
}

319
int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFReader, uint64_t suid, uint64_t uid,
H
Hongze Cheng 已提交
320
                       STimeWindow *pTimeWindow, SVersionRange *pVerRange) {
H
Hongze Cheng 已提交
321
  pMTree->backward = backward;
322
  pMTree->pIter = NULL;
323
  pMTree->pIterList = taosArrayInit(4, POINTER_BYTES);
324 325 326
  if (pMTree->pIterList == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
327

H
Hongze Cheng 已提交
328
  tRBTreeCreate(&pMTree->rbt, tLDataIterCmprFn);
329
  int32_t code = TSDB_CODE_OUT_OF_MEMORY;
330

H
Hongze Cheng 已提交
331 332
  struct SLDataIter *pIterList[TSDB_DEFAULT_LAST_FILE] = {0};
  for (int32_t i = 0; i < pFReader->pSet->nSstF; ++i) {  // open all last file
333
    code = tLDataIterOpen(&pIterList[i], pFReader, i, pMTree->backward, suid, uid, pTimeWindow, pVerRange);
334 335 336 337
    if (code != TSDB_CODE_SUCCESS) {
      goto _end;
    }

338 339
    bool hasVal = tLDataIterNextRow(pIterList[i]);
    if (hasVal) {
340
      taosArrayPush(pMTree->pIterList, &pIterList[i]);
341
      tMergeTreeAddIter(pMTree, pIterList[i]);
342 343
    } else {
      tLDataIterClose(pIterList[i]);
344 345
    }
  }
346 347 348

  return code;

H
Hongze Cheng 已提交
349
_end:
350 351
  tMergeTreeClose(pMTree);
  return code;
H
Hongze Cheng 已提交
352
}
H
Hongze Cheng 已提交
353

354 355
// Insert an iterator into the merge tree's red-black tree. The cast relies on
// SLDataIter starting with its SRBTreeNode member.
void tMergeTreeAddIter(SMergeTree *pMTree, SLDataIter *pIter) {
  SRBTreeNode *pNode = (SRBTreeNode *)pIter;
  tRBTreePut(&pMTree->rbt, pNode);
}

H
Hongze Cheng 已提交
356
// Advance the merge tree to the next row in comparator order.
//
// The current iterator (held outside the tree) is advanced first. If it still
// has a row but that row now sorts after the tree's minimum, it is put back
// into the tree. Whenever there is no current iterator, the tree's minimum is
// removed from the tree and becomes the current one.
//
// Returns true when a current row exists (read it with tMergeTreeGetRow),
// false when all iterators are exhausted.
bool tMergeTreeNext(SMergeTree *pMTree) {
  if (pMTree->pIter) {
    if (!tLDataIterNextRow(pMTree->pIter)) {
      pMTree->pIter = NULL;
    }

    // compare with min in RB Tree
    SLDataIter *pMin = (SLDataIter *)tRBTreeMin(&pMTree->rbt);
    if (pMTree->pIter && pMin) {
      int32_t c = pMTree->rbt.cmprFn(RBTREE_NODE_PAYLOAD(&pMTree->pIter->node), RBTREE_NODE_PAYLOAD(&pMin->node));
      if (c > 0) {
        tRBTreePut(&pMTree->rbt, (SRBTreeNode *)pMTree->pIter);
        pMTree->pIter = NULL;
      } else {
        // equal keys in two iterators are not expected here
        ASSERT(c);
      }
    }
  }

  if (pMTree->pIter == NULL) {
    pMTree->pIter = (SLDataIter *)tRBTreeMin(&pMTree->rbt);
    if (pMTree->pIter) {
      tRBTreeDrop(&pMTree->rbt, (SRBTreeNode *)pMTree->pIter);
    }
  }

  return pMTree->pIter != NULL;
}

H
Hongze Cheng 已提交
389
// Return (by value) the row of the current iterator. Dereferences
// pMTree->pIter, so it must only be called while a current iterator exists.
TSDBROW tMergeTreeGetRow(SMergeTree *pMTree) {
  SLDataIter *pCur = pMTree->pIter;
  return pCur->rInfo.row;
}
390

H
Hongze Cheng 已提交
391
// Close every iterator recorded in pMTree->pIterList, destroy the list and
// clear the current-iterator pointer.
void tMergeTreeClose(SMergeTree *pMTree) {
  const size_t nIter = taosArrayGetSize(pMTree->pIterList);

  for (size_t k = 0; k < nIter; ++k) {
    SLDataIter *pOne = taosArrayGetP(pMTree->pIterList, k);
    tLDataIterClose(pOne);
  }

  pMTree->pIterList = taosArrayDestroy(pMTree->pIterList);
  pMTree->pIter = NULL;
}