tsort.c 38.1 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "query.h"
H
Hongze Cheng 已提交
17
#include "tcommon.h"
18

H
Hongze Cheng 已提交
19
#include "tcompare.h"
H
Haojun Liao 已提交
20
#include "tdatablock.h"
S
Shengliang Guan 已提交
21
#include "tdef.h"
22
#include "theap.h"
23 24
#include "tlosertree.h"
#include "tpagedbuf.h"
H
Haojun Liao 已提交
25
#include "tsort.h"
26 27
#include "tutil.h"

28 29 30 31 32
// Identifies a single row: a data block plus the index of the row inside it.
struct STupleHandle {
  SSDataBlock* pBlock;    // block holding the row
  int32_t      rowIndex;  // row position inside pBlock
};

H
Haojun Liao 已提交
33
struct SSortHandle {
H
Hongze Cheng 已提交
34 35 36 37
  int32_t        type;
  int32_t        pageSize;
  int32_t        numOfPages;
  SDiskbasedBuf* pBuf;
38 39 40 41 42 43
  SArray*        pSortInfo;
  SArray*        pOrderedSource;
  int32_t        loops;
  uint64_t       sortElapsed;
  int64_t        startTs;
  uint64_t       totalElapsed;
44

45 46 47
  uint64_t         maxRows;
  uint32_t         maxTupleLength;
  uint32_t         sortBufSize;
48
  bool             forceUsePQSort;
49 50 51
  BoundedQueue*    pBoundedQueue;
  uint32_t         tmpRowIdx;

52
  int32_t           sourceId;
H
Hongze Cheng 已提交
53
  SSDataBlock*      pDataBlock;
54 55 56
  SMsortComparParam cmpParam;
  int32_t           numOfCompletedSources;
  bool              opened;
D
dapan1121 已提交
57
  int8_t            closed;
H
Hongze Cheng 已提交
58
  const char*       idStr;
59 60 61
  bool              inMemSort;
  bool              needAdjust;
  STupleHandle      tupleHandle;
H
Hongze Cheng 已提交
62
  void*             param;
63
  void (*beforeFp)(SSDataBlock* pBlock, void* param);
64 65 66

  _sort_fetch_block_fn_t  fetchfp;
  _sort_merge_compar_fn_t comparFn;
H
Hongze Cheng 已提交
67
  SMultiwayMergeTreeInfo* pMergeTree;
H
Haojun Liao 已提交
68
};
69

H
Hongze Cheng 已提交
70
// Forward declaration of the default merge comparator; tsortCreateSortHandle() installs it via tsortSetComparFp().
static int32_t msortComparFn(const void* pLeft, const void* pRight, void* param);
71

72 73 74 75 76
/**
 * Allocate a zero-filled tuple buffer with the layout
 *   | offset[0] | offset[1] |....| nullbitmap | data |...|
 * i.e. one uint32_t offset per column, a null bitmap of BitmapLen(columnNum)
 * bytes, then tupleLen bytes reserved for field data.
 *
 * @return the buffer, or NULL when the allocation fails
 */
static void* createTuple(uint32_t columnNum, uint32_t tupleLen) {
  const uint32_t headerLen = sizeof(uint32_t) * columnNum + BitmapLen(columnNum);
  return taosMemoryCalloc(1, headerLen + tupleLen);
}
77
/** Release a tuple buffer obtained from createTuple(). */
static void destoryAllocatedTuple(void* t) {
  taosMemoryFree(t);
}
78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110

/*
 * Accessors for the tuple layout produced by createTuple():
 *   | offset[0] | ... | offset[colNum-1] | nullbitmap | data ... |
 * Every argument is parenthesized so expressions such as `i + 1` may be passed
 * as colIdx/colNum/offset without changing the computed address.
 */
#define tupleOffset(tuple, colIdx) ((uint32_t*)((tuple) + sizeof(uint32_t) * (colIdx)))
#define tupleSetOffset(tuple, colIdx, offset) (*tupleOffset(tuple, colIdx) = (offset))
#define tupleSetNull(tuple, colIdx, colNum) colDataSetNull_f((char*)(tuple) + sizeof(uint32_t) * (colNum), (colIdx))
#define tupleColIsNull(tuple, colIdx, colNum) colDataIsNull_f((char*)(tuple) + sizeof(uint32_t) * (colNum), (colIdx))
#define tupleGetDataStartOffset(colNum) (sizeof(uint32_t) * (colNum) + BitmapLen(colNum))
#define tupleSetData(tuple, offset, data, length) memcpy((tuple) + (offset), (data), (length))

/**
 * @param t the tuple pointer addr, if realloced, *t is changed to the new addr
 * @param offset copy data into pTuple start from offset
 * @param colIndex the columnIndex, for setting null bitmap
 * @return the next offset to add field
 * */
static inline size_t tupleAddField(char** t, uint32_t colNum, uint32_t offset, uint32_t colIdx, void* data, size_t length,
                            bool isNull, uint32_t tupleLen) {
  tupleSetOffset(*t, colIdx, offset);
  if (isNull) {
    tupleSetNull(*t, colIdx, colNum);
  } else {
    if (offset + length > tupleLen + tupleGetDataStartOffset(colNum)) {
      *t = taosMemoryRealloc(*t, offset + length);
    }
    tupleSetData(*t, offset, data, length);
  }
  return offset + length;
}

/** Return a pointer to field colIdx of tuple t, or NULL when that field is null. */
static void* tupleGetField(char* t, uint32_t colIdx, uint32_t colNum) {
  void* pField = NULL;
  if (!tupleColIsNull(t, colIdx, colNum)) {
    pField = t + *tupleOffset(t, colIdx);
  }
  return pField;
}

111 112
/**
 * Create a new block from the handle's working block via
 * createOneDataBlock(..., false). NOTE(review): `false` presumably means the
 * rows are not copied, only the schema — confirm.
 */
SSDataBlock* tsortGetSortedDataBlock(const SSortHandle* pSortHandle) {
  SSDataBlock* pTemplate = pSortHandle->pDataBlock;
  return createOneDataBlock(pTemplate, false);
}

115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169
// Values of TupleDesc::type.
#define AllocatedTupleType 0
#define ReferencedTupleType 1 // tuple refers to one row of a data block (see ReferencedTuple)
// Common header of every tuple descriptor; `type` tells how `data` must be interpreted.
typedef struct TupleDesc {
  uint8_t type;
  char*   data; // AllocatedTupleType: points to the created tuple; otherwise points to the SSDataBlock
} TupleDesc;

// Descriptor of a tuple that is not copied: row `rowIndex` of the block in desc.data.
// `desc` must remain the first member, because tupleDescGetField() casts a TupleDesc* to ReferencedTuple*.
typedef struct ReferencedTuple {
  TupleDesc desc;
  size_t    rowIndex;
} ReferencedTuple;

/**
 * Copy row rowIdx of pBlock into a newly allocated tuple and wrap it in a
 * TupleDesc of type AllocatedTupleType (free it with destroyTuple()).
 *
 * @param pBlock   source block
 * @param colNum   number of leading columns of pBlock to copy
 * @param tupleLen initial size of the tuple data area; grown if the row needs more
 * @param rowIdx   row to copy
 * @return the descriptor, or NULL if memory could not be allocated
 */
static TupleDesc* createAllocatedTuple(SSDataBlock* pBlock, size_t colNum, uint32_t tupleLen, size_t rowIdx) {
  TupleDesc* t = taosMemoryCalloc(1, sizeof(TupleDesc));
  if (t == NULL) {
    return NULL;
  }

  // Declared as char* so that &pTuple genuinely is the char** that tupleAddField() expects.
  char* pTuple = createTuple(colNum, tupleLen);
  if (pTuple == NULL) {
    taosMemoryFree(t);
    return NULL;
  }

  uint32_t offset = tupleGetDataStartOffset(colNum);
  for (size_t colIdx = 0; colIdx < colNum; ++colIdx) {
    SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, colIdx);
    if (colDataIsNull_s(pCol, rowIdx)) {
      offset = tupleAddField(&pTuple, colNum, offset, colIdx, NULL, 0, true, tupleLen);
    } else {
      size_t colLen = colDataGetRowLength(pCol, rowIdx);
      offset = tupleAddField(&pTuple, colNum, offset, colIdx, colDataGetData(pCol, rowIdx), colLen, false, tupleLen);
    }
  }

  t->type = AllocatedTupleType;
  t->data = pTuple;
  return t;
}

void* tupleDescGetField(const TupleDesc* pDesc, int32_t colIdx, uint32_t colNum) {
  if (pDesc->type == ReferencedTupleType) {
    ReferencedTuple* pRefTuple = (ReferencedTuple*)pDesc;
    SColumnInfoData* pCol = taosArrayGet(((SSDataBlock*)pDesc->data)->pDataBlock, colIdx);
    if (colDataIsNull_s(pCol, pRefTuple->rowIndex)) return NULL;
    return colDataGetData(pCol, pRefTuple->rowIndex);
  } else {
    return tupleGetField(pDesc->data, colIdx, colNum);
  }
}

void destroyTuple(void* t) {
  TupleDesc* pDesc = t;
  if (pDesc->type == AllocatedTupleType) {
    destoryAllocatedTuple(pDesc->data);
    taosMemoryFree(pDesc);
  }
}

170 171 172 173 174
/**
 *
 * @param type
 * @return
 */
H
Hongze Cheng 已提交
175
SSortHandle* tsortCreateSortHandle(SArray* pSortInfo, int32_t type, int32_t pageSize, int32_t numOfPages,
176 177
                                   SSDataBlock* pBlock, const char* idstr, uint64_t maxRows, uint32_t maxTupleLength,
                                   uint32_t sortBufSize) {
wafwerar's avatar
wafwerar 已提交
178
  SSortHandle* pSortHandle = taosMemoryCalloc(1, sizeof(SSortHandle));
179

H
Hongze Cheng 已提交
180 181
  pSortHandle->type = type;
  pSortHandle->pageSize = pageSize;
182
  pSortHandle->numOfPages = numOfPages;
H
Hongze Cheng 已提交
183 184
  pSortHandle->pSortInfo = pSortInfo;
  pSortHandle->loops = 0;
185

186
  pSortHandle->maxTupleLength = maxTupleLength;
187
  if (maxRows != 0) {
188
    pSortHandle->sortBufSize = sortBufSize;
189 190 191
    pSortHandle->maxRows = maxRows;
  }
  pSortHandle->forceUsePQSort = false;
192

193 194 195
  if (pBlock != NULL) {
    pSortHandle->pDataBlock = createOneDataBlock(pBlock, false);
  }
H
Haojun Liao 已提交
196

H
Hongze Cheng 已提交
197
  pSortHandle->pOrderedSource = taosArrayInit(4, POINTER_BYTES);
H
Haojun Liao 已提交
198
  pSortHandle->cmpParam.orderInfo = pSortInfo;
199
  pSortHandle->cmpParam.cmpGroupId = false;
200

H
Haojun Liao 已提交
201
  tsortSetComparFp(pSortHandle, msortComparFn);
202 203

  if (idstr != NULL) {
204
    pSortHandle->idStr = taosStrdup(idstr);
205 206 207 208 209
  }

  return pSortHandle;
}

210
/**
 * Release the sources currently referenced by cmpParam and reset its count.
 *
 * For each source this frees its block, its page id list and the source
 * object itself. cmpParam->pSources points into the owning source array
 * (see sortComparInit), so each slot is set to NULL to keep that array free
 * of dangling pointers. NULL slots are skipped, as in
 * tsortClearOrderdSource(); the original note warned that pSource may be NULL
 * for SORT_MULTISOURCE_MERGE.
 */
static int32_t sortComparCleanup(SMsortComparParam* cmpParam) {
  for (int32_t i = 0; i < cmpParam->numOfSources; ++i) {
    SSortSource* pSource = cmpParam->pSources[i];
    if (pSource == NULL) {
      continue;
    }

    blockDataDestroy(pSource->src.pBlock);
    if (pSource->pageIdList) {
      taosArrayDestroy(pSource->pageIdList);
    }
    taosMemoryFreeClear(pSource);
    cmpParam->pSources[i] = NULL;
  }

  cmpParam->numOfSources = 0;
  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
226
/**
 * Free every SSortSource stored in pOrderedSource and empty the array; the
 * array itself is kept.
 *
 * NULL entries are skipped. For every other source:
 * - its page id list is destroyed;
 * - its param and block are freed unless the source is marked onlyRef;
 * - the source object is freed.
 *
 * @param fetchUs  if not NULL, each source's fetchUs is added to *fetchUs
 * @param fetchNum if not NULL, each source's fetchNum is added to *fetchNum
 */
void tsortClearOrderdSource(SArray* pOrderedSource, int64_t* fetchUs, int64_t* fetchNum) {
  for (size_t i = 0; i < taosArrayGetSize(pOrderedSource); i++) {
    SSortSource** pSource = taosArrayGet(pOrderedSource, i);
    if (NULL == *pSource) {
      continue;
    }

    // each statistics pointer is checked on its own, so either may be NULL
    if (fetchUs) {
      *fetchUs += (*pSource)->fetchUs;
    }
    if (fetchNum) {
      *fetchNum += (*pSource)->fetchNum;
    }

    // release pageIdList
    if ((*pSource)->pageIdList) {
      taosArrayDestroy((*pSource)->pageIdList);
      (*pSource)->pageIdList = NULL;
    }
    if ((*pSource)->param && !(*pSource)->onlyRef) {
      taosMemoryFree((*pSource)->param);
      (*pSource)->param = NULL;
    }

    if (!(*pSource)->onlyRef && (*pSource)->src.pBlock) {
      blockDataDestroy((*pSource)->src.pBlock);
      (*pSource)->src.pBlock = NULL;
    }

    taosMemoryFreeClear(*pSource);
  }

  taosArrayClear(pOrderedSource);
}

H
Haojun Liao 已提交
259
/**
 * Close and free a sort handle together with what it owns:
 * - the merge tree and the disk buffer;
 * - the working block and the bounded queue;
 * - every ordered source;
 * - the id string.
 *
 * The accumulated fetch statistics of the sources are logged. Passing NULL is
 * a no-op.
 */
void tsortDestroySortHandle(SSortHandle* pSortHandle) {
  if (pSortHandle == NULL) {
    return;
  }
  tsortClose(pSortHandle);
  if (pSortHandle->pMergeTree != NULL) {
    tMergeTreeDestroy(&pSortHandle->pMergeTree);
  }

  destroyDiskbasedBuf(pSortHandle->pBuf);
  blockDataDestroy(pSortHandle->pDataBlock);
  if (pSortHandle->pBoundedQueue) destroyBoundedQueue(pSortHandle->pBoundedQueue);

  int64_t fetchUs = 0, fetchNum = 0;
  tsortClearOrderdSource(pSortHandle->pOrderedSource, &fetchUs, &fetchNum);
  // idStr must still be alive here; freeing it earlier hands a NULL pointer to "%s".
  qDebug("all source fetch time: %" PRId64 "us num:%" PRId64 " %s", fetchUs, fetchNum, pSortHandle->idStr);

  taosArrayDestroy(pSortHandle->pOrderedSource);
  taosMemoryFreeClear(pSortHandle->idStr);
  taosMemoryFreeClear(pSortHandle);
}

H
Haojun Liao 已提交
281
/**
 * Append a source (an SSortSource*) to the handle's ordered source list.
 *
 * On success the handle owns the source; it is released by
 * tsortDestroySortHandle().
 *
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when the list could not
 *         grow. In that case the source was not stored and remains the caller's.
 */
int32_t tsortAddSource(SSortHandle* pSortHandle, void* pSource) {
  if (taosArrayPush(pSortHandle->pOrderedSource, &pSource) == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  return TSDB_CODE_SUCCESS;
}

H
Hongze Cheng 已提交
286 287
/**
 * Wrap an on-disk run (pages listed in pPageIdList, stored in pBuf) into a new
 * SSortSource and append it to pAllSources.
 *
 * Ownership of pBlock and pPageIdList passes to this function. On success they
 * belong to the new source. On failure before the source is stored they are
 * released here. If only the final capacity reservation fails, the source is
 * already in pAllSources and is released together with that array's sources.
 *
 * @param pBuf        disk-based buffer holding the pages
 * @param pAllSources array of SSortSource* that receives the new source
 * @param pBlock      empty block used to load this source's pages; NULL is
 *                    treated as an allocation failure
 * @param sourceId    counter incremented for the new source
 * @param pPageIdList ids of the pages making up the run; NULL is treated as an
 *                    allocation failure
 * @return TSDB_CODE_SUCCESS or an error code
 */
static int32_t doAddNewExternalMemSource(SDiskbasedBuf* pBuf, SArray* pAllSources, SSDataBlock* pBlock,
                                         int32_t* sourceId, SArray* pPageIdList) {
  if (pBlock == NULL || pPageIdList == NULL) {
    blockDataDestroy(pBlock);
    taosArrayDestroy(pPageIdList);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  SSortSource* pSource = taosMemoryCalloc(1, sizeof(SSortSource));
  if (pSource == NULL) {
    taosArrayDestroy(pPageIdList);
    blockDataDestroy(pBlock);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  pSource->src.pBlock = pBlock;
  pSource->pageIdList = pPageIdList;
  if (taosArrayPush(pAllSources, &pSource) == NULL) {
    taosArrayDestroy(pPageIdList);
    blockDataDestroy(pBlock);
    taosMemoryFree(pSource);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  (*sourceId) += 1;

  int32_t rowSize = blockDataGetSerialRowSize(pSource->src.pBlock);

  // The value of numOfRows must be greater than 0, which is guaranteed by the previous memory allocation
  int32_t numOfRows =
      (getBufPageSize(pBuf) - blockDataGetSerialMetaSize(taosArrayGetSize(pBlock->pDataBlock))) / rowSize;
  ASSERT(numOfRows > 0);

  return blockDataEnsureCapacity(pSource->src.pBlock, numOfRows);
}

/**
 * Spill pDataBlock into the disk-based buffer as a new external source.
 *
 * 1. The disk buffer is created on first use.
 * 2. The rows are split into page-sized chunks (blockDataSplitRows).
 * 3. Each chunk is serialized into a fresh buffer page.
 * 4. The collected page ids become a new source in pHandle->pOrderedSource.
 *
 * pDataBlock is emptied (blockDataCleanup) once all rows have been written.
 *
 * @return TSDB_CODE_SUCCESS or an error code
 */
static int32_t doAddToBuf(SSDataBlock* pDataBlock, SSortHandle* pHandle) {
  int32_t start = 0;

  if (pHandle->pBuf == NULL) {
    if (!osTempSpaceAvailable()) {
      terrno = TSDB_CODE_NO_DISKSPACE;
      qError("Add to buf failed since %s, tempDir:%s", terrstr(), tsTempDir);
      return terrno;
    }

    int32_t code = createDiskbasedBuf(&pHandle->pBuf, pHandle->pageSize, pHandle->numOfPages * pHandle->pageSize,
                                      "sortExternalBuf", tsTempDir);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }
    // only touch the buffer once its creation is known to have succeeded
    dBufSetPrintInfo(pHandle->pBuf);
  }

  SArray* pPageIdList = taosArrayInit(4, sizeof(int32_t));
  if (pPageIdList == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return terrno;
  }

  while (start < pDataBlock->info.rows) {
    int32_t stop = 0;
    blockDataSplitRows(pDataBlock, pDataBlock->info.hasVarCol, start, &stop, pHandle->pageSize);
    SSDataBlock* p = blockDataExtractBlock(pDataBlock, start, stop - start + 1);
    if (p == NULL) {
      taosArrayDestroy(pPageIdList);
      return terrno;
    }

    int32_t pageId = -1;
    void*   pPage = getNewBufPage(pHandle->pBuf, &pageId);
    if (pPage == NULL) {
      taosArrayDestroy(pPageIdList);
      blockDataDestroy(p);
      return terrno;
    }

    // a page whose id is not recorded would silently drop its rows from the sort
    if (taosArrayPush(pPageIdList, &pageId) == NULL) {
      releaseBufPage(pHandle->pBuf, pPage);
      taosArrayDestroy(pPageIdList);
      blockDataDestroy(p);
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return terrno;
    }

    int32_t size = blockDataGetSize(p) + sizeof(int32_t) + taosArrayGetSize(p->pDataBlock) * sizeof(int32_t);
    ASSERT(size <= getBufPageSize(pHandle->pBuf));

    blockDataToBuf(pPage, p);

    setBufPageDirty(pPage, true);
    releaseBufPage(pHandle->pBuf, pPage);

    blockDataDestroy(p);
    start = stop + 1;
  }

  blockDataCleanup(pDataBlock);

  SSDataBlock* pBlock = createOneDataBlock(pDataBlock, false);
  if (pBlock == NULL) {
    taosArrayDestroy(pPageIdList);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return terrno;
  }
  return doAddNewExternalMemSource(pHandle->pBuf, pHandle->pOrderedSource, pBlock, &pHandle->sourceId, pPageIdList);
}

366
/**
 * Mark pSource as exhausted and count it as completed. rowIndex -1 is the
 * marker that msortComparFn() uses to sort an exhausted source last.
 */
static void setCurrentSourceDone(SSortSource* pSource, SSortHandle* pHandle) {
  pHandle->numOfCompletedSources += 1;
  pSource->src.rowIndex = -1;
}

371
static int32_t sortComparInit(SMsortComparParam* pParam, SArray* pSources, int32_t startIndex, int32_t endIndex,
H
Hongze Cheng 已提交
372
                              SSortHandle* pHandle) {
373 374
  pParam->pSources = taosArrayGet(pSources, startIndex);
  pParam->numOfSources = (endIndex - startIndex + 1);
375 376 377

  int32_t code = 0;

H
Haojun Liao 已提交
378 379 380
  // multi-pass internal merge sort is required
  if (pHandle->pBuf == NULL) {
    if (!osTempSpaceAvailable()) {
381 382
      code = terrno = TSDB_CODE_NO_DISKSPACE;
      qError("Sort compare init failed since %s, tempDir:%s, idStr:%s", terrstr(), tsTempDir, pHandle->idStr);
H
Haojun Liao 已提交
383 384 385 386 387 388 389
      return code;
    }

    code = createDiskbasedBuf(&pHandle->pBuf, pHandle->pageSize, pHandle->numOfPages * pHandle->pageSize,
                              "sortComparInit", tsTempDir);
    dBufSetPrintInfo(pHandle->pBuf);
    if (code != TSDB_CODE_SUCCESS) {
D
dapan1121 已提交
390
      terrno = code;
H
Haojun Liao 已提交
391 392 393 394
      return code;
    }
  }

H
Haojun Liao 已提交
395
  if (pHandle->type == SORT_SINGLESOURCE_SORT) {
396 397
    for (int32_t i = 0; i < pParam->numOfSources; ++i) {
      SSortSource* pSource = pParam->pSources[i];
398

399
      // set current source is done
400
      if (taosArrayGetSize(pSource->pageIdList) == 0) {
401
        setCurrentSourceDone(pSource, pHandle);
402
        continue;
403 404
      }

D
dapan1121 已提交
405
      int32_t* pPgId = taosArrayGet(pSource->pageIdList, pSource->pageIndex);
406

D
dapan1121 已提交
407
      void* pPage = getBufPage(pHandle->pBuf, *pPgId);
408 409 410 411
      if (NULL == pPage) {
        return terrno;
      }
      
412
      code = blockDataFromBuf(pSource->src.pBlock, pPage);
413
      if (code != TSDB_CODE_SUCCESS) {
D
dapan1121 已提交
414
        terrno = code;
415 416 417 418 419 420
        return code;
      }

      releaseBufPage(pHandle->pBuf, pPage);
    }
  } else {
421 422 423 424 425
    qDebug("start init for the multiway merge sort, %s", pHandle->idStr);
    int64_t st = taosGetTimestampUs();

    for (int32_t i = 0; i < pParam->numOfSources; ++i) {
      SSortSource* pSource = pParam->pSources[i];
426
      pSource->src.pBlock = pHandle->fetchfp(pSource->param);
427

428
      // set current source is done
429
      if (pSource->src.pBlock == NULL) {
430
        setCurrentSourceDone(pSource, pHandle);
431
      }
432
    }
433 434

    int64_t et = taosGetTimestampUs();
435
    qDebug("init for merge sort completed, elapsed time:%.2f ms, %s", (et - st) / 1000.0, pHandle->idStr);
436 437 438 439 440
  }

  return code;
}

H
Hongze Cheng 已提交
441
/**
 * Copy row *rowIndex of pSource to the end of pBlock, column by column,
 * including null flags. Both pBlock->info.rows and *rowIndex then advance by
 * one. Columns are matched by position, so both blocks are expected to share
 * the same column layout.
 */
static void appendOneRowToDataBlock(SSDataBlock* pBlock, const SSDataBlock* pSource, int32_t* rowIndex) {
  const int32_t srcRow = *rowIndex;

  for (int32_t col = 0; col < taosArrayGetSize(pBlock->pDataBlock); ++col) {
    SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, col);
    SColumnInfoData* pSrc = taosArrayGet(pSource->pDataBlock, col);

    if (colDataIsNull(pSrc, pSource->info.rows, srcRow, NULL)) {
      colDataSetVal(pDst, pBlock->info.rows, NULL, true);
      continue;
    }

    char* pValue = colDataGetData(pSrc, srcRow);
    colDataSetVal(pDst, pBlock->info.rows, pValue, false);
  }

  pBlock->info.rows += 1;
  *rowIndex += 1;
}

H
Hongze Cheng 已提交
460 461
/**
 * Advance the merge tree after the winning source pSource has contributed a row.
 *
 * When pSource has consumed every row of its current block, its next block is
 * loaded:
 * - SORT_SINGLESOURCE_SORT: from the next page of the source's page list;
 * - other types: through pHandle->fetchfp, with fetch time and count recorded
 *   on the source.
 * A source with no further data is marked done (rowIndex -1) and counted in
 * *numOfCompleted. The loser tree is then adjusted starting from the leaf
 * returned by tMergeTreeGetAdjustIndex().
 *
 * @return TSDB_CODE_SUCCESS or an error code
 */
static int32_t adjustMergeTreeForNextTuple(SSortSource* pSource, SMultiwayMergeTreeInfo* pTree, SSortHandle* pHandle,
                                           int32_t* numOfCompleted) {
  /*
   * load a new SDataBlock into memory of a given intermediate data-set source,
   * since its last record in buffer has been chosen to be processed, as the winner of loser-tree
   */
  if (pSource->src.rowIndex >= pSource->src.pBlock->info.rows) {
    pSource->src.rowIndex = 0;

    if (pHandle->type == SORT_SINGLESOURCE_SORT) {
      pSource->pageIndex++;
      if (pSource->pageIndex >= taosArrayGetSize(pSource->pageIdList)) {
        (*numOfCompleted) += 1;
        pSource->src.rowIndex = -1;
        pSource->pageIndex = -1;
        pSource->src.pBlock = blockDataDestroy(pSource->src.pBlock);
      } else {
        int32_t* pPgId = taosArrayGet(pSource->pageIdList, pSource->pageIndex);

        void* pPage = getBufPage(pHandle->pBuf, *pPgId);
        if (pPage == NULL) {
          qError("failed to get buffer, code:%s", tstrerror(terrno));
          return terrno;
        }

        int32_t code = blockDataFromBuf(pSource->src.pBlock, pPage);
        // release the page before inspecting the result so the error path does not keep it acquired
        releaseBufPage(pHandle->pBuf, pPage);
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }
      }
    } else {
      int64_t st = taosGetTimestampUs();
      pSource->src.pBlock = pHandle->fetchfp(pSource->param);
      pSource->fetchUs += taosGetTimestampUs() - st;
      pSource->fetchNum++;
      if (pSource->src.pBlock == NULL) {
        (*numOfCompleted) += 1;
        pSource->src.rowIndex = -1;
      }
    }
  }

  /*
   * Adjust loser tree otherwise, according to new candidate data
   * if the loser tree is rebuild completed, we do not need to adjust
   */
  int32_t leafNodeIndex = tMergeTreeGetAdjustIndex(pTree);

#ifdef _DEBUG_VIEW
  printf("before adjust:\t");
  tMergeTreePrint(pTree);
#endif

  tMergeTreeAdjust(pTree, leafNodeIndex);

#ifdef _DEBUG_VIEW
  printf("\nafter adjust:\t");
  tMergeTreePrint(pTree);
#endif
  return TSDB_CODE_SUCCESS;
}

wmmhello's avatar
wmmhello 已提交
524
/**
 * Pull rows from the merge tree into pHandle->pDataBlock until the block holds
 * `capacity` rows or every source in cmpParam is exhausted.
 *
 * @return pHandle->pDataBlock when it holds at least one row; NULL when no rows
 *         remain or on error (terrno then carries the error code)
 */
static SSDataBlock* getSortedBlockDataInner(SSortHandle* pHandle, SMsortComparParam* cmpParam, int32_t capacity) {
  SSDataBlock* pOut = pHandle->pDataBlock;
  blockDataCleanup(pOut);

  while (cmpParam->numOfSources != pHandle->numOfCompletedSources) {
    int32_t      winner = tMergeTreeGetChosenIndex(pHandle->pMergeTree);
    SSortSource* pWinner = cmpParam->pSources[winner];

    appendOneRowToDataBlock(pOut, pWinner->src.pBlock, &pWinner->src.rowIndex);

    int32_t code = adjustMergeTreeForNextTuple(pWinner, pHandle->pMergeTree, pHandle, &pHandle->numOfCompletedSources);
    if (code != TSDB_CODE_SUCCESS) {
      terrno = code;
      return NULL;
    }

    if (pOut->info.rows >= capacity) {
      return pOut;
    }
  }

  return (pOut->info.rows > 0) ? pOut : NULL;
}

H
Hongze Cheng 已提交
551 552 553
int32_t msortComparFn(const void* pLeft, const void* pRight, void* param) {
  int32_t pLeftIdx = *(int32_t*)pLeft;
  int32_t pRightIdx = *(int32_t*)pRight;
554

H
Hongze Cheng 已提交
555
  SMsortComparParam* pParam = (SMsortComparParam*)param;
556

H
Hongze Cheng 已提交
557
  SArray* pInfo = pParam->orderInfo;
558

H
Hongze Cheng 已提交
559
  SSortSource* pLeftSource = pParam->pSources[pLeftIdx];
wmmhello's avatar
wmmhello 已提交
560
  SSortSource* pRightSource = pParam->pSources[pRightIdx];
561 562 563 564 565 566 567 568 569 570 571 572 573

  // this input is exhausted, set the special value to denote this
  if (pLeftSource->src.rowIndex == -1) {
    return 1;
  }

  if (pRightSource->src.rowIndex == -1) {
    return -1;
  }

  SSDataBlock* pLeftBlock = pLeftSource->src.pBlock;
  SSDataBlock* pRightBlock = pRightSource->src.pBlock;

574
  if (pParam->cmpGroupId) {
H
Haojun Liao 已提交
575 576
    if (pLeftBlock->info.id.groupId != pRightBlock->info.id.groupId) {
      return pLeftBlock->info.id.groupId < pRightBlock->info.id.groupId ? -1 : 1;
577
    }
578 579
  }

H
Hongze Cheng 已提交
580
  for (int32_t i = 0; i < pInfo->size; ++i) {
581
    SBlockOrderInfo* pOrder = TARRAY_GET_ELEM(pInfo, i);
H
Haojun Liao 已提交
582
    SColumnInfoData* pLeftColInfoData = TARRAY_GET_ELEM(pLeftBlock->pDataBlock, pOrder->slotId);
583

H
Hongze Cheng 已提交
584
    bool leftNull = false;
585
    if (pLeftColInfoData->hasNull) {
586 587 588
      if (pLeftBlock->pBlockAgg == NULL) {
        leftNull = colDataIsNull_s(pLeftColInfoData, pLeftSource->src.rowIndex);
      } else {
H
Hongze Cheng 已提交
589 590
        leftNull =
            colDataIsNull(pLeftColInfoData, pLeftBlock->info.rows, pLeftSource->src.rowIndex, pLeftBlock->pBlockAgg[i]);
591
      }
592 593
    }

H
Haojun Liao 已提交
594
    SColumnInfoData* pRightColInfoData = TARRAY_GET_ELEM(pRightBlock->pDataBlock, pOrder->slotId);
H
Hongze Cheng 已提交
595
    bool             rightNull = false;
596
    if (pRightColInfoData->hasNull) {
H
Haojun Liao 已提交
597
      if (pRightBlock->pBlockAgg == NULL) {
598 599
        rightNull = colDataIsNull_s(pRightColInfoData, pRightSource->src.rowIndex);
      } else {
H
Hongze Cheng 已提交
600 601
        rightNull = colDataIsNull(pRightColInfoData, pRightBlock->info.rows, pRightSource->src.rowIndex,
                                  pRightBlock->pBlockAgg[i]);
602
      }
603 604 605
    }

    if (leftNull && rightNull) {
H
Hongze Cheng 已提交
606
      continue;  // continue to next slot
607 608 609
    }

    if (rightNull) {
H
Hongze Cheng 已提交
610
      return pOrder->nullFirst ? 1 : -1;
611 612 613
    }

    if (leftNull) {
H
Hongze Cheng 已提交
614
      return pOrder->nullFirst ? -1 : 1;
615 616
    }

H
Hongze Cheng 已提交
617
    void* left1 = colDataGetData(pLeftColInfoData, pLeftSource->src.rowIndex);
618
    void* right1 = colDataGetData(pRightColInfoData, pRightSource->src.rowIndex);
619

620
    __compar_fn_t fn = getKeyComparFunc(pLeftColInfoData->info.type, pOrder->order);
621

622 623 624 625 626
    int ret = fn(left1, right1);
    if (ret == 0) {
      continue;
    } else {
      return ret;
627 628
    }
  }
629
  return 0;
630 631 632 633
}

static int32_t doInternalMergeSort(SSortHandle* pHandle) {
  size_t numOfSources = taosArrayGetSize(pHandle->pOrderedSource);
H
Haojun Liao 已提交
634 635 636
  if (numOfSources == 0) {
    return 0;
  }
637 638 639 640 641

  // Calculate the I/O counts to complete the data sort.
  double sortPass = floorl(log2(numOfSources) / log2(pHandle->numOfPages));

  pHandle->totalElapsed = taosGetTimestampUs() - pHandle->startTs;
642 643 644 645 646 647 648

  if (sortPass > 0) {
    size_t s = pHandle->pBuf ? getTotalBufSize(pHandle->pBuf) : 0;
    qDebug("%s %d rounds mergesort required to complete the sort, first-round sorted data size:%" PRIzu
           ", sort elapsed:%" PRId64 ", total elapsed:%" PRId64,
           pHandle->idStr, (int32_t)(sortPass + 1), s, pHandle->sortElapsed, pHandle->totalElapsed);
  } else {
H
Hongze Cheng 已提交
649 650
    qDebug("%s ordered source:%" PRIzu ", available buf:%d, no need internal sort", pHandle->idStr, numOfSources,
           pHandle->numOfPages);
651
  }
652

G
Ganlin Zhao 已提交
653 654
  int32_t numOfRows = blockDataGetCapacityInRow(pHandle->pDataBlock, pHandle->pageSize,
                                                blockDataGetSerialMetaSize(taosArrayGetSize(pHandle->pDataBlock->pDataBlock)));
wmmhello's avatar
wmmhello 已提交
655
  blockDataEnsureCapacity(pHandle->pDataBlock, numOfRows);
656

657 658 659
  // the initial pass + sortPass + final mergePass
  pHandle->loops = sortPass + 2;

660
  size_t numOfSorted = taosArrayGetSize(pHandle->pOrderedSource);
H
Hongze Cheng 已提交
661
  for (int32_t t = 0; t < sortPass; ++t) {
662 663 664 665 666 667 668 669
    int64_t st = taosGetTimestampUs();

    SArray* pResList = taosArrayInit(4, POINTER_BYTES);

    int32_t numOfInputSources = pHandle->numOfPages;
    int32_t sortGroup = (numOfSorted + numOfInputSources - 1) / numOfInputSources;

    // Only *numOfInputSources* can be loaded into buffer to perform the external sort.
H
Hongze Cheng 已提交
670
    for (int32_t i = 0; i < sortGroup; ++i) {
671 672 673 674 675 676 677 678 679 680 681
      pHandle->sourceId += 1;

      int32_t end = (i + 1) * numOfInputSources - 1;
      if (end > numOfSorted - 1) {
        end = numOfSorted - 1;
      }

      pHandle->cmpParam.numOfSources = end - i * numOfInputSources + 1;

      int32_t code = sortComparInit(&pHandle->cmpParam, pHandle->pOrderedSource, i * numOfInputSources, end, pHandle);
      if (code != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
682
        taosArrayDestroy(pResList);
683 684 685
        return code;
      }

H
Hongze Cheng 已提交
686 687
      code =
          tMergeTreeCreate(&pHandle->pMergeTree, pHandle->cmpParam.numOfSources, &pHandle->cmpParam, pHandle->comparFn);
688
      if (code != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
689
        taosArrayDestroy(pResList);
690 691 692
        return code;
      }

693
      SArray* pPageIdList = taosArrayInit(4, sizeof(int32_t));
694
      while (1) {
D
dapan1121 已提交
695 696 697 698 699
        if (tsortIsClosed(pHandle)) {
          code = terrno = TSDB_CODE_TSC_QUERY_CANCELLED;
          return code;
        }
        
wmmhello's avatar
wmmhello 已提交
700
        SSDataBlock* pDataBlock = getSortedBlockDataInner(pHandle, &pHandle->cmpParam, numOfRows);
701 702 703 704 705
        if (pDataBlock == NULL) {
          break;
        }

        int32_t pageId = -1;
H
Hongze Cheng 已提交
706
        void*   pPage = getNewBufPage(pHandle->pBuf, &pageId);
707
        if (pPage == NULL) {
H
Haojun Liao 已提交
708 709
          taosArrayDestroy(pResList);
          taosArrayDestroy(pPageIdList);
710 711 712
          return terrno;
        }

713 714
        taosArrayPush(pPageIdList, &pageId);

H
Hongze Cheng 已提交
715 716
        int32_t size =
            blockDataGetSize(pDataBlock) + sizeof(int32_t) + taosArrayGetSize(pDataBlock->pDataBlock) * sizeof(int32_t);
H
Haojun Liao 已提交
717
        ASSERT(size <= getBufPageSize(pHandle->pBuf));
718

719
        blockDataToBuf(pPage, pDataBlock);
720 721 722 723

        setBufPageDirty(pPage, true);
        releaseBufPage(pHandle->pBuf, pPage);

724
        blockDataCleanup(pDataBlock);
725 726
      }

727
      sortComparCleanup(&pHandle->cmpParam);
D
dapan1121 已提交
728
      tMergeTreeDestroy(&pHandle->pMergeTree);
729 730
      pHandle->numOfCompletedSources = 0;

731
      SSDataBlock* pBlock = createOneDataBlock(pHandle->pDataBlock, false);
732
      code = doAddNewExternalMemSource(pHandle->pBuf, pResList, pBlock, &pHandle->sourceId, pPageIdList);
H
Haojun Liao 已提交
733 734
      if (code != TSDB_CODE_SUCCESS) {
        taosArrayDestroy(pResList);
735 736 737 738
        return code;
      }
    }

D
dapan1121 已提交
739
    tsortClearOrderdSource(pHandle->pOrderedSource, NULL, NULL);
740 741 742 743 744 745 746 747 748
    taosArrayAddAll(pHandle->pOrderedSource, pResList);
    taosArrayDestroy(pResList);

    numOfSorted = taosArrayGetSize(pHandle->pOrderedSource);

    int64_t el = taosGetTimestampUs() - st;
    pHandle->totalElapsed += el;

    SDiskbasedBufStatis statis = getDBufStatis(pHandle->pBuf);
H
Hongze Cheng 已提交
749 750
    qDebug("%s %d round mergesort, elapsed:%" PRId64 " readDisk:%.2f Kb, flushDisk:%.2f Kb", pHandle->idStr, t + 1, el,
           statis.loadBytes / 1024.0, statis.flushBytes / 1024.0);
751

H
Haojun Liao 已提交
752 753
    if (pHandle->type == SORT_MULTISOURCE_MERGE) {
      pHandle->type = SORT_SINGLESOURCE_SORT;
754 755 756 757 758 759 760 761
      pHandle->comparFn = msortComparFn;
    }
  }

  pHandle->cmpParam.numOfSources = taosArrayGetSize(pHandle->pOrderedSource);
  return 0;
}

762 763 764 765 766
/**
 * Choose the page size for the external sort buffer.
 *
 * The page must hold four rows of rowSize bytes plus the serialized block
 * metadata for numOfCols columns, and is never smaller than DEFAULT_PAGESIZE.
 */
int32_t getProperSortPageSize(size_t rowSize, uint32_t numOfCols) {
  uint32_t required = rowSize * 4 + blockDataGetSerialMetaSize(numOfCols);
  return (required < DEFAULT_PAGESIZE) ? DEFAULT_PAGESIZE : required;
}

772
/*
 * Build the initial sorted runs for a SORT_SINGLESOURCE_SORT handle.
 *
 * The first entry of pOrderedSource is taken over and the array is cleared. Blocks are pulled
 * from that source with fetchfp and merged into pHandle->pDataBlock. Whenever the accumulated
 * block grows beyond the sort buffer (numOfPages * pageSize bytes), it is sorted, optionally
 * truncated to maxRows, and handed to doAddToBuf(). Once the input is drained, the remainder is
 * sorted. If it fits in the buffer and no paged buffer exists yet (pBuf == NULL), the handle
 * switches to in-memory mode. Otherwise the remainder is handed to doAddToBuf() as well.
 *
 * The source descriptor is released on every exit path. Its param and src.pBlock are freed only
 * when the source owns them (!onlyRef).
 *
 * Other sort types are left untouched.
 * Returns TSDB_CODE_SUCCESS or the first error code encountered.
 */
static int32_t createInitialSources(SSortHandle* pHandle) {
  size_t  sortBufSize = pHandle->numOfPages * pHandle->pageSize;
  int32_t code = 0;

  if (pHandle->type == SORT_SINGLESOURCE_SORT) {
    SSortSource** pSource = taosArrayGet(pHandle->pOrderedSource, 0);
    if (pSource == NULL || *pSource == NULL) {
      return TSDB_CODE_INVALID_PARA;
    }

    SSortSource* source = *pSource;
    *pSource = NULL;

    tsortClearOrderdSource(pHandle->pOrderedSource, NULL, NULL);

    // Any failure below leaves the loop with `code` set, so `source` is released in one place.
    while (code == TSDB_CODE_SUCCESS) {
      SSDataBlock* pBlock = pHandle->fetchfp(source->param);
      if (pBlock == NULL) {
        break;
      }

      if (pHandle->pDataBlock == NULL) {
        uint32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
        pHandle->pageSize = getProperSortPageSize(blockDataGetRowSize(pBlock), numOfCols);

        // todo, number of pages are set according to the total available sort buffer
        pHandle->numOfPages = 1024;
        sortBufSize = pHandle->numOfPages * pHandle->pageSize;
        pHandle->pDataBlock = createOneDataBlock(pBlock, false);
      }

      if (pHandle->beforeFp != NULL) {
        pHandle->beforeFp(pBlock, pHandle->param);
      }

      code = blockDataMerge(pHandle->pDataBlock, pBlock);
      if (code != TSDB_CODE_SUCCESS) {
        break;
      }

      size_t size = blockDataGetSize(pHandle->pDataBlock);
      if (size > sortBufSize) {
        // Perform the in-memory sort and then flush data in the buffer into disk.
        int64_t p = taosGetTimestampUs();

        code = blockDataSort(pHandle->pDataBlock, pHandle->pSortInfo);
        if (code != 0) {
          break;
        }

        int64_t el = taosGetTimestampUs() - p;
        pHandle->sortElapsed += el;
        if (pHandle->maxRows > 0) blockDataKeepFirstNRows(pHandle->pDataBlock, pHandle->maxRows);

        // On failure the loop condition terminates the loop; `source` is still released below.
        code = doAddToBuf(pHandle->pDataBlock, pHandle);
      }
    }

    // Release the source descriptor (and whatever it owns) on both success and error paths.
    if (source->param && !source->onlyRef) {
      taosMemoryFree(source->param);
    }
    if (!source->onlyRef && source->src.pBlock) {
      blockDataDestroy(source->src.pBlock);
      source->src.pBlock = NULL;
    }
    taosMemoryFree(source);

    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    if (pHandle->pDataBlock != NULL && pHandle->pDataBlock->info.rows > 0) {
      size_t size = blockDataGetSize(pHandle->pDataBlock);

      // Perform the in-memory sort and then flush data in the buffer into disk.
      int64_t p = taosGetTimestampUs();

      code = blockDataSort(pHandle->pDataBlock, pHandle->pSortInfo);
      if (code != 0) {
        return code;
      }

      if (pHandle->maxRows > 0) blockDataKeepFirstNRows(pHandle->pDataBlock, pHandle->maxRows);
      int64_t el = taosGetTimestampUs() - p;
      pHandle->sortElapsed += el;

      // All sorted data can fit in memory, external memory sort is not needed. Return to directly
      if (size <= sortBufSize && pHandle->pBuf == NULL) {
        pHandle->cmpParam.numOfSources = 1;
        pHandle->inMemSort = true;

        pHandle->loops = 1;
        pHandle->tupleHandle.rowIndex = -1;
        pHandle->tupleHandle.pBlock = pHandle->pDataBlock;
        return 0;
      }

      code = doAddToBuf(pHandle->pDataBlock, pHandle);
    }
  }

  return code;
}

883
/*
 * Open the handle for the buffered (possibly external) merge sort.
 *
 * Builds the initial sorted runs, merges them with doInternalMergeSort(), and then, if any
 * ordered sources remain, initializes the comparison parameters and the merge tree over them.
 *
 * Returns TSDB_CODE_SUCCESS or the first error code encountered. The return type is int32_t
 * rather than bool: with bool, every non-zero error code was collapsed to 1 before reaching
 * tsortOpen(), so callers lost the actual error.
 */
static int32_t tsortOpenForBufMergeSort(SSortHandle* pHandle) {
  int32_t code = createInitialSources(pHandle);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  // do internal sort
  code = doInternalMergeSort(pHandle);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  int32_t numOfSources = taosArrayGetSize(pHandle->pOrderedSource);
  if (pHandle->pBuf != NULL) {
    ASSERT(numOfSources <= getNumOfInMemBufPages(pHandle->pBuf));
  }

  // Nothing to merge: no merge tree is built.
  if (numOfSources == 0) {
    return TSDB_CODE_SUCCESS;
  }

  code = sortComparInit(&pHandle->cmpParam, pHandle->pOrderedSource, 0, numOfSources - 1, pHandle);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  return tMergeTreeCreate(&pHandle->pMergeTree, pHandle->cmpParam.numOfSources, &pHandle->cmpParam, pHandle->comparFn);
}

H
Haojun Liao 已提交
912
/*
 * Request that the sort stop.
 *
 * Moves `closed` from 0 to 1 atomically; any other current value is left unchanged.
 * tsortIsClosed() later observes the request. Always returns TSDB_CODE_SUCCESS.
 */
int32_t tsortClose(SSortHandle* pHandle) {
  (void)atomic_val_compare_exchange_8(&pHandle->closed, 0, 1);

  // NOTE(review): the short sleep presumably gives a concurrent reader time to notice the flag — confirm.
  taosMsleep(10);

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
918 919 920 921 922 923 924 925
/*
 * Report whether a close has been requested.
 *
 * If `closed` is 1 (set by tsortClose), it is atomically advanced to 2. The result is true
 * whenever the value seen before the exchange was non-zero.
 */
bool tsortIsClosed(SSortHandle* pHandle) {
  int8_t previous = atomic_val_compare_exchange_8(&pHandle->closed, 1, 2);
  return previous != 0;
}

/* Force the close state to 2, the same value tsortIsClosed() advances a pending close to. */
void tsortSetClosed(SSortHandle* pHandle) {
  const int8_t closedState = 2;
  atomic_store_8(&pHandle->closed, closedState);
}

H
Hongze Cheng 已提交
926 927
/*
 * Install the block-fetch callback and an optional per-block hook.
 *
 * fetchFp is called with each source's own param to pull the next block.
 * fp (may be NULL) is invoked as fp(block, param) on every fetched block before it is consumed.
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t tsortSetFetchRawDataFp(SSortHandle* pHandle, _sort_fetch_block_fn_t fetchFp, void (*fp)(SSDataBlock*, void*),
                               void* param) {
  pHandle->param = param;
  pHandle->beforeFp = fp;
  pHandle->fetchfp = fetchFp;

  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
934
/* Install the row comparator used when merging sources. Always returns TSDB_CODE_SUCCESS. */
int32_t tsortSetComparFp(SSortHandle* pHandle, _sort_merge_compar_fn_t fp) {
  pHandle->comparFn = fp;

  return TSDB_CODE_SUCCESS;
}

939 940 941 942 943
/* Toggle the cmpGroupId flag in the handle's comparison parameters. Always returns TSDB_CODE_SUCCESS. */
int32_t tsortSetCompareGroupId(SSortHandle* pHandle, bool compareGroupId) {
  SMsortComparParam* pParam = &pHandle->cmpParam;
  pParam->cmpGroupId = compareGroupId;

  return TSDB_CODE_SUCCESS;
}

944
/*
 * Produce the next row of a buffered merge sort, or NULL when the sort is closed, exhausted,
 * or fails (the error code is then stored in terrno).
 *
 * In in-memory mode the rows of pDataBlock are returned in order. Otherwise the winner of the
 * merge tree is returned and its source's row cursor is advanced.
 */
static STupleHandle* tsortBufMergeSortNextTuple(SSortHandle* pHandle) {
  // tsortIsClosed() may advance the close state, so it must be evaluated first.
  if (tsortIsClosed(pHandle) || pHandle->cmpParam.numOfSources == pHandle->numOfCompletedSources) {
    return NULL;
  }

  STupleHandle* pTuple = &pHandle->tupleHandle;

  // All the data are held in the buffer: just step through pDataBlock.
  if (pHandle->inMemSort) {
    pTuple->rowIndex += 1;
    if (pTuple->rowIndex != pHandle->pDataBlock->info.rows) {
      return pTuple;
    }
    pHandle->numOfCompletedSources = 1;
    return NULL;
  }

  SSortSource* pSource = pHandle->cmpParam.pSources[tMergeTreeGetChosenIndex(pHandle->pMergeTree)];

  // needAdjust is set after a row has been handed out. adjustMergeTreeForNextTuple() is then
  // called for that row's source (presumably to advance it and replay the tree — confirm).
  if (pHandle->needAdjust) {
    int32_t code = adjustMergeTreeForNextTuple(pSource, pHandle->pMergeTree, pHandle, &pHandle->numOfCompletedSources);
    if (code != TSDB_CODE_SUCCESS) {
      terrno = code;
      return NULL;
    }
  }

  if (pHandle->cmpParam.numOfSources == pHandle->numOfCompletedSources) {
    return NULL;  // every source is exhausted
  }

  // Re-read the winner: the tree may have changed during the adjustment.
  pSource = pHandle->cmpParam.pSources[tMergeTreeGetChosenIndex(pHandle->pMergeTree)];
  ASSERT(pSource->src.pBlock != NULL);

  pTuple->pBlock = pSource->src.pBlock;
  pTuple->rowIndex = pSource->src.rowIndex;

  pSource->src.rowIndex += 1;
  pHandle->needAdjust = true;

  return pTuple;
}

994 995 996 997 998 999 1000 1001
/* True when tsortSetForceUsePQSort() has been called on this handle. */
static bool tsortIsForceUsePQSort(SSortHandle* pHandle) {
  bool forced = (pHandle->forceUsePQSort == true);
  return forced;
}

/* Make tsortIsPQSortApplicable() accept the priority-queue path for single-source sorts regardless of buffer size. */
void tsortSetForceUsePQSort(SSortHandle* pHandle) {
  const bool force = true;
  pHandle->forceUsePQSort = force;
}

1002 1003
/*
 * Decide whether the bounded priority-queue sort can be used.
 *
 * The handle must be a single-source sort. The queue is then used when forced, or when more than
 * maxRows tuples fit into sortBufSize. Each tuple is costed as maxTupleLength plus one pointer.
 */
static bool tsortIsPQSortApplicable(SSortHandle* pHandle) {
  if (pHandle->type != SORT_SINGLESOURCE_SORT) {
    return false;
  }
  if (tsortIsForceUsePQSort(pHandle)) {
    return true;
  }

  size_t   perTupleCost = pHandle->maxTupleLength + sizeof(char*);
  uint64_t tuplesInBuffer = pHandle->sortBufSize / perTupleCost;
  return tuplesInBuffer > pHandle->maxRows;
}

/* Bounded-queue predicate: true when comparFn orders `a` strictly before `b`. `param` is the SSortHandle. */
static bool tsortPQCompFn(void* a, void* b, void* param) {
  SSortHandle* pHandle = param;
  return pHandle->comparFn(a, b, param) < 0;
}

/* Mirror of tsortPQCompFn: true when comparFn orders `a` strictly after `b`. `param` is the SSortHandle. */
static bool tsortPQComFnReverse(void* a, void* b, void* param) {
  SSortHandle* pHandle = param;
  return pHandle->comparFn(a, b, param) > 0;
}

1023 1024 1025 1026
static int32_t tupleComparFn(const void* pLeft, const void* pRight, void* param) {
  TupleDesc* pLeftDesc = (TupleDesc*)pLeft;
  TupleDesc* pRightDesc = (TupleDesc*)pRight;

1027 1028 1029 1030 1031
  SSortHandle* pHandle = (SSortHandle*)param;
  SArray* orderInfo = (SArray*)pHandle->pSortInfo;
  uint32_t colNum = blockDataGetNumOfCols(pHandle->pDataBlock);
  for (int32_t i = 0; i < orderInfo->size; ++i) {
    SBlockOrderInfo* pOrder = TARRAY_GET_ELEM(orderInfo, i);
1032 1033
    void *lData = tupleDescGetField(pLeftDesc, pOrder->slotId, colNum);
    void *rData = tupleDescGetField(pRightDesc, pOrder->slotId, colNum);
1034 1035 1036 1037 1038 1039 1040 1041 1042 1043 1044 1045 1046 1047 1048 1049 1050 1051
    if (!lData && !rData) continue;
    if (!lData) return pOrder->nullFirst ? -1 : 1;
    if (!rData) return pOrder->nullFirst ? 1 : -1;

    int           type = ((SColumnInfoData*)taosArrayGet(pHandle->pDataBlock->pDataBlock, pOrder->slotId))->info.type;
    __compar_fn_t fn = getKeyComparFunc(type, pOrder->order);

    int ret = fn(lData, rData);
    if (ret == 0) {
      continue;
    } else {
      return ret;
    }
  }
  return 0;
}

/*
 * Open the handle for the bounded priority-queue sort.
 *
 * Creates a bounded queue sized by pHandle->maxRows (ordered by tsortPQCompFn) and installs
 * tupleComparFn as pHandle->comparFn. It then drains the first ordered source with fetchfp,
 * pushing every row into the queue. Each row the queue accepts has its node data replaced with a
 * tuple from createAllocatedTuple() (presumably a copy of the row — confirm).
 *
 * pHandle->pDataBlock is rebuilt from the first fetched block's schema. tsortPQSortNextTuple()
 * later uses it as the output block. Any block the handle already owned is destroyed first
 * instead of being leaked.
 *
 * Returns TSDB_CODE_SUCCESS.
 * Returns TSDB_CODE_OUT_OF_MEMORY on allocation failure.
 * Returns TSDB_CODE_INVALID_PARA when there is no source to read.
 */
static int32_t tsortOpenForPQSort(SSortHandle* pHandle) {
  pHandle->pBoundedQueue = createBoundedQueue(pHandle->maxRows, tsortPQCompFn, destroyTuple, pHandle);
  if (NULL == pHandle->pBoundedQueue) return TSDB_CODE_OUT_OF_MEMORY;
  tsortSetComparFp(pHandle, tupleComparFn);

  SSortSource** pSource = taosArrayGet(pHandle->pOrderedSource, 0);
  if (pSource == NULL || *pSource == NULL) return TSDB_CODE_INVALID_PARA;
  SSortSource* source = *pSource;

  // The output block is recreated from the first fetched block; release any previously owned one.
  if (pHandle->pDataBlock != NULL) {
    blockDataDestroy(pHandle->pDataBlock);
    pHandle->pDataBlock = NULL;
  }

  uint32_t          tupleLen = 0;
  PriorityQueueNode pqNode;
  while (1) {
    // fetch data
    SSDataBlock* pBlock = pHandle->fetchfp(source->param);
    if (NULL == pBlock) break;

    if (pHandle->beforeFp != NULL) {
      pHandle->beforeFp(pBlock, pHandle->param);
    }
    if (pHandle->pDataBlock == NULL) {
      pHandle->pDataBlock = createOneDataBlock(pBlock, false);
    }
    if (pHandle->pDataBlock == NULL) return TSDB_CODE_OUT_OF_MEMORY;

    size_t colNum = blockDataGetNumOfCols(pBlock);

    // Tuple length is computed once, from the first block: the sum of column widths, plus a
    // length prefix for each variable-length column.
    if (tupleLen == 0) {
      for (size_t colIdx = 0; colIdx < colNum; ++colIdx) {
        SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, colIdx);
        tupleLen += pCol->info.bytes;
        if (IS_VAR_DATA_TYPE(pCol->info.type)) {
          tupleLen += sizeof(VarDataLenT);
        }
      }
    }

    // Rows are offered to the queue as references into pBlock. Only the rows the queue keeps
    // get an allocated tuple.
    ReferencedTuple refTuple = {.desc.data = (char*)pBlock, .desc.type = ReferencedTupleType, .rowIndex = 0};
    for (size_t rowIdx = 0; rowIdx < pBlock->info.rows; ++rowIdx) {
      refTuple.rowIndex = rowIdx;
      pqNode.data = &refTuple;
      PriorityQueueNode* pPushedNode = taosBQPush(pHandle->pBoundedQueue, &pqNode);
      if (pPushedNode != NULL) {
        pPushedNode->data = createAllocatedTuple(pBlock, colNum, tupleLen, rowIdx);
        if (pPushedNode->data == NULL) return TSDB_CODE_OUT_OF_MEMORY;
      }
      // A NULL result means the queue rejected the row; nothing to do.
    }
  }
  return TSDB_CODE_SUCCESS;
}

/*
 * Produce the next row of a priority-queue sort, or NULL when no rows remain.
 *
 * Each call fills pHandle->pDataBlock with exactly one row, copied from the top of the bounded
 * queue, and returns a tuple handle pointing at row 0 of that block. On the first call
 * (tmpRowIdx == 0) the queue is switched to tsortPQComFnReverse and its heap is rebuilt before
 * anything is popped.
 *
 * Returns NULL when the input was empty; pDataBlock is then NULL because no block was ever
 * fetched. Returns NULL with terrno set if the output block cannot be sized.
 */
static STupleHandle* tsortPQSortNextTuple(SSortHandle* pHandle) {
  // No input block was ever fetched, so there is no output block and nothing to emit.
  if (pHandle->pDataBlock == NULL) {
    return NULL;
  }

  blockDataCleanup(pHandle->pDataBlock);
  int32_t code = blockDataEnsureCapacity(pHandle->pDataBlock, 1);
  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
    return NULL;
  }

  // abandon the top tuple if queue size bigger than max size
  if (taosBQSize(pHandle->pBoundedQueue) == taosBQMaxSize(pHandle->pBoundedQueue) + 1) {
    taosBQPop(pHandle->pBoundedQueue);
  }
  if (pHandle->tmpRowIdx == 0) {
    // sort the results
    taosBQSetFn(pHandle->pBoundedQueue, tsortPQComFnReverse);
    taosBQBuildHeap(pHandle->pBoundedQueue);
  }
  if (taosBQSize(pHandle->pBoundedQueue) > 0) {
    uint32_t           colNum = blockDataGetNumOfCols(pHandle->pDataBlock);
    PriorityQueueNode* node = taosBQTop(pHandle->pBoundedQueue);
    char*              pTuple = ((TupleDesc*)node->data)->data;

    for (uint32_t i = 0; i < colNum; ++i) {
      void* pData = tupleGetField(pTuple, i, colNum);
      if (!pData) {
        colDataSetNULL(bdGetColumnInfoData(pHandle->pDataBlock, i), 0);
      } else {
        colDataSetVal(bdGetColumnInfoData(pHandle->pDataBlock, i), 0, pData, false);
      }
    }
    pHandle->pDataBlock->info.rows++;
    pHandle->tmpRowIdx++;
    taosBQPop(pHandle->pBoundedQueue);
  }
  if (pHandle->pDataBlock->info.rows == 0) return NULL;

  // The emitted row always sits at index 0 of the one-row output block.
  pHandle->tupleHandle.pBlock = pHandle->pDataBlock;
  pHandle->tupleHandle.rowIndex = 0;
  return &pHandle->tupleHandle;
}

/*
 * Open the sort handle: choose the priority-queue path when applicable, else the buffered
 * merge sort.
 *
 * A second call is a no-op returning 0. Returns -1 if the fetch callback or the comparator has
 * not been installed.
 */
int32_t tsortOpen(SSortHandle* pHandle) {
  if (pHandle->opened) return 0;
  if (NULL == pHandle->fetchfp || NULL == pHandle->comparFn) return -1;

  pHandle->opened = true;
  return tsortIsPQSortApplicable(pHandle) ? tsortOpenForPQSort(pHandle) : tsortOpenForBufMergeSort(pHandle);
}

/* Fetch the next sorted row. The bounded queue is created by tsortOpenForPQSort(), so its presence selects that path. */
STupleHandle* tsortNextTuple(SSortHandle* pHandle) {
  bool usePQ = (pHandle->pBoundedQueue != NULL);
  return usePQ ? tsortPQSortNextTuple(pHandle) : tsortBufMergeSortNextTuple(pHandle);
}

H
Haojun Liao 已提交
1159
/* True when column `colIndex` of the tuple's current row is NULL. */
bool tsortIsNullVal(STupleHandle* pVHandle, int32_t colIndex) {
  SSDataBlock*     pBlock = pVHandle->pBlock;
  SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, colIndex);
  return colDataIsNull_s(pCol, pVHandle->rowIndex);
}

H
Haojun Liao 已提交
1164
/* Pointer to column `colIndex` of the tuple's current row, or NULL if the column has no data buffer. */
void* tsortGetValue(STupleHandle* pVHandle, int32_t colIndex) {
  SColumnInfoData* pCol = TARRAY_GET_ELEM(pVHandle->pBlock->pDataBlock, colIndex);
  if (NULL == pCol->pData) {
    return NULL;
  }
  return colDataGetData(pCol, pVHandle->rowIndex);
}
1172

H
Haojun Liao 已提交
1173
/* Group id of the block the tuple currently points into. */
uint64_t tsortGetGroupId(STupleHandle* pVHandle) {
  SSDataBlock* pBlock = pVHandle->pBlock;
  return pBlock->info.id.groupId;
}
1174
/* Address of the block info of the block the tuple currently points into. */
void* tsortGetBlockInfo(STupleHandle* pVHandle) {
  SSDataBlock* pBlock = pVHandle->pBlock;
  return &pBlock->info;
}
S
slzhou 已提交
1175

1176 1177 1178
/*
 * Summarize sort execution statistics for EXPLAIN-style reporting.
 *
 * With a NULL handle, defaults are reported: quicksort with a 2 MB buffer. Otherwise the report
 * contains the sort method (in-memory quicksort vs. spilled merge sort), buffer size
 * (pageSize * numOfPages), loop count and, when a paged buffer exists, its read/write byte
 * counters.
 */
SSortExecInfo tsortGetSortExecInfo(SSortHandle* pHandle) {
  SSortExecInfo info = {0};

  if (pHandle == NULL) {
    info.sortMethod = SORT_QSORT_T;  // by default
    info.sortBuffer = 2 * 1048576;   // 2mb by default
    return info;
  }

  info.sortMethod = pHandle->inMemSort ? SORT_QSORT_T : SORT_SPILLED_MERGE_SORT_T;
  info.sortBuffer = pHandle->pageSize * pHandle->numOfPages;
  info.loops = pHandle->loops;

  if (pHandle->pBuf == NULL) {
    return info;
  }

  SDiskbasedBufStatis bufStat = getDBufStatis(pHandle->pBuf);
  info.readBytes = bufStat.loadBytes;
  info.writeBytes = bufStat.flushBytes;
  return info;
}