tdatablock.c 65.1 KB
Newer Older
S
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
H
Haojun Liao 已提交
17
#include "tdatablock.h"
S
compare  
Shengliang Guan 已提交
18
#include "tcompare.h"
19
#include "tlog.h"
20
#include "tname.h"
21

22
int32_t colDataGetLength(const SColumnInfoData* pColumnInfoData, int32_t numOfRows) {
H
Haojun Liao 已提交
23 24 25 26
  ASSERT(pColumnInfoData != NULL);
  if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
    return pColumnInfoData->varmeta.length;
  } else {
27 28 29 30 31
    if (pColumnInfoData->info.type == TSDB_DATA_TYPE_NULL) {
      return 0;
    } else {
      return pColumnInfoData->info.bytes * numOfRows;
    }
H
Haojun Liao 已提交
32 33 34
  }
}

H
Haojun Liao 已提交
35 36 37 38 39 40 41 42
/**
 * Payload bytes plus per-row metadata bytes of a column.
 *
 * @return for variable-length columns: varmeta.length plus one int32_t offset per row;
 *         for all other columns: info.bytes * numOfRows plus BitmapLen(numOfRows)
 */
int32_t colDataGetFullLength(const SColumnInfoData* pColumnInfoData, int32_t numOfRows) {
  if (!IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
    // fixed-width column: values followed by the null bitmap
    return pColumnInfoData->info.bytes * numOfRows + BitmapLen(numOfRows);
  }

  // variable-length column: payload plus the offset array
  return pColumnInfoData->varmeta.length + sizeof(int32_t) * numOfRows;
}

H
Haojun Liao 已提交
43 44 45 46
/** Placeholder: the function body is empty and the column is left untouched. */
void colDataTrim(SColumnInfoData* pColumnInfoData) {
  // TODO
  (void)pColumnInfoData;
}

47
int32_t getJsonValueLen(const char* data) {
wmmhello's avatar
wmmhello 已提交
48 49 50 51 52 53 54 55 56
  int32_t dataLen = 0;
  if (*data == TSDB_DATA_TYPE_NULL) {
    dataLen = CHAR_BYTES;
  } else if (*data == TSDB_DATA_TYPE_NCHAR) {
    dataLen = varDataTLen(data + CHAR_BYTES) + CHAR_BYTES;
  } else if (*data == TSDB_DATA_TYPE_DOUBLE) {
    dataLen = DOUBLE_BYTES + CHAR_BYTES;
  } else if (*data == TSDB_DATA_TYPE_BOOL) {
    dataLen = CHAR_BYTES + CHAR_BYTES;
wmmhello's avatar
wmmhello 已提交
57
  } else if (tTagIsJson(data)) {  // json string
wmmhello's avatar
wmmhello 已提交
58 59 60 61 62 63 64
    dataLen = ((STag*)(data))->len;
  } else {
    ASSERT(0);
  }
  return dataLen;
}

65 66 67 68
/**
 * Store one value (or a NULL marker) at row `currentRow` of a column.
 *
 * NULL values: variable-length columns get offset -1, other columns get their bit set in
 * the null bitmap; hasNull is raised in both cases.
 * Non-NULL values: variable-length payloads are appended at the end of pData (growing the
 * buffer when needed) and the row offset is recorded; fixed-width values are copied to
 * pData + info.bytes * currentRow.
 *
 * @param pColumnInfoData destination column; must not be NULL
 * @param currentRow      row index to write
 * @param pData           value to copy; not read when isNull is true
 * @param isNull          whether the row is NULL
 * @return 0 on success, TSDB_CODE_OUT_OF_MEMORY if the payload buffer cannot grow
 */
int32_t colDataAppend(SColumnInfoData* pColumnInfoData, uint32_t currentRow, const char* pData, bool isNull) {
  ASSERT(pColumnInfoData != NULL);

  if (isNull) {
    // There is a placeholder for each NULL value of binary or nchar type.
    if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
      pColumnInfoData->varmeta.offset[currentRow] = -1;  // it is a null value of VAR type.
    } else {
      colDataSetNull_f(pColumnInfoData->nullbitmap, currentRow);
    }

    pColumnInfoData->hasNull = true;
    return 0;
  }

  int32_t colType = pColumnInfoData->info.type;
  if (!IS_VAR_DATA_TYPE(colType)) {
    // fixed-width value: copy straight into the row slot
    memcpy(pColumnInfoData->pData + pColumnInfoData->info.bytes * currentRow, pData, pColumnInfoData->info.bytes);
    return 0;
  }

  int32_t payloadLen = 0;
  if (colType != TSDB_DATA_TYPE_JSON) {
    payloadLen = varDataTLen(pData);
  } else {
    payloadLen = getJsonValueLen(pData);
  }

  SVarColAttr* pMeta = &pColumnInfoData->varmeta;
  if (pMeta->allocLen < pMeta->length + payloadLen) {
    // grow by a factor of 1.5 (starting from 8 bytes) until the new payload fits
    uint32_t grownSize = pMeta->allocLen;
    if (grownSize <= 1) {
      grownSize = 8;
    }

    for (; grownSize < pMeta->length + payloadLen; grownSize = grownSize * 1.5) {
    }

    char* pGrown = taosMemoryRealloc(pColumnInfoData->pData, grownSize);
    if (pGrown == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    pColumnInfoData->pData = pGrown;
    pMeta->allocLen = grownSize;
  }

  uint32_t writePos = pColumnInfoData->varmeta.length;
  pColumnInfoData->varmeta.offset[currentRow] = writePos;

  memcpy(pColumnInfoData->pData + writePos, pData, payloadLen);
  pColumnInfoData->varmeta.length += payloadLen;

  return 0;
}

L
Liu Jicong 已提交
121 122
/**
 * Append the first `numOfRow2` null-bitmap bits of `pSource` after the first `numOfRow1`
 * bits of the destination bitmap.
 *
 * When numOfRow1 is a multiple of 8 the source bytes are copied as-is; otherwise every
 * source byte is split across two destination bytes with shifts.
 * The destination bitmap is only OR-ed into, so the caller is expected to have sized and
 * cleared the bytes beyond the first numOfRow1 bits beforehand.
 */
static void doBitmapMerge(SColumnInfoData* pColumnInfoData, int32_t numOfRow1, const SColumnInfoData* pSource,
                          int32_t numOfRow2) {
  if (numOfRow2 <= 0) return;

  uint32_t total = numOfRow1 + numOfRow2;

  uint32_t usedBits = BitPos(numOfRow1);  // bits already occupied in the last destination byte
  uint32_t freeBits = 8 - usedBits;

  if (usedBits == 0) {  // byte aligned: no need to shift bits of bitmap
    memcpy(pColumnInfoData->nullbitmap + BitmapLen(numOfRow1), pSource->nullbitmap, BitmapLen(numOfRow2));
    return;
  }

  uint8_t* pSrcBits = (uint8_t*)pSource->nullbitmap;

  // fill the unused low bits of the last destination byte from the first source byte
  pColumnInfoData->nullbitmap[BitmapLen(numOfRow1) - 1] |= (pSrcBits[0] >> usedBits);

  if (BitmapLen(numOfRow1) == BitmapLen(total)) {
    return;  // everything fitted into the existing last byte
  }

  int32_t  srcBytes = BitmapLen(numOfRow2);
  uint8_t* pTail = (uint8_t*)&pColumnInfoData->nullbitmap[BitmapLen(numOfRow1)];
  int32_t  extraBytes = BitmapLen(total) - BitmapLen(numOfRow1);

  for (int32_t i = 0; i < srcBytes; ++i) {  // size limit of pSource->nullbitmap
    if (i >= 1) {
      pTail[i - 1] |= (pSrcBits[i] >> usedBits);  // high part of this source byte
    }

    if (i >= extraBytes) {  // size limit of pColumnInfoData->nullbitmap
      return;
    }

    pTail[i] |= (pSrcBits[i] << freeBits);  // remaining low part of this source byte
  }
}

H
Haojun Liao 已提交
161 162
/**
 * Append `numOfRow2` rows of `pSource` to a column that already holds `numOfRow1` rows.
 *
 * Both columns must have the same type. Buffers of the destination column are grown with
 * taosMemoryRealloc when the merged row count exceeds *capacity (or when the destination
 * is empty and info.bytes is non-zero), and *capacity is updated accordingly.
 *
 * @param pColumnInfoData destination column
 * @param numOfRow1       rows currently stored in the destination
 * @param capacity        in/out: row capacity of the destination buffers
 * @param pSource         source column
 * @param numOfRow2       rows to append from the source
 * @return numOfRow1 + numOfRow2 on success (numOfRow1 when numOfRow2 is 0), or an
 *         out-of-memory error code when a buffer cannot be grown
 */
int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, int32_t numOfRow1, int32_t* capacity,
                        const SColumnInfoData* pSource, int32_t numOfRow2) {
  ASSERT(pColumnInfoData != NULL && pSource != NULL && pColumnInfoData->info.type == pSource->info.type);
  if (numOfRow2 == 0) {
    return numOfRow1;
  }

  if (pSource->hasNull) {
    pColumnInfoData->hasNull = pSource->hasNull;
  }

  uint32_t finalNumOfRows = numOfRow1 + numOfRow2;
  if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
    // Grow the per-row offset array
    if (finalNumOfRows > *capacity || (numOfRow1 == 0 && pColumnInfoData->info.bytes != 0)) {
      char* p = taosMemoryRealloc(pColumnInfoData->varmeta.offset, sizeof(int32_t) * (numOfRow1 + numOfRow2));
      if (p == NULL) {
        return TSDB_CODE_OUT_OF_MEMORY;
      }

      *capacity = finalNumOfRows;
      pColumnInfoData->varmeta.offset = (int32_t*)p;
    }

    // Source offsets are relative to the source payload; rebase them onto the end of the
    // destination payload. -1 marks a NULL row and is kept as-is.
    for (int32_t i = 0; i < numOfRow2; ++i) {
      if (pSource->varmeta.offset[i] == -1) {
        pColumnInfoData->varmeta.offset[i + numOfRow1] = -1;
      } else {
        pColumnInfoData->varmeta.offset[i + numOfRow1] = pSource->varmeta.offset[i] + pColumnInfoData->varmeta.length;
      }
    }

    // copy data
    uint32_t len = pSource->varmeta.length;
    uint32_t oldLen = pColumnInfoData->varmeta.length;
    if (pColumnInfoData->varmeta.allocLen < len + oldLen) {
      char* tmp = taosMemoryRealloc(pColumnInfoData->pData, len + oldLen);
      if (tmp == NULL) {
        return TSDB_CODE_OUT_OF_MEMORY;
      }

      pColumnInfoData->pData = tmp;
      pColumnInfoData->varmeta.allocLen = len + oldLen;
    }

    memcpy(pColumnInfoData->pData + oldLen, pSource->pData, len);
    pColumnInfoData->varmeta.length = len + oldLen;
  } else {
    if (finalNumOfRows > *capacity || (numOfRow1 == 0 && pColumnInfoData->info.bytes != 0)) {
      // all data may be null, when the pColumnInfoData->info.type == 0, bytes == 0;
      char* tmp = taosMemoryRealloc(pColumnInfoData->pData, finalNumOfRows * pColumnInfoData->info.bytes);
      if (tmp == NULL) {
        return TSDB_CODE_VND_OUT_OF_MEMORY;
      }

      pColumnInfoData->pData = tmp;

      if (BitmapLen(numOfRow1) < BitmapLen(finalNumOfRows)) {
        char* btmp = taosMemoryRealloc(pColumnInfoData->nullbitmap, BitmapLen(finalNumOfRows));
        if (btmp == NULL) {
          // the old bitmap is still owned by the column; report the failure instead of
          // writing through a NULL pointer
          return TSDB_CODE_VND_OUT_OF_MEMORY;
        }

        // newly added bitmap bytes must start cleared because the merge below only ORs bits in
        uint32_t extend = BitmapLen(finalNumOfRows) - BitmapLen(numOfRow1);
        memset(btmp + BitmapLen(numOfRow1), 0, extend);

        pColumnInfoData->nullbitmap = btmp;
      }

      *capacity = finalNumOfRows;
    }

    doBitmapMerge(pColumnInfoData, numOfRow1, pSource, numOfRow2);

    if (pSource->pData) {
      int32_t offset = pColumnInfoData->info.bytes * numOfRow1;
      memcpy(pColumnInfoData->pData + offset, pSource->pData, pSource->info.bytes * numOfRow2);
    }
  }

  return numOfRow1 + numOfRow2;
}

L
Liu Jicong 已提交
241 242
/**
 * Overwrite the first `numOfRows` rows of a column with those of `pSource`.
 *
 * Both columns must have the same type. For variable-length columns the payload buffer is
 * grown when it is smaller than the source payload; offset array, null bitmap and
 * fixed-width data buffers are written without resizing.
 *
 * @param pBlockInfo optional; when given, its capacity is asserted to be >= numOfRows
 * @return numOfRows when numOfRows <= 0, 0 on success, TSDB_CODE_OUT_OF_MEMORY on allocation failure
 */
int32_t colDataAssign(SColumnInfoData* pColumnInfoData, const SColumnInfoData* pSource, int32_t numOfRows,
                      const SDataBlockInfo* pBlockInfo) {
  ASSERT(pColumnInfoData != NULL && pSource != NULL && pColumnInfoData->info.type == pSource->info.type);
  if (numOfRows <= 0) {
    return numOfRows;
  }

  ASSERT(pBlockInfo == NULL || pBlockInfo->capacity >= numOfRows);

  if (!IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
    memcpy(pColumnInfoData->nullbitmap, pSource->nullbitmap, BitmapLen(numOfRows));
    if (pSource->pData) {
      memcpy(pColumnInfoData->pData, pSource->pData, pSource->info.bytes * numOfRows);
    }
  } else {
    memcpy(pColumnInfoData->varmeta.offset, pSource->varmeta.offset, sizeof(int32_t) * numOfRows);

    if (pColumnInfoData->varmeta.allocLen < pSource->varmeta.length) {
      char* pGrown = taosMemoryRealloc(pColumnInfoData->pData, pSource->varmeta.length);
      if (pGrown == NULL) {
        return TSDB_CODE_OUT_OF_MEMORY;
      }

      pColumnInfoData->pData = pGrown;
      pColumnInfoData->varmeta.allocLen = pSource->varmeta.length;
    }

    pColumnInfoData->varmeta.length = pSource->varmeta.length;
    memcpy(pColumnInfoData->pData, pSource->pData, pSource->varmeta.length);
  }

  pColumnInfoData->hasNull = pSource->hasNull;
  pColumnInfoData->info = pSource->info;
  return 0;
}

L
Liu Jicong 已提交
278
// Number of entries in the block's column array.
size_t blockDataGetNumOfCols(const SSDataBlock* pBlock) {
  return taosArrayGetSize(pBlock->pDataBlock);
}
279

L
Liu Jicong 已提交
280
// Row count recorded in the block's info.
size_t blockDataGetNumOfRows(const SSDataBlock* pBlock) {
  return pBlock->info.rows;
}
281

282
/**
 * Refresh info.window of a block from the first and last row of its timestamp column.
 *
 * @param pDataBlock    block to update; a NULL or empty block is left untouched
 * @param tsColumnIndex index of the timestamp column; -1 selects column 0
 * @return 0 when the window was updated or nothing had to be done (NULL/empty block, or
 *         the selected column is not of type TSDB_DATA_TYPE_TIMESTAMP);
 *         -1 when the block has no columns or the index is out of range
 */
int32_t blockDataUpdateTsWindow(SSDataBlock* pDataBlock, int32_t tsColumnIndex) {
  if (pDataBlock == NULL || pDataBlock->info.rows <= 0) {
    return 0;
  }

  size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock);
  if (numOfCols == 0) {
    return -1;
  }

  int32_t index = (tsColumnIndex == -1) ? 0 : tsColumnIndex;
  if (index < 0 || (size_t)index >= numOfCols) {
    // reject indexes that do not address an existing column instead of reading past the array
    return -1;
  }

  SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, index);
  if (pColInfoData->info.type != TSDB_DATA_TYPE_TIMESTAMP) {
    return 0;
  }

  TSKEY skey = *(TSKEY*)colDataGetData(pColInfoData, 0);
  TSKEY ekey = *(TSKEY*)colDataGetData(pColInfoData, (pDataBlock->info.rows - 1));

  // first/last row may be in either order, so normalise with min/max
  pDataBlock->info.window.skey = TMIN(skey, ekey);
  pDataBlock->info.window.ekey = TMAX(skey, ekey);

  return 0;
}

308
/**
 * Append all rows of `pSrc` to `pDest`, column by column.
 *
 * Column i of pSrc is merged into column i of pDest for every column of pDest; afterwards
 * pDest's capacity and row count are updated.
 *
 * @return TSDB_CODE_SUCCESS
 */
int32_t blockDataMerge(SSDataBlock* pDest, const SSDataBlock* pSrc) {
  assert(pSrc != NULL && pDest != NULL);

  int32_t mergedCapacity = pDest->info.capacity;
  size_t  colCount = taosArrayGetSize(pDest->pDataBlock);

  for (int32_t colIdx = 0; colIdx < colCount; ++colIdx) {
    SColumnInfoData* pDstCol = taosArrayGet(pDest->pDataBlock, colIdx);
    SColumnInfoData* pSrcCol = taosArrayGet(pSrc->pDataBlock, colIdx);

    // every column starts from the block's current capacity
    mergedCapacity = pDest->info.capacity;
    // NOTE(review): the result of colDataMergeCol is not inspected here — confirm whether
    // allocation failures should be propagated.
    colDataMergeCol(pDstCol, pDest->info.rows, &mergedCapacity, pSrcCol, pSrc->info.rows);
  }

  pDest->info.capacity = mergedCapacity;
  pDest->info.rows += pSrc->info.rows;
  return TSDB_CODE_SUCCESS;
}

size_t blockDataGetSize(const SSDataBlock* pBlock) {
  assert(pBlock != NULL);

329 330
  size_t total = 0;
  size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
L
Liu Jicong 已提交
331
  for (int32_t i = 0; i < numOfCols; ++i) {
H
Haojun Liao 已提交
332
    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
H
Haojun Liao 已提交
333
    total += colDataGetFullLength(pColInfoData, pBlock->info.rows);
H
Haojun Liao 已提交
334 335 336 337 338 339 340
  }

  return total;
}

// the number of tuples can be fit in one page.
// Actual data rows pluses the corresponding meta data must fit in one memory buffer of the given page size.
L
Liu Jicong 已提交
341 342
int32_t blockDataSplitRows(SSDataBlock* pBlock, bool hasVarCol, int32_t startIndex, int32_t* stopIndex,
                           int32_t pageSize) {
H
Haojun Liao 已提交
343 344
  ASSERT(pBlock != NULL && stopIndex != NULL);

L
Liu Jicong 已提交
345
  size_t  numOfCols = taosArrayGetSize(pBlock->pDataBlock);
H
Haojun Liao 已提交
346 347
  int32_t numOfRows = pBlock->info.rows;

H
Haojun Liao 已提交
348 349
  int32_t bitmapChar = 1;

L
Liu Jicong 已提交
350
  size_t headerSize = sizeof(int32_t);
351
  size_t colHeaderSize = sizeof(int32_t) * numOfCols;
L
Liu Jicong 已提交
352
  size_t payloadSize = pageSize - (headerSize + colHeaderSize);
353

H
Haojun Liao 已提交
354
  // TODO speedup by checking if the whole page can fit in firstly.
H
Haojun Liao 已提交
355
  if (!hasVarCol) {
L
Liu Jicong 已提交
356
    size_t  rowSize = blockDataGetRowSize(pBlock);
357 358
    int32_t capacity = payloadSize / (rowSize + numOfCols * bitmapChar / 8.0);
    ASSERT(capacity > 0);
359

360
    *stopIndex = startIndex + capacity - 1;
H
Haojun Liao 已提交
361 362 363
    if (*stopIndex >= numOfRows) {
      *stopIndex = numOfRows - 1;
    }
364

H
Haojun Liao 已提交
365
    return TSDB_CODE_SUCCESS;
366
  }
wmmhello's avatar
wmmhello 已提交
367 368 369 370 371 372
  // iterate the rows that can be fit in this buffer page
  int32_t size = (headerSize + colHeaderSize);
  for (int32_t j = startIndex; j < numOfRows; ++j) {
    for (int32_t i = 0; i < numOfCols; ++i) {
      SColumnInfoData* pColInfoData = TARRAY_GET_ELEM(pBlock->pDataBlock, i);
      if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
373
        if (pColInfoData->varmeta.offset[j] != -1) {
wmmhello's avatar
wmmhello 已提交
374 375
          char* p = colDataGetData(pColInfoData, j);
          size += varDataTLen(p);
H
Haojun Liao 已提交
376 377
        }

wmmhello's avatar
wmmhello 已提交
378
        size += sizeof(pColInfoData->varmeta.offset[0]);
379
      } else {
wmmhello's avatar
wmmhello 已提交
380
        size += pColInfoData->info.bytes;
H
Haojun Liao 已提交
381

wmmhello's avatar
wmmhello 已提交
382 383 384
        if (((j - startIndex) & 0x07) == 0) {
          size += 1;  // the space for null bitmap
        }
H
Haojun Liao 已提交
385 386 387
      }
    }

wmmhello's avatar
wmmhello 已提交
388
    if (size > pageSize) {  // pageSize must be able to hold one row
wmmhello's avatar
wmmhello 已提交
389
      *stopIndex = j - 1;
wmmhello's avatar
wmmhello 已提交
390
      ASSERT(*stopIndex >= startIndex);
wmmhello's avatar
wmmhello 已提交
391 392 393

      return TSDB_CODE_SUCCESS;
    }
H
Haojun Liao 已提交
394
  }
wmmhello's avatar
wmmhello 已提交
395 396 397 398

  // all fit in
  *stopIndex = numOfRows - 1;
  return TSDB_CODE_SUCCESS;
H
Haojun Liao 已提交
399 400
}

401 402 403 404 405
/**
 * Create a new block holding rows [startIndex, startIndex + rowCount) of `pBlock`.
 *
 * The new block copies pBlock's info (with rows/capacity reset) and the info of every
 * column, then copies the selected rows value by value.
 *
 * @param pBlock     source block
 * @param startIndex first row to copy; must be >= 0
 * @param rowCount   number of rows to copy; must be >= 0 and the range must lie inside the block
 * @return the new block, or NULL when the arguments are invalid or the block cannot be created
 */
SSDataBlock* blockDataExtractBlock(SSDataBlock* pBlock, int32_t startIndex, int32_t rowCount) {
  // the last comparison is `startIndex + rowCount > rows` rearranged so it cannot overflow
  if (pBlock == NULL || startIndex < 0 || rowCount < 0 || rowCount > pBlock->info.rows ||
      startIndex > pBlock->info.rows - rowCount) {
    return NULL;
  }

  SSDataBlock* pDst = createDataBlock();
  if (pDst == NULL) {
    return NULL;
  }

  pDst->info = pBlock->info;
  pDst->info.rows = 0;
  pDst->info.capacity = 0;

  size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
  for (int32_t i = 0; i < numOfCols; ++i) {
    SColumnInfoData  colInfo = {0};
    SColumnInfoData* pSrcCol = taosArrayGet(pBlock->pDataBlock, i);
    colInfo.info = pSrcCol->info;
    blockDataAppendColInfo(pDst, &colInfo);
  }

  // NOTE(review): the result of blockDataEnsureCapacity is not checked — confirm how an
  // allocation failure should be reported here.
  blockDataEnsureCapacity(pDst, rowCount);

  for (int32_t i = 0; i < numOfCols; ++i) {
    SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, i);
    SColumnInfoData* pDstCol = taosArrayGet(pDst->pDataBlock, i);

    for (int32_t j = startIndex; j < (startIndex + rowCount); ++j) {
      bool isNull = false;
      if (pBlock->pBlockAgg == NULL) {
        isNull = colDataIsNull_s(pColData, j);
      } else {
        isNull = colDataIsNull(pColData, pBlock->info.rows, j, pBlock->pBlockAgg[i]);
      }

      // colDataAppend does not read the value pointer when isNull is true
      char* p = colDataGetData(pColData, j);
      colDataAppend(pDstCol, j - startIndex, p, isNull);
    }
  }

  pDst->info.rows = rowCount;
  return pDst;
}

H
Haojun Liao 已提交
445 446
/**
 *
447 448 449 450 451
 * +------------------+---------------------------------------------+
 * |the number of rows|                    column #1                |
 * |    (4 bytes)     |------------+-----------------------+--------+
 * |                  | null bitmap| column length(4bytes) | values |
 * +------------------+------------+-----------------------+--------+
H
Haojun Liao 已提交
452 453 454 455
 * @param buf
 * @param pBlock
 * @return
 */
456
int32_t blockDataToBuf(char* buf, const SSDataBlock* pBlock) {
H
Haojun Liao 已提交
457 458 459
  ASSERT(pBlock != NULL);

  // write the number of rows
L
Liu Jicong 已提交
460
  *(uint32_t*)buf = pBlock->info.rows;
H
Haojun Liao 已提交
461

L
Liu Jicong 已提交
462
  size_t  numOfCols = taosArrayGetSize(pBlock->pDataBlock);
H
Haojun Liao 已提交
463 464 465 466
  int32_t numOfRows = pBlock->info.rows;

  char* pStart = buf + sizeof(uint32_t);

L
Liu Jicong 已提交
467
  for (int32_t i = 0; i < numOfCols; ++i) {
H
Haojun Liao 已提交
468 469 470 471 472 473 474 475 476
    SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, i);
    if (IS_VAR_DATA_TYPE(pCol->info.type)) {
      memcpy(pStart, pCol->varmeta.offset, numOfRows * sizeof(int32_t));
      pStart += numOfRows * sizeof(int32_t);
    } else {
      memcpy(pStart, pCol->nullbitmap, BitmapLen(numOfRows));
      pStart += BitmapLen(pBlock->info.rows);
    }

477
    uint32_t dataSize = colDataGetLength(pCol, numOfRows);
478

L
Liu Jicong 已提交
479
    *(int32_t*)pStart = dataSize;
480 481
    pStart += sizeof(int32_t);

H
Haojun Liao 已提交
482 483 484 485 486 487 488
    memcpy(pStart, pCol->pData, dataSize);
    pStart += dataSize;
  }

  return 0;
}

489
/**
 * Fill a block from a buffer laid out as: row count (4 bytes), then for every column its
 * per-row metadata (offset array or null bitmap), a 4-byte payload length and the payload.
 *
 * The block's columns must already be defined; buffers are grown as needed.
 *
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when a buffer cannot be grown
 */
int32_t blockDataFromBuf(SSDataBlock* pBlock, const char* buf) {
  int32_t rowCount = *(int32_t*)buf;
  blockDataEnsureCapacity(pBlock, rowCount);

  pBlock->info.rows = rowCount;

  size_t      colCount = taosArrayGetSize(pBlock->pDataBlock);
  const char* pCursor = buf + sizeof(uint32_t);

  for (int32_t colIdx = 0; colIdx < colCount; ++colIdx) {
    SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, colIdx);

    if (!IS_VAR_DATA_TYPE(pCol->info.type)) {
      memcpy(pCol->nullbitmap, pCursor, BitmapLen(pBlock->info.rows));
      pCursor += BitmapLen(pBlock->info.rows);
    } else {
      size_t offsetBytes = pBlock->info.rows * sizeof(int32_t);
      char*  pOffsets = taosMemoryRealloc(pCol->varmeta.offset, offsetBytes);  // previous calloc is too small
      if (pOffsets == NULL) {
        return TSDB_CODE_OUT_OF_MEMORY;
      }

      pCol->varmeta.offset = (int32_t*)pOffsets;
      memcpy(pCol->varmeta.offset, pCursor, offsetBytes);
      pCursor += offsetBytes;
    }

    int32_t payloadLen = *(int32_t*)pCursor;
    pCursor += sizeof(int32_t);

    if (IS_VAR_DATA_TYPE(pCol->info.type)) {
      if (pCol->varmeta.allocLen < payloadLen) {
        char* pGrown = taosMemoryRealloc(pCol->pData, payloadLen);
        if (pGrown == NULL) {
          return TSDB_CODE_OUT_OF_MEMORY;
        }

        pCol->pData = pGrown;
        pCol->varmeta.allocLen = payloadLen;
      }

      pCol->varmeta.length = payloadLen;
      ASSERT(pCol->varmeta.length <= pCol->varmeta.allocLen);
    }

    memcpy(pCol->pData, pCursor, payloadLen);
    pCursor += payloadLen;
  }

  return TSDB_CODE_SUCCESS;
}

539
// todo remove this
H
Haojun Liao 已提交
540
/**
 * Variant of blockDataFromBuf for a buffer that also carries the group id and whose
 * per-row metadata is sized by `capacity` rather than by the row count.
 *
 * Buffer layout read here: row count (4 bytes), group id (8 bytes), then for every column
 * its metadata for `capacity` rows, a 4-byte payload length and the payload.
 *
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when a payload buffer cannot be grown
 */
int32_t blockDataFromBuf1(SSDataBlock* pBlock, const char* buf, size_t capacity) {
  pBlock->info.rows = *(int32_t*)buf;
  pBlock->info.groupId = *(uint64_t*)(buf + sizeof(int32_t));

  size_t      colCount = taosArrayGetSize(pBlock->pDataBlock);
  const char* pCursor = buf + sizeof(uint32_t) + sizeof(uint64_t);

  for (int32_t colIdx = 0; colIdx < colCount; ++colIdx) {
    SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, colIdx);
    pCol->hasNull = true;

    if (!IS_VAR_DATA_TYPE(pCol->info.type)) {
      memcpy(pCol->nullbitmap, pCursor, BitmapLen(capacity));
      pCursor += BitmapLen(capacity);
    } else {
      size_t offsetBytes = capacity * sizeof(int32_t);
      memcpy(pCol->varmeta.offset, pCursor, offsetBytes);
      pCursor += offsetBytes;
    }

    int32_t payloadLen = *(int32_t*)pCursor;
    pCursor += sizeof(int32_t);

    if (IS_VAR_DATA_TYPE(pCol->info.type)) {
      if (pCol->varmeta.allocLen < payloadLen) {
        char* pGrown = taosMemoryRealloc(pCol->pData, payloadLen);
        if (pGrown == NULL) {
          return TSDB_CODE_OUT_OF_MEMORY;
        }

        pCol->pData = pGrown;
        pCol->varmeta.allocLen = payloadLen;
      }

      pCol->varmeta.length = payloadLen;
      ASSERT(pCol->varmeta.length <= pCol->varmeta.allocLen);
    }

    memcpy(pCol->pData, pCursor, payloadLen);
    // NOTE(review): the cursor advances by info.bytes * capacity, not by the payload length
    // just copied — presumably the writer reserves a fixed-size slot per column; confirm.
    pCursor += pCol->info.bytes * capacity;
  }

  return TSDB_CODE_SUCCESS;
}

586
/**
 * Sum of info.bytes over all columns of the block.
 *
 * The value is cached in pBlock->info.rowSize: it is computed only while the cached value
 * is 0 and returned from the cache afterwards.
 */
size_t blockDataGetRowSize(SSDataBlock* pBlock) {
  ASSERT(pBlock != NULL);

  if (pBlock->info.rowSize != 0) {
    return pBlock->info.rowSize;
  }

  size_t colCount = taosArrayGetSize(pBlock->pDataBlock);
  size_t bytesPerRow = 0;
  for (int32_t colIdx = 0; colIdx < colCount; ++colIdx) {
    SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, colIdx);
    bytesPerRow += pCol->info.bytes;
  }

  pBlock->info.rowSize = bytesPerRow;
  return pBlock->info.rowSize;
}

H
Haojun Liao 已提交
603 604 605 606 607
/**
 * Size in bytes of the metadata part of a serialized block with `numOfCols` columns.
 *
 * Accounted layout:
 * | total rows/total length | block group id | column schema | each column length |
 *
 * @param numOfCols number of columns in the block
 * @return sizeof(int32_t) + sizeof(uint64_t)
 *         + numOfCols * (sizeof(int16_t) + sizeof(int32_t))   (column schema)
 *         + numOfCols * sizeof(int32_t)                       (column lengths)
 */
size_t blockDataGetSerialMetaSize(uint32_t numOfCols) {
  return sizeof(int32_t) + sizeof(uint64_t) + numOfCols * (sizeof(int16_t) + sizeof(int32_t)) +
         numOfCols * sizeof(int32_t);
}

X
Xiaoyu Wang 已提交
614
double blockDataGetSerialRowSize(const SSDataBlock* pBlock) {
H
Haojun Liao 已提交
615 616 617
  ASSERT(pBlock != NULL);
  double rowSize = 0;

618
  size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
L
Liu Jicong 已提交
619
  for (int32_t i = 0; i < numOfCols; ++i) {
H
Haojun Liao 已提交
620 621 622 623 624 625
    SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, i);
    rowSize += pColInfo->info.bytes;

    if (IS_VAR_DATA_TYPE(pColInfo->info.type)) {
      rowSize += sizeof(int32_t);
    } else {
L
Liu Jicong 已提交
626
      rowSize += 1 / 8.0;  // one bit for each record
H
Haojun Liao 已提交
627 628 629 630 631 632
    }
  }

  return rowSize;
}

H
Haojun Liao 已提交
633
typedef struct SSDataBlockSortHelper {
L
Liu Jicong 已提交
634 635
  SArray*      orderInfo;  // SArray<SBlockOrderInfo>
  SSDataBlock* pDataBlock;
H
Haojun Liao 已提交
636 637 638
} SSDataBlockSortHelper;

int32_t dataBlockCompar(const void* p1, const void* p2, const void* param) {
L
Liu Jicong 已提交
639
  const SSDataBlockSortHelper* pHelper = (const SSDataBlockSortHelper*)param;
H
Haojun Liao 已提交
640 641 642

  SSDataBlock* pDataBlock = pHelper->pDataBlock;

L
Liu Jicong 已提交
643 644
  int32_t left = *(int32_t*)p1;
  int32_t right = *(int32_t*)p2;
H
Haojun Liao 已提交
645 646

  SArray* pInfo = pHelper->orderInfo;
647

L
Liu Jicong 已提交
648
  for (int32_t i = 0; i < pInfo->size; ++i) {
H
Haojun Liao 已提交
649
    SBlockOrderInfo* pOrder = TARRAY_GET_ELEM(pInfo, i);
L
Liu Jicong 已提交
650
    SColumnInfoData* pColInfoData = pOrder->pColData;  // TARRAY_GET_ELEM(pDataBlock->pDataBlock, pOrder->colIndex);
H
Haojun Liao 已提交
651

652
    if (pColInfoData->hasNull) {
653 654
      bool leftNull = colDataIsNull(pColInfoData, pDataBlock->info.rows, left, NULL);
      bool rightNull = colDataIsNull(pColInfoData, pDataBlock->info.rows, right, NULL);
655
      if (leftNull && rightNull) {
L
Liu Jicong 已提交
656
        continue;  // continue to next slot
657
      }
H
Haojun Liao 已提交
658

659
      if (rightNull) {
H
Haojun Liao 已提交
660
        return pOrder->nullFirst ? 1 : -1;
661
      }
H
Haojun Liao 已提交
662

663
      if (leftNull) {
H
Haojun Liao 已提交
664
        return pOrder->nullFirst ? -1 : 1;
665
      }
H
Haojun Liao 已提交
666 667
    }

L
Liu Jicong 已提交
668
    void* left1 = colDataGetData(pColInfoData, left);
669
    void* right1 = colDataGetData(pColInfoData, right);
wmmhello's avatar
wmmhello 已提交
670 671 672 673 674 675
    if (pColInfoData->info.type == TSDB_DATA_TYPE_JSON) {
      if (tTagIsJson(left1) || tTagIsJson(right1)) {
        terrno = TSDB_CODE_QRY_JSON_NOT_SUPPORT_ERROR;
        return 0;
      }
    }
676
    __compar_fn_t fn = getKeyComparFunc(pColInfoData->info.type, pOrder->order);
677

678 679 680 681 682
    int ret = fn(left1, right1);
    if (ret == 0) {
      continue;
    } else {
      return ret;
H
Haojun Liao 已提交
683 684 685 686 687 688
    }
  }

  return 0;
}

L
Liu Jicong 已提交
689 690
/**
 * Copy row `tupleIndex` of every column of `pSrcBlock` into row `numOfRows` of the
 * matching entry of `pDstCols`.
 *
 * @param pDstCols   destination columns, one per column of pSrcBlock
 * @param numOfRows  destination row index
 * @param pSrcBlock  source block
 * @param tupleIndex source row index
 * @return TSDB_CODE_SUCCESS, or the first error reported by colDataAppend
 */
static int32_t doAssignOneTuple(SColumnInfoData* pDstCols, int32_t numOfRows, const SSDataBlock* pSrcBlock,
                                int32_t tupleIndex) {
  int32_t code = 0;
  size_t  numOfCols = taosArrayGetSize(pSrcBlock->pDataBlock);

  for (int32_t i = 0; i < numOfCols; ++i) {
    SColumnInfoData* pDst = &pDstCols[i];
    SColumnInfoData* pSrc = taosArrayGet(pSrcBlock->pDataBlock, i);

    bool isNull = false;
    if (pSrc->hasNull) {
      // the per-column aggregate array is optional; only index it when it exists
      if (pSrcBlock->pBlockAgg == NULL) {
        isNull = colDataIsNull(pSrc, pSrcBlock->info.rows, tupleIndex, NULL);
      } else {
        isNull = colDataIsNull(pSrc, pSrcBlock->info.rows, tupleIndex, pSrcBlock->pBlockAgg[i]);
      }
    }

    if (isNull) {
      code = colDataAppend(pDst, numOfRows, NULL, true);
    } else {
      char* p = colDataGetData(pSrc, tupleIndex);
      code = colDataAppend(pDst, numOfRows, p, false);
    }

    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }
  }

  return TSDB_CODE_SUCCESS;
}

715
/**
 * Write the rows of `pDataBlock` into `pCols` in the order given by `index`:
 * destination row j receives source row index[j].
 *
 * Variable-length columns copy the whole payload unchanged and only permute the offset
 * array; other columns copy each value (or set the null bit) row by row.
 *
 * @param pCols      destination columns with buffers already allocated
 * @param pDataBlock source block
 * @param index      permutation with pDataBlock->info.rows entries
 * @return TSDB_CODE_SUCCESS
 */
static int32_t blockDataAssign(SColumnInfoData* pCols, const SSDataBlock* pDataBlock, const int32_t* index) {
#if 0
  // disabled row-at-a-time variant
  for (int32_t row = 0; row < pDataBlock->info.rows; ++row) {
    int32_t code = doAssignOneTuple(pCols, row, pDataBlock, index[row]);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }
  }
#else
  size_t colCount = taosArrayGetSize(pDataBlock->pDataBlock);
  for (int32_t colIdx = 0; colIdx < colCount; ++colIdx) {
    SColumnInfoData* pOut = &pCols[colIdx];
    SColumnInfoData* pIn = taosArrayGet(pDataBlock->pDataBlock, colIdx);

    if (!IS_VAR_DATA_TYPE(pIn->info.type)) {
      for (int32_t row = 0; row < pDataBlock->info.rows; ++row) {
        if (colDataIsNull_f(pIn->nullbitmap, index[row])) {
          colDataSetNull_f(pOut->nullbitmap, row);
        } else {
          memcpy(pOut->pData + row * pOut->info.bytes, pIn->pData + index[row] * pOut->info.bytes, pOut->info.bytes);
        }
      }
      continue;
    }

    // payload stays where it is; only the row -> offset mapping is reordered
    memcpy(pOut->pData, pIn->pData, pIn->varmeta.length);
    pOut->varmeta.length = pIn->varmeta.length;

    for (int32_t row = 0; row < pDataBlock->info.rows; ++row) {
      pOut->varmeta.offset[row] = pIn->varmeta.offset[index[row]];
    }
  }
#endif
  return TSDB_CODE_SUCCESS;
}

static SColumnInfoData* createHelpColInfoData(const SSDataBlock* pDataBlock) {
  int32_t rows = pDataBlock->info.rows;
L
Liu Jicong 已提交
752
  size_t  numOfCols = taosArrayGetSize(pDataBlock->pDataBlock);
H
Haojun Liao 已提交
753

wafwerar's avatar
wafwerar 已提交
754
  SColumnInfoData* pCols = taosMemoryCalloc(numOfCols, sizeof(SColumnInfoData));
H
Haojun Liao 已提交
755 756 757 758
  if (pCols == NULL) {
    return NULL;
  }

L
Liu Jicong 已提交
759
  for (int32_t i = 0; i < numOfCols; ++i) {
H
Haojun Liao 已提交
760 761 762 763
    SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, i);
    pCols[i].info = pColInfoData->info;

    if (IS_VAR_DATA_TYPE(pCols[i].info.type)) {
wafwerar's avatar
wafwerar 已提交
764 765
      pCols[i].varmeta.offset = taosMemoryCalloc(rows, sizeof(int32_t));
      pCols[i].pData = taosMemoryCalloc(1, pColInfoData->varmeta.length);
H
Haojun Liao 已提交
766 767 768

      pCols[i].varmeta.length = pColInfoData->varmeta.length;
      pCols[i].varmeta.allocLen = pCols[i].varmeta.length;
H
Haojun Liao 已提交
769
    } else {
wafwerar's avatar
wafwerar 已提交
770 771
      pCols[i].nullbitmap = taosMemoryCalloc(1, BitmapLen(rows));
      pCols[i].pData = taosMemoryCalloc(rows, pCols[i].info.bytes);
H
Haojun Liao 已提交
772 773 774 775 776 777
    }
  }

  return pCols;
}

H
Haojun Liao 已提交
778
/*
 * Hand the buffers of the scratch columns `pCols` over to the columns of `pDataBlock`.
 *
 * For every column the block's previous buffers are released and replaced by the scratch
 * column's buffers (offset metadata or null bitmap, depending on the type, plus the payload).
 * The scratch array itself is freed at the end; its buffers now belong to the block.
 */
static void copyBackToBlock(SSDataBlock* pDataBlock, SColumnInfoData* pCols) {
  size_t total = taosArrayGetSize(pDataBlock->pDataBlock);

  for (int32_t col = 0; col < total; ++col) {
    SColumnInfoData* pTarget = taosArrayGet(pDataBlock->pDataBlock, col);
    SColumnInfoData* pScratch = &pCols[col];

    pTarget->info = pScratch->info;

    // the column type decides which side buffer the column carries
    if (!IS_VAR_DATA_TYPE(pTarget->info.type)) {
      taosMemoryFreeClear(pTarget->nullbitmap);
      pTarget->nullbitmap = pScratch->nullbitmap;
    } else {
      taosMemoryFreeClear(pTarget->varmeta.offset);
      pTarget->varmeta = pScratch->varmeta;
    }

    taosMemoryFreeClear(pTarget->pData);
    pTarget->pData = pScratch->pData;
  }

  taosMemoryFreeClear(pCols);
}

/*
 * Allocate an identity permutation of `rows` entries (element i holds i).
 * Returns NULL when the allocation fails.
 */
static int32_t* createTupleIndex(size_t rows) {
  int32_t* pIdx = taosMemoryCalloc(rows, sizeof(int32_t));

  if (pIdx != NULL) {
    int32_t pos = 0;
    while (pos < rows) {
      pIdx[pos] = pos;
      ++pos;
    }
  }

  return pIdx;
}

wafwerar's avatar
wafwerar 已提交
813
// Release an index array obtained from createTupleIndex.
static void destroyTupleIndex(int32_t* index) {
  taosMemoryFreeClear(index);
}
H
Haojun Liao 已提交
814

H
Haojun Liao 已提交
815
/*
 * Sort the rows of `pDataBlock` in place according to `pOrderInfo` (an array of SBlockOrderInfo).
 *
 * Blocks with at most one row are returned untouched. When the block has exactly one
 * fixed-length column, that column is the only sort key and it contains no NULL, the column
 * payload is sorted directly. Otherwise a row-index permutation is sorted with dataBlockCompar,
 * the rows are gathered into scratch columns in that order, and the scratch buffers replace the
 * block's buffers.
 *
 * As a side effect, the `pColData` member of every order entry is pointed at its column.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY (also stored in terrno), or whatever
 * non-zero value the comparator left in terrno.
 */
int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo) {
  ASSERT(pDataBlock != NULL && pOrderInfo != NULL);
  if (pDataBlock->info.rows <= 1) {
    return TSDB_CODE_SUCCESS;
  }

  uint32_t rows = pDataBlock->info.rows;
  size_t   numOfOrders = taosArrayGetSize(pOrderInfo);
  size_t   numOfCols = taosArrayGetSize(pDataBlock->pDataBlock);

  bool sortColumnHasNull = false;
  bool varTypeSort = false;

  for (size_t i = 0; i < numOfOrders; ++i) {
    SBlockOrderInfo* pInfo = taosArrayGet(pOrderInfo, i);
    SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, pInfo->slotId);

    if (pColInfoData->hasNull) {
      sortColumnHasNull = true;
    }

    if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
      varTypeSort = true;
    }
  }

  // Fast path: a single fixed-length column without NULLs can be sorted directly in its payload buffer.
  if (numOfOrders == 1 && numOfCols == 1 && !sortColumnHasNull && !varTypeSort) {
    SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, 0);
    SBlockOrderInfo* pOrder = taosArrayGet(pOrderInfo, 0);

    int64_t p0 = taosGetTimestampUs();

    __compar_fn_t fn = getKeyComparFunc(pColInfoData->info.type, pOrder->order);
    taosSort(pColInfoData->pData, pDataBlock->info.rows, pColInfoData->info.bytes, fn);

    int64_t p1 = taosGetTimestampUs();
    uDebug("blockDataSort easy cost:%" PRId64 ", rows:%d\n", p1 - p0, pDataBlock->info.rows);

    return TSDB_CODE_SUCCESS;
  }

  int32_t* index = createTupleIndex(rows);
  if (index == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return terrno;
  }

  int64_t p0 = taosGetTimestampUs();

  SSDataBlockSortHelper helper = {.pDataBlock = pDataBlock, .orderInfo = pOrderInfo};
  for (size_t i = 0; i < numOfOrders; ++i) {
    SBlockOrderInfo* pInfo = taosArrayGet(helper.orderInfo, i);
    pInfo->pColData = taosArrayGet(pDataBlock->pDataBlock, pInfo->slotId);
  }

  // The comparator reports failures through terrno, so clear it before sorting.
  terrno = 0;
  taosqsort(index, rows, sizeof(int32_t), &helper, dataBlockCompar);
  if (terrno) {
    int32_t code = terrno;  // capture before cleanup
    destroyTupleIndex(index);
    return code;
  }

  int64_t p1 = taosGetTimestampUs();

  SColumnInfoData* pCols = createHelpColInfoData(pDataBlock);
  if (pCols == NULL) {
    destroyTupleIndex(index);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return terrno;
  }

  int64_t p2 = taosGetTimestampUs();

  blockDataAssign(pCols, pDataBlock, index);

  int64_t p3 = taosGetTimestampUs();

  copyBackToBlock(pDataBlock, pCols);
  int64_t p4 = taosGetTimestampUs();

  uDebug("blockDataSort complex sort:%" PRId64 ", create:%" PRId64 ", assign:%" PRId64 ", copyback:%" PRId64
         ", rows:%d\n",
         p1 - p0, p2 - p1, p3 - p2, p4 - p3, rows);
  destroyTupleIndex(index);

  return TSDB_CODE_SUCCESS;
}
906

H
Haojun Liao 已提交
907 908
/*
 * Per-row entry sorted by the *_rv sort routines.
 *
 * `index` is filled with the row's ordinal by createTupleIndex_rv. The anonymous union holds
 * the row's sort key: `pData` is set to the row's slice of a packed key buffer, and
 * dataBlockCompar_rv dereferences it. `i64` and `d64` share that storage, so writing either
 * one invalidates `pData`.
 */
struct SHelper {
  int32_t index;
  union {
    char*   pData;
    int64_t i64;
    double  d64;
  };
};

typedef struct SHelper SHelper;

/*
 * Build the sortable per-row entries for the *_rv sort path.
 *
 * For every row an SHelper is produced whose `index` is the row ordinal and whose `pData`
 * points at that row's packed sort key: the raw values of all order columns, concatenated in
 * the order given by `pOrderInfo`, `info.bytes` bytes per column. This is the layout
 * dataBlockCompar_rv reads through `pData`.
 *
 * The entries and the key bytes live in ONE allocation (entries first, keys behind them), so
 * freeing the returned pointer releases everything.
 *
 * Side effect: the `pColData` member of each order entry is pointed at its column.
 * `numOfRows` must not exceed the number of rows stored in the order columns.
 *
 * Returns NULL if `numOfRows` is not positive or the allocation fails.
 */
SHelper* createTupleIndex_rv(int32_t numOfRows, SArray* pOrderInfo, SSDataBlock* pBlock) {
  int32_t sortValLengthPerRow = 0;
  int32_t numOfCols = taosArrayGetSize(pOrderInfo);

  for (int32_t i = 0; i < numOfCols; ++i) {
    SBlockOrderInfo* pInfo = taosArrayGet(pOrderInfo, i);
    SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, pInfo->slotId);
    pInfo->pColData = pColInfo;
    sortValLengthPerRow += pColInfo->info.bytes;
  }

  if (numOfRows <= 0) {
    return NULL;
  }

  size_t entryLen = sizeof(SHelper) * (size_t)numOfRows;
  size_t keyLen = (size_t)sortValLengthPerRow * (size_t)numOfRows;

  SHelper* phelper = taosMemoryCalloc(1, entryLen + keyLen);
  if (phelper == NULL) {
    return NULL;
  }

  char* buf = (char*)phelper + entryLen;
  for (int32_t i = 0; i < numOfRows; ++i) {
    phelper[i].index = i;
    phelper[i].pData = buf + (size_t)sortValLengthPerRow * i;
  }

  int32_t offset = 0;
  for (int32_t i = 0; i < numOfCols; ++i) {
    SBlockOrderInfo* pInfo = taosArrayGet(pOrderInfo, i);
    int32_t          bytes = pInfo->pColData->info.bytes;

    // A column without a payload buffer contributes zeroed key bytes (the buffer is calloc'ed).
    if (pInfo->pColData->pData != NULL) {
      for (int32_t j = 0; j < numOfRows; ++j) {
        // Copy the value into the key buffer; storing into the union's i64 would overwrite pData.
        memcpy(phelper[j].pData + offset, pInfo->pColData->pData + (size_t)bytes * j, bytes);
      }
    }

    offset += bytes;
  }

  return phelper;
}

/*
 * Comparator for SHelper entries used by blockDataSort_rv.
 *
 * Only the first four bytes behind each entry's `pData` are compared, interpreted as int32_t
 * in ascending order. Further sort columns, NULL handling, other data types and the requested
 * sort direction are not implemented; `param` (the SSDataBlockSortHelper) is not consulted.
 *
 * Returns -1, 0 or 1.
 */
int32_t dataBlockCompar_rv(const void* p1, const void* p2, const void* param) {
  (void)param;

  const SHelper* lhs = (const SHelper*)p1;
  const SHelper* rhs = (const SHelper*)p2;

  int32_t lv = *(const int32_t*)lhs->pData;
  int32_t rv = *(const int32_t*)rhs->pData;

  if (lv != rv) {
    return (lv < rv) ? -1 : 1;
  }

  return 0;
}

L
Liu Jicong 已提交
1010
// Placeholder for sorting a single variable-length column: does no work and returns 0.
int32_t varColSort(SColumnInfoData* pColumnInfoData, SBlockOrderInfo* pOrder) {
  (void)pColumnInfoData;
  (void)pOrder;
  return 0;
}
H
Haojun Liao 已提交
1011 1012

/*
 * Alternative sort of `pDataBlock` that sorts SHelper entries (row ordinal + packed key)
 * with dataBlockCompar_rv and then rearranges the rows accordingly.
 *
 * Steps: build the entries, sort them, turn the sorted entries into a plain row-index
 * permutation, gather the rows into scratch columns with blockDataAssign and finally swap the
 * scratch buffers into the block. Timing of each phase is printed to stdout.
 *
 * `nullFirst` is accepted for interface compatibility but is not used by the comparator.
 *
 * Returns 0 on success; on failure returns the error code, also stored in terrno, and the
 * block is left unchanged.
 */
int32_t blockDataSort_rv(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullFirst) {
  (void)nullFirst;

  // Same shortcut as blockDataSort: nothing to reorder.
  if (pDataBlock->info.rows <= 1) {
    return TSDB_CODE_SUCCESS;
  }

  int64_t p0 = taosGetTimestampUs();

  SSDataBlockSortHelper helper = {.pDataBlock = pDataBlock, .orderInfo = pOrderInfo};

  uint32_t rows = pDataBlock->info.rows;
  SHelper* index = createTupleIndex_rv(rows, helper.orderInfo, pDataBlock);
  if (index == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return terrno;
  }

  taosqsort(index, rows, sizeof(SHelper), &helper, dataBlockCompar_rv);

  int64_t p1 = taosGetTimestampUs();

  // blockDataAssign expects a plain array of source row numbers.
  int32_t* order = taosMemoryCalloc(rows, sizeof(int32_t));
  if (order == NULL) {
    taosMemoryFreeClear(index);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return terrno;
  }

  for (uint32_t i = 0; i < rows; ++i) {
    order[i] = index[i].index;
  }

  // Only the entry array is released here; no assumption is made about what pData points to.
  taosMemoryFreeClear(index);

  SColumnInfoData* pCols = createHelpColInfoData(pDataBlock);
  if (pCols == NULL) {
    taosMemoryFreeClear(order);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return terrno;
  }

  int64_t p2 = taosGetTimestampUs();

  int32_t code = blockDataAssign(pCols, pDataBlock, order);
  taosMemoryFreeClear(order);
  if (code != TSDB_CODE_SUCCESS) {
    // The scratch columns were never handed to the block, so release them here.
    size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock);
    for (size_t i = 0; i < numOfCols; ++i) {
      if (IS_VAR_DATA_TYPE(pCols[i].info.type)) {
        taosMemoryFreeClear(pCols[i].varmeta.offset);
      } else {
        taosMemoryFreeClear(pCols[i].nullbitmap);
      }
      taosMemoryFreeClear(pCols[i].pData);
    }
    taosMemoryFreeClear(pCols);

    terrno = code;
    return code;
  }

  int64_t p3 = taosGetTimestampUs();

  copyBackToBlock(pDataBlock, pCols);
  int64_t p4 = taosGetTimestampUs();

  printf("sort:%" PRId64 ", create:%" PRId64 ", assign:%" PRId64 ", copyback:%" PRId64 ", rows:%d\n", p1 - p0, p2 - p1,
         p3 - p2, p4 - p3, rows);
  return 0;
}

1053
/*
 * Reset a block for reuse without releasing its buffers: row count, group id and time window
 * are zeroed and every column is wiped up to the block's recorded capacity.
 */
void blockDataCleanup(SSDataBlock* pDataBlock) {
  SDataBlockInfo* pInfo = &pDataBlock->info;

  pInfo->rows = 0;
  pInfo->groupId = 0;
  pInfo->window.ekey = 0;
  pInfo->window.skey = 0;

  size_t  total = taosArrayGetSize(pDataBlock->pDataBlock);
  int32_t col = 0;
  while (col < total) {
    colInfoDataCleanup(taosArrayGet(pDataBlock->pDataBlock, col), pInfo->capacity);
    ++col;
  }
}

L
Liu Jicong 已提交
1067
/*
 * Grow the per-row buffers of one column so that they can hold `numOfRows` rows.
 *
 * `pBlockInfo` supplies the number of rows already in use and the recorded capacity. Nothing
 * happens while `numOfRows` is strictly below that capacity. Otherwise:
 *   - variable-length types: the offset array is reallocated and the entries beyond the rows
 *     in use are zeroed (the payload buffer is not touched here);
 *   - other types: the null bitmap is reallocated and its newly added bytes zeroed; unless the
 *     type is TSDB_DATA_TYPE_NULL the payload is reallocated too and zeroed beyond the rows in use.
 *
 * Returns TSDB_CODE_SUCCESS or TSDB_CODE_OUT_OF_MEMORY. A failed reallocation leaves the
 * previous buffer installed.
 */
static int32_t doEnsureCapacity(SColumnInfoData* pColumn, const SDataBlockInfo* pBlockInfo, uint32_t numOfRows) {
  ASSERT(numOfRows > 0 && pBlockInfo->capacity >= pBlockInfo->rows);
  if (pBlockInfo->capacity > numOfRows) {
    return TSDB_CODE_SUCCESS;
  }

  // todo temp disable it
  //  ASSERT(pColumn->info.bytes != 0);

  int32_t usedRows = pBlockInfo->rows;

  if (IS_VAR_DATA_TYPE(pColumn->info.type)) {
    char* pNew = taosMemoryRealloc(pColumn->varmeta.offset, sizeof(int32_t) * numOfRows);
    if (pNew == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    pColumn->varmeta.offset = (int32_t*)pNew;
    memset(&pColumn->varmeta.offset[usedRows], 0, sizeof(int32_t) * (numOfRows - usedRows));
    return TSDB_CODE_SUCCESS;
  }

  char* pBitmap = taosMemoryRealloc(pColumn->nullbitmap, BitmapLen(numOfRows));
  if (pBitmap == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  int32_t usedBitmapLen = BitmapLen(usedRows);
  pColumn->nullbitmap = pBitmap;
  memset(&pColumn->nullbitmap[usedBitmapLen], 0, BitmapLen(numOfRows) - usedBitmapLen);

  // the NULL type has a bitmap but no payload
  if (pColumn->info.type == TSDB_DATA_TYPE_NULL) {
    return TSDB_CODE_SUCCESS;
  }

  assert(pColumn->info.bytes);
  char* pValues = taosMemoryRealloc(pColumn->pData, numOfRows * pColumn->info.bytes);
  if (pValues == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  memset(pValues + pColumn->info.bytes * usedRows, 0, pColumn->info.bytes * (numOfRows - usedRows));
  pColumn->pData = pValues;

  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
1113
/*
 * Wipe the contents of one column for `numOfRows` rows without freeing its buffers.
 *
 * Variable-length types: the payload length is reset to 0 and, if an offset array exists, its
 * first `numOfRows` entries are zeroed. Other types: if a null bitmap exists it is zeroed and,
 * in that case only, an existing payload buffer is zeroed as well.
 */
void colInfoDataCleanup(SColumnInfoData* pColumn, uint32_t numOfRows) {
  if (IS_VAR_DATA_TYPE(pColumn->info.type)) {
    pColumn->varmeta.length = 0;
    // pointers are tested against NULL, not ordered against 0
    if (pColumn->varmeta.offset != NULL) {
      memset(pColumn->varmeta.offset, 0, sizeof(int32_t) * numOfRows);
    }
    return;
  }

  if (pColumn->nullbitmap == NULL) {
    return;
  }

  memset(pColumn->nullbitmap, 0, BitmapLen(numOfRows));
  if (pColumn->pData != NULL) {
    memset(pColumn->pData, 0, pColumn->info.bytes * numOfRows);
  }
}

1129 1130 1131 1132 1133
/*
 * Make sure a stand-alone column can hold `numOfRows` rows. A zeroed block info is passed to
 * doEnsureCapacity, i.e. no rows in use and a recorded capacity of 0.
 */
int32_t colInfoDataEnsureCapacity(SColumnInfoData* pColumn, uint32_t numOfRows) {
  SDataBlockInfo emptyInfo = {0};
  int32_t        code = doEnsureCapacity(pColumn, &emptyInfo, numOfRows);
  return code;
}

D
dapan1121 已提交
1134 1135
/*
 * Make sure every column of `pDataBlock` can hold `numOfRows` rows.
 *
 * A request for 0 rows is a no-op. The block's recorded capacity is raised to `numOfRows`
 * first (it is never lowered), then each column is grown through doEnsureCapacity.
 *
 * Returns TSDB_CODE_SUCCESS, or the first error reported for a column; in that case the
 * remaining columns are not processed.
 */
int32_t blockDataEnsureCapacity(SSDataBlock* pDataBlock, uint32_t numOfRows) {
  if (numOfRows == 0) {
    return TSDB_CODE_SUCCESS;
  }

  if (numOfRows > pDataBlock->info.capacity) {
    pDataBlock->info.capacity = numOfRows;
  }

  size_t total = taosArrayGetSize(pDataBlock->pDataBlock);
  for (int32_t col = 0; col < total; ++col) {
    SColumnInfoData* pCol = taosArrayGet(pDataBlock->pDataBlock, col);

    int32_t code = doEnsureCapacity(pCol, &pDataBlock->info, numOfRows);
    if (code) {
      return code;
    }
  }

  return TSDB_CODE_SUCCESS;
}

1156 1157 1158 1159 1160 1161 1162 1163 1164 1165 1166 1167
/*
 * Release everything a block owns (column buffers, the column array, the aggregate data) and
 * zero its info, but do not free the SSDataBlock structure itself.
 *
 * After the call no member refers to freed memory: the column array pointer is cleared as well.
 */
void blockDataFreeRes(SSDataBlock* pBlock) {
  int32_t numOfOutput = taosArrayGetSize(pBlock->pDataBlock);
  for (int32_t i = 0; i < numOfOutput; ++i) {
    SColumnInfoData* pColInfoData = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, i);
    colDataDestroy(pColInfoData);
  }

  taosArrayDestroy(pBlock->pDataBlock);
  pBlock->pDataBlock = NULL;  // do not leave a dangling pointer in a structure that stays alive

  taosMemoryFreeClear(pBlock->pBlockAgg);
  memset(&pBlock->info, 0, sizeof(SDataBlockInfo));
}

H
Haojun Liao 已提交
1168 1169 1170 1171 1172
/*
 * Free a heap-allocated block together with everything it owns. Accepts NULL.
 * Always returns NULL so callers can write `p = blockDataDestroy(p)`.
 */
void* blockDataDestroy(SSDataBlock* pBlock) {
  if (pBlock != NULL) {
    blockDataFreeRes(pBlock);
    taosMemoryFreeClear(pBlock);
  }

  return NULL;
}
1177

L
Liu Jicong 已提交
1178 1179 1180 1181 1182 1183 1184 1185 1186 1187 1188 1189 1190 1191 1192 1193 1194 1195 1196 1197 1198 1199 1200
/*
 * Turn `dst` into a copy of `src`: the block info is copied, one column per source column is
 * appended to `dst` with the same column info, capacity for all source rows is reserved and
 * the column contents are copied.
 *
 * Columns are APPENDED to `dst`; columns it already holds are not removed.
 * Fixed-length source columns without a payload buffer are skipped.
 *
 * Returns 0 on success, -1 on failure with the reason in terrno.
 */
int32_t assignOneDataBlock(SSDataBlock* dst, const SSDataBlock* src) {
  ASSERT(src != NULL && dst != NULL);

  dst->info = src->info;
  dst->info.rows = 0;
  dst->info.capacity = 0;

  size_t numOfCols = taosArrayGetSize(src->pDataBlock);
  for (int32_t i = 0; i < numOfCols; ++i) {
    SColumnInfoData* p = taosArrayGet(src->pDataBlock, i);
    SColumnInfoData  colInfo = {.hasNull = true, .info = p->info};
    if (blockDataAppendColInfo(dst, &colInfo) != TSDB_CODE_SUCCESS) {
      return -1;  // terrno was set by blockDataAppendColInfo
    }
  }

  int32_t code = blockDataEnsureCapacity(dst, src->info.rows);
  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
    return -1;
  }

  for (int32_t i = 0; i < numOfCols; ++i) {
    SColumnInfoData* pDst = taosArrayGet(dst->pDataBlock, i);
    SColumnInfoData* pSrc = taosArrayGet(src->pDataBlock, i);
    if (pSrc->pData == NULL && (!IS_VAR_DATA_TYPE(pSrc->info.type))) {
      continue;
    }

    colDataAssign(pDst, pSrc, src->info.rows, &src->info);
  }

  dst->info.rows = src->info.rows;
  dst->info.capacity = src->info.rows;
  return 0;
}
1212

5
54liuyao 已提交
1213 1214 1215 1216 1217 1218 1219 1220 1221 1222 1223 1224 1225 1226 1227 1228 1229 1230 1231 1232 1233
/*
 * Copy the rows of `src` into `dst`, which must already have the same column layout.
 *
 * `dst` is wiped first, grown to hold all source rows, filled column by column and finally
 * receives a copy of the source block info.
 *
 * Only fixed-length columns without a payload buffer are skipped. A variable-length column
 * without a payload buffer still has its per-row offsets copied (presumably the case when all
 * its rows are NULL — TODO confirm). Skipping it would leave the zeroed offsets produced by
 * blockDataCleanup in `dst`. This matches assignOneDataBlock.
 *
 * Returns TSDB_CODE_SUCCESS or the allocation error, which is also stored in terrno.
 */
int32_t copyDataBlock(SSDataBlock* dst, const SSDataBlock* src) {
  ASSERT(src != NULL && dst != NULL);

  blockDataCleanup(dst);
  int32_t code = blockDataEnsureCapacity(dst, src->info.rows);
  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
    return code;
  }

  size_t numOfCols = taosArrayGetSize(src->pDataBlock);
  for (int32_t i = 0; i < numOfCols; ++i) {
    SColumnInfoData* pDst = taosArrayGet(dst->pDataBlock, i);
    SColumnInfoData* pSrc = taosArrayGet(src->pDataBlock, i);
    if (pSrc->pData == NULL && (!IS_VAR_DATA_TYPE(pSrc->info.type))) {
      continue;
    }

    colDataAssign(pDst, pSrc, src->info.rows, &src->info);
  }

  dst->info = src->info;
  return TSDB_CODE_SUCCESS;
}

1238
SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData) {
C
Cary Xu 已提交
1239
  if (pDataBlock == NULL) {
1240 1241
    return NULL;
  }
1242

1243 1244 1245 1246
  SSDataBlock* pBlock = createDataBlock();
  pBlock->info = pDataBlock->info;
  pBlock->info.rows = 0;
  pBlock->info.capacity = 0;
1247

1248
  size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock);
L
Liu Jicong 已提交
1249
  for (int32_t i = 0; i < numOfCols; ++i) {
1250
    SColumnInfoData* p = taosArrayGet(pDataBlock->pDataBlock, i);
1251 1252
    SColumnInfoData  colInfo = {.hasNull = true, .info = p->info};
    blockDataAppendColInfo(pBlock, &colInfo);
1253 1254
  }

1255
  if (copyData) {
1256 1257 1258 1259 1260 1261 1262
    int32_t code = blockDataEnsureCapacity(pBlock, pDataBlock->info.rows);
    if (code != TSDB_CODE_SUCCESS) {
      terrno = code;
      blockDataDestroy(pBlock);
      return NULL;
    }

1263 1264 1265
    for (int32_t i = 0; i < numOfCols; ++i) {
      SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, i);
      SColumnInfoData* pSrc = taosArrayGet(pDataBlock->pDataBlock, i);
1266
      if (pSrc->pData == NULL) {
1267 1268
        continue;
      }
1269 1270

      colDataAssign(pDst, pSrc, pDataBlock->info.rows, &pDataBlock->info);
1271 1272 1273 1274 1275 1276
    }

    pBlock->info.rows = pDataBlock->info.rows;
    pBlock->info.capacity = pDataBlock->info.rows;
  }

1277 1278 1279
  return pBlock;
}

1280 1281 1282 1283 1284 1285 1286 1287 1288 1289 1290 1291 1292 1293 1294 1295 1296 1297 1298 1299 1300 1301 1302 1303 1304 1305 1306 1307 1308 1309 1310
/*
 * Allocate an empty, zeroed block with an empty column array (initial room for 4 columns).
 *
 * Returns the block, or NULL with terrno set to TSDB_CODE_OUT_OF_MEMORY if either allocation
 * fails. The caller releases the result with blockDataDestroy.
 */
SSDataBlock* createDataBlock() {
  SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
  if (pBlock == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pBlock->pDataBlock = taosArrayInit(4, sizeof(SColumnInfoData));
  if (pBlock->pDataBlock == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    taosMemoryFree(pBlock);
    return NULL;  // never hand out the pointer that was just freed
  }

  return pBlock;
}

/*
 * Append a copy of `*pColInfoData` to the block's column array, creating the array on first
 * use. The block's `hasVarCol` flag is raised for variable-length types and `rowSize` grows
 * by the column's byte width.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY (also stored in terrno).
 */
int32_t blockDataAppendColInfo(SSDataBlock* pBlock, SColumnInfoData* pColInfoData) {
  ASSERT(pBlock != NULL && pColInfoData != NULL);

  if (pBlock->pDataBlock == NULL) {
    pBlock->pDataBlock = taosArrayInit(4, sizeof(SColumnInfoData));
  }

  if (pBlock->pDataBlock == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return terrno;
  }

  if (taosArrayPush(pBlock->pDataBlock, pColInfoData) == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return terrno;
  }

  // todo disable it temporarily
  //  ASSERT(pColInfoData->info.type != 0);
  if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
    pBlock->info.hasVarCol = true;
  }

  pBlock->info.rowSize += pColInfoData->info.bytes;
  return TSDB_CODE_SUCCESS;
}

/*
 * Build a column descriptor by value: `hasNull` is set, the column info carries the given id,
 * type and byte width, and every other member is zero (no buffers attached).
 */
SColumnInfoData createColumnInfoData(int16_t type, int32_t bytes, int16_t colId) {
  SColumnInfoData col = {.hasNull = true, .info = {.colId = colId, .type = type, .bytes = bytes}};
  return col;
}

L
Liu Jicong 已提交
1330
/*
 * Return the column at position `index`, or NULL when `index` is not below the number of
 * columns in the block.
 */
SColumnInfoData* bdGetColumnInfoData(const SSDataBlock* pBlock, int32_t index) {
  ASSERT(pBlock != NULL);

  size_t total = taosArrayGetSize(pBlock->pDataBlock);
  if (index < total) {
    return taosArrayGet(pBlock->pDataBlock, index);
  }

  return NULL;
}

1339
size_t blockDataGetCapacityInRow(const SSDataBlock* pBlock, size_t pageSize) {
1340
  size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
1341

1342
  int32_t payloadSize = pageSize - blockDataGetSerialMetaSize(numOfCols);
1343 1344 1345 1346 1347
  int32_t rowSize = pBlock->info.rowSize;
  int32_t nRows = payloadSize / rowSize;

  // the true value must be less than the value of nRows
  int32_t additional = 0;
1348
  for (int32_t i = 0; i < numOfCols; ++i) {
1349 1350 1351 1352 1353 1354 1355 1356 1357
    SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, i);
    if (IS_VAR_DATA_TYPE(pCol->info.type)) {
      additional += nRows * sizeof(int32_t);
    } else {
      additional += BitmapLen(nRows);
    }
  }

  int32_t newRows = (payloadSize - additional) / rowSize;
1358
  ASSERT(newRows <= nRows && newRows >= 1);
1359 1360

  return newRows;
1361
}
H
Haojun Liao 已提交
1362

1363 1364
/*
 * Free the buffers attached to a column: the payload and, depending on the type, either the
 * offset array (variable-length types) or the null bitmap. The column structure itself is
 * not freed.
 */
void colDataDestroy(SColumnInfoData* pColData) {
  if (!IS_VAR_DATA_TYPE(pColData->info.type)) {
    taosMemoryFreeClear(pColData->nullbitmap);
  } else {
    taosMemoryFreeClear(pColData->varmeta.offset);
  }

  taosMemoryFreeClear(pColData->pData);
}

H
Haojun Liao 已提交
1373 1374 1375 1376
/*
 * Drop the first `n` bits of a bitmap holding `total` bits, moving the remaining bits to the
 * front. Within a byte the first bit is the most significant one, so dropping leading bits is
 * a left shift across bytes.
 *
 * The move is split into a whole-byte part (n / 8) and a bit part (n % 8): destination byte i
 * is built from source byte i + n/8 shifted left, completed with the top bits of the following
 * source byte whenever that byte exists inside the bitmap. Deriving the byte offset from `n`,
 * rather than from the difference of old and new bitmap lengths, keeps the result correct
 * when the two lengths differ by more than n / 8 bytes.
 *
 * `n` is expected to be smaller than `total`.
 */
static void doShiftBitmap(char* nullBitmap, size_t n, size_t total) {
  int32_t len = BitmapLen(total);
  int32_t byteShift = n / 8;
  int32_t bitShift = n % 8;

  if (bitShift == 0) {
    memmove(nullBitmap, nullBitmap + byteShift, BitmapLen(total - n));
    return;
  }

  uint8_t* p = (uint8_t*)nullBitmap;

  // dst never runs ahead of src, so shifting in place is safe
  for (int32_t dst = 0, src = byteShift; src < len; ++dst, ++src) {
    uint8_t v = (uint8_t)(p[src] << bitShift);
    if (src + 1 < len) {
      v |= (uint8_t)(p[src + 1] >> (8 - bitShift));
    }

    p[dst] = v;
  }
}

X
Xiaoyu Wang 已提交
1413
/*
 * Move the variable-length values of rows [start, end) to the front of the column.
 *
 * Rows whose offset is -1 are skipped. For the others the value length is accumulated (JSON
 * values via getJsonValueLen, everything else via varDataTLen) and, when `start` is not 0, the
 * row's offset is rewritten to the running length. The payload is then moved down from the
 * first value's original offset and the offset entries of the range are moved to the start of
 * the offset array.
 *
 * Returns the total byte length of the values in the range.
 */
static int32_t colDataMoveVarData(SColumnInfoData* pColInfoData, size_t start, size_t end) {
  int32_t firstOffset = -1;
  int32_t totalLen = 0;

  for (int32_t row = start; row < end; ++row) {
    int32_t offset = pColInfoData->varmeta.offset[row];
    if (offset == -1) {
      continue;
    }

    if (start != 0) {
      pColInfoData->varmeta.offset[row] = totalLen;
    }

    if (firstOffset == -1) {
      firstOffset = offset;  // mark the begin of data
    }

    char* pValue = pColInfoData->pData + offset;
    if (pColInfoData->info.type == TSDB_DATA_TYPE_JSON) {
      totalLen += getJsonValueLen(pValue);
    } else {
      totalLen += varDataTLen(pValue);
    }
  }

  if (firstOffset > 0) {
    memmove(pColInfoData->pData, pColInfoData->pData + firstOffset, totalLen);
  }

  memmove(pColInfoData->varmeta.offset, &pColInfoData->varmeta.offset[start], (end - start) * sizeof(int32_t));
  return totalLen;
}

H
Haojun Liao 已提交
1445 1446
/*
 * Remove the first `n` of `total` rows from one column.
 *
 * Variable-length types: the surviving values and offsets are moved to the front, the payload
 * length is updated and the `n` offset entries that are no longer used are zeroed.
 * Other types: the payload is moved down by `n` rows and the null bitmap is shifted by `n` bits.
 */
static void colDataTrimFirstNRows(SColumnInfoData* pColInfoData, size_t n, size_t total) {
  if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
    pColInfoData->varmeta.length = colDataMoveVarData(pColInfoData, n, total);
    // offsets are int32 entries, so the byte count is n entries wide, not n bytes
    memset(&pColInfoData->varmeta.offset[total - n], 0, n * sizeof(int32_t));
  } else {
    int32_t bytes = pColInfoData->info.bytes;
    memmove(pColInfoData->pData, ((char*)pColInfoData->pData + n * bytes), (total - n) * bytes);
    doShiftBitmap(pColInfoData->nullbitmap, n, total);
  }
}

C
Cary Xu 已提交
1456
/*
 * Drop the first `n` rows of a block. `n == 0` is a no-op; when the block holds no more than
 * `n` rows it is wiped with blockDataCleanup. Always returns TSDB_CODE_SUCCESS.
 */
int32_t blockDataTrimFirstNRows(SSDataBlock* pBlock, size_t n) {
  if (n == 0) {
    return TSDB_CODE_SUCCESS;
  }

  if (pBlock->info.rows <= n) {
    blockDataCleanup(pBlock);
    return TSDB_CODE_SUCCESS;
  }

  size_t  total = taosArrayGetSize(pBlock->pDataBlock);
  int32_t col = 0;
  while (col < total) {
    colDataTrimFirstNRows(taosArrayGet(pBlock->pDataBlock, col), n, pBlock->info.rows);
    ++col;
  }

  pBlock->info.rows -= n;
  return TSDB_CODE_SUCCESS;
}

wmmhello's avatar
wmmhello 已提交
1475 1476
/*
 * Keep only the first `n` of `total` rows of one column.
 *
 * Variable-length types: the payload length is recomputed for the kept rows and the offset
 * entries of the dropped rows are zeroed. Columns of other types are left untouched.
 */
static void colDataKeepFirstNRows(SColumnInfoData* pColInfoData, size_t n, size_t total) {
  if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
    pColInfoData->varmeta.length = colDataMoveVarData(pColInfoData, 0, n);
    // offsets are int32 entries, so clear (total - n) entries rather than (total - n) bytes
    memset(&pColInfoData->varmeta.offset[n], 0, (total - n) * sizeof(int32_t));
  }
}

/*
 * Truncate a block to its first `n` rows. `n == 0` wipes the block with blockDataCleanup; a
 * block that holds no more than `n` rows is left unchanged. Always returns TSDB_CODE_SUCCESS.
 */
int32_t blockDataKeepFirstNRows(SSDataBlock* pBlock, size_t n) {
  if (n == 0) {
    blockDataCleanup(pBlock);
    return TSDB_CODE_SUCCESS;
  }

  if (pBlock->info.rows <= n) {
    return TSDB_CODE_SUCCESS;
  }

  size_t  total = taosArrayGetSize(pBlock->pDataBlock);
  int32_t col = 0;
  while (col < total) {
    colDataKeepFirstNRows(taosArrayGet(pBlock->pDataBlock, col), n, pBlock->info.rows);
    ++col;
  }

  pBlock->info.rows = n;
  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
1502 1503
int32_t tEncodeDataBlock(void** buf, const SSDataBlock* pBlock) {
  int64_t tbUid = pBlock->info.uid;
1504
  int16_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
H
Haojun Liao 已提交
1505 1506 1507 1508 1509 1510 1511 1512 1513 1514 1515 1516 1517 1518 1519
  int16_t hasVarCol = pBlock->info.hasVarCol;
  int32_t rows = pBlock->info.rows;
  int32_t sz = taosArrayGetSize(pBlock->pDataBlock);

  int32_t tlen = 0;
  tlen += taosEncodeFixedI64(buf, tbUid);
  tlen += taosEncodeFixedI16(buf, numOfCols);
  tlen += taosEncodeFixedI16(buf, hasVarCol);
  tlen += taosEncodeFixedI32(buf, rows);
  tlen += taosEncodeFixedI32(buf, sz);
  for (int32_t i = 0; i < sz; i++) {
    SColumnInfoData* pColData = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, i);
    tlen += taosEncodeFixedI16(buf, pColData->info.colId);
    tlen += taosEncodeFixedI16(buf, pColData->info.type);
    tlen += taosEncodeFixedI32(buf, pColData->info.bytes);
S
slzhou@taodata.com 已提交
1520
    tlen += taosEncodeFixedBool(buf, pColData->hasNull);
H
Haojun Liao 已提交
1521 1522 1523 1524 1525 1526 1527 1528

    if (IS_VAR_DATA_TYPE(pColData->info.type)) {
      tlen += taosEncodeBinary(buf, pColData->varmeta.offset, sizeof(int32_t) * rows);
    } else {
      tlen += taosEncodeBinary(buf, pColData->nullbitmap, BitmapLen(rows));
    }

    int32_t len = colDataGetLength(pColData, rows);
X
Xiaoyu Wang 已提交
1529
    tlen += taosEncodeFixedI32(buf, len);
H
Haojun Liao 已提交
1530 1531 1532 1533 1534 1535 1536 1537 1538

    tlen += taosEncodeBinary(buf, pColData->pData, len);
  }
  return tlen;
}

/*
 * Deserialize a block written by tEncodeDataBlock and return the read position after it.
 *
 * The block's uid, hasVarCol flag and row count are filled in, a new column array is created
 * and one column per encoded column is appended. For variable-length columns the payload
 * length recorded in the stream becomes `varmeta.length` and `varmeta.allocLen`, which is the
 * value colDataGetLength reports and tEncodeDataBlock wrote.
 *
 * The decoded column count (i16) is read to advance the stream; the i32 count drives the loop.
 */
void* tDecodeDataBlock(const void* buf, SSDataBlock* pBlock) {
  int32_t sz = 0;
  int16_t numOfCols = 0;

  buf = taosDecodeFixedU64(buf, &pBlock->info.uid);
  buf = taosDecodeFixedI16(buf, &numOfCols);
  buf = taosDecodeFixedI16(buf, &pBlock->info.hasVarCol);
  buf = taosDecodeFixedI32(buf, &pBlock->info.rows);
  buf = taosDecodeFixedI32(buf, &sz);
  pBlock->pDataBlock = taosArrayInit(sz, sizeof(SColumnInfoData));
  for (int32_t i = 0; i < sz; i++) {
    SColumnInfoData data = {0};
    buf = taosDecodeFixedI16(buf, &data.info.colId);
    buf = taosDecodeFixedI16(buf, &data.info.type);
    buf = taosDecodeFixedI32(buf, &data.info.bytes);
    buf = taosDecodeFixedBool(buf, &data.hasNull);

    bool isVar = IS_VAR_DATA_TYPE(data.info.type);
    if (isVar) {
      buf = taosDecodeBinary(buf, (void**)&data.varmeta.offset, pBlock->info.rows * sizeof(int32_t));
    } else {
      buf = taosDecodeBinary(buf, (void**)&data.nullbitmap, BitmapLen(pBlock->info.rows));
    }

    int32_t len = 0;
    buf = taosDecodeFixedI32(buf, &len);
    buf = taosDecodeBinary(buf, (void**)&data.pData, len);

    if (isVar) {
      // the payload length, not the size of the offset array
      data.varmeta.length = len;
      data.varmeta.allocLen = len;
    }

    taosArrayPush(pBlock->pDataBlock, &data);
  }
  return (void*)buf;
}
L
Liu Jicong 已提交
1569 1570 1571 1572 1573 1574 1575 1576 1577 1578 1579 1580 1581 1582

/*
 * Serialize an array of SSDataBlock: the element count (i32) followed by each block encoded
 * with tEncodeDataBlock. Returns the number of bytes reported by the encoders.
 */
int32_t tEncodeDataBlocks(void** buf, const SArray* blocks) {
  int32_t count = taosArrayGetSize(blocks);
  int32_t total = taosEncodeFixedI32(buf, count);

  int32_t pos = 0;
  while (pos < count) {
    SSDataBlock* pOne = taosArrayGet(blocks, pos);
    total += tEncodeDataBlock(buf, pOne);
    ++pos;
  }

  return total;
}

L
Liu Jicong 已提交
1583
/*
 * Deserialize an array written by tEncodeDataBlocks. A new array of SSDataBlock is created in
 * `*blocks` and every decoded block is appended by value. Returns the read position after the
 * last block.
 */
void* tDecodeDataBlocks(const void* buf, SArray** blocks) {
  int32_t count;
  buf = taosDecodeFixedI32(buf, &count);

  *blocks = taosArrayInit(count, sizeof(SSDataBlock));

  int32_t pos = 0;
  while (pos < count) {
    SSDataBlock decoded = {0};
    buf = tDecodeDataBlock(buf, &decoded);
    taosArrayPush(*blocks, &decoded);
    pos++;
  }

  return (void*)buf;
}
L
Liu Jicong 已提交
1595 1596 1597 1598 1599 1600 1601 1602 1603 1604 1605 1606 1607 1608 1609 1610 1611 1612 1613 1614 1615 1616 1617 1618 1619 1620 1621 1622 1623 1624 1625 1626 1627

/**
 * Render a timestamp as "YYYY-mm-dd HH:MM:SS.<fraction>" in local time.
 *
 * The fraction has 9 digits for nanosecond precision, 6 for microsecond precision
 * and 3 for anything else. strftime() is limited to 35 bytes, but the fraction is
 * appended with sprintf(), so `buf` must be large enough for both parts.
 *
 * @param buf        destination buffer
 * @param val        timestamp expressed in units of `precision`
 * @param precision  one of the TSDB_TIME_PRECISION_* values
 * @return buf
 */
static char* formatTimestamp(char* buf, int64_t val, int precision) {
  // number of sub-second ticks in one second for the requested precision
  int32_t ticksPerSec = 1000;
  if (precision == TSDB_TIME_PRECISION_NANO) {
    ticksPerSec = 1000000000;
  } else if (precision == TSDB_TIME_PRECISION_MICRO) {
    ticksPerSec = 1000000;
  }

  time_t  secs = (time_t)(val / ticksPerSec);
  int32_t frac = val % ticksPerSec;

  /* NOTE: clamping negative seconds to 0 was disabled because it made testcases
     like select_with_tags.sim fail. On windows the call to localtime may crash
     when the seconds value is negative; a better solution is still needed.
     */

  // integer division truncates toward zero: borrow one second so that the
  // fractional part becomes non-negative for timestamps before the epoch
  if (frac < 0 && secs <= 0) {
    --secs;
    frac += ticksPerSec;
  }

  struct tm ptm = {0};
  taosLocalTime(&secs, &ptm);
  size_t pos = strftime(buf, 35, "%Y-%m-%d %H:%M:%S", &ptm);

  char* pFraction = buf + pos;
  if (ticksPerSec == 1000000000) {
    sprintf(pFraction, ".%09d", frac);
  } else if (ticksPerSec == 1000000) {
    sprintf(pFraction, ".%06d", frac);
  } else {
    sprintf(pFraction, ".%03d", frac);
  }

  return buf;
}
H
Haojun Liao 已提交
1642

1643 1644 1645 1646 1647 1648 1649
/**
 * Debug helper: print a single data block by wrapping it in a temporary
 * one-element array and delegating to blockDebugShowDataBlocks().
 *
 * @param pBlock  block to print; the temporary array holds a shallow copy of it
 * @param flag    prefix forwarded to blockDebugShowDataBlocks()
 */
void blockDebugShowDataBlock(SSDataBlock* pBlock, const char* flag) {
  SArray* dataBlocks = taosArrayInit(1, sizeof(SSDataBlock));
  if (dataBlocks == NULL) {
    // nothing can be printed without the temporary array
    return;
  }

  taosArrayPush(dataBlocks, pBlock);
  blockDebugShowDataBlocks(dataBlocks, flag);
  taosArrayDestroy(dataBlocks);
}

S
slzhou 已提交
1650
void blockDebugShowDataBlocks(const SArray* dataBlocks, const char* flag) {
C
Cary Xu 已提交
1651
  char    pBuf[128] = {0};
L
Liu Jicong 已提交
1652 1653 1654
  int32_t sz = taosArrayGetSize(dataBlocks);
  for (int32_t i = 0; i < sz; i++) {
    SSDataBlock* pDataBlock = taosArrayGet(dataBlocks, i);
L
Liu Jicong 已提交
1655
    size_t       numOfCols = taosArrayGetSize(pDataBlock->pDataBlock);
1656

L
Liu Jicong 已提交
1657
    int32_t rows = pDataBlock->info.rows;
C
Cary Xu 已提交
1658 1659 1660
    printf("%s |block ver %" PRIi64 " |block type %d |child id %d|group id %" PRIu64 "\n", flag,
           pDataBlock->info.version, (int32_t)pDataBlock->info.type, pDataBlock->info.childId,
           pDataBlock->info.groupId);
L
Liu Jicong 已提交
1661
    for (int32_t j = 0; j < rows; j++) {
C
Cary Xu 已提交
1662
      printf("%s |", flag);
1663
      for (int32_t k = 0; k < numOfCols; k++) {
L
Liu Jicong 已提交
1664 1665
        SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, k);
        void*            var = POINTER_SHIFT(pColInfoData->pData, j * pColInfoData->info.bytes);
5
54liuyao 已提交
1666
        if (colDataIsNull(pColInfoData, rows, j, NULL)) {
5
54liuyao 已提交
1667 1668 1669
          printf(" %15s |", "NULL");
          continue;
        }
L
Liu Jicong 已提交
1670 1671 1672 1673 1674 1675 1676 1677
        switch (pColInfoData->info.type) {
          case TSDB_DATA_TYPE_TIMESTAMP:
            formatTimestamp(pBuf, *(uint64_t*)var, TSDB_TIME_PRECISION_MILLI);
            printf(" %25s |", pBuf);
            break;
          case TSDB_DATA_TYPE_INT:
            printf(" %15d |", *(int32_t*)var);
            break;
L
Liu Jicong 已提交
1678 1679 1680
          case TSDB_DATA_TYPE_UINT:
            printf(" %15u |", *(uint32_t*)var);
            break;
L
Liu Jicong 已提交
1681 1682 1683
          case TSDB_DATA_TYPE_BIGINT:
            printf(" %15ld |", *(int64_t*)var);
            break;
L
Liu Jicong 已提交
1684 1685 1686
          case TSDB_DATA_TYPE_UBIGINT:
            printf(" %15lu |", *(uint64_t*)var);
            break;
C
Cary Xu 已提交
1687 1688 1689
          case TSDB_DATA_TYPE_FLOAT:
            printf(" %15f |", *(float*)var);
            break;
5
54liuyao 已提交
1690
          case TSDB_DATA_TYPE_DOUBLE:
C
Cary Xu 已提交
1691
            printf(" %15lf |", *(double*)var);
5
54liuyao 已提交
1692
            break;
L
Liu Jicong 已提交
1693 1694 1695 1696 1697
        }
      }
      printf("\n");
    }
  }
5
54liuyao 已提交
1698 1699 1700 1701 1702 1703
}

/**
 * Debug helper: render a data block as text into a freshly allocated 2048-byte buffer.
 *
 * The output is a header line followed by one line per row; it is cut off silently
 * (without the trailing "end" line) once the buffer is full. Only TIMESTAMP, INT,
 * UINT, BIGINT, UBIGINT, FLOAT and DOUBLE cells are rendered; other column types
 * produce no output.
 *
 * @param pDataBlock  block to dump
 * @param flag        prefix written at the start of every line
 * @param pDataBuf    [out] receives the allocated buffer; this function never frees
 *                    it, so releasing it is left to the caller
 * @return the same pointer stored in *pDataBuf, or NULL if the allocation failed
 */
char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf) {
  int32_t size = 2048;
  *pDataBuf = taosMemoryCalloc(size, 1);
  if (*pDataBuf == NULL) {
    // nothing to write into; the snprintf calls below must not see a NULL buffer
    return NULL;
  }

  char*   dumpBuf = *pDataBuf;
  char    pBuf[128] = {0};
  int32_t colNum = taosArrayGetSize(pDataBlock->pDataBlock);
  int32_t rows = pDataBlock->info.rows;
  int32_t len = 0;

  // uid is converted to int64_t and printed with PRId64: the previous %ld is only
  // correct where long is 64 bits wide
  len += snprintf(dumpBuf + len, size - len,
                  "===stream===%s|block type %d|child id %d|group id:%" PRIu64 "|uid:%" PRId64
                  "|rows:%d|version:%" PRIu64 "\n",
                  flag, (int32_t)pDataBlock->info.type, pDataBlock->info.childId, pDataBlock->info.groupId,
                  (int64_t)pDataBlock->info.uid, pDataBlock->info.rows, pDataBlock->info.version);
  // snprintf returns the untruncated length, so len >= size - 1 means the buffer is full
  if (len >= size - 1) return dumpBuf;

  for (int32_t j = 0; j < rows; j++) {
    len += snprintf(dumpBuf + len, size - len, "%s |", flag);
    if (len >= size - 1) return dumpBuf;

    for (int32_t k = 0; k < colNum; k++) {
      SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, k);
      // address of row j, computed with the column's fixed width
      void* var = POINTER_SHIFT(pColInfoData->pData, j * pColInfoData->info.bytes);
      if (colDataIsNull(pColInfoData, rows, j, NULL) || !pColInfoData->pData) {
        len += snprintf(dumpBuf + len, size - len, " %15s |", "NULL");
        if (len >= size - 1) return dumpBuf;
        continue;
      }

      switch (pColInfoData->info.type) {
        case TSDB_DATA_TYPE_TIMESTAMP:
          formatTimestamp(pBuf, *(int64_t*)var, TSDB_TIME_PRECISION_MILLI);
          len += snprintf(dumpBuf + len, size - len, " %25s |", pBuf);
          if (len >= size - 1) return dumpBuf;
          break;
        case TSDB_DATA_TYPE_INT:
          len += snprintf(dumpBuf + len, size - len, " %15d |", *(int32_t*)var);
          if (len >= size - 1) return dumpBuf;
          break;
        case TSDB_DATA_TYPE_UINT:
          len += snprintf(dumpBuf + len, size - len, " %15u |", *(uint32_t*)var);
          if (len >= size - 1) return dumpBuf;
          break;
        case TSDB_DATA_TYPE_BIGINT:
          len += snprintf(dumpBuf + len, size - len, " %15" PRId64 " |", *(int64_t*)var);
          if (len >= size - 1) return dumpBuf;
          break;
        case TSDB_DATA_TYPE_UBIGINT:
          len += snprintf(dumpBuf + len, size - len, " %15" PRIu64 " |", *(uint64_t*)var);
          if (len >= size - 1) return dumpBuf;
          break;
        case TSDB_DATA_TYPE_FLOAT:
          len += snprintf(dumpBuf + len, size - len, " %15f |", *(float*)var);
          if (len >= size - 1) return dumpBuf;
          break;
        case TSDB_DATA_TYPE_DOUBLE:
          len += snprintf(dumpBuf + len, size - len, " %15lf |", *(double*)var);
          if (len >= size - 1) return dumpBuf;
          break;
        default:
          // other column types are not rendered
          break;
      }
    }

    len += snprintf(dumpBuf + len, size - len, "\n");
    if (len >= size - 1) return dumpBuf;
  }

  snprintf(dumpBuf + len, size - len, "%s |end\n", flag);
  return dumpBuf;
}

C
Cary Xu 已提交
1765 1766 1767 1768 1769 1770 1771
/**
 * @brief TODO: Assume that the final generated result it less than 3M
 *
 * @param pReq
 * @param pDataBlocks
 * @param vgId
 * @param suid  // TODO: check with Liao whether suid response is reasonable
1772
 *
C
Cary Xu 已提交
1773 1774
 * TODO: colId should be set
 */
1775 1776
int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks, STSchema* pTSchema, int32_t vgId,
                                    tb_uid_t suid) {
C
Cary Xu 已提交
1777 1778 1779 1780
  int32_t sz = taosArrayGetSize(pDataBlocks);
  int32_t bufSize = sizeof(SSubmitReq);
  for (int32_t i = 0; i < sz; ++i) {
    SDataBlockInfo* pBlkInfo = &((SSDataBlock*)taosArrayGet(pDataBlocks, i))->info;
1781 1782 1783

    int32_t numOfCols = taosArrayGetSize(pDataBlocks);
    bufSize += pBlkInfo->rows * (TD_ROW_HEAD_LEN + pBlkInfo->rowSize + BitmapLen(numOfCols));
C
Cary Xu 已提交
1784 1785 1786 1787
    bufSize += sizeof(SSubmitBlk);
  }

  *pReq = taosMemoryCalloc(1, bufSize);
1788
  if (!(*pReq)) {
C
Cary Xu 已提交
1789 1790 1791
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return TSDB_CODE_FAILED;
  }
C
Cary Xu 已提交
1792 1793
  void* pDataBuf = *pReq;

1794
  int32_t     msgLen = sizeof(SSubmitReq);
C
Cary Xu 已提交
1795 1796
  int32_t     numOfBlks = 0;
  SRowBuilder rb = {0};
C
Cary Xu 已提交
1797
  tdSRowInit(&rb, pTSchema->version);
C
Cary Xu 已提交
1798 1799 1800

  for (int32_t i = 0; i < sz; ++i) {
    SSDataBlock* pDataBlock = taosArrayGet(pDataBlocks, i);
1801
    int32_t      colNum = taosArrayGetSize(pDataBlock->pDataBlock);
C
Cary Xu 已提交
1802
    int32_t      rows = pDataBlock->info.rows;
L
Liu Jicong 已提交
1803 1804
    //    int32_t      rowSize = pDataBlock->info.rowSize;
    //    int64_t      groupId = pDataBlock->info.groupId;
1805

1806 1807 1808 1809 1810
    if (colNum <= 1) {
      // invalid if only with TS col
      continue;
    }

1811
    if (rb.nCols != colNum) {
C
Cary Xu 已提交
1812 1813 1814 1815 1816
      tdSRowSetTpInfo(&rb, colNum, pTSchema->flen);
    }

    SSubmitBlk* pSubmitBlk = POINTER_SHIFT(pDataBuf, msgLen);
    pSubmitBlk->suid = suid;
1817
    pSubmitBlk->uid = pDataBlock->info.groupId;
C
Cary Xu 已提交
1818
    pSubmitBlk->numOfRows = rows;
C
Cary Xu 已提交
1819
    pSubmitBlk->sversion = pTSchema->version;
C
Cary Xu 已提交
1820 1821 1822

    msgLen += sizeof(SSubmitBlk);
    int32_t dataLen = 0;
1823
    for (int32_t j = 0; j < rows; ++j) {                     // iterate by row
C
Cary Xu 已提交
1824
      tdSRowResetBuf(&rb, POINTER_SHIFT(pDataBuf, msgLen + dataLen));  // set row buf
1825
      bool    isStartKey = false;
1826
      int32_t offset = 0;
C
Cary Xu 已提交
1827 1828
      for (int32_t k = 0; k < colNum; ++k) {  // iterate by column
        SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, k);
1829 1830
        STColumn*        pCol = &pTSchema->columns[k];
        void*            var = POINTER_SHIFT(pColInfoData->pData, j * pColInfoData->info.bytes);
C
Cary Xu 已提交
1831 1832 1833 1834
        switch (pColInfoData->info.type) {
          case TSDB_DATA_TYPE_TIMESTAMP:
            if (!isStartKey) {
              isStartKey = true;
1835
              tdAppendColValToRow(&rb, PRIMARYKEY_TIMESTAMP_COL_ID, TSDB_DATA_TYPE_TIMESTAMP, TD_VTYPE_NORM, var, true,
1836 1837
                                  offset, k);

C
Cary Xu 已提交
1838
            } else {
1839 1840
              tdAppendColValToRow(&rb, PRIMARYKEY_TIMESTAMP_COL_ID + k, TSDB_DATA_TYPE_TIMESTAMP, TD_VTYPE_NORM, var,
                                  true, offset, k);
C
Cary Xu 已提交
1841 1842 1843
            }
            break;
          case TSDB_DATA_TYPE_NCHAR: {
1844 1845
            tdAppendColValToRow(&rb, PRIMARYKEY_TIMESTAMP_COL_ID + k, TSDB_DATA_TYPE_NCHAR, TD_VTYPE_NORM, var, true,
                                offset, k);
C
Cary Xu 已提交
1846 1847 1848
            break;
          }
          case TSDB_DATA_TYPE_VARCHAR: {  // TSDB_DATA_TYPE_BINARY
1849 1850
            tdAppendColValToRow(&rb, PRIMARYKEY_TIMESTAMP_COL_ID + k, TSDB_DATA_TYPE_VARCHAR, TD_VTYPE_NORM, var, true,
                                offset, k);
C
Cary Xu 已提交
1851 1852 1853 1854 1855
            break;
          }
          case TSDB_DATA_TYPE_VARBINARY:
          case TSDB_DATA_TYPE_DECIMAL:
          case TSDB_DATA_TYPE_BLOB:
1856
          case TSDB_DATA_TYPE_JSON:
C
Cary Xu 已提交
1857
          case TSDB_DATA_TYPE_MEDIUMBLOB:
C
Cary Xu 已提交
1858
            uError("the column type %" PRIi16 " is defined but not implemented yet", pColInfoData->info.type);
C
Cary Xu 已提交
1859 1860 1861 1862
            TASSERT(0);
            break;
          default:
            if (pColInfoData->info.type < TSDB_DATA_TYPE_MAX && pColInfoData->info.type > TSDB_DATA_TYPE_NULL) {
C
Cary Xu 已提交
1863 1864 1865
              if (pCol->type == pColInfoData->info.type) {
                tdAppendColValToRow(&rb, PRIMARYKEY_TIMESTAMP_COL_ID + k, pCol->type, TD_VTYPE_NORM, var, true, offset,
                                    k);
1866
              } else {
C
Cary Xu 已提交
1867 1868 1869 1870 1871 1872 1873 1874 1875 1876 1877 1878 1879 1880 1881 1882 1883 1884 1885 1886
                char tv[8] = {0};
                if (pColInfoData->info.type == TSDB_DATA_TYPE_FLOAT) {
                  float v = 0;
                  GET_TYPED_DATA(v, float, pColInfoData->info.type, var);
                  SET_TYPED_DATA(&tv, pCol->type, v);
                } else if (pColInfoData->info.type == TSDB_DATA_TYPE_DOUBLE) {
                  double v = 0;
                  GET_TYPED_DATA(v, double, pColInfoData->info.type, var);
                  SET_TYPED_DATA(&tv, pCol->type, v);
                } else if (IS_SIGNED_NUMERIC_TYPE(pColInfoData->info.type)) {
                  int64_t v = 0;
                  GET_TYPED_DATA(v, int64_t, pColInfoData->info.type, var);
                  SET_TYPED_DATA(&tv, pCol->type, v);
                } else {
                  uint64_t v = 0;
                  GET_TYPED_DATA(v, uint64_t, pColInfoData->info.type, var);
                  SET_TYPED_DATA(&tv, pCol->type, v);
                }
                tdAppendColValToRow(&rb, PRIMARYKEY_TIMESTAMP_COL_ID + k, pCol->type, TD_VTYPE_NORM, tv, true, offset,
                                    k);
1887
              }
C
Cary Xu 已提交
1888
            } else {
C
Cary Xu 已提交
1889
              uError("the column type %" PRIi16 " is undefined\n", pColInfoData->info.type);
C
Cary Xu 已提交
1890 1891 1892 1893
              TASSERT(0);
            }
            break;
        }
1894
        offset += TYPE_BYTES[pCol->type];  // sum/avg would convert to int64_t/uint64_t/double during aggregation
C
Cary Xu 已提交
1895 1896
      }
      dataLen += TD_ROW_LEN(rb.pBuf);
C
Cary Xu 已提交
1897 1898 1899
#ifdef TD_DEBUG_PRINT_ROW
      tdSRowPrint(rb.pBuf, pTSchema, __func__);
#endif
C
Cary Xu 已提交
1900
    }
C
Cary Xu 已提交
1901 1902 1903

    ++numOfBlks;

C
Cary Xu 已提交
1904 1905 1906 1907
    pSubmitBlk->dataLen = dataLen;
    msgLen += pSubmitBlk->dataLen;
  }

1908 1909 1910 1911 1912 1913 1914 1915 1916 1917 1918 1919 1920 1921 1922 1923 1924 1925 1926 1927 1928 1929
  if (numOfBlks > 0) {
    (*pReq)->length = msgLen;

    (*pReq)->header.vgId = htonl(vgId);
    (*pReq)->header.contLen = htonl(msgLen);
    (*pReq)->length = (*pReq)->header.contLen;
    (*pReq)->numOfBlocks = htonl(numOfBlks);
    SSubmitBlk* blk = (SSubmitBlk*)((*pReq) + 1);
    while (numOfBlks--) {
      int32_t dataLen = blk->dataLen;
      blk->uid = htobe64(blk->uid);
      blk->suid = htobe64(blk->suid);
      blk->padding = htonl(blk->padding);
      blk->sversion = htonl(blk->sversion);
      blk->dataLen = htonl(blk->dataLen);
      blk->schemaLen = htonl(blk->schemaLen);
      blk->numOfRows = htons(blk->numOfRows);
      blk = (SSubmitBlk*)(blk->data + dataLen);
    }
  } else {
    // no valid rows
    taosMemoryFreeClear(*pReq);
C
Cary Xu 已提交
1930
  }
C
Cary Xu 已提交
1931 1932

  return TSDB_CODE_SUCCESS;
C
Cary Xu 已提交
1933
}
L
Liu Jicong 已提交
1934

1935
char* buildCtbNameByGroupId(const char* stbName, uint64_t groupId) {
L
Liu Jicong 已提交
1936
  ASSERT(stbName[0] != 0);
1937 1938 1939 1940 1941 1942
  SArray* tags = taosArrayInit(0, sizeof(void*));
  SSmlKv* pTag = taosMemoryCalloc(1, sizeof(SSmlKv));
  pTag->key = "group_id";
  pTag->keyLen = strlen(pTag->key);
  pTag->type = TSDB_DATA_TYPE_UBIGINT;
  pTag->u = groupId;
L
Liu Jicong 已提交
1943
  pTag->length = sizeof(uint64_t);
1944 1945 1946 1947 1948 1949 1950 1951 1952 1953 1954 1955 1956 1957 1958 1959 1960 1961 1962 1963
  taosArrayPush(tags, &pTag);

  void* cname = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN + 1);

  RandTableName rname = {
      .tags = tags,
      .sTableName = stbName,
      .sTableNameLen = strlen(stbName),
      .childTableName = cname,
  };

  buildChildTableName(&rname);

  taosMemoryFree(pTag);
  taosArrayDestroy(tags);

  ASSERT(rname.childTableName && rname.childTableName[0]);
  return rname.childTableName;
}

1964
void blockEncode(const SSDataBlock* pBlock, char* data, int32_t* dataLen, int32_t numOfCols, int8_t needCompress) {
1965
  // todo extract method
1966 1967 1968 1969 1970 1971
  int32_t* actualLen = (int32_t*)data;
  data += sizeof(int32_t);

  uint64_t* groupId = (uint64_t*)data;
  data += sizeof(uint64_t);

L
Liu Jicong 已提交
1972
  for (int32_t i = 0; i < numOfCols; ++i) {
1973
    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
1974

L
Liu Jicong 已提交
1975
    *((int16_t*)data) = pColInfoData->info.type;
1976 1977
    data += sizeof(int16_t);

L
Liu Jicong 已提交
1978
    *((int32_t*)data) = pColInfoData->info.bytes;
1979 1980 1981
    data += sizeof(int32_t);
  }

1982 1983 1984
  int32_t* colSizes = (int32_t*)data;
  data += numOfCols * sizeof(int32_t);

1985
  *dataLen = blockDataGetSerialMetaSize(numOfCols);
1986 1987 1988 1989 1990 1991

  int32_t numOfRows = pBlock->info.rows;
  for (int32_t col = 0; col < numOfCols; ++col) {
    SColumnInfoData* pColRes = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, col);

    // copy the null bitmap
1992
    size_t metaSize = 0;
1993
    if (IS_VAR_DATA_TYPE(pColRes->info.type)) {
1994
      metaSize = numOfRows * sizeof(int32_t);
1995 1996
      memcpy(data, pColRes->varmeta.offset, metaSize);
    } else {
1997 1998
      metaSize = BitmapLen(numOfRows);
      memcpy(data, pColRes->nullbitmap, metaSize);
1999 2000
    }

2001 2002 2003
    data += metaSize;
    (*dataLen) += metaSize;

2004 2005 2006 2007 2008 2009 2010 2011 2012 2013 2014 2015 2016 2017 2018 2019 2020 2021
    if (needCompress) {
      colSizes[col] = blockCompressColData(pColRes, numOfRows, data, needCompress);
      data += colSizes[col];
      (*dataLen) += colSizes[col];
    } else {
      colSizes[col] = colDataGetLength(pColRes, numOfRows);
      (*dataLen) += colSizes[col];
      memmove(data, pColRes->pData, colSizes[col]);
      data += colSizes[col];
    }

    colSizes[col] = htonl(colSizes[col]);
  }

  *actualLen = *dataLen;
  *groupId = pBlock->info.groupId;
}

2022
/**
 * Deserialize a data block from a buffer produced by blockEncode().
 *
 * The input buffer is never modified: the per-column lengths are copied into a local
 * variable before being byte-swapped, so the same buffer can be decoded repeatedly.
 * Header fields are read with memcpy() because they are not naturally aligned inside
 * the buffer.
 *
 * @param pBlock     destination block; its column array is created here when it is NULL
 * @param numOfCols  number of columns stored in the buffer
 * @param numOfRows  number of rows stored in the buffer
 * @param pData      start of the serialized block
 * @return pointer to the first byte after the decoded block, or NULL if a memory
 *         allocation failed
 */
const char* blockDecode(SSDataBlock* pBlock, int32_t numOfCols, int32_t numOfRows, const char* pData) {
  const char* pStart = pData;

  int32_t dataLen = 0;
  memcpy(&dataLen, pStart, sizeof(int32_t));
  pStart += sizeof(int32_t);

  uint64_t groupId = 0;
  memcpy(&groupId, pStart, sizeof(uint64_t));
  pBlock->info.groupId = groupId;
  pStart += sizeof(uint64_t);

  if (pBlock->pDataBlock == NULL) {
    pBlock->pDataBlock = taosArrayInit(numOfCols, sizeof(SColumnInfoData));
    if (pBlock->pDataBlock == NULL) {
      return NULL;
    }
    taosArraySetSize(pBlock->pDataBlock, numOfCols);
  }

  for (int32_t i = 0; i < numOfCols; ++i) {
    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);

    int16_t type = 0;
    memcpy(&type, pStart, sizeof(int16_t));
    pColInfoData->info.type = type;
    pStart += sizeof(int16_t);

    int32_t bytes = 0;
    memcpy(&bytes, pStart, sizeof(int32_t));
    pColInfoData->info.bytes = bytes;
    pStart += sizeof(int32_t);

    if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
      pBlock->info.hasVarCol = true;
    }
  }

  blockDataEnsureCapacity(pBlock, numOfRows);

  // table of per-column payload lengths, stored in network byte order
  const char* pColLens = pStart;
  pStart += sizeof(int32_t) * numOfCols;

  for (int32_t i = 0; i < numOfCols; ++i) {
    int32_t colLen = 0;
    memcpy(&colLen, pColLens + i * sizeof(int32_t), sizeof(int32_t));
    // htonl() is its own inverse, so it also converts network order back to host order
    colLen = htonl(colLen);
    ASSERT(colLen >= 0);

    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
    if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
      memcpy(pColInfoData->varmeta.offset, pStart, sizeof(int32_t) * numOfRows);
      pStart += sizeof(int32_t) * numOfRows;

      // grow the payload buffer when the current allocation is too small
      if (colLen > 0 && pColInfoData->varmeta.allocLen < colLen) {
        char* tmp = taosMemoryRealloc(pColInfoData->pData, colLen);
        if (tmp == NULL) {
          return NULL;
        }

        pColInfoData->pData = tmp;
        pColInfoData->varmeta.allocLen = colLen;
      }

      pColInfoData->varmeta.length = colLen;
    } else {
      memcpy(pColInfoData->nullbitmap, pStart, BitmapLen(numOfRows));
      pStart += BitmapLen(numOfRows);
    }

    if (colLen > 0) {
      memcpy(pColInfoData->pData, pStart, colLen);
    }

    // TODO
    // setting this flag to true temporarily so aggregate function on stable will
    // examine NULL value for non-primary key column
    pColInfoData->hasNull = true;
    pStart += colLen;
  }

  pBlock->info.rows = numOfRows;
  ASSERT(pStart - pData == dataLen);
  return pStart;
}
D
dapan1121 已提交
2094