tsort.c 21.3 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
common  
Shengliang Guan 已提交
16
#include "tcommon.h"
17 18
#include "query.h"

H
Haojun Liao 已提交
19
#include "tdatablock.h"
S
Shengliang Guan 已提交
20
#include "tdef.h"
21 22
#include "tlosertree.h"
#include "tpagedbuf.h"
H
Haojun Liao 已提交
23
#include "tsort.h"
24
#include "tutil.h"
25
#include "tcompare.h"
26

27 28 29 30 31
/* Cursor over a single row: a data block plus the position of the row inside that block. */
struct STupleHandle {
  SSDataBlock* pBlock;    // block that contains the current row
  int32_t      rowIndex;  // index of the current row within pBlock
};

H
Haojun Liao 已提交
32
struct SSortHandle {
33 34 35 36 37 38
  int32_t           type;

  int32_t           pageSize;
  int32_t           numOfPages;
  SDiskbasedBuf    *pBuf;

H
Haojun Liao 已提交
39
  SArray           *pSortInfo;
40
  SArray           *pIndexMap;
41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63
  SArray           *pOrderedSource;

  _sort_fetch_block_fn_t  fetchfp;
  _sort_merge_compar_fn_t comparFn;

  void             *pParam;
  SMultiwayMergeTreeInfo *pMergeTree;
  int32_t           numOfCols;

  int64_t           startTs;
  uint64_t          sortElapsed;
  uint64_t          totalElapsed;

  int32_t           sourceId;
  SSDataBlock      *pDataBlock;
  SMsortComparParam cmpParam;
  int32_t           numOfCompletedSources;
  bool              opened;
  const char       *idStr;

  bool              inMemSort;
  bool              needAdjust;
  STupleHandle      tupleHandle;
H
Haojun Liao 已提交
64
};
65

66 67 68
// Row comparator installed by tsortCreateSortHandle(); defined further down in this file.
static int32_t msortComparFn(const void *pLeft, const void *pRight, void *param);

/**
 * Build an empty SSDataBlock whose column layout mirrors the given schema.
 *
 * @param pSchema    array with at least numOfCols entries; type, bytes and colId are copied
 * @param numOfCols  number of columns to create
 * @return the new block, or NULL when an allocation fails
 */
static SSDataBlock* createDataBlock_rv(SSchema* pSchema, int32_t numOfCols) {
  SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
  if (pBlock == NULL) {
    return NULL;
  }

  pBlock->pDataBlock = taosArrayInit(numOfCols, sizeof(SColumnInfoData));
  if (pBlock->pDataBlock == NULL) {
    taosMemoryFreeClear(pBlock);
    return NULL;
  }

  pBlock->info.numOfCols = numOfCols;

  for (int32_t i = 0; i < numOfCols; ++i) {
    SColumnInfoData colInfo = {0};

    colInfo.info.type  = pSchema[i].type;
    colInfo.info.bytes = pSchema[i].bytes;
    colInfo.info.colId = pSchema[i].colId;
    taosArrayPush(pBlock->pDataBlock, &colInfo);

    // remember whether at least one column is of a variable-length type
    if (IS_VAR_DATA_TYPE(colInfo.info.type)) {
      pBlock->info.hasVarCol = true;
    }
  }

  return pBlock;
}

/**
 *
 * @param type
 * @return
 */
94
SSortHandle* tsortCreateSortHandle(SArray* pSortInfo, SArray* pIndexMap, int32_t type, int32_t pageSize, int32_t numOfPages, SSDataBlock* pBlock, const char* idstr) {
wafwerar's avatar
wafwerar 已提交
95
  SSortHandle* pSortHandle = taosMemoryCalloc(1, sizeof(SSortHandle));
96 97 98 99

  pSortHandle->type       = type;
  pSortHandle->pageSize   = pageSize;
  pSortHandle->numOfPages = numOfPages;
H
Haojun Liao 已提交
100
  pSortHandle->pSortInfo  = pSortInfo;
101
  pSortHandle->pIndexMap  = pIndexMap;
102
  pSortHandle->pDataBlock = createOneDataBlock(pBlock, false);
H
Haojun Liao 已提交
103 104 105

  pSortHandle->pOrderedSource     = taosArrayInit(4, POINTER_BYTES);
  pSortHandle->cmpParam.orderInfo = pSortInfo;
106

H
Haojun Liao 已提交
107
  tsortSetComparFp(pSortHandle, msortComparFn);
108 109 110 111 112 113 114 115

  if (idstr != NULL) {
    pSortHandle->idStr    = strdup(idstr);
  }

  return pSortHandle;
}

wmmhello's avatar
wmmhello 已提交
116 117
/**
 * Destroy the block of every source referenced by cmpParam, free the source
 * objects themselves and reset the source count to zero.
 *
 * @return always TSDB_CODE_SUCCESS
 */
static int32_t sortComparClearup(SMsortComparParam* cmpParam) {
  int32_t idx = 0;

  while (idx < cmpParam->numOfSources) {
    // NOTICE: the entry may be SGenericSource *, if it is SORT_MULTISOURCE_MERGE
    SSortSource* pCur = cmpParam->pSources[idx];

    blockDataDestroy(pCur->src.pBlock);
    taosMemoryFreeClear(pCur);

    idx += 1;
  }

  cmpParam->numOfSources = 0;
  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
127 128
/**
 * Release a sort handle and everything it owns: the merge tree, the paged buffer,
 * the id string, the staging block, and every source still held in pOrderedSource
 * (each source's block is destroyed and the source object is freed).
 *
 * @param pSortHandle  handle to destroy; NULL is accepted and ignored
 */
void tsortDestroySortHandle(SSortHandle* pSortHandle) {
  if (pSortHandle == NULL) {
    return;
  }

  tsortClose(pSortHandle);
  if (pSortHandle->pMergeTree != NULL) {
    tMergeTreeDestroy(pSortHandle->pMergeTree);
  }

  destroyDiskbasedBuf(pSortHandle->pBuf);
  taosMemoryFreeClear(pSortHandle->idStr);
  blockDataDestroy(pSortHandle->pDataBlock);

  size_t numOfSources = taosArrayGetSize(pSortHandle->pOrderedSource);
  for (size_t i = 0; i < numOfSources; i++) {
    SSortSource** pSource = taosArrayGet(pSortHandle->pOrderedSource, i);
    if (*pSource == NULL) {
      continue;  // nothing to release for an empty slot
    }

    blockDataDestroy((*pSource)->src.pBlock);
    taosMemoryFreeClear(*pSource);
  }

  taosArrayDestroy(pSortHandle->pOrderedSource);
  taosMemoryFreeClear(pSortHandle);
}

H
Haojun Liao 已提交
145
/**
 * Append a source pointer to the handle's list of sources.
 *
 * Sources left in the list are freed by tsortDestroySortHandle().
 *
 * @param pSortHandle  sort handle
 * @param pSource      source object; the pointer itself is stored
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_QRY_OUT_OF_MEMORY if the list cannot grow
 */
int32_t tsortAddSource(SSortHandle* pSortHandle, void* pSource) {
  if (taosArrayPush(pSortHandle->pOrderedSource, &pSource) == NULL) {
    return TSDB_CODE_QRY_OUT_OF_MEMORY;
  }

  return TSDB_CODE_SUCCESS;
}

/**
 * Create a source that reads back the pages stored in pBuf under *sourceId and
 * append it to pAllSources. On success *sourceId is incremented.
 *
 * The function takes over pBlock: both callers in this file create it just for this
 * call and do not release it afterwards, so it is destroyed here when the source
 * cannot be created or registered.
 *
 * @param pBuf         paged buffer holding the run
 * @param pAllSources  array of SSortSource* that receives the new source
 * @param pBlock       block the source uses to load pages into; NULL is treated as out of memory
 * @param sourceId     in/out: id of the run in pBuf
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_QRY_OUT_OF_MEMORY, or the result of blockDataEnsureCapacity()
 */
static int32_t doAddNewExternalMemSource(SDiskbasedBuf *pBuf, SArray* pAllSources, SSDataBlock* pBlock, int32_t* sourceId) {
  if (pBlock == NULL) {
    return TSDB_CODE_QRY_OUT_OF_MEMORY;
  }

  SSortSource* pSource = taosMemoryCalloc(1, sizeof(SSortSource));
  if (pSource == NULL) {
    blockDataDestroy(pBlock);
    return TSDB_CODE_QRY_OUT_OF_MEMORY;
  }

  pSource->pageIdList = getDataBufPagesIdList(pBuf, (*sourceId));
  pSource->src.pBlock = pBlock;

  if (taosArrayPush(pAllSources, &pSource) == NULL) {
    blockDataDestroy(pBlock);
    taosMemoryFreeClear(pSource);
    return TSDB_CODE_QRY_OUT_OF_MEMORY;
  }

  (*sourceId) += 1;

  // Size the block so that it can hold as many rows as fit into one buffer page.
  int32_t rowSize = blockDataGetSerialRowSize(pSource->src.pBlock);
  int32_t numOfRows = (getBufPageSize(pBuf) - blockDataGetSerialMetaSize(pBlock))/rowSize;  // The value of numOfRows must be greater than 0, which is guaranteed by the previous memory allocation
  ASSERT(numOfRows > 0);

  // From here on the source is owned by pAllSources, so a failure needs no local cleanup.
  return blockDataEnsureCapacity(pSource->src.pBlock, numOfRows);
}

/**
 * Write the rows of pDataBlock into the paged buffer as one run, one page per
 * chunk, then register a new source for that run in pHandle->pOrderedSource.
 * pDataBlock is cleaned (blockDataCleanup) once its rows have been written.
 *
 * @param pDataBlock  block whose rows are flushed
 * @param pHandle     sort handle; its paged buffer is created here if it does not exist yet
 * @return TSDB_CODE_SUCCESS or an error code
 */
static int32_t doAddToBuf(SSDataBlock* pDataBlock, SSortHandle* pHandle) {
  int32_t start = 0;

  if (pHandle->pBuf == NULL) {
    int32_t code = createDiskbasedBuf(&pHandle->pBuf, pHandle->pageSize, pHandle->numOfPages * pHandle->pageSize, "doAddToBuf", "/tmp");
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    // only touch the buffer once its creation is known to have succeeded
    dBufSetPrintInfo(pHandle->pBuf);
  }

  while (start < pDataBlock->info.rows) {
    // blockDataSplitRows() reports in `stop` the last row (inclusive) of the chunk starting at `start`
    int32_t stop = 0;
    blockDataSplitRows(pDataBlock, pDataBlock->info.hasVarCol, start, &stop, pHandle->pageSize);

    SSDataBlock* p = blockDataExtractBlock(pDataBlock, start, stop - start + 1);
    if (p == NULL) {
      return terrno;
    }

    int32_t pageId = -1;
    void* pPage = getNewBufPage(pHandle->pBuf, pHandle->sourceId, &pageId);
    if (pPage == NULL) {
      blockDataDestroy(p);
      return terrno;
    }

    int32_t size = blockDataGetSize(p) + sizeof(int32_t)  + p->info.numOfCols * sizeof(int32_t);
    assert(size <= getBufPageSize(pHandle->pBuf));

    blockDataToBuf(pPage, p);

    setBufPageDirty(pPage, true);
    releaseBufPage(pHandle->pBuf, pPage);

    blockDataDestroy(p);
    start = stop + 1;
  }

  blockDataCleanup(pDataBlock);

  // The new source gets its own block, built from pDataBlock, to load pages into.
  SSDataBlock* pBlock = createOneDataBlock(pDataBlock, false);
  if (pBlock == NULL) {
    return TSDB_CODE_QRY_OUT_OF_MEMORY;
  }

  return doAddNewExternalMemSource(pHandle->pBuf, pHandle->pOrderedSource, pBlock, &pHandle->sourceId);
}

/**
 * Point cmpParam at the sources [startIndex, endIndex] of pSources and load the
 * first block of each of them.
 *
 * For SORT_SINGLESOURCE_SORT the block is read from the page at the source's current
 * pageIndex in the paged buffer; otherwise the block is obtained from the fetch callback
 * (the paged buffer is created first if it does not exist yet).
 *
 * @return TSDB_CODE_SUCCESS or an error code
 */
static int32_t sortComparInit(SMsortComparParam* cmpParam, SArray* pSources, int32_t startIndex, int32_t endIndex, SSortHandle* pHandle) {
  cmpParam->pSources  = taosArrayGet(pSources, startIndex);
  cmpParam->numOfSources = (endIndex - startIndex + 1);

  int32_t code = 0;

  if (pHandle->type == SORT_SINGLESOURCE_SORT) {
    for (int32_t i = 0; i < cmpParam->numOfSources; ++i) {
      SSortSource* pSource = cmpParam->pSources[i];
      SPageInfo*   pPgInfo = *(SPageInfo**)taosArrayGet(pSource->pageIdList, pSource->pageIndex);

      void* pPage = getBufPage(pHandle->pBuf, getPageId(pPgInfo));
      code = blockDataFromBuf(pSource->src.pBlock, pPage);

      // release the page on the failure path as well, so it does not stay pinned
      releaseBufPage(pHandle->pBuf, pPage);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }
    }
  } else {
    // multi-pass internal merge sort is required
    if (pHandle->pBuf == NULL) {
      code = createDiskbasedBuf(&pHandle->pBuf, pHandle->pageSize, pHandle->numOfPages * pHandle->pageSize, "sortComparInit", "/tmp");
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }

      // only touch the buffer once its creation is known to have succeeded
      dBufSetPrintInfo(pHandle->pBuf);
    }

    for (int32_t i = 0; i < cmpParam->numOfSources; ++i) {
      SSortSource* pSource = cmpParam->pSources[i];
      pSource->src.pBlock = pHandle->fetchfp(pSource->param);
    }
  }

  return code;
}

/**
 * Copy row *rowIndex of pSource to the end of pBlock, column by column, then
 * advance both pBlock->info.rows and *rowIndex by one.
 */
static void appendOneRowToDataBlock(SSDataBlock *pBlock, const SSDataBlock* pSource, int32_t* rowIndex) {
  int32_t col = 0;

  while (col < pBlock->info.numOfCols) {
    SColumnInfoData* pDstCol = taosArrayGet(pBlock->pDataBlock, col);
    SColumnInfoData* pSrcCol = taosArrayGet(pSource->pDataBlock, col);

    if (colDataIsNull(pSrcCol, pSource->info.rows, *rowIndex, NULL)) {
      colDataAppend(pDstCol, pBlock->info.rows, NULL, true);
    } else {
      char* pValue = colDataGetData(pSrcCol, *rowIndex);
      colDataAppend(pDstCol, pBlock->info.rows, pValue, false);
    }

    col += 1;
  }

  pBlock->info.rows += 1;
  *rowIndex += 1;
}

wmmhello's avatar
wmmhello 已提交
270
/**
 * Prepare pSource for its next row and re-adjust the merge tree.
 *
 * When the source's current block is used up, the next block is loaded: from the next
 * page of the source's page list (SORT_SINGLESOURCE_SORT) or through the fetch callback
 * (otherwise). A source with no more data gets rowIndex == -1 and *numOfCompleted is
 * incremented.
 *
 * @param pSource         source whose row was just consumed
 * @param pTree           merge tree to adjust
 * @param pHandle         sort handle (type, paged buffer, fetch callback)
 * @param numOfCompleted  in/out counter of exhausted sources
 * @return TSDB_CODE_SUCCESS, or the error returned by blockDataFromBuf()
 */
static int32_t adjustMergeTreeForNextTuple(SSortSource *pSource, SMultiwayMergeTreeInfo *pTree, SSortHandle *pHandle, int32_t* numOfCompleted) {
  /*
   * load a new SDataBlock into memory of a given intermediate data-set source,
   * since its last record in buffer has been chosen to be processed, as the winner of loser-tree
   */
  if (pSource->src.rowIndex >= pSource->src.pBlock->info.rows) {
    pSource->src.rowIndex = 0;

    if (pHandle->type == SORT_SINGLESOURCE_SORT) {
      pSource->pageIndex ++;
      if (pSource->pageIndex >= taosArrayGetSize(pSource->pageIdList)) {
        // no page left: mark the source as exhausted and drop its block
        (*numOfCompleted) += 1;
        pSource->src.rowIndex = -1;
        pSource->pageIndex = -1;
        pSource->src.pBlock = blockDataDestroy(pSource->src.pBlock);
      } else {
        SPageInfo* pPgInfo = *(SPageInfo**)taosArrayGet(pSource->pageIdList, pSource->pageIndex);

        void*   pPage = getBufPage(pHandle->pBuf, getPageId(pPgInfo));
        int32_t code = blockDataFromBuf(pSource->src.pBlock, pPage);

        // release the page on the failure path as well, so it does not stay pinned
        releaseBufPage(pHandle->pBuf, pPage);
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }
      }
    } else {
      pSource->src.pBlock = pHandle->fetchfp(pSource->param);
      if (pSource->src.pBlock == NULL) {
        (*numOfCompleted) += 1;
        pSource->src.rowIndex = -1;
      }
    }
  }

  /*
   * Adjust loser tree otherwise, according to new candidate data
   * if the loser tree is rebuild completed, we do not need to adjust
   */
  int32_t leafNodeIndex = tMergeTreeGetAdjustIndex(pTree);

#ifdef _DEBUG_VIEW
  printf("before adjust:\t");
  tMergeTreePrint(pTree);
#endif

  tMergeTreeAdjust(pTree, leafNodeIndex);

#ifdef _DEBUG_VIEW
  printf("\nafter adjust:\t");
  tMergeTreePrint(pTree);
#endif
  return TSDB_CODE_SUCCESS;
}

wmmhello's avatar
wmmhello 已提交
325
/**
 * Fill the handle's staging block with up to `capacity` rows taken in merge order
 * from the sources in cmpParam.
 *
 * @return the staging block when it holds at least one row; NULL when all sources
 *         are exhausted and no row was produced, or on error (terrno is set then)
 */
static SSDataBlock* getSortedBlockDataInner(SSortHandle* pHandle, SMsortComparParam* cmpParam, int32_t capacity) {
  blockDataCleanup(pHandle->pDataBlock);
  SSDataBlock* pResult = pHandle->pDataBlock;

  while (cmpParam->numOfSources != pHandle->numOfCompletedSources) {
    // the merge tree names the source that currently holds the next row in order
    int32_t      winner = tMergeTreeGetChosenIndex(pHandle->pMergeTree);
    SSortSource* pWinner = cmpParam->pSources[winner];

    appendOneRowToDataBlock(pResult, pWinner->src.pBlock, &pWinner->src.rowIndex);

    int32_t code = adjustMergeTreeForNextTuple(pWinner, pHandle->pMergeTree, pHandle, &pHandle->numOfCompletedSources);
    if (code != TSDB_CODE_SUCCESS) {
      terrno = code;
      return NULL;
    }

    if (pResult->info.rows >= capacity) {
      return pResult;
    }
  }

  if (pResult->info.rows > 0) {
    return pResult;
  }

  return NULL;
}

352
int32_t msortComparFn(const void *pLeft, const void *pRight, void *param) {
353 354 355 356 357 358 359
  int32_t pLeftIdx  = *(int32_t *)pLeft;
  int32_t pRightIdx = *(int32_t *)pRight;

  SMsortComparParam *pParam = (SMsortComparParam *)param;

  SArray *pInfo = pParam->orderInfo;

wmmhello's avatar
wmmhello 已提交
360 361
  SSortSource* pLeftSource  = pParam->pSources[pLeftIdx];
  SSortSource* pRightSource = pParam->pSources[pRightIdx];
362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377

  // this input is exhausted, set the special value to denote this
  if (pLeftSource->src.rowIndex == -1) {
    return 1;
  }

  if (pRightSource->src.rowIndex == -1) {
    return -1;
  }

  SSDataBlock* pLeftBlock = pLeftSource->src.pBlock;
  SSDataBlock* pRightBlock = pRightSource->src.pBlock;

  for(int32_t i = 0; i < pInfo->size; ++i) {
    SBlockOrderInfo* pOrder = TARRAY_GET_ELEM(pInfo, i);

H
Haojun Liao 已提交
378
    SColumnInfoData* pLeftColInfoData = TARRAY_GET_ELEM(pLeftBlock->pDataBlock, pOrder->slotId);
379 380 381 382 383 384

    bool leftNull  = false;
    if (pLeftColInfoData->hasNull) {
      leftNull = colDataIsNull(pLeftColInfoData, pLeftBlock->info.rows, pLeftSource->src.rowIndex, pLeftBlock->pBlockAgg);
    }

H
Haojun Liao 已提交
385
    SColumnInfoData* pRightColInfoData = TARRAY_GET_ELEM(pRightBlock->pDataBlock, pOrder->slotId);
386 387 388 389 390 391 392 393 394 395
    bool rightNull = false;
    if (pRightColInfoData->hasNull) {
      rightNull = colDataIsNull(pRightColInfoData, pRightBlock->info.rows, pRightSource->src.rowIndex, pRightBlock->pBlockAgg);
    }

    if (leftNull && rightNull) {
      continue; // continue to next slot
    }

    if (rightNull) {
wmmhello's avatar
wmmhello 已提交
396
      return pOrder->nullFirst? 1:-1;
397 398 399
    }

    if (leftNull) {
wmmhello's avatar
wmmhello 已提交
400
      return pOrder->nullFirst? -1:1;
401 402
    }

403 404
    void* left1  = colDataGetData(pLeftColInfoData, pLeftSource->src.rowIndex);
    void* right1 = colDataGetData(pRightColInfoData, pRightSource->src.rowIndex);
405

406
    __compar_fn_t fn = getKeyComparFunc(pLeftColInfoData->info.type, pOrder->order);
407

408 409 410 411 412
    int ret = fn(left1, right1);
    if (ret == 0) {
      continue;
    } else {
      return ret;
413 414
    }
  }
415
  return 0;
416 417 418 419
}

static int32_t doInternalMergeSort(SSortHandle* pHandle) {
  size_t numOfSources = taosArrayGetSize(pHandle->pOrderedSource);
H
Haojun Liao 已提交
420 421 422
  if (numOfSources == 0) {
    return 0;
  }
423 424 425 426 427

  // Calculate the I/O counts to complete the data sort.
  double sortPass = floorl(log2(numOfSources) / log2(pHandle->numOfPages));

  pHandle->totalElapsed = taosGetTimestampUs() - pHandle->startTs;
428
  qDebug("%s %d rounds mergesort required to complete the sort, first-round sorted data size:%"PRIzu", sort elapsed:%"PRId64", total elapsed:%"PRId64,
429 430
         pHandle->idStr, (int32_t) (sortPass + 1), getTotalBufSize(pHandle->pBuf), pHandle->sortElapsed, pHandle->totalElapsed);

H
Haojun Liao 已提交
431
  int32_t numOfRows = blockDataGetCapacityInRow(pHandle->pDataBlock, pHandle->pageSize);
wmmhello's avatar
wmmhello 已提交
432
  blockDataEnsureCapacity(pHandle->pDataBlock, numOfRows);
433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464

  size_t numOfSorted = taosArrayGetSize(pHandle->pOrderedSource);
  for(int32_t t = 0; t < sortPass; ++t) {
    int64_t st = taosGetTimestampUs();

    SArray* pResList = taosArrayInit(4, POINTER_BYTES);

    int32_t numOfInputSources = pHandle->numOfPages;
    int32_t sortGroup = (numOfSorted + numOfInputSources - 1) / numOfInputSources;

    // Only *numOfInputSources* can be loaded into buffer to perform the external sort.
    for(int32_t i = 0; i < sortGroup; ++i) {
      pHandle->sourceId += 1;

      int32_t end = (i + 1) * numOfInputSources - 1;
      if (end > numOfSorted - 1) {
        end = numOfSorted - 1;
      }

      pHandle->cmpParam.numOfSources = end - i * numOfInputSources + 1;

      int32_t code = sortComparInit(&pHandle->cmpParam, pHandle->pOrderedSource, i * numOfInputSources, end, pHandle);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }

      code = tMergeTreeCreate(&pHandle->pMergeTree, pHandle->cmpParam.numOfSources, &pHandle->cmpParam, pHandle->comparFn);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }

      while (1) {
wmmhello's avatar
wmmhello 已提交
465
        SSDataBlock* pDataBlock = getSortedBlockDataInner(pHandle, &pHandle->cmpParam, numOfRows);
466 467 468 469 470
        if (pDataBlock == NULL) {
          break;
        }

        int32_t pageId = -1;
471
        void* pPage = getNewBufPage(pHandle->pBuf, pHandle->sourceId, &pageId);
472 473 474 475 476 477 478
        if (pPage == NULL) {
          return terrno;
        }

        int32_t size = blockDataGetSize(pDataBlock) + sizeof(int32_t) + pDataBlock->info.numOfCols * sizeof(int32_t);
        assert(size <= getBufPageSize(pHandle->pBuf));

479
        blockDataToBuf(pPage, pDataBlock);
480 481 482 483

        setBufPageDirty(pPage, true);
        releaseBufPage(pHandle->pBuf, pPage);

484
        blockDataCleanup(pDataBlock);
485 486
      }

wmmhello's avatar
wmmhello 已提交
487
      sortComparClearup(&pHandle->cmpParam);
488 489 490
      tMergeTreeDestroy(pHandle->pMergeTree);
      pHandle->numOfCompletedSources = 0;

491
      SSDataBlock* pBlock = createOneDataBlock(pHandle->pDataBlock, false);
492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510
      code = doAddNewExternalMemSource(pHandle->pBuf, pResList, pBlock, &pHandle->sourceId);
      if (code != 0) {
        return code;
      }
    }

    taosArrayClear(pHandle->pOrderedSource);
    taosArrayAddAll(pHandle->pOrderedSource, pResList);
    taosArrayDestroy(pResList);

    numOfSorted = taosArrayGetSize(pHandle->pOrderedSource);

    int64_t el = taosGetTimestampUs() - st;
    pHandle->totalElapsed += el;

    SDiskbasedBufStatis statis = getDBufStatis(pHandle->pBuf);
    qDebug("%s %d round mergesort, elapsed:%"PRId64" readDisk:%.2f Kb, flushDisk:%.2f Kb", pHandle->idStr, t + 1, el, statis.loadBytes/1024.0,
           statis.flushBytes/1024.0);

H
Haojun Liao 已提交
511 512
    if (pHandle->type == SORT_MULTISOURCE_MERGE) {
      pHandle->type = SORT_SINGLESOURCE_SORT;
513 514 515 516 517 518 519 520
      pHandle->comparFn = msortComparFn;
    }
  }

  pHandle->cmpParam.numOfSources = taosArrayGetSize(pHandle->pOrderedSource);
  return 0;
}

H
Haojun Liao 已提交
521
/**
 * Build the initial sorted runs for a SORT_SINGLESOURCE_SORT handle; any other
 * type is left untouched.
 *
 * All blocks of the single registered source are fetched and accumulated in the staging
 * block. Whenever the accumulated size exceeds numOfPages * pageSize the block is sorted
 * and flushed to the paged buffer as a new run. If everything fits into that budget the
 * data is only sorted in memory and the handle is switched to inMemSort mode.
 *
 * @return TSDB_CODE_SUCCESS or an error code from merging/flushing
 */
static int32_t createInitialSortedMultiSources(SSortHandle* pHandle) {
  if (pHandle->type != SORT_SINGLESOURCE_SORT) {
    return TSDB_CODE_SUCCESS;
  }

  // widen before multiplying so that a large budget does not overflow int32_t
  size_t sortBufSize = (size_t)pHandle->numOfPages * pHandle->pageSize;

  // NOTE(review): after taosArrayClear() the handle no longer references `source`, and it is
  // not released in this function - looks like a leak unless the caller frees it; confirm.
  SSortSource* source = taosArrayGetP(pHandle->pOrderedSource, 0);
  taosArrayClear(pHandle->pOrderedSource);

  while (1) {
    SSDataBlock* pBlock = pHandle->fetchfp(source->param);
    if (pBlock == NULL) {
      break;
    }

    if (pHandle->pDataBlock == NULL) {
      pHandle->pDataBlock = createOneDataBlock(pBlock, false);
      if (pHandle->pDataBlock == NULL) {
        return TSDB_CODE_QRY_OUT_OF_MEMORY;
      }
    }

    int32_t code = blockDataMerge(pHandle->pDataBlock, pBlock, pHandle->pIndexMap);
    if (code != 0) {
      return code;
    }

    size_t size = blockDataGetSize(pHandle->pDataBlock);
    if (size > sortBufSize) {
      // Perform the in-memory sort and then flush data in the buffer into disk.
      int64_t p = taosGetTimestampUs();
      blockDataSort(pHandle->pDataBlock, pHandle->pSortInfo);

      int64_t el = taosGetTimestampUs() - p;
      pHandle->sortElapsed += el;

      code = doAddToBuf(pHandle->pDataBlock, pHandle);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }
    }
  }

  // The staging block may not exist at all when the source delivered no block.
  if (pHandle->pDataBlock == NULL || pHandle->pDataBlock->info.rows <= 0) {
    return TSDB_CODE_SUCCESS;
  }

  size_t size = blockDataGetSize(pHandle->pDataBlock);

  // Perform the in-memory sort and then flush data in the buffer into disk.
  int64_t p = taosGetTimestampUs();
  blockDataSort(pHandle->pDataBlock, pHandle->pSortInfo);

  int64_t el = taosGetTimestampUs() - p;
  pHandle->sortElapsed += el;

  // All sorted data can fit in memory, external memory sort is not needed. Return to directly
  if (size <= sortBufSize) {
    pHandle->cmpParam.numOfSources = 1;
    pHandle->inMemSort = true;

    pHandle->tupleHandle.rowIndex = -1;
    pHandle->tupleHandle.pBlock = pHandle->pDataBlock;
    return 0;
  }

  return doAddToBuf(pHandle->pDataBlock, pHandle);
}

H
Haojun Liao 已提交
583
/**
 * Run the sort: build the initial runs, merge them down, and set up the final
 * merge tree that tsortNextTuple() reads from. Calling it again on an already
 * opened handle is a no-op.
 *
 * @return 0 on success, -1 when the fetch callback or the comparator is missing,
 *         otherwise the error code of the failing step
 */
int32_t tsortOpen(SSortHandle* pHandle) {
  if (pHandle->opened) {
    return 0;
  }

  if (!(pHandle->fetchfp != NULL && pHandle->comparFn != NULL)) {
    return -1;
  }

  pHandle->opened = true;

  int32_t code = createInitialSortedMultiSources(pHandle);
  if (code == TSDB_CODE_SUCCESS) {
    // do internal sort
    code = doInternalMergeSort(pHandle);
  }

  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  int32_t srcCount = taosArrayGetSize(pHandle->pOrderedSource);
  if (pHandle->pBuf != NULL) {
    ASSERT(srcCount <= getNumOfInMemBufPages(pHandle->pBuf));
  }

  // nothing left to merge (e.g. the whole input was sorted in memory)
  if (srcCount == 0) {
    return 0;
  }

  code = sortComparInit(&pHandle->cmpParam, pHandle->pOrderedSource, 0, srcCount - 1, pHandle);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  return tMergeTreeCreate(&pHandle->pMergeTree, pHandle->cmpParam.numOfSources, &pHandle->cmpParam, pHandle->comparFn);
}

H
Haojun Liao 已提交
622
/**
 * Close a sort handle. Currently there is nothing to do.
 *
 * @return always TSDB_CODE_SUCCESS
 */
int32_t tsortClose(SSortHandle* pHandle) {
  (void)pHandle;  // intentionally unused
  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
627
/**
 * Install the callback that delivers the input blocks of a source.
 *
 * @param pHandle  sort handle
 * @param fp       fetch callback; tsortOpen() fails while it is NULL
 * @return always TSDB_CODE_SUCCESS
 */
int32_t tsortSetFetchRawDataFp(SSortHandle* pHandle, _sort_fetch_block_fn_t fp) {
  pHandle->fetchfp = fp;

  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
632
/**
 * Install the comparator used by the merge tree.
 *
 * @param pHandle  sort handle
 * @param fp       comparator; tsortOpen() fails while it is NULL
 * @return always TSDB_CODE_SUCCESS
 */
int32_t tsortSetComparFp(SSortHandle* pHandle, _sort_merge_compar_fn_t fp) {
  pHandle->comparFn = fp;

  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
637
/**
 * Return a cursor to the next row in sort order.
 *
 * The returned pointer refers to storage inside the handle and is overwritten by the
 * next call.
 *
 * @return the tuple cursor, or NULL when all rows have been delivered or on error
 *         (terrno is set in the error case)
 */
STupleHandle* tsortNextTuple(SSortHandle* pHandle) {
  if (pHandle->cmpParam.numOfSources == pHandle->numOfCompletedSources) {
    return NULL;
  }

  // All the data are hold in the buffer, no external sort is invoked.
  if (pHandle->inMemSort) {
    STupleHandle* pTuple = &pHandle->tupleHandle;

    pTuple->rowIndex += 1;
    if (pTuple->rowIndex != pHandle->pDataBlock->info.rows) {
      return pTuple;
    }

    pHandle->numOfCompletedSources = 1;
    return NULL;
  }

  int32_t      winner = tMergeTreeGetChosenIndex(pHandle->pMergeTree);
  SSortSource* pWinner = pHandle->cmpParam.pSources[winner];

  // The row handed out by the previous call has been consumed: move that source forward first.
  if (pHandle->needAdjust) {
    int32_t code = adjustMergeTreeForNextTuple(pWinner, pHandle->pMergeTree, pHandle, &pHandle->numOfCompletedSources);
    if (code != TSDB_CODE_SUCCESS) {
      terrno = code;
      return NULL;
    }
  }

  if (pHandle->cmpParam.numOfSources == pHandle->numOfCompletedSources) {
    return NULL;
  }

  // Get the adjusted value after the loser tree is updated.
  winner = tMergeTreeGetChosenIndex(pHandle->pMergeTree);
  pWinner = pHandle->cmpParam.pSources[winner];

  assert(pWinner->src.pBlock != NULL);

  pHandle->tupleHandle.rowIndex = pWinner->src.rowIndex;
  pHandle->tupleHandle.pBlock = pWinner->src.pBlock;

  pHandle->needAdjust = true;
  pWinner->src.rowIndex += 1;

  return &pHandle->tupleHandle;
}

H
Haojun Liao 已提交
683
/**
 * Tell whether the given column of the tuple's current row is NULL.
 *
 * @param pVHandle  tuple cursor
 * @param colIndex  index of the column within the tuple's block
 */
bool tsortIsNullVal(STupleHandle* pVHandle, int32_t colIndex) {
  SArray*          pColumns = pVHandle->pBlock->pDataBlock;
  SColumnInfoData* pColumn = taosArrayGet(pColumns, colIndex);

  return colDataIsNull(pColumn, 0, pVHandle->rowIndex, NULL);
}

H
Haojun Liao 已提交
688
/**
 * Return a pointer to the value of the given column in the tuple's current row.
 *
 * @param pVHandle  tuple cursor
 * @param colIndex  index of the column within the tuple's block
 */
void* tsortGetValue(STupleHandle* pVHandle, int32_t colIndex) {
  SArray*          pColumns = pVHandle->pBlock->pDataBlock;
  SColumnInfoData* pColumn = TARRAY_GET_ELEM(pColumns, colIndex);

  return colDataGetData(pColumn, pVHandle->rowIndex);
}