tsdbReadImpl.c 31.6 KB
Newer Older
H
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Hongze Cheng 已提交
16
#include "tsdb.h"
H
Hongze Cheng 已提交
17

H
Hongze Cheng 已提交
18 19
#define TSDB_KEY_COL_OFFSET 0

H
refact  
Hongze Cheng 已提交
20 21
static void tsdbResetReadTable(SReadH *pReadh);
static void tsdbResetReadFile(SReadH *pReadh);
22
static int  tsdbLoadBlockOffset(SReadH *pReadh, SBlock *pBlock);
H
Hongze Cheng 已提交
23
static int  tsdbLoadBlockDataImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *pDataCols);
24 25
static int  tsdbCheckAndDecodeColumnData(SDataCol *pDataCol, void *content, int32_t len, int32_t bitmapLen, int8_t comp,
                                         int numOfRows, int numOfBitmaps, int maxPoints, char *buffer, int bufferSize);
C
Cary Xu 已提交
26
static int  tsdbLoadBlockDataColsImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *pDataCols, const int16_t *colIds,
H
Hongze Cheng 已提交
27 28 29
                                      int numOfColIds);
static int  tsdbLoadColData(SReadH *pReadh, SDFile *pDFile, SBlock *pBlock, SBlockCol *pBlockCol, SDataCol *pDataCol);

H
Hongze Cheng 已提交
30
/*
 * Initialize a read handle for a repository.
 *
 * The handle is zeroed, bound to pRepo, its file set is marked closed, and
 * the block-index array plus both SDataCols slots (pDCols[0] and pDCols[1])
 * are allocated.
 *
 * Returns 0 on success. On allocation failure returns -1 with terrno set to
 * TSDB_CODE_TDB_OUT_OF_MEMORY; if an SDataCols allocation fails the handle is
 * passed to tsdbDestroyReadH() before returning.
 */
int tsdbInitReadH(SReadH *pReadh, STsdb *pRepo) {
  ASSERT(pReadh != NULL && pRepo != NULL);

  STsdbCfg *pCfg = REPO_CFG(pRepo);

  memset((void *)pReadh, 0, sizeof(*pReadh));
  pReadh->pRepo = pRepo;
  TSDB_FSET_SET_CLOSED(TSDB_READ_FSET(pReadh));

  pReadh->aBlkIdx = taosArrayInit(1024, sizeof(SBlockIdx));
  if (pReadh->aBlkIdx == NULL) {
    terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
    return -1;
  }

  // Both column buffers are created the same way, one after the other.
  for (int iSlot = 0; iSlot < 2; ++iSlot) {
    pReadh->pDCols[iSlot] = tdNewDataCols(0, pCfg->maxRows);
    if (pReadh->pDCols[iSlot] != NULL) {
      continue;
    }

    terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
    tsdbDestroyReadH(pReadh);
    return -1;
  }

  return 0;
}

/*
 * Tear down a read handle set up by tsdbInitReadH().
 *
 * A NULL handle is ignored. Every buffer member is handed to its free routine
 * and overwritten with that routine's return value, the iteration state is
 * cleared, the read file set is closed and the repository link is dropped.
 */
void tsdbDestroyReadH(SReadH *pReadh) {
  if (pReadh == NULL) {
    return;
  }

  // Scratch buffers.
  pReadh->pExBuf = taosTZfree(pReadh->pExBuf);
  pReadh->pCBuf = taosTZfree(pReadh->pCBuf);
  pReadh->pBuf = taosTZfree(pReadh->pBuf);

  // Column containers.
  pReadh->pDCols[0] = tdFreeDataCols(pReadh->pDCols[0]);
  pReadh->pDCols[1] = tdFreeDataCols(pReadh->pDCols[1]);

  // Block-level buffers.
  pReadh->pAggrBlkData = taosTZfree(pReadh->pAggrBlkData);
  pReadh->pBlkData = taosTZfree(pReadh->pBlkData);
  pReadh->pBlkInfo = taosTZfree(pReadh->pBlkInfo);

  // Table / block-index cursor.
  pReadh->cidx = 0;
  pReadh->pBlkIdx = NULL;
  pReadh->pTable = NULL;
  pReadh->aBlkIdx = taosArrayDestroy(pReadh->aBlkIdx);

  tsdbCloseDFileSet(TSDB_READ_FSET(pReadh));
  pReadh->pRepo = NULL;
}

/*
 * Point the read handle at a new file set and open it for reading.
 *
 * The handle's per-file state is reset first, then *pSet is copied into the
 * handle, marked closed and opened with TD_FILE_READ.
 *
 * Returns 0 on success, -1 (after logging) if the file set cannot be opened.
 */
int tsdbSetAndOpenReadFSet(SReadH *pReadh, SDFileSet *pSet) {
  ASSERT(pSet != NULL);

  tsdbResetReadFile(pReadh);

  pReadh->rSet = *pSet;
  TSDB_FSET_SET_CLOSED(TSDB_READ_FSET(pReadh));

  if (tsdbOpenDFileSet(TSDB_READ_FSET(pReadh), TD_FILE_READ) >= 0) {
    return 0;
  }

  tsdbError("vgId:%d failed to open file set %d since %s", TSDB_READ_REPO_ID(pReadh), TSDB_FSET_FID(pSet),
            tstrerror(terrno));
  return -1;
}

H
refact  
Hongze Cheng 已提交
98
/* Close the handle's current file set; this is a thin wrapper over tsdbResetReadFile(). */
void tsdbCloseAndUnsetFSet(SReadH *pReadh) {
  tsdbResetReadFile(pReadh);
}
H
Hongze Cheng 已提交
99 100

/*
 * Load every SBlockIdx record of the head file into pReadh->aBlkIdx.
 *
 * The index region is described by pHeadf->info.offset / info.len. It is read
 * into the handle's read buffer, its checksum is verified, and the records
 * are decoded one by one with tsdbDecodeSBlockIdx() and appended to the
 * array.
 *
 * Returns 0 on success (including the "no data" case where info.offset <= 0),
 * -1 on failure with terrno set by the failing step
 * (TSDB_CODE_TDB_FILE_CORRUPTED for short reads, bad checksum, undersized
 * region or undecodable record; TSDB_CODE_TDB_OUT_OF_MEMORY when the array
 * cannot grow).
 */
int tsdbLoadBlockIdx(SReadH *pReadh) {
  SDFile   *pHeadf = TSDB_READ_HEAD_FILE(pReadh);
  SBlockIdx blkIdx;

  ASSERT(taosArrayGetSize(pReadh->aBlkIdx) == 0);

  // No data at all, just return
  if (pHeadf->info.offset <= 0) return 0;

  // The region must at least hold its trailing checksum; otherwise the
  // "len - sizeof(TSCKSUM)" bound used by the decode loop would wrap around.
  if (pHeadf->info.len < sizeof(TSCKSUM)) {
    terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
    tsdbError("vgId:%d SBlockIdx part in file %s is corrupted since length is too small, offset:%u len :%u",
              TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pHeadf), pHeadf->info.offset, pHeadf->info.len);
    return -1;
  }

  if (tsdbSeekDFile(pHeadf, pHeadf->info.offset, SEEK_SET) < 0) {
    tsdbError("vgId:%d failed to load SBlockIdx part while seek file %s since %s, offset:%u len :%u",
              TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pHeadf), tstrerror(terrno), pHeadf->info.offset,
              pHeadf->info.len);
    return -1;
  }

  if (tsdbMakeRoom((void **)(&TSDB_READ_BUF(pReadh)), pHeadf->info.len) < 0) return -1;

  int64_t nread = tsdbReadDFile(pHeadf, TSDB_READ_BUF(pReadh), pHeadf->info.len);
  if (nread < 0) {
    tsdbError("vgId:%d failed to load SBlockIdx part while read file %s since %s, offset:%u len :%u",
              TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pHeadf), tstrerror(terrno), pHeadf->info.offset,
              pHeadf->info.len);
    return -1;
  }

  if (nread < pHeadf->info.len) {
    terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
    tsdbError("vgId:%d SBlockIdx part in file %s is corrupted, offset:%u expected bytes:%u read bytes: %" PRId64,
              TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pHeadf), pHeadf->info.offset, pHeadf->info.len, nread);
    return -1;
  }

  if (!taosCheckChecksumWhole((uint8_t *)TSDB_READ_BUF(pReadh), pHeadf->info.len)) {
    terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
    tsdbError("vgId:%d SBlockIdx part in file %s is corrupted since wrong checksum, offset:%u len :%u",
              TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pHeadf), pHeadf->info.offset, pHeadf->info.len);
    return -1;
  }

  // Decode records until the payload (everything before the checksum) is consumed.
  void *ptr = TSDB_READ_BUF(pReadh);
  while (POINTER_DISTANCE(ptr, TSDB_READ_BUF(pReadh)) < (pHeadf->info.len - sizeof(TSCKSUM))) {
    ptr = tsdbDecodeSBlockIdx(ptr, &blkIdx);
    if (ptr == NULL) {
      // Report an undecodable record as corruption instead of relying on an
      // assertion, which may be compiled out and would let NULL be reused.
      terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
      tsdbError("vgId:%d SBlockIdx part in file %s is corrupted since decode failed, offset:%u len :%u",
                TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pHeadf), pHeadf->info.offset, pHeadf->info.len);
      return -1;
    }

    if (taosArrayPush(pReadh->aBlkIdx, (void *)(&blkIdx)) == NULL) {
      terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
      return -1;
    }
  }

  return 0;
}

/*
 * Select the table the handle will read and locate its block index entry.
 *
 * Both SDataCols buffers are re-initialized with the table's schema, then the
 * loaded block-index array is scanned forward from pReadh->cidx. The scan
 * stops at the first entry whose uid is >= TABLE_TID(pTable); entries with a
 * smaller uid are skipped. pReadh->pBlkIdx ends up pointing at the matching
 * entry, or NULL when there is none.
 *
 * NOTE(review): the entry's uid is compared against both TABLE_TID() and
 * TABLE_UID() -- confirm this is intended.
 *
 * Returns 0 on success, -1 with terrno = TSDB_CODE_TDB_OUT_OF_MEMORY if a
 * column buffer cannot be initialized.
 */
int tsdbSetReadTable(SReadH *pReadh, STable *pTable) {
  STSchema *pSchema = tsdbGetTableSchemaImpl(pTable, false, false, -1);

  pReadh->pTable = pTable;

  for (int iSlot = 0; iSlot < 2; ++iSlot) {
    if (tdInitDataCols(pReadh->pDCols[iSlot], pSchema) < 0) {
      terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
      return -1;
    }
  }

  // Assume "not found" until a matching entry is seen.
  pReadh->pBlkIdx = NULL;

  size_t nIdx = taosArrayGetSize(pReadh->aBlkIdx);
  while (pReadh->cidx < nIdx) {
    SBlockIdx *pCur = taosArrayGet(pReadh->aBlkIdx, pReadh->cidx);

    if (pCur->uid > TABLE_TID(pTable)) {
      // Already past the wanted table; leave the cursor on this entry.
      break;
    }

    // Entry is consumed whether it matches or precedes the wanted table.
    pReadh->cidx++;

    if (pCur->uid == TABLE_TID(pTable)) {
      if (pCur->uid == TABLE_UID(pTable)) {
        pReadh->pBlkIdx = pCur;
      }
      break;
    }
  }

  return 0;
}

/*
 * Load the SBlockInfo region referenced by the current block index entry.
 *
 * The region (pBlkIdx->offset, pBlkIdx->len) is read from the head file into
 * pReadh->pBlkInfo and its checksum is verified. When pTarget is non-NULL the
 * loaded bytes are additionally copied into it (pBlkIdx->len bytes).
 *
 * Requires pReadh->pBlkIdx != NULL.
 * Returns 0 on success, -1 on seek/read/allocation failure or when the data
 * is short or fails the checksum (terrno = TSDB_CODE_TDB_FILE_CORRUPTED).
 */
int tsdbLoadBlockInfo(SReadH *pReadh, void *pTarget) {
  ASSERT(pReadh->pBlkIdx != NULL);

  SBlockIdx *pIdx = pReadh->pBlkIdx;
  SDFile    *pHead = TSDB_READ_HEAD_FILE(pReadh);

  if (tsdbSeekDFile(pHead, pIdx->offset, SEEK_SET) < 0) {
    tsdbError("vgId:%d failed to load SBlockInfo part while seek file %s since %s, offset:%u len:%u",
              TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pHead), tstrerror(terrno), pIdx->offset, pIdx->len);
    return -1;
  }

  if (tsdbMakeRoom((void **)(&(pReadh->pBlkInfo)), pIdx->len) < 0) {
    return -1;
  }

  int64_t nGot = tsdbReadDFile(pHead, (void *)(pReadh->pBlkInfo), pIdx->len);
  if (nGot < 0) {
    tsdbError("vgId:%d failed to load SBlockInfo part while read file %s since %s, offset:%u len :%u",
              TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pHead), tstrerror(terrno), pIdx->offset, pIdx->len);
    return -1;
  }

  if (nGot < pIdx->len) {
    terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
    tsdbError("vgId:%d SBlockInfo part in file %s is corrupted, offset:%u expected bytes:%u read bytes:%" PRId64,
              TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pHead), pIdx->offset, pIdx->len, nGot);
    return -1;
  }

  if (!taosCheckChecksumWhole((uint8_t *)(pReadh->pBlkInfo), pIdx->len)) {
    terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
    tsdbError("vgId:%d SBlockInfo part in file %s is corrupted since wrong checksum, offset:%u len :%u",
              TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pHead), pIdx->offset, pIdx->len);
    return -1;
  }

  // Optionally hand a copy of the raw region to the caller.
  if (pTarget != NULL) {
    memcpy(pTarget, (void *)(pReadh->pBlkInfo), pIdx->len);
  }

  return 0;
}

C
Cary Xu 已提交
249 250 251 252 253 254
/*
 * Overwrite *pDest with a byte copy of *pSrc and give pSrc the column array
 * that pDest owned before the copy. After the call both structures refer to
 * distinct `cols` arrays again; all other fields of pSrc are left as they were.
 */
static FORCE_INLINE void tsdbSwapDataCols(SDataCols *pDest, SDataCols *pSrc) {
  SDataCol *pDestColsBefore = pDest->cols;

  memcpy(pDest, pSrc, sizeof(*pDest));
  pSrc->cols = pDestColsBefore;
}

H
Hongze Cheng 已提交
255
/*
 * Load all columns of a block into pReadh->pDCols[0].
 *
 * When the block has more than one sub-block, the sub-block array is located
 * at pBlock->offset inside pBlkInfo (or inside pReadh->pBlkInfo when pBlkInfo
 * is NULL); the first sub-block is loaded into pDCols[0] and every further
 * one is loaded into pDCols[1] and merged into pDCols[0]. A block with exactly
 * one sub-block is loaded directly and then passed through tdMergeDataCols()
 * via pDCols[1], after which the two buffers are swapped.
 *
 * Returns 0 on success, -1 if loading or merging fails.
 */
int tsdbLoadBlockData(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo) {
  ASSERT(pBlock->numOfSubBlocks > 0);

  int8_t     update = REPO_CFG(pReadh->pRepo)->update;
  SDataCols *pMain = pReadh->pDCols[0];
  SDataCols *pAux = pReadh->pDCols[1];

  // Resolve the first (sub-)block to load.
  SBlock *pSub = pBlock;
  if (pBlock->numOfSubBlocks > 1) {
    void *pBase = (pBlkInfo != NULL) ? (void *)pBlkInfo : (void *)(pReadh->pBlkInfo);
    pSub = (SBlock *)POINTER_SHIFT(pBase, pBlock->offset);
  }

  if (tsdbLoadBlockDataImpl(pReadh, pSub, pMain) < 0) {
    return -1;
  }

  for (int iSub = 1; iSub < pBlock->numOfSubBlocks; ++iSub) {
    ++pSub;
    if (tsdbLoadBlockDataImpl(pReadh, pSub, pAux) < 0) {
      return -1;
    }
    // TODO: use the real maxVersion to replace the UINT64_MAX to support Multi-Version
    if (tdMergeDataCols(pMain, pAux, pAux->numOfRows, NULL, TD_SUPPORT_UPDATE(update), TD_VER_MAX) < 0) {
      return -1;
    }
  }

  // TODO: restrict this to ((pBlock->numOfSubBlocks == 1) && (iBlock->hasDupKey))
  if (pBlock->numOfSubBlocks == 1) {
    tdResetDataCols(pAux);
    pAux->bitmapMode = pMain->bitmapMode;
    if (tdMergeDataCols(pAux, pMain, pMain->numOfRows, NULL, TD_SUPPORT_UPDATE(update), TD_VER_MAX) < 0) {
      return -1;
    }
    tsdbSwapDataCols(pMain, pAux);
    ASSERT(pMain->bitmapMode != 0);
  }

  ASSERT(pMain->numOfRows <= pBlock->numOfRows);
  ASSERT(dataColsKeyFirst(pMain) == pBlock->keyFirst);
  ASSERT(dataColsKeyLast(pMain) == pBlock->keyLast);

  return 0;
}

C
Cary Xu 已提交
297
// TODO: filter by Multi-Version
H
refact  
Hongze Cheng 已提交
298 299
/*
 * Load a subset of a block's columns (selected by colIds) into
 * pReadh->pDCols[0].
 *
 * Sub-blocks are resolved and merged the same way as in tsdbLoadBlockData(),
 * but each one is loaded through tsdbLoadBlockDataColsImpl(). When
 * mergeBitmap is set and the result is not already flagged by
 * tdDataColsIsBitmapI(), every one of the first numOfColsIds columns that
 * carries a bitmap has it rewritten in place by tdMergeBitmap() and the
 * container is flagged with tdDataColsSetBitmapI().
 *
 * Returns 0 on success, -1 if loading or merging fails.
 */
int tsdbLoadBlockDataCols(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo, const int16_t *colIds, int numOfColsIds,
                          bool mergeBitmap) {
  ASSERT(pBlock->numOfSubBlocks > 0);

  int8_t     update = pReadh->pRepo->pVnode->config.tsdbCfg.update;
  SDataCols *pMain = pReadh->pDCols[0];
  SDataCols *pAux = pReadh->pDCols[1];

  // Resolve the first (sub-)block to load.
  SBlock *pSub = pBlock;
  if (pBlock->numOfSubBlocks > 1) {
    void *pBase = (pBlkInfo != NULL) ? (void *)pBlkInfo : (void *)(pReadh->pBlkInfo);
    pSub = POINTER_SHIFT(pBase, pBlock->offset);
  }

  if (tsdbLoadBlockDataColsImpl(pReadh, pSub, pMain, colIds, numOfColsIds) < 0) {
    return -1;
  }

  for (int iSub = 1; iSub < pBlock->numOfSubBlocks; ++iSub) {
    ++pSub;
    if (tsdbLoadBlockDataColsImpl(pReadh, pSub, pAux, colIds, numOfColsIds) < 0) {
      return -1;
    }
    // TODO: use the real maxVersion to replace the UINT64_MAX to support Multi-Version
    if (tdMergeDataCols(pMain, pAux, pAux->numOfRows, NULL, TD_SUPPORT_UPDATE(update), TD_VER_MAX) < 0) {
      return -1;
    }
  }

  // TODO: restrict this to ((pBlock->numOfSubBlocks == 1) && (iBlock->hasDupKey))
  if (pBlock->numOfSubBlocks == 1) {
    tdResetDataCols(pAux);
    pAux->bitmapMode = pMain->bitmapMode;
    if (tdMergeDataCols(pAux, pMain, pMain->numOfRows, NULL, TD_SUPPORT_UPDATE(update), TD_VER_MAX) < 0) {
      return -1;
    }
    tsdbSwapDataCols(pMain, pAux);
    ASSERT(pMain->bitmapMode != 0);
  }

  if (mergeBitmap && !tdDataColsIsBitmapI(pMain)) {
    for (int iCol = 0; iCol < numOfColsIds; ++iCol) {
      SDataCol *pCol = pMain->cols + iCol;
      if (!pCol->bitmap) {
        continue;
      }

      ASSERT(pCol->colId != PRIMARYKEY_TIMESTAMP_COL_ID);
      tdMergeBitmap(pCol->pBitmap, pMain->numOfRows, pCol->pBitmap);
      tdDataColsSetBitmapI(pMain);
    }
  }

  ASSERT(pMain->numOfRows <= pBlock->numOfRows);
  ASSERT(dataColsKeyFirst(pMain) == pBlock->keyFirst);
  ASSERT(dataColsKeyLast(pMain) == pBlock->keyLast);

  return 0;
}

/*
 * Load the aggregate (statistics) record of a block into pReadh->pAggrBlkData.
 *
 * The record is read from the handle's SMAL file when pBlock->last is set and
 * from its SMAD file otherwise, at pBlock->aggrOffset, with the size given by
 * tsdbBlockAggrSize(); its checksum is verified after reading.
 *
 * Requires pBlock->numOfSubBlocks <= 1.
 * Returns TSDB_STATIS_NONE when the block has no aggregate data
 * (pBlock->aggrStat is zero), 0 on success, and -1 on seek/read/allocation
 * failure or when the data is short or fails the checksum
 * (terrno = TSDB_CODE_TDB_FILE_CORRUPTED).
 */
int tsdbLoadBlockStatis(SReadH *pReadh, SBlock *pBlock) {
  ASSERT(pBlock->numOfSubBlocks <= 1);

  if (!pBlock->aggrStat) {
    tsdbDebug("vgId:%d no need to load block statis part for uid %" PRIu64 " since not exist", REPO_ID(pReadh->pRepo),
              TSDB_READ_TABLE_UID(pReadh));
    return TSDB_STATIS_NONE;
  }

  SDFile *pDFileAggr = pBlock->last ? TSDB_READ_SMAL_FILE(pReadh) : TSDB_READ_SMAD_FILE(pReadh);

  if (tsdbSeekDFile(pDFileAggr, pBlock->aggrOffset, SEEK_SET) < 0) {
    tsdbError("vgId:%d failed to load block statis part for uid %" PRIu64 " while seek file %s to offset %" PRIu64
              " since %s",
              TSDB_READ_REPO_ID(pReadh), TSDB_READ_TABLE_UID(pReadh), TSDB_FILE_FULL_NAME(pDFileAggr),
              (uint64_t)pBlock->aggrOffset, tstrerror(terrno));
    return -1;
  }

  size_t sizeAggr = tsdbBlockAggrSize(pBlock->numOfBSma, (uint32_t)pBlock->blkVer);
  if (tsdbMakeRoom((void **)(&(pReadh->pAggrBlkData)), sizeAggr) < 0) return -1;

  int64_t nreadAggr = tsdbReadDFile(pDFileAggr, (void *)(pReadh->pAggrBlkData), sizeAggr);
  if (nreadAggr < 0) {
    tsdbError("vgId:%d failed to load block statis part for uid %" PRIu64
              " while read file %s since %s, offset:%" PRIu64 " len :%" PRIzu,
              TSDB_READ_REPO_ID(pReadh), TSDB_READ_TABLE_UID(pReadh), TSDB_FILE_FULL_NAME(pDFileAggr),
              tstrerror(terrno), (uint64_t)pBlock->aggrOffset, sizeAggr);
    return -1;
  }

  // nreadAggr is known to be non-negative here, so the mixed-sign comparison is safe.
  if (nreadAggr < sizeAggr) {
    terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
    tsdbError("vgId:%d block statis part for uid %" PRIu64 " in file %s is corrupted, offset:%" PRIu64
              " expected bytes:%" PRIzu " read bytes: %" PRId64,
              TSDB_READ_REPO_ID(pReadh), TSDB_READ_TABLE_UID(pReadh), TSDB_FILE_FULL_NAME(pDFileAggr),
              (uint64_t)pBlock->aggrOffset, sizeAggr, nreadAggr);
    return -1;
  }

  if (!taosCheckChecksumWhole((uint8_t *)(pReadh->pAggrBlkData), (uint32_t)sizeAggr)) {
    terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
    // A space now separates the uid from "in file" (the message used to print "uid 123in file ...").
    tsdbError("vgId:%d block statis part for uid %" PRIu64
              " in file %s is corrupted since wrong checksum, offset:%" PRIu64 " len :%" PRIzu,
              TSDB_READ_REPO_ID(pReadh), TSDB_READ_TABLE_UID(pReadh), TSDB_FILE_FULL_NAME(pDFileAggr),
              (uint64_t)pBlock->aggrOffset, sizeAggr);
    return -1;
  }
  return 0;
}

/*
 * Load only the head part of a block into pReadh->pBlkData.
 *
 * The head is read from the handle's LAST file when pBlock->last is set and
 * from its DATA file otherwise, starting at pBlock->offset, with the size
 * given by tsdbBlockStatisSize(); its checksum is verified after reading.
 *
 * Requires pBlock->numOfSubBlocks <= 1.
 * Returns 0 on success, -1 on seek/read/allocation failure or when the data
 * is short or fails the checksum (terrno = TSDB_CODE_TDB_FILE_CORRUPTED).
 */
static int tsdbLoadBlockOffset(SReadH *pReadh, SBlock *pBlock) {
  ASSERT(pBlock->numOfSubBlocks <= 1);

  SDFile *pFile = (pBlock->last) ? TSDB_READ_LAST_FILE(pReadh) : TSDB_READ_DATA_FILE(pReadh);

  if (tsdbSeekDFile(pFile, pBlock->offset, SEEK_SET) < 0) {
    tsdbError("vgId:%d failed to load block head part while seek file %s to offset %" PRId64 " since %s",
              TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pFile), (int64_t)pBlock->offset, tstrerror(terrno));
    return -1;
  }

  size_t headBytes = tsdbBlockStatisSize(pBlock->numOfCols, (uint32_t)pBlock->blkVer);
  if (tsdbMakeRoom((void **)(&(pReadh->pBlkData)), headBytes) < 0) {
    return -1;
  }

  int64_t nGot = tsdbReadDFile(pFile, (void *)(pReadh->pBlkData), headBytes);
  if (nGot < 0) {
    tsdbError("vgId:%d failed to load block head part while read file %s since %s, offset:%" PRId64 " len :%" PRIzu,
              TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pFile), tstrerror(terrno), (int64_t)pBlock->offset,
              headBytes);
    return -1;
  }

  if (nGot < headBytes) {
    terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
    tsdbError("vgId:%d block head part in file %s is corrupted, offset:%" PRId64 " expected bytes:%" PRIzu
              " read bytes: %" PRId64,
              TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pFile), (int64_t)pBlock->offset, headBytes, nGot);
    return -1;
  }

  if (!taosCheckChecksumWhole((uint8_t *)(pReadh->pBlkData), (uint32_t)headBytes)) {
    terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
    tsdbError("vgId:%d block head part in file %s is corrupted since wrong checksum, offset:%" PRId64 " len :%" PRIzu,
              TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pFile), (int64_t)pBlock->offset, headBytes);
    return -1;
  }

  return 0;
}

/*
 * Serialize one SBlockIdx through the taosEncode* helpers.
 *
 * Field order: len, offset, hasLast, numOfBlocks, uid, maxKey -- the same
 * order tsdbDecodeSBlockIdx() reads them back in.
 *
 * Returns the sum of the values returned by the individual encoders.
 */
int tsdbEncodeSBlockIdx(void **buf, SBlockIdx *pIdx) {
  int nBytes = 0;

  nBytes += taosEncodeVariantU32(buf, pIdx->len);
  nBytes += taosEncodeVariantU32(buf, pIdx->offset);
  nBytes += taosEncodeFixedU8(buf, pIdx->hasLast);
  nBytes += taosEncodeVariantU32(buf, pIdx->numOfBlocks);
  nBytes += taosEncodeFixedU64(buf, pIdx->uid);
  nBytes += taosEncodeFixedU64(buf, pIdx->maxKey);

  return nBytes;
}

/*
 * Deserialize one SBlockIdx from buf.
 *
 * Fields are read in the order len, offset, hasLast, numOfBlocks, uid,
 * maxKey. hasLast, numOfBlocks, uid and maxKey go through local temporaries
 * before being stored into *pIdx.
 *
 * Returns the buffer position returned by the last decoder, or NULL as soon
 * as any decoder returns NULL (fields decoded before that point have already
 * been written to *pIdx).
 */
void *tsdbDecodeSBlockIdx(void *buf, SBlockIdx *pIdx) {
  uint8_t  lastFlag = 0;
  uint32_t nBlocks = 0;
  uint64_t raw64 = 0;

  buf = taosDecodeVariantU32(buf, &(pIdx->len));
  if (buf == NULL) return NULL;

  buf = taosDecodeVariantU32(buf, &(pIdx->offset));
  if (buf == NULL) return NULL;

  buf = taosDecodeFixedU8(buf, &(lastFlag));
  if (buf == NULL) return NULL;
  pIdx->hasLast = lastFlag;

  buf = taosDecodeVariantU32(buf, &(nBlocks));
  if (buf == NULL) return NULL;
  pIdx->numOfBlocks = nBlocks;

  buf = taosDecodeFixedU64(buf, &raw64);
  if (buf == NULL) return NULL;
  pIdx->uid = (int64_t)raw64;

  buf = taosDecodeFixedU64(buf, &raw64);
  if (buf == NULL) return NULL;
  pIdx->maxKey = (TSKEY)raw64;

  return buf;
}
H
Hongze Cheng 已提交
472

473
/*
 * Fill pStatis[0..numOfCols) from the block statistics loaded in the handle.
 *
 * Each pStatis[i].colId is matched against the stored per-column records with
 * a two-cursor walk that only moves forward. A requested column with a
 * matching record receives sum/max/min/maxIndex/minIndex/numOfNull; one with
 * no matching record gets numOfNull = -1.
 *
 * With TD_REFACTOR_3 the records come from pReadh->pBlkData. Otherwise they
 * come from pReadh->pAggrBlkData, and nothing is written at all when
 * pBlock->aggrStat is zero.
 */
void tsdbGetBlockStatis(SReadH *pReadh, SColumnDataAgg *pStatis, int numOfCols, SBlock *pBlock) {
#ifdef TD_REFACTOR_3
  SBlockData *pBlockData = pReadh->pBlkData;

  int iStat = 0;
  int iRec = 0;
  while (iStat < numOfCols) {
    if (iRec >= pBlockData->numOfCols) {
      // Stored records exhausted: the remaining requested columns have no statistics.
      pStatis[iStat].numOfNull = -1;
      ++iStat;
      continue;
    }

    if (pStatis[iStat].colId == pBlockData->cols[iRec].colId) {
      pStatis[iStat].sum = pBlockData->cols[iRec].sum;
      pStatis[iStat].max = pBlockData->cols[iRec].max;
      pStatis[iStat].min = pBlockData->cols[iRec].min;
      pStatis[iStat].maxIndex = pBlockData->cols[iRec].maxIndex;
      pStatis[iStat].minIndex = pBlockData->cols[iRec].minIndex;
      pStatis[iStat].numOfNull = pBlockData->cols[iRec].numOfNull;
      ++iStat;
      ++iRec;
    } else if (pStatis[iStat].colId < pBlockData->cols[iRec].colId) {
      pStatis[iStat].numOfNull = -1;
      ++iStat;
    } else {
      ++iRec;
    }
  }
#else
  if (!pBlock->aggrStat) {
    return;
  }

  SAggrBlkData *pAggrBlkData = pReadh->pAggrBlkData;

  int iStat = 0;
  int iRec = 0;
  while (iStat < numOfCols) {
    if (iRec >= pBlock->numOfBSma) {
      // Stored records exhausted: the remaining requested columns have no statistics.
      pStatis[iStat].numOfNull = -1;
      ++iStat;
      continue;
    }

    SAggrBlkCol *pRec = ((SAggrBlkCol *)(pAggrBlkData)) + iRec;
    if (pStatis[iStat].colId == pRec->colId) {
      pStatis[iStat].sum = pRec->sum;
      pStatis[iStat].max = pRec->max;
      pStatis[iStat].min = pRec->min;
      pStatis[iStat].maxIndex = pRec->maxIndex;
      pStatis[iStat].minIndex = pRec->minIndex;
      pStatis[iStat].numOfNull = pRec->numOfNull;
      ++iStat;
      ++iRec;
    } else if (pStatis[iStat].colId < pRec->colId) {
      pStatis[iStat].numOfNull = -1;
      ++iStat;
    } else {
      ++iRec;
    }
  }

#endif
}

H
refact  
Hongze Cheng 已提交
532
/*
 * Forget the currently selected table: clear the table pointer, the block
 * index cursor and its current entry, and reset both column buffers.
 */
static void tsdbResetReadTable(SReadH *pReadh) {
  for (int iSlot = 0; iSlot < 2; ++iSlot) {
    tdResetDataCols(pReadh->pDCols[iSlot]);
  }

  pReadh->pTable = NULL;
  pReadh->pBlkIdx = NULL;
  pReadh->cidx = 0;
}

/*
 * Drop all per-file state of the handle: the per-table state goes first, then
 * the loaded block-index entries are cleared and the read file set is closed.
 */
static void tsdbResetReadFile(SReadH *pReadh) {
  // Table-level state depends on the file being read, so it is reset as well.
  tsdbResetReadTable(pReadh);

  taosArrayClear(pReadh->aBlkIdx);

  tsdbCloseDFileSet(TSDB_READ_FSET(pReadh));
}

H
Hongze Cheng 已提交
546
/*
 * Read one (sub-)block from disk and decode its columns into pDataCols.
 *
 * The whole block (pBlock->len bytes at pBlock->offset) is read into the
 * handle's read buffer from the LAST file when pBlock->last is set and from
 * the DATA file otherwise, and the checksum of its head part is verified.
 * Columns are then matched by colId between pDataCols and the block's column
 * table using two forward-only cursors:
 *   - data column 0 is always decoded from the key column stored at
 *     TSDB_KEY_COL_OFFSET with length pBlock->keyLen;
 *   - a data column with a matching block column is decoded from it;
 *   - a data column without a matching block column is reset with
 *     dataColReset().
 *
 * Requires pBlock->numOfSubBlocks to be 0 or 1.
 * Returns 0 on success, -1 on I/O, allocation, checksum or decode failure.
 */
static int tsdbLoadBlockDataImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *pDataCols) {
  ASSERT(pBlock->numOfSubBlocks == 0 || pBlock->numOfSubBlocks == 1);

  SDFile *pDFile = (pBlock->last) ? TSDB_READ_LAST_FILE(pReadh) : TSDB_READ_DATA_FILE(pReadh);

  tdResetDataCols(pDataCols);
  if (tsdbIsSupBlock(pBlock)) {
    tdDataColsSetBitmapI(pDataCols);
  }

  if (tsdbMakeRoom((void **)(&TSDB_READ_BUF(pReadh)), pBlock->len) < 0) {
    return -1;
  }

  // Taken after tsdbMakeRoom() so it refers to the buffer as it is after that call.
  SBlockData *pBlockData = (SBlockData *)TSDB_READ_BUF(pReadh);

  if (tsdbSeekDFile(pDFile, pBlock->offset, SEEK_SET) < 0) {
    tsdbError("vgId:%d failed to load block data part while seek file %s to offset %" PRId64 " since %s",
              TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFile), (int64_t)pBlock->offset, tstrerror(terrno));
    return -1;
  }

  int64_t nGot = tsdbReadDFile(pDFile, TSDB_READ_BUF(pReadh), pBlock->len);
  if (nGot < 0) {
    tsdbError("vgId:%d failed to load block data part while read file %s since %s, offset:%" PRId64 " len :%d",
              TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFile), tstrerror(terrno), (int64_t)pBlock->offset,
              pBlock->len);
    return -1;
  }

  if (nGot < pBlock->len) {
    terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
    tsdbError("vgId:%d block data part in file %s is corrupted, offset:%" PRId64
              " expected bytes:%d read bytes: %" PRId64,
              TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFile), (int64_t)pBlock->offset, pBlock->len, nGot);
    return -1;
  }

  // Only the head part is checksummed here; each column payload carries its own checksum.
  int32_t headBytes = (int32_t)tsdbBlockStatisSize(pBlock->numOfCols, (uint32_t)pBlock->blkVer);
  if (!taosCheckChecksumWhole((uint8_t *)TSDB_READ_BUF(pReadh), headBytes)) {
    terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
    tsdbError("vgId:%d block head part in file %s is corrupted since wrong checksum, offset:%" PRId64 " len :%d",
              TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFile), (int64_t)pBlock->offset, headBytes);
    return -1;
  }

  ASSERT(headBytes < pBlock->len);
  ASSERT(pBlockData->numOfCols == pBlock->numOfCols);

  pDataCols->numOfRows = pBlock->numOfRows;

  // Recover the data
  const int  nBitmapBytes = (int)TD_BITMAP_BYTES(pBlock->numOfRows);
  int        iBlkCol = 0;   // cursor over the block's SBlockCol table
  int        iDataCol = 0;  // cursor over the destination SDataCols
  SBlockCol *pBlockCol = NULL;

  while (iDataCol < pDataCols->numOfCols) {
    SDataCol  *pDataCol = &(pDataCols->cols[iDataCol]);
    const bool isKeyCol = (iDataCol == 0);

    if (!isKeyCol && iBlkCol >= pBlockData->numOfCols) {
      // Block columns exhausted: set current column as NULL and forward
      dataColReset(pDataCol);
      ++iDataCol;
      continue;
    }

    // Describe the on-disk column that is the current candidate for pDataCol.
    int16_t  srcColId;
    uint32_t srcOffset;
    int32_t  srcLen;
    int32_t  srcBitmapBytes = 0;

    if (isKeyCol) {
      srcColId = PRIMARYKEY_TIMESTAMP_COL_ID;
      srcOffset = TSDB_KEY_COL_OFFSET;
      srcLen = pBlock->keyLen;
      ASSERT(pDataCol->colId == srcColId);
      TD_SET_COL_ROWS_NORM(pDataCol);
    } else {
      pBlockCol = &(pBlockData->cols[iBlkCol]);
      srcColId = pBlockCol->colId;
      srcOffset = tsdbGetBlockColOffset(pBlockCol);
      srcLen = pBlockCol->len;
      pDataCol->bitmap = pBlockCol->bitmap;
      if (!TD_COL_ROWS_NORM(pBlockCol)) {
        srcBitmapBytes = nBitmapBytes;
      }
    }

    if (srcColId > pDataCol->colId) {
      // The block has no data for this column: set it as NULL and forward
      dataColReset(pDataCol);
      ++iDataCol;
      continue;
    }

    if (srcColId < pDataCol->colId) {
      // The block column is not requested by the destination; skip it.
      ++iBlkCol;
      continue;
    }

    // srcColId == pDataCol->colId: decode this column.
    if (pBlock->algorithm == TWO_STAGE_COMP) {
      int zsize = pDataCol->bytes * pBlock->numOfRows + srcBitmapBytes + 2 * COMP_OVERFLOW_BYTES;
      if (tsdbMakeRoom((void **)(&TSDB_READ_COMP_BUF(pReadh)), zsize) < 0) {
        return -1;
      }
    }

    if (tsdbCheckAndDecodeColumnData(pDataCol, POINTER_SHIFT(pBlockData, headBytes + srcOffset), srcLen,
                                     pBlockCol ? pBlockCol->blen : 0, pBlock->algorithm, pBlock->numOfRows,
                                     srcBitmapBytes, pDataCols->maxPoints, TSDB_READ_COMP_BUF(pReadh),
                                     (int)taosTSizeof(TSDB_READ_COMP_BUF(pReadh))) < 0) {
      tsdbError("vgId:%d file %s is broken at column %d block offset %" PRId64 " column offset %u",
                TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFile), srcColId, (int64_t)pBlock->offset, srcOffset);
      return -1;
    }

    // The key column is not part of the SBlockCol table, so it does not advance that cursor.
    if (!isKeyCol) {
      ++iBlkCol;
    }
    ++iDataCol;
  }

  return 0;
}

671 672
/**
 * Verify the checksum of one serialized column and decode it into pDataCol.
 *
 * As read by this function, `content` holds the column data first, followed by
 * `bitmapLen` bytes of bitmap, with sizeof(TSCKSUM) bytes accounted for by the
 * checksum (validated over the whole `len` bytes).
 *
 * @param pDataCol     destination column; its buffers are (re)sized for maxPoints rows
 * @param content      raw column bytes as read from the file
 * @param len          total number of bytes in content (data + bitmap + checksum)
 * @param bitmapLen    number of bytes occupied by the serialized bitmap part
 * @param comp         compression algorithm; 0 means the payload is stored uncompressed
 * @param numOfRows    number of rows encoded in the data part
 * @param numOfBitmaps number of bitmap bytes expected after decoding; 0 skips the bitmap
 * @param maxPoints    row capacity used to size the destination column
 * @param buffer       scratch buffer handed to the decompressors
 * @param bufferSize   size of buffer in bytes
 * @return 0 on success, -1 on failure with terrno set
 */
static int tsdbCheckAndDecodeColumnData(SDataCol *pDataCol, void *content, int32_t len, int32_t bitmapLen, int8_t comp,
                                        int numOfRows, int numOfBitmaps, int maxPoints, char *buffer, int bufferSize) {
  if (!taosCheckChecksumWhole((uint8_t *)content, len)) {
    terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
    return -1;
  }

  // Length of the data part. Computed once in signed arithmetic so that inconsistent
  // lengths show up as a negative value instead of a huge unsigned one.
  int32_t dataLen = len - bitmapLen - (int32_t)sizeof(TSCKSUM);
  if (bitmapLen < 0 || dataLen < 0) {
    tsdbError("Invalid column length, file corrupted, len:%d bitmapLen:%d", len, bitmapLen);
    terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
    return -1;
  }

  // The destination buffers must exist before anything is decoded into them.
  if (tdAllocMemForCol(pDataCol, maxPoints) < 0) {
    terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
    return -1;
  }

  if (comp) {
    // Decompress the data part with the type-specific decompressor.
    int tlen = (*(tDataTypes[pDataCol->type].decompFunc))(content, dataLen, numOfRows, pDataCol->pData,
                                                          pDataCol->spaceSize, comp, buffer, bufferSize);
    if (tlen <= 0) {
      tsdbError(
          "Failed to decompress column data, file corrupted, len:%d comp:%d numOfRows:%d maxPoints:%d bufferSize:%d",
          dataLen, comp, numOfRows, maxPoints, bufferSize);
      terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
      return -1;
    }
    pDataCol->len = tlen;

    if (numOfBitmaps > 0) {
      // The bitmap part is compressed separately as a tinyint sequence.
      tlen = tsDecompressTinyint(POINTER_SHIFT(content, dataLen), bitmapLen, numOfBitmaps, pDataCol->pBitmap,
                                 pDataCol->spaceSize, comp, buffer, bufferSize);
      if (tlen <= 0) {
        tsdbError(
            "Failed to decompress column bitmap, file corrupted, len:%d comp:%d numOfRows:%d maxPoints:%d "
            "bufferSize:%d",
            bitmapLen, comp, numOfBitmaps, maxPoints, bufferSize);
        terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
        return -1;
      }
    }
  } else {
    // Stored uncompressed: copy the data part, then the bitmap part if one is expected.
    pDataCol->len = dataLen;
    memcpy(pDataCol->pData, content, pDataCol->len);
    if (numOfBitmaps > 0) {
      memcpy(pDataCol->pBitmap, POINTER_SHIFT(content, dataLen), bitmapLen);
    }
  }

  // Variable-length columns need their per-row offsets rebuilt after decoding.
  if (IS_VAR_DATA_TYPE(pDataCol->type)) {
    dataColSetOffset(pDataCol, numOfRows);
  }
  return 0;
}

L
Liu Jicong 已提交
741
/**
 * Load a selected set of columns of one block into pDataCols.
 *
 * colIds is walked in order together with two forward-only cursors: one over the
 * columns of pDataCols and one over the block's column descriptors in
 * pReadh->pBlkData. Requested ids that have no matching column in pDataCols are
 * skipped; columns present in pDataCols but absent from the block are reset.
 *
 * @param pReadh      read handle
 * @param pBlock      block to load; must have 0 or 1 sub-blocks
 * @param pDataCols   destination column set
 * @param colIds      requested column ids; colIds[0] must be the primary timestamp column
 * @param numOfColIds number of entries in colIds
 * @return 0 on success, -1 if loading the block offset or any column fails
 */
static int tsdbLoadBlockDataColsImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *pDataCols, const int16_t *colIds,
                                     int numOfColIds) {
  ASSERT(pBlock->numOfSubBlocks == 0 || pBlock->numOfSubBlocks == 1);
  ASSERT(colIds[0] == PRIMARYKEY_TIMESTAMP_COL_ID);

  SDFile   *pDFile = (pBlock->last) ? TSDB_READ_LAST_FILE(pReadh) : TSDB_READ_DATA_FILE(pReadh);
  SBlockCol keyBlockCol = {0};  // synthetic descriptor used for the primary key column

  tdResetDataCols(pDataCols);
  if (tsdbIsSupBlock(pBlock)) {
    tdDataColsSetBitmapI(pDataCols);
  }

  // The column descriptors are only needed when something besides the timestamp is requested.
  if (numOfColIds > 1 && tsdbLoadBlockOffset(pReadh, pBlock) < 0) return -1;

  pDataCols->numOfRows = pBlock->numOfRows;

  int dataIdx = 0;  // cursor into pDataCols->cols
  int blkIdx = 0;   // cursor into pReadh->pBlkData->cols
  for (int idx = 0; idx < numOfColIds; ++idx) {
    const int16_t wanted = colIds[idx];

    // Advance the destination cursor up to the requested id; stop early once past it.
    SDataCol *pDataCol = NULL;
    while (dataIdx < pDataCols->numOfCols) {
      SDataCol *pCand = &pDataCols->cols[dataIdx];
      if (pCand->colId > wanted) break;
      ++dataIdx;
      if (pCand->colId == wanted) {
        pDataCol = pCand;
        break;
      }
    }

    if (pDataCol == NULL) continue;
    ASSERT(pDataCol->colId == wanted);

    SBlockCol *pBlockCol = NULL;
    if (wanted == PRIMARYKEY_TIMESTAMP_COL_ID) {
      // The key column is described from the block header instead of pBlkData.
      keyBlockCol.colId = wanted;
      TD_SET_COL_ROWS_NORM(&keyBlockCol);  // the primary key column defaults to NORM
      keyBlockCol.blen = 0;
      keyBlockCol.len = pBlock->keyLen;
      keyBlockCol.type = pDataCol->type;
      keyBlockCol.offset = TSDB_KEY_COL_OFFSET;
      pBlockCol = &keyBlockCol;
    } else {
      // Advance the block-descriptor cursor the same way.
      while (blkIdx < pBlock->numOfCols) {
        SBlockCol *pCand = &(pReadh->pBlkData->cols[blkIdx]);
        if (pCand->colId > wanted) break;
        ++blkIdx;
        if (pCand->colId == wanted) {
          pBlockCol = pCand;
          break;
        }
      }

      if (pBlockCol == NULL) {
        // The block does not store this column: reset the destination and move on.
        dataColReset(pDataCol);
        continue;
      }

      ASSERT(pBlockCol->colId == pDataCol->colId);
    }

    // Carry the descriptor's bitmap flag over to the destination column.
    pDataCol->bitmap = pBlockCol->bitmap;

    if (tsdbLoadColData(pReadh, pDFile, pBlock, pBlockCol, pDataCol) < 0) return -1;
  }

  return 0;
}

/**
 * Read one column of a block from pDFile and decode it into pDataCol.
 *
 * The column bytes are read into the handle's read buffer, then checksum-verified
 * and decoded by tsdbCheckAndDecodeColumnData(), using the handle's compression
 * buffer as scratch space.
 *
 * @param pReadh    read handle owning the read and compression buffers
 * @param pDFile    file the block lives in
 * @param pBlock    block the column belongs to
 * @param pBlockCol on-disk descriptor of the column (length, bitmap length, offset)
 * @param pDataCol  destination column; must have the same colId as pBlockCol
 * @return 0 on success, -1 on failure
 */
static int tsdbLoadColData(SReadH *pReadh, SDFile *pDFile, SBlock *pBlock, SBlockCol *pBlockCol, SDataCol *pDataCol) {
  ASSERT(pDataCol->colId == pBlockCol->colId);

  STsdb    *pRepo = TSDB_READ_REPO(pReadh);
  STsdbCfg *pCfg = REPO_CFG(pRepo);

  // A bitmap part is only expected when the column is not flagged as all-NORM rows.
  int32_t tLenBitmap = 0;
  if (!TD_COL_ROWS_NORM(pBlockCol)) {
    tLenBitmap = (int32_t)TD_BITMAP_BYTES(pBlock->numOfRows);
  }

  // Scratch space requested for the compression buffer: decoded data, bitmap and overflow margin.
  int tsize = pDataCol->bytes * pBlock->numOfRows + tLenBitmap + 2 * COMP_OVERFLOW_BYTES;

  if (tsdbMakeRoom((void **)(&TSDB_READ_BUF(pReadh)), pBlockCol->len) < 0) return -1;
  if (tsdbMakeRoom((void **)(&TSDB_READ_COMP_BUF(pReadh)), tsize) < 0) return -1;

  // File position = block offset + size of the block statistics part + the column's own offset.
  int64_t offset = pBlock->offset + tsdbBlockStatisSize(pBlock->numOfCols, (uint32_t)pBlock->blkVer) +
                   tsdbGetBlockColOffset(pBlockCol);
  if (tsdbSeekDFile(pDFile, offset, SEEK_SET) < 0) {
    tsdbError("vgId:%d failed to load block column data while seek file %s to offset %" PRId64 " since %s",
              TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFile), offset, tstrerror(terrno));
    return -1;
  }

  int64_t nread = tsdbReadDFile(pDFile, TSDB_READ_BUF(pReadh), pBlockCol->len);
  if (nread < 0) {
    tsdbError("vgId:%d failed to load block column data while read file %s since %s, offset:%" PRId64 " len :%d",
              TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFile), tstrerror(terrno), offset, pBlockCol->len);
    return -1;
  }

  // A short read means the file holds fewer bytes than the descriptor claims.
  if (nread < pBlockCol->len) {
    terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
    tsdbError("vgId:%d block column data in file %s is corrupted, offset:%" PRId64
              " expected bytes:%d read bytes: %" PRId64,
              TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFile), offset, pBlockCol->len, nread);
    return -1;
  }

  if (tsdbCheckAndDecodeColumnData(pDataCol, pReadh->pBuf, pBlockCol->len, pBlockCol->blen, pBlock->algorithm,
                                   pBlock->numOfRows, tLenBitmap, pCfg->maxRows, pReadh->pCBuf,
                                   (int32_t)taosTSizeof(pReadh->pCBuf)) < 0) {
    tsdbError("vgId:%d file %s is broken at column %d offset %" PRId64, REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pDFile),
              pBlockCol->colId, offset);
    return -1;
  }

  return 0;
}