/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "tsdb.h"

18 19 20
// SLDataIter =================================================
typedef struct SLDataIter {
  SRBTreeNode   node;
H
Hongze Cheng 已提交
21
  SSstBlk      *pSstBlk;
H
Hongze Cheng 已提交
22
  SDataFReader *pReader;
H
Hongze Cheng 已提交
23
  int32_t       iSst;
H
Hongze Cheng 已提交
24
  int8_t        backward;
H
Hongze Cheng 已提交
25 26
  SArray       *aSstBlk;
  int32_t       iSstBlk;
H
Hongze Cheng 已提交
27 28 29
  SBlockData    bData;
  int32_t       iRow;
  SRowInfo      rInfo;
30 31 32
  uint64_t      uid;
  STimeWindow   timeWindow;
  SVersionRange verRange;
H
Hongze Cheng 已提交
33 34
} SLDataIter;

H
Hongze Cheng 已提交
35
int32_t tLDataIterOpen(struct SLDataIter **pIter, SDataFReader *pReader, int32_t iSst, int8_t backward, uint64_t uid,
36
                       STimeWindow *pTimeWindow, SVersionRange *pRange) {
H
Hongze Cheng 已提交
37
  int32_t code = 0;
38
  *pIter = taosMemoryCalloc(1, sizeof(SLDataIter));
39 40 41 42
  if (*pIter == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _exit;
  }
H
Hongze Cheng 已提交
43

44 45 46 47
  (*pIter)->uid = uid;
  (*pIter)->timeWindow = *pTimeWindow;
  (*pIter)->verRange = *pRange;
  (*pIter)->pReader = pReader;
H
Hongze Cheng 已提交
48
  (*pIter)->iSst = iSst;
49
  (*pIter)->backward = backward;
H
Hongze Cheng 已提交
50 51
  (*pIter)->aSstBlk = taosArrayInit(0, sizeof(SSstBlk));
  if ((*pIter)->aSstBlk == NULL) {
H
Hongze Cheng 已提交
52 53 54 55
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _exit;
  }

56 57 58 59
  code = tBlockDataCreate(&(*pIter)->bData);
  if (code) {
    goto _exit;
  }
H
Hongze Cheng 已提交
60

H
Hongze Cheng 已提交
61
  code = tsdbReadSstBlk(pReader, iSst, (*pIter)->aSstBlk);
62 63 64
  if (code) {
    goto _exit;
  }
H
Hongze Cheng 已提交
65

H
Hongze Cheng 已提交
66
  size_t size = taosArrayGetSize((*pIter)->aSstBlk);
67 68 69

  // find the start block
  int32_t index = -1;
70 71
  if (!backward) {  // asc
    for (int32_t i = 0; i < size; ++i) {
H
Hongze Cheng 已提交
72
      SSstBlk *p = taosArrayGet((*pIter)->aSstBlk, i);
73 74 75 76 77 78 79
      if (p->minUid <= uid && p->maxUid >= uid) {
        index = i;
        break;
      }
    }
  } else {  // desc
    for (int32_t i = size - 1; i >= 0; --i) {
H
Hongze Cheng 已提交
80
      SSstBlk *p = taosArrayGet((*pIter)->aSstBlk, i);
81 82 83 84
      if (p->minUid <= uid && p->maxUid >= uid) {
        index = i;
        break;
      }
85 86 87
    }
  }

H
Hongze Cheng 已提交
88
  (*pIter)->iSstBlk = index;
89
  if (index != -1) {
H
Hongze Cheng 已提交
90
    (*pIter)->pSstBlk = taosArrayGet((*pIter)->aSstBlk, (*pIter)->iSstBlk);
H
Hongze Cheng 已提交
91 92 93 94 95 96 97 98
  }

_exit:
  return code;
}

/*
 * Release an iterator created by tLDataIterOpen(): its loaded block data, its
 * sst block array and the iterator itself. Passing NULL is a no-op.
 */
void tLDataIterClose(SLDataIter *pIter) {
  if (pIter == NULL) {
    return;
  }

  tBlockDataDestroy(&pIter->bData, 1);
  taosArrayDestroy(pIter->aSstBlk);
  taosMemoryFree(pIter);
}

H
Hongze Cheng 已提交
103
extern int32_t tsdbReadSstBlockEx(SDataFReader *pReader, int32_t iSst, SSstBlk *pSstBlk, SBlockData *pBlockData);
H
Hongze Cheng 已提交
104 105

void tLDataIterNextBlock(SLDataIter *pIter) {
H
Hongze Cheng 已提交
106
  int32_t step = pIter->backward ? -1 : 1;
H
Hongze Cheng 已提交
107
  pIter->iSstBlk += step;
H
Hongze Cheng 已提交
108

109
  int32_t index = -1;
H
Hongze Cheng 已提交
110 111 112
  size_t  size = taosArrayGetSize(pIter->aSstBlk);
  for (int32_t i = pIter->iSstBlk; i < size && i >= 0; i += step) {
    SSstBlk *p = taosArrayGet(pIter->aSstBlk, i);
113 114 115 116 117
    if ((!pIter->backward) && p->minUid > pIter->uid) {
      break;
    }

    if (pIter->backward && p->maxUid < pIter->uid) {
118 119 120
      break;
    }

121 122
    if (p->minUid <= pIter->uid && p->maxUid >= pIter->uid) {
      index = i;
123 124 125 126 127
      break;
    }
  }

  if (index == -1) {
H
Hongze Cheng 已提交
128
    pIter->pSstBlk = NULL;
H
Hongze Cheng 已提交
129
  } else {
H
Hongze Cheng 已提交
130
    pIter->pSstBlk = (SSstBlk *)taosArrayGet(pIter->aSstBlk, pIter->iSstBlk);
131 132 133
  }
}

H
Hongze Cheng 已提交
134 135
static void findNextValidRow(SLDataIter *pIter) {
  int32_t step = pIter->backward ? -1 : 1;
136

H
Hongze Cheng 已提交
137
  bool    hasVal = false;
138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156
  int32_t i = pIter->iRow;
  for (; i < pIter->bData.nRow && i >= 0; i += step) {
    if (pIter->bData.aUid != NULL) {
      if (!pIter->backward) {
        if (pIter->bData.aUid[i] < pIter->uid) {
          continue;
        } else if (pIter->bData.aUid[i] > pIter->uid) {
          break;
        }
      } else {
        if (pIter->bData.aUid[i] > pIter->uid) {
          continue;
        } else if (pIter->bData.aUid[i] < pIter->uid) {
          break;
        }
      }
    }

    int64_t ts = pIter->bData.aTSKEY[i];
H
Hongze Cheng 已提交
157
    if (!pIter->backward) {               // asc
158 159 160 161 162 163 164 165 166 167 168
      if (ts > pIter->timeWindow.ekey) {  // no more data
        break;
      } else if (ts < pIter->timeWindow.skey) {
        continue;
      }
    } else {
      if (ts < pIter->timeWindow.skey) {
        break;
      } else if (ts > pIter->timeWindow.ekey) {
        continue;
      }
169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190
    }

    int64_t ver = pIter->bData.aVersion[i];
    if (ver < pIter->verRange.minVer) {
      continue;
    }

    // todo opt handle desc case
    if (ver > pIter->verRange.maxVer) {
      continue;
    }

    //  todo handle delete soon
#if 0
    TSDBKEY k = {.ts = ts, .version = ver};
        if (hasBeenDropped(pBlockScanInfo->delSkyline, &pBlockScanInfo->lastBlockDelIndex, &k, pLastBlockReader->order)) {
          continue;
        }
#endif

    hasVal = true;
    break;
H
Hongze Cheng 已提交
191
  }
192

H
Hongze Cheng 已提交
193
  pIter->iRow = (hasVal) ? i : -1;
H
Hongze Cheng 已提交
194 195
}

196
bool tLDataIterNextRow(SLDataIter *pIter) {
H
Hongze Cheng 已提交
197
  int32_t code = 0;
H
Hongze Cheng 已提交
198
  int32_t step = pIter->backward ? -1 : 1;
199 200

  // no qualified last file block in current file, no need to fetch row
H
Hongze Cheng 已提交
201
  if (pIter->pSstBlk == NULL) {
202 203
    return false;
  }
H
Hongze Cheng 已提交
204

H
Hongze Cheng 已提交
205
  int32_t iBlockL = pIter->iSstBlk;
206

H
Hongze Cheng 已提交
207 208
  if (pIter->bData.nRow == 0 && pIter->pSstBlk != NULL) {  // current block not loaded yet
    code = tsdbReadSstBlockEx(pIter->pReader, pIter->iSst, pIter->pSstBlk, &pIter->bData);
209 210
    if (code != TSDB_CODE_SUCCESS) {
      goto _exit;
H
Hongze Cheng 已提交
211
    }
212

H
Hongze Cheng 已提交
213
    pIter->iRow = (pIter->backward) ? pIter->bData.nRow : -1;
214
  }
215

216 217
  pIter->iRow += step;

H
Hongze Cheng 已提交
218
  while (1) {
219 220 221 222
    findNextValidRow(pIter);

    if (pIter->iRow >= pIter->bData.nRow || pIter->iRow < 0) {
      tLDataIterNextBlock(pIter);
H
Hongze Cheng 已提交
223
      if (pIter->pSstBlk == NULL) {  // no more data
224 225 226 227
        goto _exit;
      }
    } else {
      break;
H
Hongze Cheng 已提交
228 229
    }

H
Hongze Cheng 已提交
230 231
    if (iBlockL != pIter->iSstBlk) {
      code = tsdbReadSstBlockEx(pIter->pReader, pIter->iSst, pIter->pSstBlk, &pIter->bData);
232 233 234
      if (code) {
        goto _exit;
      }
235
      pIter->iRow = pIter->backward ? (pIter->bData.nRow - 1) : 0;
H
Hongze Cheng 已提交
236 237 238 239 240 241 242 243
    }
  }

  pIter->rInfo.suid = pIter->bData.suid;
  pIter->rInfo.uid = pIter->bData.uid;
  pIter->rInfo.row = tsdbRowFromBlockData(&pIter->bData, pIter->iRow);

_exit:
H
Hongze Cheng 已提交
244
  if (code != TSDB_CODE_SUCCESS) {
245
    terrno = code;
246
  }
247

H
Hongze Cheng 已提交
248
  return (code == TSDB_CODE_SUCCESS) && (pIter->pSstBlk != NULL);
H
Hongze Cheng 已提交
249 250
}

H
Hongze Cheng 已提交
251
SRowInfo *tLDataIterGet(SLDataIter *pIter) { return &pIter->rInfo; }
H
Hongze Cheng 已提交
252 253

// SMergeTree =================================================
H
Hongze Cheng 已提交
254
static FORCE_INLINE int32_t tLDataIterCmprFn(const void *p1, const void *p2) {
255 256
  SLDataIter *pIter1 = (SLDataIter *)(p1 - sizeof(SRBTreeNode));
  SLDataIter *pIter2 = (SLDataIter *)(p2 - sizeof(SRBTreeNode));
H
Hongze Cheng 已提交
257

258 259
  TSDBKEY key1 = TSDBROW_KEY(&pIter1->rInfo.row);
  TSDBKEY key2 = TSDBROW_KEY(&pIter2->rInfo.row);
H
Hongze Cheng 已提交
260

261 262 263 264 265 266 267 268 269 270 271 272 273
  if (key1.ts < key2.ts) {
    return -1;
  } else if (key1.ts > key2.ts) {
    return 1;
  } else {
    if (key1.version < key2.version) {
      return -1;
    } else if (key1.version > key2.version) {
      return 1;
    } else {
      return 0;
    }
  }
H
Hongze Cheng 已提交
274 275
}

H
Hongze Cheng 已提交
276 277
int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFReader, uint64_t uid,
                       STimeWindow *pTimeWindow, SVersionRange *pVerRange) {
H
Hongze Cheng 已提交
278
  pMTree->backward = backward;
279
  pMTree->pIter = NULL;
280
  pMTree->pIterList = taosArrayInit(4, POINTER_BYTES);
281 282 283
  if (pMTree->pIterList == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
284

H
Hongze Cheng 已提交
285
  tRBTreeCreate(&pMTree->rbt, tLDataIterCmprFn);
286
  int32_t code = TSDB_CODE_OUT_OF_MEMORY;
287

H
Hongze Cheng 已提交
288 289
  struct SLDataIter *pIterList[TSDB_DEFAULT_LAST_FILE] = {0};
  for (int32_t i = 0; i < pFReader->pSet->nSstF; ++i) {  // open all last file
290 291 292 293 294
    code = tLDataIterOpen(&pIterList[i], pFReader, i, pMTree->backward, uid, pTimeWindow, pVerRange);
    if (code != TSDB_CODE_SUCCESS) {
      goto _end;
    }

295 296
    bool hasVal = tLDataIterNextRow(pIterList[i]);
    if (hasVal) {
297
      taosArrayPush(pMTree->pIterList, &pIterList[i]);
298
      tMergeTreeAddIter(pMTree, pIterList[i]);
299 300
    } else {
      tLDataIterClose(pIterList[i]);
301 302
    }
  }
303 304 305

  return code;

H
Hongze Cheng 已提交
306
_end:
307 308
  tMergeTreeClose(pMTree);
  return code;
H
Hongze Cheng 已提交
309
}
H
Hongze Cheng 已提交
310

311 312
void tMergeTreeAddIter(SMergeTree *pMTree, SLDataIter *pIter) { tRBTreePut(&pMTree->rbt, (SRBTreeNode *)pIter); }

H
Hongze Cheng 已提交
313
bool tMergeTreeNext(SMergeTree *pMTree) {
314 315 316 317 318 319 320 321 322 323 324 325
  int32_t code = TSDB_CODE_SUCCESS;
  if (pMTree->pIter) {
    SLDataIter *pIter = pMTree->pIter;

    bool hasVal = tLDataIterNextRow(pIter);
    if (!hasVal) {
      pMTree->pIter = NULL;
    }

    // compare with min in RB Tree
    pIter = (SLDataIter *)tRBTreeMin(&pMTree->rbt);
    if (pMTree->pIter && pIter) {
H
Hongze Cheng 已提交
326
      int32_t c = pMTree->rbt.cmprFn(RBTREE_NODE_PAYLOAD(&pMTree->pIter->node), RBTREE_NODE_PAYLOAD(&pIter->node));
327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345
      if (c > 0) {
        tRBTreePut(&pMTree->rbt, (SRBTreeNode *)pMTree->pIter);
        pMTree->pIter = NULL;
      } else {
        ASSERT(c);
      }
    }
  }

  if (pMTree->pIter == NULL) {
    pMTree->pIter = (SLDataIter *)tRBTreeMin(&pMTree->rbt);
    if (pMTree->pIter) {
      tRBTreeDrop(&pMTree->rbt, (SRBTreeNode *)pMTree->pIter);
    }
  }

  return pMTree->pIter != NULL;
}

H
Hongze Cheng 已提交
346
TSDBROW tMergeTreeGetRow(SMergeTree *pMTree) { return pMTree->pIter->rInfo.row; }
347

H
Hongze Cheng 已提交
348
void tMergeTreeClose(SMergeTree *pMTree) {
349
  size_t size = taosArrayGetSize(pMTree->pIterList);
H
Hongze Cheng 已提交
350 351
  for (int32_t i = 0; i < size; ++i) {
    SLDataIter *pIter = taosArrayGetP(pMTree->pIterList, i);
352 353
    tLDataIterClose(pIter);
  }
354

355 356
  pMTree->pIterList = taosArrayDestroy(pMTree->pIterList);
  pMTree->pIter = NULL;
357
}