/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "tsdb.h"

#define TSDB_KEY_COL_OFFSET 0

static void tsdbResetReadTable(SReadH *pReadh);
static void tsdbResetReadFile(SReadH *pReadh);
22
static int  tsdbLoadBlockOffset(SReadH *pReadh, SBlock *pBlock);
23
static int  tsdbLoadBlockDataImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *pDataCols, int8_t bitmapMode);
24 25
static int  tsdbCheckAndDecodeColumnData(SDataCol *pDataCol, void *content, int32_t len, int32_t bitmapLen, int8_t comp,
                                         int numOfRows, int numOfBitmaps, int maxPoints, char *buffer, int bufferSize);
C
Cary Xu 已提交
26
static int  tsdbLoadBlockDataColsImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *pDataCols, const int16_t *colIds,
27
                                      int numOfColIds, int8_t bitmapMode);
H
Hongze Cheng 已提交
28 29
static int  tsdbLoadColData(SReadH *pReadh, SDFile *pDFile, SBlock *pBlock, SBlockCol *pBlockCol, SDataCol *pDataCol);

/**
 * Initialize a read handle for the given repository.
 *
 * The handle is zeroed, bound to pRepo, its file set is marked closed, and the
 * block-index array plus both SDataCols work buffers are allocated.
 *
 * @param pReadh handle to initialize (must not be NULL)
 * @param pRepo  repository the handle reads from (must not be NULL)
 * @return 0 on success; -1 with terrno = TSDB_CODE_TDB_OUT_OF_MEMORY when an
 *         allocation fails
 */
int tsdbInitReadH(SReadH *pReadh, STsdb *pRepo) {
  ASSERT(pReadh != NULL && pRepo != NULL);

  STsdbCfg *pCfg = REPO_CFG(pRepo);

  memset((void *)pReadh, 0, sizeof(*pReadh));
  pReadh->pRepo = pRepo;
  TSDB_FSET_SET_CLOSED(TSDB_READ_FSET(pReadh));

  pReadh->aBlkIdx = taosArrayInit(1024, sizeof(SBlockIdx));
  if (pReadh->aBlkIdx == NULL) {
    terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
    return -1;
  }

  // Both column buffers are created the same way; on failure everything
  // allocated so far is released through tsdbDestroyReadH().
  for (int iCols = 0; iCols < 2; ++iCols) {
    pReadh->pDCols[iCols] = tdNewDataCols(0, pCfg->maxRows);
    if (pReadh->pDCols[iCols] == NULL) {
      terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
      tsdbDestroyReadH(pReadh);
      return -1;
    }
  }

  return 0;
}

/**
 * Release everything owned by a read handle and clear its state.
 *
 * Each buffer field is overwritten with the value returned by its free
 * routine, the cursor fields are reset, the block-index array is destroyed
 * and the handle's file set is closed. A NULL handle is ignored.
 *
 * @param pReadh handle to tear down; may be NULL
 */
void tsdbDestroyReadH(SReadH *pReadh) {
  if (pReadh != NULL) {
    // Scratch buffers.
    pReadh->pExBuf = taosTZfree(pReadh->pExBuf);
    pReadh->pCBuf = taosTZfree(pReadh->pCBuf);
    pReadh->pBuf = taosTZfree(pReadh->pBuf);

    // Column work buffers.
    for (int iCols = 0; iCols < 2; ++iCols) {
      pReadh->pDCols[iCols] = tdFreeDataCols(pReadh->pDCols[iCols]);
    }

    // Per-block metadata buffers.
    pReadh->pAggrBlkData = taosTZfree(pReadh->pAggrBlkData);
    pReadh->pBlkData = taosTZfree(pReadh->pBlkData);
    pReadh->pBlkInfo = taosTZfree(pReadh->pBlkInfo);

    // Table cursor.
    pReadh->cidx = 0;
    pReadh->pBlkIdx = NULL;
    pReadh->pTable = NULL;

    pReadh->aBlkIdx = taosArrayDestroy(pReadh->aBlkIdx);
    tsdbCloseDFileSet(TSDB_READ_FSET(pReadh));
    pReadh->pRepo = NULL;
  }
}

/**
 * Point the read handle at a new file set and open it for reading.
 *
 * Any per-file state held by the handle is reset first; *pSet is then copied
 * into the handle, marked closed, and opened with TD_FILE_READ.
 *
 * @param pReadh read handle
 * @param pSet   file set to copy into the handle (must not be NULL)
 * @return 0 on success, -1 if the file set could not be opened (logged)
 */
int tsdbSetAndOpenReadFSet(SReadH *pReadh, SDFileSet *pSet) {
  ASSERT(pSet != NULL);

  tsdbResetReadFile(pReadh);

  pReadh->rSet = *pSet;
  TSDB_FSET_SET_CLOSED(TSDB_READ_FSET(pReadh));

  if (tsdbOpenDFileSet(TSDB_READ_FSET(pReadh), TD_FILE_READ) >= 0) {
    return 0;
  }

  tsdbError("vgId:%d, failed to open file set %d since %s", TSDB_READ_REPO_ID(pReadh), TSDB_FSET_FID(pSet),
            tstrerror(terrno));
  return -1;
}

/**
 * Close the handle's current file set and drop all per-file read state.
 * Thin wrapper around tsdbResetReadFile().
 */
void tsdbCloseAndUnsetFSet(SReadH *pReadh) {
  tsdbResetReadFile(pReadh);
}

/**
 * Load the SBlockIdx section of the head file into pReadh->aBlkIdx.
 *
 * The section described by the head file's info.offset / info.len is read
 * into the handle's read buffer, its checksum is verified, and the entries are
 * decoded one by one and appended to aBlkIdx (which must be empty on entry).
 *
 * Compared with a plain ASSERT-based implementation, a section shorter than
 * its own checksum and a decode failure are both reported as file corruption
 * instead of relying on assertions being compiled in.
 *
 * @param pReadh read handle with an open file set
 * @return 0 on success (including "no index at all"), -1 on error with terrno
 *         set by this function or by the failing helper
 */
int tsdbLoadBlockIdx(SReadH *pReadh) {
  SDFile   *pHeadf = TSDB_READ_HEAD_FILE(pReadh);
  SBlockIdx blkIdx;

  ASSERT(taosArrayGetSize(pReadh->aBlkIdx) == 0);

  // No data at all, just return
  if (pHeadf->info.offset <= 0) return 0;

  // The section must at least hold its trailing checksum; otherwise the
  // unsigned subtraction in the decode loop below would wrap around.
  if (pHeadf->info.len < sizeof(TSCKSUM)) {
    terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
    tsdbError("vgId:%d, SBlockIdx part in file %s is corrupted since invalid length, offset:%u len :%u",
              TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pHeadf), pHeadf->info.offset, pHeadf->info.len);
    return -1;
  }

  if (tsdbSeekDFile(pHeadf, pHeadf->info.offset, SEEK_SET) < 0) {
    tsdbError("vgId:%d, failed to load SBlockIdx part while seek file %s since %s, offset:%u len :%u",
              TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pHeadf), tstrerror(terrno), pHeadf->info.offset,
              pHeadf->info.len);
    return -1;
  }

  if (tsdbMakeRoom((void **)(&TSDB_READ_BUF(pReadh)), pHeadf->info.len) < 0) return -1;

  int64_t nread = tsdbReadDFile(pHeadf, TSDB_READ_BUF(pReadh), pHeadf->info.len);
  if (nread < 0) {
    tsdbError("vgId:%d, failed to load SBlockIdx part while read file %s since %s, offset:%u len :%u",
              TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pHeadf), tstrerror(terrno), pHeadf->info.offset,
              pHeadf->info.len);
    return -1;
  }

  // A short read means the file ends before the recorded section does.
  if (nread < pHeadf->info.len) {
    terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
    tsdbError("vgId:%d, SBlockIdx part in file %s is corrupted, offset:%u expected bytes:%u read bytes: %" PRId64,
              TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pHeadf), pHeadf->info.offset, pHeadf->info.len, nread);
    return -1;
  }

  if (!taosCheckChecksumWhole((uint8_t *)TSDB_READ_BUF(pReadh), pHeadf->info.len)) {
    terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
    tsdbError("vgId:%d, SBlockIdx part in file %s is corrupted since wrong checksum, offset:%u len :%u",
              TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pHeadf), pHeadf->info.offset, pHeadf->info.len);
    return -1;
  }

  // Decode entries until only the trailing checksum is left.
  void *ptr = TSDB_READ_BUF(pReadh);
  while (POINTER_DISTANCE(ptr, TSDB_READ_BUF(pReadh)) < (pHeadf->info.len - sizeof(TSCKSUM))) {
    ptr = tsdbDecodeSBlockIdx(ptr, &blkIdx);
    if (ptr == NULL) {
      // Without this check a NULL cursor would end the loop and the function
      // would report success with a partially decoded entry list.
      terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
      tsdbError("vgId:%d, SBlockIdx part in file %s is corrupted since decode failure, offset:%u len :%u",
                TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pHeadf), pHeadf->info.offset, pHeadf->info.len);
      return -1;
    }

    if (taosArrayPush(pReadh->aBlkIdx, (void *)(&blkIdx)) == NULL) {
      terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
      return -1;
    }
  }

  return 0;
}

/**
 * Select the table the handle will read next.
 *
 * Both SDataCols work buffers are re-initialized from the table's schema, and
 * the block-index cursor (pReadh->cidx) is advanced through aBlkIdx to locate
 * the entry for pTable. pReadh->pBlkIdx ends up pointing at that entry, or
 * NULL when the table has no entry in the current file set.
 *
 * The cursor only moves forward, so this presumably expects tables to be
 * visited in the same order as the index entries — verify against callers.
 *
 * NOTE(review): the outer match compares pBlkIdx->uid with TABLE_TID(pTable)
 * while the inner one compares it with TABLE_UID(pTable); confirm that this
 * pairing is intended.
 *
 * @return 0 on success, -1 with terrno = TSDB_CODE_TDB_OUT_OF_MEMORY if a
 *         column buffer cannot be initialized
 */
int tsdbSetReadTable(SReadH *pReadh, STable *pTable) {
  STSchema *pSchema = tsdbGetTableSchemaImpl(TSDB_READ_REPO(pReadh), pTable, false, false, -1);

  pReadh->pTable = pTable;

  if (tdInitDataCols(pReadh->pDCols[0], pSchema) < 0) {
    terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
    return -1;
  }

  if (tdInitDataCols(pReadh->pDCols[1], pSchema) < 0) {
    terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
    return -1;
  }

  size_t nIdx = taosArrayGetSize(pReadh->aBlkIdx);

  // Default outcome: no index entry for this table.
  pReadh->pBlkIdx = NULL;

  while (pReadh->cidx < nIdx) {
    SBlockIdx *pCand = taosArrayGet(pReadh->aBlkIdx, pReadh->cidx);

    // Entry is already past the table: stop without consuming it.
    if (pCand->uid > TABLE_TID(pTable)) break;

    // Entry is at or before the table: consume it.
    pReadh->cidx++;

    if (pCand->uid == TABLE_TID(pTable)) {
      if (pCand->uid == TABLE_UID(pTable)) {
        pReadh->pBlkIdx = pCand;
      }
      break;
    }
  }

  return 0;
}

/**
 * Load the SBlockInfo section of the currently selected table.
 *
 * The section located by pReadh->pBlkIdx (offset / len in the head file) is
 * read into pReadh->pBlkInfo and checksum-verified. When pTarget is non-NULL
 * the same pBlkIdx->len bytes are also copied there, so the caller must
 * provide a buffer at least that large.
 *
 * @param pReadh  read handle; pReadh->pBlkIdx must not be NULL
 * @param pTarget optional destination for a copy of the section
 * @return 0 on success, -1 on seek/read/allocation failure or corruption
 */
int tsdbLoadBlockInfo(SReadH *pReadh, void *pTarget) {
  ASSERT(pReadh->pBlkIdx != NULL);

  SBlockIdx *pBlkIdx = pReadh->pBlkIdx;
  SDFile    *pHeadf = TSDB_READ_HEAD_FILE(pReadh);

  if (tsdbSeekDFile(pHeadf, pBlkIdx->offset, SEEK_SET) < 0) {
    tsdbError("vgId:%d, failed to load SBlockInfo part while seek file %s since %s, offset:%u len:%u",
              TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pHeadf), tstrerror(terrno), pBlkIdx->offset, pBlkIdx->len);
    return -1;
  }

  if (tsdbMakeRoom((void **)(&(pReadh->pBlkInfo)), pBlkIdx->len) < 0) {
    return -1;
  }

  int64_t nBytes = tsdbReadDFile(pHeadf, (void *)(pReadh->pBlkInfo), pBlkIdx->len);
  if (nBytes < 0) {
    tsdbError("vgId:%d, failed to load SBlockInfo part while read file %s since %s, offset:%u len :%u",
              TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pHeadf), tstrerror(terrno), pBlkIdx->offset, pBlkIdx->len);
    return -1;
  }

  // Fewer bytes than recorded in the index: treat as corruption.
  if (nBytes < pBlkIdx->len) {
    terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
    tsdbError("vgId:%d, SBlockInfo part in file %s is corrupted, offset:%u expected bytes:%u read bytes:%" PRId64,
              TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pHeadf), pBlkIdx->offset, pBlkIdx->len, nBytes);
    return -1;
  }

  if (!taosCheckChecksumWhole((uint8_t *)(pReadh->pBlkInfo), pBlkIdx->len)) {
    terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
    tsdbError("vgId:%d, SBlockInfo part in file %s is corrupted since wrong checksum, offset:%u len :%u",
              TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pHeadf), pBlkIdx->offset, pBlkIdx->len);
    return -1;
  }

  if (pTarget != NULL) {
    memcpy(pTarget, (void *)(pReadh->pBlkInfo), pBlkIdx->len);
  }

  return 0;
}

/**
 * Move the contents of pSrc into pDest while exchanging their column arrays.
 *
 * After the call pDest holds a byte copy of *pSrc (including pSrc's `cols`
 * pointer), and pSrc->cols points at the array pDest owned before. All other
 * fields of pSrc are left as they were, so this is not a full two-way swap.
 */
static FORCE_INLINE void tsdbSwapDataCols(SDataCols *pDest, SDataCols *pSrc) {
  SDataCol *pPrevDestCols = pDest->cols;  // saved before the copy overwrites it

  memcpy(pDest, pSrc, sizeof(SDataCols));
  pSrc->cols = pPrevDestCols;
}

/**
 * Debug helper: dump every row of pDCols to stdout.
 *
 * When pBlock is given, the head file and the data/last file the block lives
 * in are printed first and each row line carries the block pointer and
 * length; otherwise rows are tagged as merge output. Asserts if the first
 * value of column 0 equals TSKEY_MIN.
 *
 * @param readh  read handle used to resolve the file names
 * @param pDCols columns to print
 * @param pBlock block the columns were loaded from, or NULL
 * @param tag    prefix printed on each line
 * @param ln     line number printed after the tag
 */
static void printTsdbLoadBlkData(SReadH *readh, SDataCols *pDCols, SBlock *pBlock, const char *tag, int32_t ln) {
  printf("%s:%d:%" PRIi64 " ================\n", tag, ln, taosGetSelfPthreadId());

  if (pBlock != NULL) {
    SDFile *pHeadf = TSDB_READ_HEAD_FILE(readh);
    printf("%s:%d:%" PRIi64 ":%p:%d %s\n", tag, ln, taosGetSelfPthreadId(), pBlock, (int32_t)pBlock->len,
           pHeadf->f.aname);

    SDFile *pDFile = pBlock->last ? TSDB_READ_LAST_FILE(readh) : TSDB_READ_DATA_FILE(readh);
    printf("%s:%d:%" PRIi64 ":%p:%d %s\n", tag, ln, taosGetSelfPthreadId(), pBlock, (int32_t)pBlock->len,
           pDFile->f.aname);
  }

  // Sanity check on the first value of the first column.
  SDataCol *pFirstCol = pDCols->cols + 0;
  if (TSKEY_MIN == *(int64_t *)pFirstCol->pData) {
    ASSERT(0);
  }

  int nRows = pDCols->numOfRows;
  for (int iRow = 0; iRow < nRows; ++iRow) {
    if (pBlock != NULL) {
      printf("%s:%d:%" PRIi64 ":%p:%d rows[%d][%d] ", tag, ln, taosGetSelfPthreadId(), pBlock, (int32_t)pBlock->len,
             nRows, iRow);
    } else {
      printf("%s:%d:%" PRIi64 ":%s rows[%d][%d] ", tag, ln, taosGetSelfPthreadId(), "=== merge === ", nRows, iRow);
    }

    int      nCols = pDCols->numOfCols;
    SCellVal cell = {0};
    for (int iCol = 0; iCol < nCols; ++iCol) {
      SDataCol *pCol = pDCols->cols + iCol;
      tdGetColDataOfRow(&cell, pCol, iRow, pDCols->bitmapMode);
      tdSCellValPrint(&cell, pCol->type);
    }
    printf("\n");
  }

  fflush(stdout);
}

/**
 * Load all columns of a (super) block into pReadh->pDCols[0].
 *
 * The first sub-block is loaded into pDCols[0]; every further sub-block is
 * loaded into pDCols[1] and merged into pDCols[0]. A block with a single
 * sub-block is additionally run through tdMergeDataCols() into pDCols[1] and
 * swapped back (labelled "UPDATE FILTER" in the debug output).
 *
 * @param pReadh   read handle
 * @param pBlock   block to load; numOfSubBlocks must be > 0
 * @param pBlkInfo block-info buffer that pBlock->offset is relative to when
 *                 the block has several sub-blocks; pReadh->pBlkInfo is used
 *                 when this is NULL
 * @return 0 on success, -1 if loading or merging fails
 */
int tsdbLoadBlockData(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo) {
  ASSERT(pBlock->numOfSubBlocks > 0);

  STsdbCfg *pCfg = REPO_CFG(pReadh->pRepo);
  int8_t    update = pCfg->update;

  // For a multi-sub-block block, pBlock->offset locates the first sub-block
  // inside the block-info buffer.
  SBlock *iBlock = pBlock;
  if (pBlock->numOfSubBlocks > 1) {
    void *pInfoBase = (pBlkInfo != NULL) ? (void *)pBlkInfo : (void *)pReadh->pBlkInfo;
    iBlock = (SBlock *)POINTER_SHIFT(pInfoBase, pBlock->offset);
  }

  if (tsdbLoadBlockDataImpl(pReadh, iBlock, pReadh->pDCols[0], TSDB_BITMODE_ONE_BIT) < 0) {
    return -1;
  }
#ifdef TD_DEBUG_PRINT_TSDB_LOAD_DCOLS
  printTsdbLoadBlkData(pReadh, pReadh->pDCols[0], iBlock, __func__, __LINE__);
#endif

  for (int iSub = 1; iSub < pBlock->numOfSubBlocks; ++iSub) {
    ++iBlock;
    if (tsdbLoadBlockDataImpl(pReadh, iBlock, pReadh->pDCols[1], TSDB_BITMODE_DEFAULT) < 0) {
      return -1;
    }
#ifdef TD_DEBUG_PRINT_TSDB_LOAD_DCOLS
    printTsdbLoadBlkData(pReadh, pReadh->pDCols[1], iBlock, __func__, __LINE__);
#endif
    // TODO: use the real maxVersion to replace the UINT64_MAX to support Multi-Version
    if (tdMergeDataCols(pReadh->pDCols[0], pReadh->pDCols[1], pReadh->pDCols[1]->numOfRows, NULL,
                        TD_SUPPORT_UPDATE(update), TD_VER_MAX) < 0) {
      return -1;
    }
#ifdef TD_DEBUG_PRINT_TSDB_LOAD_DCOLS
    printTsdbLoadBlkData(pReadh, pReadh->pDCols[0], iBlock, " === MERGE === ", __LINE__);
#endif
  }

  // if ((pBlock->numOfSubBlocks == 1) && (iBlock->hasDupKey)) { // TODO: use this line
  if (pBlock->numOfSubBlocks == 1) {
    tdResetDataCols(pReadh->pDCols[1]);
    pReadh->pDCols[1]->bitmapMode = pReadh->pDCols[0]->bitmapMode;

    int32_t mergeRc = tdMergeDataCols(pReadh->pDCols[1], pReadh->pDCols[0], pReadh->pDCols[0]->numOfRows, NULL,
                                      TD_SUPPORT_UPDATE(update), TD_VER_MAX);
    if (mergeRc < 0) {
      return -1;
    }

    tsdbSwapDataCols(pReadh->pDCols[0], pReadh->pDCols[1]);
    ASSERT(pReadh->pDCols[0]->bitmapMode != 0);
#ifdef TD_DEBUG_PRINT_TSDB_LOAD_DCOLS
    printTsdbLoadBlkData(pReadh, pReadh->pDCols[0], iBlock, " === UPDATE FILTER === ", __LINE__);
#endif
  }

  // The result must stay within the block's recorded row count and key range.
  ASSERT(pReadh->pDCols[0]->numOfRows <= pBlock->numOfRows);
  ASSERT(dataColsKeyFirst(pReadh->pDCols[0]) == pBlock->minKey.ts);
  ASSERT(dataColsKeyLast(pReadh->pDCols[0]) == pBlock->maxKey.ts);

  return 0;
}

/**
 * Debug helper: dump the requested columns of every row in pDCols to stdout.
 *
 * For each row, the columns of pDCols and the colIds list are walked in
 * parallel by ascending column id: matching columns are printed, requested
 * ids with no corresponding column print "NotExists ", and columns that were
 * not requested are skipped. The walk ends as soon as either list is
 * exhausted.
 *
 * @param readh        read handle used to resolve the file names
 * @param pDCols       columns to print
 * @param pBlock       block the columns came from, or NULL for merge output
 * @param colIds       requested column ids
 * @param numOfColsIds number of entries in colIds
 * @param tag          prefix printed on each line
 * @param ln           line number printed after the tag
 */
static void printTsdbLoadBlkDataCols(SReadH *readh, SDataCols *pDCols, SBlock *pBlock, const int16_t *colIds,
                                     int numOfColsIds, const char *tag, int32_t ln) {
  printf("%s:%d:%" PRIi64 " ================\n", tag, ln, taosGetSelfPthreadId());

  if (pBlock != NULL) {
    SDFile *pHeadf = TSDB_READ_HEAD_FILE(readh);
    printf("%s:%d:%" PRIi64 ":%p:%d %s\n", tag, ln, taosGetSelfPthreadId(), pBlock, (int32_t)pBlock->len,
           pHeadf->f.aname);

    SDFile *pDFile = pBlock->last ? TSDB_READ_LAST_FILE(readh) : TSDB_READ_DATA_FILE(readh);
    printf("%s:%d:%" PRIi64 ":%p:%d %s\n", tag, ln, taosGetSelfPthreadId(), pBlock, (int32_t)pBlock->len,
           pDFile->f.aname);
  }

  int nRows = pDCols->numOfRows;
  for (int iRow = 0; iRow < nRows; ++iRow) {
    if (pBlock != NULL) {
      printf("%s:%d:%" PRIi64 ":%p:%d  rows[%d][%d] ", tag, ln, taosGetSelfPthreadId(), pBlock, (int32_t)pBlock->len,
             nRows, iRow);
    } else {
      printf("%s:%d:%" PRIi64 ":%s rows[%d][%d] ", tag, ln, taosGetSelfPthreadId(), "=== merge === ", nRows, iRow);
    }

    int      nCols = pDCols->numOfCols;
    SCellVal cell = {0};
    for (int iCol = 0, iId = 0; iCol < nCols && iId < numOfColsIds;) {
      SDataCol *pCol = pDCols->cols + iCol;
      int16_t   haveId = pCol->colId;
      int16_t   wantId = *(colIds + iId);

      if (haveId == wantId) {
        tdGetColDataOfRow(&cell, pCol, iRow, pDCols->bitmapMode);
        tdSCellValPrint(&cell, pCol->type);
        ++iCol;
        ++iId;
      } else if (haveId < wantId) {
        ++iCol;  // column was not requested
      } else {
        ++iId;  // requested id does not exist in SDataCols
        printf("NotExists ");
      }
    }
    printf("\n");
  }

  fflush(stdout);
}

// TODO: filter by Multi-Version
/**
 * Load only the requested columns of a (super) block into pReadh->pDCols[0].
 *
 * Works like tsdbLoadBlockData() but restricts loading to colIds: the first
 * sub-block goes into pDCols[0], later sub-blocks are loaded into pDCols[1]
 * and merged in, and a single-sub-block block is passed through
 * tdMergeDataCols() and swapped back. When mergeBitmap is set and the result
 * is not yet flagged via tdDataColsIsBitmapI(), the bitmap of each of the
 * first numOfColsIds columns that has data and a bitmap is merged in place.
 *
 * NOTE(review): the bitmap loop indexes pDCols[0]->cols with
 * i < numOfColsIds — confirm that the column array always has at least
 * numOfColsIds entries.
 *
 * @param pReadh       read handle
 * @param pBlock       block to load; numOfSubBlocks must be > 0
 * @param pBlkInfo     block-info buffer used to locate sub-blocks, or NULL to
 *                     use pReadh->pBlkInfo
 * @param colIds       column ids to load
 * @param numOfColsIds number of entries in colIds
 * @param mergeBitmap  whether to merge the column bitmaps after loading
 * @return 0 on success, -1 if loading or merging fails
 */
int tsdbLoadBlockDataCols(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo, const int16_t *colIds, int numOfColsIds,
                          bool mergeBitmap) {
  ASSERT(pBlock->numOfSubBlocks > 0);

  int8_t update = pReadh->pRepo->pVnode->config.tsdbCfg.update;

  // For a multi-sub-block block, pBlock->offset locates the first sub-block
  // inside the block-info buffer.
  SBlock *iBlock = pBlock;
  if (pBlock->numOfSubBlocks > 1) {
    void *pInfoBase = (pBlkInfo != NULL) ? (void *)pBlkInfo : (void *)pReadh->pBlkInfo;
    iBlock = POINTER_SHIFT(pInfoBase, pBlock->offset);
  }

  if (tsdbLoadBlockDataColsImpl(pReadh, iBlock, pReadh->pDCols[0], colIds, numOfColsIds, TSDB_BITMODE_ONE_BIT) < 0) {
    return -1;
  }
#ifdef TD_DEBUG_PRINT_TSDB_LOAD_DCOLS
  printTsdbLoadBlkDataCols(pReadh, pReadh->pDCols[0], iBlock, colIds, numOfColsIds, __func__, __LINE__);
#endif

  for (int iSub = 1; iSub < pBlock->numOfSubBlocks; ++iSub) {
    ++iBlock;
    if (tsdbLoadBlockDataColsImpl(pReadh, iBlock, pReadh->pDCols[1], colIds, numOfColsIds, TSDB_BITMODE_DEFAULT) <
        0) {
      return -1;
    }
#ifdef TD_DEBUG_PRINT_TSDB_LOAD_DCOLS
    printTsdbLoadBlkDataCols(pReadh, pReadh->pDCols[1], iBlock, colIds, numOfColsIds, __func__, __LINE__);
#endif
    // TODO: use the real maxVersion to replace the UINT64_MAX to support Multi-Version
    if (tdMergeDataCols(pReadh->pDCols[0], pReadh->pDCols[1], pReadh->pDCols[1]->numOfRows, NULL,
                        TD_SUPPORT_UPDATE(update), TD_VER_MAX) < 0) {
      return -1;
    }
#ifdef TD_DEBUG_PRINT_TSDB_LOAD_DCOLS
    printTsdbLoadBlkDataCols(pReadh, pReadh->pDCols[0], NULL, colIds, numOfColsIds, __func__, __LINE__);
#endif
  }

  // if ((pBlock->numOfSubBlocks == 1) && (iBlock->hasDupKey)) { // TODO: use this line
  if (pBlock->numOfSubBlocks == 1) {
    tdResetDataCols(pReadh->pDCols[1]);
    pReadh->pDCols[1]->bitmapMode = pReadh->pDCols[0]->bitmapMode;

    int32_t mergeRc = tdMergeDataCols(pReadh->pDCols[1], pReadh->pDCols[0], pReadh->pDCols[0]->numOfRows, NULL,
                                      TD_SUPPORT_UPDATE(update), TD_VER_MAX);
    if (mergeRc < 0) {
      return -1;
    }

    tsdbSwapDataCols(pReadh->pDCols[0], pReadh->pDCols[1]);
    ASSERT(pReadh->pDCols[0]->bitmapMode != 0);
#ifdef TD_DEBUG_PRINT_TSDB_LOAD_DCOLS
    printTsdbLoadBlkDataCols(pReadh, pReadh->pDCols[0], NULL, colIds, numOfColsIds,
                             " === update filter === ", __LINE__);
#endif
  }

  if (mergeBitmap && !tdDataColsIsBitmapI(pReadh->pDCols[0])) {
    for (int iCol = 0; iCol < numOfColsIds; ++iCol) {
      SDataCol *pCol = pReadh->pDCols[0]->cols + iCol;
      if (pCol->len <= 0 || !pCol->bitmap) {
        continue;  // nothing to merge for this column
      }
      tdMergeBitmap(pCol->pBitmap, pReadh->pDCols[0]->numOfRows, pCol->pBitmap);
      tdDataColsSetBitmapI(pReadh->pDCols[0]);
    }
#ifdef TD_DEBUG_PRINT_TSDB_LOAD_DCOLS
    printTsdbLoadBlkDataCols(pReadh, pReadh->pDCols[0], NULL, colIds, numOfColsIds, " === merge bitmap === ", __LINE__);
#endif
  }

  // The result must stay within the block's recorded row count and key range.
  ASSERT(pReadh->pDCols[0]->numOfRows <= pBlock->numOfRows);
  ASSERT(dataColsKeyFirst(pReadh->pDCols[0]) == pBlock->minKey.ts);
  ASSERT(dataColsKeyLast(pReadh->pDCols[0]) == pBlock->maxKey.ts);

  return 0;
}

/**
 * Load the aggregate (block statistics) section of a block into
 * pReadh->pAggrBlkData.
 *
 * The section is read from the sma file matching the block (the "last"
 * variant when pBlock->last is set) at pBlock->aggrOffset and is
 * checksum-verified.
 *
 * @param pReadh read handle with an open file set
 * @param pBlock block whose statistics are wanted; numOfSubBlocks must be <= 1
 * @return 0 on success, TSDB_STATIS_NONE when the block carries no aggregate
 *         data, -1 on seek/read/allocation failure or corruption
 */
int tsdbLoadBlockStatis(SReadH *pReadh, SBlock *pBlock) {
  ASSERT(pBlock->numOfSubBlocks <= 1);

  if (!pBlock->aggrStat) {
    tsdbDebug("vgId:%d, no need to load block statis part for uid %" PRIu64 " since not exist", REPO_ID(pReadh->pRepo),
              TSDB_READ_TABLE_UID(pReadh));
    return TSDB_STATIS_NONE;
  }

  SDFile *pDFileAggr = pBlock->last ? TSDB_READ_SMAL_FILE(pReadh) : TSDB_READ_SMAD_FILE(pReadh);

  if (tsdbSeekDFile(pDFileAggr, pBlock->aggrOffset, SEEK_SET) < 0) {
    tsdbError("vgId:%d, failed to load block statis part for uid %" PRIu64 " while seek file %s to offset %" PRIu64
              " since %s",
              TSDB_READ_REPO_ID(pReadh), TSDB_READ_TABLE_UID(pReadh), TSDB_FILE_FULL_NAME(pDFileAggr),
              (uint64_t)pBlock->aggrOffset, tstrerror(terrno));
    return -1;
  }

  size_t sizeAggr = tsdbBlockAggrSize(pBlock->numOfBSma, (uint32_t)pBlock->blkVer);
  if (tsdbMakeRoom((void **)(&(pReadh->pAggrBlkData)), sizeAggr) < 0) return -1;

  int64_t nreadAggr = tsdbReadDFile(pDFileAggr, (void *)(pReadh->pAggrBlkData), sizeAggr);
  if (nreadAggr < 0) {
    tsdbError("vgId:%d, failed to load block statis part for uid %" PRIu64
              " while read file %s since %s, offset:%" PRIu64 " len :%" PRIzu,
              TSDB_READ_REPO_ID(pReadh), TSDB_READ_TABLE_UID(pReadh), TSDB_FILE_FULL_NAME(pDFileAggr),
              tstrerror(terrno), (uint64_t)pBlock->aggrOffset, sizeAggr);
    return -1;
  }

  // nreadAggr is known to be non-negative here, so the conversion to size_t
  // is value-preserving; the cast only makes the mixed comparison explicit.
  if ((size_t)nreadAggr < sizeAggr) {
    terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
    tsdbError("vgId:%d, block statis part for uid %" PRIu64 " in file %s is corrupted, offset:%" PRIu64
              " expected bytes:%" PRIzu " read bytes: %" PRId64,
              TSDB_READ_REPO_ID(pReadh), TSDB_READ_TABLE_UID(pReadh), TSDB_FILE_FULL_NAME(pDFileAggr),
              (uint64_t)pBlock->aggrOffset, sizeAggr, nreadAggr);
    return -1;
  }

  if (!taosCheckChecksumWhole((uint8_t *)(pReadh->pAggrBlkData), (uint32_t)sizeAggr)) {
    terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
    // The message previously lacked the space before "in file", gluing the
    // uid to the following word in the log output.
    tsdbError("vgId:%d, block statis part for uid %" PRIu64
              " in file %s is corrupted since wrong checksum, offset:%" PRIu64 " len :%" PRIzu,
              TSDB_READ_REPO_ID(pReadh), TSDB_READ_TABLE_UID(pReadh), TSDB_FILE_FULL_NAME(pDFileAggr),
              (uint64_t)pBlock->aggrOffset, sizeAggr);
    return -1;
  }
  return 0;
}

/**
 * Load the head part of a block into pReadh->pBlkData.
 *
 * The number of bytes to read is derived from the block's column count and
 * version via tsdbBlockStatisSize(); the bytes are read from the data file
 * (or the last file when pBlock->last is set) at pBlock->offset and are
 * checksum-verified.
 *
 * @param pReadh read handle with an open file set
 * @param pBlock block to read; numOfSubBlocks must be <= 1
 * @return 0 on success, -1 on seek/read/allocation failure or corruption
 */
static int tsdbLoadBlockOffset(SReadH *pReadh, SBlock *pBlock) {
  ASSERT(pBlock->numOfSubBlocks <= 1);

  SDFile *pDFile = (pBlock->last) ? TSDB_READ_LAST_FILE(pReadh) : TSDB_READ_DATA_FILE(pReadh);

  if (tsdbSeekDFile(pDFile, pBlock->offset, SEEK_SET) < 0) {
    tsdbError("vgId:%d, failed to load block head part while seek file %s to offset %" PRId64 " since %s",
              TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFile), (int64_t)pBlock->offset, tstrerror(terrno));
    return -1;
  }

  size_t headLen = tsdbBlockStatisSize(pBlock->numOfCols, (uint32_t)pBlock->blkVer);
  if (tsdbMakeRoom((void **)(&(pReadh->pBlkData)), headLen) < 0) {
    return -1;
  }

  int64_t nGot = tsdbReadDFile(pDFile, (void *)(pReadh->pBlkData), headLen);
  if (nGot < 0) {
    tsdbError("vgId:%d, failed to load block head part while read file %s since %s, offset:%" PRId64 " len :%" PRIzu,
              TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFile), tstrerror(terrno), (int64_t)pBlock->offset,
              headLen);
    return -1;
  }

  // Short read: the file ends before the block head does.
  if (nGot < headLen) {
    terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
    tsdbError("vgId:%d, block head part in file %s is corrupted, offset:%" PRId64 " expected bytes:%" PRIzu
              " read bytes: %" PRId64,
              TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFile), (int64_t)pBlock->offset, headLen, nGot);
    return -1;
  }

  if (!taosCheckChecksumWhole((uint8_t *)(pReadh->pBlkData), (uint32_t)headLen)) {
    terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
    tsdbError("vgId:%d, block head part in file %s is corrupted since wrong checksum, offset:%" PRId64 " len :%" PRIzu,
              TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFile), (int64_t)pBlock->offset, headLen);
    return -1;
  }

  return 0;
}

/**
 * Serialize one SBlockIdx entry.
 *
 * Fields are written in this order: len, offset, hasLast, numOfBlocks, uid,
 * maxKey.ts. tsdbDecodeSBlockIdx() reads them back in the same order.
 *
 * @param buf  encoder cursor, passed through to the taosEncode* helpers
 * @param pIdx entry to serialize
 * @return sum of the values returned by the individual encode calls
 */
int tsdbEncodeSBlockIdx(void **buf, SBlockIdx *pIdx) {
  int nEncoded = 0;

  // Variable-length header fields.
  nEncoded += taosEncodeVariantU32(buf, pIdx->len);
  nEncoded += taosEncodeVariantU32(buf, pIdx->offset);

  nEncoded += taosEncodeFixedU8(buf, pIdx->hasLast);
  nEncoded += taosEncodeVariantU32(buf, pIdx->numOfBlocks);

  // Fixed-width table uid and max key.
  nEncoded += taosEncodeFixedU64(buf, pIdx->uid);
  nEncoded += taosEncodeFixedU64(buf, pIdx->maxKey.ts);

  return nEncoded;
}

/**
 * Deserialize one SBlockIdx entry.
 *
 * Fields are read in the order tsdbEncodeSBlockIdx() writes them: len,
 * offset, hasLast, numOfBlocks, uid, maxKey.ts. hasLast, numOfBlocks, uid and
 * maxKey.ts are decoded into locals first and then assigned to the struct.
 *
 * @param buf  cursor into the encoded data
 * @param pIdx entry to fill
 * @return the cursor after the last decoded field, or NULL as soon as one of
 *         the decode helpers returns NULL (pIdx may then be partially filled)
 */
void *tsdbDecodeSBlockIdx(void *buf, SBlockIdx *pIdx) {
  uint8_t  lastFlag = 0;
  uint32_t nBlocks = 0;
  uint64_t raw64 = 0;

  buf = taosDecodeVariantU32(buf, &(pIdx->len));
  if (buf == NULL) return NULL;

  buf = taosDecodeVariantU32(buf, &(pIdx->offset));
  if (buf == NULL) return NULL;

  buf = taosDecodeFixedU8(buf, &(lastFlag));
  if (buf == NULL) return NULL;
  pIdx->hasLast = lastFlag;

  buf = taosDecodeVariantU32(buf, &(nBlocks));
  if (buf == NULL) return NULL;
  pIdx->numOfBlocks = nBlocks;

  buf = taosDecodeFixedU64(buf, &raw64);
  if (buf == NULL) return NULL;
  pIdx->uid = (int64_t)raw64;

  buf = taosDecodeFixedU64(buf, &raw64);
  if (buf == NULL) return NULL;
  pIdx->maxKey.ts = (TSKEY)raw64;

  return buf;
}
/**
 * Fill per-column statistics for the requested columns from the statistics
 * previously loaded into the read handle.
 *
 * pStatis[i].colId selects the column. The requested list and the stored list
 * are walked in parallel in ascending column-id order, so both are presumably
 * sorted by column id — verify against callers. A requested column with no
 * stored entry gets numOfNull = -1. Without TD_REFACTOR_3 nothing is written
 * when the block has no aggregate data (pBlock->aggrStat is 0).
 *
 * @param pReadh    read handle holding the loaded statistics
 * @param pStatis   in/out array; colId is read, the statistic fields written
 * @param numOfCols number of entries in pStatis
 * @param pBlock    block the statistics belong to
 */
void tsdbGetBlockStatis(SReadH *pReadh, SColumnDataAgg *pStatis, int numOfCols, SBlock *pBlock) {
#ifdef TD_REFACTOR_3
  SBlockData *pBlockData = pReadh->pBlkData;
  int         iReq = 0;
  int         iHave = 0;

  while (iReq < numOfCols) {
    // Stored list exhausted: every remaining request is unmatched.
    if (iHave >= pBlockData->numOfCols) {
      pStatis[iReq].numOfNull = -1;
      ++iReq;
      continue;
    }

    if (pStatis[iReq].colId < pBlockData->cols[iHave].colId) {
      pStatis[iReq].numOfNull = -1;
      ++iReq;
    } else if (pStatis[iReq].colId > pBlockData->cols[iHave].colId) {
      ++iHave;
    } else {
      pStatis[iReq].sum = pBlockData->cols[iHave].sum;
      pStatis[iReq].max = pBlockData->cols[iHave].max;
      pStatis[iReq].min = pBlockData->cols[iHave].min;
      pStatis[iReq].maxIndex = pBlockData->cols[iHave].maxIndex;
      pStatis[iReq].minIndex = pBlockData->cols[iHave].minIndex;
      pStatis[iReq].numOfNull = pBlockData->cols[iHave].numOfNull;
      ++iReq;
      ++iHave;
    }
  }
#else
  if (pBlock->aggrStat) {
    SAggrBlkData *pAggrBlkData = pReadh->pAggrBlkData;
    int           iReq = 0;
    int           iHave = 0;

    while (iReq < numOfCols) {
      // Stored list exhausted: every remaining request is unmatched.
      if (iHave >= pBlock->numOfBSma) {
        pStatis[iReq].numOfNull = -1;
        ++iReq;
        continue;
      }

      SAggrBlkCol *pAggrBlkCol = ((SAggrBlkCol *)(pAggrBlkData)) + iHave;
      if (pStatis[iReq].colId < pAggrBlkCol->colId) {
        pStatis[iReq].numOfNull = -1;
        ++iReq;
      } else if (pStatis[iReq].colId > pAggrBlkCol->colId) {
        ++iHave;
      } else {
        pStatis[iReq].sum = pAggrBlkCol->sum;
        pStatis[iReq].max = pAggrBlkCol->max;
        pStatis[iReq].min = pAggrBlkCol->min;
        pStatis[iReq].maxIndex = pAggrBlkCol->maxIndex;
        pStatis[iReq].minIndex = pAggrBlkCol->minIndex;
        pStatis[iReq].numOfNull = pAggrBlkCol->numOfNull;
        ++iReq;
        ++iHave;
      }
    }
  }

#endif
}

/**
 * Drop all per-table state from the read handle: both SDataCols work buffers
 * are reset and the block-index cursor, current index entry and current table
 * are cleared.
 */
static void tsdbResetReadTable(SReadH *pReadh) {
  for (int iCols = 0; iCols < 2; ++iCols) {
    tdResetDataCols(pReadh->pDCols[iCols]);
  }

  pReadh->pTable = NULL;
  pReadh->pBlkIdx = NULL;
  pReadh->cidx = 0;
}

/**
 * Drop all per-file state from the read handle: per-table state is reset
 * first, then the loaded block-index entries are cleared and the handle's
 * file set is closed.
 */
static void tsdbResetReadFile(SReadH *pReadh) {
  // Table-level state refers to entries of aBlkIdx, so it is reset first.
  tsdbResetReadTable(pReadh);

  taosArrayClear(pReadh->aBlkIdx);
  tsdbCloseDFileSet(TSDB_READ_FSET(pReadh));
}

/*
 * Load the block described by pBlock from its file and decode it into pDataCols.
 *
 * The whole block (pBlock->len bytes starting at pBlock->offset) is read into the read buffer of
 * pReadh. The head part, whose size is given by tsdbBlockStatisSize(), is checksum-verified, and
 * the columns recorded in the block are then matched against the columns of pDataCols by column id:
 *   - a column found on both sides is verified and decoded by tsdbCheckAndDecodeColumnData();
 *   - a column found only in the block is skipped;
 *   - a column found only in pDataCols is reset with dataColReset().
 * Column 0 of pDataCols is treated as the primary-key timestamp column, stored at
 * TSDB_KEY_COL_OFFSET with length pBlock->keyLen.
 *
 * pReadh      read handle providing the files and the scratch buffers
 * pBlock      block descriptor; must have numOfSubBlocks of 0 or 1
 * pDataCols   destination columns; reset before loading
 * bitmapMode  value stored into pDataCols->bitmapMode
 *
 * Returns 0 on success and -1 on failure. terrno is set to TSDB_CODE_TDB_FILE_CORRUPTED here on a
 * short read, an inconsistent head size or a head checksum mismatch.
 */
static int tsdbLoadBlockDataImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *pDataCols, int8_t bitmapMode) {
  ASSERT(pBlock->numOfSubBlocks == 0 || pBlock->numOfSubBlocks == 1);

  SDFile *pDFile = (pBlock->last) ? TSDB_READ_LAST_FILE(pReadh) : TSDB_READ_DATA_FILE(pReadh);

  tdResetDataCols(pDataCols);

  pDataCols->bitmapMode = bitmapMode;

  if (tsdbMakeRoom((void **)(&TSDB_READ_BUF(pReadh)), pBlock->len) < 0) return -1;

  SBlockData *pBlockData = (SBlockData *)TSDB_READ_BUF(pReadh);

  if (tsdbSeekDFile(pDFile, pBlock->offset, SEEK_SET) < 0) {
    tsdbError("vgId:%d, failed to load block data part while seek file %s to offset %" PRId64 " since %s",
              TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFile), (int64_t)pBlock->offset, tstrerror(terrno));
    return -1;
  }

  int64_t nread = tsdbReadDFile(pDFile, TSDB_READ_BUF(pReadh), pBlock->len);
  if (nread < 0) {
    tsdbError("vgId:%d, failed to load block data part while read file %s since %s, offset:%" PRId64 " len :%d",
              TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFile), tstrerror(terrno), (int64_t)pBlock->offset,
              pBlock->len);
    return -1;
  }

  if (nread < pBlock->len) {
    terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
    tsdbError("vgId:%d, block data part in file %s is corrupted, offset:%" PRId64
              " expected bytes:%d read bytes: %" PRId64,
              TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFile), (int64_t)pBlock->offset, pBlock->len, nread);
    return -1;
  }

  int32_t tsize = (int32_t)tsdbBlockStatisSize(pBlock->numOfCols, (uint32_t)pBlock->blkVer);

  // The head part must lie strictly inside the bytes just read. Verify that BEFORE checksumming
  // tsize bytes of the buffer, otherwise an inconsistent block descriptor makes the checksum
  // routine read past the pBlock->len bytes loaded above.
  if (tsize >= pBlock->len) {
    terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
    tsdbError("vgId:%d, block head part in file %s is corrupted since head size %d is not less than block len %d"
              ", offset:%" PRId64,
              TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFile), tsize, pBlock->len, (int64_t)pBlock->offset);
    return -1;
  }

  if (!taosCheckChecksumWhole((uint8_t *)TSDB_READ_BUF(pReadh), tsize)) {
    terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
    tsdbError("vgId:%d, block head part in file %s is corrupted since wrong checksum, offset:%" PRId64 " len :%d",
              TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFile), (int64_t)pBlock->offset, tsize);
    return -1;
  }

  ASSERT(pBlockData->numOfCols == pBlock->numOfCols);

  pDataCols->numOfRows = pBlock->numOfRows;

  // Recover the data
  int        ccol = 0;  // loop iter for SBlockCol object
  int        dcol = 0;  // loop iter for SDataCols object
  int        nBitmaps = (int)TD_BITMAP_BYTES(pBlock->numOfRows);
  SBlockCol *pBlockCol = NULL;  // stays NULL while the key column (dcol == 0) is processed
  while (dcol < pDataCols->numOfCols) {
    SDataCol *pDataCol = &(pDataCols->cols[dcol]);
    if (dcol != 0 && ccol >= pBlockData->numOfCols) {
      // Block columns are exhausted: set current column as NULL and forward
      dataColReset(pDataCol);
      ++dcol;
      continue;
    }

    // Defaults describe the primary-key column; overwritten below for every other column.
    int16_t  tcolId = PRIMARYKEY_TIMESTAMP_COL_ID;
    uint32_t toffset = TSDB_KEY_COL_OFFSET;
    int32_t  tlen = pBlock->keyLen;

    if (dcol != 0) {
      pBlockCol = &(pBlockData->cols[ccol]);
      tcolId = pBlockCol->colId;
      toffset = pBlockCol->offset;
      tlen = pBlockCol->len;
      // NOTE(review): this flag is written before the column ids are compared, so it may come
      // from a block column that does not match pDataCol - confirm that is harmless.
      pDataCol->bitmap = pBlockCol->blen > 0 ? 1 : 0;
    } else {
      ASSERT(pDataCol->colId == tcolId);
      TD_SET_COL_ROWS_NORM(pDataCol);
    }

    // Number of bitmap bytes to decode for this column; 0 when the block column stores none.
    int32_t tLenBitmap = 0;
    if ((dcol != 0) && (pBlockCol->blen > 0)) {
      tLenBitmap = nBitmaps;
    }

    if (tcolId == pDataCol->colId) {
      if (pBlock->algorithm == TWO_STAGE_COMP) {
        int zsize = pDataCol->bytes * pBlock->numOfRows + tLenBitmap + 2 * COMP_OVERFLOW_BYTES;
        if (tsdbMakeRoom((void **)(&TSDB_READ_COMP_BUF(pReadh)), zsize) < 0) return -1;
      }

      if (tsdbCheckAndDecodeColumnData(pDataCol, POINTER_SHIFT(pBlockData, tsize + toffset), tlen,
                                       pBlockCol ? pBlockCol->blen : 0, pBlock->algorithm, pBlock->numOfRows,
                                       tLenBitmap, pDataCols->maxPoints, TSDB_READ_COMP_BUF(pReadh),
                                       (int)taosTSizeof(TSDB_READ_COMP_BUF(pReadh))) < 0) {
        tsdbError("vgId:%d, file %s is broken at column %d block offset %" PRId64 " column offset %u",
                  TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFile), tcolId, (int64_t)pBlock->offset, toffset);
        return -1;
      }

      // The key column is not part of pBlockData->cols, so it does not consume a block column.
      if (dcol != 0) {
        ++ccol;
      }
      ++dcol;
    } else if (tcolId < pDataCol->colId) {
      // Block column is not requested: skip it
      ++ccol;
    } else {
      // Requested column is absent from the block: set current column as NULL and forward
      dataColReset(pDataCol);
      ++dcol;
    }
  }

  return 0;
}

774 775
/*
 * Verify the checksum of one on-disk column and decode it into pDataCol.
 *
 * The `len` bytes at `content` are treated as: column data, then `bitmapLen` bytes of bitmap,
 * then a trailing TSCKSUM. The data part goes to pDataCol->pData and, when numOfBitmaps > 0,
 * the bitmap part goes to pDataCol->pBitmap. With a non-zero `comp` both parts are decompressed
 * (using buffer/bufferSize as scratch space); otherwise they are copied as they are.
 *
 * pDataCol      destination column
 * content       raw column bytes, including bitmap and checksum
 * len           total number of bytes at `content`
 * bitmapLen     number of on-disk bitmap bytes contained in `len`
 * comp          compression algorithm; 0 means not compressed
 * numOfRows     number of rows stored in the column
 * numOfBitmaps  number of bitmap bytes to produce; 0 when the column has no bitmap
 * maxPoints     capacity passed to tdAllocMemForCol()
 * buffer        scratch buffer handed to the decompress routines
 * bufferSize    size of `buffer`
 *
 * Returns 0 on success; -1 with terrno = TSDB_CODE_TDB_FILE_CORRUPTED on a checksum mismatch,
 * inconsistent lengths or a decompression failure.
 */
static int tsdbCheckAndDecodeColumnData(SDataCol *pDataCol, void *content, int32_t len, int32_t bitmapLen, int8_t comp,
                                        int numOfRows, int numOfBitmaps, int maxPoints, char *buffer, int bufferSize) {
  if (!taosCheckChecksumWhole((uint8_t *)content, len)) {
    terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
    return -1;
  }

  // Validate the layout in signed arithmetic. Mixing these int32_t lengths with sizeof() would
  // promote the subtraction to size_t and silently wrap when the on-disk lengths are inconsistent.
  const int32_t cksumLen = (int32_t)sizeof(TSCKSUM);
  if (bitmapLen < 0 || len < cksumLen || bitmapLen > len - cksumLen) {
    tsdbError("Failed to decode column data, file corrupted, len:%d bitmapLen:%d", len, bitmapLen);
    terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
    return -1;
  }
  const int32_t dataLen = len - bitmapLen - cksumLen;  // bytes of column data preceding the bitmap

  // NOTE(review): the result of tdAllocMemForCol() is not inspected here; if it can report an
  // allocation failure, the buffers written below may be unusable - confirm against its declaration.
  tdAllocMemForCol(pDataCol, maxPoints);

  // Decode the data
  if (comp) {
    // Need to decompress
    int tlen = (*(tDataTypes[pDataCol->type].decompFunc))(content, dataLen, numOfRows, pDataCol->pData,
                                                          pDataCol->spaceSize, comp, buffer, bufferSize);
    if (tlen <= 0) {
      tsdbError(
          "Failed to decompress column data, file corrupted, len:%d comp:%d numOfRows:%d maxPoints:%d bufferSize:%d",
          dataLen, comp, numOfRows, maxPoints, bufferSize);
      terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
      return -1;
    }
    pDataCol->len = tlen;

    if (numOfBitmaps > 0) {
      tlen = tsDecompressTinyint(POINTER_SHIFT(content, dataLen), bitmapLen, numOfBitmaps, pDataCol->pBitmap,
                                 pDataCol->spaceSize, comp, buffer, bufferSize);
      if (tlen <= 0) {
        tsdbError(
            "Failed to decompress column bitmap, file corrupted, len:%d comp:%d numOfBitmaps:%d maxPoints:%d "
            "bufferSize:%d",
            bitmapLen, comp, numOfBitmaps, maxPoints, bufferSize);
        terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
        return -1;
      }
    }
  } else {
    // No need to decompress, just memcpy it
    pDataCol->len = dataLen;
    memcpy(pDataCol->pData, content, pDataCol->len);
    if (numOfBitmaps > 0) {
      memcpy(pDataCol->pBitmap, POINTER_SHIFT(content, dataLen), bitmapLen);
    }
  }

  // Variable-length columns additionally get their offsets set up via dataColSetOffset().
  if (IS_VAR_DATA_TYPE(pDataCol->type)) {
    dataColSetOffset(pDataCol, numOfRows);
  }
  return 0;
}

L
Liu Jicong 已提交
844
/*
 * Load only the columns listed in colIds from the block described by pBlock into pDataCols.
 *
 * colIds[0] must be the primary-key timestamp column id. Two cursors move forward only: one over
 * pDataCols->cols and one over the block columns in pReadh->pBlkData->cols. For every requested id:
 *   - if pDataCols has no column with that id, the id is ignored;
 *   - the timestamp column is described by a locally built SBlockCol (offset TSDB_KEY_COL_OFFSET,
 *     length pBlock->keyLen, no bitmap);
 *   - if the block has no column with that id, the data column is reset with dataColReset();
 *   - otherwise the column is read and decoded through tsdbLoadColData().
 *
 * Returns 0 on success, or -1 as soon as loading the block offsets or a column fails.
 */
static int tsdbLoadBlockDataColsImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *pDataCols, const int16_t *colIds,
                                     int numOfColIds, int8_t bitmapMode) {
  ASSERT(pBlock->numOfSubBlocks == 0 || pBlock->numOfSubBlocks == 1);
  ASSERT(colIds[0] == PRIMARYKEY_TIMESTAMP_COL_ID);

  SBlockCol keyBlockCol = {0};
  SDFile   *pDFile = (pBlock->last) ? TSDB_READ_LAST_FILE(pReadh) : TSDB_READ_DATA_FILE(pReadh);

  tdResetDataCols(pDataCols);
  pDataCols->bitmapMode = bitmapMode;

  // If only load timestamp column, no need to load SBlockData part
  if (numOfColIds > 1) {
    if (tsdbLoadBlockOffset(pReadh, pBlock) < 0) return -1;
  }

  pDataCols->numOfRows = pBlock->numOfRows;

  int dataCursor = 0;  // next candidate in pDataCols->cols
  int blkCursor = 0;   // next candidate in pReadh->pBlkData->cols
  for (int k = 0; k < numOfColIds; ++k) {
    int16_t wantedId = colIds[k];

    // Advance the data-column cursor up to (and past) the requested id, never beyond a larger id.
    SDataCol *pDataCol = NULL;
    while (dataCursor < pDataCols->numOfCols) {
      SDataCol *pDataCand = &pDataCols->cols[dataCursor];
      if (pDataCand->colId > wantedId) break;

      ++dataCursor;
      if (pDataCand->colId == wantedId) {
        pDataCol = pDataCand;
        break;
      }
    }

    if (pDataCol == NULL) continue;
    ASSERT(pDataCol->colId == wantedId);

    SBlockCol *pBlockCol = NULL;
    if (wantedId != PRIMARYKEY_TIMESTAMP_COL_ID) {  // load non-key rows
      // Same forward scan, this time over the block's column descriptors.
      while (blkCursor < pBlock->numOfCols) {
        SBlockCol *pBlkCand = &(pReadh->pBlkData->cols[blkCursor]);
        if (pBlkCand->colId > wantedId) break;

        ++blkCursor;
        if (pBlkCand->colId == wantedId) {
          pBlockCol = pBlkCand;
          break;
        }
      }

      if (pBlockCol == NULL) {
        dataColReset(pDataCol);
        continue;
      }

      ASSERT(pBlockCol->colId == pDataCol->colId);
    } else {  // load the key row
      keyBlockCol.colId = wantedId;
      keyBlockCol.blen = 0;  // default is NORM for the primary key column
      keyBlockCol.len = pBlock->keyLen;
      keyBlockCol.type = pDataCol->type;
      keyBlockCol.offset = TSDB_KEY_COL_OFFSET;
      pBlockCol = &keyBlockCol;
    }

    // set the bitmap
    if (pBlockCol->blen > 0) {
      pDataCol->bitmap = 1;
    } else {
      pDataCol->bitmap = 0;
    }

    if (tsdbLoadColData(pReadh, pDFile, pBlock, pBlockCol, pDataCol) < 0) return -1;
  }

  return 0;
}

/*
 * Read a single column of a block from pDFile and decode it into pDataCol.
 *
 * The column starts at
 *   pBlock->offset + tsdbBlockStatisSize(pBlock->numOfCols, pBlock->blkVer) + pBlockCol->offset
 * and spans pBlockCol->len bytes. The bytes are read into the read buffer of pReadh and then
 * verified and decoded by tsdbCheckAndDecodeColumnData(), with the compression buffer of pReadh
 * as scratch space.
 *
 * pReadh     read handle providing the repo, the read buffer and the compression buffer
 * pDFile     file that holds the block
 * pBlock     block descriptor (offset, row count, algorithm, ...)
 * pBlockCol  descriptor of the column inside the block; its colId must equal pDataCol->colId
 * pDataCol   destination column
 *
 * Returns 0 on success and -1 on failure; a short read sets terrno to TSDB_CODE_TDB_FILE_CORRUPTED.
 */
static int tsdbLoadColData(SReadH *pReadh, SDFile *pDFile, SBlock *pBlock, SBlockCol *pBlockCol, SDataCol *pDataCol) {
  ASSERT(pDataCol->colId == pBlockCol->colId);

  STsdb    *pRepo = TSDB_READ_REPO(pReadh);
  STsdbCfg *pCfg = REPO_CFG(pRepo);

  // Bitmap bytes are only expected when the block column actually stores a bitmap.
  int     nBitmaps = (int)TD_BITMAP_BYTES(pBlock->numOfRows);
  int32_t tLenBitmap = 0;

  if (pBlockCol->blen) {
    tLenBitmap = nBitmaps;
  }

  // Size requested for the compression scratch buffer.
  int tsize = pDataCol->bytes * pBlock->numOfRows + tLenBitmap + 2 * COMP_OVERFLOW_BYTES;

  if (tsdbMakeRoom((void **)(&TSDB_READ_BUF(pReadh)), pBlockCol->len) < 0) return -1;
  if (tsdbMakeRoom((void **)(&TSDB_READ_COMP_BUF(pReadh)), tsize) < 0) return -1;

  int64_t offset =
      pBlock->offset + tsdbBlockStatisSize(pBlock->numOfCols, (uint32_t)pBlock->blkVer) + pBlockCol->offset;
  if (tsdbSeekDFile(pDFile, offset, SEEK_SET) < 0) {
    tsdbError("vgId:%d, failed to load block column data while seek file %s to offset %" PRId64 " since %s",
              TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFile), offset, tstrerror(terrno));
    return -1;
  }

  int64_t nread = tsdbReadDFile(pDFile, TSDB_READ_BUF(pReadh), pBlockCol->len);
  if (nread < 0) {
    tsdbError("vgId:%d, failed to load block column data while read file %s since %s, offset:%" PRId64 " len :%d",
              TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFile), tstrerror(terrno), offset, pBlockCol->len);
    return -1;
  }

  if (nread < pBlockCol->len) {
    terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
    // "%d" is already a complete conversion for pBlockCol->len; no extra format macro may follow it.
    tsdbError("vgId:%d, block column data in file %s is corrupted, offset:%" PRId64
              " expected bytes:%d read bytes: %" PRId64,
              TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFile), offset, pBlockCol->len, nread);
    return -1;
  }

  if (tsdbCheckAndDecodeColumnData(pDataCol, pReadh->pBuf, pBlockCol->len, pBlockCol->blen, pBlock->algorithm,
                                   pBlock->numOfRows, tLenBitmap, pCfg->maxRows, pReadh->pCBuf,
                                   (int32_t)taosTSizeof(pReadh->pCBuf)) < 0) {
    tsdbError("vgId:%d, file %s is broken at column %d offset %" PRId64, REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pDFile),
              pBlockCol->colId, offset);
    return -1;
  }

  return 0;
}