executorimpl.c 93.8 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
15

H
Haojun Liao 已提交
16 17
#include "filter.h"
#include "function.h"
18 19
#include "functionMgt.h"
#include "os.h"
H
Haojun Liao 已提交
20
#include "querynodes.h"
21
#include "tfill.h"
dengyihao's avatar
dengyihao 已提交
22
#include "tname.h"
23

H
Haojun Liao 已提交
24
#include "tdatablock.h"
25
#include "tglobal.h"
H
Haojun Liao 已提交
26
#include "tmsg.h"
27
#include "ttime.h"
H
Haojun Liao 已提交
28

29
#include "executorimpl.h"
dengyihao's avatar
dengyihao 已提交
30
#include "index.h"
31
#include "query.h"
32
#include "tcompare.h"
H
Haojun Liao 已提交
33
#include "thash.h"
34
#include "ttypes.h"
dengyihao's avatar
dengyihao 已提交
35
#include "vnode.h"
36

H
Haojun Liao 已提交
37
/* Non-zero while the runtime object is performing its main scan. */
#define IS_MAIN_SCAN(rtEnv)          (MAIN_SCAN == (rtEnv)->scanFlag)
/* Put the runtime object into reverse-scan mode by overwriting its scanFlag. */
#define SET_REVERSE_SCAN_FLAG(rtEnv) ((rtEnv)->scanFlag = REVERSE_SCAN)

/* Row step to use for the given scan order: ascending -> QUERY_ASC_FORWARD_STEP, otherwise QUERY_DESC_FORWARD_STEP. */
#define GET_FORWARD_DIRECTION_FACTOR(scanOrder) \
  ((TSDB_ORDER_ASC == (scanOrder)) ? QUERY_ASC_FORWARD_STEP : QUERY_DESC_FORWARD_STEP)

#if 0
/*
 * Fault-injection allocators (compiled out). When enabled, the #defines below redirect
 * calloc/malloc/realloc in the code that follows to these wrappers, which return NULL on a
 * random subset of calls so that out-of-memory paths can be exercised: about 1 in 1000 calls
 * for malloc/calloc and about 2 in 5 for realloc (assuming taosRand() is roughly uniform).
 */
static UNUSED_FUNC void* u_malloc(size_t size) {
  uint32_t dice = taosRand();
  if (dice % 1000 == 0) {
    return NULL;
  }
  return taosMemoryMalloc(size);
}

static UNUSED_FUNC void* u_calloc(size_t num, size_t size) {
  uint32_t dice = taosRand();
  if (dice % 1000 == 0) {
    return NULL;
  }
  return taosMemoryCalloc(num, size);
}

static UNUSED_FUNC void* u_realloc(void* p, size_t size) {
  uint32_t dice = taosRand();
  if (dice % 5 < 2) {
    return NULL;
  }
  return taosMemoryRealloc(p, size);
}

#define calloc  u_calloc
#define malloc  u_malloc
#define realloc u_realloc
#endif

X
Xiaoyu Wang 已提交
76
/* Clear the bit(s) `bits` in the `status` field of `qry`. */
#define CLEAR_QUERY_STATUS(qry, bits) ((qry)->status &= ~(bits))
/* Non-zero when `qry` has a positive interval length, i.e. it is an interval query. */
#define QUERY_IS_INTERVAL_QUERY(qry)  (0 < (qry)->interval.interval)

H
Haojun Liao 已提交
79 80 81 82 83 84 85 86 87
/*
 * Private state of the aggregate operator (stored in SOperatorInfo::info, presumably; it is
 * released by destroyAggOperatorInfo — TODO confirm against its definition).
 */
typedef struct SAggOperatorInfo {
  SOptrBasicInfo   binfo;          // basic operator info shared by operators (result block etc.) — presumably
  SAggSupporter    aggSup;         // result-row hash table, key buffer and result-row page bookkeeping
  STableQueryInfo* current;        // NOTE(review): table currently being aggregated — confirm usage
  uint64_t         groupId;        // id of the group currently being aggregated — presumably
  SGroupResInfo    groupResInfo;   // iteration state over finished group results — presumably
  SExprSupp        scalarExprSup;  // extra (scalar) expressions of this operator — TODO confirm when they run
} SAggOperatorInfo;

88
static void setBlockSMAInfo(SqlFunctionCtx* pCtx, SExprInfo* pExpr, SSDataBlock* pBlock);
89

X
Xiaoyu Wang 已提交
90
static void releaseQueryBuf(size_t numOfTables);
91

H
Haojun Liao 已提交
92 93 94 95 96 97 98 99 100 101
static void    destroyAggOperatorInfo(void* param);
static void    initCtxOutputBuffer(SqlFunctionCtx* pCtx, int32_t size);
static void    doSetTableGroupOutputBuf(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId);
static void    doApplyScalarCalculation(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32_t order, int32_t scanFlag);
static int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t numOfOutput, size_t keyBufSize,
                                const char* pKey);
static void    extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const SColumnInfoData* p, bool keep,
                                                   int32_t status);
static int32_t doSetInputDataBlock(SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t order, int32_t scanFlag,
                                   bool createDummyCol);
H
Haojun Liao 已提交
102 103
static int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprSupp* pSup, SDiskbasedBuf* pBuf,
                                  SGroupResInfo* pGroupResInfo);
H
Haojun Liao 已提交
104

H
Haojun Liao 已提交
105
/*
 * Mark an operator as finished.
 *
 * Sets the operator status to OP_EXEC_DONE and records in cost.totalCost the time elapsed since the owning
 * task started: the difference of microsecond timestamps divided by 1000.0, i.e. presumably milliseconds.
 * It then flags the owning task as TASK_COMPLETED. The operator must be bound to a task.
 */
void setOperatorCompleted(SOperatorInfo* pOperator) {
  pOperator->status = OP_EXEC_DONE;

  SExecTaskInfo* pOwner = pOperator->pTaskInfo;
  ASSERT(pOwner != NULL);

  pOperator->cost.totalCost = (taosGetTimestampUs() - pOwner->cost.start) / 1000.0;
  setTaskStatus(pOwner, TASK_COMPLETED);
}
112

H
Haojun Liao 已提交
113 114 115 116 117 118 119 120 121 122
/*
 * Fill in the identity and state fields common to every operator.
 *
 * `name` is stored by pointer, not copied, so it must outlive the operator (typically a string literal).
 * `pInfo` is the operator-specific state and `pTaskInfo` the task that owns the operator.
 */
void setOperatorInfo(SOperatorInfo* pOperator, const char* name, int32_t type, bool blocking, int32_t status,
                     void* pInfo, SExecTaskInfo* pTaskInfo) {
  pOperator->pTaskInfo = pTaskInfo;
  pOperator->info = pInfo;

  pOperator->status = status;
  pOperator->blocking = blocking;
  pOperator->operatorType = type;
  pOperator->name = (char*)name;
}

123
/*
 * Open callback for operators that need no preparation work.
 * Marks the operator as opened, reports zero open cost and always succeeds.
 */
int32_t optrDummyOpenFn(SOperatorInfo* pOperator) {
  OPTR_SET_OPENED(pOperator);

  // nothing was done while opening, so nothing to account for
  pOperator->cost.openCost = 0;

  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
129
SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn, __optr_fn_t cleanup,
130 131
                                   __optr_close_fn_t closeFn, __optr_reqBuf_fn_t reqBufFn,
                                   __optr_explain_fn_t explain) {
132 133 134 135 136
  SOperatorFpSet fpSet = {
      ._openFn = openFn,
      .getNextFn = nextFn,
      .cleanupFn = cleanup,
      .closeFn = closeFn,
137
      .reqBufFn = reqBufFn,
138 139 140 141 142 143
      .getExplainFn = explain,
  };

  return fpSet;
}

144
/*
 * Reserve interBufSize bytes for a new SResultRow in the disk-based result buffer.
 *
 * The row is carved from page *currentPageId when that page still has room. Otherwise a new page is
 * requested; this also happens when no page has been assigned yet (*currentPageId == -1). On success:
 *   - *currentPageId is set to the page holding the row,
 *   - the page is marked dirty,
 *   - the row's pageId/offset are filled in.
 *
 * Returns NULL if no page could be obtained; *currentPageId is then left unchanged.
 */
SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int32_t* currentPageId, int32_t interBufSize) {
  SFilePage* pData = NULL;
  int32_t    pageId = -1;

  if (*currentPageId == -1) {
    // first result row: no page has been assigned yet
    pData = getNewBufPage(pResultBuf, &pageId);
    if (pData != NULL) {
      pData->num = sizeof(SFilePage);  // payload starts right after the page header
    }
  } else {
    pData = getBufPage(pResultBuf, *currentPageId);
    pageId = *currentPageId;

    if (pData != NULL && pData->num + interBufSize > getBufPageSize(pResultBuf)) {
      // release current page first, and prepare the next one
      releaseBufPage(pResultBuf, pData);

      pData = getNewBufPage(pResultBuf, &pageId);
      if (pData != NULL) {
        pData->num = sizeof(SFilePage);
      }
    }
  }

  if (pData == NULL) {
    return NULL;
  }

  setBufPageDirty(pData, true);

  // the new row starts at the current fill position of the page
  SResultRow* pResultRow = (SResultRow*)((char*)pData + pData->num);
  pResultRow->pageId = pageId;
  pResultRow->offset = (int32_t)pData->num;
  *currentPageId = pageId;

  pData->num += interBufSize;
  return pResultRow;
}

183 184 185 186 187 188 189
/**
 * the struct of key in hash table
 * +----------+---------------+
 * | group id |   key data    |
 * | 8 bytes  | actual length |
 * +----------+---------------+
 */
190 191 192
SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pResultRowInfo, char* pData,
                                   int16_t bytes, bool masterscan, uint64_t groupId, SExecTaskInfo* pTaskInfo,
                                   bool isIntervalQuery, SAggSupporter* pSup) {
193
  SET_RES_WINDOW_KEY(pSup->keyBuf, pData, bytes, groupId);
H
Haojun Liao 已提交
194

dengyihao's avatar
dengyihao 已提交
195
  SResultRowPosition* p1 =
196
      (SResultRowPosition*)tSimpleHashGet(pSup->pResultRowHashTable, pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes));
H
Haojun Liao 已提交
197

198 199
  SResultRow* pResult = NULL;

H
Haojun Liao 已提交
200 201
  // in case of repeat scan/reverse scan, no new time window added.
  if (isIntervalQuery) {
202
    if (p1 != NULL) {  // the *p1 may be NULL in case of sliding+offset exists.
203
      pResult = getResultRowByPos(pResultBuf, p1, true);
204
      ASSERT(pResult->pageId == p1->pageId && pResult->offset == p1->offset);
H
Haojun Liao 已提交
205 206
    }
  } else {
dengyihao's avatar
dengyihao 已提交
207 208
    // In case of group by column query, the required SResultRow object must be existInCurrentResusltRowInfo in the
    // pResultRowInfo object.
H
Haojun Liao 已提交
209
    if (p1 != NULL) {
210
      // todo
211
      pResult = getResultRowByPos(pResultBuf, p1, true);
212
      ASSERT(pResult->pageId == p1->pageId && pResult->offset == p1->offset);
H
Haojun Liao 已提交
213 214 215
    }
  }

L
Liu Jicong 已提交
216
  // 1. close current opened time window
217
  if (pResultRowInfo->cur.pageId != -1 && ((pResult == NULL) || (pResult->pageId != pResultRowInfo->cur.pageId))) {
218
    SResultRowPosition pos = pResultRowInfo->cur;
X
Xiaoyu Wang 已提交
219
    SFilePage*         pPage = getBufPage(pResultBuf, pos.pageId);
220 221 222 223 224
    releaseBufPage(pResultBuf, pPage);
  }

  // allocate a new buffer page
  if (pResult == NULL) {
H
Haojun Liao 已提交
225
    ASSERT(pSup->resultRowSize > 0);
226
    pResult = getNewResultRow(pResultBuf, &pSup->currentPageId, pSup->resultRowSize);
227

228 229
    // add a new result set for a new group
    SResultRowPosition pos = {.pageId = pResult->pageId, .offset = pResult->offset};
230
    tSimpleHashPut(pSup->pResultRowHashTable, pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes), &pos,
L
Liu Jicong 已提交
231
                   sizeof(SResultRowPosition));
H
Haojun Liao 已提交
232 233
  }

234 235 236
  // 2. set the new time window to be the new active time window
  pResultRowInfo->cur = (SResultRowPosition){.pageId = pResult->pageId, .offset = pResult->offset};

H
Haojun Liao 已提交
237
  // too many time window in query
238
  if (pTaskInfo->execModel == OPTR_EXEC_MODEL_BATCH &&
239
      tSimpleHashGetSize(pSup->pResultRowHashTable) > MAX_INTERVAL_TIME_WINDOW) {
240
    T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_TOO_MANY_TIMEWINDOW);
H
Haojun Liao 已提交
241 242
  }

H
Haojun Liao 已提交
243
  return pResult;
H
Haojun Liao 已提交
244 245
}

246
// a new buffer page for each table. Needs to opt this design
H
Haojun Liao 已提交
247
// a new buffer page for each table. Needs to opt this design
/*
 * Give pWindowRes a slot of `size` bytes in the result buffer if it has none yet (pageId == -1).
 *
 * The slot is appended to the last page in the buffer's page list when it fits. Otherwise, or when the
 * buffer has no page yet, a new page is requested.
 *
 * Returns 0 on success (including when the row already had a slot) and -1 if no page could be obtained.
 */
static int32_t addNewWindowResultBuf(SResultRow* pWindowRes, SDiskbasedBuf* pResultBuf, uint32_t size) {
  if (pWindowRes->pageId != -1) {
    return 0;  // already allocated
  }

  SFilePage* pData = NULL;
  int32_t    pageId = -1;
  SArray*    list = getDataBufPagesIdList(pResultBuf);

  if (taosArrayGetSize(list) == 0) {
    // in the first scan, new space needed for results
    pData = getNewBufPage(pResultBuf, &pageId);
    if (pData != NULL) {
      pData->num = sizeof(SFilePage);
    }
  } else {
    SPageInfo* pi = getLastPageInfo(list);
    pData = getBufPage(pResultBuf, getPageId(pi));
    pageId = getPageId(pi);

    if (pData != NULL && pData->num + size > getBufPageSize(pResultBuf)) {
      // release current page first, and prepare the next one
      releaseBufPageInfo(pResultBuf, pi);

      pData = getNewBufPage(pResultBuf, &pageId);
      if (pData != NULL) {
        pData->num = sizeof(SFilePage);
      }
    }
  }

  if (pData == NULL) {
    return -1;
  }

  // place the row at the current fill position of the page
  pWindowRes->pageId = pageId;
  pWindowRes->offset = (int32_t)pData->num;
  pData->num += size;
  assert(pWindowRes->pageId >= 0);

  return 0;
}

293
//  query_range_start, query_range_end, window_duration, window_start, window_end
294
void initExecTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pQueryWindow) {
295 296 297
  pColData->info.type = TSDB_DATA_TYPE_TIMESTAMP;
  pColData->info.bytes = sizeof(int64_t);

H
Haojun Liao 已提交
298
  colInfoDataEnsureCapacity(pColData, 5, false);
299 300 301 302 303 304 305 306 307
  colDataAppendInt64(pColData, 0, &pQueryWindow->skey);
  colDataAppendInt64(pColData, 1, &pQueryWindow->ekey);

  int64_t interval = 0;
  colDataAppendInt64(pColData, 2, &interval);  // this value may be variable in case of 'n' and 'y'.
  colDataAppendInt64(pColData, 3, &pQueryWindow->skey);
  colDataAppendInt64(pColData, 4, &pQueryWindow->ekey);
}

308 309 310 311 312 313 314
/*
 * Snapshot of the SqlFunctionCtx input fields that are temporarily overridden while evaluating a
 * partial row range (see functionCtxSave / functionCtxRestore).
 */
typedef struct {
  bool    hasAgg;       // saved input.colDataSMAIsSet
  int32_t numOfRows;    // saved input.numOfRows
  int32_t startOffset;  // saved input.startRowIndex
} SFunctionCtxStatus;

/* Record the SMA flag, row count and start row of pCtx's input into *pStatus. */
static void functionCtxSave(SqlFunctionCtx* pCtx, SFunctionCtxStatus* pStatus) {
  *pStatus = (SFunctionCtxStatus){
      .hasAgg = pCtx->input.colDataSMAIsSet,
      .numOfRows = pCtx->input.numOfRows,
      .startOffset = pCtx->input.startRowIndex,
  };
}

/* Write the fields captured by functionCtxSave back into pCtx's input. */
static void functionCtxRestore(SqlFunctionCtx* pCtx, SFunctionCtxStatus* pStatus) {
  SInputColumnInfoData* pIn = &pCtx->input;

  pIn->startRowIndex = pStatus->startOffset;
  pIn->numOfRows = pStatus->numOfRows;
  pIn->colDataSMAIsSet = pStatus->hasAgg;
}

H
Haojun Liao 已提交
326 327
/*
 * Apply each of the numOfOutput function contexts to the row slice [offset, offset + forwardStep) of the
 * current input block, which holds numOfTotal rows.
 *
 * - Block-level SMA is disabled for the call when the slice does not cover the whole block
 *   (forwardStep < numOfTotal), because the block statistics no longer describe the processed rows.
 * - Window pseudo-column functions (fmIsWindowPseudoColumnFunc) are evaluated from pTimeWindowData
 *   (5 rows, as built by initExecTimeWindowInfo). Each writes one BIGINT value into its row's inter-buffer.
 * - All other functions run their fpSet.process callback when functionNeedToExecute() allows it.
 *
 * The input fields overridden here are restored afterwards for every function kind. If any function fails,
 * taskInfo->code is set and control long-jumps to taskInfo->env.
 */
void applyAggFunctionOnPartialTuples(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, SColumnInfoData* pTimeWindowData,
                                     int32_t offset, int32_t forwardStep, int32_t numOfTotal, int32_t numOfOutput) {
  for (int32_t k = 0; k < numOfOutput; ++k) {
    // keep it temporarily
    SFunctionCtxStatus status = {0};
    functionCtxSave(&pCtx[k], &status);

    pCtx[k].input.startRowIndex = offset;
    pCtx[k].input.numOfRows = forwardStep;

    // not a whole block involved in query processing, statistics data can not be used
    // NOTE: the original value of isSet have been changed here
    if (pCtx[k].input.colDataSMAIsSet && forwardStep < numOfTotal) {
      pCtx[k].input.colDataSMAIsSet = false;
    }

    int32_t code = TSDB_CODE_SUCCESS;
    if (fmIsWindowPseudoColumnFunc(pCtx[k].functionId)) {
      SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(&pCtx[k]);
      char*                p = GET_ROWCELL_INTERBUF(pEntryInfo);

      SColumnInfoData idata = {0};
      idata.info.type = TSDB_DATA_TYPE_BIGINT;
      idata.info.bytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes;
      idata.pData = p;

      SScalarParam out = {.columnData = &idata};
      SScalarParam tw = {.numOfRows = 5, .columnData = pTimeWindowData};
      code = pCtx[k].sfp.process(&tw, 1, &out);
      if (code == TSDB_CODE_SUCCESS) {
        pEntryInfo->numOfRes = 1;
      }
    } else if (functionNeedToExecute(&pCtx[k]) && pCtx[k].fpSet.process != NULL) {
      code = pCtx[k].fpSet.process(&pCtx[k]);
    }

    if (code != TSDB_CODE_SUCCESS) {
      qError("%s apply functions error, code: %s", GET_TASKID(taskInfo), tstrerror(code));
      taskInfo->code = code;
      T_LONG_JMP(taskInfo->env, code);
    }

    // restore it, for pseudo-column functions as well, so no overridden input state leaks out
    functionCtxRestore(&pCtx[k], &status);
  }
}

374 375 376
/*
 * Bind every expression context of pExprSup to pBlock: set the scan order and row count, attach the
 * block's SMA information through setBlockSMAInfo(), and record pBlock as the source block.
 */
static void doSetInputDataBlockInfo(SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t order) {
  SqlFunctionCtx* pFirstCtx = pExprSup->pCtx;
  SExprInfo*      pFirstExpr = pExprSup->pExprInfo;

  for (int32_t idx = 0; idx < pExprSup->numOfExprs; ++idx) {
    SqlFunctionCtx* pOne = pFirstCtx + idx;

    pOne->order = order;
    pOne->input.numOfRows = pBlock->info.rows;
    setBlockSMAInfo(pOne, pFirstExpr + idx, pBlock);
    pOne->pSrcBlock = pBlock;
  }
}

384
/*
 * Bind pBlock as the input of every expression in pExprSup.
 *
 * Blocks that carry SMA (pBlockAgg != NULL) are bound through doSetInputDataBlockInfo(). All other blocks go
 * through doSetInputDataBlock(), which attaches the column data and, when createDummyCol is set, may
 * materialize constant parameters as columns.
 *
 * This function returns void, so a failure of doSetInputDataBlock() (e.g. out of memory while building a
 * constant column) cannot be propagated; it is logged instead of being silently dropped.
 */
void setInputDataBlock(SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t order, int32_t scanFlag, bool createDummyCol) {
  if (pBlock->pBlockAgg != NULL) {
    doSetInputDataBlockInfo(pExprSup, pBlock, order);
    return;
  }

  int32_t code = doSetInputDataBlock(pExprSup, pBlock, order, scanFlag, createDummyCol);
  if (code != TSDB_CODE_SUCCESS) {
    qError("failed to set input data block, code: %s", tstrerror(code));
  }
}

L
Liu Jicong 已提交
392 393
/*
 * Materialize the constant parameter pFuncParam as a column of numOfRows identical values. This lets
 * functions that expect column input (e.g. sum(11)) run on a literal.
 *
 * The SColumnInfoData is allocated on first use and cached in pInput->pData[paramIndex]; later calls reuse it.
 * Only BIGINT/UBIGINT, DOUBLE and VARCHAR constants are filled. Other types get capacity reserved but no
 * values written (NOTE(review): confirm no other constant types reach here).
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY, or the error reported by the column helpers.
 */
static int32_t doCreateConstantValColumnInfo(SInputColumnInfoData* pInput, SFunctParam* pFuncParam, int32_t paramIndex,
                                             int32_t numOfRows) {
  SColumnInfoData* pColInfo = pInput->pData[paramIndex];
  if (pColInfo == NULL) {
    pColInfo = taosMemoryCalloc(1, sizeof(SColumnInfoData));
    if (pColInfo == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    // Set the correct column info (data type and bytes)
    pColInfo->info.type = pFuncParam->param.nType;
    pColInfo->info.bytes = pFuncParam->param.nLen;

    pInput->pData[paramIndex] = pColInfo;
  }

  int32_t code = colInfoDataEnsureCapacity(pColInfo, numOfRows, false);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  int8_t type = pFuncParam->param.nType;
  if (type == TSDB_DATA_TYPE_BIGINT || type == TSDB_DATA_TYPE_UBIGINT) {
    int64_t v = pFuncParam->param.i;
    for (int32_t i = 0; i < numOfRows; ++i) {
      colDataAppendInt64(pColInfo, i, &v);
    }
  } else if (type == TSDB_DATA_TYPE_DOUBLE) {
    double v = pFuncParam->param.d;
    for (int32_t i = 0; i < numOfRows; ++i) {
      colDataAppendDouble(pColInfo, i, &v);
    }
  } else if (type == TSDB_DATA_TYPE_VARCHAR) {
    // build the var-string (length header + payload) once and append it to every row
    char* tmp = taosMemoryMalloc(pFuncParam->param.nLen + VARSTR_HEADER_SIZE);
    if (tmp == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    STR_WITH_SIZE_TO_VARSTR(tmp, pFuncParam->param.pz, pFuncParam->param.nLen);
    for (int32_t i = 0; i < numOfRows && code == TSDB_CODE_SUCCESS; ++i) {
      code = colDataAppend(pColInfo, i, tmp, false);
    }
    taosMemoryFree(tmp);
  }

  return code;
}

435 436
/*
 * Bind pBlock's raw column data as the input of every expression in pExprSup.
 *
 * For each function context this sets the order, row count, source block, scan flag and table uid, and
 * clears the SMA flag. Column parameters are then pointed at their slot in pBlock. For implicit-ts
 * functions, the last column parameter also becomes pPTS.
 *
 * When createDummyCol is set and an expression has exactly one parameter that is a constant, the constant
 * is materialized as a column via doCreateConstantValColumnInfo().
 *
 * Returns TSDB_CODE_SUCCESS, or the first error from doCreateConstantValColumnInfo().
 */
static int32_t doSetInputDataBlock(SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t order, int32_t scanFlag,
                                   bool createDummyCol) {
  SqlFunctionCtx* pCtxArray = pExprSup->pCtx;

  for (int32_t i = 0; i < pExprSup->numOfExprs; ++i) {
    SqlFunctionCtx*       pFn = &pCtxArray[i];
    SInputColumnInfoData* pInput = &pFn->input;
    SExprInfo*            pExpr = &pExprSup->pExprInfo[i];

    pFn->order = order;
    pInput->numOfRows = pBlock->info.rows;
    pFn->pSrcBlock = pBlock;
    pFn->scanFlag = scanFlag;

    pInput->uid = pBlock->info.id.uid;
    pInput->colDataSMAIsSet = false;

    for (int32_t j = 0; j < pExpr->base.numOfParams; ++j) {
      SFunctParam* pParam = &pExpr->base.pParam[j];

      if (pParam->type == FUNC_PARAM_TYPE_COLUMN) {
        pInput->pData[j] = taosArrayGet(pBlock->pDataBlock, pParam->pCol->slotId);
        pInput->totalRows = pBlock->info.rows;
        pInput->numOfRows = pBlock->info.rows;
        pInput->startRowIndex = 0;

        // NOTE: the last parameter is the primary timestamp column
        // todo: refactor this
        bool isLastParam = (j == pExpr->base.numOfParams - 1);
        if (fmIsImplicitTsFunc(pFn->functionId) && isLastParam) {
          // in case of merge function, this is not always the ts column data.
          pInput->pPTS = pInput->pData[j];
        }

        ASSERT(pInput->pData[j] != NULL);
        continue;
      }

      // todo avoid case: top(k, 12), 12 is the value parameter.
      // sum(11), 11 is also the value parameter.
      bool needDummyCol =
          (pParam->type == FUNC_PARAM_TYPE_VALUE) && createDummyCol && (pExpr->base.numOfParams == 1);
      if (!needDummyCol) {
        continue;
      }

      pInput->totalRows = pBlock->info.rows;
      pInput->numOfRows = pBlock->info.rows;
      pInput->startRowIndex = 0;

      int32_t rc = doCreateConstantValColumnInfo(pInput, pParam, j, pBlock->info.rows);
      if (rc != TSDB_CODE_SUCCESS) {
        return rc;
      }
    }
  }

  return TSDB_CODE_SUCCESS;
}

488
/*
 * Run the process callback of every function context of pOperator that still needs to execute
 * (functionNeedToExecute) and has a callback.
 * Returns TSDB_CODE_SUCCESS, or the first callback error, which is logged.
 */
static int32_t doAggregateImpl(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx) {
  int32_t numOfExprs = pOperator->exprSupp.numOfExprs;

  for (int32_t k = 0; k < numOfExprs; ++k) {
    SqlFunctionCtx* pFn = &pCtx[k];

    // todo add a dummy funtion to avoid process check
    if (!functionNeedToExecute(pFn) || pFn->fpSet.process == NULL) {
      continue;
    }

    int32_t code = pFn->fpSet.process(pFn);
    if (code != TSDB_CODE_SUCCESS) {
      qError("%s aggregate function error happens, code: %s", GET_TASKID(pOperator->pTaskInfo), tstrerror(code));
      return code;
    }
  }

  return TSDB_CODE_SUCCESS;
}

5
54liuyao 已提交
507
/*
 * Decide whether the function bound to pCtx must run on the current input:
 *   - never when functionId is -1;
 *   - during a REPEAT_SCAN, only for functions that fmIsRepeatScanFunc() accepts;
 *   - otherwise, only while the function's result entry is not yet completed.
 */
bool functionNeedToExecute(SqlFunctionCtx* pCtx) {
  int32_t functionId = pCtx->functionId;

  if (functionId == -1) {
    return false;
  }

  if (pCtx->scanFlag == REPEAT_SCAN) {
    return fmIsRepeatScanFunc(functionId);
  }

  struct SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
  return !isRowEntryCompleted(pResInfo);
}

527 528 529 530 531 532 533
/*
 * Build synthetic SMA for the constant parameter pFuncParam (of data type `type`), as if it were a column of
 * numOfRows identical values: numOfNull = 0, min = max = value, sum = value * numOfRows.
 *
 * A placeholder SColumnInfoData (type information only) and an SColumnDataAgg are allocated on first use and
 * cached in pInput at paramIndex.
 * - DOUBLE aggregates are stored as double bit patterns inside the min/max/sum fields.
 * - TIMESTAMP constants leave the aggregate untouched.
 * - Variable-length and other types are not supported (ASSERT).
 *
 * Returns TSDB_CODE_SUCCESS or TSDB_CODE_OUT_OF_MEMORY.
 */
static int32_t doCreateConstantValColumnAggInfo(SInputColumnInfoData* pInput, SFunctParam* pFuncParam, int32_t type,
                                                int32_t paramIndex, int32_t numOfRows) {
  if (pInput->pData[paramIndex] == NULL) {
    pInput->pData[paramIndex] = taosMemoryCalloc(1, sizeof(SColumnInfoData));
    if (pInput->pData[paramIndex] == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    // Set the correct column info (data type and bytes)
    pInput->pData[paramIndex]->info.type = type;
    pInput->pData[paramIndex]->info.bytes = tDataTypes[type].bytes;
  }

  SColumnDataAgg* da = pInput->pColumnDataAgg[paramIndex];
  if (da == NULL) {
    da = taosMemoryCalloc(1, sizeof(SColumnDataAgg));
    pInput->pColumnDataAgg[paramIndex] = da;
    if (da == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  ASSERT(!IS_VAR_DATA_TYPE(type));

  if (type == TSDB_DATA_TYPE_BIGINT) {
    int64_t v = pFuncParam->param.i;
    *da = (SColumnDataAgg){.numOfNull = 0, .min = v, .max = v, .sum = v * numOfRows};
  } else if (type == TSDB_DATA_TYPE_DOUBLE) {
    double v = pFuncParam->param.d;
    *da = (SColumnDataAgg){.numOfNull = 0};

    *(double*)&da->min = v;
    *(double*)&da->max = v;
    *(double*)&da->sum = v * numOfRows;
  } else if (type == TSDB_DATA_TYPE_BOOL) {  // todo validate this data type
    bool v = pFuncParam->param.i;

    // Every row holds the same value v, so min == max == v, and sum counts the true rows.
    // Full-width assignments: the former writes through bool* set min to 0 and truncated sum to 0/1.
    *da = (SColumnDataAgg){.numOfNull = 0, .min = v, .max = v, .sum = (int64_t)v * numOfRows};
  } else if (type == TSDB_DATA_TYPE_TIMESTAMP) {
    // do nothing
  } else {
    ASSERT(0);
  }

  return TSDB_CODE_SUCCESS;
}
578

579
/*
 * Attach pBlock's block-level SMA (pre-aggregated statistics) to pCtx's input, when the block carries it.
 *
 * The row counts of the input are always set from the block.
 * - Column parameters take the SMA of their slot; a missing SMA for any of them turns colDataSMAIsSet off.
 * - Constant parameters get synthetic SMA built by doCreateConstantValColumnAggInfo().
 * - Without block SMA, colDataSMAIsSet is simply cleared.
 */
void setBlockSMAInfo(SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, SSDataBlock* pBlock) {
  int32_t               rows = pBlock->info.rows;
  SInputColumnInfoData* pIn = &pCtx->input;

  pIn->numOfRows = rows;
  pIn->totalRows = rows;

  if (pBlock->pBlockAgg == NULL) {
    pIn->colDataSMAIsSet = false;
    return;
  }

  // assume SMA is usable; a column parameter without SMA below switches it off again
  pIn->colDataSMAIsSet = true;

  for (int32_t j = 0; j < pExprInfo->base.numOfParams; ++j) {
    SFunctParam* pParam = &pExprInfo->base.pParam[j];

    if (pParam->type == FUNC_PARAM_TYPE_VALUE) {
      doCreateConstantValColumnAggInfo(pIn, pParam, pParam->param.nType, j, pBlock->info.rows);
    } else if (pParam->type == FUNC_PARAM_TYPE_COLUMN) {
      int32_t slot = pParam->pCol->slotId;

      pIn->pColumnDataAgg[j] = pBlock->pBlockAgg[slot];
      if (pIn->pColumnDataAgg[j] == NULL) {
        pIn->colDataSMAIsSet = false;
      }

      // The column is attached because its data type is required; its row data is not used here.
      pIn->pData[j] = taosArrayGet(pBlock->pDataBlock, slot);
    }
  }
}

L
Liu Jicong 已提交
611
/* A task counts as killed as soon as any non-zero code has been recorded in pTaskInfo->code. */
bool isTaskKilled(SExecTaskInfo* pTaskInfo) { return pTaskInfo->code != 0; }

D
dapan1121 已提交
615
/* Record rspCode as the task's code; any non-zero value marks the task as killed. */
void setTaskKilled(SExecTaskInfo* pTaskInfo, int32_t rspCode) {
  pTaskInfo->code = rspCode;
}
616 617

/////////////////////////////////////////////////////////////////////////////////////////////
618
/*
 * Return the interval-aligned time window that contains `key`.
 *
 * skey is `key` truncated to the interval grid and ekey is skey + interval - 1. If that addition wraps
 * around (skey close to INT64_MAX), ekey is clamped to INT64_MAX. In that case the query duration is shorter
 * than one interval, so no further range adjustment is needed.
 */
STimeWindow getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int64_t key) {
  int64_t start = taosTimeTruncate(key, pInterval, precision);
  int64_t end = taosTimeAdd(start, pInterval->interval, pInterval->intervalUnit, precision) - 1;

  if (end < start) {
    end = INT64_MAX;
  }

  return (STimeWindow){.skey = start, .ekey = end};
}

L
Liu Jicong 已提交
634 635
/*
 * Decide how much of pBlock has to be loaded.
 *
 * The block-skipping logic (time-window overlap checks, SMA-based filtering, top/bottom pruning) was
 * disabled under `#if 0` and has been removed here; recover it from version control if it is revived.
 * The function now always:
 *   - reports BLK_DATA_NOT_LOAD in *status,
 *   - clears pBlock->pDataBlock and pBlock->pBlockAgg,
 *   - returns TSDB_CODE_SUCCESS.
 * pTaskInfo and pTableScanInfo are currently unused.
 */
int32_t loadDataBlockOnDemand(SExecTaskInfo* pTaskInfo, STableScanInfo* pTableScanInfo, SSDataBlock* pBlock,
                              uint32_t* status) {
  (void)pTaskInfo;
  (void)pTableScanInfo;

  *status = BLK_DATA_NOT_LOAD;

  pBlock->pDataBlock = NULL;
  pBlock->pBlockAgg = NULL;

  return TSDB_CODE_SUCCESS;
}

L
Liu Jicong 已提交
777
/*
 * Update the status bits of a task.
 *
 * TASK_NOT_COMPLETED overwrites the whole status word. Any other status is OR-ed into the existing bits after
 * the TASK_NOT_COMPLETED bit has been cleared, because "not completed" cannot be combined with another status.
 */
void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status) {
  if (status != TASK_NOT_COMPLETED) {
    CLEAR_QUERY_STATUS(pTaskInfo, TASK_NOT_COMPLETED);
    pTaskInfo->status |= status;
    return;
  }

  pTaskInfo->status = status;
}

787
/*
 * Bind every function context to its entry inside the result row pResult, and initialize entries that are not
 * initialized yet.
 *
 * Initialization is skipped for:
 * - entries that are already both completed and initialized;
 * - window pseudo-column functions;
 * - every entry that follows the first entry found already initialized. From that point on, only the resultInfo
 *   pointer is bound.
 *
 * Contexts with functionId == -1 have no init callback; their entry is simply marked initialized.
 */
void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowEntryInfoOffset) {
  bool seenInitialized = false;

  for (int32_t k = 0; k < numOfOutput; ++k) {
    SqlFunctionCtx* pOne = &pCtx[k];
    pOne->resultInfo = getResultEntryInfo(pResult, k, rowEntryInfoOffset);

    struct SResultRowEntryInfo* pEntry = pOne->resultInfo;
    bool skip = seenInitialized || (isRowEntryCompleted(pEntry) && isRowEntryInitialized(pEntry)) ||
                fmIsWindowPseudoColumnFunc(pOne->functionId);
    if (skip) {
      continue;
    }

    if (pEntry->initialized) {
      seenInitialized = true;
    } else if (pOne->functionId == -1) {
      pEntry->initialized = true;
    } else {
      pOne->fpSet.init(pOne, pEntry);
    }
  }
}

H
Haojun Liao 已提交
816 817
/*
 * Evaluate pFilterInfo against every row of pBlock and compact the block so that only qualifying rows remain.
 *
 * Nothing is done when there is no filter or the block is empty. When pColMatchInfo is given, the block's time
 * window is recomputed from the first matched column whose id is PRIMARYKEY_TIMESTAMP_COL_ID and whose type is
 * TIMESTAMP, since filtering may have removed rows at either end.
 */
void doFilter(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SColMatchInfo* pColMatchInfo) {
  if (pFilterInfo == NULL || pBlock->info.rows == 0) {
    return;
  }

  SFilterColumnParam colParam = {.numOfCols = taosArrayGetSize(pBlock->pDataBlock), .pDataBlock = pBlock->pDataBlock};
  // NOTE(review): the result of filterSetDataFromSlotId is not checked here.
  (void)filterSetDataFromSlotId(pFilterInfo, &colParam);

  SColumnInfoData* pResultCol = NULL;
  int32_t          filterStatus = 0;

  // NOTE(review): the original author doubted that keepAll can ever be true.
  bool keepAll = filterExecute(pFilterInfo, pBlock, &pResultCol, NULL, colParam.numOfCols, &filterStatus);
  extractQualifiedTupleByFilterResult(pBlock, pResultCol, keepAll, filterStatus);

  if (pColMatchInfo != NULL) {
    size_t numOfMatched = taosArrayGetSize(pColMatchInfo->pList);
    for (size_t idx = 0; idx < numOfMatched; ++idx) {
      SColMatchItem* pItem = taosArrayGet(pColMatchInfo->pList, idx);
      if (pItem->colId != PRIMARYKEY_TIMESTAMP_COL_ID) {
        continue;
      }

      SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, pItem->dstSlotId);
      if (pTsCol->info.type != TSDB_DATA_TYPE_TIMESTAMP) {
        continue;
      }

      blockDataUpdateTsWindow(pBlock, pItem->dstSlotId);
      break;
    }
  }

  // the filter result column is owned by this function
  colDataDestroy(pResultCol);
  taosMemoryFree(pResultCol);
}

849
/*
 * Compact pBlock in place according to a filter result.
 *
 * keep   - true means every row qualified; the block is left as-is.
 * status - FILTER_RESULT_ALL_QUALIFIED leaves the block untouched, and FILTER_RESULT_NONE_QUALIFIED empties it.
 *          Any other value means p->pData is read as one int8_t flag per row (0 = drop). The qualifying rows of
 *          every column that holds data are then copied to the front of that column, and
 *          pBlock->info.rows is set to the number of qualifying rows.
 */
void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const SColumnInfoData* p, bool keep, int32_t status) {
  if (keep) {
    return;
  }

  int32_t totalRows = pBlock->info.rows;

  if (status == FILTER_RESULT_ALL_QUALIFIED) {
    return;  // every row stays
  }

  if (status == FILTER_RESULT_NONE_QUALIFIED) {
    pBlock->info.rows = 0;
    return;
  }

  // snapshot of the unfiltered data: pBlock's columns are overwritten while they are being compacted.
  // NOTE(review): a NULL result from createOneDataBlock (e.g. out of memory) is not handled here.
  SSDataBlock* pSnapshot = createOneDataBlock(pBlock, true);

  size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
  for (size_t c = 0; c < numOfCols; ++c) {
    SColumnInfoData* pSrc = taosArrayGet(pSnapshot->pDataBlock, c);
    SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, c);

    // a column reserved for a scalar function that has no data yet
    if (pDst->pData == NULL || pSrc->pData == NULL) {
      continue;
    }

    // after the first compacted column, pBlock->info.rows already holds the filtered row count
    colInfoDataCleanup(pDst, pBlock->info.rows);

    int32_t written = 0;
    for (int32_t r = 0; r < totalRows; ++r) {
      if (((const int8_t*)p->pData)[r] == 0) {
        continue;
      }

      if (colDataIsNull_s(pSrc, r)) {
        colDataAppendNULL(pDst, written);
      } else {
        colDataAppend(pDst, written, colDataGetData(pSrc, r), false);
      }
      written += 1;
    }

    // todo this value can be assigned directly
    if (pBlock->info.rows != totalRows) {
      ASSERT(pBlock->info.rows == written);
    } else {
      pBlock->info.rows = written;
    }
  }

  blockDataDestroy(pSnapshot);
}

900
/*
 * Point the function contexts of the aggregate operator at the result row of groupId.
 *
 * The row is fetched from (or created in) the operator's result-row table, keyed by the raw bytes of groupId.
 * For a simple group-by query without interval, all tables of a group share this one result row. A row
 * without a page yet gets one. If that allocation fails, the function returns without touching the contexts.
 */
void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId) {
  SAggOperatorInfo* pAggInfo = pOperator->info;
  SqlFunctionCtx*   pCtx = pOperator->exprSupp.pCtx;
  int32_t*          pEntryOffset = pOperator->exprSupp.rowEntryInfoOffset;

  SResultRow* pRow = doSetResultOutBufByKey(pAggInfo->aggSup.pResultBuf, &pAggInfo->binfo.resultRowInfo,
                                            (char*)&groupId, sizeof(groupId), true, groupId, pOperator->pTaskInfo,
                                            false, &pAggInfo->aggSup);
  assert(pRow != NULL);

  bool needsPage = (pRow->pageId == -1);
  if (needsPage) {
    int32_t ret = addNewWindowResultBuf(pRow, pAggInfo->aggSup.pResultBuf, pAggInfo->binfo.pRes->info.rowSize);
    if (ret != TSDB_CODE_SUCCESS) {
      return;
    }
  }

  setResultRowInitCtx(pRow, pCtx, numOfOutput, pEntryOffset);
}

927 928 929
/*
 * Make the aggregate operator write into the result row of groupId, then record groupId as the active group.
 *
 * The switch is skipped when groupId is already the active group. UINT64_MAX is treated as "no active group";
 * presumably it is the initial value — confirm against the operator's constructor.
 */
static void setExecutionContext(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId) {
  SAggOperatorInfo* pAggInfo = pOperator->info;

  bool alreadyActive = (pAggInfo->groupId != UINT64_MAX) && (pAggInfo->groupId == groupId);
  if (!alreadyActive) {
    doSetTableGroupOutputBuf(pOperator, numOfOutput, groupId);
    pAggInfo->groupId = groupId;
  }
}

dengyihao's avatar
dengyihao 已提交
939
/*
 * Raise pRow->numOfRows to the largest numOfRes among the row's initialized result entries.
 *
 * If it is still 0 afterwards, i.e. every expression skipped all its input (for example max() over only NULL
 * values), the row is forced to emit one output row. The exception is when one of the initialized expressions
 * is a not-null-output function (fmIsNotNullOutputFunc, e.g. first/last), in which case it emits none.
 */
static void doUpdateNumOfRows(SqlFunctionCtx* pCtx, SResultRow* pRow, int32_t numOfExprs,
                              const int32_t* rowEntryOffset) {
  bool hasNotNullOutputFunc = false;

  for (int32_t k = 0; k < numOfExprs; ++k) {
    SResultRowEntryInfo* pEntry = getResultEntryInfo(pRow, k, rowEntryOffset);
    if (!isRowEntryInitialized(pEntry)) {
      continue;
    }

    if (pEntry->numOfRes > pRow->numOfRows) {
      pRow->numOfRows = pEntry->numOfRes;
    }

    if (fmIsNotNullOutputFunc(pCtx[k].functionId)) {
      hasNotNullOutputFunc = true;
    }
  }

  bool emitPlaceholderRow = (pRow->numOfRows == 0) && !hasNotNullOutputFunc;
  if (emitPlaceholderRow) {
    pRow->numOfRows = 1;
  }
}

963 964
/*
 * Write the outputs of result row pRow into pBlock, starting at row pBlock->info.rows. The block's row count is
 * not advanced here; the caller does that.
 *
 * For each expression:
 *  - it has a finalize callback: the callback writes the output. For _group_key, a non-zero numOfRes is first
 *    set to pRow->numOfRows, so the key matches every output row of multi-row functions (e.g. histogram).
 *  - it is _select_value: nothing is written here.
 *  - otherwise (e.g. _wstart): its single intermediate value is repeated pRow->numOfRows times into its
 *    output slot, e.g. 20 times next to top(k, 20).
 * A failing finalize callback aborts the task via T_LONG_JMP.
 */
static void doCopyResultToDataBlock(SExprInfo* pExprInfo, int32_t numOfExprs, SResultRow* pRow, SqlFunctionCtx* pCtx,
                                    SSDataBlock* pBlock, const int32_t* rowEntryOffset, SExecTaskInfo* pTaskInfo) {
  for (int32_t k = 0; k < numOfExprs; ++k) {
    SqlFunctionCtx* pOne = &pCtx[k];
    const char*     funcName = pOne->pExpr->pExpr->_function.functionName;

    pOne->resultInfo = getResultEntryInfo(pRow, k, rowEntryOffset);

    if (pOne->fpSet.finalize != NULL) {
      if (strcmp(funcName, "_group_key") == 0 && pOne->resultInfo->numOfRes != 0) {
        pOne->resultInfo->numOfRes = pRow->numOfRows;
      }

      int32_t code = pOne->fpSet.finalize(pOne, pBlock);
      if (TAOS_FAILED(code)) {
        qError("%s build result data block error, code %s", GET_TASKID(pTaskInfo), tstrerror(code));
        T_LONG_JMP(pTaskInfo->env, code);
      }
      continue;
    }

    if (strcmp(funcName, "_select_value") == 0) {
      continue;  // nothing to copy for this expression
    }

    SColumnInfoData* pDstCol = taosArrayGet(pBlock->pDataBlock, pExprInfo[k].base.resSchema.slotId);
    char*            pValue = GET_ROWCELL_INTERBUF(pOne->resultInfo);
    for (int32_t r = 0; r < pRow->numOfRows; ++r) {
      colDataAppend(pDstCol, pBlock->info.rows + r, pValue, pOne->resultInfo->isNullRes);
    }
  }
}

997 998 999
// todo refactor. SResultRow has direct pointer in miainfo
/*
 * Finalize the single result row stored at resultRowPosition in pBuf and append its output rows to pBlock.
 *
 * If the row produces no output, nothing is appended. Otherwise the block capacity is grown by a factor of 1.25
 * until the row fits. Growth is always at least one row per step, so small capacities terminate too.
 *
 * Returns 0. A failure to grow the block aborts the task via T_LONG_JMP, after the page has been released.
 */
int32_t finalizeResultRows(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPosition, SExprSupp* pSup,
                           SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo) {
  SFilePage*  page = getBufPage(pBuf, resultRowPosition->pageId);
  SResultRow* pRow = (SResultRow*)((char*)page + resultRowPosition->offset);

  SqlFunctionCtx* pCtx = pSup->pCtx;
  SExprInfo*      pExprInfo = pSup->pExprInfo;
  const int32_t*  rowEntryOffset = pSup->rowEntryInfoOffset;

  doUpdateNumOfRows(pCtx, pRow, pSup->numOfExprs, rowEntryOffset);
  if (pRow->numOfRows == 0) {
    releaseBufPage(pBuf, page);
    return 0;
  }

  int32_t required = pBlock->info.rows + pRow->numOfRows;
  int32_t size = pBlock->info.capacity;
  while (size < required) {
    // (int32_t)(size * 1.25) truncates back to size for size 0..3; without the +1 fallback this loop would
    // never terminate for those capacities.
    int32_t grown = (int32_t)(size * 1.25);
    size = (grown > size) ? grown : size + 1;
  }

  int32_t code = blockDataEnsureCapacity(pBlock, size);
  if (TAOS_FAILED(code)) {
    releaseBufPage(pBuf, page);
    qError("%s ensure result data capacity failed, code %s", GET_TASKID(pTaskInfo), tstrerror(code));
    T_LONG_JMP(pTaskInfo->env, code);
  }

  doCopyResultToDataBlock(pExprInfo, pSup->numOfExprs, pRow, pCtx, pBlock, rowEntryOffset, pTaskInfo);

  // pRow lives inside the page: read what is still needed before handing the page back
  int32_t numOfOutputRows = pRow->numOfRows;
  releaseBufPage(pBuf, page);

  pBlock->info.rows += numOfOutputRows;
  return 0;
}

1032 1033 1034 1035 1036 1037 1038
/*
 * Copy finalized result rows from pGroupResInfo into pBlock. Copying starts at pGroupResInfo->index, and the
 * index is advanced past every row that is consumed.
 *
 * - Rows that finalize to zero output rows are skipped (and consumed).
 * - Only one group is packed per block. If pBlock->info.id.groupId is 0, the block adopts the group of the
 *   first non-empty row. Copying stops at the first row of a different group.
 * - Copying stops when the next row does not fit into a block that already holds data.
 * - If the block is still empty and the row alone exceeds its capacity, the block is grown instead. Stopping
 *   there would leave the index unchanged, and callers that loop until rows are produced would retry the same
 *   row forever.
 *
 * Finally the block is marked as loaded and its time window is recomputed from column 0.
 * Returns 0. A failure to grow the block aborts the task via T_LONG_JMP.
 */
int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprSupp* pSup, SDiskbasedBuf* pBuf,
                           SGroupResInfo* pGroupResInfo) {
  SExprInfo*      pExprInfo = pSup->pExprInfo;
  int32_t         numOfExprs = pSup->numOfExprs;
  int32_t*        rowEntryOffset = pSup->rowEntryInfoOffset;
  SqlFunctionCtx* pCtx = pSup->pCtx;

  int32_t numOfRows = getNumOfTotalRes(pGroupResInfo);

  for (int32_t i = pGroupResInfo->index; i < numOfRows; i += 1) {
    SResKeyPos* pPos = taosArrayGetP(pGroupResInfo->pRows, i);
    SFilePage*  page = getBufPage(pBuf, pPos->pos.pageId);
    SResultRow* pRow = (SResultRow*)((char*)page + pPos->pos.offset);

    doUpdateNumOfRows(pCtx, pRow, numOfExprs, rowEntryOffset);

    // no results, continue to check the next one
    if (pRow->numOfRows == 0) {
      pGroupResInfo->index += 1;
      releaseBufPage(pBuf, page);
      continue;
    }

    if (pBlock->info.id.groupId == 0) {
      pBlock->info.id.groupId = pPos->groupId;
    } else if (pBlock->info.id.groupId != pPos->groupId) {
      // current value belongs to different group, it can't be packed into one datablock
      releaseBufPage(pBuf, page);
      break;
    }

    if (pBlock->info.rows + pRow->numOfRows > pBlock->info.capacity) {
      if (pBlock->info.rows > 0) {
        // emit what we have; this row goes into the next block
        releaseBufPage(pBuf, page);
        break;
      }

      // empty block that cannot hold even this single row: make room for it
      int32_t code = blockDataEnsureCapacity(pBlock, pRow->numOfRows);
      if (TAOS_FAILED(code)) {
        releaseBufPage(pBuf, page);
        qError("%s ensure result data capacity failed, code %s", GET_TASKID(pTaskInfo), tstrerror(code));
        T_LONG_JMP(pTaskInfo->env, code);
      }
    }

    pGroupResInfo->index += 1;
    doCopyResultToDataBlock(pExprInfo, numOfExprs, pRow, pCtx, pBlock, rowEntryOffset, pTaskInfo);

    // pRow lives inside the page: read its row count before handing the page back
    int32_t numOfOutputRows = pRow->numOfRows;
    releaseBufPage(pBuf, page);
    pBlock->info.rows += numOfOutputRows;
  }

  qDebug("%s result generated, rows:%d, groupId:%" PRIu64, GET_TASKID(pTaskInfo), pBlock->info.rows,
         pBlock->info.id.groupId);
  pBlock->info.dataLoad = 1;
  blockDataUpdateTsWindow(pBlock, 0);
  return 0;
}

1086 1087 1088 1089 1090 1091 1092 1093 1094 1095 1096 1097 1098 1099
/*
 * Refill pbInfo->pRes with the next batch of stream results (a single group per block) and attach that group's
 * partition table name.
 *
 * The block is stamped with the task version and cleared first; it stays empty when no results remain. The
 * partition name comes from the stream state. The block's name is emptied when the state has none for the group.
 */
void doBuildStreamResBlock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo,
                           SDiskbasedBuf* pBuf) {
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
  SSDataBlock*   pRes = pbInfo->pRes;

  pRes->info.version = pTaskInfo->version;
  blockDataCleanup(pRes);

  if (hasRemainResults(pGroupResInfo)) {
    // groupId 0 lets doCopyToSDataBlock adopt the group of the first row it copies
    pRes->info.id.groupId = 0;
    ASSERT(!pbInfo->mergeResultBlock);
    doCopyToSDataBlock(pTaskInfo, pRes, &pOperator->exprSupp, pBuf, pGroupResInfo);

    void* pParName = NULL;
    if (streamStateGetParName(pTaskInfo->streamInfo.pState, pRes->info.id.groupId, &pParName) >= 0) {
      // NOTE(review): assumes the stored name buffer is at least TSDB_TABLE_NAME_LEN bytes — confirm
      memcpy(pRes->info.parTbName, pParName, TSDB_TABLE_NAME_LEN);
    } else {
      pRes->info.parTbName[0] = 0;
    }
    tdbFree(pParName);
  }
}

X
Xiaoyu Wang 已提交
1113 1114
/*
 * Refill pbInfo->pRes with the next batch of grouped results.
 *
 * The block is stamped with the task version and cleared first; it stays empty when no results remain.
 * - Without mergeResultBlock, one call packs rows of a single group.
 * - With mergeResultBlock, rows of successive groups are packed into the same block until it reaches
 *   pOperator->resultInfo.threshold rows or the results run out. The group id is then reset to 0, since the
 *   client does not need it.
 */
void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo,
                            SDiskbasedBuf* pBuf) {
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
  SSDataBlock*   pRes = pbInfo->pRes;

  pRes->info.version = pTaskInfo->version;
  blockDataCleanup(pRes);

  if (!hasRemainResults(pGroupResInfo)) {
    return;
  }

  // groupId 0 lets doCopyToSDataBlock adopt the group of the first row it copies
  pRes->info.id.groupId = 0;

  if (!pbInfo->mergeResultBlock) {
    doCopyToSDataBlock(pTaskInfo, pRes, &pOperator->exprSupp, pBuf, pGroupResInfo);
    return;
  }

  while (hasRemainResults(pGroupResInfo)) {
    doCopyToSDataBlock(pTaskInfo, pRes, &pOperator->exprSupp, pBuf, pGroupResInfo);
    if (pRes->info.rows >= pOperator->resultInfo.threshold) {
      break;
    }
    pRes->info.id.groupId = 0;  // allow the next group to be appended to this block
  }

  pRes->info.id.groupId = 0;
}

L
Liu Jicong 已提交
1146 1147 1148 1149
// static TSKEY doSkipIntervalProcess(STaskRuntimeEnv* pRuntimeEnv, STimeWindow* win, SDataBlockInfo* pBlockInfo,
// STableQueryInfo* pTableQueryInfo) {
//   STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
//   SResultRowInfo *pWindowResInfo = &pRuntimeEnv->resultRowInfo;
1150
//
L
Liu Jicong 已提交
1151 1152 1153
//   assert(pQueryAttr->limit.offset == 0);
//   STimeWindow tw = *win;
//   getNextTimeWindow(pQueryAttr, &tw);
1154
//
L
Liu Jicong 已提交
1155 1156
//   if ((tw.skey <= pBlockInfo->window.ekey && QUERY_IS_ASC_QUERY(pQueryAttr)) ||
//       (tw.ekey >= pBlockInfo->window.skey && !QUERY_IS_ASC_QUERY(pQueryAttr))) {
1157
//
L
Liu Jicong 已提交
1158 1159 1160 1161
//     // load the data block and check data remaining in current data block
//     // TODO optimize performance
//     SArray *         pDataBlock = tsdbRetrieveDataBlock(pRuntimeEnv->pTsdbReadHandle, NULL);
//     SColumnInfoData *pColInfoData = taosArrayGet(pDataBlock, 0);
1162
//
L
Liu Jicong 已提交
1163 1164 1165 1166
//     tw = *win;
//     int32_t startPos =
//         getNextQualifiedWindow(pQueryAttr, &tw, pBlockInfo, pColInfoData->pData, binarySearchForKey, -1);
//     assert(startPos >= 0);
1167
//
L
Liu Jicong 已提交
1168 1169
//     // set the abort info
//     pQueryAttr->pos = startPos;
1170
//
L
Liu Jicong 已提交
1171 1172 1173 1174
//     // reset the query start timestamp
//     pTableQueryInfo->win.skey = ((TSKEY *)pColInfoData->pData)[startPos];
//     pQueryAttr->window.skey = pTableQueryInfo->win.skey;
//     TSKEY key = pTableQueryInfo->win.skey;
1175
//
L
Liu Jicong 已提交
1176 1177
//     pWindowResInfo->prevSKey = tw.skey;
//     int32_t index = pRuntimeEnv->resultRowInfo.curIndex;
1178
//
L
Liu Jicong 已提交
1179 1180
//     int32_t numOfRes = tableApplyFunctionsOnBlock(pRuntimeEnv, pBlockInfo, NULL, binarySearchForKey, pDataBlock);
//     pRuntimeEnv->resultRowInfo.curIndex = index;  // restore the window index
1181
//
L
Liu Jicong 已提交
1182 1183 1184 1185
//     //qDebug("QInfo:0x%"PRIx64" check data block, brange:%" PRId64 "-%" PRId64 ", numOfRows:%d, numOfRes:%d,
//     lastKey:%" PRId64,
//            GET_TASKID(pRuntimeEnv), pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows, numOfRes,
//            pQueryAttr->current->lastKey);
1186
//
L
Liu Jicong 已提交
1187 1188 1189 1190 1191
//     return key;
//   } else {  // do nothing
//     pQueryAttr->window.skey      = tw.skey;
//     pWindowResInfo->prevSKey = tw.skey;
//     pTableQueryInfo->lastKey = tw.skey;
1192
//
L
Liu Jicong 已提交
1193 1194
//     return tw.skey;
//   }
1195
//
L
Liu Jicong 已提交
1196 1197 1198 1199 1200 1201 1202 1203 1204 1205
//   return true;
// }

// static bool skipTimeInterval(STaskRuntimeEnv *pRuntimeEnv, TSKEY* start) {
//   STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
//   if (QUERY_IS_ASC_QUERY(pQueryAttr)) {
//     assert(*start <= pRuntimeEnv->current->lastKey);
//   } else {
//     assert(*start >= pRuntimeEnv->current->lastKey);
//   }
1206
//
L
Liu Jicong 已提交
1207 1208 1209 1210 1211
//   // if queried with value filter, do NOT forward query start position
//   if (pQueryAttr->limit.offset <= 0 || pQueryAttr->numOfFilterCols > 0 || pRuntimeEnv->pTsBuf != NULL ||
//   pRuntimeEnv->pFillInfo != NULL) {
//     return true;
//   }
1212
//
L
Liu Jicong 已提交
1213 1214 1215 1216 1217 1218 1219
//   /*
//    * 1. for interval without interpolation query we forward pQueryAttr->interval.interval at a time for
//    *    pQueryAttr->limit.offset times. Since hole exists, pQueryAttr->interval.interval*pQueryAttr->limit.offset
//    value is
//    *    not valid. otherwise, we only forward pQueryAttr->limit.offset number of points
//    */
//   assert(pRuntimeEnv->resultRowInfo.prevSKey == TSKEY_INITIAL_VAL);
1220
//
L
Liu Jicong 已提交
1221 1222
//   STimeWindow w = TSWINDOW_INITIALIZER;
//   bool ascQuery = QUERY_IS_ASC_QUERY(pQueryAttr);
1223
//
L
Liu Jicong 已提交
1224 1225
//   SResultRowInfo *pWindowResInfo = &pRuntimeEnv->resultRowInfo;
//   STableQueryInfo *pTableQueryInfo = pRuntimeEnv->current;
1226
//
L
Liu Jicong 已提交
1227 1228 1229
//   SDataBlockInfo blockInfo = SDATA_BLOCK_INITIALIZER;
//   while (tsdbNextDataBlock(pRuntimeEnv->pTsdbReadHandle)) {
//     tsdbRetrieveDataBlockInfo(pRuntimeEnv->pTsdbReadHandle, &blockInfo);
1230
//
L
Liu Jicong 已提交
1231 1232 1233 1234 1235 1236 1237 1238 1239
//     if (QUERY_IS_ASC_QUERY(pQueryAttr)) {
//       if (pWindowResInfo->prevSKey == TSKEY_INITIAL_VAL) {
//         getAlignQueryTimeWindow(pQueryAttr, blockInfo.window.skey, blockInfo.window.skey, pQueryAttr->window.ekey,
//         &w); pWindowResInfo->prevSKey = w.skey;
//       }
//     } else {
//       getAlignQueryTimeWindow(pQueryAttr, blockInfo.window.ekey, pQueryAttr->window.ekey, blockInfo.window.ekey, &w);
//       pWindowResInfo->prevSKey = w.skey;
//     }
1240
//
L
Liu Jicong 已提交
1241 1242
//     // the first time window
//     STimeWindow win = getActiveTimeWindow(pWindowResInfo, pWindowResInfo->prevSKey, pQueryAttr);
1243
//
L
Liu Jicong 已提交
1244 1245
//     while (pQueryAttr->limit.offset > 0) {
//       STimeWindow tw = win;
1246
//
L
Liu Jicong 已提交
1247 1248 1249
//       if ((win.ekey <= blockInfo.window.ekey && ascQuery) || (win.ekey >= blockInfo.window.skey && !ascQuery)) {
//         pQueryAttr->limit.offset -= 1;
//         pWindowResInfo->prevSKey = win.skey;
1250
//
L
Liu Jicong 已提交
1251 1252 1253 1254 1255 1256
//         // current time window is aligned with blockInfo.window.ekey
//         // restart it from next data block by set prevSKey to be TSKEY_INITIAL_VAL;
//         if ((win.ekey == blockInfo.window.ekey && ascQuery) || (win.ekey == blockInfo.window.skey && !ascQuery)) {
//           pWindowResInfo->prevSKey = TSKEY_INITIAL_VAL;
//         }
//       }
1257
//
L
Liu Jicong 已提交
1258 1259 1260 1261
//       if (pQueryAttr->limit.offset == 0) {
//         *start = doSkipIntervalProcess(pRuntimeEnv, &win, &blockInfo, pTableQueryInfo);
//         return true;
//       }
1262
//
L
Liu Jicong 已提交
1263 1264
//       // current window does not ended in current data block, try next data block
//       getNextTimeWindow(pQueryAttr, &tw);
1265
//
L
Liu Jicong 已提交
1266 1267 1268 1269 1270 1271 1272 1273 1274
//       /*
//        * If the next time window still starts from current data block,
//        * load the primary timestamp column first, and then find the start position for the next queried time window.
//        * Note that only the primary timestamp column is required.
//        * TODO: Optimize for this cases. All data blocks are not needed to be loaded, only if the first actually
//        required
//        * time window resides in current data block.
//        */
//       if ((tw.skey <= blockInfo.window.ekey && ascQuery) || (tw.ekey >= blockInfo.window.skey && !ascQuery)) {
1275
//
L
Liu Jicong 已提交
1276 1277
//         SArray *pDataBlock = tsdbRetrieveDataBlock(pRuntimeEnv->pTsdbReadHandle, NULL);
//         SColumnInfoData *pColInfoData = taosArrayGet(pDataBlock, 0);
1278
//
L
Liu Jicong 已提交
1279 1280 1281
//         if ((win.ekey > blockInfo.window.ekey && ascQuery) || (win.ekey < blockInfo.window.skey && !ascQuery)) {
//           pQueryAttr->limit.offset -= 1;
//         }
1282
//
L
Liu Jicong 已提交
1283 1284 1285 1286 1287 1288 1289 1290
//         if (pQueryAttr->limit.offset == 0) {
//           *start = doSkipIntervalProcess(pRuntimeEnv, &win, &blockInfo, pTableQueryInfo);
//           return true;
//         } else {
//           tw = win;
//           int32_t startPos =
//               getNextQualifiedWindow(pQueryAttr, &tw, &blockInfo, pColInfoData->pData, binarySearchForKey, -1);
//           assert(startPos >= 0);
1291
//
L
Liu Jicong 已提交
1292 1293 1294 1295 1296 1297 1298 1299 1300 1301 1302
//           // set the abort info
//           pQueryAttr->pos = startPos;
//           pTableQueryInfo->lastKey = ((TSKEY *)pColInfoData->pData)[startPos];
//           pWindowResInfo->prevSKey = tw.skey;
//           win = tw;
//         }
//       } else {
//         break;  // offset is not 0, and next time window begins or ends in the next block.
//       }
//     }
//   }
1303
//
L
Liu Jicong 已提交
1304 1305
//   // check for error
//   if (terrno != TSDB_CODE_SUCCESS) {
1306
//     T_LONG_JMP(pRuntimeEnv->env, terrno);
L
Liu Jicong 已提交
1307
//   }
1308
//
L
Liu Jicong 已提交
1309 1310
//   return true;
// }
1311

1312
/*
 * Attach the num operators in pDownstream as the downstream operators of p.
 *
 * The pointer array is copied; the operators themselves are not. An array previously attached to p is released
 * once the new one is in place; the operators it referenced are left alone. pDownstream may alias that old array.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY with p left unchanged if the array cannot be allocated.
 */
int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t num) {
  if (p->pDownstream == NULL) {
    assert(p->numOfDownstream == 0);
  }

  SOperatorInfo** pNewList = taosMemoryCalloc(1, num * POINTER_BYTES);
  if (pNewList == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  // copy before freeing the old array, in case pDownstream points into it
  memcpy(pNewList, pDownstream, num * POINTER_BYTES);
  taosMemoryFree(p->pDownstream);

  p->pDownstream = pNewList;
  p->numOfDownstream = num;
  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
1327
/*
 * Determine the scan order and scan flag that apply to pOperator's input.
 *
 * Starting at pOperator and following the first downstream operator:
 * - exchange, system-table, stream, tag, block-distribution, last-row and table-count scans report
 *   TSDB_ORDER_ASC / MAIN_SCAN;
 * - table scan and table merge scan report their own cond.order and scanFlag.
 * Any other operator defers to its first downstream. TSDB_CODE_INVALID_PARA is returned when it has none.
 */
int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scanFlag) {
  SOperatorInfo* pCur = pOperator;

  while (1) {
    // todo add more information about exchange operation
    switch (pCur->operatorType) {
      case QUERY_NODE_PHYSICAL_PLAN_EXCHANGE:
      case QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN:
      case QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN:
      case QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN:
      case QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN:
      case QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN:
      case QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN:
        *order = TSDB_ORDER_ASC;
        *scanFlag = MAIN_SCAN;
        return TSDB_CODE_SUCCESS;

      case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN: {
        STableScanInfo* pScan = pCur->info;
        *order = pScan->base.cond.order;
        *scanFlag = pScan->base.scanFlag;
        return TSDB_CODE_SUCCESS;
      }

      case QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN: {
        STableMergeScanInfo* pMergeScan = pCur->info;
        *order = pMergeScan->base.cond.order;
        *scanFlag = pMergeScan->base.scanFlag;
        return TSDB_CODE_SUCCESS;
      }

      default:
        break;
    }

    if (pCur->pDownstream == NULL || pCur->pDownstream[0] == NULL) {
      return TSDB_CODE_INVALID_PARA;
    }
    pCur = pCur->pDownstream[0];
  }
}
1355

1356 1357 1358 1359 1360
/*
 * Build a one-row placeholder block to aggregate when the operator's input turned out to be empty. With it,
 * count-like functions still produce a result row.
 *
 * *ppBlock is left untouched (no block is built) when:
 *  - tsCountAlwaysReturnValue is off;
 *  - the downstream is a partition operator, or a table scan grouped by tag;
 *  - no expression is count, hyperloglog, _hyperloglog_partial or _hyperloglog_merge.
 * Otherwise the block has one row and TSDB_DATA_TYPE_NULL columns up to the highest slot referenced by any
 * column parameter of the expressions.
 *
 * Returns TSDB_CODE_SUCCESS, or an error code if the block cannot be built. In that case the partially built
 * block is released and *ppBlock is not set.
 */
static int32_t createDataBlockForEmptyInput(SOperatorInfo* pOperator, SSDataBlock **ppBlock) {
  if (!tsCountAlwaysReturnValue) {
    return TSDB_CODE_SUCCESS;
  }

  SOperatorInfo* downstream = pOperator->pDownstream[0];
  if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_PARTITION ||
      (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN &&
       ((STableScanInfo *)downstream->info)->hasGroupByTag == true)) {
    return TSDB_CODE_SUCCESS;
  }

  SqlFunctionCtx* pCtx = pOperator->exprSupp.pCtx;
  bool            hasCountFunc = false;

  for (int32_t i = 0; i < pOperator->exprSupp.numOfExprs; ++i) {
    const char* pName = pCtx[i].pExpr->pExpr->_function.functionName;
    if ((strcmp(pName, "count") == 0) || (strcmp(pName, "hyperloglog") == 0) ||
        (strcmp(pName, "_hyperloglog_partial") == 0) || (strcmp(pName, "_hyperloglog_merge") == 0)) {
      hasCountFunc = true;
      break;
    }
  }

  if (!hasCountFunc) {
    return TSDB_CODE_SUCCESS;
  }

  SSDataBlock* pBlock = createDataBlock();
  if (pBlock == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  pBlock->info.rows = 1;
  pBlock->info.capacity = 0;

  int32_t code = TSDB_CODE_SUCCESS;
  for (int32_t i = 0; i < pOperator->exprSupp.numOfExprs; ++i) {
    SColumnInfoData colInfo = {0};
    colInfo.hasNull = true;
    colInfo.info.type = TSDB_DATA_TYPE_NULL;
    colInfo.info.bytes = 1;

    SExprInfo* pOneExpr = &pOperator->exprSupp.pExprInfo[i];
    for (int32_t j = 0; j < pOneExpr->base.numOfParams; ++j) {
      SFunctParam* pFuncParam = &pOneExpr->base.pParam[j];
      // only column parameters need a (placeholder) column; value parameters carry their own data
      if (pFuncParam->type != FUNC_PARAM_TYPE_COLUMN) {
        continue;
      }

      int32_t slotId = pFuncParam->pCol->slotId;
      int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
      if (slotId < numOfCols) {
        continue;
      }

      if (taosArrayEnsureCap(pBlock->pDataBlock, slotId + 1) != 0) {
        code = TSDB_CODE_OUT_OF_MEMORY;
        goto _error;
      }
      for (int32_t k = numOfCols; k < slotId + 1; ++k) {
        if (taosArrayPush(pBlock->pDataBlock, &colInfo) == NULL) {
          code = TSDB_CODE_OUT_OF_MEMORY;
          goto _error;
        }
      }
    }
  }

  code = blockDataEnsureCapacity(pBlock, pBlock->info.rows);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  *ppBlock = pBlock;
  return TSDB_CODE_SUCCESS;

_error:
  blockDataDestroy(pBlock);
  return code;
}

/*
 * Release the placeholder block built by createDataBlockForEmptyInput and reset *ppBlock to NULL.
 *
 * Does nothing when blockAllocated is false. In doOpenAggregateOptr that is the case when *ppBlock came from
 * the downstream operator.
 */
static void destroyDataBlockForEmptyInput(bool blockAllocated, SSDataBlock **ppBlock) {
  if (blockAllocated) {
    blockDataDestroy(*ppBlock);
    *ppBlock = NULL;
  }
}

1427
// this is a blocking operator
L
Liu Jicong 已提交
1428
static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
H
Haojun Liao 已提交
1429 1430
  if (OPTR_IS_OPENED(pOperator)) {
    return TSDB_CODE_SUCCESS;
1431 1432
  }

H
Haojun Liao 已提交
1433
  SExecTaskInfo*    pTaskInfo = pOperator->pTaskInfo;
1434
  SAggOperatorInfo* pAggInfo = pOperator->info;
H
Haojun Liao 已提交
1435

1436 1437
  SExprSupp*     pSup = &pOperator->exprSupp;
  SOperatorInfo* downstream = pOperator->pDownstream[0];
1438

1439 1440
  int64_t st = taosGetTimestampUs();

1441 1442 1443
  int32_t order = TSDB_ORDER_ASC;
  int32_t scanFlag = MAIN_SCAN;

1444 1445 1446
  bool    hasValidBlock = false;
  bool    blockAllocated = false;

H
Haojun Liao 已提交
1447
  while (1) {
1448
    SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
1449
    if (pBlock == NULL) {
G
Ganlin Zhao 已提交
1450
      if (!hasValidBlock) {
1451 1452 1453 1454 1455 1456 1457 1458
        createDataBlockForEmptyInput(pOperator, &pBlock);
        if (pBlock == NULL) {
          break;
        }
        blockAllocated = true;
      } else {
        break;
      }
1459
    }
1460
    hasValidBlock = true;
1461

1462 1463
    int32_t code = getTableScanInfo(pOperator, &order, &scanFlag);
    if (code != TSDB_CODE_SUCCESS) {
1464
      destroyDataBlockForEmptyInput(blockAllocated, &pBlock);
1465
      T_LONG_JMP(pTaskInfo->env, code);
1466
    }
1467

1468
    // there is an scalar expression that needs to be calculated before apply the group aggregation.
G
Ganlin Zhao 已提交
1469
    if (pAggInfo->scalarExprSup.pExprInfo != NULL && !blockAllocated) {
1470 1471
      SExprSupp* pSup1 = &pAggInfo->scalarExprSup;
      code = projectApplyFunctions(pSup1->pExprInfo, pBlock, pBlock, pSup1->pCtx, pSup1->numOfExprs, NULL);
1472
      if (code != TSDB_CODE_SUCCESS) {
1473
        destroyDataBlockForEmptyInput(blockAllocated, &pBlock);
1474
        T_LONG_JMP(pTaskInfo->env, code);
1475
      }
1476 1477
    }

1478
    // the pDataBlock are always the same one, no need to call this again
H
Haojun Liao 已提交
1479
    setExecutionContext(pOperator, pOperator->exprSupp.numOfExprs, pBlock->info.id.groupId);
1480
    setInputDataBlock(pSup, pBlock, order, scanFlag, true);
1481
    code = doAggregateImpl(pOperator, pSup->pCtx);
1482
    if (code != 0) {
1483
      destroyDataBlockForEmptyInput(blockAllocated, &pBlock);
1484
      T_LONG_JMP(pTaskInfo->env, code);
1485
    }
1486 1487 1488

    destroyDataBlockForEmptyInput(blockAllocated, &pBlock);

1489 1490
  }

1491 1492 1493 1494 1495
  // the downstream operator may return with error code, so let's check the code before generating results.
  if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
    T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
  }

1496
  initGroupedResultInfo(&pAggInfo->groupResInfo, pAggInfo->aggSup.pResultRowHashTable, 0);
H
Haojun Liao 已提交
1497
  OPTR_SET_OPENED(pOperator);
1498

1499
  pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
1500
  return pTaskInfo->code;
H
Haojun Liao 已提交
1501 1502
}

1503
/*
 * Next-block callback of the table aggregate operator.
 *
 * Invokes the operator's open callback and stores its status in pTaskInfo->code. It then drains grouped result rows
 * into the basic info's result block, applying the operator's filter. A block left empty by the filter is discarded
 * and building continues while grouped results remain.
 *
 * Returns the result block, or NULL when the operator is already done, the open callback failed, or no row survived.
 */
static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator) {
  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  SAggOperatorInfo* pAggInfo = pOperator->info;
  SOptrBasicInfo*   pBasic = &pAggInfo->binfo;
  SExecTaskInfo*    pTaskInfo = pOperator->pTaskInfo;

  pTaskInfo->code = pOperator->fpSet._openFn(pOperator);
  if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
    setOperatorCompleted(pOperator);
    return NULL;
  }

  blockDataEnsureCapacity(pBasic->pRes, pOperator->resultInfo.capacity);

  do {
    doBuildResultDatablock(pOperator, pBasic, &pAggInfo->groupResInfo, pAggInfo->aggSup.pResultBuf);
    doFilter(pBasic->pRes, pOperator->exprSupp.pFilterInfo, NULL);

    if (!hasRemainResults(&pAggInfo->groupResInfo)) {
      // every grouped row has been emitted
      setOperatorCompleted(pOperator);
      break;
    }
  } while (pBasic->pRes->info.rows <= 0);  // the filter removed everything: try the next batch

  size_t numOfRows = blockDataGetNumOfRows(pBasic->pRes);
  pOperator->resultInfo.totalRows += numOfRows;

  if (numOfRows == 0) {
    return NULL;
  }
  return pBasic->pRes;
}

1539
/*
 * Releases what each of the numOfExprs expression descriptors in pExpr owns.
 *
 * Column parameters have their pCol freed and value parameters have their variant destroyed. Then the parameter
 * array and the pExpr pointer are freed. The latter is a shallow free: nothing it references is released here.
 * The pExpr array itself is not freed.
 */
void destroyExprInfo(SExprInfo* pExpr, int32_t numOfExprs) {
  for (int32_t i = 0; i < numOfExprs; ++i) {
    SExprInfo* pItem = pExpr + i;

    for (int32_t k = 0; k < pItem->base.numOfParams; ++k) {
      switch (pItem->base.pParam[k].type) {
        case FUNC_PARAM_TYPE_COLUMN:
          taosMemoryFreeClear(pItem->base.pParam[k].pCol);
          break;
        case FUNC_PARAM_TYPE_VALUE:
          taosVariantDestroy(&pItem->base.pParam[k].param);
          break;
        default:
          break;
      }
    }

    taosMemoryFree(pItem->base.pParam);
    taosMemoryFree(pItem->pExpr);
  }
}

5
54liuyao 已提交
1555
/*
 * Recursively tears down the operator tree rooted at pOperator. NULL is accepted and ignored.
 *
 * Order: the operator's close callback (given its private info), then each downstream operator, then the downstream
 * array, the expression support, and finally the operator object itself.
 */
void destroyOperatorInfo(SOperatorInfo* pOperator) {
  if (pOperator == NULL) {
    return;
  }

  if (pOperator->fpSet.closeFn) {
    pOperator->fpSet.closeFn(pOperator->info);
  }

  if (pOperator->pDownstream) {
    for (int32_t child = 0; child < pOperator->numOfDownstream; ++child) {
      destroyOperatorInfo(pOperator->pDownstream[child]);
    }

    taosMemoryFreeClear(pOperator->pDownstream);
    pOperator->numOfDownstream = 0;
  }

  cleanupExprSupp(&pOperator->exprSupp);
  taosMemoryFreeClear(pOperator);
}

1577 1578 1579 1580
/*
 * Default buffer-cost callback: always reports 0.
 *
 * Each operator should install its own callback returning its total buffer cost. Reaching this default from a
 * blocking operator trips an assertion.
 */
int32_t optrDefaultBufFn(SOperatorInfo* pOperator) {
  if (pOperator->blocking) {
    ASSERT(0);
  }

  return 0;
}

1587 1588 1589 1590 1591 1592
/*
 * Chooses the page size and total buffer size for an operator's disk-based result buffer.
 *
 * The page size starts at 4096 and doubles until it can hold at least four rows of rowSize bytes. It stops at
 * 2^31, the largest power of two a uint32_t holds. The buffer defaults to 10MB (4096 * 2560). If that would not
 * exceed a single page, the buffer is raised to four pages, saturating at UINT32_MAX.
 *
 * @param rowSize       size of one result row in bytes; a non-positive value needs no extra room
 * @param defaultPgsz   [out] chosen page size
 * @param defaultBufsz  [out] chosen buffer size
 * @return always 0
 */
int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* defaultBufsz) {
  // Widen before multiplying. In int32_t, rowSize * 4 can overflow (UB). A negative product would also be converted
  // to a huge unsigned value in the comparison, and the page size would then double until it wraps to 0 and loop forever.
  const uint64_t minPageSize = (rowSize > 0) ? (uint64_t)rowSize * 4 : 0;

  *defaultPgsz = 4096;
  while ((uint64_t)(*defaultPgsz) < minPageSize && (*defaultPgsz) <= UINT32_MAX / 2) {
    *defaultPgsz <<= 1u;
  }

  // The default buffer for each operator in query is 10MB.
  // at least four pages need to be in buffer
  // TODO: make this variable to be configurable.
  *defaultBufsz = 4096 * 2560;
  if ((*defaultBufsz) <= (*defaultPgsz)) {
    const uint64_t fourPages = (uint64_t)(*defaultPgsz) * 4;  // may exceed 32 bits for very large pages
    *defaultBufsz = (fourPages > UINT32_MAX) ? UINT32_MAX : (uint32_t)fourPages;
  }

  return 0;
}

dengyihao's avatar
dengyihao 已提交
1604 1605
/*
 * Initializes an aggregate supporter. This covers:
 *   - the result-row size for the given function contexts,
 *   - a key buffer of keyBufSize plus room for one pointer and one int64,
 *   - the result-row hash table,
 *   - a disk-based result buffer under tsTempDir, sized by getBufferPgSize.
 *
 * Returns TSDB_CODE_SUCCESS or one of these errors:
 *   - TSDB_CODE_OUT_OF_MEMORY,
 *   - TSDB_CODE_NO_AVAIL_DISK when temp space is unavailable,
 *   - the error from createDiskbasedBuf.
 * On failure, members already created are not freed here. The caller must run cleanupAggSup.
 */
int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t numOfOutput, size_t keyBufSize,
                         const char* pKey) {
  int32_t    code = TSDB_CODE_SUCCESS;
  _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);

  pAggSup->currentPageId = -1;
  pAggSup->resultRowSize = getResultRowSize(pCtx, numOfOutput);
  pAggSup->keyBuf = taosMemoryCalloc(1, keyBufSize + POINTER_BYTES + sizeof(int64_t));
  pAggSup->pResultRowHashTable = tSimpleHashInit(100, hashFn);

  if (pAggSup->keyBuf == NULL || pAggSup->pResultRowHashTable == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  uint32_t defaultPgsz = 0;
  uint32_t defaultBufsz = 0;
  getBufferPgSize(pAggSup->resultRowSize, &defaultPgsz, &defaultBufsz);

  if (!osTempSpaceAvailable()) {
    code = TSDB_CODE_NO_AVAIL_DISK;
    // tstrerror(code), not terrstr(): terrstr() ignores its argument and reports terrno, which is not set here.
    qError("Init stream agg supporter failed since %s, %s", tstrerror(code), pKey);
    return code;
  }

  code = createDiskbasedBuf(&pAggSup->pResultBuf, defaultPgsz, defaultBufsz, pKey, tsTempDir);
  if (code != TSDB_CODE_SUCCESS) {
    qError("Create agg result buf failed since %s, %s", tstrerror(code), pKey);
  }

  return code;
}

1637
/*
 * Releases the key buffer, the result-row hash table and the disk-based result buffer of an aggregate supporter.
 *
 * Every released member is reset to NULL. Previously only keyBuf was cleared, so a second cleanup, or a stray read
 * after cleanup, hit freed memory through the hash-table or buffer pointer.
 */
void cleanupAggSup(SAggSupporter* pAggSup) {
  taosMemoryFreeClear(pAggSup->keyBuf);

  tSimpleHashCleanup(pAggSup->pResultRowHashTable);
  pAggSup->pResultRowHashTable = NULL;

  destroyDiskbasedBuf(pAggSup->pResultBuf);
  pAggSup->pResultBuf = NULL;
}

H
Haojun Liao 已提交
1643
/*
 * Initializes the expression support (function contexts) and the aggregate supporter of an aggregate operator.
 * It then attaches the supporter's result buffer to every function context's save handle.
 *
 * Returns TSDB_CODE_SUCCESS, or the first error reported by initExprSupp or doInitAggInfoSup.
 */
int32_t initAggSup(SExprSupp* pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, size_t keyBufSize,
                   const char* pkey) {
  int32_t code = initExprSupp(pSup, pExprInfo, numOfCols);
  if (code == TSDB_CODE_SUCCESS) {
    code = doInitAggInfoSup(pAggSup, pSup->pCtx, numOfCols, keyBufSize, pkey);
  }

  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  SqlFunctionCtx* pCtx = pSup->pCtx;
  for (int32_t col = 0; col < numOfCols; ++col) {
    pCtx[col].saveHandle.pBuf = pAggSup->pResultBuf;
  }

  return TSDB_CODE_SUCCESS;
}

L
Liu Jicong 已提交
1662
/*
 * Sets the result capacity to numOfRows and the threshold to 75% of it (truncated toward zero). When the truncation
 * yields 0, the threshold falls back to numOfRows. numOfRows must be non-zero.
 */
void initResultSizeInfo(SResultInfo* pResultInfo, int32_t numOfRows) {
  ASSERT(numOfRows != 0);

  int32_t threshold = numOfRows * 0.75;
  if (threshold == 0) {
    threshold = numOfRows;
  }

  pResultInfo->capacity = numOfRows;
  pResultInfo->threshold = threshold;
}

1672 1673 1674 1675 1676
/* Initializes the result-row info of pInfo and installs pBlock as its result block. */
void initBasicInfo(SOptrBasicInfo* pInfo, SSDataBlock* pBlock) {
  initResultRowInfo(&pInfo->resultRowInfo);
  pInfo->pRes = pBlock;
}

5
54liuyao 已提交
1677
/*
 * Frees an array of numOfOutput function contexts. For each context it destroys the parameter variants and frees
 * the subsidiary context array and buffer, the input data array and the input aggregate array. Then it frees the
 * array itself.
 *
 * NULL is accepted. Always returns NULL.
 */
void* destroySqlFunctionCtx(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
  if (pCtx != NULL) {
    for (int32_t i = 0; i < numOfOutput; ++i) {
      SqlFunctionCtx* pOne = &pCtx[i];

      for (int32_t p = 0; p < pOne->numOfParams; ++p) {
        taosVariantDestroy(&pOne->param[p].param);
      }

      taosMemoryFreeClear(pOne->subsidiaries.pCtx);
      taosMemoryFreeClear(pOne->subsidiaries.buf);
      taosMemoryFree(pOne->input.pData);
      taosMemoryFree(pOne->input.pColumnDataAgg);
    }

    taosMemoryFreeClear(pCtx);
  }

  return NULL;
}

1697
/*
 * Attaches numOfExpr expression descriptors to pSup. When there are descriptors, it also creates their function
 * contexts and row-entry offsets.
 *
 * Returns TSDB_CODE_OUT_OF_MEMORY if context creation fails, TSDB_CODE_SUCCESS otherwise (including pExprInfo == NULL).
 */
int32_t initExprSupp(SExprSupp* pSup, SExprInfo* pExprInfo, int32_t numOfExpr) {
  pSup->pExprInfo = pExprInfo;
  pSup->numOfExprs = numOfExpr;

  if (pSup->pExprInfo == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  pSup->pCtx = createSqlFunctionCtx(pExprInfo, numOfExpr, &pSup->rowEntryInfoOffset);
  return (pSup->pCtx == NULL) ? TSDB_CODE_OUT_OF_MEMORY : TSDB_CODE_SUCCESS;
}

1710 1711 1712 1713
/*
 * Releases everything owned by an expression support: function contexts, expression descriptors, filter info and the
 * row-entry offset array.
 *
 * Every released pointer is reset to NULL, so a repeated cleanup (e.g. on overlapping error paths) is a no-op instead
 * of a double free. Previously pCtx and rowEntryInfoOffset were left dangling.
 */
void cleanupExprSupp(SExprSupp* pSupp) {
  // destroySqlFunctionCtx always returns NULL; storing it clears the freed context pointer.
  pSupp->pCtx = destroySqlFunctionCtx(pSupp->pCtx, pSupp->numOfExprs);

  if (pSupp->pExprInfo != NULL) {
    destroyExprInfo(pSupp->pExprInfo, pSupp->numOfExprs);
    taosMemoryFreeClear(pSupp->pExprInfo);
  }

  if (pSupp->pFilterInfo != NULL) {
    filterFreeInfo(pSupp->pFilterInfo);
    pSupp->pFilterInfo = NULL;
  }

  taosMemoryFreeClear(pSupp->rowEntryInfoOffset);
}

1725 1726
/*
 * Creates the "TableAggregate" operator on top of downstream. createOperatorTree uses it for HASH_AGG nodes that
 * have no group keys.
 *
 * Sets up the following:
 *   - the result block,
 *   - the aggregate function contexts and the aggregate supporter,
 *   - the optional scalar expressions in pAggNode->pExprs,
 *   - the output filter.
 * When downstream is a table scan, its pdInfo is pointed at this operator's expression support and supporter.
 *
 * Returns the operator, or NULL with pTaskInfo->code set to the failure code.
 */
SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pAggNode,
                                           SExecTaskInfo* pTaskInfo) {
  // Must be initialized before the first goto. Previously `code` was declared after the allocation-failure jump, so
  // _error stored an indeterminate value into pTaskInfo->code.
  int32_t           code = TSDB_CODE_OUT_OF_MEMORY;
  SAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SAggOperatorInfo));
  SOperatorInfo*    pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    goto _error;
  }

  SSDataBlock* pResBlock = createDataBlockFromDescNode(pAggNode->node.pOutputDataBlockDesc);
  if (pResBlock == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _error;
  }
  initBasicInfo(&pInfo->binfo, pResBlock);

  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
  initResultSizeInfo(&pOperator->resultInfo, 4096);

  int32_t    num = 0;
  SExprInfo* pExprInfo = createExprInfo(pAggNode->pAggFuncs, pAggNode->pGroupKeys, &num);
  code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  // Scalar expressions that must be evaluated before the aggregation is applied.
  int32_t    numOfScalarExpr = 0;
  SExprInfo* pScalarExprInfo = NULL;
  if (pAggNode->pExprs != NULL) {
    pScalarExprInfo = createExprInfo(pAggNode->pExprs, NULL, &numOfScalarExpr);
  }

  code = initExprSupp(&pInfo->scalarExprSup, pScalarExprInfo, numOfScalarExpr);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  code = filterInitFromNode((SNode*)pAggNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  pInfo->binfo.mergeResultBlock = pAggNode->mergeDataBlock;
  pInfo->groupId = UINT64_MAX;

  setOperatorInfo(pOperator, "TableAggregate", QUERY_NODE_PHYSICAL_PLAN_HASH_AGG, true, OP_NOT_OPENED, pInfo,
                  pTaskInfo);
  pOperator->fpSet = createOperatorFpSet(doOpenAggregateOptr, getAggregateResult, NULL, destroyAggOperatorInfo,
                                         optrDefaultBufFn, NULL);

  if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
    STableScanInfo* pTableScanInfo = downstream->info;
    pTableScanInfo->base.pdInfo.pExprSup = &pOperator->exprSupp;
    pTableScanInfo->base.pdInfo.pAggSup = &pInfo->aggSup;
  }

  code = appendDownstream(pOperator, &downstream, 1);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  return pOperator;

_error:
  if (pInfo != NULL) {
    destroyAggOperatorInfo(pInfo);
  }

  if (pOperator != NULL) {
    cleanupExprSupp(&pOperator->exprSupp);
  }

  taosMemoryFreeClear(pOperator);
  pTaskInfo->code = code;
  return NULL;
}

1796
/* Destroys the result block held by pInfo and stores blockDataDestroy's return value back into pInfo->pRes. */
void cleanupBasicInfo(SOptrBasicInfo* pInfo) {
  assert(pInfo != NULL);

  SSDataBlock* pBlock = pInfo->pRes;
  pInfo->pRes = blockDataDestroy(pBlock);
}

H
Haojun Liao 已提交
1801 1802 1803 1804 1805 1806 1807
/*
 * Frees the pointer stored in the slot pItem points to, and clears the slot.
 * NOTE(review): presumably used as an element destructor for arrays of pointers; confirm at call sites.
 */
static void freeItem(void* pItem) {
  void** ppSlot = (void**)pItem;
  if (*ppSlot == NULL) {
    return;
  }

  taosMemoryFreeClear(*ppSlot);
}

1808
void destroyAggOperatorInfo(void* param) {
L
Liu Jicong 已提交
1809
  SAggOperatorInfo* pInfo = (SAggOperatorInfo*)param;
L
Liu Jicong 已提交
1810 1811
  cleanupBasicInfo(&pInfo->binfo);

H
Haojun Liao 已提交
1812
  cleanupAggSup(&pInfo->aggSup);
S
shenglian zhou 已提交
1813
  cleanupExprSupp(&pInfo->scalarExprSup);
H
Haojun Liao 已提交
1814
  cleanupGroupResInfo(&pInfo->groupResInfo);
D
dapan1121 已提交
1815
  taosMemoryFreeClear(param);
1816
}
1817

D
dapan1121 已提交
1818
/*
 * Allocates and initializes an execution task in the TASK_NOT_COMPLETED state.
 *
 * Records the query id, execution model and a copy of dbFName. Creates the table list, the exchange stop-info array
 * and the result-block list. The id string has the form "TID:0x<taskId> QID:0x<queryId>".
 *
 * Returns NULL with terrno = TSDB_CODE_OUT_OF_MEMORY if the task object cannot be allocated.
 */
static SExecTaskInfo* createExecTaskInfo(uint64_t queryId, uint64_t taskId, EOPTR_EXEC_MODEL model, char* dbFName) {
  SExecTaskInfo* pTask = taosMemoryCalloc(1, sizeof(SExecTaskInfo));
  if (pTask == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  setTaskStatus(pTask, TASK_NOT_COMPLETED);

  // identity and schema bookkeeping
  pTask->schemaInfo.dbname = strdup(dbFName);
  pTask->id.queryId = queryId;
  pTask->execModel = model;

  // containers owned by the task
  pTask->pTableInfoList = tableListCreate();
  pTask->stopInfo.pStopInfo = taosArrayInit(4, sizeof(SExchangeOpStopInfo));
  pTask->pResultBlockList = taosArrayInit(128, POINTER_BYTES);

  // human-readable id used as the log prefix
  char* pIdStr = taosMemoryCalloc(1, 128);
  snprintf(pIdStr, 128, "TID:0x%" PRIx64 " QID:0x%" PRIx64, taskId, queryId);
  pTask->id.str = pIdStr;

  return pTask;
}
H
Haojun Liao 已提交
1840

H
Haojun Liao 已提交
1841 1842
// Forward declaration: extractTableSchemaInfo calls this before its definition further down the file.
SSchemaWrapper* extractQueriedColumnSchema(SScanPhysiNode* pScanNode);

1843
/*
 * Fills pTaskInfo->schemaInfo for the table scanned by pScanNode:
 *   - the table name;
 *   - the table schema (sw) and tversion, both taken from the super table for super/child tables;
 *   - for a normal table, sw from the table's own schema;
 *   - the schema of the queried columns (qsw).
 *
 * Returns TSDB_CODE_SUCCESS, terrno when a meta lookup fails, or TSDB_CODE_OUT_OF_MEMORY when the queried-column
 * schema cannot be built.
 */
int32_t extractTableSchemaInfo(SReadHandle* pHandle, SScanPhysiNode* pScanNode, SExecTaskInfo* pTaskInfo) {
  SMetaReader mr = {0};
  metaReaderInit(&mr, pHandle->meta, 0);
  int32_t code = metaGetTableEntryByUidCache(&mr, pScanNode->uid);
  if (code != TSDB_CODE_SUCCESS) {
    qError("failed to get the table meta, uid:0x%" PRIx64 ", suid:0x%" PRIx64 ", %s", pScanNode->uid, pScanNode->suid,
           GET_TASKID(pTaskInfo));

    metaReaderClear(&mr);
    return terrno;
  }

  SSchemaInfo* pSchemaInfo = &pTaskInfo->schemaInfo;
  pSchemaInfo->tablename = strdup(mr.me.name);

  if (mr.me.type == TSDB_SUPER_TABLE) {
    pSchemaInfo->sw = tCloneSSchemaWrapper(&mr.me.stbEntry.schemaRow);
    pSchemaInfo->tversion = mr.me.stbEntry.schemaTag.version;
  } else if (mr.me.type == TSDB_CHILD_TABLE) {
    tDecoderClear(&mr.coder);

    tb_uid_t suid = mr.me.ctbEntry.suid;
    // The super-table lookup can fail too. Previously its result was ignored and the reader's stale child entry was
    // then read through stbEntry.
    code = metaGetTableEntryByUidCache(&mr, suid);
    if (code != TSDB_CODE_SUCCESS) {
      qError("failed to get the super table meta, uid:0x%" PRIx64 ", suid:0x%" PRIx64 ", %s", pScanNode->uid,
             (uint64_t)suid, GET_TASKID(pTaskInfo));

      metaReaderClear(&mr);
      return terrno;
    }

    pSchemaInfo->sw = tCloneSSchemaWrapper(&mr.me.stbEntry.schemaRow);
    pSchemaInfo->tversion = mr.me.stbEntry.schemaTag.version;
  } else {
    pSchemaInfo->sw = tCloneSSchemaWrapper(&mr.me.ntbEntry.schemaRow);
  }

  metaReaderClear(&mr);

  pSchemaInfo->qsw = extractQueriedColumnSchema(pScanNode);
  if (pSchemaInfo->qsw == NULL) {
    // defensive: treat a missing queried-column schema as an allocation failure
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return terrno;
  }

  return TSDB_CODE_SUCCESS;
}

/*
 * Builds a schema wrapper describing the columns a scan reads. It contains every entry of pScanCols, followed by
 * the column-typed entries of pScanPseudoCols; non-column pseudo entries are skipped.
 *
 * Returns a newly allocated wrapper. On allocation failure it returns NULL with terrno = TSDB_CODE_OUT_OF_MEMORY;
 * previously both allocations were dereferenced unchecked.
 */
SSchemaWrapper* extractQueriedColumnSchema(SScanPhysiNode* pScanNode) {
  int32_t numOfCols = LIST_LENGTH(pScanNode->pScanCols);
  int32_t numOfTags = LIST_LENGTH(pScanNode->pScanPseudoCols);

  SSchemaWrapper* pqSw = taosMemoryCalloc(1, sizeof(SSchemaWrapper));
  if (pqSw == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pqSw->pSchema = taosMemoryCalloc(numOfCols + numOfTags, sizeof(SSchema));
  // A zero-sized calloc may legitimately return NULL; only a non-empty request is a failure.
  if (pqSw->pSchema == NULL && (numOfCols + numOfTags) > 0) {
    taosMemoryFree(pqSw);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  for (int32_t i = 0; i < numOfCols; ++i) {
    STargetNode* pNode = (STargetNode*)nodesListGetNode(pScanNode->pScanCols, i);
    SColumnNode* pColNode = (SColumnNode*)pNode->pExpr;

    SSchema* pSchema = &pqSw->pSchema[pqSw->nCols++];
    pSchema->colId = pColNode->colId;
    pSchema->type = pColNode->node.resType.type;
    pSchema->bytes = pColNode->node.resType.bytes;
    tstrncpy(pSchema->name, pColNode->colName, tListLen(pSchema->name));
  }

  // these are the tags and pseudo function columns; only the column-typed entries (tags) are kept
  for (int32_t i = 0; i < numOfTags; ++i) {
    STargetNode* pNode = (STargetNode*)nodesListGetNode(pScanNode->pScanPseudoCols, i);

    int32_t type = nodeType(pNode->pExpr);
    if (type == QUERY_NODE_COLUMN) {
      SColumnNode* pColNode = (SColumnNode*)pNode->pExpr;

      SSchema* pSchema = &pqSw->pSchema[pqSw->nCols++];
      pSchema->colId = pColNode->colId;
      pSchema->type = pColNode->node.resType.type;
      pSchema->bytes = pColNode->node.resType.bytes;
      tstrncpy(pSchema->name, pColNode->colName, tListLen(pSchema->name));
    }
  }

  return pqSw;
}

1915 1916
/* Frees the database/table name strings and both schema wrappers held by a task's schema info. */
static void cleanupTableSchemaInfo(SSchemaInfo* pSchemaInfo) {
  // name strings
  taosMemoryFreeClear(pSchemaInfo->dbname);
  taosMemoryFreeClear(pSchemaInfo->tablename);

  // table schema and queried-column schema
  tDeleteSSchemaWrapper(pSchemaInfo->sw);
  tDeleteSSchemaWrapper(pSchemaInfo->qsw);
}

1922
/* Releases the schema wrapper held by a stream task's info. */
static void cleanupStreamInfo(SStreamTaskInfo* pStreamInfo) {
  tDeleteSSchemaWrapper(pStreamInfo->schema);
}
1923

1924
/*
 * Returns true when the group/partition list consists of exactly one function node named "tbname"
 * (i.e. "partition by tbname" / "group by tbname"), false otherwise.
 */
bool groupbyTbname(SNodeList* pGroupList) {
  if (LIST_LENGTH(pGroupList) != 1) {
    return false;
  }

  SNode* pFirst = nodesListGetNode(pGroupList, 0);
  if (pFirst->type != QUERY_NODE_FUNCTION) {
    return false;
  }

  return strcmp(((struct SFunctionNode*)pFirst)->functionName, "tbname") == 0;
}

1937 1938
/*
 * Recursively builds the operator tree for a physical plan node.
 *
 * A leaf node (no children) becomes a scan, exchange or project operator. Scan nodes first populate the task's table
 * list, and table, merge and last-row scans also fill the task's schema info. An inner node first builds all its
 * children, then builds its own operator on top of ops[0]; merge and merge-join use all children.
 *
 * Returns the operator, or NULL on failure. On the failure paths in this function pTaskInfo->code is set; a child's
 * failure leaves whatever code the child set.
 */
SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, SNode* pTagCond,
                                  SNode* pTagIndexCond, const char* pUser) {
  int32_t         type = nodeType(pPhyNode);
  STableListInfo* pTableListInfo = pTaskInfo->pTableInfoList;
  const char*     idstr = GET_TASKID(pTaskInfo);

  if (pPhyNode->pChildren == NULL || LIST_LENGTH(pPhyNode->pChildren) == 0) {
    SOperatorInfo* pOperator = NULL;
    if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == type) {
      STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode;

      // NOTE: this is an patch to fix the physical plan
      // TODO remove it later
      if (pTableScanNode->scan.node.pLimit != NULL) {
        pTableScanNode->groupSort = true;
      }

      int32_t code =
          createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, pTableScanNode->groupSort, pHandle,
                                  pTableListInfo, pTagCond, pTagIndexCond, pTaskInfo);
      if (code) {
        pTaskInfo->code = code;
        qError("failed to createScanTableListInfo, code:%s, %s", tstrerror(code), idstr);
        return NULL;
      }

      code = extractTableSchemaInfo(pHandle, &pTableScanNode->scan, pTaskInfo);
      if (code) {
        pTaskInfo->code = terrno;
        return NULL;
      }

      pOperator = createTableScanOperatorInfo(pTableScanNode, pHandle, pTaskInfo);
      if (NULL == pOperator) {
        pTaskInfo->code = terrno;
        return NULL;
      }

      STableScanInfo* pScanInfo = pOperator->info;
      pTaskInfo->cost.pRecoder = &pScanInfo->base.readRecorder;
    } else if (QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN == type) {
      STableMergeScanPhysiNode* pTableScanNode = (STableMergeScanPhysiNode*)pPhyNode;

      int32_t code = createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, true, pHandle,
                                             pTableListInfo, pTagCond, pTagIndexCond, pTaskInfo);
      if (code) {
        pTaskInfo->code = code;
        qError("failed to createScanTableListInfo, code: %s", tstrerror(code));
        return NULL;
      }

      code = extractTableSchemaInfo(pHandle, &pTableScanNode->scan, pTaskInfo);
      if (code) {
        pTaskInfo->code = terrno;
        return NULL;
      }

      pOperator = createTableMergeScanOperatorInfo(pTableScanNode, pHandle, pTaskInfo);
      if (NULL == pOperator) {
        pTaskInfo->code = terrno;
        return NULL;
      }

      STableScanInfo* pScanInfo = pOperator->info;
      pTaskInfo->cost.pRecoder = &pScanInfo->base.readRecorder;
    } else if (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == type) {
      pOperator = createExchangeOperatorInfo(pHandle ? pHandle->pMsgCb->clientRpc : NULL, (SExchangePhysiNode*)pPhyNode,
                                             pTaskInfo);
    } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN == type) {
      STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode;
      if (pHandle->vnode) {
        int32_t code =
            createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, pTableScanNode->groupSort,
                                    pHandle, pTableListInfo, pTagCond, pTagIndexCond, pTaskInfo);
        if (code) {
          pTaskInfo->code = code;
          qError("failed to createScanTableListInfo, code: %s", tstrerror(code));
          return NULL;
        }

#ifndef NDEBUG
        int32_t sz = tableListGetSize(pTableListInfo);
        qDebug("create stream task, total:%d", sz);

        for (int32_t i = 0; i < sz; i++) {
          STableKeyInfo* pKeyInfo = tableListGetInfo(pTableListInfo, i);
          qDebug("add table uid:%" PRIu64 ", gid:%" PRIu64, pKeyInfo->uid, pKeyInfo->groupId);
        }
#endif
      }

      pTaskInfo->schemaInfo.qsw = extractQueriedColumnSchema(&pTableScanNode->scan);
      pOperator = createStreamScanOperatorInfo(pHandle, pTableScanNode, pTagCond, pTaskInfo);
    } else if (QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN == type) {
      SSystemTableScanPhysiNode* pSysScanPhyNode = (SSystemTableScanPhysiNode*)pPhyNode;
      pOperator = createSysTableScanOperatorInfo(pHandle, pSysScanPhyNode, pUser, pTaskInfo);
    } else if (QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN == type) {
      STableCountScanPhysiNode* pTblCountScanNode = (STableCountScanPhysiNode*)pPhyNode;
      pOperator = createTableCountScanOperatorInfo(pHandle, pTblCountScanNode, pTaskInfo);
    } else if (QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN == type) {
      STagScanPhysiNode* pScanPhyNode = (STagScanPhysiNode*)pPhyNode;

      int32_t code = createScanTableListInfo(pScanPhyNode, NULL, false, pHandle, pTableListInfo, pTagCond,
                                             pTagIndexCond, pTaskInfo);
      if (code != TSDB_CODE_SUCCESS) {
        pTaskInfo->code = code;
        qError("failed to getTableList, code: %s", tstrerror(code));
        return NULL;
      }

      pOperator = createTagScanOperatorInfo(pHandle, pScanPhyNode, pTaskInfo);
    } else if (QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN == type) {
      SBlockDistScanPhysiNode* pBlockNode = (SBlockDistScanPhysiNode*)pPhyNode;

      if (pBlockNode->tableType == TSDB_SUPER_TABLE) {
        SArray* pList = taosArrayInit(4, sizeof(STableKeyInfo));
        if (pList == NULL) {
          pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
          return NULL;
        }

        int32_t code = vnodeGetAllTableList(pHandle->vnode, pBlockNode->uid, pList);
        if (code != TSDB_CODE_SUCCESS) {
          pTaskInfo->code = terrno;  // captured before the cleanup below can touch terrno
          taosArrayDestroy(pList);   // previously leaked on this path
          return NULL;
        }

        size_t num = taosArrayGetSize(pList);
        for (int32_t i = 0; i < num; ++i) {
          STableKeyInfo* p = taosArrayGet(pList, i);
          tableListAddTableInfo(pTableListInfo, p->uid, 0);
        }

        taosArrayDestroy(pList);
      } else {  // Create group with only one table
        tableListAddTableInfo(pTableListInfo, pBlockNode->uid, 0);
      }

      pOperator = createDataBlockInfoScanOperator(pHandle, pBlockNode, pTaskInfo);
    } else if (QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN == type) {
      SLastRowScanPhysiNode* pScanNode = (SLastRowScanPhysiNode*)pPhyNode;

      int32_t code = createScanTableListInfo(&pScanNode->scan, pScanNode->pGroupTags, true, pHandle, pTableListInfo,
                                             pTagCond, pTagIndexCond, pTaskInfo);
      if (code != TSDB_CODE_SUCCESS) {
        pTaskInfo->code = code;
        return NULL;
      }

      code = extractTableSchemaInfo(pHandle, &pScanNode->scan, pTaskInfo);
      if (code != TSDB_CODE_SUCCESS) {
        pTaskInfo->code = code;
        return NULL;
      }

      pOperator = createCacherowsScanOperator(pScanNode, pHandle, pTaskInfo);
    } else if (QUERY_NODE_PHYSICAL_PLAN_PROJECT == type) {
      pOperator = createProjectOperatorInfo(NULL, (SProjectPhysiNode*)pPhyNode, pTaskInfo);
    } else {
      ASSERT(0);
    }

    if (pOperator != NULL) {
      pOperator->resultDataBlockId = pPhyNode->pOutputDataBlockDesc->dataBlockId;
    }

    return pOperator;
  }

  size_t          size = LIST_LENGTH(pPhyNode->pChildren);
  SOperatorInfo** ops = taosMemoryCalloc(size, POINTER_BYTES);
  if (ops == NULL) {
    pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;  // previously returned without reporting the failure
    return NULL;
  }

  for (int32_t i = 0; i < size; ++i) {
    SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, i);
    ops[i] = createOperatorTree(pChildNode, pTaskInfo, pHandle, pTagCond, pTagIndexCond, pUser);
    if (ops[i] == NULL) {
      // NOTE(review): children already built in ops[0..i-1] are not released here and leak. They cannot simply be
      // passed to destroyOperatorInfo: the table-scan branch above stores a pointer into its operator's info in
      // pTaskInfo->cost.pRecoder, which would then dangle. TODO: fix together with task-level cleanup.
      taosMemoryFree(ops);
      return NULL;
    }
  }

  SOperatorInfo* pOptr = NULL;
  if (QUERY_NODE_PHYSICAL_PLAN_PROJECT == type) {
    pOptr = createProjectOperatorInfo(ops[0], (SProjectPhysiNode*)pPhyNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_HASH_AGG == type) {
    SAggPhysiNode* pAggNode = (SAggPhysiNode*)pPhyNode;
    if (pAggNode->pGroupKeys != NULL) {
      pOptr = createGroupOperatorInfo(ops[0], pAggNode, pTaskInfo);
    } else {
      pOptr = createAggregateOperatorInfo(ops[0], pAggNode, pTaskInfo);
    }
  } else if (QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL == type) {
    SIntervalPhysiNode* pIntervalPhyNode = (SIntervalPhysiNode*)pPhyNode;

    bool isStream = (QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL == type);
    pOptr = createIntervalOperatorInfo(ops[0], pIntervalPhyNode, pTaskInfo, isStream);
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL == type) {
    pOptr = createStreamIntervalOperatorInfo(ops[0], pPhyNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL == type) {
    SMergeAlignedIntervalPhysiNode* pIntervalPhyNode = (SMergeAlignedIntervalPhysiNode*)pPhyNode;
    pOptr = createMergeAlignedIntervalOperatorInfo(ops[0], pIntervalPhyNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL == type) {
    SMergeIntervalPhysiNode* pIntervalPhyNode = (SMergeIntervalPhysiNode*)pPhyNode;
    pOptr = createMergeIntervalOperatorInfo(ops[0], pIntervalPhyNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL == type) {
    int32_t children = 0;
    pOptr = createStreamFinalIntervalOperatorInfo(ops[0], pPhyNode, pTaskInfo, children);
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL == type) {
    int32_t children = pHandle->numOfVgroups;
    pOptr = createStreamFinalIntervalOperatorInfo(ops[0], pPhyNode, pTaskInfo, children);
  } else if (QUERY_NODE_PHYSICAL_PLAN_SORT == type) {
    pOptr = createSortOperatorInfo(ops[0], (SSortPhysiNode*)pPhyNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_GROUP_SORT == type) {
    pOptr = createGroupSortOperatorInfo(ops[0], (SGroupSortPhysiNode*)pPhyNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE == type) {
    SMergePhysiNode* pMergePhyNode = (SMergePhysiNode*)pPhyNode;
    pOptr = createMultiwayMergeOperatorInfo(ops, size, pMergePhyNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION == type) {
    SSessionWinodwPhysiNode* pSessionNode = (SSessionWinodwPhysiNode*)pPhyNode;
    pOptr = createSessionAggOperatorInfo(ops[0], pSessionNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION == type) {
    pOptr = createStreamSessionAggOperatorInfo(ops[0], pPhyNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION == type) {
    int32_t children = 0;
    pOptr = createStreamFinalSessionAggOperatorInfo(ops[0], pPhyNode, pTaskInfo, children);
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION == type) {
    int32_t children = pHandle->numOfVgroups;
    pOptr = createStreamFinalSessionAggOperatorInfo(ops[0], pPhyNode, pTaskInfo, children);
  } else if (QUERY_NODE_PHYSICAL_PLAN_PARTITION == type) {
    pOptr = createPartitionOperatorInfo(ops[0], (SPartitionPhysiNode*)pPhyNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION == type) {
    pOptr = createStreamPartitionOperatorInfo(ops[0], (SStreamPartitionPhysiNode*)pPhyNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE == type) {
    SStateWinodwPhysiNode* pStateNode = (SStateWinodwPhysiNode*)pPhyNode;
    pOptr = createStatewindowOperatorInfo(ops[0], pStateNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE == type) {
    pOptr = createStreamStateAggOperatorInfo(ops[0], pPhyNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN == type) {
    pOptr = createMergeJoinOperatorInfo(ops, size, (SSortMergeJoinPhysiNode*)pPhyNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_FILL == type) {
    pOptr = createFillOperatorInfo(ops[0], (SFillPhysiNode*)pPhyNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_FILL == type) {
    pOptr = createStreamFillOperatorInfo(ops[0], (SStreamFillPhysiNode*)pPhyNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC == type) {
    pOptr = createIndefinitOutputOperatorInfo(ops[0], pPhyNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC == type) {
    pOptr = createTimeSliceOperatorInfo(ops[0], pPhyNode, pTaskInfo);
  } else {
    ASSERT(0);
  }

  taosMemoryFree(ops);
  if (pOptr) {
    pOptr->resultDataBlockId = pPhyNode->pOutputDataBlockDesc->dataBlockId;
  }

  return pOptr;
}
H
Haojun Liao 已提交
2193

L
Liu Jicong 已提交
2194 2195 2196 2197
/*
 * Walk down a single-child operator chain until the stream scan operator is reached, and hand back the
 * table-scan info owned by that stream scan.
 *
 * Returns 0 on success, or TSDB_CODE_APP_ERROR when the chain ends without a stream scan or when an
 * operator on the way has more than one downstream (join).
 */
static int32_t extractTbscanInStreamOpTree(SOperatorInfo* pOperator, STableScanInfo** ppInfo) {
  SOperatorInfo* pCursor = pOperator;

  while (pCursor->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
    if (pCursor->numOfDownstream == 0) {
      qError("failed to find stream scan operator");
      return TSDB_CODE_APP_ERROR;
    }
    if (pCursor->numOfDownstream > 1) {
      qError("join not supported for stream block scan");
      return TSDB_CODE_APP_ERROR;
    }
    pCursor = pCursor->pDownstream[0];
  }

  SStreamScanInfo* pInfo = pCursor->info;
  ASSERT(pInfo->pTableScanOp->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN);
  *ppInfo = pInfo->pTableScanOp->info;
  return 0;
}

2214 2215 2216 2217 2218 2219 2220
/*
 * Follow a physical plan through single-child nodes down to its leaf, which must be a table scan node.
 *
 * On success stores the leaf in *ppNode and returns 0. If a node has more than one child, or the leaf is
 * not a table scan, sets terrno to TSDB_CODE_APP_ERROR and returns -1.
 */
int32_t extractTableScanNode(SPhysiNode* pNode, STableScanPhysiNode** ppNode) {
  SPhysiNode* pCur = pNode;

  while (pCur->pChildren != NULL && LIST_LENGTH(pCur->pChildren) != 0) {
    if (LIST_LENGTH(pCur->pChildren) != 1) {
      ASSERT(0);
      terrno = TSDB_CODE_APP_ERROR;
      return -1;
    }
    pCur = (SPhysiNode*)nodesListGetNode(pCur->pChildren, 0);
  }

  if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN != pCur->type) {
    ASSERT(0);
    terrno = TSDB_CODE_APP_ERROR;
    return -1;
  }

  *ppNode = (STableScanPhysiNode*)pCur;
  return 0;
}

2236
#if 0
/*
 * Compiled out. Replaces the data reader of the table scan found under a stream operator tree with a
 * fresh one built from the subplan's table scan node.
 * NOTE(review): uid and ts are accepted but not applied yet (see TODO); confirm whether this can be removed.
 */
int32_t rebuildReader(SOperatorInfo* pOperator, SSubplan* plan, SReadHandle* pHandle, int64_t uid, int64_t ts) {
  STableScanInfo* pScan = NULL;
  if (extractTbscanInStreamOpTree(pOperator, &pScan) < 0) {
    return -1;
  }

  STableScanPhysiNode* pScanNode = NULL;
  if (extractTableScanNode(plan->pNode, &pScanNode) < 0) {
    ASSERT(0);
  }

  tsdbReaderClose(pScan->dataReader);

  STableListInfo tableList = {0};
  pScan->dataReader = doCreateDataReader(pScanNode, pHandle, &tableList, NULL);
  if (NULL == pScan->dataReader) {
    ASSERT(0);
    qError("failed to create data reader");
    return TSDB_CODE_APP_ERROR;
  }

  // TODO: set uid and ts to data reader
  return 0;
}
#endif
2261

D
dapan1121 已提交
2262
/*
 * Build the type-specific parameter object for a data sink node and return it through *pParam.
 *
 * - QUERY_INSERT: an SInserterParam carrying readHandle.
 * - DELETE: an SDeleterParam with the super-table uid and the uid of every table in the task's table list.
 * - any other node type: *pParam is left untouched.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY if any allocation fails. In that case, nothing
 * allocated here is leaked and *pParam is not written.
 */
int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, qTaskInfo_t* pTaskInfo, SReadHandle* readHandle) {
  SExecTaskInfo* pTask = *(SExecTaskInfo**)pTaskInfo;

  switch (pNode->type) {
    case QUERY_NODE_PHYSICAL_PLAN_QUERY_INSERT: {
      SInserterParam* pInserterParam = taosMemoryCalloc(1, sizeof(SInserterParam));
      if (NULL == pInserterParam) {
        return TSDB_CODE_OUT_OF_MEMORY;
      }
      pInserterParam->readHandle = readHandle;

      *pParam = pInserterParam;
      break;
    }
    case QUERY_NODE_PHYSICAL_PLAN_DELETE: {
      SDeleterParam* pDeleterParam = taosMemoryCalloc(1, sizeof(SDeleterParam));
      if (NULL == pDeleterParam) {
        return TSDB_CODE_OUT_OF_MEMORY;
      }

      int32_t tbNum = tableListGetSize(pTask->pTableInfoList);
      pDeleterParam->suid = tableListGetSuid(pTask->pTableInfoList);

      pDeleterParam->pUidList = taosArrayInit(tbNum, sizeof(uint64_t));
      if (NULL == pDeleterParam->pUidList) {
        taosMemoryFree(pDeleterParam);
        return TSDB_CODE_OUT_OF_MEMORY;
      }

      for (int32_t i = 0; i < tbNum; ++i) {
        STableKeyInfo* pTable = tableListGetInfo(pTask->pTableInfoList, i);
        // taosArrayPush returns NULL when the array cannot grow; do not hand out a truncated uid list.
        if (NULL == taosArrayPush(pDeleterParam->pUidList, &pTable->uid)) {
          taosArrayDestroy(pDeleterParam->pUidList);
          taosMemoryFree(pDeleterParam);
          return TSDB_CODE_OUT_OF_MEMORY;
        }
      }

      *pParam = pDeleterParam;
      break;
    }
    default:
      break;
  }

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
2306
/*
 * Create an execution task for a subplan and build its operator tree.
 *
 * Ownership of `sql`:
 * - It always passes to this function.
 * - On success it is stored in the task and released by doDestroyTask.
 * - On failure it is freed here.
 *
 * On success returns TSDB_CODE_SUCCESS and *pTaskInfo holds the new task. On failure, any partially
 * built task is destroyed, *pTaskInfo is reset to NULL, and a non-zero error code is returned.
 */
int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId,
                               char* sql, EOPTR_EXEC_MODEL model) {
  int32_t  code = TSDB_CODE_SUCCESS;
  uint64_t queryId = pPlan->id.queryId;

  *pTaskInfo = createExecTaskInfo(queryId, taskId, model, pPlan->dbFName);
  if (*pTaskInfo == NULL) {
    goto _complete;
  }

  if (pHandle != NULL && pHandle->pStateBackend != NULL) {
    (*pTaskInfo)->streamInfo.pState = pHandle->pStateBackend;
  }

  // From here on the task owns sql; clearing the local keeps _complete from freeing it twice.
  (*pTaskInfo)->sql = sql;
  sql = NULL;

  (*pTaskInfo)->pSubplan = pPlan;
  (*pTaskInfo)->pRoot =
      createOperatorTree(pPlan->pNode, *pTaskInfo, pHandle, pPlan->pTagCond, pPlan->pTagIndexCond, pPlan->user);

  if (NULL == (*pTaskInfo)->pRoot) {
    terrno = (*pTaskInfo)->code;
    goto _complete;
  }

  (*pTaskInfo)->cost.created = taosGetTimestampUs();
  return TSDB_CODE_SUCCESS;

_complete:
  // Capture the error before cleanup runs, so teardown cannot overwrite it.
  code = terrno;
  if (code == TSDB_CODE_SUCCESS) {
    // A failure path must never report success to the caller.
    code = TSDB_CODE_APP_ERROR;
  }

  taosMemoryFree(sql);

  // createExecTaskInfo may have failed, in which case there is no task to tear down.
  if (*pTaskInfo != NULL) {
    doDestroyTask(*pTaskInfo);
    *pTaskInfo = NULL;  // do not leave the caller holding a dangling pointer
  }
  return code;
}

H
Haojun Liao 已提交
2343 2344 2345 2346 2347
/* Array-element destructor: the element is an SSDataBlock*, so the slot address is an SSDataBlock**. */
static void freeBlock(void* pParam) {
  SSDataBlock** ppBlock = (SSDataBlock**)pParam;
  blockDataDestroy(*ppBlock);
}

L
Liu Jicong 已提交
2348
/*
 * Release an execution task and the resources it references:
 * - the table list, operator tree, schema info and stream info;
 * - the subplan (only when the task is not in local-exec mode);
 * - the result block list, the stop-info array, and the sql/id strings;
 * - finally the task object itself.
 *
 * A NULL task is accepted and ignored, mirroring free(), so error paths may call this unconditionally.
 */
void doDestroyTask(SExecTaskInfo* pTaskInfo) {
  if (pTaskInfo == NULL) {
    return;
  }

  qDebug("%s execTask is freed", GET_TASKID(pTaskInfo));

  pTaskInfo->pTableInfoList = tableListDestroy(pTaskInfo->pTableInfoList);
  destroyOperatorInfo(pTaskInfo->pRoot);
  cleanupTableSchemaInfo(&pTaskInfo->schemaInfo);
  cleanupStreamInfo(&pTaskInfo->streamInfo);

  // The subplan is only freed outside local-exec mode.
  // NOTE(review): presumably the local executor keeps ownership of it otherwise; confirm.
  if (!pTaskInfo->localFetch.localExec) {
    nodesDestroyNode((SNode*)pTaskInfo->pSubplan);
  }

  taosArrayDestroyEx(pTaskInfo->pResultBlockList, freeBlock);
  taosArrayDestroy(pTaskInfo->stopInfo.pStopInfo);
  taosMemoryFreeClear(pTaskInfo->sql);
  taosMemoryFreeClear(pTaskInfo->id.str);
  taosMemoryFreeClear(pTaskInfo);
}

static int64_t getQuerySupportBufSize(size_t numOfTables) {
  size_t s1 = sizeof(STableQueryInfo);
L
Liu Jicong 已提交
2369 2370
  //  size_t s3 = sizeof(STableCheckInfo);  buffer consumption in tsdb
  return (int64_t)(s1 * 1.5 * numOfTables);
2371 2372 2373 2374 2375 2376 2377
}

/*
 * Try to reserve query buffer for `numOfTables` tables from the global budget tsQueryBufferSizeBytes.
 *
 * - A negative budget means "unlimited": always succeeds without reserving anything.
 * - A zero budget disables query processing: always fails.
 * - Otherwise the estimate is subtracted with a compare-and-swap retry loop. The call fails if the
 *   remaining budget would go negative.
 *
 * Returns TSDB_CODE_SUCCESS or TSDB_CODE_QRY_NOT_ENOUGH_BUFFER.
 */
int32_t checkForQueryBuf(size_t numOfTables) {
  const int64_t need = getQuerySupportBufSize(numOfTables);

  if (tsQueryBufferSizeBytes < 0) {
    return TSDB_CODE_SUCCESS;
  }

  // disable query processing if the value of tsQueryBufferSize is zero.
  if (tsQueryBufferSizeBytes <= 0) {
    return TSDB_CODE_QRY_NOT_ENOUGH_BUFFER;
  }

  for (;;) {
    const int64_t current = tsQueryBufferSizeBytes;
    const int64_t left = current - need;
    if (left < 0) {
      return TSDB_CODE_QRY_NOT_ENOUGH_BUFFER;
    }
    // Only commit if nobody changed the budget since we read it; otherwise retry with the fresh value.
    if (atomic_val_compare_exchange_64(&tsQueryBufferSizeBytes, current, left) == current) {
      return TSDB_CODE_SUCCESS;
    }
  }
}

/*
 * Return the query buffer estimated for `numOfTables` tables to the global budget.
 * Does nothing when the budget is negative ("unlimited").
 */
void releaseQueryBuf(size_t numOfTables) {
  if (tsQueryBufferSizeBytes >= 0) {
    const int64_t giveBack = getQuerySupportBufSize(numOfTables);
    atomic_add_fetch_64(&tsQueryBufferSizeBytes, giveBack);
  }
}
D
dapan1121 已提交
2405

H
Haojun Liao 已提交
2406
/*
 * Append one SExplainExecInfo for `operatorInfo` to pExecInfoList, then recurse into every downstream
 * operator in pre-order.
 *
 * Each entry gets:
 * - row count, open cost and total cost;
 * - the operator's verbose explain output, when it provides getExplainFn.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY if the list cannot grow, or the first error code
 * reported by an explain callback (this operator's or a descendant's).
 */
int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SArray* pExecInfoList) {
  SExplainExecInfo  execInfo = {0};
  SExplainExecInfo* pExplainInfo = taosArrayPush(pExecInfoList, &execInfo);
  if (pExplainInfo == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  pExplainInfo->numOfRows = operatorInfo->resultInfo.totalRows;
  pExplainInfo->startupCost = operatorInfo->cost.openCost;
  pExplainInfo->totalCost = operatorInfo->cost.totalCost;
  pExplainInfo->verboseLen = 0;
  pExplainInfo->verboseInfo = NULL;

  if (operatorInfo->fpSet.getExplainFn) {
    int32_t code =
        operatorInfo->fpSet.getExplainFn(operatorInfo, &pExplainInfo->verboseInfo, &pExplainInfo->verboseLen);
    if (code) {
      qError("%s operator getExplainFn failed, code:%s", GET_TASKID(operatorInfo->pTaskInfo), tstrerror(code));
      return code;
    }
  }

  // pExplainInfo points into the array and may be invalidated by the pushes below; it is not used again.
  for (int32_t i = 0; i < operatorInfo->numOfDownstream; ++i) {
    int32_t code = getOperatorExplainExecInfo(operatorInfo->pDownstream[i], pExecInfoList);
    if (code != TSDB_CODE_SUCCESS) {
      return code;  // propagate the real failure instead of masking it as OOM
    }
  }

  return TSDB_CODE_SUCCESS;
}
5
54liuyao 已提交
2436

2437 2438
/*
 * Fetch (creating if absent) the result row of window `win` for group `tableGroupId` from stream state.
 *
 * The row is stamped with the window and its function contexts are initialised. The row is returned
 * through *pResult.
 *
 * Returns TSDB_CODE_OUT_OF_MEMORY if the state lookup/insert fails, otherwise TSDB_CODE_SUCCESS.
 */
int32_t setOutputBuf(SStreamState* pState, STimeWindow* win, SResultRow** pResult, int64_t tableGroupId,
                     SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowEntryInfoOffset, SAggSupporter* pAggSup) {
  SWinKey winKey = {0};
  winKey.ts = win->skey;
  winKey.groupId = tableGroupId;

  int32_t rowBytes = pAggSup->resultRowSize;
  char*   pRowBuf = NULL;

  int32_t rc = streamStateAddIfNotExist(pState, &winKey, (void**)&pRowBuf, &rowBytes);
  if (rc < 0) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  *pResult = (SResultRow*)pRowBuf;
  ASSERT(*pResult);

  // set time window for current result
  (*pResult)->win = *win;
  setResultRowInitCtx(*pResult, pCtx, numOfOutput, rowEntryInfoOffset);
  return TSDB_CODE_SUCCESS;
}

2457 2458
/* Hand a result row obtained from stream state back via streamStateReleaseBuf; always reports success. */
int32_t releaseOutputBuf(SStreamState* pState, SWinKey* pKey, SResultRow* pResult) {
  SResultRow* pRow = pResult;
  streamStateReleaseBuf(pState, pKey, pRow);
  return TSDB_CODE_SUCCESS;
}

2462 2463
/*
 * Store `resSize` bytes of a result row under pKey via streamStatePut.
 * NOTE(review): the streamStatePut status is discarded; this always reports success.
 */
int32_t saveOutputBuf(SStreamState* pState, SWinKey* pKey, SResultRow* pResult, int32_t resSize) {
  const int32_t bytes = resSize;
  streamStatePut(pState, pKey, pResult, bytes);
  return TSDB_CODE_SUCCESS;
}

2467
int32_t buildDataBlockFromGroupRes(SOperatorInfo* pOperator, SStreamState* pState, SSDataBlock* pBlock, SExprSupp* pSup,
2468
                                   SGroupResInfo* pGroupResInfo) {
2469
  SExecTaskInfo*  pTaskInfo = pOperator->pTaskInfo;
2470 2471 2472 2473 2474 2475 2476 2477 2478 2479 2480 2481
  SExprInfo*      pExprInfo = pSup->pExprInfo;
  int32_t         numOfExprs = pSup->numOfExprs;
  int32_t*        rowEntryOffset = pSup->rowEntryInfoOffset;
  SqlFunctionCtx* pCtx = pSup->pCtx;

  int32_t numOfRows = getNumOfTotalRes(pGroupResInfo);

  for (int32_t i = pGroupResInfo->index; i < numOfRows; i += 1) {
    SResKeyPos* pPos = taosArrayGetP(pGroupResInfo->pRows, i);
    int32_t     size = 0;
    void*       pVal = NULL;
    SWinKey     key = {
2482 2483
            .ts = *(TSKEY*)pPos->key,
            .groupId = pPos->groupId,
2484
    };
2485
    int32_t code = streamStateGet(pState, &key, &pVal, &size);
2486 2487 2488 2489 2490 2491
    ASSERT(code == 0);
    SResultRow* pRow = (SResultRow*)pVal;
    doUpdateNumOfRows(pCtx, pRow, numOfExprs, rowEntryOffset);
    // no results, continue to check the next one
    if (pRow->numOfRows == 0) {
      pGroupResInfo->index += 1;
2492
      releaseOutputBuf(pState, &key, pRow);
2493 2494 2495
      continue;
    }

H
Haojun Liao 已提交
2496 2497
    if (pBlock->info.id.groupId == 0) {
      pBlock->info.id.groupId = pPos->groupId;
2498
      void* tbname = NULL;
H
Haojun Liao 已提交
2499
      if (streamStateGetParName(pTaskInfo->streamInfo.pState, pBlock->info.id.groupId, &tbname) < 0) {
L
Liu Jicong 已提交
2500
        pBlock->info.parTbName[0] = 0;
2501 2502
      } else {
        memcpy(pBlock->info.parTbName, tbname, TSDB_TABLE_NAME_LEN);
2503
      }
2504
      tdbFree(tbname);
2505 2506
    } else {
      // current value belongs to different group, it can't be packed into one datablock
H
Haojun Liao 已提交
2507
      if (pBlock->info.id.groupId != pPos->groupId) {
2508
        releaseOutputBuf(pState, &key, pRow);
2509 2510 2511 2512 2513 2514
        break;
      }
    }

    if (pBlock->info.rows + pRow->numOfRows > pBlock->info.capacity) {
      ASSERT(pBlock->info.rows > 0);
2515
      releaseOutputBuf(pState, &key, pRow);
2516 2517 2518 2519 2520 2521 2522 2523 2524 2525
      break;
    }

    pGroupResInfo->index += 1;

    for (int32_t j = 0; j < numOfExprs; ++j) {
      int32_t slotId = pExprInfo[j].base.resSchema.slotId;

      pCtx[j].resultInfo = getResultEntryInfo(pRow, j, rowEntryOffset);
      if (pCtx[j].fpSet.finalize) {
2526 2527 2528 2529
        int32_t code1 = pCtx[j].fpSet.finalize(&pCtx[j], pBlock);
        if (TAOS_FAILED(code1)) {
          qError("%s build result data block error, code %s", GET_TASKID(pTaskInfo), tstrerror(code1));
          T_LONG_JMP(pTaskInfo->env, code1);
2530 2531 2532 2533 2534 2535 2536 2537 2538 2539 2540 2541 2542
        }
      } else if (strcmp(pCtx[j].pExpr->pExpr->_function.functionName, "_select_value") == 0) {
        // do nothing, todo refactor
      } else {
        // expand the result into multiple rows. E.g., _wstart, top(k, 20)
        // the _wstart needs to copy to 20 following rows, since the results of top-k expands to 20 different rows.
        SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, slotId);
        char*            in = GET_ROWCELL_INTERBUF(pCtx[j].resultInfo);
        for (int32_t k = 0; k < pRow->numOfRows; ++k) {
          colDataAppend(pColInfoData, pBlock->info.rows + k, in, pCtx[j].resultInfo->isNullRes);
        }
      }
    }
5
54liuyao 已提交
2543

2544
    pBlock->info.rows += pRow->numOfRows;
2545
    releaseOutputBuf(pState, &key, pRow);
2546
  }
2547
  pBlock->info.dataLoad = 1;
2548 2549 2550
  blockDataUpdateTsWindow(pBlock, 0);
  return TSDB_CODE_SUCCESS;
}
5
54liuyao 已提交
2551 2552 2553 2554 2555 2556 2557

/*
 * Write a session window's buffer back to stream state under `key`, then release the buffer.
 * NOTE(review): the streamStateSessionPut status is discarded; this always reports success.
 */
int32_t saveSessionDiscBuf(SStreamState* pState, SSessionKey* key, void* buf, int32_t size) {
  const void* pData = (const void*)buf;
  streamStateSessionPut(pState, key, pData, size);

  SResultRow* pRow = (SResultRow*)buf;
  releaseOutputBuf(pState, NULL, pRow);
  return TSDB_CODE_SUCCESS;
}

2558
int32_t buildSessionResultDataBlock(SOperatorInfo* pOperator, SStreamState* pState, SSDataBlock* pBlock,
5
54liuyao 已提交
2559
                                    SExprSupp* pSup, SGroupResInfo* pGroupResInfo) {
2560
  SExecTaskInfo*  pTaskInfo = pOperator->pTaskInfo;
5
54liuyao 已提交
2561 2562 2563 2564 2565 2566 2567 2568 2569 2570 2571 2572 2573
  SExprInfo*      pExprInfo = pSup->pExprInfo;
  int32_t         numOfExprs = pSup->numOfExprs;
  int32_t*        rowEntryOffset = pSup->rowEntryInfoOffset;
  SqlFunctionCtx* pCtx = pSup->pCtx;

  int32_t numOfRows = getNumOfTotalRes(pGroupResInfo);

  for (int32_t i = pGroupResInfo->index; i < numOfRows; i += 1) {
    SSessionKey* pKey = taosArrayGet(pGroupResInfo->pRows, i);
    int32_t      size = 0;
    void*        pVal = NULL;
    int32_t      code = streamStateSessionGet(pState, pKey, &pVal, &size);
    ASSERT(code == 0);
2574 2575
    if (code == -1) {
      // coverity scan
5
54liuyao 已提交
2576
      pGroupResInfo->index += 1;
2577 2578
      continue;
    }
5
54liuyao 已提交
2579 2580 2581 2582 2583 2584 2585 2586 2587
    SResultRow* pRow = (SResultRow*)pVal;
    doUpdateNumOfRows(pCtx, pRow, numOfExprs, rowEntryOffset);
    // no results, continue to check the next one
    if (pRow->numOfRows == 0) {
      pGroupResInfo->index += 1;
      releaseOutputBuf(pState, NULL, pRow);
      continue;
    }

H
Haojun Liao 已提交
2588 2589
    if (pBlock->info.id.groupId == 0) {
      pBlock->info.id.groupId = pKey->groupId;
2590

2591
      void* tbname = NULL;
H
Haojun Liao 已提交
2592
      if (streamStateGetParName(pTaskInfo->streamInfo.pState, pBlock->info.id.groupId, &tbname) < 0) {
2593
        pBlock->info.parTbName[0] = 0;
2594
      } else {
2595
        memcpy(pBlock->info.parTbName, tbname, TSDB_TABLE_NAME_LEN);
2596
      }
2597
      tdbFree(tbname);
5
54liuyao 已提交
2598 2599
    } else {
      // current value belongs to different group, it can't be packed into one datablock
H
Haojun Liao 已提交
2600
      if (pBlock->info.id.groupId != pKey->groupId) {
5
54liuyao 已提交
2601 2602 2603 2604 2605 2606 2607 2608 2609 2610 2611 2612 2613 2614 2615 2616 2617 2618 2619 2620 2621 2622 2623 2624 2625 2626 2627 2628 2629 2630 2631 2632 2633 2634 2635 2636
        releaseOutputBuf(pState, NULL, pRow);
        break;
      }
    }

    if (pBlock->info.rows + pRow->numOfRows > pBlock->info.capacity) {
      ASSERT(pBlock->info.rows > 0);
      releaseOutputBuf(pState, NULL, pRow);
      break;
    }

    pGroupResInfo->index += 1;

    for (int32_t j = 0; j < numOfExprs; ++j) {
      int32_t slotId = pExprInfo[j].base.resSchema.slotId;

      pCtx[j].resultInfo = getResultEntryInfo(pRow, j, rowEntryOffset);
      if (pCtx[j].fpSet.finalize) {
        int32_t code1 = pCtx[j].fpSet.finalize(&pCtx[j], pBlock);
        if (TAOS_FAILED(code1)) {
          qError("%s build result data block error, code %s", GET_TASKID(pTaskInfo), tstrerror(code1));
          T_LONG_JMP(pTaskInfo->env, code1);
        }
      } else if (strcmp(pCtx[j].pExpr->pExpr->_function.functionName, "_select_value") == 0) {
        // do nothing, todo refactor
      } else {
        // expand the result into multiple rows. E.g., _wstart, top(k, 20)
        // the _wstart needs to copy to 20 following rows, since the results of top-k expands to 20 different rows.
        SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, slotId);
        char*            in = GET_ROWCELL_INTERBUF(pCtx[j].resultInfo);
        for (int32_t k = 0; k < pRow->numOfRows; ++k) {
          colDataAppend(pColInfoData, pBlock->info.rows + k, in, pCtx[j].resultInfo->isNullRes);
        }
      }
    }

2637
    pBlock->info.dataLoad = 1;
5
54liuyao 已提交
2638 2639 2640 2641 2642 2643
    pBlock->info.rows += pRow->numOfRows;
    // saveSessionDiscBuf(pState, pKey, pVal, size);
    releaseOutputBuf(pState, NULL, pRow);
  }
  blockDataUpdateTsWindow(pBlock, 0);
  return TSDB_CODE_SUCCESS;
2644
}