executorimpl.c 93.8 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
15

H
Haojun Liao 已提交
16 17
#include "filter.h"
#include "function.h"
18 19
#include "functionMgt.h"
#include "os.h"
H
Haojun Liao 已提交
20
#include "querynodes.h"
21
#include "tfill.h"
dengyihao's avatar
dengyihao 已提交
22
#include "tname.h"
23

H
Haojun Liao 已提交
24
#include "tdatablock.h"
25
#include "tglobal.h"
H
Haojun Liao 已提交
26
#include "tmsg.h"
27
#include "ttime.h"
H
Haojun Liao 已提交
28

29
#include "executorimpl.h"
dengyihao's avatar
dengyihao 已提交
30
#include "index.h"
31
#include "query.h"
32
#include "tcompare.h"
H
Haojun Liao 已提交
33
#include "thash.h"
34
#include "ttypes.h"
dengyihao's avatar
dengyihao 已提交
35
#include "vnode.h"
36

H
Haojun Liao 已提交
37
/* Non-zero while the runtime is executing its main (first) scan pass. */
#define IS_MAIN_SCAN(rt) (MAIN_SCAN == (rt)->scanFlag)

/* Switch the runtime into the reverse scan pass. */
#define SET_REVERSE_SCAN_FLAG(rt) ((rt)->scanFlag = REVERSE_SCAN)

/* Row step for the given scan order: QUERY_ASC_FORWARD_STEP for ascending, QUERY_DESC_FORWARD_STEP otherwise. */
#define GET_FORWARD_DIRECTION_FACTOR(ord) ((TSDB_ORDER_ASC == (ord)) ? QUERY_ASC_FORWARD_STEP : QUERY_DESC_FORWARD_STEP)

#if 0
/*
 * Fault-injection allocators used to exercise out-of-memory handling (compiled out).
 * When enabled, the #defines at the end redirect malloc/calloc/realloc in this file to these
 * wrappers, which randomly fail:
 *   - u_malloc / u_calloc return NULL when taosRand() % 1000 == 0,
 *   - u_realloc returns NULL when taosRand() % 5 <= 1.
 */
static UNUSED_FUNC void* u_malloc(size_t size) {
  uint32_t dice = taosRand();
  return (dice % 1000 == 0) ? NULL : taosMemoryMalloc(size);
}

static UNUSED_FUNC void* u_calloc(size_t num, size_t size) {
  uint32_t dice = taosRand();
  return (dice % 1000 == 0) ? NULL : taosMemoryCalloc(num, size);
}

static UNUSED_FUNC void* u_realloc(void* p, size_t size) {
  uint32_t dice = taosRand();
  return (dice % 5 <= 1) ? NULL : taosMemoryRealloc(p, size);
}

#define calloc  u_calloc
#define malloc  u_malloc
#define realloc u_realloc
#endif

X
Xiaoyu Wang 已提交
76
/* Clear the bit(s) `st` from the status word of `q`. */
#define CLEAR_QUERY_STATUS(q, st) ((q)->status &= ~(st))

/* Non-zero when `_q` is an interval (time-window) query, i.e. its interval length is positive. */
#define QUERY_IS_INTERVAL_QUERY(_q) (0 < (_q)->interval.interval)

H
Haojun Liao 已提交
79 80 81 82 83 84 85 86 87
/* Operator-specific state of the aggregate operator. */
typedef struct SAggOperatorInfo {
  SOptrBasicInfo   binfo;          // basic operator info — presumably the common output/result info; TODO confirm
  SAggSupporter    aggSup;         // aggregation support: result-row hash table, key buffer, row size, current page
  STableQueryInfo* current;        // NOTE(review): presumably the table currently being processed; confirm
  uint64_t         groupId;        // NOTE(review): presumably the group id of the rows being aggregated; confirm
  SGroupResInfo    groupResInfo;   // grouped result info used when emitting results — TODO confirm usage
  SExprSupp        scalarExprSup;  // scalar expressions of this operator — presumably evaluated alongside aggregation
} SAggOperatorInfo;

88
/* Forward declarations of helpers defined later in this file (or in companion executor sources). */
static void setBlockSMAInfo(SqlFunctionCtx* pCtx, SExprInfo* pExpr, SSDataBlock* pBlock);
static void releaseQueryBuf(size_t numOfTables);
static void destroyAggOperatorInfo(void* param);
static void initCtxOutputBuffer(SqlFunctionCtx* pCtx, int32_t size);
static void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId);
static void doApplyScalarCalculation(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32_t order,
                                     int32_t scanFlag);
static void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const SColumnInfoData* p, bool keep,
                                                int32_t status);

static int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t numOfOutput,
                                size_t keyBufSize, const char* pKey);
static int32_t doSetInputDataBlock(SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t order, int32_t scanFlag,
                                   bool createDummyCol);
H
Haojun Liao 已提交
102

H
Haojun Liao 已提交
103
/*
 * Mark an operator as finished.
 *
 * Sets the operator status to OP_EXEC_DONE, records its total cost as the time elapsed since the
 * owning task started (microsecond timestamps divided by 1000.0), and marks the owning task
 * TASK_COMPLETED. The operator must be attached to a task.
 */
void setOperatorCompleted(SOperatorInfo* pOperator) {
  SExecTaskInfo* pTask = pOperator->pTaskInfo;

  pOperator->status = OP_EXEC_DONE;
  ASSERT(pTask != NULL);

  double elapsedMs = (taosGetTimestampUs() - pTask->cost.start) / 1000.0;
  pOperator->cost.totalCost = elapsedMs;

  setTaskStatus(pTask, TASK_COMPLETED);
}
110

H
Haojun Liao 已提交
111 112 113 114 115 116 117 118 119 120
/*
 * Fill in an operator's identity and runtime fields in one call.
 * `name` is stored by reference (not copied), so it must outlive the operator.
 */
void setOperatorInfo(SOperatorInfo* pOperator, const char* name, int32_t type, bool blocking, int32_t status,
                     void* pInfo, SExecTaskInfo* pTaskInfo) {
  pOperator->pTaskInfo = pTaskInfo;
  pOperator->info = pInfo;
  pOperator->status = status;
  pOperator->blocking = blocking;
  pOperator->operatorType = type;
  pOperator->name = (char*)name;
}

121
/*
 * Open callback for operators that have no open phase.
 * Marks the operator as opened, reports zero open cost, and always succeeds.
 */
int32_t optrDummyOpenFn(SOperatorInfo* pOperator) {
  OPTR_SET_OPENED(pOperator);
  pOperator->cost.openCost = 0;

  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
127
SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn, __optr_fn_t cleanup,
128 129
                                   __optr_close_fn_t closeFn, __optr_reqBuf_fn_t reqBufFn,
                                   __optr_explain_fn_t explain) {
130 131 132 133 134
  SOperatorFpSet fpSet = {
      ._openFn = openFn,
      .getNextFn = nextFn,
      .cleanupFn = cleanup,
      .closeFn = closeFn,
135
      .reqBufFn = reqBufFn,
136 137 138 139 140 141
      .getExplainFn = explain,
  };

  return fpSet;
}

142 143
/* Forward declaration; judging by the name, copies result rows tracked by pGroupResInfo from pBuf into
 * pBlock — see the definition to confirm. */
static int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprSupp* pSup,
                                  SDiskbasedBuf* pBuf, SGroupResInfo* pGroupResInfo);
H
Haojun Liao 已提交
144

145
/*
 * Reserve `interBufSize` bytes for a new result row in the paged result buffer.
 *
 * Rows are appended to the page identified by *currentPageId. A fresh page is obtained when there is
 * no current page (*currentPageId == -1) or when the current page cannot hold another row; in the
 * latter case the full page is released first.
 *
 * On success, the returned row carries its pageId/offset, the page is marked dirty, and
 * *currentPageId is updated to the page that holds the row.
 * Returns NULL when no page could be obtained; *currentPageId is not modified in that case.
 */
SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int32_t* currentPageId, int32_t interBufSize) {
  SFilePage* pData = NULL;
  int32_t    pageId = -1;

  if (*currentPageId == -1) {
    // first row: no page allocated yet
    pData = getNewBufPage(pResultBuf, &pageId);
    if (pData == NULL) {
      return NULL;
    }
    pData->num = sizeof(SFilePage);
  } else {
    pData = getBufPage(pResultBuf, *currentPageId);
    if (pData == NULL) {
      return NULL;
    }
    pageId = *currentPageId;

    if (pData->num + interBufSize > getBufPageSize(pResultBuf)) {
      // release current page first, and prepare the next one
      releaseBufPage(pResultBuf, pData);

      pData = getNewBufPage(pResultBuf, &pageId);
      if (pData == NULL) {
        return NULL;
      }
      pData->num = sizeof(SFilePage);
    }
  }

  setBufPageDirty(pData, true);

  // pData->num is the number of bytes already used in the page; the new row starts right after them
  SResultRow* pResultRow = (SResultRow*)((char*)pData + pData->num);
  pResultRow->pageId = pageId;
  pResultRow->offset = (int32_t)pData->num;
  *currentPageId = pageId;

  pData->num += interBufSize;
  return pResultRow;
}

184 185 186 187 188 189 190
/**
 * the struct of key in hash table
 * +----------+---------------+
 * | group id |   key data    |
 * | 8 bytes  | actual length |
 * +----------+---------------+
 */
191 192 193
SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pResultRowInfo, char* pData,
                                   int16_t bytes, bool masterscan, uint64_t groupId, SExecTaskInfo* pTaskInfo,
                                   bool isIntervalQuery, SAggSupporter* pSup) {
194
  SET_RES_WINDOW_KEY(pSup->keyBuf, pData, bytes, groupId);
H
Haojun Liao 已提交
195

dengyihao's avatar
dengyihao 已提交
196
  SResultRowPosition* p1 =
197
      (SResultRowPosition*)tSimpleHashGet(pSup->pResultRowHashTable, pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes));
H
Haojun Liao 已提交
198

199 200
  SResultRow* pResult = NULL;

H
Haojun Liao 已提交
201 202
  // in case of repeat scan/reverse scan, no new time window added.
  if (isIntervalQuery) {
203
    if (masterscan && p1 != NULL) {  // the *p1 may be NULL in case of sliding+offset exists.
204
      pResult = getResultRowByPos(pResultBuf, p1, true);
205
      ASSERT(pResult->pageId == p1->pageId && pResult->offset == p1->offset);
H
Haojun Liao 已提交
206 207
    }
  } else {
dengyihao's avatar
dengyihao 已提交
208 209
    // In case of group by column query, the required SResultRow object must be existInCurrentResusltRowInfo in the
    // pResultRowInfo object.
H
Haojun Liao 已提交
210
    if (p1 != NULL) {
211
      // todo
212
      pResult = getResultRowByPos(pResultBuf, p1, true);
213
      ASSERT(pResult->pageId == p1->pageId && pResult->offset == p1->offset);
H
Haojun Liao 已提交
214 215 216
    }
  }

L
Liu Jicong 已提交
217
  // 1. close current opened time window
218
  if (pResultRowInfo->cur.pageId != -1 && ((pResult == NULL) || (pResult->pageId != pResultRowInfo->cur.pageId))) {
219
    SResultRowPosition pos = pResultRowInfo->cur;
X
Xiaoyu Wang 已提交
220
    SFilePage*         pPage = getBufPage(pResultBuf, pos.pageId);
221 222 223 224 225
    releaseBufPage(pResultBuf, pPage);
  }

  // allocate a new buffer page
  if (pResult == NULL) {
H
Haojun Liao 已提交
226
    ASSERT(pSup->resultRowSize > 0);
227
    pResult = getNewResultRow(pResultBuf, &pSup->currentPageId, pSup->resultRowSize);
228

229 230
    // add a new result set for a new group
    SResultRowPosition pos = {.pageId = pResult->pageId, .offset = pResult->offset};
231
    tSimpleHashPut(pSup->pResultRowHashTable, pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes), &pos,
L
Liu Jicong 已提交
232
                   sizeof(SResultRowPosition));
H
Haojun Liao 已提交
233 234
  }

235 236 237
  // 2. set the new time window to be the new active time window
  pResultRowInfo->cur = (SResultRowPosition){.pageId = pResult->pageId, .offset = pResult->offset};

H
Haojun Liao 已提交
238
  // too many time window in query
239
  if (pTaskInfo->execModel == OPTR_EXEC_MODEL_BATCH &&
240
      tSimpleHashGetSize(pSup->pResultRowHashTable) > MAX_INTERVAL_TIME_WINDOW) {
241
    T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_TOO_MANY_TIMEWINDOW);
H
Haojun Liao 已提交
242 243
  }

H
Haojun Liao 已提交
244
  return pResult;
H
Haojun Liao 已提交
245 246
}

247
// a new buffer page for each table. Needs to opt this design
L
Liu Jicong 已提交
248
static int32_t addNewWindowResultBuf(SResultRow* pWindowRes, SDiskbasedBuf* pResultBuf, int32_t tid, uint32_t size) {
249 250 251 252
  if (pWindowRes->pageId != -1) {
    return 0;
  }

L
Liu Jicong 已提交
253
  SFilePage* pData = NULL;
254 255 256

  // in the first scan, new space needed for results
  int32_t pageId = -1;
257
  SArray* list = getDataBufPagesIdList(pResultBuf);
258 259

  if (taosArrayGetSize(list) == 0) {
260
    pData = getNewBufPage(pResultBuf, &pageId);
261
    pData->num = sizeof(SFilePage);
262 263
  } else {
    SPageInfo* pi = getLastPageInfo(list);
264
    pData = getBufPage(pResultBuf, getPageId(pi));
265
    pageId = getPageId(pi);
266

267
    if (pData->num + size > getBufPageSize(pResultBuf)) {
268
      // release current page first, and prepare the next one
269
      releaseBufPageInfo(pResultBuf, pi);
270

271
      pData = getNewBufPage(pResultBuf, &pageId);
272
      if (pData != NULL) {
273
        pData->num = sizeof(SFilePage);
274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293
      }
    }
  }

  if (pData == NULL) {
    return -1;
  }

  // set the number of rows in current disk page
  if (pWindowRes->pageId == -1) {  // not allocated yet, allocate new buffer
    pWindowRes->pageId = pageId;
    pWindowRes->offset = (int32_t)pData->num;

    pData->num += size;
    assert(pWindowRes->pageId >= 0);
  }

  return 0;
}

294
//  query_range_start, query_range_end, window_duration, window_start, window_end
295
void initExecTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pQueryWindow) {
296 297 298
  pColData->info.type = TSDB_DATA_TYPE_TIMESTAMP;
  pColData->info.bytes = sizeof(int64_t);

H
Haojun Liao 已提交
299
  colInfoDataEnsureCapacity(pColData, 5, false);
300 301 302 303 304 305 306 307 308
  colDataAppendInt64(pColData, 0, &pQueryWindow->skey);
  colDataAppendInt64(pColData, 1, &pQueryWindow->ekey);

  int64_t interval = 0;
  colDataAppendInt64(pColData, 2, &interval);  // this value may be variable in case of 'n' and 'y'.
  colDataAppendInt64(pColData, 3, &pQueryWindow->skey);
  colDataAppendInt64(pColData, 4, &pQueryWindow->ekey);
}

309 310 311 312 313 314 315
/* Snapshot of the SqlFunctionCtx input fields that partial-tuple evaluation temporarily overrides. */
typedef struct {
  bool    hasAgg;       // saved input.colDataSMAIsSet
  int32_t numOfRows;    // saved input.numOfRows
  int32_t startOffset;  // saved input.startRowIndex
} SFunctionCtxStatus;

/* Copy the overridable input fields of pCtx into pStatus. */
static void functionCtxSave(SqlFunctionCtx* pCtx, SFunctionCtxStatus* pStatus) {
  const SInputColumnInfoData* pIn = &pCtx->input;

  *pStatus = (SFunctionCtxStatus){
      .hasAgg = pIn->colDataSMAIsSet,
      .numOfRows = pIn->numOfRows,
      .startOffset = pIn->startRowIndex,
  };
}

/* Write the fields captured by functionCtxSave back into pCtx. */
static void functionCtxRestore(SqlFunctionCtx* pCtx, SFunctionCtxStatus* pStatus) {
  SInputColumnInfoData* pIn = &pCtx->input;

  pIn->startRowIndex = pStatus->startOffset;
  pIn->numOfRows = pStatus->numOfRows;
  pIn->colDataSMAIsSet = pStatus->hasAgg;
}

H
Haojun Liao 已提交
327 328
/*
 * Apply each of the `numOfOutput` function contexts to the row range
 * [offset, offset + forwardStep) of the current input block (which has `numOfTotal` rows).
 *
 * Block-level SMA is disabled for the call when only part of the block is processed. Window
 * pseudo-column functions are evaluated from pTimeWindowData (5 rows) into their result buffer
 * instead of from the input rows.
 *
 * The original input row range and SMA flag of every context are restored afterwards, for both
 * kinds of function. On a process() error, taskInfo->code is set and control long-jumps to taskInfo->env.
 */
void applyAggFunctionOnPartialTuples(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, SColumnInfoData* pTimeWindowData,
                                     int32_t offset, int32_t forwardStep, int32_t numOfTotal, int32_t numOfOutput) {
  for (int32_t k = 0; k < numOfOutput; ++k) {
    SqlFunctionCtx* pOne = &pCtx[k];

    // keep it temporarily
    SFunctionCtxStatus status = {0};
    functionCtxSave(pOne, &status);

    pOne->input.startRowIndex = offset;
    pOne->input.numOfRows = forwardStep;

    // not a whole block involved in query processing, statistics data can not be used
    if (pOne->input.colDataSMAIsSet && forwardStep < numOfTotal) {
      pOne->input.colDataSMAIsSet = false;
    }

    if (fmIsWindowPseudoColumnFunc(pOne->functionId)) {
      SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pOne);
      char*                p = GET_ROWCELL_INTERBUF(pEntryInfo);

      // write the BIGINT result directly into the row cell's intermediate buffer
      SColumnInfoData idata = {0};
      idata.info.type = TSDB_DATA_TYPE_BIGINT;
      idata.info.bytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes;
      idata.pData = p;

      SScalarParam out = {.columnData = &idata};
      SScalarParam tw = {.numOfRows = 5, .columnData = pTimeWindowData};
      pOne->sfp.process(&tw, 1, &out);
      pEntryInfo->numOfRes = 1;
    } else if (functionNeedToExecute(pOne) && pOne->fpSet.process != NULL) {
      int32_t code = pOne->fpSet.process(pOne);
      if (code != TSDB_CODE_SUCCESS) {
        qError("%s apply functions error, code: %s", GET_TASKID(taskInfo), tstrerror(code));
        taskInfo->code = code;
        T_LONG_JMP(taskInfo->env, code);
      }
    }

    // restore it for every kind of function, so no context keeps the partial row range
    functionCtxRestore(pOne, &status);
  }
}

375 376 377
/*
 * Point every expression context of pExprSup at pBlock when only block-level info is needed.
 * Sets the scan order, row count, SMA info (via setBlockSMAInfo) and source block; column
 * data itself is not bound here.
 */
static void doSetInputDataBlockInfo(SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t order) {
  for (int32_t idx = 0; idx < pExprSup->numOfExprs; ++idx) {
    SqlFunctionCtx* pOne = &pExprSup->pCtx[idx];

    pOne->order = order;
    pOne->input.numOfRows = pBlock->info.rows;
    setBlockSMAInfo(pOne, &pExprSup->pExprInfo[idx], pBlock);
    pOne->pSrcBlock = pBlock;
  }
}

385
/*
 * Bind pBlock as the input of every expression in pExprSup.
 *
 * Blocks that carry block aggregates (pBlockAgg != NULL) only get block-level info and SMA set.
 * Otherwise the column data is bound by doSetInputDataBlock; its return code is not propagated.
 */
void setInputDataBlock(SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t order, int32_t scanFlag, bool createDummyCol) {
  if (pBlock->pBlockAgg == NULL) {
    doSetInputDataBlock(pExprSup, pBlock, order, scanFlag, createDummyCol);
    return;
  }

  doSetInputDataBlockInfo(pExprSup, pBlock, order);
}

L
Liu Jicong 已提交
393 394
/*
 * Materialize a constant function parameter as a column of `numOfRows` identical values.
 *
 * The column lives in pInput->pData[paramIndex]. It is allocated on first use, with its type and bytes
 * taken from the parameter, and reused afterwards. BIGINT/UBIGINT, DOUBLE and VARCHAR constants are
 * written. For any other type the column is sized but no values are written.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY on allocation failure, or the error returned by
 * the column helpers.
 */
static int32_t doCreateConstantValColumnInfo(SInputColumnInfoData* pInput, SFunctParam* pFuncParam, int32_t paramIndex,
                                             int32_t numOfRows) {
  SColumnInfoData* pColInfo = pInput->pData[paramIndex];
  if (pColInfo == NULL) {
    pColInfo = taosMemoryCalloc(1, sizeof(SColumnInfoData));
    if (pColInfo == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    // Set the correct column info (data type and bytes)
    pColInfo->info.type = pFuncParam->param.nType;
    pColInfo->info.bytes = pFuncParam->param.nLen;

    pInput->pData[paramIndex] = pColInfo;
  }

  int32_t code = colInfoDataEnsureCapacity(pColInfo, numOfRows, false);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  int8_t type = pFuncParam->param.nType;
  if (type == TSDB_DATA_TYPE_BIGINT || type == TSDB_DATA_TYPE_UBIGINT) {
    int64_t v = pFuncParam->param.i;
    for (int32_t i = 0; i < numOfRows; ++i) {
      colDataAppendInt64(pColInfo, i, &v);
    }
  } else if (type == TSDB_DATA_TYPE_DOUBLE) {
    double v = pFuncParam->param.d;
    for (int32_t i = 0; i < numOfRows; ++i) {
      colDataAppendDouble(pColInfo, i, &v);
    }
  } else if (type == TSDB_DATA_TYPE_VARCHAR) {
    // build the var-string (length header + payload) once, then append it to every row
    char* tmp = taosMemoryMalloc(pFuncParam->param.nLen + VARSTR_HEADER_SIZE);
    if (tmp == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    STR_WITH_SIZE_TO_VARSTR(tmp, pFuncParam->param.pz, pFuncParam->param.nLen);
    for (int32_t i = 0; i < numOfRows; ++i) {
      code = colDataAppend(pColInfo, i, tmp, false);
      if (code != TSDB_CODE_SUCCESS) {
        break;
      }
    }

    taosMemoryFree(tmp);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }
  }

  return TSDB_CODE_SUCCESS;
}

436 437
/*
 * Bind the columns of pBlock to the input of every expression context in pExprSup.
 *
 * Each context gets the order, scan flag, source block and table uid, and its SMA flag is reset.
 * Each parameter is then handled by type:
 *  - COLUMN: pData[j] points at the block column in slot pCol->slotId, covering the whole block. For
 *    implicit-timestamp functions the last parameter is also used as pPTS.
 *  - VALUE: when createDummyCol is set and the expression has exactly one parameter, the constant is
 *    expanded into a column as long as the block (doCreateConstantValColumnInfo).
 *
 * Returns the first error from doCreateConstantValColumnInfo, otherwise TSDB_CODE_SUCCESS.
 */
static int32_t doSetInputDataBlock(SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t order, int32_t scanFlag,
                                   bool createDummyCol) {
  for (int32_t i = 0; i < pExprSup->numOfExprs; ++i) {
    SqlFunctionCtx*       pOne = &pExprSup->pCtx[i];
    SInputColumnInfoData* pInput = &pOne->input;
    SExprInfo*            pExpr = &pExprSup->pExprInfo[i];

    pOne->order = order;
    pInput->numOfRows = pBlock->info.rows;
    pOne->pSrcBlock = pBlock;
    pOne->scanFlag = scanFlag;

    pInput->uid = pBlock->info.id.uid;
    pInput->colDataSMAIsSet = false;

    for (int32_t j = 0; j < pExpr->base.numOfParams; ++j) {
      SFunctParam* pParam = &pExpr->base.pParam[j];

      switch (pParam->type) {
        case FUNC_PARAM_TYPE_COLUMN: {
          pInput->pData[j] = taosArrayGet(pBlock->pDataBlock, pParam->pCol->slotId);
          pInput->totalRows = pBlock->info.rows;
          pInput->numOfRows = pBlock->info.rows;
          pInput->startRowIndex = 0;

          // the last parameter of an implicit-ts function is the timestamp column
          // (for merge functions it is not always the real ts column data)
          if (fmIsImplicitTsFunc(pOne->functionId) && (j == pExpr->base.numOfParams - 1)) {
            pInput->pPTS = pInput->pData[j];
          }

          ASSERT(pInput->pData[j] != NULL);
          break;
        }

        case FUNC_PARAM_TYPE_VALUE: {
          // todo avoid case: top(k, 12), 12 is the value parameter; sum(11), 11 is also the value parameter.
          if (!createDummyCol || pExpr->base.numOfParams != 1) {
            break;
          }

          pInput->totalRows = pBlock->info.rows;
          pInput->numOfRows = pBlock->info.rows;
          pInput->startRowIndex = 0;

          int32_t err = doCreateConstantValColumnInfo(pInput, pParam, j, pBlock->info.rows);
          if (err != TSDB_CODE_SUCCESS) {
            return err;
          }
          break;
        }

        default:
          break;
      }
    }
  }

  return TSDB_CODE_SUCCESS;
}

489
/*
 * Run the per-block aggregation step (fpSet.process) for every expression of pOperator that needs to
 * execute. Expressions without a process callback are skipped.
 * Returns the first failing code (after logging it), otherwise TSDB_CODE_SUCCESS.
 */
static int32_t doAggregateImpl(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx) {
  for (int32_t idx = 0; idx < pOperator->exprSupp.numOfExprs; ++idx) {
    SqlFunctionCtx* pOne = &pCtx[idx];

    // todo add a dummy funtion to avoid process check
    if (!functionNeedToExecute(pOne) || pOne->fpSet.process == NULL) {
      continue;
    }

    int32_t ret = pOne->fpSet.process(pOne);
    if (ret != TSDB_CODE_SUCCESS) {
      qError("%s aggregate function error happens, code: %s", GET_TASKID(pOperator->pTaskInfo), tstrerror(ret));
      return ret;
    }
  }

  return TSDB_CODE_SUCCESS;
}

5
54liuyao 已提交
508
/*
 * Decide whether the function bound to pCtx must process the current input.
 *  - functionId -1 (per the original note: e.g. the timestamp column, which always generates
 *    results) is never executed;
 *  - during a repeat scan, only functions that support repeat scanning run;
 *  - otherwise the function runs until its result entry is completed.
 */
bool functionNeedToExecute(SqlFunctionCtx* pCtx) {
  struct SResultRowEntryInfo* pEntry = GET_RES_INFO(pCtx);

  if (pCtx->functionId == -1) {
    return false;
  }

  if (pCtx->scanFlag == REPEAT_SCAN) {
    return fmIsRepeatScanFunc(pCtx->functionId);
  }

  return !isRowEntryCompleted(pEntry);
}

528 529 530 531 532 533 534
/*
 * Build block-aggregate (SMA) info for a constant parameter repeated over `numOfRows` rows.
 *
 * pInput->pData[paramIndex] is created on first use with the given type (its data is never filled).
 * pInput->pColumnDataAgg[paramIndex] is created on first use and then set to numOfNull = 0 and
 * min = max = the constant, sum = constant * numOfRows.
 * DOUBLE values are stored as their raw bit patterns in the int64 min/max/sum fields.
 * TIMESTAMP leaves the aggregate untouched. Variable-length types are not supported.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY on allocation failure.
 */
static int32_t doCreateConstantValColumnAggInfo(SInputColumnInfoData* pInput, SFunctParam* pFuncParam, int32_t type,
                                                int32_t paramIndex, int32_t numOfRows) {
  if (pInput->pData[paramIndex] == NULL) {
    pInput->pData[paramIndex] = taosMemoryCalloc(1, sizeof(SColumnInfoData));
    if (pInput->pData[paramIndex] == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    // Set the correct column info (data type and bytes)
    pInput->pData[paramIndex]->info.type = type;
    pInput->pData[paramIndex]->info.bytes = tDataTypes[type].bytes;
  }

  SColumnDataAgg* da = pInput->pColumnDataAgg[paramIndex];
  if (da == NULL) {
    da = taosMemoryCalloc(1, sizeof(SColumnDataAgg));
    if (da == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
    pInput->pColumnDataAgg[paramIndex] = da;
  }

  ASSERT(!IS_VAR_DATA_TYPE(type));

  if (type == TSDB_DATA_TYPE_BIGINT) {
    int64_t v = pFuncParam->param.i;
    // multiply in unsigned arithmetic: signed overflow is undefined, unsigned wraps
    int64_t sum = (int64_t)((uint64_t)v * (uint64_t)numOfRows);
    *da = (SColumnDataAgg){.numOfNull = 0, .min = v, .max = v, .sum = sum};
  } else if (type == TSDB_DATA_TYPE_DOUBLE) {
    double v = pFuncParam->param.d;
    *da = (SColumnDataAgg){.numOfNull = 0};

    // min/max/sum are integer fields carrying the double's bit pattern; pun through a union rather than
    // a pointer cast, which would violate strict aliasing
    union {
      double  d;
      int64_t i;
    } bits;

    bits.d = v;
    da->min = bits.i;
    da->max = bits.i;

    bits.d = v * numOfRows;
    da->sum = bits.i;
  } else if (type == TSDB_DATA_TYPE_BOOL) {  // todo validate this data type
    bool v = pFuncParam->param.i;

    // Every row of a constant column equals v. The previous code wrote min = 0 and stored
    // (bool)(v * numOfRows) into a single byte of sum, via aliasing-violating bool* casts.
    *da = (SColumnDataAgg){.numOfNull = 0, .min = v, .max = v, .sum = v ? numOfRows : 0};
  } else if (type == TSDB_DATA_TYPE_TIMESTAMP) {
    // do nothing
  } else {
    ASSERT(0);
  }

  return TSDB_CODE_SUCCESS;
}
579

580
/*
 * Set the row counts of pCtx's input from pBlock and wire up block-level SMA when the block has it.
 *
 * Without pBlock->pBlockAgg, SMA is marked unavailable. Otherwise each COLUMN parameter takes its
 * aggregate from pBlock->pBlockAgg[slotId]; SMA is marked unavailable if any of those is missing.
 * pData[j] is also set, because the column's data type is needed even though its data is not read.
 * VALUE parameters get synthesized constant aggregates (the result code of
 * doCreateConstantValColumnAggInfo is ignored).
 */
void setBlockSMAInfo(SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, SSDataBlock* pBlock) {
  SInputColumnInfoData* pInput = &pCtx->input;
  int32_t               numOfRows = pBlock->info.rows;

  pInput->numOfRows = numOfRows;
  pInput->totalRows = numOfRows;

  if (pBlock->pBlockAgg == NULL) {
    pInput->colDataSMAIsSet = false;
    return;
  }

  pInput->colDataSMAIsSet = true;

  for (int32_t j = 0; j < pExprInfo->base.numOfParams; ++j) {
    SFunctParam* pParam = &pExprInfo->base.pParam[j];

    if (pParam->type == FUNC_PARAM_TYPE_VALUE) {
      doCreateConstantValColumnAggInfo(pInput, pParam, pParam->param.nType, j, pBlock->info.rows);
      continue;
    }

    if (pParam->type != FUNC_PARAM_TYPE_COLUMN) {
      continue;
    }

    int32_t slot = pParam->pCol->slotId;
    pInput->pColumnDataAgg[j] = pBlock->pBlockAgg[slot];
    if (pInput->pColumnDataAgg[j] == NULL) {
      pInput->colDataSMAIsSet = false;
    }

    pInput->pData[j] = taosArrayGet(pBlock->pDataBlock, slot);
  }
}

L
Liu Jicong 已提交
612
/* A task counts as killed as soon as any non-zero code has been recorded in it. */
bool isTaskKilled(SExecTaskInfo* pTaskInfo) {
  return pTaskInfo->code != 0;
}

/* Record rspCode in the task; any non-zero value makes isTaskKilled() report true. */
void setTaskKilled(SExecTaskInfo* pTaskInfo, int32_t rspCode) {
  pTaskInfo->code = rspCode;
}
617 618

/////////////////////////////////////////////////////////////////////////////////////////////
619
/*
 * Return the interval window containing `key`.
 *
 * The start is `key` truncated to the interval boundary; the end is start + interval - 1. If that
 * end wraps below the start (start close to INT64_MAX), the end is clamped to INT64_MAX. In that
 * case the query range is shorter than one interval, so no further range adjustment is needed.
 */
STimeWindow getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int64_t key) {
  int64_t start = taosTimeTruncate(key, pInterval, precision);
  int64_t end = taosTimeAdd(start, pInterval->interval, pInterval->intervalUnit, precision) - 1;

  if (end < start) {
    end = INT64_MAX;
  }

  return (STimeWindow){.skey = start, .ekey = end};
}

L
Liu Jicong 已提交
635 636
/*
 * Decide whether the data of pBlock has to be loaded.
 *
 * The current implementation never loads anything. It reports BLK_DATA_NOT_LOAD, clears the block's
 * column-data and block-aggregate pointers, and returns TSDB_CODE_SUCCESS.
 *
 * The former block-skipping logic (time-window overlap checks, SMA / top-bottom based filtering,
 * cost accounting) had been compiled out with `#if 0` and referenced identifiers that do not exist
 * in this function (pRuntimeEnv, pQueryAttr, ascQuery, groupId). It has been removed, together with
 * the `pCost` local that only that dead code used. Recover it from version control if needed.
 */
int32_t loadDataBlockOnDemand(SExecTaskInfo* pTaskInfo, STableScanInfo* pTableScanInfo, SSDataBlock* pBlock,
                              uint32_t* status) {
  (void)pTaskInfo;
  (void)pTableScanInfo;

  *status = BLK_DATA_NOT_LOAD;

  pBlock->pDataBlock = NULL;
  pBlock->pBlockAgg = NULL;

  return TSDB_CODE_SUCCESS;
}

L
Liu Jicong 已提交
778
/*
 * Update the execution status of a task.
 *
 * TASK_NOT_COMPLETED replaces whatever status the task had. Any other status value is OR-ed
 * into pTaskInfo->status after the TASK_NOT_COMPLETED bit has been cleared, because the
 * not-completed state cannot coexist with the other states.
 */
void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status) {
  if (status != TASK_NOT_COMPLETED) {
    // QUERY_NOT_COMPLETED is not compatible with any other status, so drop it before adding the new bit
    CLEAR_QUERY_STATUS(pTaskInfo, TASK_NOT_COMPLETED);
    pTaskInfo->status |= status;
    return;
  }

  pTaskInfo->status = status;
}

788
/*
 * Bind each function context to its entry inside pResult and initialize entries that need it.
 *
 * For every output i, pCtx[i].resultInfo is pointed at entry i of pResult. Initialization is then
 * skipped for:
 *   - entries that are both completed and initialized,
 *   - window pseudo-column functions.
 * An uninitialized entry has the function's init routine run on it. Contexts with functionId -1
 * have no function; their entry is simply flagged as initialized.
 *
 * Once an entry that is already initialized is met, init is skipped for all remaining outputs,
 * although their resultInfo pointers are still set.
 */
void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowEntryInfoOffset) {
  bool skipRemainingInit = false;

  for (int32_t idx = 0; idx < numOfOutput; ++idx) {
    SqlFunctionCtx*      pOne = &pCtx[idx];
    SResultRowEntryInfo* pEntry = getResultEntryInfo(pResult, idx, rowEntryInfoOffset);
    pOne->resultInfo = pEntry;

    // evaluation order matches the original checks exactly (short-circuit keeps calls identical)
    if (skipRemainingInit || (isRowEntryCompleted(pEntry) && isRowEntryInitialized(pEntry)) ||
        fmIsWindowPseudoColumnFunc(pOne->functionId)) {
      continue;
    }

    if (pEntry->initialized) {
      skipRemainingInit = true;
    } else if (pOne->functionId == -1) {
      pEntry->initialized = true;
    } else {
      pOne->fpSet.init(pOne, pEntry);
    }
  }
}

H
Haojun Liao 已提交
817 818
/*
 * Apply pFilterInfo to pBlock in place, keeping only qualifying rows.
 *
 * Does nothing when no filter is attached or the block is empty. After filtering, if pColMatchInfo is
 * given, the first matched primary-timestamp column of type TSDB_DATA_TYPE_TIMESTAMP is used to refresh
 * the block's time window via blockDataUpdateTsWindow. The temporary result column returned by
 * filterExecute is destroyed and freed before returning.
 */
void doFilter(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SColMatchInfo* pColMatchInfo) {
  if (pFilterInfo == NULL || pBlock->info.rows == 0) {
    return;
  }

  SFilterColumnParam colParam = {.numOfCols = taosArrayGetSize(pBlock->pDataBlock), .pDataBlock = pBlock->pDataBlock};
  // NOTE(review): the return code is not inspected here; kept as-is to preserve behavior
  (void)filterSetDataFromSlotId(pFilterInfo, &colParam);

  SColumnInfoData* pResCol = NULL;
  int32_t          filterStatus = 0;

  // todo the keep seems never to be True??
  bool keepAll = filterExecute(pFilterInfo, pBlock, &pResCol, NULL, colParam.numOfCols, &filterStatus);
  extractQualifiedTupleByFilterResult(pBlock, pResCol, keepAll, filterStatus);

  size_t numOfMatched = (pColMatchInfo == NULL) ? 0 : taosArrayGetSize(pColMatchInfo->pList);
  for (size_t k = 0; k < numOfMatched; ++k) {
    SColMatchItem* pItem = taosArrayGet(pColMatchInfo->pList, k);
    if (pItem->colId != PRIMARYKEY_TIMESTAMP_COL_ID) {
      continue;
    }

    SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, pItem->dstSlotId);
    if (pTsCol->info.type != TSDB_DATA_TYPE_TIMESTAMP) {
      continue;
    }

    blockDataUpdateTsWindow(pBlock, pItem->dstSlotId);
    break;
  }

  colDataDestroy(pResCol);
  taosMemoryFree(pResCol);
}

850
/*
 * Compact pBlock so that it only contains the rows selected by a filter.
 *
 * @param pBlock  block to compact in place.
 * @param p       per-row indicator column from the filter; a zero byte at row j means row j is dropped.
 *                Only read when status is neither FILTER_RESULT_ALL_QUALIFIED nor FILTER_RESULT_NONE_QUALIFIED.
 * @param keep    when true the block is left untouched.
 * @param status  FILTER_RESULT_ALL_QUALIFIED keeps every row, FILTER_RESULT_NONE_QUALIFIED empties the block,
 *                any other value compacts the block according to p.
 *
 * The number of qualifying rows is computed once from p and assigned after all columns are compacted,
 * so pBlock->info.rows is correct even when every column is a reserved (data-less) column.
 */
void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const SColumnInfoData* p, bool keep, int32_t status) {
  if (keep) {
    return;
  }

  int32_t totalRows = pBlock->info.rows;

  if (status == FILTER_RESULT_ALL_QUALIFIED) {
    // here nothing needs to be done
    return;
  }

  if (status == FILTER_RESULT_NONE_QUALIFIED) {
    pBlock->info.rows = 0;
    return;
  }

  const int8_t* pIndicator = (const int8_t*)p->pData;

  // the qualifying row count is identical for every column, so compute it once
  int32_t numOfQualified = 0;
  for (int32_t j = 0; j < totalRows; ++j) {
    if (pIndicator[j] != 0) {
      numOfQualified += 1;
    }
  }

  // read from a full copy of the block while overwriting the original columns in place
  SSDataBlock* px = createOneDataBlock(pBlock, true);

  size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
  for (size_t i = 0; i < numOfCols; ++i) {
    SColumnInfoData* pSrc = taosArrayGet(px->pDataBlock, i);
    SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, i);

    // it is a reserved column for scalar function, and no data in this column yet.
    if (pDst->pData == NULL || pSrc->pData == NULL) {
      continue;
    }

    // reset using the pre-filter row count so every column is cleared over the same range
    colInfoDataCleanup(pDst, totalRows);

    int32_t numOfRows = 0;
    for (int32_t j = 0; j < totalRows; ++j) {
      if (pIndicator[j] == 0) {
        continue;
      }

      if (colDataIsNull_s(pSrc, j)) {
        colDataAppendNULL(pDst, numOfRows);
      } else {
        colDataAppend(pDst, numOfRows, colDataGetData(pSrc, j), false);
      }
      numOfRows += 1;
    }

    ASSERT(numOfRows == numOfQualified);
  }

  pBlock->info.rows = numOfQualified;
  blockDataDestroy(px);  // fix memory leak
}

901
/*
 * Make the result row of group `groupId` the active output of pOperator's function contexts.
 *
 * For a simple group-by query without interval, all the tables belong to one group result. The row is
 * looked up (or created) by doSetResultOutBufByKey using groupId as key. If it has no result page yet,
 * a new one is attached. Finally every context is bound to the row via setResultRowInitCtx.
 *
 * If attaching the result page fails, the error is logged and raised with T_LONG_JMP on
 * pTaskInfo->env. Returning silently would leave the contexts bound to the previous group's row.
 */
void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId) {
  SExecTaskInfo*    pTaskInfo = pOperator->pTaskInfo;
  SAggOperatorInfo* pAggInfo = pOperator->info;

  SResultRowInfo* pResultRowInfo = &pAggInfo->binfo.resultRowInfo;
  SqlFunctionCtx* pCtx = pOperator->exprSupp.pCtx;
  int32_t*        rowEntryInfoOffset = pOperator->exprSupp.rowEntryInfoOffset;

  SResultRow* pResultRow = doSetResultOutBufByKey(pAggInfo->aggSup.pResultBuf, pResultRowInfo, (char*)&groupId,
                                                  sizeof(groupId), true, groupId, pTaskInfo, false, &pAggInfo->aggSup);
  assert(pResultRow != NULL);

  /*
   * not assign result buffer yet, add new result buffer
   * all group belong to one result set, and each group result has different group id so set the id to be one
   */
  if (pResultRow->pageId == -1) {
    int32_t ret =
        addNewWindowResultBuf(pResultRow, pAggInfo->aggSup.pResultBuf, groupId, pAggInfo->binfo.pRes->info.rowSize);
    if (ret != TSDB_CODE_SUCCESS) {
      qError("%s failed to add result buffer for group:%" PRIu64 ", code %s", GET_TASKID(pTaskInfo), groupId,
             tstrerror(ret));
      T_LONG_JMP(pTaskInfo->env, ret);
    }
  }

  setResultRowInitCtx(pResultRow, pCtx, numOfOutput, rowEntryInfoOffset);
}

929 930 931
/*
 * Switch the aggregate operator's output buffer to group `groupId`, unless that group is already active.
 *
 * pAggInfo->groupId records the currently active group; UINT64_MAX means no group has been activated,
 * so the buffer is always (re)bound in that case.
 */
static void setExecutionContext(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId) {
  SAggOperatorInfo* pAggInfo = pOperator->info;

  bool alreadyActive = (pAggInfo->groupId != UINT64_MAX) && (pAggInfo->groupId == groupId);
  if (!alreadyActive) {
    doSetTableGroupOutputBuf(pOperator, numOfOutput, groupId);
    // remember which group is now bound to the function contexts
    pAggInfo->groupId = groupId;
  }
}

dengyihao's avatar
dengyihao 已提交
941
/*
 * Set pRow->numOfRows to the largest numOfRes among the row's initialized entries.
 *
 * If that yields zero rows and none of the initialized entries belongs to a function that requires a
 * not-null output (fmIsNotNullOutputFunc), one row is still reported. This covers the case where, e.g.,
 * max over all-null input must emit a single (null) row, while first/last must emit nothing.
 */
static void doUpdateNumOfRows(SqlFunctionCtx* pCtx, SResultRow* pRow, int32_t numOfExprs,
                              const int32_t* rowEntryOffset) {
  bool hasNotNullOutputFunc = false;

  for (int32_t k = 0; k < numOfExprs; ++k) {
    SResultRowEntryInfo* pEntry = getResultEntryInfo(pRow, k, rowEntryOffset);
    if (isRowEntryInitialized(pEntry)) {
      if (pEntry->numOfRes > pRow->numOfRows) {
        pRow->numOfRows = pEntry->numOfRes;
      }

      if (fmIsNotNullOutputFunc(pCtx[k].functionId)) {
        hasNotNullOutputFunc = true;
      }
    }
  }

  if (!hasNotNullOutputFunc && pRow->numOfRows == 0) {
    pRow->numOfRows = 1;
  }
}

965 966
/*
 * Write the results held in pRow into pBlock, starting at row pBlock->info.rows.
 *
 * For each expression j:
 *   - Functions with a finalize routine write their own output. For "_group_key" with a non-zero
 *     result count, that count is first set to pRow->numOfRows, so the key is repeated for each row
 *     produced by multi-row functions (e.g. histogram).
 *   - "_select_value" without finalize is skipped.
 *   - Anything else has its intermediate value copied into pRow->numOfRows consecutive rows of its
 *     output slot. For example, _wstart is duplicated alongside the rows produced by top(k, 20).
 *
 * A finalize failure is logged and raised with T_LONG_JMP on pTaskInfo->env.
 * pBlock->info.rows itself is not advanced here.
 */
static void doCopyResultToDataBlock(SExprInfo* pExprInfo, int32_t numOfExprs, SResultRow* pRow, SqlFunctionCtx* pCtx,
                                    SSDataBlock* pBlock, const int32_t* rowEntryOffset, SExecTaskInfo* pTaskInfo) {
  for (int32_t k = 0; k < numOfExprs; ++k) {
    SqlFunctionCtx* pOne = &pCtx[k];
    const char*     pFuncName = pOne->pExpr->pExpr->_function.functionName;

    pOne->resultInfo = getResultEntryInfo(pRow, k, rowEntryOffset);

    if (pOne->fpSet.finalize != NULL) {
      if (strcmp(pFuncName, "_group_key") == 0 && pOne->resultInfo->numOfRes != 0) {
        pOne->resultInfo->numOfRes = pRow->numOfRows;
      }

      int32_t code = pOne->fpSet.finalize(pOne, pBlock);
      if (TAOS_FAILED(code)) {
        qError("%s build result data block error, code %s", GET_TASKID(pTaskInfo), tstrerror(code));
        T_LONG_JMP(pTaskInfo->env, code);
      }
      continue;
    }

    if (strcmp(pFuncName, "_select_value") == 0) {
      continue;
    }

    int32_t          slotId = pExprInfo[k].base.resSchema.slotId;
    SColumnInfoData* pDest = taosArrayGet(pBlock->pDataBlock, slotId);
    char*            pValue = GET_ROWCELL_INTERBUF(pOne->resultInfo);
    for (int32_t r = 0; r < pRow->numOfRows; ++r) {
      colDataAppend(pDest, pBlock->info.rows + r, pValue, pOne->resultInfo->isNullRes);
    }
  }
}

999 1000 1001
// todo refactor. SResultRow has direct pointer in miainfo
/*
 * Finalize the result row stored at resultRowPosition inside pBuf and append its output rows to pBlock.
 *
 * pBlock is grown first when it cannot hold the new rows. Growth is geometric (x1.25), but always at
 * least one row per step. The truncating multiply alone makes no progress for capacities below 4
 * (e.g. (int32_t)(3 * 1.25) == 3), which previously caused an endless loop for tiny or zero capacity.
 *
 * The buffer page is always released before returning. Returns 0; allocation and finalize failures are
 * raised with T_LONG_JMP on pTaskInfo->env.
 */
int32_t finalizeResultRows(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPosition, SExprSupp* pSup,
                           SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo) {
  SFilePage*  page = getBufPage(pBuf, resultRowPosition->pageId);
  SResultRow* pRow = (SResultRow*)((char*)page + resultRowPosition->offset);

  SqlFunctionCtx* pCtx = pSup->pCtx;
  SExprInfo*      pExprInfo = pSup->pExprInfo;
  const int32_t*  rowEntryOffset = pSup->rowEntryInfoOffset;

  doUpdateNumOfRows(pCtx, pRow, pSup->numOfExprs, rowEntryOffset);
  if (pRow->numOfRows == 0) {
    releaseBufPage(pBuf, page);
    return 0;
  }

  int32_t required = pBlock->info.rows + pRow->numOfRows;
  int32_t size = pBlock->info.capacity;
  while (size < required) {
    int32_t grown = (int32_t)(size * 1.25);
    size = (grown > size) ? grown : size + 1;  // guarantee progress for very small capacities
  }

  int32_t code = blockDataEnsureCapacity(pBlock, size);
  if (TAOS_FAILED(code)) {
    releaseBufPage(pBuf, page);
    qError("%s ensure result data capacity failed, code %s", GET_TASKID(pTaskInfo), tstrerror(code));
    T_LONG_JMP(pTaskInfo->env, code);
  }

  doCopyResultToDataBlock(pExprInfo, pSup->numOfExprs, pRow, pCtx, pBlock, rowEntryOffset, pTaskInfo);

  releaseBufPage(pBuf, page);
  pBlock->info.rows += pRow->numOfRows;
  return 0;
}

1034 1035 1036 1037 1038 1039 1040
/*
 * Copy finalized result rows from pGroupResInfo into pBlock, starting at pGroupResInfo->index.
 *
 * Rows that produce no output are skipped. Copying stops at the first row that either:
 *   - belongs to a different group than the one already in pBlock (pBlock->info.id.groupId == 0 means
 *     "not yet assigned"), or
 *   - no longer fits into a non-empty block.
 * pGroupResInfo->index is advanced past every consumed or skipped row.
 *
 * If the block is still empty and a single row alone exceeds its capacity, the block is grown. Breaking
 * with an empty block would not advance the index, so a caller that loops until it gets rows
 * (getAggregateResult) would never make progress. Allocation failure is raised with T_LONG_JMP on
 * pTaskInfo->env.
 *
 * Calls blockDataUpdateTsWindow on slot 0 before returning. Always returns 0.
 */
int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprSupp* pSup, SDiskbasedBuf* pBuf,
                           SGroupResInfo* pGroupResInfo) {
  SExprInfo*      pExprInfo = pSup->pExprInfo;
  int32_t         numOfExprs = pSup->numOfExprs;
  int32_t*        rowEntryOffset = pSup->rowEntryInfoOffset;
  SqlFunctionCtx* pCtx = pSup->pCtx;

  int32_t numOfRows = getNumOfTotalRes(pGroupResInfo);

  for (int32_t i = pGroupResInfo->index; i < numOfRows; i += 1) {
    SResKeyPos* pPos = taosArrayGetP(pGroupResInfo->pRows, i);
    SFilePage*  page = getBufPage(pBuf, pPos->pos.pageId);

    SResultRow* pRow = (SResultRow*)((char*)page + pPos->pos.offset);

    doUpdateNumOfRows(pCtx, pRow, numOfExprs, rowEntryOffset);

    // no results, continue to check the next one
    if (pRow->numOfRows == 0) {
      pGroupResInfo->index += 1;
      releaseBufPage(pBuf, page);
      continue;
    }

    if (pBlock->info.id.groupId == 0) {
      pBlock->info.id.groupId = pPos->groupId;
    } else if (pBlock->info.id.groupId != pPos->groupId) {
      // current value belongs to different group, it can't be packed into one datablock
      releaseBufPage(pBuf, page);
      break;
    }

    if (pBlock->info.rows + pRow->numOfRows > pBlock->info.capacity) {
      if (pBlock->info.rows > 0) {
        // emit what has been collected; this row is picked up by the next call
        releaseBufPage(pBuf, page);
        break;
      }

      // empty block, but this single row alone does not fit: grow instead of stalling
      int32_t code = blockDataEnsureCapacity(pBlock, pRow->numOfRows);
      if (TAOS_FAILED(code)) {
        releaseBufPage(pBuf, page);
        qError("%s ensure result data capacity failed, code %s", GET_TASKID(pTaskInfo), tstrerror(code));
        T_LONG_JMP(pTaskInfo->env, code);
      }
    }

    pGroupResInfo->index += 1;
    doCopyResultToDataBlock(pExprInfo, numOfExprs, pRow, pCtx, pBlock, rowEntryOffset, pTaskInfo);

    releaseBufPage(pBuf, page);
    pBlock->info.rows += pRow->numOfRows;
  }

  qDebug("%s result generated, rows:%d, groupId:%" PRIu64, GET_TASKID(pTaskInfo), pBlock->info.rows,
         pBlock->info.id.groupId);

  blockDataUpdateTsWindow(pBlock, 0);
  return 0;
}

1088 1089 1090 1091 1092 1093 1094 1095 1096 1097 1098 1099 1100 1101
/*
 * Fill pbInfo->pRes with the next batch of stream results from pGroupResInfo.
 *
 * The block is stamped with the task version and cleaned. When results remain, one group's worth of rows
 * is copied in (merging of result blocks is not supported here). The block's parTbName is then set from
 * the partition name stored in the stream state for the block's group id. It is cleared to an empty
 * string when no name is found.
 */
void doBuildStreamResBlock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo,
                           SDiskbasedBuf* pBuf) {
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
  SSDataBlock*   pResBlock = pbInfo->pRes;

  // set output datablock version
  pResBlock->info.version = pTaskInfo->version;
  blockDataCleanup(pResBlock);

  if (hasRemainResults(pGroupResInfo)) {
    // clear the existed group id
    pResBlock->info.id.groupId = 0;
    ASSERT(!pbInfo->mergeResultBlock);
    doCopyToSDataBlock(pTaskInfo, pResBlock, &pOperator->exprSupp, pBuf, pGroupResInfo);

    void* pParName = NULL;
    bool  nameFound = streamStateGetParName(pTaskInfo->streamInfo.pState, pResBlock->info.id.groupId, &pParName) >= 0;
    if (nameFound) {
      memcpy(pResBlock->info.parTbName, pParName, TSDB_TABLE_NAME_LEN);
    } else {
      pResBlock->info.parTbName[0] = 0;
    }
    tdbFree(pParName);
  }
}

X
Xiaoyu Wang 已提交
1115 1116
/*
 * Fill pbInfo->pRes with the next batch of aggregate results from pGroupResInfo.
 *
 * Without mergeResultBlock, a single call to doCopyToSDataBlock is made, so the block holds at most one
 * group. With mergeResultBlock, groups are packed together until results run out or the block reaches
 * pOperator->resultInfo.threshold rows. The group id is cleared in that case, since the client does not
 * need it.
 */
void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo,
                            SDiskbasedBuf* pBuf) {
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
  SSDataBlock*   pResBlock = pbInfo->pRes;

  // set output datablock version
  pResBlock->info.version = pTaskInfo->version;
  blockDataCleanup(pResBlock);

  if (!hasRemainResults(pGroupResInfo)) {
    return;
  }

  // clear the existed group id
  pResBlock->info.id.groupId = 0;

  if (!pbInfo->mergeResultBlock) {
    doCopyToSDataBlock(pTaskInfo, pResBlock, &pOperator->exprSupp, pBuf, pGroupResInfo);
    return;
  }

  while (hasRemainResults(pGroupResInfo)) {
    doCopyToSDataBlock(pTaskInfo, pResBlock, &pOperator->exprSupp, pBuf, pGroupResInfo);
    if (pResBlock->info.rows >= pOperator->resultInfo.threshold) {
      break;
    }

    // clearing group id lets doCopyToSDataBlock keep merging rows of the next group
    pResBlock->info.id.groupId = 0;
  }

  // clear the group id info in SSDataBlock, since the client does not need it
  pResBlock->info.id.groupId = 0;
}

L
Liu Jicong 已提交
1148 1149 1150 1151
// static TSKEY doSkipIntervalProcess(STaskRuntimeEnv* pRuntimeEnv, STimeWindow* win, SDataBlockInfo* pBlockInfo,
// STableQueryInfo* pTableQueryInfo) {
//   STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
//   SResultRowInfo *pWindowResInfo = &pRuntimeEnv->resultRowInfo;
1152
//
L
Liu Jicong 已提交
1153 1154 1155
//   assert(pQueryAttr->limit.offset == 0);
//   STimeWindow tw = *win;
//   getNextTimeWindow(pQueryAttr, &tw);
1156
//
L
Liu Jicong 已提交
1157 1158
//   if ((tw.skey <= pBlockInfo->window.ekey && QUERY_IS_ASC_QUERY(pQueryAttr)) ||
//       (tw.ekey >= pBlockInfo->window.skey && !QUERY_IS_ASC_QUERY(pQueryAttr))) {
1159
//
L
Liu Jicong 已提交
1160 1161 1162 1163
//     // load the data block and check data remaining in current data block
//     // TODO optimize performance
//     SArray *         pDataBlock = tsdbRetrieveDataBlock(pRuntimeEnv->pTsdbReadHandle, NULL);
//     SColumnInfoData *pColInfoData = taosArrayGet(pDataBlock, 0);
1164
//
L
Liu Jicong 已提交
1165 1166 1167 1168
//     tw = *win;
//     int32_t startPos =
//         getNextQualifiedWindow(pQueryAttr, &tw, pBlockInfo, pColInfoData->pData, binarySearchForKey, -1);
//     assert(startPos >= 0);
1169
//
L
Liu Jicong 已提交
1170 1171
//     // set the abort info
//     pQueryAttr->pos = startPos;
1172
//
L
Liu Jicong 已提交
1173 1174 1175 1176
//     // reset the query start timestamp
//     pTableQueryInfo->win.skey = ((TSKEY *)pColInfoData->pData)[startPos];
//     pQueryAttr->window.skey = pTableQueryInfo->win.skey;
//     TSKEY key = pTableQueryInfo->win.skey;
1177
//
L
Liu Jicong 已提交
1178 1179
//     pWindowResInfo->prevSKey = tw.skey;
//     int32_t index = pRuntimeEnv->resultRowInfo.curIndex;
1180
//
L
Liu Jicong 已提交
1181 1182
//     int32_t numOfRes = tableApplyFunctionsOnBlock(pRuntimeEnv, pBlockInfo, NULL, binarySearchForKey, pDataBlock);
//     pRuntimeEnv->resultRowInfo.curIndex = index;  // restore the window index
1183
//
L
Liu Jicong 已提交
1184 1185 1186 1187
//     //qDebug("QInfo:0x%"PRIx64" check data block, brange:%" PRId64 "-%" PRId64 ", numOfRows:%d, numOfRes:%d,
//     lastKey:%" PRId64,
//            GET_TASKID(pRuntimeEnv), pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows, numOfRes,
//            pQueryAttr->current->lastKey);
1188
//
L
Liu Jicong 已提交
1189 1190 1191 1192 1193
//     return key;
//   } else {  // do nothing
//     pQueryAttr->window.skey      = tw.skey;
//     pWindowResInfo->prevSKey = tw.skey;
//     pTableQueryInfo->lastKey = tw.skey;
1194
//
L
Liu Jicong 已提交
1195 1196
//     return tw.skey;
//   }
1197
//
L
Liu Jicong 已提交
1198 1199 1200 1201 1202 1203 1204 1205 1206 1207
//   return true;
// }

// static bool skipTimeInterval(STaskRuntimeEnv *pRuntimeEnv, TSKEY* start) {
//   STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
//   if (QUERY_IS_ASC_QUERY(pQueryAttr)) {
//     assert(*start <= pRuntimeEnv->current->lastKey);
//   } else {
//     assert(*start >= pRuntimeEnv->current->lastKey);
//   }
1208
//
L
Liu Jicong 已提交
1209 1210 1211 1212 1213
//   // if queried with value filter, do NOT forward query start position
//   if (pQueryAttr->limit.offset <= 0 || pQueryAttr->numOfFilterCols > 0 || pRuntimeEnv->pTsBuf != NULL ||
//   pRuntimeEnv->pFillInfo != NULL) {
//     return true;
//   }
1214
//
L
Liu Jicong 已提交
1215 1216 1217 1218 1219 1220 1221
//   /*
//    * 1. for interval without interpolation query we forward pQueryAttr->interval.interval at a time for
//    *    pQueryAttr->limit.offset times. Since hole exists, pQueryAttr->interval.interval*pQueryAttr->limit.offset
//    value is
//    *    not valid. otherwise, we only forward pQueryAttr->limit.offset number of points
//    */
//   assert(pRuntimeEnv->resultRowInfo.prevSKey == TSKEY_INITIAL_VAL);
1222
//
L
Liu Jicong 已提交
1223 1224
//   STimeWindow w = TSWINDOW_INITIALIZER;
//   bool ascQuery = QUERY_IS_ASC_QUERY(pQueryAttr);
1225
//
L
Liu Jicong 已提交
1226 1227
//   SResultRowInfo *pWindowResInfo = &pRuntimeEnv->resultRowInfo;
//   STableQueryInfo *pTableQueryInfo = pRuntimeEnv->current;
1228
//
L
Liu Jicong 已提交
1229 1230 1231
//   SDataBlockInfo blockInfo = SDATA_BLOCK_INITIALIZER;
//   while (tsdbNextDataBlock(pRuntimeEnv->pTsdbReadHandle)) {
//     tsdbRetrieveDataBlockInfo(pRuntimeEnv->pTsdbReadHandle, &blockInfo);
1232
//
L
Liu Jicong 已提交
1233 1234 1235 1236 1237 1238 1239 1240 1241
//     if (QUERY_IS_ASC_QUERY(pQueryAttr)) {
//       if (pWindowResInfo->prevSKey == TSKEY_INITIAL_VAL) {
//         getAlignQueryTimeWindow(pQueryAttr, blockInfo.window.skey, blockInfo.window.skey, pQueryAttr->window.ekey,
//         &w); pWindowResInfo->prevSKey = w.skey;
//       }
//     } else {
//       getAlignQueryTimeWindow(pQueryAttr, blockInfo.window.ekey, pQueryAttr->window.ekey, blockInfo.window.ekey, &w);
//       pWindowResInfo->prevSKey = w.skey;
//     }
1242
//
L
Liu Jicong 已提交
1243 1244
//     // the first time window
//     STimeWindow win = getActiveTimeWindow(pWindowResInfo, pWindowResInfo->prevSKey, pQueryAttr);
1245
//
L
Liu Jicong 已提交
1246 1247
//     while (pQueryAttr->limit.offset > 0) {
//       STimeWindow tw = win;
1248
//
L
Liu Jicong 已提交
1249 1250 1251
//       if ((win.ekey <= blockInfo.window.ekey && ascQuery) || (win.ekey >= blockInfo.window.skey && !ascQuery)) {
//         pQueryAttr->limit.offset -= 1;
//         pWindowResInfo->prevSKey = win.skey;
1252
//
L
Liu Jicong 已提交
1253 1254 1255 1256 1257 1258
//         // current time window is aligned with blockInfo.window.ekey
//         // restart it from next data block by set prevSKey to be TSKEY_INITIAL_VAL;
//         if ((win.ekey == blockInfo.window.ekey && ascQuery) || (win.ekey == blockInfo.window.skey && !ascQuery)) {
//           pWindowResInfo->prevSKey = TSKEY_INITIAL_VAL;
//         }
//       }
1259
//
L
Liu Jicong 已提交
1260 1261 1262 1263
//       if (pQueryAttr->limit.offset == 0) {
//         *start = doSkipIntervalProcess(pRuntimeEnv, &win, &blockInfo, pTableQueryInfo);
//         return true;
//       }
1264
//
L
Liu Jicong 已提交
1265 1266
//       // current window does not ended in current data block, try next data block
//       getNextTimeWindow(pQueryAttr, &tw);
1267
//
L
Liu Jicong 已提交
1268 1269 1270 1271 1272 1273 1274 1275 1276
//       /*
//        * If the next time window still starts from current data block,
//        * load the primary timestamp column first, and then find the start position for the next queried time window.
//        * Note that only the primary timestamp column is required.
//        * TODO: Optimize for this cases. All data blocks are not needed to be loaded, only if the first actually
//        required
//        * time window resides in current data block.
//        */
//       if ((tw.skey <= blockInfo.window.ekey && ascQuery) || (tw.ekey >= blockInfo.window.skey && !ascQuery)) {
1277
//
L
Liu Jicong 已提交
1278 1279
//         SArray *pDataBlock = tsdbRetrieveDataBlock(pRuntimeEnv->pTsdbReadHandle, NULL);
//         SColumnInfoData *pColInfoData = taosArrayGet(pDataBlock, 0);
1280
//
L
Liu Jicong 已提交
1281 1282 1283
//         if ((win.ekey > blockInfo.window.ekey && ascQuery) || (win.ekey < blockInfo.window.skey && !ascQuery)) {
//           pQueryAttr->limit.offset -= 1;
//         }
1284
//
L
Liu Jicong 已提交
1285 1286 1287 1288 1289 1290 1291 1292
//         if (pQueryAttr->limit.offset == 0) {
//           *start = doSkipIntervalProcess(pRuntimeEnv, &win, &blockInfo, pTableQueryInfo);
//           return true;
//         } else {
//           tw = win;
//           int32_t startPos =
//               getNextQualifiedWindow(pQueryAttr, &tw, &blockInfo, pColInfoData->pData, binarySearchForKey, -1);
//           assert(startPos >= 0);
1293
//
L
Liu Jicong 已提交
1294 1295 1296 1297 1298 1299 1300 1301 1302 1303 1304
//           // set the abort info
//           pQueryAttr->pos = startPos;
//           pTableQueryInfo->lastKey = ((TSKEY *)pColInfoData->pData)[startPos];
//           pWindowResInfo->prevSKey = tw.skey;
//           win = tw;
//         }
//       } else {
//         break;  // offset is not 0, and next time window begins or ends in the next block.
//       }
//     }
//   }
1305
//
L
Liu Jicong 已提交
1306 1307
//   // check for error
//   if (terrno != TSDB_CODE_SUCCESS) {
1308
//     T_LONG_JMP(pRuntimeEnv->env, terrno);
L
Liu Jicong 已提交
1309
//   }
1310
//
L
Liu Jicong 已提交
1311 1312
//   return true;
// }
1313

1314
/*
 * Install `num` downstream operators on p by copying the pointer array pDownstream.
 *
 * A fresh array is allocated and stored in p->pDownstream, replacing any previous pointer. On allocation
 * failure p->pDownstream is left NULL and TSDB_CODE_OUT_OF_MEMORY is returned. Otherwise
 * p->numOfDownstream is set to num and TSDB_CODE_SUCCESS is returned.
 */
int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t num) {
  // an operator without a downstream array must not claim to have downstream operators
  assert(p->pDownstream != NULL || p->numOfDownstream == 0);

  size_t          nbytes = num * POINTER_BYTES;
  SOperatorInfo** pArray = taosMemoryCalloc(1, nbytes);
  p->pDownstream = pArray;
  if (pArray == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  memcpy(pArray, pDownstream, nbytes);
  p->numOfDownstream = num;
  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
1329
/*
 * Determine the scan order and scan flag governing pOperator's input.
 *
 * - Exchange, system-table, stream, tag, block-dist, last-row and table-count scans report
 *   TSDB_ORDER_ASC / MAIN_SCAN.
 * - Table scans and table-merge scans report the order and flag stored in their scan info.
 * - Any other operator defers to its first downstream operator. TSDB_CODE_INVALID_PARA is returned
 *   when that chain ends without reaching one of the operators above.
 */
int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scanFlag) {
  // todo add more information about exchange operation
  SOperatorInfo* pCur = pOperator;

  for (;;) {
    switch (pCur->operatorType) {
      case QUERY_NODE_PHYSICAL_PLAN_EXCHANGE:
      case QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN:
      case QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN:
      case QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN:
      case QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN:
      case QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN:
      case QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN:
        *order = TSDB_ORDER_ASC;
        *scanFlag = MAIN_SCAN;
        return TSDB_CODE_SUCCESS;

      case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN: {
        STableScanInfo* pScanInfo = pCur->info;
        *order = pScanInfo->base.cond.order;
        *scanFlag = pScanInfo->base.scanFlag;
        return TSDB_CODE_SUCCESS;
      }

      case QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN: {
        STableMergeScanInfo* pMergeScanInfo = pCur->info;
        *order = pMergeScanInfo->base.cond.order;
        *scanFlag = pMergeScanInfo->base.scanFlag;
        return TSDB_CODE_SUCCESS;
      }

      default:
        if (pCur->pDownstream == NULL || pCur->pDownstream[0] == NULL) {
          return TSDB_CODE_INVALID_PARA;
        }
        // keep searching along the first downstream operator
        pCur = pCur->pDownstream[0];
        break;
    }
  }
}
1357

1358 1359 1360 1361 1362
/*
 * Possibly fabricate a one-row input block for an aggregate whose downstream produced no data.
 *
 * A block is only created when all of the following hold:
 *   - tsCountAlwaysReturnValue is set;
 *   - the downstream is not a partition operator, nor a table scan with hasGroupByTag;
 *   - at least one expression is count, hyperloglog, _hyperloglog_partial or _hyperloglog_merge.
 * The block has one row. Every column referenced by a column parameter (up to the highest referenced
 * slot) is of type TSDB_DATA_TYPE_NULL.
 *
 * On success with a created block, *ppBlock receives it and the caller owns it (doOpenAggregateOptr
 * frees it via destroyDataBlockForEmptyInput). When no block is needed, *ppBlock is left unchanged.
 * On allocation failure the partially built block is destroyed, *ppBlock is left unchanged, and an
 * error code is returned.
 */
static int32_t createDataBlockForEmptyInput(SOperatorInfo* pOperator, SSDataBlock** ppBlock) {
  if (!tsCountAlwaysReturnValue) {
    return TSDB_CODE_SUCCESS;
  }

  SOperatorInfo* downstream = pOperator->pDownstream[0];
  if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_PARTITION ||
      (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN &&
       ((STableScanInfo*)downstream->info)->hasGroupByTag == true)) {
    return TSDB_CODE_SUCCESS;
  }

  SqlFunctionCtx* pCtx = pOperator->exprSupp.pCtx;
  bool            hasCountFunc = false;

  for (int32_t i = 0; i < pOperator->exprSupp.numOfExprs; ++i) {
    const char* pName = pCtx[i].pExpr->pExpr->_function.functionName;
    if ((strcmp(pName, "count") == 0) || (strcmp(pName, "hyperloglog") == 0) ||
        (strcmp(pName, "_hyperloglog_partial") == 0) || (strcmp(pName, "_hyperloglog_merge") == 0)) {
      hasCountFunc = true;
      break;
    }
  }

  if (!hasCountFunc) {
    return TSDB_CODE_SUCCESS;
  }

  SSDataBlock* pBlock = createDataBlock();
  if (pBlock == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  pBlock->info.rows = 1;
  pBlock->info.capacity = 0;

  for (int32_t i = 0; i < pOperator->exprSupp.numOfExprs; ++i) {
    SColumnInfoData colInfo = {0};
    colInfo.hasNull = true;
    colInfo.info.type = TSDB_DATA_TYPE_NULL;
    colInfo.info.bytes = 1;

    SExprInfo* pOneExpr = &pOperator->exprSupp.pExprInfo[i];
    for (int32_t j = 0; j < pOneExpr->base.numOfParams; ++j) {
      SFunctParam* pFuncParam = &pOneExpr->base.pParam[j];
      if (pFuncParam->type != FUNC_PARAM_TYPE_COLUMN) {
        continue;  // value parameters need no input column
      }

      int32_t slotId = pFuncParam->pCol->slotId;
      int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
      if (slotId < numOfCols) {
        continue;
      }

      // pre-size the array; a failure here is caught by the push check below
      taosArrayEnsureCap(pBlock->pDataBlock, slotId + 1);
      for (int32_t k = numOfCols; k < slotId + 1; ++k) {
        if (taosArrayPush(pBlock->pDataBlock, &colInfo) == NULL) {
          blockDataDestroy(pBlock);
          return TSDB_CODE_OUT_OF_MEMORY;
        }
      }
    }
  }

  int32_t code = blockDataEnsureCapacity(pBlock, pBlock->info.rows);
  if (code != TSDB_CODE_SUCCESS) {
    blockDataDestroy(pBlock);
    return code;
  }

  *ppBlock = pBlock;
  return TSDB_CODE_SUCCESS;
}

/*
 * Release the block pointed to by *ppBlock and reset the pointer, but only when blockAllocated is true.
 * Blocks handed out by the downstream operator (blockAllocated == false) are left untouched.
 */
static void destroyDataBlockForEmptyInput(bool blockAllocated, SSDataBlock** ppBlock) {
  if (blockAllocated) {
    blockDataDestroy(*ppBlock);
    *ppBlock = NULL;
  }
}

1429
// this is a blocking operator
L
Liu Jicong 已提交
1430
static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
H
Haojun Liao 已提交
1431 1432
  if (OPTR_IS_OPENED(pOperator)) {
    return TSDB_CODE_SUCCESS;
1433 1434
  }

H
Haojun Liao 已提交
1435
  SExecTaskInfo*    pTaskInfo = pOperator->pTaskInfo;
1436
  SAggOperatorInfo* pAggInfo = pOperator->info;
H
Haojun Liao 已提交
1437

1438 1439
  SExprSupp*     pSup = &pOperator->exprSupp;
  SOperatorInfo* downstream = pOperator->pDownstream[0];
1440

1441 1442
  int64_t st = taosGetTimestampUs();

1443 1444 1445
  int32_t order = TSDB_ORDER_ASC;
  int32_t scanFlag = MAIN_SCAN;

1446 1447 1448
  bool    hasValidBlock = false;
  bool    blockAllocated = false;

H
Haojun Liao 已提交
1449
  while (1) {
1450
    SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
1451
    if (pBlock == NULL) {
G
Ganlin Zhao 已提交
1452
      if (!hasValidBlock) {
1453 1454 1455 1456 1457 1458 1459 1460
        createDataBlockForEmptyInput(pOperator, &pBlock);
        if (pBlock == NULL) {
          break;
        }
        blockAllocated = true;
      } else {
        break;
      }
1461
    }
1462
    hasValidBlock = true;
1463

1464 1465
    int32_t code = getTableScanInfo(pOperator, &order, &scanFlag);
    if (code != TSDB_CODE_SUCCESS) {
1466
      destroyDataBlockForEmptyInput(blockAllocated, &pBlock);
1467
      T_LONG_JMP(pTaskInfo->env, code);
1468
    }
1469

1470
    // there is an scalar expression that needs to be calculated before apply the group aggregation.
G
Ganlin Zhao 已提交
1471
    if (pAggInfo->scalarExprSup.pExprInfo != NULL && !blockAllocated) {
1472 1473
      SExprSupp* pSup1 = &pAggInfo->scalarExprSup;
      code = projectApplyFunctions(pSup1->pExprInfo, pBlock, pBlock, pSup1->pCtx, pSup1->numOfExprs, NULL);
1474
      if (code != TSDB_CODE_SUCCESS) {
1475
        destroyDataBlockForEmptyInput(blockAllocated, &pBlock);
1476
        T_LONG_JMP(pTaskInfo->env, code);
1477
      }
1478 1479
    }

1480
    // the pDataBlock are always the same one, no need to call this again
H
Haojun Liao 已提交
1481
    setExecutionContext(pOperator, pOperator->exprSupp.numOfExprs, pBlock->info.id.groupId);
1482
    setInputDataBlock(pSup, pBlock, order, scanFlag, true);
1483
    code = doAggregateImpl(pOperator, pSup->pCtx);
1484
    if (code != 0) {
1485
      destroyDataBlockForEmptyInput(blockAllocated, &pBlock);
1486
      T_LONG_JMP(pTaskInfo->env, code);
1487
    }
1488 1489 1490

    destroyDataBlockForEmptyInput(blockAllocated, &pBlock);

1491 1492
  }

1493 1494 1495 1496 1497
  // the downstream operator may return with error code, so let's check the code before generating results.
  if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
    T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
  }

1498
  initGroupedResultInfo(&pAggInfo->groupResInfo, pAggInfo->aggSup.pResultRowHashTable, 0);
H
Haojun Liao 已提交
1499
  OPTR_SET_OPENED(pOperator);
1500

1501
  pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
1502
  return pTaskInfo->code;
H
Haojun Liao 已提交
1503 1504
}

1505
/*
 * Produce the next block of aggregated rows.
 *
 * The operator's open function (fpSet._openFn) is invoked first and its
 * result is stored in the task code; on failure the operator is marked
 * completed and NULL is returned. Otherwise grouped results are copied into
 * the basic-info result block and filtered, repeating while the filtered
 * block is empty and grouped results remain.
 *
 * @return the result block, or NULL when it holds no rows or the operator is
 *         already done.
 */
static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator) {
  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  SAggOperatorInfo* pAgg = pOperator->info;
  SOptrBasicInfo*   pBasic = &pAgg->binfo;
  SExecTaskInfo*    pTask = pOperator->pTaskInfo;

  pTask->code = pOperator->fpSet._openFn(pOperator);
  if (pTask->code != TSDB_CODE_SUCCESS) {
    setOperatorCompleted(pOperator);
    return NULL;
  }

  blockDataEnsureCapacity(pBasic->pRes, pOperator->resultInfo.capacity);

  // keep building until a non-empty filtered block is produced, or until
  // every grouped result has been consumed
  bool exhausted = false;
  do {
    doBuildResultDatablock(pOperator, pBasic, &pAgg->groupResInfo, pAgg->aggSup.pResultBuf);
    doFilter(pBasic->pRes, pOperator->exprSupp.pFilterInfo, NULL);

    exhausted = !hasRemainResults(&pAgg->groupResInfo);
    if (exhausted) {
      setOperatorCompleted(pOperator);
    }
  } while (!exhausted && pBasic->pRes->info.rows <= 0);

  size_t numOfRows = blockDataGetNumOfRows(pBasic->pRes);
  pOperator->resultInfo.totalRows += numOfRows;

  if (numOfRows == 0) {
    return NULL;
  }
  return pBasic->pRes;
}

1541
/*
 * Release the resources owned by each of the first numOfExprs entries of
 * pExpr: column parameters are freed, value parameters have their variant
 * destroyed, then the parameter array and the expression node are freed.
 * The pExpr array itself is not freed here.
 */
void destroyExprInfo(SExprInfo* pExpr, int32_t numOfExprs) {
  for (int32_t idx = 0; idx < numOfExprs; ++idx) {
    SExprInfo* pCur = &pExpr[idx];

    for (int32_t p = 0; p < pCur->base.numOfParams; ++p) {
      switch (pCur->base.pParam[p].type) {
        case FUNC_PARAM_TYPE_COLUMN:
          taosMemoryFreeClear(pCur->base.pParam[p].pCol);
          break;
        case FUNC_PARAM_TYPE_VALUE:
          taosVariantDestroy(&pCur->base.pParam[p].param);
          break;
        default:
          break;
      }
    }

    taosMemoryFree(pCur->base.pParam);
    taosMemoryFree(pCur->pExpr);
  }
}

5
54liuyao 已提交
1557
/*
 * Recursively tear down an operator tree.
 *
 * The operator's private info is handed to its close callback (if one is
 * installed), every downstream operator is destroyed, the downstream array
 * and the expression support are released, and finally the operator itself
 * is freed. Passing NULL is a no-op.
 */
void destroyOperatorInfo(SOperatorInfo* pOperator) {
  if (pOperator == NULL) {
    return;
  }

  __typeof__(pOperator->fpSet.closeFn) closeFn = pOperator->fpSet.closeFn;
  if (closeFn != NULL) {
    closeFn(pOperator->info);
  }

  SOperatorInfo** ppChildren = pOperator->pDownstream;
  if (ppChildren != NULL) {
    int32_t numOfChildren = pOperator->numOfDownstream;
    for (int32_t k = 0; k < numOfChildren; ++k) {
      destroyOperatorInfo(ppChildren[k]);
    }

    taosMemoryFreeClear(pOperator->pDownstream);
    pOperator->numOfDownstream = 0;
  }

  cleanupExprSupp(&pOperator->exprSupp);
  taosMemoryFreeClear(pOperator);
}

1579 1580 1581 1582 1583 1584 1585 1586 1587
/*
 * Default "total cost buffer" callback for operators that do not install
 * their own.
 *
 * Non-blocking operators report 0. Each blocking operator is expected to
 * provide its own callback, so reaching this function for one of them is a
 * programming error and trips the ASSERT.
 *
 * @return 0
 */
int32_t optrDefaultBufFn(SOperatorInfo* pOperator) {
  if (pOperator->blocking) {
    ASSERT(0);
  }

  // Return explicitly on every path: ASSERT may be compiled out, and falling
  // off the end of a non-void function whose value is used is undefined.
  return 0;
}

1588 1589 1590 1591 1592 1593
/*
 * Choose the page size and total size of an operator's disk-based buffer.
 *
 * The page size starts at 4096 bytes and is doubled until at least four rows
 * of rowSize bytes fit into one page. Doubling stops at 2^31, the largest
 * power of two a uint32_t can hold, so it can never wrap to 0.
 *
 * The buffer defaults to 10MB (4096 * 2560). If one page would not fit into
 * it, the buffer is enlarged to four pages, clamped to UINT32_MAX.
 *
 * @param rowSize      size of one row in bytes; non-positive values are
 *                     treated as 0
 * @param defaultPgsz  [out] page size in bytes
 * @param defaultBufsz [out] total buffer size in bytes
 * @return always 0
 */
int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* defaultBufsz) {
  // Work in 64 bits. rowSize * 4 overflows int32_t for large rows. A negative
  // rowSize would otherwise be converted to a huge unsigned bound.
  const uint64_t minPageSize = (rowSize > 0) ? (uint64_t)rowSize * 4u : 0u;
  const uint64_t maxPageSize = (uint64_t)1u << 31;

  uint64_t pgsz = 4096;
  while (pgsz < minPageSize && pgsz < maxPageSize) {
    pgsz <<= 1u;
  }

  // The default buffer for each operator in query is 10MB.
  // TODO: make this variable to be configurable.
  uint64_t bufsz = 4096u * 2560u;

  // at least four pages need to be in buffer
  if (bufsz <= pgsz) {
    bufsz = pgsz * 4u;
  }
  if (bufsz > UINT32_MAX) {
    bufsz = UINT32_MAX;
  }

  *defaultPgsz = (uint32_t)pgsz;
  *defaultBufsz = (uint32_t)bufsz;
  return 0;
}

dengyihao's avatar
dengyihao 已提交
1605 1606
/*
 * Initialize an aggregation supporter.
 *
 * Sets up the result row hash table, the group key buffer and the
 * disk-based result buffer.
 *
 * @param pAggSup     supporter to initialize
 * @param pCtx        function contexts, used to compute the result row size
 * @param numOfOutput number of entries in pCtx
 * @param keyBufSize  size of the group key; the key buffer additionally
 *                    reserves POINTER_BYTES + sizeof(int64_t)
 * @param pKey        identifier passed to the disk buffer and used in logs
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY, TSDB_CODE_NO_AVAIL_DISK
 *         or the error returned by createDiskbasedBuf. Members that were
 *         allocated before a failure are not released here; cleanupAggSup
 *         frees them.
 */
int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t numOfOutput, size_t keyBufSize,
                         const char* pKey) {
  int32_t    code = TSDB_CODE_SUCCESS;
  _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);

  pAggSup->currentPageId = -1;
  pAggSup->resultRowSize = getResultRowSize(pCtx, numOfOutput);
  pAggSup->keyBuf = taosMemoryCalloc(1, keyBufSize + POINTER_BYTES + sizeof(int64_t));
  pAggSup->pResultRowHashTable = tSimpleHashInit(100, hashFn);

  if (pAggSup->keyBuf == NULL || pAggSup->pResultRowHashTable == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  uint32_t defaultPgsz = 0;
  uint32_t defaultBufsz = 0;
  getBufferPgSize(pAggSup->resultRowSize, &defaultPgsz, &defaultBufsz);

  if (!osTempSpaceAvailable()) {
    code = TSDB_CODE_NO_AVAIL_DISK;
    // Describe the local code. terrno has not been set to it, so terrstr()
    // would report an unrelated (stale) error.
    qError("Init stream agg supporter failed since %s, %s", tstrerror(code), pKey);
    return code;
  }

  code = createDiskbasedBuf(&pAggSup->pResultBuf, defaultPgsz, defaultBufsz, pKey, tsTempDir);
  if (code != TSDB_CODE_SUCCESS) {
    qError("Create agg result buf failed since %s, %s", tstrerror(code), pKey);
  }

  return code;
}

1638
/*
 * Release everything doInitAggInfoSup allocated for pAggSup.
 *
 * The key buffer is freed and its pointer cleared, the result row hash
 * table is cleaned up, and the disk-based result buffer is destroyed.
 */
void cleanupAggSup(SAggSupporter* pAggSup) {
  taosMemoryFreeClear(pAggSup->keyBuf);

  tSimpleHashCleanup(pAggSup->pResultRowHashTable);

  destroyDiskbasedBuf(pAggSup->pResultBuf);
}

H
Haojun Liao 已提交
1644
/*
 * Initialize both the expression support (function contexts) and the
 * aggregation supporter.
 *
 * Afterwards, every function context's save handle points at the
 * supporter's disk-based result buffer.
 *
 * @return TSDB_CODE_SUCCESS or the first error from initExprSupp or
 *         doInitAggInfoSup
 */
int32_t initAggSup(SExprSupp* pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, size_t keyBufSize,
                   const char* pkey) {
  int32_t code = initExprSupp(pSup, pExprInfo, numOfCols);
  if (code == TSDB_CODE_SUCCESS) {
    code = doInitAggInfoSup(pAggSup, pSup->pCtx, numOfCols, keyBufSize, pkey);
  }
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  SqlFunctionCtx* pCtxArray = pSup->pCtx;
  for (int32_t col = 0; col < numOfCols; ++col) {
    pCtxArray[col].saveHandle.pBuf = pAggSup->pResultBuf;
  }

  return TSDB_CODE_SUCCESS;
}

L
Liu Jicong 已提交
1663
/*
 * Set the result capacity to numOfRows and the threshold to 75% of it.
 *
 * If the stored threshold compares equal to 0, which is presumably
 * truncation for very small numOfRows, the full numOfRows is used instead.
 */
void initResultSizeInfo(SResultInfo* pResultInfo, int32_t numOfRows) {
  ASSERT(numOfRows != 0);

  pResultInfo->capacity = numOfRows;
  pResultInfo->threshold = numOfRows * 0.75;
  pResultInfo->threshold = (pResultInfo->threshold != 0) ? pResultInfo->threshold : numOfRows;
}

1673 1674 1675 1676 1677
/* Reset the result row info of pInfo and attach pBlock as its result block (ownership passes to pInfo). */
void initBasicInfo(SOptrBasicInfo* pInfo, SSDataBlock* pBlock) {
  initResultRowInfo(&pInfo->resultRowInfo);
  pInfo->pRes = pBlock;
}

5
54liuyao 已提交
1678
/*
 * Free an array of numOfOutput function contexts.
 *
 * For each context this destroys the parameter variants and frees the
 * subsidiary context/buffer and the input data/aggregate arrays, then frees
 * the array itself.
 *
 * @return always NULL, so callers can clear their pointer with the result
 */
void* destroySqlFunctionCtx(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
  if (pCtx != NULL) {
    for (int32_t i = 0; i < numOfOutput; ++i) {
      SqlFunctionCtx* pOne = &pCtx[i];

      for (int32_t j = 0; j < pOne->numOfParams; ++j) {
        taosVariantDestroy(&pOne->param[j].param);
      }

      taosMemoryFreeClear(pOne->subsidiaries.pCtx);
      taosMemoryFreeClear(pOne->subsidiaries.buf);
      taosMemoryFree(pOne->input.pData);
      taosMemoryFree(pOne->input.pColumnDataAgg);
    }

    taosMemoryFreeClear(pCtx);
  }

  return NULL;
}

1698
/*
 * Attach numOfExpr expressions to pSup.
 *
 * When pExprInfo is not NULL, the matching function contexts and row-entry
 * offsets are also created. pSup takes ownership of pExprInfo even if
 * context creation fails.
 *
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY if the contexts
 *         could not be created
 */
int32_t initExprSupp(SExprSupp* pSup, SExprInfo* pExprInfo, int32_t numOfExpr) {
  pSup->pExprInfo = pExprInfo;
  pSup->numOfExprs = numOfExpr;

  if (pExprInfo == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  pSup->pCtx = createSqlFunctionCtx(pExprInfo, numOfExpr, &pSup->rowEntryInfoOffset);
  return (pSup->pCtx == NULL) ? TSDB_CODE_OUT_OF_MEMORY : TSDB_CODE_SUCCESS;
}

1711 1712 1713 1714
/*
 * Release everything owned by pSupp.
 *
 * This covers the function contexts, the expressions, the filter info and
 * the row-entry offset array. Every released pointer is reset to NULL, so
 * the structure no longer holds dangling pointers and a second cleanup is
 * harmless.
 */
void cleanupExprSupp(SExprSupp* pSupp) {
  // destroySqlFunctionCtx always returns NULL; store it so pCtx is cleared
  pSupp->pCtx = destroySqlFunctionCtx(pSupp->pCtx, pSupp->numOfExprs);

  if (pSupp->pExprInfo != NULL) {
    destroyExprInfo(pSupp->pExprInfo, pSupp->numOfExprs);
    taosMemoryFreeClear(pSupp->pExprInfo);
  }

  if (pSupp->pFilterInfo != NULL) {
    filterFreeInfo(pSupp->pFilterInfo);
    pSupp->pFilterInfo = NULL;
  }

  taosMemoryFreeClear(pSupp->rowEntryInfoOffset);
}

1726 1727
/*
 * Create the "TableAggregate" operator (QUERY_NODE_PHYSICAL_PLAN_HASH_AGG)
 * on top of downstream.
 *
 * Builds the aggregate function expressions, the optional scalar
 * expressions that are evaluated before aggregation, and the output filter.
 * When downstream is a table scan, its pushdown info is pointed at this
 * operator's expression support and agg supporter.
 *
 * @return the new operator. On failure, returns NULL with pTaskInfo->code
 *         set; downstream is not destroyed in that case.
 */
SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pAggNode,
                                           SExecTaskInfo* pTaskInfo) {
  // Initialized up front: the first "goto _error" jumps past every later
  // assignment, and _error stores code into pTaskInfo->code.
  int32_t code = TSDB_CODE_OUT_OF_MEMORY;

  SAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SAggOperatorInfo));
  SOperatorInfo*    pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    goto _error;
  }

  SSDataBlock* pResBlock = createDataBlockFromDescNode(pAggNode->node.pOutputDataBlockDesc);
  if (pResBlock == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _error;
  }
  initBasicInfo(&pInfo->binfo, pResBlock);

  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
  initResultSizeInfo(&pOperator->resultInfo, 4096);

  int32_t    num = 0;
  SExprInfo* pExprInfo = createExprInfo(pAggNode->pAggFuncs, pAggNode->pGroupKeys, &num);
  code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  // optional scalar expressions evaluated before the aggregation
  int32_t    numOfScalarExpr = 0;
  SExprInfo* pScalarExprInfo = NULL;
  if (pAggNode->pExprs != NULL) {
    pScalarExprInfo = createExprInfo(pAggNode->pExprs, NULL, &numOfScalarExpr);
  }

  code = initExprSupp(&pInfo->scalarExprSup, pScalarExprInfo, numOfScalarExpr);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  code = filterInitFromNode((SNode*)pAggNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  pInfo->binfo.mergeResultBlock = pAggNode->mergeDataBlock;
  pInfo->groupId = UINT64_MAX;

  setOperatorInfo(pOperator, "TableAggregate", QUERY_NODE_PHYSICAL_PLAN_HASH_AGG, true, OP_NOT_OPENED, pInfo,
                  pTaskInfo);
  pOperator->fpSet = createOperatorFpSet(doOpenAggregateOptr, getAggregateResult, NULL, destroyAggOperatorInfo,
                                         optrDefaultBufFn, NULL);

  if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
    STableScanInfo* pTableScanInfo = downstream->info;
    pTableScanInfo->base.pdInfo.pExprSup = &pOperator->exprSupp;
    pTableScanInfo->base.pdInfo.pAggSup = &pInfo->aggSup;
  }

  code = appendDownstream(pOperator, &downstream, 1);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  return pOperator;

_error:
  if (pInfo != NULL) {
    destroyAggOperatorInfo(pInfo);
  }

  if (pOperator != NULL) {
    cleanupExprSupp(&pOperator->exprSupp);
  }

  taosMemoryFreeClear(pOperator);
  pTaskInfo->code = code;
  return NULL;
}

1797
/*
 * Destroy the result block held by pInfo.
 *
 * The pointer is overwritten with blockDataDestroy's return value
 * (presumably NULL). pInfo must not be NULL.
 */
void cleanupBasicInfo(SOptrBasicInfo* pInfo) {
  assert(pInfo != NULL);

  SSDataBlock* pBlock = pInfo->pRes;
  pInfo->pRes = blockDataDestroy(pBlock);
}

H
Haojun Liao 已提交
1802 1803 1804 1805 1806 1807 1808
/*
 * Element destructor for containers that store heap pointers.
 *
 * pItem points at the stored pointer. The object it refers to is freed and
 * the slot is reset to NULL; an already-NULL slot is left alone.
 */
static void freeItem(void* pItem) {
  void** ppSlot = (void**)pItem;
  if (*ppSlot == NULL) {
    return;
  }

  taosMemoryFreeClear(*ppSlot);
}

1809
void destroyAggOperatorInfo(void* param) {
L
Liu Jicong 已提交
1810
  SAggOperatorInfo* pInfo = (SAggOperatorInfo*)param;
L
Liu Jicong 已提交
1811 1812
  cleanupBasicInfo(&pInfo->binfo);

H
Haojun Liao 已提交
1813
  cleanupAggSup(&pInfo->aggSup);
S
shenglian zhou 已提交
1814
  cleanupExprSupp(&pInfo->scalarExprSup);
H
Haojun Liao 已提交
1815
  cleanupGroupResInfo(&pInfo->groupResInfo);
D
dapan1121 已提交
1816
  taosMemoryFreeClear(param);
1817
}
1818

D
dapan1121 已提交
1819
/*
 * Allocate and initialize an execution task descriptor.
 *
 * The task id string ("TID:0x... QID:0x...") is allocated right after the
 * task itself, so an allocation failure only has to release pTaskInfo.
 *
 * @param dbFName database full name; it is copied, and may be NULL
 * @return the new task, or NULL with terrno set to TSDB_CODE_OUT_OF_MEMORY
 */
static SExecTaskInfo* createExecTaskInfo(uint64_t queryId, uint64_t taskId, EOPTR_EXEC_MODEL model, char* dbFName) {
  SExecTaskInfo* pTaskInfo = taosMemoryCalloc(1, sizeof(SExecTaskInfo));
  if (pTaskInfo == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  // an unchecked allocation here would let snprintf write through NULL
  char* p = taosMemoryCalloc(1, 128);
  if (p == NULL) {
    taosMemoryFree(pTaskInfo);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }
  snprintf(p, 128, "TID:0x%" PRIx64 " QID:0x%" PRIx64, taskId, queryId);
  pTaskInfo->id.str = p;

  setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED);

  // strdup(NULL) is undefined behavior
  pTaskInfo->schemaInfo.dbname = (dbFName != NULL) ? strdup(dbFName) : NULL;
  pTaskInfo->id.queryId = queryId;
  pTaskInfo->execModel = model;
  pTaskInfo->pTableInfoList = tableListCreate();
  pTaskInfo->stopInfo.pStopInfo = taosArrayInit(4, sizeof(SExchangeOpStopInfo));
  pTaskInfo->pResultBlockList = taosArrayInit(128, POINTER_BYTES);

  return pTaskInfo;
}
H
Haojun Liao 已提交
1841

H
Haojun Liao 已提交
1842 1843
SSchemaWrapper* extractQueriedColumnSchema(SScanPhysiNode* pScanNode);

1844
int32_t extractTableSchemaInfo(SReadHandle* pHandle, SScanPhysiNode* pScanNode, SExecTaskInfo* pTaskInfo) {
1845 1846
  SMetaReader mr = {0};
  metaReaderInit(&mr, pHandle->meta, 0);
H
Haojun Liao 已提交
1847
  int32_t code = metaGetTableEntryByUidCache(&mr, pScanNode->uid);
1848
  if (code != TSDB_CODE_SUCCESS) {
L
Liu Jicong 已提交
1849 1850
    qError("failed to get the table meta, uid:0x%" PRIx64 ", suid:0x%" PRIx64 ", %s", pScanNode->uid, pScanNode->suid,
           GET_TASKID(pTaskInfo));
H
Haojun Liao 已提交
1851

D
dapan1121 已提交
1852
    metaReaderClear(&mr);
1853
    return terrno;
D
dapan1121 已提交
1854
  }
1855

1856 1857
  SSchemaInfo* pSchemaInfo = &pTaskInfo->schemaInfo;
  pSchemaInfo->tablename = strdup(mr.me.name);
1858 1859

  if (mr.me.type == TSDB_SUPER_TABLE) {
1860 1861
    pSchemaInfo->sw = tCloneSSchemaWrapper(&mr.me.stbEntry.schemaRow);
    pSchemaInfo->tversion = mr.me.stbEntry.schemaTag.version;
1862
  } else if (mr.me.type == TSDB_CHILD_TABLE) {
1863 1864
    tDecoderClear(&mr.coder);

1865
    tb_uid_t suid = mr.me.ctbEntry.suid;
H
Haojun Liao 已提交
1866
    metaGetTableEntryByUidCache(&mr, suid);
1867 1868
    pSchemaInfo->sw = tCloneSSchemaWrapper(&mr.me.stbEntry.schemaRow);
    pSchemaInfo->tversion = mr.me.stbEntry.schemaTag.version;
1869
  } else {
1870
    pSchemaInfo->sw = tCloneSSchemaWrapper(&mr.me.ntbEntry.schemaRow);
1871
  }
1872 1873

  metaReaderClear(&mr);
1874

H
Haojun Liao 已提交
1875 1876 1877 1878 1879
  pSchemaInfo->qsw = extractQueriedColumnSchema(pScanNode);
  return TSDB_CODE_SUCCESS;
}

/*
 * Build the schema of the columns a scan node reads.
 *
 * All scan columns come first, followed by those pseudo columns that are
 * plain column nodes (i.e. tag columns). Other pseudo-column expressions are
 * skipped.
 *
 * @return a newly allocated SSchemaWrapper owned by the caller, or NULL with
 *         terrno set to TSDB_CODE_OUT_OF_MEMORY if allocation fails
 */
SSchemaWrapper* extractQueriedColumnSchema(SScanPhysiNode* pScanNode) {
  int32_t numOfCols = LIST_LENGTH(pScanNode->pScanCols);
  int32_t numOfTags = LIST_LENGTH(pScanNode->pScanPseudoCols);

  SSchemaWrapper* pqSw = taosMemoryCalloc(1, sizeof(SSchemaWrapper));
  if (pqSw == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pqSw->pSchema = taosMemoryCalloc(numOfCols + numOfTags, sizeof(SSchema));
  // calloc(0, ...) may legitimately return NULL, so NULL is only an error
  // when at least one column was requested
  if (pqSw->pSchema == NULL && numOfCols + numOfTags > 0) {
    taosMemoryFree(pqSw);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  for (int32_t i = 0; i < numOfCols; ++i) {
    STargetNode* pNode = (STargetNode*)nodesListGetNode(pScanNode->pScanCols, i);
    SColumnNode* pColNode = (SColumnNode*)pNode->pExpr;

    SSchema* pSchema = &pqSw->pSchema[pqSw->nCols++];
    pSchema->colId = pColNode->colId;
    pSchema->type = pColNode->node.resType.type;
    pSchema->bytes = pColNode->node.resType.bytes;
    tstrncpy(pSchema->name, pColNode->colName, tListLen(pSchema->name));
  }

  // these are the tags and pseudo function columns; only the tag (column) nodes are kept
  for (int32_t i = 0; i < numOfTags; ++i) {
    STargetNode* pNode = (STargetNode*)nodesListGetNode(pScanNode->pScanPseudoCols, i);

    int32_t type = nodeType(pNode->pExpr);
    if (type == QUERY_NODE_COLUMN) {
      SColumnNode* pColNode = (SColumnNode*)pNode->pExpr;

      SSchema* pSchema = &pqSw->pSchema[pqSw->nCols++];
      pSchema->colId = pColNode->colId;
      pSchema->type = pColNode->node.resType.type;
      pSchema->bytes = pColNode->node.resType.bytes;
      tstrncpy(pSchema->name, pColNode->colName, tListLen(pSchema->name));
    }
  }

  return pqSw;
}

1916 1917
/*
 * Release the heap members of pSchemaInfo.
 *
 * The database and table names are freed and cleared. The table schema and
 * queried-column schema wrappers are deleted, but their pointers are not
 * reset.
 */
static void cleanupTableSchemaInfo(SSchemaInfo* pSchemaInfo) {
  // names
  taosMemoryFreeClear(pSchemaInfo->dbname);
  taosMemoryFreeClear(pSchemaInfo->tablename);

  // schema wrappers
  tDeleteSSchemaWrapper(pSchemaInfo->sw);
  tDeleteSSchemaWrapper(pSchemaInfo->qsw);
}

1923
/* Delete the schema wrapper held by the stream task info (the pointer itself is not reset). */
static void cleanupStreamInfo(SStreamTaskInfo* pStreamInfo) {
  tDeleteSSchemaWrapper(pStreamInfo->schema);
}
1924

1925
/*
 * Report whether a group/partition key list is exactly "tbname".
 *
 * @return true when the list has a single element that is a function node
 *         named "tbname" (partition by tbname / group by tbname), false
 *         otherwise
 */
bool groupbyTbname(SNodeList* pGroupList) {
  if (LIST_LENGTH(pGroupList) != 1) {
    return false;
  }

  SNode* pKey = nodesListGetNode(pGroupList, 0);
  if (pKey->type != QUERY_NODE_FUNCTION) {
    return false;
  }

  const char* pFuncName = ((struct SFunctionNode*)pKey)->functionName;
  return strcmp(pFuncName, "tbname") == 0;
}

1938 1939
/*
 * Recursively build the operator tree for the physical plan rooted at
 * pPhyNode.
 *
 * Leaf nodes (no children) become scan, exchange or projection operators.
 * For table-based scans, the task's table list and, where needed, the
 * table schema info are prepared first. For inner nodes, all children are
 * built first and then the operator for pPhyNode is created on top of them.
 *
 * @return the root operator, or NULL on failure. pTaskInfo->code is set for
 *         the failures detected in this function. If building a child
 *         fails, the sibling subtrees that were already built are
 *         destroyed.
 */
SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, SNode* pTagCond,
                                  SNode* pTagIndexCond, const char* pUser) {
  int32_t         type = nodeType(pPhyNode);
  STableListInfo* pTableListInfo = pTaskInfo->pTableInfoList;
  const char*     idstr = GET_TASKID(pTaskInfo);

  if (pPhyNode->pChildren == NULL || LIST_LENGTH(pPhyNode->pChildren) == 0) {
    SOperatorInfo* pOperator = NULL;
    if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == type) {
      STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode;

      // NOTE: this is an patch to fix the physical plan
      // TODO remove it later
      if (pTableScanNode->scan.node.pLimit != NULL) {
        pTableScanNode->groupSort = true;
      }

      int32_t code =
          createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, pTableScanNode->groupSort, pHandle,
                                  pTableListInfo, pTagCond, pTagIndexCond, pTaskInfo);
      if (code) {
        pTaskInfo->code = code;
        qError("failed to createScanTableListInfo, code:%s, %s", tstrerror(code), idstr);
        return NULL;
      }

      code = extractTableSchemaInfo(pHandle, &pTableScanNode->scan, pTaskInfo);
      if (code) {
        pTaskInfo->code = terrno;
        return NULL;
      }

      pOperator = createTableScanOperatorInfo(pTableScanNode, pHandle, pTaskInfo);
      if (NULL == pOperator) {
        pTaskInfo->code = terrno;
        return NULL;
      }

      STableScanInfo* pScanInfo = pOperator->info;
      pTaskInfo->cost.pRecoder = &pScanInfo->base.readRecorder;
    } else if (QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN == type) {
      STableMergeScanPhysiNode* pTableScanNode = (STableMergeScanPhysiNode*)pPhyNode;

      int32_t code = createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, true, pHandle,
                                             pTableListInfo, pTagCond, pTagIndexCond, pTaskInfo);
      if (code) {
        pTaskInfo->code = code;
        qError("failed to createScanTableListInfo, code: %s", tstrerror(code));
        return NULL;
      }

      code = extractTableSchemaInfo(pHandle, &pTableScanNode->scan, pTaskInfo);
      if (code) {
        pTaskInfo->code = terrno;
        return NULL;
      }

      pOperator = createTableMergeScanOperatorInfo(pTableScanNode, pHandle, pTaskInfo);
      if (NULL == pOperator) {
        pTaskInfo->code = terrno;
        return NULL;
      }

      STableScanInfo* pScanInfo = pOperator->info;
      pTaskInfo->cost.pRecoder = &pScanInfo->base.readRecorder;
    } else if (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == type) {
      pOperator = createExchangeOperatorInfo(pHandle ? pHandle->pMsgCb->clientRpc : NULL, (SExchangePhysiNode*)pPhyNode,
                                             pTaskInfo);
    } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN == type) {
      STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode;
      if (pHandle->vnode) {
        int32_t code =
            createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, pTableScanNode->groupSort,
                                    pHandle, pTableListInfo, pTagCond, pTagIndexCond, pTaskInfo);
        if (code) {
          pTaskInfo->code = code;
          qError("failed to createScanTableListInfo, code: %s", tstrerror(code));
          return NULL;
        }

#ifndef NDEBUG
        int32_t sz = tableListGetSize(pTableListInfo);
        qDebug("create stream task, total:%d", sz);

        for (int32_t i = 0; i < sz; i++) {
          STableKeyInfo* pKeyInfo = tableListGetInfo(pTableListInfo, i);
          qDebug("add table uid:%" PRIu64 ", gid:%" PRIu64, pKeyInfo->uid, pKeyInfo->groupId);
        }
#endif
      }

      pTaskInfo->schemaInfo.qsw = extractQueriedColumnSchema(&pTableScanNode->scan);
      pOperator = createStreamScanOperatorInfo(pHandle, pTableScanNode, pTagCond, pTaskInfo);
    } else if (QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN == type) {
      SSystemTableScanPhysiNode* pSysScanPhyNode = (SSystemTableScanPhysiNode*)pPhyNode;
      pOperator = createSysTableScanOperatorInfo(pHandle, pSysScanPhyNode, pUser, pTaskInfo);
    } else if (QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN == type) {
      STableCountScanPhysiNode* pTblCountScanNode = (STableCountScanPhysiNode*)pPhyNode;
      pOperator = createTableCountScanOperatorInfo(pHandle, pTblCountScanNode, pTaskInfo);
    } else if (QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN == type) {
      STagScanPhysiNode* pScanPhyNode = (STagScanPhysiNode*)pPhyNode;

      int32_t code = createScanTableListInfo(pScanPhyNode, NULL, false, pHandle, pTableListInfo, pTagCond,
                                             pTagIndexCond, pTaskInfo);
      if (code != TSDB_CODE_SUCCESS) {
        pTaskInfo->code = code;
        qError("failed to getTableList, code: %s", tstrerror(code));
        return NULL;
      }

      pOperator = createTagScanOperatorInfo(pHandle, pScanPhyNode, pTaskInfo);
    } else if (QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN == type) {
      SBlockDistScanPhysiNode* pBlockNode = (SBlockDistScanPhysiNode*)pPhyNode;

      if (pBlockNode->tableType == TSDB_SUPER_TABLE) {
        SArray* pList = taosArrayInit(4, sizeof(STableKeyInfo));
        int32_t code = vnodeGetAllTableList(pHandle->vnode, pBlockNode->uid, pList);
        if (code != TSDB_CODE_SUCCESS) {
          pTaskInfo->code = terrno;
          taosArrayDestroy(pList);  // previously leaked on this path
          return NULL;
        }

        size_t num = taosArrayGetSize(pList);
        for (int32_t i = 0; i < num; ++i) {
          STableKeyInfo* p = taosArrayGet(pList, i);
          tableListAddTableInfo(pTableListInfo, p->uid, 0);
        }

        taosArrayDestroy(pList);
      } else {  // Create group with only one table
        tableListAddTableInfo(pTableListInfo, pBlockNode->uid, 0);
      }

      pOperator = createDataBlockInfoScanOperator(pHandle, pBlockNode, pTaskInfo);
    } else if (QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN == type) {
      SLastRowScanPhysiNode* pScanNode = (SLastRowScanPhysiNode*)pPhyNode;

      int32_t code = createScanTableListInfo(&pScanNode->scan, pScanNode->pGroupTags, true, pHandle, pTableListInfo,
                                             pTagCond, pTagIndexCond, pTaskInfo);
      if (code != TSDB_CODE_SUCCESS) {
        pTaskInfo->code = code;
        return NULL;
      }

      code = extractTableSchemaInfo(pHandle, &pScanNode->scan, pTaskInfo);
      if (code != TSDB_CODE_SUCCESS) {
        pTaskInfo->code = code;
        return NULL;
      }

      pOperator = createCacherowsScanOperator(pScanNode, pHandle, pTaskInfo);
    } else if (QUERY_NODE_PHYSICAL_PLAN_PROJECT == type) {
      pOperator = createProjectOperatorInfo(NULL, (SProjectPhysiNode*)pPhyNode, pTaskInfo);
    } else {
      ASSERT(0);
    }

    if (pOperator != NULL) {
      pOperator->resultDataBlockId = pPhyNode->pOutputDataBlockDesc->dataBlockId;
    }

    return pOperator;
  }

  size_t          size = LIST_LENGTH(pPhyNode->pChildren);
  SOperatorInfo** ops = taosMemoryCalloc(size, POINTER_BYTES);
  if (ops == NULL) {
    pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  for (int32_t i = 0; i < size; ++i) {
    SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, i);
    ops[i] = createOperatorTree(pChildNode, pTaskInfo, pHandle, pTagCond, pTagIndexCond, pUser);
    if (ops[i] == NULL) {
      // Release the sibling subtrees that were already built; previously
      // they leaked. The scan-cost recorder may point into one of them.
      for (int32_t j = 0; j < i; ++j) {
        destroyOperatorInfo(ops[j]);
      }
      if (i > 0) {
        pTaskInfo->cost.pRecoder = NULL;
      }

      taosMemoryFree(ops);
      return NULL;
    }
  }

  SOperatorInfo* pOptr = NULL;
  if (QUERY_NODE_PHYSICAL_PLAN_PROJECT == type) {
    pOptr = createProjectOperatorInfo(ops[0], (SProjectPhysiNode*)pPhyNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_HASH_AGG == type) {
    SAggPhysiNode* pAggNode = (SAggPhysiNode*)pPhyNode;
    if (pAggNode->pGroupKeys != NULL) {
      pOptr = createGroupOperatorInfo(ops[0], pAggNode, pTaskInfo);
    } else {
      pOptr = createAggregateOperatorInfo(ops[0], pAggNode, pTaskInfo);
    }
  } else if (QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL == type) {
    SIntervalPhysiNode* pIntervalPhyNode = (SIntervalPhysiNode*)pPhyNode;

    // always false inside this branch; stream intervals are handled below
    bool isStream = (QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL == type);
    pOptr = createIntervalOperatorInfo(ops[0], pIntervalPhyNode, pTaskInfo, isStream);
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL == type) {
    pOptr = createStreamIntervalOperatorInfo(ops[0], pPhyNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL == type) {
    SMergeAlignedIntervalPhysiNode* pIntervalPhyNode = (SMergeAlignedIntervalPhysiNode*)pPhyNode;
    pOptr = createMergeAlignedIntervalOperatorInfo(ops[0], pIntervalPhyNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL == type) {
    SMergeIntervalPhysiNode* pIntervalPhyNode = (SMergeIntervalPhysiNode*)pPhyNode;
    pOptr = createMergeIntervalOperatorInfo(ops[0], pIntervalPhyNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL == type) {
    int32_t children = 0;
    pOptr = createStreamFinalIntervalOperatorInfo(ops[0], pPhyNode, pTaskInfo, children);
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL == type) {
    int32_t children = pHandle->numOfVgroups;
    pOptr = createStreamFinalIntervalOperatorInfo(ops[0], pPhyNode, pTaskInfo, children);
  } else if (QUERY_NODE_PHYSICAL_PLAN_SORT == type) {
    pOptr = createSortOperatorInfo(ops[0], (SSortPhysiNode*)pPhyNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_GROUP_SORT == type) {
    pOptr = createGroupSortOperatorInfo(ops[0], (SGroupSortPhysiNode*)pPhyNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE == type) {
    SMergePhysiNode* pMergePhyNode = (SMergePhysiNode*)pPhyNode;
    pOptr = createMultiwayMergeOperatorInfo(ops, size, pMergePhyNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION == type) {
    SSessionWinodwPhysiNode* pSessionNode = (SSessionWinodwPhysiNode*)pPhyNode;
    pOptr = createSessionAggOperatorInfo(ops[0], pSessionNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION == type) {
    pOptr = createStreamSessionAggOperatorInfo(ops[0], pPhyNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION == type) {
    int32_t children = 0;
    pOptr = createStreamFinalSessionAggOperatorInfo(ops[0], pPhyNode, pTaskInfo, children);
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION == type) {
    int32_t children = pHandle->numOfVgroups;
    pOptr = createStreamFinalSessionAggOperatorInfo(ops[0], pPhyNode, pTaskInfo, children);
  } else if (QUERY_NODE_PHYSICAL_PLAN_PARTITION == type) {
    pOptr = createPartitionOperatorInfo(ops[0], (SPartitionPhysiNode*)pPhyNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION == type) {
    pOptr = createStreamPartitionOperatorInfo(ops[0], (SStreamPartitionPhysiNode*)pPhyNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE == type) {
    SStateWinodwPhysiNode* pStateNode = (SStateWinodwPhysiNode*)pPhyNode;
    pOptr = createStatewindowOperatorInfo(ops[0], pStateNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE == type) {
    pOptr = createStreamStateAggOperatorInfo(ops[0], pPhyNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN == type) {
    pOptr = createMergeJoinOperatorInfo(ops, size, (SSortMergeJoinPhysiNode*)pPhyNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_FILL == type) {
    pOptr = createFillOperatorInfo(ops[0], (SFillPhysiNode*)pPhyNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_FILL == type) {
    pOptr = createStreamFillOperatorInfo(ops[0], (SStreamFillPhysiNode*)pPhyNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC == type) {
    pOptr = createIndefinitOutputOperatorInfo(ops[0], pPhyNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC == type) {
    pOptr = createTimeSliceOperatorInfo(ops[0], pPhyNode, pTaskInfo);
  } else {
    ASSERT(0);
  }

  taosMemoryFree(ops);
  if (pOptr) {
    pOptr->resultDataBlockId = pPhyNode->pOutputDataBlockDesc->dataBlockId;
  }

  return pOptr;
}
H
Haojun Liao 已提交
2194

L
Liu Jicong 已提交
2195 2196 2197 2198 2199 2200 2201 2202 2203 2204 2205 2206 2207
/*
 * Starting at pOperator, follow the single-downstream chain until the stream scan operator is reached, and hand
 * back (through ppInfo) the info of the table scan operator that the stream scan wraps.
 *
 * Returns 0 on success, or TSDB_CODE_QRY_APP_ERROR when the chain ends before a stream scan is found or when an
 * operator has more than one downstream (joins are not supported here).
 */
static int32_t extractTbscanInStreamOpTree(SOperatorInfo* pOperator, STableScanInfo** ppInfo) {
  SOperatorInfo* pCursor = pOperator;

  while (pCursor->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
    if (pCursor->numOfDownstream == 0) {
      qError("failed to find stream scan operator");
      return TSDB_CODE_QRY_APP_ERROR;
    }

    if (pCursor->numOfDownstream > 1) {
      qError("join not supported for stream block scan");
      return TSDB_CODE_QRY_APP_ERROR;
    }

    pCursor = pCursor->pDownstream[0];
  }

  SStreamScanInfo* pInfo = pCursor->info;
  ASSERT(pInfo->pTableScanOp->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN);
  *ppInfo = pInfo->pTableScanOp->info;
  return 0;
}

2215 2216 2217 2218 2219 2220 2221 2222 2223 2224 2225 2226 2227 2228 2229 2230 2231 2232 2233 2234 2235 2236
/*
 * Walk a physical plan down its single-child chain and return (through ppNode) the leaf table scan node.
 *
 * Returns 0 on success. Returns -1 and sets terrno to TSDB_CODE_QRY_APP_ERROR when some node has more than one
 * child, or when the leaf is not a table scan node.
 */
int32_t extractTableScanNode(SPhysiNode* pNode, STableScanPhysiNode** ppNode) {
  SPhysiNode* pCur = pNode;

  // descend while the current node still has children; only linear plans are accepted
  while (pCur->pChildren != NULL && LIST_LENGTH(pCur->pChildren) != 0) {
    if (LIST_LENGTH(pCur->pChildren) != 1) {
      ASSERT(0);
      terrno = TSDB_CODE_QRY_APP_ERROR;
      return -1;
    }
    pCur = (SPhysiNode*)nodesListGetNode(pCur->pChildren, 0);
  }

  if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN != pCur->type) {
    ASSERT(0);
    terrno = TSDB_CODE_QRY_APP_ERROR;
    return -1;
  }

  *ppNode = (STableScanPhysiNode*)pCur;
  return 0;
}

2237
#if 0
/*
 * Disabled: close the tsdb reader of the stream pipeline's table scan operator and open a new one built from the
 * subplan's table scan node. uid and ts are not applied yet (see TODO).
 */
int32_t rebuildReader(SOperatorInfo* pOperator, SSubplan* plan, SReadHandle* pHandle, int64_t uid, int64_t ts) {
  STableScanInfo* pScanInfo = NULL;
  if (extractTbscanInStreamOpTree(pOperator, &pScanInfo) < 0) {
    return -1;
  }

  STableScanPhysiNode* pScanNode = NULL;
  if (extractTableScanNode(plan->pNode, &pScanNode) < 0) {
    ASSERT(0);
  }

  tsdbReaderClose(pScanInfo->dataReader);

  STableListInfo tableList = {0};
  pScanInfo->dataReader = doCreateDataReader(pScanNode, pHandle, &tableList, NULL);
  if (pScanInfo->dataReader != NULL) {
    // TODO: set uid and ts to data reader
    return 0;
  }

  ASSERT(0);
  qError("failed to create data reader");
  return TSDB_CODE_QRY_APP_ERROR;
}
#endif
2262

D
dapan1121 已提交
2263
/*
 * Allocate the parameter object consumed by a data sink, selected by the sink node type:
 *   - QUERY_NODE_PHYSICAL_PLAN_QUERY_INSERT: an SInserterParam carrying readHandle;
 *   - QUERY_NODE_PHYSICAL_PLAN_DELETE: an SDeleterParam holding the super-table uid and the uid of every table
 *     in the task's table list;
 *   - any other type: *pParam is left untouched.
 *
 * Returns TSDB_CODE_OUT_OF_MEMORY if an allocation fails (already-allocated pieces are released), otherwise
 * TSDB_CODE_SUCCESS.
 */
int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, qTaskInfo_t* pTaskInfo, SReadHandle* readHandle) {
  SExecTaskInfo* pExecTask = *(SExecTaskInfo**)pTaskInfo;

  if (pNode->type == QUERY_NODE_PHYSICAL_PLAN_QUERY_INSERT) {
    SInserterParam* pInserter = taosMemoryCalloc(1, sizeof(SInserterParam));
    if (pInserter == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    pInserter->readHandle = readHandle;
    *pParam = pInserter;
    return TSDB_CODE_SUCCESS;
  }

  if (pNode->type == QUERY_NODE_PHYSICAL_PLAN_DELETE) {
    SDeleterParam* pDeleter = taosMemoryCalloc(1, sizeof(SDeleterParam));
    if (pDeleter == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    int32_t numOfTables = tableListGetSize(pExecTask->pTableInfoList);
    pDeleter->suid = tableListGetSuid(pExecTask->pTableInfoList);

    pDeleter->pUidList = taosArrayInit(numOfTables, sizeof(uint64_t));
    if (pDeleter->pUidList == NULL) {
      taosMemoryFree(pDeleter);
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    // gather the uid of every table in the task's table list
    for (int32_t idx = 0; idx < numOfTables; ++idx) {
      STableKeyInfo* pKeyInfo = tableListGetInfo(pExecTask->pTableInfoList, idx);
      taosArrayPush(pDeleter->pUidList, &pKeyInfo->uid);
    }

    *pParam = pDeleter;
  }

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
2307
/*
 * Create the execution task for a subplan and build its operator tree.
 *
 * On success, *pTaskInfo holds the new task, which keeps pPlan (as pSubplan) and sql, and TSDB_CODE_SUCCESS is
 * returned.
 *
 * On failure, terrno is returned and *pTaskInfo is reset to NULL so the caller can never reach a freed task:
 *   - sql is freed here if it has not been handed to the task yet; otherwise it is freed by doDestroyTask;
 *   - a partially built task is destroyed with doDestroyTask, which (unless localFetch.localExec is set) also
 *     destroys the subplan.
 */
int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId,
                               char* sql, EOPTR_EXEC_MODEL model) {
  uint64_t queryId = pPlan->id.queryId;

  *pTaskInfo = createExecTaskInfo(queryId, taskId, model, pPlan->dbFName);
  if (*pTaskInfo == NULL) {
    goto _complete;
  }

  if (pHandle) {
    /*(*pTaskInfo)->streamInfo.fillHistoryVer1 = pHandle->fillHistoryVer1;*/
    if (pHandle->pStateBackend) {
      (*pTaskInfo)->streamInfo.pState = pHandle->pStateBackend;
    }
  }

  // The task takes over sql; clear the local copy so the _complete path does not free it a second time.
  (*pTaskInfo)->sql = sql;
  sql = NULL;

  (*pTaskInfo)->pSubplan = pPlan;
  (*pTaskInfo)->pRoot =
      createOperatorTree(pPlan->pNode, *pTaskInfo, pHandle, pPlan->pTagCond, pPlan->pTagIndexCond, pPlan->user);

  if (NULL == (*pTaskInfo)->pRoot) {
    terrno = (*pTaskInfo)->code;
    goto _complete;
  }

  (*pTaskInfo)->cost.created = taosGetTimestampUs();
  return TSDB_CODE_SUCCESS;

_complete:
  taosMemoryFree(sql);

  // *pTaskInfo is NULL when createExecTaskInfo itself failed. doDestroyTask dereferences its argument, so only
  // call it for a real task. Afterwards, clear the caller's pointer so it does not dangle.
  if (*pTaskInfo != NULL) {
    doDestroyTask(*pTaskInfo);
    *pTaskInfo = NULL;
  }
  return terrno;
}

H
Haojun Liao 已提交
2344 2345 2346 2347 2348
/*
 * Element destructor for arrays of SSDataBlock pointers (used with taosArrayDestroyEx): pParam points at the
 * array slot, which itself holds the block pointer to destroy.
 */
static void freeBlock(void* pParam) {
  SSDataBlock** ppSlot = (SSDataBlock**)pParam;
  blockDataDestroy(*ppSlot);
}

L
Liu Jicong 已提交
2349
/*
 * Release an execution task and the resources it references:
 *   - the table list, the operator tree, the schema info and the stream info;
 *   - the subplan (skipped in local-exec mode);
 *   - the cached result blocks and the stop info;
 *   - the sql text, the id string and the task itself.
 *
 * Passing NULL is a no-op, so error paths that may not have created a task can call this unconditionally.
 */
void doDestroyTask(SExecTaskInfo* pTaskInfo) {
  if (pTaskInfo == NULL) {
    return;
  }

  qDebug("%s execTask is freed", GET_TASKID(pTaskInfo));

  pTaskInfo->pTableInfoList = tableListDestroy(pTaskInfo->pTableInfoList);
  destroyOperatorInfo(pTaskInfo->pRoot);
  cleanupTableSchemaInfo(&pTaskInfo->schemaInfo);
  cleanupStreamInfo(&pTaskInfo->streamInfo);

  // In local-exec mode the subplan is not destroyed here (presumably it is owned elsewhere; confirm with caller).
  if (!pTaskInfo->localFetch.localExec) {
    nodesDestroyNode((SNode*)pTaskInfo->pSubplan);
  }

  taosArrayDestroyEx(pTaskInfo->pResultBlockList, freeBlock);
  taosArrayDestroy(pTaskInfo->stopInfo.pStopInfo);
  taosMemoryFreeClear(pTaskInfo->sql);
  taosMemoryFreeClear(pTaskInfo->id.str);
  taosMemoryFreeClear(pTaskInfo);
}

/*
 * Estimate the query buffer (bytes) to reserve for numOfTables tables: 1.5 times sizeof(STableQueryInfo) per
 * table. Buffer consumed inside tsdb (e.g. STableCheckInfo) is not included.
 */
static int64_t getQuerySupportBufSize(size_t numOfTables) {
  const double bytesPerTable = 1.5 * (double)sizeof(STableQueryInfo);
  return (int64_t)(bytesPerTable * numOfTables);
}

/*
 * Try to reserve query buffer for numOfTables tables out of the global budget tsQueryBufferSizeBytes.
 *
 * A negative budget means unlimited (always succeeds, nothing is reserved). A zero budget disables query
 * processing. Otherwise the estimated size is subtracted with a compare-and-swap retry loop.
 *
 * Returns TSDB_CODE_SUCCESS when the reservation is made, or TSDB_CODE_QRY_NOT_ENOUGH_BUFFER when the remaining
 * budget is too small or zero.
 */
int32_t checkForQueryBuf(size_t numOfTables) {
  int64_t need = getQuerySupportBufSize(numOfTables);

  if (tsQueryBufferSizeBytes < 0) {
    return TSDB_CODE_SUCCESS;
  }

  // disable query processing if the value of tsQueryBufferSize is zero.
  if (tsQueryBufferSizeBytes == 0) {
    return TSDB_CODE_QRY_NOT_ENOUGH_BUFFER;
  }

  for (;;) {
    int64_t avail = tsQueryBufferSizeBytes;
    int64_t left = avail - need;
    if (left < 0) {
      return TSDB_CODE_QRY_NOT_ENOUGH_BUFFER;
    }

    // only commit if no other thread changed the budget since it was read; otherwise retry
    if (atomic_val_compare_exchange_64(&tsQueryBufferSizeBytes, avail, left) == avail) {
      return TSDB_CODE_SUCCESS;
    }
  }
}

/*
 * Give back the buffer that checkForQueryBuf reserved for numOfTables tables. Nothing is returned when the
 * budget is unlimited (negative), mirroring checkForQueryBuf which reserves nothing in that case.
 */
void releaseQueryBuf(size_t numOfTables) {
  if (tsQueryBufferSizeBytes >= 0) {
    atomic_add_fetch_64(&tsQueryBufferSizeBytes, getQuerySupportBufSize(numOfTables));
  }
}
D
dapan1121 已提交
2406

H
Haojun Liao 已提交
2407
/*
 * Append the execution statistics of operatorInfo, followed (depth-first) by those of all its downstream
 * operators, to pExecInfoList.
 *
 * For each operator the row count, open cost and total cost are recorded. If the operator provides
 * getExplainFn, it also fills the verbose info and its length.
 *
 * Returns TSDB_CODE_SUCCESS; TSDB_CODE_QRY_OUT_OF_MEMORY if the list cannot grow; otherwise the first error
 * reported by getExplainFn, for this operator or any downstream one.
 */
int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SArray* pExecInfoList) {
  SExplainExecInfo  execInfo = {0};
  SExplainExecInfo* pExplainInfo = taosArrayPush(pExecInfoList, &execInfo);
  if (pExplainInfo == NULL) {
    return TSDB_CODE_QRY_OUT_OF_MEMORY;
  }

  pExplainInfo->numOfRows = operatorInfo->resultInfo.totalRows;
  pExplainInfo->startupCost = operatorInfo->cost.openCost;
  pExplainInfo->totalCost = operatorInfo->cost.totalCost;
  pExplainInfo->verboseLen = 0;
  pExplainInfo->verboseInfo = NULL;

  // pExplainInfo points into pExecInfoList, so it must be filled in before the recursive calls below append
  // more elements (which may reallocate the array).
  if (operatorInfo->fpSet.getExplainFn) {
    int32_t code =
        operatorInfo->fpSet.getExplainFn(operatorInfo, &pExplainInfo->verboseInfo, &pExplainInfo->verboseLen);
    if (code) {
      qError("%s operator getExplainFn failed, code:%s", GET_TASKID(operatorInfo->pTaskInfo), tstrerror(code));
      return code;
    }
  }

  for (int32_t i = 0; i < operatorInfo->numOfDownstream; ++i) {
    int32_t code = getOperatorExplainExecInfo(operatorInfo->pDownstream[i], pExecInfoList);
    if (code != TSDB_CODE_SUCCESS) {
      // propagate the child's real error instead of reporting every failure as out-of-memory
      return code;
    }
  }

  return TSDB_CODE_SUCCESS;
}
5
54liuyao 已提交
2437

2438 2439
/*
 * Get (creating it if absent) the stream-state result row for window win of group tableGroupId.
 *
 * The row's buffer size is pAggSup->resultRowSize. On success *pResult receives the row, its window is set to
 * *win and the function contexts are initialised for it.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_QRY_OUT_OF_MEMORY when the stream state cannot provide the row.
 */
int32_t setOutputBuf(SStreamState* pState, STimeWindow* win, SResultRow** pResult, int64_t tableGroupId,
                     SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowEntryInfoOffset, SAggSupporter* pAggSup) {
  SWinKey winKey = {.ts = win->skey, .groupId = tableGroupId};
  int32_t bufLen = pAggSup->resultRowSize;
  char*   pBuf = NULL;

  if (streamStateAddIfNotExist(pState, &winKey, (void**)&pBuf, &bufLen) < 0) {
    return TSDB_CODE_QRY_OUT_OF_MEMORY;
  }

  *pResult = (SResultRow*)pBuf;
  ASSERT(*pResult);

  SResultRow* pRow = *pResult;
  pRow->win = *win;  // set time window for current result
  setResultRowInitCtx(pRow, pCtx, numOfOutput, rowEntryInfoOffset);
  return TSDB_CODE_SUCCESS;
}

2458 2459
/*
 * Release a result row buffer obtained from the stream state. The outcome of the release is not inspected;
 * this always reports TSDB_CODE_SUCCESS.
 */
int32_t releaseOutputBuf(SStreamState* pState, SWinKey* pKey, SResultRow* pResult) {
  (void)streamStateReleaseBuf(pState, pKey, pResult);
  return TSDB_CODE_SUCCESS;
}

2463 2464
/*
 * Store resSize bytes of pResult in the stream state under pKey. The outcome of the put is not inspected;
 * this always reports TSDB_CODE_SUCCESS.
 */
int32_t saveOutputBuf(SStreamState* pState, SWinKey* pKey, SResultRow* pResult, int32_t resSize) {
  (void)streamStatePut(pState, pKey, pResult, resSize);
  return TSDB_CODE_SUCCESS;
}

2468
int32_t buildDataBlockFromGroupRes(SOperatorInfo* pOperator, SStreamState* pState, SSDataBlock* pBlock, SExprSupp* pSup,
2469
                                   SGroupResInfo* pGroupResInfo) {
2470
  SExecTaskInfo*  pTaskInfo = pOperator->pTaskInfo;
2471 2472 2473 2474 2475 2476 2477 2478 2479 2480 2481 2482
  SExprInfo*      pExprInfo = pSup->pExprInfo;
  int32_t         numOfExprs = pSup->numOfExprs;
  int32_t*        rowEntryOffset = pSup->rowEntryInfoOffset;
  SqlFunctionCtx* pCtx = pSup->pCtx;

  int32_t numOfRows = getNumOfTotalRes(pGroupResInfo);

  for (int32_t i = pGroupResInfo->index; i < numOfRows; i += 1) {
    SResKeyPos* pPos = taosArrayGetP(pGroupResInfo->pRows, i);
    int32_t     size = 0;
    void*       pVal = NULL;
    SWinKey     key = {
2483 2484
            .ts = *(TSKEY*)pPos->key,
            .groupId = pPos->groupId,
2485
    };
2486
    int32_t code = streamStateGet(pState, &key, &pVal, &size);
2487 2488 2489 2490 2491 2492
    ASSERT(code == 0);
    SResultRow* pRow = (SResultRow*)pVal;
    doUpdateNumOfRows(pCtx, pRow, numOfExprs, rowEntryOffset);
    // no results, continue to check the next one
    if (pRow->numOfRows == 0) {
      pGroupResInfo->index += 1;
2493
      releaseOutputBuf(pState, &key, pRow);
2494 2495 2496
      continue;
    }

H
Haojun Liao 已提交
2497 2498
    if (pBlock->info.id.groupId == 0) {
      pBlock->info.id.groupId = pPos->groupId;
2499
      void* tbname = NULL;
H
Haojun Liao 已提交
2500
      if (streamStateGetParName(pTaskInfo->streamInfo.pState, pBlock->info.id.groupId, &tbname) < 0) {
L
Liu Jicong 已提交
2501
        pBlock->info.parTbName[0] = 0;
2502 2503
      } else {
        memcpy(pBlock->info.parTbName, tbname, TSDB_TABLE_NAME_LEN);
2504
      }
2505
      tdbFree(tbname);
2506 2507
    } else {
      // current value belongs to different group, it can't be packed into one datablock
H
Haojun Liao 已提交
2508
      if (pBlock->info.id.groupId != pPos->groupId) {
2509
        releaseOutputBuf(pState, &key, pRow);
2510 2511 2512 2513 2514 2515
        break;
      }
    }

    if (pBlock->info.rows + pRow->numOfRows > pBlock->info.capacity) {
      ASSERT(pBlock->info.rows > 0);
2516
      releaseOutputBuf(pState, &key, pRow);
2517 2518 2519 2520 2521 2522 2523 2524 2525 2526
      break;
    }

    pGroupResInfo->index += 1;

    for (int32_t j = 0; j < numOfExprs; ++j) {
      int32_t slotId = pExprInfo[j].base.resSchema.slotId;

      pCtx[j].resultInfo = getResultEntryInfo(pRow, j, rowEntryOffset);
      if (pCtx[j].fpSet.finalize) {
2527 2528 2529 2530
        int32_t code1 = pCtx[j].fpSet.finalize(&pCtx[j], pBlock);
        if (TAOS_FAILED(code1)) {
          qError("%s build result data block error, code %s", GET_TASKID(pTaskInfo), tstrerror(code1));
          T_LONG_JMP(pTaskInfo->env, code1);
2531 2532 2533 2534 2535 2536 2537 2538 2539 2540 2541 2542 2543
        }
      } else if (strcmp(pCtx[j].pExpr->pExpr->_function.functionName, "_select_value") == 0) {
        // do nothing, todo refactor
      } else {
        // expand the result into multiple rows. E.g., _wstart, top(k, 20)
        // the _wstart needs to copy to 20 following rows, since the results of top-k expands to 20 different rows.
        SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, slotId);
        char*            in = GET_ROWCELL_INTERBUF(pCtx[j].resultInfo);
        for (int32_t k = 0; k < pRow->numOfRows; ++k) {
          colDataAppend(pColInfoData, pBlock->info.rows + k, in, pCtx[j].resultInfo->isNullRes);
        }
      }
    }
5
54liuyao 已提交
2544

2545
    pBlock->info.rows += pRow->numOfRows;
2546
    releaseOutputBuf(pState, &key, pRow);
2547 2548 2549 2550
  }
  blockDataUpdateTsWindow(pBlock, 0);
  return TSDB_CODE_SUCCESS;
}
5
54liuyao 已提交
2551 2552 2553 2554 2555 2556 2557

/*
 * Write size bytes of a session window's buffer into the stream state under key, then release the buffer.
 * Neither call's outcome is inspected; this always reports TSDB_CODE_SUCCESS.
 */
int32_t saveSessionDiscBuf(SStreamState* pState, SSessionKey* key, void* buf, int32_t size) {
  const void* pData = buf;
  (void)streamStateSessionPut(pState, key, pData, size);

  SResultRow* pRow = (SResultRow*)buf;
  (void)releaseOutputBuf(pState, NULL, pRow);
  return TSDB_CODE_SUCCESS;
}

2558
int32_t buildSessionResultDataBlock(SOperatorInfo* pOperator, SStreamState* pState, SSDataBlock* pBlock,
5
54liuyao 已提交
2559
                                    SExprSupp* pSup, SGroupResInfo* pGroupResInfo) {
2560
  SExecTaskInfo*  pTaskInfo = pOperator->pTaskInfo;
5
54liuyao 已提交
2561 2562 2563 2564 2565 2566 2567 2568 2569 2570 2571 2572 2573
  SExprInfo*      pExprInfo = pSup->pExprInfo;
  int32_t         numOfExprs = pSup->numOfExprs;
  int32_t*        rowEntryOffset = pSup->rowEntryInfoOffset;
  SqlFunctionCtx* pCtx = pSup->pCtx;

  int32_t numOfRows = getNumOfTotalRes(pGroupResInfo);

  for (int32_t i = pGroupResInfo->index; i < numOfRows; i += 1) {
    SSessionKey* pKey = taosArrayGet(pGroupResInfo->pRows, i);
    int32_t      size = 0;
    void*        pVal = NULL;
    int32_t      code = streamStateSessionGet(pState, pKey, &pVal, &size);
    ASSERT(code == 0);
2574 2575
    if (code == -1) {
      // coverity scan
5
54liuyao 已提交
2576
      pGroupResInfo->index += 1;
2577 2578
      continue;
    }
5
54liuyao 已提交
2579 2580 2581 2582 2583 2584 2585 2586 2587
    SResultRow* pRow = (SResultRow*)pVal;
    doUpdateNumOfRows(pCtx, pRow, numOfExprs, rowEntryOffset);
    // no results, continue to check the next one
    if (pRow->numOfRows == 0) {
      pGroupResInfo->index += 1;
      releaseOutputBuf(pState, NULL, pRow);
      continue;
    }

H
Haojun Liao 已提交
2588 2589
    if (pBlock->info.id.groupId == 0) {
      pBlock->info.id.groupId = pKey->groupId;
2590

2591
      void* tbname = NULL;
H
Haojun Liao 已提交
2592
      if (streamStateGetParName(pTaskInfo->streamInfo.pState, pBlock->info.id.groupId, &tbname) < 0) {
2593
        pBlock->info.parTbName[0] = 0;
2594
      } else {
2595
        memcpy(pBlock->info.parTbName, tbname, TSDB_TABLE_NAME_LEN);
2596
      }
2597
      tdbFree(tbname);
5
54liuyao 已提交
2598 2599
    } else {
      // current value belongs to different group, it can't be packed into one datablock
H
Haojun Liao 已提交
2600
      if (pBlock->info.id.groupId != pKey->groupId) {
5
54liuyao 已提交
2601 2602 2603 2604 2605 2606 2607 2608 2609 2610 2611 2612 2613 2614 2615 2616 2617 2618 2619 2620 2621 2622 2623 2624 2625 2626 2627 2628 2629 2630 2631 2632 2633 2634 2635 2636 2637 2638 2639 2640 2641 2642
        releaseOutputBuf(pState, NULL, pRow);
        break;
      }
    }

    if (pBlock->info.rows + pRow->numOfRows > pBlock->info.capacity) {
      ASSERT(pBlock->info.rows > 0);
      releaseOutputBuf(pState, NULL, pRow);
      break;
    }

    pGroupResInfo->index += 1;

    for (int32_t j = 0; j < numOfExprs; ++j) {
      int32_t slotId = pExprInfo[j].base.resSchema.slotId;

      pCtx[j].resultInfo = getResultEntryInfo(pRow, j, rowEntryOffset);
      if (pCtx[j].fpSet.finalize) {
        int32_t code1 = pCtx[j].fpSet.finalize(&pCtx[j], pBlock);
        if (TAOS_FAILED(code1)) {
          qError("%s build result data block error, code %s", GET_TASKID(pTaskInfo), tstrerror(code1));
          T_LONG_JMP(pTaskInfo->env, code1);
        }
      } else if (strcmp(pCtx[j].pExpr->pExpr->_function.functionName, "_select_value") == 0) {
        // do nothing, todo refactor
      } else {
        // expand the result into multiple rows. E.g., _wstart, top(k, 20)
        // the _wstart needs to copy to 20 following rows, since the results of top-k expands to 20 different rows.
        SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, slotId);
        char*            in = GET_ROWCELL_INTERBUF(pCtx[j].resultInfo);
        for (int32_t k = 0; k < pRow->numOfRows; ++k) {
          colDataAppend(pColInfoData, pBlock->info.rows + k, in, pCtx[j].resultInfo->isNullRes);
        }
      }
    }

    pBlock->info.rows += pRow->numOfRows;
    // saveSessionDiscBuf(pState, pKey, pVal, size);
    releaseOutputBuf(pState, NULL, pRow);
  }
  blockDataUpdateTsWindow(pBlock, 0);
  return TSDB_CODE_SUCCESS;
2643
}