tscLocalMerge.c 56.8 KB
Newer Older
H
hzcheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Haojun Liao 已提交
16 17
#include "tscLocalMerge.h"
#include "tscSubquery.h"
18
#include "os.h"
H
Haojun Liao 已提交
19
#include "qAst.h"
S
slguan 已提交
20
#include "tlosertree.h"
H
Haojun Liao 已提交
21
#include "tscLog.h"
H
hzcheng 已提交
22
#include "tscUtil.h"
H
hjxilinx 已提交
23
#include "tschemautil.h"
S
slguan 已提交
24
#include "tsclient.h"
H
hzcheng 已提交
25 26

typedef struct SCompareParam {
S
slguan 已提交
27 28
  SLocalDataSource **pLocalData;
  tOrderDescriptor * pDesc;
29
  int32_t            num;
S
slguan 已提交
30
  int32_t            groupOrderType;
H
hzcheng 已提交
31 32 33 34 35 36
} SCompareParam;

int32_t treeComparator(const void *pLeft, const void *pRight, void *param) {
  int32_t pLeftIdx = *(int32_t *)pLeft;
  int32_t pRightIdx = *(int32_t *)pRight;

S
slguan 已提交
37 38 39
  SCompareParam *    pParam = (SCompareParam *)param;
  tOrderDescriptor * pDesc = pParam->pDesc;
  SLocalDataSource **pLocalData = pParam->pLocalData;
H
hzcheng 已提交
40 41 42 43 44 45 46 47 48 49

  /* this input is exhausted, set the special value to denote this */
  if (pLocalData[pLeftIdx]->rowIdx == -1) {
    return 1;
  }

  if (pLocalData[pRightIdx]->rowIdx == -1) {
    return -1;
  }

50
  if (pParam->groupOrderType == TSDB_ORDER_DESC) {  // desc
51 52
    return compare_d(pDesc, pParam->num, pLocalData[pLeftIdx]->rowIdx, pLocalData[pLeftIdx]->filePage.data,
                     pParam->num, pLocalData[pRightIdx]->rowIdx, pLocalData[pRightIdx]->filePage.data);
H
hzcheng 已提交
53
  } else {
54 55
    return compare_a(pDesc, pParam->num, pLocalData[pLeftIdx]->rowIdx, pLocalData[pLeftIdx]->filePage.data,
                     pParam->num, pLocalData[pRightIdx]->rowIdx, pLocalData[pRightIdx]->filePage.data);
H
hzcheng 已提交
56 57 58
  }
}

H
Haojun Liao 已提交
59
static void tscInitSqlContext(SSqlCmd *pCmd, SLocalMerger *pReducer, tOrderDescriptor *pDesc) {
H
hzcheng 已提交
60 61
  /*
   * the fields and offset attributes in pCmd and pModel may be different due to
S
slguan 已提交
62
   * merge requirement. So, the final result in pRes structure is formatted in accordance with the pCmd object.
H
hzcheng 已提交
63
   */
H
hjxilinx 已提交
64
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
H
hjxilinx 已提交
65 66 67
  size_t size = tscSqlExprNumOfExprs(pQueryInfo);
  
  for (int32_t i = 0; i < size; ++i) {
H
hzcheng 已提交
68
    SQLFunctionCtx *pCtx = &pReducer->pCtx[i];
H
hjxilinx 已提交
69 70
    SSqlExpr *      pExpr = tscSqlExprGet(pQueryInfo, i);

H
Haojun Liao 已提交
71
    pCtx->aOutputBuf = pReducer->pResultBuf->data + pExpr->offset * pReducer->resColModel->capacity;
72
    pCtx->order = pQueryInfo->order.order;
H
hjxilinx 已提交
73
    pCtx->functionId = pExpr->functionId;
S
slguan 已提交
74 75

    // input buffer hold only one point data
H
hjxilinx 已提交
76 77 78
    int16_t  offset = getColumnModelOffset(pDesc->pColumnModel, i);
    SSchema *pSchema = getColumnModelSchema(pDesc->pColumnModel, i);

H
hjxilinx 已提交
79
    pCtx->aInputElemBuf = pReducer->pTempBuffer->data + offset;
H
hzcheng 已提交
80 81

    // input data format comes from pModel
H
hjxilinx 已提交
82 83
    pCtx->inputType = pSchema->type;
    pCtx->inputBytes = pSchema->bytes;
H
hzcheng 已提交
84 85

    // output data format yet comes from pCmd.
H
hjxilinx 已提交
86 87
    pCtx->outputBytes = pExpr->resBytes;
    pCtx->outputType = pExpr->resType;
H
hzcheng 已提交
88 89 90

    pCtx->startOffset = 0;
    pCtx->size = 1;
S
slguan 已提交
91
    pCtx->hasNull = true;
H
Haojun Liao 已提交
92
    pCtx->currentStage = MERGE_STAGE;
H
hzcheng 已提交
93

S
slguan 已提交
94
    // for top/bottom function, the output of timestamp is the first column
H
hjxilinx 已提交
95
    int32_t functionId = pExpr->functionId;
96
    if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM || functionId == TSDB_FUNC_DIFF) {
S
slguan 已提交
97
      pCtx->ptsOutputBuf = pReducer->pCtx[0].aOutputBuf;
98
      pCtx->param[2].i64 = pQueryInfo->order.order;
H
hjLiao 已提交
99
      pCtx->param[2].nType  = TSDB_DATA_TYPE_BIGINT;
100
      pCtx->param[1].i64 = pQueryInfo->order.orderColId;
101
    } else if (functionId == TSDB_FUNC_APERCT) {
102
      pCtx->param[0].i64 = pExpr->param[0].i64;
103
      pCtx->param[0].nType  = pExpr->param[0].nType;
H
hzcheng 已提交
104
    }
S
slguan 已提交
105

106 107
    pCtx->interBufBytes = pExpr->interBytes;
    pCtx->resultInfo = calloc(1, pCtx->interBufBytes + sizeof(SResultRowCellInfo));
H
Haojun Liao 已提交
108
    pCtx->stableQuery = true;
S
slguan 已提交
109 110
  }

H
hjxilinx 已提交
111 112
  int16_t          n = 0;
  int16_t          tagLen = 0;
H
hjxilinx 已提交
113
  SQLFunctionCtx **pTagCtx = calloc(pQueryInfo->fieldsInfo.numOfOutput, POINTER_BYTES);
S
slguan 已提交
114

H
hjxilinx 已提交
115
  SQLFunctionCtx *pCtx = NULL;
H
hjxilinx 已提交
116
  for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) {
117
    SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, i);
S
slguan 已提交
118 119 120 121 122 123 124 125
    if (pExpr->functionId == TSDB_FUNC_TAG_DUMMY || pExpr->functionId == TSDB_FUNC_TS_DUMMY) {
      tagLen += pExpr->resBytes;
      pTagCtx[n++] = &pReducer->pCtx[i];
    } else if ((aAggs[pExpr->functionId].nStatus & TSDB_FUNCSTATE_SELECTIVITY) != 0) {
      pCtx = &pReducer->pCtx[i];
    }
  }

B
Bomin Zhang 已提交
126
  if (n == 0 || pCtx == NULL) {
S
slguan 已提交
127 128 129 130 131
    free(pTagCtx);
  } else {
    pCtx->tagInfo.pTagCtxList = pTagCtx;
    pCtx->tagInfo.numOfTagCols = n;
    pCtx->tagInfo.tagsLen = tagLen;
H
hzcheng 已提交
132 133 134
  }
}

135
static SFillColInfo* createFillColInfo(SQueryInfo* pQueryInfo) {
H
Haojun Liao 已提交
136
  int32_t numOfCols = (int32_t)tscNumOfFields(pQueryInfo);
137 138 139 140
  int32_t offset = 0;
  
  SFillColInfo* pFillCol = calloc(numOfCols, sizeof(SFillColInfo));
  for(int32_t i = 0; i < numOfCols; ++i) {
H
Haojun Liao 已提交
141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163
    SInternalField* pIField = taosArrayGet(pQueryInfo->fieldsInfo.internalField, i);

    if (pIField->pArithExprInfo == NULL) {
      SSqlExpr* pExpr = pIField->pSqlExpr;

      pFillCol[i].col.bytes  = pExpr->resBytes;
      pFillCol[i].col.type   = (int8_t)pExpr->resType;
      pFillCol[i].col.colId  = pExpr->colInfo.colId;
      pFillCol[i].flag       = pExpr->colInfo.flag;
      pFillCol[i].col.offset = offset;
      pFillCol[i].functionId = pExpr->functionId;
      pFillCol[i].fillVal.i  = pQueryInfo->fillVal[i];
    } else {
      pFillCol[i].col.bytes  = pIField->field.bytes;
      pFillCol[i].col.type   = (int8_t)pIField->field.type;
      pFillCol[i].col.colId  = -100;
      pFillCol[i].flag       = TSDB_COL_NORMAL;
      pFillCol[i].col.offset = offset;
      pFillCol[i].functionId = -1;
      pFillCol[i].fillVal.i  = pQueryInfo->fillVal[i];
    }

    offset += pFillCol[i].col.bytes;
164 165 166 167 168
  }
  
  return pFillCol;
}

H
Haojun Liao 已提交
169
void tscCreateLocalMerger(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrderDescriptor *pDesc,
H
Haojun Liao 已提交
170
                           SColumnModel *finalmodel, SColumnModel *pFFModel, SSqlObj* pSql) {
H
hjLiao 已提交
171 172 173
  SSqlCmd* pCmd = &pSql->cmd;
  SSqlRes* pRes = &pSql->res;
  
174
  if (pMemBuffer == NULL) {
H
Haojun Liao 已提交
175
    tscLocalReducerEnvDestroy(pMemBuffer, pDesc, finalmodel, pFFModel, numOfBuffer);
H
hjLiao 已提交
176
    tscError("%p pMemBuffer is NULL", pMemBuffer);
177
    pRes->code = TSDB_CODE_TSC_APP_ERROR;
178 179 180 181
    return;
  }
 
  if (pDesc->pColumnModel == NULL) {
H
Haojun Liao 已提交
182
    tscLocalReducerEnvDestroy(pMemBuffer, pDesc, finalmodel, pFFModel, numOfBuffer);
H
hjLiao 已提交
183
    tscError("%p no local buffer or intermediate result format model", pSql);
184
    pRes->code = TSDB_CODE_TSC_APP_ERROR;
H
hzcheng 已提交
185 186 187 188 189 190 191
    return;
  }

  int32_t numOfFlush = 0;
  for (int32_t i = 0; i < numOfBuffer; ++i) {
    int32_t len = pMemBuffer[i]->fileMeta.flushoutData.nLength;
    if (len == 0) {
192
      tscDebug("%p no data retrieved from orderOfVnode:%d", pSql, i + 1);
H
hzcheng 已提交
193 194 195 196 197 198 199
      continue;
    }

    numOfFlush += len;
  }

  if (numOfFlush == 0 || numOfBuffer == 0) {
H
Haojun Liao 已提交
200
    tscLocalReducerEnvDestroy(pMemBuffer, pDesc, finalmodel, pFFModel, numOfBuffer);
201
    pCmd->command = TSDB_SQL_RETRIEVE_EMPTY_RESULT; // no result, set the result empty
202
    tscDebug("%p retrieved no data", pSql);
H
hzcheng 已提交
203 204 205
    return;
  }

H
hjxilinx 已提交
206
  if (pDesc->pColumnModel->capacity >= pMemBuffer[0]->pageSize) {
H
hjLiao 已提交
207
    tscError("%p Invalid value of buffer capacity %d and page size %d ", pSql, pDesc->pColumnModel->capacity,
H
hjxilinx 已提交
208
             pMemBuffer[0]->pageSize);
S
slguan 已提交
209

H
Haojun Liao 已提交
210
    tscLocalReducerEnvDestroy(pMemBuffer, pDesc, finalmodel, pFFModel, numOfBuffer);
211
    pRes->code = TSDB_CODE_TSC_APP_ERROR;
H
hzcheng 已提交
212 213 214
    return;
  }

H
Haojun Liao 已提交
215
  size_t size = sizeof(SLocalMerger) + POINTER_BYTES * numOfFlush;
H
hjLiao 已提交
216
  
H
Haojun Liao 已提交
217
  SLocalMerger *pReducer = (SLocalMerger *) calloc(1, size);
H
hzcheng 已提交
218
  if (pReducer == NULL) {
H
hjLiao 已提交
219
    tscError("%p failed to create local merge structure, out of memory", pSql);
S
slguan 已提交
220

H
Haojun Liao 已提交
221
    tscLocalReducerEnvDestroy(pMemBuffer, pDesc, finalmodel, pFFModel, numOfBuffer);
222
    pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
H
hzcheng 已提交
223 224 225 226
    return;
  }

  pReducer->pExtMemBuffer = pMemBuffer;
S
slguan 已提交
227
  pReducer->pLocalDataSrc = (SLocalDataSource **)&pReducer[1];
H
hzcheng 已提交
228 229 230 231
  assert(pReducer->pLocalDataSrc != NULL);

  pReducer->numOfBuffer = numOfFlush;
  pReducer->numOfVnode = numOfBuffer;
232

H
hzcheng 已提交
233
  pReducer->pDesc = pDesc;
234
  tscDebug("%p the number of merged leaves is: %d", pSql, pReducer->numOfBuffer);
H
hzcheng 已提交
235 236 237 238 239 240

  int32_t idx = 0;
  for (int32_t i = 0; i < numOfBuffer; ++i) {
    int32_t numOfFlushoutInFile = pMemBuffer[i]->fileMeta.flushoutData.nLength;

    for (int32_t j = 0; j < numOfFlushoutInFile; ++j) {
H
hjLiao 已提交
241 242 243
      SLocalDataSource *ds = (SLocalDataSource *)malloc(sizeof(SLocalDataSource) + pMemBuffer[0]->pageSize);
      if (ds == NULL) {
        tscError("%p failed to create merge structure", pSql);
244
        pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
S
TD-1848  
Shengliang Guan 已提交
245
        tfree(pReducer);
H
hzcheng 已提交
246 247
        return;
      }
H
hjLiao 已提交
248 249
      
      pReducer->pLocalDataSrc[idx] = ds;
H
hzcheng 已提交
250

H
hjLiao 已提交
251 252
      ds->pMemBuffer = pMemBuffer[i];
      ds->flushoutIdx = j;
253
      ds->filePage.num = 0;
H
hjLiao 已提交
254 255
      ds->pageId = 0;
      ds->rowIdx = 0;
H
hzcheng 已提交
256

257
      tscDebug("%p load data from disk into memory, orderOfVnode:%d, total:%d", pSql, i + 1, idx + 1);
H
hjLiao 已提交
258
      tExtMemBufferLoadData(pMemBuffer[i], &(ds->filePage), j, 0);
H
hzcheng 已提交
259
#ifdef _DEBUG_VIEW
260
      printf("load data page into mem for build loser tree: %" PRIu64 " rows\n", ds->filePage.num);
H
hzcheng 已提交
261
      SSrcColumnInfo colInfo[256] = {0};
H
hjxilinx 已提交
262
      SQueryInfo *   pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
263 264

      tscGetSrcColumnInfo(colInfo, pQueryInfo);
H
hzcheng 已提交
265

266
      tColModelDisplayEx(pDesc->pColumnModel, ds->filePage.data, ds->filePage.num,
H
hjxilinx 已提交
267
                         pMemBuffer[0]->numOfElemsPerPage, colInfo);
H
hzcheng 已提交
268
#endif
H
hjLiao 已提交
269
      
270
      if (ds->filePage.num == 0) {  // no data in this flush, the index does not increase
271
        tscDebug("%p flush data is empty, ignore %d flush record", pSql, idx);
S
TD-1848  
Shengliang Guan 已提交
272
        tfree(ds);
H
hzcheng 已提交
273 274
        continue;
      }
H
hjLiao 已提交
275
      
H
hzcheng 已提交
276 277 278
      idx += 1;
    }
  }
H
hjLiao 已提交
279 280
  
  // no data actually, no need to merge result.
H
hzcheng 已提交
281
  if (idx == 0) {
S
TD-1848  
Shengliang Guan 已提交
282
    tfree(pReducer);
H
hzcheng 已提交
283 284 285 286 287 288
    return;
  }

  pReducer->numOfBuffer = idx;

  SCompareParam *param = malloc(sizeof(SCompareParam));
B
Bomin Zhang 已提交
289
  if (param == NULL) {
S
TD-1848  
Shengliang Guan 已提交
290
    tfree(pReducer);
B
Bomin Zhang 已提交
291 292
    return;
  }
H
Haojun Liao 已提交
293

H
hzcheng 已提交
294 295
  param->pLocalData = pReducer->pLocalDataSrc;
  param->pDesc = pReducer->pDesc;
296
  param->num = pReducer->pLocalDataSrc[0]->pMemBuffer->numOfElemsPerPage;
H
hjxilinx 已提交
297 298
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);

299
  param->groupOrderType = pQueryInfo->groupbyExpr.orderType;
H
Haojun Liao 已提交
300
  pReducer->orderPrjOnSTable = tscOrderedProjectionQueryOnSTable(pQueryInfo, 0);
H
hzcheng 已提交
301 302 303

  pRes->code = tLoserTreeCreate(&pReducer->pLoserTree, pReducer->numOfBuffer, param, treeComparator);
  if (pReducer->pLoserTree == NULL || pRes->code != 0) {
S
TD-1848  
Shengliang Guan 已提交
304 305
    tfree(param);
    tfree(pReducer);
H
hzcheng 已提交
306 307 308 309 310
    return;
  }

  // the input data format follows the old format, but output in a new format.
  // so, all the input must be parsed as old format
H
hjLiao 已提交
311
  pReducer->pCtx = (SQLFunctionCtx *)calloc(tscSqlExprNumOfExprs(pQueryInfo), sizeof(SQLFunctionCtx));
H
hzcheng 已提交
312 313
  pReducer->rowSize = pMemBuffer[0]->nElemSize;

H
hjxilinx 已提交
314 315
  tscRestoreSQLFuncForSTableQuery(pQueryInfo);
  tscFieldInfoUpdateOffset(pQueryInfo);
H
hzcheng 已提交
316

H
hjxilinx 已提交
317
  if (pReducer->rowSize > pMemBuffer[0]->pageSize) {
H
hzcheng 已提交
318 319 320 321 322 323 324 325
    assert(false);  // todo fixed row size is larger than the minimum page size;
  }

  pReducer->hasPrevRow = false;
  pReducer->hasUnprocessedRow = false;

  pReducer->prevRowOfInput = (char *)calloc(1, pReducer->rowSize);

S
slguan 已提交
326
  // used to keep the latest input row
H
hzcheng 已提交
327 328 329 330
  pReducer->pTempBuffer = (tFilePage *)calloc(1, pReducer->rowSize + sizeof(tFilePage));
  pReducer->discardData = (tFilePage *)calloc(1, pReducer->rowSize + sizeof(tFilePage));
  pReducer->discard = false;

H
hjxilinx 已提交
331
  pReducer->nResultBufSize = pMemBuffer[0]->pageSize * 16;
H
hzcheng 已提交
332
  pReducer->pResultBuf = (tFilePage *)calloc(1, pReducer->nResultBufSize + sizeof(tFilePage));
H
hjxilinx 已提交
333

H
hzcheng 已提交
334
  pReducer->resColModel = finalmodel;
B
Bomin Zhang 已提交
335
  pReducer->resColModel->capacity = pReducer->nResultBufSize;
H
Haojun Liao 已提交
336 337
  pReducer->finalModel = pFFModel;

H
Haojun Liao 已提交
338 339
  if (finalmodel->rowSize > 0) {
    pReducer->resColModel->capacity /= finalmodel->rowSize;
B
Bomin Zhang 已提交
340
  }
H
hzcheng 已提交
341

H
Haojun Liao 已提交
342
  assert(finalmodel->rowSize > 0 && finalmodel->rowSize <= pReducer->rowSize);
H
hjxilinx 已提交
343
  pReducer->pFinalRes = calloc(1, pReducer->rowSize * pReducer->resColModel->capacity);
H
hzcheng 已提交
344

H
hjxilinx 已提交
345
  if (pReducer->pTempBuffer == NULL || pReducer->discardData == NULL || pReducer->pResultBuf == NULL ||
H
Haojun Liao 已提交
346
      pReducer->pFinalRes == NULL || pReducer->prevRowOfInput == NULL) {
S
TD-1848  
Shengliang Guan 已提交
347 348 349 350 351 352 353 354
    tfree(pReducer->pTempBuffer);
    tfree(pReducer->discardData);
    tfree(pReducer->pResultBuf);
    tfree(pReducer->pFinalRes);
    tfree(pReducer->prevRowOfInput);
    tfree(pReducer->pLoserTree);
    tfree(param);
    tfree(pReducer);
355
    pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
H
hzcheng 已提交
356 357
    return;
  }
H
hjLiao 已提交
358
  
359
  pReducer->pTempBuffer->num = 0;
H
hzcheng 已提交
360

361
  tscCreateResPointerInfo(pRes, pQueryInfo);
H
hjLiao 已提交
362
  tscInitSqlContext(pCmd, pReducer, pDesc);
H
hzcheng 已提交
363

H
hjxilinx 已提交
364 365
  // we change the capacity of schema to denote that there is only one row in temp buffer
  pReducer->pDesc->pColumnModel->capacity = 1;
H
hjxilinx 已提交
366 367

  // restore the limitation value at the last stage
368 369 370 371
  if (tscOrderedProjectionQueryOnSTable(pQueryInfo, 0)) {
    pQueryInfo->limit.limit = pQueryInfo->clauseLimit;
    pQueryInfo->limit.offset = pQueryInfo->prjOffset;
  }
H
hjxilinx 已提交
372

S
TD-1057  
Shengliang Guan 已提交
373
  pReducer->offset = (int32_t)pQueryInfo->limit.offset;
H
hjxilinx 已提交
374

H
Haojun Liao 已提交
375
  pRes->pLocalMerger = pReducer;
H
hzcheng 已提交
376 377
  pRes->numOfGroups = 0;

378
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
379
  STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);
H
hjxilinx 已提交
380
  
381
  TSKEY stime = (pQueryInfo->order.order == TSDB_ORDER_ASC)? pQueryInfo->window.skey : pQueryInfo->window.ekey;
382
  int64_t revisedSTime = taosTimeTruncate(stime, &pQueryInfo->interval, tinfo.precision);
383 384 385 386
  
  if (pQueryInfo->fillType != TSDB_FILL_NONE) {
    SFillColInfo* pFillCol = createFillColInfo(pQueryInfo);
    pReducer->pFillInfo = taosInitFillInfo(pQueryInfo->order.order, revisedSTime, pQueryInfo->groupbyExpr.numOfGroupCols,
H
Haojun Liao 已提交
387
                                           4096, (int32_t)pQueryInfo->fieldsInfo.numOfOutput, pQueryInfo->interval.sliding, pQueryInfo->interval.slidingUnit,
H
Haojun Liao 已提交
388
                                           tinfo.precision, pQueryInfo->fillType, pFillCol, pSql);
389
  }
H
hzcheng 已提交
390 391 392 393
}

static int32_t tscFlushTmpBufferImpl(tExtMemBuffer *pMemoryBuf, tOrderDescriptor *pDesc, tFilePage *pPage,
                                     int32_t orderType) {
394
  if (pPage->num == 0) {
H
hzcheng 已提交
395 396 397
    return 0;
  }

398
  assert(pPage->num <= pDesc->pColumnModel->capacity);
H
hzcheng 已提交
399 400

  // sort before flush to disk, the data must be consecutively put on tFilePage.
H
Haojun Liao 已提交
401
  if (pDesc->orderInfo.numOfCols > 0) {
S
TD-1057  
Shengliang Guan 已提交
402
    tColDataQSort(pDesc, (int32_t)pPage->num, 0, (int32_t)pPage->num - 1, pPage->data, orderType);
H
hzcheng 已提交
403 404 405
  }

#ifdef _DEBUG_VIEW
406 407
  printf("%" PRIu64 " rows data flushed to disk after been sorted:\n", pPage->num);
  tColModelDisplay(pDesc->pColumnModel, pPage->data, pPage->num, pPage->num);
H
hzcheng 已提交
408 409 410
#endif

  // write to cache after being sorted
S
TD-1057  
Shengliang Guan 已提交
411
  if (tExtMemBufferPut(pMemoryBuf, pPage->data, (int32_t)pPage->num) < 0) {
H
hzcheng 已提交
412 413 414 415
    tscError("failed to save data in temporary buffer");
    return -1;
  }

416
  pPage->num = 0;
H
hzcheng 已提交
417 418 419 420
  return 0;
}

int32_t tscFlushTmpBuffer(tExtMemBuffer *pMemoryBuf, tOrderDescriptor *pDesc, tFilePage *pPage, int32_t orderType) {
421 422 423
  int32_t ret = 0;
  if ((ret = tscFlushTmpBufferImpl(pMemoryBuf, pDesc, pPage, orderType)) != 0) {
    return ret;
H
hzcheng 已提交
424 425
  }

426 427
  if ((ret = tExtMemBufferFlush(pMemoryBuf)) != 0) {
    return ret;
H
hzcheng 已提交
428 429 430 431 432 433 434
  }

  return 0;
}

int32_t saveToBuffer(tExtMemBuffer *pMemoryBuf, tOrderDescriptor *pDesc, tFilePage *pPage, void *data,
                     int32_t numOfRows, int32_t orderType) {
435
  SColumnModel *pModel = pDesc->pColumnModel;
H
hjxilinx 已提交
436

437
  if (pPage->num + numOfRows <= pModel->capacity) {
438
    tColModelAppend(pModel, pPage, data, 0, numOfRows, numOfRows);
H
hzcheng 已提交
439 440 441
    return 0;
  }

442
  // current buffer is overflow, flush data to extensive buffer
S
TD-1057  
Shengliang Guan 已提交
443
  int32_t numOfRemainEntries = pModel->capacity - (int32_t)pPage->num;
H
hzcheng 已提交
444 445
  tColModelAppend(pModel, pPage, data, 0, numOfRemainEntries, numOfRows);

446
  // current buffer is full, need to flushed to disk
447
  assert(pPage->num == pModel->capacity);
448 449 450
  int32_t code = tscFlushTmpBuffer(pMemoryBuf, pDesc, pPage, orderType);
  if (code != 0) {
    return code;
H
hzcheng 已提交
451 452 453 454 455 456
  }

  int32_t remain = numOfRows - numOfRemainEntries;

  while (remain > 0) {
    int32_t numOfWriteElems = 0;
H
hjxilinx 已提交
457 458
    if (remain > pModel->capacity) {
      numOfWriteElems = pModel->capacity;
H
hzcheng 已提交
459 460 461 462 463 464
    } else {
      numOfWriteElems = remain;
    }

    tColModelAppend(pModel, pPage, data, numOfRows - remain, numOfWriteElems, numOfRows);

465
    if (pPage->num == pModel->capacity) {
466 467
      if ((code = tscFlushTmpBuffer(pMemoryBuf, pDesc, pPage, orderType)) != TSDB_CODE_SUCCESS) {
        return code;
H
hzcheng 已提交
468 469
      }
    } else {
470
      pPage->num = numOfWriteElems;
H
hzcheng 已提交
471 472 473 474 475 476 477 478 479
    }

    remain -= numOfWriteElems;
    numOfRemainEntries += numOfWriteElems;
  }

  return 0;
}

H
Haojun Liao 已提交
480
void tscDestroyLocalMerger(SSqlObj *pSql) {
H
hzcheng 已提交
481 482 483 484 485
  if (pSql == NULL) {
    return;
  }

  SSqlRes *pRes = &(pSql->res);
H
Haojun Liao 已提交
486
  if (pRes->pLocalMerger == NULL) {
H
hzcheng 已提交
487 488 489
    return;
  }

H
hjxilinx 已提交
490 491 492
  SSqlCmd *   pCmd = &pSql->cmd;
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);

H
hzcheng 已提交
493
  // there is no more result, so we release all allocated resource
H
Haojun Liao 已提交
494 495 496
  SLocalMerger *pLocalMerge = (SLocalMerger *)atomic_exchange_ptr(&pRes->pLocalMerger, NULL);
  if (pLocalMerge != NULL) {
    pLocalMerge->pFillInfo = taosDestroyFillInfo(pLocalMerge->pFillInfo);
H
hzcheng 已提交
497

H
Haojun Liao 已提交
498
    if (pLocalMerge->pCtx != NULL) {
H
Haojun Liao 已提交
499
      int32_t numOfExprs = (int32_t) tscSqlExprNumOfExprs(pQueryInfo);
H
Haojun Liao 已提交
500
      for (int32_t i = 0; i < numOfExprs; ++i) {
H
Haojun Liao 已提交
501
        SQLFunctionCtx *pCtx = &pLocalMerge->pCtx[i];
H
hjxilinx 已提交
502

H
hjxilinx 已提交
503
        tVariantDestroy(&pCtx->tag);
S
TD-1848  
Shengliang Guan 已提交
504
        tfree(pCtx->resultInfo);
505

506
        if (pCtx->tagInfo.pTagCtxList != NULL) {
S
TD-1848  
Shengliang Guan 已提交
507
          tfree(pCtx->tagInfo.pTagCtxList);
508
        }
S
slguan 已提交
509
      }
H
hjxilinx 已提交
510

H
Haojun Liao 已提交
511
      tfree(pLocalMerge->pCtx);
S
slguan 已提交
512 513
    }

H
Haojun Liao 已提交
514
    tfree(pLocalMerge->prevRowOfInput);
H
hzcheng 已提交
515

H
Haojun Liao 已提交
516 517
    tfree(pLocalMerge->pTempBuffer);
    tfree(pLocalMerge->pResultBuf);
H
hzcheng 已提交
518

H
Haojun Liao 已提交
519 520 521
    if (pLocalMerge->pLoserTree) {
      tfree(pLocalMerge->pLoserTree->param);
      tfree(pLocalMerge->pLoserTree);
H
hzcheng 已提交
522 523
    }

H
Haojun Liao 已提交
524 525
    tfree(pLocalMerge->pFinalRes);
    tfree(pLocalMerge->discardData);
H
hzcheng 已提交
526

H
Haojun Liao 已提交
527 528 529 530
    tscLocalReducerEnvDestroy(pLocalMerge->pExtMemBuffer, pLocalMerge->pDesc, pLocalMerge->resColModel, pLocalMerge->finalModel,
                              pLocalMerge->numOfVnode);
    for (int32_t i = 0; i < pLocalMerge->numOfBuffer; ++i) {
      tfree(pLocalMerge->pLocalDataSrc[i]);
H
hzcheng 已提交
531 532
    }

H
Haojun Liao 已提交
533 534 535
    pLocalMerge->numOfBuffer = 0;
    pLocalMerge->numOfCompleted = 0;
    free(pLocalMerge);
H
hzcheng 已提交
536
  } else {
537
    tscDebug("%p already freed or another free function is invoked", pSql);
H
hzcheng 已提交
538 539
  }

540
  tscDebug("%p free local reducer finished", pSql);
H
hzcheng 已提交
541 542
}

H
hjxilinx 已提交
543
static int32_t createOrderDescriptor(tOrderDescriptor **pOrderDesc, SSqlCmd *pCmd, SColumnModel *pModel) {
H
hjxilinx 已提交
544 545 546
  int32_t     numOfGroupByCols = 0;
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);

547 548
  if (pQueryInfo->groupbyExpr.numOfGroupCols > 0) {
    numOfGroupByCols = pQueryInfo->groupbyExpr.numOfGroupCols;
H
hzcheng 已提交
549 550 551
  }

  // primary timestamp column is involved in final result
552
  if (pQueryInfo->interval.interval != 0 || tscOrderedProjectionQueryOnSTable(pQueryInfo, 0)) {
H
hzcheng 已提交
553 554 555
    numOfGroupByCols++;
  }

H
Haojun Liao 已提交
556 557
  int32_t *orderColIndexList = (int32_t *)calloc(numOfGroupByCols, sizeof(int32_t));
  if (orderColIndexList == NULL) {
558
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
H
hzcheng 已提交
559 560 561 562
  }

  if (numOfGroupByCols > 0) {

H
Haojun Liao 已提交
563
    if (pQueryInfo->groupbyExpr.numOfGroupCols > 0) {
H
Haojun Liao 已提交
564
      int32_t numOfInternalOutput = (int32_t) tscSqlExprNumOfExprs(pQueryInfo);
H
Haojun Liao 已提交
565
      int32_t startCols = numOfInternalOutput - pQueryInfo->groupbyExpr.numOfGroupCols;
H
Haojun Liao 已提交
566 567 568 569 570 571 572 573 574 575

      // the last "pQueryInfo->groupbyExpr.numOfGroupCols" columns are order-by columns
      for (int32_t i = 0; i < pQueryInfo->groupbyExpr.numOfGroupCols; ++i) {
        orderColIndexList[i] = startCols++;
      }

      if (pQueryInfo->interval.interval != 0) {
        // the first column is the timestamp, handles queries like "interval(10m) group by tags"
        orderColIndexList[numOfGroupByCols - 1] = PRIMARYKEY_TIMESTAMP_COL_INDEX; //TODO ???
      }
H
Haojun Liao 已提交
576 577 578 579 580 581 582 583 584 585 586 587 588 589
    } else {
      /*
       * 1. the orderby ts asc/desc projection query for the super table
       * 2. interval query without groupby clause
       */
      if (pQueryInfo->interval.interval != 0) {
        orderColIndexList[0] = PRIMARYKEY_TIMESTAMP_COL_INDEX;
      } else {
        size_t size = tscSqlExprNumOfExprs(pQueryInfo);
        for (int32_t i = 0; i < size; ++i) {
          SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, i);
          if (pExpr->functionId == TSDB_FUNC_PRJ && pExpr->colInfo.colId == PRIMARYKEY_TIMESTAMP_COL_INDEX) {
            orderColIndexList[0] = i;
          }
H
Haojun Liao 已提交
590 591
        }
      }
H
hzcheng 已提交
592

H
Haojun Liao 已提交
593
      assert(pQueryInfo->order.orderColId == PRIMARYKEY_TIMESTAMP_COL_INDEX);
H
hzcheng 已提交
594 595 596
    }
  }

H
Haojun Liao 已提交
597
  *pOrderDesc = tOrderDesCreate(orderColIndexList, numOfGroupByCols, pModel, pQueryInfo->order.order);
S
TD-1848  
Shengliang Guan 已提交
598
  tfree(orderColIndexList);
H
hzcheng 已提交
599 600

  if (*pOrderDesc == NULL) {
601
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
H
hzcheng 已提交
602 603 604 605 606
  } else {
    return TSDB_CODE_SUCCESS;
  }
}

H
Haojun Liao 已提交
607
bool isSameGroup(SSqlCmd *pCmd, SLocalMerger *pReducer, char *pPrev, tFilePage *tmpBuffer) {
H
hjxilinx 已提交
608 609
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);

S
slguan 已提交
610
  // disable merge procedure for column projection query
H
Haojun Liao 已提交
611 612
  int16_t functionId = pReducer->pCtx[0].functionId;
  if (pReducer->orderPrjOnSTable) {
613 614
    return true;
  }
H
hjxilinx 已提交
615

S
slguan 已提交
616 617
  if (functionId == TSDB_FUNC_PRJ || functionId == TSDB_FUNC_ARITHM) {
    return false;
H
hzcheng 已提交
618 619 620
  }

  tOrderDescriptor *pOrderDesc = pReducer->pDesc;
H
Haojun Liao 已提交
621
  SColumnOrderInfo* orderInfo = &pOrderDesc->orderInfo;
622

S
slguan 已提交
623
  // no group by columns, all data belongs to one group
H
Haojun Liao 已提交
624
  int32_t numOfCols = orderInfo->numOfCols;
S
slguan 已提交
625
  if (numOfCols <= 0) {
H
hzcheng 已提交
626 627 628
    return true;
  }

H
Haojun Liao 已提交
629
  if (orderInfo->colIndex[numOfCols - 1] == PRIMARYKEY_TIMESTAMP_COL_INDEX) {
H
Haojun Liao 已提交
630 631 632 633
    /*
     * super table interval query
     * if the order columns is the primary timestamp, all result data belongs to one group
     */
634
    assert(pQueryInfo->interval.interval > 0);
H
Haojun Liao 已提交
635 636 637
    if (numOfCols == 1) {
      return true;
    }
S
slguan 已提交
638
  } else {  // simple group by query
639
    assert(pQueryInfo->interval.interval == 0);
S
slguan 已提交
640 641
  }

H
hzcheng 已提交
642
  // only one row exists
H
Haojun Liao 已提交
643
  int32_t index = orderInfo->colIndex[0];
H
Haojun Liao 已提交
644
  int32_t offset = (pOrderDesc->pColumnModel)->pFields[index].offset;
645

H
Haojun Liao 已提交
646 647
  int32_t ret = memcmp(pPrev + offset, tmpBuffer->data + offset, pOrderDesc->pColumnModel->rowSize - offset);
  return ret == 0;
H
hzcheng 已提交
648 649 650
}

int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOrderDescriptor **pOrderDesc,
H
Haojun Liao 已提交
651
                                 SColumnModel **pFinalModel, SColumnModel** pFFModel, uint32_t nBufferSizes) {
H
hzcheng 已提交
652 653 654
  SSqlCmd *pCmd = &pSql->cmd;
  SSqlRes *pRes = &pSql->res;

H
hjxilinx 已提交
655
  SSchema *     pSchema = NULL;
H
hjxilinx 已提交
656
  SColumnModel *pModel = NULL;
H
hzcheng 已提交
657 658
  *pFinalModel = NULL;

H
hjxilinx 已提交
659
  SQueryInfo *    pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
H
hjxilinx 已提交
660
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
S
slguan 已提交
661

H
Haojun Liao 已提交
662
  (*pMemBuffer) = (tExtMemBuffer **)malloc(POINTER_BYTES * pSql->subState.numOfSub);
H
hzcheng 已提交
663 664
  if (*pMemBuffer == NULL) {
    tscError("%p failed to allocate memory", pSql);
665
    pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
H
hzcheng 已提交
666 667
    return pRes->code;
  }
H
hjxilinx 已提交
668 669 670 671
  
  size_t size = tscSqlExprNumOfExprs(pQueryInfo);
  
  pSchema = (SSchema *)calloc(1, sizeof(SSchema) * size);
H
hzcheng 已提交
672 673
  if (pSchema == NULL) {
    tscError("%p failed to allocate memory", pSql);
674
    pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
H
hzcheng 已提交
675 676 677 678
    return pRes->code;
  }

  int32_t rlen = 0;
H
hjxilinx 已提交
679
  for (int32_t i = 0; i < size; ++i) {
680
    SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, i);
H
hzcheng 已提交
681 682

    pSchema[i].bytes = pExpr->resBytes;
S
TD-1057  
Shengliang Guan 已提交
683
    pSchema[i].type = (int8_t)pExpr->resType;
H
Haojun Liao 已提交
684 685
    tstrncpy(pSchema[i].name, pExpr->aliasName, tListLen(pSchema[i].name));

H
hzcheng 已提交
686 687 688
    rlen += pExpr->resBytes;
  }

L
lihui 已提交
689
  int32_t capacity = 0;
H
hjxilinx 已提交
690 691 692
  if (rlen != 0) {
    capacity = nBufferSizes / rlen;
  }
H
hjxilinx 已提交
693
  
S
TD-1057  
Shengliang Guan 已提交
694
  pModel = createColumnModel(pSchema, (int32_t)size, capacity);
H
hzcheng 已提交
695

H
Haojun Liao 已提交
696 697 698 699 700 701
  int32_t pg = DEFAULT_PAGE_SIZE;
  int32_t overhead = sizeof(tFilePage);
  while((pg - overhead) < pModel->rowSize * 2) {
    pg *= 2;
  }

702 703
  size_t numOfSubs = pSql->subState.numOfSub;
  assert(numOfSubs <= pTableMetaInfo->vgroupList->numOfVgroups);
704
  for (int32_t i = 0; i < numOfSubs; ++i) {
H
Haojun Liao 已提交
705
    (*pMemBuffer)[i] = createExtMemBuffer(nBufferSizes, rlen, pg, pModel);
706 707
    (*pMemBuffer)[i]->flushModel = MULTIPLE_APPEND_MODEL;
  }
H
hzcheng 已提交
708 709

  if (createOrderDescriptor(pOrderDesc, pCmd, pModel) != TSDB_CODE_SUCCESS) {
710
    pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
S
TD-1848  
Shengliang Guan 已提交
711
    tfree(pSchema);
H
hzcheng 已提交
712 713 714
    return pRes->code;
  }

H
hjxilinx 已提交
715
  // final result depends on the fields number
H
hjxilinx 已提交
716
  memset(pSchema, 0, sizeof(SSchema) * size);
717

H
hjxilinx 已提交
718
  for (int32_t i = 0; i < size; ++i) {
H
hjxilinx 已提交
719 720
    SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, i);

721
    SSchema p1 = {0};
H
Haojun Liao 已提交
722
    if (pExpr->colInfo.colIndex == TSDB_TBNAME_COLUMN_INDEX) {
723
      p1 = tGetTableNameColumnSchema();
724
    } else if (TSDB_COL_IS_UD_COL(pExpr->colInfo.flag)) {
H
Haojun Liao 已提交
725
      p1.bytes = pExpr->resBytes;
H
Haojun Liao 已提交
726
      p1.type  = (uint8_t) pExpr->resType;
H
Haojun Liao 已提交
727 728 729
      tstrncpy(p1.name, pExpr->aliasName, tListLen(p1.name));
    } else {
      p1 = *tscGetTableColumnSchema(pTableMetaInfo->pTableMeta, pExpr->colInfo.colIndex);
730
    }
H
hjxilinx 已提交
731

732
    int32_t inter = 0;
H
hjxilinx 已提交
733 734
    int16_t type = -1;
    int16_t bytes = 0;
H
hjxilinx 已提交
735 736 737 738 739

    // the final result size and type in the same as query on single table.
    // so here, set the flag to be false;
    int32_t functionId = pExpr->functionId;
    if (functionId >= TSDB_FUNC_TS && functionId <= TSDB_FUNC_DIFF) {
H
hjxilinx 已提交
740 741
      type = pModel->pFields[i].field.type;
      bytes = pModel->pFields[i].field.bytes;
H
hjxilinx 已提交
742 743 744 745 746 747 748
    } else {
      if (functionId == TSDB_FUNC_FIRST_DST) {
        functionId = TSDB_FUNC_FIRST;
      } else if (functionId == TSDB_FUNC_LAST_DST) {
        functionId = TSDB_FUNC_LAST;
      }

749 750
      int32_t ret = getResultDataInfo(p1.type, p1.bytes, functionId, 0, &type, &bytes, &inter, 0, false);
      assert(ret == TSDB_CODE_SUCCESS);
H
hjxilinx 已提交
751
    }
H
hzcheng 已提交
752

S
TD-1057  
Shengliang Guan 已提交
753
    pSchema[i].type = (uint8_t)type;
H
hjxilinx 已提交
754 755
    pSchema[i].bytes = bytes;
    strcpy(pSchema[i].name, pModel->pFields[i].field.name);
H
hzcheng 已提交
756
  }
H
hjxilinx 已提交
757
  
S
TD-1057  
Shengliang Guan 已提交
758
  *pFinalModel = createColumnModel(pSchema, (int32_t)size, capacity);
H
hzcheng 已提交
759

H
Haojun Liao 已提交
760 761 762 763 764 765 766 767 768 769 770 771
  memset(pSchema, 0, sizeof(SSchema) * size);
  size = tscNumOfFields(pQueryInfo);

  for(int32_t i = 0; i < size; ++i) {
    SInternalField* pField = tscFieldInfoGetInternalField(&pQueryInfo->fieldsInfo, i);
    pSchema[i].bytes = pField->field.bytes;
    pSchema[i].type = pField->field.type;
    tstrncpy(pSchema[i].name, pField->field.name, tListLen(pSchema[i].name));
  }

  *pFFModel = createColumnModel(pSchema, (int32_t) size, capacity);

H
Haojun Liao 已提交
772
   tfree(pSchema);
H
hzcheng 已提交
773 774 775 776 777 778 779 780 781
  return TSDB_CODE_SUCCESS;
}

/**
 * @param pMemBuffer
 * @param pDesc
 * @param pFinalModel
 * @param numOfVnodes
 */
H
Haojun Liao 已提交
782
void tscLocalReducerEnvDestroy(tExtMemBuffer **pMemBuffer, tOrderDescriptor *pDesc, SColumnModel *pFinalModel, SColumnModel *pFFModel,
H
hzcheng 已提交
783
                               int32_t numOfVnodes) {
H
hjxilinx 已提交
784
  destroyColumnModel(pFinalModel);
H
Haojun Liao 已提交
785 786
  destroyColumnModel(pFFModel);

H
hzcheng 已提交
787
  tOrderDescDestroy(pDesc);
H
Haojun Liao 已提交
788

H
hzcheng 已提交
789
  for (int32_t i = 0; i < numOfVnodes; ++i) {
H
hjxilinx 已提交
790
    pMemBuffer[i] = destoryExtMemBuffer(pMemBuffer[i]);
H
hzcheng 已提交
791 792
  }

S
TD-1848  
Shengliang Guan 已提交
793
  tfree(pMemBuffer);
H
hzcheng 已提交
794 795 796 797
}

/**
 *
H
Haojun Liao 已提交
798
 * @param pLocalMerge
H
hzcheng 已提交
799 800 801 802
 * @param pOneInterDataSrc
 * @param treeList
 * @return the number of remain input source. if ret == 0, all data has been handled
 */
H
Haojun Liao 已提交
803
int32_t loadNewDataFromDiskFor(SLocalMerger *pLocalMerge, SLocalDataSource *pOneInterDataSrc,
H
hzcheng 已提交
804 805 806 807
                               bool *needAdjustLoserTree) {
  pOneInterDataSrc->rowIdx = 0;
  pOneInterDataSrc->pageId += 1;

S
TD-1057  
Shengliang Guan 已提交
808
  if ((uint32_t)pOneInterDataSrc->pageId <
H
hzcheng 已提交
809 810 811 812 813 814
      pOneInterDataSrc->pMemBuffer->fileMeta.flushoutData.pFlushoutInfo[pOneInterDataSrc->flushoutIdx].numOfPages) {
    tExtMemBufferLoadData(pOneInterDataSrc->pMemBuffer, &(pOneInterDataSrc->filePage), pOneInterDataSrc->flushoutIdx,
                          pOneInterDataSrc->pageId);

#if defined(_DEBUG_VIEW)
    printf("new page load to buffer\n");
H
hjxilinx 已提交
815
    tColModelDisplay(pOneInterDataSrc->pMemBuffer->pColumnModel, pOneInterDataSrc->filePage.data,
816
                     pOneInterDataSrc->filePage.num, pOneInterDataSrc->pMemBuffer->pColumnModel->capacity);
H
hzcheng 已提交
817 818 819
#endif
    *needAdjustLoserTree = true;
  } else {
H
Haojun Liao 已提交
820
    pLocalMerge->numOfCompleted += 1;
H
hzcheng 已提交
821 822 823 824 825 826

    pOneInterDataSrc->rowIdx = -1;
    pOneInterDataSrc->pageId = -1;
    *needAdjustLoserTree = true;
  }

H
Haojun Liao 已提交
827
  return pLocalMerge->numOfBuffer;
H
hzcheng 已提交
828 829
}

H
Haojun Liao 已提交
830
void adjustLoserTreeFromNewData(SLocalMerger *pLocalMerge, SLocalDataSource *pOneInterDataSrc,
S
slguan 已提交
831
                                SLoserTreeInfo *pTree) {
H
hzcheng 已提交
832 833 834 835 836
  /*
   * load a new data page into memory for intermediate dataset source,
   * since it's last record in buffer has been chosen to be processed, as the winner of loser-tree
   */
  bool needToAdjust = true;
837
  if (pOneInterDataSrc->filePage.num <= pOneInterDataSrc->rowIdx) {
H
Haojun Liao 已提交
838
    loadNewDataFromDiskFor(pLocalMerge, pOneInterDataSrc, &needToAdjust);
H
hzcheng 已提交
839 840 841 842 843 844 845
  }

  /*
   * adjust loser tree otherwise, according to new candidate data
   * if the loser tree is rebuild completed, we do not need to adjust
   */
  if (needToAdjust) {
H
Haojun Liao 已提交
846
    int32_t leafNodeIdx = pTree->pNode[0].index + pLocalMerge->numOfBuffer;
H
hzcheng 已提交
847 848 849 850 851 852 853 854 855 856 857 858 859 860 861 862

#ifdef _DEBUG_VIEW
    printf("before adjust:\t");
    tLoserTreeDisplay(pTree);
#endif

    tLoserTreeAdjust(pTree, leafNodeIdx);

#ifdef _DEBUG_VIEW
    printf("\nafter adjust:\t");
    tLoserTreeDisplay(pTree);
    printf("\n");
#endif
  }
}

H
Haojun Liao 已提交
863
void savePrevRecordAndSetupFillInfo(SLocalMerger *pLocalMerge, SQueryInfo *pQueryInfo, SFillInfo *pFillInfo) {
H
hjxilinx 已提交
864
  // discard following dataset in the same group and reset the interpolation information
H
hjxilinx 已提交
865
  STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
866

H
hjxilinx 已提交
867
  STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);
868

H
Haojun Liao 已提交
869 870
  if (pFillInfo != NULL) {
    int64_t stime = (pQueryInfo->window.skey < pQueryInfo->window.ekey) ? pQueryInfo->window.skey : pQueryInfo->window.ekey;
871
    int64_t revisedSTime = taosTimeTruncate(stime, &pQueryInfo->interval, tinfo.precision);
H
Haojun Liao 已提交
872 873 874
  
    taosResetFillInfo(pFillInfo, revisedSTime);
  }
H
hzcheng 已提交
875

H
Haojun Liao 已提交
876 877
  pLocalMerge->discard = true;
  pLocalMerge->discardData->num = 0;
H
hzcheng 已提交
878

H
Haojun Liao 已提交
879 880
  SColumnModel *pModel = pLocalMerge->pDesc->pColumnModel;
  tColModelAppend(pModel, pLocalMerge->discardData, pLocalMerge->prevRowOfInput, 0, 1, 1);
H
hzcheng 已提交
881 882
}

H
Haojun Liao 已提交
883
static void genFinalResWithoutFill(SSqlRes* pRes, SLocalMerger *pLocalMerge, SQueryInfo* pQueryInfo) {
884
  assert(pQueryInfo->interval.interval == 0 || pQueryInfo->fillType == TSDB_FILL_NONE);
H
hjxilinx 已提交
885

H
Haojun Liao 已提交
886
  tFilePage * pBeforeFillData = pLocalMerge->pResultBuf;
H
hzcheng 已提交
887

H
Haojun Liao 已提交
888
  pRes->data = pLocalMerge->pFinalRes;
H
Haojun Liao 已提交
889
  pRes->numOfRows = (int32_t) pBeforeFillData->num;
H
hzcheng 已提交
890

891 892
  if (pQueryInfo->limit.offset > 0) {
    if (pQueryInfo->limit.offset < pRes->numOfRows) {
H
Haojun Liao 已提交
893
      int32_t prevSize = (int32_t) pBeforeFillData->num;
H
Haojun Liao 已提交
894
      tColModelErase(pLocalMerge->finalModel, pBeforeFillData, prevSize, 0, (int32_t)pQueryInfo->limit.offset - 1);
H
hzcheng 已提交
895

896
      /* remove the hole in column model */
H
Haojun Liao 已提交
897
      tColModelCompact(pLocalMerge->finalModel, pBeforeFillData, prevSize);
898

H
Haojun Liao 已提交
899
      pRes->numOfRows -= (int32_t) pQueryInfo->limit.offset;
900 901 902 903
      pQueryInfo->limit.offset = 0;
    } else {
      pQueryInfo->limit.offset -= pRes->numOfRows;
      pRes->numOfRows = 0;
H
hzcheng 已提交
904
    }
905
  }
H
hzcheng 已提交
906

907 908 909
  if (pRes->numOfRowsGroup >= pQueryInfo->limit.limit && pQueryInfo->limit.limit > 0) {
    pRes->numOfRows = 0;
    pBeforeFillData->num = 0;
H
Haojun Liao 已提交
910
    pLocalMerge->discard = true;
911 912 913
    return;
  }

914
  pRes->numOfRowsGroup += pRes->numOfRows;
H
Haojun Liao 已提交
915

916 917 918 919 920
  // impose the limitation of output rows on the final result
  if (pQueryInfo->limit.limit >= 0 && pRes->numOfRowsGroup > pQueryInfo->limit.limit) {
    int32_t prevSize = (int32_t)pBeforeFillData->num;
    int32_t overflow = (int32_t)(pRes->numOfRowsGroup - pQueryInfo->limit.limit);
    assert(overflow < pRes->numOfRows);
H
hzcheng 已提交
921

922 923 924
    pRes->numOfRowsGroup = pQueryInfo->limit.limit;
    pRes->numOfRows -= overflow;
    pBeforeFillData->num -= overflow;
H
hzcheng 已提交
925

H
Haojun Liao 已提交
926
    tColModelCompact(pLocalMerge->finalModel, pBeforeFillData, prevSize);
H
hzcheng 已提交
927

928
    // set remain data to be discarded, and reset the interpolation information
H
Haojun Liao 已提交
929
    savePrevRecordAndSetupFillInfo(pLocalMerge, pQueryInfo, pLocalMerge->pFillInfo);
930
  }
H
hzcheng 已提交
931

H
Haojun Liao 已提交
932
  memcpy(pRes->data, pBeforeFillData->data, (size_t)(pRes->numOfRows * pLocalMerge->finalModel->rowSize));
H
Haojun Liao 已提交
933

934 935 936 937 938
  pRes->numOfClauseTotal += pRes->numOfRows;
  pBeforeFillData->num = 0;
}

/*
H
Haojun Liao 已提交
939
 * Note: pRes->pLocalMerge may be null, due to the fact that "tscDestroyLocalMerger" is called
940 941
 * by "interuptHandler" function in shell
 */
H
Haojun Liao 已提交
942
static void doFillResult(SSqlObj *pSql, SLocalMerger *pLocalMerge, bool doneOutput) {
943 944 945
  SSqlCmd *pCmd = &pSql->cmd;
  SSqlRes *pRes = &pSql->res;
  
H
Haojun Liao 已提交
946
  tFilePage  *pBeforeFillData = pLocalMerge->pResultBuf;
947
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
H
Haojun Liao 已提交
948
  SFillInfo  *pFillInfo = pLocalMerge->pFillInfo;
H
hzcheng 已提交
949

950 951
  // todo extract function
  int64_t actualETime = (pQueryInfo->order.order == TSDB_ORDER_ASC)? pQueryInfo->window.ekey: pQueryInfo->window.skey;
H
hzcheng 已提交
952

H
hjxilinx 已提交
953 954 955
  tFilePage **pResPages = malloc(POINTER_BYTES * pQueryInfo->fieldsInfo.numOfOutput);
  for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) {
    TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i);
H
Haojun Liao 已提交
956
    pResPages[i] = calloc(1, sizeof(tFilePage) + pField->bytes * pLocalMerge->resColModel->capacity);
H
hzcheng 已提交
957
  }
H
Haojun Liao 已提交
958

H
hzcheng 已提交
959
  while (1) {
H
Haojun Liao 已提交
960
    int64_t newRows = taosFillResultDataBlock(pFillInfo, pResPages, pLocalMerge->resColModel->capacity);
H
hzcheng 已提交
961

962 963
    if (pQueryInfo->limit.offset < newRows) {
      newRows -= pQueryInfo->limit.offset;
H
hzcheng 已提交
964

965
      if (pQueryInfo->limit.offset > 0) {
H
hjxilinx 已提交
966 967
        for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) {
          TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i);
H
hjxilinx 已提交
968
          memmove(pResPages[i]->data, pResPages[i]->data + pField->bytes * pQueryInfo->limit.offset,
969
                  (size_t)(newRows * pField->bytes));
H
hzcheng 已提交
970 971 972
        }
      }

H
Haojun Liao 已提交
973
      pRes->data = pLocalMerge->pFinalRes;
H
Haojun Liao 已提交
974
      pRes->numOfRows = (int32_t) newRows;
H
hzcheng 已提交
975

976
      pQueryInfo->limit.offset = 0;
H
hzcheng 已提交
977 978
      break;
    } else {
979
      pQueryInfo->limit.offset -= newRows;
H
hzcheng 已提交
980 981
      pRes->numOfRows = 0;

982
      if (!taosFillHasMoreResults(pFillInfo)) {
983
        if (!doneOutput) { // reduce procedure has not completed yet, but current results for fill are exhausted
H
hzcheng 已提交
984 985 986
          break;
        }

987
        // all output in current group are completed
H
Haojun Liao 已提交
988
        int32_t totalRemainRows = (int32_t)getNumOfResultsAfterFillGap(pFillInfo, actualETime, pLocalMerge->resColModel->capacity);
H
hzcheng 已提交
989 990 991 992 993 994 995 996
        if (totalRemainRows <= 0) {
          break;
        }
      }
    }
  }

  if (pRes->numOfRows > 0) {
997
    int32_t currentTotal = (int32_t)(pRes->numOfRowsGroup + pRes->numOfRows);
H
hzcheng 已提交
998

999 1000 1001 1002 1003
    if (pQueryInfo->limit.limit >= 0 && currentTotal > pQueryInfo->limit.limit) {
      int32_t overflow = (int32_t)(currentTotal - pQueryInfo->limit.limit);

      pRes->numOfRows -= overflow;
      assert(pRes->numOfRows >= 0);
H
Haojun Liao 已提交
1004

H
hzcheng 已提交
1005
      /* set remain data to be discarded, and reset the interpolation information */
H
Haojun Liao 已提交
1006
      savePrevRecordAndSetupFillInfo(pLocalMerge, pQueryInfo, pFillInfo);
H
hzcheng 已提交
1007 1008
    }

H
Haojun Liao 已提交
1009
    int32_t offset = 0;
1010 1011
    for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) {
      TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i);
1012
      memcpy(pRes->data + offset * pRes->numOfRows, pResPages[i]->data, (size_t)(pField->bytes * pRes->numOfRows));
H
Haojun Liao 已提交
1013
      offset += pField->bytes;
H
hzcheng 已提交
1014
    }
H
Haojun Liao 已提交
1015 1016 1017

    pRes->numOfRowsGroup += pRes->numOfRows;
    pRes->numOfClauseTotal += pRes->numOfRows;
H
hzcheng 已提交
1018 1019
  }

1020
  pBeforeFillData->num = 0;
H
hjxilinx 已提交
1021
  for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) {
S
TD-1848  
Shengliang Guan 已提交
1022
    tfree(pResPages[i]);
H
hzcheng 已提交
1023
  }
1024
  
S
TD-1848  
Shengliang Guan 已提交
1025
  tfree(pResPages);
H
hzcheng 已提交
1026 1027
}

H
Haojun Liao 已提交
1028 1029
static void savePreviousRow(SLocalMerger *pLocalMerge, tFilePage *tmpBuffer) {
  SColumnModel *pColumnModel = pLocalMerge->pDesc->pColumnModel;
1030
  assert(pColumnModel->capacity == 1 && tmpBuffer->num == 1);
H
hzcheng 已提交
1031 1032

  // copy to previous temp buffer
H
hjxilinx 已提交
1033
  for (int32_t i = 0; i < pColumnModel->numOfCols; ++i) {
H
hjxilinx 已提交
1034 1035 1036
    SSchema *pSchema = getColumnModelSchema(pColumnModel, i);
    int16_t  offset = getColumnModelOffset(pColumnModel, i);

H
Haojun Liao 已提交
1037
    memcpy(pLocalMerge->prevRowOfInput + offset, tmpBuffer->data + offset, pSchema->bytes);
H
hzcheng 已提交
1038 1039
  }

1040
  tmpBuffer->num = 0;
H
Haojun Liao 已提交
1041
  pLocalMerge->hasPrevRow = true;
H
hzcheng 已提交
1042 1043
}

H
Haojun Liao 已提交
1044
static void doExecuteSecondaryMerge(SSqlCmd *pCmd, SLocalMerger *pLocalMerge, bool needInit) {
S
slguan 已提交
1045
  // the tag columns need to be set before all functions execution
H
hjxilinx 已提交
1046
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
1047

H
hjxilinx 已提交
1048 1049
  size_t size = tscSqlExprNumOfExprs(pQueryInfo);
  for (int32_t j = 0; j < size; ++j) {
H
Haojun Liao 已提交
1050
    SQLFunctionCtx *pCtx = &pLocalMerge->pCtx[j];
H
hzcheng 已提交
1051

S
slguan 已提交
1052
    // tags/tags_dummy function, the tag field of SQLFunctionCtx is from the input buffer
H
Haojun Liao 已提交
1053
    int32_t functionId = pCtx->functionId;
H
hjxilinx 已提交
1054
    if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TAG || functionId == TSDB_FUNC_TS_DUMMY) {
S
slguan 已提交
1055
      tVariantDestroy(&pCtx->tag);
1056 1057 1058 1059 1060 1061 1062 1063
      char* input = pCtx->aInputElemBuf;
      
      if (pCtx->inputType == TSDB_DATA_TYPE_BINARY || pCtx->inputType == TSDB_DATA_TYPE_NCHAR) {
        assert(varDataLen(input) <= pCtx->inputBytes);
        tVariantCreateFromBinary(&pCtx->tag, varDataVal(input), varDataLen(input), pCtx->inputType);
      } else {
        tVariantCreateFromBinary(&pCtx->tag, input, pCtx->inputBytes, pCtx->inputType);
      }
H
Haojun Liao 已提交
1064 1065
    } else if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM) {
      SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, j);
1066
      pCtx->param[0].i64 = pExpr->param[0].i64;
H
hzcheng 已提交
1067 1068
    }

H
Haojun Liao 已提交
1069
    pCtx->currentStage = MERGE_STAGE;
S
slguan 已提交
1070 1071

    if (needInit) {
H
Haojun Liao 已提交
1072
      aAggs[pCtx->functionId].init(pCtx);
S
slguan 已提交
1073 1074 1075
    }
  }

H
hjxilinx 已提交
1076
  for (int32_t j = 0; j < size; ++j) {
H
Haojun Liao 已提交
1077
    int32_t functionId = pLocalMerge->pCtx[j].functionId;
S
slguan 已提交
1078 1079 1080 1081
    if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TS_DUMMY) {
      continue;
    }

H
Haojun Liao 已提交
1082
    aAggs[functionId].mergeFunc(&pLocalMerge->pCtx[j]);
S
slguan 已提交
1083 1084
  }
}
H
hzcheng 已提交
1085

H
Haojun Liao 已提交
1086 1087 1088 1089 1090
static void handleUnprocessedRow(SSqlCmd *pCmd, SLocalMerger *pLocalMerge, tFilePage *tmpBuffer) {
  if (pLocalMerge->hasUnprocessedRow) {
    pLocalMerge->hasUnprocessedRow = false;
    doExecuteSecondaryMerge(pCmd, pLocalMerge, true);
    savePreviousRow(pLocalMerge, tmpBuffer);
H
hzcheng 已提交
1091 1092 1093
  }
}

1094
static int64_t getNumOfResultLocal(SQueryInfo *pQueryInfo, SQLFunctionCtx *pCtx) {
H
hzcheng 已提交
1095
  int64_t maxOutput = 0;
H
hjxilinx 已提交
1096 1097 1098
  
  size_t size = tscSqlExprNumOfExprs(pQueryInfo);
  for (int32_t j = 0; j < size; ++j) {
H
hzcheng 已提交
1099 1100 1101 1102
    /*
     * ts, tag, tagprj function can not decide the output number of current query
     * the number of output result is decided by main output
     */
H
Haojun Liao 已提交
1103
    int32_t functionId = pCtx[j].functionId;
Y
TD-2571  
yihaoDeng 已提交
1104
    if (functionId == TSDB_FUNC_TS || functionId == TSDB_FUNC_TAG) {
H
hzcheng 已提交
1105 1106
      continue;
    }
H
hjxilinx 已提交
1107

H
Haojun Liao 已提交
1108
    SResultRowCellInfo* pResInfo = GET_RES_INFO(&pCtx[j]);
H
Haojun Liao 已提交
1109 1110
    if (maxOutput < pResInfo->numOfRes) {
      maxOutput = pResInfo->numOfRes;
H
hzcheng 已提交
1111 1112
    }
  }
H
hjxilinx 已提交
1113

H
hzcheng 已提交
1114 1115 1116 1117
  return maxOutput;
}

/*
S
slguan 已提交
1118
 * in handling the top/bottom query, which produce more than one rows result,
H
hzcheng 已提交
1119 1120
 * the tsdb_func_tags only fill the first row of results, the remain rows need to
 * filled with the same result, which is the tags, specified in group by clause
S
slguan 已提交
1121
 *
H
hzcheng 已提交
1122
 */
H
Haojun Liao 已提交
1123
static void fillMultiRowsOfTagsVal(SQueryInfo *pQueryInfo, int32_t numOfRes, SLocalMerger *pLocalMerge) {
S
slguan 已提交
1124
  int32_t maxBufSize = 0;  // find the max tags column length to prepare the buffer
H
hjxilinx 已提交
1125 1126 1127
  size_t size = tscSqlExprNumOfExprs(pQueryInfo);
  
  for (int32_t k = 0; k < size; ++k) {
1128
    SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, k);
S
slguan 已提交
1129
    if (maxBufSize < pExpr->resBytes && pExpr->functionId == TSDB_FUNC_TAG) {
H
hzcheng 已提交
1130 1131 1132 1133 1134 1135
      maxBufSize = pExpr->resBytes;
    }
  }

  assert(maxBufSize >= 0);

H
hjxilinx 已提交
1136
  char *buf = malloc((size_t)maxBufSize);
H
hjxilinx 已提交
1137
  for (int32_t k = 0; k < size; ++k) {
H
Haojun Liao 已提交
1138
    SQLFunctionCtx *pCtx = &pLocalMerge->pCtx[k];
H
Haojun Liao 已提交
1139
    if (pCtx->functionId != TSDB_FUNC_TAG) {
S
slguan 已提交
1140 1141 1142
      continue;
    }

H
hzcheng 已提交
1143 1144 1145 1146 1147 1148 1149 1150 1151 1152 1153 1154 1155
    int32_t inc = numOfRes - 1;  // tsdb_func_tag function only produce one row of result
    memset(buf, 0, (size_t)maxBufSize);
    memcpy(buf, pCtx->aOutputBuf, (size_t)pCtx->outputBytes);

    for (int32_t i = 0; i < inc; ++i) {
      pCtx->aOutputBuf += pCtx->outputBytes;
      memcpy(pCtx->aOutputBuf, buf, (size_t)pCtx->outputBytes);
    }
  }

  free(buf);
}

H
Haojun Liao 已提交
1156
int32_t finalizeRes(SQueryInfo *pQueryInfo, SLocalMerger *pLocalMerge) {
H
hjxilinx 已提交
1157 1158 1159
  size_t size = tscSqlExprNumOfExprs(pQueryInfo);
  
  for (int32_t k = 0; k < size; ++k) {
H
Haojun Liao 已提交
1160
    SQLFunctionCtx* pCtx = &pLocalMerge->pCtx[k];
H
Haojun Liao 已提交
1161
    aAggs[pCtx->functionId].xFinalize(pCtx);
H
hzcheng 已提交
1162 1163
  }

H
Haojun Liao 已提交
1164
  pLocalMerge->hasPrevRow = false;
H
hzcheng 已提交
1165

H
Haojun Liao 已提交
1166 1167
  int32_t numOfRes = (int32_t)getNumOfResultLocal(pQueryInfo, pLocalMerge->pCtx);
  pLocalMerge->pResultBuf->num += numOfRes;
H
hzcheng 已提交
1168

H
Haojun Liao 已提交
1169
  fillMultiRowsOfTagsVal(pQueryInfo, numOfRes, pLocalMerge);
H
hzcheng 已提交
1170 1171 1172 1173 1174 1175 1176 1177 1178 1179
  return numOfRes;
}

/*
 * points merge:
 * points are merged according to the sort info, which is tags columns and timestamp column.
 * In case of points without either tags columns or timestamp, such as
 * results generated by simple aggregation function, we merge them all into one points
 * *Exception*: column projection query, required no merge procedure
 */
H
Haojun Liao 已提交
1180
bool needToMerge(SQueryInfo *pQueryInfo, SLocalMerger *pLocalMerge, tFilePage *tmpBuffer) {
H
hzcheng 已提交
1181
  int32_t ret = 0;  // merge all result by default
1182

H
Haojun Liao 已提交
1183
  int16_t functionId = pLocalMerge->pCtx[0].functionId;
1184 1185

  // todo opt performance
Y
TD-2571  
yihaoDeng 已提交
1186
  if ((/*functionId == TSDB_FUNC_PRJ || */functionId == TSDB_FUNC_ARITHM) || (tscIsProjectionQueryOnSTable(pQueryInfo, 0) && pQueryInfo->distinctTag == false)) {  // column projection query
H
hzcheng 已提交
1187 1188
    ret = 1;                                                            // disable merge procedure
  } else {
H
Haojun Liao 已提交
1189
    tOrderDescriptor *pDesc = pLocalMerge->pDesc;
H
Haojun Liao 已提交
1190
    if (pDesc->orderInfo.numOfCols > 0) {
1191
      if (pDesc->tsOrder == TSDB_ORDER_ASC) {  // asc
H
hzcheng 已提交
1192
        // todo refactor comparator
H
Haojun Liao 已提交
1193
        ret = compare_a(pLocalMerge->pDesc, 1, 0, pLocalMerge->prevRowOfInput, 1, 0, tmpBuffer->data);
H
hzcheng 已提交
1194
      } else {  // desc
H
Haojun Liao 已提交
1195
        ret = compare_d(pLocalMerge->pDesc, 1, 0, pLocalMerge->prevRowOfInput, 1, 0, tmpBuffer->data);
H
hzcheng 已提交
1196 1197 1198 1199 1200 1201 1202 1203
      }
    }
  }

  /* if ret == 0, means the result belongs to the same group */
  return (ret == 0);
}

H
hjxilinx 已提交
1204
static bool reachGroupResultLimit(SQueryInfo *pQueryInfo, SSqlRes *pRes) {
1205
  return (pRes->numOfGroups >= pQueryInfo->slimit.limit && pQueryInfo->slimit.limit >= 0);
S
slguan 已提交
1206 1207 1208 1209 1210 1211
}

static bool saveGroupResultInfo(SSqlObj *pSql) {
  SSqlCmd *pCmd = &pSql->cmd;
  SSqlRes *pRes = &pSql->res;

H
hjxilinx 已提交
1212
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
H
Haojun Liao 已提交
1213 1214 1215 1216

  if (pRes->numOfRowsGroup > 0) {
    pRes->numOfGroups += 1;
  }
S
slguan 已提交
1217

S
slguan 已提交
1218
  // the output group is limited by the slimit clause
1219
  if (reachGroupResultLimit(pQueryInfo, pRes)) {
S
slguan 已提交
1220 1221 1222 1223
    return true;
  }

  //    pRes->pGroupRec = realloc(pRes->pGroupRec, pRes->numOfGroups*sizeof(SResRec));
H
hzcheng 已提交
1224
  //    pRes->pGroupRec[pRes->numOfGroups-1].numOfRows = pRes->numOfRows;
H
Haojun Liao 已提交
1225
  //    pRes->pGroupRec[pRes->numOfGroups-1].numOfClauseTotal = pRes->numOfClauseTotal;
S
slguan 已提交
1226 1227

  return false;
H
hzcheng 已提交
1228 1229
}

S
slguan 已提交
1230 1231 1232
/**
 *
 * @param pSql
H
Haojun Liao 已提交
1233
 * @param pLocalMerge
S
slguan 已提交
1234 1235 1236
 * @param noMoreCurrentGroupRes
 * @return if current group is skipped, return false, and do NOT record it into pRes->numOfGroups
 */
H
Haojun Liao 已提交
1237
bool genFinalResults(SSqlObj *pSql, SLocalMerger *pLocalMerge, bool noMoreCurrentGroupRes) {
H
hjxilinx 已提交
1238 1239 1240 1241
  SSqlCmd *pCmd = &pSql->cmd;
  SSqlRes *pRes = &pSql->res;

  SQueryInfo *  pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
H
Haojun Liao 已提交
1242 1243
  tFilePage *   pResBuf = pLocalMerge->pResultBuf;
  SColumnModel *pModel = pLocalMerge->resColModel;
H
hzcheng 已提交
1244

S
slguan 已提交
1245 1246 1247
  pRes->code = TSDB_CODE_SUCCESS;

  /*
1248
   * Ignore the output of the current group since this group is skipped by user
S
slguan 已提交
1249 1250
   * We set the numOfRows to be 0 and discard the possible remain results.
   */
1251
  if (pQueryInfo->slimit.offset > 0) {
S
slguan 已提交
1252
    pRes->numOfRows = 0;
1253
    pQueryInfo->slimit.offset -= 1;
H
Haojun Liao 已提交
1254
    pLocalMerge->discard = !noMoreCurrentGroupRes;
H
Haojun Liao 已提交
1255

H
Haojun Liao 已提交
1256 1257 1258
    if (pLocalMerge->discard) {
      SColumnModel *pInternModel = pLocalMerge->pDesc->pColumnModel;
      tColModelAppend(pInternModel, pLocalMerge->discardData, pLocalMerge->pTempBuffer->data, 0, 1, 1);
H
Haojun Liao 已提交
1259 1260
    }

S
slguan 已提交
1261 1262 1263
    return false;
  }

H
hjxilinx 已提交
1264
  tColModelCompact(pModel, pResBuf, pModel->capacity);
H
hzcheng 已提交
1265

H
Haojun Liao 已提交
1266
  if (tscIsSecondStageQuery(pQueryInfo)) {
H
Haojun Liao 已提交
1267
    doArithmeticCalculate(pQueryInfo, pResBuf, pModel->rowSize, pLocalMerge->finalModel->rowSize);
H
Haojun Liao 已提交
1268 1269
  }

1270
  // no interval query, no fill operation
1271
  if (pQueryInfo->interval.interval == 0 || pQueryInfo->fillType == TSDB_FILL_NONE) {
H
Haojun Liao 已提交
1272
    genFinalResWithoutFill(pRes, pLocalMerge, pQueryInfo);
1273
  } else {
H
Haojun Liao 已提交
1274
    SFillInfo* pFillInfo = pLocalMerge->pFillInfo;
1275
    if (pFillInfo != NULL) {
1276 1277 1278
      TSKEY ekey = (pQueryInfo->order.order == TSDB_ORDER_ASC)? pQueryInfo->window.ekey: pQueryInfo->window.skey;

      taosFillSetStartInfo(pFillInfo, (int32_t)pResBuf->num, ekey);
1279 1280 1281
      taosFillCopyInputDataFromOneFilePage(pFillInfo, pResBuf);
    }
    
H
Haojun Liao 已提交
1282
    doFillResult(pSql, pLocalMerge, noMoreCurrentGroupRes);
1283 1284
  }

S
slguan 已提交
1285
  return true;
H
hzcheng 已提交
1286 1287
}

H
Haojun Liao 已提交
1288
void resetOutputBuf(SQueryInfo *pQueryInfo, SLocalMerger *pLocalMerge) {// reset output buffer to the beginning
H
Haojun Liao 已提交
1289 1290 1291
  size_t t = tscSqlExprNumOfExprs(pQueryInfo);
  for (int32_t i = 0; i < t; ++i) {
    SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i);
H
Haojun Liao 已提交
1292
    pLocalMerge->pCtx[i].aOutputBuf = pLocalMerge->pResultBuf->data + pExpr->offset * pLocalMerge->resColModel->capacity;
1293 1294

    if (pExpr->functionId == TSDB_FUNC_TOP || pExpr->functionId == TSDB_FUNC_BOTTOM || pExpr->functionId == TSDB_FUNC_DIFF) {
H
Haojun Liao 已提交
1295
      pLocalMerge->pCtx[i].ptsOutputBuf = pLocalMerge->pCtx[0].aOutputBuf;
1296
    }
H
hzcheng 已提交
1297 1298
  }

H
Haojun Liao 已提交
1299
  memset(pLocalMerge->pResultBuf, 0, pLocalMerge->nResultBufSize + sizeof(tFilePage));
H
hzcheng 已提交
1300 1301
}

H
Haojun Liao 已提交
1302
static void resetEnvForNewResultset(SSqlRes *pRes, SSqlCmd *pCmd, SLocalMerger *pLocalMerge) {
S
slguan 已提交
1303
  // In handling data in other groups, we need to reset the interpolation information for a new group data
H
hzcheng 已提交
1304
  pRes->numOfRows = 0;
H
Haojun Liao 已提交
1305
  pRes->numOfRowsGroup = 0;
H
hjxilinx 已提交
1306 1307 1308

  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);

H
Haojun Liao 已提交
1309
  pQueryInfo->limit.offset = pLocalMerge->offset;
H
hzcheng 已提交
1310

1311
  STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
H
hjxilinx 已提交
1312
  STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);
H
hjxilinx 已提交
1313
  
S
slguan 已提交
1314
  // for group result interpolation, do not return if not data is generated
1315
  if (pQueryInfo->fillType != TSDB_FILL_NONE) {
1316
    TSKEY skey = (pQueryInfo->order.order == TSDB_ORDER_ASC)? pQueryInfo->window.skey:pQueryInfo->window.ekey;//MIN(pQueryInfo->window.skey, pQueryInfo->window.ekey);
1317
    int64_t newTime = taosTimeTruncate(skey, &pQueryInfo->interval, tinfo.precision);
H
Haojun Liao 已提交
1318
    taosResetFillInfo(pLocalMerge->pFillInfo, newTime);
H
hzcheng 已提交
1319 1320 1321
  }
}

H
Haojun Liao 已提交
1322 1323
static bool isAllSourcesCompleted(SLocalMerger *pLocalMerge) {
  return (pLocalMerge->numOfBuffer == pLocalMerge->numOfCompleted);
S
slguan 已提交
1324 1325
}

1326
static bool doBuildFilledResultForGroup(SSqlObj *pSql) {
H
hzcheng 已提交
1327 1328 1329
  SSqlCmd *pCmd = &pSql->cmd;
  SSqlRes *pRes = &pSql->res;

1330
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
H
Haojun Liao 已提交
1331 1332
  SLocalMerger *pLocalMerge = pRes->pLocalMerger;
  SFillInfo *pFillInfo = pLocalMerge->pFillInfo;
S
slguan 已提交
1333

1334
  if (pFillInfo != NULL && taosFillHasMoreResults(pFillInfo)) {
1335
    assert(pQueryInfo->fillType != TSDB_FILL_NONE);
H
hzcheng 已提交
1336

H
Haojun Liao 已提交
1337
    tFilePage *pFinalDataBuf = pLocalMerge->pResultBuf;
H
Haojun Liao 已提交
1338
    int64_t etime = *(int64_t *)(pFinalDataBuf->data + TSDB_KEYSIZE * (pFillInfo->numOfRows - 1));
H
hzcheng 已提交
1339

1340
    // the first column must be the timestamp column
H
Haojun Liao 已提交
1341
    int32_t rows = (int32_t) getNumOfResultsAfterFillGap(pFillInfo, etime, pLocalMerge->resColModel->capacity);
H
Haojun Liao 已提交
1342
    if (rows > 0) {  // do fill gap
H
Haojun Liao 已提交
1343
      doFillResult(pSql, pLocalMerge, false);
S
slguan 已提交
1344
    }
H
hzcheng 已提交
1345

S
slguan 已提交
1346 1347 1348
    return true;
  } else {
    return false;
H
hzcheng 已提交
1349
  }
S
slguan 已提交
1350
}
H
hzcheng 已提交
1351

S
slguan 已提交
1352 1353 1354 1355
static bool doHandleLastRemainData(SSqlObj *pSql) {
  SSqlCmd *pCmd = &pSql->cmd;
  SSqlRes *pRes = &pSql->res;

H
Haojun Liao 已提交
1356 1357
  SLocalMerger *pLocalMerge = pRes->pLocalMerger;
  SFillInfo     *pFillInfo = pLocalMerge->pFillInfo;
H
hzcheng 已提交
1358

H
Haojun Liao 已提交
1359
  bool prevGroupCompleted = (!pLocalMerge->discard) && pLocalMerge->hasUnprocessedRow;
S
slguan 已提交
1360

H
Haojun Liao 已提交
1361
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
H
hzcheng 已提交
1362

H
Haojun Liao 已提交
1363
  if ((isAllSourcesCompleted(pLocalMerge) && !pLocalMerge->hasPrevRow) || pLocalMerge->pLocalDataSrc[0] == NULL ||
S
slguan 已提交
1364
      prevGroupCompleted) {
1365
    // if fillType == TSDB_FILL_NONE, return directly
H
Haojun Liao 已提交
1366 1367
    if (pQueryInfo->fillType != TSDB_FILL_NONE &&
      ((pRes->numOfRowsGroup < pQueryInfo->limit.limit && pQueryInfo->limit.limit > 0) || (pQueryInfo->limit.limit < 0))) {
1368
      int64_t etime = (pQueryInfo->order.order == TSDB_ORDER_ASC)? pQueryInfo->window.ekey : pQueryInfo->window.skey;
H
hzcheng 已提交
1369

H
Haojun Liao 已提交
1370
      int32_t rows = (int32_t)getNumOfResultsAfterFillGap(pFillInfo, etime, pLocalMerge->resColModel->capacity);
1371
      if (rows > 0) {
H
Haojun Liao 已提交
1372
        doFillResult(pSql, pLocalMerge, true);
H
hzcheng 已提交
1373 1374 1375
      }
    }

S
slguan 已提交
1376 1377 1378 1379 1380 1381
    /*
     * 1. numOfRows == 0, means no interpolation results are generated.
     * 2. if all local data sources are consumed, and no un-processed rows exist.
     *
     * No results will be generated and query completed.
     */
H
Haojun Liao 已提交
1382
    if (pRes->numOfRows > 0 || (isAllSourcesCompleted(pLocalMerge) && (!pLocalMerge->hasUnprocessedRow))) {
S
slguan 已提交
1383
      return true;
H
hzcheng 已提交
1384
    }
S
slguan 已提交
1385 1386 1387 1388 1389 1390

    // start to process result for a new group and save the result info of previous group
    if (saveGroupResultInfo(pSql)) {
      return true;
    }

H
Haojun Liao 已提交
1391
    resetEnvForNewResultset(pRes, pCmd, pLocalMerge);
H
hzcheng 已提交
1392 1393
  }

S
slguan 已提交
1394 1395
  return false;
}
H
hzcheng 已提交
1396

H
hjxilinx 已提交
1397 1398 1399 1400
static void doProcessResultInNextWindow(SSqlObj *pSql, int32_t numOfRes) {
  SSqlCmd *pCmd = &pSql->cmd;
  SSqlRes *pRes = &pSql->res;

H
Haojun Liao 已提交
1401
  SLocalMerger *pLocalMerge = pRes->pLocalMerger;
H
hjxilinx 已提交
1402
  SQueryInfo *   pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
H
hjxilinx 已提交
1403
  size_t size = tscSqlExprNumOfExprs(pQueryInfo);
H
hjxilinx 已提交
1404

H
hjxilinx 已提交
1405
  for (int32_t k = 0; k < size; ++k) {
H
Haojun Liao 已提交
1406
    SQLFunctionCtx *pCtx = &pLocalMerge->pCtx[k];
S
slguan 已提交
1407
    pCtx->aOutputBuf += pCtx->outputBytes * numOfRes;
S
slguan 已提交
1408 1409

    // set the correct output timestamp column position
H
Haojun Liao 已提交
1410
    if (pCtx->functionId == TSDB_FUNC_TOP || pCtx->functionId == TSDB_FUNC_BOTTOM) {
S
slguan 已提交
1411
      pCtx->ptsOutputBuf = ((char *)pCtx->ptsOutputBuf + TSDB_KEYSIZE * numOfRes);
H
hzcheng 已提交
1412
    }
S
slguan 已提交
1413 1414
  }

H
Haojun Liao 已提交
1415
  doExecuteSecondaryMerge(pCmd, pLocalMerge, true);
S
slguan 已提交
1416 1417
}

1418
int32_t tscDoLocalMerge(SSqlObj *pSql) {
S
slguan 已提交
1419 1420
  SSqlCmd *pCmd = &pSql->cmd;
  SSqlRes *pRes = &pSql->res;
H
hjxilinx 已提交
1421

H
hjxilinx 已提交
1422
  tscResetForNextRetrieve(pRes);
H
hjxilinx 已提交
1423

H
Haojun Liao 已提交
1424
  if (pSql->signature != pSql || pRes == NULL || pRes->pLocalMerger == NULL) {  // all data has been processed
D
dapan1121 已提交
1425 1426 1427
    if (pRes->code == TSDB_CODE_SUCCESS) {
      return pRes->code;
    }
H
Haojun Liao 已提交
1428

H
Haojun Liao 已提交
1429 1430
    tscError("%p local merge abort due to error occurs, code:%s", pSql, tstrerror(pRes->code));
    return pRes->code;
H
hzcheng 已提交
1431
  }
H
hjxilinx 已提交
1432

H
Haojun Liao 已提交
1433
  SLocalMerger  *pLocalMerge = pRes->pLocalMerger;
H
Haojun Liao 已提交
1434
  SQueryInfo    *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
H
Haojun Liao 已提交
1435
  tFilePage     *tmpBuffer = pLocalMerge->pTempBuffer;
S
slguan 已提交
1436 1437 1438 1439 1440

  if (doHandleLastRemainData(pSql)) {
    return TSDB_CODE_SUCCESS;
  }

1441
  if (doBuildFilledResultForGroup(pSql)) {
S
slguan 已提交
1442 1443 1444
    return TSDB_CODE_SUCCESS;
  }

H
Haojun Liao 已提交
1445
  SLoserTreeInfo *pTree = pLocalMerge->pLoserTree;
H
hzcheng 已提交
1446 1447

  // clear buffer
H
Haojun Liao 已提交
1448 1449
  handleUnprocessedRow(pCmd, pLocalMerge, tmpBuffer);
  SColumnModel *pModel = pLocalMerge->pDesc->pColumnModel;
H
hzcheng 已提交
1450 1451

  while (1) {
H
Haojun Liao 已提交
1452
    if (isAllSourcesCompleted(pLocalMerge)) {
H
hzcheng 已提交
1453 1454 1455 1456 1457 1458
      break;
    }

#ifdef _DEBUG_VIEW
    printf("chosen data in pTree[0] = %d\n", pTree->pNode[0].index);
#endif
H
Haojun Liao 已提交
1459
    assert((pTree->pNode[0].index < pLocalMerge->numOfBuffer) && (pTree->pNode[0].index >= 0) && tmpBuffer->num == 0);
H
hzcheng 已提交
1460 1461

    // chosen from loser tree
H
Haojun Liao 已提交
1462
    SLocalDataSource *pOneDataSrc = pLocalMerge->pLocalDataSrc[pTree->pNode[0].index];
H
hzcheng 已提交
1463

S
slguan 已提交
1464
    tColModelAppend(pModel, tmpBuffer, pOneDataSrc->filePage.data, pOneDataSrc->rowIdx, 1,
H
hjxilinx 已提交
1465
                    pOneDataSrc->pMemBuffer->pColumnModel->capacity);
H
hzcheng 已提交
1466 1467 1468 1469

#if defined(_DEBUG_VIEW)
    printf("chosen row:\t");
    SSrcColumnInfo colInfo[256] = {0};
1470
    tscGetSrcColumnInfo(colInfo, pQueryInfo);
H
hzcheng 已提交
1471

1472
    tColModelDisplayEx(pModel, tmpBuffer->data, tmpBuffer->num, pModel->capacity, colInfo);
H
hzcheng 已提交
1473
#endif
S
slguan 已提交
1474

H
Haojun Liao 已提交
1475 1476
    if (pLocalMerge->discard) {
      assert(pLocalMerge->hasUnprocessedRow == false);
H
hzcheng 已提交
1477 1478

      /* current record belongs to the same group of previous record, need to discard it */
H
Haojun Liao 已提交
1479
      if (isSameGroup(pCmd, pLocalMerge, pLocalMerge->discardData->data, tmpBuffer)) {
1480
        tmpBuffer->num = 0;
H
hzcheng 已提交
1481 1482
        pOneDataSrc->rowIdx += 1;

H
Haojun Liao 已提交
1483
        adjustLoserTreeFromNewData(pLocalMerge, pOneDataSrc, pTree);
S
slguan 已提交
1484 1485

        // all inputs are exhausted, abort current process
H
Haojun Liao 已提交
1486
        if (isAllSourcesCompleted(pLocalMerge)) {
H
hzcheng 已提交
1487 1488 1489
          break;
        }

S
slguan 已提交
1490
        // data belongs to the same group needs to be discarded
H
hzcheng 已提交
1491 1492
        continue;
      } else {
H
Haojun Liao 已提交
1493 1494
        pLocalMerge->discard = false;
        pLocalMerge->discardData->num = 0;
H
hzcheng 已提交
1495

S
slguan 已提交
1496 1497 1498 1499
        if (saveGroupResultInfo(pSql)) {
          return TSDB_CODE_SUCCESS;
        }

H
Haojun Liao 已提交
1500
        resetEnvForNewResultset(pRes, pCmd, pLocalMerge);
H
hzcheng 已提交
1501 1502 1503
      }
    }

H
Haojun Liao 已提交
1504 1505
    if (pLocalMerge->hasPrevRow) {
      if (needToMerge(pQueryInfo, pLocalMerge, tmpBuffer)) {
S
slguan 已提交
1506
        // belong to the group of the previous row, continue process it
H
Haojun Liao 已提交
1507
        doExecuteSecondaryMerge(pCmd, pLocalMerge, false);
H
hzcheng 已提交
1508 1509

        // copy to buffer
H
Haojun Liao 已提交
1510
        savePreviousRow(pLocalMerge, tmpBuffer);
S
slguan 已提交
1511 1512 1513 1514 1515
      } else {
        /*
         * current row does not belong to the group of previous row.
         * so the processing of previous group is completed.
         */
H
Haojun Liao 已提交
1516 1517
        int32_t numOfRes = finalizeRes(pQueryInfo, pLocalMerge);
        bool   sameGroup = isSameGroup(pCmd, pLocalMerge, pLocalMerge->prevRowOfInput, tmpBuffer);
H
hzcheng 已提交
1518

H
Haojun Liao 已提交
1519
        tFilePage *pResBuf = pLocalMerge->pResultBuf;
H
hzcheng 已提交
1520 1521

        /*
1522
         * if the previous group does NOT generate any result (pResBuf->num == 0),
H
hzcheng 已提交
1523 1524
         * continue to process results instead of return results.
         */
H
Haojun Liao 已提交
1525
        if ((!sameGroup && pResBuf->num > 0) || (pResBuf->num == pLocalMerge->resColModel->capacity)) {
H
hzcheng 已提交
1526
          // does not belong to the same group
H
Haojun Liao 已提交
1527
          bool notSkipped = genFinalResults(pSql, pLocalMerge, !sameGroup);
H
hzcheng 已提交
1528

S
slguan 已提交
1529
          // this row needs to discard, since it belongs to the group of previous
H
Haojun Liao 已提交
1530 1531
          if (pLocalMerge->discard && sameGroup) {
            pLocalMerge->hasUnprocessedRow = false;
1532
            tmpBuffer->num = 0;
H
Haojun Liao 已提交
1533
          } else { // current row does not belongs to the previous group, so it is not be handled yet.
H
Haojun Liao 已提交
1534
            pLocalMerge->hasUnprocessedRow = true;
H
hzcheng 已提交
1535 1536
          }

H
Haojun Liao 已提交
1537
          resetOutputBuf(pQueryInfo, pLocalMerge);
H
hzcheng 已提交
1538 1539
          pOneDataSrc->rowIdx += 1;

S
slguan 已提交
1540
          // here we do not check the return value
H
Haojun Liao 已提交
1541
          adjustLoserTreeFromNewData(pLocalMerge, pOneDataSrc, pTree);
H
hzcheng 已提交
1542 1543

          if (pRes->numOfRows == 0) {
H
Haojun Liao 已提交
1544
            handleUnprocessedRow(pCmd, pLocalMerge, tmpBuffer);
H
hzcheng 已提交
1545 1546

            if (!sameGroup) {
S
slguan 已提交
1547 1548 1549 1550 1551 1552 1553 1554
              /*
               * previous group is done, prepare for the next group
               * If previous group is not skipped, keep it in pRes->numOfGroups
               */
              if (notSkipped && saveGroupResultInfo(pSql)) {
                return TSDB_CODE_SUCCESS;
              }

H
Haojun Liao 已提交
1555
              resetEnvForNewResultset(pRes, pCmd, pLocalMerge);
H
hzcheng 已提交
1556 1557 1558 1559 1560 1561 1562
            }
          } else {
            /*
             * if next record belongs to a new group, we do not handle this record here.
             * We start the process in a new round.
             */
            if (sameGroup) {
H
Haojun Liao 已提交
1563
              handleUnprocessedRow(pCmd, pLocalMerge, tmpBuffer);
H
hzcheng 已提交
1564 1565 1566
            }
          }

S
slguan 已提交
1567 1568 1569 1570 1571
          // current group has no result,
          if (pRes->numOfRows == 0) {
            continue;
          } else {
            return TSDB_CODE_SUCCESS;
H
hzcheng 已提交
1572
          }
S
slguan 已提交
1573
        } else {  // result buffer is not full
H
hjxilinx 已提交
1574
          doProcessResultInNextWindow(pSql, numOfRes);
H
Haojun Liao 已提交
1575
          savePreviousRow(pLocalMerge, tmpBuffer);
H
hzcheng 已提交
1576 1577
        }
      }
S
slguan 已提交
1578
    } else {
H
Haojun Liao 已提交
1579 1580
      doExecuteSecondaryMerge(pCmd, pLocalMerge, true);
      savePreviousRow(pLocalMerge, tmpBuffer);  // copy the processed row to buffer
H
hzcheng 已提交
1581 1582 1583
    }

    pOneDataSrc->rowIdx += 1;
H
Haojun Liao 已提交
1584
    adjustLoserTreeFromNewData(pLocalMerge, pOneDataSrc, pTree);
H
hzcheng 已提交
1585 1586
  }

H
Haojun Liao 已提交
1587 1588
  if (pLocalMerge->hasPrevRow) {
    finalizeRes(pQueryInfo, pLocalMerge);
H
hzcheng 已提交
1589 1590
  }

H
Haojun Liao 已提交
1591 1592
  if (pLocalMerge->pResultBuf->num) {
    genFinalResults(pSql, pLocalMerge, true);
H
hzcheng 已提交
1593 1594 1595 1596 1597 1598 1599
  }

  return TSDB_CODE_SUCCESS;
}

void tscInitResObjForLocalQuery(SSqlObj *pObj, int32_t numOfRes, int32_t rowLen) {
  SSqlRes *pRes = &pObj->res;
H
Haojun Liao 已提交
1600 1601
  if (pRes->pLocalMerger != NULL) {
    tscDestroyLocalMerger(pObj);
H
hzcheng 已提交
1602 1603 1604 1605 1606 1607 1608
  }

  pRes->qhandle = 1;  // hack to pass the safety check in fetch_row function
  pRes->numOfRows = 0;
  pRes->row = 0;

  pRes->rspType = 0;  // used as a flag to denote if taos_retrieved() has been called yet
H
Haojun Liao 已提交
1609
  pRes->pLocalMerger = (SLocalMerger *)calloc(1, sizeof(SLocalMerger));
H
hzcheng 已提交
1610 1611

  /*
S
slguan 已提交
1612 1613
   * we need one additional byte space
   * the sprintf function needs one additional space to put '\0' at the end of string
H
hzcheng 已提交
1614 1615
   */
  size_t allocSize = numOfRes * rowLen + sizeof(tFilePage) + 1;
H
Haojun Liao 已提交
1616
  pRes->pLocalMerger->pResultBuf = (tFilePage *)calloc(1, allocSize);
H
hzcheng 已提交
1617

H
Haojun Liao 已提交
1618 1619
  pRes->pLocalMerger->pResultBuf->num = numOfRes;
  pRes->data = pRes->pLocalMerger->pResultBuf->data;
H
hzcheng 已提交
1620
}
H
Haojun Liao 已提交
1621

H
Haojun Liao 已提交
1622
int32_t doArithmeticCalculate(SQueryInfo* pQueryInfo, tFilePage* pOutput, int32_t rowSize, int32_t finalRowSize) {
H
Haojun Liao 已提交
1623
  int32_t maxRowSize = MAX(rowSize, finalRowSize);
P
plum-lihui 已提交
1624
  char* pbuf = calloc(1, (size_t)(pOutput->num * maxRowSize));
H
Haojun Liao 已提交
1625

H
Haojun Liao 已提交
1626
  size_t size = tscNumOfFields(pQueryInfo);
H
Haojun Liao 已提交
1627 1628 1629 1630 1631 1632 1633 1634 1635 1636 1637 1638
  SArithmeticSupport arithSup = {0};

  // todo refactor
  arithSup.offset     = 0;
  arithSup.numOfCols  = (int32_t) tscSqlExprNumOfExprs(pQueryInfo);
  arithSup.exprList   = pQueryInfo->exprList;
  arithSup.data       = calloc(arithSup.numOfCols, POINTER_BYTES);

  for(int32_t k = 0; k < arithSup.numOfCols; ++k) {
    SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, k);
    arithSup.data[k] = (pOutput->data + pOutput->num* pExpr->offset);
  }
H
Haojun Liao 已提交
1639 1640 1641 1642 1643 1644 1645 1646

  int32_t offset = 0;

  for (int i = 0; i < size; ++i) {
    SInternalField* pSup = TARRAY_GET_ELEM(pQueryInfo->fieldsInfo.internalField, i);
    
    // calculate the result from several other columns
    if (pSup->pArithExprInfo != NULL) {
H
Haojun Liao 已提交
1647
      arithSup.pArithExpr = pSup->pArithExprInfo;
H
Haojun Liao 已提交
1648
      arithmeticTreeTraverse(arithSup.pArithExpr->pExpr, (int32_t) pOutput->num, pbuf + pOutput->num*offset, &arithSup, TSDB_ORDER_ASC, getArithmeticInputSrc);
H
Haojun Liao 已提交
1649 1650
    } else {
      SSqlExpr* pExpr = pSup->pSqlExpr;
P
plum-lihui 已提交
1651
      memcpy(pbuf + pOutput->num * offset, pExpr->offset * pOutput->num + pOutput->data, (size_t)(pExpr->resBytes * pOutput->num));
H
Haojun Liao 已提交
1652 1653 1654 1655
    }

    offset += pSup->field.bytes;
  }
H
Haojun Liao 已提交
1656

P
plum-lihui 已提交
1657
  memcpy(pOutput->data, pbuf, (size_t)(pOutput->num * offset));
H
Haojun Liao 已提交
1658 1659 1660

  tfree(pbuf);
  tfree(arithSup.data);
H
Haojun Liao 已提交
1661 1662

  return offset;
P
plum-lihui 已提交
1663
}