tsort.c 28.4 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "query.h"
H
Hongze Cheng 已提交
17
#include "tcommon.h"
18

H
Hongze Cheng 已提交
19
#include "tcompare.h"
H
Haojun Liao 已提交
20
#include "tdatablock.h"
S
Shengliang Guan 已提交
21
#include "tdef.h"
22 23
#include "tlosertree.h"
#include "tpagedbuf.h"
H
Haojun Liao 已提交
24
#include "tsort.h"
25 26
#include "tutil.h"

27 28 29 30 31
// Cursor to a single row: a data block plus the index of the row inside it.
struct STupleHandle {
  SSDataBlock* pBlock;    // block that holds the row
  int32_t      rowIndex;  // row position within pBlock
};

H
Haojun Liao 已提交
32
struct SSortHandle {
H
Hongze Cheng 已提交
33 34 35 36
  int32_t        type;
  int32_t        pageSize;
  int32_t        numOfPages;
  SDiskbasedBuf* pBuf;
37 38 39 40 41 42
  SArray*        pSortInfo;
  SArray*        pOrderedSource;
  int32_t        loops;
  uint64_t       sortElapsed;
  int64_t        startTs;
  uint64_t       totalElapsed;
43 44

  int32_t           sourceId;
H
Hongze Cheng 已提交
45
  SSDataBlock*      pDataBlock;
46 47 48
  SMsortComparParam cmpParam;
  int32_t           numOfCompletedSources;
  bool              opened;
D
dapan1121 已提交
49
  int8_t            closed;
H
Hongze Cheng 已提交
50
  const char*       idStr;
51 52 53
  bool              inMemSort;
  bool              needAdjust;
  STupleHandle      tupleHandle;
H
Hongze Cheng 已提交
54
  void*             param;
55
  void (*beforeFp)(SSDataBlock* pBlock, void* param);
56 57 58

  _sort_fetch_block_fn_t  fetchfp;
  _sort_merge_compar_fn_t comparFn;
H
Hongze Cheng 已提交
59
  SMultiwayMergeTreeInfo* pMergeTree;
H
Haojun Liao 已提交
60
};
61

H
Hongze Cheng 已提交
62
// Default merge comparator; declared early because tsortCreateSortHandle() installs it before its definition.
static int32_t msortComparFn(const void* pLeft, const void* pRight, void* param);
63

64 65
// Builds a new block from the handle's working block via createOneDataBlock(..., false) and returns it.
SSDataBlock* tsortGetSortedDataBlock(const SSortHandle* pSortHandle) {
  const SSDataBlock* pTemplate = pSortHandle->pDataBlock;
  return createOneDataBlock(pTemplate, false);
}

/**
 *
 * @param type
 * @return
 */
H
Hongze Cheng 已提交
73 74
SSortHandle* tsortCreateSortHandle(SArray* pSortInfo, int32_t type, int32_t pageSize, int32_t numOfPages,
                                   SSDataBlock* pBlock, const char* idstr) {
wafwerar's avatar
wafwerar 已提交
75
  SSortHandle* pSortHandle = taosMemoryCalloc(1, sizeof(SSortHandle));
76

H
Hongze Cheng 已提交
77 78
  pSortHandle->type = type;
  pSortHandle->pageSize = pageSize;
79
  pSortHandle->numOfPages = numOfPages;
H
Hongze Cheng 已提交
80 81
  pSortHandle->pSortInfo = pSortInfo;
  pSortHandle->loops = 0;
82 83 84 85

  if (pBlock != NULL) {
    pSortHandle->pDataBlock = createOneDataBlock(pBlock, false);
  }
H
Haojun Liao 已提交
86

H
Hongze Cheng 已提交
87
  pSortHandle->pOrderedSource = taosArrayInit(4, POINTER_BYTES);
H
Haojun Liao 已提交
88
  pSortHandle->cmpParam.orderInfo = pSortInfo;
89
  pSortHandle->cmpParam.cmpGroupId = false;
90

H
Haojun Liao 已提交
91
  tsortSetComparFp(pSortHandle, msortComparFn);
92 93

  if (idstr != NULL) {
94
    pSortHandle->idStr = taosStrdup(idstr);
95 96 97 98 99
  }

  return pSortHandle;
}

100
// Destroys every source referenced by cmpParam (its block, its page-id list and the source itself),
// nulls the slots and resets the source count. Always returns TSDB_CODE_SUCCESS.
static int32_t sortComparCleanup(SMsortComparParam* cmpParam) {
  // NOTE(review): the original remark here was truncated ("pSource may be, if it is SORT_MULTISOURCE_MERGE");
  // the loop dereferences every entry unconditionally, so entries are assumed non-NULL - confirm.
  const int32_t total = cmpParam->numOfSources;

  for (int32_t idx = 0; idx < total; ++idx) {
    SSortSource* pCur = cmpParam->pSources[idx];

    blockDataDestroy(pCur->src.pBlock);
    if (pCur->pageIdList != NULL) {
      taosArrayDestroy(pCur->pageIdList);
    }

    taosMemoryFreeClear(pCur);
    cmpParam->pSources[idx] = NULL;
  }

  cmpParam->numOfSources = 0;
  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
116
/**
 * Release every source stored in pOrderedSource and empty the array (the array itself is kept).
 *
 * @param pOrderedSource  array of SSortSource* entries; NULL entries are skipped
 * @param fetchUs         optional accumulator, increased by each source's fetchUs
 * @param fetchNum        optional accumulator, increased by each source's fetchNum
 */
void tsortClearOrderdSource(SArray* pOrderedSource, int64_t *fetchUs, int64_t *fetchNum) {
  for (size_t i = 0; i < taosArrayGetSize(pOrderedSource); i++) {
    SSortSource** pSource = taosArrayGet(pOrderedSource, i);
    if (NULL == *pSource) {
      continue;
    }

    // each accumulator is optional on its own; do not assume fetchNum is set just because fetchUs is
    if (fetchUs != NULL) {
      *fetchUs += (*pSource)->fetchUs;
    }
    if (fetchNum != NULL) {
      *fetchNum += (*pSource)->fetchNum;
    }

    // release pageIdList
    if ((*pSource)->pageIdList) {
      taosArrayDestroy((*pSource)->pageIdList);
      (*pSource)->pageIdList = NULL;
    }

    // param and block are only owned by the source when it is not flagged onlyRef
    if ((*pSource)->param && !(*pSource)->onlyRef) {
      taosMemoryFree((*pSource)->param);
      (*pSource)->param = NULL;
    }

    if (!(*pSource)->onlyRef && (*pSource)->src.pBlock) {
      blockDataDestroy((*pSource)->src.pBlock);
      (*pSource)->src.pBlock = NULL;
    }

    taosMemoryFreeClear(*pSource);
  }

  taosArrayClear(pOrderedSource);
}

H
Haojun Liao 已提交
149
/**
 * Close the handle and release everything it owns: merge tree, paged buffer, working block,
 * all remaining sources, the id string and the handle itself. A NULL handle is ignored.
 */
void tsortDestroySortHandle(SSortHandle* pSortHandle) {
  if (pSortHandle == NULL) {
    return;
  }

  tsortClose(pSortHandle);
  if (pSortHandle->pMergeTree != NULL) {
    tMergeTreeDestroy(&pSortHandle->pMergeTree);
  }

  destroyDiskbasedBuf(pSortHandle->pBuf);
  blockDataDestroy(pSortHandle->pDataBlock);

  int64_t fetchUs = 0, fetchNum = 0;
  tsortClearOrderdSource(pSortHandle->pOrderedSource, &fetchUs, &fetchNum);
  qDebug("all source fetch time: %" PRId64 "us num:%" PRId64 " %s", fetchUs, fetchNum, pSortHandle->idStr);

  // the id string is still needed by the log line above, so it is released only now
  taosMemoryFreeClear(pSortHandle->idStr);

  taosArrayDestroy(pSortHandle->pOrderedSource);
  taosMemoryFreeClear(pSortHandle);
}

H
Haojun Liao 已提交
171
/**
 * Append a source to the handle's ordered-source array. The pointer value itself is stored;
 * the array's elements are read back as SSortSource* elsewhere in this file.
 *
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when the array could not take the element
 */
int32_t tsortAddSource(SSortHandle* pSortHandle, void* pSource) {
  if (taosArrayPush(pSortHandle->pOrderedSource, &pSource) == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  return TSDB_CODE_SUCCESS;
}

H
Hongze Cheng 已提交
176 177
/**
 * Wrap a block and its page-id list into a new SSortSource and append it to pAllSources.
 *
 * Both pBlock and pPageIdList are taken over by this function: on failure before the source is
 * appended they are destroyed here, because the callers in this file do not release them.
 *
 * @param pBuf         paged buffer; only its page size is read
 * @param pAllSources  array of SSortSource* that receives the new source
 * @param pBlock       block used as the source's read buffer
 * @param sourceId     counter, incremented once the source has been appended
 * @param pPageIdList  ids of the pages that hold the source's data
 * @return result of blockDataEnsureCapacity(), or TSDB_CODE_OUT_OF_MEMORY
 */
static int32_t doAddNewExternalMemSource(SDiskbasedBuf* pBuf, SArray* pAllSources, SSDataBlock* pBlock,
                                         int32_t* sourceId, SArray* pPageIdList) {
  SSortSource* pSource = taosMemoryCalloc(1, sizeof(SSortSource));
  if (pSource == NULL) {
    taosArrayDestroy(pPageIdList);
    blockDataDestroy(pBlock);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  pSource->src.pBlock = pBlock;
  pSource->pageIdList = pPageIdList;
  if (taosArrayPush(pAllSources, &pSource) == NULL) {
    // the source never became reachable through pAllSources, so release it here
    taosArrayDestroy(pPageIdList);
    blockDataDestroy(pBlock);
    taosMemoryFree(pSource);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  (*sourceId) += 1;

  int32_t rowSize = blockDataGetSerialRowSize(pSource->src.pBlock);

  // The value of numOfRows must be greater than 0, which is guaranteed by the previous memory allocation
  int32_t numOfRows =
      (getBufPageSize(pBuf) - blockDataGetSerialMetaSize(taosArrayGetSize(pBlock->pDataBlock))) / rowSize;
  ASSERT(numOfRows > 0);

  return blockDataEnsureCapacity(pSource->src.pBlock, numOfRows);
}

/**
 * Spill the rows of pDataBlock into pages of the handle's paged buffer and register the pages as a
 * new source in pHandle->pOrderedSource. The buffer is created on first use. pDataBlock is emptied
 * with blockDataCleanup() once all rows have been written.
 *
 * @return TSDB_CODE_SUCCESS or an error code (no temp space, buffer creation failure, out of memory, ...)
 */
static int32_t doAddToBuf(SSDataBlock* pDataBlock, SSortHandle* pHandle) {
  int32_t start = 0;

  if (pHandle->pBuf == NULL) {
    if (!osTempSpaceAvailable()) {
      terrno = TSDB_CODE_NO_DISKSPACE;
      qError("Add to buf failed since %s, tempDir:%s", terrstr(), tsTempDir);
      return terrno;
    }

    int32_t code = createDiskbasedBuf(&pHandle->pBuf, pHandle->pageSize, pHandle->numOfPages * pHandle->pageSize,
                                      "sortExternalBuf", tsTempDir);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    // only touch the buffer after it is known to have been created
    dBufSetPrintInfo(pHandle->pBuf);
  }

  SArray* pPageIdList = taosArrayInit(4, sizeof(int32_t));
  if (pPageIdList == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  while (start < pDataBlock->info.rows) {
    // [start, stop] is the row range that is written to the next page
    int32_t stop = 0;
    blockDataSplitRows(pDataBlock, pDataBlock->info.hasVarCol, start, &stop, pHandle->pageSize);
    SSDataBlock* p = blockDataExtractBlock(pDataBlock, start, stop - start + 1);
    if (p == NULL) {
      taosArrayDestroy(pPageIdList);
      return terrno;
    }

    int32_t pageId = -1;
    void*   pPage = getNewBufPage(pHandle->pBuf, &pageId);
    if (pPage == NULL) {
      taosArrayDestroy(pPageIdList);
      blockDataDestroy(p);
      return terrno;
    }

    taosArrayPush(pPageIdList, &pageId);

    int32_t size = blockDataGetSize(p) + sizeof(int32_t) + taosArrayGetSize(p->pDataBlock) * sizeof(int32_t);
    ASSERT(size <= getBufPageSize(pHandle->pBuf));

    blockDataToBuf(pPage, p);

    setBufPageDirty(pPage, true);
    releaseBufPage(pHandle->pBuf, pPage);

    blockDataDestroy(p);
    start = stop + 1;
  }

  blockDataCleanup(pDataBlock);

  SSDataBlock* pBlock = createOneDataBlock(pDataBlock, false);
  if (pBlock == NULL) {
    // doAddNewExternalMemSource() dereferences the block, so stop here
    taosArrayDestroy(pPageIdList);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  return doAddNewExternalMemSource(pHandle->pBuf, pHandle->pOrderedSource, pBlock, &pHandle->sourceId, pPageIdList);
}

256
// Flags pSource as exhausted (rowIndex = -1) and counts it among the handle's completed sources.
static void setCurrentSourceDone(SSortSource* pSource, SSortHandle* pHandle) {
  pHandle->numOfCompletedSources += 1;
  pSource->src.rowIndex = -1;
}

261
static int32_t sortComparInit(SMsortComparParam* pParam, SArray* pSources, int32_t startIndex, int32_t endIndex,
H
Hongze Cheng 已提交
262
                              SSortHandle* pHandle) {
263 264
  pParam->pSources = taosArrayGet(pSources, startIndex);
  pParam->numOfSources = (endIndex - startIndex + 1);
265 266 267

  int32_t code = 0;

H
Haojun Liao 已提交
268 269 270
  // multi-pass internal merge sort is required
  if (pHandle->pBuf == NULL) {
    if (!osTempSpaceAvailable()) {
271 272
      code = terrno = TSDB_CODE_NO_DISKSPACE;
      qError("Sort compare init failed since %s, tempDir:%s, idStr:%s", terrstr(), tsTempDir, pHandle->idStr);
H
Haojun Liao 已提交
273 274 275 276 277 278 279
      return code;
    }

    code = createDiskbasedBuf(&pHandle->pBuf, pHandle->pageSize, pHandle->numOfPages * pHandle->pageSize,
                              "sortComparInit", tsTempDir);
    dBufSetPrintInfo(pHandle->pBuf);
    if (code != TSDB_CODE_SUCCESS) {
D
dapan1121 已提交
280
      terrno = code;
H
Haojun Liao 已提交
281 282 283 284
      return code;
    }
  }

H
Haojun Liao 已提交
285
  if (pHandle->type == SORT_SINGLESOURCE_SORT) {
286 287
    for (int32_t i = 0; i < pParam->numOfSources; ++i) {
      SSortSource* pSource = pParam->pSources[i];
288

289
      // set current source is done
290
      if (taosArrayGetSize(pSource->pageIdList) == 0) {
291
        setCurrentSourceDone(pSource, pHandle);
292
        continue;
293 294
      }

D
dapan1121 已提交
295
      int32_t* pPgId = taosArrayGet(pSource->pageIdList, pSource->pageIndex);
296

D
dapan1121 已提交
297
      void* pPage = getBufPage(pHandle->pBuf, *pPgId);
298 299 300 301
      if (NULL == pPage) {
        return terrno;
      }
      
302
      code = blockDataFromBuf(pSource->src.pBlock, pPage);
303
      if (code != TSDB_CODE_SUCCESS) {
D
dapan1121 已提交
304
        terrno = code;
305 306 307 308 309 310
        return code;
      }

      releaseBufPage(pHandle->pBuf, pPage);
    }
  } else {
311 312 313 314 315
    qDebug("start init for the multiway merge sort, %s", pHandle->idStr);
    int64_t st = taosGetTimestampUs();

    for (int32_t i = 0; i < pParam->numOfSources; ++i) {
      SSortSource* pSource = pParam->pSources[i];
316
      pSource->src.pBlock = pHandle->fetchfp(pSource->param);
317

318
      // set current source is done
319
      if (pSource->src.pBlock == NULL) {
320
        setCurrentSourceDone(pSource, pHandle);
321
      }
322
    }
323 324

    int64_t et = taosGetTimestampUs();
325
    qDebug("init for merge sort completed, elapsed time:%.2f ms, %s", (et - st) / 1000.0, pHandle->idStr);
326 327 328 329 330
  }

  return code;
}

H
Hongze Cheng 已提交
331
// Copies row *rowIndex of pSource to the end of pBlock, column by column (NULL cells stay NULL),
// then increments both pBlock's row count and *rowIndex.
static void appendOneRowToDataBlock(SSDataBlock* pBlock, const SSDataBlock* pSource, int32_t* rowIndex) {
  const int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);

  for (int32_t col = 0; col < numOfCols; ++col) {
    SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, col);
    SColumnInfoData* pSrc = taosArrayGet(pSource->pDataBlock, col);

    if (colDataIsNull(pSrc, pSource->info.rows, *rowIndex, NULL)) {
      colDataSetVal(pDst, pBlock->info.rows, NULL, true);
      continue;
    }

    char* pCell = colDataGetData(pSrc, *rowIndex);
    colDataSetVal(pDst, pBlock->info.rows, pCell, false);
  }

  *rowIndex += 1;
  pBlock->info.rows += 1;
}

H
Hongze Cheng 已提交
350 351
/**
 * Advance pSource after one of its rows was consumed and re-adjust the loser tree.
 *
 * When the source's current block is used up, the next block is loaded: from the next buffered page for
 * SORT_SINGLESOURCE_SORT, otherwise through pHandle->fetchfp. A source without further data gets
 * rowIndex = -1 and *numOfCompleted is incremented.
 *
 * @return TSDB_CODE_SUCCESS, or an error code when a page cannot be loaded or decoded
 */
static int32_t adjustMergeTreeForNextTuple(SSortSource* pSource, SMultiwayMergeTreeInfo* pTree, SSortHandle* pHandle,
                                           int32_t* numOfCompleted) {
  /*
   * load a new SDataBlock into memory of a given intermediate data-set source,
   * since it's last record in buffer has been chosen to be processed, as the winner of loser-tree
   */
  if (pSource->src.rowIndex >= pSource->src.pBlock->info.rows) {
    pSource->src.rowIndex = 0;

    if (pHandle->type == SORT_SINGLESOURCE_SORT) {
      pSource->pageIndex++;
      if (pSource->pageIndex >= taosArrayGetSize(pSource->pageIdList)) {
        (*numOfCompleted) += 1;
        pSource->src.rowIndex = -1;
        pSource->pageIndex = -1;
        pSource->src.pBlock = blockDataDestroy(pSource->src.pBlock);
      } else {
        int32_t* pPgId = taosArrayGet(pSource->pageIdList, pSource->pageIndex);

        void* pPage = getBufPage(pHandle->pBuf, *pPgId);
        if (pPage == NULL) {
          qError("failed to get buffer, code:%s", tstrerror(terrno));
          return terrno;
        }

        int32_t code = blockDataFromBuf(pSource->src.pBlock, pPage);

        // the page is released on both outcomes so that a decode failure does not keep it held
        releaseBufPage(pHandle->pBuf, pPage);

        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }
      }
    } else {
      int64_t st = taosGetTimestampUs();
      pSource->src.pBlock = pHandle->fetchfp(pSource->param);
      pSource->fetchUs += taosGetTimestampUs() - st;
      pSource->fetchNum++;
      if (pSource->src.pBlock == NULL) {
        (*numOfCompleted) += 1;
        pSource->src.rowIndex = -1;
      }
    }
  }

  /*
   * Adjust loser tree otherwise, according to new candidate data
   * if the loser tree is rebuild completed, we do not need to adjust
   */
  int32_t leafNodeIndex = tMergeTreeGetAdjustIndex(pTree);

#ifdef _DEBUG_VIEW
  printf("before adjust:\t");
  tMergeTreePrint(pTree);
#endif

  tMergeTreeAdjust(pTree, leafNodeIndex);

#ifdef _DEBUG_VIEW
  printf("\nafter adjust:\t");
  tMergeTreePrint(pTree);
#endif
  return TSDB_CODE_SUCCESS;
}

wmmhello's avatar
wmmhello 已提交
414
// Refills the handle's working block with merged rows, stopping once `capacity` rows are collected or
// every source is exhausted. Returns the working block, or NULL when no row was produced or when the
// merge-tree adjustment failed (terrno is set in the latter case).
static SSDataBlock* getSortedBlockDataInner(SSortHandle* pHandle, SMsortComparParam* cmpParam, int32_t capacity) {
  SSDataBlock* pOut = pHandle->pDataBlock;
  blockDataCleanup(pOut);

  while (cmpParam->numOfSources != pHandle->numOfCompletedSources) {
    int32_t      winner = tMergeTreeGetChosenIndex(pHandle->pMergeTree);
    SSortSource* pWinner = cmpParam->pSources[winner];

    appendOneRowToDataBlock(pOut, pWinner->src.pBlock, &pWinner->src.rowIndex);

    int32_t rc = adjustMergeTreeForNextTuple(pWinner, pHandle->pMergeTree, pHandle, &pHandle->numOfCompletedSources);
    if (rc != TSDB_CODE_SUCCESS) {
      terrno = rc;
      return NULL;
    }

    if (pOut->info.rows >= capacity) {
      return pOut;
    }
  }

  if (pOut->info.rows > 0) {
    return pOut;
  }
  return NULL;
}

H
Hongze Cheng 已提交
441 442 443
int32_t msortComparFn(const void* pLeft, const void* pRight, void* param) {
  int32_t pLeftIdx = *(int32_t*)pLeft;
  int32_t pRightIdx = *(int32_t*)pRight;
444

H
Hongze Cheng 已提交
445
  SMsortComparParam* pParam = (SMsortComparParam*)param;
446

H
Hongze Cheng 已提交
447
  SArray* pInfo = pParam->orderInfo;
448

H
Hongze Cheng 已提交
449
  SSortSource* pLeftSource = pParam->pSources[pLeftIdx];
wmmhello's avatar
wmmhello 已提交
450
  SSortSource* pRightSource = pParam->pSources[pRightIdx];
451 452 453 454 455 456 457 458 459 460 461 462 463

  // this input is exhausted, set the special value to denote this
  if (pLeftSource->src.rowIndex == -1) {
    return 1;
  }

  if (pRightSource->src.rowIndex == -1) {
    return -1;
  }

  SSDataBlock* pLeftBlock = pLeftSource->src.pBlock;
  SSDataBlock* pRightBlock = pRightSource->src.pBlock;

464
  if (pParam->cmpGroupId) {
H
Haojun Liao 已提交
465 466
    if (pLeftBlock->info.id.groupId != pRightBlock->info.id.groupId) {
      return pLeftBlock->info.id.groupId < pRightBlock->info.id.groupId ? -1 : 1;
467
    }
468 469
  }

H
Hongze Cheng 已提交
470
  for (int32_t i = 0; i < pInfo->size; ++i) {
471
    SBlockOrderInfo* pOrder = TARRAY_GET_ELEM(pInfo, i);
H
Haojun Liao 已提交
472
    SColumnInfoData* pLeftColInfoData = TARRAY_GET_ELEM(pLeftBlock->pDataBlock, pOrder->slotId);
473

H
Hongze Cheng 已提交
474
    bool leftNull = false;
475
    if (pLeftColInfoData->hasNull) {
476 477 478
      if (pLeftBlock->pBlockAgg == NULL) {
        leftNull = colDataIsNull_s(pLeftColInfoData, pLeftSource->src.rowIndex);
      } else {
H
Hongze Cheng 已提交
479 480
        leftNull =
            colDataIsNull(pLeftColInfoData, pLeftBlock->info.rows, pLeftSource->src.rowIndex, pLeftBlock->pBlockAgg[i]);
481
      }
482 483
    }

H
Haojun Liao 已提交
484
    SColumnInfoData* pRightColInfoData = TARRAY_GET_ELEM(pRightBlock->pDataBlock, pOrder->slotId);
H
Hongze Cheng 已提交
485
    bool             rightNull = false;
486
    if (pRightColInfoData->hasNull) {
H
Haojun Liao 已提交
487
      if (pRightBlock->pBlockAgg == NULL) {
488 489
        rightNull = colDataIsNull_s(pRightColInfoData, pRightSource->src.rowIndex);
      } else {
H
Hongze Cheng 已提交
490 491
        rightNull = colDataIsNull(pRightColInfoData, pRightBlock->info.rows, pRightSource->src.rowIndex,
                                  pRightBlock->pBlockAgg[i]);
492
      }
493 494 495
    }

    if (leftNull && rightNull) {
H
Hongze Cheng 已提交
496
      continue;  // continue to next slot
497 498 499
    }

    if (rightNull) {
H
Hongze Cheng 已提交
500
      return pOrder->nullFirst ? 1 : -1;
501 502 503
    }

    if (leftNull) {
H
Hongze Cheng 已提交
504
      return pOrder->nullFirst ? -1 : 1;
505 506
    }

H
Hongze Cheng 已提交
507
    void* left1 = colDataGetData(pLeftColInfoData, pLeftSource->src.rowIndex);
508
    void* right1 = colDataGetData(pRightColInfoData, pRightSource->src.rowIndex);
509

510
    __compar_fn_t fn = getKeyComparFunc(pLeftColInfoData->info.type, pOrder->order);
511

512 513 514 515 516
    int ret = fn(left1, right1);
    if (ret == 0) {
      continue;
    } else {
      return ret;
517 518
    }
  }
519
  return 0;
520 521 522 523
}

/**
 * Run the intermediate merge passes over pHandle->pOrderedSource.
 *
 * In each pass the sources are merged in groups of at most numOfPages; every group is written back
 * into the paged buffer as one new source, and the list of new sources replaces pOrderedSource.
 * After the first pass a SORT_MULTISOURCE_MERGE handle is switched to SORT_SINGLESOURCE_SORT with
 * the default comparator.
 *
 * On any failure (including cancellation through tsortIsClosed()) the page-id list being built and
 * the sources already produced in the current pass are released before returning.
 *
 * @return 0 on success, otherwise an error code
 */
static int32_t doInternalMergeSort(SSortHandle* pHandle) {
  size_t numOfSources = taosArrayGetSize(pHandle->pOrderedSource);
  if (numOfSources == 0) {
    return 0;
  }

  // Calculate the I/O counts to complete the data sort.
  // NOTE(review): log2(numOfPages) is 0 when numOfPages == 1, which makes this quotient non-finite;
  // presumably callers always configure more than one page - confirm.
  double sortPass = floorl(log2(numOfSources) / log2(pHandle->numOfPages));

  pHandle->totalElapsed = taosGetTimestampUs() - pHandle->startTs;

  if (sortPass > 0) {
    size_t s = pHandle->pBuf ? getTotalBufSize(pHandle->pBuf) : 0;
    qDebug("%s %d rounds mergesort required to complete the sort, first-round sorted data size:%" PRIzu
           ", sort elapsed:%" PRId64 ", total elapsed:%" PRId64,
           pHandle->idStr, (int32_t)(sortPass + 1), s, pHandle->sortElapsed, pHandle->totalElapsed);
  } else {
    qDebug("%s ordered source:%" PRIzu ", available buf:%d, no need internal sort", pHandle->idStr, numOfSources,
           pHandle->numOfPages);
  }

  int32_t numOfRows = blockDataGetCapacityInRow(pHandle->pDataBlock, pHandle->pageSize,
                                                blockDataGetSerialMetaSize(taosArrayGetSize(pHandle->pDataBlock->pDataBlock)));
  blockDataEnsureCapacity(pHandle->pDataBlock, numOfRows);

  // the initial pass + sortPass + final mergePass
  pHandle->loops = sortPass + 2;

  int32_t code = TSDB_CODE_SUCCESS;
  SArray* pResList = NULL;     // sources produced by the current pass
  SArray* pPageIdList = NULL;  // pages of the group currently being written

  size_t numOfSorted = taosArrayGetSize(pHandle->pOrderedSource);
  for (int32_t t = 0; t < sortPass; ++t) {
    int64_t st = taosGetTimestampUs();

    pResList = taosArrayInit(4, POINTER_BYTES);

    int32_t numOfInputSources = pHandle->numOfPages;
    int32_t sortGroup = (numOfSorted + numOfInputSources - 1) / numOfInputSources;

    // Only *numOfInputSources* can be loaded into buffer to perform the external sort.
    for (int32_t i = 0; i < sortGroup; ++i) {
      pHandle->sourceId += 1;

      int32_t end = (i + 1) * numOfInputSources - 1;
      if (end > numOfSorted - 1) {
        end = numOfSorted - 1;
      }

      pHandle->cmpParam.numOfSources = end - i * numOfInputSources + 1;

      code = sortComparInit(&pHandle->cmpParam, pHandle->pOrderedSource, i * numOfInputSources, end, pHandle);
      if (code != TSDB_CODE_SUCCESS) {
        goto _error;
      }

      code =
          tMergeTreeCreate(&pHandle->pMergeTree, pHandle->cmpParam.numOfSources, &pHandle->cmpParam, pHandle->comparFn);
      if (code != TSDB_CODE_SUCCESS) {
        goto _error;
      }

      pPageIdList = taosArrayInit(4, sizeof(int32_t));
      while (1) {
        if (tsortIsClosed(pHandle)) {
          code = terrno = TSDB_CODE_TSC_QUERY_CANCELLED;
          goto _error;
        }

        SSDataBlock* pDataBlock = getSortedBlockDataInner(pHandle, &pHandle->cmpParam, numOfRows);
        if (pDataBlock == NULL) {
          break;
        }

        int32_t pageId = -1;
        void*   pPage = getNewBufPage(pHandle->pBuf, &pageId);
        if (pPage == NULL) {
          code = terrno;
          goto _error;
        }

        taosArrayPush(pPageIdList, &pageId);

        int32_t size =
            blockDataGetSize(pDataBlock) + sizeof(int32_t) + taosArrayGetSize(pDataBlock->pDataBlock) * sizeof(int32_t);
        ASSERT(size <= getBufPageSize(pHandle->pBuf));

        blockDataToBuf(pPage, pDataBlock);

        setBufPageDirty(pPage, true);
        releaseBufPage(pHandle->pBuf, pPage);

        blockDataCleanup(pDataBlock);
      }

      sortComparCleanup(&pHandle->cmpParam);
      tMergeTreeDestroy(&pHandle->pMergeTree);
      pHandle->numOfCompletedSources = 0;

      SSDataBlock* pBlock = createOneDataBlock(pHandle->pDataBlock, false);

      // the page-id list is handed over to the callee, which releases it itself on failure
      SArray* pGroupPages = pPageIdList;
      pPageIdList = NULL;

      code = doAddNewExternalMemSource(pHandle->pBuf, pResList, pBlock, &pHandle->sourceId, pGroupPages);
      if (code != TSDB_CODE_SUCCESS) {
        goto _error;
      }
    }

    tsortClearOrderdSource(pHandle->pOrderedSource, NULL, NULL);
    taosArrayAddAll(pHandle->pOrderedSource, pResList);
    taosArrayDestroy(pResList);
    pResList = NULL;

    numOfSorted = taosArrayGetSize(pHandle->pOrderedSource);

    int64_t el = taosGetTimestampUs() - st;
    pHandle->totalElapsed += el;

    SDiskbasedBufStatis statis = getDBufStatis(pHandle->pBuf);
    qDebug("%s %d round mergesort, elapsed:%" PRId64 " readDisk:%.2f Kb, flushDisk:%.2f Kb", pHandle->idStr, t + 1, el,
           statis.loadBytes / 1024.0, statis.flushBytes / 1024.0);

    if (pHandle->type == SORT_MULTISOURCE_MERGE) {
      pHandle->type = SORT_SINGLESOURCE_SORT;
      pHandle->comparFn = msortComparFn;
    }
  }

  pHandle->cmpParam.numOfSources = taosArrayGetSize(pHandle->pOrderedSource);
  return 0;

_error:
  if (pPageIdList != NULL) {
    taosArrayDestroy(pPageIdList);
  }
  if (pResList != NULL) {
    // the sources created during this pass are reachable only through pResList
    tsortClearOrderdSource(pResList, NULL, NULL);
    taosArrayDestroy(pResList);
  }
  return code;
}

652 653 654 655 656
// Page size for sorting: room for four rows plus the serialized block meta data for numOfCols columns,
// but never less than DEFAULT_PAGESIZE.
int32_t getProperSortPageSize(size_t rowSize, uint32_t numOfCols) {
  uint32_t wanted = rowSize * 4 + blockDataGetSerialMetaSize(numOfCols);
  if (wanted >= DEFAULT_PAGESIZE) {
    return wanted;
  }

  return DEFAULT_PAGESIZE;
}

662
static int32_t createInitialSources(SSortHandle* pHandle) {
663
  size_t sortBufSize = pHandle->numOfPages * pHandle->pageSize;
664
  int32_t code = 0;
H
Haojun Liao 已提交
665 666

  if (pHandle->type == SORT_SINGLESOURCE_SORT) {
667
    SSortSource** pSource = taosArrayGet(pHandle->pOrderedSource, 0);
dengyihao's avatar
dengyihao 已提交
668
    SSortSource*  source = *pSource;
669
    *pSource = NULL;
dengyihao's avatar
dengyihao 已提交
670

D
dapan1121 已提交
671
    tsortClearOrderdSource(pHandle->pOrderedSource, NULL, NULL);
672

673
    while (1) {
674
      SSDataBlock* pBlock = pHandle->fetchfp(source->param);
675 676 677 678
      if (pBlock == NULL) {
        break;
      }

679
      if (pHandle->pDataBlock == NULL) {
680 681
        uint32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
        pHandle->pageSize = getProperSortPageSize(blockDataGetRowSize(pBlock), numOfCols);
682 683

        // todo, number of pages are set according to the total available sort buffer
684 685
        pHandle->numOfPages = 1024;
        sortBufSize = pHandle->numOfPages * pHandle->pageSize;
686
        pHandle->pDataBlock = createOneDataBlock(pBlock, false);
687 688
      }

689 690 691
      if (pHandle->beforeFp != NULL) {
        pHandle->beforeFp(pBlock, pHandle->param);
      }
692

693 694
      code = blockDataMerge(pHandle->pDataBlock, pBlock);
      if (code != TSDB_CODE_SUCCESS) {
695 696 697
        if (source->param && !source->onlyRef) {
          taosMemoryFree(source->param);
        }
dengyihao's avatar
dengyihao 已提交
698 699 700 701
        if (!source->onlyRef && source->src.pBlock) {
          blockDataDestroy(source->src.pBlock);
          source->src.pBlock = NULL;
        }
702
        taosMemoryFree(source);
703 704
        return code;
      }
705

706 707 708 709
      size_t size = blockDataGetSize(pHandle->pDataBlock);
      if (size > sortBufSize) {
        // Perform the in-memory sort and then flush data in the buffer into disk.
        int64_t p = taosGetTimestampUs();
wmmhello's avatar
wmmhello 已提交
710 711
        code = blockDataSort(pHandle->pDataBlock, pHandle->pSortInfo);
        if (code != 0) {
712 713 714
          if (source->param && !source->onlyRef) {
            taosMemoryFree(source->param);
          }
dengyihao's avatar
dengyihao 已提交
715 716 717 718
          if (!source->onlyRef && source->src.pBlock) {
            blockDataDestroy(source->src.pBlock);
            source->src.pBlock = NULL;
          }
719

720
          taosMemoryFree(source);
wmmhello's avatar
wmmhello 已提交
721 722
          return code;
        }
723

724 725
        int64_t el = taosGetTimestampUs() - p;
        pHandle->sortElapsed += el;
726

727 728 729 730
        code = doAddToBuf(pHandle->pDataBlock, pHandle);
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }
731 732 733
      }
    }

734 735 736
    if (source->param && !source->onlyRef) {
      taosMemoryFree(source->param);
    }
737

738 739
    taosMemoryFree(source);

740
    if (pHandle->pDataBlock != NULL && pHandle->pDataBlock->info.rows > 0) {
741 742 743
      size_t size = blockDataGetSize(pHandle->pDataBlock);

      // Perform the in-memory sort and then flush data in the buffer into disk.
744 745
      int64_t p = taosGetTimestampUs();

746
      code = blockDataSort(pHandle->pDataBlock, pHandle->pSortInfo);
wmmhello's avatar
wmmhello 已提交
747 748 749
      if (code != 0) {
        return code;
      }
750

751 752 753
      int64_t el = taosGetTimestampUs() - p;
      pHandle->sortElapsed += el;

754
      // All sorted data can fit in memory, external memory sort is not needed. Return to directly
755
      if (size <= sortBufSize && pHandle->pBuf == NULL) {
756 757 758
        pHandle->cmpParam.numOfSources = 1;
        pHandle->inMemSort = true;

759
        pHandle->loops = 1;
H
Hongze Cheng 已提交
760
        pHandle->tupleHandle.rowIndex = -1;
761 762 763
        pHandle->tupleHandle.pBlock = pHandle->pDataBlock;
        return 0;
      } else {
764
        code = doAddToBuf(pHandle->pDataBlock, pHandle);
765 766
      }
    }
H
Haojun Liao 已提交
767 768
  }

769
  return code;
H
Haojun Liao 已提交
770 771
}

H
Haojun Liao 已提交
772
/**
 * Prepare the handle for tsortNextTuple(): build the initial runs, perform the intermediate merge
 * passes and set up the merge tree over the remaining sources. A second call is a no-op.
 *
 * @return 0 on success, -1 when the fetch or compare callback is missing, otherwise an error code
 */
int32_t tsortOpen(SSortHandle* pHandle) {
  if (pHandle->opened) {
    return 0;
  }

  if (!(pHandle->fetchfp != NULL && pHandle->comparFn != NULL)) {
    return -1;
  }

  pHandle->opened = true;

  int32_t rc = createInitialSources(pHandle);
  if (rc == TSDB_CODE_SUCCESS) {
    // do internal sort
    rc = doInternalMergeSort(pHandle);
  }
  if (rc != TSDB_CODE_SUCCESS) {
    return rc;
  }

  int32_t remaining = taosArrayGetSize(pHandle->pOrderedSource);
  if (pHandle->pBuf != NULL) {
    ASSERT(remaining <= getNumOfInMemBufPages(pHandle->pBuf));
  }

  if (remaining == 0) {
    return 0;
  }

  rc = sortComparInit(&pHandle->cmpParam, pHandle->pOrderedSource, 0, remaining - 1, pHandle);
  if (rc != TSDB_CODE_SUCCESS) {
    return rc;
  }

  return tMergeTreeCreate(&pHandle->pMergeTree, pHandle->cmpParam.numOfSources, &pHandle->cmpParam, pHandle->comparFn);
}

H
Haojun Liao 已提交
811
// Requests closing by switching the closed flag from 0 to 1 (other values are left alone), then
// sleeps for 10 ms. Always returns TSDB_CODE_SUCCESS.
int32_t tsortClose(SSortHandle* pHandle) {
  int8_t* pFlag = &pHandle->closed;
  atomic_val_compare_exchange_8(pFlag, 0, 1);

  taosMsleep(10);
  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
817 818 819 820 821 822 823 824
// Reports whether closing was requested. As a side effect a closed flag of 1 is switched to 2.
// The result is the compare-exchange's return value converted to bool - presumably the value stored
// before the call, i.e. false only while the flag is still 0 (confirm against the atomic wrapper).
bool tsortIsClosed(SSortHandle* pHandle) {
  return atomic_val_compare_exchange_8(&pHandle->closed, 1, 2);
}

// Atomically stores 2 into the handle's closed flag, regardless of its previous value.
void tsortSetClosed(SSortHandle* pHandle) {
  atomic_store_8(&pHandle->closed, 2);
}

H
Hongze Cheng 已提交
825 826
/**
 * Install the data callbacks of the handle.
 *
 * @param fetchFp  callback that returns the next block of a source
 * @param fp       optional hook called with each fetched block and `param` before the block is merged
 * @param param    opaque argument forwarded to `fp`
 * @return always TSDB_CODE_SUCCESS
 */
int32_t tsortSetFetchRawDataFp(SSortHandle* pHandle, _sort_fetch_block_fn_t fetchFp, void (*fp)(SSDataBlock*, void*),
                               void* param) {
  pHandle->param = param;
  pHandle->beforeFp = fp;
  pHandle->fetchfp = fetchFp;

  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
833
// Replaces the comparator that is handed to the merge tree. Always returns TSDB_CODE_SUCCESS.
int32_t tsortSetComparFp(SSortHandle* pHandle, _sort_merge_compar_fn_t fp) {
  int32_t status = TSDB_CODE_SUCCESS;
  pHandle->comparFn = fp;
  return status;
}

838 839 840 841 842
// Sets cmpParam.cmpGroupId, which makes msortComparFn() compare the blocks' group ids before the
// sort keys. Always returns TSDB_CODE_SUCCESS.
int32_t tsortSetCompareGroupId(SSortHandle* pHandle, bool compareGroupId) {
  pHandle->cmpParam.cmpGroupId = compareGroupId;
  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
843
/**
 * Return the next row in sort order.
 *
 * The returned pointer refers to a tuple handle embedded in pHandle; it is overwritten by the next call.
 *
 * @return the next tuple, or NULL when the handle is closed, all sources are exhausted, or the merge
 *         tree could not be advanced (terrno is set in that case)
 */
STupleHandle* tsortNextTuple(SSortHandle* pHandle) {
  if (tsortIsClosed(pHandle)) {
    return NULL;
  }
  if (pHandle->cmpParam.numOfSources == pHandle->numOfCompletedSources) {
    return NULL;
  }

  STupleHandle* pTuple = &pHandle->tupleHandle;

  // All the data are hold in the buffer, no external sort is invoked.
  if (pHandle->inMemSort) {
    pTuple->rowIndex += 1;
    if (pTuple->rowIndex != pHandle->pDataBlock->info.rows) {
      return pTuple;
    }

    pHandle->numOfCompletedSources = 1;
    return NULL;
  }

  SMultiwayMergeTreeInfo* pTree = pHandle->pMergeTree;
  SSortSource*            pWinner = pHandle->cmpParam.pSources[tMergeTreeGetChosenIndex(pTree)];

  // the row handed out by the previous call has been consumed: advance its source first
  if (pHandle->needAdjust) {
    int32_t rc = adjustMergeTreeForNextTuple(pWinner, pTree, pHandle, &pHandle->numOfCompletedSources);
    if (rc != TSDB_CODE_SUCCESS) {
      terrno = rc;
      return NULL;
    }
  }

  // all sources are completed.
  if (pHandle->cmpParam.numOfSources == pHandle->numOfCompletedSources) {
    return NULL;
  }

  // Get the adjusted value after the loser tree is updated.
  pWinner = pHandle->cmpParam.pSources[tMergeTreeGetChosenIndex(pTree)];
  ASSERT(pWinner->src.pBlock != NULL);

  pTuple->pBlock = pWinner->src.pBlock;
  pTuple->rowIndex = pWinner->src.rowIndex;

  pWinner->src.rowIndex += 1;
  pHandle->needAdjust = true;

  return pTuple;
}

H
Haojun Liao 已提交
893
// Tells whether the tuple's value in column colIndex is NULL.
bool tsortIsNullVal(STupleHandle* pVHandle, int32_t colIndex) {
  SColumnInfoData* pCol = taosArrayGet(pVHandle->pBlock->pDataBlock, colIndex);
  bool             isNull = colDataIsNull_s(pCol, pVHandle->rowIndex);
  return isNull;
}

H
Haojun Liao 已提交
898
// Returns a pointer to the tuple's value in column colIndex, or NULL when the column has no data buffer.
void* tsortGetValue(STupleHandle* pVHandle, int32_t colIndex) {
  SColumnInfoData* pCol = TARRAY_GET_ELEM(pVHandle->pBlock->pDataBlock, colIndex);
  if (pCol->pData != NULL) {
    return colDataGetData(pCol, pVHandle->rowIndex);
  }

  return NULL;
}
906

H
Haojun Liao 已提交
907
// Returns the group id recorded in the info of the block the tuple belongs to.
uint64_t tsortGetGroupId(STupleHandle* pVHandle) { return pVHandle->pBlock->info.id.groupId; }
908
// Returns the address of the info member of the block the tuple belongs to (not a copy).
void*    tsortGetBlockInfo(STupleHandle* pVHandle) { return &pVHandle->pBlock->info; }
S
slzhou 已提交
909

910 911 912
/**
 * Collect execution statistics of a sort.
 *
 * For a NULL handle the defaults are reported (SORT_QSORT_T, 2 MB buffer). Otherwise buffer size, sort
 * method and loop count come from the handle, and the read/write byte counters from the paged buffer
 * when one exists. All other fields are zero.
 */
SSortExecInfo tsortGetSortExecInfo(SSortHandle* pHandle) {
  SSortExecInfo info = {0};

  if (pHandle == NULL) {
    info.sortBuffer = 2 * 1048576;   // 2mb by default
    info.sortMethod = SORT_QSORT_T;  // by default
    return info;
  }

  info.loops = pHandle->loops;
  info.sortMethod = pHandle->inMemSort ? SORT_QSORT_T : SORT_SPILLED_MERGE_SORT_T;
  info.sortBuffer = pHandle->pageSize * pHandle->numOfPages;

  if (pHandle->pBuf != NULL) {
    SDiskbasedBufStatis bufStat = getDBufStatis(pHandle->pBuf);
    info.readBytes = bufStat.loadBytes;
    info.writeBytes = bufStat.flushBytes;
  }

  return info;
}