tsort.c 24.7 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "query.h"
H
Hongze Cheng 已提交
17
#include "tcommon.h"
18

H
Hongze Cheng 已提交
19
#include "tcompare.h"
H
Haojun Liao 已提交
20
#include "tdatablock.h"
S
Shengliang Guan 已提交
21
#include "tdef.h"
22 23
#include "tlosertree.h"
#include "tpagedbuf.h"
H
Haojun Liao 已提交
24
#include "tsort.h"
25 26
#include "tutil.h"

27 28 29 30 31
// Cursor onto one row of sorted output: the block holding the row and the row's index within that block.
struct STupleHandle {
  SSDataBlock* pBlock;    // block that contains the current row
  int32_t      rowIndex;  // index of the current row inside pBlock
};

H
Haojun Liao 已提交
32
struct SSortHandle {
H
Hongze Cheng 已提交
33 34 35 36
  int32_t        type;
  int32_t        pageSize;
  int32_t        numOfPages;
  SDiskbasedBuf* pBuf;
37

H
Hongze Cheng 已提交
38 39
  SArray* pSortInfo;
  SArray* pOrderedSource;
40

H
Hongze Cheng 已提交
41 42 43 44
  int32_t  loops;
  uint64_t sortElapsed;
  int64_t  startTs;
  uint64_t totalElapsed;
45 46

  int32_t           sourceId;
H
Hongze Cheng 已提交
47
  SSDataBlock*      pDataBlock;
48 49 50
  SMsortComparParam cmpParam;
  int32_t           numOfCompletedSources;
  bool              opened;
H
Hongze Cheng 已提交
51
  const char*       idStr;
52 53 54
  bool              inMemSort;
  bool              needAdjust;
  STupleHandle      tupleHandle;
H
Hongze Cheng 已提交
55
  void*             param;
56
  void (*beforeFp)(SSDataBlock* pBlock, void* param);
57 58 59

  _sort_fetch_block_fn_t  fetchfp;
  _sort_merge_compar_fn_t comparFn;
H
Hongze Cheng 已提交
60
  SMultiwayMergeTreeInfo* pMergeTree;
H
Haojun Liao 已提交
61
};
62

H
Hongze Cheng 已提交
63
// Forward declaration of the default merge comparator; it is installed by tsortCreateSortHandle() below.
static int32_t msortComparFn(const void* pLeft, const void* pRight, void* param);
64

65 66
/**
 * Build a new data block with createOneDataBlock(), using the handle's working block as the template.
 * The flag passed to createOneDataBlock() is false; presumably this means row data is not copied -- TODO confirm.
 *
 * @param pSortHandle  sort handle whose pDataBlock serves as the template
 * @return whatever createOneDataBlock() returns
 */
SSDataBlock* tsortGetSortedDataBlock(const SSortHandle* pSortHandle) {
  SSDataBlock* pTemplate = pSortHandle->pDataBlock;
  return createOneDataBlock(pTemplate, false);
}

/**
 *
 * @param type
 * @return
 */
H
Hongze Cheng 已提交
74 75
SSortHandle* tsortCreateSortHandle(SArray* pSortInfo, int32_t type, int32_t pageSize, int32_t numOfPages,
                                   SSDataBlock* pBlock, const char* idstr) {
wafwerar's avatar
wafwerar 已提交
76
  SSortHandle* pSortHandle = taosMemoryCalloc(1, sizeof(SSortHandle));
77

H
Hongze Cheng 已提交
78 79
  pSortHandle->type = type;
  pSortHandle->pageSize = pageSize;
80
  pSortHandle->numOfPages = numOfPages;
H
Hongze Cheng 已提交
81 82
  pSortHandle->pSortInfo = pSortInfo;
  pSortHandle->loops = 0;
83 84 85 86

  if (pBlock != NULL) {
    pSortHandle->pDataBlock = createOneDataBlock(pBlock, false);
  }
H
Haojun Liao 已提交
87

H
Hongze Cheng 已提交
88
  pSortHandle->pOrderedSource = taosArrayInit(4, POINTER_BYTES);
H
Haojun Liao 已提交
89
  pSortHandle->cmpParam.orderInfo = pSortInfo;
90
  pSortHandle->cmpParam.cmpGroupId = false;
91

H
Haojun Liao 已提交
92
  tsortSetComparFp(pSortHandle, msortComparFn);
93 94

  if (idstr != NULL) {
95
    pSortHandle->idStr = strdup(idstr);
96 97 98 99 100
  }

  return pSortHandle;
}

101
/**
 * Release every source referenced by the comparator parameter: destroy its block, free the source
 * object, and reset the source count to zero.
 *
 * @param cmpParam  comparator parameter whose pSources[0 .. numOfSources-1] are released
 * @return always TSDB_CODE_SUCCESS
 */
static int32_t sortComparCleanup(SMsortComparParam* cmpParam) {
  int32_t idx = 0;
  while (idx < cmpParam->numOfSources) {
    // NOTICE: the entry may actually be an SGenericSource* when the type is SORT_MULTISOURCE_MERGE
    SSortSource* pCur = cmpParam->pSources[idx];

    blockDataDestroy(pCur->src.pBlock);
    taosMemoryFreeClear(pCur);
    ++idx;
  }

  cmpParam->numOfSources = 0;
  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
113
/**
 * Tear down a sort handle: close it, destroy the merge tree (if any), the disk-based buffer, the
 * identifier string, the working block, every source object still held in pOrderedSource, the
 * source array itself, and finally the handle.
 *
 * @param pSortHandle  handle to destroy; NULL is accepted and ignored
 */
void tsortDestroySortHandle(SSortHandle* pSortHandle) {
  if (pSortHandle == NULL) {
    return;
  }

  tsortClose(pSortHandle);

  if (pSortHandle->pMergeTree != NULL) {
    tMergeTreeDestroy(pSortHandle->pMergeTree);
  }

  destroyDiskbasedBuf(pSortHandle->pBuf);
  taosMemoryFreeClear(pSortHandle->idStr);
  blockDataDestroy(pSortHandle->pDataBlock);

  // Only the source objects themselves are freed here, not anything they point to.
  size_t numOfSources = taosArrayGetSize(pSortHandle->pOrderedSource);
  for (size_t k = 0; k < numOfSources; ++k) {
    SSortSource** ppSrc = taosArrayGet(pSortHandle->pOrderedSource, k);
    taosMemoryFreeClear(*ppSrc);
  }

  taosArrayDestroy(pSortHandle->pOrderedSource);
  taosMemoryFreeClear(pSortHandle);
}

H
Haojun Liao 已提交
134
/**
 * Register a data source with the sort handle by appending its pointer to pOrderedSource.
 *
 * @param pSortHandle  sort handle that receives the source
 * @param pSource      source object; the pointer value itself is stored
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_QRY_OUT_OF_MEMORY when the array cannot take the new element
 */
int32_t tsortAddSource(SSortHandle* pSortHandle, void* pSource) {
  // taosArrayPush() yields NULL when it cannot store the element; report that instead of claiming success.
  if (taosArrayPush(pSortHandle->pOrderedSource, &pSource) == NULL) {
    return TSDB_CODE_QRY_OUT_OF_MEMORY;
  }

  return TSDB_CODE_SUCCESS;
}

H
Hongze Cheng 已提交
139 140
/**
 * Wrap a block and the list of buffer pages that hold its spilled rows into a new SSortSource and
 * append that source to pAllSources.
 *
 * Ownership: this function takes over pBlock and pPageIdList. When the source cannot be created or
 * stored, both are released here, so the caller must not touch them after the call.
 *
 * @param pBuf         disk-based buffer; only its page size is read
 * @param pAllSources  array of SSortSource* that receives the new source
 * @param pBlock       block used to load pages of this source
 * @param sourceId     counter that is incremented once the source has been stored
 * @param pPageIdList  ids of the buffer pages that belong to this source
 * @return TSDB_CODE_QRY_OUT_OF_MEMORY on allocation failure, otherwise the result of
 *         blockDataEnsureCapacity()
 */
static int32_t doAddNewExternalMemSource(SDiskbasedBuf* pBuf, SArray* pAllSources, SSDataBlock* pBlock,
                                         int32_t* sourceId, SArray* pPageIdList) {
  if (pBlock == NULL) {
    // The caller could not create the block; do not dereference it below.
    taosArrayDestroy(pPageIdList);
    return TSDB_CODE_QRY_OUT_OF_MEMORY;
  }

  SSortSource* pSource = taosMemoryCalloc(1, sizeof(SSortSource));
  if (pSource == NULL) {
    blockDataDestroy(pBlock);
    taosArrayDestroy(pPageIdList);
    return TSDB_CODE_QRY_OUT_OF_MEMORY;
  }

  pSource->src.pBlock = pBlock;
  pSource->pageIdList = pPageIdList;
  if (taosArrayPush(pAllSources, &pSource) == NULL) {
    // The source never made it into the array, so nobody else would release it.
    blockDataDestroy(pBlock);
    taosArrayDestroy(pPageIdList);
    taosMemoryFreeClear(pSource);
    return TSDB_CODE_QRY_OUT_OF_MEMORY;
  }

  (*sourceId) += 1;

  int32_t rowSize = blockDataGetSerialRowSize(pSource->src.pBlock);
  ASSERT(rowSize > 0);  // rowSize is the divisor below

  // The value of numOfRows must be greater than 0, which is guaranteed by the previous memory allocation
  int32_t numOfRows =
      (getBufPageSize(pBuf) - blockDataGetSerialMetaSize(taosArrayGetSize(pBlock->pDataBlock))) / rowSize;
  ASSERT(numOfRows > 0);
  return blockDataEnsureCapacity(pSource->src.pBlock, numOfRows);
}

/**
 * Spill the rows of pDataBlock into the disk-based buffer, one page-sized slice at a time, and
 * register the written pages as a new sorted source of the handle. The disk-based buffer is created
 * on first use. After all rows have been written, pDataBlock is emptied with blockDataCleanup().
 *
 * @param pDataBlock  block whose rows are written out
 * @param pHandle     sort handle owning the buffer and the source list
 * @return TSDB_CODE_SUCCESS on success; TSDB_CODE_NO_AVAIL_DISK when no temp space is available;
 *         TSDB_CODE_QRY_OUT_OF_MEMORY on allocation failure; otherwise the code/terrno of the failing call
 */
static int32_t doAddToBuf(SSDataBlock* pDataBlock, SSortHandle* pHandle) {
  int32_t start = 0;

  if (pHandle->pBuf == NULL) {
    if (!osTempSpaceAvailable()) {
      terrno = TSDB_CODE_NO_AVAIL_DISK;
      qError("Add to buf failed since %s", terrstr(terrno));
      return terrno;
    }

    int32_t code = createDiskbasedBuf(&pHandle->pBuf, pHandle->pageSize, pHandle->numOfPages * pHandle->pageSize,
                                      "doAddToBuf", tsTempDir);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    // Only touch the buffer once its creation is known to have succeeded.
    dBufSetPrintInfo(pHandle->pBuf);
  }

  SArray* pPageIdList = taosArrayInit(4, sizeof(int32_t));
  if (pPageIdList == NULL) {
    return TSDB_CODE_QRY_OUT_OF_MEMORY;
  }

  while (start < pDataBlock->info.rows) {
    int32_t stop = 0;
    blockDataSplitRows(pDataBlock, pDataBlock->info.hasVarCol, start, &stop, pHandle->pageSize);

    // Rows [start, stop] form the slice that goes into one page.
    SSDataBlock* p = blockDataExtractBlock(pDataBlock, start, stop - start + 1);
    if (p == NULL) {
      taosArrayDestroy(pPageIdList);
      return terrno;
    }

    int32_t pageId = -1;
    void*   pPage = getNewBufPage(pHandle->pBuf, &pageId);
    if (pPage == NULL) {
      blockDataDestroy(p);
      taosArrayDestroy(pPageIdList);  // the list is not handed over to anybody on this path
      return terrno;
    }

    taosArrayPush(pPageIdList, &pageId);

    int32_t size = blockDataGetSize(p) + sizeof(int32_t) + taosArrayGetSize(p->pDataBlock) * sizeof(int32_t);
    assert(size <= getBufPageSize(pHandle->pBuf));

    blockDataToBuf(pPage, p);

    setBufPageDirty(pPage, true);
    releaseBufPage(pHandle->pBuf, pPage);

    blockDataDestroy(p);
    start = stop + 1;
  }

  blockDataCleanup(pDataBlock);

  SSDataBlock* pBlock = createOneDataBlock(pDataBlock, false);
  if (pBlock == NULL) {
    taosArrayDestroy(pPageIdList);
    return TSDB_CODE_QRY_OUT_OF_MEMORY;
  }

  return doAddNewExternalMemSource(pHandle->pBuf, pHandle->pOrderedSource, pBlock, &pHandle->sourceId, pPageIdList);
}

215 216 217 218 219
// Mark a source as exhausted (rowIndex == -1) and count it among the handle's completed sources.
static void setCurrentSourceIsDone(SSortSource* pSource, SSortHandle* pHandle) {
  pHandle->numOfCompletedSources += 1;
  pSource->src.rowIndex = -1;
}

H
Hongze Cheng 已提交
220 221 222
/**
 * Point the comparator parameter at the sources [startIndex, endIndex] of pSources and load the
 * first block of each of them.
 *
 * For SORT_SINGLESOURCE_SORT the first page of every source is read from the disk-based buffer;
 * sources without pages are marked as done. For any other type the first block is obtained through
 * the fetch callback, creating the disk-based buffer first if it does not exist yet; sources that
 * deliver no block are marked as done.
 *
 * @param cmpParam    comparator parameter to fill in
 * @param pSources    array of source pointers
 * @param startIndex  index of the first source to use
 * @param endIndex    index of the last source to use (inclusive)
 * @param pHandle     sort handle
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_NO_AVAIL_DISK, or the code of the failing call
 */
static int32_t sortComparInit(SMsortComparParam* cmpParam, SArray* pSources, int32_t startIndex, int32_t endIndex,
                              SSortHandle* pHandle) {
  // pSources stores pointers, so the address of element startIndex serves as the source table.
  cmpParam->pSources = taosArrayGet(pSources, startIndex);
  cmpParam->numOfSources = (endIndex - startIndex + 1);

  int32_t code = 0;

  if (pHandle->type == SORT_SINGLESOURCE_SORT) {
    for (int32_t i = 0; i < cmpParam->numOfSources; ++i) {
      SSortSource* pSource = cmpParam->pSources[i];

      // set current source is done
      if (taosArrayGetSize(pSource->pageIdList) == 0) {
        setCurrentSourceIsDone(pSource, pHandle);
        continue;
      }

      int32_t* pPgId = taosArrayGet(pSource->pageIdList, pSource->pageIndex);

      void* pPage = getBufPage(pHandle->pBuf, *pPgId);
      code = blockDataFromBuf(pSource->src.pBlock, pPage);

      // Release the page on every path; otherwise it stays pinned when decoding fails.
      releaseBufPage(pHandle->pBuf, pPage);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }
    }
  } else {
    // multi-pass internal merge sort is required
    if (pHandle->pBuf == NULL) {
      if (!osTempSpaceAvailable()) {
        terrno = TSDB_CODE_NO_AVAIL_DISK;
        code = terrno;
        qError("Sort compare init failed since %s", terrstr(terrno));
        return code;
      }

      code = createDiskbasedBuf(&pHandle->pBuf, pHandle->pageSize, pHandle->numOfPages * pHandle->pageSize,
                                "sortComparInit", tsTempDir);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }

      // Only touch the buffer once its creation is known to have succeeded.
      dBufSetPrintInfo(pHandle->pBuf);
    }

    for (int32_t i = 0; i < cmpParam->numOfSources; ++i) {
      SSortSource* pSource = cmpParam->pSources[i];
      pSource->src.pBlock = pHandle->fetchfp(pSource->param);

      // set current source is done
      if (pSource->src.pBlock == NULL) {
        setCurrentSourceIsDone(pSource, pHandle);
      }
    }
  }

  return code;
}

H
Hongze Cheng 已提交
278
/**
 * Copy row *rowIndex of pSource to the end of pBlock, column by column, then advance both the row
 * count of pBlock and *rowIndex by one. Column i of the source is paired with column i of the
 * destination.
 *
 * @param pBlock    destination block
 * @param pSource   source block
 * @param rowIndex  in/out: index of the source row to copy; incremented on return
 */
static void appendOneRowToDataBlock(SSDataBlock* pBlock, const SSDataBlock* pSource, int32_t* rowIndex) {
  int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);

  for (int32_t col = 0; col < numOfCols; ++col) {
    SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, col);
    SColumnInfoData* pSrc = taosArrayGet(pSource->pDataBlock, col);

    if (colDataIsNull(pSrc, pSource->info.rows, *rowIndex, NULL)) {
      colDataAppend(pDst, pBlock->info.rows, NULL, true);
      continue;
    }

    char* pVal = colDataGetData(pSrc, *rowIndex);
    colDataAppend(pDst, pBlock->info.rows, pVal, false);
  }

  *rowIndex += 1;
  pBlock->info.rows += 1;
}

H
Hongze Cheng 已提交
297 298
/**
 * After a row of pSource has been consumed, refill the source if its current block is used up and
 * then re-adjust the loser tree.
 *
 * @param pSource         source whose row was just consumed
 * @param pTree           merge (loser) tree to adjust
 * @param pHandle         sort handle
 * @param numOfCompleted  counter incremented when pSource turns out to be exhausted
 * @return TSDB_CODE_SUCCESS, or the error code of blockDataFromBuf()
 */
static int32_t adjustMergeTreeForNextTuple(SSortSource* pSource, SMultiwayMergeTreeInfo* pTree, SSortHandle* pHandle,
                                           int32_t* numOfCompleted) {
  /*
   * load a new SDataBlock into memory of a given intermediate data-set source,
   * since its last record in buffer has been chosen to be processed, as the winner of loser-tree
   */
  if (pSource->src.rowIndex >= pSource->src.pBlock->info.rows) {
    pSource->src.rowIndex = 0;

    if (pHandle->type == SORT_SINGLESOURCE_SORT) {
      pSource->pageIndex++;
      if (pSource->pageIndex >= taosArrayGetSize(pSource->pageIdList)) {
        // No page left: flag the source as exhausted and drop its block.
        (*numOfCompleted) += 1;
        pSource->src.rowIndex = -1;
        pSource->pageIndex = -1;
        pSource->src.pBlock = blockDataDestroy(pSource->src.pBlock);
      } else {
        int32_t* pPgId = taosArrayGet(pSource->pageIdList, pSource->pageIndex);

        void*   pPage = getBufPage(pHandle->pBuf, *pPgId);
        int32_t code = blockDataFromBuf(pSource->src.pBlock, pPage);

        // Release the page on every path; otherwise it stays pinned when decoding fails.
        releaseBufPage(pHandle->pBuf, pPage);
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }
      }
    } else {
      pSource->src.pBlock = pHandle->fetchfp(pSource->param);
      if (pSource->src.pBlock == NULL) {
        (*numOfCompleted) += 1;
        pSource->src.rowIndex = -1;
      }
    }
  }

  /*
   * Adjust loser tree otherwise, according to new candidate data
   * if the loser tree is rebuild completed, we do not need to adjust
   */
  int32_t leafNodeIndex = tMergeTreeGetAdjustIndex(pTree);

#ifdef _DEBUG_VIEW
  printf("before adjust:\t");
  tMergeTreePrint(pTree);
#endif

  tMergeTreeAdjust(pTree, leafNodeIndex);

#ifdef _DEBUG_VIEW
  printf("\nafter adjust:\t");
  tMergeTreePrint(pTree);
#endif

  return TSDB_CODE_SUCCESS;
}

wmmhello's avatar
wmmhello 已提交
353
/**
 * Fill the handle's working block with up to `capacity` rows taken in merge order from the sources
 * of cmpParam. The working block is emptied first.
 *
 * @param pHandle   sort handle (provides the merge tree and the working block)
 * @param cmpParam  comparator parameter holding the sources being merged
 * @param capacity  row count at which the block is returned
 * @return the working block when it holds at least one row; NULL when all sources are exhausted and
 *         no row was produced, or when adjusting the merge tree failed (terrno is set in that case)
 */
static SSDataBlock* getSortedBlockDataInner(SSortHandle* pHandle, SMsortComparParam* cmpParam, int32_t capacity) {
  SSDataBlock* pResult = pHandle->pDataBlock;
  blockDataCleanup(pResult);

  while (cmpParam->numOfSources != pHandle->numOfCompletedSources) {
    // The tree's chosen index identifies the source that supplies the next row.
    int32_t      winner = tMergeTreeGetChosenIndex(pHandle->pMergeTree);
    SSortSource* pWinner = cmpParam->pSources[winner];

    appendOneRowToDataBlock(pHandle->pDataBlock, pWinner->src.pBlock, &pWinner->src.rowIndex);

    int32_t rc = adjustMergeTreeForNextTuple(pWinner, pHandle->pMergeTree, pHandle, &pHandle->numOfCompletedSources);
    if (rc != TSDB_CODE_SUCCESS) {
      terrno = rc;
      return NULL;
    }

    if (pHandle->pDataBlock->info.rows >= capacity) {
      return pHandle->pDataBlock;
    }
  }

  if (pHandle->pDataBlock->info.rows > 0) {
    return pHandle->pDataBlock;
  }

  return NULL;
}

H
Hongze Cheng 已提交
380 381 382
int32_t msortComparFn(const void* pLeft, const void* pRight, void* param) {
  int32_t pLeftIdx = *(int32_t*)pLeft;
  int32_t pRightIdx = *(int32_t*)pRight;
383

H
Hongze Cheng 已提交
384
  SMsortComparParam* pParam = (SMsortComparParam*)param;
385

H
Hongze Cheng 已提交
386
  SArray* pInfo = pParam->orderInfo;
387

H
Hongze Cheng 已提交
388
  SSortSource* pLeftSource = pParam->pSources[pLeftIdx];
wmmhello's avatar
wmmhello 已提交
389
  SSortSource* pRightSource = pParam->pSources[pRightIdx];
390 391 392 393 394 395 396 397 398 399 400 401 402

  // this input is exhausted, set the special value to denote this
  if (pLeftSource->src.rowIndex == -1) {
    return 1;
  }

  if (pRightSource->src.rowIndex == -1) {
    return -1;
  }

  SSDataBlock* pLeftBlock = pLeftSource->src.pBlock;
  SSDataBlock* pRightBlock = pRightSource->src.pBlock;

403 404 405 406
  if (pParam->cmpGroupId) {
    if (pLeftBlock->info.groupId != pRightBlock->info.groupId) {
      return pLeftBlock->info.groupId < pRightBlock->info.groupId ? -1 : 1;
    }
407 408
  }

H
Hongze Cheng 已提交
409
  for (int32_t i = 0; i < pInfo->size; ++i) {
410
    SBlockOrderInfo* pOrder = TARRAY_GET_ELEM(pInfo, i);
H
Haojun Liao 已提交
411
    SColumnInfoData* pLeftColInfoData = TARRAY_GET_ELEM(pLeftBlock->pDataBlock, pOrder->slotId);
412

H
Hongze Cheng 已提交
413
    bool leftNull = false;
414
    if (pLeftColInfoData->hasNull) {
415 416 417
      if (pLeftBlock->pBlockAgg == NULL) {
        leftNull = colDataIsNull_s(pLeftColInfoData, pLeftSource->src.rowIndex);
      } else {
H
Hongze Cheng 已提交
418 419
        leftNull =
            colDataIsNull(pLeftColInfoData, pLeftBlock->info.rows, pLeftSource->src.rowIndex, pLeftBlock->pBlockAgg[i]);
420
      }
421 422
    }

H
Haojun Liao 已提交
423
    SColumnInfoData* pRightColInfoData = TARRAY_GET_ELEM(pRightBlock->pDataBlock, pOrder->slotId);
H
Hongze Cheng 已提交
424
    bool             rightNull = false;
425
    if (pRightColInfoData->hasNull) {
H
Haojun Liao 已提交
426
      if (pRightBlock->pBlockAgg == NULL) {
427 428
        rightNull = colDataIsNull_s(pRightColInfoData, pRightSource->src.rowIndex);
      } else {
H
Hongze Cheng 已提交
429 430
        rightNull = colDataIsNull(pRightColInfoData, pRightBlock->info.rows, pRightSource->src.rowIndex,
                                  pRightBlock->pBlockAgg[i]);
431
      }
432 433 434
    }

    if (leftNull && rightNull) {
H
Hongze Cheng 已提交
435
      continue;  // continue to next slot
436 437 438
    }

    if (rightNull) {
H
Hongze Cheng 已提交
439
      return pOrder->nullFirst ? 1 : -1;
440 441 442
    }

    if (leftNull) {
H
Hongze Cheng 已提交
443
      return pOrder->nullFirst ? -1 : 1;
444 445
    }

H
Hongze Cheng 已提交
446
    void* left1 = colDataGetData(pLeftColInfoData, pLeftSource->src.rowIndex);
447
    void* right1 = colDataGetData(pRightColInfoData, pRightSource->src.rowIndex);
448

449
    __compar_fn_t fn = getKeyComparFunc(pLeftColInfoData->info.type, pOrder->order);
450

451 452 453 454 455
    int ret = fn(left1, right1);
    if (ret == 0) {
      continue;
    } else {
      return ret;
456 457
    }
  }
458
  return 0;
459 460 461 462
}

/**
 * Run the intermediate merge passes of the external sort. In each pass, groups of at most
 * numOfPages sources are merged into one new source whose rows are written to the disk-based
 * buffer. After the passes, pOrderedSource holds the sources left for the final merge and
 * cmpParam.numOfSources is set to their count.
 *
 * @param pHandle  sort handle
 * @return 0 on success, otherwise the code/terrno of the failing call
 *
 * NOTE(review): when an error occurs after sortComparCleanup() has already released the sources of
 * an earlier group in the same pass, pOrderedSource still holds their (freed) pointers until the
 * pass completes; sources already collected in the per-pass result list are not released either.
 */
static int32_t doInternalMergeSort(SSortHandle* pHandle) {
  size_t numOfSources = taosArrayGetSize(pHandle->pOrderedSource);
  if (numOfSources == 0) {
    return 0;
  }

  // Calculate the I/O counts to complete the data sort.
  double sortPass = floorl(log2(numOfSources) / log2(pHandle->numOfPages));

  pHandle->totalElapsed = taosGetTimestampUs() - pHandle->startTs;

  if (sortPass > 0) {
    size_t s = pHandle->pBuf ? getTotalBufSize(pHandle->pBuf) : 0;
    qDebug("%s %d rounds mergesort required to complete the sort, first-round sorted data size:%" PRIzu
           ", sort elapsed:%" PRId64 ", total elapsed:%" PRId64,
           pHandle->idStr, (int32_t)(sortPass + 1), s, pHandle->sortElapsed, pHandle->totalElapsed);
  } else {
    qDebug("%s ordered source:%" PRIzu ", available buf:%d, no need internal sort", pHandle->idStr, numOfSources,
           pHandle->numOfPages);
  }

  int32_t numOfRows = blockDataGetCapacityInRow(pHandle->pDataBlock, pHandle->pageSize);
  int32_t code = blockDataEnsureCapacity(pHandle->pDataBlock, numOfRows);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  // the initial pass + sortPass + final mergePass
  pHandle->loops = sortPass + 2;

  size_t numOfSorted = taosArrayGetSize(pHandle->pOrderedSource);
  for (int32_t t = 0; t < sortPass; ++t) {
    int64_t st = taosGetTimestampUs();

    // Collects the sources produced by this pass.
    SArray* pResList = taosArrayInit(4, POINTER_BYTES);

    int32_t numOfInputSources = pHandle->numOfPages;
    int32_t sortGroup = (numOfSorted + numOfInputSources - 1) / numOfInputSources;

    // Only *numOfInputSources* can be loaded into buffer to perform the external sort.
    for (int32_t i = 0; i < sortGroup; ++i) {
      pHandle->sourceId += 1;

      int32_t end = (i + 1) * numOfInputSources - 1;
      if (end > numOfSorted - 1) {
        end = numOfSorted - 1;
      }

      pHandle->cmpParam.numOfSources = end - i * numOfInputSources + 1;

      code = sortComparInit(&pHandle->cmpParam, pHandle->pOrderedSource, i * numOfInputSources, end, pHandle);
      if (code != TSDB_CODE_SUCCESS) {
        taosArrayDestroy(pResList);
        return code;
      }

      code =
          tMergeTreeCreate(&pHandle->pMergeTree, pHandle->cmpParam.numOfSources, &pHandle->cmpParam, pHandle->comparFn);
      if (code != TSDB_CODE_SUCCESS) {
        taosArrayDestroy(pResList);
        return code;
      }

      // Ids of the pages that receive the merged output of this group.
      SArray* pPageIdList = taosArrayInit(4, sizeof(int32_t));
      while (1) {
        SSDataBlock* pDataBlock = getSortedBlockDataInner(pHandle, &pHandle->cmpParam, numOfRows);
        if (pDataBlock == NULL) {
          break;
        }

        int32_t pageId = -1;
        void*   pPage = getNewBufPage(pHandle->pBuf, &pageId);
        if (pPage == NULL) {
          // Neither list has been handed over to anybody yet.
          taosArrayDestroy(pPageIdList);
          taosArrayDestroy(pResList);
          return terrno;
        }

        taosArrayPush(pPageIdList, &pageId);

        int32_t size =
            blockDataGetSize(pDataBlock) + sizeof(int32_t) + taosArrayGetSize(pDataBlock->pDataBlock) * sizeof(int32_t);
        assert(size <= getBufPageSize(pHandle->pBuf));

        blockDataToBuf(pPage, pDataBlock);

        setBufPageDirty(pPage, true);
        releaseBufPage(pHandle->pBuf, pPage);

        blockDataCleanup(pDataBlock);
      }

      sortComparCleanup(&pHandle->cmpParam);
      tMergeTreeDestroy(pHandle->pMergeTree);
      pHandle->numOfCompletedSources = 0;

      SSDataBlock* pBlock = createOneDataBlock(pHandle->pDataBlock, false);
      code = doAddNewExternalMemSource(pHandle->pBuf, pResList, pBlock, &pHandle->sourceId, pPageIdList);
      if (code != TSDB_CODE_SUCCESS) {
        taosArrayDestroy(pResList);
        return code;
      }
    }

    // The merged sources of this pass become the input of the next one.
    taosArrayClear(pHandle->pOrderedSource);
    taosArrayAddAll(pHandle->pOrderedSource, pResList);
    taosArrayDestroy(pResList);

    numOfSorted = taosArrayGetSize(pHandle->pOrderedSource);

    int64_t el = taosGetTimestampUs() - st;
    pHandle->totalElapsed += el;

    SDiskbasedBufStatis statis = getDBufStatis(pHandle->pBuf);
    qDebug("%s %d round mergesort, elapsed:%" PRId64 " readDisk:%.2f Kb, flushDisk:%.2f Kb", pHandle->idStr, t + 1, el,
           statis.loadBytes / 1024.0, statis.flushBytes / 1024.0);

    // After the first pass every source lives in the disk-based buffer, so switch to page-based reading.
    if (pHandle->type == SORT_MULTISOURCE_MERGE) {
      pHandle->type = SORT_SINGLESOURCE_SORT;
      pHandle->comparFn = msortComparFn;
    }
  }

  pHandle->cmpParam.numOfSources = taosArrayGetSize(pHandle->pOrderedSource);
  return 0;
}

581
// TODO consider the page meta size
582 583 584 585 586 587 588 589 590 591 592 593 594
/**
 * Choose a sort page size for rows of the given size: four rows' worth of bytes, but never less
 * than 4096.
 *
 * @param rowSize  size of one row in bytes
 * @return the page size in bytes
 */
int32_t getProperSortPageSize(size_t rowSize) {
  const uint32_t minPageSize = 4096;
  const size_t   fourRows = rowSize * 4;

  uint32_t pageSize = (fourRows > minPageSize) ? (uint32_t)fourRows : minPageSize;
  return pageSize;
}

595
/**
 * Build the initial sorted runs for SORT_SINGLESOURCE_SORT; for any other sort type nothing is done.
 *
 * All blocks of the single registered source are fetched and merged into the working block.
 * Whenever the working block outgrows the sort buffer it is sorted and spilled to the disk-based
 * buffer as a new run. If everything fits into the sort buffer and nothing was spilled, the data
 * is sorted in memory and the handle is switched to in-memory mode.
 *
 * @param pHandle  sort handle
 * @return TSDB_CODE_SUCCESS, or the code of the failing merge/sort/spill call
 *
 * NOTE(review): the source object taken from pOrderedSource is removed from the array by
 * taosArrayClear() and is not released in this function -- confirm who owns it afterwards.
 */
static int32_t createInitialSources(SSortHandle* pHandle) {
  size_t sortBufSize = pHandle->numOfPages * pHandle->pageSize;

  if (pHandle->type == SORT_SINGLESOURCE_SORT) {
    if (taosArrayGetSize(pHandle->pOrderedSource) == 0) {
      // No source has been registered; there is nothing to fetch.
      return TSDB_CODE_SUCCESS;
    }

    SSortSource* source = taosArrayGetP(pHandle->pOrderedSource, 0);
    taosArrayClear(pHandle->pOrderedSource);

    while (1) {
      SSDataBlock* pBlock = pHandle->fetchfp(source->param);
      if (pBlock == NULL) {
        break;
      }

      if (pHandle->pDataBlock == NULL) {
        // No template block was supplied at creation time: derive the settings from the first block.
        pHandle->pageSize = getProperSortPageSize(blockDataGetRowSize(pBlock));

        // todo, number of pages are set according to the total available sort buffer
        pHandle->numOfPages = 1024;
        sortBufSize = pHandle->numOfPages * pHandle->pageSize;
        pHandle->pDataBlock = createOneDataBlock(pBlock, false);
      }

      if (pHandle->beforeFp != NULL) {
        pHandle->beforeFp(pBlock, pHandle->param);
      }

      int32_t code = blockDataMerge(pHandle->pDataBlock, pBlock);
      if (code != 0) {
        return code;
      }

      size_t size = blockDataGetSize(pHandle->pDataBlock);
      if (size > sortBufSize) {
        // Perform the in-memory sort and then flush data in the buffer into disk.
        int64_t p = taosGetTimestampUs();
        code = blockDataSort(pHandle->pDataBlock, pHandle->pSortInfo);
        if (code != 0) {
          return code;
        }

        int64_t el = taosGetTimestampUs() - p;
        pHandle->sortElapsed += el;

        // A failed spill would silently drop rows, so it must stop the sort.
        code = doAddToBuf(pHandle->pDataBlock, pHandle);
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }
      }
    }

    if (pHandle->pDataBlock != NULL && pHandle->pDataBlock->info.rows > 0) {
      size_t size = blockDataGetSize(pHandle->pDataBlock);

      // Perform the in-memory sort and then flush data in the buffer into disk.
      int64_t p = taosGetTimestampUs();

      int32_t code = blockDataSort(pHandle->pDataBlock, pHandle->pSortInfo);
      if (code != 0) {
        return code;
      }

      int64_t el = taosGetTimestampUs() - p;
      pHandle->sortElapsed += el;

      // All sorted data can fit in memory, external memory sort is not needed. Return to directly
      if (size <= sortBufSize && pHandle->pBuf == NULL) {
        pHandle->cmpParam.numOfSources = 1;
        pHandle->inMemSort = true;

        pHandle->loops = 1;
        pHandle->tupleHandle.rowIndex = -1;
        pHandle->tupleHandle.pBlock = pHandle->pDataBlock;
        return 0;
      } else {
        code = doAddToBuf(pHandle->pDataBlock, pHandle);
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }
      }
    }
  }

  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
674
/**
 * Prepare the handle for tsortNextTuple(): build the initial sources, run the intermediate merge
 * passes and set up the merge tree over the remaining sources. Calling it again on an opened
 * handle is a no-op.
 *
 * @param pHandle  sort handle; the fetch callback and the comparator must have been set
 * @return 0 on success (including the case where no source remains), -1 when a callback is missing,
 *         otherwise the code of the failing step
 */
int32_t tsortOpen(SSortHandle* pHandle) {
  if (pHandle->opened) {
    return 0;
  }

  bool hasCallbacks = (pHandle->fetchfp != NULL) && (pHandle->comparFn != NULL);
  if (!hasCallbacks) {
    return -1;
  }

  pHandle->opened = true;

  int32_t rc = createInitialSources(pHandle);
  if (rc == TSDB_CODE_SUCCESS) {
    // do internal sort
    rc = doInternalMergeSort(pHandle);
  }

  if (rc != TSDB_CODE_SUCCESS) {
    return rc;
  }

  int32_t remaining = taosArrayGetSize(pHandle->pOrderedSource);
  if (pHandle->pBuf != NULL) {
    ASSERT(remaining <= getNumOfInMemBufPages(pHandle->pBuf));
  }

  if (remaining == 0) {
    return 0;
  }

  rc = sortComparInit(&pHandle->cmpParam, pHandle->pOrderedSource, 0, remaining - 1, pHandle);
  if (rc != TSDB_CODE_SUCCESS) {
    return rc;
  }

  return tMergeTreeCreate(&pHandle->pMergeTree, pHandle->cmpParam.numOfSources, &pHandle->cmpParam, pHandle->comparFn);
}

H
Haojun Liao 已提交
713
/**
 * Close the sort handle. Currently there is nothing to do.
 *
 * @param pHandle  sort handle (unused)
 * @return always TSDB_CODE_SUCCESS
 */
int32_t tsortClose(SSortHandle* pHandle) {
  (void)pHandle;  // intentionally unused
  return TSDB_CODE_SUCCESS;
}

H
Hongze Cheng 已提交
718 719
/**
 * Install the callbacks used to obtain input blocks.
 *
 * @param pHandle  sort handle
 * @param fetchFp  callback that fetches the next block of a source
 * @param fp       optional hook invoked with (block, param) on each fetched block
 * @param param    opaque argument forwarded to fp
 * @return always TSDB_CODE_SUCCESS
 */
int32_t tsortSetFetchRawDataFp(SSortHandle* pHandle, _sort_fetch_block_fn_t fetchFp, void (*fp)(SSDataBlock*, void*),
                               void* param) {
  pHandle->param = param;
  pHandle->beforeFp = fp;
  pHandle->fetchfp = fetchFp;

  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
726
/**
 * Replace the comparator used by the merge tree.
 *
 * @param pHandle  sort handle
 * @param fp       comparator to store in the handle
 * @return always TSDB_CODE_SUCCESS
 */
int32_t tsortSetComparFp(SSortHandle* pHandle, _sort_merge_compar_fn_t fp) {
  pHandle->comparFn = fp;

  return TSDB_CODE_SUCCESS;
}

731 732 733 734 735
/**
 * Choose whether the comparator orders rows by the group id of their blocks before looking at the
 * order columns.
 *
 * @param pHandle         sort handle
 * @param compareGroupId  value stored in cmpParam.cmpGroupId
 * @return always TSDB_CODE_SUCCESS
 */
int32_t tsortSetCompareGroupId(SSortHandle* pHandle, bool compareGroupId) {
  SMsortComparParam* pCmp = &pHandle->cmpParam;
  pCmp->cmpGroupId = compareGroupId;

  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
736
/**
 * Advance to the next row in sort order.
 *
 * In in-memory mode the cursor simply walks the already sorted working block. Otherwise the merge
 * tree is adjusted for the row handed out by the previous call (if any) and the row of the newly
 * chosen source is exposed.
 *
 * @param pHandle  opened sort handle
 * @return the handle's tuple cursor, or NULL when no row is left or adjusting the merge tree failed
 *         (terrno is set in the latter case)
 */
STupleHandle* tsortNextTuple(SSortHandle* pHandle) {
  if (pHandle->cmpParam.numOfSources == pHandle->numOfCompletedSources) {
    return NULL;
  }

  STupleHandle* pTuple = &pHandle->tupleHandle;

  // All the data are hold in the buffer, no external sort is invoked.
  if (pHandle->inMemSort) {
    pTuple->rowIndex += 1;
    if (pTuple->rowIndex == pHandle->pDataBlock->info.rows) {
      pHandle->numOfCompletedSources = 1;
      return NULL;
    }

    return pTuple;
  }

  int32_t      chosen = tMergeTreeGetChosenIndex(pHandle->pMergeTree);
  SSortSource* pChosen = pHandle->cmpParam.pSources[chosen];

  if (pHandle->needAdjust) {
    int32_t rc = adjustMergeTreeForNextTuple(pChosen, pHandle->pMergeTree, pHandle, &pHandle->numOfCompletedSources);
    if (rc != TSDB_CODE_SUCCESS) {
      terrno = rc;
      return NULL;
    }
  }

  if (pHandle->cmpParam.numOfSources == pHandle->numOfCompletedSources) {
    return NULL;
  }

  // Get the adjusted value after the loser tree is updated.
  chosen = tMergeTreeGetChosenIndex(pHandle->pMergeTree);
  pChosen = pHandle->cmpParam.pSources[chosen];

  assert(pChosen->src.pBlock != NULL);

  pTuple->rowIndex = pChosen->src.rowIndex;
  pTuple->pBlock = pChosen->src.pBlock;

  // The row is consumed now; the tree is adjusted lazily on the next call.
  pHandle->needAdjust = true;
  pChosen->src.rowIndex += 1;

  return pTuple;
}

H
Haojun Liao 已提交
782
/**
 * Tell whether the given column of the current row is NULL.
 *
 * @param pVHandle  tuple cursor
 * @param colIndex  index of the column inside the cursor's block
 * @return result of colDataIsNull_s() for that column and the cursor's row
 */
bool tsortIsNullVal(STupleHandle* pVHandle, int32_t colIndex) {
  SSDataBlock*     pRowBlock = pVHandle->pBlock;
  SColumnInfoData* pCol = taosArrayGet(pRowBlock->pDataBlock, colIndex);

  return colDataIsNull_s(pCol, pVHandle->rowIndex);
}

H
Haojun Liao 已提交
787
/**
 * Get a pointer to the value of the given column in the current row.
 *
 * @param pVHandle  tuple cursor
 * @param colIndex  index of the column inside the cursor's block
 * @return pointer obtained from colDataGetData(), or NULL when the column has no data buffer
 */
void* tsortGetValue(STupleHandle* pVHandle, int32_t colIndex) {
  SColumnInfoData* pCol = TARRAY_GET_ELEM(pVHandle->pBlock->pDataBlock, colIndex);

  if (pCol->pData != NULL) {
    return colDataGetData(pCol, pVHandle->rowIndex);
  }

  return NULL;
}
795

H
Hongze Cheng 已提交
796
/**
 * Get the group id of the block that holds the current row.
 *
 * @param pVHandle  tuple cursor
 * @return info.groupId of the cursor's block
 */
uint64_t tsortGetGroupId(STupleHandle* pVHandle) {
  const SSDataBlock* pRowBlock = pVHandle->pBlock;
  return pRowBlock->info.groupId;
}
S
slzhou 已提交
797

798 799 800 801
/**
 * Collect execution statistics of the sort: buffer size, sort method, number of passes and, when a
 * disk-based buffer exists, the bytes flushed to and loaded from it.
 *
 * @param pHandle  sort handle
 * @return the statistics by value; fields not set here are zero
 */
SSortExecInfo tsortGetSortExecInfo(SSortHandle* pHandle) {
  SSortExecInfo execInfo = {0};

  execInfo.sortBuffer = pHandle->pageSize * pHandle->numOfPages;
  if (pHandle->inMemSort) {
    execInfo.sortMethod = SORT_QSORT_T;
  } else {
    execInfo.sortMethod = SORT_SPILLED_MERGE_SORT_T;
  }
  execInfo.loops = pHandle->loops;

  if (pHandle->pBuf == NULL) {
    return execInfo;
  }

  SDiskbasedBufStatis bufStat = getDBufStatis(pHandle->pBuf);
  execInfo.writeBytes = bufStat.flushBytes;
  execInfo.readBytes = bufStat.loadBytes;

  return execInfo;
}