tscLocalMerge.c 56.0 KB
Newer Older
H
hzcheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

16
#include "os.h"
S
slguan 已提交
17
#include "tlosertree.h"
H
hzcheng 已提交
18
#include "tscUtil.h"
H
hjxilinx 已提交
19
#include "tschemautil.h"
S
slguan 已提交
20
#include "tsclient.h"
H
hzcheng 已提交
21
#include "tutil.h"
S
slguan 已提交
22
#include "tscLog.h"
23
#include "tscLocalMerge.h"
H
hzcheng 已提交
24 25

typedef struct SCompareParam {
S
slguan 已提交
26 27
  SLocalDataSource **pLocalData;
  tOrderDescriptor * pDesc;
28
  int32_t            num;
S
slguan 已提交
29
  int32_t            groupOrderType;
H
hzcheng 已提交
30 31 32 33 34 35
} SCompareParam;

int32_t treeComparator(const void *pLeft, const void *pRight, void *param) {
  int32_t pLeftIdx = *(int32_t *)pLeft;
  int32_t pRightIdx = *(int32_t *)pRight;

S
slguan 已提交
36 37 38
  SCompareParam *    pParam = (SCompareParam *)param;
  tOrderDescriptor * pDesc = pParam->pDesc;
  SLocalDataSource **pLocalData = pParam->pLocalData;
H
hzcheng 已提交
39 40 41 42 43 44 45 46 47 48

  /* this input is exhausted, set the special value to denote this */
  if (pLocalData[pLeftIdx]->rowIdx == -1) {
    return 1;
  }

  if (pLocalData[pRightIdx]->rowIdx == -1) {
    return -1;
  }

49
  if (pParam->groupOrderType == TSDB_ORDER_DESC) {  // desc
50 51
    return compare_d(pDesc, pParam->num, pLocalData[pLeftIdx]->rowIdx, pLocalData[pLeftIdx]->filePage.data,
                     pParam->num, pLocalData[pRightIdx]->rowIdx, pLocalData[pRightIdx]->filePage.data);
H
hzcheng 已提交
52
  } else {
53 54
    return compare_a(pDesc, pParam->num, pLocalData[pLeftIdx]->rowIdx, pLocalData[pLeftIdx]->filePage.data,
                     pParam->num, pLocalData[pRightIdx]->rowIdx, pLocalData[pRightIdx]->filePage.data);
H
hzcheng 已提交
55 56 57
  }
}

H
hjLiao 已提交
58
static void tscInitSqlContext(SSqlCmd *pCmd, SLocalReducer *pReducer, tOrderDescriptor *pDesc) {
H
hzcheng 已提交
59 60
  /*
   * the fields and offset attributes in pCmd and pModel may be different due to
S
slguan 已提交
61
   * merge requirement. So, the final result in pRes structure is formatted in accordance with the pCmd object.
H
hzcheng 已提交
62
   */
H
hjxilinx 已提交
63
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
H
hjxilinx 已提交
64 65 66
  size_t size = tscSqlExprNumOfExprs(pQueryInfo);
  
  for (int32_t i = 0; i < size; ++i) {
H
hzcheng 已提交
67
    SQLFunctionCtx *pCtx = &pReducer->pCtx[i];
H
hjxilinx 已提交
68 69 70
    SSqlExpr *      pExpr = tscSqlExprGet(pQueryInfo, i);

    pCtx->aOutputBuf =
71
        pReducer->pResultBuf->data + pExpr->offset * pReducer->resColModel->capacity;
72
    pCtx->order = pQueryInfo->order.order;
H
hjxilinx 已提交
73
    pCtx->functionId = pExpr->functionId;
S
slguan 已提交
74 75

    // input buffer hold only one point data
H
hjxilinx 已提交
76 77 78
    int16_t  offset = getColumnModelOffset(pDesc->pColumnModel, i);
    SSchema *pSchema = getColumnModelSchema(pDesc->pColumnModel, i);

H
hjxilinx 已提交
79
    pCtx->aInputElemBuf = pReducer->pTempBuffer->data + offset;
H
hzcheng 已提交
80 81

    // input data format comes from pModel
H
hjxilinx 已提交
82 83
    pCtx->inputType = pSchema->type;
    pCtx->inputBytes = pSchema->bytes;
H
hzcheng 已提交
84 85

    // output data format yet comes from pCmd.
H
hjxilinx 已提交
86 87
    pCtx->outputBytes = pExpr->resBytes;
    pCtx->outputType = pExpr->resType;
H
hzcheng 已提交
88 89 90

    pCtx->startOffset = 0;
    pCtx->size = 1;
S
slguan 已提交
91
    pCtx->hasNull = true;
H
hzcheng 已提交
92 93
    pCtx->currentStage = SECONDARY_STAGE_MERGE;

S
slguan 已提交
94
    // for top/bottom function, the output of timestamp is the first column
H
hjxilinx 已提交
95
    int32_t functionId = pExpr->functionId;
S
slguan 已提交
96 97
    if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM) {
      pCtx->ptsOutputBuf = pReducer->pCtx[0].aOutputBuf;
98
      pCtx->param[2].i64Key = pQueryInfo->order.order;
H
hjLiao 已提交
99
      pCtx->param[2].nType  = TSDB_DATA_TYPE_BIGINT;
100
      pCtx->param[1].i64Key = pQueryInfo->order.orderColId;
H
hzcheng 已提交
101
    }
S
slguan 已提交
102 103

    SResultInfo *pResInfo = &pReducer->pResInfo[i];
104
    pResInfo->bufLen = pExpr->interBytes;
H
hjLiao 已提交
105
    pResInfo->interResultBuf = calloc(1, (size_t) pResInfo->bufLen);
S
slguan 已提交
106 107 108 109 110

    pCtx->resultInfo = &pReducer->pResInfo[i];
    pCtx->resultInfo->superTableQ = true;
  }

H
hjxilinx 已提交
111 112
  int16_t          n = 0;
  int16_t          tagLen = 0;
H
hjxilinx 已提交
113
  SQLFunctionCtx **pTagCtx = calloc(pQueryInfo->fieldsInfo.numOfOutput, POINTER_BYTES);
S
slguan 已提交
114

H
hjxilinx 已提交
115
  SQLFunctionCtx *pCtx = NULL;
H
hjxilinx 已提交
116
  for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) {
117
    SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, i);
S
slguan 已提交
118 119 120 121 122 123 124 125
    if (pExpr->functionId == TSDB_FUNC_TAG_DUMMY || pExpr->functionId == TSDB_FUNC_TS_DUMMY) {
      tagLen += pExpr->resBytes;
      pTagCtx[n++] = &pReducer->pCtx[i];
    } else if ((aAggs[pExpr->functionId].nStatus & TSDB_FUNCSTATE_SELECTIVITY) != 0) {
      pCtx = &pReducer->pCtx[i];
    }
  }

B
Bomin Zhang 已提交
126
  if (n == 0 || pCtx == NULL) {
S
slguan 已提交
127 128 129 130 131
    free(pTagCtx);
  } else {
    pCtx->tagInfo.pTagCtxList = pTagCtx;
    pCtx->tagInfo.numOfTagCols = n;
    pCtx->tagInfo.tagsLen = tagLen;
H
hzcheng 已提交
132 133 134
  }
}

135
static SFillColInfo* createFillColInfo(SQueryInfo* pQueryInfo) {
S
TD-1057  
Shengliang Guan 已提交
136
  int32_t numOfCols = (int32_t)tscSqlExprNumOfExprs(pQueryInfo);
137 138 139 140 141 142 143
  int32_t offset = 0;
  
  SFillColInfo* pFillCol = calloc(numOfCols, sizeof(SFillColInfo));
  for(int32_t i = 0; i < numOfCols; ++i) {
    SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i);
    
    pFillCol[i].col.bytes  = pExpr->resBytes;
S
TD-1057  
Shengliang Guan 已提交
144
    pFillCol[i].col.type   = (int8_t)pExpr->resType;
145 146 147
    pFillCol[i].flag       = pExpr->colInfo.flag;
    pFillCol[i].col.offset = offset;
    pFillCol[i].functionId = pExpr->functionId;
148
    pFillCol[i].fillVal.i  = pQueryInfo->fillVal[i];
149 150 151 152 153 154
    offset += pExpr->resBytes;
  }
  
  return pFillCol;
}

H
hzcheng 已提交
155
void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrderDescriptor *pDesc,
H
hjLiao 已提交
156 157 158 159
                           SColumnModel *finalmodel, SSqlObj* pSql) {
  SSqlCmd* pCmd = &pSql->cmd;
  SSqlRes* pRes = &pSql->res;
  
160
  if (pMemBuffer == NULL) {
H
hjLiao 已提交
161 162 163
    tscLocalReducerEnvDestroy(pMemBuffer, pDesc, finalmodel, numOfBuffer);
  
    tscError("%p pMemBuffer is NULL", pMemBuffer);
164
    pRes->code = TSDB_CODE_TSC_APP_ERROR;
165 166 167 168
    return;
  }
 
  if (pDesc->pColumnModel == NULL) {
H
hzcheng 已提交
169 170
    tscLocalReducerEnvDestroy(pMemBuffer, pDesc, finalmodel, numOfBuffer);

H
hjLiao 已提交
171
    tscError("%p no local buffer or intermediate result format model", pSql);
172
    pRes->code = TSDB_CODE_TSC_APP_ERROR;
H
hzcheng 已提交
173 174 175 176 177 178 179
    return;
  }

  int32_t numOfFlush = 0;
  for (int32_t i = 0; i < numOfBuffer; ++i) {
    int32_t len = pMemBuffer[i]->fileMeta.flushoutData.nLength;
    if (len == 0) {
180
      tscDebug("%p no data retrieved from orderOfVnode:%d", pSql, i + 1);
H
hzcheng 已提交
181 182 183 184 185 186 187 188
      continue;
    }

    numOfFlush += len;
  }

  if (numOfFlush == 0 || numOfBuffer == 0) {
    tscLocalReducerEnvDestroy(pMemBuffer, pDesc, finalmodel, numOfBuffer);
189
    tscDebug("%p retrieved no data", pSql);
S
slguan 已提交
190

H
hzcheng 已提交
191 192 193
    return;
  }

H
hjxilinx 已提交
194
  if (pDesc->pColumnModel->capacity >= pMemBuffer[0]->pageSize) {
H
hjLiao 已提交
195
    tscError("%p Invalid value of buffer capacity %d and page size %d ", pSql, pDesc->pColumnModel->capacity,
H
hjxilinx 已提交
196
             pMemBuffer[0]->pageSize);
S
slguan 已提交
197 198

    tscLocalReducerEnvDestroy(pMemBuffer, pDesc, finalmodel, numOfBuffer);
199
    pRes->code = TSDB_CODE_TSC_APP_ERROR;
H
hzcheng 已提交
200 201 202
    return;
  }

H
hjLiao 已提交
203 204 205
  size_t size = sizeof(SLocalReducer) + POINTER_BYTES * numOfFlush;
  
  SLocalReducer *pReducer = (SLocalReducer *) calloc(1, size);
H
hzcheng 已提交
206
  if (pReducer == NULL) {
H
hjLiao 已提交
207
    tscError("%p failed to create local merge structure, out of memory", pSql);
S
slguan 已提交
208 209

    tscLocalReducerEnvDestroy(pMemBuffer, pDesc, finalmodel, numOfBuffer);
210
    pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
H
hzcheng 已提交
211 212 213 214
    return;
  }

  pReducer->pExtMemBuffer = pMemBuffer;
S
slguan 已提交
215
  pReducer->pLocalDataSrc = (SLocalDataSource **)&pReducer[1];
H
hzcheng 已提交
216 217 218 219
  assert(pReducer->pLocalDataSrc != NULL);

  pReducer->numOfBuffer = numOfFlush;
  pReducer->numOfVnode = numOfBuffer;
220

H
hzcheng 已提交
221
  pReducer->pDesc = pDesc;
222
  tscDebug("%p the number of merged leaves is: %d", pSql, pReducer->numOfBuffer);
H
hzcheng 已提交
223 224 225 226 227 228

  int32_t idx = 0;
  for (int32_t i = 0; i < numOfBuffer; ++i) {
    int32_t numOfFlushoutInFile = pMemBuffer[i]->fileMeta.flushoutData.nLength;

    for (int32_t j = 0; j < numOfFlushoutInFile; ++j) {
H
hjLiao 已提交
229 230 231
      SLocalDataSource *ds = (SLocalDataSource *)malloc(sizeof(SLocalDataSource) + pMemBuffer[0]->pageSize);
      if (ds == NULL) {
        tscError("%p failed to create merge structure", pSql);
232
        pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
S
Shengliang Guan 已提交
233
        taosTFree(pReducer);
H
hzcheng 已提交
234 235
        return;
      }
H
hjLiao 已提交
236 237
      
      pReducer->pLocalDataSrc[idx] = ds;
H
hzcheng 已提交
238

H
hjLiao 已提交
239 240
      ds->pMemBuffer = pMemBuffer[i];
      ds->flushoutIdx = j;
241
      ds->filePage.num = 0;
H
hjLiao 已提交
242 243
      ds->pageId = 0;
      ds->rowIdx = 0;
H
hzcheng 已提交
244

245
      tscDebug("%p load data from disk into memory, orderOfVnode:%d, total:%d", pSql, i + 1, idx + 1);
H
hjLiao 已提交
246
      tExtMemBufferLoadData(pMemBuffer[i], &(ds->filePage), j, 0);
H
hzcheng 已提交
247
#ifdef _DEBUG_VIEW
248
      printf("load data page into mem for build loser tree: %" PRIu64 " rows\n", ds->filePage.num);
H
hzcheng 已提交
249
      SSrcColumnInfo colInfo[256] = {0};
H
hjxilinx 已提交
250
      SQueryInfo *   pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
251 252

      tscGetSrcColumnInfo(colInfo, pQueryInfo);
H
hzcheng 已提交
253

254
      tColModelDisplayEx(pDesc->pColumnModel, ds->filePage.data, ds->filePage.num,
H
hjxilinx 已提交
255
                         pMemBuffer[0]->numOfElemsPerPage, colInfo);
H
hzcheng 已提交
256
#endif
H
hjLiao 已提交
257
      
258
      if (ds->filePage.num == 0) {  // no data in this flush, the index does not increase
259
        tscDebug("%p flush data is empty, ignore %d flush record", pSql, idx);
S
Shengliang Guan 已提交
260
        taosTFree(ds);
H
hzcheng 已提交
261 262
        continue;
      }
H
hjLiao 已提交
263
      
H
hzcheng 已提交
264 265 266
      idx += 1;
    }
  }
H
hjLiao 已提交
267 268
  
  // no data actually, no need to merge result.
H
hzcheng 已提交
269
  if (idx == 0) {
S
Shengliang Guan 已提交
270
    taosTFree(pReducer);
H
hzcheng 已提交
271 272 273 274 275 276
    return;
  }

  pReducer->numOfBuffer = idx;

  SCompareParam *param = malloc(sizeof(SCompareParam));
B
Bomin Zhang 已提交
277
  if (param == NULL) {
S
Shengliang Guan 已提交
278
    taosTFree(pReducer);
B
Bomin Zhang 已提交
279 280
    return;
  }
H
hzcheng 已提交
281 282
  param->pLocalData = pReducer->pLocalDataSrc;
  param->pDesc = pReducer->pDesc;
283
  param->num = pReducer->pLocalDataSrc[0]->pMemBuffer->numOfElemsPerPage;
H
hjxilinx 已提交
284 285
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);

286
  param->groupOrderType = pQueryInfo->groupbyExpr.orderType;
H
Haojun Liao 已提交
287
  pReducer->orderPrjOnSTable = tscOrderedProjectionQueryOnSTable(pQueryInfo, 0);
H
hzcheng 已提交
288 289 290

  pRes->code = tLoserTreeCreate(&pReducer->pLoserTree, pReducer->numOfBuffer, param, treeComparator);
  if (pReducer->pLoserTree == NULL || pRes->code != 0) {
S
Shengliang Guan 已提交
291 292
    taosTFree(param);
    taosTFree(pReducer);
H
hzcheng 已提交
293 294 295 296 297
    return;
  }

  // the input data format follows the old format, but output in a new format.
  // so, all the input must be parsed as old format
H
hjLiao 已提交
298
  pReducer->pCtx = (SQLFunctionCtx *)calloc(tscSqlExprNumOfExprs(pQueryInfo), sizeof(SQLFunctionCtx));
H
hzcheng 已提交
299 300
  pReducer->rowSize = pMemBuffer[0]->nElemSize;

H
hjxilinx 已提交
301 302
  tscRestoreSQLFuncForSTableQuery(pQueryInfo);
  tscFieldInfoUpdateOffset(pQueryInfo);
H
hzcheng 已提交
303

H
hjxilinx 已提交
304
  if (pReducer->rowSize > pMemBuffer[0]->pageSize) {
H
hzcheng 已提交
305 306 307 308 309 310 311 312
    assert(false);  // todo fixed row size is larger than the minimum page size;
  }

  pReducer->hasPrevRow = false;
  pReducer->hasUnprocessedRow = false;

  pReducer->prevRowOfInput = (char *)calloc(1, pReducer->rowSize);

S
slguan 已提交
313
  // used to keep the latest input row
H
hzcheng 已提交
314 315 316 317
  pReducer->pTempBuffer = (tFilePage *)calloc(1, pReducer->rowSize + sizeof(tFilePage));
  pReducer->discardData = (tFilePage *)calloc(1, pReducer->rowSize + sizeof(tFilePage));
  pReducer->discard = false;

H
hjxilinx 已提交
318
  pReducer->nResultBufSize = pMemBuffer[0]->pageSize * 16;
H
hzcheng 已提交
319
  pReducer->pResultBuf = (tFilePage *)calloc(1, pReducer->nResultBufSize + sizeof(tFilePage));
H
hjxilinx 已提交
320

H
Haojun Liao 已提交
321
  pReducer->finalRowSize = tscGetResRowLength(pQueryInfo->exprList);
H
hzcheng 已提交
322
  pReducer->resColModel = finalmodel;
B
Bomin Zhang 已提交
323
  pReducer->resColModel->capacity = pReducer->nResultBufSize;
324

325
  assert(pReducer->finalRowSize > 0);
B
Bomin Zhang 已提交
326 327 328
  if (pReducer->finalRowSize > 0) {
    pReducer->resColModel->capacity /= pReducer->finalRowSize;
  }
H
Haojun Liao 已提交
329
  assert(pReducer->finalRowSize <= pReducer->rowSize);
H
hzcheng 已提交
330

H
hjxilinx 已提交
331
  pReducer->pFinalRes = calloc(1, pReducer->rowSize * pReducer->resColModel->capacity);
H
hzcheng 已提交
332

H
hjxilinx 已提交
333
  if (pReducer->pTempBuffer == NULL || pReducer->discardData == NULL || pReducer->pResultBuf == NULL ||
334
      /*pReducer->pBufForInterpo == NULL || */pReducer->pFinalRes == NULL || pReducer->prevRowOfInput == NULL) {
S
Shengliang Guan 已提交
335 336 337 338 339 340 341 342
    taosTFree(pReducer->pTempBuffer);
    taosTFree(pReducer->discardData);
    taosTFree(pReducer->pResultBuf);
    taosTFree(pReducer->pFinalRes);
    taosTFree(pReducer->prevRowOfInput);
    taosTFree(pReducer->pLoserTree);
    taosTFree(param);
    taosTFree(pReducer);
343
    pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
H
hzcheng 已提交
344 345
    return;
  }
H
hjLiao 已提交
346
  
347 348 349 350
  size_t numOfCols = tscSqlExprNumOfExprs(pQueryInfo);
  
  pReducer->pTempBuffer->num = 0;
  pReducer->pResInfo = calloc(numOfCols, sizeof(SResultInfo));
H
hzcheng 已提交
351

352
  tscCreateResPointerInfo(pRes, pQueryInfo);
H
hjLiao 已提交
353
  tscInitSqlContext(pCmd, pReducer, pDesc);
H
hzcheng 已提交
354

H
hjxilinx 已提交
355 356
  // we change the capacity of schema to denote that there is only one row in temp buffer
  pReducer->pDesc->pColumnModel->capacity = 1;
H
hjxilinx 已提交
357 358

  // restore the limitation value at the last stage
359 360 361 362
  if (tscOrderedProjectionQueryOnSTable(pQueryInfo, 0)) {
    pQueryInfo->limit.limit = pQueryInfo->clauseLimit;
    pQueryInfo->limit.offset = pQueryInfo->prjOffset;
  }
H
hjxilinx 已提交
363

S
TD-1057  
Shengliang Guan 已提交
364
  pReducer->offset = (int32_t)pQueryInfo->limit.offset;
H
hjxilinx 已提交
365

H
hzcheng 已提交
366 367 368
  pRes->pLocalReducer = pReducer;
  pRes->numOfGroups = 0;

369
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
370
  STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);
H
hjxilinx 已提交
371
  
372
  TSKEY stime = MIN(pQueryInfo->window.skey, pQueryInfo->window.ekey);
H
hjxilinx 已提交
373
  int64_t revisedSTime =
H
Haojun Liao 已提交
374
      taosGetIntervalStartTimestamp(stime, pQueryInfo->slidingTime, pQueryInfo->intervalTime, pQueryInfo->slidingTimeUnit, tinfo.precision);
375 376 377 378
  
  if (pQueryInfo->fillType != TSDB_FILL_NONE) {
    SFillColInfo* pFillCol = createFillColInfo(pQueryInfo);
    pReducer->pFillInfo = taosInitFillInfo(pQueryInfo->order.order, revisedSTime, pQueryInfo->groupbyExpr.numOfGroupCols,
S
TD-1057  
Shengliang Guan 已提交
379
                                           4096, (int32_t)numOfCols, pQueryInfo->slidingTime, pQueryInfo->slidingTimeUnit,
H
Haojun Liao 已提交
380
                                           tinfo.precision, pQueryInfo->fillType, pFillCol);
381
  }
H
hzcheng 已提交
382

H
hjxilinx 已提交
383
  int32_t startIndex = pQueryInfo->fieldsInfo.numOfOutput - pQueryInfo->groupbyExpr.numOfGroupCols;
H
hzcheng 已提交
384

H
Haojun Liao 已提交
385
  if (pQueryInfo->groupbyExpr.numOfGroupCols > 0 && pReducer->pFillInfo != NULL) {
386
    pReducer->pFillInfo->pTags[0] = (char *)pReducer->pFillInfo->pTags + POINTER_BYTES * pQueryInfo->groupbyExpr.numOfGroupCols;
387
    for (int32_t i = 1; i < pQueryInfo->groupbyExpr.numOfGroupCols; ++i) {
H
hjxilinx 已提交
388
      SSchema *pSchema = getColumnModelSchema(pReducer->resColModel, startIndex + i - 1);
389
      pReducer->pFillInfo->pTags[i] = pSchema->bytes + pReducer->pFillInfo->pTags[i - 1];
H
hzcheng 已提交
390 391
    }
  } else {
392 393 394
    if (pReducer->pFillInfo != NULL) {
      assert(pReducer->pFillInfo->pTags == NULL);
    }
H
hzcheng 已提交
395 396 397 398 399
  }
}

static int32_t tscFlushTmpBufferImpl(tExtMemBuffer *pMemoryBuf, tOrderDescriptor *pDesc, tFilePage *pPage,
                                     int32_t orderType) {
400
  if (pPage->num == 0) {
H
hzcheng 已提交
401 402 403
    return 0;
  }

404
  assert(pPage->num <= pDesc->pColumnModel->capacity);
H
hzcheng 已提交
405 406

  // sort before flush to disk, the data must be consecutively put on tFilePage.
H
Haojun Liao 已提交
407
  if (pDesc->orderInfo.numOfCols > 0) {
S
TD-1057  
Shengliang Guan 已提交
408
    tColDataQSort(pDesc, (int32_t)pPage->num, 0, (int32_t)pPage->num - 1, pPage->data, orderType);
H
hzcheng 已提交
409 410 411
  }

#ifdef _DEBUG_VIEW
412 413
  printf("%" PRIu64 " rows data flushed to disk after been sorted:\n", pPage->num);
  tColModelDisplay(pDesc->pColumnModel, pPage->data, pPage->num, pPage->num);
H
hzcheng 已提交
414 415 416
#endif

  // write to cache after being sorted
S
TD-1057  
Shengliang Guan 已提交
417
  if (tExtMemBufferPut(pMemoryBuf, pPage->data, (int32_t)pPage->num) < 0) {
H
hzcheng 已提交
418 419 420 421
    tscError("failed to save data in temporary buffer");
    return -1;
  }

422
  pPage->num = 0;
H
hzcheng 已提交
423 424 425 426
  return 0;
}

int32_t tscFlushTmpBuffer(tExtMemBuffer *pMemoryBuf, tOrderDescriptor *pDesc, tFilePage *pPage, int32_t orderType) {
427 428 429
  int32_t ret = 0;
  if ((ret = tscFlushTmpBufferImpl(pMemoryBuf, pDesc, pPage, orderType)) != 0) {
    return ret;
H
hzcheng 已提交
430 431
  }

432 433
  if ((ret = tExtMemBufferFlush(pMemoryBuf)) != 0) {
    return ret;
H
hzcheng 已提交
434 435 436 437 438 439 440
  }

  return 0;
}

int32_t saveToBuffer(tExtMemBuffer *pMemoryBuf, tOrderDescriptor *pDesc, tFilePage *pPage, void *data,
                     int32_t numOfRows, int32_t orderType) {
441
  SColumnModel *pModel = pDesc->pColumnModel;
H
hjxilinx 已提交
442

443
  if (pPage->num + numOfRows <= pModel->capacity) {
444
    tColModelAppend(pModel, pPage, data, 0, numOfRows, numOfRows);
H
hzcheng 已提交
445 446 447
    return 0;
  }

448
  // current buffer is overflow, flush data to extensive buffer
S
TD-1057  
Shengliang Guan 已提交
449
  int32_t numOfRemainEntries = pModel->capacity - (int32_t)pPage->num;
H
hzcheng 已提交
450 451
  tColModelAppend(pModel, pPage, data, 0, numOfRemainEntries, numOfRows);

452
  // current buffer is full, need to flushed to disk
453
  assert(pPage->num == pModel->capacity);
454 455 456
  int32_t code = tscFlushTmpBuffer(pMemoryBuf, pDesc, pPage, orderType);
  if (code != 0) {
    return code;
H
hzcheng 已提交
457 458 459 460 461 462
  }

  int32_t remain = numOfRows - numOfRemainEntries;

  while (remain > 0) {
    int32_t numOfWriteElems = 0;
H
hjxilinx 已提交
463 464
    if (remain > pModel->capacity) {
      numOfWriteElems = pModel->capacity;
H
hzcheng 已提交
465 466 467 468 469 470
    } else {
      numOfWriteElems = remain;
    }

    tColModelAppend(pModel, pPage, data, numOfRows - remain, numOfWriteElems, numOfRows);

471
    if (pPage->num == pModel->capacity) {
472 473
      if ((code = tscFlushTmpBuffer(pMemoryBuf, pDesc, pPage, orderType)) != TSDB_CODE_SUCCESS) {
        return code;
H
hzcheng 已提交
474 475
      }
    } else {
476
      pPage->num = numOfWriteElems;
H
hzcheng 已提交
477 478 479 480 481 482 483 484 485 486 487 488 489 490
    }

    remain -= numOfWriteElems;
    numOfRemainEntries += numOfWriteElems;
  }

  return 0;
}

void tscDestroyLocalReducer(SSqlObj *pSql) {
  if (pSql == NULL) {
    return;
  }

491
  tscDebug("%p start to free local reducer", pSql);
H
hzcheng 已提交
492 493
  SSqlRes *pRes = &(pSql->res);
  if (pRes->pLocalReducer == NULL) {
494
    tscDebug("%p local reducer has been freed, abort", pSql);
H
hzcheng 已提交
495 496 497
    return;
  }

H
hjxilinx 已提交
498 499 500
  SSqlCmd *   pCmd = &pSql->cmd;
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);

H
hzcheng 已提交
501
  // there is no more result, so we release all allocated resource
H
hjxilinx 已提交
502
  SLocalReducer *pLocalReducer = (SLocalReducer *)atomic_exchange_ptr(&pRes->pLocalReducer, NULL);
H
hzcheng 已提交
503 504
  if (pLocalReducer != NULL) {
    int32_t status = 0;
weixin_48148422's avatar
weixin_48148422 已提交
505
    while ((status = atomic_val_compare_exchange_32(&pLocalReducer->status, TSC_LOCALREDUCE_READY,
H
hzcheng 已提交
506 507
                                                    TSC_LOCALREDUCE_TOBE_FREED)) == TSC_LOCALREDUCE_IN_PROGRESS) {
      taosMsleep(100);
508
      tscDebug("%p waiting for delete procedure, status: %d", pSql, status);
H
hzcheng 已提交
509 510
    }

H
Haojun Liao 已提交
511
    pLocalReducer->pFillInfo = taosDestoryFillInfo(pLocalReducer->pFillInfo);
H
hzcheng 已提交
512

S
slguan 已提交
513
    if (pLocalReducer->pCtx != NULL) {
H
hjxilinx 已提交
514
      for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) {
S
slguan 已提交
515
        SQLFunctionCtx *pCtx = &pLocalReducer->pCtx[i];
H
hjxilinx 已提交
516

H
hjxilinx 已提交
517
        tVariantDestroy(&pCtx->tag);
518
        if (pCtx->tagInfo.pTagCtxList != NULL) {
S
Shengliang Guan 已提交
519
          taosTFree(pCtx->tagInfo.pTagCtxList);
520
        }
S
slguan 已提交
521
      }
H
hjxilinx 已提交
522

S
Shengliang Guan 已提交
523
      taosTFree(pLocalReducer->pCtx);
S
slguan 已提交
524 525
    }

S
Shengliang Guan 已提交
526
    taosTFree(pLocalReducer->prevRowOfInput);
H
hzcheng 已提交
527

S
Shengliang Guan 已提交
528 529
    taosTFree(pLocalReducer->pTempBuffer);
    taosTFree(pLocalReducer->pResultBuf);
H
hzcheng 已提交
530

S
slguan 已提交
531
    if (pLocalReducer->pResInfo != NULL) {
H
hjxilinx 已提交
532
      for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) {
S
Shengliang Guan 已提交
533
        taosTFree(pLocalReducer->pResInfo[i].interResultBuf);
S
slguan 已提交
534 535
      }

S
Shengliang Guan 已提交
536
      taosTFree(pLocalReducer->pResInfo);
S
slguan 已提交
537 538
    }

H
hzcheng 已提交
539
    if (pLocalReducer->pLoserTree) {
S
Shengliang Guan 已提交
540 541
      taosTFree(pLocalReducer->pLoserTree->param);
      taosTFree(pLocalReducer->pLoserTree);
H
hzcheng 已提交
542 543
    }

S
Shengliang Guan 已提交
544 545
    taosTFree(pLocalReducer->pFinalRes);
    taosTFree(pLocalReducer->discardData);
H
hzcheng 已提交
546 547 548 549

    tscLocalReducerEnvDestroy(pLocalReducer->pExtMemBuffer, pLocalReducer->pDesc, pLocalReducer->resColModel,
                              pLocalReducer->numOfVnode);
    for (int32_t i = 0; i < pLocalReducer->numOfBuffer; ++i) {
S
Shengliang Guan 已提交
550
      taosTFree(pLocalReducer->pLocalDataSrc[i]);
H
hzcheng 已提交
551 552 553 554 555 556
    }

    pLocalReducer->numOfBuffer = 0;
    pLocalReducer->numOfCompleted = 0;
    free(pLocalReducer);
  } else {
557
    tscDebug("%p already freed or another free function is invoked", pSql);
H
hzcheng 已提交
558 559
  }

560
  tscDebug("%p free local reducer finished", pSql);
H
hzcheng 已提交
561 562
}

H
hjxilinx 已提交
563
static int32_t createOrderDescriptor(tOrderDescriptor **pOrderDesc, SSqlCmd *pCmd, SColumnModel *pModel) {
H
hjxilinx 已提交
564 565 566
  int32_t     numOfGroupByCols = 0;
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);

567 568
  if (pQueryInfo->groupbyExpr.numOfGroupCols > 0) {
    numOfGroupByCols = pQueryInfo->groupbyExpr.numOfGroupCols;
H
hzcheng 已提交
569 570 571
  }

  // primary timestamp column is involved in final result
572
  if (pQueryInfo->intervalTime != 0 || tscOrderedProjectionQueryOnSTable(pQueryInfo, 0)) {
H
hzcheng 已提交
573 574 575 576 577
    numOfGroupByCols++;
  }

  int32_t *orderIdx = (int32_t *)calloc(numOfGroupByCols, sizeof(int32_t));
  if (orderIdx == NULL) {
578
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
H
hzcheng 已提交
579 580 581
  }

  if (numOfGroupByCols > 0) {
H
hjxilinx 已提交
582
    int32_t startCols = pQueryInfo->fieldsInfo.numOfOutput - pQueryInfo->groupbyExpr.numOfGroupCols;
H
hzcheng 已提交
583 584

    // tags value locate at the last columns
585
    for (int32_t i = 0; i < pQueryInfo->groupbyExpr.numOfGroupCols; ++i) {
H
hzcheng 已提交
586 587 588
      orderIdx[i] = startCols++;
    }

589
    if (pQueryInfo->intervalTime != 0) {
S
slguan 已提交
590
      // the first column is the timestamp, handles queries like "interval(10m) group by tags"
H
hzcheng 已提交
591 592 593 594
      orderIdx[numOfGroupByCols - 1] = PRIMARYKEY_TIMESTAMP_COL_INDEX;
    }
  }

595
  *pOrderDesc = tOrderDesCreate(orderIdx, numOfGroupByCols, pModel, pQueryInfo->order.order);
S
Shengliang Guan 已提交
596
  taosTFree(orderIdx);
H
hzcheng 已提交
597 598

  if (*pOrderDesc == NULL) {
599
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
H
hzcheng 已提交
600 601 602 603 604
  } else {
    return TSDB_CODE_SUCCESS;
  }
}

S
slguan 已提交
605
bool isSameGroup(SSqlCmd *pCmd, SLocalReducer *pReducer, char *pPrev, tFilePage *tmpBuffer) {
H
hjxilinx 已提交
606 607
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);

S
slguan 已提交
608
  // disable merge procedure for column projection query
H
Haojun Liao 已提交
609
  int16_t functionId = pReducer->pCtx[0].functionId;
610
  assert(functionId != TSDB_FUNC_ARITHM);
H
Haojun Liao 已提交
611
  if (pReducer->orderPrjOnSTable) {
612 613
    return true;
  }
H
hjxilinx 已提交
614

S
slguan 已提交
615 616
  if (functionId == TSDB_FUNC_PRJ || functionId == TSDB_FUNC_ARITHM) {
    return false;
H
hzcheng 已提交
617 618 619
  }

  tOrderDescriptor *pOrderDesc = pReducer->pDesc;
H
Haojun Liao 已提交
620
  SColumnOrderInfo* orderInfo = &pOrderDesc->orderInfo;
621

S
slguan 已提交
622
  // no group by columns, all data belongs to one group
H
Haojun Liao 已提交
623
  int32_t numOfCols = orderInfo->numOfCols;
S
slguan 已提交
624
  if (numOfCols <= 0) {
H
hzcheng 已提交
625 626 627
    return true;
  }

H
Haojun Liao 已提交
628 629 630 631 632
  if (orderInfo->pData[numOfCols - 1] == PRIMARYKEY_TIMESTAMP_COL_INDEX) {
    /*
     * super table interval query
     * if the order columns is the primary timestamp, all result data belongs to one group
     */
633
    assert(pQueryInfo->intervalTime > 0);
H
Haojun Liao 已提交
634 635 636
    if (numOfCols == 1) {
      return true;
    }
S
slguan 已提交
637
  } else {  // simple group by query
638
    assert(pQueryInfo->intervalTime == 0);
S
slguan 已提交
639 640
  }

H
hzcheng 已提交
641
  // only one row exists
H
Haojun Liao 已提交
642 643
  int32_t index = orderInfo->pData[0];
  int32_t offset = (pOrderDesc->pColumnModel)->pFields[index].offset;
644

H
Haojun Liao 已提交
645 646
  int32_t ret = memcmp(pPrev + offset, tmpBuffer->data + offset, pOrderDesc->pColumnModel->rowSize - offset);
  return ret == 0;
H
hzcheng 已提交
647 648 649
}

int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOrderDescriptor **pOrderDesc,
H
hjxilinx 已提交
650
                                 SColumnModel **pFinalModel, uint32_t nBufferSizes) {
H
hzcheng 已提交
651 652 653
  SSqlCmd *pCmd = &pSql->cmd;
  SSqlRes *pRes = &pSql->res;

H
hjxilinx 已提交
654
  SSchema *     pSchema = NULL;
H
hjxilinx 已提交
655
  SColumnModel *pModel = NULL;
H
hzcheng 已提交
656 657
  *pFinalModel = NULL;

H
hjxilinx 已提交
658
  SQueryInfo *    pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
H
hjxilinx 已提交
659
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
S
slguan 已提交
660

661
  (*pMemBuffer) = (tExtMemBuffer **)malloc(POINTER_BYTES * pSql->numOfSubs);
H
hzcheng 已提交
662 663
  if (*pMemBuffer == NULL) {
    tscError("%p failed to allocate memory", pSql);
664
    pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
H
hzcheng 已提交
665 666
    return pRes->code;
  }
H
hjxilinx 已提交
667 668 669 670
  
  size_t size = tscSqlExprNumOfExprs(pQueryInfo);
  
  pSchema = (SSchema *)calloc(1, sizeof(SSchema) * size);
H
hzcheng 已提交
671 672
  if (pSchema == NULL) {
    tscError("%p failed to allocate memory", pSql);
673
    pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
H
hzcheng 已提交
674 675 676 677
    return pRes->code;
  }

  int32_t rlen = 0;
H
hjxilinx 已提交
678
  for (int32_t i = 0; i < size; ++i) {
679
    SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, i);
H
hzcheng 已提交
680 681

    pSchema[i].bytes = pExpr->resBytes;
S
TD-1057  
Shengliang Guan 已提交
682
    pSchema[i].type = (int8_t)pExpr->resType;
H
hzcheng 已提交
683 684 685 686

    rlen += pExpr->resBytes;
  }

L
lihui 已提交
687
  int32_t capacity = 0;
H
hjxilinx 已提交
688 689 690
  if (rlen != 0) {
    capacity = nBufferSizes / rlen;
  }
H
hjxilinx 已提交
691
  
S
TD-1057  
Shengliang Guan 已提交
692
  pModel = createColumnModel(pSchema, (int32_t)size, capacity);
H
hzcheng 已提交
693

H
Haojun Liao 已提交
694 695 696 697 698 699
  int32_t pg = DEFAULT_PAGE_SIZE;
  int32_t overhead = sizeof(tFilePage);
  while((pg - overhead) < pModel->rowSize * 2) {
    pg *= 2;
  }

H
hjxilinx 已提交
700
  size_t numOfSubs = pTableMetaInfo->vgroupList->numOfVgroups;
701
  for (int32_t i = 0; i < numOfSubs; ++i) {
H
Haojun Liao 已提交
702
    (*pMemBuffer)[i] = createExtMemBuffer(nBufferSizes, rlen, pg, pModel);
703 704
    (*pMemBuffer)[i]->flushModel = MULTIPLE_APPEND_MODEL;
  }
H
hzcheng 已提交
705 706

  if (createOrderDescriptor(pOrderDesc, pCmd, pModel) != TSDB_CODE_SUCCESS) {
707
    pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
S
Shengliang Guan 已提交
708
    taosTFree(pSchema);
H
hzcheng 已提交
709 710 711
    return pRes->code;
  }

H
hjxilinx 已提交
712
  // final result depends on the fields number
H
hjxilinx 已提交
713 714
  memset(pSchema, 0, sizeof(SSchema) * size);
  for (int32_t i = 0; i < size; ++i) {
H
hjxilinx 已提交
715 716
    SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, i);

717
    SSchema *p1 = tscGetTableColumnSchema(pTableMetaInfo->pTableMeta, pExpr->colInfo.colIndex);
H
hjxilinx 已提交
718

719
    int32_t inter = 0;
H
hjxilinx 已提交
720 721
    int16_t type = -1;
    int16_t bytes = 0;
H
hjxilinx 已提交
722 723 724 725 726 727 728 729 730

    //    if ((pExpr->functionId >= TSDB_FUNC_FIRST_DST && pExpr->functionId <= TSDB_FUNC_LAST_DST) ||
    //        (pExpr->functionId >= TSDB_FUNC_SUM && pExpr->functionId <= TSDB_FUNC_MAX) ||
    //        pExpr->functionId == TSDB_FUNC_LAST_ROW) {
    // the final result size and type in the same as query on single table.
    // so here, set the flag to be false;

    int32_t functionId = pExpr->functionId;
    if (functionId >= TSDB_FUNC_TS && functionId <= TSDB_FUNC_DIFF) {
H
hjxilinx 已提交
731 732
      type = pModel->pFields[i].field.type;
      bytes = pModel->pFields[i].field.bytes;
H
hjxilinx 已提交
733 734 735 736 737 738 739 740
    } else {
      if (functionId == TSDB_FUNC_FIRST_DST) {
        functionId = TSDB_FUNC_FIRST;
      } else if (functionId == TSDB_FUNC_LAST_DST) {
        functionId = TSDB_FUNC_LAST;
      }

      getResultDataInfo(p1->type, p1->bytes, functionId, 0, &type, &bytes, &inter, 0, false);
H
hjxilinx 已提交
741
    }
H
hzcheng 已提交
742

S
TD-1057  
Shengliang Guan 已提交
743
    pSchema[i].type = (uint8_t)type;
H
hjxilinx 已提交
744 745
    pSchema[i].bytes = bytes;
    strcpy(pSchema[i].name, pModel->pFields[i].field.name);
H
hzcheng 已提交
746
  }
H
hjxilinx 已提交
747
  
S
TD-1057  
Shengliang Guan 已提交
748
  *pFinalModel = createColumnModel(pSchema, (int32_t)size, capacity);
S
Shengliang Guan 已提交
749
  taosTFree(pSchema);
H
hzcheng 已提交
750 751 752 753 754 755 756 757 758 759

  return TSDB_CODE_SUCCESS;
}

/**
 * @param pMemBuffer
 * @param pDesc
 * @param pFinalModel
 * @param numOfVnodes
 */
H
hjxilinx 已提交
760
void tscLocalReducerEnvDestroy(tExtMemBuffer **pMemBuffer, tOrderDescriptor *pDesc, SColumnModel *pFinalModel,
H
hzcheng 已提交
761
                               int32_t numOfVnodes) {
H
hjxilinx 已提交
762
  destroyColumnModel(pFinalModel);
H
hzcheng 已提交
763 764
  tOrderDescDestroy(pDesc);
  for (int32_t i = 0; i < numOfVnodes; ++i) {
H
hjxilinx 已提交
765
    pMemBuffer[i] = destoryExtMemBuffer(pMemBuffer[i]);
H
hzcheng 已提交
766 767
  }

S
Shengliang Guan 已提交
768
  taosTFree(pMemBuffer);
H
hzcheng 已提交
769 770 771 772 773 774 775 776 777
}

/**
 *
 * @param pLocalReducer
 * @param pOneInterDataSrc
 * @param treeList
 * @return the number of remain input source. if ret == 0, all data has been handled
 */
S
slguan 已提交
778
int32_t loadNewDataFromDiskFor(SLocalReducer *pLocalReducer, SLocalDataSource *pOneInterDataSrc,
H
hzcheng 已提交
779 780 781 782
                               bool *needAdjustLoserTree) {
  pOneInterDataSrc->rowIdx = 0;
  pOneInterDataSrc->pageId += 1;

S
TD-1057  
Shengliang Guan 已提交
783
  if ((uint32_t)pOneInterDataSrc->pageId <
H
hzcheng 已提交
784 785 786 787 788 789
      pOneInterDataSrc->pMemBuffer->fileMeta.flushoutData.pFlushoutInfo[pOneInterDataSrc->flushoutIdx].numOfPages) {
    tExtMemBufferLoadData(pOneInterDataSrc->pMemBuffer, &(pOneInterDataSrc->filePage), pOneInterDataSrc->flushoutIdx,
                          pOneInterDataSrc->pageId);

#if defined(_DEBUG_VIEW)
    printf("new page load to buffer\n");
H
hjxilinx 已提交
790
    tColModelDisplay(pOneInterDataSrc->pMemBuffer->pColumnModel, pOneInterDataSrc->filePage.data,
791
                     pOneInterDataSrc->filePage.num, pOneInterDataSrc->pMemBuffer->pColumnModel->capacity);
H
hzcheng 已提交
792 793 794 795 796 797 798 799 800 801 802 803 804
#endif
    *needAdjustLoserTree = true;
  } else {
    pLocalReducer->numOfCompleted += 1;

    pOneInterDataSrc->rowIdx = -1;
    pOneInterDataSrc->pageId = -1;
    *needAdjustLoserTree = true;
  }

  return pLocalReducer->numOfBuffer;
}

S
slguan 已提交
805 806
void adjustLoserTreeFromNewData(SLocalReducer *pLocalReducer, SLocalDataSource *pOneInterDataSrc,
                                SLoserTreeInfo *pTree) {
H
hzcheng 已提交
807 808 809 810 811
  /*
   * load a new data page into memory for intermediate dataset source,
   * since it's last record in buffer has been chosen to be processed, as the winner of loser-tree
   */
  bool needToAdjust = true;
812
  if (pOneInterDataSrc->filePage.num <= pOneInterDataSrc->rowIdx) {
H
hzcheng 已提交
813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837
    loadNewDataFromDiskFor(pLocalReducer, pOneInterDataSrc, &needToAdjust);
  }

  /*
   * adjust loser tree otherwise, according to new candidate data
   * if the loser tree is rebuild completed, we do not need to adjust
   */
  if (needToAdjust) {
    int32_t leafNodeIdx = pTree->pNode[0].index + pLocalReducer->numOfBuffer;

#ifdef _DEBUG_VIEW
    printf("before adjust:\t");
    tLoserTreeDisplay(pTree);
#endif

    tLoserTreeAdjust(pTree, leafNodeIdx);

#ifdef _DEBUG_VIEW
    printf("\nafter adjust:\t");
    tLoserTreeDisplay(pTree);
    printf("\n");
#endif
  }
}

H
Haojun Liao 已提交
838
void savePrevRecordAndSetupInterpoInfo(SLocalReducer *pLocalReducer, SQueryInfo *pQueryInfo, SFillInfo *pFillInfo) {
H
hjxilinx 已提交
839
  // discard following dataset in the same group and reset the interpolation information
H
hjxilinx 已提交
840
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
841

H
hjxilinx 已提交
842
  STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);
843

H
Haojun Liao 已提交
844 845 846
  if (pFillInfo != NULL) {
    int64_t stime = (pQueryInfo->window.skey < pQueryInfo->window.ekey) ? pQueryInfo->window.skey : pQueryInfo->window.ekey;
    int64_t revisedSTime =
H
Haojun Liao 已提交
847
        taosGetIntervalStartTimestamp(stime, pQueryInfo->slidingTime, pQueryInfo->intervalTime, pQueryInfo->slidingTimeUnit, tinfo.precision);
H
Haojun Liao 已提交
848 849 850
  
    taosResetFillInfo(pFillInfo, revisedSTime);
  }
H
hzcheng 已提交
851 852

  pLocalReducer->discard = true;
853
  pLocalReducer->discardData->num = 0;
H
hzcheng 已提交
854

H
hjxilinx 已提交
855
  SColumnModel *pModel = pLocalReducer->pDesc->pColumnModel;
H
hzcheng 已提交
856 857 858
  tColModelAppend(pModel, pLocalReducer->discardData, pLocalReducer->prevRowOfInput, 0, 1, 1);
}

H
hjxilinx 已提交
859 860
static void reversedCopyFromInterpolationToDstBuf(SQueryInfo *pQueryInfo, SSqlRes *pRes, tFilePage **pResPages,
                                                  SLocalReducer *pLocalReducer) {
H
hjxilinx 已提交
861
  assert(0);
H
hjxilinx 已提交
862 863 864 865
  size_t size = tscSqlExprNumOfExprs(pQueryInfo);
  
  for (int32_t i = 0; i < size; ++i) {
    TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i);
H
hzcheng 已提交
866

867
    int32_t offset = tscFieldInfoGetOffset(pQueryInfo, i);
H
hjxilinx 已提交
868
    assert(offset == getColumnModelOffset(pLocalReducer->resColModel, i));
H
hzcheng 已提交
869 870 871 872 873 874 875 876 877 878 879 880 881 882 883 884

    char *src = pResPages[i]->data + (pRes->numOfRows - 1) * pField->bytes;
    char *dst = pRes->data + pRes->numOfRows * offset;

    for (int32_t j = 0; j < pRes->numOfRows; ++j) {
      memcpy(dst, src, (size_t)pField->bytes);
      dst += pField->bytes;
      src -= pField->bytes;
    }
  }
}

/*
 * Note: pRes->pLocalReducer may be null, due to the fact that "tscDestroyLocalReducer" is called
 * by "interuptHandler" function in shell
 */
H
Haojun Liao 已提交
885
static void doFillResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool doneOutput) {
H
hjxilinx 已提交
886 887
  SSqlCmd *   pCmd = &pSql->cmd;
  SSqlRes *   pRes = &pSql->res;
888
  
H
hjxilinx 已提交
889 890 891
  tFilePage * pFinalDataPage = pLocalReducer->pResultBuf;
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);

H
Haojun Liao 已提交
892
  // no interval query, no fill operation
893
  if (pQueryInfo->intervalTime == 0 || pQueryInfo->fillType == TSDB_FILL_NONE) {
H
hzcheng 已提交
894
    pRes->data = pLocalReducer->pFinalRes;
895
    pRes->numOfRows = pFinalDataPage->num;
H
hzcheng 已提交
896

897 898
    if (pQueryInfo->limit.offset > 0) {
      if (pQueryInfo->limit.offset < pRes->numOfRows) {
S
TD-1057  
Shengliang Guan 已提交
899 900
        int32_t prevSize = (int32_t)pFinalDataPage->num;
        tColModelErase(pLocalReducer->resColModel, pFinalDataPage, prevSize, 0, (int32_t)pQueryInfo->limit.offset - 1);
H
hzcheng 已提交
901 902

        /* remove the hole in column model */
S
slguan 已提交
903
        tColModelCompact(pLocalReducer->resColModel, pFinalDataPage, prevSize);
H
hzcheng 已提交
904

905 906
        pRes->numOfRows -= pQueryInfo->limit.offset;
        pQueryInfo->limit.offset = 0;
H
hzcheng 已提交
907
      } else {
908
        pQueryInfo->limit.offset -= pRes->numOfRows;
H
hzcheng 已提交
909 910 911 912
        pRes->numOfRows = 0;
      }
    }

H
Haojun Liao 已提交
913 914 915
    pRes->numOfRowsGroup += pRes->numOfRows;

    if (pQueryInfo->limit.limit >= 0 && pRes->numOfRowsGroup > pQueryInfo->limit.limit) {
H
hzcheng 已提交
916
      /* impose the limitation of output rows on the final result */
S
Shengliang Guan 已提交
917
      int32_t prevSize = (int32_t)pFinalDataPage->num;
S
Shengliang Guan 已提交
918
      int32_t overflow = (int32_t)(pRes->numOfRowsGroup - pQueryInfo->limit.limit);
H
Haojun Liao 已提交
919
      assert(overflow < pRes->numOfRows);
H
hzcheng 已提交
920

H
Haojun Liao 已提交
921
      pRes->numOfRowsGroup = pQueryInfo->limit.limit;
H
Haojun Liao 已提交
922 923
      pRes->numOfRows -= overflow;
      pFinalDataPage->num -= overflow;
H
hzcheng 已提交
924

S
slguan 已提交
925
      tColModelCompact(pLocalReducer->resColModel, pFinalDataPage, prevSize);
H
hzcheng 已提交
926 927

      /* set remain data to be discarded, and reset the interpolation information */
928
      savePrevRecordAndSetupInterpoInfo(pLocalReducer, pQueryInfo, pLocalReducer->pFillInfo);
H
hzcheng 已提交
929 930
    }

H
Haojun Liao 已提交
931
    memcpy(pRes->data, pFinalDataPage->data, pRes->numOfRows * pLocalReducer->finalRowSize);
H
Haojun Liao 已提交
932 933

    pRes->numOfClauseTotal += pRes->numOfRows;
934
    pFinalDataPage->num = 0;
H
hzcheng 已提交
935 936 937
    return;
  }

938 939
  SFillInfo *pFillInfo = pLocalReducer->pFillInfo;
  int64_t actualETime = MAX(pQueryInfo->window.skey, pQueryInfo->window.ekey);
H
hzcheng 已提交
940

H
hjxilinx 已提交
941 942 943
  tFilePage **pResPages = malloc(POINTER_BYTES * pQueryInfo->fieldsInfo.numOfOutput);
  for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) {
    TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i);
H
hjxilinx 已提交
944
    pResPages[i] = calloc(1, sizeof(tFilePage) + pField->bytes * pLocalReducer->resColModel->capacity);
H
hzcheng 已提交
945
  }
H
Haojun Liao 已提交
946

H
hzcheng 已提交
947
  while (1) {
948
    int64_t newRows = taosGenerateDataBlock(pFillInfo, pResPages, pLocalReducer->resColModel->capacity);
H
hzcheng 已提交
949

950 951
    if (pQueryInfo->limit.offset < newRows) {
      newRows -= pQueryInfo->limit.offset;
H
hzcheng 已提交
952

953
      if (pQueryInfo->limit.offset > 0) {
H
hjxilinx 已提交
954 955
        for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) {
          TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i);
H
hjxilinx 已提交
956 957
          memmove(pResPages[i]->data, pResPages[i]->data + pField->bytes * pQueryInfo->limit.offset,
                  newRows * pField->bytes);
H
hzcheng 已提交
958 959 960 961 962 963
        }
      }

      pRes->data = pLocalReducer->pFinalRes;
      pRes->numOfRows = newRows;

964
      pQueryInfo->limit.offset = 0;
H
hzcheng 已提交
965 966
      break;
    } else {
967
      pQueryInfo->limit.offset -= newRows;
H
hzcheng 已提交
968 969
      pRes->numOfRows = 0;

970
      int32_t rpoints = taosNumOfRemainRows(pFillInfo);
H
hzcheng 已提交
971
      if (rpoints <= 0) {
972
        if (!doneOutput) { // reduce procedure has not completed yet, but current results for fill are exhausted
H
hzcheng 已提交
973 974 975 976
          break;
        }

        /* all output for current group are completed */
S
TD-1057  
Shengliang Guan 已提交
977
        int32_t totalRemainRows = (int32_t)getFilledNumOfRes(pFillInfo, actualETime, pLocalReducer->resColModel->capacity);
H
hzcheng 已提交
978 979 980 981 982 983 984 985
        if (totalRemainRows <= 0) {
          break;
        }
      }
    }
  }

  if (pRes->numOfRows > 0) {
H
Haojun Liao 已提交
986
    if (pQueryInfo->limit.limit >= 0 && pRes->numOfRows > pQueryInfo->limit.limit) {
S
Shengliang Guan 已提交
987
      int32_t overflow = (int32_t)(pRes->numOfRows - pQueryInfo->limit.limit);
H
Haojun Liao 已提交
988 989
      pRes->numOfRows -= overflow;
      pFinalDataPage->num -= overflow;
H
hzcheng 已提交
990

H
Haojun Liao 已提交
991 992
      assert(pRes->numOfRows >= 0 && pFinalDataPage->num > 0);

H
hzcheng 已提交
993
      /* set remain data to be discarded, and reset the interpolation information */
994
      savePrevRecordAndSetupInterpoInfo(pLocalReducer, pQueryInfo, pFillInfo);
H
hzcheng 已提交
995 996
    }

997
    if (pQueryInfo->order.order == TSDB_ORDER_ASC) {
H
hjxilinx 已提交
998 999
      for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) {
        TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i);
H
hjxilinx 已提交
1000
        int16_t     offset = getColumnModelOffset(pLocalReducer->resColModel, i);
H
hjxilinx 已提交
1001
        memcpy(pRes->data + offset * pRes->numOfRows, pResPages[i]->data, pField->bytes * pRes->numOfRows);
H
hzcheng 已提交
1002
      }
H
hjxilinx 已提交
1003
    } else {  // todo bug??
1004
      reversedCopyFromInterpolationToDstBuf(pQueryInfo, pRes, pResPages, pLocalReducer);
H
hzcheng 已提交
1005
    }
H
Haojun Liao 已提交
1006 1007 1008

    pRes->numOfRowsGroup += pRes->numOfRows;
    pRes->numOfClauseTotal += pRes->numOfRows;
H
hzcheng 已提交
1009 1010
  }

1011
  pFinalDataPage->num = 0;
H
hjxilinx 已提交
1012
  for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) {
S
Shengliang Guan 已提交
1013
    taosTFree(pResPages[i]);
H
hzcheng 已提交
1014
  }
1015
  
S
Shengliang Guan 已提交
1016
  taosTFree(pResPages);
H
hzcheng 已提交
1017 1018
}

S
slguan 已提交
1019
static void savePreviousRow(SLocalReducer *pLocalReducer, tFilePage *tmpBuffer) {
H
hjxilinx 已提交
1020
  SColumnModel *pColumnModel = pLocalReducer->pDesc->pColumnModel;
1021
  assert(pColumnModel->capacity == 1 && tmpBuffer->num == 1);
H
hzcheng 已提交
1022 1023

  // copy to previous temp buffer
H
hjxilinx 已提交
1024
  for (int32_t i = 0; i < pColumnModel->numOfCols; ++i) {
H
hjxilinx 已提交
1025 1026 1027
    SSchema *pSchema = getColumnModelSchema(pColumnModel, i);
    int16_t  offset = getColumnModelOffset(pColumnModel, i);

H
hjxilinx 已提交
1028
    memcpy(pLocalReducer->prevRowOfInput + offset, tmpBuffer->data + offset, pSchema->bytes);
H
hzcheng 已提交
1029 1030
  }

1031
  tmpBuffer->num = 0;
H
hzcheng 已提交
1032 1033 1034
  pLocalReducer->hasPrevRow = true;
}

H
hjxilinx 已提交
1035
static void doExecuteSecondaryMerge(SSqlCmd *pCmd, SLocalReducer *pLocalReducer, bool needInit) {
S
slguan 已提交
1036
  // the tag columns need to be set before all functions execution
H
hjxilinx 已提交
1037
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
1038

H
hjxilinx 已提交
1039 1040
  size_t size = tscSqlExprNumOfExprs(pQueryInfo);
  for (int32_t j = 0; j < size; ++j) {
S
slguan 已提交
1041
    SQLFunctionCtx *pCtx = &pLocalReducer->pCtx[j];
H
hzcheng 已提交
1042

S
slguan 已提交
1043
    // tags/tags_dummy function, the tag field of SQLFunctionCtx is from the input buffer
H
Haojun Liao 已提交
1044
    int32_t functionId = pCtx->functionId;
H
hjxilinx 已提交
1045
    if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TAG || functionId == TSDB_FUNC_TS_DUMMY) {
S
slguan 已提交
1046
      tVariantDestroy(&pCtx->tag);
1047 1048 1049 1050 1051 1052 1053 1054
      char* input = pCtx->aInputElemBuf;
      
      if (pCtx->inputType == TSDB_DATA_TYPE_BINARY || pCtx->inputType == TSDB_DATA_TYPE_NCHAR) {
        assert(varDataLen(input) <= pCtx->inputBytes);
        tVariantCreateFromBinary(&pCtx->tag, varDataVal(input), varDataLen(input), pCtx->inputType);
      } else {
        tVariantCreateFromBinary(&pCtx->tag, input, pCtx->inputBytes, pCtx->inputType);
      }
H
Haojun Liao 已提交
1055 1056 1057
    } else if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM) {
      SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, j);
      pCtx->param[0].i64Key = pExpr->param[0].i64Key;
H
hzcheng 已提交
1058 1059
    }

S
slguan 已提交
1060 1061 1062
    pCtx->currentStage = SECONDARY_STAGE_MERGE;

    if (needInit) {
H
Haojun Liao 已提交
1063
      aAggs[pCtx->functionId].init(pCtx);
S
slguan 已提交
1064 1065 1066
    }
  }

H
hjxilinx 已提交
1067
  for (int32_t j = 0; j < size; ++j) {
H
Haojun Liao 已提交
1068
    int32_t functionId = pLocalReducer->pCtx[j].functionId;
S
slguan 已提交
1069 1070 1071 1072 1073 1074 1075
    if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TS_DUMMY) {
      continue;
    }

    aAggs[functionId].distSecondaryMergeFunc(&pLocalReducer->pCtx[j]);
  }
}
H
hzcheng 已提交
1076

H
hjxilinx 已提交
1077
static void handleUnprocessedRow(SSqlCmd *pCmd, SLocalReducer *pLocalReducer, tFilePage *tmpBuffer) {
S
slguan 已提交
1078 1079 1080
  if (pLocalReducer->hasUnprocessedRow) {
    pLocalReducer->hasUnprocessedRow = false;
    doExecuteSecondaryMerge(pCmd, pLocalReducer, true);
S
slguan 已提交
1081
    savePreviousRow(pLocalReducer, tmpBuffer);
H
hzcheng 已提交
1082 1083 1084
  }
}

1085
static int64_t getNumOfResultLocal(SQueryInfo *pQueryInfo, SQLFunctionCtx *pCtx) {
H
hzcheng 已提交
1086
  int64_t maxOutput = 0;
H
hjxilinx 已提交
1087 1088 1089
  
  size_t size = tscSqlExprNumOfExprs(pQueryInfo);
  for (int32_t j = 0; j < size; ++j) {
H
hzcheng 已提交
1090 1091 1092 1093
    /*
     * ts, tag, tagprj function can not decide the output number of current query
     * the number of output result is decided by main output
     */
H
Haojun Liao 已提交
1094
    int32_t functionId = pCtx[j].functionId;
H
hzcheng 已提交
1095 1096 1097
    if (functionId == TSDB_FUNC_TS || functionId == TSDB_FUNC_TAG || functionId == TSDB_FUNC_TAGPRJ) {
      continue;
    }
H
hjxilinx 已提交
1098

H
Haojun Liao 已提交
1099 1100 1101
    SResultInfo* pResInfo = GET_RES_INFO(&pCtx[j]);
    if (maxOutput < pResInfo->numOfRes) {
      maxOutput = pResInfo->numOfRes;
H
hzcheng 已提交
1102 1103
    }
  }
H
hjxilinx 已提交
1104

H
hzcheng 已提交
1105 1106 1107 1108
  return maxOutput;
}

/*
S
slguan 已提交
1109
 * in handling the top/bottom query, which produce more than one rows result,
H
hzcheng 已提交
1110 1111
 * the tsdb_func_tags only fill the first row of results, the remain rows need to
 * filled with the same result, which is the tags, specified in group by clause
S
slguan 已提交
1112
 *
H
hzcheng 已提交
1113
 */
H
hjxilinx 已提交
1114
static void fillMultiRowsOfTagsVal(SQueryInfo *pQueryInfo, int32_t numOfRes, SLocalReducer *pLocalReducer) {
S
slguan 已提交
1115
  int32_t maxBufSize = 0;  // find the max tags column length to prepare the buffer
H
hjxilinx 已提交
1116 1117 1118
  size_t size = tscSqlExprNumOfExprs(pQueryInfo);
  
  for (int32_t k = 0; k < size; ++k) {
1119
    SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, k);
S
slguan 已提交
1120
    if (maxBufSize < pExpr->resBytes && pExpr->functionId == TSDB_FUNC_TAG) {
H
hzcheng 已提交
1121 1122 1123 1124 1125 1126
      maxBufSize = pExpr->resBytes;
    }
  }

  assert(maxBufSize >= 0);

H
hjxilinx 已提交
1127
  char *buf = malloc((size_t)maxBufSize);
H
hjxilinx 已提交
1128
  for (int32_t k = 0; k < size; ++k) {
H
Haojun Liao 已提交
1129 1130
    SQLFunctionCtx *pCtx = &pLocalReducer->pCtx[k];
    if (pCtx->functionId != TSDB_FUNC_TAG) {
S
slguan 已提交
1131 1132 1133
      continue;
    }

H
hzcheng 已提交
1134 1135 1136 1137 1138 1139 1140 1141 1142 1143 1144 1145 1146
    int32_t inc = numOfRes - 1;  // tsdb_func_tag function only produce one row of result
    memset(buf, 0, (size_t)maxBufSize);
    memcpy(buf, pCtx->aOutputBuf, (size_t)pCtx->outputBytes);

    for (int32_t i = 0; i < inc; ++i) {
      pCtx->aOutputBuf += pCtx->outputBytes;
      memcpy(pCtx->aOutputBuf, buf, (size_t)pCtx->outputBytes);
    }
  }

  free(buf);
}

H
hjxilinx 已提交
1147
int32_t finalizeRes(SQueryInfo *pQueryInfo, SLocalReducer *pLocalReducer) {
H
hjxilinx 已提交
1148 1149 1150
  size_t size = tscSqlExprNumOfExprs(pQueryInfo);
  
  for (int32_t k = 0; k < size; ++k) {
H
Haojun Liao 已提交
1151 1152
    SQLFunctionCtx* pCtx = &pLocalReducer->pCtx[k];
    aAggs[pCtx->functionId].xFinalize(pCtx);
H
hzcheng 已提交
1153 1154 1155 1156
  }

  pLocalReducer->hasPrevRow = false;

1157
  int32_t numOfRes = (int32_t)getNumOfResultLocal(pQueryInfo, pLocalReducer->pCtx);
1158
  pLocalReducer->pResultBuf->num += numOfRes;
H
hzcheng 已提交
1159

1160
  fillMultiRowsOfTagsVal(pQueryInfo, numOfRes, pLocalReducer);
H
hzcheng 已提交
1161 1162 1163 1164 1165 1166 1167 1168 1169 1170
  return numOfRes;
}

/*
 * points merge:
 * points are merged according to the sort info, which is tags columns and timestamp column.
 * In case of points without either tags columns or timestamp, such as
 * results generated by simple aggregation function, we merge them all into one points
 * *Exception*: column projection query, required no merge procedure
 */
H
hjxilinx 已提交
1171
bool needToMerge(SQueryInfo *pQueryInfo, SLocalReducer *pLocalReducer, tFilePage *tmpBuffer) {
H
hzcheng 已提交
1172
  int32_t ret = 0;  // merge all result by default
1173

H
Haojun Liao 已提交
1174
  int16_t functionId = pLocalReducer->pCtx[0].functionId;
1175 1176 1177

  // todo opt performance
  if ((/*functionId == TSDB_FUNC_PRJ || */functionId == TSDB_FUNC_ARITHM) || (tscIsProjectionQueryOnSTable(pQueryInfo, 0))) {  // column projection query
H
hzcheng 已提交
1178 1179 1180
    ret = 1;                                                            // disable merge procedure
  } else {
    tOrderDescriptor *pDesc = pLocalReducer->pDesc;
H
Haojun Liao 已提交
1181
    if (pDesc->orderInfo.numOfCols > 0) {
1182
      if (pDesc->tsOrder == TSDB_ORDER_ASC) {  // asc
H
hzcheng 已提交
1183
        // todo refactor comparator
S
slguan 已提交
1184
        ret = compare_a(pLocalReducer->pDesc, 1, 0, pLocalReducer->prevRowOfInput, 1, 0, tmpBuffer->data);
H
hzcheng 已提交
1185
      } else {  // desc
S
slguan 已提交
1186
        ret = compare_d(pLocalReducer->pDesc, 1, 0, pLocalReducer->prevRowOfInput, 1, 0, tmpBuffer->data);
H
hzcheng 已提交
1187 1188 1189 1190 1191 1192 1193 1194
      }
    }
  }

  /* if ret == 0, means the result belongs to the same group */
  return (ret == 0);
}

H
hjxilinx 已提交
1195
static bool reachGroupResultLimit(SQueryInfo *pQueryInfo, SSqlRes *pRes) {
1196
  return (pRes->numOfGroups >= pQueryInfo->slimit.limit && pQueryInfo->slimit.limit >= 0);
S
slguan 已提交
1197 1198 1199 1200 1201 1202
}

static bool saveGroupResultInfo(SSqlObj *pSql) {
  SSqlCmd *pCmd = &pSql->cmd;
  SSqlRes *pRes = &pSql->res;

H
hjxilinx 已提交
1203
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
H
Haojun Liao 已提交
1204 1205 1206 1207

  if (pRes->numOfRowsGroup > 0) {
    pRes->numOfGroups += 1;
  }
S
slguan 已提交
1208

S
slguan 已提交
1209
  // the output group is limited by the slimit clause
1210
  if (reachGroupResultLimit(pQueryInfo, pRes)) {
S
slguan 已提交
1211 1212 1213 1214
    return true;
  }

  //    pRes->pGroupRec = realloc(pRes->pGroupRec, pRes->numOfGroups*sizeof(SResRec));
H
hzcheng 已提交
1215
  //    pRes->pGroupRec[pRes->numOfGroups-1].numOfRows = pRes->numOfRows;
H
Haojun Liao 已提交
1216
  //    pRes->pGroupRec[pRes->numOfGroups-1].numOfClauseTotal = pRes->numOfClauseTotal;
S
slguan 已提交
1217 1218

  return false;
H
hzcheng 已提交
1219 1220
}

S
slguan 已提交
1221 1222 1223 1224 1225 1226 1227 1228
/**
 *
 * @param pSql
 * @param pLocalReducer
 * @param noMoreCurrentGroupRes
 * @return if current group is skipped, return false, and do NOT record it into pRes->numOfGroups
 */
bool doGenerateFinalResults(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool noMoreCurrentGroupRes) {
H
hjxilinx 已提交
1229 1230 1231 1232 1233
  SSqlCmd *pCmd = &pSql->cmd;
  SSqlRes *pRes = &pSql->res;

  SQueryInfo *  pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
  tFilePage *   pResBuf = pLocalReducer->pResultBuf;
H
hjxilinx 已提交
1234
  SColumnModel *pModel = pLocalReducer->resColModel;
H
hzcheng 已提交
1235

S
slguan 已提交
1236 1237 1238
  pRes->code = TSDB_CODE_SUCCESS;

  /*
1239
   * Ignore the output of the current group since this group is skipped by user
S
slguan 已提交
1240 1241
   * We set the numOfRows to be 0 and discard the possible remain results.
   */
1242
  if (pQueryInfo->slimit.offset > 0) {
S
slguan 已提交
1243
    pRes->numOfRows = 0;
1244
    pQueryInfo->slimit.offset -= 1;
S
slguan 已提交
1245
    pLocalReducer->discard = !noMoreCurrentGroupRes;
H
Haojun Liao 已提交
1246 1247 1248 1249 1250 1251

    if (pLocalReducer->discard) {
      SColumnModel *pInternModel = pLocalReducer->pDesc->pColumnModel;
      tColModelAppend(pInternModel, pLocalReducer->discardData, pLocalReducer->pTempBuffer->data, 0, 1, 1);
    }

S
slguan 已提交
1252 1253 1254
    return false;
  }

H
hjxilinx 已提交
1255
  tColModelCompact(pModel, pResBuf, pModel->capacity);
H
hzcheng 已提交
1256 1257 1258

#ifdef _DEBUG_VIEW
  printf("final result before interpo:\n");
1259
//  tColModelDisplay(pLocalReducer->resColModel, pLocalReducer->pBufForInterpo, pResBuf->num, pResBuf->num);
H
hzcheng 已提交
1260
#endif
1261 1262
  
  SFillInfo* pFillInfo = pLocalReducer->pFillInfo;
H
Haojun Liao 已提交
1263
  if (pFillInfo != NULL) {
S
TD-1057  
Shengliang Guan 已提交
1264
    taosFillSetStartInfo(pFillInfo, (int32_t)pResBuf->num, pQueryInfo->window.ekey);
H
Haojun Liao 已提交
1265 1266
    taosFillCopyInputDataFromOneFilePage(pFillInfo, pResBuf);
  }
1267
  
H
Haojun Liao 已提交
1268
  doFillResult(pSql, pLocalReducer, noMoreCurrentGroupRes);
S
slguan 已提交
1269
  return true;
H
hzcheng 已提交
1270 1271
}

H
hjxilinx 已提交
1272
void resetOutputBuf(SQueryInfo *pQueryInfo, SLocalReducer *pLocalReducer) {  // reset output buffer to the beginning
H
hjxilinx 已提交
1273
  for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) {
H
hzcheng 已提交
1274
    pLocalReducer->pCtx[i].aOutputBuf =
H
hjxilinx 已提交
1275
        pLocalReducer->pResultBuf->data + tscFieldInfoGetOffset(pQueryInfo, i) * pLocalReducer->resColModel->capacity;
H
hzcheng 已提交
1276 1277 1278 1279 1280
  }

  memset(pLocalReducer->pResultBuf, 0, pLocalReducer->nResultBufSize + sizeof(tFilePage));
}

S
slguan 已提交
1281
static void resetEnvForNewResultset(SSqlRes *pRes, SSqlCmd *pCmd, SLocalReducer *pLocalReducer) {
S
slguan 已提交
1282
  // In handling data in other groups, we need to reset the interpolation information for a new group data
H
hzcheng 已提交
1283
  pRes->numOfRows = 0;
H
Haojun Liao 已提交
1284
  pRes->numOfRowsGroup = 0;
H
hjxilinx 已提交
1285 1286 1287

  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);

1288
  pQueryInfo->limit.offset = pLocalReducer->offset;
H
hzcheng 已提交
1289

1290
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
H
hjxilinx 已提交
1291
  STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);
H
hjxilinx 已提交
1292 1293
  
  int8_t precision = tinfo.precision;
H
hjxilinx 已提交
1294

S
slguan 已提交
1295
  // for group result interpolation, do not return if not data is generated
1296 1297
  if (pQueryInfo->fillType != TSDB_FILL_NONE) {
    TSKEY skey = MIN(pQueryInfo->window.skey, pQueryInfo->window.ekey);
H
hjxilinx 已提交
1298
    int64_t newTime =
H
Haojun Liao 已提交
1299
        taosGetIntervalStartTimestamp(skey, pQueryInfo->slidingTime, pQueryInfo->intervalTime, pQueryInfo->slidingTimeUnit, precision);
1300
    taosResetFillInfo(pLocalReducer->pFillInfo, newTime);
H
hzcheng 已提交
1301 1302 1303
  }
}

S
slguan 已提交
1304 1305 1306 1307
static bool isAllSourcesCompleted(SLocalReducer *pLocalReducer) {
  return (pLocalReducer->numOfBuffer == pLocalReducer->numOfCompleted);
}

1308
static bool doBuildFilledResultForGroup(SSqlObj *pSql) {
H
hzcheng 已提交
1309 1310 1311
  SSqlCmd *pCmd = &pSql->cmd;
  SSqlRes *pRes = &pSql->res;

1312 1313 1314
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
  SLocalReducer *pLocalReducer = pRes->pLocalReducer;
  SFillInfo *pFillInfo = pLocalReducer->pFillInfo;
S
slguan 已提交
1315

1316 1317
  if (pFillInfo != NULL && taosNumOfRemainRows(pFillInfo) > 0) {
    assert(pQueryInfo->fillType != TSDB_FILL_NONE);
H
hzcheng 已提交
1318

S
slguan 已提交
1319
    tFilePage *pFinalDataBuf = pLocalReducer->pResultBuf;
H
Haojun Liao 已提交
1320
    int64_t etime = *(int64_t *)(pFinalDataBuf->data + TSDB_KEYSIZE * (pFillInfo->numOfRows - 1));
H
hzcheng 已提交
1321

1322
    // the first column must be the timestamp column
S
TD-1057  
Shengliang Guan 已提交
1323
    int32_t rows = (int32_t)getFilledNumOfRes(pFillInfo, etime, pLocalReducer->resColModel->capacity);
H
Haojun Liao 已提交
1324
    if (rows > 0) {  // do fill gap
H
Haojun Liao 已提交
1325
      doFillResult(pSql, pLocalReducer, false);
S
slguan 已提交
1326
    }
H
hzcheng 已提交
1327

S
slguan 已提交
1328 1329 1330
    return true;
  } else {
    return false;
H
hzcheng 已提交
1331
  }
S
slguan 已提交
1332
}
H
hzcheng 已提交
1333

S
slguan 已提交
1334 1335 1336 1337
static bool doHandleLastRemainData(SSqlObj *pSql) {
  SSqlCmd *pCmd = &pSql->cmd;
  SSqlRes *pRes = &pSql->res;

1338 1339
  SLocalReducer *pLocalReducer = pRes->pLocalReducer;
  SFillInfo     *pFillInfo = pLocalReducer->pFillInfo;
H
hzcheng 已提交
1340

S
slguan 已提交
1341
  bool prevGroupCompleted = (!pLocalReducer->discard) && pLocalReducer->hasUnprocessedRow;
S
slguan 已提交
1342

H
Haojun Liao 已提交
1343
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
H
hzcheng 已提交
1344

S
slguan 已提交
1345 1346
  if ((isAllSourcesCompleted(pLocalReducer) && !pLocalReducer->hasPrevRow) || pLocalReducer->pLocalDataSrc[0] == NULL ||
      prevGroupCompleted) {
1347
    // if fillType == TSDB_FILL_NONE, return directly
H
Haojun Liao 已提交
1348 1349
    if (pQueryInfo->fillType != TSDB_FILL_NONE &&
      ((pRes->numOfRowsGroup < pQueryInfo->limit.limit && pQueryInfo->limit.limit > 0) || (pQueryInfo->limit.limit < 0))) {
H
hjxilinx 已提交
1350
      int64_t etime = (pQueryInfo->window.skey < pQueryInfo->window.ekey) ? pQueryInfo->window.ekey : pQueryInfo->window.skey;
H
hzcheng 已提交
1351

S
TD-1057  
Shengliang Guan 已提交
1352
      int32_t rows = (int32_t)getFilledNumOfRes(pFillInfo, etime, pLocalReducer->resColModel->capacity);
H
hzcheng 已提交
1353
      if (rows > 0) {  // do interpo
H
Haojun Liao 已提交
1354
        doFillResult(pSql, pLocalReducer, true);
H
hzcheng 已提交
1355 1356 1357
      }
    }

S
slguan 已提交
1358 1359 1360 1361 1362 1363 1364 1365
    /*
     * 1. numOfRows == 0, means no interpolation results are generated.
     * 2. if all local data sources are consumed, and no un-processed rows exist.
     *
     * No results will be generated and query completed.
     */
    if (pRes->numOfRows > 0 || (isAllSourcesCompleted(pLocalReducer) && (!pLocalReducer->hasUnprocessedRow))) {
      return true;
H
hzcheng 已提交
1366
    }
S
slguan 已提交
1367 1368 1369 1370 1371 1372 1373

    // start to process result for a new group and save the result info of previous group
    if (saveGroupResultInfo(pSql)) {
      return true;
    }

    resetEnvForNewResultset(pRes, pCmd, pLocalReducer);
H
hzcheng 已提交
1374 1375
  }

S
slguan 已提交
1376 1377
  return false;
}
H
hzcheng 已提交
1378

H
hjxilinx 已提交
1379 1380 1381 1382
static void doProcessResultInNextWindow(SSqlObj *pSql, int32_t numOfRes) {
  SSqlCmd *pCmd = &pSql->cmd;
  SSqlRes *pRes = &pSql->res;

S
slguan 已提交
1383
  SLocalReducer *pLocalReducer = pRes->pLocalReducer;
H
hjxilinx 已提交
1384
  SQueryInfo *   pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
H
hjxilinx 已提交
1385
  size_t size = tscSqlExprNumOfExprs(pQueryInfo);
H
hjxilinx 已提交
1386

H
hjxilinx 已提交
1387
  for (int32_t k = 0; k < size; ++k) {
S
slguan 已提交
1388 1389
    SQLFunctionCtx *pCtx = &pLocalReducer->pCtx[k];
    pCtx->aOutputBuf += pCtx->outputBytes * numOfRes;
S
slguan 已提交
1390 1391

    // set the correct output timestamp column position
H
Haojun Liao 已提交
1392
    if (pCtx->functionId == TSDB_FUNC_TOP || pCtx->functionId == TSDB_FUNC_BOTTOM) {
S
slguan 已提交
1393
      pCtx->ptsOutputBuf = ((char *)pCtx->ptsOutputBuf + TSDB_KEYSIZE * numOfRes);
H
hzcheng 已提交
1394
    }
S
slguan 已提交
1395 1396
  }

S
slguan 已提交
1397
  doExecuteSecondaryMerge(pCmd, pLocalReducer, true);
S
slguan 已提交
1398 1399
}

1400
int32_t tscDoLocalMerge(SSqlObj *pSql) {
S
slguan 已提交
1401 1402
  SSqlCmd *pCmd = &pSql->cmd;
  SSqlRes *pRes = &pSql->res;
H
hjxilinx 已提交
1403

H
hjxilinx 已提交
1404
  tscResetForNextRetrieve(pRes);
H
hjxilinx 已提交
1405

S
slguan 已提交
1406
  if (pSql->signature != pSql || pRes == NULL || pRes->pLocalReducer == NULL) {  // all data has been processed
1407
    tscDebug("%p %s call the drop local reducer", pSql, __FUNCTION__);
S
slguan 已提交
1408
    tscDestroyLocalReducer(pSql);
H
hzcheng 已提交
1409 1410
    return 0;
  }
H
hjxilinx 已提交
1411

S
slguan 已提交
1412
  SLocalReducer *pLocalReducer = pRes->pLocalReducer;
H
hjxilinx 已提交
1413 1414
  SQueryInfo *   pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);

S
slguan 已提交
1415 1416
  // set the data merge in progress
  int32_t prevStatus =
weixin_48148422's avatar
weixin_48148422 已提交
1417
      atomic_val_compare_exchange_32(&pLocalReducer->status, TSC_LOCALREDUCE_READY, TSC_LOCALREDUCE_IN_PROGRESS);
H
hjxilinx 已提交
1418
  if (prevStatus != TSC_LOCALREDUCE_READY) {
H
hjxilinx 已提交
1419
    assert(prevStatus == TSC_LOCALREDUCE_TOBE_FREED);  // it is in tscDestroyLocalReducer function already
S
slguan 已提交
1420 1421 1422 1423 1424 1425 1426 1427 1428 1429
    return TSDB_CODE_SUCCESS;
  }

  tFilePage *tmpBuffer = pLocalReducer->pTempBuffer;

  if (doHandleLastRemainData(pSql)) {
    pLocalReducer->status = TSC_LOCALREDUCE_READY;  // set the flag, taos_free_result can release this result.
    return TSDB_CODE_SUCCESS;
  }

1430
  if (doBuildFilledResultForGroup(pSql)) {
S
slguan 已提交
1431 1432 1433 1434
    pLocalReducer->status = TSC_LOCALREDUCE_READY;  // set the flag, taos_free_result can release this result.
    return TSDB_CODE_SUCCESS;
  }

H
hzcheng 已提交
1435 1436 1437
  SLoserTreeInfo *pTree = pLocalReducer->pLoserTree;

  // clear buffer
S
slguan 已提交
1438
  handleUnprocessedRow(pCmd, pLocalReducer, tmpBuffer);
H
hjxilinx 已提交
1439
  SColumnModel *pModel = pLocalReducer->pDesc->pColumnModel;
H
hzcheng 已提交
1440 1441

  while (1) {
S
slguan 已提交
1442
    if (isAllSourcesCompleted(pLocalReducer)) {
H
hzcheng 已提交
1443 1444 1445 1446 1447 1448
      break;
    }

#ifdef _DEBUG_VIEW
    printf("chosen data in pTree[0] = %d\n", pTree->pNode[0].index);
#endif
1449
    assert((pTree->pNode[0].index < pLocalReducer->numOfBuffer) && (pTree->pNode[0].index >= 0) && tmpBuffer->num == 0);
H
hzcheng 已提交
1450 1451

    // chosen from loser tree
S
slguan 已提交
1452
    SLocalDataSource *pOneDataSrc = pLocalReducer->pLocalDataSrc[pTree->pNode[0].index];
H
hzcheng 已提交
1453

S
slguan 已提交
1454
    tColModelAppend(pModel, tmpBuffer, pOneDataSrc->filePage.data, pOneDataSrc->rowIdx, 1,
H
hjxilinx 已提交
1455
                    pOneDataSrc->pMemBuffer->pColumnModel->capacity);
H
hzcheng 已提交
1456 1457 1458 1459

#if defined(_DEBUG_VIEW)
    printf("chosen row:\t");
    SSrcColumnInfo colInfo[256] = {0};
1460
    tscGetSrcColumnInfo(colInfo, pQueryInfo);
H
hzcheng 已提交
1461

1462
    tColModelDisplayEx(pModel, tmpBuffer->data, tmpBuffer->num, pModel->capacity, colInfo);
H
hzcheng 已提交
1463
#endif
S
slguan 已提交
1464

H
hzcheng 已提交
1465 1466 1467 1468
    if (pLocalReducer->discard) {
      assert(pLocalReducer->hasUnprocessedRow == false);

      /* current record belongs to the same group of previous record, need to discard it */
S
slguan 已提交
1469
      if (isSameGroup(pCmd, pLocalReducer, pLocalReducer->discardData->data, tmpBuffer)) {
1470
        tmpBuffer->num = 0;
H
hzcheng 已提交
1471 1472
        pOneDataSrc->rowIdx += 1;

S
slguan 已提交
1473 1474 1475 1476
        adjustLoserTreeFromNewData(pLocalReducer, pOneDataSrc, pTree);

        // all inputs are exhausted, abort current process
        if (isAllSourcesCompleted(pLocalReducer)) {
H
hzcheng 已提交
1477 1478 1479
          break;
        }

S
slguan 已提交
1480
        // data belongs to the same group needs to be discarded
H
hzcheng 已提交
1481 1482 1483
        continue;
      } else {
        pLocalReducer->discard = false;
1484
        pLocalReducer->discardData->num = 0;
H
hzcheng 已提交
1485

S
slguan 已提交
1486 1487 1488 1489 1490 1491
        if (saveGroupResultInfo(pSql)) {
          pLocalReducer->status = TSC_LOCALREDUCE_READY;
          return TSDB_CODE_SUCCESS;
        }

        resetEnvForNewResultset(pRes, pCmd, pLocalReducer);
H
hzcheng 已提交
1492 1493 1494 1495
      }
    }

    if (pLocalReducer->hasPrevRow) {
1496
      if (needToMerge(pQueryInfo, pLocalReducer, tmpBuffer)) {
S
slguan 已提交
1497
        // belong to the group of the previous row, continue process it
S
slguan 已提交
1498
        doExecuteSecondaryMerge(pCmd, pLocalReducer, false);
H
hzcheng 已提交
1499 1500

        // copy to buffer
S
slguan 已提交
1501 1502 1503 1504 1505 1506
        savePreviousRow(pLocalReducer, tmpBuffer);
      } else {
        /*
         * current row does not belong to the group of previous row.
         * so the processing of previous group is completed.
         */
1507
        int32_t numOfRes = finalizeRes(pQueryInfo, pLocalReducer);
H
hzcheng 已提交
1508

S
slguan 已提交
1509
        bool       sameGroup = isSameGroup(pCmd, pLocalReducer, pLocalReducer->prevRowOfInput, tmpBuffer);
H
hzcheng 已提交
1510 1511 1512
        tFilePage *pResBuf = pLocalReducer->pResultBuf;

        /*
1513
         * if the previous group does NOT generate any result (pResBuf->num == 0),
H
hzcheng 已提交
1514 1515
         * continue to process results instead of return results.
         */
1516
        if ((!sameGroup && pResBuf->num > 0) || (pResBuf->num == pLocalReducer->resColModel->capacity)) {
H
hzcheng 已提交
1517
          // does not belong to the same group
S
slguan 已提交
1518
          bool notSkipped = doGenerateFinalResults(pSql, pLocalReducer, !sameGroup);
H
hzcheng 已提交
1519

S
slguan 已提交
1520
          // this row needs to discard, since it belongs to the group of previous
H
hzcheng 已提交
1521 1522
          if (pLocalReducer->discard && sameGroup) {
            pLocalReducer->hasUnprocessedRow = false;
1523
            tmpBuffer->num = 0;
H
hzcheng 已提交
1524
          } else {
S
slguan 已提交
1525
            // current row does not belongs to the previous group, so it is not be handled yet.
H
hzcheng 已提交
1526 1527 1528
            pLocalReducer->hasUnprocessedRow = true;
          }

1529
          resetOutputBuf(pQueryInfo, pLocalReducer);
H
hzcheng 已提交
1530 1531
          pOneDataSrc->rowIdx += 1;

S
slguan 已提交
1532 1533
          // here we do not check the return value
          adjustLoserTreeFromNewData(pLocalReducer, pOneDataSrc, pTree);
H
hzcheng 已提交
1534 1535 1536
          assert(pLocalReducer->status == TSC_LOCALREDUCE_IN_PROGRESS);

          if (pRes->numOfRows == 0) {
S
slguan 已提交
1537
            handleUnprocessedRow(pCmd, pLocalReducer, tmpBuffer);
H
hzcheng 已提交
1538 1539

            if (!sameGroup) {
S
slguan 已提交
1540 1541 1542 1543 1544 1545 1546 1547 1548 1549
              /*
               * previous group is done, prepare for the next group
               * If previous group is not skipped, keep it in pRes->numOfGroups
               */
              if (notSkipped && saveGroupResultInfo(pSql)) {
                pLocalReducer->status = TSC_LOCALREDUCE_READY;
                return TSDB_CODE_SUCCESS;
              }

              resetEnvForNewResultset(pRes, pCmd, pLocalReducer);
H
hzcheng 已提交
1550 1551 1552 1553 1554 1555 1556
            }
          } else {
            /*
             * if next record belongs to a new group, we do not handle this record here.
             * We start the process in a new round.
             */
            if (sameGroup) {
S
slguan 已提交
1557
              handleUnprocessedRow(pCmd, pLocalReducer, tmpBuffer);
H
hzcheng 已提交
1558 1559 1560
            }
          }

S
slguan 已提交
1561 1562 1563 1564 1565 1566
          // current group has no result,
          if (pRes->numOfRows == 0) {
            continue;
          } else {
            pLocalReducer->status = TSC_LOCALREDUCE_READY;  // set the flag, taos_free_result can release this result.
            return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1567
          }
S
slguan 已提交
1568
        } else {  // result buffer is not full
H
hjxilinx 已提交
1569
          doProcessResultInNextWindow(pSql, numOfRes);
S
slguan 已提交
1570
          savePreviousRow(pLocalReducer, tmpBuffer);
H
hzcheng 已提交
1571 1572
        }
      }
S
slguan 已提交
1573
    } else {
S
slguan 已提交
1574
      doExecuteSecondaryMerge(pCmd, pLocalReducer, true);
S
slguan 已提交
1575
      savePreviousRow(pLocalReducer, tmpBuffer);  // copy the processed row to buffer
H
hzcheng 已提交
1576 1577 1578
    }

    pOneDataSrc->rowIdx += 1;
S
slguan 已提交
1579
    adjustLoserTreeFromNewData(pLocalReducer, pOneDataSrc, pTree);
H
hzcheng 已提交
1580 1581 1582
  }

  if (pLocalReducer->hasPrevRow) {
1583
    finalizeRes(pQueryInfo, pLocalReducer);
H
hzcheng 已提交
1584 1585
  }

1586
  if (pLocalReducer->pResultBuf->num) {
H
hzcheng 已提交
1587 1588 1589 1590
    doGenerateFinalResults(pSql, pLocalReducer, true);
  }

  assert(pLocalReducer->status == TSC_LOCALREDUCE_IN_PROGRESS && pRes->row == 0);
S
slguan 已提交
1591
  pLocalReducer->status = TSC_LOCALREDUCE_READY;  // set the flag, taos_free_result can release this result.
H
hzcheng 已提交
1592 1593 1594 1595 1596 1597 1598 1599 1600 1601 1602 1603 1604 1605 1606 1607 1608 1609

  return TSDB_CODE_SUCCESS;
}

void tscInitResObjForLocalQuery(SSqlObj *pObj, int32_t numOfRes, int32_t rowLen) {
  SSqlRes *pRes = &pObj->res;
  if (pRes->pLocalReducer != NULL) {
    tscDestroyLocalReducer(pObj);
  }

  pRes->qhandle = 1;  // hack to pass the safety check in fetch_row function
  pRes->numOfRows = 0;
  pRes->row = 0;

  pRes->rspType = 0;  // used as a flag to denote if taos_retrieved() has been called yet
  pRes->pLocalReducer = (SLocalReducer *)calloc(1, sizeof(SLocalReducer));

  /*
S
slguan 已提交
1610 1611
   * we need one additional byte space
   * the sprintf function needs one additional space to put '\0' at the end of string
H
hzcheng 已提交
1612 1613 1614 1615
   */
  size_t allocSize = numOfRes * rowLen + sizeof(tFilePage) + 1;
  pRes->pLocalReducer->pResultBuf = (tFilePage *)calloc(1, allocSize);

1616
  pRes->pLocalReducer->pResultBuf->num = numOfRes;
H
hzcheng 已提交
1617 1618
  pRes->data = pRes->pLocalReducer->pResultBuf->data;
}