tdatablock.c 86.6 KB
Newer Older
S
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
H
Haojun Liao 已提交
17
#include "tdatablock.h"
S
compare  
Shengliang Guan 已提交
18
#include "tcompare.h"
19
#include "tlog.h"
20
#include "tname.h"
21

22
#define MALLOC_ALIGN_BYTES 32
23

24
int32_t colDataGetLength(const SColumnInfoData* pColumnInfoData, int32_t numOfRows) {
H
Haojun Liao 已提交
25
  if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
26 27 28 29 30 31 32 33 34 35 36 37 38 39
    if (pColumnInfoData->reassigned) {
      int32_t totalSize = 0;
      for (int32_t row = 0; row < numOfRows; ++row) {
        char* pColData = pColumnInfoData->pData + pColumnInfoData->varmeta.offset[row];
        int32_t colSize = 0;
        if (pColumnInfoData->info.type == TSDB_DATA_TYPE_JSON) {
          colSize = getJsonValueLen(pColData);
        } else {
          colSize = varDataTLen(pColData);
        }
        totalSize += colSize;
      }
      return totalSize;
    }
H
Haojun Liao 已提交
40 41
    return pColumnInfoData->varmeta.length;
  } else {
42 43 44 45 46
    if (pColumnInfoData->info.type == TSDB_DATA_TYPE_NULL) {
      return 0;
    } else {
      return pColumnInfoData->info.bytes * numOfRows;
    }
H
Haojun Liao 已提交
47 48 49
  }
}

50 51 52 53 54 55 56 57 58 59 60

/**
 * Encoded byte length of a single row of a column.
 *
 * @return 0 for a NULL row, info.bytes for a fixed-width column, otherwise the encoded
 *         length of the variable-length (or JSON) value stored at rowIdx.
 */
int32_t colDataGetRowLength(const SColumnInfoData* pColumnInfoData, int32_t rowIdx) {
  if (colDataIsNull_s(pColumnInfoData, rowIdx)) {
    return 0;
  }

  if (!IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
    return pColumnInfoData->info.bytes;
  }

  if (pColumnInfoData->info.type == TSDB_DATA_TYPE_JSON) {
    return getJsonValueLen(colDataGetData(pColumnInfoData, rowIdx));
  }

  return varDataTLen(colDataGetData(pColumnInfoData, rowIdx));
}

H
Haojun Liao 已提交
61 62 63 64
int32_t colDataGetFullLength(const SColumnInfoData* pColumnInfoData, int32_t numOfRows) {
  if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
    return pColumnInfoData->varmeta.length + sizeof(int32_t) * numOfRows;
  } else {
65 66
    return ((pColumnInfoData->info.type == TSDB_DATA_TYPE_NULL) ? 0 : pColumnInfoData->info.bytes * numOfRows) +
           BitmapLen(numOfRows);
H
Haojun Liao 已提交
67 68 69
  }
}

H
Haojun Liao 已提交
70 71 72 73
// Placeholder: the body is intentionally empty, trimming has not been implemented yet.
void colDataTrim(SColumnInfoData* pColumnInfoData) {
  // TODO
}

74
int32_t getJsonValueLen(const char* data) {
wmmhello's avatar
wmmhello 已提交
75 76 77 78 79 80 81 82 83
  int32_t dataLen = 0;
  if (*data == TSDB_DATA_TYPE_NULL) {
    dataLen = CHAR_BYTES;
  } else if (*data == TSDB_DATA_TYPE_NCHAR) {
    dataLen = varDataTLen(data + CHAR_BYTES) + CHAR_BYTES;
  } else if (*data == TSDB_DATA_TYPE_DOUBLE) {
    dataLen = DOUBLE_BYTES + CHAR_BYTES;
  } else if (*data == TSDB_DATA_TYPE_BOOL) {
    dataLen = CHAR_BYTES + CHAR_BYTES;
wmmhello's avatar
wmmhello 已提交
84
  } else if (tTagIsJson(data)) {  // json string
wmmhello's avatar
wmmhello 已提交
85 86 87 88 89 90 91
    dataLen = ((STag*)(data))->len;
  } else {
    ASSERT(0);
  }
  return dataLen;
}

92 93 94 95
// Thin wrapper: forwards every argument unchanged to colDataSetVal() and returns its result.
int32_t colDataAppend(SColumnInfoData* pColumnInfoData, uint32_t rowIndex, const char* pData, bool isNull) {
  return colDataSetVal(pColumnInfoData, rowIndex, pData, isNull);
}

96
/**
 * Store one value (or a NULL) at rowIndex of a column.
 *
 * NULL: variable-length columns record offset -1, fixed-width columns set the bit in the
 * null bitmap; hasNull is raised in both cases.
 *
 * Non-NULL, fixed-width: info.bytes bytes are copied into slot rowIndex and the null bit
 * is cleared.
 *
 * Non-NULL, variable-length: the encoded value is appended at the end of the payload
 * buffer (varmeta.length) and its offset recorded. The buffer grows by a factor of 1.5
 * (starting from 8 bytes) until the value fits.
 *
 * @return 0 on success, TSDB_CODE_OUT_OF_MEMORY when the buffer cannot be grown or the
 *         required size does not fit in the 32-bit allocLen field.
 */
int32_t colDataSetVal(SColumnInfoData* pColumnInfoData, uint32_t rowIndex, const char* pData, bool isNull) {
  if (isNull) {
    // There is a placeholder for each NULL value of binary or nchar type.
    if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
      pColumnInfoData->varmeta.offset[rowIndex] = -1;  // it is a null value of VAR type.
    } else {
      colDataSetNull_f_s(pColumnInfoData, rowIndex);
    }

    pColumnInfoData->hasNull = true;
    return 0;
  }

  int32_t type = pColumnInfoData->info.type;
  if (!IS_VAR_DATA_TYPE(type)) {
    memcpy(pColumnInfoData->pData + pColumnInfoData->info.bytes * rowIndex, pData, pColumnInfoData->info.bytes);
    colDataClearNull_f(pColumnInfoData->nullbitmap, rowIndex);
    return 0;
  }

  int32_t dataLen = 0;
  if (type == TSDB_DATA_TYPE_JSON) {
    dataLen = getJsonValueLen(pData);
  } else {
    dataLen = varDataTLen(pData);
  }

  SVarColAttr* pAttr = &pColumnInfoData->varmeta;

  // All size arithmetic is done in 64 bits: with a 32-bit accumulator the overflow check
  // below could never trigger and the 1.5x growth could silently wrap.
  uint64_t required = (uint64_t)pAttr->length + (uint64_t)dataLen;
  if (pAttr->allocLen < required) {
    uint64_t newSize = pAttr->allocLen;
    if (newSize <= 1) {
      newSize = 8;
    }

    while (newSize < required) {
      newSize += newSize / 2;  // same sequence as truncating newSize * 1.5
      if (newSize > UINT32_MAX) {
        return TSDB_CODE_OUT_OF_MEMORY;
      }
    }

    char* buf = taosMemoryRealloc(pColumnInfoData->pData, newSize);
    if (buf == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    pColumnInfoData->pData = buf;
    pAttr->allocLen = (uint32_t)newSize;
  }

  uint32_t len = pAttr->length;
  pAttr->offset[rowIndex] = len;

  memmove(pColumnInfoData->pData + len, pData, dataLen);
  pAttr->length += dataLen;

  return 0;
}

D
dapan1121 已提交
154 155 156 157 158 159 160 161 162 163 164 165 166
/**
 * Make row dstRowIdx hold the same value as row srcRowIdx.
 *
 * Variable-length columns: only the offset entry is duplicated, so both rows point at the
 * same payload bytes, and the column is flagged as `reassigned`. pData is not read.
 *
 * Fixed-width columns: info.bytes bytes are copied from pData into slot dstRowIdx and the
 * destination's null bit is cleared.
 *
 * @return always 0
 */
int32_t colDataReassignVal(SColumnInfoData* pColumnInfoData, uint32_t dstRowIdx, uint32_t srcRowIdx, const char* pData) {
  if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
    // No payload is copied, so the value length is irrelevant here.
    pColumnInfoData->varmeta.offset[dstRowIdx] = pColumnInfoData->varmeta.offset[srcRowIdx];
    pColumnInfoData->reassigned = true;
  } else {
    memcpy(pColumnInfoData->pData + pColumnInfoData->info.bytes * dstRowIdx, pData, pColumnInfoData->info.bytes);
    colDataClearNull_f(pColumnInfoData->nullbitmap, dstRowIdx);
  }

  return 0;
}


177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198
/**
 * Ensure the payload buffer of a variable-length column can hold at least newSize bytes.
 *
 * Fixed-width columns and columns whose buffer is already large enough are left untouched.
 *
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when the reallocation fails
 *         (the existing buffer is kept in that case).
 */
int32_t colDataReserve(SColumnInfoData* pColumnInfoData, size_t newSize) {
  if (!IS_VAR_DATA_TYPE(pColumnInfoData->info.type) || pColumnInfoData->varmeta.allocLen >= newSize) {
    return TSDB_CODE_SUCCESS;
  }

  char* pGrown = taosMemoryRealloc(pColumnInfoData->pData, newSize);
  if (pGrown == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  pColumnInfoData->pData = pGrown;
  pColumnInfoData->varmeta.allocLen = newSize;
  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
199 200 201 202 203 204 205 206 207 208 209
static int32_t doCopyNItems(struct SColumnInfoData* pColumnInfoData, int32_t currentRow, const char* pData,
                         int32_t itemLen, int32_t numOfRows, bool trimValue) {
  if (pColumnInfoData->info.bytes < itemLen) {
    uWarn("column/tag actual data len %d is bigger than schema len %d, trim it:%d", itemLen, pColumnInfoData->info.bytes, trimValue);
    if (trimValue) {
      itemLen = pColumnInfoData->info.bytes;
    } else {
      return TSDB_CODE_TDB_INVALID_TABLE_SCHEMA_VER;
    }
  }
  
H
Haojun Liao 已提交
210 211 212 213 214 215
  size_t start = 1;

  // the first item
  memcpy(pColumnInfoData->pData, pData, itemLen);

  int32_t t = 0;
L
Liu Jicong 已提交
216 217
  int32_t count = log(numOfRows) / log(2);
  while (t < count) {
H
Haojun Liao 已提交
218
    int32_t xlen = 1 << t;
L
Liu Jicong 已提交
219 220
    memcpy(pColumnInfoData->pData + start * itemLen + pColumnInfoData->varmeta.length, pColumnInfoData->pData,
           xlen * itemLen);
H
Haojun Liao 已提交
221 222 223 224 225 226
    t += 1;
    start += xlen;
  }

  // the tail part
  if (numOfRows > start) {
L
Liu Jicong 已提交
227 228
    memcpy(pColumnInfoData->pData + start * itemLen + currentRow * itemLen, pColumnInfoData->pData,
           (numOfRows - start) * itemLen);
H
Haojun Liao 已提交
229 230 231
  }

  if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
L
Liu Jicong 已提交
232
    for (int32_t i = 0; i < numOfRows; ++i) {
H
Haojun Liao 已提交
233 234 235 236 237
      pColumnInfoData->varmeta.offset[i + currentRow] = pColumnInfoData->varmeta.length + i * itemLen;
    }

    pColumnInfoData->varmeta.length += numOfRows * itemLen;
  }
D
dapan1121 已提交
238 239

  return TSDB_CODE_SUCCESS;
H
Haojun Liao 已提交
240 241
}

242
int32_t colDataSetNItems(SColumnInfoData* pColumnInfoData, uint32_t currentRow, const char* pData,
D
dapan1121 已提交
243
                            uint32_t numOfRows, bool trimValue) {
H
Haojun Liao 已提交
244 245 246 247 248 249 250 251 252 253 254
  int32_t len = pColumnInfoData->info.bytes;
  if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
    len = varDataTLen(pData);
    if (pColumnInfoData->varmeta.allocLen < (numOfRows + currentRow) * len) {
      int32_t code = colDataReserve(pColumnInfoData, (numOfRows + currentRow) * len);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }
    }
  }

D
dapan1121 已提交
255
  return doCopyNItems(pColumnInfoData, currentRow, pData, len, numOfRows, trimValue);
H
Haojun Liao 已提交
256 257
}

L
Liu Jicong 已提交
258 259
/**
 * Append the null bitmap of pSource (numOfRow2 rows) behind the first numOfRow1 rows of
 * pColumnInfoData's null bitmap.
 *
 * When numOfRow1 is a multiple of 8 the source bitmap is copied byte-wise. Otherwise each
 * source byte is split: its high part completes the current destination byte and its low
 * part opens the next one. The destination bitmap must already be large enough for
 * numOfRow1 + numOfRow2 rows.
 */
static void doBitmapMerge(SColumnInfoData* pColumnInfoData, int32_t numOfRow1, const SColumnInfoData* pSource,
                          int32_t numOfRow2) {
  if (numOfRow2 <= 0) {
    return;
  }

  uint32_t mergedRows = numOfRow1 + numOfRow2;
  uint32_t usedBits = BitPos(numOfRow1);  // bits already taken in the last destination byte
  uint32_t freeBits = 8 - usedBits;

  if (usedBits == 0) {  // byte aligned, no shifting needed
    memcpy(pColumnInfoData->nullbitmap + BitmapLen(numOfRow1), pSource->nullbitmap, BitmapLen(numOfRow2));
    return;
  }

  uint8_t* pSrcBits = (uint8_t*)pSource->nullbitmap;

  // finish the partially used byte: wipe its free bits, then fill them from the first source byte
  pColumnInfoData->nullbitmap[BitmapLen(numOfRow1) - 1] &= (0xFF << freeBits);
  pColumnInfoData->nullbitmap[BitmapLen(numOfRow1) - 1] |= (pSrcBits[0] >> usedBits);

  if (BitmapLen(numOfRow1) == BitmapLen(mergedRows)) {
    return;  // everything fitted into the byte that was already in use
  }

  int32_t  srcBytes = BitmapLen(numOfRow2);
  uint8_t* pDstBits = (uint8_t*)&pColumnInfoData->nullbitmap[BitmapLen(numOfRow1)];
  int32_t  extraBytes = BitmapLen(mergedRows) - BitmapLen(numOfRow1);
  memset(pDstBits, 0, extraBytes);

  for (int32_t i = 0; i < srcBytes; ++i) {  // bounded by the source bitmap
    if (i >= 1) {
      pDstBits[i - 1] |= (pSrcBits[i] >> usedBits);  // high part completes the previous byte
    }

    if (i >= extraBytes) {  // bounded by the destination bitmap
      return;
    }

    pDstBits[i] |= (pSrcBits[i] << freeBits);  // low part opens this byte
  }
}

H
Haojun Liao 已提交
300 301
/**
 * Append numOfRow2 rows of pSource behind the numOfRow1 rows already in pColumnInfoData.
 *
 * *capacity is the row capacity of the destination; when the merged row count exceeds it
 * the per-row storage (offset table, or payload + null bitmap) is reallocated and
 * *capacity is raised to the merged row count.
 *
 * @return the merged row count on success (numOfRow1 when numOfRow2 is 0),
 *         TSDB_CODE_FAILED when the column types differ,
 *         TSDB_CODE_OUT_OF_MEMORY when a reallocation fails.
 */
int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, int32_t numOfRow1, int32_t* capacity,
                        const SColumnInfoData* pSource, int32_t numOfRow2) {
  if (pColumnInfoData->info.type != pSource->info.type) {
    return TSDB_CODE_FAILED;
  }

  if (numOfRow2 == 0) {
    return numOfRow1;
  }

  if (pSource->hasNull) {
    pColumnInfoData->hasNull = pSource->hasNull;
  }

  uint32_t mergedRows = numOfRow1 + numOfRow2;

  if (!IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
    if (mergedRows > (*capacity)) {
      // all data may be null, when the pColumnInfoData->info.type == 0, bytes == 0;
      char* pGrown = taosMemoryRealloc(pColumnInfoData->pData, mergedRows * pColumnInfoData->info.bytes);
      if (pGrown == NULL) {
        return TSDB_CODE_OUT_OF_MEMORY;
      }
      pColumnInfoData->pData = pGrown;

      if (BitmapLen(numOfRow1) < BitmapLen(mergedRows)) {
        char* pBits = taosMemoryRealloc(pColumnInfoData->nullbitmap, BitmapLen(mergedRows));
        if (pBits == NULL) {
          return TSDB_CODE_OUT_OF_MEMORY;
        }

        // zero the bytes that lie beyond the rows already stored
        uint32_t added = BitmapLen(mergedRows) - BitmapLen(numOfRow1);
        memset(pBits + BitmapLen(numOfRow1), 0, added);
        pColumnInfoData->nullbitmap = pBits;
      }

      *capacity = mergedRows;
    }

    doBitmapMerge(pColumnInfoData, numOfRow1, pSource, numOfRow2);

    if (pSource->pData) {
      int32_t dstPos = pColumnInfoData->info.bytes * numOfRow1;
      memcpy(pColumnInfoData->pData + dstPos, pSource->pData, pSource->info.bytes * numOfRow2);
    }

    return numOfRow1 + numOfRow2;
  }

  // variable-length column: grow the offset table first
  if (mergedRows > (*capacity)) {
    char* pOffsets = taosMemoryRealloc(pColumnInfoData->varmeta.offset, sizeof(int32_t) * (numOfRow1 + numOfRow2));
    if (pOffsets == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    *capacity = mergedRows;
    pColumnInfoData->varmeta.offset = (int32_t*)pOffsets;
  }

  // source offsets are rebased onto the end of the destination payload; -1 (NULL) is kept
  for (int32_t i = 0; i < numOfRow2; ++i) {
    int32_t srcOffset = pSource->varmeta.offset[i];
    if (srcOffset == -1) {
      pColumnInfoData->varmeta.offset[i + numOfRow1] = -1;
    } else {
      pColumnInfoData->varmeta.offset[i + numOfRow1] = srcOffset + pColumnInfoData->varmeta.length;
    }
  }

  // copy data
  uint32_t srcLen = pSource->varmeta.length;
  uint32_t dstLen = pColumnInfoData->varmeta.length;
  if (pColumnInfoData->varmeta.allocLen < srcLen + dstLen) {
    char* pGrown = taosMemoryRealloc(pColumnInfoData->pData, srcLen + dstLen);
    if (pGrown == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    pColumnInfoData->pData = pGrown;
    pColumnInfoData->varmeta.allocLen = srcLen + dstLen;
  }

  if (pColumnInfoData->pData && pSource->pData) {  // TD-20382
    memcpy(pColumnInfoData->pData + dstLen, pSource->pData, srcLen);
  }
  pColumnInfoData->varmeta.length = srcLen + dstLen;

  return numOfRow1 + numOfRow2;
}

L
Liu Jicong 已提交
385 386
/**
 * Overwrite the first numOfRows rows of pColumnInfoData with those of pSource.
 *
 * The destination's per-row storage (offset table / null bitmap / fixed-width payload) is
 * assumed to be large enough already; only the payload buffer of a variable-length column
 * is grown here. hasNull and the column info are copied from the source as well.
 *
 * @param pBlockInfo optional; when given, its capacity must be at least numOfRows.
 * @return 0 on success, numOfRows unchanged when it is <= 0, TSDB_CODE_FAILED on a type
 *         mismatch or insufficient capacity, TSDB_CODE_OUT_OF_MEMORY on allocation failure.
 */
int32_t colDataAssign(SColumnInfoData* pColumnInfoData, const SColumnInfoData* pSource, int32_t numOfRows,
                      const SDataBlockInfo* pBlockInfo) {
  if (pColumnInfoData->info.type != pSource->info.type) {
    return TSDB_CODE_FAILED;
  }

  if (pBlockInfo != NULL && pBlockInfo->capacity < numOfRows) {
    return TSDB_CODE_FAILED;
  }

  if (numOfRows <= 0) {
    return numOfRows;
  }

  if (!IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
    memcpy(pColumnInfoData->nullbitmap, pSource->nullbitmap, BitmapLen(numOfRows));
    if (pSource->pData != NULL) {
      memcpy(pColumnInfoData->pData, pSource->pData, pSource->info.bytes * numOfRows);
    }
  } else {
    memcpy(pColumnInfoData->varmeta.offset, pSource->varmeta.offset, sizeof(int32_t) * numOfRows);

    if (pColumnInfoData->varmeta.allocLen < pSource->varmeta.length) {
      char* pGrown = taosMemoryRealloc(pColumnInfoData->pData, pSource->varmeta.length);
      if (pGrown == NULL) {
        return TSDB_CODE_OUT_OF_MEMORY;
      }

      pColumnInfoData->pData = pGrown;
      pColumnInfoData->varmeta.allocLen = pSource->varmeta.length;
    }

    pColumnInfoData->varmeta.length = pSource->varmeta.length;
    if (pColumnInfoData->pData != NULL && pSource->pData != NULL) {
      memcpy(pColumnInfoData->pData, pSource->pData, pSource->varmeta.length);
    }
  }

  pColumnInfoData->hasNull = pSource->hasNull;
  pColumnInfoData->info = pSource->info;
  return 0;
}

L
Liu Jicong 已提交
423
// Number of columns in the block, i.e. the element count of its column array.
size_t blockDataGetNumOfCols(const SSDataBlock* pBlock) {
  return taosArrayGetSize(pBlock->pDataBlock);
}
424

L
Liu Jicong 已提交
425
// Number of rows currently recorded in the block's info.
size_t blockDataGetNumOfRows(const SSDataBlock* pBlock) {
  return pBlock->info.rows;
}
426

427
/**
 * Refresh info.window of a block from the first and last value of its timestamp column.
 *
 * @param tsColumnIndex index of the timestamp column; -1 selects column 0.
 * @return 0 when the window was updated or when there is nothing to do (NULL block, no
 *         rows, data not loaded, or the chosen column is not a timestamp column);
 *         -1 when the block has no columns.
 */
int32_t blockDataUpdateTsWindow(SSDataBlock* pDataBlock, int32_t tsColumnIndex) {
  if (pDataBlock == NULL) {
    return 0;
  }

  if (pDataBlock->info.rows <= 0 || pDataBlock->info.dataLoad == 0) {
    return 0;
  }

  size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock);
  if (numOfCols <= 0) {
    return -1;
  }

  int32_t colIdx = tsColumnIndex;
  if (colIdx == -1) {
    colIdx = 0;
  }

  SColumnInfoData* pTsCol = taosArrayGet(pDataBlock->pDataBlock, colIdx);
  if (pTsCol->info.type != TSDB_DATA_TYPE_TIMESTAMP) {
    return 0;
  }

  // the first and last row bound the window whichever way the block is ordered
  TSKEY firstTs = *(TSKEY*)colDataGetData(pTsCol, 0);
  TSKEY lastTs = *(TSKEY*)colDataGetData(pTsCol, (pDataBlock->info.rows - 1));

  pDataBlock->info.window.skey = TMIN(firstTs, lastTs);
  pDataBlock->info.window.ekey = TMAX(firstTs, lastTs);

  return 0;
}

457
/**
 * Append all rows of pSrc to pDest, column by column.
 *
 * Column i of pSrc is merged into column i of pDest. Every column starts from the same
 * destination capacity; the capacity reported by the last merged column is stored back.
 *
 * NOTE(review): the result of colDataMergeCol() is not inspected, so a type mismatch or
 * an allocation failure in a column goes unreported -- confirm whether callers rely on this.
 *
 * @return always TSDB_CODE_SUCCESS
 */
int32_t blockDataMerge(SSDataBlock* pDest, const SSDataBlock* pSrc) {
  int32_t newCapacity = pDest->info.capacity;
  size_t  numOfCols = taosArrayGetSize(pDest->pDataBlock);

  for (int32_t c = 0; c < numOfCols; ++c) {
    SColumnInfoData* pDstCol = taosArrayGet(pDest->pDataBlock, c);
    SColumnInfoData* pSrcCol = taosArrayGet(pSrc->pDataBlock, c);

    newCapacity = pDest->info.capacity;
    colDataMergeCol(pDstCol, pDest->info.rows, &newCapacity, pSrcCol, pSrc->info.rows);
  }

  pDest->info.capacity = newCapacity;
  pDest->info.rows += pSrc->info.rows;
  return TSDB_CODE_SUCCESS;
}

size_t blockDataGetSize(const SSDataBlock* pBlock) {
475 476
  size_t total = 0;
  size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
L
Liu Jicong 已提交
477
  for (int32_t i = 0; i < numOfCols; ++i) {
H
Haojun Liao 已提交
478
    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
H
Haojun Liao 已提交
479
    total += colDataGetFullLength(pColInfoData, pBlock->info.rows);
H
Haojun Liao 已提交
480 481 482 483 484 485 486
  }

  return total;
}

// the number of tuples can be fit in one page.
// Actual data rows pluses the corresponding meta data must fit in one memory buffer of the given page size.
L
Liu Jicong 已提交
487 488
int32_t blockDataSplitRows(SSDataBlock* pBlock, bool hasVarCol, int32_t startIndex, int32_t* stopIndex,
                           int32_t pageSize) {
L
Liu Jicong 已提交
489
  size_t  numOfCols = taosArrayGetSize(pBlock->pDataBlock);
H
Haojun Liao 已提交
490 491
  int32_t numOfRows = pBlock->info.rows;

H
Haojun Liao 已提交
492 493
  int32_t bitmapChar = 1;

L
Liu Jicong 已提交
494
  size_t headerSize = sizeof(int32_t);
495
  size_t colHeaderSize = sizeof(int32_t) * numOfCols;
496

H
Haojun Liao 已提交
497
  // TODO speedup by checking if the whole page can fit in firstly.
H
Haojun Liao 已提交
498
  if (!hasVarCol) {
L
Liu Jicong 已提交
499
    size_t  rowSize = blockDataGetRowSize(pBlock);
G
Ganlin Zhao 已提交
500
    int32_t capacity = blockDataGetCapacityInRow(pBlock, pageSize, headerSize + colHeaderSize);
G
Ganlin Zhao 已提交
501 502 503
    if (capacity <= 0) {
      return TSDB_CODE_FAILED;
    }
504

505
    *stopIndex = startIndex + capacity - 1;
H
Haojun Liao 已提交
506 507 508
    if (*stopIndex >= numOfRows) {
      *stopIndex = numOfRows - 1;
    }
509

H
Haojun Liao 已提交
510
    return TSDB_CODE_SUCCESS;
511
  }
wmmhello's avatar
wmmhello 已提交
512 513 514 515 516 517
  // iterate the rows that can be fit in this buffer page
  int32_t size = (headerSize + colHeaderSize);
  for (int32_t j = startIndex; j < numOfRows; ++j) {
    for (int32_t i = 0; i < numOfCols; ++i) {
      SColumnInfoData* pColInfoData = TARRAY_GET_ELEM(pBlock->pDataBlock, i);
      if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
518
        if (pColInfoData->varmeta.offset[j] != -1) {
wmmhello's avatar
wmmhello 已提交
519 520
          char* p = colDataGetData(pColInfoData, j);
          size += varDataTLen(p);
H
Haojun Liao 已提交
521 522
        }

wmmhello's avatar
wmmhello 已提交
523
        size += sizeof(pColInfoData->varmeta.offset[0]);
524
      } else {
wmmhello's avatar
wmmhello 已提交
525
        size += pColInfoData->info.bytes;
H
Haojun Liao 已提交
526

wmmhello's avatar
wmmhello 已提交
527 528 529
        if (((j - startIndex) & 0x07) == 0) {
          size += 1;  // the space for null bitmap
        }
H
Haojun Liao 已提交
530 531 532
      }
    }

wmmhello's avatar
wmmhello 已提交
533
    if (size > pageSize) {  // pageSize must be able to hold one row
wmmhello's avatar
wmmhello 已提交
534
      *stopIndex = j - 1;
G
Ganlin Zhao 已提交
535 536 537
      if (*stopIndex < startIndex) {
        return TSDB_CODE_FAILED;
      }
wmmhello's avatar
wmmhello 已提交
538 539 540

      return TSDB_CODE_SUCCESS;
    }
H
Haojun Liao 已提交
541
  }
wmmhello's avatar
wmmhello 已提交
542 543 544 545

  // all fit in
  *stopIndex = numOfRows - 1;
  return TSDB_CODE_SUCCESS;
H
Haojun Liao 已提交
546 547
}

548 549 550 551 552
/**
 * Build a new block holding rows [startIndex, startIndex + rowCount) of pBlock.
 *
 * The new block copies pBlock's info (with rows/capacity reset) and column definitions,
 * then every value is transferred row by row. NULL-ness is taken from the block aggregate
 * when pBlock->pBlockAgg is present, otherwise from the column itself.
 *
 * NOTE(review): the results of blockDataAppendColInfo() and blockDataEnsureCapacity() are
 * not checked here -- confirm whether they can fail.
 *
 * @return the new block, or NULL for invalid arguments or when the block cannot be created.
 */
SSDataBlock* blockDataExtractBlock(SSDataBlock* pBlock, int32_t startIndex, int32_t rowCount) {
  if (pBlock == NULL || startIndex < 0) {
    return NULL;
  }

  if (rowCount > pBlock->info.rows || rowCount + startIndex > pBlock->info.rows) {
    return NULL;
  }

  SSDataBlock* pDst = createDataBlock();
  if (pDst == NULL) {
    return NULL;
  }

  pDst->info = pBlock->info;
  pDst->info.rows = 0;
  pDst->info.capacity = 0;

  // replicate the column layout
  size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
  for (int32_t c = 0; c < numOfCols; ++c) {
    SColumnInfoData* pSrcCol = taosArrayGet(pBlock->pDataBlock, c);
    SColumnInfoData  colDef = {0};
    colDef.info = pSrcCol->info;
    blockDataAppendColInfo(pDst, &colDef);
  }

  blockDataEnsureCapacity(pDst, rowCount);

  // transfer the values
  for (int32_t c = 0; c < numOfCols; ++c) {
    SColumnInfoData* pSrcCol = taosArrayGet(pBlock->pDataBlock, c);
    SColumnInfoData* pDstCol = taosArrayGet(pDst->pDataBlock, c);

    for (int32_t k = 0; k < rowCount; ++k) {
      int32_t srcRow = startIndex + k;

      bool isNull = false;
      if (pBlock->pBlockAgg == NULL) {
        isNull = colDataIsNull_s(pSrcCol, srcRow);
      } else {
        isNull = colDataIsNull(pSrcCol, pBlock->info.rows, srcRow, pBlock->pBlockAgg[c]);
      }

      if (isNull) {
        colDataSetNULL(pDstCol, k);
      } else {
        char* pVal = colDataGetData(pSrcCol, srcRow);
        colDataSetVal(pDstCol, k, pVal, false);
      }
    }
  }

  pDst->info.rows = rowCount;
  return pDst;
}

H
Haojun Liao 已提交
596 597
/**
 *
598 599 600 601 602
 * +------------------+---------------------------------------------+
 * |the number of rows|                    column #1                |
 * |    (4 bytes)     |------------+-----------------------+--------+
 * |                  | null bitmap| column length(4bytes) | values |
 * +------------------+------------+-----------------------+--------+
H
Haojun Liao 已提交
603 604 605 606
 * @param buf
 * @param pBlock
 * @return
 */
607
int32_t blockDataToBuf(char* buf, const SSDataBlock* pBlock) {
H
Haojun Liao 已提交
608
  // write the number of rows
L
Liu Jicong 已提交
609
  *(uint32_t*)buf = pBlock->info.rows;
H
Haojun Liao 已提交
610

L
Liu Jicong 已提交
611
  size_t  numOfCols = taosArrayGetSize(pBlock->pDataBlock);
H
Haojun Liao 已提交
612 613 614 615
  int32_t numOfRows = pBlock->info.rows;

  char* pStart = buf + sizeof(uint32_t);

L
Liu Jicong 已提交
616
  for (int32_t i = 0; i < numOfCols; ++i) {
H
Haojun Liao 已提交
617 618 619 620 621 622 623 624 625
    SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, i);
    if (IS_VAR_DATA_TYPE(pCol->info.type)) {
      memcpy(pStart, pCol->varmeta.offset, numOfRows * sizeof(int32_t));
      pStart += numOfRows * sizeof(int32_t);
    } else {
      memcpy(pStart, pCol->nullbitmap, BitmapLen(numOfRows));
      pStart += BitmapLen(pBlock->info.rows);
    }

626
    uint32_t dataSize = colDataGetLength(pCol, numOfRows);
627

L
Liu Jicong 已提交
628
    *(int32_t*)pStart = dataSize;
629 630
    pStart += sizeof(int32_t);

631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646
    if (pCol->reassigned && IS_VAR_DATA_TYPE(pCol->info.type)) {
      for (int32_t row = 0; row < numOfRows; ++row) {
        char* pColData = pCol->pData + pCol->varmeta.offset[row];
        int32_t colSize = 0;
        if (pCol->info.type == TSDB_DATA_TYPE_JSON) {
          colSize = getJsonValueLen(pColData);
        } else {
          colSize = varDataTLen(pColData);
        }
        memcpy(pStart, pColData, colSize);
        pStart += colSize;
      }
    } else {
      memcpy(pStart, pCol->pData, dataSize);
      pStart += dataSize;
    }    
H
Haojun Liao 已提交
647 648 649 650 651
  }

  return 0;
}

652
/**
 * Rebuild a block's rows from a buffer holding: a 4-byte row count, then per column the
 * offset table (variable-length) or null bitmap (fixed-width), a 4-byte payload length and
 * the payload itself.
 *
 * The block's columns must already be defined; capacity is ensured for the row count read
 * from the buffer. The 4-byte fields are read with memcpy because their position inside
 * buf is not guaranteed to be 4-byte aligned.
 *
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY when a reallocation fails, or
 *         TSDB_CODE_FAILED when a payload length exceeds the allocated size.
 */
int32_t blockDataFromBuf(SSDataBlock* pBlock, const char* buf) {
  int32_t numOfRows = 0;
  memcpy(&numOfRows, buf, sizeof(numOfRows));
  blockDataEnsureCapacity(pBlock, numOfRows);

  pBlock->info.rows = numOfRows;
  size_t      numOfCols = taosArrayGetSize(pBlock->pDataBlock);
  const char* pStart = buf + sizeof(uint32_t);

  for (int32_t i = 0; i < numOfCols; ++i) {
    SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, i);

    if (IS_VAR_DATA_TYPE(pCol->info.type)) {
      size_t metaSize = pBlock->info.rows * sizeof(int32_t);
      char*  tmp = taosMemoryRealloc(pCol->varmeta.offset, metaSize);  // the earlier allocation may be too small
      if (tmp == NULL) {
        return TSDB_CODE_OUT_OF_MEMORY;
      }
      pCol->varmeta.offset = (int32_t*)tmp;
      memcpy(pCol->varmeta.offset, pStart, metaSize);
      pStart += metaSize;
    } else {
      memcpy(pCol->nullbitmap, pStart, BitmapLen(pBlock->info.rows));
      pStart += BitmapLen(pBlock->info.rows);
    }

    int32_t colLength = 0;
    memcpy(&colLength, pStart, sizeof(colLength));
    pStart += sizeof(int32_t);

    if (IS_VAR_DATA_TYPE(pCol->info.type)) {
      if (pCol->varmeta.allocLen < colLength) {
        char* tmp = taosMemoryRealloc(pCol->pData, colLength);
        if (tmp == NULL) {
          return TSDB_CODE_OUT_OF_MEMORY;
        }

        pCol->pData = tmp;
        pCol->varmeta.allocLen = colLength;
      }

      pCol->varmeta.length = colLength;
      if (pCol->varmeta.length > pCol->varmeta.allocLen) {
        return TSDB_CODE_FAILED;
      }
    }

    memcpy(pCol->pData, pStart, colLength);
    pStart += colLength;
  }

  return TSDB_CODE_SUCCESS;
}

704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730
/**
 * Tell whether every row in [startIndex, nRows) of a column is NULL.
 *
 * Returns false straight away when the column is not flagged hasNull, or when a
 * fixed-width column has no null bitmap. An empty range yields true.
 */
static bool colDataIsNNull(const SColumnInfoData* pColumnInfoData, int32_t startIndex, uint32_t nRows) {
  if (!pColumnInfoData->hasNull) {
    return false;
  }

  if (!IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
    if (pColumnInfoData->nullbitmap == NULL) {
      return false;
    }

    for (int32_t row = startIndex; row < nRows; ++row) {
      if (!colDataIsNull_f(pColumnInfoData->nullbitmap, row)) {
        return false;
      }
    }
    return true;
  }

  for (int32_t row = startIndex; row < nRows; ++row) {
    if (!colDataIsNull_var(pColumnInfoData, row)) {
      return false;
    }
  }
  return true;
}

731
// todo remove this
H
Haojun Liao 已提交
732
int32_t blockDataFromBuf1(SSDataBlock* pBlock, const char* buf, size_t capacity) {
X
Xiaoyu Wang 已提交
733
  pBlock->info.rows = *(int32_t*)buf;
H
Haojun Liao 已提交
734
  pBlock->info.id.groupId = *(uint64_t*)(buf + sizeof(int32_t));
H
Haojun Liao 已提交
735

736 737
  size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);

738
  const char* pStart = buf + sizeof(uint32_t) + sizeof(uint64_t);
H
Haojun Liao 已提交
739 740 741

  for (int32_t i = 0; i < numOfCols; ++i) {
    SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, i);
742
    pCol->hasNull = true;
H
Haojun Liao 已提交
743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767

    if (IS_VAR_DATA_TYPE(pCol->info.type)) {
      size_t metaSize = capacity * sizeof(int32_t);
      memcpy(pCol->varmeta.offset, pStart, metaSize);
      pStart += metaSize;
    } else {
      memcpy(pCol->nullbitmap, pStart, BitmapLen(capacity));
      pStart += BitmapLen(capacity);
    }

    int32_t colLength = *(int32_t*)pStart;
    pStart += sizeof(int32_t);

    if (IS_VAR_DATA_TYPE(pCol->info.type)) {
      if (pCol->varmeta.allocLen < colLength) {
        char* tmp = taosMemoryRealloc(pCol->pData, colLength);
        if (tmp == NULL) {
          return TSDB_CODE_OUT_OF_MEMORY;
        }

        pCol->pData = tmp;
        pCol->varmeta.allocLen = colLength;
      }

      pCol->varmeta.length = colLength;
G
Ganlin Zhao 已提交
768 769 770
      if (pCol->varmeta.length > pCol->varmeta.allocLen) {
        return TSDB_CODE_FAILED;
      }
H
Haojun Liao 已提交
771 772
    }

773
    if (!colDataIsNNull(pCol, 0, pBlock->info.rows)) {
774 775 776
      memcpy(pCol->pData, pStart, colLength);
    }

777
    pStart += pCol->info.bytes * capacity;
H
Haojun Liao 已提交
778 779 780 781 782
  }

  return TSDB_CODE_SUCCESS;
}

783 784 785
/**
 * Row size of a block: the sum of info.bytes over all of its columns.
 *
 * The value is cached in pBlock->info.rowSize; it is computed only while that field is 0.
 */
size_t blockDataGetRowSize(SSDataBlock* pBlock) {
  if (pBlock->info.rowSize != 0) {
    return pBlock->info.rowSize;
  }

  size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
  size_t sum = 0;
  for (int32_t c = 0; c < numOfCols; ++c) {
    SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, c);
    sum += pCol->info.bytes;
  }

  pBlock->info.rowSize = sum;
  return pBlock->info.rowSize;
}

H
Haojun Liao 已提交
799 800 801 802 803
/**
 * Size in bytes of the metadata that precedes the column payloads of a serialized block.
 *
 * Layout accounted for:
 *   | version | total length | total rows | total columns | flag seg | block group id |
 *   | column schema (int8_t + int32_t per column) | each column length (int32_t per column) |
 *
 * @param numOfCols number of columns in the block
 * @return metadata size in bytes
 */
size_t blockDataGetSerialMetaSize(uint32_t numOfCols) {
  // five int32_t header fields followed by the 64-bit group id
  const size_t fixedPart = 5 * sizeof(int32_t) + sizeof(uint64_t);
  const size_t schemaPart = numOfCols * (sizeof(int8_t) + sizeof(int32_t));
  const size_t lengthPart = numOfCols * sizeof(int32_t);

  return fixedPart + schemaPart + lengthPart;
}

X
Xiaoyu Wang 已提交
811
double blockDataGetSerialRowSize(const SSDataBlock* pBlock) {
H
Haojun Liao 已提交
812 813
  double rowSize = 0;

814
  size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
L
Liu Jicong 已提交
815
  for (int32_t i = 0; i < numOfCols; ++i) {
H
Haojun Liao 已提交
816 817 818 819 820 821
    SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, i);
    rowSize += pColInfo->info.bytes;

    if (IS_VAR_DATA_TYPE(pColInfo->info.type)) {
      rowSize += sizeof(int32_t);
    } else {
L
Liu Jicong 已提交
822
      rowSize += 1 / 8.0;  // one bit for each record
H
Haojun Liao 已提交
823 824 825 826 827 828
    }
  }

  return rowSize;
}

H
Haojun Liao 已提交
829
// Context handed to dataBlockCompar() through the sort routine's "param" argument.
typedef struct SSDataBlockSortHelper {
  SArray*      orderInfo;  // SArray<SBlockOrderInfo>
  SSDataBlock* pDataBlock;  // block being sorted; the comparator reads info.rows from it
} SSDataBlockSortHelper;

int32_t dataBlockCompar(const void* p1, const void* p2, const void* param) {
L
Liu Jicong 已提交
835
  const SSDataBlockSortHelper* pHelper = (const SSDataBlockSortHelper*)param;
H
Haojun Liao 已提交
836 837 838

  SSDataBlock* pDataBlock = pHelper->pDataBlock;

L
Liu Jicong 已提交
839 840
  int32_t left = *(int32_t*)p1;
  int32_t right = *(int32_t*)p2;
H
Haojun Liao 已提交
841 842

  SArray* pInfo = pHelper->orderInfo;
843

L
Liu Jicong 已提交
844
  for (int32_t i = 0; i < pInfo->size; ++i) {
H
Haojun Liao 已提交
845
    SBlockOrderInfo* pOrder = TARRAY_GET_ELEM(pInfo, i);
L
Liu Jicong 已提交
846
    SColumnInfoData* pColInfoData = pOrder->pColData;  // TARRAY_GET_ELEM(pDataBlock->pDataBlock, pOrder->colIndex);
H
Haojun Liao 已提交
847

848
    if (pColInfoData->hasNull) {
849 850
      bool leftNull = colDataIsNull(pColInfoData, pDataBlock->info.rows, left, NULL);
      bool rightNull = colDataIsNull(pColInfoData, pDataBlock->info.rows, right, NULL);
851
      if (leftNull && rightNull) {
L
Liu Jicong 已提交
852
        continue;  // continue to next slot
853
      }
H
Haojun Liao 已提交
854

855
      if (rightNull) {
H
Haojun Liao 已提交
856
        return pOrder->nullFirst ? 1 : -1;
857
      }
H
Haojun Liao 已提交
858

859
      if (leftNull) {
H
Haojun Liao 已提交
860
        return pOrder->nullFirst ? -1 : 1;
861
      }
H
Haojun Liao 已提交
862 863
    }

L
Liu Jicong 已提交
864
    void* left1 = colDataGetData(pColInfoData, left);
865
    void* right1 = colDataGetData(pColInfoData, right);
wmmhello's avatar
wmmhello 已提交
866 867 868 869 870 871
    if (pColInfoData->info.type == TSDB_DATA_TYPE_JSON) {
      if (tTagIsJson(left1) || tTagIsJson(right1)) {
        terrno = TSDB_CODE_QRY_JSON_NOT_SUPPORT_ERROR;
        return 0;
      }
    }
872
    __compar_fn_t fn = getKeyComparFunc(pColInfoData->info.type, pOrder->order);
873

874 875 876 877 878
    int ret = fn(left1, right1);
    if (ret == 0) {
      continue;
    } else {
      return ret;
H
Haojun Liao 已提交
879 880 881 882 883 884
    }
  }

  return 0;
}

L
Liu Jicong 已提交
885 886
/**
 * Copy row `tupleIndex` of pSrcBlock into row `numOfRows` of the destination columns.
 *
 * @param pDstCols   destination columns, indexed like the columns of pSrcBlock
 * @param numOfRows  destination row index
 * @param pSrcBlock  source block
 * @param tupleIndex source row index
 * @return TSDB_CODE_SUCCESS, or the first error returned by colDataSetVal()
 */
static int32_t doAssignOneTuple(SColumnInfoData* pDstCols, int32_t numOfRows, const SSDataBlock* pSrcBlock,
                                int32_t tupleIndex) {
  size_t colCount = taosArrayGetSize(pSrcBlock->pDataBlock);

  for (int32_t col = 0; col < colCount; ++col) {
    SColumnInfoData* pDst = &pDstCols[col];
    SColumnInfoData* pSrc = taosArrayGet(pSrcBlock->pDataBlock, col);

    bool  isNull = pSrc->hasNull && colDataIsNull(pSrc, pSrcBlock->info.rows, tupleIndex, pSrcBlock->pBlockAgg[col]);
    char* pVal = isNull ? NULL : colDataGetData(pSrc, tupleIndex);

    int32_t code = colDataSetVal(pDst, numOfRows, pVal, isNull);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }
  }

  return TSDB_CODE_SUCCESS;
}

911
/**
 * Fill the helper columns pCols with the rows of pDataBlock rearranged by `index`:
 * destination row j receives source row index[j].
 *
 * Variable-length columns copy the whole payload unchanged and only permute the
 * offset array; fixed-length columns copy values (or the NULL flag) row by row.
 *
 * @return always TSDB_CODE_SUCCESS in the enabled code path
 */
static int32_t blockDataAssign(SColumnInfoData* pCols, const SSDataBlock* pDataBlock, const int32_t* index) {
#if 0
  for (int32_t i = 0; i < pDataBlock->info.rows; ++i) {
    int32_t code = doAssignOneTuple(pCols, i, pDataBlock, index[i]);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }
  }
#else
  size_t colCount = taosArrayGetSize(pDataBlock->pDataBlock);

  for (int32_t col = 0; col < colCount; ++col) {
    SColumnInfoData* pDst = &pCols[col];
    SColumnInfoData* pSrc = taosArrayGet(pDataBlock->pDataBlock, col);

    if (!IS_VAR_DATA_TYPE(pSrc->info.type)) {
      // fixed-length column: move each value, or its NULL flag, to the new position
      for (int32_t row = 0; row < pDataBlock->info.rows; ++row) {
        if (colDataIsNull_f(pSrc->nullbitmap, index[row])) {
          colDataSetNull_f_s(pDst, row);
        } else {
          memcpy(pDst->pData + row * pDst->info.bytes, pSrc->pData + index[row] * pDst->info.bytes, pDst->info.bytes);
        }
      }
      continue;
    }

    // variable-length column: the payload stays where it is, only the offsets are permuted
    if (pSrc->varmeta.length != 0) {
      memcpy(pDst->pData, pSrc->pData, pSrc->varmeta.length);
    }
    pDst->varmeta.length = pSrc->varmeta.length;

    for (int32_t row = 0; row < pDataBlock->info.rows; ++row) {
      pDst->varmeta.offset[row] = pSrc->varmeta.offset[index[row]];
    }
  }
#endif
  return TSDB_CODE_SUCCESS;
}

static SColumnInfoData* createHelpColInfoData(const SSDataBlock* pDataBlock) {
949
  int32_t rows = pDataBlock->info.capacity;
L
Liu Jicong 已提交
950
  size_t  numOfCols = taosArrayGetSize(pDataBlock->pDataBlock);
H
Haojun Liao 已提交
951

wafwerar's avatar
wafwerar 已提交
952
  SColumnInfoData* pCols = taosMemoryCalloc(numOfCols, sizeof(SColumnInfoData));
H
Haojun Liao 已提交
953 954 955 956
  if (pCols == NULL) {
    return NULL;
  }

L
Liu Jicong 已提交
957
  for (int32_t i = 0; i < numOfCols; ++i) {
H
Haojun Liao 已提交
958 959 960 961
    SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, i);
    pCols[i].info = pColInfoData->info;

    if (IS_VAR_DATA_TYPE(pCols[i].info.type)) {
wafwerar's avatar
wafwerar 已提交
962 963
      pCols[i].varmeta.offset = taosMemoryCalloc(rows, sizeof(int32_t));
      pCols[i].pData = taosMemoryCalloc(1, pColInfoData->varmeta.length);
H
Haojun Liao 已提交
964 965 966

      pCols[i].varmeta.length = pColInfoData->varmeta.length;
      pCols[i].varmeta.allocLen = pCols[i].varmeta.length;
H
Haojun Liao 已提交
967
    } else {
wafwerar's avatar
wafwerar 已提交
968 969
      pCols[i].nullbitmap = taosMemoryCalloc(1, BitmapLen(rows));
      pCols[i].pData = taosMemoryCalloc(rows, pCols[i].info.bytes);
H
Haojun Liao 已提交
970 971 972 973 974 975
    }
  }

  return pCols;
}

H
Haojun Liao 已提交
976
/**
 * Move the buffers of the helper columns pCols into the columns of pDataBlock.
 *
 * For each column the block's previous offset array / null bitmap and payload
 * are freed and replaced by the helper's pointers. The helper array itself is
 * freed at the end; its buffers now belong to the block.
 */
static void copyBackToBlock(SSDataBlock* pDataBlock, SColumnInfoData* pCols) {
  size_t colCount = taosArrayGetSize(pDataBlock->pDataBlock);

  for (int32_t col = 0; col < colCount; ++col) {
    SColumnInfoData* pTarget = taosArrayGet(pDataBlock->pDataBlock, col);
    SColumnInfoData* pSorted = &pCols[col];

    pTarget->info = pSorted->info;

    if (IS_VAR_DATA_TYPE(pTarget->info.type)) {
      taosMemoryFreeClear(pTarget->varmeta.offset);
      pTarget->varmeta = pSorted->varmeta;
    } else {
      taosMemoryFreeClear(pTarget->nullbitmap);
      pTarget->nullbitmap = pSorted->nullbitmap;
    }

    taosMemoryFreeClear(pTarget->pData);
    pTarget->pData = pSorted->pData;
  }

  taosMemoryFreeClear(pCols);
}

/**
 * Allocate an identity index array: element i holds the value i.
 *
 * @param rows number of entries
 * @return the array (release it with destroyTupleIndex), or NULL on allocation failure
 */
static int32_t* createTupleIndex(size_t rows) {
  int32_t* pIndex = taosMemoryCalloc(rows, sizeof(int32_t));

  if (pIndex != NULL) {
    for (int32_t row = 0; row < rows; ++row) {
      pIndex[row] = row;
    }
  }

  return pIndex;
}

wafwerar's avatar
wafwerar 已提交
1011
// Release an index array obtained from createTupleIndex().
static void destroyTupleIndex(int32_t* index) {
  taosMemoryFreeClear(index);
}
H
Haojun Liao 已提交
1012

H
Haojun Liao 已提交
1013
/**
 * Sort the rows of pDataBlock in place according to pOrderInfo.
 *
 * Fast path: a block with a single fixed-length column that has no NULL values
 * and a single sort key is sorted directly on its payload.
 * General path: an index array is sorted with dataBlockCompar(), the rows are
 * gathered into freshly allocated helper columns, and those buffers replace the
 * block's own buffers.
 *
 * @param pDataBlock block to sort
 * @param pOrderInfo SArray<SBlockOrderInfo>; the pColData field of every entry is
 *                   overwritten on the general path
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY (also stored in terrno), or
 *         the terrno value set by the comparator
 */
int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo) {
  if (pDataBlock->info.rows <= 1) {
    return TSDB_CODE_SUCCESS;
  }

  uint32_t rows = pDataBlock->info.rows;
  size_t   numOfOrders = taosArrayGetSize(pOrderInfo);
  size_t   numOfCols = taosArrayGetSize(pDataBlock->pDataBlock);

  bool sortColumnHasNull = false;
  bool varTypeSort = false;

  for (int32_t i = 0; i < numOfOrders; ++i) {
    SBlockOrderInfo* pInfo = taosArrayGet(pOrderInfo, i);

    SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, pInfo->slotId);
    if (pColInfoData->hasNull) {
      sortColumnHasNull = true;
    }

    if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
      varTypeSort = true;
    }
  }

  // fast path: one fixed-length column without NULLs, sorted directly on its payload
  if (numOfOrders == 1 && !sortColumnHasNull && numOfCols == 1 && !varTypeSort) {
    SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, 0);
    SBlockOrderInfo* pOrder = taosArrayGet(pOrderInfo, 0);

    int64_t p0 = taosGetTimestampUs();

    __compar_fn_t fn = getKeyComparFunc(pColInfoData->info.type, pOrder->order);
    taosSort(pColInfoData->pData, pDataBlock->info.rows, pColInfoData->info.bytes, fn);

    int64_t p1 = taosGetTimestampUs();
    uDebug("blockDataSort easy cost:%" PRId64 ", rows:%" PRId64 "\n", p1 - p0, pDataBlock->info.rows);

    return TSDB_CODE_SUCCESS;
  }

  int32_t* index = createTupleIndex(rows);
  if (index == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return terrno;
  }

  int64_t p0 = taosGetTimestampUs();

  // bind every sort key to its column so that the comparator does not have to look it up
  SSDataBlockSortHelper helper = {.pDataBlock = pDataBlock, .orderInfo = pOrderInfo};
  for (int32_t i = 0; i < taosArrayGetSize(helper.orderInfo); ++i) {
    struct SBlockOrderInfo* pInfo = taosArrayGet(helper.orderInfo, i);
    pInfo->pColData = taosArrayGet(pDataBlock->pDataBlock, pInfo->slotId);
  }

  // the comparator cannot return an error, it reports failures through terrno
  terrno = 0;
  taosqsort(index, rows, sizeof(int32_t), &helper, dataBlockCompar);
  if (terrno) {
    int32_t code = terrno;
    destroyTupleIndex(index);  // do not leak the index array on the error path
    return code;
  }

  int64_t p1 = taosGetTimestampUs();

  SColumnInfoData* pCols = createHelpColInfoData(pDataBlock);
  if (pCols == NULL) {
    destroyTupleIndex(index);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return terrno;
  }

  int64_t p2 = taosGetTimestampUs();

  blockDataAssign(pCols, pDataBlock, index);

  int64_t p3 = taosGetTimestampUs();

  // hands the helper buffers over to the block and frees the helper array
  copyBackToBlock(pDataBlock, pCols);
  int64_t p4 = taosGetTimestampUs();

  uDebug("blockDataSort complex sort:%" PRId64 ", create:%" PRId64 ", assign:%" PRId64 ", copyback:%" PRId64
         ", rows:%d\n",
         p1 - p0, p2 - p1, p3 - p2, p4 - p3, rows);
  destroyTupleIndex(index);

  return TSDB_CODE_SUCCESS;
}
1103

1104
#if 0
H
Haojun Liao 已提交
1105 1106
typedef struct SHelper {
  int32_t index;
L
Liu Jicong 已提交
1107 1108 1109 1110 1111
  union {
    char*   pData;
    int64_t i64;
    double  d64;
  };
H
Haojun Liao 已提交
1112 1113 1114 1115 1116 1117
} SHelper;

SHelper* createTupleIndex_rv(int32_t numOfRows, SArray* pOrderInfo, SSDataBlock* pBlock) {
  int32_t sortValLengthPerRow = 0;
  int32_t numOfCols = taosArrayGetSize(pOrderInfo);

L
Liu Jicong 已提交
1118
  for (int32_t i = 0; i < numOfCols; ++i) {
H
Haojun Liao 已提交
1119
    SBlockOrderInfo* pInfo = taosArrayGet(pOrderInfo, i);
H
Haojun Liao 已提交
1120
    SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, pInfo->slotId);
H
Haojun Liao 已提交
1121 1122 1123 1124 1125 1126
    pInfo->pColData = pColInfo;
    sortValLengthPerRow += pColInfo->info.bytes;
  }

  size_t len = sortValLengthPerRow * pBlock->info.rows;

wafwerar's avatar
wafwerar 已提交
1127 1128
  char*    buf = taosMemoryCalloc(1, len);
  SHelper* phelper = taosMemoryCalloc(numOfRows, sizeof(SHelper));
L
Liu Jicong 已提交
1129
  for (int32_t i = 0; i < numOfRows; ++i) {
H
Haojun Liao 已提交
1130 1131 1132 1133 1134
    phelper[i].index = i;
    phelper[i].pData = buf + sortValLengthPerRow * i;
  }

  int32_t offset = 0;
L
Liu Jicong 已提交
1135
  for (int32_t i = 0; i < numOfCols; ++i) {
H
Haojun Liao 已提交
1136
    SBlockOrderInfo* pInfo = taosArrayGet(pOrderInfo, i);
L
Liu Jicong 已提交
1137 1138 1139 1140
    for (int32_t j = 0; j < numOfRows; ++j) {
      phelper[j].i64 = *(int32_t*)pInfo->pColData->pData + pInfo->pColData->info.bytes * j;
      //      memcpy(phelper[j].pData + offset, pInfo->pColData->pData + pInfo->pColData->info.bytes * j,
      //      pInfo->pColData->info.bytes);
H
Haojun Liao 已提交
1141 1142 1143 1144 1145
    }

    offset += pInfo->pColData->info.bytes;
  }

H
Haojun Liao 已提交
1146
  taosMemoryFree(buf);
H
Haojun Liao 已提交
1147 1148 1149 1150
  return phelper;
}

int32_t dataBlockCompar_rv(const void* p1, const void* p2, const void* param) {
L
Liu Jicong 已提交
1151
  const SSDataBlockSortHelper* pHelper = (const SSDataBlockSortHelper*)param;
H
Haojun Liao 已提交
1152

L
Liu Jicong 已提交
1153 1154
  SHelper* left = (SHelper*)p1;
  SHelper* right = (SHelper*)p2;
H
Haojun Liao 已提交
1155 1156 1157 1158

  SArray* pInfo = pHelper->orderInfo;

  int32_t offset = 0;
L
Liu Jicong 已提交
1159 1160 1161 1162 1163 1164 1165
  int32_t leftx = *(int32_t*)left->pData;    //*(int32_t*)(left->pData + offset);
  int32_t rightx = *(int32_t*)right->pData;  //*(int32_t*)(right->pData + offset);

  if (leftx == rightx) {
    return 0;
  } else {
    return (leftx < rightx) ? -1 : 1;
1166
  }
H
Haojun Liao 已提交
1167 1168 1169 1170
  return 0;
}

int32_t blockDataSort_rv(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullFirst) {
L
Liu Jicong 已提交
1171
  // Allocate the additional buffer.
H
Haojun Liao 已提交
1172 1173
  int64_t p0 = taosGetTimestampUs();

H
Haojun Liao 已提交
1174
  SSDataBlockSortHelper helper = {.pDataBlock = pDataBlock, .orderInfo = pOrderInfo};
H
Haojun Liao 已提交
1175 1176 1177 1178 1179 1180 1181 1182 1183 1184

  uint32_t rows = pDataBlock->info.rows;
  SHelper* index = createTupleIndex_rv(rows, helper.orderInfo, pDataBlock);
  if (index == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return terrno;
  }

  taosqsort(index, rows, sizeof(SHelper), &helper, dataBlockCompar_rv);

L
Liu Jicong 已提交
1185
  int64_t          p1 = taosGetTimestampUs();
H
Haojun Liao 已提交
1186 1187 1188 1189 1190 1191 1192 1193 1194 1195 1196 1197 1198 1199 1200 1201 1202 1203 1204
  SColumnInfoData* pCols = createHelpColInfoData(pDataBlock);
  if (pCols == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return terrno;
  }

  int64_t p2 = taosGetTimestampUs();

  //  int32_t code = blockDataAssign(pCols, pDataBlock, index);
  //  if (code != TSDB_CODE_SUCCESS) {
  //    terrno = code;
  //    return code;
  //  }

  int64_t p3 = taosGetTimestampUs();

  copyBackToBlock(pDataBlock, pCols);
  int64_t p4 = taosGetTimestampUs();

L
Liu Jicong 已提交
1205 1206
  printf("sort:%" PRId64 ", create:%" PRId64 ", assign:%" PRId64 ", copyback:%" PRId64 ", rows:%d\n", p1 - p0, p2 - p1,
         p3 - p2, p4 - p3, rows);
H
Haojun Liao 已提交
1207
  //  destroyTupleIndex(index);
1208
  return 0;
H
Haojun Liao 已提交
1209
}
1210
#endif
H
Haojun Liao 已提交
1211

1212
/**
 * Empty the block (see blockDataEmpty) and additionally reset its uid and group id to 0.
 * The column buffers stay allocated.
 */
void blockDataCleanup(SSDataBlock* pDataBlock) {
  blockDataEmpty(pDataBlock);

  pDataBlock->info.id.uid = 0;
  pDataBlock->info.id.groupId = 0;
}
1218

H
Haojun Liao 已提交
1219 1220
/**
 * Reset the content of the block without releasing its buffers.
 *
 * Every column is cleaned over the whole capacity, then rows, dataLoad and the
 * time window are set to 0. A block whose capacity is 0 is left untouched.
 */
void blockDataEmpty(SSDataBlock* pDataBlock) {
  SDataBlockInfo* pInfo = &pDataBlock->info;

  if (pInfo->capacity != 0) {
    size_t colCount = taosArrayGetSize(pDataBlock->pDataBlock);
    for (int32_t col = 0; col < colCount; ++col) {
      colInfoDataCleanup(taosArrayGet(pDataBlock->pDataBlock, col), pInfo->capacity);
    }

    pInfo->rows = 0;
    pInfo->dataLoad = 0;
    pInfo->window.ekey = 0;
    pInfo->window.skey = 0;
  }
}

1237 1238 1239 1240 1241
/*
 * NOTE: the type of the input column may be TSDB_DATA_TYPE_NULL, which is used to denote
 * the all NULL value in this column. It is an internal representation of all NULL value column, and no visible to
 * any users. The length of TSDB_DATA_TYPE_NULL is 0, and it is an special case.
 */
L
Liu Jicong 已提交
1242 1243
static int32_t doEnsureCapacity(SColumnInfoData* pColumn, const SDataBlockInfo* pBlockInfo, uint32_t numOfRows,
                                bool clearPayload) {
G
Ganlin Zhao 已提交
1244
  if (numOfRows <= 0 || numOfRows <= pBlockInfo->capacity) {
X
Xiaoyu Wang 已提交
1245 1246 1247
    return TSDB_CODE_SUCCESS;
  }

1248 1249
  int32_t existedRows = pBlockInfo->rows;

D
dapan1121 已提交
1250
  if (IS_VAR_DATA_TYPE(pColumn->info.type)) {
wafwerar's avatar
wafwerar 已提交
1251
    char* tmp = taosMemoryRealloc(pColumn->varmeta.offset, sizeof(int32_t) * numOfRows);
D
dapan1121 已提交
1252 1253 1254
    if (tmp == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
1255

D
dapan1121 已提交
1256
    pColumn->varmeta.offset = (int32_t*)tmp;
1257
    memset(&pColumn->varmeta.offset[existedRows], 0, sizeof(int32_t) * (numOfRows - existedRows));
D
dapan1121 已提交
1258
  } else {
1259
    // prepare for the null bitmap
wafwerar's avatar
wafwerar 已提交
1260
    char* tmp = taosMemoryRealloc(pColumn->nullbitmap, BitmapLen(numOfRows));
D
dapan1121 已提交
1261 1262 1263
    if (tmp == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
1264

1265
    int32_t oldLen = BitmapLen(existedRows);
D
dapan1121 已提交
1266
    pColumn->nullbitmap = tmp;
1267
    memset(&pColumn->nullbitmap[oldLen], 0, BitmapLen(numOfRows) - oldLen);
G
Ganlin Zhao 已提交
1268 1269 1270
    if (pColumn->info.bytes == 0) {
      return TSDB_CODE_FAILED;
    }
1271

1272 1273
    // here we employ the aligned malloc function, to make sure that the address of allocated memory is aligned
    // to MALLOC_ALIGN_BYTES
1274
    tmp = taosMemoryMallocAlign(MALLOC_ALIGN_BYTES, numOfRows * pColumn->info.bytes);
D
dapan1121 已提交
1275 1276 1277
    if (tmp == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
L
Liu Jicong 已提交
1278

1279 1280 1281 1282 1283 1284
    // copy back the existed data
    if (pColumn->pData != NULL) {
      memcpy(tmp, pColumn->pData, existedRows * pColumn->info.bytes);
      taosMemoryFreeClear(pColumn->pData);
    }

D
dapan1121 已提交
1285
    pColumn->pData = tmp;
1286

1287
    // check if the allocated memory is aligned to the requried bytes.
1288
#if defined LINUX
G
Ganlin Zhao 已提交
1289 1290 1291
    if ((((uint64_t)pColumn->pData) & (MALLOC_ALIGN_BYTES - 1)) != 0x0) {
      return TSDB_CODE_FAILED;
    }
1292
#endif
H
Haojun Liao 已提交
1293

H
Haojun Liao 已提交
1294 1295 1296
    if (clearPayload) {
      memset(tmp + pColumn->info.bytes * existedRows, 0, pColumn->info.bytes * (numOfRows - existedRows));
    }
D
dapan1121 已提交
1297 1298 1299 1300 1301
  }

  return TSDB_CODE_SUCCESS;
}

L
Liu Jicong 已提交
1302
/**
 * Reset the bookkeeping of a column for numOfRows rows without freeing anything:
 * hasNull is cleared, and then either the offset array (variable-length column,
 * whose length is also set to 0) or the null bitmap (fixed-length column) is zeroed.
 */
void colInfoDataCleanup(SColumnInfoData* pColumn, uint32_t numOfRows) {
  pColumn->hasNull = false;

  if (!IS_VAR_DATA_TYPE(pColumn->info.type)) {
    if (pColumn->nullbitmap != NULL) {
      memset(pColumn->nullbitmap, 0, BitmapLen(numOfRows));
    }
    return;
  }

  pColumn->varmeta.length = 0;
  if (pColumn->varmeta.offset != NULL) {
    memset(pColumn->varmeta.offset, 0, sizeof(int32_t) * numOfRows);
  }
}

H
Haojun Liao 已提交
1317
/**
 * Make sure a stand-alone column can hold numOfRows rows.
 *
 * The column is grown against a zeroed block info, i.e. as if it currently had
 * 0 rows and 0 capacity.
 */
int32_t colInfoDataEnsureCapacity(SColumnInfoData* pColumn, uint32_t numOfRows, bool clearPayload) {
  SDataBlockInfo emptyInfo = {0};

  int32_t code = doEnsureCapacity(pColumn, &emptyInfo, numOfRows, clearPayload);
  return code;
}

D
dapan1121 已提交
1322 1323
/**
 * Make sure every column of the block can hold numOfRows rows.
 *
 * No-op when numOfRows is 0 or already within the current capacity. On success
 * info.capacity becomes numOfRows; if growing a column fails, its error code is
 * returned immediately and info.capacity is left unchanged.
 */
int32_t blockDataEnsureCapacity(SSDataBlock* pDataBlock, uint32_t numOfRows) {
  if (numOfRows == 0 || numOfRows <= pDataBlock->info.capacity) {
    return TSDB_CODE_SUCCESS;
  }

  size_t colCount = taosArrayGetSize(pDataBlock->pDataBlock);
  for (int32_t col = 0; col < colCount; ++col) {
    SColumnInfoData* pCol = taosArrayGet(pDataBlock->pDataBlock, col);

    int32_t code = doEnsureCapacity(pCol, &pDataBlock->info, numOfRows, false);
    if (code) {
      return code;
    }
  }

  pDataBlock->info.capacity = numOfRows;
  return TSDB_CODE_SUCCESS;
}

1341 1342 1343 1344 1345 1346 1347 1348
/**
 * Release everything owned by the block (column buffers, the column array and
 * pBlockAgg) and zero its info. The SSDataBlock object itself is not freed.
 */
void blockDataFreeRes(SSDataBlock* pBlock) {
  int32_t colCount = taosArrayGetSize(pBlock->pDataBlock);

  for (int32_t col = 0; col < colCount; ++col) {
    colDataDestroy((SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, col));
  }

  taosArrayDestroy(pBlock->pDataBlock);
  pBlock->pDataBlock = NULL;

  taosMemoryFreeClear(pBlock->pBlockAgg);
  memset(&pBlock->info, 0, sizeof(SDataBlockInfo));
}

H
Haojun Liao 已提交
1354 1355 1356 1357 1358
/**
 * Destroy a block: release its resources and then the block object itself.
 *
 * @param pBlock block to destroy; NULL is accepted
 * @return always NULL
 */
void* blockDataDestroy(SSDataBlock* pBlock) {
  if (pBlock != NULL) {
    blockDataFreeRes(pBlock);
    taosMemoryFreeClear(pBlock);
  }

  return NULL;
}
1363

L
Liu Jicong 已提交
1364 1365 1366 1367 1368 1369 1370 1371 1372 1373 1374 1375 1376 1377 1378 1379 1380 1381 1382 1383 1384
/**
 * Populate dst as a copy of src: same column layout, same info, same data.
 *
 * One column is appended to dst for every column of src, dst is grown to
 * src->info.rows rows and the column data is copied. Fixed-length source
 * columns without a payload (pData == NULL) are skipped. At the end dst->info
 * equals src->info except for capacity, which keeps the value reached by dst.
 *
 * @return 0 on success, -1 on failure (terrno holds the error code)
 */
int32_t assignOneDataBlock(SSDataBlock* dst, const SSDataBlock* src) {
  dst->info = src->info;
  dst->info.rows = 0;
  dst->info.capacity = 0;

  size_t numOfCols = taosArrayGetSize(src->pDataBlock);
  for (int32_t i = 0; i < numOfCols; ++i) {
    SColumnInfoData* p = taosArrayGet(src->pDataBlock, i);
    SColumnInfoData  colInfo = {.hasNull = true, .info = p->info};

    // a failed append would leave dst with fewer columns than src and make the
    // copy loop below dereference a missing column; terrno is already set
    if (blockDataAppendColInfo(dst, &colInfo) != TSDB_CODE_SUCCESS) {
      return -1;
    }
  }

  int32_t code = blockDataEnsureCapacity(dst, src->info.rows);
  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
    return -1;
  }

  for (int32_t i = 0; i < numOfCols; ++i) {
    SColumnInfoData* pDst = taosArrayGet(dst->pDataBlock, i);
    SColumnInfoData* pSrc = taosArrayGet(src->pDataBlock, i);
    if (pSrc->pData == NULL && (!IS_VAR_DATA_TYPE(pSrc->info.type))) {
      continue;
    }

    colDataAssign(pDst, pSrc, src->info.rows, &src->info);
  }

  // take over the source info, but keep the capacity that dst really has
  uint32_t cap = dst->info.capacity;
  dst->info = src->info;
  dst->info.capacity = cap;
  return 0;
}
1397

5
54liuyao 已提交
1398 1399 1400 1401 1402 1403 1404 1405 1406 1407 1408 1409 1410 1411
/**
 * Copy the rows and the info of src into dst.
 *
 * dst is cleaned up first and grown to src->info.rows rows; columns are copied
 * position by position, so dst is expected to already have a column for each
 * column of src. dst keeps its own capacity.
 *
 * @return TSDB_CODE_SUCCESS, or the error from blockDataEnsureCapacity (also
 *         stored in terrno)
 */
int32_t copyDataBlock(SSDataBlock* dst, const SSDataBlock* src) {
  blockDataCleanup(dst);

  int32_t code = blockDataEnsureCapacity(dst, src->info.rows);
  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
    return code;
  }

  size_t colCount = taosArrayGetSize(src->pDataBlock);
  for (int32_t col = 0; col < colCount; ++col) {
    SColumnInfoData* pTo = taosArrayGet(dst->pDataBlock, col);
    SColumnInfoData* pFrom = taosArrayGet(src->pDataBlock, col);
    colDataAssign(pTo, pFrom, src->info.rows, &src->info);
  }

  // adopt the source info while preserving the capacity of dst
  uint32_t keptCapacity = dst->info.capacity;
  dst->info = src->info;
  dst->info.capacity = keptCapacity;

  return TSDB_CODE_SUCCESS;
}

L
Liu Jicong 已提交
1419 1420 1421
/**
 * Create an empty block of the given stream type with a fixed seven-column layout:
 * window start ts, window end ts, uid, group id, calculate start ts,
 * calculate end ts, table name.
 *
 * No row storage is allocated. info.rowSize is preset to the sum of the column
 * widths and info.watermark to INT64_MIN.
 *
 * @param type stream block type stored in info.type
 * @return the new block, or NULL with terrno = TSDB_CODE_OUT_OF_MEMORY
 */
SSDataBlock* createSpecialDataBlock(EStreamType type) {
  // column layout, in push order
  const struct {
    int32_t type;
    int32_t bytes;
  } cols[] = {
      {TSDB_DATA_TYPE_TIMESTAMP, sizeof(TSKEY)},                           // window start ts
      {TSDB_DATA_TYPE_TIMESTAMP, sizeof(TSKEY)},                           // window end ts
      {TSDB_DATA_TYPE_UBIGINT, sizeof(uint64_t)},                          // uid
      {TSDB_DATA_TYPE_UBIGINT, sizeof(uint64_t)},                          // group id
      {TSDB_DATA_TYPE_TIMESTAMP, sizeof(TSKEY)},                           // calculate start ts
      {TSDB_DATA_TYPE_TIMESTAMP, sizeof(TSKEY)},                           // calculate end ts
      {TSDB_DATA_TYPE_VARCHAR, VARSTR_HEADER_SIZE + TSDB_TABLE_NAME_LEN},  // table name
  };
  const size_t numOfCols = sizeof(cols) / sizeof(cols[0]);

  SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
  if (pBlock == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pBlock->info.hasVarCol = false;
  pBlock->info.id.groupId = 0;
  pBlock->info.rows = 0;
  pBlock->info.type = type;
  pBlock->info.rowSize = sizeof(TSKEY) + sizeof(TSKEY) + sizeof(uint64_t) + sizeof(uint64_t) + sizeof(TSKEY) +
                         sizeof(TSKEY) + VARSTR_HEADER_SIZE + TSDB_TABLE_NAME_LEN;
  pBlock->info.watermark = INT64_MIN;

  pBlock->pDataBlock = taosArrayInit(numOfCols, sizeof(SColumnInfoData));
  if (pBlock->pDataBlock == NULL) {
    taosMemoryFree(pBlock);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  for (size_t i = 0; i < numOfCols; ++i) {
    SColumnInfoData infoData = {0};
    infoData.info.type = cols[i].type;
    infoData.info.bytes = cols[i].bytes;

    if (taosArrayPush(pBlock->pDataBlock, &infoData) == NULL) {
      blockDataDestroy(pBlock);
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return NULL;
    }
  }

  return pBlock;
}

1460 1461 1462 1463 1464 1465 1466 1467 1468 1469 1470 1471 1472 1473 1474 1475 1476 1477 1478 1479 1480 1481 1482 1483 1484 1485 1486 1487 1488
/**
 * Create a new one-row block holding a copy of row rowIdx of pDataBlock.
 *
 * The new block has the same column layout and block info as the source, with
 * rows = 1 and a row size recomputed from the appended columns.
 *
 * @param pDataBlock source block; NULL yields NULL
 * @param rowIdx     index of the row to copy
 * @return the new block (release with blockDataDestroy), or NULL on failure
 *         with terrno holding the error code
 */
SSDataBlock* blockCopyOneRow(const SSDataBlock* pDataBlock, int32_t rowIdx) {
  if (pDataBlock == NULL) {
    return NULL;
  }

  SSDataBlock* pBlock = createDataBlock();
  if (pBlock == NULL) {
    return NULL;  // terrno has been set by createDataBlock()
  }

  pBlock->info = pDataBlock->info;
  pBlock->info.rows = 0;
  pBlock->info.capacity = 0;
  // blockDataAppendColInfo() accumulates the row size; start from 0 so that the size
  // copied from the source is not counted twice
  pBlock->info.rowSize = 0;

  size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock);
  for (int32_t i = 0; i < numOfCols; ++i) {
    SColumnInfoData* p = taosArrayGet(pDataBlock->pDataBlock, i);
    SColumnInfoData  colInfo = {.hasNull = true, .info = p->info};
    if (blockDataAppendColInfo(pBlock, &colInfo) != TSDB_CODE_SUCCESS) {
      blockDataDestroy(pBlock);  // terrno has been set by blockDataAppendColInfo()
      return NULL;
    }
  }

  int32_t code = blockDataEnsureCapacity(pBlock, 1);
  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
    blockDataDestroy(pBlock);
    return NULL;
  }

  for (int32_t i = 0; i < numOfCols; ++i) {
    SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, i);
    SColumnInfoData* pSrc = taosArrayGet(pDataBlock->pDataBlock, i);

    // only resolve the value address for non-NULL cells
    bool  isNull = colDataIsNull(pSrc, pDataBlock->info.rows, rowIdx, NULL);
    void* pData = isNull ? NULL : colDataGetData(pSrc, rowIdx);

    code = colDataSetVal(pDst, 0, pData, isNull);
    if (code != TSDB_CODE_SUCCESS) {
      terrno = code;
      blockDataDestroy(pBlock);
      return NULL;
    }
  }

  pBlock->info.rows = 1;

  return pBlock;
}
1497
SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData) {
C
Cary Xu 已提交
1498
  if (pDataBlock == NULL) {
1499 1500
    return NULL;
  }
1501

1502 1503 1504 1505
  SSDataBlock* pBlock = createDataBlock();
  pBlock->info = pDataBlock->info;
  pBlock->info.rows = 0;
  pBlock->info.capacity = 0;
1506
  pBlock->info.rowSize = 0;
1507
  pBlock->info.id = pDataBlock->info.id;
1508

1509
  size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock);
L
Liu Jicong 已提交
1510
  for (int32_t i = 0; i < numOfCols; ++i) {
1511
    SColumnInfoData* p = taosArrayGet(pDataBlock->pDataBlock, i);
1512 1513
    SColumnInfoData  colInfo = {.hasNull = true, .info = p->info};
    blockDataAppendColInfo(pBlock, &colInfo);
1514 1515
  }

1516
  if (copyData) {
1517 1518 1519 1520 1521 1522 1523
    int32_t code = blockDataEnsureCapacity(pBlock, pDataBlock->info.rows);
    if (code != TSDB_CODE_SUCCESS) {
      terrno = code;
      blockDataDestroy(pBlock);
      return NULL;
    }

1524 1525 1526
    for (int32_t i = 0; i < numOfCols; ++i) {
      SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, i);
      SColumnInfoData* pSrc = taosArrayGet(pDataBlock->pDataBlock, i);
1527
      colDataAssign(pDst, pSrc, pDataBlock->info.rows, &pDataBlock->info);
1528 1529 1530 1531 1532 1533
    }

    pBlock->info.rows = pDataBlock->info.rows;
    pBlock->info.capacity = pDataBlock->info.rows;
  }

1534 1535 1536
  return pBlock;
}

1537 1538 1539 1540
/**
 * Allocate a zero-initialized block with an empty column array.
 *
 * @return the new block, or NULL with terrno = TSDB_CODE_OUT_OF_MEMORY
 */
SSDataBlock* createDataBlock() {
  SSDataBlock* pNew = taosMemoryCalloc(1, sizeof(SSDataBlock));
  if (pNew != NULL) {
    pNew->pDataBlock = taosArrayInit(4, sizeof(SColumnInfoData));
    if (pNew->pDataBlock != NULL) {
      return pNew;
    }

    // the column array could not be created: give the block back
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    taosMemoryFree(pNew);
    return NULL;
  }

  terrno = TSDB_CODE_OUT_OF_MEMORY;
  return NULL;
}

/**
 * Append a copy of *pColInfoData to the block's column array.
 *
 * The array is created on demand. On success info.rowSize grows by the column
 * width and info.hasVarCol is set if the column is variable-length.
 *
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY (also stored in terrno)
 */
int32_t blockDataAppendColInfo(SSDataBlock* pBlock, SColumnInfoData* pColInfoData) {
  if (pBlock->pDataBlock == NULL) {
    pBlock->pDataBlock = taosArrayInit(4, sizeof(SColumnInfoData));
  }

  if (pBlock->pDataBlock == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return terrno;
  }

  if (taosArrayPush(pBlock->pDataBlock, pColInfoData) == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return terrno;
  }

  // todo disable it temporarily
  //  ASSERT(pColInfoData->info.type != 0);
  if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
    pBlock->info.hasVarCol = true;
  }

  pBlock->info.rowSize += pColInfoData->info.bytes;
  return TSDB_CODE_SUCCESS;
}

/**
 * Build a column descriptor by value: hasNull is true, the column id, type and
 * width are taken from the arguments and every other field is zero. No buffer
 * is allocated.
 */
SColumnInfoData createColumnInfoData(int16_t type, int32_t bytes, int16_t colId) {
  SColumnInfoData result = {
      .hasNull = true,
      .info = {.colId = colId, .type = type, .bytes = bytes},
  };

  return result;
}

L
Liu Jicong 已提交
1588
SColumnInfoData* bdGetColumnInfoData(const SSDataBlock* pBlock, int32_t index) {
1589 1590 1591 1592 1593 1594 1595
  if (index >= taosArrayGetSize(pBlock->pDataBlock)) {
    return NULL;
  }

  return taosArrayGet(pBlock->pDataBlock, index);
}

G
Ganlin Zhao 已提交
1596
size_t blockDataGetCapacityInRow(const SSDataBlock* pBlock, size_t pageSize, int32_t extraSize) {
1597
  size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
1598

G
Ganlin Zhao 已提交
1599
  int32_t payloadSize = pageSize - extraSize;
1600 1601
  int32_t rowSize = pBlock->info.rowSize;
  int32_t nRows = payloadSize / rowSize;
H
Haojun Liao 已提交
1602
  ASSERT(nRows >= 1);
1603

1604 1605
  int32_t numVarCols = 0;
  int32_t numFixCols = 0;
1606
  for (int32_t i = 0; i < numOfCols; ++i) {
1607 1608
    SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, i);
    if (IS_VAR_DATA_TYPE(pCol->info.type)) {
1609 1610 1611 1612 1613 1614 1615 1616 1617 1618 1619 1620 1621 1622 1623 1624 1625
      ++numVarCols;
    } else {
      ++numFixCols;
    }
  }

  // find the data payload whose size is greater than payloadSize
  int result = -1;
  int start = 1;
  int end = nRows;
  while (start <= end) {
    int mid = start + (end - start) / 2;
    //data size + var data type columns offset + fixed data type columns bitmap len 
    int midSize = rowSize * mid + numVarCols * sizeof(int32_t) * mid + numFixCols * BitmapLen(mid); 
    if (midSize > payloadSize) {
      result = mid;
      end = mid - 1;
1626
    } else {
1627
      start = mid + 1;
1628 1629 1630
    }
  }

1631 1632
  int32_t newRows = (result != -1) ? result - 1 : nRows;
  // the true value must be less than the value of nRows
1633
  ASSERT(newRows <= nRows && newRows >= 1);
1634 1635

  return newRows;
1636
}
H
Haojun Liao 已提交
1637

1638
/**
 * Release the buffers owned by a column: the payload (pData) and, depending on
 * the column type, either the offset array (variable-length types) or the null
 * bitmap (all other types). The SColumnInfoData object itself is not freed.
 * A NULL argument is ignored.
 */
void colDataDestroy(SColumnInfoData* pColData) {
  if (pColData == NULL) {
    return;
  }

  taosMemoryFreeClear(pColData->pData);

  if (!IS_VAR_DATA_TYPE(pColData->info.type)) {
    taosMemoryFreeClear(pColData->nullbitmap);
  } else {
    taosMemoryFreeClear(pColData->varmeta.offset);
  }
}

H
Haojun Liao 已提交
1652 1653 1654 1655
/**
 * Shift a null bitmap towards its start by n bits, i.e. drop the flags of the
 * first n rows of a column that holds `total` rows.
 *
 * Bits are moved with `<<` and refilled from the high bits of the following
 * byte, so within a byte the lower row index sits in the more significant bit.
 */
static void doShiftBitmap(char* nullBitmap, size_t n, size_t total) {
  int32_t oldBytes = BitmapLen(total);
  int32_t newBytes = BitmapLen(total - n);

  // whole bytes are dropped: the remaining bytes can be moved unchanged
  if (n % 8 == 0) {
    memmove(nullBitmap, nullBitmap + n / 8, newBytes);
    return;
  }

  int32_t  shift = n % 8;  // bit shift inside a byte, 1..7
  uint8_t* bits = (uint8_t*)nullBitmap;

  if (n < 8) {
    // no byte is dropped: shift every byte in place and pull in the top bits of its successor
    for (int32_t i = 0; i < oldBytes; ++i) {
      uint8_t cur = bits[i];  // source bitmap value
      bits[i] = (cur << shift);

      if (i < oldBytes - 1) {
        uint8_t next = bits[i + 1];
        bits[i] |= (next >> (8 - shift));
      }
    }
    return;
  }

  // n > 8 from here on (n == 8 was handled by the whole-byte case above)
  // extra == 1 when the last source byte holds no more valid bits than are shifted out
  int32_t extra = (total % 8 != 0 && total % 8 <= shift) ? 1 : 0;
  int32_t skip = oldBytes - newBytes - extra;  // index of the first source byte that contributes
  for (int32_t i = 0; i < newBytes; ++i) {
    uint8_t cur = bits[i + skip];
    bits[i] = (cur << shift);

    if (i < newBytes - 1 + extra) {
      uint8_t next = bits[i + skip + 1];
      bits[i] |= (next >> (8 - shift));
    }
  }
}

X
Xiaoyu Wang 已提交
1693
/**
 * Move the payload and offsets of rows [start, end) of a variable-length
 * column to the front of the column buffers.
 *
 * Rows whose offset is -1 are skipped and keep that value. When start != 0 the
 * offsets of the remaining rows are rewritten as running totals of the value
 * lengths.
 *
 * @return the total payload length, in bytes, of the rows in [start, end).
 */
static int32_t colDataMoveVarData(SColumnInfoData* pColInfoData, size_t start, size_t end) {
  int32_t* offsets = pColInfoData->varmeta.offset;
  int32_t  firstOffset = -1;  // payload offset of the first row in the range that has a value
  int32_t  totalLen = 0;

  for (int32_t row = start; row < end; ++row) {
    int32_t oldOffset = offsets[row];
    if (oldOffset == -1) {
      continue;
    }

    if (start != 0) {
      offsets[row] = totalLen;
    }

    if (firstOffset == -1) {
      firstOffset = oldOffset;  // mark the begin of data
    }

    char* pVal = pColInfoData->pData + oldOffset;
    if (pColInfoData->info.type == TSDB_DATA_TYPE_JSON) {
      totalLen += getJsonValueLen(pVal);
    } else {
      totalLen += varDataTLen(pVal);
    }
  }

  // payload already starts at offset 0 (or the range has no values): nothing to move
  if (firstOffset > 0) {
    memmove(pColInfoData->pData, pColInfoData->pData + firstOffset, totalLen);
  }

  memmove(offsets, &offsets[start], (end - start) * sizeof(int32_t));
  return totalLen;
}

H
Haojun Liao 已提交
1725 1726
/**
 * Drop the first n rows of a column that currently holds `total` rows.
 *
 * Variable-length columns: only the offset entries are shifted; the payload in
 * pData is left where it is (compaction through colDataMoveVarData is currently
 * disabled). The n offset entries that become unused are zeroed.
 *
 * Other columns: the row values are moved to the front of pData and the null
 * bitmap is shifted by n bits.
 */
static void colDataTrimFirstNRows(SColumnInfoData* pColInfoData, size_t n, size_t total) {
  if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
    int32_t* offsets = pColInfoData->varmeta.offset;

    // pColInfoData->varmeta.length = colDataMoveVarData(pColInfoData, n, total);
    memmove(offsets, &offsets[n], (total - n) * sizeof(int32_t));

    // clear the offset value of the unused entries.
    // memset counts bytes, so the n entries must be scaled by the entry size.
    memset(&offsets[total - n], 0, n * sizeof(int32_t));
  } else {
    int32_t bytes = pColInfoData->info.bytes;
    memmove(pColInfoData->pData, ((char*)pColInfoData->pData + n * bytes), (total - n) * bytes);
    doShiftBitmap(pColInfoData->nullbitmap, n, total);
  }
}

1739
/**
 * Remove the first n rows from every column of pBlock.
 *
 * n == 0 is a no-op; when n covers all rows the block is emptied through
 * blockDataEmpty(). Always returns TSDB_CODE_SUCCESS.
 */
int32_t blockDataTrimFirstRows(SSDataBlock* pBlock, size_t n) {
  if (n == 0) {
    return TSDB_CODE_SUCCESS;
  }

  if (pBlock->info.rows <= n) {
    blockDataEmpty(pBlock);
    return TSDB_CODE_SUCCESS;
  }

  size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
  for (int32_t col = 0; col < numOfCols; ++col) {
    colDataTrimFirstNRows(taosArrayGet(pBlock->pDataBlock, col), n, pBlock->info.rows);
  }

  pBlock->info.rows -= n;
  return TSDB_CODE_SUCCESS;
}

wmmhello's avatar
wmmhello 已提交
1758 1759
/**
 * Keep only the first n rows of a column that currently holds `total` rows.
 *
 * Variable-length columns: the offset entries of the discarded rows
 * [n, total) are zeroed; the payload is not touched.
 * Other columns: nothing is done here; the bitmap reset below is disabled.
 */
static void colDataKeepFirstNRows(SColumnInfoData* pColInfoData, size_t n, size_t total) {
  if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
    // pColInfoData->varmeta.length = colDataMoveVarData(pColInfoData, 0, n);
    // memset counts bytes, so the (total - n) entries must be scaled by the entry size.
    memset(&pColInfoData->varmeta.offset[n], 0, (total - n) * sizeof(int32_t));
  } else {  // reset the bitmap value
    /*int32_t stopIndex = BitmapLen(n) * 8;
    for(int32_t i = n; i < stopIndex; ++i) {
      colDataClearNull_f(pColInfoData->nullbitmap, i);
    }

    int32_t remain = BitmapLen(total) - BitmapLen(n);
    if (remain > 0) {
      memset(pColInfoData->nullbitmap+BitmapLen(n), 0, remain);
    }*/
  }
}

/**
 * Truncate pBlock to its first n rows.
 *
 * n == 0 empties the block through blockDataEmpty(); a block that already has
 * at most n rows is left unchanged. Always returns TSDB_CODE_SUCCESS.
 */
int32_t blockDataKeepFirstNRows(SSDataBlock* pBlock, size_t n) {
  if (n == 0) {
    blockDataEmpty(pBlock);
    return TSDB_CODE_SUCCESS;
  }

  if (pBlock->info.rows > n) {
    size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
    for (int32_t col = 0; col < numOfCols; ++col) {
      colDataKeepFirstNRows(taosArrayGet(pBlock->pDataBlock, col), n, pBlock->info.rows);
    }

    pBlock->info.rows = n;
  }

  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
1795
int32_t tEncodeDataBlock(void** buf, const SSDataBlock* pBlock) {
H
Haojun Liao 已提交
1796
  int64_t tbUid = pBlock->info.id.uid;
1797
  int16_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
H
Haojun Liao 已提交
1798
  int16_t hasVarCol = pBlock->info.hasVarCol;
D
dapan1121 已提交
1799
  int64_t rows = pBlock->info.rows;
H
Haojun Liao 已提交
1800 1801 1802 1803 1804 1805
  int32_t sz = taosArrayGetSize(pBlock->pDataBlock);

  int32_t tlen = 0;
  tlen += taosEncodeFixedI64(buf, tbUid);
  tlen += taosEncodeFixedI16(buf, numOfCols);
  tlen += taosEncodeFixedI16(buf, hasVarCol);
D
dapan1121 已提交
1806
  tlen += taosEncodeFixedI64(buf, rows);
H
Haojun Liao 已提交
1807 1808 1809 1810
  tlen += taosEncodeFixedI32(buf, sz);
  for (int32_t i = 0; i < sz; i++) {
    SColumnInfoData* pColData = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, i);
    tlen += taosEncodeFixedI16(buf, pColData->info.colId);
1811
    tlen += taosEncodeFixedI8(buf, pColData->info.type);
H
Haojun Liao 已提交
1812
    tlen += taosEncodeFixedI32(buf, pColData->info.bytes);
S
slzhou@taodata.com 已提交
1813
    tlen += taosEncodeFixedBool(buf, pColData->hasNull);
H
Haojun Liao 已提交
1814 1815 1816 1817 1818 1819 1820 1821

    if (IS_VAR_DATA_TYPE(pColData->info.type)) {
      tlen += taosEncodeBinary(buf, pColData->varmeta.offset, sizeof(int32_t) * rows);
    } else {
      tlen += taosEncodeBinary(buf, pColData->nullbitmap, BitmapLen(rows));
    }

    int32_t len = colDataGetLength(pColData, rows);
X
Xiaoyu Wang 已提交
1822
    tlen += taosEncodeFixedI32(buf, len);
H
Haojun Liao 已提交
1823

1824 1825 1826 1827 1828 1829 1830 1831 1832 1833 1834 1835 1836 1837
    if (pColData->reassigned && IS_VAR_DATA_TYPE(pColData->info.type)) {
      for (int32_t row = 0; row < rows; ++row) {
        char* pData = pColData->pData + pColData->varmeta.offset[row];
        int32_t colSize = 0;
        if (pColData->info.type == TSDB_DATA_TYPE_JSON) {
          colSize = getJsonValueLen(pData);
        } else {
          colSize = varDataTLen(pData);
        }
        tlen += taosEncodeBinary(buf, pData, colSize);
      }
    } else {
      tlen += taosEncodeBinary(buf, pColData->pData, len);
    }
H
Haojun Liao 已提交
1838 1839 1840 1841 1842 1843 1844
  }
  return tlen;
}

/**
 * Deserialize a data block written by tEncodeDataBlock().
 *
 * pBlock->pDataBlock is replaced by a newly created array holding one
 * SColumnInfoData per encoded column; for variable-length columns
 * varmeta.length and varmeta.allocLen are set to the decoded payload length.
 *
 * @return the read position after the decoded block.
 */
void* tDecodeDataBlock(const void* buf, SSDataBlock* pBlock) {
  int32_t sz;
  int16_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);

  // block header
  buf = taosDecodeFixedU64(buf, &pBlock->info.id.uid);
  buf = taosDecodeFixedI16(buf, &numOfCols);
  buf = taosDecodeFixedI16(buf, &pBlock->info.hasVarCol);
  buf = taosDecodeFixedI64(buf, &pBlock->info.rows);
  buf = taosDecodeFixedI32(buf, &sz);

  pBlock->pDataBlock = taosArrayInit(sz, sizeof(SColumnInfoData));

  for (int32_t col = 0; col < sz; col++) {
    SColumnInfoData colData = {0};

    // column header
    buf = taosDecodeFixedI16(buf, &colData.info.colId);
    buf = taosDecodeFixedI8(buf, &colData.info.type);
    buf = taosDecodeFixedI32(buf, &colData.info.bytes);
    buf = taosDecodeFixedBool(buf, &colData.hasNull);

    bool isVarType = IS_VAR_DATA_TYPE(colData.info.type);

    // per-row meta data
    if (isVarType) {
      buf = taosDecodeBinary(buf, (void**)&colData.varmeta.offset, pBlock->info.rows * sizeof(int32_t));
    } else {
      buf = taosDecodeBinary(buf, (void**)&colData.nullbitmap, BitmapLen(pBlock->info.rows));
    }

    // payload
    int32_t payloadLen = 0;
    buf = taosDecodeFixedI32(buf, &payloadLen);
    buf = taosDecodeBinary(buf, (void**)&colData.pData, payloadLen);

    if (isVarType) {
      colData.varmeta.length = payloadLen;
      colData.varmeta.allocLen = payloadLen;
    }

    taosArrayPush(pBlock->pDataBlock, &colData);
  }

  return (void*)buf;
}
L
Liu Jicong 已提交
1877 1878 1879 1880 1881 1882 1883 1884 1885 1886 1887 1888 1889 1890

int32_t tEncodeDataBlocks(void** buf, const SArray* blocks) {
  int32_t tlen = 0;
  int32_t sz = taosArrayGetSize(blocks);
  tlen += taosEncodeFixedI32(buf, sz);

  for (int32_t i = 0; i < sz; i++) {
    SSDataBlock* pBlock = taosArrayGet(blocks, i);
    tlen += tEncodeDataBlock(buf, pBlock);
  }

  return tlen;
}

L
Liu Jicong 已提交
1891
/**
 * Deserialize an array of data blocks written by tEncodeDataBlocks().
 *
 * *blocks receives a newly created array of SSDataBlock values.
 *
 * @return the read position after the last decoded block.
 */
void* tDecodeDataBlocks(const void* buf, SArray** blocks) {
  int32_t numOfBlocks;
  buf = taosDecodeFixedI32(buf, &numOfBlocks);

  *blocks = taosArrayInit(numOfBlocks, sizeof(SSDataBlock));

  int32_t idx = 0;
  while (idx < numOfBlocks) {
    SSDataBlock block = {0};
    buf = tDecodeDataBlock(buf, &block);
    taosArrayPush(*blocks, &block);
    idx++;
  }

  return (void*)buf;
}
L
Liu Jicong 已提交
1903 1904 1905 1906 1907 1908 1909 1910 1911 1912 1913 1914 1915 1916 1917 1918 1919 1920 1921 1922 1923 1924 1925 1926 1927 1928 1929 1930 1931 1932 1933 1934 1935

/**
 * Render a timestamp as "YYYY-mm-dd HH:MM:SS.<fraction>" into buf.
 *
 * The fraction has 9, 6 or 3 digits for nano-, micro- and (any other value)
 * millisecond precision. buf is returned unchanged by this function when
 * taosLocalTime() fails. strftime() is limited to 35 bytes; the fraction is
 * appended after it without a bound, so buf must be large enough for both.
 */
static char* formatTimestamp(char* buf, int64_t val, int precision) {
  // number of timestamp ticks per second for the given precision
  int32_t ticksPerSec = 1000;
  if (precision == TSDB_TIME_PRECISION_NANO) {
    ticksPerSec = 1000000000;
  } else if (precision == TSDB_TIME_PRECISION_MICRO) {
    ticksPerSec = 1000000;
  }

  time_t  tt = (time_t)(val / ticksPerSec);
  int32_t frac = val % ticksPerSec;

  /* comment out as it make testcases like select_with_tags.sim fail.
    but in windows, this may cause the call to localtime crash if tt < 0,
    need to find a better solution.
    if (tt < 0) {
      tt = 0;
    }
    */

  // a negative remainder belongs to the previous second: borrow one second from tt
  if (tt <= 0 && frac < 0) {
    tt--;
    frac += ticksPerSec;
  }

  struct tm ptm = {0};
  if (taosLocalTime(&tt, &ptm, buf) == NULL) {
    return buf;
  }

  size_t pos = strftime(buf, 35, "%Y-%m-%d %H:%M:%S", &ptm);

  if (precision == TSDB_TIME_PRECISION_NANO) {
    sprintf(buf + pos, ".%09d", frac);
  } else if (precision == TSDB_TIME_PRECISION_MICRO) {
    sprintf(buf + pos, ".%06d", frac);
  } else {
    sprintf(buf + pos, ".%03d", frac);
  }

  return buf;
}
H
Haojun Liao 已提交
1952

1953
#if 0
1954
/**
 * Print a single data block: wraps pBlock in a temporary one-element array of
 * SSDataBlock pointers and hands it to blockDebugShowDataBlocks().
 */
void blockDebugShowDataBlock(SSDataBlock* pBlock, const char* flag) {
  SArray* pSingle = taosArrayInit(1, sizeof(SSDataBlock*));
  taosArrayPush(pSingle, &pBlock);

  blockDebugShowDataBlocks(pSingle, flag);

  taosArrayDestroy(pSingle);
}

S
slzhou 已提交
1961
void blockDebugShowDataBlocks(const SArray* dataBlocks, const char* flag) {
C
Cary Xu 已提交
1962
  char    pBuf[128] = {0};
L
Liu Jicong 已提交
1963 1964
  int32_t sz = taosArrayGetSize(dataBlocks);
  for (int32_t i = 0; i < sz; i++) {
1965
    SSDataBlock* pDataBlock = taosArrayGet(dataBlocks, i);
L
Liu Jicong 已提交
1966
    size_t       numOfCols = taosArrayGetSize(pDataBlock->pDataBlock);
1967

L
Liu Jicong 已提交
1968
    int32_t rows = pDataBlock->info.rows;
C
Cary Xu 已提交
1969 1970
    printf("%s |block ver %" PRIi64 " |block type %d |child id %d|group id %" PRIu64 "\n", flag,
           pDataBlock->info.version, (int32_t)pDataBlock->info.type, pDataBlock->info.childId,
H
Haojun Liao 已提交
1971
           pDataBlock->info.id.groupId);
L
Liu Jicong 已提交
1972
    for (int32_t j = 0; j < rows; j++) {
C
Cary Xu 已提交
1973
      printf("%s |", flag);
1974
      for (int32_t k = 0; k < numOfCols; k++) {
L
Liu Jicong 已提交
1975 1976
        SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, k);
        void*            var = POINTER_SHIFT(pColInfoData->pData, j * pColInfoData->info.bytes);
1977 1978 1979
        if (k == 0) {
          printf("cols:%d |", (int32_t)numOfCols);
        }
5
54liuyao 已提交
1980
        if (colDataIsNull(pColInfoData, rows, j, NULL)) {
5
54liuyao 已提交
1981 1982 1983
          printf(" %15s |", "NULL");
          continue;
        }
1984

L
Liu Jicong 已提交
1985 1986 1987 1988 1989
        switch (pColInfoData->info.type) {
          case TSDB_DATA_TYPE_TIMESTAMP:
            formatTimestamp(pBuf, *(uint64_t*)var, TSDB_TIME_PRECISION_MILLI);
            printf(" %25s |", pBuf);
            break;
C
Cary Xu 已提交
1990
          case TSDB_DATA_TYPE_BOOL:
1991 1992 1993 1994 1995 1996
            printf(" %15" PRIi8 " |", *(int8_t*)var);
            break;
          case TSDB_DATA_TYPE_TINYINT:
            printf(" %15" PRIi8 " |", *(int8_t*)var);
            break;
          case TSDB_DATA_TYPE_SMALLINT:
K
kailixu 已提交
1997
            printf(" %15" PRIi16 " |", *(int16_t*)var);
C
Cary Xu 已提交
1998
            break;
L
Liu Jicong 已提交
1999 2000 2001
          case TSDB_DATA_TYPE_INT:
            printf(" %15d |", *(int32_t*)var);
            break;
2002
          case TSDB_DATA_TYPE_UTINYINT:
K
kailixu 已提交
2003
            printf(" %15" PRIu8 " |", *(uint8_t*)var);
2004 2005
            break;
          case TSDB_DATA_TYPE_USMALLINT:
K
kailixu 已提交
2006
            printf(" %15" PRIu16 " |", *(uint16_t*)var);
2007
            break;
L
Liu Jicong 已提交
2008 2009 2010
          case TSDB_DATA_TYPE_UINT:
            printf(" %15u |", *(uint32_t*)var);
            break;
L
Liu Jicong 已提交
2011
          case TSDB_DATA_TYPE_BIGINT:
2012
            printf(" %15" PRId64 " |", *(int64_t*)var);
L
Liu Jicong 已提交
2013
            break;
L
Liu Jicong 已提交
2014
          case TSDB_DATA_TYPE_UBIGINT:
2015
            printf(" %15" PRIu64 " |", *(uint64_t*)var);
L
Liu Jicong 已提交
2016
            break;
C
Cary Xu 已提交
2017 2018 2019
          case TSDB_DATA_TYPE_FLOAT:
            printf(" %15f |", *(float*)var);
            break;
5
54liuyao 已提交
2020
          case TSDB_DATA_TYPE_DOUBLE:
C
Cary Xu 已提交
2021
            printf(" %15lf |", *(double*)var);
5
54liuyao 已提交
2022
            break;
D
Dingle Zhang 已提交
2023 2024
          case TSDB_DATA_TYPE_VARCHAR:
          case TSDB_DATA_TYPE_GEOMETRY: {
C
Cary Xu 已提交
2025
            char*   pData = colDataGetVarData(pColInfoData, j);
C
Cary Xu 已提交
2026 2027
            int32_t dataSize = TMIN(sizeof(pBuf) - 1, varDataLen(pData));
            memset(pBuf, 0, dataSize + 1);
C
Cary Xu 已提交
2028 2029 2030 2031 2032
            strncpy(pBuf, varDataVal(pData), dataSize);
            printf(" %15s |", pBuf);
          } break;
          case TSDB_DATA_TYPE_NCHAR: {
            char*   pData = colDataGetVarData(pColInfoData, j);
C
Cary Xu 已提交
2033
            int32_t dataSize = TMIN(sizeof(pBuf), varDataLen(pData));
C
Cary Xu 已提交
2034
            memset(pBuf, 0, dataSize);
C
Cary Xu 已提交
2035
            (void)taosUcs4ToMbs((TdUcs4*)varDataVal(pData), dataSize, pBuf);
C
Cary Xu 已提交
2036 2037 2038 2039
            printf(" %15s |", pBuf);
          } break;
          default:
            break;
L
Liu Jicong 已提交
2040 2041 2042 2043 2044
        }
      }
      printf("\n");
    }
  }
5
54liuyao 已提交
2045
}
2046 2047
#endif

5
54liuyao 已提交
2048 2049
// for debug

// Append formatted text to dumpBuf and leave dumpBlockData() as soon as the buffer is full.
// Relies on the locals dumpBuf, len and size of dumpBlockData().
#define DUMP_BLOCK_APPEND(...)                               \
  do {                                                       \
    len += snprintf(dumpBuf + len, size - len, __VA_ARGS__); \
    if (len >= size - 1) return dumpBuf;                     \
  } while (0)

/**
 * Render a data block as text into a newly allocated 2 MiB buffer.
 *
 * The output is one header line, one line per row (prefixed with `flag`, one
 * cell per column, followed by the row index) and a closing "<flag> |end" line.
 * Output that does not fit is truncated. Column types without a case below
 * produce no cell.
 *
 * @param pDataBlock block to dump
 * @param flag       prefix written in front of the header and of each row
 * @param pDataBuf   receives the allocated buffer; the caller is expected to release it
 * @return the buffer (same pointer as *pDataBuf), or NULL with terrno set to
 *         TSDB_CODE_OUT_OF_MEMORY when the buffer cannot be allocated.
 */
char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf) {
  int32_t size = 2048 * 1024;
  *pDataBuf = taosMemoryCalloc(size, 1);
  if (*pDataBuf == NULL) {
    // without this check the first snprintf() below would write through a NULL pointer
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  char*   dumpBuf = *pDataBuf;
  char    pBuf[128] = {0};
  int32_t colNum = taosArrayGetSize(pDataBlock->pDataBlock);
  int32_t rows = pDataBlock->info.rows;
  int32_t len = 0;

  DUMP_BLOCK_APPEND("===stream===%s|block type %d|child id %d|group id:%" PRIu64 "|uid:%" PRId64
                    "|rows:%" PRId64 "|version:%" PRIu64 "|cal start:%" PRIu64 "|cal end:%" PRIu64 "|tbl:%s\n",
                    flag, (int32_t)pDataBlock->info.type, pDataBlock->info.childId, pDataBlock->info.id.groupId,
                    pDataBlock->info.id.uid, pDataBlock->info.rows, pDataBlock->info.version,
                    pDataBlock->info.calWin.skey, pDataBlock->info.calWin.ekey, pDataBlock->info.parTbName);

  for (int32_t j = 0; j < rows; j++) {
    DUMP_BLOCK_APPEND("%s|", flag);

    for (int32_t k = 0; k < colNum; k++) {
      SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, k);
      if (colDataIsNull(pColInfoData, rows, j, NULL) || !pColInfoData->pData) {
        DUMP_BLOCK_APPEND(" %15s |", "NULL");
        continue;
      }

      void* var = colDataGetData(pColInfoData, j);
      switch (pColInfoData->info.type) {
        case TSDB_DATA_TYPE_TIMESTAMP:
          memset(pBuf, 0, sizeof(pBuf));
          formatTimestamp(pBuf, *(uint64_t*)var, pColInfoData->info.precision);
          DUMP_BLOCK_APPEND(" %25s |", pBuf);
          break;
        case TSDB_DATA_TYPE_TINYINT:
          DUMP_BLOCK_APPEND(" %15d |", *(int8_t*)var);
          break;
        case TSDB_DATA_TYPE_UTINYINT:
          DUMP_BLOCK_APPEND(" %15d |", *(uint8_t*)var);
          break;
        case TSDB_DATA_TYPE_SMALLINT:
          DUMP_BLOCK_APPEND(" %15d |", *(int16_t*)var);
          break;
        case TSDB_DATA_TYPE_USMALLINT:
          DUMP_BLOCK_APPEND(" %15d |", *(uint16_t*)var);
          break;
        case TSDB_DATA_TYPE_INT:
          DUMP_BLOCK_APPEND(" %15d |", *(int32_t*)var);
          break;
        case TSDB_DATA_TYPE_UINT:
          DUMP_BLOCK_APPEND(" %15u |", *(uint32_t*)var);
          break;
        case TSDB_DATA_TYPE_BIGINT:
          DUMP_BLOCK_APPEND(" %15" PRId64 " |", *(int64_t*)var);
          break;
        case TSDB_DATA_TYPE_UBIGINT:
          DUMP_BLOCK_APPEND(" %15" PRIu64 " |", *(uint64_t*)var);
          break;
        case TSDB_DATA_TYPE_FLOAT:
          DUMP_BLOCK_APPEND(" %15f |", *(float*)var);
          break;
        case TSDB_DATA_TYPE_DOUBLE:
          DUMP_BLOCK_APPEND(" %15f |", *(double*)var);
          break;
        case TSDB_DATA_TYPE_BOOL:
          DUMP_BLOCK_APPEND(" %15d |", *(bool*)var);
          break;
        case TSDB_DATA_TYPE_VARCHAR:
        case TSDB_DATA_TYPE_GEOMETRY: {
          memset(pBuf, 0, sizeof(pBuf));
          char*   pData = colDataGetVarData(pColInfoData, j);
          int32_t dataSize = TMIN(sizeof(pBuf), varDataLen(pData));
          dataSize = TMIN(dataSize, 50);  // at most 50 bytes of the value are shown
          memcpy(pBuf, varDataVal(pData), dataSize);
          DUMP_BLOCK_APPEND(" %15s |", pBuf);
        } break;
        case TSDB_DATA_TYPE_NCHAR: {
          char* pData = colDataGetVarData(pColInfoData, j);
          // Leave one UCS-4 unit of headroom so that pBuf keeps a terminating NUL for "%s"
          // (assumes the converted text is not longer than its UCS-4 input - TODO confirm).
          int32_t dataSize = TMIN(sizeof(pBuf) - sizeof(TdUcs4), varDataLen(pData));
          memset(pBuf, 0, sizeof(pBuf));
          (void)taosUcs4ToMbs((TdUcs4*)varDataVal(pData), dataSize, pBuf);
          DUMP_BLOCK_APPEND(" %15s |", pBuf);
        } break;
        default:
          break;
      }
    }

    DUMP_BLOCK_APPEND("%d\n", j);
  }

  DUMP_BLOCK_APPEND("%s |end\n", flag);
  return dumpBuf;
}

#undef DUMP_BLOCK_APPEND

C
Cary Xu 已提交
2156 2157 2158 2159
/**
 * @brief TODO: Assume that the final generated result is less than 3M
 *
 * @param pReq
 * @param pDataBlocks
 * @param vgId
 * @param suid
 *
 */
2165
#if 0
K
kailixu 已提交
2166
int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SSDataBlock* pDataBlock, STSchema* pTSchema, int32_t vgId,
2167
                                    tb_uid_t suid) {
K
kailixu 已提交
2168
  int32_t bufSize = sizeof(SSubmitReq);
C
Cary Xu 已提交
2169
  int32_t sz = 1;
C
Cary Xu 已提交
2170
  for (int32_t i = 0; i < sz; ++i) {
C
Cary Xu 已提交
2171
    const SDataBlockInfo* pBlkInfo = &pDataBlock->info;
2172

C
Cary Xu 已提交
2173 2174
    int32_t colNum = taosArrayGetSize(pDataBlock->pDataBlock);
    bufSize += pBlkInfo->rows * (TD_ROW_HEAD_LEN + pBlkInfo->rowSize + BitmapLen(colNum));
C
Cary Xu 已提交
2175 2176 2177 2178
    bufSize += sizeof(SSubmitBlk);
  }

  *pReq = taosMemoryCalloc(1, bufSize);
2179
  if (!(*pReq)) {
C
Cary Xu 已提交
2180 2181 2182
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return TSDB_CODE_FAILED;
  }
C
Cary Xu 已提交
2183 2184
  void* pDataBuf = *pReq;

2185
  int32_t     msgLen = sizeof(SSubmitReq);
C
Cary Xu 已提交
2186 2187
  int32_t     numOfBlks = 0;
  SRowBuilder rb = {0};
C
Cary Xu 已提交
2188
  tdSRowInit(&rb, pTSchema->version);
C
Cary Xu 已提交
2189 2190

  for (int32_t i = 0; i < sz; ++i) {
L
Liu Jicong 已提交
2191 2192
    int32_t colNum = taosArrayGetSize(pDataBlock->pDataBlock);
    int32_t rows = pDataBlock->info.rows;
2193

2194 2195 2196 2197 2198
    if (colNum <= 1) {
      // invalid if only with TS col
      continue;
    }

2199
    if (rb.nCols != colNum) {
C
Cary Xu 已提交
2200 2201 2202 2203 2204
      tdSRowSetTpInfo(&rb, colNum, pTSchema->flen);
    }

    SSubmitBlk* pSubmitBlk = POINTER_SHIFT(pDataBuf, msgLen);
    pSubmitBlk->suid = suid;
H
Haojun Liao 已提交
2205
    pSubmitBlk->uid = pDataBlock->info.id.groupId;
C
Cary Xu 已提交
2206
    pSubmitBlk->numOfRows = rows;
C
Cary Xu 已提交
2207
    pSubmitBlk->sversion = pTSchema->version;
C
Cary Xu 已提交
2208 2209 2210

    msgLen += sizeof(SSubmitBlk);
    int32_t dataLen = 0;
L
Liu Jicong 已提交
2211
    for (int32_t j = 0; j < rows; ++j) {                               // iterate by row
C
Cary Xu 已提交
2212
      tdSRowResetBuf(&rb, POINTER_SHIFT(pDataBuf, msgLen + dataLen));  // set row buf
2213
      bool    isStartKey = false;
2214
      int32_t offset = 0;
C
Cary Xu 已提交
2215 2216
      for (int32_t k = 0; k < colNum; ++k) {  // iterate by column
        SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, k);
2217 2218
        STColumn*        pCol = &pTSchema->columns[k];
        void*            var = POINTER_SHIFT(pColInfoData->pData, j * pColInfoData->info.bytes);
C
Cary Xu 已提交
2219 2220 2221 2222
        switch (pColInfoData->info.type) {
          case TSDB_DATA_TYPE_TIMESTAMP:
            if (!isStartKey) {
              isStartKey = true;
2223
              tdAppendColValToRow(&rb, PRIMARYKEY_TIMESTAMP_COL_ID, TSDB_DATA_TYPE_TIMESTAMP, TD_VTYPE_NORM, var, true,
2224
                                  offset, k);
2225
              continue; // offset should keep 0 for next column
2226

2227 2228 2229
            } else if (colDataIsNull_s(pColInfoData, j)) {
              tdAppendColValToRow(&rb, PRIMARYKEY_TIMESTAMP_COL_ID + k, TSDB_DATA_TYPE_TIMESTAMP, TD_VTYPE_NULL, NULL,
                                  false, offset, k);
C
Cary Xu 已提交
2230
            } else {
2231 2232
              tdAppendColValToRow(&rb, PRIMARYKEY_TIMESTAMP_COL_ID + k, TSDB_DATA_TYPE_TIMESTAMP, TD_VTYPE_NORM, var,
                                  true, offset, k);
C
Cary Xu 已提交
2233 2234
            }
            break;
2235
          case TSDB_DATA_TYPE_NCHAR:
D
Dingle Zhang 已提交
2236 2237
          case TSDB_DATA_TYPE_VARCHAR:  // TSDB_DATA_TYPE_BINARY
          case TSDB_DATA_TYPE_GEOMETRY: {
2238 2239 2240 2241 2242 2243 2244 2245
            if (colDataIsNull_s(pColInfoData, j)) {
              tdAppendColValToRow(&rb, PRIMARYKEY_TIMESTAMP_COL_ID + k, pColInfoData->info.type, TD_VTYPE_NULL, NULL,
                                  false, offset, k);
            } else {
              void* data = colDataGetData(pColInfoData, j);
              tdAppendColValToRow(&rb, PRIMARYKEY_TIMESTAMP_COL_ID + k, pColInfoData->info.type, TD_VTYPE_NORM, data,
                                  true, offset, k);
            }
C
Cary Xu 已提交
2246 2247 2248 2249 2250
            break;
          }
          case TSDB_DATA_TYPE_VARBINARY:
          case TSDB_DATA_TYPE_DECIMAL:
          case TSDB_DATA_TYPE_BLOB:
2251
          case TSDB_DATA_TYPE_JSON:
C
Cary Xu 已提交
2252
          case TSDB_DATA_TYPE_MEDIUMBLOB:
C
Cary Xu 已提交
2253
            uError("the column type %" PRIi16 " is defined but not implemented yet", pColInfoData->info.type);
C
Cary Xu 已提交
2254 2255 2256
            break;
          default:
            if (pColInfoData->info.type < TSDB_DATA_TYPE_MAX && pColInfoData->info.type > TSDB_DATA_TYPE_NULL) {
2257
              if (colDataIsNull_s(pColInfoData, j)) {
L
Liu Jicong 已提交
2258 2259
                tdAppendColValToRow(&rb, PRIMARYKEY_TIMESTAMP_COL_ID + k, pCol->type, TD_VTYPE_NULL, NULL, false,
                                    offset, k);
2260
              } else if (pCol->type == pColInfoData->info.type) {
C
Cary Xu 已提交
2261 2262
                tdAppendColValToRow(&rb, PRIMARYKEY_TIMESTAMP_COL_ID + k, pCol->type, TD_VTYPE_NORM, var, true, offset,
                                    k);
2263
              } else {
C
Cary Xu 已提交
2264 2265 2266 2267 2268 2269 2270 2271 2272 2273 2274 2275 2276 2277 2278 2279 2280 2281 2282 2283
                char tv[8] = {0};
                if (pColInfoData->info.type == TSDB_DATA_TYPE_FLOAT) {
                  float v = 0;
                  GET_TYPED_DATA(v, float, pColInfoData->info.type, var);
                  SET_TYPED_DATA(&tv, pCol->type, v);
                } else if (pColInfoData->info.type == TSDB_DATA_TYPE_DOUBLE) {
                  double v = 0;
                  GET_TYPED_DATA(v, double, pColInfoData->info.type, var);
                  SET_TYPED_DATA(&tv, pCol->type, v);
                } else if (IS_SIGNED_NUMERIC_TYPE(pColInfoData->info.type)) {
                  int64_t v = 0;
                  GET_TYPED_DATA(v, int64_t, pColInfoData->info.type, var);
                  SET_TYPED_DATA(&tv, pCol->type, v);
                } else {
                  uint64_t v = 0;
                  GET_TYPED_DATA(v, uint64_t, pColInfoData->info.type, var);
                  SET_TYPED_DATA(&tv, pCol->type, v);
                }
                tdAppendColValToRow(&rb, PRIMARYKEY_TIMESTAMP_COL_ID + k, pCol->type, TD_VTYPE_NORM, tv, true, offset,
                                    k);
2284
              }
C
Cary Xu 已提交
2285
            } else {
C
Cary Xu 已提交
2286
              uError("the column type %" PRIi16 " is undefined\n", pColInfoData->info.type);
C
Cary Xu 已提交
2287 2288 2289
            }
            break;
        }
2290
        offset += TYPE_BYTES[pCol->type];  // sum/avg would convert to int64_t/uint64_t/double during aggregation
C
Cary Xu 已提交
2291
      }
2292
      tdSRowEnd(&rb);
C
Cary Xu 已提交
2293
      dataLen += TD_ROW_LEN(rb.pBuf);
C
Cary Xu 已提交
2294 2295 2296
#ifdef TD_DEBUG_PRINT_ROW
      tdSRowPrint(rb.pBuf, pTSchema, __func__);
#endif
C
Cary Xu 已提交
2297
    }
C
Cary Xu 已提交
2298 2299 2300

    ++numOfBlks;

C
Cary Xu 已提交
2301 2302 2303 2304
    pSubmitBlk->dataLen = dataLen;
    msgLen += pSubmitBlk->dataLen;
  }

2305 2306 2307 2308 2309 2310 2311 2312 2313 2314 2315 2316 2317 2318 2319
  if (numOfBlks > 0) {
    (*pReq)->length = msgLen;

    (*pReq)->header.vgId = htonl(vgId);
    (*pReq)->header.contLen = htonl(msgLen);
    (*pReq)->length = (*pReq)->header.contLen;
    (*pReq)->numOfBlocks = htonl(numOfBlks);
    SSubmitBlk* blk = (SSubmitBlk*)((*pReq) + 1);
    while (numOfBlks--) {
      int32_t dataLen = blk->dataLen;
      blk->uid = htobe64(blk->uid);
      blk->suid = htobe64(blk->suid);
      blk->sversion = htonl(blk->sversion);
      blk->dataLen = htonl(blk->dataLen);
      blk->schemaLen = htonl(blk->schemaLen);
2320
      blk->numOfRows = htonl(blk->numOfRows);
2321 2322 2323 2324 2325
      blk = (SSubmitBlk*)(blk->data + dataLen);
    }
  } else {
    // no valid rows
    taosMemoryFreeClear(*pReq);
C
Cary Xu 已提交
2326
  }
C
Cary Xu 已提交
2327 2328

  return TSDB_CODE_SUCCESS;
C
Cary Xu 已提交
2329
}
2330 2331
#endif

/**
 * Convert the rows of one data block into a SSubmitTbData entry and append it to a submit request.
 *
 * Column k of pDataBlock is paired with column k of pTSchema. Numeric values whose block type differs from
 * the schema type are converted to the schema type (this may overflow or lose precision).
 *
 * @param ppReq      in/out. When *ppReq is NULL a new SSubmitReq2 is allocated here, otherwise the new table
 *                   data is appended to the request passed in. On success *ppReq points to the request. On
 *                   failure the request (including one supplied by the caller) is destroyed and freed, and
 *                   *ppReq is set to NULL.
 * @param pDataBlock source rows.
 * @param pTSchema   schema used to build each row; its version is stored as the sver of the table data.
 * @param uid        uid stored in the table data.
 * @param vgId       not used by this function.
 * @param suid       suid stored in the table data.
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_FAILED with terrno describing the failure.
 *
 * A block with at most one column (timestamp only) adds no table data and is reported as success.
 */
int32_t buildSubmitReqFromDataBlock(SSubmitReq2** ppReq, const SSDataBlock* pDataBlock, const STSchema* pTSchema,
                                    int64_t uid, int32_t vgId, tb_uid_t suid) {
  SSubmitReq2*  pReq = *ppReq;
  SArray*       pVals = NULL;
  SSubmitTbData tbData = {0};
  int32_t       colNum = taosArrayGetSize(pDataBlock->pDataBlock);
  int32_t       rows = pDataBlock->info.rows;

  terrno = TSDB_CODE_SUCCESS;

  if (NULL == pReq) {
    if (!(pReq = taosMemoryMalloc(sizeof(SSubmitReq2)))) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      goto _end;
    }

    if (!(pReq->aSubmitTbData = taosArrayInit(1, sizeof(SSubmitTbData)))) {
      // set terrno explicitly: _end decides between success and failure by looking at terrno only
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      goto _end;
    }
  }

  if (colNum <= 1) {  // invalid if only with TS col
    goto _end;
  }

  // the rsma result should have the same column number as the schema.
  ASSERT(colNum == pTSchema->numOfCols);

  if (!(tbData.aRowP = taosArrayInit(rows, sizeof(SRow*)))) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _end;
  }
  tbData.suid = suid;
  tbData.uid = uid;
  tbData.sver = pTSchema->version;

  if (!(pVals = taosArrayInit(colNum, sizeof(SColVal)))) {
    taosArrayDestroy(tbData.aRowP);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _end;
  }

  for (int32_t j = 0; j < rows; ++j) {  // iterate by row
    // pVals is reused for every row
    taosArrayClear(pVals);

    bool isStartKey = false;
    for (int32_t k = 0; k < colNum; ++k) {  // iterate by column
      SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, k);
      const STColumn*  pCol = &pTSchema->columns[k];
      void*            var = POINTER_SHIFT(pColInfoData->pData, j * pColInfoData->info.bytes);

      switch (pColInfoData->info.type) {
        case TSDB_DATA_TYPE_TIMESTAMP:
          ASSERT(pColInfoData->info.type == pCol->type);
          if (!isStartKey) {
            // the first timestamp column of the row is the primary key and is never treated as NULL
            isStartKey = true;
            ASSERT(PRIMARYKEY_TIMESTAMP_COL_ID == pCol->colId);
            SColVal cv = COL_VAL_VALUE(pCol->colId, pCol->type, (SValue){.val = *(TSKEY*)var});
            taosArrayPush(pVals, &cv);
          } else if (colDataIsNull_s(pColInfoData, j)) {
            SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type);
            taosArrayPush(pVals, &cv);
          } else {
            SColVal cv = COL_VAL_VALUE(pCol->colId, pCol->type, (SValue){.val = *(int64_t*)var});
            taosArrayPush(pVals, &cv);
          }
          break;
        case TSDB_DATA_TYPE_NCHAR:
        case TSDB_DATA_TYPE_VARCHAR: {  // TSDB_DATA_TYPE_BINARY
          ASSERT(pColInfoData->info.type == pCol->type);
          if (colDataIsNull_s(pColInfoData, j)) {
            SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type);
            taosArrayPush(pVals, &cv);
          } else {
            void*   data = colDataGetVarData(pColInfoData, j);
            SValue  sv = (SValue){.nData = varDataLen(data), .pData = varDataVal(data)};  // address copy, no value
            SColVal cv = COL_VAL_VALUE(pCol->colId, pCol->type, sv);
            taosArrayPush(pVals, &cv);
          }
          break;
        }
        case TSDB_DATA_TYPE_VARBINARY:
        case TSDB_DATA_TYPE_DECIMAL:
        case TSDB_DATA_TYPE_BLOB:
        case TSDB_DATA_TYPE_JSON:
        case TSDB_DATA_TYPE_MEDIUMBLOB:
          uError("the column type %" PRIi16 " is defined but not implemented yet", pColInfoData->info.type);
          ASSERT(0);
          break;
        default:
          if (pColInfoData->info.type < TSDB_DATA_TYPE_MAX && pColInfoData->info.type > TSDB_DATA_TYPE_NULL) {
            if (colDataIsNull_s(pColInfoData, j)) {
              SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type);
              taosArrayPush(pVals, &cv);
            } else {
              // zero the value first: only tDataTypes[pCol->type].bytes of it are written below
              SValue sv = {0};
              if (pCol->type == pColInfoData->info.type) {
                memcpy(&sv.val, var, tDataTypes[pCol->type].bytes);
              } else {
                /**
                 *  1. sum/avg would convert to int64_t/uint64_t/double during aggregation
                 *  2. below conversion may lead to overflow or loss, the app should select the right data type.
                 */
                char tv[8] = {0};
                if (pColInfoData->info.type == TSDB_DATA_TYPE_FLOAT) {
                  float v = 0;
                  GET_TYPED_DATA(v, float, pColInfoData->info.type, var);
                  SET_TYPED_DATA(&tv, pCol->type, v);
                } else if (pColInfoData->info.type == TSDB_DATA_TYPE_DOUBLE) {
                  double v = 0;
                  GET_TYPED_DATA(v, double, pColInfoData->info.type, var);
                  SET_TYPED_DATA(&tv, pCol->type, v);
                } else if (IS_SIGNED_NUMERIC_TYPE(pColInfoData->info.type)) {
                  int64_t v = 0;
                  GET_TYPED_DATA(v, int64_t, pColInfoData->info.type, var);
                  SET_TYPED_DATA(&tv, pCol->type, v);
                } else {
                  uint64_t v = 0;
                  GET_TYPED_DATA(v, uint64_t, pColInfoData->info.type, var);
                  SET_TYPED_DATA(&tv, pCol->type, v);
                }
                memcpy(&sv.val, tv, tDataTypes[pCol->type].bytes);
              }
              // in both branches the bytes in sv are laid out as pCol->type, so tag the value with that type
              SColVal cv = COL_VAL_VALUE(pCol->colId, pCol->type, sv);
              taosArrayPush(pVals, &cv);
            }
          } else {
            uError("the column type %" PRIi16 " is undefined\n", pColInfoData->info.type);
            ASSERT(0);
          }
          break;
      }
    }

    SRow* pRow = NULL;
    if ((terrno = tRowBuild(pVals, pTSchema, &pRow)) < 0) {
      tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
      goto _end;
    }
    ASSERT(pRow);
    // NOTE(review): aRowP was created with room for `rows` entries, so this push is presumably
    // allocation-free; confirm against taosArrayInit before relying on it.
    taosArrayPush(tbData.aRowP, &pRow);
  }

  // the request array may have to grow here, so the push can fail; do not lose the rows silently
  if (taosArrayPush(pReq->aSubmitTbData, &tbData) == NULL) {
    tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _end;
  }

_end:
  taosArrayDestroy(pVals);
  if (terrno != 0) {
    *ppReq = NULL;
    if (pReq) {
      tDestroySubmitReq(pReq, TSDB_MSG_FLG_ENCODE);
      taosMemoryFreeClear(pReq);
    }

    return TSDB_CODE_FAILED;
  }
  *ppReq = pReq;
  return TSDB_CODE_SUCCESS;
}
char* buildCtbNameByGroupId(const char* stbFullName, uint64_t groupId) {
L
liuyao 已提交
2497 2498 2499 2500 2501 2502 2503
  char* pBuf = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN + 1);
  if (!pBuf) {
    return NULL;
  }
  int32_t code = buildCtbNameByGroupIdImpl(stbFullName, groupId, pBuf);
  if (code != TSDB_CODE_SUCCESS) {
    taosMemoryFree(pBuf);
G
Ganlin Zhao 已提交
2504 2505
    return NULL;
  }
L
liuyao 已提交
2506 2507 2508 2509 2510 2511 2512
  return pBuf;
}

int32_t buildCtbNameByGroupIdImpl(const char* stbFullName, uint64_t groupId, char* cname) {
  if (stbFullName[0] == 0) {
    return TSDB_CODE_FAILED;
  }
G
Ganlin Zhao 已提交
2513

L
Liu Jicong 已提交
2514
  SArray* tags = taosArrayInit(0, sizeof(SSmlKv));
L
Liu Jicong 已提交
2515
  if (tags == NULL) {
L
liuyao 已提交
2516
    return TSDB_CODE_FAILED;
L
Liu Jicong 已提交
2517 2518 2519 2520
  }

  if (cname == NULL) {
    taosArrayDestroy(tags);
L
liuyao 已提交
2521
    return TSDB_CODE_FAILED;
L
Liu Jicong 已提交
2522 2523
  }

L
Liu Jicong 已提交
2524 2525 2526 2527
  SSmlKv pTag = {.key = "group_id",
                 .keyLen = sizeof("group_id") - 1,
                 .type = TSDB_DATA_TYPE_UBIGINT,
                 .u = groupId,
2528
                 .length = sizeof(uint64_t)};
2529 2530 2531 2532
  taosArrayPush(tags, &pTag);

  RandTableName rname = {
      .tags = tags,
L
Liu Jicong 已提交
2533 2534 2535
      .stbFullName = stbFullName,
      .stbFullNameLen = strlen(stbFullName),
      .ctbShortName = cname,
2536 2537 2538 2539 2540 2541
  };

  buildChildTableName(&rname);

  taosArrayDestroy(tags);

G
Ganlin Zhao 已提交
2542
  if ((rname.ctbShortName && rname.ctbShortName[0]) == 0) {
L
liuyao 已提交
2543
    return TSDB_CODE_FAILED;
G
Ganlin Zhao 已提交
2544
  }
L
liuyao 已提交
2545
  return TSDB_CODE_SUCCESS;
2546 2547
}

int32_t blockEncode(const SSDataBlock* pBlock, char* data, int32_t numOfCols) {
  int32_t dataLen = 0;

2551
  // todo extract method
2552 2553 2554 2555
  int32_t* version = (int32_t*)data;
  *version = 1;
  data += sizeof(int32_t);

2556 2557 2558
  int32_t* actualLen = (int32_t*)data;
  data += sizeof(int32_t);

2559 2560 2561
  int32_t* rows = (int32_t*)data;
  *rows = pBlock->info.rows;
  data += sizeof(int32_t);
H
Haojun Liao 已提交
2562
  ASSERT(*rows > 0);
2563 2564 2565 2566 2567

  int32_t* cols = (int32_t*)data;
  *cols = numOfCols;
  data += sizeof(int32_t);

2568 2569 2570
  // flag segment.
  // the inital bit is for column info
  int32_t* flagSegment = (int32_t*)data;
L
Liu Jicong 已提交
2571
  *flagSegment = (1 << 31);
2572

2573 2574
  data += sizeof(int32_t);

2575 2576 2577
  uint64_t* groupId = (uint64_t*)data;
  data += sizeof(uint64_t);

L
Liu Jicong 已提交
2578
  for (int32_t i = 0; i < numOfCols; ++i) {
2579
    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
2580

2581 2582
    *((int8_t*)data) = pColInfoData->info.type;
    data += sizeof(int8_t);
2583

L
Liu Jicong 已提交
2584
    *((int32_t*)data) = pColInfoData->info.bytes;
2585 2586 2587
    data += sizeof(int32_t);
  }

2588 2589 2590
  int32_t* colSizes = (int32_t*)data;
  data += numOfCols * sizeof(int32_t);

H
Haojun Liao 已提交
2591
  dataLen = blockDataGetSerialMetaSize(numOfCols);
2592 2593 2594 2595 2596 2597

  int32_t numOfRows = pBlock->info.rows;
  for (int32_t col = 0; col < numOfCols; ++col) {
    SColumnInfoData* pColRes = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, col);

    // copy the null bitmap
2598
    size_t metaSize = 0;
2599
    if (IS_VAR_DATA_TYPE(pColRes->info.type)) {
2600
      metaSize = numOfRows * sizeof(int32_t);
2601 2602
      memcpy(data, pColRes->varmeta.offset, metaSize);
    } else {
2603 2604
      metaSize = BitmapLen(numOfRows);
      memcpy(data, pColRes->nullbitmap, metaSize);
2605 2606
    }

2607
    data += metaSize;
H
Haojun Liao 已提交
2608
    dataLen += metaSize;
2609

2610 2611 2612 2613 2614 2615 2616 2617 2618 2619 2620 2621 2622 2623 2624 2625 2626 2627 2628 2629 2630 2631
     if (pColRes->reassigned && IS_VAR_DATA_TYPE(pColRes->info.type)) {
        colSizes[col] = 0;
        for (int32_t row = 0; row < numOfRows; ++row) {
          char* pColData = pColRes->pData + pColRes->varmeta.offset[row];
          int32_t colSize = 0;
          if (pColRes->info.type == TSDB_DATA_TYPE_JSON) {
            colSize = getJsonValueLen(pColData);
          } else {
            colSize = varDataTLen(pColData);
          }
          colSizes[col] += colSize;
          dataLen += colSize;
          memmove(data, pColData, colSize);
          data += colSize;
        }
    } else {
      colSizes[col] = colDataGetLength(pColRes, numOfRows);
      dataLen += colSizes[col];
      if (pColRes->pData != NULL) {
        memmove(data, pColRes->pData, colSizes[col]);
      }
      data += colSizes[col];
2632
    }
2633

2634
    colSizes[col] = htonl(colSizes[col]);
2635
//    uError("blockEncode col bytes:%d, type:%d, size:%d, htonl size:%d", pColRes->info.bytes, pColRes->info.type, htonl(colSizes[col]), colSizes[col]);
2636 2637
  }

H
Haojun Liao 已提交
2638
  *actualLen = dataLen;
H
Haojun Liao 已提交
2639
  *groupId = pBlock->info.id.groupId;
H
Haojun Liao 已提交
2640 2641
  ASSERT(dataLen > 0);
  return dataLen;
2642 2643
}

/**
 * Deserialize a buffer produced by blockEncode() into pBlock.
 *
 * The header (version, total length, rows, columns, flag segment, group id), the per-column
 * (type, bytes) schema, the per-column payload lengths and finally each column's offset array or
 * null bitmap plus payload are read in that order. The input buffer is only read, never modified,
 * so the same buffer can be decoded more than once.
 *
 * @param pBlock destination block. If pBlock->pDataBlock is NULL a column array with one entry per
 *               encoded column is created; an existing array must hold at least that many columns.
 *               Column type/bytes, capacity, data, info.rows, info.dataLoad, info.hasVarCol and
 *               info.id.groupId are overwritten. Every decoded column gets hasNull = true.
 * @param pData  start of the encoded block.
 * @return pointer to the first byte after the decoded block, or NULL on allocation failure or when
 *         the header/length fields are invalid (pBlock may be partially updated in that case).
 */
const char* blockDecode(SSDataBlock* pBlock, const char* pData) {
  const char* pStart = pData;

  // multi-byte fields are not naturally aligned inside the buffer, so read them with memcpy
  int32_t version = 0;
  memcpy(&version, pStart, sizeof(int32_t));
  pStart += sizeof(int32_t);
  ASSERT(version == 1);

  // total length sizeof(int32_t)
  int32_t dataLen = 0;
  memcpy(&dataLen, pStart, sizeof(int32_t));
  pStart += sizeof(int32_t);

  // total rows sizeof(int32_t)
  int32_t numOfRows = 0;
  memcpy(&numOfRows, pStart, sizeof(int32_t));
  pStart += sizeof(int32_t);

  // total columns sizeof(int32_t)
  int32_t numOfCols = 0;
  memcpy(&numOfCols, pStart, sizeof(int32_t));
  pStart += sizeof(int32_t);

  // flag segment sizeof(int32_t): its content is not consulted here
  pStart += sizeof(int32_t);

  // group id sizeof(uint64_t)
  uint64_t groupId = 0;
  memcpy(&groupId, pStart, sizeof(uint64_t));
  pBlock->info.id.groupId = groupId;
  pStart += sizeof(uint64_t);

  if (numOfRows < 0 || numOfCols < 0) {
    return NULL;
  }

  if (pBlock->pDataBlock == NULL) {
    pBlock->pDataBlock = taosArrayInit_s(sizeof(SColumnInfoData), numOfCols);
    if (pBlock->pDataBlock == NULL) {
      return NULL;
    }
  }

  // every encoded column needs a destination column
  if (taosArrayGetSize(pBlock->pDataBlock) < (size_t)numOfCols) {
    return NULL;
  }

  for (int32_t i = 0; i < numOfCols; ++i) {
    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
    pColInfoData->info.type = *(const int8_t*)pStart;
    pStart += sizeof(int8_t);

    int32_t colBytes = 0;
    memcpy(&colBytes, pStart, sizeof(int32_t));
    pColInfoData->info.bytes = colBytes;
    pStart += sizeof(int32_t);

    if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
      pBlock->info.hasVarCol = true;
    }
  }

  // the column buffers are written below, so a failed allocation must stop the decode here
  if (blockDataEnsureCapacity(pBlock, numOfRows) != TSDB_CODE_SUCCESS) {
    return NULL;
  }

  const char* pColLen = pStart;
  pStart += sizeof(int32_t) * numOfCols;

  for (int32_t i = 0; i < numOfCols; ++i) {
    // payload length: stored via htonl() by the encoder; converted into a local so that the
    // const input buffer stays untouched
    uint32_t netLen = 0;
    memcpy(&netLen, pColLen + i * sizeof(int32_t), sizeof(uint32_t));
    const int32_t len = (int32_t)htonl(netLen);
    ASSERT(len >= 0);
    if (len < 0) {
      return NULL;
    }

    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
    if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
      memcpy(pColInfoData->varmeta.offset, pStart, sizeof(int32_t) * numOfRows);
      pStart += sizeof(int32_t) * numOfRows;

      // grow the payload buffer when the encoded payload does not fit
      if (len > 0 && pColInfoData->varmeta.allocLen < len) {
        char* tmp = taosMemoryRealloc(pColInfoData->pData, len);
        if (tmp == NULL) {
          return NULL;
        }

        pColInfoData->pData = tmp;
        pColInfoData->varmeta.allocLen = len;
      }

      pColInfoData->varmeta.length = len;
    } else {
      memcpy(pColInfoData->nullbitmap, pStart, BitmapLen(numOfRows));
      pStart += BitmapLen(numOfRows);
    }

    if (len > 0) {
      memcpy(pColInfoData->pData, pStart, len);
    }

    // TODO
    // setting this flag to true temporarily so aggregate function on stable will
    // examine NULL value for non-primary key column
    pColInfoData->hasNull = true;
    pStart += len;
  }

  pBlock->info.dataLoad = 1;
  pBlock->info.rows = numOfRows;
  ASSERT(pStart - pData == dataLen);
  return pStart;
}