/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "query.h"
#include "tcommon.h"

#include "tcompare.h"
#include "tdatablock.h"
#include "tdef.h"
#include "theap.h"
#include "tlosertree.h"
#include "tpagedbuf.h"
#include "tsort.h"
#include "tutil.h"

// Cursor that designates one row of a data block.
struct STupleHandle {
  SSDataBlock* pBlock;    // block that holds the row
  int32_t      rowIndex;  // row position inside pBlock; the in-memory sort path initializes it to -1
};

H
Haojun Liao 已提交
33
struct SSortHandle {
H
Hongze Cheng 已提交
34 35 36 37
  int32_t        type;
  int32_t        pageSize;
  int32_t        numOfPages;
  SDiskbasedBuf* pBuf;
38 39 40 41 42 43
  SArray*        pSortInfo;
  SArray*        pOrderedSource;
  int32_t        loops;
  uint64_t       sortElapsed;
  int64_t        startTs;
  uint64_t       totalElapsed;
44

45 46 47 48 49 50
  uint64_t         maxRows;
  uint32_t         maxTupleLength;
  uint32_t         sortBufSize;
  BoundedQueue*    pBoundedQueue;
  uint32_t         tmpRowIdx;

51
  int32_t           sourceId;
H
Hongze Cheng 已提交
52
  SSDataBlock*      pDataBlock;
53 54 55
  SMsortComparParam cmpParam;
  int32_t           numOfCompletedSources;
  bool              opened;
D
dapan1121 已提交
56
  int8_t            closed;
H
Hongze Cheng 已提交
57
  const char*       idStr;
58 59 60
  bool              inMemSort;
  bool              needAdjust;
  STupleHandle      tupleHandle;
H
Hongze Cheng 已提交
61
  void*             param;
62
  void (*beforeFp)(SSDataBlock* pBlock, void* param);
63 64 65

  _sort_fetch_block_fn_t  fetchfp;
  _sort_merge_compar_fn_t comparFn;
H
Hongze Cheng 已提交
66
  SMultiwayMergeTreeInfo* pMergeTree;
H
Haojun Liao 已提交
67
};
68

H
Hongze Cheng 已提交
69
static int32_t msortComparFn(const void* pLeft, const void* pRight, void* param);
70

71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111
// Tuple layout: | offset[0] | offset[1] | ... | null bitmap | data ... |
//
// Allocates a zero-filled tuple with room for columnNum offsets, the null bitmap and tupleLen bytes of data.
// Returns whatever taosMemoryCalloc returns.
static void* createTuple(uint32_t columnNum, uint32_t tupleLen) {
  uint32_t bytes = sizeof(uint32_t) * columnNum + BitmapLen(columnNum) + tupleLen;
  void*    pTuple = taosMemoryCalloc(1, bytes);
  return pTuple;
}
// Releases a tuple obtained from createTuple.
static void destoryTuple(void* t) {
  taosMemoryFree(t);
}

// Accessors for the tuple layout | offset[0] | offset[1] | ... | null bitmap | data ... |.
// Every macro argument is parenthesized so that expression arguments (e.g. `idx + 1`) are evaluated as a unit;
// `tuple` is treated as a byte pointer.

// Address of the offset slot of column colIdx.
#define tupleOffset(tuple, colIdx) ((uint32_t*)((char*)(tuple) + sizeof(uint32_t) * (colIdx)))
// Store `offset` into the offset slot of column colIdx.
#define tupleSetOffset(tuple, colIdx, offset) (*tupleOffset(tuple, colIdx) = (offset))
// Mark column colIdx as NULL in the bitmap that follows the colNum offset slots.
#define tupleSetNull(tuple, colIdx, colNum) colDataSetNull_f((char*)(tuple) + sizeof(uint32_t) * (colNum), (colIdx))
// Test the NULL bit of column colIdx.
#define tupleColIsNull(tuple, colIdx, colNum) colDataIsNull_f((char*)(tuple) + sizeof(uint32_t) * (colNum), (colIdx))
// Byte offset at which the data area starts (after the offset slots and the null bitmap).
#define tupleGetDataStartOffset(colNum) (sizeof(uint32_t) * (colNum) + BitmapLen(colNum))
// Copy `length` bytes of `data` into the tuple at byte offset `offset`.
#define tupleSetData(tuple, offset, data, length) memcpy((char*)(tuple) + (offset), (data), (length))

/**
 * Record one column value in a tuple.
 *
 * @param t        address of the tuple pointer; *t is replaced when the tuple has to be reallocated
 * @param colNum   number of columns in the tuple (locates the null bitmap and the data area)
 * @param offset   byte offset inside the tuple at which the value is written
 * @param colIdx   column index, used for the offset slot and the null bitmap
 * @param data     value bytes, copied only when isNull is false
 * @param length   number of value bytes
 * @param isNull   when true only the null bit is set and nothing is copied
 * @param tupleLen size of the data area the tuple was created with
 * @return offset + length, i.e. the offset for the next field
 *
 * NOTE(review): the result of taosMemoryRealloc is stored into *t without a NULL check.
 */
static inline size_t tupleAddField(char** t, uint32_t colNum, uint32_t offset, uint32_t colIdx, void* data, size_t length,
                            bool isNull, uint32_t tupleLen) {
  const size_t nextOffset = offset + length;

  tupleSetOffset(*t, colIdx, offset);
  if (isNull) {
    tupleSetNull(*t, colIdx, colNum);
    return nextOffset;
  }

  // grow the tuple when the value would run past the originally allocated data area
  if (nextOffset > tupleLen + tupleGetDataStartOffset(colNum)) {
    *t = taosMemoryRealloc(*t, nextOffset);
  }
  tupleSetData(*t, offset, data, length);
  return nextOffset;
}

// Returns the address of column colIdx inside tuple t, or NULL when the column's null bit is set.
static void* tupleGetField(char* t, uint32_t colIdx, uint32_t colNum) {
  if (!tupleColIsNull(t, colIdx, colNum)) {
    return t + *tupleOffset(t, colIdx);
  }
  return NULL;
}

static int32_t colDataComparFn(const void* pLeft, const void* pRight, void* param);

112 113
// Builds a new block from the handle's working block via createOneDataBlock(..., false).
// NOTE(review): `false` presumably means "copy the layout only, not the rows" - confirm against createOneDataBlock.
SSDataBlock* tsortGetSortedDataBlock(const SSortHandle* pSortHandle) {
  SSDataBlock* pDerived = createOneDataBlock(pSortHandle->pDataBlock, false);
  return pDerived;
}

/**
 *
 * @param type
 * @return
 */
H
Hongze Cheng 已提交
121
SSortHandle* tsortCreateSortHandle(SArray* pSortInfo, int32_t type, int32_t pageSize, int32_t numOfPages,
122 123
                                   SSDataBlock* pBlock, const char* idstr, uint64_t maxRows, uint32_t maxTupleLength,
                                   uint32_t sortBufSize) {
wafwerar's avatar
wafwerar 已提交
124
  SSortHandle* pSortHandle = taosMemoryCalloc(1, sizeof(SSortHandle));
125

H
Hongze Cheng 已提交
126 127
  pSortHandle->type = type;
  pSortHandle->pageSize = pageSize;
128
  pSortHandle->numOfPages = numOfPages;
H
Hongze Cheng 已提交
129 130
  pSortHandle->pSortInfo = pSortInfo;
  pSortHandle->loops = 0;
131

132 133 134 135 136 137 138
  pSortHandle->maxTupleLength = maxTupleLength;
  if (maxRows < 0)
    pSortHandle->sortBufSize = 0;
  else
    pSortHandle->sortBufSize = sortBufSize;
  pSortHandle->maxRows = maxRows;

139 140 141
  if (pBlock != NULL) {
    pSortHandle->pDataBlock = createOneDataBlock(pBlock, false);
  }
H
Haojun Liao 已提交
142

H
Hongze Cheng 已提交
143
  pSortHandle->pOrderedSource = taosArrayInit(4, POINTER_BYTES);
H
Haojun Liao 已提交
144
  pSortHandle->cmpParam.orderInfo = pSortInfo;
145
  pSortHandle->cmpParam.cmpGroupId = false;
146

H
Haojun Liao 已提交
147
  tsortSetComparFp(pSortHandle, msortComparFn);
148 149

  if (idstr != NULL) {
150
    pSortHandle->idStr = taosStrdup(idstr);
151 152 153 154 155
  }

  return pSortHandle;
}

156
// Destroys every source referenced by cmpParam (block, page-id list and the source object itself),
// clears the slots and resets numOfSources to 0. Always returns TSDB_CODE_SUCCESS.
static int32_t sortComparCleanup(SMsortComparParam* cmpParam) {
  // NOTE(review): the original remark here read "pSource may be, if it is SORT_MULTISOURCE_MERGE" and looks
  // truncated; the loop below dereferences every entry unconditionally - confirm entries can never be NULL.
  const int32_t total = cmpParam->numOfSources;

  for (int32_t idx = 0; idx < total; ++idx) {
    SSortSource* pCur = cmpParam->pSources[idx];

    blockDataDestroy(pCur->src.pBlock);
    if (pCur->pageIdList != NULL) {
      taosArrayDestroy(pCur->pageIdList);
    }

    taosMemoryFreeClear(pCur);
    cmpParam->pSources[idx] = NULL;
  }

  cmpParam->numOfSources = 0;
  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
172
/**
 * Destroy every source stored in pOrderedSource and empty the array.
 *
 * For each non-NULL entry the page-id list is destroyed; `param` and the source block are released only when the
 * source is not flagged `onlyRef`; finally the source object itself is freed.
 *
 * @param pOrderedSource array of SSortSource*
 * @param fetchUs        optional accumulator; each source's fetchUs is added to it
 * @param fetchNum       optional accumulator; each source's fetchNum is added to it
 */
void tsortClearOrderdSource(SArray* pOrderedSource, int64_t *fetchUs, int64_t *fetchNum) {
  size_t numOfSources = taosArrayGetSize(pOrderedSource);

  for (size_t i = 0; i < numOfSources; i++) {
    SSortSource** pSource = taosArrayGet(pOrderedSource, i);
    if (NULL == *pSource) {
      continue;
    }

    // each accumulator is optional on its own; previously fetchNum was dereferenced under a check of fetchUs only
    if (fetchUs) {
      *fetchUs += (*pSource)->fetchUs;
    }
    if (fetchNum) {
      *fetchNum += (*pSource)->fetchNum;
    }

    // release pageIdList
    if ((*pSource)->pageIdList) {
      taosArrayDestroy((*pSource)->pageIdList);
      (*pSource)->pageIdList = NULL;
    }

    if ((*pSource)->param && !(*pSource)->onlyRef) {
      taosMemoryFree((*pSource)->param);
      (*pSource)->param = NULL;
    }

    if (!(*pSource)->onlyRef && (*pSource)->src.pBlock) {
      blockDataDestroy((*pSource)->src.pBlock);
      (*pSource)->src.pBlock = NULL;
    }

    taosMemoryFreeClear(*pSource);
  }

  taosArrayClear(pOrderedSource);
}

H
Haojun Liao 已提交
205
/**
 * Close the sort handle and release everything it owns: merge tree, disk-based buffer, working block,
 * bounded queue, all ordered sources, the identifier string and the handle itself.
 *
 * @param pSortHandle handle to destroy; NULL is accepted and ignored
 */
void tsortDestroySortHandle(SSortHandle* pSortHandle) {
  if (pSortHandle == NULL) {
    return;
  }

  tsortClose(pSortHandle);
  if (pSortHandle->pMergeTree != NULL) {
    tMergeTreeDestroy(&pSortHandle->pMergeTree);
  }

  destroyDiskbasedBuf(pSortHandle->pBuf);
  blockDataDestroy(pSortHandle->pDataBlock);
  if (pSortHandle->pBoundedQueue) destroyBoundedQueue(pSortHandle->pBoundedQueue);

  int64_t fetchUs = 0, fetchNum = 0;
  tsortClearOrderdSource(pSortHandle->pOrderedSource, &fetchUs, &fetchNum);
  qDebug("all source fetch time: %" PRId64 "us num:%" PRId64 " %s", fetchUs, fetchNum, pSortHandle->idStr);

  taosArrayDestroy(pSortHandle->pOrderedSource);
  // idStr is released only after the log line above has used it; it used to be freed before that message
  taosMemoryFreeClear(pSortHandle->idStr);
  taosMemoryFreeClear(pSortHandle);
}

H
Haojun Liao 已提交
227
/**
 * Append a source to the handle's list of ordered sources.
 *
 * @param pSortHandle sort handle
 * @param pSource     source pointer; the pointer value itself is stored in the array
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when the array cannot take the element
 */
int32_t tsortAddSource(SSortHandle* pSortHandle, void* pSource) {
  // a failed push used to be reported as success, silently dropping the source
  if (taosArrayPush(pSortHandle->pOrderedSource, &pSource) == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  return TSDB_CODE_SUCCESS;
}

H
Hongze Cheng 已提交
232 233
/**
 * Wrap a block and its page-id list in a new SSortSource, append it to pAllSources and bump *sourceId.
 * The block's capacity is then sized to the number of rows that fit in one buffer page.
 *
 * @return TSDB_CODE_OUT_OF_MEMORY when the source cannot be allocated (pPageIdList is destroyed in that case),
 *         otherwise the result of blockDataEnsureCapacity
 *
 * NOTE(review): on the allocation-failure path pBlock is not released here - confirm who owns it.
 */
static int32_t doAddNewExternalMemSource(SDiskbasedBuf* pBuf, SArray* pAllSources, SSDataBlock* pBlock,
                                         int32_t* sourceId, SArray* pPageIdList) {
  SSortSource* pNewSource = taosMemoryCalloc(1, sizeof(SSortSource));
  if (pNewSource == NULL) {
    taosArrayDestroy(pPageIdList);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  pNewSource->pageIdList = pPageIdList;
  pNewSource->src.pBlock = pBlock;
  taosArrayPush(pAllSources, &pNewSource);

  *sourceId += 1;

  int32_t bytesPerRow = blockDataGetSerialRowSize(pNewSource->src.pBlock);

  // The value of rowsPerPage must be greater than 0, which is guaranteed by the previous memory allocation
  int32_t rowsPerPage =
      (getBufPageSize(pBuf) - blockDataGetSerialMetaSize(taosArrayGetSize(pBlock->pDataBlock))) / bytesPerRow;
  ASSERT(rowsPerPage > 0);

  return blockDataEnsureCapacity(pNewSource->src.pBlock, rowsPerPage);
}

/**
 * Flush the rows of pDataBlock into pages of the handle's disk-based buffer and register the run as a new
 * external-memory source.
 *
 * The buffer is created on first use. The block is split into page-sized pieces, each piece is serialized into a
 * fresh page, and the page ids are collected. Afterwards pDataBlock is cleaned up and a block derived from it is
 * handed, together with the page-id list, to doAddNewExternalMemSource.
 *
 * @param pDataBlock block whose rows are written out; cleaned up once all rows are written
 * @param pHandle    sort handle owning the buffer and the source list
 * @return TSDB_CODE_SUCCESS or an error code
 */
static int32_t doAddToBuf(SSDataBlock* pDataBlock, SSortHandle* pHandle) {
  int32_t start = 0;

  if (pHandle->pBuf == NULL) {
    if (!osTempSpaceAvailable()) {
      terrno = TSDB_CODE_NO_DISKSPACE;
      qError("Add to buf failed since %s, tempDir:%s", terrstr(), tsTempDir);
      return terrno;
    }

    int32_t code = createDiskbasedBuf(&pHandle->pBuf, pHandle->pageSize, pHandle->numOfPages * pHandle->pageSize,
                                      "sortExternalBuf", tsTempDir);
    // check the result first: the buffer must not be touched when its creation failed
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }
    dBufSetPrintInfo(pHandle->pBuf);
  }

  SArray* pPageIdList = taosArrayInit(4, sizeof(int32_t));
  if (pPageIdList == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  while (start < pDataBlock->info.rows) {
    int32_t stop = 0;
    blockDataSplitRows(pDataBlock, pDataBlock->info.hasVarCol, start, &stop, pHandle->pageSize);
    SSDataBlock* p = blockDataExtractBlock(pDataBlock, start, stop - start + 1);
    if (p == NULL) {
      taosArrayDestroy(pPageIdList);
      return terrno;
    }

    int32_t pageId = -1;
    void*   pPage = getNewBufPage(pHandle->pBuf, &pageId);
    if (pPage == NULL) {
      taosArrayDestroy(pPageIdList);
      blockDataDestroy(p);
      return terrno;
    }

    taosArrayPush(pPageIdList, &pageId);

    int32_t size = blockDataGetSize(p) + sizeof(int32_t) + taosArrayGetSize(p->pDataBlock) * sizeof(int32_t);
    ASSERT(size <= getBufPageSize(pHandle->pBuf));

    blockDataToBuf(pPage, p);

    setBufPageDirty(pPage, true);
    releaseBufPage(pHandle->pBuf, pPage);

    blockDataDestroy(p);
    start = stop + 1;
  }

  blockDataCleanup(pDataBlock);

  SSDataBlock* pBlock = createOneDataBlock(pDataBlock, false);
  if (pBlock == NULL) {
    // doAddNewExternalMemSource dereferences the block, so stop here
    taosArrayDestroy(pPageIdList);
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  return doAddNewExternalMemSource(pHandle->pBuf, pHandle->pOrderedSource, pBlock, &pHandle->sourceId, pPageIdList);
}

312
// Flags a source as exhausted (rowIndex == -1 is the marker the comparator checks) and counts it as completed.
static void setCurrentSourceDone(SSortSource* pSource, SSortHandle* pHandle) {
  pHandle->numOfCompletedSources += 1;
  pSource->src.rowIndex = -1;
}

317
static int32_t sortComparInit(SMsortComparParam* pParam, SArray* pSources, int32_t startIndex, int32_t endIndex,
H
Hongze Cheng 已提交
318
                              SSortHandle* pHandle) {
319 320
  pParam->pSources = taosArrayGet(pSources, startIndex);
  pParam->numOfSources = (endIndex - startIndex + 1);
321 322 323

  int32_t code = 0;

H
Haojun Liao 已提交
324 325 326
  // multi-pass internal merge sort is required
  if (pHandle->pBuf == NULL) {
    if (!osTempSpaceAvailable()) {
327 328
      code = terrno = TSDB_CODE_NO_DISKSPACE;
      qError("Sort compare init failed since %s, tempDir:%s, idStr:%s", terrstr(), tsTempDir, pHandle->idStr);
H
Haojun Liao 已提交
329 330 331 332 333 334 335
      return code;
    }

    code = createDiskbasedBuf(&pHandle->pBuf, pHandle->pageSize, pHandle->numOfPages * pHandle->pageSize,
                              "sortComparInit", tsTempDir);
    dBufSetPrintInfo(pHandle->pBuf);
    if (code != TSDB_CODE_SUCCESS) {
D
dapan1121 已提交
336
      terrno = code;
H
Haojun Liao 已提交
337 338 339 340
      return code;
    }
  }

H
Haojun Liao 已提交
341
  if (pHandle->type == SORT_SINGLESOURCE_SORT) {
342 343
    for (int32_t i = 0; i < pParam->numOfSources; ++i) {
      SSortSource* pSource = pParam->pSources[i];
344

345
      // set current source is done
346
      if (taosArrayGetSize(pSource->pageIdList) == 0) {
347
        setCurrentSourceDone(pSource, pHandle);
348
        continue;
349 350
      }

D
dapan1121 已提交
351
      int32_t* pPgId = taosArrayGet(pSource->pageIdList, pSource->pageIndex);
352

D
dapan1121 已提交
353
      void* pPage = getBufPage(pHandle->pBuf, *pPgId);
354 355 356 357
      if (NULL == pPage) {
        return terrno;
      }
      
358
      code = blockDataFromBuf(pSource->src.pBlock, pPage);
359
      if (code != TSDB_CODE_SUCCESS) {
D
dapan1121 已提交
360
        terrno = code;
361 362 363 364 365 366
        return code;
      }

      releaseBufPage(pHandle->pBuf, pPage);
    }
  } else {
367 368 369 370 371
    qDebug("start init for the multiway merge sort, %s", pHandle->idStr);
    int64_t st = taosGetTimestampUs();

    for (int32_t i = 0; i < pParam->numOfSources; ++i) {
      SSortSource* pSource = pParam->pSources[i];
372
      pSource->src.pBlock = pHandle->fetchfp(pSource->param);
373

374
      // set current source is done
375
      if (pSource->src.pBlock == NULL) {
376
        setCurrentSourceDone(pSource, pHandle);
377
      }
378
    }
379 380

    int64_t et = taosGetTimestampUs();
381
    qDebug("init for merge sort completed, elapsed time:%.2f ms, %s", (et - st) / 1000.0, pHandle->idStr);
382 383 384 385 386
  }

  return code;
}

H
Hongze Cheng 已提交
387
/**
 * Copy row *rowIndex of pSource to the end of pBlock, column by column, then advance both the destination row
 * count and *rowIndex by one.
 *
 * @param pBlock   destination block; info.rows is incremented
 * @param pSource  source block (columns are matched by position)
 * @param rowIndex in/out: row to copy; incremented on return
 */
static void appendOneRowToDataBlock(SSDataBlock* pBlock, const SSDataBlock* pSource, int32_t* rowIndex) {
  const int32_t srcRow = *rowIndex;
  const size_t  numOfCols = taosArrayGetSize(pBlock->pDataBlock);

  for (int32_t col = 0; col < numOfCols; ++col) {
    SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, col);
    SColumnInfoData* pSrc = taosArrayGet(pSource->pDataBlock, col);

    if (colDataIsNull(pSrc, pSource->info.rows, srcRow, NULL)) {
      colDataSetVal(pDst, pBlock->info.rows, NULL, true);
      continue;
    }

    char* pValue = colDataGetData(pSrc, srcRow);
    colDataSetVal(pDst, pBlock->info.rows, pValue, false);
  }

  pBlock->info.rows += 1;
  *rowIndex = srcRow + 1;
}

H
Hongze Cheng 已提交
406 407
/**
 * Advance a source after one of its rows has been consumed and re-adjust the loser tree.
 *
 * When the source's current block is used up, the next block is loaded: from the next buffer page for
 * SORT_SINGLESOURCE_SORT, or through the fetch callback otherwise. A source with nothing left gets
 * rowIndex = -1 and *numOfCompleted is incremented.
 *
 * @param pSource        source whose row was just consumed
 * @param pTree          loser tree to adjust
 * @param pHandle        sort handle
 * @param numOfCompleted in/out counter of exhausted sources
 * @return TSDB_CODE_SUCCESS or an error code
 */
static int32_t adjustMergeTreeForNextTuple(SSortSource* pSource, SMultiwayMergeTreeInfo* pTree, SSortHandle* pHandle,
                                           int32_t* numOfCompleted) {
  /*
   * load a new SDataBlock into memory of a given intermediate data-set source,
   * since it's last record in buffer has been chosen to be processed, as the winner of loser-tree
   */
  if (pSource->src.rowIndex >= pSource->src.pBlock->info.rows) {
    pSource->src.rowIndex = 0;

    if (pHandle->type == SORT_SINGLESOURCE_SORT) {
      pSource->pageIndex++;
      if (pSource->pageIndex >= taosArrayGetSize(pSource->pageIdList)) {
        (*numOfCompleted) += 1;
        pSource->src.rowIndex = -1;
        pSource->pageIndex = -1;
        pSource->src.pBlock = blockDataDestroy(pSource->src.pBlock);
      } else {
        int32_t* pPgId = taosArrayGet(pSource->pageIdList, pSource->pageIndex);

        void* pPage = getBufPage(pHandle->pBuf, *pPgId);
        if (pPage == NULL) {
          qError("failed to get buffer, code:%s", tstrerror(terrno));
          return terrno;
        }

        int32_t code = blockDataFromBuf(pSource->src.pBlock, pPage);
        // the page is released on both outcomes; it used to stay pinned when decoding failed
        releaseBufPage(pHandle->pBuf, pPage);
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }
      }
    } else {
      int64_t st = taosGetTimestampUs();
      pSource->src.pBlock = pHandle->fetchfp(pSource->param);
      pSource->fetchUs += taosGetTimestampUs() - st;
      pSource->fetchNum++;
      if (pSource->src.pBlock == NULL) {
        (*numOfCompleted) += 1;
        pSource->src.rowIndex = -1;
      }
    }
  }

  /*
   * Adjust loser tree otherwise, according to new candidate data
   * if the loser tree is rebuild completed, we do not need to adjust
   */
  int32_t leafNodeIndex = tMergeTreeGetAdjustIndex(pTree);

#ifdef _DEBUG_VIEW
  printf("before adjust:\t");
  tMergeTreePrint(pTree);
#endif

  tMergeTreeAdjust(pTree, leafNodeIndex);

#ifdef _DEBUG_VIEW
  printf("\nafter adjust:\t");
  tMergeTreePrint(pTree);
#endif
  return TSDB_CODE_SUCCESS;
}

wmmhello's avatar
wmmhello 已提交
470
/**
 * Produce the next block of merged rows into the handle's working block.
 *
 * Rows are pulled from the current loser-tree winner one at a time until either `capacity` rows have been
 * collected or every source is exhausted.
 *
 * @return the working block when it holds at least one row; NULL when no rows are left, or on error
 *         (terrno is set to the error code in that case)
 */
static SSDataBlock* getSortedBlockDataInner(SSortHandle* pHandle, SMsortComparParam* cmpParam, int32_t capacity) {
  blockDataCleanup(pHandle->pDataBlock);

  while (cmpParam->numOfSources != pHandle->numOfCompletedSources) {
    int32_t      winner = tMergeTreeGetChosenIndex(pHandle->pMergeTree);
    SSortSource* pWinner = cmpParam->pSources[winner];

    appendOneRowToDataBlock(pHandle->pDataBlock, pWinner->src.pBlock, &pWinner->src.rowIndex);

    int32_t rc = adjustMergeTreeForNextTuple(pWinner, pHandle->pMergeTree, pHandle, &pHandle->numOfCompletedSources);
    if (rc != TSDB_CODE_SUCCESS) {
      terrno = rc;
      return NULL;
    }

    // output block is full: hand it out now
    if (pHandle->pDataBlock->info.rows >= capacity) {
      return pHandle->pDataBlock;
    }
  }

  if (pHandle->pDataBlock->info.rows > 0) {
    return pHandle->pDataBlock;
  }
  return NULL;
}

H
Hongze Cheng 已提交
497 498 499
int32_t msortComparFn(const void* pLeft, const void* pRight, void* param) {
  int32_t pLeftIdx = *(int32_t*)pLeft;
  int32_t pRightIdx = *(int32_t*)pRight;
500

H
Hongze Cheng 已提交
501
  SMsortComparParam* pParam = (SMsortComparParam*)param;
502

H
Hongze Cheng 已提交
503
  SArray* pInfo = pParam->orderInfo;
504

H
Hongze Cheng 已提交
505
  SSortSource* pLeftSource = pParam->pSources[pLeftIdx];
wmmhello's avatar
wmmhello 已提交
506
  SSortSource* pRightSource = pParam->pSources[pRightIdx];
507 508 509 510 511 512 513 514 515 516 517 518 519

  // this input is exhausted, set the special value to denote this
  if (pLeftSource->src.rowIndex == -1) {
    return 1;
  }

  if (pRightSource->src.rowIndex == -1) {
    return -1;
  }

  SSDataBlock* pLeftBlock = pLeftSource->src.pBlock;
  SSDataBlock* pRightBlock = pRightSource->src.pBlock;

520
  if (pParam->cmpGroupId) {
H
Haojun Liao 已提交
521 522
    if (pLeftBlock->info.id.groupId != pRightBlock->info.id.groupId) {
      return pLeftBlock->info.id.groupId < pRightBlock->info.id.groupId ? -1 : 1;
523
    }
524 525
  }

H
Hongze Cheng 已提交
526
  for (int32_t i = 0; i < pInfo->size; ++i) {
527
    SBlockOrderInfo* pOrder = TARRAY_GET_ELEM(pInfo, i);
H
Haojun Liao 已提交
528
    SColumnInfoData* pLeftColInfoData = TARRAY_GET_ELEM(pLeftBlock->pDataBlock, pOrder->slotId);
529

H
Hongze Cheng 已提交
530
    bool leftNull = false;
531
    if (pLeftColInfoData->hasNull) {
532 533 534
      if (pLeftBlock->pBlockAgg == NULL) {
        leftNull = colDataIsNull_s(pLeftColInfoData, pLeftSource->src.rowIndex);
      } else {
H
Hongze Cheng 已提交
535 536
        leftNull =
            colDataIsNull(pLeftColInfoData, pLeftBlock->info.rows, pLeftSource->src.rowIndex, pLeftBlock->pBlockAgg[i]);
537
      }
538 539
    }

H
Haojun Liao 已提交
540
    SColumnInfoData* pRightColInfoData = TARRAY_GET_ELEM(pRightBlock->pDataBlock, pOrder->slotId);
H
Hongze Cheng 已提交
541
    bool             rightNull = false;
542
    if (pRightColInfoData->hasNull) {
H
Haojun Liao 已提交
543
      if (pRightBlock->pBlockAgg == NULL) {
544 545
        rightNull = colDataIsNull_s(pRightColInfoData, pRightSource->src.rowIndex);
      } else {
H
Hongze Cheng 已提交
546 547
        rightNull = colDataIsNull(pRightColInfoData, pRightBlock->info.rows, pRightSource->src.rowIndex,
                                  pRightBlock->pBlockAgg[i]);
548
      }
549 550 551
    }

    if (leftNull && rightNull) {
H
Hongze Cheng 已提交
552
      continue;  // continue to next slot
553 554 555
    }

    if (rightNull) {
H
Hongze Cheng 已提交
556
      return pOrder->nullFirst ? 1 : -1;
557 558 559
    }

    if (leftNull) {
H
Hongze Cheng 已提交
560
      return pOrder->nullFirst ? -1 : 1;
561 562
    }

H
Hongze Cheng 已提交
563
    void* left1 = colDataGetData(pLeftColInfoData, pLeftSource->src.rowIndex);
564
    void* right1 = colDataGetData(pRightColInfoData, pRightSource->src.rowIndex);
565

566
    __compar_fn_t fn = getKeyComparFunc(pLeftColInfoData->info.type, pOrder->order);
567

568 569 570 571 572
    int ret = fn(left1, right1);
    if (ret == 0) {
      continue;
    } else {
      return ret;
573 574
    }
  }
575
  return 0;
576 577 578 579
}

/**
 * Run the intermediate merge passes of the external sort.
 *
 * While there are more ordered sources than can be merged at once, groups of at most numOfPages sources are
 * merged into new, longer runs that are written back to the disk-based buffer and replace the previous set of
 * sources. After the first pass a SORT_MULTISOURCE_MERGE handle is switched to SORT_SINGLESOURCE_SORT with
 * msortComparFn as comparator. On return cmpParam.numOfSources holds the number of remaining sources.
 *
 * @param pHandle sort handle
 * @return 0 on success, TSDB_CODE_TSC_QUERY_CANCELLED when the handle was closed meanwhile, or another error code
 */
static int32_t doInternalMergeSort(SSortHandle* pHandle) {
  size_t numOfSources = taosArrayGetSize(pHandle->pOrderedSource);
  if (numOfSources == 0) {
    return 0;
  }

  // Calculate the I/O counts to complete the data sort.
  double sortPass = floorl(log2(numOfSources) / log2(pHandle->numOfPages));

  pHandle->totalElapsed = taosGetTimestampUs() - pHandle->startTs;

  if (sortPass > 0) {
    size_t s = pHandle->pBuf ? getTotalBufSize(pHandle->pBuf) : 0;
    qDebug("%s %d rounds mergesort required to complete the sort, first-round sorted data size:%" PRIzu
           ", sort elapsed:%" PRId64 ", total elapsed:%" PRId64,
           pHandle->idStr, (int32_t)(sortPass + 1), s, pHandle->sortElapsed, pHandle->totalElapsed);
  } else {
    qDebug("%s ordered source:%" PRIzu ", available buf:%d, no need internal sort", pHandle->idStr, numOfSources,
           pHandle->numOfPages);
  }

  int32_t numOfRows = blockDataGetCapacityInRow(pHandle->pDataBlock, pHandle->pageSize,
                                                blockDataGetSerialMetaSize(taosArrayGetSize(pHandle->pDataBlock->pDataBlock)));
  blockDataEnsureCapacity(pHandle->pDataBlock, numOfRows);

  // the initial pass + sortPass + final mergePass
  pHandle->loops = sortPass + 2;

  size_t numOfSorted = taosArrayGetSize(pHandle->pOrderedSource);
  for (int32_t t = 0; t < sortPass; ++t) {
    int64_t st = taosGetTimestampUs();

    SArray* pResList = taosArrayInit(4, POINTER_BYTES);

    int32_t numOfInputSources = pHandle->numOfPages;
    int32_t sortGroup = (numOfSorted + numOfInputSources - 1) / numOfInputSources;

    // Only *numOfInputSources* can be loaded into buffer to perform the external sort.
    for (int32_t i = 0; i < sortGroup; ++i) {
      pHandle->sourceId += 1;

      int32_t end = (i + 1) * numOfInputSources - 1;
      if (end > numOfSorted - 1) {
        end = numOfSorted - 1;
      }

      pHandle->cmpParam.numOfSources = end - i * numOfInputSources + 1;

      int32_t code = sortComparInit(&pHandle->cmpParam, pHandle->pOrderedSource, i * numOfInputSources, end, pHandle);
      if (code != TSDB_CODE_SUCCESS) {
        taosArrayDestroy(pResList);
        return code;
      }

      code =
          tMergeTreeCreate(&pHandle->pMergeTree, pHandle->cmpParam.numOfSources, &pHandle->cmpParam, pHandle->comparFn);
      if (code != TSDB_CODE_SUCCESS) {
        taosArrayDestroy(pResList);
        return code;
      }

      SArray* pPageIdList = taosArrayInit(4, sizeof(int32_t));
      while (1) {
        if (tsortIsClosed(pHandle)) {
          // release the per-pass containers like the other error exits do; they used to leak on cancellation
          taosArrayDestroy(pResList);
          taosArrayDestroy(pPageIdList);
          code = terrno = TSDB_CODE_TSC_QUERY_CANCELLED;
          return code;
        }

        SSDataBlock* pDataBlock = getSortedBlockDataInner(pHandle, &pHandle->cmpParam, numOfRows);
        if (pDataBlock == NULL) {
          break;
        }

        int32_t pageId = -1;
        void*   pPage = getNewBufPage(pHandle->pBuf, &pageId);
        if (pPage == NULL) {
          taosArrayDestroy(pResList);
          taosArrayDestroy(pPageIdList);
          return terrno;
        }

        taosArrayPush(pPageIdList, &pageId);

        int32_t size =
            blockDataGetSize(pDataBlock) + sizeof(int32_t) + taosArrayGetSize(pDataBlock->pDataBlock) * sizeof(int32_t);
        ASSERT(size <= getBufPageSize(pHandle->pBuf));

        blockDataToBuf(pPage, pDataBlock);

        setBufPageDirty(pPage, true);
        releaseBufPage(pHandle->pBuf, pPage);

        blockDataCleanup(pDataBlock);
      }

      sortComparCleanup(&pHandle->cmpParam);
      tMergeTreeDestroy(&pHandle->pMergeTree);
      pHandle->numOfCompletedSources = 0;

      // the merged run becomes one new source; ownership of pPageIdList moves to it
      SSDataBlock* pBlock = createOneDataBlock(pHandle->pDataBlock, false);
      code = doAddNewExternalMemSource(pHandle->pBuf, pResList, pBlock, &pHandle->sourceId, pPageIdList);
      if (code != TSDB_CODE_SUCCESS) {
        taosArrayDestroy(pResList);
        return code;
      }
    }

    // replace the previous generation of sources with the freshly merged runs
    tsortClearOrderdSource(pHandle->pOrderedSource, NULL, NULL);
    taosArrayAddAll(pHandle->pOrderedSource, pResList);
    taosArrayDestroy(pResList);

    numOfSorted = taosArrayGetSize(pHandle->pOrderedSource);

    int64_t el = taosGetTimestampUs() - st;
    pHandle->totalElapsed += el;

    SDiskbasedBufStatis statis = getDBufStatis(pHandle->pBuf);
    qDebug("%s %d round mergesort, elapsed:%" PRId64 " readDisk:%.2f Kb, flushDisk:%.2f Kb", pHandle->idStr, t + 1, el,
           statis.loadBytes / 1024.0, statis.flushBytes / 1024.0);

    if (pHandle->type == SORT_MULTISOURCE_MERGE) {
      pHandle->type = SORT_SINGLESOURCE_SORT;
      pHandle->comparFn = msortComparFn;
    }
  }

  pHandle->cmpParam.numOfSources = taosArrayGetSize(pHandle->pOrderedSource);
  return 0;
}

708 709 710 711 712
// Page size for sorting: room for four rows plus the serialized block meta data, but never less than
// DEFAULT_PAGESIZE.
int32_t getProperSortPageSize(size_t rowSize, uint32_t numOfCols) {
  uint32_t wanted = rowSize * 4 + blockDataGetSerialMetaSize(numOfCols);
  if (wanted >= DEFAULT_PAGESIZE) {
    return wanted;
  }

  return DEFAULT_PAGESIZE;
}

718
static int32_t createInitialSources(SSortHandle* pHandle) {
719
  size_t sortBufSize = pHandle->numOfPages * pHandle->pageSize;
720
  int32_t code = 0;
H
Haojun Liao 已提交
721 722

  if (pHandle->type == SORT_SINGLESOURCE_SORT) {
723
    SSortSource** pSource = taosArrayGet(pHandle->pOrderedSource, 0);
dengyihao's avatar
dengyihao 已提交
724
    SSortSource*  source = *pSource;
725
    *pSource = NULL;
dengyihao's avatar
dengyihao 已提交
726

D
dapan1121 已提交
727
    tsortClearOrderdSource(pHandle->pOrderedSource, NULL, NULL);
728

729
    while (1) {
730
      SSDataBlock* pBlock = pHandle->fetchfp(source->param);
731 732 733 734
      if (pBlock == NULL) {
        break;
      }

735
      if (pHandle->pDataBlock == NULL) {
736 737
        uint32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
        pHandle->pageSize = getProperSortPageSize(blockDataGetRowSize(pBlock), numOfCols);
738 739

        // todo, number of pages are set according to the total available sort buffer
740 741
        pHandle->numOfPages = 1024;
        sortBufSize = pHandle->numOfPages * pHandle->pageSize;
742
        pHandle->pDataBlock = createOneDataBlock(pBlock, false);
743 744
      }

745 746 747
      if (pHandle->beforeFp != NULL) {
        pHandle->beforeFp(pBlock, pHandle->param);
      }
748

749 750
      code = blockDataMerge(pHandle->pDataBlock, pBlock);
      if (code != TSDB_CODE_SUCCESS) {
751 752 753
        if (source->param && !source->onlyRef) {
          taosMemoryFree(source->param);
        }
dengyihao's avatar
dengyihao 已提交
754 755 756 757
        if (!source->onlyRef && source->src.pBlock) {
          blockDataDestroy(source->src.pBlock);
          source->src.pBlock = NULL;
        }
758
        taosMemoryFree(source);
759 760
        return code;
      }
761

762 763 764 765
      size_t size = blockDataGetSize(pHandle->pDataBlock);
      if (size > sortBufSize) {
        // Perform the in-memory sort and then flush data in the buffer into disk.
        int64_t p = taosGetTimestampUs();
wmmhello's avatar
wmmhello 已提交
766 767
        code = blockDataSort(pHandle->pDataBlock, pHandle->pSortInfo);
        if (code != 0) {
768 769 770
          if (source->param && !source->onlyRef) {
            taosMemoryFree(source->param);
          }
dengyihao's avatar
dengyihao 已提交
771 772 773 774
          if (!source->onlyRef && source->src.pBlock) {
            blockDataDestroy(source->src.pBlock);
            source->src.pBlock = NULL;
          }
775

776
          taosMemoryFree(source);
wmmhello's avatar
wmmhello 已提交
777 778
          return code;
        }
779

780 781
        int64_t el = taosGetTimestampUs() - p;
        pHandle->sortElapsed += el;
782

783 784 785 786
        code = doAddToBuf(pHandle->pDataBlock, pHandle);
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }
787 788 789
      }
    }

790 791 792
    if (source->param && !source->onlyRef) {
      taosMemoryFree(source->param);
    }
793

794 795
    taosMemoryFree(source);

796
    if (pHandle->pDataBlock != NULL && pHandle->pDataBlock->info.rows > 0) {
797 798 799
      size_t size = blockDataGetSize(pHandle->pDataBlock);

      // Perform the in-memory sort and then flush data in the buffer into disk.
800 801
      int64_t p = taosGetTimestampUs();

802
      code = blockDataSort(pHandle->pDataBlock, pHandle->pSortInfo);
wmmhello's avatar
wmmhello 已提交
803 804 805
      if (code != 0) {
        return code;
      }
806

807 808 809
      int64_t el = taosGetTimestampUs() - p;
      pHandle->sortElapsed += el;

810
      // All sorted data can fit in memory, external memory sort is not needed. Return to directly
811
      if (size <= sortBufSize && pHandle->pBuf == NULL) {
812 813 814
        pHandle->cmpParam.numOfSources = 1;
        pHandle->inMemSort = true;

815
        pHandle->loops = 1;
H
Hongze Cheng 已提交
816
        pHandle->tupleHandle.rowIndex = -1;
817 818 819
        pHandle->tupleHandle.pBlock = pHandle->pDataBlock;
        return 0;
      } else {
820
        code = doAddToBuf(pHandle->pDataBlock, pHandle);
821 822
      }
    }
H
Haojun Liao 已提交
823 824
  }

825
  return code;
H
Haojun Liao 已提交
826 827
}

828
// Open the handle for the buffered merge-sort path.
//
// Steps, in order: build the initial sources, run the internal merge sort, then initialise the comparison
// parameters over all ordered sources and create the merge tree on top of them.
//
// The function returns an error code, so its return type is int32_t. With the previous `bool` return type
// every non-zero error code was collapsed to 1 before reaching the caller.
//
// @param pHandle  sort handle being opened
// @return TSDB_CODE_SUCCESS on success, or when no ordered source is left to merge; otherwise the code
//         reported by the step that failed.
static int32_t tsortOpenForBufMergeSort(SSortHandle* pHandle) {
  int32_t code = createInitialSources(pHandle);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  // do internal sort
  code = doInternalMergeSort(pHandle);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  int32_t numOfSources = taosArrayGetSize(pHandle->pOrderedSource);
  if (pHandle->pBuf != NULL) {
    // when a paged buffer exists, the number of remaining sources must not exceed its in-memory page count
    ASSERT(numOfSources <= getNumOfInMemBufPages(pHandle->pBuf));
  }

  // nothing to merge, so no merge tree is needed
  if (numOfSources == 0) {
    return TSDB_CODE_SUCCESS;
  }

  code = sortComparInit(&pHandle->cmpParam, pHandle->pOrderedSource, 0, numOfSources - 1, pHandle);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  return tMergeTreeCreate(&pHandle->pMergeTree, pHandle->cmpParam.numOfSources, &pHandle->cmpParam, pHandle->comparFn);
}

H
Haojun Liao 已提交
857
// Request that the sort handle be closed.
//
// Atomically moves `closed` from 0 to 1 (a flag that already holds another value is left untouched), then
// calls taosMsleep(10) before returning.
// NOTE(review): the pause presumably gives a concurrently running sort time to notice the flag - confirm.
//
// @return always TSDB_CODE_SUCCESS
int32_t tsortClose(SSortHandle* pHandle) {
  int8_t* pClosedFlag = &pHandle->closed;

  (void)atomic_val_compare_exchange_8(pClosedFlag, 0, 1);
  taosMsleep(10);

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
863 864 865 866 867 868 869 870
// Check the `closed` flag and, if it is currently 1, atomically advance it to 2.
//
// The result is the value handed back by the compare-and-exchange converted to bool.
// NOTE(review): this assumes atomic_val_compare_exchange_8 returns the value held before the exchange, in
// which case the function reports true for any non-zero state - confirm against the atomic wrapper.
bool tsortIsClosed(SSortHandle* pHandle) {
  int8_t observed = atomic_val_compare_exchange_8(&pHandle->closed, 1, 2);
  return observed != 0;
}

// Unconditionally store 2 into the handle's `closed` flag with an atomic store.
void tsortSetClosed(SSortHandle* pHandle) {
  int8_t* pClosedFlag = &pHandle->closed;
  atomic_store_8(pClosedFlag, 2);
}

H
Hongze Cheng 已提交
871 872
// Install the callbacks used to pull raw input blocks.
//
// @param pHandle  sort handle to configure
// @param fetchFp  stored as the handle's block-fetch callback
// @param fp       stored as `beforeFp`; when set, it is invoked with a fetched block and `param`
// @param param    opaque pointer stored on the handle and passed to `fp`
// @return always TSDB_CODE_SUCCESS
int32_t tsortSetFetchRawDataFp(SSortHandle* pHandle, _sort_fetch_block_fn_t fetchFp, void (*fp)(SSDataBlock*, void*),
                               void* param) {
  pHandle->param = param;
  pHandle->beforeFp = fp;
  pHandle->fetchfp = fetchFp;

  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
879
// Set the comparison callback stored on the handle.
//
// @return always TSDB_CODE_SUCCESS
int32_t tsortSetComparFp(SSortHandle* pHandle, _sort_merge_compar_fn_t fp) {
  int32_t status = TSDB_CODE_SUCCESS;
  pHandle->comparFn = fp;
  return status;
}

884 885 886 887 888
// Record in the handle's comparison parameters whether the group id takes part in comparisons.
//
// @return always TSDB_CODE_SUCCESS
int32_t tsortSetCompareGroupId(SSortHandle* pHandle, bool compareGroupId) {
  SMsortComparParam* pCmpParam = &pHandle->cmpParam;
  pCmpParam->cmpGroupId = compareGroupId;

  return TSDB_CODE_SUCCESS;
}

889
// Produce the next tuple of the buffered merge sort.
//
// Returns NULL when the handle is closed, when every source has been consumed, or when adjusting the merge
// tree fails (in which case terrno carries the error code). Otherwise returns the handle's embedded tuple
// handle, pointing at the next row in sort order.
static STupleHandle* tsortBufMergeSortNextTuple(SSortHandle* pHandle) {
  // the closed check comes first because it also updates the closed flag
  if (tsortIsClosed(pHandle) || pHandle->cmpParam.numOfSources == pHandle->numOfCompletedSources) {
    return NULL;
  }

  STupleHandle* pResult = &pHandle->tupleHandle;

  // Everything was sorted in memory: simply walk the single sorted block row by row.
  if (pHandle->inMemSort) {
    pResult->rowIndex += 1;
    if (pResult->rowIndex != pHandle->pDataBlock->info.rows) {
      return pResult;
    }

    // ran past the last row: the only source is exhausted
    pHandle->numOfCompletedSources = 1;
    return NULL;
  }

  int32_t      chosen = tMergeTreeGetChosenIndex(pHandle->pMergeTree);
  SSortSource* pChosenSrc = pHandle->cmpParam.pSources[chosen];

  // The row handed out by the previous call has been consumed; advance its source and rebuild the tree.
  if (pHandle->needAdjust) {
    int32_t code = adjustMergeTreeForNextTuple(pChosenSrc, pHandle->pMergeTree, pHandle, &pHandle->numOfCompletedSources);
    if (code != TSDB_CODE_SUCCESS) {
      terrno = code;
      return NULL;
    }
  }

  // the adjustment may have completed the last remaining source
  if (pHandle->cmpParam.numOfSources == pHandle->numOfCompletedSources) {
    return NULL;
  }

  // Re-read the winner, it may have changed after the tree was updated.
  chosen = tMergeTreeGetChosenIndex(pHandle->pMergeTree);
  pChosenSrc = pHandle->cmpParam.pSources[chosen];

  ASSERT(pChosenSrc->src.pBlock != NULL);

  pResult->pBlock = pChosenSrc->src.pBlock;
  pResult->rowIndex = pChosenSrc->src.rowIndex;

  // step over the row just handed out; the tree is adjusted lazily on the next call
  pChosenSrc->src.rowIndex += 1;
  pHandle->needAdjust = true;

  return pResult;
}

939 940 941 942 943 944 945 946 947 948 949 950 951 952 953 954 955 956 957 958 959 960 961 962 963 964 965 966 967 968 969 970 971 972 973 974 975 976 977 978 979 980 981 982 983 984 985 986 987 988 989 990 991 992 993 994 995 996 997 998 999 1000 1001 1002 1003 1004 1005 1006 1007 1008 1009 1010 1011 1012 1013 1014 1015 1016 1017 1018 1019 1020 1021 1022 1023 1024 1025 1026 1027 1028 1029 1030 1031 1032 1033 1034 1035 1036 1037 1038 1039 1040 1041 1042 1043 1044 1045 1046 1047 1048 1049 1050 1051 1052 1053 1054 1055 1056 1057 1058 1059 1060 1061 1062 1063 1064 1065 1066 1067 1068 1069 1070 1071 1072 1073 1074 1075 1076 1077 1078 1079 1080 1081 1082 1083 1084 1085 1086 1087 1088 1089 1090 1091 1092 1093 1094 1095 1096 1097 1098 1099 1100
// Decide whether the bounded priority-queue sort can be used for this handle.
//
// Only single-source sorts qualify, and only when the number of rows that fit into the sort buffer
// (each row costing maxTupleLength plus one pointer) is strictly greater than maxRows.
static bool tsortIsPQSortApplicable(SSortHandle* pHandle) {
  if (pHandle->type != SORT_SINGLESOURCE_SORT) {
    return false;
  }

  const size_t   bytesPerRow = pHandle->maxTupleLength + sizeof(char*);
  const uint64_t rowCapacity = pHandle->sortBufSize / bytesPerRow;

  return rowCapacity > pHandle->maxRows;
}

// Priority-queue ordering predicate: true when the handle's comparison callback yields a negative result
// for (a, b). `param` is the owning SSortHandle and is forwarded to the callback unchanged.
static bool tsortPQCompFn(void* a, void* b, void* param) {
  SSortHandle* pSortHandle = param;
  int32_t      cmp = pSortHandle->comparFn(a, b, param);

  return cmp < 0;
}

// Reversed priority-queue ordering predicate: true when the handle's comparison callback yields a positive
// result for (a, b). `param` is the owning SSortHandle and is forwarded to the callback unchanged.
static bool tsortPQComFnReverse(void* a, void* b, void* param) {
  SSortHandle* pSortHandle = param;
  int32_t      cmp = pSortHandle->comparFn(a, b, param);

  return cmp > 0;
}

static int32_t colDataComparFn(const void* pLeft, const void* pRight, void* param) {
  char* pLTuple = (char*)pLeft;
  char* pRTuple = (char*)pRight;
  SSortHandle* pHandle = (SSortHandle*)param;
  SArray* orderInfo = (SArray*)pHandle->pSortInfo;
  uint32_t colNum = blockDataGetNumOfCols(pHandle->pDataBlock);
  for (int32_t i = 0; i < orderInfo->size; ++i) {
    SBlockOrderInfo* pOrder = TARRAY_GET_ELEM(orderInfo, i);
    void *lData = tupleGetField(pLTuple, pOrder->slotId, colNum);
    void *rData = tupleGetField(pRTuple, pOrder->slotId, colNum);
    if (!lData && !rData) continue;
    if (!lData) return pOrder->nullFirst ? -1 : 1;
    if (!rData) return pOrder->nullFirst ? 1 : -1;

    int           type = ((SColumnInfoData*)taosArrayGet(pHandle->pDataBlock->pDataBlock, pOrder->slotId))->info.type;
    __compar_fn_t fn = getKeyComparFunc(type, pOrder->order);

    int ret = fn(lData, rData);
    if (ret == 0) {
      continue;
    } else {
      return ret;
    }
  }
  return 0;
}

// Open the handle for the bounded priority-queue sort.
//
// Creates the bounded queue (sized by maxRows), switches the handle's comparison callback to
// colDataComparFn, then drains the first ordered source: every row of every fetched block is packed into a
// tuple and pushed into the queue. The first fetched block is also used as the template for
// pHandle->pDataBlock.
//
// @return TSDB_CODE_SUCCESS once the source is exhausted, or TSDB_CODE_OUT_OF_MEMORY when the queue, the
//         template block or a tuple cannot be created.
static int32_t tsortOpenForPQSort(SSortHandle* pHandle) {
  pHandle->pBoundedQueue = createBoundedQueue(pHandle->maxRows, tsortPQCompFn, destoryTuple, pHandle);
  if (pHandle->pBoundedQueue == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  tsortSetComparFp(pHandle, colDataComparFn);

  SSortSource** ppFirst = taosArrayGet(pHandle->pOrderedSource, 0);
  SSortSource*  pInput = *ppFirst;

  PriorityQueueNode node;
  uint32_t          bytesPerTuple = 0;
  pHandle->pDataBlock = NULL;

  for (;;) {
    // fetch data
    SSDataBlock* pFetched = pHandle->fetchfp(pInput->param);
    if (pFetched == NULL) {
      break;
    }

    if (pHandle->beforeFp != NULL) {
      pHandle->beforeFp(pFetched, pHandle->param);
    }

    // the first block doubles as the template of the handle's own data block
    if (pHandle->pDataBlock == NULL) {
      pHandle->pDataBlock = createOneDataBlock(pFetched, false);
      if (pHandle->pDataBlock == NULL) {
        return TSDB_CODE_OUT_OF_MEMORY;
      }
    }

    size_t numOfCols = blockDataGetNumOfCols(pFetched);

    // the payload length of a tuple is derived once, from the column definitions
    if (bytesPerTuple == 0) {
      for (size_t c = 0; c < numOfCols; ++c) {
        SColumnInfoData* pColDef = taosArrayGet(pFetched->pDataBlock, c);
        bytesPerTuple += pColDef->info.bytes;
        if (IS_VAR_DATA_TYPE(pColDef->info.type)) {
          bytesPerTuple += sizeof(VarDataLenT);
        }
      }
    }

    for (size_t r = 0; r < pFetched->info.rows; ++r) {
      void* pPacked = createTuple(numOfCols, bytesPerTuple);
      if (pPacked == NULL) {
        return TSDB_CODE_OUT_OF_MEMORY;
      }

      uint32_t writePos = tupleGetDataStartOffset(numOfCols);
      for (size_t c = 0; c < numOfCols; ++c) {
        SColumnInfoData* pColData = taosArrayGet(pFetched->pDataBlock, c);
        if (colDataIsNull_s(pColData, r)) {
          writePos = tupleAddField((char**)&pPacked, numOfCols, writePos, c, 0, 0, true, bytesPerTuple);
          continue;
        }

        size_t fieldLen = colDataGetRowLength(pColData, r);
        writePos = tupleAddField((char**)&pPacked, numOfCols, writePos, c, colDataGetData(pColData, r), fieldLen,
                                 false, bytesPerTuple);
      }

      node.data = pPacked;
      taosBQPush(pHandle->pBoundedQueue, &node);
    }
  }

  return TSDB_CODE_SUCCESS;
}

// Produce the next tuple of the bounded priority-queue sort.
//
// The top tuple of the queue is unpacked into row 0 of pHandle->pDataBlock, which is cleared on every call,
// and the tuple is then popped. On the first call the queue is re-heapified with the reversed comparator
// before any tuple is taken out.
//
// @return the handle's embedded tuple handle referring to pHandle->pDataBlock, or NULL when
//         - no input block was ever fetched (pHandle->pDataBlock was never created),
//         - the result block cannot be sized (terrno is set to the error code), or
//         - the queue is empty.
static STupleHandle* tsortPQSortNextTuple(SSortHandle* pHandle) {
  // An empty input leaves pDataBlock unset by tsortOpenForPQSort; there is nothing to fill and nothing to
  // return, and touching the block below would dereference NULL.
  if (pHandle->pDataBlock == NULL) {
    return NULL;
  }

  blockDataCleanup(pHandle->pDataBlock);
  int32_t code = blockDataEnsureCapacity(pHandle->pDataBlock, 1);
  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
    return NULL;
  }

  // abandon the top tuple if the queue holds one element more than its max size
  if (taosBQSize(pHandle->pBoundedQueue) == taosBQMaxSize(pHandle->pBoundedQueue) + 1) {
    taosBQPop(pHandle->pBoundedQueue);
  }

  if (pHandle->tmpRowIdx == 0) {
    // first call: rebuild the heap with the reversed comparator so the results come out sorted
    taosBQSetFn(pHandle->pBoundedQueue, tsortPQComFnReverse);
    taosBQBuildHeap(pHandle->pBoundedQueue);
  }

  if (taosBQSize(pHandle->pBoundedQueue) > 0) {
    uint32_t           colNum = blockDataGetNumOfCols(pHandle->pDataBlock);
    PriorityQueueNode* node = taosBQTop(pHandle->pBoundedQueue);
    char*              pTuple = (char*)node->data;

    // unpack every field of the tuple into row 0 of the result block
    for (uint32_t i = 0; i < colNum; ++i) {
      void* pData = tupleGetField(pTuple, i, colNum);
      if (!pData) {
        colDataSetNULL(bdGetColumnInfoData(pHandle->pDataBlock, i), 0);
      } else {
        colDataAppend(bdGetColumnInfoData(pHandle->pDataBlock, i), 0, pData, false);
      }
    }

    pHandle->pDataBlock->info.rows++;
    pHandle->tmpRowIdx++;
    taosBQPop(pHandle->pBoundedQueue);
  }

  // no row was produced: the queue is drained
  if (pHandle->pDataBlock->info.rows == 0) {
    return NULL;
  }

  pHandle->tupleHandle.pBlock = pHandle->pDataBlock;
  return &pHandle->tupleHandle;
}

// Open the sort handle and run the preparatory sort work.
//
// A handle that is already open is left alone. Both the fetch callback and the comparison callback must
// have been installed beforehand. Depending on tsortIsPQSortApplicable(), either the priority-queue path
// or the buffered merge-sort path is taken.
//
// @return 0 if the handle was already open, -1 if a callback is missing, otherwise the result of the
//         chosen open routine.
int32_t tsortOpen(SSortHandle* pHandle) {
  if (pHandle->opened) {
    return 0;
  }

  bool callbacksReady = (pHandle->fetchfp != NULL) && (pHandle->comparFn != NULL);
  if (!callbacksReady) {
    return -1;
  }

  // marked open before the work starts, so a failed open is not retried by a later call
  pHandle->opened = true;

  return tsortIsPQSortApplicable(pHandle) ? tsortOpenForPQSort(pHandle) : tsortOpenForBufMergeSort(pHandle);
}

// Fetch the next tuple in sort order. A handle that owns a bounded queue is served by the priority-queue
// path; every other handle is served by the buffered merge-sort path.
STupleHandle* tsortNextTuple(SSortHandle* pHandle) {
  bool usesBoundedQueue = (pHandle->pBoundedQueue != NULL);

  return usesBoundedQueue ? tsortPQSortNextTuple(pHandle) : tsortBufMergeSortNextTuple(pHandle);
}

H
Haojun Liao 已提交
1101
// Tell whether column `colIndex` is NULL in the row the tuple handle currently refers to.
bool tsortIsNullVal(STupleHandle* pVHandle, int32_t colIndex) {
  SSDataBlock*     pBlock = pVHandle->pBlock;
  SColumnInfoData* pColumn = taosArrayGet(pBlock->pDataBlock, colIndex);

  return colDataIsNull_s(pColumn, pVHandle->rowIndex);
}

H
Haojun Liao 已提交
1106
// Return a pointer to the value of column `colIndex` in the row the tuple handle currently refers to,
// or NULL when the column has no data buffer.
void* tsortGetValue(STupleHandle* pVHandle, int32_t colIndex) {
  SColumnInfoData* pColumn = TARRAY_GET_ELEM(pVHandle->pBlock->pDataBlock, colIndex);

  if (pColumn->pData != NULL) {
    return colDataGetData(pColumn, pVHandle->rowIndex);
  }

  return NULL;
}
1114

H
Haojun Liao 已提交
1115
// Return the group id recorded in the info of the block the tuple handle refers to.
uint64_t tsortGetGroupId(STupleHandle* pVHandle) {
  SSDataBlock* pBlock = pVHandle->pBlock;
  return pBlock->info.id.groupId;
}
1116
// Return the address of the `info` member of the block the tuple handle refers to.
void* tsortGetBlockInfo(STupleHandle* pVHandle) {
  SSDataBlock* pBlock = pVHandle->pBlock;
  return &pBlock->info;
}
S
slzhou 已提交
1117

1118 1119 1120
// Collect execution statistics of a sort.
//
// Without a handle, defaults are reported (quick sort, 2 MB buffer). With a handle, the buffer size is
// pageSize * numOfPages, the method reflects whether the sort stayed in memory, and the loop count is
// copied from the handle. When a paged buffer exists, its flushed and loaded byte counts are reported as
// write and read bytes.
SSortExecInfo tsortGetSortExecInfo(SSortHandle* pHandle) {
  SSortExecInfo info = {0};

  if (pHandle == NULL) {
    info.sortMethod = SORT_QSORT_T;  // by default
    info.sortBuffer = 2 * 1048576;   // 2mb by default
    return info;
  }

  info.sortBuffer = pHandle->pageSize * pHandle->numOfPages;
  if (pHandle->inMemSort) {
    info.sortMethod = SORT_QSORT_T;
  } else {
    info.sortMethod = SORT_SPILLED_MERGE_SORT_T;
  }
  info.loops = pHandle->loops;

  if (pHandle->pBuf != NULL) {
    SDiskbasedBufStatis stat = getDBufStatis(pHandle->pBuf);
    info.writeBytes = stat.flushBytes;
    info.readBytes = stat.loadBytes;
  }

  return info;
}