tsort.c 23.8 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
common  
Shengliang Guan 已提交
16
#include "tcommon.h"
17 18
#include "query.h"

H
Haojun Liao 已提交
19
#include "tdatablock.h"
S
Shengliang Guan 已提交
20
#include "tdef.h"
21 22
#include "tlosertree.h"
#include "tpagedbuf.h"
H
Haojun Liao 已提交
23
#include "tsort.h"
24
#include "tutil.h"
25
#include "tcompare.h"
26

27 28 29 30 31
// Cursor onto a single row: a data block plus the index of the row inside it.
// tsortNextTuple() fills it in; tsortGetValue()/tsortIsNullVal()/tsortGetGroupId() read it.
struct STupleHandle {
  SSDataBlock* pBlock;    // block that holds the current row
  int32_t      rowIndex;  // index of the current row within pBlock
};

H
Haojun Liao 已提交
32
struct SSortHandle {
33 34 35 36 37
  int32_t           type;
  int32_t           pageSize;
  int32_t           numOfPages;
  SDiskbasedBuf    *pBuf;

H
Haojun Liao 已提交
38
  SArray           *pSortInfo;
39 40
  SArray           *pOrderedSource;

41
  int32_t           loops;
42
  uint64_t          sortElapsed;
43
  int64_t           startTs;
44 45 46 47 48 49 50 51 52 53 54
  uint64_t          totalElapsed;

  int32_t           sourceId;
  SSDataBlock      *pDataBlock;
  SMsortComparParam cmpParam;
  int32_t           numOfCompletedSources;
  bool              opened;
  const char       *idStr;
  bool              inMemSort;
  bool              needAdjust;
  STupleHandle      tupleHandle;
55 56
  void             *param;
  void (*beforeFp)(SSDataBlock* pBlock, void* param);
57 58 59 60

  _sort_fetch_block_fn_t  fetchfp;
  _sort_merge_compar_fn_t comparFn;
  SMultiwayMergeTreeInfo *pMergeTree;
H
Haojun Liao 已提交
61
};
62

63 64
// Default merge comparator; installed by tsortCreateSortHandle() and defined further down in this file.
static int32_t msortComparFn(const void *pLeft, const void *pRight, void *param);

65 66
/**
 * Build a new data block from the handle's internal block by calling createOneDataBlock().
 *
 * @param pSortHandle  sort handle whose pDataBlock serves as the template
 * @return whatever createOneDataBlock() returns for (pDataBlock, false)
 */
SSDataBlock* tsortGetSortedDataBlock(const SSortHandle* pSortHandle) {
  SSDataBlock* pTemplate = pSortHandle->pDataBlock;
  // second argument is false — presumably "do not copy the rows"; confirm against createOneDataBlock()
  return createOneDataBlock(pTemplate, false);
}

/**
 *
 * @param type
 * @return
 */
74
SSortHandle* tsortCreateSortHandle(SArray* pSortInfo, int32_t type, int32_t pageSize, int32_t numOfPages, SSDataBlock* pBlock, const char* idstr) {
wafwerar's avatar
wafwerar 已提交
75
  SSortHandle* pSortHandle = taosMemoryCalloc(1, sizeof(SSortHandle));
76 77 78 79

  pSortHandle->type       = type;
  pSortHandle->pageSize   = pageSize;
  pSortHandle->numOfPages = numOfPages;
H
Haojun Liao 已提交
80
  pSortHandle->pSortInfo  = pSortInfo;
81
  pSortHandle->loops      = 0;
82 83 84 85

  if (pBlock != NULL) {
    pSortHandle->pDataBlock = createOneDataBlock(pBlock, false);
  }
H
Haojun Liao 已提交
86 87 88

  pSortHandle->pOrderedSource     = taosArrayInit(4, POINTER_BYTES);
  pSortHandle->cmpParam.orderInfo = pSortInfo;
89
  pSortHandle->cmpParam.cmpGroupId = false;
90

H
Haojun Liao 已提交
91
  tsortSetComparFp(pSortHandle, msortComparFn);
92 93

  if (idstr != NULL) {
94
    pSortHandle->idStr = strdup(idstr);
95 96 97 98 99
  }

  return pSortHandle;
}

wmmhello's avatar
wmmhello 已提交
100 101
/**
 * Release every source referenced by the comparator parameter: the source's block is destroyed and
 * the source itself is freed. numOfSources is reset to 0 afterwards.
 *
 * @return always TSDB_CODE_SUCCESS
 */
static int32_t sortComparClearup(SMsortComparParam* cmpParam) {
  int32_t idx = 0;
  while (idx < cmpParam->numOfSources) {
    // NOTICE: the entry may really be an SGenericSource * when the handle is SORT_MULTISOURCE_MERGE
    SSortSource* pCurrent = cmpParam->pSources[idx];
    blockDataDestroy(pCurrent->src.pBlock);
    taosMemoryFreeClear(pCurrent);
    idx += 1;
  }

  cmpParam->numOfSources = 0;
  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
111
/**
 * Tear down a sort handle: closes it, destroys the merge tree (if any), the disk-based buffer,
 * the id string, the internal data block, every source still listed in pOrderedSource, the
 * source array and finally the handle itself. A NULL handle is ignored.
 */
void tsortDestroySortHandle(SSortHandle* pSortHandle) {
  if (!pSortHandle) {
    return;
  }

  tsortClose(pSortHandle);

  if (pSortHandle->pMergeTree != NULL) {
    tMergeTreeDestroy(pSortHandle->pMergeTree);
  }

  destroyDiskbasedBuf(pSortHandle->pBuf);
  taosMemoryFreeClear(pSortHandle->idStr);
  blockDataDestroy(pSortHandle->pDataBlock);

  // free the source descriptors themselves; the array only stores pointers to them
  size_t pos = 0;
  while (pos < taosArrayGetSize(pSortHandle->pOrderedSource)) {
    SSortSource** ppSource = taosArrayGet(pSortHandle->pOrderedSource, pos);
    taosMemoryFreeClear(*ppSource);
    pos++;
  }

  taosArrayDestroy(pSortHandle->pOrderedSource);
  taosMemoryFreeClear(pSortHandle);
}

H
Haojun Liao 已提交
132
/**
 * Register an input source with the handle. Only the pointer is stored in pOrderedSource.
 *
 * @param pSortHandle  sort handle
 * @param pSource      source descriptor to append
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_QRY_OUT_OF_MEMORY when the pointer cannot be appended
 */
int32_t tsortAddSource(SSortHandle* pSortHandle, void* pSource) {
  // taosArrayPush() yields NULL when it cannot store the element; do not report success in that case
  if (taosArrayPush(pSortHandle->pOrderedSource, &pSource) == NULL) {
    return TSDB_CODE_QRY_OUT_OF_MEMORY;
  }

  return TSDB_CODE_SUCCESS;
}

/**
 * Wrap the pages written under *sourceId into a new SSortSource, append it to pAllSources and
 * advance *sourceId. pBlock becomes the source's working block and is sized to hold one page of rows.
 *
 * Ownership: pBlock is taken over by this function. It ends up owned by the new source, or is
 * destroyed here when the source cannot be created or stored.
 *
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_QRY_OUT_OF_MEMORY, or the result of blockDataEnsureCapacity()
 */
static int32_t doAddNewExternalMemSource(SDiskbasedBuf *pBuf, SArray* pAllSources, SSDataBlock* pBlock, int32_t* sourceId) {
  if (pBlock == NULL) {
    // the callers pass the result of createOneDataBlock() straight through
    return TSDB_CODE_QRY_OUT_OF_MEMORY;
  }

  SSortSource* pSource = taosMemoryCalloc(1, sizeof(SSortSource));
  if (pSource == NULL) {
    blockDataDestroy(pBlock);
    return TSDB_CODE_QRY_OUT_OF_MEMORY;
  }

  pSource->pageIdList = getDataBufPagesIdList(pBuf, (*sourceId));
  pSource->src.pBlock = pBlock;

  if (taosArrayPush(pAllSources, &pSource) == NULL) {
    // the source never reached the list, so nobody else will release it
    blockDataDestroy(pBlock);
    taosMemoryFreeClear(pSource);
    return TSDB_CODE_QRY_OUT_OF_MEMORY;
  }

  (*sourceId) += 1;

  int32_t rowSize = blockDataGetSerialRowSize(pSource->src.pBlock);

  // The value of numOfRows must be greater than 0, which is guaranteed by the previous memory allocation
  int32_t numOfRows = (getBufPageSize(pBuf) - blockDataGetSerialMetaSize(taosArrayGetSize(pBlock->pDataBlock)))/rowSize;
  ASSERT(numOfRows > 0);
  return blockDataEnsureCapacity(pSource->src.pBlock, numOfRows);
}

/**
 * Spill the rows of pDataBlock into the disk-based buffer, one page-sized slice at a time, under
 * the handle's current sourceId, then register the written pages as a new ordered source.
 * The disk-based buffer is created on first use. pDataBlock is emptied with blockDataCleanup()
 * once all slices have been written.
 *
 * @return TSDB_CODE_SUCCESS or an error code (buffer creation, page allocation, block extraction,
 *         or source registration failure)
 */
static int32_t doAddToBuf(SSDataBlock* pDataBlock, SSortHandle* pHandle) {
  int32_t start = 0;

  if (pHandle->pBuf == NULL) {
    int32_t code = createDiskbasedBuf(&pHandle->pBuf, pHandle->pageSize, pHandle->numOfPages * pHandle->pageSize, "doAddToBuf", TD_TMP_DIR_PATH);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    // only touch the buffer once we know it has been created
    dBufSetPrintInfo(pHandle->pBuf);
  }

  while(start < pDataBlock->info.rows) {
    int32_t stop = 0;
    blockDataSplitRows(pDataBlock, pDataBlock->info.hasVarCol, start, &stop, pHandle->pageSize);
    SSDataBlock* p = blockDataExtractBlock(pDataBlock, start, stop - start + 1);
    if (p == NULL) {
      return terrno;
    }

    int32_t pageId = -1;
    void* pPage = getNewBufPage(pHandle->pBuf, pHandle->sourceId, &pageId);
    if (pPage == NULL) {
      blockDataDestroy(p);
      return terrno;
    }

    int32_t size = blockDataGetSize(p) + sizeof(int32_t)  + taosArrayGetSize(p->pDataBlock) * sizeof(int32_t);
    assert(size <= getBufPageSize(pHandle->pBuf));

    blockDataToBuf(pPage, p);

    setBufPageDirty(pPage, true);
    releaseBufPage(pHandle->pBuf, pPage);

    blockDataDestroy(p);
    start = stop + 1;
  }

  blockDataCleanup(pDataBlock);

  SSDataBlock* pBlock = createOneDataBlock(pDataBlock, false);
  if (pBlock == NULL) {
    return TSDB_CODE_QRY_OUT_OF_MEMORY;
  }

  return doAddNewExternalMemSource(pHandle->pBuf, pHandle->pOrderedSource, pBlock, &pHandle->sourceId);
}

202 203 204 205 206
// Flag a source as exhausted (rowIndex == -1 is the marker the comparator checks) and count it
// among the handle's completed sources.
static void setCurrentSourceIsDone(SSortSource* pSource, SSortHandle* pHandle) {
  pHandle->numOfCompletedSources += 1;
  pSource->src.rowIndex = -1;
}

207 208 209 210 211 212
/**
 * Point the comparator parameter at sources [startIndex, endIndex] of pSources and load the first
 * block of each of them.
 *
 * - SORT_SINGLESOURCE_SORT: each source is a list of buffer pages; the page at pSource->pageIndex
 *   is decoded into the source's block. A source without pages is marked as done.
 * - otherwise: the first block is pulled through fetchfp; a NULL block marks the source as done.
 *   The disk-based buffer is created here if it does not exist yet.
 *
 * @return TSDB_CODE_SUCCESS or the first error reported by buffer creation / page decoding
 */
static int32_t sortComparInit(SMsortComparParam* cmpParam, SArray* pSources, int32_t startIndex, int32_t endIndex, SSortHandle* pHandle) {
  cmpParam->pSources  = taosArrayGet(pSources, startIndex);
  cmpParam->numOfSources = (endIndex - startIndex + 1);

  int32_t code = 0;

  if (pHandle->type == SORT_SINGLESOURCE_SORT) {
    for (int32_t i = 0; i < cmpParam->numOfSources; ++i) {
      SSortSource* pSource = cmpParam->pSources[i];

      // set current source is done
      if (taosArrayGetSize(pSource->pageIdList) == 0) {
        setCurrentSourceIsDone(pSource, pHandle);
        continue;
      }

      SPageInfo* pPgInfo = *(SPageInfo**)taosArrayGet(pSource->pageIdList, pSource->pageIndex);

      void* pPage = getBufPage(pHandle->pBuf, getPageId(pPgInfo));
      code = blockDataFromBuf(pSource->src.pBlock, pPage);

      // release the page on both the success and the failure path so it never stays pinned
      releaseBufPage(pHandle->pBuf, pPage);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }
    }
  } else {
    // multi-pass internal merge sort is required
    if (pHandle->pBuf == NULL) {
      code = createDiskbasedBuf(&pHandle->pBuf, pHandle->pageSize, pHandle->numOfPages * pHandle->pageSize, "sortComparInit", TD_TMP_DIR_PATH);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }

      // only touch the buffer once we know it has been created
      dBufSetPrintInfo(pHandle->pBuf);
    }

    for (int32_t i = 0; i < cmpParam->numOfSources; ++i) {
      SSortSource* pSource = cmpParam->pSources[i];
      pSource->src.pBlock = pHandle->fetchfp(pSource->param);

      // set current source is done
      if (pSource->src.pBlock == NULL) {
        setCurrentSourceIsDone(pSource, pHandle);
      }
    }
  }

  return code;
}

/**
 * Copy row *rowIndex of pSource to the end of pBlock, column by column, then advance both
 * pBlock->info.rows and *rowIndex by one. The loop runs over the columns of pBlock and reads the
 * column with the same position from pSource.
 */
static void appendOneRowToDataBlock(SSDataBlock *pBlock, const SSDataBlock* pSource, int32_t* rowIndex) {
  size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);

  for (int32_t col = 0; col < numOfCols; ++col) {
    SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, col);
    SColumnInfoData* pSrc = taosArrayGet(pSource->pDataBlock, col);

    if (colDataIsNull(pSrc, pSource->info.rows, *rowIndex, NULL)) {
      colDataAppend(pDst, pBlock->info.rows, NULL, true);
      continue;
    }

    char* pValue = colDataGetData(pSrc, *rowIndex);
    colDataAppend(pDst, pBlock->info.rows, pValue, false);
  }

  *rowIndex += 1;
  pBlock->info.rows += 1;
}

wmmhello's avatar
wmmhello 已提交
276
/**
 * After a row of pSource has been consumed, make sure the source has a current row again (loading
 * its next block if the current one is used up) and re-adjust the merge tree.
 *
 * When the source has no more data its rowIndex is set to -1 and *numOfCompleted is incremented.
 *
 * @param pSource         the source that produced the last winner
 * @param pTree           merge tree to adjust
 * @param pHandle         owning sort handle (provides type, buffer and fetch callback)
 * @param numOfCompleted  counter of exhausted sources, incremented when pSource runs dry
 * @return TSDB_CODE_SUCCESS or the error returned by blockDataFromBuf()
 */
static int32_t adjustMergeTreeForNextTuple(SSortSource *pSource, SMultiwayMergeTreeInfo *pTree, SSortHandle *pHandle, int32_t* numOfCompleted) {
  /*
   * load a new SDataBlock into memory of a given intermediate data-set source,
   * since its last record in buffer has been chosen to be processed, as the winner of loser-tree
   */
  if (pSource->src.rowIndex >= pSource->src.pBlock->info.rows) {
    pSource->src.rowIndex = 0;

    if (pHandle->type == SORT_SINGLESOURCE_SORT) {
      pSource->pageIndex ++;
      if (pSource->pageIndex >= taosArrayGetSize(pSource->pageIdList)) {
        (*numOfCompleted) += 1;
        pSource->src.rowIndex = -1;
        pSource->pageIndex = -1;
        pSource->src.pBlock = blockDataDestroy(pSource->src.pBlock);
      } else {
        SPageInfo* pPgInfo = *(SPageInfo**)taosArrayGet(pSource->pageIdList, pSource->pageIndex);

        void*   pPage = getBufPage(pHandle->pBuf, getPageId(pPgInfo));
        int32_t code = blockDataFromBuf(pSource->src.pBlock, pPage);

        // release the page before looking at the result so a decode failure does not leave it pinned
        releaseBufPage(pHandle->pBuf, pPage);
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }
      }
    } else {
      pSource->src.pBlock = pHandle->fetchfp(pSource->param);
      if (pSource->src.pBlock == NULL) {
        (*numOfCompleted) += 1;
        pSource->src.rowIndex = -1;
      }
    }
  }

  /*
   * Adjust loser tree otherwise, according to new candidate data
   * if the loser tree is rebuild completed, we do not need to adjust
   */
  int32_t leafNodeIndex = tMergeTreeGetAdjustIndex(pTree);

#ifdef _DEBUG_VIEW
  printf("before adjust:\t");
  tMergeTreePrint(pTree);
#endif

  tMergeTreeAdjust(pTree, leafNodeIndex);

#ifdef _DEBUG_VIEW
  printf("\nafter adjust:\t");
  tMergeTreePrint(pTree);
#endif
  return TSDB_CODE_SUCCESS;
}

wmmhello's avatar
wmmhello 已提交
331
/**
 * Fill the handle's internal block with up to `capacity` merged rows taken from the merge tree.
 *
 * @return the handle's internal block when it holds at least one row; NULL when every source is
 *         exhausted and no row was produced, or when adjusting the tree failed (terrno is set)
 */
static SSDataBlock* getSortedBlockDataInner(SSortHandle* pHandle, SMsortComparParam* cmpParam, int32_t capacity) {
  SSDataBlock* pOutput = pHandle->pDataBlock;
  blockDataCleanup(pOutput);

  while (cmpParam->numOfSources != pHandle->numOfCompletedSources) {
    // the tree's chosen index identifies the source holding the next row in order
    int32_t      winner = tMergeTreeGetChosenIndex(pHandle->pMergeTree);
    SSortSource *pWinner = (*cmpParam).pSources[winner];

    appendOneRowToDataBlock(pOutput, pWinner->src.pBlock, &pWinner->src.rowIndex);

    int32_t rc = adjustMergeTreeForNextTuple(pWinner, pHandle->pMergeTree, pHandle, &pHandle->numOfCompletedSources);
    if (rc != TSDB_CODE_SUCCESS) {
      terrno = rc;
      return NULL;
    }

    if (pOutput->info.rows >= capacity) {
      return pOutput;
    }
  }

  if (pOutput->info.rows > 0) {
    return pOutput;
  }

  return NULL;
}

358
int32_t msortComparFn(const void *pLeft, const void *pRight, void *param) {
359 360 361 362 363 364 365
  int32_t pLeftIdx  = *(int32_t *)pLeft;
  int32_t pRightIdx = *(int32_t *)pRight;

  SMsortComparParam *pParam = (SMsortComparParam *)param;

  SArray *pInfo = pParam->orderInfo;

wmmhello's avatar
wmmhello 已提交
366 367
  SSortSource* pLeftSource  = pParam->pSources[pLeftIdx];
  SSortSource* pRightSource = pParam->pSources[pRightIdx];
368 369 370 371 372 373 374 375 376 377 378 379 380

  // this input is exhausted, set the special value to denote this
  if (pLeftSource->src.rowIndex == -1) {
    return 1;
  }

  if (pRightSource->src.rowIndex == -1) {
    return -1;
  }

  SSDataBlock* pLeftBlock = pLeftSource->src.pBlock;
  SSDataBlock* pRightBlock = pRightSource->src.pBlock;

381 382 383 384
  if (pParam->cmpGroupId) {
    if (pLeftBlock->info.groupId != pRightBlock->info.groupId) {
      return pLeftBlock->info.groupId < pRightBlock->info.groupId ? -1 : 1;
    }
385 386
  }

387 388
  for(int32_t i = 0; i < pInfo->size; ++i) {
    SBlockOrderInfo* pOrder = TARRAY_GET_ELEM(pInfo, i);
H
Haojun Liao 已提交
389
    SColumnInfoData* pLeftColInfoData = TARRAY_GET_ELEM(pLeftBlock->pDataBlock, pOrder->slotId);
390 391 392

    bool leftNull  = false;
    if (pLeftColInfoData->hasNull) {
393 394 395 396 397
      if (pLeftBlock->pBlockAgg == NULL) {
        leftNull = colDataIsNull_s(pLeftColInfoData, pLeftSource->src.rowIndex);
      } else {
        leftNull = colDataIsNull(pLeftColInfoData, pLeftBlock->info.rows, pLeftSource->src.rowIndex, pLeftBlock->pBlockAgg[i]);
      }
398 399
    }

H
Haojun Liao 已提交
400
    SColumnInfoData* pRightColInfoData = TARRAY_GET_ELEM(pRightBlock->pDataBlock, pOrder->slotId);
401 402
    bool rightNull = false;
    if (pRightColInfoData->hasNull) {
403 404 405 406 407
      if (pLeftBlock->pBlockAgg == NULL) {
        rightNull = colDataIsNull_s(pRightColInfoData, pRightSource->src.rowIndex);
      } else {
        rightNull = colDataIsNull(pRightColInfoData, pRightBlock->info.rows, pRightSource->src.rowIndex, pRightBlock->pBlockAgg[i]);
      }
408 409 410 411 412 413 414
    }

    if (leftNull && rightNull) {
      continue; // continue to next slot
    }

    if (rightNull) {
wmmhello's avatar
wmmhello 已提交
415
      return pOrder->nullFirst? 1:-1;
416 417 418
    }

    if (leftNull) {
wmmhello's avatar
wmmhello 已提交
419
      return pOrder->nullFirst? -1:1;
420 421
    }

422 423
    void* left1  = colDataGetData(pLeftColInfoData, pLeftSource->src.rowIndex);
    void* right1 = colDataGetData(pRightColInfoData, pRightSource->src.rowIndex);
424

425
    __compar_fn_t fn = getKeyComparFunc(pLeftColInfoData->info.type, pOrder->order);
426

427 428 429 430 431
    int ret = fn(left1, right1);
    if (ret == 0) {
      continue;
    } else {
      return ret;
432 433
    }
  }
434
  return 0;
435 436 437 438
}

static int32_t doInternalMergeSort(SSortHandle* pHandle) {
  size_t numOfSources = taosArrayGetSize(pHandle->pOrderedSource);
H
Haojun Liao 已提交
439 440 441
  if (numOfSources == 0) {
    return 0;
  }
442 443 444 445 446

  // Calculate the I/O counts to complete the data sort.
  double sortPass = floorl(log2(numOfSources) / log2(pHandle->numOfPages));

  pHandle->totalElapsed = taosGetTimestampUs() - pHandle->startTs;
447 448 449 450 451 452 453 454 455 456

  if (sortPass > 0) {
    size_t s = pHandle->pBuf ? getTotalBufSize(pHandle->pBuf) : 0;
    qDebug("%s %d rounds mergesort required to complete the sort, first-round sorted data size:%" PRIzu
           ", sort elapsed:%" PRId64 ", total elapsed:%" PRId64,
           pHandle->idStr, (int32_t)(sortPass + 1), s, pHandle->sortElapsed, pHandle->totalElapsed);
  } else {
    qDebug("%s ordered source:%"PRIzu", available buf:%d, no need internal sort", pHandle->idStr, numOfSources,
        pHandle->numOfPages);
  }
457

H
Haojun Liao 已提交
458
  int32_t numOfRows = blockDataGetCapacityInRow(pHandle->pDataBlock, pHandle->pageSize);
wmmhello's avatar
wmmhello 已提交
459
  blockDataEnsureCapacity(pHandle->pDataBlock, numOfRows);
460

461 462 463
  // the initial pass + sortPass + final mergePass
  pHandle->loops = sortPass + 2;

464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494
  size_t numOfSorted = taosArrayGetSize(pHandle->pOrderedSource);
  for(int32_t t = 0; t < sortPass; ++t) {
    int64_t st = taosGetTimestampUs();

    SArray* pResList = taosArrayInit(4, POINTER_BYTES);

    int32_t numOfInputSources = pHandle->numOfPages;
    int32_t sortGroup = (numOfSorted + numOfInputSources - 1) / numOfInputSources;

    // Only *numOfInputSources* can be loaded into buffer to perform the external sort.
    for(int32_t i = 0; i < sortGroup; ++i) {
      pHandle->sourceId += 1;

      int32_t end = (i + 1) * numOfInputSources - 1;
      if (end > numOfSorted - 1) {
        end = numOfSorted - 1;
      }

      pHandle->cmpParam.numOfSources = end - i * numOfInputSources + 1;

      int32_t code = sortComparInit(&pHandle->cmpParam, pHandle->pOrderedSource, i * numOfInputSources, end, pHandle);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }

      code = tMergeTreeCreate(&pHandle->pMergeTree, pHandle->cmpParam.numOfSources, &pHandle->cmpParam, pHandle->comparFn);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }

      while (1) {
wmmhello's avatar
wmmhello 已提交
495
        SSDataBlock* pDataBlock = getSortedBlockDataInner(pHandle, &pHandle->cmpParam, numOfRows);
496 497 498 499 500
        if (pDataBlock == NULL) {
          break;
        }

        int32_t pageId = -1;
501
        void* pPage = getNewBufPage(pHandle->pBuf, pHandle->sourceId, &pageId);
502 503 504 505
        if (pPage == NULL) {
          return terrno;
        }

506
        int32_t size = blockDataGetSize(pDataBlock) + sizeof(int32_t) + taosArrayGetSize(pDataBlock->pDataBlock) * sizeof(int32_t);
507 508
        assert(size <= getBufPageSize(pHandle->pBuf));

509
        blockDataToBuf(pPage, pDataBlock);
510 511 512 513

        setBufPageDirty(pPage, true);
        releaseBufPage(pHandle->pBuf, pPage);

514
        blockDataCleanup(pDataBlock);
515 516
      }

wmmhello's avatar
wmmhello 已提交
517
      sortComparClearup(&pHandle->cmpParam);
518 519 520
      tMergeTreeDestroy(pHandle->pMergeTree);
      pHandle->numOfCompletedSources = 0;

521
      SSDataBlock* pBlock = createOneDataBlock(pHandle->pDataBlock, false);
522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540
      code = doAddNewExternalMemSource(pHandle->pBuf, pResList, pBlock, &pHandle->sourceId);
      if (code != 0) {
        return code;
      }
    }

    taosArrayClear(pHandle->pOrderedSource);
    taosArrayAddAll(pHandle->pOrderedSource, pResList);
    taosArrayDestroy(pResList);

    numOfSorted = taosArrayGetSize(pHandle->pOrderedSource);

    int64_t el = taosGetTimestampUs() - st;
    pHandle->totalElapsed += el;

    SDiskbasedBufStatis statis = getDBufStatis(pHandle->pBuf);
    qDebug("%s %d round mergesort, elapsed:%"PRId64" readDisk:%.2f Kb, flushDisk:%.2f Kb", pHandle->idStr, t + 1, el, statis.loadBytes/1024.0,
           statis.flushBytes/1024.0);

H
Haojun Liao 已提交
541 542
    if (pHandle->type == SORT_MULTISOURCE_MERGE) {
      pHandle->type = SORT_SINGLESOURCE_SORT;
543 544 545 546 547 548 549 550
      pHandle->comparFn = msortComparFn;
    }
  }

  pHandle->cmpParam.numOfSources = taosArrayGetSize(pHandle->pOrderedSource);
  return 0;
}

551
// TODO consider the page meta size
552 553 554 555 556 557 558 559 560 561 562 563 564
/**
 * Pick a page size for sorting rows of the given size: four rows' worth of bytes, but never
 * less than the 4096-byte default.
 *
 * @param rowSize  size of one row
 * @return the chosen page size
 */
int32_t getProperSortPageSize(size_t rowSize) {
  const uint32_t kDefaultPageSize = 4096;
  const size_t   fourRows = rowSize * 4;

  uint32_t chosen = kDefaultPageSize;
  if (fourRows > kDefaultPageSize) {
    chosen = fourRows;
  }

  return chosen;
}

565
/**
 * Build the initial ordered runs for a SORT_SINGLESOURCE_SORT handle.
 *
 * Blocks are pulled from the single registered source and merged into the handle's internal block.
 * Whenever that block outgrows the sort buffer it is sorted and spilled to the disk-based buffer as
 * one ordered run. If everything fits in memory and nothing was spilled, the handle is switched to
 * in-memory mode and the sorted block is served directly. For any other handle type this function
 * does nothing.
 *
 * @return TSDB_CODE_SUCCESS or the first error from merging, sorting or spilling
 */
static int32_t createInitialSources(SSortHandle* pHandle) {
  if (pHandle->type != SORT_SINGLESOURCE_SORT) {
    return TSDB_CODE_SUCCESS;
  }

  if (taosArrayGetSize(pHandle->pOrderedSource) == 0) {
    // no source has been registered: nothing to read and nothing to sort
    return TSDB_CODE_SUCCESS;
  }

  // widen before multiplying so that the product cannot overflow int32_t
  size_t sortBufSize = (size_t)pHandle->numOfPages * pHandle->pageSize;

  // NOTE(review): the source pointer is dropped from the array here and is not freed in this function
  // — confirm who owns it
  SSortSource* source = taosArrayGetP(pHandle->pOrderedSource, 0);
  taosArrayClear(pHandle->pOrderedSource);

  while (1) {
    SSDataBlock* pBlock = pHandle->fetchfp(source->param);
    if (pBlock == NULL) {
      break;
    }

    if (pHandle->pDataBlock == NULL) {
      pHandle->pageSize = getProperSortPageSize(blockDataGetRowSize(pBlock));

      // todo, number of pages are set according to the total available sort buffer
      pHandle->numOfPages = 1024;
      sortBufSize = (size_t)pHandle->numOfPages * pHandle->pageSize;
      pHandle->pDataBlock = createOneDataBlock(pBlock, false);
    }

    if (pHandle->beforeFp != NULL) {
      pHandle->beforeFp(pBlock, pHandle->param);
    }

    int32_t code = blockDataMerge(pHandle->pDataBlock, pBlock);
    if (code != 0) {
      return code;
    }

    size_t size = blockDataGetSize(pHandle->pDataBlock);
    if (size > sortBufSize) {
      // Perform the in-memory sort and then flush data in the buffer into disk.
      int64_t p = taosGetTimestampUs();
      code = blockDataSort(pHandle->pDataBlock, pHandle->pSortInfo);
      if (code != 0) {
        return code;
      }

      int64_t el = taosGetTimestampUs() - p;
      pHandle->sortElapsed += el;

      // a failed spill must stop the sort; otherwise rows would be lost silently
      code = doAddToBuf(pHandle->pDataBlock, pHandle);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }
    }
  }

  if (pHandle->pDataBlock != NULL && pHandle->pDataBlock->info.rows > 0) {
    size_t size = blockDataGetSize(pHandle->pDataBlock);

    // Perform the in-memory sort and then flush data in the buffer into disk.
    int64_t p = taosGetTimestampUs();

    int32_t code = blockDataSort(pHandle->pDataBlock, pHandle->pSortInfo);
    if (code != 0) {
      return code;
    }

    int64_t el = taosGetTimestampUs() - p;
    pHandle->sortElapsed += el;

    // All sorted data can fit in memory, external memory sort is not needed. Return to directly
    if (size <= sortBufSize && pHandle->pBuf == NULL) {
      pHandle->cmpParam.numOfSources = 1;
      pHandle->inMemSort = true;

      pHandle->loops = 1;
      pHandle->tupleHandle.rowIndex  = -1;
      pHandle->tupleHandle.pBlock = pHandle->pDataBlock;
      return 0;
    }

    code = doAddToBuf(pHandle->pDataBlock, pHandle);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }
  }

  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
644
/**
 * Prepare the handle for reading sorted tuples: build the initial runs, perform the intermediate
 * merge passes and set up the merge tree for the final pass. Calling it again on an opened handle
 * is a no-op.
 *
 * @return 0 / TSDB_CODE_SUCCESS on success, -1 when the fetch or compare callback is missing,
 *         otherwise the error reported by one of the steps
 */
int32_t tsortOpen(SSortHandle* pHandle) {
  if (pHandle->opened) {
    return 0;
  }

  bool callbacksReady = (pHandle->fetchfp != NULL) && (pHandle->comparFn != NULL);
  if (!callbacksReady) {
    return -1;
  }

  pHandle->opened = true;

  int32_t status = createInitialSources(pHandle);
  if (status == TSDB_CODE_SUCCESS) {
    // do internal sort
    status = doInternalMergeSort(pHandle);
  }

  if (status != TSDB_CODE_SUCCESS) {
    return status;
  }

  int32_t remaining = taosArrayGetSize(pHandle->pOrderedSource);
  if (pHandle->pBuf != NULL) {
    ASSERT(remaining <= getNumOfInMemBufPages(pHandle->pBuf));
  }

  // nothing left to merge (e.g. the in-memory case): no merge tree is needed
  if (remaining == 0) {
    return 0;
  }

  status = sortComparInit(&pHandle->cmpParam, pHandle->pOrderedSource, 0, remaining - 1, pHandle);
  if (status != TSDB_CODE_SUCCESS) {
    return status;
  }

  return tMergeTreeCreate(&pHandle->pMergeTree, pHandle->cmpParam.numOfSources, &pHandle->cmpParam, pHandle->comparFn);
}

H
Haojun Liao 已提交
683
/**
 * Close the sort handle. Currently there is nothing to do; resources are released in
 * tsortDestroySortHandle().
 *
 * @return always TSDB_CODE_SUCCESS
 */
int32_t tsortClose(SSortHandle* pHandle) {
  (void)pHandle;  // intentionally unused
  return TSDB_CODE_SUCCESS;
}

688 689 690 691
/**
 * Install the block-fetch callback together with an optional pre-merge hook.
 *
 * @param pHandle  sort handle
 * @param fetchFp  callback used to pull the next block from a source
 * @param fp       hook invoked on each fetched block before it is merged (may be NULL)
 * @param param    opaque argument passed to fp
 * @return always TSDB_CODE_SUCCESS
 */
int32_t tsortSetFetchRawDataFp(SSortHandle* pHandle, _sort_fetch_block_fn_t fetchFp, void (*fp)(SSDataBlock*, void*), void* param) {
  pHandle->param    = param;
  pHandle->beforeFp = fp;
  pHandle->fetchfp  = fetchFp;

  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
695
/**
 * Install the comparator used by the merge tree.
 *
 * @return always TSDB_CODE_SUCCESS
 */
int32_t tsortSetComparFp(SSortHandle* pHandle, _sort_merge_compar_fn_t fp) {
  int32_t result = TSDB_CODE_SUCCESS;
  pHandle->comparFn = fp;
  return result;
}

700 701 702 703 704
/**
 * Choose whether the default comparator orders rows by block group id before the sort columns.
 *
 * @return always TSDB_CODE_SUCCESS
 */
int32_t tsortSetCompareGroupId(SSortHandle* pHandle, bool compareGroupId) {
  SMsortComparParam* pCmp = &pHandle->cmpParam;
  pCmp->cmpGroupId = compareGroupId;
  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
705
/**
 * Advance to the next tuple in sort order.
 *
 * In in-memory mode the cursor simply walks the sorted internal block. Otherwise the merge tree is
 * adjusted for the source that produced the previous tuple (if any) and the current winner is
 * exposed through the handle's tuple cursor.
 *
 * @return the handle's tuple cursor, or NULL when all sources are exhausted or when adjusting the
 *         merge tree fails (terrno is set in that case)
 */
STupleHandle* tsortNextTuple(SSortHandle* pHandle) {
  SMsortComparParam* pCmp = &pHandle->cmpParam;
  STupleHandle*      pTuple = &pHandle->tupleHandle;

  if (pCmp->numOfSources == pHandle->numOfCompletedSources) {
    return NULL;
  }

  // All the data are hold in the buffer, no external sort is invoked.
  if (pHandle->inMemSort) {
    pTuple->rowIndex += 1;
    if (pTuple->rowIndex != pHandle->pDataBlock->info.rows) {
      return pTuple;
    }

    pHandle->numOfCompletedSources = 1;
    return NULL;
  }

  int32_t      chosen = tMergeTreeGetChosenIndex(pHandle->pMergeTree);
  SSortSource *pWinner = pCmp->pSources[chosen];

  if (pHandle->needAdjust) {
    int32_t rc = adjustMergeTreeForNextTuple(pWinner, pHandle->pMergeTree, pHandle, &pHandle->numOfCompletedSources);
    if (rc != TSDB_CODE_SUCCESS) {
      terrno = rc;
      return NULL;
    }
  }

  if (pCmp->numOfSources == pHandle->numOfCompletedSources) {
    return NULL;
  }

  // Get the adjusted value after the loser tree is updated.
  chosen  = tMergeTreeGetChosenIndex(pHandle->pMergeTree);
  pWinner = pCmp->pSources[chosen];

  assert(pWinner->src.pBlock != NULL);

  pTuple->pBlock   = pWinner->src.pBlock;
  pTuple->rowIndex = pWinner->src.rowIndex;

  // the winner's row has been handed out: step past it and remember to adjust the tree next time
  pWinner->src.rowIndex += 1;
  pHandle->needAdjust = true;

  return pTuple;
}

H
Haojun Liao 已提交
751
/**
 * Tell whether column colIndex of the tuple's current row is NULL.
 */
bool tsortIsNullVal(STupleHandle* pVHandle, int32_t colIndex) {
  SSDataBlock*     pRowBlock = pVHandle->pBlock;
  SColumnInfoData* pColumn = taosArrayGet(pRowBlock->pDataBlock, colIndex);

  return colDataIsNull_s(pColumn, pVHandle->rowIndex);
}

H
Haojun Liao 已提交
756
/**
 * Return a pointer to the value of column colIndex in the tuple's current row, or NULL when the
 * column has no data buffer.
 */
void* tsortGetValue(STupleHandle* pVHandle, int32_t colIndex) {
  SColumnInfoData* pColumn = TARRAY_GET_ELEM(pVHandle->pBlock->pDataBlock, colIndex);

  if (pColumn->pData != NULL) {
    return colDataGetData(pColumn, pVHandle->rowIndex);
  }

  return NULL;
}
764

S
slzhou 已提交
765 766 767 768
/**
 * Return the group id recorded in the info of the block that holds the tuple's current row.
 */
uint64_t tsortGetGroupId(STupleHandle* pVHandle) {
  const SSDataBlock* pRowBlock = pVHandle->pBlock;
  return pRowBlock->info.groupId;
}

769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784
/**
 * Collect execution statistics of the sort: buffer size (pageSize * numOfPages), the method used
 * (in-memory quick sort vs. spilled merge sort), the number of passes and, when a disk-based
 * buffer exists, the bytes flushed to and loaded from it.
 */
SSortExecInfo tsortGetSortExecInfo(SSortHandle* pHandle) {
  SSortExecInfo info = {0};

  info.loops      = pHandle->loops;
  info.sortBuffer = pHandle->pageSize * pHandle->numOfPages;

  if (pHandle->inMemSort) {
    info.sortMethod = SORT_QSORT_T;
  } else {
    info.sortMethod = SORT_SPILLED_MERGE_SORT_T;
  }

  if (pHandle->pBuf == NULL) {
    // nothing was spilled, so the I/O counters keep their zero initial values
    return info;
  }

  SDiskbasedBufStatis bufStat = getDBufStatis(pHandle->pBuf);
  info.writeBytes = bufStat.flushBytes;
  info.readBytes  = bufStat.loadBytes;

  return info;
}