executorimpl.c 114.9 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
15

H
Haojun Liao 已提交
16 17
#include "filter.h"
#include "function.h"
18 19
#include "functionMgt.h"
#include "os.h"
H
Haojun Liao 已提交
20
#include "querynodes.h"
21
#include "tfill.h"
dengyihao's avatar
dengyihao 已提交
22
#include "tname.h"
23

H
Haojun Liao 已提交
24
#include "tdatablock.h"
25
#include "tglobal.h"
H
Haojun Liao 已提交
26
#include "tmsg.h"
27
#include "ttime.h"
H
Haojun Liao 已提交
28

29
#include "executorimpl.h"
dengyihao's avatar
dengyihao 已提交
30
#include "index.h"
31
#include "query.h"
32
#include "tcompare.h"
H
Haojun Liao 已提交
33
#include "thash.h"
34
#include "ttypes.h"
dengyihao's avatar
dengyihao 已提交
35
#include "vnode.h"
36

H
Haojun Liao 已提交
37
/* Scan-pass helpers: test whether a runtime object is in the main scan, or switch it to the reverse scan. */
#define IS_MAIN_SCAN(rt)          ((rt)->scanFlag == MAIN_SCAN)
#define SET_REVERSE_SCAN_FLAG(rt) ((rt)->scanFlag = REVERSE_SCAN)

/* Row step for an order: QUERY_ASC_FORWARD_STEP for TSDB_ORDER_ASC, QUERY_DESC_FORWARD_STEP for anything else. */
#define GET_FORWARD_DIRECTION_FACTOR(order) (((order) == TSDB_ORDER_ASC) ? QUERY_ASC_FORWARD_STEP : QUERY_DESC_FORWARD_STEP)

#if 0
/*
 * Fault-injection wrappers around the allocator, compiled out by default. When this section is enabled,
 * the macros at its end redirect calloc/malloc/realloc in the code that follows to these wrappers:
 *  - u_malloc / u_calloc return NULL when taosRand() % 1000 == 0;
 *  - u_realloc returns NULL when taosRand() % 5 is 0 or 1.
 */
static UNUSED_FUNC void* u_malloc(size_t size) {
  uint32_t roll = taosRand();
  return (roll % 1000 <= 0) ? NULL : taosMemoryMalloc(size);
}

static UNUSED_FUNC void* u_calloc(size_t num, size_t size) {
  uint32_t roll = taosRand();
  return (roll % 1000 <= 0) ? NULL : taosMemoryCalloc(num, size);
}

static UNUSED_FUNC void* u_realloc(void* p, size_t size) {
  uint32_t roll = taosRand();
  return (roll % 5 <= 1) ? NULL : taosMemoryRealloc(p, size);
}

#define calloc  u_calloc
#define malloc  u_malloc
#define realloc u_realloc
#endif

X
Xiaoyu Wang 已提交
76
/* Clear the bits of `flags` in (q)->status. */
#define CLEAR_QUERY_STATUS(q, flags) ((q)->status &= (~(flags)))
/* True when the query carries a positive interval length. */
#define QUERY_IS_INTERVAL_QUERY(q) ((q)->interval.interval > 0)

L
Liu Jicong 已提交
79 80
/* Idle-duration limit in seconds: twice tsShellActivityTimer. */
int32_t getMaximumIdleDurationSec() {
  int32_t twiceTimer = tsShellActivityTimer * 2;
  return twiceTimer;
}

81
// Forward declarations of file-local helpers.
static void setBlockSMAInfo(SqlFunctionCtx* pCtx, SExprInfo* pExpr, SSDataBlock* pBlock);
static void releaseQueryBuf(size_t numOfTables);

// Operator-info destructors (each takes the operator's info object as an opaque pointer).
static void destroyAggOperatorInfo(void* param);
static void destroyFillOperatorInfo(void* param);
static void destroyIntervalOperatorInfo(void* param);
static void destroyProjectOperatorInfo(void* param);
static void destroySortOperatorInfo(void* param);
H
Haojun Liao 已提交
91

H
Haojun Liao 已提交
92
/*
 * Mark an operator as finished: set its status to OP_EXEC_DONE, record its total cost as
 * (now in microseconds - task start * 1000) / 1000.0, and set the owning task to TASK_COMPLETED.
 */
void setOperatorCompleted(SOperatorInfo* pOperator) {
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;

  pOperator->status = OP_EXEC_DONE;
  ASSERT(pTaskInfo != NULL);

  pOperator->cost.totalCost = (taosGetTimestampUs() - pTaskInfo->cost.start * 1000) / 1000.0;
  setTaskStatus(pTaskInfo, TASK_COMPLETED);
}
99

H
Haojun Liao 已提交
100 101 102 103 104 105 106 107 108 109
/* Fill in the descriptive fields of an operator: name, type, blocking mode, status, info object and owning task. */
void setOperatorInfo(SOperatorInfo* pOperator, const char* name, int32_t type, bool blocking, int32_t status,
                     void* pInfo, SExecTaskInfo* pTaskInfo) {
  // identity
  pOperator->name = (char*)name;
  pOperator->operatorType = type;

  // execution state
  pOperator->blocking = blocking;
  pOperator->status = status;

  // ownership links
  pOperator->pTaskInfo = pTaskInfo;
  pOperator->info = pInfo;
}

H
Haojun Liao 已提交
110
/* Open callback for operators with nothing to prepare: records a zero open cost and flags the operator as opened. */
int32_t operatorDummyOpenFn(SOperatorInfo* pOperator) {
  pOperator->cost.openCost = 0;
  OPTR_SET_OPENED(pOperator);
  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
116 117
/*
 * Bundle the operator callbacks into an SOperatorFpSet and return it by value.
 * Any other member of the set is zero-initialized.
 */
SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn, __optr_fn_t cleanup,
                                   __optr_close_fn_t closeFn, __optr_explain_fn_t explain) {
  SOperatorFpSet set = {0};

  set._openFn = openFn;
  set.getNextFn = nextFn;
  set.cleanupFn = cleanup;
  set.closeFn = closeFn;
  set.getExplainFn = explain;

  return set;
}

129 130
static int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprSupp* pSup, SDiskbasedBuf* pBuf,
                                  SGroupResInfo* pGroupResInfo);
H
Haojun Liao 已提交
131

132
static void initCtxOutputBuffer(SqlFunctionCtx* pCtx, int32_t size);
133
static void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId);
134

135
/*
 * Reserve interBufSize bytes for a new SResultRow inside the disk-based result buffer.
 *
 * Rows are packed into the page identified by *currentPageId. A new page is requested when
 * there is no current page (*currentPageId == -1) or when the current page cannot hold another
 * interBufSize bytes (the current page is released first in that case).
 *
 * On success the page is marked dirty, the returned row's pageId/offset are filled in and
 * *currentPageId is updated to the page holding the row.
 *
 * Returns NULL (leaving *currentPageId untouched) when no page could be obtained.
 */
SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int32_t* currentPageId, int32_t interBufSize) {
  SFilePage* pData = NULL;
  int32_t    pageId = -1;

  if (*currentPageId == -1) {
    // in the first scan, new space needed for results
    pData = getNewBufPage(pResultBuf, &pageId);
    if (pData != NULL) {
      pData->num = sizeof(SFilePage);
    }
  } else {
    pData = getBufPage(pResultBuf, *currentPageId);
    pageId = *currentPageId;

    if (pData != NULL && pData->num + interBufSize > getBufPageSize(pResultBuf)) {
      // release current page first, and prepare the next one
      releaseBufPage(pResultBuf, pData);

      pData = getNewBufPage(pResultBuf, &pageId);
      if (pData != NULL) {
        pData->num = sizeof(SFilePage);
      }
    }
  }

  // either page request may fail; never dereference a missing page
  if (pData == NULL) {
    return NULL;
  }

  setBufPageDirty(pData, true);

  // the new row starts at the first unused byte of the page
  SResultRow* pResultRow = (SResultRow*)((char*)pData + pData->num);
  pResultRow->pageId = pageId;
  pResultRow->offset = (int32_t)pData->num;
  *currentPageId = pageId;

  pData->num += interBufSize;
  return pResultRow;
}

174 175 176 177 178 179 180
/**
 * the struct of key in hash table
 * +----------+---------------+
 * | group id |   key data    |
 * | 8 bytes  | actual length |
 * +----------+---------------+
 */
181 182 183
SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pResultRowInfo, char* pData,
                                   int16_t bytes, bool masterscan, uint64_t groupId, SExecTaskInfo* pTaskInfo,
                                   bool isIntervalQuery, SAggSupporter* pSup) {
184
  SET_RES_WINDOW_KEY(pSup->keyBuf, pData, bytes, groupId);
H
Haojun Liao 已提交
185

dengyihao's avatar
dengyihao 已提交
186
  SResultRowPosition* p1 =
187
      (SResultRowPosition*)tSimpleHashGet(pSup->pResultRowHashTable, pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes));
H
Haojun Liao 已提交
188

189 190
  SResultRow* pResult = NULL;

H
Haojun Liao 已提交
191 192
  // in case of repeat scan/reverse scan, no new time window added.
  if (isIntervalQuery) {
193
    if (masterscan && p1 != NULL) {  // the *p1 may be NULL in case of sliding+offset exists.
194
      pResult = getResultRowByPos(pResultBuf, p1, true);
195
      ASSERT(pResult->pageId == p1->pageId && pResult->offset == p1->offset);
H
Haojun Liao 已提交
196 197
    }
  } else {
dengyihao's avatar
dengyihao 已提交
198 199
    // In case of group by column query, the required SResultRow object must be existInCurrentResusltRowInfo in the
    // pResultRowInfo object.
H
Haojun Liao 已提交
200
    if (p1 != NULL) {
201
      // todo
202
      pResult = getResultRowByPos(pResultBuf, p1, true);
203
      ASSERT(pResult->pageId == p1->pageId && pResult->offset == p1->offset);
H
Haojun Liao 已提交
204 205 206
    }
  }

L
Liu Jicong 已提交
207
  // 1. close current opened time window
208
  if (pResultRowInfo->cur.pageId != -1 && ((pResult == NULL) || (pResult->pageId != pResultRowInfo->cur.pageId))) {
209
    SResultRowPosition pos = pResultRowInfo->cur;
X
Xiaoyu Wang 已提交
210
    SFilePage*         pPage = getBufPage(pResultBuf, pos.pageId);
211 212 213 214 215
    releaseBufPage(pResultBuf, pPage);
  }

  // allocate a new buffer page
  if (pResult == NULL) {
H
Haojun Liao 已提交
216
    ASSERT(pSup->resultRowSize > 0);
217
    pResult = getNewResultRow(pResultBuf, &pSup->currentPageId, pSup->resultRowSize);
218

219 220
    // add a new result set for a new group
    SResultRowPosition pos = {.pageId = pResult->pageId, .offset = pResult->offset};
221
    tSimpleHashPut(pSup->pResultRowHashTable, pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes), &pos,
L
Liu Jicong 已提交
222
                   sizeof(SResultRowPosition));
H
Haojun Liao 已提交
223 224
  }

225 226 227
  // 2. set the new time window to be the new active time window
  pResultRowInfo->cur = (SResultRowPosition){.pageId = pResult->pageId, .offset = pResult->offset};

H
Haojun Liao 已提交
228
  // too many time window in query
229
  if (pTaskInfo->execModel == OPTR_EXEC_MODEL_BATCH &&
230
      tSimpleHashGetSize(pSup->pResultRowHashTable) > MAX_INTERVAL_TIME_WINDOW) {
231
    T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_TOO_MANY_TIMEWINDOW);
H
Haojun Liao 已提交
232 233
  }

H
Haojun Liao 已提交
234
  return pResult;
H
Haojun Liao 已提交
235 236
}

237
// a new buffer page for each table. Needs to opt this design
L
Liu Jicong 已提交
238
static int32_t addNewWindowResultBuf(SResultRow* pWindowRes, SDiskbasedBuf* pResultBuf, int32_t tid, uint32_t size) {
239 240 241 242
  if (pWindowRes->pageId != -1) {
    return 0;
  }

L
Liu Jicong 已提交
243
  SFilePage* pData = NULL;
244 245 246

  // in the first scan, new space needed for results
  int32_t pageId = -1;
247
  SIDList list = getDataBufPagesIdList(pResultBuf);
248 249

  if (taosArrayGetSize(list) == 0) {
250
    pData = getNewBufPage(pResultBuf, &pageId);
251
    pData->num = sizeof(SFilePage);
252 253
  } else {
    SPageInfo* pi = getLastPageInfo(list);
254
    pData = getBufPage(pResultBuf, getPageId(pi));
255
    pageId = getPageId(pi);
256

257
    if (pData->num + size > getBufPageSize(pResultBuf)) {
258
      // release current page first, and prepare the next one
259
      releaseBufPageInfo(pResultBuf, pi);
260

261
      pData = getNewBufPage(pResultBuf, &pageId);
262
      if (pData != NULL) {
263
        pData->num = sizeof(SFilePage);
264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283
      }
    }
  }

  if (pData == NULL) {
    return -1;
  }

  // set the number of rows in current disk page
  if (pWindowRes->pageId == -1) {  // not allocated yet, allocate new buffer
    pWindowRes->pageId = pageId;
    pWindowRes->offset = (int32_t)pData->num;

    pData->num += size;
    assert(pWindowRes->pageId >= 0);
  }

  return 0;
}

284
//  query_range_start, query_range_end, window_duration, window_start, window_end
285
void initExecTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pQueryWindow) {
286 287 288
  pColData->info.type = TSDB_DATA_TYPE_TIMESTAMP;
  pColData->info.bytes = sizeof(int64_t);

H
Haojun Liao 已提交
289
  colInfoDataEnsureCapacity(pColData, 5, false);
290 291 292 293 294 295 296 297 298
  colDataAppendInt64(pColData, 0, &pQueryWindow->skey);
  colDataAppendInt64(pColData, 1, &pQueryWindow->ekey);

  int64_t interval = 0;
  colDataAppendInt64(pColData, 2, &interval);  // this value may be variable in case of 'n' and 'y'.
  colDataAppendInt64(pColData, 3, &pQueryWindow->skey);
  colDataAppendInt64(pColData, 4, &pQueryWindow->ekey);
}

299 300 301 302 303 304 305 306 307 308 309 310 311 312
// Snapshot of the input-window fields of a SqlFunctionCtx, taken so they can be changed temporarily and put back.
typedef struct {
  bool    hasAgg;       // copy of input.colDataAggIsSet
  int32_t numOfRows;    // copy of input.numOfRows
  int32_t startOffset;  // copy of input.startRowIndex
} SFunctionCtxStatus;

// Copy the input-window fields of pCtx into *pStatus.
static void functionCtxSave(SqlFunctionCtx* pCtx, SFunctionCtxStatus* pStatus) {
  const SInputColumnInfoData* pIn = &pCtx->input;
  *pStatus = (SFunctionCtxStatus){
      .hasAgg = pIn->colDataAggIsSet,
      .numOfRows = pIn->numOfRows,
      .startOffset = pIn->startRowIndex,
  };
}

// Write the fields captured by functionCtxSave back into pCtx.
static void functionCtxRestore(SqlFunctionCtx* pCtx, SFunctionCtxStatus* pStatus) {
  SInputColumnInfoData* pIn = &pCtx->input;
  pIn->startRowIndex = pStatus->startOffset;
  pIn->numOfRows = pStatus->numOfRows;
  pIn->colDataAggIsSet = pStatus->hasAgg;
}

/*
 * Run every function context pCtx[0..numOfOutput) over rows [offset, offset + forwardStep) of its current input.
 *
 * - Window pseudo-column functions are evaluated through their scalar hook (sfp.process) against the
 *   5-row pTimeWindowData column. The result is written into the entry's intermediate buffer and numOfRes
 *   is set to 1. Their input window is NOT restored afterwards.
 * - Other functions run fpSet.process when functionNeedToExecute() allows it. Their input window is then
 *   restored to the values it had on entry.
 *
 * Block aggregates are disabled whenever only part of the block (forwardStep < numOfTotal) is processed.
 * On a function error, the error is logged, stored in taskInfo->code, and control long-jumps to taskInfo->env.
 */
void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, SColumnInfoData* pTimeWindowData, int32_t offset,
                      int32_t forwardStep, int32_t numOfTotal, int32_t numOfOutput) {
  for (int32_t k = 0; k < numOfOutput; ++k) {
    SqlFunctionCtx* pOne = &pCtx[k];

    // keep the caller's input window so it can be put back below
    SFunctionCtxStatus saved = {0};
    functionCtxSave(pOne, &saved);

    pOne->input.startRowIndex = offset;
    pOne->input.numOfRows = forwardStep;

    // block aggregates describe the whole block and cannot be used for a partial range
    // NOTE: this overwrites the original value of colDataAggIsSet
    if (forwardStep < numOfTotal) {
      pOne->input.colDataAggIsSet = false;
    }

    if (fmIsWindowPseudoColumnFunc(pOne->functionId)) {
      SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pOne);

      // the scalar hook writes one BIGINT value straight into the entry's intermediate buffer
      SColumnInfoData idata = {0};
      idata.info.type = TSDB_DATA_TYPE_BIGINT;
      idata.info.bytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes;
      idata.pData = GET_ROWCELL_INTERBUF(pEntryInfo);

      SScalarParam out = {.columnData = &idata};
      SScalarParam tw = {.numOfRows = 5, .columnData = pTimeWindowData};
      pOne->sfp.process(&tw, 1, &out);

      pEntryInfo->numOfRes = 1;
      continue;
    }

    if (functionNeedToExecute(pOne) && pOne->fpSet.process != NULL) {
      int32_t code = pOne->fpSet.process(pOne);
      if (code != TSDB_CODE_SUCCESS) {
        qError("%s apply functions error, code: %s", GET_TASKID(taskInfo), tstrerror(code));
        taskInfo->code = code;
        T_LONG_JMP(taskInfo->env, code);
      }
    }

    functionCtxRestore(pOne, &saved);
  }
}

365 366
// Forward declaration: binds a raw data block (no block aggregates) to each function context; defined below.
static int32_t doSetInputDataBlock(SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t order, int32_t scanFlag,
                                   bool createDummyCol);
367

368 369 370
/*
 * Bind pBlock to every function context of pExprSup using the block's aggregates (the path taken by
 * setInputDataBlock when pBlock->pBlockAgg is set). It sets order, row count and source block, and
 * fills the SMA information.
 */
static void doSetInputDataBlockInfo(SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t order) {
  for (int32_t idx = 0; idx < pExprSup->numOfExprs; ++idx) {
    SqlFunctionCtx* pOne = pExprSup->pCtx + idx;

    pOne->order = order;
    pOne->input.numOfRows = pBlock->info.rows;
    setBlockSMAInfo(pOne, pExprSup->pExprInfo + idx, pBlock);
    pOne->pSrcBlock = pBlock;
  }
}

378
/*
 * Bind pBlock as the input of every function context in pExprSup.
 * Blocks without aggregates go through doSetInputDataBlock (whose return code is not propagated);
 * blocks with aggregates go through doSetInputDataBlockInfo.
 */
void setInputDataBlock(SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t order, int32_t scanFlag, bool createDummyCol) {
  if (pBlock->pBlockAgg == NULL) {
    doSetInputDataBlock(pExprSup, pBlock, order, scanFlag, createDummyCol);
    return;
  }

  doSetInputDataBlockInfo(pExprSup, pBlock, order);
}

L
Liu Jicong 已提交
386 387
/*
 * Materialize the constant parameter pFuncParam as a column of numOfRows identical values in
 * pInput->pData[paramIndex]. The column descriptor is allocated on first use and owned by pInput afterwards.
 *
 * Values are written for BIGINT/UBIGINT, DOUBLE and VARCHAR constants. For any other type the column
 * is only sized.
 *
 * Returns TSDB_CODE_OUT_OF_MEMORY when an allocation fails, TSDB_CODE_SUCCESS otherwise.
 */
static int32_t doCreateConstantValColumnInfo(SInputColumnInfoData* pInput, SFunctParam* pFuncParam, int32_t paramIndex,
                                             int32_t numOfRows) {
  SColumnInfoData* pColInfo = pInput->pData[paramIndex];
  if (pColInfo == NULL) {
    pColInfo = taosMemoryCalloc(1, sizeof(SColumnInfoData));
    if (pColInfo == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    // Set the correct column info (data type and bytes)
    pColInfo->info.type = pFuncParam->param.nType;
    pColInfo->info.bytes = pFuncParam->param.nLen;

    pInput->pData[paramIndex] = pColInfo;
  }

  colInfoDataEnsureCapacity(pColInfo, numOfRows, false);

  int8_t type = pFuncParam->param.nType;
  if (type == TSDB_DATA_TYPE_BIGINT || type == TSDB_DATA_TYPE_UBIGINT) {
    int64_t v = pFuncParam->param.i;
    for (int32_t i = 0; i < numOfRows; ++i) {
      colDataAppendInt64(pColInfo, i, &v);
    }
  } else if (type == TSDB_DATA_TYPE_DOUBLE) {
    double v = pFuncParam->param.d;
    for (int32_t i = 0; i < numOfRows; ++i) {
      colDataAppendDouble(pColInfo, i, &v);
    }
  } else if (type == TSDB_DATA_TYPE_VARCHAR) {
    // build the var-string (size header + payload) once and append it to every row
    char* tmp = taosMemoryMalloc(pFuncParam->param.nLen + VARSTR_HEADER_SIZE);
    if (tmp == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    STR_WITH_SIZE_TO_VARSTR(tmp, pFuncParam->param.pz, pFuncParam->param.nLen);
    for (int32_t i = 0; i < numOfRows; ++i) {
      colDataAppend(pColInfo, i, tmp, false);
    }
    taosMemoryFree(tmp);
  }

  return TSDB_CODE_SUCCESS;
}

429 430
/*
 * Bind the raw data block pBlock to every function context of pExprSup (block aggregates are not used).
 *
 * Per context it sets order, scan flag, source block, row count and uid, and clears colDataAggIsSet.
 * Then, per parameter:
 *  - column parameters point pData[j] at the block column of their slot and reset the row window to the
 *    whole block. For implicit-ts functions the last parameter is also used as pPTS.
 *  - value parameters are expanded into a constant column only when createDummyCol is set and the
 *    expression has exactly one parameter.
 *
 * Returns the error of doCreateConstantValColumnInfo, or TSDB_CODE_SUCCESS.
 */
static int32_t doSetInputDataBlock(SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t order, int32_t scanFlag,
                                   bool createDummyCol) {
  for (int32_t i = 0; i < pExprSup->numOfExprs; ++i) {
    SqlFunctionCtx*       pOne = &pExprSup->pCtx[i];
    SInputColumnInfoData* pInput = &pOne->input;
    SExprInfo*            pOneExpr = &pExprSup->pExprInfo[i];

    pOne->order = order;
    pInput->numOfRows = pBlock->info.rows;
    pOne->pSrcBlock = pBlock;
    pOne->scanFlag = scanFlag;
    pInput->uid = pBlock->info.uid;
    pInput->colDataAggIsSet = false;

    for (int32_t j = 0; j < pOneExpr->base.numOfParams; ++j) {
      SFunctParam* pFuncParam = &pOneExpr->base.pParam[j];

      switch (pFuncParam->type) {
        case FUNC_PARAM_TYPE_COLUMN: {
          pInput->pData[j] = taosArrayGet(pBlock->pDataBlock, pFuncParam->pCol->slotId);
          pInput->totalRows = pBlock->info.rows;
          pInput->numOfRows = pBlock->info.rows;
          pInput->startRowIndex = 0;

          // NOTE: the last parameter is the primary timestamp column
          // (in case of merge function, this is not always the ts column data)
          if (fmIsImplicitTsFunc(pOne->functionId) && (j == pOneExpr->base.numOfParams - 1)) {
            pInput->pPTS = pInput->pData[j];
          }

          ASSERT(pInput->pData[j] != NULL);
          break;
        }

        case FUNC_PARAM_TYPE_VALUE: {
          // todo avoid case: top(k, 12), 12 is the value parameter.
          // sum(11), 11 is also the value parameter.
          if (!createDummyCol || pOneExpr->base.numOfParams != 1) {
            break;
          }

          pInput->totalRows = pBlock->info.rows;
          pInput->numOfRows = pBlock->info.rows;
          pInput->startRowIndex = 0;

          int32_t code = doCreateConstantValColumnInfo(pInput, pFuncParam, j, pBlock->info.rows);
          if (code != TSDB_CODE_SUCCESS) {
            return code;
          }
          break;
        }

        default:
          break;
      }
    }
  }

  return TSDB_CODE_SUCCESS;
}

482
/*
 * Run the process hook of every aggregate context of pOperator that has one and that
 * functionNeedToExecute() allows. Stops at, logs, and returns the first failing code.
 */
static int32_t doAggregateImpl(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx) {
  for (int32_t k = 0; k < pOperator->exprSupp.numOfExprs; ++k) {
    SqlFunctionCtx* pOne = &pCtx[k];

    // todo add a dummy funtion to avoid process check
    if (!functionNeedToExecute(pOne) || pOne->fpSet.process == NULL) {
      continue;
    }

    int32_t code = pOne->fpSet.process(pOne);
    if (code != TSDB_CODE_SUCCESS) {
      qError("%s aggregate function error happens, code: %s", GET_TASKID(pOperator->pTaskInfo), tstrerror(code));
      return code;
    }
  }

  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
501
/*
 * For each entry of pPseudoList (none when it is NULL), point pCtx[i].pOutput at column i of pResult.
 * Only the list's length is used, not its contents.
 */
static void setPseudoOutputColInfo(SSDataBlock* pResult, SqlFunctionCtx* pCtx, SArray* pPseudoList) {
  if (pPseudoList == NULL) {
    return;
  }

  size_t count = taosArrayGetSize(pPseudoList);
  for (size_t i = 0; i < count; ++i) {
    pCtx[i].pOutput = taosArrayGet(pResult->pDataBlock, i);
  }
}

508
int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pSrcBlock, SqlFunctionCtx* pCtx,
X
Xiaoyu Wang 已提交
509
                              int32_t numOfOutput, SArray* pPseudoList) {
H
Haojun Liao 已提交
510
  setPseudoOutputColInfo(pResult, pCtx, pPseudoList);
511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530

  if (pSrcBlock == NULL) {
    for (int32_t k = 0; k < numOfOutput; ++k) {
      int32_t outputSlotId = pExpr[k].base.resSchema.slotId;

      ASSERT(pExpr[k].pExpr->nodeType == QUERY_NODE_VALUE);
      SColumnInfoData* pColInfoData = taosArrayGet(pResult->pDataBlock, outputSlotId);

      int32_t type = pExpr[k].base.pParam[0].param.nType;
      if (TSDB_DATA_TYPE_NULL == type) {
        colDataAppendNNULL(pColInfoData, 0, 1);
      } else {
        colDataAppend(pColInfoData, 0, taosVariantGet(&pExpr[k].base.pParam[0].param, type), false);
      }
    }

    pResult->info.rows = 1;
    return TSDB_CODE_SUCCESS;
  }

L
Liu Jicong 已提交
531 532 533 534
  if (pResult != pSrcBlock) {
    pResult->info.groupId = pSrcBlock->info.groupId;
    memcpy(pResult->info.parTbName, pSrcBlock->info.parTbName, TSDB_TABLE_NAME_LEN);
  }
H
Haojun Liao 已提交
535

536 537
  // if the source equals to the destination, it is to create a new column as the result of scalar
  // function or some operators.
538
  bool createNewColModel = (pResult == pSrcBlock);
539 540 541
  if (createNewColModel) {
    blockDataEnsureCapacity(pResult, pResult->info.rows);
  }
542

543 544
  int32_t numOfRows = 0;

545
  for (int32_t k = 0; k < numOfOutput; ++k) {
546 547
    int32_t               outputSlotId = pExpr[k].base.resSchema.slotId;
    SqlFunctionCtx*       pfCtx = &pCtx[k];
548
    SInputColumnInfoData* pInputData = &pfCtx->input;
549

L
Liu Jicong 已提交
550
    if (pExpr[k].pExpr->nodeType == QUERY_NODE_COLUMN) {  // it is a project query
551
      SColumnInfoData* pColInfoData = taosArrayGet(pResult->pDataBlock, outputSlotId);
552
      if (pResult->info.rows > 0 && !createNewColModel) {
553
        colDataMergeCol(pColInfoData, pResult->info.rows, (int32_t*)&pResult->info.capacity, pInputData->pData[0],
554
                        pInputData->numOfRows);
555
      } else {
556
        colDataAssign(pColInfoData, pInputData->pData[0], pInputData->numOfRows, &pResult->info);
557
      }
558

559
      numOfRows = pInputData->numOfRows;
560
    } else if (pExpr[k].pExpr->nodeType == QUERY_NODE_VALUE) {
561
      SColumnInfoData* pColInfoData = taosArrayGet(pResult->pDataBlock, outputSlotId);
562

dengyihao's avatar
dengyihao 已提交
563
      int32_t offset = createNewColModel ? 0 : pResult->info.rows;
564 565 566 567 568 569 570 571

      int32_t type = pExpr[k].base.pParam[0].param.nType;
      if (TSDB_DATA_TYPE_NULL == type) {
        colDataAppendNNULL(pColInfoData, offset, pSrcBlock->info.rows);
      } else {
        for (int32_t i = 0; i < pSrcBlock->info.rows; ++i) {
          colDataAppend(pColInfoData, i + offset, taosVariantGet(&pExpr[k].base.pParam[0].param, type), false);
        }
572
      }
573 574

      numOfRows = pSrcBlock->info.rows;
H
Haojun Liao 已提交
575
    } else if (pExpr[k].pExpr->nodeType == QUERY_NODE_OPERATOR) {
576 577 578
      SArray* pBlockList = taosArrayInit(4, POINTER_BYTES);
      taosArrayPush(pBlockList, &pSrcBlock);

579
      SColumnInfoData* pResColData = taosArrayGet(pResult->pDataBlock, outputSlotId);
580
      SColumnInfoData  idata = {.info = pResColData->info, .hasNull = true};
581

582
      SScalarParam dest = {.columnData = &idata};
X
Xiaoyu Wang 已提交
583
      int32_t      code = scalarCalculate(pExpr[k].pExpr->_optrRoot.pRootNode, pBlockList, &dest);
584 585 586 587
      if (code != TSDB_CODE_SUCCESS) {
        taosArrayDestroy(pBlockList);
        return code;
      }
588

dengyihao's avatar
dengyihao 已提交
589
      int32_t startOffset = createNewColModel ? 0 : pResult->info.rows;
590
      ASSERT(pResult->info.capacity > 0);
591

592
      colDataMergeCol(pResColData, startOffset, (int32_t*)&pResult->info.capacity, &idata, dest.numOfRows);
D
dapan1121 已提交
593
      colDataDestroy(&idata);
L
Liu Jicong 已提交
594

595
      numOfRows = dest.numOfRows;
596 597
      taosArrayDestroy(pBlockList);
    } else if (pExpr[k].pExpr->nodeType == QUERY_NODE_FUNCTION) {
598 599
      // _rowts/_c0, not tbname column
      if (fmIsPseudoColumnFunc(pfCtx->functionId) && (!fmIsScanPseudoColumnFunc(pfCtx->functionId))) {
H
Haojun Liao 已提交
600
        // do nothing
601
      } else if (fmIsIndefiniteRowsFunc(pfCtx->functionId)) {
602 603
        SResultRowEntryInfo* pResInfo = GET_RES_INFO(pfCtx);
        pfCtx->fpSet.init(pfCtx, pResInfo);
604 605 606 607 608 609 610 611 612 613

        pfCtx->pOutput = taosArrayGet(pResult->pDataBlock, outputSlotId);
        pfCtx->offset = createNewColModel ? 0 : pResult->info.rows;  // set the start offset

        // set the timestamp(_rowts) output buffer
        if (taosArrayGetSize(pPseudoList) > 0) {
          int32_t* outputColIndex = taosArrayGet(pPseudoList, 0);
          pfCtx->pTsOutput = (SColumnInfoData*)pCtx[*outputColIndex].pOutput;
        }

614 615 616 617 618
        // link pDstBlock to set selectivity value
        if (pfCtx->subsidiaries.num > 0) {
          pfCtx->pDstBlock = pResult;
        }

619 620 621 622 623
        int32_t code = pfCtx->fpSet.process(pfCtx);
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }
        numOfRows = pResInfo->numOfRes;
H
Haojun Liao 已提交
624
      } else if (fmIsAggFunc(pfCtx->functionId)) {
G
Ganlin Zhao 已提交
625
        // selective value output should be set during corresponding function execution
626 627 628
        if (fmIsSelectValueFunc(pfCtx->functionId)) {
          continue;
        }
629 630
        // _group_key function for "partition by tbname" + csum(col_name) query
        SColumnInfoData* pOutput = taosArrayGet(pResult->pDataBlock, outputSlotId);
631
        int32_t          slotId = pfCtx->param[0].pCol->slotId;
632 633 634

        // todo handle the json tag
        SColumnInfoData* pInput = taosArrayGet(pSrcBlock->pDataBlock, slotId);
635
        for (int32_t f = 0; f < pSrcBlock->info.rows; ++f) {
636 637 638 639 640 641 642 643 644
          bool isNull = colDataIsNull_s(pInput, f);
          if (isNull) {
            colDataAppendNULL(pOutput, pResult->info.rows + f);
          } else {
            char* data = colDataGetData(pInput, f);
            colDataAppend(pOutput, pResult->info.rows + f, data, isNull);
          }
        }

H
Haojun Liao 已提交
645 646 647
      } else {
        SArray* pBlockList = taosArrayInit(4, POINTER_BYTES);
        taosArrayPush(pBlockList, &pSrcBlock);
G
Ganlin Zhao 已提交
648

649
        SColumnInfoData* pResColData = taosArrayGet(pResult->pDataBlock, outputSlotId);
650
        SColumnInfoData  idata = {.info = pResColData->info, .hasNull = true};
H
Haojun Liao 已提交
651

652
        SScalarParam dest = {.columnData = &idata};
X
Xiaoyu Wang 已提交
653
        int32_t      code = scalarCalculate((SNode*)pExpr[k].pExpr->_function.pFunctNode, pBlockList, &dest);
654 655 656 657
        if (code != TSDB_CODE_SUCCESS) {
          taosArrayDestroy(pBlockList);
          return code;
        }
658

dengyihao's avatar
dengyihao 已提交
659
        int32_t startOffset = createNewColModel ? 0 : pResult->info.rows;
660
        ASSERT(pResult->info.capacity > 0);
661
        colDataMergeCol(pResColData, startOffset, (int32_t*)&pResult->info.capacity, &idata, dest.numOfRows);
D
dapan1121 已提交
662
        colDataDestroy(&idata);
663 664

        numOfRows = dest.numOfRows;
H
Haojun Liao 已提交
665 666
        taosArrayDestroy(pBlockList);
      }
667
    } else {
668
      return TSDB_CODE_OPS_NOT_SUPPORT;
669 670
    }
  }
671

672 673 674
  if (!createNewColModel) {
    pResult->info.rows += numOfRows;
  }
675 676

  return TSDB_CODE_SUCCESS;
677 678
}

5
54liuyao 已提交
679
/*
 * Decide whether the function bound to pCtx should be run on the current input:
 *  - never when no function is bound (functionId == -1);
 *  - during a REPEAT_SCAN, only for functions that ask for repeated scanning;
 *  - otherwise, only while its result entry is not yet completed.
 */
bool functionNeedToExecute(SqlFunctionCtx* pCtx) {
  if (pCtx->functionId == -1) {
    return false;
  }

  if (pCtx->scanFlag == REPEAT_SCAN) {
    return fmIsRepeatScanFunc(pCtx->functionId);
  }

  return !isRowEntryCompleted(GET_RES_INFO(pCtx));
}

699 700 701 702 703 704 705
/*
 * Build a synthetic block aggregate (SMA) for the constant parameter pFuncParam, as if it were a column
 * of numOfRows identical values of `type`. On first use it allocates pInput->pData[paramIndex] (a column
 * descriptor carrying only type and bytes) and pInput->pColumnDataAgg[paramIndex].
 *
 * BIGINT, DOUBLE and BOOL constants get min = max = v and sum = v * numOfRows. TIMESTAMP is left untouched.
 * Other types are rejected by ASSERT.
 *
 * Returns TSDB_CODE_OUT_OF_MEMORY if an allocation fails, TSDB_CODE_SUCCESS otherwise.
 */
static int32_t doCreateConstantValColumnAggInfo(SInputColumnInfoData* pInput, SFunctParam* pFuncParam, int32_t type,
                                                int32_t paramIndex, int32_t numOfRows) {
  if (pInput->pData[paramIndex] == NULL) {
    pInput->pData[paramIndex] = taosMemoryCalloc(1, sizeof(SColumnInfoData));
    if (pInput->pData[paramIndex] == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    // Set the correct column info (data type and bytes)
    pInput->pData[paramIndex]->info.type = type;
    pInput->pData[paramIndex]->info.bytes = tDataTypes[type].bytes;
  }

  SColumnDataAgg* da = pInput->pColumnDataAgg[paramIndex];
  if (da == NULL) {
    da = taosMemoryCalloc(1, sizeof(SColumnDataAgg));
    if (da == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
    pInput->pColumnDataAgg[paramIndex] = da;
  }

  ASSERT(!IS_VAR_DATA_TYPE(type));

  if (type == TSDB_DATA_TYPE_BIGINT) {
    int64_t v = pFuncParam->param.i;
    *da = (SColumnDataAgg){.numOfNull = 0, .min = v, .max = v, .sum = v * numOfRows};
  } else if (type == TSDB_DATA_TYPE_DOUBLE) {
    double v = pFuncParam->param.d;
    *da = (SColumnDataAgg){.numOfNull = 0};

    *(double*)&da->min = v;
    *(double*)&da->max = v;
    *(double*)&da->sum = v * numOfRows;
  } else if (type == TSDB_DATA_TYPE_BOOL) {  // todo validate this data type
    bool v = pFuncParam->param.i;

    *da = (SColumnDataAgg){.numOfNull = 0};

    // every row holds v, so both min and max are v (min used to be forced to 0)
    *(bool*)&da->min = v;
    *(bool*)&da->max = v;
    // the sum counts the true rows; store it as a full integer, like the BIGINT branch does,
    // instead of through a bool* that truncated it to 0/1
    da->sum = (int64_t)v * numOfRows;
  } else if (type == TSDB_DATA_TYPE_TIMESTAMP) {
    // do nothing
  } else {
    ASSERT(0);
  }

  return TSDB_CODE_SUCCESS;
}
750

751
/*
 * Attach the block-level SMA of pBlock to the input of pCtx, and set the input row counts to pBlock->info.rows.
 *
 * Column parameters take the SMA entry of their slot from pBlock->pBlockAgg. Constant parameters get a synthetic
 * entry from doCreateConstantValColumnAggInfo().
 *
 * colDataAggIsSet is true only when every parameter ends up with a usable SMA entry. It is cleared when:
 *   - the block has no SMA at all,
 *   - a column's entry is missing, or
 *   - the synthetic entry for a constant could not be created.
 */
void setBlockSMAInfo(SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, SSDataBlock* pBlock) {
  int32_t numOfRows = pBlock->info.rows;

  SInputColumnInfoData* pInput = &pCtx->input;
  pInput->numOfRows = numOfRows;
  pInput->totalRows = numOfRows;

  if (pBlock->pBlockAgg == NULL) {
    pInput->colDataAggIsSet = false;
    return;
  }

  pInput->colDataAggIsSet = true;

  for (int32_t j = 0; j < pExprInfo->base.numOfParams; ++j) {
    SFunctParam* pFuncParam = &pExprInfo->base.pParam[j];

    if (pFuncParam->type == FUNC_PARAM_TYPE_COLUMN) {
      int32_t slotId = pFuncParam->pCol->slotId;
      pInput->pColumnDataAgg[j] = pBlock->pBlockAgg[slotId];
      if (pInput->pColumnDataAgg[j] == NULL) {
        pInput->colDataAggIsSet = false;
      }

      // Here we set the column info data since the data type for each column data is required, but
      // the data in the corresponding SColumnInfoData will not be used.
      pInput->pData[j] = taosArrayGet(pBlock->pDataBlock, slotId);
    } else if (pFuncParam->type == FUNC_PARAM_TYPE_VALUE) {
      int32_t code = doCreateConstantValColumnAggInfo(pInput, pFuncParam, pFuncParam->param.nType, j, numOfRows);
      if (code != TSDB_CODE_SUCCESS) {
        // The synthetic SMA entry for this constant is missing (allocation failed). Treat it like a column
        // without an SMA entry, so the SMA path is not used for this block.
        pInput->colDataAggIsSet = false;
      }
    }
  }
}

L
Liu Jicong 已提交
783
/*
 * Report whether the task should be aborted.
 *
 * NOTE(review): the abort decision is currently disabled and this function always returns false.
 * The only remaining effect is an assert on cost.start. It is evaluated when the task has an owner and has been
 * running longer than 10x getMaximumIdleDurationSec().
 */
bool isTaskKilled(SExecTaskInfo* pTaskInfo) {
  if (pTaskInfo->owner == 0) {
    return false;
  }

  // query has been executed more than tsShellActivityTimer, and the retrieve has not arrived
  // abort current query execution.
  bool runTooLong = (taosGetTimestampSec() - pTaskInfo->cost.start / 1000) > 10 * getMaximumIdleDurationSec();
  if (runTooLong) {
    assert(pTaskInfo->cost.start != 0);
  }

  return false;
}

L
Liu Jicong 已提交
798
// Mark the task as cancelled by storing TSDB_CODE_TSC_QUERY_CANCELLED as its result code.
void setTaskKilled(SExecTaskInfo* pTaskInfo) {
  pTaskInfo->code = TSDB_CODE_TSC_QUERY_CANCELLED;
}
799 800

/////////////////////////////////////////////////////////////////////////////////////////////
801
/*
 * Return the interval window that contains key.
 *
 * skey is key truncated to the interval boundary (taosTimeTruncate). ekey is one tick before the next boundary
 * (taosTimeAdd(skey, interval) - 1). If that end is smaller than the start, ekey is clamped to INT64_MAX.
 */
STimeWindow getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int64_t key) {
  int64_t windowStart = taosTimeTruncate(key, pInterval, precision);
  int64_t windowEnd = taosTimeAdd(windowStart, pInterval->interval, pInterval->intervalUnit, precision) - 1;

  // When the start is within one interval of INT64_MAX, the computed end falls below the start.
  // Such a window is the last possible one, so it simply extends to INT64_MAX.
  if (windowEnd < windowStart) {
    windowEnd = INT64_MAX;
  }

  return (STimeWindow){.skey = windowStart, .ekey = windowEnd};
}

L
Liu Jicong 已提交
817 818
/*
 * Decide how much of pBlock needs to be loaded.
 *
 * Currently every block is reported as BLK_DATA_NOT_LOAD: *status is set to that value and the block's
 * pDataBlock and pBlockAgg pointers are reset to NULL. No data or statistics are loaded.
 * pTaskInfo and pTableScanInfo are accepted for interface compatibility only.
 *
 * NOTE(review): pDataBlock is overwritten without being released; presumably the caller owns/frees it — confirm.
 *
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t loadDataBlockOnDemand(SExecTaskInfo* pTaskInfo, STableScanInfo* pTableScanInfo, SSDataBlock* pBlock,
                              uint32_t* status) {
  (void)pTaskInfo;
  (void)pTableScanInfo;

  *status = BLK_DATA_NOT_LOAD;

  pBlock->pDataBlock = NULL;
  pBlock->pBlockAgg = NULL;

  return TSDB_CODE_SUCCESS;
}

L
Liu Jicong 已提交
960
/*
 * Update the task status word.
 *
 * TASK_NOT_COMPLETED overwrites the whole word. Any other status first clears the TASK_NOT_COMPLETED flag
 * (via CLEAR_QUERY_STATUS) and is then OR-ed in, because "not completed" cannot coexist with any other status.
 */
void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status) {
  if (status != TASK_NOT_COMPLETED) {
    CLEAR_QUERY_STATUS(pTaskInfo, TASK_NOT_COMPLETED);
    pTaskInfo->status |= status;
    return;
  }

  pTaskInfo->status = status;
}

970
/*
 * Bind every function context to its result entry inside pResult, and initialize entries that are not yet
 * initialized.
 *
 * The following entries are left alone:
 *   - entries that are already completed and initialized,
 *   - entries of window pseudo-column functions.
 * A context with functionId == -1 is only flagged as initialized; every other context has fpSet.init() called.
 * Once an entry that is already initialized is met, later contexts only get their resultInfo pointer bound.
 */
void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowEntryInfoOffset) {
  bool bindOnly = false;

  for (int32_t i = 0; i < numOfOutput; ++i) {
    SqlFunctionCtx* pFnCtx = &pCtx[i];
    pFnCtx->resultInfo = getResultEntryInfo(pResult, i, rowEntryInfoOffset);
    if (bindOnly) {
      continue;
    }

    struct SResultRowEntryInfo* pEntry = pFnCtx->resultInfo;
    bool finished = isRowEntryCompleted(pEntry) && isRowEntryInitialized(pEntry);
    if (finished || fmIsWindowPseudoColumnFunc(pFnCtx->functionId)) {
      continue;
    }

    if (pEntry->initialized) {
      // NOTE(review): presumably the remaining entries were initialized together with this one.
      bindOnly = true;
    } else if (pFnCtx->functionId == -1) {
      pEntry->initialized = true;
    } else {
      pFnCtx->fpSet.init(pFnCtx, pEntry);
    }
  }
}

999 1000
// Forward declaration: keeps only the rows of pBlock selected by the filter result p. doFilter() uses it before
// its definition.
static void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const SColumnInfoData* p, bool keep,
                                                int32_t status);
1001

H
Haojun Liao 已提交
1002 1003
/*
 * Evaluate pFilterInfo on pBlock and keep only the qualifying rows, in place.
 *
 * Nothing is done when there is no filter or the block is empty. When pColMatchInfo is given, the block's
 * time window is refreshed afterwards. The column used is the first match whose colId is
 * PRIMARYKEY_TIMESTAMP_COL_ID and whose destination column is TSDB_DATA_TYPE_TIMESTAMP.
 */
void doFilter(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SColMatchInfo* pColMatchInfo) {
  if (pFilterInfo == NULL || pBlock->info.rows == 0) {
    return;
  }

  SFilterColumnParam colParam = {.numOfCols = taosArrayGetSize(pBlock->pDataBlock), .pDataBlock = pBlock->pDataBlock};
  // NOTE(review): the result of filterSetDataFromSlotId is not checked, so a failure here is not reported.
  (void)filterSetDataFromSlotId(pFilterInfo, &colParam);

  SColumnInfoData* pResCol = NULL;
  int32_t          filterStatus = 0;

  // todo the keep seems never to be True??
  bool keepAll = filterExecute(pFilterInfo, pBlock, &pResCol, NULL, colParam.numOfCols, &filterStatus);
  extractQualifiedTupleByFilterResult(pBlock, pResCol, keepAll, filterStatus);

  if (pColMatchInfo != NULL) {
    size_t numOfMatch = taosArrayGetSize(pColMatchInfo->pList);
    for (int32_t i = 0; i < numOfMatch; ++i) {
      SColMatchItem* pItem = taosArrayGet(pColMatchInfo->pList, i);
      if (pItem->colId != PRIMARYKEY_TIMESTAMP_COL_ID) {
        continue;
      }

      SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, pItem->dstSlotId);
      if (pTsCol->info.type == TSDB_DATA_TYPE_TIMESTAMP) {
        blockDataUpdateTsWindow(pBlock, pItem->dstSlotId);
        break;
      }
    }
  }

  colDataDestroy(pResCol);
  taosMemoryFree(pResCol);
}

1035
/*
 * Compact pBlock in place so that only the rows selected by the filter remain.
 *
 * keep   - when true the block is left untouched.
 * p      - filter result column; the int8 value p->pData[j] is non-zero for every row j that is kept.
 * status - how the filter result applies:
 *            FILTER_RESULT_ALL_QUALIFIED  - the block is left untouched;
 *            FILTER_RESULT_NONE_QUALIFIED - the block is emptied (rows = 0);
 *            any other value              - the selected rows of every column that has data are copied
 *                                           (from a temporary copy of the block) to the front of that column.
 */
void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const SColumnInfoData* p, bool keep, int32_t status) {
  if (keep || status == FILTER_RESULT_ALL_QUALIFIED) {
    return;
  }

  if (status == FILTER_RESULT_NONE_QUALIFIED) {
    pBlock->info.rows = 0;
    return;
  }

  int32_t      totalRows = pBlock->info.rows;
  SSDataBlock* pCopy = createOneDataBlock(pBlock, true);
  size_t       numOfCols = taosArrayGetSize(pBlock->pDataBlock);

  for (int32_t i = 0; i < numOfCols; ++i) {
    SColumnInfoData* pSrc = taosArrayGet(pCopy->pDataBlock, i);
    SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, i);

    // it is a reserved column for scalar function, and no data in this column yet.
    if (pDst->pData == NULL || pSrc->pData == NULL) {
      continue;
    }

    colInfoDataCleanup(pDst, pBlock->info.rows);

    int32_t numOfQualified = 0;
    for (int32_t j = 0; j < totalRows; ++j) {
      if (((int8_t*)p->pData)[j] == 0) {
        continue;
      }

      if (colDataIsNull_s(pSrc, j)) {
        colDataAppendNULL(pDst, numOfQualified);
      } else {
        colDataAppend(pDst, numOfQualified, colDataGetData(pSrc, j), false);
      }
      numOfQualified += 1;
    }

    // todo this value can be assigned directly
    if (pBlock->info.rows == totalRows) {
      pBlock->info.rows = numOfQualified;
    } else {
      ASSERT(pBlock->info.rows == numOfQualified);
    }
  }

  blockDataDestroy(pCopy);  // fix memory leak
}

1086
/*
 * Point the function contexts of pOperator at the result row of groupId.
 *
 * The row is looked up (or created) by the group id. If it has no buffer page yet, one is allocated for it.
 * Finally the contexts are bound to the row through setResultRowInitCtx().
 *
 * A failure to allocate the result buffer is raised through T_LONG_JMP on the task's env.
 */
void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId) {
  // for simple group by query without interval, all the tables belong to one group result.
  SExecTaskInfo*    pTaskInfo = pOperator->pTaskInfo;
  SAggOperatorInfo* pAggInfo = pOperator->info;

  SResultRowInfo* pResultRowInfo = &pAggInfo->binfo.resultRowInfo;
  SqlFunctionCtx* pCtx = pOperator->exprSupp.pCtx;
  int32_t*        rowEntryInfoOffset = pOperator->exprSupp.rowEntryInfoOffset;

  SResultRow* pResultRow = doSetResultOutBufByKey(pAggInfo->aggSup.pResultBuf, pResultRowInfo, (char*)&groupId,
                                                  sizeof(groupId), true, groupId, pTaskInfo, false, &pAggInfo->aggSup);
  assert(pResultRow != NULL);

  /*
   * not assign result buffer yet, add new result buffer
   * all group belong to one result set, and each group result has different group id so set the id to be one
   */
  if (pResultRow->pageId == -1) {
    int32_t ret =
        addNewWindowResultBuf(pResultRow, pAggInfo->aggSup.pResultBuf, groupId, pAggInfo->binfo.pRes->info.rowSize);
    if (ret != TSDB_CODE_SUCCESS) {
      // Returning here would skip setResultRowInitCtx() and leave pCtx bound to the previously active group's row,
      // so this group's data would be aggregated into the wrong output. Abort the task instead.
      qError("%s failed to add result buffer for group %" PRIu64 ", code %s", GET_TASKID(pTaskInfo), groupId,
             tstrerror(ret));
      T_LONG_JMP(pTaskInfo->env, ret);
    }
  }

  setResultRowInitCtx(pResultRow, pCtx, numOfOutput, rowEntryInfoOffset);
}

1114 1115 1116
/*
 * Make groupId the active group of the aggregate operator.
 *
 * The output buffers are re-bound unless groupId is already the recorded active group. A recorded value of
 * UINT64_MAX never counts as a match.
 */
static void setExecutionContext(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId) {
  SAggOperatorInfo* pAggInfo = pOperator->info;

  bool alreadyActive = (pAggInfo->groupId == groupId) && (pAggInfo->groupId != UINT64_MAX);
  if (!alreadyActive) {
    doSetTableGroupOutputBuf(pOperator, numOfOutput, groupId);

    // record the current active group id
    pAggInfo->groupId = groupId;
  }
}

dengyihao's avatar
dengyihao 已提交
1126 1127
/*
 * Raise pRow->numOfRows to the largest numOfRes among its initialized result entries.
 *
 * If the row still has zero rows afterwards, it is reported as one row. The exception is when some initialized
 * entry belongs to a not-null-output function (e.g. first/last); then the row stays empty.
 */
static void doUpdateNumOfRows(SqlFunctionCtx* pCtx, SResultRow* pRow, int32_t numOfExprs,
                              const int32_t* rowCellOffset) {
  bool hasNotNullOutputFunc = false;

  for (int32_t k = 0; k < numOfExprs; ++k) {
    struct SResultRowEntryInfo* pEntry = getResultEntryInfo(pRow, k, rowCellOffset);
    if (isRowEntryInitialized(pEntry)) {
      if (pEntry->numOfRes > pRow->numOfRows) {
        pRow->numOfRows = pEntry->numOfRes;
      }

      if (fmIsNotNullOutputFunc(pCtx[k].functionId)) {
        hasNotNullOutputFunc = true;
      }
    }
  }

  // if all expr skips all blocks, e.g. all null inputs for max function, output one row in final result.
  //  except for first/last, which require not null output, output no rows
  if (!hasNotNullOutputFunc && pRow->numOfRows == 0) {
    pRow->numOfRows = 1;
  }
}

1150 1151
/*
 * Write the results held in pRow into pBlock, starting at row pBlock->info.rows. This function does not advance
 * pBlock->info.rows.
 *
 * How each expression is written:
 *   - Expressions with a finalize callback produce their own output. For _group_key, a non-zero numOfRes is first
 *     set to pRow->numOfRows.
 *   - _select_value expressions without finalize are skipped.
 *   - Every other expression has its intermediate value repeated pRow->numOfRows times into its result slot.
 *
 * A finalize failure is logged and raised through T_LONG_JMP on pTaskInfo->env.
 */
static void doCopyResultToDataBlock(SExprInfo* pExprInfo, int32_t numOfExprs, SResultRow* pRow, SqlFunctionCtx* pCtx,
                                    SSDataBlock* pBlock, const int32_t* rowEntryOffset, SExecTaskInfo* pTaskInfo) {
  for (int32_t j = 0; j < numOfExprs; ++j) {
    SqlFunctionCtx* pFnCtx = &pCtx[j];
    pFnCtx->resultInfo = getResultEntryInfo(pRow, j, rowEntryOffset);

    const char* fnName = pFnCtx->pExpr->pExpr->_function.functionName;

    if (pFnCtx->fpSet.finalize != NULL) {
      // for groupkey along with functions that output multiple lines(e.g. Histogram)
      // need to match groupkey result for each output row of that function.
      if (strcmp(fnName, "_group_key") == 0 && pFnCtx->resultInfo->numOfRes != 0) {
        pFnCtx->resultInfo->numOfRes = pRow->numOfRows;
      }

      int32_t code = pFnCtx->fpSet.finalize(pFnCtx, pBlock);
      if (TAOS_FAILED(code)) {
        qError("%s build result data block error, code %s", GET_TASKID(pTaskInfo), tstrerror(code));
        T_LONG_JMP(pTaskInfo->env, code);
      }
      continue;
    }

    if (strcmp(fnName, "_select_value") == 0) {
      continue;  // do nothing
    }

    // expand the result into multiple rows. E.g., _wstart, top(k, 20)
    // the _wstart needs to copy to 20 following rows, since the results of top-k expands to 20 different rows.
    int32_t          slotId = pExprInfo[j].base.resSchema.slotId;
    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, slotId);
    char*            in = GET_ROWCELL_INTERBUF(pFnCtx->resultInfo);
    for (int32_t k = 0; k < pRow->numOfRows; ++k) {
      colDataAppend(pColInfoData, pBlock->info.rows + k, in, pFnCtx->resultInfo->isNullRes);
    }
  }
}

1184 1185 1186
// todo refactor. SResultRow has direct pointer in miainfo
/*
 * Finalize the result row stored at resultRowPosition in pBuf and append its output rows to pBlock.
 *
 * Steps:
 *   1. Refresh the row count with doUpdateNumOfRows(); a row without output is skipped.
 *   2. Grow pBlock's capacity geometrically (x1.25, at least one row per step) until the new rows fit.
 *   3. Copy the results in and advance pBlock->info.rows by the row's numOfRows.
 * The buffer page is released on every path that returns normally.
 *
 * Returns 0. Capacity and finalize failures are raised through T_LONG_JMP on pTaskInfo->env.
 */
int32_t finalizeResultRows(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPosition, SExprSupp* pSup,
                           SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo) {
  SFilePage*  page = getBufPage(pBuf, resultRowPosition->pageId);
  SResultRow* pRow = (SResultRow*)((char*)page + resultRowPosition->offset);

  SqlFunctionCtx* pCtx = pSup->pCtx;
  SExprInfo*      pExprInfo = pSup->pExprInfo;
  const int32_t*  rowEntryOffset = pSup->rowEntryInfoOffset;

  doUpdateNumOfRows(pCtx, pRow, pSup->numOfExprs, rowEntryOffset);
  if (pRow->numOfRows == 0) {
    releaseBufPage(pBuf, page);
    return 0;
  }

  int32_t required = pBlock->info.rows + pRow->numOfRows;
  int32_t size = pBlock->info.capacity;
  while (size < required) {
    // Truncating size * 1.25 does not grow capacities below 4 (e.g. 3 * 1.25 -> 3), so the loop would never end.
    // Always grow by at least one row. The product is computed in 64 bits so a large capacity cannot overflow the
    // conversion back to int32_t.
    int64_t next = (int64_t)(size * 1.25);
    if (next <= size) {
      next = (int64_t)size + 1;
    }
    size = (next > INT32_MAX) ? required : (int32_t)next;
  }

  int32_t code = blockDataEnsureCapacity(pBlock, size);
  if (TAOS_FAILED(code)) {
    releaseBufPage(pBuf, page);
    qError("%s ensure result data capacity failed, code %s", GET_TASKID(pTaskInfo), tstrerror(code));
    T_LONG_JMP(pTaskInfo->env, code);
  }

  doCopyResultToDataBlock(pExprInfo, pSup->numOfExprs, pRow, pCtx, pBlock, rowEntryOffset, pTaskInfo);

  // pRow points into the page, so read its row count before the page is released.
  pBlock->info.rows += pRow->numOfRows;
  releaseBufPage(pBuf, page);
  return 0;
}

1219 1220 1221 1222 1223 1224 1225
/*
 * Copy finalized result rows from pGroupResInfo into pBlock, starting at pGroupResInfo->index.
 *
 * Group handling: a block whose groupId is 0 adopts the group of the first row with output.
 *
 * Copying stops at the first row that either:
 *   - belongs to a different group than the block, or
 *   - would exceed the block's capacity.
 * pGroupResInfo->index is advanced past every row that was consumed, including rows without output.
 * The block's time window is refreshed from column 0 at the end.
 *
 * Always returns 0. A finalize failure is raised inside doCopyResultToDataBlock().
 */
int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprSupp* pSup, SDiskbasedBuf* pBuf,
                           SGroupResInfo* pGroupResInfo) {
  SExprInfo*      pExprInfo = pSup->pExprInfo;
  int32_t         numOfExprs = pSup->numOfExprs;
  int32_t*        rowEntryOffset = pSup->rowEntryInfoOffset;
  SqlFunctionCtx* pCtx = pSup->pCtx;

  int32_t numOfRows = getNumOfTotalRes(pGroupResInfo);

  for (int32_t i = pGroupResInfo->index; i < numOfRows; i += 1) {
    SResKeyPos* pPos = taosArrayGetP(pGroupResInfo->pRows, i);
    SFilePage*  page = getBufPage(pBuf, pPos->pos.pageId);

    SResultRow* pRow = (SResultRow*)((char*)page + pPos->pos.offset);

    doUpdateNumOfRows(pCtx, pRow, numOfExprs, rowEntryOffset);

    // no results, continue to check the next one
    if (pRow->numOfRows == 0) {
      pGroupResInfo->index += 1;
      releaseBufPage(pBuf, page);
      continue;
    }

    if (pBlock->info.groupId == 0) {
      pBlock->info.groupId = pPos->groupId;
    } else if (pBlock->info.groupId != pPos->groupId) {
      // current value belongs to different group, it can't be packed into one datablock
      releaseBufPage(pBuf, page);
      break;
    }

    // NOTE(review): a single result row larger than the whole block capacity trips this assert on an empty block.
    if (pBlock->info.rows + pRow->numOfRows > pBlock->info.capacity) {
      ASSERT(pBlock->info.rows > 0);
      releaseBufPage(pBuf, page);
      break;
    }

    pGroupResInfo->index += 1;
    doCopyResultToDataBlock(pExprInfo, numOfExprs, pRow, pCtx, pBlock, rowEntryOffset, pTaskInfo);

    // pRow points into the page, so read its row count before the page is released.
    pBlock->info.rows += pRow->numOfRows;
    releaseBufPage(pBuf, page);
  }

  qDebug("%s result generated, rows:%d, groupId:%" PRIu64, GET_TASKID(pTaskInfo), pBlock->info.rows,
         pBlock->info.groupId);

  blockDataUpdateTsWindow(pBlock, 0);
  return 0;
}

1273 1274 1275 1276 1277 1278 1279 1280 1281 1282 1283 1284 1285 1286 1287 1288 1289 1290
/*
 * Fill pbInfo->pRes with the next batch of stream results from pGroupResInfo.
 *
 * Steps:
 *   1. Stamp the block with the task version and clean it.
 *   2. If results remain, copy rows of a single group into the block (merge mode is not allowed here).
 *   3. Set parTbName to the partition table name stored for that group, or to an empty string when none is found.
 */
void doBuildStreamResBlock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo,
                           SDiskbasedBuf* pBuf) {
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
  SSDataBlock*   pRes = pbInfo->pRes;

  // set output datablock version
  pRes->info.version = pTaskInfo->version;
  blockDataCleanup(pRes);

  if (hasRemainResults(pGroupResInfo)) {
    // clear the existed group id
    pRes->info.groupId = 0;
    ASSERT(!pbInfo->mergeResultBlock);
    doCopyToSDataBlock(pTaskInfo, pRes, &pOperator->exprSupp, pBuf, pGroupResInfo);

    void* pParName = NULL;
    bool  found = streamStateGetParName(pTaskInfo->streamInfo.pState, pRes->info.groupId, &pParName) >= 0;
    if (found) {
      // NOTE(review): copies a fixed TSDB_TABLE_NAME_LEN bytes; assumes the stored name buffer is at least that
      // long — confirm.
      memcpy(pRes->info.parTbName, pParName, TSDB_TABLE_NAME_LEN);
    } else {
      pRes->info.parTbName[0] = 0;
    }
    tdbFree(pParName);
  }
}

X
Xiaoyu Wang 已提交
1300 1301
/*
 * Fill pbInfo->pRes with the next batch of results from pGroupResInfo.
 *
 * Normal mode: the block receives the rows of a single group.
 *
 * Merge mode (pbInfo->mergeResultBlock): groups keep being appended until either
 *   - no results remain, or
 *   - the block holds at least pOperator->resultInfo.threshold rows.
 * The block's groupId is then reset to 0.
 */
void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo,
                            SDiskbasedBuf* pBuf) {
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
  SSDataBlock*   pRes = pbInfo->pRes;

  // set output datablock version
  pRes->info.version = pTaskInfo->version;

  blockDataCleanup(pRes);
  if (!hasRemainResults(pGroupResInfo)) {
    return;
  }

  // clear the existed group id
  pRes->info.groupId = 0;

  if (!pbInfo->mergeResultBlock) {
    doCopyToSDataBlock(pTaskInfo, pRes, &pOperator->exprSupp, pBuf, pGroupResInfo);
    return;
  }

  // results are known to remain here, so the first copy always runs
  do {
    doCopyToSDataBlock(pTaskInfo, pRes, &pOperator->exprSupp, pBuf, pGroupResInfo);
    if (pRes->info.rows >= pOperator->resultInfo.threshold) {
      break;
    }

    // clearing group id to continue to merge data that belong to different groups
    pRes->info.groupId = 0;
  } while (hasRemainResults(pGroupResInfo));

  // clear the group id info in SSDataBlock, since the client does not need it
  pRes->info.groupId = 0;
}

L
Liu Jicong 已提交
1333 1334
/*
 * Log the task's cost summary at debug level.
 *
 * The summary covers elapsed time, table-list extraction, group-id map time, and block/row counters.
 * Nothing is logged when no file block load recorder is attached to the task.
 */
void queryCostStatis(SExecTaskInfo* pTaskInfo) {
  STaskCostInfo*          pSummary = &pTaskInfo->cost;
  SFileBlockLoadRecorder* pRecorder = pSummary->pRecoder;
  if (pRecorder == NULL) {
    return;
  }

  qDebug(
      "%s :cost summary: elapsed time:%.2f ms, extract tableList:%.2f ms, createGroupIdMap:%.2f ms, total blocks:%d, "
      "load block SMA:%d, load data block:%d, total rows:%" PRId64 ", check rows:%" PRId64,
      GET_TASKID(pTaskInfo), pSummary->elapsedTime / 1000.0, pSummary->extractListTime, pSummary->groupIdMapTime,
      pRecorder->totalBlocks, pRecorder->loadBlockStatis, pRecorder->loadBlocks, pRecorder->totalRows,
      pRecorder->totalCheckedRows);
}

L
Liu Jicong 已提交
1347 1348
// void skipBlocks(STaskRuntimeEnv *pRuntimeEnv) {
//   STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
1349
//
L
Liu Jicong 已提交
1350 1351 1352
//   if (pQueryAttr->limit.offset <= 0 || pQueryAttr->numOfFilterCols > 0) {
//     return;
//   }
1353
//
L
Liu Jicong 已提交
1354 1355
//   pQueryAttr->pos = 0;
//   int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQueryAttr->order.order);
1356
//
L
Liu Jicong 已提交
1357 1358
//   STableQueryInfo* pTableQueryInfo = pRuntimeEnv->current;
//   TsdbQueryHandleT pTsdbReadHandle = pRuntimeEnv->pTsdbReadHandle;
1359
//
L
Liu Jicong 已提交
1360 1361 1362
//   SDataBlockInfo blockInfo = SDATA_BLOCK_INITIALIZER;
//   while (tsdbNextDataBlock(pTsdbReadHandle)) {
//     if (isTaskKilled(pRuntimeEnv->qinfo)) {
1363
//       T_LONG_JMP(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED);
L
Liu Jicong 已提交
1364
//     }
1365
//
L
Liu Jicong 已提交
1366
//     tsdbRetrieveDataBlockInfo(pTsdbReadHandle, &blockInfo);
1367
//
L
Liu Jicong 已提交
1368 1369 1370 1371
//     if (pQueryAttr->limit.offset > blockInfo.rows) {
//       pQueryAttr->limit.offset -= blockInfo.rows;
//       pTableQueryInfo->lastKey = (QUERY_IS_ASC_QUERY(pQueryAttr)) ? blockInfo.window.ekey : blockInfo.window.skey;
//       pTableQueryInfo->lastKey += step;
1372
//
L
Liu Jicong 已提交
1373 1374 1375 1376 1377 1378 1379
//       //qDebug("QInfo:0x%"PRIx64" skip rows:%d, offset:%" PRId64, GET_TASKID(pRuntimeEnv), blockInfo.rows,
//              pQuery->limit.offset);
//     } else {  // find the appropriated start position in current block
//       updateOffsetVal(pRuntimeEnv, &blockInfo);
//       break;
//     }
//   }
1380
//
L
Liu Jicong 已提交
1381
//   if (terrno != TSDB_CODE_SUCCESS) {
1382
//     T_LONG_JMP(pRuntimeEnv->env, terrno);
L
Liu Jicong 已提交
1383 1384 1385 1386 1387 1388 1389
//   }
// }

// static TSKEY doSkipIntervalProcess(STaskRuntimeEnv* pRuntimeEnv, STimeWindow* win, SDataBlockInfo* pBlockInfo,
// STableQueryInfo* pTableQueryInfo) {
//   STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
//   SResultRowInfo *pWindowResInfo = &pRuntimeEnv->resultRowInfo;
1390
//
L
Liu Jicong 已提交
1391 1392 1393
//   assert(pQueryAttr->limit.offset == 0);
//   STimeWindow tw = *win;
//   getNextTimeWindow(pQueryAttr, &tw);
1394
//
L
Liu Jicong 已提交
1395 1396
//   if ((tw.skey <= pBlockInfo->window.ekey && QUERY_IS_ASC_QUERY(pQueryAttr)) ||
//       (tw.ekey >= pBlockInfo->window.skey && !QUERY_IS_ASC_QUERY(pQueryAttr))) {
1397
//
L
Liu Jicong 已提交
1398 1399 1400 1401
//     // load the data block and check data remaining in current data block
//     // TODO optimize performance
//     SArray *         pDataBlock = tsdbRetrieveDataBlock(pRuntimeEnv->pTsdbReadHandle, NULL);
//     SColumnInfoData *pColInfoData = taosArrayGet(pDataBlock, 0);
1402
//
L
Liu Jicong 已提交
1403 1404 1405 1406
//     tw = *win;
//     int32_t startPos =
//         getNextQualifiedWindow(pQueryAttr, &tw, pBlockInfo, pColInfoData->pData, binarySearchForKey, -1);
//     assert(startPos >= 0);
1407
//
L
Liu Jicong 已提交
1408 1409
//     // set the abort info
//     pQueryAttr->pos = startPos;
1410
//
L
Liu Jicong 已提交
1411 1412 1413 1414
//     // reset the query start timestamp
//     pTableQueryInfo->win.skey = ((TSKEY *)pColInfoData->pData)[startPos];
//     pQueryAttr->window.skey = pTableQueryInfo->win.skey;
//     TSKEY key = pTableQueryInfo->win.skey;
1415
//
L
Liu Jicong 已提交
1416 1417
//     pWindowResInfo->prevSKey = tw.skey;
//     int32_t index = pRuntimeEnv->resultRowInfo.curIndex;
1418
//
L
Liu Jicong 已提交
1419 1420
//     int32_t numOfRes = tableApplyFunctionsOnBlock(pRuntimeEnv, pBlockInfo, NULL, binarySearchForKey, pDataBlock);
//     pRuntimeEnv->resultRowInfo.curIndex = index;  // restore the window index
1421
//
L
Liu Jicong 已提交
1422 1423 1424 1425
//     //qDebug("QInfo:0x%"PRIx64" check data block, brange:%" PRId64 "-%" PRId64 ", numOfRows:%d, numOfRes:%d,
//     lastKey:%" PRId64,
//            GET_TASKID(pRuntimeEnv), pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows, numOfRes,
//            pQueryAttr->current->lastKey);
1426
//
L
Liu Jicong 已提交
1427 1428 1429 1430 1431
//     return key;
//   } else {  // do nothing
//     pQueryAttr->window.skey      = tw.skey;
//     pWindowResInfo->prevSKey = tw.skey;
//     pTableQueryInfo->lastKey = tw.skey;
1432
//
L
Liu Jicong 已提交
1433 1434
//     return tw.skey;
//   }
1435
//
L
Liu Jicong 已提交
1436 1437 1438 1439 1440 1441 1442 1443 1444 1445
//   return true;
// }

// static bool skipTimeInterval(STaskRuntimeEnv *pRuntimeEnv, TSKEY* start) {
//   STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
//   if (QUERY_IS_ASC_QUERY(pQueryAttr)) {
//     assert(*start <= pRuntimeEnv->current->lastKey);
//   } else {
//     assert(*start >= pRuntimeEnv->current->lastKey);
//   }
1446
//
L
Liu Jicong 已提交
1447 1448 1449 1450 1451
//   // if queried with value filter, do NOT forward query start position
//   if (pQueryAttr->limit.offset <= 0 || pQueryAttr->numOfFilterCols > 0 || pRuntimeEnv->pTsBuf != NULL ||
//   pRuntimeEnv->pFillInfo != NULL) {
//     return true;
//   }
1452
//
L
Liu Jicong 已提交
1453 1454 1455 1456 1457 1458 1459
//   /*
//    * 1. for interval without interpolation query we forward pQueryAttr->interval.interval at a time for
//    *    pQueryAttr->limit.offset times. Since hole exists, pQueryAttr->interval.interval*pQueryAttr->limit.offset
//    value is
//    *    not valid. otherwise, we only forward pQueryAttr->limit.offset number of points
//    */
//   assert(pRuntimeEnv->resultRowInfo.prevSKey == TSKEY_INITIAL_VAL);
1460
//
L
Liu Jicong 已提交
1461 1462
//   STimeWindow w = TSWINDOW_INITIALIZER;
//   bool ascQuery = QUERY_IS_ASC_QUERY(pQueryAttr);
1463
//
L
Liu Jicong 已提交
1464 1465
//   SResultRowInfo *pWindowResInfo = &pRuntimeEnv->resultRowInfo;
//   STableQueryInfo *pTableQueryInfo = pRuntimeEnv->current;
1466
//
L
Liu Jicong 已提交
1467 1468 1469
//   SDataBlockInfo blockInfo = SDATA_BLOCK_INITIALIZER;
//   while (tsdbNextDataBlock(pRuntimeEnv->pTsdbReadHandle)) {
//     tsdbRetrieveDataBlockInfo(pRuntimeEnv->pTsdbReadHandle, &blockInfo);
1470
//
L
Liu Jicong 已提交
1471 1472 1473 1474 1475 1476 1477 1478 1479
//     if (QUERY_IS_ASC_QUERY(pQueryAttr)) {
//       if (pWindowResInfo->prevSKey == TSKEY_INITIAL_VAL) {
//         getAlignQueryTimeWindow(pQueryAttr, blockInfo.window.skey, blockInfo.window.skey, pQueryAttr->window.ekey,
//         &w); pWindowResInfo->prevSKey = w.skey;
//       }
//     } else {
//       getAlignQueryTimeWindow(pQueryAttr, blockInfo.window.ekey, pQueryAttr->window.ekey, blockInfo.window.ekey, &w);
//       pWindowResInfo->prevSKey = w.skey;
//     }
1480
//
L
Liu Jicong 已提交
1481 1482
//     // the first time window
//     STimeWindow win = getActiveTimeWindow(pWindowResInfo, pWindowResInfo->prevSKey, pQueryAttr);
1483
//
L
Liu Jicong 已提交
1484 1485
//     while (pQueryAttr->limit.offset > 0) {
//       STimeWindow tw = win;
1486
//
L
Liu Jicong 已提交
1487 1488 1489
//       if ((win.ekey <= blockInfo.window.ekey && ascQuery) || (win.ekey >= blockInfo.window.skey && !ascQuery)) {
//         pQueryAttr->limit.offset -= 1;
//         pWindowResInfo->prevSKey = win.skey;
1490
//
L
Liu Jicong 已提交
1491 1492 1493 1494 1495 1496
//         // current time window is aligned with blockInfo.window.ekey
//         // restart it from next data block by set prevSKey to be TSKEY_INITIAL_VAL;
//         if ((win.ekey == blockInfo.window.ekey && ascQuery) || (win.ekey == blockInfo.window.skey && !ascQuery)) {
//           pWindowResInfo->prevSKey = TSKEY_INITIAL_VAL;
//         }
//       }
1497
//
L
Liu Jicong 已提交
1498 1499 1500 1501
//       if (pQueryAttr->limit.offset == 0) {
//         *start = doSkipIntervalProcess(pRuntimeEnv, &win, &blockInfo, pTableQueryInfo);
//         return true;
//       }
1502
//
L
Liu Jicong 已提交
1503 1504
//       // current window does not ended in current data block, try next data block
//       getNextTimeWindow(pQueryAttr, &tw);
1505
//
L
Liu Jicong 已提交
1506 1507 1508 1509 1510 1511 1512 1513 1514
//       /*
//        * If the next time window still starts from current data block,
//        * load the primary timestamp column first, and then find the start position for the next queried time window.
//        * Note that only the primary timestamp column is required.
//        * TODO: Optimize for this cases. All data blocks are not needed to be loaded, only if the first actually
//        required
//        * time window resides in current data block.
//        */
//       if ((tw.skey <= blockInfo.window.ekey && ascQuery) || (tw.ekey >= blockInfo.window.skey && !ascQuery)) {
1515
//
L
Liu Jicong 已提交
1516 1517
//         SArray *pDataBlock = tsdbRetrieveDataBlock(pRuntimeEnv->pTsdbReadHandle, NULL);
//         SColumnInfoData *pColInfoData = taosArrayGet(pDataBlock, 0);
1518
//
L
Liu Jicong 已提交
1519 1520 1521
//         if ((win.ekey > blockInfo.window.ekey && ascQuery) || (win.ekey < blockInfo.window.skey && !ascQuery)) {
//           pQueryAttr->limit.offset -= 1;
//         }
1522
//
L
Liu Jicong 已提交
1523 1524 1525 1526 1527 1528 1529 1530
//         if (pQueryAttr->limit.offset == 0) {
//           *start = doSkipIntervalProcess(pRuntimeEnv, &win, &blockInfo, pTableQueryInfo);
//           return true;
//         } else {
//           tw = win;
//           int32_t startPos =
//               getNextQualifiedWindow(pQueryAttr, &tw, &blockInfo, pColInfoData->pData, binarySearchForKey, -1);
//           assert(startPos >= 0);
1531
//
L
Liu Jicong 已提交
1532 1533 1534 1535 1536 1537 1538 1539 1540 1541 1542
//           // set the abort info
//           pQueryAttr->pos = startPos;
//           pTableQueryInfo->lastKey = ((TSKEY *)pColInfoData->pData)[startPos];
//           pWindowResInfo->prevSKey = tw.skey;
//           win = tw;
//         }
//       } else {
//         break;  // offset is not 0, and next time window begins or ends in the next block.
//       }
//     }
//   }
1543
//
L
Liu Jicong 已提交
1544 1545
//   // check for error
//   if (terrno != TSDB_CODE_SUCCESS) {
1546
//     T_LONG_JMP(pRuntimeEnv->env, terrno);
L
Liu Jicong 已提交
1547
//   }
1548
//
L
Liu Jicong 已提交
1549 1550
//   return true;
// }
1551

1552
/*
 * Gives operator `p` its `num` downstream (child) operators.
 *
 * Only the caller's pointer array is copied, into a new array stored in p->pDownstream; the child operators
 * themselves are shared, not duplicated. Whatever array p->pDownstream held before is overwritten, not extended.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when the array cannot be allocated.
 */
int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t num) {
  // An operator with no downstream array must not report any children.
  assert(p->pDownstream != NULL || p->numOfDownstream == 0);

  size_t nBytes = num * POINTER_BYTES;

  p->pDownstream = taosMemoryCalloc(1, nBytes);
  if (p->pDownstream != NULL) {
    memcpy(p->pDownstream, pDownstream, nBytes);
    p->numOfDownstream = num;
    return TSDB_CODE_SUCCESS;
  }

  return TSDB_CODE_OUT_OF_MEMORY;
}

dengyihao's avatar
dengyihao 已提交
1567 1568
// Forward declaration: doInitAggInfoSup() is defined further down in this file.
// Because this first declaration is `static`, the later definition (written without a storage-class
// specifier) also gets internal linkage.
static int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t numOfOutput, size_t keyBufSize,
                                const char* pKey);
1569

X
Xiaoyu Wang 已提交
1570
/*
 * Reports the scan order and scan flag that apply to the data flowing out of `pOperator`.
 *
 * Exchange, system-table, stream, tag, block-distribution and last-row scans always report ascending order and
 * MAIN_SCAN. Table scans and table-merge scans report the values stored in their own scan info. Any other
 * operator defers to its first downstream operator, recursively.
 *
 * Returns TSDB_CODE_INVALID_PARA when that recursion reaches an operator with no first downstream.
 */
int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scanFlag) {
  // todo add more information about exchange operation
  switch (pOperator->operatorType) {
    case QUERY_NODE_PHYSICAL_PLAN_EXCHANGE:
    case QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN:
    case QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN:
    case QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN:
    case QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN:
    case QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN:
      *order = TSDB_ORDER_ASC;
      *scanFlag = MAIN_SCAN;
      return TSDB_CODE_SUCCESS;

    case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN: {
      STableScanInfo* pScan = pOperator->info;
      *order = pScan->base.cond.order;
      *scanFlag = pScan->base.scanFlag;
      return TSDB_CODE_SUCCESS;
    }

    case QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN: {
      STableMergeScanInfo* pMergeScan = pOperator->info;
      *order = pMergeScan->base.cond.order;
      *scanFlag = pMergeScan->base.scanFlag;
      return TSDB_CODE_SUCCESS;
    }

    default:
      break;
  }

  // Not a scan operator: ask the first child.
  SOperatorInfo** pChildren = pOperator->pDownstream;
  if (pChildren == NULL || pChildren[0] == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  return getTableScanInfo(pChildren[0], order, scanFlag);
}
1597

1598
// this is a blocking operator
L
Liu Jicong 已提交
1599
/*
 * Open phase of the hash-aggregate operator.
 *
 * Pulls every block from the first downstream operator. For each block it optionally evaluates the scalar
 * pre-expressions in place, selects the per-group execution context and runs the aggregate functions. It then
 * prepares the grouped-result iterator over the result-row hash table and marks the operator as opened.
 * Errors are raised via T_LONG_JMP on the task's jump environment.
 * Calling it again after the operator has been opened is a no-op.
 */
static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
  if (OPTR_IS_OPENED(pOperator)) {
    return TSDB_CODE_SUCCESS;
  }

  SExecTaskInfo*    pTaskInfo = pOperator->pTaskInfo;
  SAggOperatorInfo* pAggInfo = pOperator->info;
  SExprSupp*        pAggSupp = &pOperator->exprSupp;
  SExprSupp*        pScalarSupp = &pAggInfo->scalarExprSup;
  SOperatorInfo*    pChild = pOperator->pDownstream[0];

  const int64_t startUs = taosGetTimestampUs();

  int32_t order = TSDB_ORDER_ASC;
  int32_t scanFlag = MAIN_SCAN;

  for (SSDataBlock* pBlock = pChild->fpSet.getNextFn(pChild); pBlock != NULL;
       pBlock = pChild->fpSet.getNextFn(pChild)) {
    // Order and scan flag are looked up again for every block (presumably because the downstream scan may
    // switch flags between blocks -- confirm against the scan operators).
    int32_t code = getTableScanInfo(pOperator, &order, &scanFlag);

    // Scalar expressions must be computed on the block before the group aggregation is applied.
    if (code == TSDB_CODE_SUCCESS && pScalarSupp->pExprInfo != NULL) {
      code = projectApplyFunctions(pScalarSupp->pExprInfo, pBlock, pBlock, pScalarSupp->pCtx,
                                   pScalarSupp->numOfExprs, NULL);
    }

    if (code != TSDB_CODE_SUCCESS) {
      T_LONG_JMP(pTaskInfo->env, code);
    }

    // the pDataBlock are always the same one, no need to call this again
    setExecutionContext(pOperator, pAggSupp->numOfExprs, pBlock->info.groupId);
    setInputDataBlock(pAggSupp, pBlock, order, scanFlag, true);

    code = doAggregateImpl(pOperator, pAggSupp->pCtx);
    if (code != 0) {
      T_LONG_JMP(pTaskInfo->env, code);
    }
  }

  initGroupedResultInfo(&pAggInfo->groupResInfo, pAggInfo->aggSup.pResultRowHashTable, 0);
  OPTR_SET_OPENED(pOperator);

  pOperator->cost.openCost = (taosGetTimestampUs() - startUs) / 1000.0;
  return TSDB_CODE_SUCCESS;
}

1651
/*
 * Next-block function of the hash-aggregate operator.
 *
 * Opens the operator (consuming all input) on first use, then builds result blocks from the grouped results,
 * applies the operator's filter and skips blocks that end up empty. Returns NULL when the operator is done,
 * when opening failed (the error is left in pTaskInfo->code), or when no rows remain.
 */
static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator) {
  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  SAggOperatorInfo* pAggInfo = pOperator->info;
  SOptrBasicInfo*   pInfo = &pAggInfo->binfo;
  SExecTaskInfo*    pTaskInfo = pOperator->pTaskInfo;

  pTaskInfo->code = pOperator->fpSet._openFn(pOperator);
  if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
    setOperatorCompleted(pOperator);
    return NULL;
  }

  blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);

  // Keep producing blocks until one survives filtering or the grouped results are exhausted.
  do {
    doBuildResultDatablock(pOperator, pInfo, &pAggInfo->groupResInfo, pAggInfo->aggSup.pResultBuf);
    doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);

    if (!hasRemainResults(&pAggInfo->groupResInfo)) {
      setOperatorCompleted(pOperator);
      break;
    }
  } while (pInfo->pRes->info.rows <= 0);

  size_t numOfRows = blockDataGetNumOfRows(pInfo->pRes);
  pOperator->resultInfo.totalRows += numOfRows;

  if (numOfRows == 0) {
    return NULL;
  }
  return pInfo->pRes;
}

1687
// Forward declaration: doApplyScalarCalculation() is defined below, after the fill helpers that call it.
static void doApplyScalarCalculation(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32_t order, int32_t scanFlag);
L
Liu Jicong 已提交
1688 1689
/*
 * Starts filling the block that was cached in pInfo->existNewGroupBlock when a new group was detected.
 *
 * Resets the fill engine, projects the cached block into pInfo->pRes, feeds it to the fill engine and appends
 * filled rows to pInfo->pFinalRes (bounded by the remaining result capacity). The cached block then becomes the
 * current group and the cache is cleared. pTaskInfo is not used here.
 */
static void doHandleRemainBlockForNewGroupImpl(SOperatorInfo* pOperator, SFillOperatorInfo* pInfo,
                                               SResultInfo* pResultInfo, SExecTaskInfo* pTaskInfo) {
  SSDataBlock* pNewGroup = pInfo->existNewGroupBlock;
  SSDataBlock* pOut = pInfo->pFinalRes;

  pInfo->totalInputRows = pNewGroup->info.rows;

  int32_t order = TSDB_ORDER_ASC;
  int32_t scanFlag = MAIN_SCAN;
  getTableScanInfo(pOperator, &order, &scanFlag);

  // Capture the new group's end key before its rows are projected into pRes.
  int64_t groupEndKey = pNewGroup->info.window.ekey;

  taosResetFillInfo(pInfo->pFillInfo, getFillInfoStart(pInfo->pFillInfo));

  blockDataCleanup(pInfo->pRes);
  doApplyScalarCalculation(pOperator, pNewGroup, order, scanFlag);

  taosFillSetStartInfo(pInfo->pFillInfo, pInfo->pRes->info.rows, groupEndKey);
  taosFillSetInputDataBlock(pInfo->pFillInfo, pInfo->pRes);

  int32_t room = pResultInfo->capacity - pOut->info.rows;
  taosFillResultDataBlock(pInfo->pFillInfo, pOut, room);

  // The cached block has been consumed and is now the current group.
  pInfo->curGroupId = pNewGroup->info.groupId;
  pInfo->existNewGroupBlock = NULL;
}

L
Liu Jicong 已提交
1713 1714
/*
 * Emits pending fill output before new input is pulled.
 *
 * If the fill engine still has rows for the current group, they are appended to pInfo->pFinalRes. Otherwise,
 * a cached block of a new group (if any) is started via doHandleRemainBlockForNewGroupImpl().
 */
static void doHandleRemainBlockFromNewGroup(SOperatorInfo* pOperator, SFillOperatorInfo* pInfo,
                                            SResultInfo* pResultInfo, SExecTaskInfo* pTaskInfo) {
  if (!taosFillHasMoreResults(pInfo->pFillInfo)) {
    // handle the cached new group data block
    if (pInfo->existNewGroupBlock != NULL) {
      doHandleRemainBlockForNewGroupImpl(pOperator, pInfo, pResultInfo, pTaskInfo);
    }
    return;
  }

  SSDataBlock* pOut = pInfo->pFinalRes;
  int32_t      room = pResultInfo->capacity - pOut->info.rows;
  taosFillResultDataBlock(pInfo->pFillInfo, pOut, room);
  pInfo->pRes->info.groupId = pInfo->curGroupId;
}

/*
 * Projects `pBlock` into the fill operator's intermediate block pInfo->pRes.
 *
 * The operator's own expressions are applied first. The row count of pRes is then reset to 0 and the
 * "no-fill" expressions are applied to the same input block. Finally pRes inherits the input block's group id.
 */
static void doApplyScalarCalculation(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32_t order, int32_t scanFlag) {
  SFillOperatorInfo* pFillOp = pOperator->info;
  SSDataBlock*       pDst = pFillOp->pRes;
  SExprSupp*         pFillSupp = &pOperator->exprSupp;
  SExprSupp*         pKeepSupp = &pFillOp->noFillExprSupp;

  setInputDataBlock(pFillSupp, pBlock, order, scanFlag, false);
  projectApplyFunctions(pFillSupp->pExprInfo, pDst, pBlock, pFillSupp->pCtx, pFillSupp->numOfExprs, NULL);

  // reset the row value before applying the no-fill functions to the input data block, which is "pBlock" here.
  pDst->info.rows = 0;
  setInputDataBlock(pKeepSupp, pBlock, order, scanFlag, false);
  projectApplyFunctions(pKeepSupp->pExprInfo, pDst, pBlock, pKeepSupp->pCtx, pKeepSupp->numOfExprs, NULL);

  pDst->info.groupId = pBlock->info.groupId;
}

S
slzhou 已提交
1743
/*
 * Produces the next block of the fill operator, or NULL when there is nothing more to return.
 *
 * It first drains output left over from the previous call. It then pulls downstream blocks, projects each into
 * pInfo->pRes and feeds it to the fill engine. A block whose group id differs from the current group is cached
 * in existNewGroupBlock, and the current group is closed out up to pInfo->win.ekey. Filled rows accumulate in
 * pInfo->pFinalRes, which is returned (tagged with the current group id) once it passes the result threshold,
 * input is exhausted, or a new group is pending.
 */
static SSDataBlock* doFillImpl(SOperatorInfo* pOperator) {
  SFillOperatorInfo* pFillOp = pOperator->info;
  SExecTaskInfo*     pTaskInfo = pOperator->pTaskInfo;
  SResultInfo*       pResultInfo = &pOperator->resultInfo;
  SSDataBlock*       pOut = pFillOp->pFinalRes;

  blockDataCleanup(pOut);

  int32_t order = TSDB_ORDER_ASC;
  int32_t scanFlag = MAIN_SCAN;
  getTableScanInfo(pOperator, &order, &scanFlag);

  // Output still pending from the previous call is returned before any new input is read.
  doHandleRemainBlockFromNewGroup(pOperator, pFillOp, pResultInfo, pTaskInfo);
  if (pOut->info.rows > 0) {
    pOut->info.groupId = pFillOp->curGroupId;
    return pOut;
  }

  SOperatorInfo* pChild = pOperator->pDownstream[0];
  for (;;) {
    SSDataBlock* pBlock = pChild->fpSet.getNextFn(pChild);

    if (pBlock != NULL) {
      blockDataUpdateTsWindow(pBlock, pFillOp->primarySrcSlotId);

      blockDataCleanup(pFillOp->pRes);
      blockDataEnsureCapacity(pFillOp->pRes, pBlock->info.rows);
      blockDataEnsureCapacity(pFillOp->pFinalRes, pBlock->info.rows);
      doApplyScalarCalculation(pOperator, pBlock, order, scanFlag);

      if (pFillOp->curGroupId == 0 || pFillOp->curGroupId == pFillOp->pRes->info.groupId) {
        // First block overall, or another block of the current group.
        pFillOp->curGroupId = pFillOp->pRes->info.groupId;
        pFillOp->totalInputRows += pFillOp->pRes->info.rows;

        int64_t blockEdgeKey =
            (order == pFillOp->pFillInfo->order) ? pBlock->info.window.ekey : pBlock->info.window.skey;
        taosFillSetStartInfo(pFillOp->pFillInfo, pFillOp->pRes->info.rows, blockEdgeKey);
        taosFillSetInputDataBlock(pFillOp->pFillInfo, pFillOp->pRes);
      } else if (pFillOp->curGroupId != pBlock->info.groupId) {
        // A new group arrived: cache it and close the fill of the previous group first.
        pFillOp->existNewGroupBlock = pBlock;
        taosFillSetStartInfo(pFillOp->pFillInfo, 0, pFillOp->win.ekey);
      }
    } else {
      if (pFillOp->totalInputRows == 0) {
        setOperatorCompleted(pOperator);
        return NULL;
      }

      // Downstream exhausted: fill the remainder of the query window.
      taosFillSetStartInfo(pFillOp->pFillInfo, 0, pFillOp->win.ekey);
    }

    int32_t room = pOperator->resultInfo.capacity - pOut->info.rows;
    taosFillResultDataBlock(pFillOp->pFillInfo, pOut, room);

    if (pOut->info.rows > 0) {
      // Return now if the block is full enough, input is exhausted, or a different group is waiting
      // (results of several groups are not mixed in one block).
      if (pOut->info.rows > pResultInfo->threshold || pBlock == NULL || pFillOp->existNewGroupBlock != NULL) {
        pOut->info.groupId = pFillOp->curGroupId;
        return pOut;
      }

      doHandleRemainBlockFromNewGroup(pOperator, pFillOp, pResultInfo, pTaskInfo);
      if (pOut->info.rows >= pOperator->resultInfo.threshold || pBlock == NULL) {
        pOut->info.groupId = pFillOp->curGroupId;
        return pOut;
      }
    } else if (pFillOp->existNewGroupBlock != NULL) {
      // Previous group produced nothing more: move straight on to the cached new group.
      assert(pBlock != NULL);

      blockDataCleanup(pOut);
      doHandleRemainBlockForNewGroupImpl(pOperator, pFillOp, pResultInfo, pTaskInfo);
      if (pOut->info.rows > pResultInfo->threshold) {
        pOut->info.groupId = pFillOp->curGroupId;
        return pOut;
      }
    } else {
      return NULL;
    }
  }
}

S
slzhou 已提交
1832 1833 1834 1835 1836 1837 1838 1839
/*
 * Next-block function of the fill operator.
 *
 * Repeatedly asks doFillImpl() for a block and applies the operator's filter, skipping blocks that the filter
 * empties. When doFillImpl() has nothing more, the operator is marked completed and NULL is returned.
 */
static SSDataBlock* doFill(SOperatorInfo* pOperator) {
  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  SFillOperatorInfo* pFillOp = pOperator->info;
  SSDataBlock*       pResult = NULL;

  do {
    pResult = doFillImpl(pOperator);
    if (pResult == NULL) {
      setOperatorCompleted(pOperator);
      return NULL;
    }

    doFilter(pResult, pOperator->exprSupp.pFilterInfo, &pFillOp->matchInfo);
  } while (pResult->info.rows <= 0);

  pOperator->resultInfo.totalRows += pResult->info.rows;
  return pResult;
}

1861
/*
 * Releases the heap members of each of the `numOfExprs` expressions in `pExpr`.
 *
 * For every parameter: column parameters have their pCol freed, value parameters have their variant destroyed.
 * Each expression's parameter array and expression tree pointer are freed as well. The `pExpr` array itself is
 * NOT freed here.
 */
void destroyExprInfo(SExprInfo* pExpr, int32_t numOfExprs) {
  for (int32_t e = 0; e < numOfExprs; ++e) {
    SExprInfo* pItem = pExpr + e;

    for (int32_t p = 0; p < pItem->base.numOfParams; ++p) {
      switch (pItem->base.pParam[p].type) {
        case FUNC_PARAM_TYPE_COLUMN:
          taosMemoryFreeClear(pItem->base.pParam[p].pCol);
          break;
        case FUNC_PARAM_TYPE_VALUE:
          taosVariantDestroy(&pItem->base.pParam[p].param);
          break;
        default:
          break;
      }
    }

    taosMemoryFree(pItem->base.pParam);
    taosMemoryFree(pItem->pExpr);
  }
}

5
54liuyao 已提交
1877
/*
 * Tears down an operator tree rooted at `pOperator` (NULL is accepted and ignored).
 *
 * Order: the operator's close function receives its `info`; every downstream operator is destroyed
 * recursively; the downstream array is freed; the expression support is cleaned up; the operator itself is freed.
 */
void destroyOperatorInfo(SOperatorInfo* pOperator) {
  if (pOperator == NULL) {
    return;
  }

  if (pOperator->fpSet.closeFn) {
    pOperator->fpSet.closeFn(pOperator->info);
  }

  SOperatorInfo** pChildren = pOperator->pDownstream;
  if (pChildren != NULL) {
    int32_t numOfChildren = pOperator->numOfDownstream;
    for (int32_t k = 0; k < numOfChildren; ++k) {
      destroyOperatorInfo(pChildren[k]);
    }

    taosMemoryFreeClear(pOperator->pDownstream);
    pOperator->numOfDownstream = 0;
  }

  cleanupExprSupp(&pOperator->exprSupp);
  taosMemoryFreeClear(pOperator);
}

1899 1900 1901 1902 1903 1904
/*
 * Chooses the page size and total buffer size for an operator's disk-based result buffer.
 *
 * The page size is the smallest power of two, starting at 4096, that can hold four rows of `rowSize` bytes.
 * It is capped at 2^31 so it always fits in uint32_t.
 * The buffer size defaults to 10MB (4096 * 2560). When a single page is already at least that large, the buffer
 * size becomes four pages instead, clamped to UINT32_MAX.
 *
 * A non-positive rowSize yields the minimum page size of 4096.
 * Always returns 0.
 */
int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* defaultBufsz) {
  // Work in 64 bits: `rowSize * 4` overflows int32_t (UB) for large rows. A negative rowSize converted to
  // uint32_t made the old doubling loop wrap the page size to 0 and spin forever.
  const uint64_t minPageBytes = (rowSize > 0) ? (uint64_t)rowSize * 4u : 0u;
  const uint32_t maxPageSize = UINT32_C(1) << 31;

  uint32_t pgsz = 4096;
  while (pgsz < minPageBytes && pgsz < maxPageSize) {
    pgsz <<= 1u;
  }

  // The default buffer for each operator in query is 10MB.
  // at least four pages need to be in buffer
  // TODO: make this variable to be configurable.
  uint64_t bufsz = 4096u * 2560u;
  if (bufsz <= pgsz) {
    bufsz = (uint64_t)pgsz * 4u;
    if (bufsz > UINT32_MAX) {
      bufsz = UINT32_MAX;
    }
  }

  *defaultPgsz = pgsz;
  *defaultBufsz = (uint32_t)bufsz;
  return 0;
}

dengyihao's avatar
dengyihao 已提交
1916 1917
/*
 * Initialises an aggregation supporter.
 *
 * Sets the current page id to -1 and computes the result-row size from `pCtx`. Allocates the group-key buffer
 * (keyBufSize plus room for a pointer and an int64) and the result-row hash table. Sizes and creates the
 * disk-based result buffer in tsTempDir, using `pKey` as its id.
 *
 * Returns TSDB_CODE_OUT_OF_MEMORY if the key buffer or hash table cannot be allocated, and
 * TSDB_CODE_NO_AVAIL_DISK if there is no temp space. Otherwise it returns the result of createDiskbasedBuf().
 * On failure, members already assigned are left in pAggSup; cleanupAggSup() releases them.
 */
int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t numOfOutput, size_t keyBufSize,
                         const char* pKey) {
  int32_t    code = TSDB_CODE_SUCCESS;
  _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);

  pAggSup->currentPageId = -1;
  pAggSup->resultRowSize = getResultRowSize(pCtx, numOfOutput);
  pAggSup->keyBuf = taosMemoryCalloc(1, keyBufSize + POINTER_BYTES + sizeof(int64_t));
  pAggSup->pResultRowHashTable = tSimpleHashInit(10, hashFn);

  if (pAggSup->keyBuf == NULL || pAggSup->pResultRowHashTable == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  uint32_t defaultPgsz = 0;
  uint32_t defaultBufsz = 0;
  getBufferPgSize(pAggSup->resultRowSize, &defaultPgsz, &defaultBufsz);

  if (!osTempSpaceAvailable()) {
    code = TSDB_CODE_NO_AVAIL_DISK;
    // tstrerror(code), not terrstr(): terrstr() describes the thread's terrno, so the disk-space error code
    // itself was never what got logged.
    qError("Init stream agg supporter failed since %s, %s", tstrerror(code), pKey);
    return code;
  }

  code = createDiskbasedBuf(&pAggSup->pResultBuf, defaultPgsz, defaultBufsz, pKey, tsTempDir);
  if (code != TSDB_CODE_SUCCESS) {
    qError("Create agg result buf failed since %s, %s", tstrerror(code), pKey);
  }

  return code;
}

1949
/*
 * Releases everything doInitAggInfoSup() allocated: the key buffer, the result-row hash table and the
 * disk-based result buffer.
 *
 * Every released member is reset to NULL. keyBuf was already cleared this way; the hash table and result
 * buffer used to be left dangling, so a second cleanup or a stray later access would touch freed memory.
 */
void cleanupAggSup(SAggSupporter* pAggSup) {
  taosMemoryFreeClear(pAggSup->keyBuf);

  tSimpleHashCleanup(pAggSup->pResultRowHashTable);
  pAggSup->pResultRowHashTable = NULL;

  destroyDiskbasedBuf(pAggSup->pResultBuf);
  pAggSup->pResultBuf = NULL;
}

L
Liu Jicong 已提交
1955 1956
/*
 * Initialises the expression support `pSup` for `numOfCols` expressions and the aggregation supporter
 * `pAggSup`. Then points every function context's save handle at the aggregation's disk-based result buffer.
 * Returns the first non-success code from either initialisation step.
 */
int32_t initAggInfo(SExprSupp* pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, size_t keyBufSize,
                    const char* pkey) {
  int32_t code = initExprSupp(pSup, pExprInfo, numOfCols);
  if (code == TSDB_CODE_SUCCESS) {
    code = doInitAggInfoSup(pAggSup, pSup->pCtx, numOfCols, keyBufSize, pkey);
  }

  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  for (int32_t col = 0; col < numOfCols; ++col) {
    pSup->pCtx[col].saveHandle.pBuf = pAggSup->pResultBuf;
  }

  return TSDB_CODE_SUCCESS;
}

L
Liu Jicong 已提交
1974
/*
 * Sets the result capacity to `numOfRows` and the threshold to 75% of it (truncated). When that truncates to
 * zero, the threshold falls back to the full capacity. `numOfRows` must be non-zero.
 */
void initResultSizeInfo(SResultInfo* pResultInfo, int32_t numOfRows) {
  ASSERT(numOfRows != 0);

  pResultInfo->capacity = numOfRows;

  int32_t threeQuarters = numOfRows * 0.75;
  pResultInfo->threshold = (threeQuarters == 0) ? numOfRows : threeQuarters;
}

1984 1985 1986 1987 1988
/*
 * Attaches `pBlock` (not copied) as the operator's result block and initialises the result-row info.
 * cleanupBasicInfo() later destroys the attached block.
 */
void initBasicInfo(SOptrBasicInfo* pInfo, SSDataBlock* pBlock) {
  SOptrBasicInfo* pBasic = pInfo;

  pBasic->pRes = pBlock;
  initResultRowInfo(&pBasic->resultRowInfo);
}

5
54liuyao 已提交
1989
/*
 * Frees an array of `numOfOutput` function contexts (NULL is accepted).
 *
 * Per context: parameter variants are destroyed, and the subsidiary context array/buffer and the input data and
 * column-aggregate pointers are freed. The array itself is freed last. Always returns NULL so callers can write
 * `p = destroySqlFunctionCtx(p, n);`.
 */
void* destroySqlFunctionCtx(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
  if (pCtx != NULL) {
    for (int32_t k = 0; k < numOfOutput; ++k) {
      SqlFunctionCtx* pOne = &pCtx[k];

      for (int32_t j = 0; j < pOne->numOfParams; ++j) {
        taosVariantDestroy(&pOne->param[j].param);
      }

      taosMemoryFreeClear(pOne->subsidiaries.pCtx);
      taosMemoryFreeClear(pOne->subsidiaries.buf);
      taosMemoryFree(pOne->input.pData);
      taosMemoryFree(pOne->input.pColumnDataAgg);
    }

    taosMemoryFreeClear(pCtx);
  }

  return NULL;
}

2009
/*
 * Stores `pExprInfo` / `numOfExpr` in `pSup` (no copy). When expressions are given, it also creates their
 * function contexts and row-entry offsets.
 * Returns TSDB_CODE_OUT_OF_MEMORY if context creation fails, TSDB_CODE_SUCCESS otherwise.
 */
int32_t initExprSupp(SExprSupp* pSup, SExprInfo* pExprInfo, int32_t numOfExpr) {
  pSup->pExprInfo = pExprInfo;
  pSup->numOfExprs = numOfExpr;

  if (pExprInfo == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  pSup->pCtx = createSqlFunctionCtx(pExprInfo, numOfExpr, &pSup->rowEntryInfoOffset);
  return (pSup->pCtx == NULL) ? TSDB_CODE_OUT_OF_MEMORY : TSDB_CODE_SUCCESS;
}

2022 2023 2024 2025
/*
 * Releases everything held by an expression support: function contexts, expression infos (members and array),
 * the filter and the row-entry offsets.
 *
 * All released pointers are reset to NULL. Previously pCtx and rowEntryInfoOffset were left dangling, unlike
 * pExprInfo and pFilterInfo, so cleaning the same support twice would double-free them.
 */
void cleanupExprSupp(SExprSupp* pSupp) {
  // destroySqlFunctionCtx() always returns NULL, which clears the field.
  pSupp->pCtx = destroySqlFunctionCtx(pSupp->pCtx, pSupp->numOfExprs);

  if (pSupp->pExprInfo != NULL) {
    destroyExprInfo(pSupp->pExprInfo, pSupp->numOfExprs);
    taosMemoryFreeClear(pSupp->pExprInfo);
  }

  if (pSupp->pFilterInfo != NULL) {
    filterFreeInfo(pSupp->pFilterInfo);
    pSupp->pFilterInfo = NULL;
  }

  taosMemoryFreeClear(pSupp->rowEntryInfoOffset);
}

2037 2038
/*
 * Builds the hash-aggregate ("TableAggregate") operator on top of `downstream`.
 *
 * Sets up the result block, the aggregate expressions (aggregate functions plus group keys), the optional
 * scalar pre-expressions and the filter. When the downstream is a table scan, it also exposes the aggregate's
 * expression and aggregation supporters to the scan's pdInfo.
 *
 * On failure, everything allocated so far is released, the error code is stored in pTaskInfo->code and NULL
 * is returned.
 */
SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pAggNode,
                                           SExecTaskInfo* pTaskInfo) {
  // Declared before the first `goto _error`. Previously `code` was declared further down, so an allocation
  // failure jumped past its initialiser and pTaskInfo->code received an indeterminate value.
  int32_t code = TSDB_CODE_OUT_OF_MEMORY;

  SAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SAggOperatorInfo));
  SOperatorInfo*    pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    goto _error;
  }

  SSDataBlock* pResBlock = createResDataBlock(pAggNode->node.pOutputDataBlockDesc);
  if (pResBlock == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _error;
  }
  initBasicInfo(&pInfo->binfo, pResBlock);

  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
  initResultSizeInfo(&pOperator->resultInfo, 4096);

  int32_t    num = 0;
  SExprInfo* pExprInfo = createExprInfo(pAggNode->pAggFuncs, pAggNode->pGroupKeys, &num);
  code = initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  int32_t    numOfScalarExpr = 0;
  SExprInfo* pScalarExprInfo = NULL;
  if (pAggNode->pExprs != NULL) {
    pScalarExprInfo = createExprInfo(pAggNode->pExprs, NULL, &numOfScalarExpr);
  }

  code = initExprSupp(&pInfo->scalarExprSup, pScalarExprInfo, numOfScalarExpr);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  code = filterInitFromNode((SNode*)pAggNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  pInfo->binfo.mergeResultBlock = pAggNode->mergeDataBlock;
  pInfo->groupId = UINT64_MAX;

  setOperatorInfo(pOperator, "TableAggregate", QUERY_NODE_PHYSICAL_PLAN_HASH_AGG, true, OP_NOT_OPENED, pInfo,
                  pTaskInfo);
  pOperator->fpSet = createOperatorFpSet(doOpenAggregateOptr, getAggregateResult, NULL, destroyAggOperatorInfo, NULL);

  if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
    STableScanInfo* pTableScanInfo = downstream->info;
    pTableScanInfo->base.pdInfo.pExprSup = &pOperator->exprSupp;
    pTableScanInfo->base.pdInfo.pAggSup = &pInfo->aggSup;
  }

  code = appendDownstream(pOperator, &downstream, 1);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  return pOperator;

_error:
  if (pInfo != NULL) {
    destroyAggOperatorInfo(pInfo);
  }

  if (pOperator != NULL) {
    cleanupExprSupp(&pOperator->exprSupp);
  }

  taosMemoryFreeClear(pOperator);
  pTaskInfo->code = code;
  return NULL;
}

2108
/*
 * Destroys the operator's result block and stores blockDataDestroy()'s return value back into pRes.
 * `pInfo` must not be NULL.
 */
void cleanupBasicInfo(SOptrBasicInfo* pInfo) {
  assert(pInfo != NULL);

  SSDataBlock* pBlock = pInfo->pRes;
  pInfo->pRes = blockDataDestroy(pBlock);
}

H
Haojun Liao 已提交
2113 2114 2115 2116 2117 2118 2119
/*
 * Frees the heap pointer stored in the slot `pItem` points to, and clears the slot.
 * `pItem` is a pointer to a pointer.
 */
static void freeItem(void* pItem) {
  void** pSlot = (void**)pItem;

  // taosMemoryFreeClear() already skips NULL, so no separate check is needed.
  taosMemoryFreeClear(*pSlot);
}

2120
void destroyAggOperatorInfo(void* param) {
L
Liu Jicong 已提交
2121
  SAggOperatorInfo* pInfo = (SAggOperatorInfo*)param;
L
Liu Jicong 已提交
2122 2123
  cleanupBasicInfo(&pInfo->binfo);

H
Haojun Liao 已提交
2124
  cleanupAggSup(&pInfo->aggSup);
S
shenglian zhou 已提交
2125
  cleanupExprSupp(&pInfo->scalarExprSup);
H
Haojun Liao 已提交
2126
  cleanupGroupResInfo(&pInfo->groupResInfo);
D
dapan1121 已提交
2127
  taosMemoryFreeClear(param);
2128
}
2129

2130
void destroyFillOperatorInfo(void* param) {
L
Liu Jicong 已提交
2131
  SFillOperatorInfo* pInfo = (SFillOperatorInfo*)param;
2132
  pInfo->pFillInfo = taosDestroyFillInfo(pInfo->pFillInfo);
H
Haojun Liao 已提交
2133
  pInfo->pRes = blockDataDestroy(pInfo->pRes);
H
Haojun Liao 已提交
2134 2135
  pInfo->pFinalRes = blockDataDestroy(pInfo->pFinalRes);

2136
  cleanupExprSupp(&pInfo->noFillExprSupp);
H
Haojun Liao 已提交
2137

wafwerar's avatar
wafwerar 已提交
2138
  taosMemoryFreeClear(pInfo->p);
H
Haojun Liao 已提交
2139
  taosArrayDestroy(pInfo->matchInfo.pList);
D
dapan1121 已提交
2140
  taosMemoryFreeClear(param);
2141 2142
}

H
Haojun Liao 已提交
2143 2144 2145 2146
/*
 * Creates the fill engine for a fill operator.
 *
 * The first fill window is aligned on `pInterval`, starting from win.skey (ascending) or win.ekey (descending).
 * pInfo->win holds `win` ordered along the scan direction (swapped for descending order). pInfo->p is
 * allocated with one pointer slot per fill column.
 *
 * Returns TSDB_CODE_OUT_OF_MEMORY if the fill engine or pInfo->p cannot be allocated. In that case both
 * fields are released and reset to NULL.
 */
static int32_t initFillInfo(SFillOperatorInfo* pInfo, SExprInfo* pExpr, int32_t numOfCols, SExprInfo* pNotFillExpr,
                            int32_t numOfNotFillCols, SNodeListNode* pValNode, STimeWindow win, int32_t capacity,
                            const char* id, SInterval* pInterval, int32_t fillType, int32_t order) {
  SFillColInfo* pColInfo = createFillColInfo(pExpr, numOfCols, pNotFillExpr, numOfNotFillCols, pValNode);

  int64_t     startKey = (order == TSDB_ORDER_ASC) ? win.skey : win.ekey;
  STimeWindow w = getAlignQueryTimeWindow(pInterval, pInterval->precision, startKey);
  w = getFirstQualifiedTimeWindow(startKey, &w, pInterval, order);

  pInfo->pFillInfo = taosCreateFillInfo(w.skey, numOfCols, numOfNotFillCols, capacity, pInterval, fillType, pColInfo,
                                        pInfo->primaryTsCol, order, id);

  if (order == TSDB_ORDER_ASC) {
    pInfo->win.skey = win.skey;
    pInfo->win.ekey = win.ekey;
  } else {
    pInfo->win.skey = win.ekey;
    pInfo->win.ekey = win.skey;
  }

  pInfo->p = taosMemoryCalloc(numOfCols, POINTER_BYTES);

  if (pInfo->pFillInfo == NULL || pInfo->p == NULL) {
    // Release the fill engine through its own destructor (a bare free skipped its internal allocations) and
    // clear both fields. Otherwise a later destroyFillOperatorInfo() on this pInfo, e.g. from the caller's
    // error path, would free them a second time.
    if (pInfo->pFillInfo != NULL) {
      pInfo->pFillInfo = taosDestroyFillInfo(pInfo->pFillInfo);
    }
    pInfo->pFillInfo = NULL;
    taosMemoryFreeClear(pInfo->p);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  return TSDB_CODE_SUCCESS;
}

2173
/*
 * Report whether the not-fill expressions already produce the window-start column.
 * A match is a column expression with exactly one parameter whose column type is COLUMN_TYPE_WINDOW_START.
 * An empty expression list yields false.
 */
static bool isWstartColumnExist(SFillOperatorInfo* pInfo) {
  SExprSupp* pSupp = &pInfo->noFillExprSupp;

  for (int32_t k = 0; k < pSupp->numOfExprs; ++k) {
    SExprInfo* pCandidate = &pSupp->pExprInfo[k];

    if (pCandidate->pExpr->nodeType != QUERY_NODE_COLUMN) {
      continue;
    }
    if (pCandidate->base.numOfParams != 1) {
      continue;
    }
    if (pCandidate->base.pParam[0].pCol->colType == COLUMN_TYPE_WINDOW_START) {
      return true;
    }
  }

  return false;
}

2188 2189
/*
 * Ensure the not-fill expression set carries the primary timestamp (window start) column.
 *
 * When isWstartColumnExist() finds no such column, pPhyFillNode->pWStartTs is converted into an expression and
 * appended to pExprSupp, growing its expression array by one element.
 *
 * Returns:
 *   TSDB_CODE_SUCCESS       - the column was already present or has been appended;
 *   TSDB_CODE_QRY_SYS_ERROR - pWStartTs is not a target node;
 *   TSDB_CODE_OUT_OF_MEMORY - the array could not be grown (pExprSupp is left unchanged).
 */
static int32_t createPrimaryTsExprIfNeeded(SFillOperatorInfo* pInfo, SFillPhysiNode* pPhyFillNode, SExprSupp* pExprSupp,
                                           const char* idStr) {
  if (isWstartColumnExist(pInfo)) {
    return TSDB_CODE_SUCCESS;
  }

  if (pPhyFillNode->pWStartTs->type != QUERY_NODE_TARGET) {
    qError("pWStartTs of fill physical node is not a target node, %s", idStr);
    return TSDB_CODE_QRY_SYS_ERROR;
  }

  SExprInfo* pGrown = taosMemoryRealloc(pExprSupp->pExprInfo, (pExprSupp->numOfExprs + 1) * sizeof(SExprInfo));
  if (pGrown == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  // The new slot is the one just past the current last expression.
  pExprSupp->pExprInfo = pGrown;
  createExprFromTargetNode(&pGrown[pExprSupp->numOfExprs], (STargetNode*)pPhyFillNode->pWStartTs);
  pExprSupp->numOfExprs += 1;

  return TSDB_CODE_SUCCESS;
}

L
Liu Jicong 已提交
2211 2212
/*
 * Create the FILL operator on top of `downstream`.
 *
 * The interval definition is read from the downstream operator. When it is a
 * QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL operator, the interval comes from its intervalAggOperatorInfo.
 * Otherwise downstream->info is treated as SIntervalAggOperatorInfo.
 *
 * Returns the new operator, or NULL on failure. On failure pTaskInfo->code holds the error code, and the
 * partially built operator info is released via destroyFillOperatorInfo().
 */
SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* pPhyFillNode,
                                      SExecTaskInfo* pTaskInfo) {
  // Initialised up front: the first `goto _error` below happens before any call has produced a code,
  // and the error path copies `code` into pTaskInfo->code.
  int32_t            code = TSDB_CODE_OUT_OF_MEMORY;
  SFillOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SFillOperatorInfo));
  SOperatorInfo*     pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    goto _error;
  }

  pInfo->pRes = createResDataBlock(pPhyFillNode->node.pOutputDataBlockDesc);
  if (pInfo->pRes == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _error;
  }

  SExprInfo* pExprInfo = createExprInfo(pPhyFillNode->pFillExprs, NULL, &pInfo->numOfExpr);
  pOperator->exprSupp.pExprInfo = pExprInfo;

  // Expressions that are passed through without filling; the window-start column is appended if absent.
  SExprSupp* pNoFillSupp = &pInfo->noFillExprSupp;
  pNoFillSupp->pExprInfo = createExprInfo(pPhyFillNode->pNotFillExprs, NULL, &pNoFillSupp->numOfExprs);
  code = createPrimaryTsExprIfNeeded(pInfo, pPhyFillNode, pNoFillSupp, pTaskInfo->id.str);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  code = initExprSupp(pNoFillSupp, pNoFillSupp->pExprInfo, pNoFillSupp->numOfExprs);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  SInterval* pInterval =
      QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL == downstream->operatorType
          ? &((SMergeAlignedIntervalAggOperatorInfo*)downstream->info)->intervalAggOperatorInfo->interval
          : &((SIntervalAggOperatorInfo*)downstream->info)->interval;

  int32_t order = (pPhyFillNode->inputTsOrder == ORDER_ASC) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC;
  int32_t type = convertFillType(pPhyFillNode->mode);

  SResultInfo* pResultInfo = &pOperator->resultInfo;
  initResultSizeInfo(pResultInfo, 4096);

  code = blockDataEnsureCapacity(pInfo->pRes, pResultInfo->capacity);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  code = initExprSupp(&pOperator->exprSupp, pExprInfo, pInfo->numOfExpr);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  // Must be set before initFillInfo(), which hands primaryTsCol to the fill helper.
  pInfo->primaryTsCol = ((STargetNode*)pPhyFillNode->pWStartTs)->slotId;
  pInfo->primarySrcSlotId = ((SColumnNode*)((STargetNode*)pPhyFillNode->pWStartTs)->pExpr)->slotId;

  int32_t numOfOutputCols = 0;
  code = extractColMatchInfo(pPhyFillNode->pFillExprs, pPhyFillNode->node.pOutputDataBlockDesc, &numOfOutputCols,
                             COL_MATCH_FROM_SLOT_ID, &pInfo->matchInfo);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  code = initFillInfo(pInfo, pExprInfo, pInfo->numOfExpr, pNoFillSupp->pExprInfo, pNoFillSupp->numOfExprs,
                      (SNodeListNode*)pPhyFillNode->pValues, pPhyFillNode->timeRange, pResultInfo->capacity,
                      pTaskInfo->id.str, pInterval, type, order);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  pInfo->pFinalRes = createOneDataBlock(pInfo->pRes, false);
  if (pInfo->pFinalRes == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _error;
  }

  code = blockDataEnsureCapacity(pInfo->pFinalRes, pResultInfo->capacity);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  code = filterInitFromNode((SNode*)pPhyFillNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  setOperatorInfo(pOperator, "FillOperator", QUERY_NODE_PHYSICAL_PLAN_FILL, false, OP_NOT_OPENED, pInfo, pTaskInfo);
  pOperator->exprSupp.numOfExprs = pInfo->numOfExpr;
  pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doFill, NULL, destroyFillOperatorInfo, NULL);

  code = appendDownstream(pOperator, &downstream, 1);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  return pOperator;

_error:
  if (pInfo != NULL) {
    destroyFillOperatorInfo(pInfo);
  }

  pTaskInfo->code = code;
  taosMemoryFreeClear(pOperator);
  return NULL;
}

D
dapan1121 已提交
2291
/*
 * Allocate and initialise the execution info of one query task.
 *
 * The task id string has the form "TID:0x<taskId> QID:0x<queryId>" (at most 128 bytes including the terminator).
 * Returns NULL with terrno = TSDB_CODE_OUT_OF_MEMORY when the task object or its id string cannot be allocated.
 * NOTE(review): the results of strdup(), tableListCreate() and taosArrayInit() are still not checked here.
 */
static SExecTaskInfo* createExecTaskInfo(uint64_t queryId, uint64_t taskId, EOPTR_EXEC_MODEL model, char* dbFName) {
  SExecTaskInfo* pTaskInfo = taosMemoryCalloc(1, sizeof(SExecTaskInfo));
  if (pTaskInfo == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  // Allocate the id string before any other member so that failing here only requires releasing pTaskInfo.
  // Previously the allocation was unchecked and a NULL buffer was handed to snprintf().
  char* p = taosMemoryCalloc(1, 128);
  if (p == NULL) {
    taosMemoryFree(pTaskInfo);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }
  snprintf(p, 128, "TID:0x%" PRIx64 " QID:0x%" PRIx64, taskId, queryId);
  pTaskInfo->id.str = p;

  setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED);

  pTaskInfo->schemaInfo.dbname = strdup(dbFName);
  pTaskInfo->cost.created = taosGetTimestampMs();
  pTaskInfo->id.queryId = queryId;
  pTaskInfo->execModel = model;
  pTaskInfo->pTableInfoList = tableListCreate();
  pTaskInfo->stopInfo.pStopInfo = taosArrayInit(4, sizeof(SExchangeOpStopInfo));

  return pTaskInfo;
}
H
Haojun Liao 已提交
2313

H
Haojun Liao 已提交
2314 2315
SSchemaWrapper* extractQueriedColumnSchema(SScanPhysiNode* pScanNode);

2316
/*
 * Fill pTaskInfo->schemaInfo for the table scanned by pScanNode.
 *
 * Sets the table name and the full schema wrapper:
 *   - super table: the super table's row schema and tag-schema version;
 *   - child table: the owning super table's row schema and tag-schema version (a second meta lookup by suid);
 *   - any other type: the normal table's row schema.
 * The queried-column schema (qsw) is then derived from the scan node's columns.
 *
 * Returns TSDB_CODE_SUCCESS, or terrno when a meta lookup fails.
 */
int32_t extractTableSchemaInfo(SReadHandle* pHandle, SScanPhysiNode* pScanNode, SExecTaskInfo* pTaskInfo) {
  SMetaReader mr = {0};
  metaReaderInit(&mr, pHandle->meta, 0);
  int32_t code = metaGetTableEntryByUid(&mr, pScanNode->uid);
  if (code != TSDB_CODE_SUCCESS) {
    qError("failed to get the table meta, uid:0x%" PRIx64 ", suid:0x%" PRIx64 ", %s", pScanNode->uid, pScanNode->suid,
           GET_TASKID(pTaskInfo));

    metaReaderClear(&mr);
    return terrno;
  }

  SSchemaInfo* pSchemaInfo = &pTaskInfo->schemaInfo;
  pSchemaInfo->tablename = strdup(mr.me.name);

  if (mr.me.type == TSDB_SUPER_TABLE) {
    pSchemaInfo->sw = tCloneSSchemaWrapper(&mr.me.stbEntry.schemaRow);
    pSchemaInfo->tversion = mr.me.stbEntry.schemaTag.version;
  } else if (mr.me.type == TSDB_CHILD_TABLE) {
    tDecoderClear(&mr.coder);

    tb_uid_t suid = mr.me.ctbEntry.suid;
    // The super table lookup was previously unchecked; on failure mr.me.stbEntry would not hold valid data.
    code = metaGetTableEntryByUid(&mr, suid);
    if (code != TSDB_CODE_SUCCESS) {
      qError("failed to get the super table meta, uid:0x%" PRIx64 ", suid:0x%" PRIx64 ", %s", pScanNode->uid, suid,
             GET_TASKID(pTaskInfo));

      metaReaderClear(&mr);
      return terrno;
    }

    pSchemaInfo->sw = tCloneSSchemaWrapper(&mr.me.stbEntry.schemaRow);
    pSchemaInfo->tversion = mr.me.stbEntry.schemaTag.version;
  } else {
    pSchemaInfo->sw = tCloneSSchemaWrapper(&mr.me.ntbEntry.schemaRow);
  }

  metaReaderClear(&mr);

  pSchemaInfo->qsw = extractQueriedColumnSchema(pScanNode);
  return TSDB_CODE_SUCCESS;
}

SSchemaWrapper* extractQueriedColumnSchema(SScanPhysiNode* pScanNode) {
2352 2353 2354
  int32_t numOfCols = LIST_LENGTH(pScanNode->pScanCols);
  int32_t numOfTags = LIST_LENGTH(pScanNode->pScanPseudoCols);

2355
  SSchemaWrapper* pqSw = taosMemoryCalloc(1, sizeof(SSchemaWrapper));
2356
  pqSw->pSchema = taosMemoryCalloc(numOfCols + numOfTags, sizeof(SSchema));
2357

L
Liu Jicong 已提交
2358
  for (int32_t i = 0; i < numOfCols; ++i) {
H
Haojun Liao 已提交
2359
    STargetNode* pNode = (STargetNode*)nodesListGetNode(pScanNode->pScanCols, i);
2360 2361
    SColumnNode* pColNode = (SColumnNode*)pNode->pExpr;

H
Haojun Liao 已提交
2362 2363 2364
    SSchema* pSchema = &pqSw->pSchema[pqSw->nCols++];
    pSchema->colId = pColNode->colId;
    pSchema->type = pColNode->node.resType.type;
H
Haojun Liao 已提交
2365 2366
    pSchema->bytes = pColNode->node.resType.bytes;
    tstrncpy(pSchema->name, pColNode->colName, tListLen(pSchema->name));
2367 2368
  }

2369
  // this the tags and pseudo function columns, we only keep the tag columns
2370
  for (int32_t i = 0; i < numOfTags; ++i) {
2371 2372 2373 2374 2375 2376 2377 2378 2379
    STargetNode* pNode = (STargetNode*)nodesListGetNode(pScanNode->pScanPseudoCols, i);

    int32_t type = nodeType(pNode->pExpr);
    if (type == QUERY_NODE_COLUMN) {
      SColumnNode* pColNode = (SColumnNode*)pNode->pExpr;

      SSchema* pSchema = &pqSw->pSchema[pqSw->nCols++];
      pSchema->colId = pColNode->colId;
      pSchema->type = pColNode->node.resType.type;
H
Haojun Liao 已提交
2380
      pSchema->bytes = pColNode->node.resType.bytes;
H
Haojun Liao 已提交
2381
      tstrncpy(pSchema->name, pColNode->colName, tListLen(pSchema->name));
2382 2383 2384
    }
  }

H
Haojun Liao 已提交
2385
  return pqSw;
2386 2387
}

2388 2389
/*
 * Release the heap members of a task's schema info.
 * The schema wrappers are deleted first, then the database/table name strings are freed and reset to NULL.
 */
static void cleanupTableSchemaInfo(SSchemaInfo* pSchemaInfo) {
  tDeleteSSchemaWrapper(pSchemaInfo->qsw);
  tDeleteSSchemaWrapper(pSchemaInfo->sw);

  taosMemoryFreeClear(pSchemaInfo->tablename);
  taosMemoryFreeClear(pSchemaInfo->dbname);
}

2395
/* Release the schema wrapper held by the stream task info. */
static void cleanupStreamInfo(SStreamTaskInfo* pStreamInfo) {
  tDeleteSSchemaWrapper(pStreamInfo->schema);
}
2396

2397
/*
 * Return true when the group/partition key list holds exactly one key and that key is the "tbname"
 * function, i.e. "partition by tbname" / "group by tbname". Any other list yields false.
 */
bool groupbyTbname(SNodeList* pGroupList) {
  if (LIST_LENGTH(pGroupList) != 1) {
    return false;
  }

  SNode* pKey = nodesListGetNode(pGroupList, 0);
  if (pKey->type != QUERY_NODE_FUNCTION) {
    return false;
  }

  return strcmp(((struct SFunctionNode*)pKey)->functionName, "tbname") == 0;
}

2410 2411
/*
 * Recursively build the executor operator tree for the physical plan node pPhyNode.
 *
 * Leaf nodes (no children) become source operators: table/merge/stream/system-table/tag/block-dist/last-row
 * scans, exchange, or a child-less projection. Table-backed scans populate pTaskInfo->pTableInfoList first.
 * Inner nodes build every child operator first and then create the operator for pPhyNode on top of them.
 *
 * Returns the created operator, whose resultDataBlockId is copied from the node's output descriptor,
 * or NULL on failure.
 */
SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, SNode* pTagCond,
                                  SNode* pTagIndexCond, const char* pUser) {
  int32_t         type = nodeType(pPhyNode);
  STableListInfo* pTableListInfo = pTaskInfo->pTableInfoList;
  const char*     idstr = GET_TASKID(pTaskInfo);

  if (pPhyNode->pChildren == NULL || LIST_LENGTH(pPhyNode->pChildren) == 0) {
    SOperatorInfo* pOperator = NULL;
    if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == type) {
      STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode;

      // NOTE: this is a patch to fix the physical plan
      // TODO remove it later
      if (pTableScanNode->scan.node.pLimit != NULL) {
        pTableScanNode->groupSort = true;
      }

      int32_t code =
          createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, pTableScanNode->groupSort, pHandle,
                                  pTableListInfo, pTagCond, pTagIndexCond, pTaskInfo);
      if (code) {
        pTaskInfo->code = code;
        qError("failed to createScanTableListInfo, code:%s, %s", tstrerror(code), idstr);
        return NULL;
      }

      code = extractTableSchemaInfo(pHandle, &pTableScanNode->scan, pTaskInfo);
      if (code) {
        pTaskInfo->code = terrno;
        return NULL;
      }

      pOperator = createTableScanOperatorInfo(pTableScanNode, pHandle, pTaskInfo);
      if (NULL == pOperator) {
        pTaskInfo->code = terrno;
        return NULL;
      }

      STableScanInfo* pScanInfo = pOperator->info;
      pTaskInfo->cost.pRecoder = &pScanInfo->base.readRecorder;
    } else if (QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN == type) {
      STableMergeScanPhysiNode* pTableScanNode = (STableMergeScanPhysiNode*)pPhyNode;

      int32_t code = createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, true, pHandle,
                                             pTableListInfo, pTagCond, pTagIndexCond, pTaskInfo);
      if (code) {
        pTaskInfo->code = code;
        qError("failed to createScanTableListInfo, code: %s", tstrerror(code));
        return NULL;
      }

      code = extractTableSchemaInfo(pHandle, &pTableScanNode->scan, pTaskInfo);
      if (code) {
        pTaskInfo->code = terrno;
        return NULL;
      }

      pOperator = createTableMergeScanOperatorInfo(pTableScanNode, pHandle, pTaskInfo);
      if (NULL == pOperator) {
        pTaskInfo->code = terrno;
        return NULL;
      }

      STableScanInfo* pScanInfo = pOperator->info;
      pTaskInfo->cost.pRecoder = &pScanInfo->base.readRecorder;
    } else if (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == type) {
      pOperator = createExchangeOperatorInfo(pHandle ? pHandle->pMsgCb->clientRpc : NULL, (SExchangePhysiNode*)pPhyNode,
                                             pTaskInfo);
    } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN == type) {
      STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode;
      if (pHandle->vnode) {
        int32_t code =
            createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, pTableScanNode->groupSort,
                                    pHandle, pTableListInfo, pTagCond, pTagIndexCond, pTaskInfo);
        if (code) {
          pTaskInfo->code = code;
          qError("failed to createScanTableListInfo, code: %s", tstrerror(code));
          return NULL;
        }

#ifndef NDEBUG
        int32_t sz = tableListGetSize(pTableListInfo);
        qDebug("create stream task, total:%d", sz);

        for (int32_t i = 0; i < sz; i++) {
          STableKeyInfo* pKeyInfo = tableListGetInfo(pTableListInfo, i);
          qDebug("add table uid:%" PRIu64 ", gid:%" PRIu64, pKeyInfo->uid, pKeyInfo->groupId);
        }
#endif
      }

      pTaskInfo->schemaInfo.qsw = extractQueriedColumnSchema(&pTableScanNode->scan);
      pOperator = createStreamScanOperatorInfo(pHandle, pTableScanNode, pTagCond, pTaskInfo);
    } else if (QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN == type) {
      SSystemTableScanPhysiNode* pSysScanPhyNode = (SSystemTableScanPhysiNode*)pPhyNode;
      pOperator = createSysTableScanOperatorInfo(pHandle, pSysScanPhyNode, pUser, pTaskInfo);
    } else if (QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN == type) {
      STagScanPhysiNode* pScanPhyNode = (STagScanPhysiNode*)pPhyNode;

      int32_t code = createScanTableListInfo(pScanPhyNode, NULL, false, pHandle, pTableListInfo, pTagCond,
                                             pTagIndexCond, pTaskInfo);
      if (code != TSDB_CODE_SUCCESS) {
        pTaskInfo->code = code;
        qError("failed to getTableList, code: %s", tstrerror(code));
        return NULL;
      }

      pOperator = createTagScanOperatorInfo(pHandle, pScanPhyNode, pTableListInfo, pTaskInfo);
    } else if (QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN == type) {
      SBlockDistScanPhysiNode* pBlockNode = (SBlockDistScanPhysiNode*)pPhyNode;

      if (pBlockNode->tableType == TSDB_SUPER_TABLE) {
        SArray* pList = taosArrayInit(4, sizeof(STableKeyInfo));
        if (pList == NULL) {
          pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
          return NULL;
        }

        int32_t code = vnodeGetAllTableList(pHandle->vnode, pBlockNode->uid, pList);
        if (code != TSDB_CODE_SUCCESS) {
          taosArrayDestroy(pList);  // previously leaked on this path
          pTaskInfo->code = terrno;
          return NULL;
        }

        // Copy every table returned by the vnode. The loop is bounded by pList's size. Bounding it by the table
        // list, which grows inside the loop, either copied nothing or read past the end of pList.
        size_t numOfTables = taosArrayGetSize(pList);
        for (size_t i = 0; i < numOfTables; ++i) {
          STableKeyInfo* p = taosArrayGet(pList, i);
          tableListAddTableInfo(pTableListInfo, p->uid, 0);
        }
        taosArrayDestroy(pList);
      } else {  // Create group with only one table
        tableListAddTableInfo(pTableListInfo, pBlockNode->uid, 0);
      }

      pOperator = createDataBlockInfoScanOperator(pHandle, pBlockNode, pTaskInfo);
    } else if (QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN == type) {
      SLastRowScanPhysiNode* pScanNode = (SLastRowScanPhysiNode*)pPhyNode;

      int32_t code = createScanTableListInfo(&pScanNode->scan, pScanNode->pGroupTags, true, pHandle, pTableListInfo,
                                             pTagCond, pTagIndexCond, pTaskInfo);
      if (code != TSDB_CODE_SUCCESS) {
        pTaskInfo->code = code;
        return NULL;
      }

      code = extractTableSchemaInfo(pHandle, &pScanNode->scan, pTaskInfo);
      if (code != TSDB_CODE_SUCCESS) {
        pTaskInfo->code = code;
        return NULL;
      }

      pOperator = createCacherowsScanOperator(pScanNode, pHandle, pTaskInfo);
    } else if (QUERY_NODE_PHYSICAL_PLAN_PROJECT == type) {
      pOperator = createProjectOperatorInfo(NULL, (SProjectPhysiNode*)pPhyNode, pTaskInfo);
    } else {
      ASSERT(0);
    }

    if (pOperator != NULL) {
      pOperator->resultDataBlockId = pPhyNode->pOutputDataBlockDesc->dataBlockId;
    }

    return pOperator;
  }

  size_t          size = LIST_LENGTH(pPhyNode->pChildren);
  SOperatorInfo** ops = taosMemoryCalloc(size, POINTER_BYTES);
  if (ops == NULL) {
    pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  for (int32_t i = 0; i < size; ++i) {
    SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, i);
    ops[i] = createOperatorTree(pChildNode, pTaskInfo, pHandle, pTagCond, pTagIndexCond, pUser);
    if (ops[i] == NULL) {
      // NOTE(review): operators already created for ops[0..i-1] are not released here.
      taosMemoryFree(ops);
      return NULL;
    }
  }

  SOperatorInfo* pOptr = NULL;
  if (QUERY_NODE_PHYSICAL_PLAN_PROJECT == type) {
    pOptr = createProjectOperatorInfo(ops[0], (SProjectPhysiNode*)pPhyNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_HASH_AGG == type) {
    SAggPhysiNode* pAggNode = (SAggPhysiNode*)pPhyNode;
    if (pAggNode->pGroupKeys != NULL) {
      pOptr = createGroupOperatorInfo(ops[0], pAggNode, pTaskInfo);
    } else {
      pOptr = createAggregateOperatorInfo(ops[0], pAggNode, pTaskInfo);
    }
  } else if (QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL == type) {
    SIntervalPhysiNode* pIntervalPhyNode = (SIntervalPhysiNode*)pPhyNode;

    bool isStream = (QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL == type);
    pOptr = createIntervalOperatorInfo(ops[0], pIntervalPhyNode, pTaskInfo, isStream);
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL == type) {
    pOptr = createStreamIntervalOperatorInfo(ops[0], pPhyNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL == type) {
    SMergeAlignedIntervalPhysiNode* pIntervalPhyNode = (SMergeAlignedIntervalPhysiNode*)pPhyNode;
    pOptr = createMergeAlignedIntervalOperatorInfo(ops[0], pIntervalPhyNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL == type) {
    SMergeIntervalPhysiNode* pIntervalPhyNode = (SMergeIntervalPhysiNode*)pPhyNode;
    pOptr = createMergeIntervalOperatorInfo(ops[0], pIntervalPhyNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL == type) {
    int32_t children = 0;
    pOptr = createStreamFinalIntervalOperatorInfo(ops[0], pPhyNode, pTaskInfo, children);
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL == type) {
    int32_t children = pHandle->numOfVgroups;
    pOptr = createStreamFinalIntervalOperatorInfo(ops[0], pPhyNode, pTaskInfo, children);
  } else if (QUERY_NODE_PHYSICAL_PLAN_SORT == type) {
    pOptr = createSortOperatorInfo(ops[0], (SSortPhysiNode*)pPhyNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_GROUP_SORT == type) {
    pOptr = createGroupSortOperatorInfo(ops[0], (SGroupSortPhysiNode*)pPhyNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE == type) {
    SMergePhysiNode* pMergePhyNode = (SMergePhysiNode*)pPhyNode;
    pOptr = createMultiwayMergeOperatorInfo(ops, size, pMergePhyNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION == type) {
    SSessionWinodwPhysiNode* pSessionNode = (SSessionWinodwPhysiNode*)pPhyNode;
    pOptr = createSessionAggOperatorInfo(ops[0], pSessionNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION == type) {
    pOptr = createStreamSessionAggOperatorInfo(ops[0], pPhyNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION == type) {
    int32_t children = 0;
    pOptr = createStreamFinalSessionAggOperatorInfo(ops[0], pPhyNode, pTaskInfo, children);
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION == type) {
    int32_t children = pHandle->numOfVgroups;
    pOptr = createStreamFinalSessionAggOperatorInfo(ops[0], pPhyNode, pTaskInfo, children);
  } else if (QUERY_NODE_PHYSICAL_PLAN_PARTITION == type) {
    pOptr = createPartitionOperatorInfo(ops[0], (SPartitionPhysiNode*)pPhyNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION == type) {
    pOptr = createStreamPartitionOperatorInfo(ops[0], (SStreamPartitionPhysiNode*)pPhyNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE == type) {
    SStateWinodwPhysiNode* pStateNode = (SStateWinodwPhysiNode*)pPhyNode;
    pOptr = createStatewindowOperatorInfo(ops[0], pStateNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE == type) {
    pOptr = createStreamStateAggOperatorInfo(ops[0], pPhyNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN == type) {
    pOptr = createMergeJoinOperatorInfo(ops, size, (SSortMergeJoinPhysiNode*)pPhyNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_FILL == type) {
    pOptr = createFillOperatorInfo(ops[0], (SFillPhysiNode*)pPhyNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_FILL == type) {
    pOptr = createStreamFillOperatorInfo(ops[0], (SStreamFillPhysiNode*)pPhyNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC == type) {
    pOptr = createIndefinitOutputOperatorInfo(ops[0], pPhyNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC == type) {
    pOptr = createTimeSliceOperatorInfo(ops[0], pPhyNode, pTaskInfo);
  } else {
    ASSERT(0);
  }

  taosMemoryFree(ops);
  if (pOptr) {
    pOptr->resultDataBlockId = pPhyNode->pOutputDataBlockDesc->dataBlockId;
  }

  return pOptr;
}
H
Haojun Liao 已提交
2661

L
Liu Jicong 已提交
2662 2663 2664 2665 2666 2667 2668 2669 2670 2671 2672 2673 2674
/*
 * Walk down a single-downstream operator chain until the stream scan operator is reached.
 * Through *ppInfo it returns the info of the table scan operator wrapped by that stream scan.
 *
 * Returns 0 on success, or TSDB_CODE_QRY_APP_ERROR when the chain ends (no downstream) or branches
 * (more than one downstream, e.g. a join) before a stream scan is found.
 */
static int32_t extractTbscanInStreamOpTree(SOperatorInfo* pOperator, STableScanInfo** ppInfo) {
  SOperatorInfo* pCur = pOperator;

  while (pCur->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
    if (pCur->numOfDownstream == 0) {
      qError("failed to find stream scan operator");
      return TSDB_CODE_QRY_APP_ERROR;
    }

    if (pCur->numOfDownstream > 1) {
      qError("join not supported for stream block scan");
      return TSDB_CODE_QRY_APP_ERROR;
    }

    pCur = pCur->pDownstream[0];
  }

  SStreamScanInfo* pStreamScan = pCur->info;
  ASSERT(pStreamScan->pTableScanOp->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN);
  *ppInfo = pStreamScan->pTableScanOp->info;
  return 0;
}

2682 2683 2684 2685 2686 2687 2688 2689 2690 2691 2692 2693 2694 2695 2696 2697 2698 2699 2700 2701 2702 2703
/*
 * Follow a chain of single-child plan nodes down to its leaf and return it as a table scan node.
 *
 * Returns 0 and sets *ppNode when the leaf is a QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN.
 * Returns -1 with terrno = TSDB_CODE_QRY_APP_ERROR when a node has more than one child or the leaf is another
 * node type; both cases also trip ASSERT(0).
 */
int32_t extractTableScanNode(SPhysiNode* pNode, STableScanPhysiNode** ppNode) {
  SPhysiNode* pCur = pNode;

  while (pCur->pChildren != NULL && LIST_LENGTH(pCur->pChildren) != 0) {
    if (LIST_LENGTH(pCur->pChildren) != 1) {
      ASSERT(0);
      terrno = TSDB_CODE_QRY_APP_ERROR;
      return -1;
    }
    pCur = (SPhysiNode*)nodesListGetNode(pCur->pChildren, 0);
  }

  if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN != pCur->type) {
    ASSERT(0);
    terrno = TSDB_CODE_QRY_APP_ERROR;
    return -1;
  }

  *ppNode = (STableScanPhysiNode*)pCur;
  return 0;
}

2704
#if 0
// Disabled code: rebuilds the data reader of the table scan found under a stream operator tree,
// using the table scan node extracted from `plan`.
// NOTE(review): `uid` and `ts` are not applied to the reader yet (see TODO below).
int32_t rebuildReader(SOperatorInfo* pOperator, SSubplan* plan, SReadHandle* pHandle, int64_t uid, int64_t ts) {
  STableScanInfo* pTableScanInfo = NULL;
  if (extractTbscanInStreamOpTree(pOperator, &pTableScanInfo) < 0) {
    return -1;
  }

  STableScanPhysiNode* pNode = NULL;
  if (extractTableScanNode(plan->pNode, &pNode) < 0) {
    ASSERT(0);
    // Without this return, a NULL pNode would be used below in release builds.
    return TSDB_CODE_QRY_APP_ERROR;
  }

  tsdbReaderClose(pTableScanInfo->dataReader);

  STableListInfo info = {0};
  pTableScanInfo->dataReader = doCreateDataReader(pNode, pHandle, &info, NULL);
  if (pTableScanInfo->dataReader == NULL) {
    ASSERT(0);
    qError("failed to create data reader");
    return TSDB_CODE_QRY_APP_ERROR;
  }

  // TODO: set uid and ts to data reader
  return 0;
}
#endif
2729

C
Cary Xu 已提交
2730
/*
 * Serialize the state of an operator subtree into one growable buffer.
 *
 * Operators are visited depth-first (the operator itself, then each downstream in order). Every
 * operator that provides fpSet.encodeResultRow and produces a non-empty blob has that blob appended
 * to *result. The first sizeof(int32_t) bytes of *result hold the total byte length of the buffer,
 * header included; *length mirrors that value after every append.
 *
 * @param ops           root of the subtree to encode
 * @param result        [in/out] accumulated buffer, NULL before the first blob is appended. It is
 *                      (re)allocated here. If encodeResultRow or an allocation fails, *result is
 *                      freed and reset to NULL.
 * @param length        [out] total length of *result after the last append
 * @param nOptrWithVal  [in/out] incremented once per operator that contributed a blob
 * @return TDB_CODE_SUCCESS; TSDB_CODE_TSC_INVALID_INPUT if an out-parameter is NULL;
 *         TSDB_CODE_OUT_OF_MEMORY; or the error returned by an operator's encodeResultRow
 */
int32_t encodeOperator(SOperatorInfo* ops, char** result, int32_t* length, int32_t* nOptrWithVal) {
  int32_t code = TDB_CODE_SUCCESS;
  char*   pCurrent = NULL;
  int32_t currLength = 0;

  if (ops->fpSet.encodeResultRow) {
    if (result == NULL || length == NULL || nOptrWithVal == NULL) {
      return TSDB_CODE_TSC_INVALID_INPUT;
    }
    code = ops->fpSet.encodeResultRow(ops, &pCurrent, &currLength);

    if (code != TDB_CODE_SUCCESS) {
      // Abort the whole serialization: drop everything accumulated so far.
      if (*result != NULL) {
        taosMemoryFree(*result);
        *result = NULL;
      }
      return code;
    } else if (currLength == 0) {
      // This operator has nothing to save. No buffer is expected here, but ASSERT may be compiled
      // out, so release one if it was returned instead of leaking it.
      ASSERT(!pCurrent);
      if (pCurrent != NULL) {
        taosMemoryFree(pCurrent);
      }
      goto _downstream;
    }

    ++(*nOptrWithVal);

    ASSERT(currLength >= 0);

    if (*result == NULL) {
      // First blob: allocate the int32 total-length header followed by the blob.
      *result = (char*)taosMemoryCalloc(1, currLength + sizeof(int32_t));
      if (*result == NULL) {
        taosMemoryFree(pCurrent);
        return TSDB_CODE_OUT_OF_MEMORY;
      }
      memcpy(*result + sizeof(int32_t), pCurrent, currLength);
      *(int32_t*)(*result) = currLength + sizeof(int32_t);
    } else {
      // Append: the header holds the current total size, which is also the append offset.
      int32_t sizePre = *(int32_t*)(*result);
      char*   tmp = (char*)taosMemoryRealloc(*result, sizePre + currLength);
      if (tmp == NULL) {
        taosMemoryFree(pCurrent);
        taosMemoryFree(*result);
        *result = NULL;
        return TSDB_CODE_OUT_OF_MEMORY;
      }
      *result = tmp;
      memcpy(*result + sizePre, pCurrent, currLength);
      *(int32_t*)(*result) += currLength;
    }
    taosMemoryFree(pCurrent);
    *length = *(int32_t*)(*result);
  }

_downstream:
  for (int32_t i = 0; i < ops->numOfDownstream; ++i) {
    code = encodeOperator(ops->pDownstream[i], result, length, nOptrWithVal);
    if (code != TDB_CODE_SUCCESS) {
      return code;
    }
  }
  return TDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
2790
/*
 * Depth-first walk that hands every stateful operator its blob from an encodeOperator() buffer.
 * Blobs are consumed in the same order encodeOperator() appended them (operator first, then each
 * downstream subtree in turn), so one shared cursor advances across sibling subtrees.
 *
 * @param ops      root of the subtree
 * @param pCursor  [in/out] next unread blob, or NULL once the buffer is exhausted
 * @param pRemain  [in/out] bytes available from *pCursor to the end of the buffer
 * @return TDB_CODE_SUCCESS, TSDB_CODE_TSC_INVALID_INPUT for a missing/truncated/corrupt blob, or the
 *         error returned by an operator's decodeResultRow
 */
static int32_t doDecodeOperatorState(SOperatorInfo* ops, const char** pCursor, int32_t* pRemain) {
  if (ops->fpSet.decodeResultRow) {
    if (*pCursor == NULL || *pRemain < (int32_t)sizeof(int32_t)) {
      return TSDB_CODE_TSC_INVALID_INPUT;
    }

    // Each blob starts with its own total length (prefix included). Blobs are packed back to back,
    // so the prefix is not necessarily int32-aligned: read it with memcpy.
    int32_t dataLength = 0;
    memcpy(&dataLength, *pCursor, sizeof(int32_t));
    if (dataLength < (int32_t)sizeof(int32_t) || dataLength > *pRemain) {
      return TSDB_CODE_TSC_INVALID_INPUT;
    }

    // The cast only satisfies the callback's signature; this function never writes to the buffer.
    int32_t code = ops->fpSet.decodeResultRow(ops, (char*)(*pCursor));
    if (code != TDB_CODE_SUCCESS) {
      return code;
    }

    *pRemain -= dataLength;
    *pCursor = (*pRemain == 0) ? NULL : (*pCursor + dataLength);
  }

  for (int32_t i = 0; i < ops->numOfDownstream; ++i) {
    int32_t code = doDecodeOperatorState(ops->pDownstream[i], pCursor, pRemain);
    if (code != TDB_CODE_SUCCESS) {
      return code;
    }
  }

  return TDB_CODE_SUCCESS;
}

/*
 * Restore operator state from a buffer produced by encodeOperator().
 *
 * Layout: [int32 total length, header included][blob][blob]... where each blob begins with its own
 * int32 length. The buffer is only read, never modified.
 *
 * @param ops     root of the operator tree
 * @param result  serialized buffer (may be NULL when there is no state)
 * @param length  size of `result` in bytes; expected to equal the header value
 * @return TDB_CODE_SUCCESS, TSDB_CODE_TSC_INVALID_INPUT when a stateful operator finds no valid blob,
 *         or the error returned by an operator's decodeResultRow
 */
int32_t decodeOperator(SOperatorInfo* ops, const char* result, int32_t length) {
  const char* pCursor = NULL;
  int32_t     remain = 0;

  // Skip the total-length header. The caller-supplied length bounds every read.
  if (result != NULL && length >= (int32_t)sizeof(int32_t)) {
    int32_t totalLength = 0;
    memcpy(&totalLength, result, sizeof(int32_t));
    ASSERT(length == totalLength);

    remain = length - (int32_t)sizeof(int32_t);
    pCursor = (remain > 0) ? (result + sizeof(int32_t)) : NULL;
  }

  return doDecodeOperatorState(ops, &pCursor, &remain);
}

D
dapan1121 已提交
2827
/*
 * Build the data-sink parameter object required by the given sink node type.
 *
 * - QUERY_NODE_PHYSICAL_PLAN_QUERY_INSERT: an SInserterParam carrying readHandle.
 * - QUERY_NODE_PHYSICAL_PLAN_DELETE: an SDeleterParam holding the table list's suid and the uid of
 *   every table in the task's table list.
 * - any other type: *pParam is left untouched.
 *
 * @param pNode       sink node from the physical plan
 * @param pParam      [out] receives the newly allocated parameter object
 * @param pTaskInfo   pointer to the task whose table list is used for DELETE
 * @param readHandle  handle stored in the inserter parameter
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY (partially built objects are released)
 */
int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, qTaskInfo_t* pTaskInfo, SReadHandle* readHandle) {
  SExecTaskInfo* pTask = *(SExecTaskInfo**)pTaskInfo;

  switch (pNode->type) {
    case QUERY_NODE_PHYSICAL_PLAN_QUERY_INSERT: {
      SInserterParam* pInserterParam = taosMemoryCalloc(1, sizeof(SInserterParam));
      if (NULL == pInserterParam) {
        return TSDB_CODE_OUT_OF_MEMORY;
      }
      pInserterParam->readHandle = readHandle;

      *pParam = pInserterParam;
      break;
    }
    case QUERY_NODE_PHYSICAL_PLAN_DELETE: {
      SDeleterParam* pDeleterParam = taosMemoryCalloc(1, sizeof(SDeleterParam));
      if (NULL == pDeleterParam) {
        return TSDB_CODE_OUT_OF_MEMORY;
      }

      int32_t tbNum = tableListGetSize(pTask->pTableInfoList);
      pDeleterParam->suid = tableListGetSuid(pTask->pTableInfoList);

      // TODO extract uid list
      pDeleterParam->pUidList = taosArrayInit(tbNum, sizeof(uint64_t));
      if (NULL == pDeleterParam->pUidList) {
        taosMemoryFree(pDeleterParam);
        return TSDB_CODE_OUT_OF_MEMORY;
      }

      for (int32_t i = 0; i < tbNum; ++i) {
        STableKeyInfo* pTable = tableListGetInfo(pTask->pTableInfoList, i);
        // An unchecked failed push would silently drop this table from the delete set.
        if (NULL == taosArrayPush(pDeleterParam->pUidList, &pTable->uid)) {
          taosArrayDestroy(pDeleterParam->pUidList);
          taosMemoryFree(pDeleterParam);
          return TSDB_CODE_OUT_OF_MEMORY;
        }
      }

      *pParam = pDeleterParam;
      break;
    }
    default:
      break;
  }

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
2871
/*
 * Create an execution task for a subplan and build its operator tree.
 *
 * Ownership: `sql` is taken over here. It is either stored in the task (and released with it) or
 * freed directly when the task itself could not be created. The subplan is attached to the task, so
 * destroying the task on failure also hands pPlan to doDestroyTask().
 *
 * @return TSDB_CODE_SUCCESS with *pTaskInfo set; on failure a non-zero error code (terrno when set)
 *         and *pTaskInfo reset to NULL
 */
int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId,
                               char* sql, EOPTR_EXEC_MODEL model) {
  uint64_t queryId = pPlan->id.queryId;

  *pTaskInfo = createExecTaskInfo(queryId, taskId, model, pPlan->dbFName);
  if (*pTaskInfo == NULL) {
    goto _complete;
  }

  if (pHandle) {
    /*(*pTaskInfo)->streamInfo.fillHistoryVer1 = pHandle->fillHistoryVer1;*/
    if (pHandle->pStateBackend) {
      (*pTaskInfo)->streamInfo.pState = pHandle->pStateBackend;
    }
  }

  // From here on the task owns sql.
  (*pTaskInfo)->sql = sql;
  sql = NULL;

  (*pTaskInfo)->pSubplan = pPlan;
  (*pTaskInfo)->pRoot =
      createOperatorTree(pPlan->pNode, *pTaskInfo, pHandle, pPlan->pTagCond, pPlan->pTagIndexCond, pPlan->user);

  if (NULL == (*pTaskInfo)->pRoot) {
    terrno = (*pTaskInfo)->code;
    goto _complete;
  }

  return TSDB_CODE_SUCCESS;

_complete:
  taosMemoryFree(sql);

  // Capture the error before cleanup (which may touch terrno), and never report success on this path.
  int32_t code = (terrno != TSDB_CODE_SUCCESS) ? terrno : TSDB_CODE_QRY_APP_ERROR;

  // createExecTaskInfo() may have failed, in which case there is no task to destroy.
  if (*pTaskInfo != NULL) {
    doDestroyTask(*pTaskInfo);
    *pTaskInfo = NULL;  // do not hand a dangling pointer back to the caller
  }
  return code;
}

L
Liu Jicong 已提交
2907
/*
 * Release an execution task and everything it owns: the table list, the operator tree, schema and
 * stream info, the subplan (skipped when localFetch.localExec is set), the stop-info array, the SQL
 * text, the id string and finally the task struct itself.
 *
 * Passing NULL is a no-op, so error paths may call this unconditionally.
 */
void doDestroyTask(SExecTaskInfo* pTaskInfo) {
  if (pTaskInfo == NULL) {
    return;
  }

  qDebug("%s execTask is freed", GET_TASKID(pTaskInfo));

  pTaskInfo->pTableInfoList = tableListDestroy(pTaskInfo->pTableInfoList);
  destroyOperatorInfo(pTaskInfo->pRoot);
  cleanupTableSchemaInfo(&pTaskInfo->schemaInfo);
  cleanupStreamInfo(&pTaskInfo->streamInfo);

  // In local-exec mode the subplan is not released here.
  if (!pTaskInfo->localFetch.localExec) {
    nodesDestroyNode((SNode*)pTaskInfo->pSubplan);
  }

  taosArrayDestroy(pTaskInfo->stopInfo.pStopInfo);
  taosMemoryFreeClear(pTaskInfo->sql);
  taosMemoryFreeClear(pTaskInfo->id.str);
  taosMemoryFreeClear(pTaskInfo);
}

/*
 * Estimate the query-buffer budget needed for `numOfTables` tables: 1.5x sizeof(STableQueryInfo)
 * per table. Buffers consumed inside tsdb (e.g. STableCheckInfo) are not accounted for here.
 */
static int64_t getQuerySupportBufSize(size_t numOfTables) {
  const double bytesPerTable = sizeof(STableQueryInfo) * 1.5;
  return (int64_t)(bytesPerTable * numOfTables);
}

/*
 * Try to reserve the query-buffer budget for `numOfTables` tables from tsQueryBufferSizeBytes.
 *
 * A negative global budget means "unlimited" (nothing is reserved). A zero budget disables query
 * processing. Otherwise the estimate is subtracted atomically with a compare-and-swap retry loop.
 *
 * @return TSDB_CODE_SUCCESS when reserved (or unlimited), TSDB_CODE_QRY_NOT_ENOUGH_BUFFER otherwise
 */
int32_t checkForQueryBuf(size_t numOfTables) {
  const int64_t need = getQuerySupportBufSize(numOfTables);

  if (tsQueryBufferSizeBytes < 0) {
    return TSDB_CODE_SUCCESS;
  }

  // disable query processing if the value of tsQueryBufferSize is zero.
  if (tsQueryBufferSizeBytes == 0) {
    return TSDB_CODE_QRY_NOT_ENOUGH_BUFFER;
  }

  for (;;) {
    const int64_t avail = tsQueryBufferSizeBytes;
    const int64_t left = avail - need;
    if (left < 0) {
      return TSDB_CODE_QRY_NOT_ENOUGH_BUFFER;
    }
    // Retry if another thread changed the budget between the read above and the swap.
    if (atomic_val_compare_exchange_64(&tsQueryBufferSizeBytes, avail, left) == avail) {
      return TSDB_CODE_SUCCESS;
    }
  }
}

/*
 * Give back the query-buffer budget reserved for `numOfTables` tables. Does nothing when the global
 * budget is negative (unlimited mode, where nothing was reserved).
 */
void releaseQueryBuf(size_t numOfTables) {
  if (tsQueryBufferSizeBytes >= 0) {
    const int64_t reserved = getQuerySupportBufSize(numOfTables);
    atomic_add_fetch_64(&tsQueryBufferSizeBytes, reserved);
  }
}
D
dapan1121 已提交
2963

H
Haojun Liao 已提交
2964
/*
 * Append runtime explain statistics for an operator and, depth-first, for all of its downstream
 * operators to pExecInfoList (one SExplainExecInfo per operator, parent before children).
 *
 * @return TSDB_CODE_SUCCESS; TSDB_CODE_QRY_OUT_OF_MEMORY if the list cannot grow; otherwise the
 *         error reported by an operator's getExplainFn (propagated unchanged from any depth)
 */
int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SArray* pExecInfoList) {
  SExplainExecInfo  execInfo = {0};
  SExplainExecInfo* pExplainInfo = taosArrayPush(pExecInfoList, &execInfo);
  if (pExplainInfo == NULL) {
    return TSDB_CODE_QRY_OUT_OF_MEMORY;
  }

  pExplainInfo->numOfRows = operatorInfo->resultInfo.totalRows;
  pExplainInfo->startupCost = operatorInfo->cost.openCost;
  pExplainInfo->totalCost = operatorInfo->cost.totalCost;
  pExplainInfo->verboseLen = 0;
  pExplainInfo->verboseInfo = NULL;

  if (operatorInfo->fpSet.getExplainFn) {
    int32_t code =
        operatorInfo->fpSet.getExplainFn(operatorInfo, &pExplainInfo->verboseInfo, &pExplainInfo->verboseLen);
    if (code) {
      qError("%s operator getExplainFn failed, code:%s", GET_TASKID(operatorInfo->pTaskInfo), tstrerror(code));
      return code;
    }
  }

  // pExplainInfo points into pExecInfoList and may be invalidated once the recursion below grows the
  // array; it must not be used past this point.
  for (int32_t i = 0; i < operatorInfo->numOfDownstream; ++i) {
    int32_t code = getOperatorExplainExecInfo(operatorInfo->pDownstream[i], pExecInfoList);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }
  }

  return TSDB_CODE_SUCCESS;
}
5
54liuyao 已提交
2994

2995 2996
/*
 * Fetch, creating it if absent, the stream-state result row for window `win` in group
 * `tableGroupId`. Then stamp the window on it and bind the function contexts to it.
 *
 * @param pResult [out] the result row stored in the stream-state buffer
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_QRY_OUT_OF_MEMORY when the state lookup fails
 */
int32_t setOutputBuf(SStreamState* pState, STimeWindow* win, SResultRow** pResult, int64_t tableGroupId,
                     SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowEntryInfoOffset, SAggSupporter* pAggSup) {
  SWinKey winKey = {.ts = win->skey, .groupId = tableGroupId};
  char*   pBuf = NULL;
  int32_t bufSize = pAggSup->resultRowSize;

  if (streamStateAddIfNotExist(pState, &winKey, (void**)&pBuf, &bufSize) < 0) {
    return TSDB_CODE_QRY_OUT_OF_MEMORY;
  }

  SResultRow* pRow = (SResultRow*)pBuf;
  *pResult = pRow;
  ASSERT(pRow);

  pRow->win = *win;  // the row now describes this time window
  setResultRowInitCtx(pRow, pCtx, numOfOutput, rowEntryInfoOffset);
  return TSDB_CODE_SUCCESS;
}

3015 3016
/*
 * Hand a result-row buffer obtained from the stream state back to it. pKey may be NULL (the
 * session-window paths pass NULL). The outcome of the release is not reported; always succeeds.
 */
int32_t releaseOutputBuf(SStreamState* pState, SWinKey* pKey, SResultRow* pResult) {
  (void)streamStateReleaseBuf(pState, pKey, pResult);
  return TSDB_CODE_SUCCESS;
}

3020 3021
/*
 * Store `resSize` bytes of a result row into the stream state under pKey. The outcome of the put is
 * not reported; always succeeds.
 */
int32_t saveOutputBuf(SStreamState* pState, SWinKey* pKey, SResultRow* pResult, int32_t resSize) {
  (void)streamStatePut(pState, pKey, pResult, resSize);
  return TSDB_CODE_SUCCESS;
}

3025
int32_t buildDataBlockFromGroupRes(SOperatorInfo* pOperator, SStreamState* pState, SSDataBlock* pBlock, SExprSupp* pSup,
3026
                                   SGroupResInfo* pGroupResInfo) {
3027
  SExecTaskInfo*  pTaskInfo = pOperator->pTaskInfo;
3028 3029 3030 3031 3032 3033 3034 3035 3036 3037 3038 3039
  SExprInfo*      pExprInfo = pSup->pExprInfo;
  int32_t         numOfExprs = pSup->numOfExprs;
  int32_t*        rowEntryOffset = pSup->rowEntryInfoOffset;
  SqlFunctionCtx* pCtx = pSup->pCtx;

  int32_t numOfRows = getNumOfTotalRes(pGroupResInfo);

  for (int32_t i = pGroupResInfo->index; i < numOfRows; i += 1) {
    SResKeyPos* pPos = taosArrayGetP(pGroupResInfo->pRows, i);
    int32_t     size = 0;
    void*       pVal = NULL;
    SWinKey     key = {
3040 3041
            .ts = *(TSKEY*)pPos->key,
            .groupId = pPos->groupId,
3042
    };
3043
    int32_t code = streamStateGet(pState, &key, &pVal, &size);
3044 3045 3046 3047 3048 3049
    ASSERT(code == 0);
    SResultRow* pRow = (SResultRow*)pVal;
    doUpdateNumOfRows(pCtx, pRow, numOfExprs, rowEntryOffset);
    // no results, continue to check the next one
    if (pRow->numOfRows == 0) {
      pGroupResInfo->index += 1;
3050
      releaseOutputBuf(pState, &key, pRow);
3051 3052 3053 3054 3055
      continue;
    }

    if (pBlock->info.groupId == 0) {
      pBlock->info.groupId = pPos->groupId;
3056 3057
      void* tbname = NULL;
      if (streamStateGetParName(pTaskInfo->streamInfo.pState, pBlock->info.groupId, &tbname) < 0) {
L
Liu Jicong 已提交
3058
        pBlock->info.parTbName[0] = 0;
3059 3060
      } else {
        memcpy(pBlock->info.parTbName, tbname, TSDB_TABLE_NAME_LEN);
3061
      }
3062
      tdbFree(tbname);
3063 3064 3065
    } else {
      // current value belongs to different group, it can't be packed into one datablock
      if (pBlock->info.groupId != pPos->groupId) {
3066
        releaseOutputBuf(pState, &key, pRow);
3067 3068 3069 3070 3071 3072
        break;
      }
    }

    if (pBlock->info.rows + pRow->numOfRows > pBlock->info.capacity) {
      ASSERT(pBlock->info.rows > 0);
3073
      releaseOutputBuf(pState, &key, pRow);
3074 3075 3076 3077 3078 3079 3080 3081 3082 3083
      break;
    }

    pGroupResInfo->index += 1;

    for (int32_t j = 0; j < numOfExprs; ++j) {
      int32_t slotId = pExprInfo[j].base.resSchema.slotId;

      pCtx[j].resultInfo = getResultEntryInfo(pRow, j, rowEntryOffset);
      if (pCtx[j].fpSet.finalize) {
3084 3085 3086 3087
        int32_t code1 = pCtx[j].fpSet.finalize(&pCtx[j], pBlock);
        if (TAOS_FAILED(code1)) {
          qError("%s build result data block error, code %s", GET_TASKID(pTaskInfo), tstrerror(code1));
          T_LONG_JMP(pTaskInfo->env, code1);
3088 3089 3090 3091 3092 3093 3094 3095 3096 3097 3098 3099 3100
        }
      } else if (strcmp(pCtx[j].pExpr->pExpr->_function.functionName, "_select_value") == 0) {
        // do nothing, todo refactor
      } else {
        // expand the result into multiple rows. E.g., _wstart, top(k, 20)
        // the _wstart needs to copy to 20 following rows, since the results of top-k expands to 20 different rows.
        SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, slotId);
        char*            in = GET_ROWCELL_INTERBUF(pCtx[j].resultInfo);
        for (int32_t k = 0; k < pRow->numOfRows; ++k) {
          colDataAppend(pColInfoData, pBlock->info.rows + k, in, pCtx[j].resultInfo->isNullRes);
        }
      }
    }
5
54liuyao 已提交
3101

3102
    pBlock->info.rows += pRow->numOfRows;
3103
    releaseOutputBuf(pState, &key, pRow);
3104 3105 3106 3107
  }
  blockDataUpdateTsWindow(pBlock, 0);
  return TSDB_CODE_SUCCESS;
}
5
54liuyao 已提交
3108 3109 3110 3111 3112 3113 3114

/*
 * Write a session-window result buffer (`size` bytes) back to the stream state under `key`, then
 * release the buffer. Always succeeds.
 */
int32_t saveSessionDiscBuf(SStreamState* pState, SSessionKey* key, void* buf, int32_t size) {
  const void* pData = buf;
  SResultRow* pRow = (SResultRow*)buf;

  streamStateSessionPut(pState, key, pData, size);
  releaseOutputBuf(pState, NULL, pRow);
  return TSDB_CODE_SUCCESS;
}

3115
int32_t buildSessionResultDataBlock(SOperatorInfo* pOperator, SStreamState* pState, SSDataBlock* pBlock,
5
54liuyao 已提交
3116
                                    SExprSupp* pSup, SGroupResInfo* pGroupResInfo) {
3117
  SExecTaskInfo*  pTaskInfo = pOperator->pTaskInfo;
5
54liuyao 已提交
3118 3119 3120 3121 3122 3123 3124 3125 3126 3127 3128 3129 3130
  SExprInfo*      pExprInfo = pSup->pExprInfo;
  int32_t         numOfExprs = pSup->numOfExprs;
  int32_t*        rowEntryOffset = pSup->rowEntryInfoOffset;
  SqlFunctionCtx* pCtx = pSup->pCtx;

  int32_t numOfRows = getNumOfTotalRes(pGroupResInfo);

  for (int32_t i = pGroupResInfo->index; i < numOfRows; i += 1) {
    SSessionKey* pKey = taosArrayGet(pGroupResInfo->pRows, i);
    int32_t      size = 0;
    void*        pVal = NULL;
    int32_t      code = streamStateSessionGet(pState, pKey, &pVal, &size);
    ASSERT(code == 0);
3131 3132
    if (code == -1) {
      // coverity scan
5
54liuyao 已提交
3133
      pGroupResInfo->index += 1;
3134 3135
      continue;
    }
5
54liuyao 已提交
3136 3137 3138 3139 3140 3141 3142 3143 3144 3145 3146
    SResultRow* pRow = (SResultRow*)pVal;
    doUpdateNumOfRows(pCtx, pRow, numOfExprs, rowEntryOffset);
    // no results, continue to check the next one
    if (pRow->numOfRows == 0) {
      pGroupResInfo->index += 1;
      releaseOutputBuf(pState, NULL, pRow);
      continue;
    }

    if (pBlock->info.groupId == 0) {
      pBlock->info.groupId = pKey->groupId;
3147

3148 3149 3150
      void* tbname = NULL;
      if (streamStateGetParName(pTaskInfo->streamInfo.pState, pBlock->info.groupId, &tbname) < 0) {
        pBlock->info.parTbName[0] = 0;
3151
      } else {
3152
        memcpy(pBlock->info.parTbName, tbname, TSDB_TABLE_NAME_LEN);
3153
      }
3154
      tdbFree(tbname);
5
54liuyao 已提交
3155 3156 3157 3158 3159 3160 3161 3162 3163 3164 3165 3166 3167 3168 3169 3170 3171 3172 3173 3174 3175 3176 3177 3178 3179 3180 3181 3182 3183 3184 3185 3186 3187 3188 3189 3190 3191 3192 3193 3194 3195 3196 3197 3198 3199
    } else {
      // current value belongs to different group, it can't be packed into one datablock
      if (pBlock->info.groupId != pKey->groupId) {
        releaseOutputBuf(pState, NULL, pRow);
        break;
      }
    }

    if (pBlock->info.rows + pRow->numOfRows > pBlock->info.capacity) {
      ASSERT(pBlock->info.rows > 0);
      releaseOutputBuf(pState, NULL, pRow);
      break;
    }

    pGroupResInfo->index += 1;

    for (int32_t j = 0; j < numOfExprs; ++j) {
      int32_t slotId = pExprInfo[j].base.resSchema.slotId;

      pCtx[j].resultInfo = getResultEntryInfo(pRow, j, rowEntryOffset);
      if (pCtx[j].fpSet.finalize) {
        int32_t code1 = pCtx[j].fpSet.finalize(&pCtx[j], pBlock);
        if (TAOS_FAILED(code1)) {
          qError("%s build result data block error, code %s", GET_TASKID(pTaskInfo), tstrerror(code1));
          T_LONG_JMP(pTaskInfo->env, code1);
        }
      } else if (strcmp(pCtx[j].pExpr->pExpr->_function.functionName, "_select_value") == 0) {
        // do nothing, todo refactor
      } else {
        // expand the result into multiple rows. E.g., _wstart, top(k, 20)
        // the _wstart needs to copy to 20 following rows, since the results of top-k expands to 20 different rows.
        SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, slotId);
        char*            in = GET_ROWCELL_INTERBUF(pCtx[j].resultInfo);
        for (int32_t k = 0; k < pRow->numOfRows; ++k) {
          colDataAppend(pColInfoData, pBlock->info.rows + k, in, pCtx[j].resultInfo->isNullRes);
        }
      }
    }

    pBlock->info.rows += pRow->numOfRows;
    // saveSessionDiscBuf(pState, pKey, pVal, size);
    releaseOutputBuf(pState, NULL, pRow);
  }
  blockDataUpdateTsWindow(pBlock, 0);
  return TSDB_CODE_SUCCESS;
3200
}