tsdbReadImpl.c 29.7 KB
Newer Older
H
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Hongze Cheng 已提交
16
#include "vnodeInt.h"
H
Hongze Cheng 已提交
17

H
Hongze Cheng 已提交
18 19
#define TSDB_KEY_COL_OFFSET 0

H
refact  
Hongze Cheng 已提交
20 21
static void tsdbResetReadTable(SReadH *pReadh);
static void tsdbResetReadFile(SReadH *pReadh);
22
static int  tsdbLoadBlockOffset(SReadH *pReadh, SBlock *pBlock);
H
Hongze Cheng 已提交
23
static int  tsdbLoadBlockDataImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *pDataCols);
24 25
static int  tsdbCheckAndDecodeColumnData(SDataCol *pDataCol, void *content, int32_t len, int32_t bitmapLen, int8_t comp,
                                         int numOfRows, int numOfBitmaps, int maxPoints, char *buffer, int bufferSize);
C
Cary Xu 已提交
26
static int  tsdbLoadBlockDataColsImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *pDataCols, const int16_t *colIds,
H
Hongze Cheng 已提交
27 28 29
                                      int numOfColIds);
static int  tsdbLoadColData(SReadH *pReadh, SDFile *pDFile, SBlock *pBlock, SBlockCol *pBlockCol, SDataCol *pDataCol);

H
Hongze Cheng 已提交
30
/**
 * Prepare a read handle for use against a repository.
 *
 * The handle is zero-filled, bound to pRepo, its file set is flagged as closed,
 * and the block-index array plus both SDataCols scratch sets are allocated.
 *
 * @param pReadh  handle to initialize (must not be NULL)
 * @param pRepo   repository the handle will read from (must not be NULL)
 * @return 0 on success; -1 with terrno set to TSDB_CODE_TDB_OUT_OF_MEMORY when an
 *         allocation fails. If a SDataCols allocation fails the handle is torn
 *         down again through tsdbDestroyReadH() before returning.
 */
int tsdbInitReadH(SReadH *pReadh, STsdb *pRepo) {
  ASSERT(pReadh != NULL && pRepo != NULL);

  STsdbCfg *pCfg = REPO_CFG(pRepo);

  memset((void *)pReadh, 0, sizeof(*pReadh));
  pReadh->pRepo = pRepo;
  TSDB_FSET_SET_CLOSED(TSDB_READ_FSET(pReadh));

  pReadh->aBlkIdx = taosArrayInit(1024, sizeof(SBlockIdx));
  if (pReadh->aBlkIdx == NULL) {
    terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
    return -1;
  }

  // pDCols[0] and pDCols[1] are created the same way, one after the other.
  for (int slot = 0; slot < 2; ++slot) {
    pReadh->pDCols[slot] = tdNewDataCols(0, pCfg->maxRowsPerFileBlock);
    if (pReadh->pDCols[slot] != NULL) {
      continue;
    }

    terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
    tsdbDestroyReadH(pReadh);
    return -1;
  }

  return 0;
}

/**
 * Release everything owned by a read handle and clear its fields.
 *
 * A NULL handle is accepted and ignored. Each owned pointer is overwritten with
 * the value returned by its release routine; the cursor fields are cleared, the
 * file set is closed and the repository binding is dropped last.
 *
 * @param pReadh  handle to destroy, may be NULL
 */
void tsdbDestroyReadH(SReadH *pReadh) {
  if (pReadh == NULL) {
    return;
  }

  // Scratch buffers.
  pReadh->pExBuf = taosTZfree(pReadh->pExBuf);
  pReadh->pCBuf = taosTZfree(pReadh->pCBuf);
  pReadh->pBuf = taosTZfree(pReadh->pBuf);

  // Both column sets.
  for (int slot = 0; slot < 2; ++slot) {
    pReadh->pDCols[slot] = tdFreeDataCols(pReadh->pDCols[slot]);
  }

  // Block-level buffers.
  pReadh->pAggrBlkData = taosTZfree(pReadh->pAggrBlkData);
  pReadh->pBlkData = taosTZfree(pReadh->pBlkData);
  pReadh->pBlkInfo = taosTZfree(pReadh->pBlkInfo);

  // Table cursor.
  pReadh->cidx = 0;
  pReadh->pBlkIdx = NULL;
  pReadh->pTable = NULL;

  // File-set level state.
  pReadh->aBlkIdx = taosArrayDestroy(pReadh->aBlkIdx);
  tsdbCloseDFileSet(TSDB_READ_FSET(pReadh));

  pReadh->pRepo = NULL;
}

/**
 * Point the read handle at a file set and open it for reading.
 *
 * Any previously selected file set/table state is reset first, then *pSet is
 * copied into the handle, flagged closed, and opened with TD_FILE_READ.
 *
 * @param pReadh  read handle
 * @param pSet    file set to copy into the handle (must not be NULL)
 * @return 0 on success, -1 if the file set could not be opened (the failure is logged)
 */
int tsdbSetAndOpenReadFSet(SReadH *pReadh, SDFileSet *pSet) {
  ASSERT(pSet != NULL);

  tsdbResetReadFile(pReadh);

  pReadh->rSet = *pSet;
  TSDB_FSET_SET_CLOSED(TSDB_READ_FSET(pReadh));

  if (tsdbOpenDFileSet(TSDB_READ_FSET(pReadh), TD_FILE_READ) >= 0) {
    return 0;
  }

  tsdbError("vgId:%d failed to open file set %d since %s", TSDB_READ_REPO_ID(pReadh), TSDB_FSET_FID(pSet),
            tstrerror(terrno));
  return -1;
}

H
refact  
Hongze Cheng 已提交
98
/**
 * Close the file set currently attached to the read handle and clear the
 * per-file and per-table state; a thin wrapper over tsdbResetReadFile().
 */
void tsdbCloseAndUnsetFSet(SReadH *pReadh) {
  tsdbResetReadFile(pReadh);
}
H
Hongze Cheng 已提交
99 100

/**
 * Load the SBlockIdx section of the head file into pReadh->aBlkIdx.
 *
 * The section is located by pHeadf->info.offset / info.len, read into the
 * handle's read buffer, verified with taosCheckChecksumWhole(), and then decoded
 * entry by entry until the payload (section length minus the trailing TSCKSUM)
 * is consumed.
 *
 * @param pReadh  read handle with an opened file set; aBlkIdx must be empty
 * @return 0 on success (including the "no data" case where info.offset <= 0);
 *         -1 on failure with terrno set (TSDB_CODE_TDB_FILE_CORRUPTED for a short
 *         read, checksum mismatch or undecodable entry, TSDB_CODE_TDB_OUT_OF_MEMORY
 *         when the array cannot grow; seek/read/room failures keep the terrno of
 *         the failing helper).
 */
int tsdbLoadBlockIdx(SReadH *pReadh) {
  SDFile   *pHeadf = TSDB_READ_HEAD_FILE(pReadh);
  SBlockIdx blkIdx;

  ASSERT(taosArrayGetSize(pReadh->aBlkIdx) == 0);

  // No data at all, just return
  if (pHeadf->info.offset <= 0) return 0;

  if (tsdbSeekDFile(pHeadf, pHeadf->info.offset, SEEK_SET) < 0) {
    tsdbError("vgId:%d failed to load SBlockIdx part while seek file %s since %s, offset:%u len :%u",
              TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pHeadf), tstrerror(terrno), pHeadf->info.offset,
              pHeadf->info.len);
    return -1;
  }

  if (tsdbMakeRoom((void **)(&TSDB_READ_BUF(pReadh)), pHeadf->info.len) < 0) return -1;

  int64_t nread = tsdbReadDFile(pHeadf, TSDB_READ_BUF(pReadh), pHeadf->info.len);
  if (nread < 0) {
    tsdbError("vgId:%d failed to load SBlockIdx part while read file %s since %s, offset:%u len :%u",
              TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pHeadf), tstrerror(terrno), pHeadf->info.offset,
              pHeadf->info.len);
    return -1;
  }

  if (nread < pHeadf->info.len) {
    terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
    tsdbError("vgId:%d SBlockIdx part in file %s is corrupted, offset:%u expected bytes:%u read bytes: %" PRId64,
              TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pHeadf), pHeadf->info.offset, pHeadf->info.len, nread);
    return -1;
  }

  if (!taosCheckChecksumWhole((uint8_t *)TSDB_READ_BUF(pReadh), pHeadf->info.len)) {
    terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
    tsdbError("vgId:%d SBlockIdx part in file %s is corrupted since wrong checksum, offset:%u len :%u",
              TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pHeadf), pHeadf->info.offset, pHeadf->info.len);
    return -1;
  }

  void *ptr = TSDB_READ_BUF(pReadh);
  while (POINTER_DISTANCE(ptr, TSDB_READ_BUF(pReadh)) < (pHeadf->info.len - sizeof(TSCKSUM))) {
    ptr = tsdbDecodeSBlockIdx(ptr, &blkIdx);
    if (ptr == NULL) {
      // A decode failure used to be guarded only by an ASSERT; when asserts are
      // compiled out the NULL cursor would be fed back into the loop. Report it
      // as a corrupted file instead.
      terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
      tsdbError("vgId:%d SBlockIdx part in file %s is corrupted since decode failure, offset:%u len :%u",
                TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pHeadf), pHeadf->info.offset, pHeadf->info.len);
      return -1;
    }

    if (taosArrayPush(pReadh->aBlkIdx, (void *)(&blkIdx)) == NULL) {
      terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
      return -1;
    }
  }

  return 0;
}

/**
 * Select the table whose blocks will be read next.
 *
 * Both SDataCols sets are re-initialized with the table's schema, then the
 * block-index cursor (pReadh->cidx) is advanced through pReadh->aBlkIdx looking
 * for the entry that belongs to pTable. pReadh->pBlkIdx ends up pointing at that
 * entry, or NULL when the table has no entry in the current file set.
 *
 * The cursor only moves forward, so the scan resumes from where the previous
 * call stopped.
 *
 * NOTE(review): the scan compares SBlockIdx::uid against TABLE_TID(pTable) and
 * then again against TABLE_UID(pTable); this looks like a leftover from when the
 * index carried a tid field — confirm the intended key.
 *
 * @return 0 on success, -1 with terrno = TSDB_CODE_TDB_OUT_OF_MEMORY if a
 *         SDataCols set cannot be initialized (pBlkIdx is left untouched then).
 */
int tsdbSetReadTable(SReadH *pReadh, STable *pTable) {
  STSchema *pSchema = tsdbGetTableSchemaImpl(pTable, false, false, -1);

  pReadh->pTable = pTable;

  if (tdInitDataCols(pReadh->pDCols[0], pSchema) < 0) {
    terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
    return -1;
  }

  if (tdInitDataCols(pReadh->pDCols[1], pSchema) < 0) {
    terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
    return -1;
  }

  // Default outcome: no index entry for this table.
  pReadh->pBlkIdx = NULL;

  size_t nIdx = taosArrayGetSize(pReadh->aBlkIdx);
  while (pReadh->cidx < nIdx) {
    SBlockIdx *pCand = taosArrayGet(pReadh->aBlkIdx, pReadh->cidx);

    if (pCand->uid == TABLE_TID(pTable)) {
      // Candidate found; accept it only if the uid matches as well, and step past it either way.
      if (pCand->uid == TABLE_UID(pTable)) {
        pReadh->pBlkIdx = pCand;
      }
      pReadh->cidx++;
      break;
    }

    if (pCand->uid > TABLE_TID(pTable)) {
      // Already past the position this table would occupy; leave the cursor here.
      break;
    }

    pReadh->cidx++;
  }

  return 0;
}

/**
 * Load the SBlockInfo section of the currently selected table from the head file.
 *
 * The section described by pReadh->pBlkIdx (offset, len) is read into
 * pReadh->pBlkInfo and checksum-verified. When pTarget is non-NULL the loaded
 * bytes (pBlkIdx->len of them) are also copied there.
 *
 * @param pReadh   read handle; pReadh->pBlkIdx must be non-NULL
 * @param pTarget  optional destination for a copy of the section; the code copies
 *                 pBlkIdx->len bytes, so it has to be at least that large
 * @return 0 on success, -1 on seek/read/allocation failure or corruption
 *         (terrno = TSDB_CODE_TDB_FILE_CORRUPTED for short read or bad checksum)
 */
int tsdbLoadBlockInfo(SReadH *pReadh, void *pTarget) {
  ASSERT(pReadh->pBlkIdx != NULL);

  SBlockIdx *pBlkIdx = pReadh->pBlkIdx;
  SDFile    *pHeadf = TSDB_READ_HEAD_FILE(pReadh);

  // Position the head file at the start of the section.
  if (tsdbSeekDFile(pHeadf, pBlkIdx->offset, SEEK_SET) < 0) {
    tsdbError("vgId:%d failed to load SBlockInfo part while seek file %s since %s, offset:%u len:%u",
              TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pHeadf), tstrerror(terrno), pBlkIdx->offset, pBlkIdx->len);
    return -1;
  }

  if (tsdbMakeRoom((void **)(&(pReadh->pBlkInfo)), pBlkIdx->len) < 0) {
    return -1;
  }

  int64_t nBytes = tsdbReadDFile(pHeadf, (void *)(pReadh->pBlkInfo), pBlkIdx->len);
  if (nBytes < 0) {
    tsdbError("vgId:%d failed to load SBlockInfo part while read file %s since %s, offset:%u len :%u",
              TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pHeadf), tstrerror(terrno), pBlkIdx->offset, pBlkIdx->len);
    return -1;
  }

  // A short read means the file is truncated.
  if (nBytes < pBlkIdx->len) {
    terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
    tsdbError("vgId:%d SBlockInfo part in file %s is corrupted, offset:%u expected bytes:%u read bytes:%" PRId64,
              TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pHeadf), pBlkIdx->offset, pBlkIdx->len, nBytes);
    return -1;
  }

  if (!taosCheckChecksumWhole((uint8_t *)(pReadh->pBlkInfo), pBlkIdx->len)) {
    terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
    tsdbError("vgId:%d SBlockInfo part in file %s is corrupted since wrong checksum, offset:%u len :%u",
              TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pHeadf), pBlkIdx->offset, pBlkIdx->len);
    return -1;
  }

  if (pTarget != NULL) {
    memcpy(pTarget, (void *)(pReadh->pBlkInfo), pBlkIdx->len);
  }

  return 0;
}

H
Hongze Cheng 已提交
249
/**
 * Load all columns of a block into pReadh->pDCols[0].
 *
 * When the block consists of several sub-blocks, pBlock->offset is treated as a
 * byte offset into the SBlockInfo buffer (pBlkInfo if supplied, otherwise
 * pReadh->pBlkInfo) where the sub-block array starts. The first sub-block is
 * loaded straight into pDCols[0]; each further one is loaded into pDCols[1] and
 * merged into pDCols[0].
 *
 * @param pReadh    read handle
 * @param pBlock    block to load (numOfSubBlocks > 0)
 * @param pBlkInfo  optional SBlockInfo buffer to resolve sub-blocks against
 * @return 0 on success, -1 if loading or merging fails
 */
int tsdbLoadBlockData(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo) {
  ASSERT(pBlock->numOfSubBlocks > 0);
  int8_t update = pReadh->pRepo->config.update;

  // Locate the first (sub-)block to read.
  SBlock *pFirst = pBlock;
  if (pBlock->numOfSubBlocks > 1) {
    pFirst = (pBlkInfo != NULL) ? (SBlock *)POINTER_SHIFT(pBlkInfo, pBlock->offset)
                                : (SBlock *)POINTER_SHIFT(pReadh->pBlkInfo, pBlock->offset);
  }

  if (tsdbLoadBlockDataImpl(pReadh, pFirst, pReadh->pDCols[0]) < 0) {
    return -1;
  }

  // Fold the remaining sub-blocks into pDCols[0], one at a time.
  for (int iSub = 1; iSub < pBlock->numOfSubBlocks; iSub++) {
    if (tsdbLoadBlockDataImpl(pReadh, pFirst + iSub, pReadh->pDCols[1]) < 0) {
      return -1;
    }

    if (tdMergeDataCols(pReadh->pDCols[0], pReadh->pDCols[1], pReadh->pDCols[1]->numOfRows, NULL,
                        update != TD_ROW_PARTIAL_UPDATE) < 0) {
      return -1;
    }
  }

  // The merged result must agree with the block header.
  ASSERT(pReadh->pDCols[0]->numOfRows == pBlock->numOfRows);
  ASSERT(dataColsKeyFirst(pReadh->pDCols[0]) == pBlock->keyFirst);
  ASSERT(dataColsKeyLast(pReadh->pDCols[0]) == pBlock->keyLast);

  return 0;
}

H
Hongze Cheng 已提交
278
/**
 * Load a subset of a block's columns into pReadh->pDCols[0].
 *
 * Works like tsdbLoadBlockData() but only for the column ids listed in colIds.
 * Sub-blocks beyond the first are loaded into pDCols[1] and merged into
 * pDCols[0]. If mergeBitmap is set and the result is not flagged by
 * tdDataColsIsBitmapI(), every column that carries a bitmap has it rewritten in
 * place through tdMergeBitmap().
 *
 * NOTE(review): the bitmap pass walks the first numOfColsIds entries of
 * pDCols[0]->cols rather than looking columns up by colIds — confirm that the
 * two are expected to line up.
 *
 * @param pReadh        read handle
 * @param pBlock        block to load (numOfSubBlocks > 0)
 * @param pBlkInfo      optional SBlockInfo buffer to resolve sub-blocks against
 * @param colIds        column ids to load
 * @param numOfColsIds  number of entries in colIds
 * @param mergeBitmap   whether to run the bitmap merge pass described above
 * @return 0 on success, -1 if loading or merging fails
 */
int tsdbLoadBlockDataCols(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo, const int16_t *colIds,
                          int numOfColsIds, bool mergeBitmap) {
  ASSERT(pBlock->numOfSubBlocks > 0);
  int8_t update = pReadh->pRepo->config.update;

  // Locate the first (sub-)block to read.
  SBlock *pFirst = pBlock;
  if (pBlock->numOfSubBlocks > 1) {
    if (pBlkInfo != NULL) {
      pFirst = POINTER_SHIFT(pBlkInfo, pBlock->offset);
    } else {
      pFirst = POINTER_SHIFT(pReadh->pBlkInfo, pBlock->offset);
    }
  }

  if (tsdbLoadBlockDataColsImpl(pReadh, pFirst, pReadh->pDCols[0], colIds, numOfColsIds) < 0) {
    return -1;
  }

  for (int iSub = 1; iSub < pBlock->numOfSubBlocks; iSub++) {
    if (tsdbLoadBlockDataColsImpl(pReadh, pFirst + iSub, pReadh->pDCols[1], colIds, numOfColsIds) < 0) {
      return -1;
    }

    if (tdMergeDataCols(pReadh->pDCols[0], pReadh->pDCols[1], pReadh->pDCols[1]->numOfRows, NULL,
                        update != TD_ROW_PARTIAL_UPDATE) < 0) {
      return -1;
    }
  }

  if (mergeBitmap && !tdDataColsIsBitmapI(pReadh->pDCols[0])) {
    for (int iCol = 0; iCol < numOfColsIds; ++iCol) {
      SDataCol *pCol = pReadh->pDCols[0]->cols + iCol;
      if (!pCol->bitmap) {
        continue;
      }

      ASSERT(pCol->colId != PRIMARYKEY_TIMESTAMP_COL_ID);
      tdMergeBitmap(pCol->pBitmap, TD_BITMAP_BYTES(pReadh->pDCols[0]->numOfRows), pCol->pBitmap);
    }
  }

  // The result must agree with the block header.
  ASSERT(pReadh->pDCols[0]->numOfRows == pBlock->numOfRows);
  ASSERT(dataColsKeyFirst(pReadh->pDCols[0]) == pBlock->keyFirst);
  ASSERT(dataColsKeyLast(pReadh->pDCols[0]) == pBlock->keyLast);

  return 0;
}

/**
 * Load the aggregate (pre-computed statistics) record of a block into
 * pReadh->pAggrBlkData.
 *
 * The record is read from the SMAL file for "last" blocks and from the SMAD file
 * otherwise, at pBlock->aggrOffset; its size comes from
 * tsdbBlockAggrSize(pBlock->numOfBSma, pBlock->blkVer). The bytes are
 * checksum-verified after reading.
 *
 * @param pReadh  read handle
 * @param pBlock  block whose aggregates are wanted (numOfSubBlocks <= 1)
 * @return TSDB_STATIS_NONE when the block has no aggregates (aggrStat unset),
 *         0 on success, -1 on I/O failure or corruption
 */
int tsdbLoadBlockStatis(SReadH *pReadh, SBlock *pBlock) {
  ASSERT(pBlock->numOfSubBlocks <= 1);

  if (!pBlock->aggrStat) {
    return TSDB_STATIS_NONE;
  }

  SDFile *pDFileAggr = NULL;
  if (pBlock->last) {
    pDFileAggr = TSDB_READ_SMAL_FILE(pReadh);
  } else {
    pDFileAggr = TSDB_READ_SMAD_FILE(pReadh);
  }

  if (tsdbSeekDFile(pDFileAggr, pBlock->aggrOffset, SEEK_SET) < 0) {
    tsdbError("vgId:%d failed to load block aggr part while seek file %s to offset %" PRIu64 " since %s",
              TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFileAggr), (uint64_t)pBlock->aggrOffset,
              tstrerror(terrno));
    return -1;
  }

  size_t sizeAggr = tsdbBlockAggrSize(pBlock->numOfBSma, (uint32_t)pBlock->blkVer);
  if (tsdbMakeRoom((void **)(&(pReadh->pAggrBlkData)), sizeAggr) < 0) {
    return -1;
  }

  int64_t nreadAggr = tsdbReadDFile(pDFileAggr, (void *)(pReadh->pAggrBlkData), sizeAggr);
  if (nreadAggr < 0) {
    tsdbError("vgId:%d failed to load block aggr part while read file %s since %s, offset:%" PRIu64 " len :%" PRIzu,
              TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFileAggr), tstrerror(terrno),
              (uint64_t)pBlock->aggrOffset, sizeAggr);
    return -1;
  }

  // Fewer bytes than requested: treat the file as corrupted.
  if (nreadAggr < sizeAggr) {
    terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
    tsdbError("vgId:%d block aggr part in file %s is corrupted, offset:%" PRIu64 " expected bytes:%" PRIzu
              " read bytes: %" PRId64,
              TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFileAggr), (uint64_t)pBlock->aggrOffset, sizeAggr,
              nreadAggr);
    return -1;
  }

  if (!taosCheckChecksumWhole((uint8_t *)(pReadh->pAggrBlkData), (uint32_t)sizeAggr)) {
    terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
    tsdbError("vgId:%d block aggr part in file %s is corrupted since wrong checksum, offset:%" PRIu64 " len :%" PRIzu,
              TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFileAggr), (uint64_t)pBlock->aggrOffset, sizeAggr);
    return -1;
  }

  return 0;
}

/**
 * Load the leading SBlockData header (the per-column descriptors) of a block
 * into pReadh->pBlkData, without reading the column payloads.
 *
 * The header is read from the last file for "last" blocks and from the data file
 * otherwise, starting at pBlock->offset; its size comes from
 * tsdbBlockStatisSize(pBlock->numOfCols, pBlock->blkVer). The bytes are
 * checksum-verified after reading.
 *
 * @return 0 on success, -1 on I/O failure or corruption
 */
static int tsdbLoadBlockOffset(SReadH *pReadh, SBlock *pBlock) {
  ASSERT(pBlock->numOfSubBlocks <= 1);

  SDFile *pDFile = NULL;
  if (pBlock->last) {
    pDFile = TSDB_READ_LAST_FILE(pReadh);
  } else {
    pDFile = TSDB_READ_DATA_FILE(pReadh);
  }

  if (tsdbSeekDFile(pDFile, pBlock->offset, SEEK_SET) < 0) {
    tsdbError("vgId:%d failed to load block statis part while seek file %s to offset %" PRId64 " since %s",
              TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFile), (int64_t)pBlock->offset, tstrerror(terrno));
    return -1;
  }

  size_t size = tsdbBlockStatisSize(pBlock->numOfCols, (uint32_t)pBlock->blkVer);
  if (tsdbMakeRoom((void **)(&(pReadh->pBlkData)), size) < 0) {
    return -1;
  }

  int64_t nread = tsdbReadDFile(pDFile, (void *)(pReadh->pBlkData), size);
  if (nread < 0) {
    tsdbError("vgId:%d failed to load block statis part while read file %s since %s, offset:%" PRId64 " len :%" PRIzu,
              TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFile), tstrerror(terrno), (int64_t)pBlock->offset, size);
    return -1;
  }

  // Fewer bytes than requested: treat the file as corrupted.
  if (nread < size) {
    terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
    tsdbError("vgId:%d block statis part in file %s is corrupted, offset:%" PRId64 " expected bytes:%" PRIzu
              " read bytes: %" PRId64,
              TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFile), (int64_t)pBlock->offset, size, nread);
    return -1;
  }

  if (!taosCheckChecksumWhole((uint8_t *)(pReadh->pBlkData), (uint32_t)size)) {
    terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
    tsdbError("vgId:%d block statis part in file %s is corrupted since wrong checksum, offset:%" PRId64 " len :%" PRIzu,
              TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFile), (int64_t)pBlock->offset, size);
    return -1;
  }

  return 0;
}

/**
 * Serialize one SBlockIdx through the taosEncode* helpers.
 *
 * Field order on the wire: len, offset (variant u32), hasLast (fixed u8),
 * numOfBlocks (variant u32), uid, maxKey (fixed u64). This must stay in step
 * with tsdbDecodeSBlockIdx().
 *
 * @param buf   encoder cursor, passed through to each taosEncode* call
 * @param pIdx  entry to serialize
 * @return sum of the values returned by the individual encode calls
 */
int tsdbEncodeSBlockIdx(void **buf, SBlockIdx *pIdx) {
  int nEncoded = taosEncodeVariantU32(buf, pIdx->len);

  nEncoded += taosEncodeVariantU32(buf, pIdx->offset);
  nEncoded += taosEncodeFixedU8(buf, pIdx->hasLast);
  nEncoded += taosEncodeVariantU32(buf, pIdx->numOfBlocks);
  nEncoded += taosEncodeFixedU64(buf, pIdx->uid);
  nEncoded += taosEncodeFixedU64(buf, pIdx->maxKey);

  return nEncoded;
}

/**
 * Deserialize one SBlockIdx written by tsdbEncodeSBlockIdx().
 *
 * hasLast, numOfBlocks, uid and maxKey are decoded into plain locals first and
 * then assigned to the struct fields.
 *
 * @param buf   position to decode from
 * @param pIdx  entry to fill; may be partially written if decoding fails midway
 * @return position just past the decoded entry, or NULL as soon as any
 *         taosDecode* helper returns NULL
 */
void *tsdbDecodeSBlockIdx(void *buf, SBlockIdx *pIdx) {
  void    *cursor = buf;
  uint8_t  lastFlag = 0;
  uint32_t nBlocks = 0;
  uint64_t raw64 = 0;

  cursor = taosDecodeVariantU32(cursor, &(pIdx->len));
  if (cursor == NULL) return NULL;

  cursor = taosDecodeVariantU32(cursor, &(pIdx->offset));
  if (cursor == NULL) return NULL;

  cursor = taosDecodeFixedU8(cursor, &lastFlag);
  if (cursor == NULL) return NULL;
  pIdx->hasLast = lastFlag;

  cursor = taosDecodeVariantU32(cursor, &nBlocks);
  if (cursor == NULL) return NULL;
  pIdx->numOfBlocks = nBlocks;

  cursor = taosDecodeFixedU64(cursor, &raw64);
  if (cursor == NULL) return NULL;
  pIdx->uid = (int64_t)raw64;

  cursor = taosDecodeFixedU64(cursor, &raw64);
  if (cursor == NULL) return NULL;
  pIdx->maxKey = (TSKEY)raw64;

  return cursor;
}
H
Hongze Cheng 已提交
432

433 434
/**
 * Fill the caller's per-column statistics from the block data already loaded
 * into the read handle.
 *
 * Each pStatis[i].colId is matched against the loaded column records with a
 * two-cursor walk; a requested column without a matching record gets
 * numOfNull = -1. The walk presumes both sequences are ordered by ascending
 * colId — TODO confirm against the writers.
 *
 * In the default build the records come from pReadh->pAggrBlkData (see
 * tsdbLoadBlockStatis()) and nothing is written when pBlock->aggrStat is unset.
 * That buffer is sized for pBlock->numOfBSma records, so the walk is bounded by
 * numOfBSma as well; bounding it by numOfCols could read past the loaded records.
 *
 * @param pReadh     read handle holding the loaded block statistics
 * @param pStatis    in/out array; colId is read, the statistic fields are written
 * @param numOfCols  number of entries in pStatis
 * @param pBlock     block the loaded statistics belong to
 */
void tsdbGetBlockStatis(SReadH *pReadh, SDataStatis *pStatis, int numOfCols, SBlock *pBlock) {
#ifdef TD_REFACTOR_3
  SBlockData *pBlockData = pReadh->pBlkData;

  for (int i = 0, j = 0; i < numOfCols;) {
    if (j >= pBlockData->numOfCols) {
      pStatis[i].numOfNull = -1;
      ++i;
      continue;
    }

    if (pStatis[i].colId == pBlockData->cols[j].colId) {
      pStatis[i].sum = pBlockData->cols[j].sum;
      pStatis[i].max = pBlockData->cols[j].max;
      pStatis[i].min = pBlockData->cols[j].min;
      pStatis[i].maxIndex = pBlockData->cols[j].maxIndex;
      pStatis[i].minIndex = pBlockData->cols[j].minIndex;
      pStatis[i].numOfNull = pBlockData->cols[j].numOfNull;
      ++i;
      ++j;
    } else if (pStatis[i].colId < pBlockData->cols[j].colId) {
      pStatis[i].numOfNull = -1;
      ++i;
    } else {
      ++j;
    }
  }
#else
  if (pBlock->aggrStat) {
    SAggrBlkData *pAggrBlkData = pReadh->pAggrBlkData;

    for (int i = 0, j = 0; i < numOfCols;) {
      // Only numOfBSma records were loaded; everything requested beyond them has no statistics.
      if (j >= pBlock->numOfBSma) {
        pStatis[i].numOfNull = -1;
        ++i;
        continue;
      }
      SAggrBlkCol *pAggrBlkCol = ((SAggrBlkCol *)(pAggrBlkData)) + j;
      if (pStatis[i].colId == pAggrBlkCol->colId) {
        pStatis[i].sum = pAggrBlkCol->sum;
        pStatis[i].max = pAggrBlkCol->max;
        pStatis[i].min = pAggrBlkCol->min;
        pStatis[i].maxIndex = pAggrBlkCol->maxIndex;
        pStatis[i].minIndex = pAggrBlkCol->minIndex;
        pStatis[i].numOfNull = pAggrBlkCol->numOfNull;
        ++i;
        ++j;
      } else if (pStatis[i].colId < pAggrBlkCol->colId) {
        pStatis[i].numOfNull = -1;
        ++i;
      } else {
        ++j;
      }
    }
  }

#endif
}

H
refact  
Hongze Cheng 已提交
492
/**
 * Drop the per-table state of a read handle: reset both SDataCols sets, rewind
 * the block-index cursor and forget the selected table and its index entry.
 */
static void tsdbResetReadTable(SReadH *pReadh) {
  for (int slot = 0; slot < 2; ++slot) {
    tdResetDataCols(pReadh->pDCols[slot]);
  }

  pReadh->cidx = 0;
  pReadh->pBlkIdx = NULL;
  pReadh->pTable = NULL;
}

/**
 * Drop the per-file-set state of a read handle: the per-table state goes first,
 * then the loaded block-index entries are cleared and the file set is closed.
 */
static void tsdbResetReadFile(SReadH *pReadh) {
  // Table-level state depends on the file set, so it is reset first.
  tsdbResetReadTable(pReadh);

  taosArrayClear(pReadh->aBlkIdx);
  tsdbCloseDFileSet(TSDB_READ_FSET(pReadh));
}

H
Hongze Cheng 已提交
506
/**
 * Load one (sub-)block in full and decode its columns into pDataCols.
 *
 * The whole block (pBlock->len bytes at pBlock->offset) is read from the last
 * file for "last" blocks, or the data file otherwise, into the handle's read
 * buffer. The leading SBlockData header is checksum-verified, then the schema
 * columns in pDataCols are walked alongside the stored SBlockCol records:
 *   - column 0 is the timestamp key, stored at TSDB_KEY_COL_OFFSET with length
 *     pBlock->keyLen and no bitmap;
 *   - a schema column with a matching stored record is decoded through
 *     tsdbCheckAndDecodeColumnData();
 *   - a schema column with no stored record is reset via dataColReset();
 *   - stored records with no schema column are skipped.
 *
 * @param pReadh     read handle
 * @param pBlock     (sub-)block to load; numOfSubBlocks must be 0 or 1
 * @param pDataCols  destination column set; it is reset before loading
 * @return 0 on success, -1 on I/O failure, allocation failure or corruption
 */
static int tsdbLoadBlockDataImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *pDataCols) {
  ASSERT(pBlock->numOfSubBlocks == 0 || pBlock->numOfSubBlocks == 1);

  SDFile *pDFile = (pBlock->last) ? TSDB_READ_LAST_FILE(pReadh) : TSDB_READ_DATA_FILE(pReadh);

  tdResetDataCols(pDataCols);

  if (tsdbIsSupBlock(pBlock)) {
    tdDataColsSetBitmapI(pDataCols);
  }

  if (tsdbMakeRoom((void **)(&TSDB_READ_BUF(pReadh)), pBlock->len) < 0) return -1;

  SBlockData *pBlockData = (SBlockData *)TSDB_READ_BUF(pReadh);

  if (tsdbSeekDFile(pDFile, pBlock->offset, SEEK_SET) < 0) {
    tsdbError("vgId:%d failed to load block data part while seek file %s to offset %" PRId64 " since %s",
              TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFile), (int64_t)pBlock->offset, tstrerror(terrno));
    return -1;
  }

  int64_t nread = tsdbReadDFile(pDFile, TSDB_READ_BUF(pReadh), pBlock->len);
  if (nread < 0) {
    tsdbError("vgId:%d failed to load block data part while read file %s since %s, offset:%" PRId64 " len :%d",
              TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFile), tstrerror(terrno), (int64_t)pBlock->offset,
              pBlock->len);
    return -1;
  }

  if (nread < pBlock->len) {
    terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
    tsdbError("vgId:%d block data part in file %s is corrupted, offset:%" PRId64
              " expected bytes:%d read bytes: %" PRId64,
              TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFile), (int64_t)pBlock->offset, pBlock->len, nread);
    return -1;
  }

  // Only the header part (column descriptors + its checksum) is verified here;
  // every column payload carries its own checksum, checked while decoding.
  int32_t tsize = (int32_t)tsdbBlockStatisSize(pBlock->numOfCols, (uint32_t)pBlock->blkVer);
  if (!taosCheckChecksumWhole((uint8_t *)TSDB_READ_BUF(pReadh), tsize)) {
    terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
    tsdbError("vgId:%d block statis part in file %s is corrupted since wrong checksum, offset:%" PRId64 " len :%d",
              TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFile), (int64_t)pBlock->offset, tsize);
    return -1;
  }

  ASSERT(tsize < pBlock->len);
  ASSERT(pBlockData->numOfCols == pBlock->numOfCols);

  pDataCols->numOfRows = pBlock->numOfRows;

  // Recover the data
  int        ccol = 0;  // loop iter for SBlockCol object
  int        dcol = 0;  // loop iter for SDataCols object
  int        nBitmaps = (int)TD_BITMAP_BYTES(pBlock->numOfRows);
  SBlockCol *pBlockCol = NULL;
  while (dcol < pDataCols->numOfCols) {
    SDataCol *pDataCol = &(pDataCols->cols[dcol]);
    if (dcol != 0 && ccol >= pBlockData->numOfCols) {
      // Set current column as NULL and forward
      dataColReset(pDataCol);
      ++dcol;
      continue;
    }

    // Defaults describe the timestamp key column (dcol == 0), which has no SBlockCol record.
    int16_t  tcolId = PRIMARYKEY_TIMESTAMP_COL_ID;
    uint32_t toffset = TSDB_KEY_COL_OFFSET;
    int32_t  tlen = pBlock->keyLen;
    int32_t  tblen = 0;  // stored bitmap length; stays 0 for the key column

    if (dcol != 0) {
      pBlockCol = &(pBlockData->cols[ccol]);
      tcolId = pBlockCol->colId;
      toffset = tsdbGetBlockColOffset(pBlockCol);
      tlen = pBlockCol->len;
      tblen = pBlockCol->blen;
      pDataCol->bitmap = pBlockCol->bitmap;
    } else {
      ASSERT(pDataCol->colId == tcolId);
      TD_SET_COL_ROWS_NORM(pDataCol);
    }

    // Number of bitmap bytes to restore; zero when the stored column is flagged as all-normal rows.
    int32_t tLenBitmap = 0;
    if ((dcol != 0) && !TD_COL_ROWS_NORM(pBlockCol)) {
      tLenBitmap = nBitmaps;
    }

    if (tcolId == pDataCol->colId) {
      if (pBlock->algorithm == TWO_STAGE_COMP) {
        int zsize = pDataCol->bytes * pBlock->numOfRows + tLenBitmap + 2 * COMP_OVERFLOW_BYTES;
        if (tsdbMakeRoom((void **)(&TSDB_READ_COMP_BUF(pReadh)), zsize) < 0) return -1;
      }

      // tblen is used instead of pBlockCol->blen: pBlockCol is still NULL while
      // the key column is being decoded and must not be dereferenced.
      if (tsdbCheckAndDecodeColumnData(pDataCol, POINTER_SHIFT(pBlockData, tsize + toffset), tlen, tblen,
                                       pBlock->algorithm, pBlock->numOfRows, tLenBitmap, pDataCols->maxPoints,
                                       TSDB_READ_COMP_BUF(pReadh), (int)taosTSizeof(TSDB_READ_COMP_BUF(pReadh))) < 0) {
        tsdbError("vgId:%d file %s is broken at column %d block offset %" PRId64 " column offset %u",
                  TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFile), tcolId, (int64_t)pBlock->offset, toffset);
        return -1;
      }

      if (dcol != 0) {
        ccol++;
      }
      dcol++;
    } else if (tcolId < pDataCol->colId) {
      // Stored column is not part of the schema: skip it
      ccol++;
    } else {
      // Set current column as NULL and forward
      dataColReset(pDataCol);
      dcol++;
    }
  }

  return 0;
}

630 631
/**
 * Verify and decode one stored column into pDataCol.
 *
 * The stored column is laid out as [data | bitmap (bitmapLen bytes) | TSCKSUM].
 * The whole span is checksum-verified, then the data part is either copied
 * (comp == 0) or decompressed with the type's decompFunc into pDataCol->pData,
 * and, when numOfBitmaps > 0, the bitmap part is copied or decompressed with
 * tsDecompressTinyint into pDataCol->pBitmap. Variable-length columns get their
 * offsets rebuilt through dataColSetOffset().
 *
 * NOTE(review): the result of tdAllocMemForCol() is not checked here — confirm
 * whether it can fail.
 *
 * @param pDataCol      destination column
 * @param content       start of the stored column
 * @param len           total stored length, including bitmap and checksum
 * @param bitmapLen     stored length of the bitmap part
 * @param comp          compression algorithm; 0 means stored uncompressed
 * @param numOfRows     number of rows in the block
 * @param numOfBitmaps  number of bitmap bytes to restore; 0 skips the bitmap
 * @param maxPoints     capacity passed to tdAllocMemForCol()
 * @param buffer        scratch buffer handed to the decompressors
 * @param bufferSize    size of buffer
 * @return 0 on success, -1 with terrno = TSDB_CODE_TDB_FILE_CORRUPTED on a bad
 *         checksum or a decompression failure
 */
static int tsdbCheckAndDecodeColumnData(SDataCol *pDataCol, void *content, int32_t len, int32_t bitmapLen, int8_t comp,
                                        int numOfRows, int numOfBitmaps, int maxPoints, char *buffer, int bufferSize) {
  if (!taosCheckChecksumWhole((uint8_t *)content, len)) {
    terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
    return -1;
  }

  tdAllocMemForCol(pDataCol, maxPoints);

  // Length of the data part; also the offset at which the bitmap part starts.
  const size_t dataLen = len - bitmapLen - sizeof(TSCKSUM);

  if (!comp) {
    // Stored uncompressed: plain copies.
    pDataCol->len = dataLen;
    memcpy(pDataCol->pData, content, pDataCol->len);
    if (numOfBitmaps > 0) {
      memcpy(pDataCol->pBitmap, POINTER_SHIFT(content, dataLen), bitmapLen);
    }
  } else {
    // Stored compressed: data first, then the bitmap.
    int nOut = (*(tDataTypes[pDataCol->type].decompFunc))(content, dataLen, numOfRows, pDataCol->pData,
                                                          pDataCol->spaceSize, comp, buffer, bufferSize);
    if (nOut <= 0) {
      tsdbError(
          "Failed to decompress column data, file corrupted, len:%d comp:%d numOfRows:%d maxPoints:%d bufferSize:%d",
          (int32_t)(dataLen), comp, numOfRows, maxPoints, bufferSize);
      terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
      return -1;
    }
    pDataCol->len = nOut;

    if (numOfBitmaps > 0) {
      nOut = tsDecompressTinyint(POINTER_SHIFT(content, dataLen), bitmapLen, numOfBitmaps, pDataCol->pBitmap,
                                 pDataCol->spaceSize, comp, buffer, bufferSize);
      if (nOut <= 0) {
        tsdbError(
            "Failed to decompress column bitmap, file corrupted, len:%d comp:%d numOfRows:%d maxPoints:%d "
            "bufferSize:%d",
            bitmapLen, comp, numOfBitmaps, maxPoints, bufferSize);
        terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
        return -1;
      }
    }
  }

  if (IS_VAR_DATA_TYPE(pDataCol->type)) {
    dataColSetOffset(pDataCol, numOfRows);
  }

  return 0;
}

L
Liu Jicong 已提交
700
/*
 * Load the columns named in colIds[] of one block into pDataCols.
 *
 * colIds[0] is asserted to be PRIMARYKEY_TIMESTAMP_COL_ID. The requested ids, the columns of
 * pDataCols and the block column descriptors in pReadh->pBlkData are each walked with a cursor
 * that only moves forward, so all three are presumably sorted by ascending column id
 * (NOTE(review): confirm with callers).
 *
 * For every requested id:
 *   - if pDataCols has no column with that id, the id is skipped;
 *   - the primary key column is described by a locally built SBlockCol (length pBlock->keyLen,
 *     offset TSDB_KEY_COL_OFFSET) instead of a descriptor read from the file;
 *   - if the block has no descriptor for the id, the destination column is reset with
 *     dataColReset() and nothing is read;
 *   - otherwise the column is read and decoded via tsdbLoadColData().
 *
 * Returns 0 on success, -1 if tsdbLoadBlockOffset() or tsdbLoadColData() fails.
 */
static int tsdbLoadBlockDataColsImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *pDataCols, const int16_t *colIds,
                                     int numOfColIds) {
  ASSERT(pBlock->numOfSubBlocks == 0 || pBlock->numOfSubBlocks == 1);
  ASSERT(colIds[0] == PRIMARYKEY_TIMESTAMP_COL_ID);

  // Pick the file the block lives in
  SDFile *pFile = NULL;
  if (pBlock->last) {
    pFile = TSDB_READ_LAST_FILE(pReadh);
  } else {
    pFile = TSDB_READ_DATA_FILE(pReadh);
  }

  // Synthetic descriptor used only for the primary key column
  SBlockCol keyBlockCol = {0};

  tdResetDataCols(pDataCols);

  if (tsdbIsSupBlock(pBlock)) {
    tdDataColsSetBitmapI(pDataCols);
  }

  // The SBlockData part is only needed when something besides the timestamp column is requested
  if (numOfColIds > 1) {
    if (tsdbLoadBlockOffset(pReadh, pBlock) < 0) return -1;
  }

  pDataCols->numOfRows = pBlock->numOfRows;

  int dataIdx = 0;  // cursor into pDataCols->cols
  int blkIdx = 0;   // cursor into pReadh->pBlkData->cols

  for (int k = 0; k < numOfColIds; k++) {
    const int16_t wantedId = colIds[k];

    // Advance the destination cursor until it reaches or passes the wanted id
    SDataCol *pDataCol = NULL;
    while (dataIdx < pDataCols->numOfCols) {
      SDataCol *pCand = &pDataCols->cols[dataIdx];
      if (pCand->colId > wantedId) break;

      dataIdx++;
      if (pCand->colId == wantedId) {
        pDataCol = pCand;
        break;
      }
    }

    if (pDataCol == NULL) continue;
    ASSERT(pDataCol->colId == wantedId);

    SBlockCol *pBlockCol = NULL;
    if (wantedId != PRIMARYKEY_TIMESTAMP_COL_ID) {
      // Non-key column: advance the block descriptor cursor the same way
      while (blkIdx < pBlock->numOfCols) {
        SBlockCol *pCand = &(pReadh->pBlkData->cols[blkIdx]);
        if (pCand->colId > wantedId) break;

        blkIdx++;
        if (pCand->colId == wantedId) {
          pBlockCol = pCand;
          break;
        }
      }

      if (pBlockCol == NULL) {
        // The block holds no data for this column
        dataColReset(pDataCol);
        continue;
      }

      ASSERT(pBlockCol->colId == pDataCol->colId);
    } else {
      // Key column: fill in the synthetic descriptor
      keyBlockCol.colId = wantedId;
      TD_SET_COL_ROWS_NORM(&keyBlockCol);  // default is NORM for the primary key column
      keyBlockCol.len = pBlock->keyLen;
      keyBlockCol.type = pDataCol->type;
      keyBlockCol.offset = TSDB_KEY_COL_OFFSET;
      pBlockCol = &keyBlockCol;
    }

    // Carry the bitmap field of the descriptor over to the destination column
    pDataCol->bitmap = pBlockCol->bitmap;

    if (tsdbLoadColData(pReadh, pFile, pBlock, pBlockCol, pDataCol) < 0) return -1;
  }

  return 0;
}
}

/*
 * Read one column of a block from pDFile and decode it into pDataCol.
 *
 * The column's pBlockCol->len bytes are read into the reader's read buffer from
 *   pBlock->offset + tsdbBlockStatisSize(...) + tsdbGetBlockColOffset(pBlockCol)
 * and then handed to tsdbCheckAndDecodeColumnData() together with the reader's
 * compression buffer.
 *
 * pReadh    reader handle; supplies the read buffer and the compression buffer
 * pDFile    file to read from
 * pBlock    block the column belongs to
 * pBlockCol descriptor of the column inside the block (colId must equal pDataCol->colId)
 * pDataCol  destination column
 *
 * Returns 0 on success and -1 on failure. On a short read terrno is set to
 * TSDB_CODE_TDB_FILE_CORRUPTED here; for the other failures the log messages report
 * terrno as left by the failing callee.
 */
static int tsdbLoadColData(SReadH *pReadh, SDFile *pDFile, SBlock *pBlock, SBlockCol *pBlockCol, SDataCol *pDataCol) {
  ASSERT(pDataCol->colId == pBlockCol->colId);

  STsdb    *pRepo = TSDB_READ_REPO(pReadh);
  STsdbCfg *pCfg = REPO_CFG(pRepo);

  // Bitmap bytes are only accounted for when the column is not flagged as all-NORM rows
  int     nBitmaps = (int)TD_BITMAP_BYTES(pBlock->numOfRows);
  int32_t tLenBitmap = 0;
  if (!TD_COL_ROWS_NORM(pBlockCol)) {
    tLenBitmap = nBitmaps;
  }

  // Size requested for the compression buffer: column data, optional bitmap and overflow slack
  int tsize = pDataCol->bytes * pBlock->numOfRows + tLenBitmap + 2 * COMP_OVERFLOW_BYTES;

  if (tsdbMakeRoom((void **)(&TSDB_READ_BUF(pReadh)), pBlockCol->len) < 0) return -1;
  if (tsdbMakeRoom((void **)(&TSDB_READ_COMP_BUF(pReadh)), tsize) < 0) return -1;

  int64_t offset = pBlock->offset + tsdbBlockStatisSize(pBlock->numOfCols, (uint32_t)pBlock->blkVer) +
                   tsdbGetBlockColOffset(pBlockCol);
  if (tsdbSeekDFile(pDFile, offset, SEEK_SET) < 0) {
    tsdbError("vgId:%d failed to load block column data while seek file %s to offset %" PRId64 " since %s",
              TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFile), offset, tstrerror(terrno));
    return -1;
  }

  int64_t nread = tsdbReadDFile(pDFile, TSDB_READ_BUF(pReadh), pBlockCol->len);
  if (nread < 0) {
    tsdbError("vgId:%d failed to load block column data while read file %s since %s, offset:%" PRId64 " len :%d",
              TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFile), tstrerror(terrno), offset, pBlockCol->len);
    return -1;
  }

  // Fewer bytes than the descriptor promises: treat the file as corrupted
  if (nread < pBlockCol->len) {
    terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
    // "%d" is already a complete conversion for pBlockCol->len; no length-modifier macro may follow it
    tsdbError("vgId:%d block column data in file %s is corrupted, offset:%" PRId64
              " expected bytes:%d read bytes: %" PRId64,
              TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFile), offset, pBlockCol->len, nread);
    return -1;
  }

  if (tsdbCheckAndDecodeColumnData(pDataCol, pReadh->pBuf, pBlockCol->len, pBlockCol->blen, pBlock->algorithm,
                                   pBlock->numOfRows, tLenBitmap, pCfg->maxRowsPerFileBlock, pReadh->pCBuf,
                                   (int32_t)taosTSizeof(pReadh->pCBuf)) < 0) {
    tsdbError("vgId:%d file %s is broken at column %d offset %" PRId64, REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pDFile),
              pBlockCol->colId, offset);
    return -1;
  }

  return 0;
}