executorimpl.c 96.0 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
15

H
Haojun Liao 已提交
16 17
#include "filter.h"
#include "function.h"
18 19
#include "functionMgt.h"
#include "os.h"
H
Haojun Liao 已提交
20
#include "querynodes.h"
21
#include "tfill.h"
dengyihao's avatar
dengyihao 已提交
22
#include "tname.h"
23

H
Haojun Liao 已提交
24
#include "tdatablock.h"
25
#include "tglobal.h"
H
Haojun Liao 已提交
26
#include "tmsg.h"
27
#include "ttime.h"
H
Haojun Liao 已提交
28

29
#include "executorimpl.h"
dengyihao's avatar
dengyihao 已提交
30
#include "index.h"
31
#include "query.h"
32
#include "tcompare.h"
H
Haojun Liao 已提交
33
#include "thash.h"
34
#include "ttypes.h"
dengyihao's avatar
dengyihao 已提交
35
#include "vnode.h"
36

H
Haojun Liao 已提交
37
#define IS_MAIN_SCAN(runtime)          ((runtime)->scanFlag == MAIN_SCAN)
38 39 40 41 42 43
#define SET_REVERSE_SCAN_FLAG(runtime) ((runtime)->scanFlag = REVERSE_SCAN)

#define GET_FORWARD_DIRECTION_FACTOR(ord) (((ord) == TSDB_ORDER_ASC) ? QUERY_ASC_FORWARD_STEP : QUERY_DESC_FORWARD_STEP)

#if 0
// Debug-only allocator wrapper (compiled out by the surrounding #if 0):
// returns NULL whenever taosRand() is a multiple of 1000, otherwise forwards to taosMemoryMalloc().
static UNUSED_FUNC void *u_malloc (size_t __size) {
  uint32_t r = taosRand();

  // r is unsigned, so "r % 1000 <= 0" can only hold when the remainder is exactly 0
  if (r % 1000 > 0) {
    return taosMemoryMalloc(__size);
  }

  return NULL;
}

// Debug-only allocator wrapper (compiled out by the surrounding #if 0):
// returns NULL whenever taosRand() is a multiple of 1000, otherwise forwards to taosMemoryCalloc().
static UNUSED_FUNC void* u_calloc(size_t num, size_t __size) {
  uint32_t r = taosRand();

  // r is unsigned, so "r % 1000 <= 0" can only hold when the remainder is exactly 0
  if (r % 1000 > 0) {
    return taosMemoryCalloc(num, __size);
  }

  return NULL;
}

// Debug-only allocator wrapper (compiled out by the surrounding #if 0):
// returns NULL when taosRand() % 5 is 0 or 1, otherwise forwards to taosMemoryRealloc().
static UNUSED_FUNC void* u_realloc(void* p, size_t __size) {
  uint32_t r = taosRand();

  if (r % 5 > 1) {
    return taosMemoryRealloc(p, __size);
  }

  return NULL;
}

#define calloc  u_calloc
#define malloc  u_malloc
#define realloc u_realloc
#endif

X
Xiaoyu Wang 已提交
76
#define CLEAR_QUERY_STATUS(q, st)   ((q)->status &= (~(st)))
77 78
#define QUERY_IS_INTERVAL_QUERY(_q) ((_q)->interval.interval > 0)

H
Haojun Liao 已提交
79 80 81 82 83 84 85 86 87
// State record for the aggregate operator; destroyAggOperatorInfo() is forward-declared later in this file.
// NOTE(review): the per-field notes below are inferred from field names/types only - confirm against the
// operator implementation, which is outside this part of the file.
typedef struct SAggOperatorInfo {
  SOptrBasicInfo   binfo;          // presumably the common operator bookkeeping
  SAggSupporter    aggSup;         // aggregation support data (see doInitAggInfoSup declaration)
  STableQueryInfo* current;        // presumably the table currently being processed - TODO confirm
  uint64_t         groupId;        // presumably the id of the group currently being aggregated
  SGroupResInfo    groupResInfo;   // group result info (type is also taken by doCopyToSDataBlock)
  SExprSupp        scalarExprSup;  // presumably the scalar expressions evaluated before aggregation
} SAggOperatorInfo;

88
static void setBlockSMAInfo(SqlFunctionCtx* pCtx, SExprInfo* pExpr, SSDataBlock* pBlock);
89

X
Xiaoyu Wang 已提交
90
static void releaseQueryBuf(size_t numOfTables);
91

H
Haojun Liao 已提交
92 93 94 95 96 97 98 99 100 101
static void    destroyAggOperatorInfo(void* param);
static void    initCtxOutputBuffer(SqlFunctionCtx* pCtx, int32_t size);
static void    doSetTableGroupOutputBuf(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId);
static void    doApplyScalarCalculation(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32_t order, int32_t scanFlag);
static int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t numOfOutput, size_t keyBufSize,
                                const char* pKey);
static void    extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const SColumnInfoData* p, bool keep,
                                                   int32_t status);
static int32_t doSetInputDataBlock(SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t order, int32_t scanFlag,
                                   bool createDummyCol);
H
Haojun Liao 已提交
102 103
static int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprSupp* pSup, SDiskbasedBuf* pBuf,
                                  SGroupResInfo* pGroupResInfo);
H
Haojun Liao 已提交
104

H
Haojun Liao 已提交
105
/**
 * Mark an operator as finished.
 *
 * Sets the operator status to OP_EXEC_DONE, records the elapsed time since the owning task's
 * cost.start (microsecond timestamps divided by 1000.0) in cost.totalCost, and moves the owning
 * task to TASK_COMPLETED.
 */
void setOperatorCompleted(SOperatorInfo* pOperator) {
  pOperator->status = OP_EXEC_DONE;
  ASSERT(pOperator->pTaskInfo != NULL);

  SExecTaskInfo* pTask = pOperator->pTaskInfo;

  pOperator->cost.totalCost = (taosGetTimestampUs() - pTask->cost.start) / 1000.0;
  setTaskStatus(pTask, TASK_COMPLETED);
}
112

H
Haojun Liao 已提交
113 114 115 116 117 118 119 120 121 122
/**
 * Fill in the basic descriptive fields of an operator.
 *
 * @param pOperator operator to initialise
 * @param name      operator name; the pointer is stored as-is (const is cast away), not copied
 * @param type      stored in operatorType
 * @param blocking  stored in blocking
 * @param status    initial status value
 * @param pInfo     operator-specific state, stored in info
 * @param pTaskInfo owning task
 */
void setOperatorInfo(SOperatorInfo* pOperator, const char* name, int32_t type, bool blocking, int32_t status,
                     void* pInfo, SExecTaskInfo* pTaskInfo) {
  // identity
  pOperator->name = (char*)name;
  pOperator->operatorType = type;

  // execution attributes
  pOperator->blocking = blocking;
  pOperator->status = status;

  // attached objects
  pOperator->pTaskInfo = pTaskInfo;
  pOperator->info = pInfo;
}

123
/**
 * Open callback that does no work of its own: it flags the operator as opened via OPTR_SET_OPENED
 * and records an open cost of zero.
 *
 * @return always TSDB_CODE_SUCCESS
 */
int32_t optrDummyOpenFn(SOperatorInfo* pOperator) {
  OPTR_SET_OPENED(pOperator);

  // nothing was done, so nothing to account for
  pOperator->cost.openCost = 0;

  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
129
SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn, __optr_fn_t cleanup,
130 131
                                   __optr_close_fn_t closeFn, __optr_reqBuf_fn_t reqBufFn,
                                   __optr_explain_fn_t explain) {
132 133 134 135 136
  SOperatorFpSet fpSet = {
      ._openFn = openFn,
      .getNextFn = nextFn,
      .cleanupFn = cleanup,
      .closeFn = closeFn,
137
      .reqBufFn = reqBufFn,
138 139 140 141 142 143
      .getExplainFn = explain,
  };

  return fpSet;
}

144
/**
 * Reserve interBufSize bytes for a new result row inside the paged result buffer.
 *
 * The row is carved out of the page identified by *currentPageId. When no page has been used yet
 * (*currentPageId == -1), or the current page cannot hold another interBufSize bytes, a new page is
 * requested and its used size (num) is reset to sizeof(SFilePage).
 *
 * @param pResultBuf    paged buffer that owns the result rows
 * @param currentPageId in/out: id of the page currently being filled, -1 if none; updated to the page
 *                      that holds the returned row
 * @param interBufSize  number of bytes to reserve for the row
 * @return the new row with pageId/offset set, or NULL if a page could not be obtained
 */
SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int32_t* currentPageId, int32_t interBufSize) {
  SFilePage* pData = NULL;
  int32_t    pageId = -1;

  if (*currentPageId == -1) {
    // first request: no page has been used yet
    pData = getNewBufPage(pResultBuf, &pageId);
    if (pData == NULL) {  // must be checked before touching pData->num
      return NULL;
    }

    pData->num = sizeof(SFilePage);
  } else {
    pData = getBufPage(pResultBuf, *currentPageId);
    if (pData == NULL) {  // must be checked before reading pData->num
      return NULL;
    }

    pageId = *currentPageId;

    if (pData->num + interBufSize > getBufPageSize(pResultBuf)) {
      // release current page first, and prepare the next one
      releaseBufPage(pResultBuf, pData);

      pData = getNewBufPage(pResultBuf, &pageId);
      if (pData == NULL) {
        return NULL;
      }

      pData->num = sizeof(SFilePage);
    }
  }

  setBufPageDirty(pData, true);

  // the row starts at the current end of the used area of the page
  SResultRow* pResultRow = (SResultRow*)((char*)pData + pData->num);
  pResultRow->pageId = pageId;
  pResultRow->offset = (int32_t)pData->num;

  *currentPageId = pageId;

  pData->num += interBufSize;
  return pResultRow;
}

183 184 185 186 187 188 189
/**
 * the struct of key in hash table
 * +----------+---------------+
 * | group id |   key data    |
 * | 8 bytes  | actual length |
 * +----------+---------------+
 */
190 191 192
/**
 * Locate the result row for the key (groupId, pData[0..bytes)), creating it when it does not exist yet,
 * and make it the active row of pResultRowInfo.
 *
 * @param pResultBuf      paged buffer holding the result rows
 * @param pResultRowInfo  tracks the currently active row position (cur)
 * @param pData           key bytes
 * @param bytes           length of the key bytes
 * @param masterscan      not used by this function
 * @param groupId         group id that prefixes the key
 * @param pTaskInfo       owning task; its env is the long-jump target on failure
 * @param isIntervalQuery not used to alter behaviour: interval and group-by lookups are handled identically
 * @param pSup            aggregate supporter providing the key buffer, the row hash table, the current
 *                        page id and the row size
 * @return the result row; never NULL. On allocation failure, or when a batch task exceeds
 *         MAX_INTERVAL_TIME_WINDOW rows, control leaves through T_LONG_JMP instead of returning.
 */
SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pResultRowInfo, char* pData,
                                   int16_t bytes, bool masterscan, uint64_t groupId, SExecTaskInfo* pTaskInfo,
                                   bool isIntervalQuery, SAggSupporter* pSup) {
  SET_RES_WINDOW_KEY(pSup->keyBuf, pData, bytes, groupId);

  SResultRowPosition* p1 =
      (SResultRowPosition*)tSimpleHashGet(pSup->pResultRowHashTable, pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes));

  SResultRow* pResult = NULL;

  // In case of repeat scan/reverse scan of an interval query no new time window is added, and for a
  // group by column query the row is expected to exist already; p1 may still be NULL (e.g. when
  // sliding+offset exists). Both query kinds perform exactly the same lookup.
  if (p1 != NULL) {
    pResult = getResultRowByPos(pResultBuf, p1, true);
    ASSERT(pResult->pageId == p1->pageId && pResult->offset == p1->offset);
  }

  // 1. close current opened time window
  if (pResultRowInfo->cur.pageId != -1 && ((pResult == NULL) || (pResult->pageId != pResultRowInfo->cur.pageId))) {
    SResultRowPosition pos = pResultRowInfo->cur;
    SFilePage*         pPage = getBufPage(pResultBuf, pos.pageId);
    releaseBufPage(pResultBuf, pPage);
  }

  // allocate a new buffer page
  if (pResult == NULL) {
    ASSERT(pSup->resultRowSize > 0);
    pResult = getNewResultRow(pResultBuf, &pSup->currentPageId, pSup->resultRowSize);
    if (pResult == NULL) {
      // getNewResultRow() returns NULL when no buffer page is available; abort the task instead of
      // dereferencing the NULL row below
      T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
    }

    // add a new result set for a new group
    SResultRowPosition pos = {.pageId = pResult->pageId, .offset = pResult->offset};
    tSimpleHashPut(pSup->pResultRowHashTable, pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes), &pos,
                   sizeof(SResultRowPosition));
  }

  // 2. set the new time window to be the new active time window
  pResultRowInfo->cur = (SResultRowPosition){.pageId = pResult->pageId, .offset = pResult->offset};

  // too many time window in query
  if (pTaskInfo->execModel == OPTR_EXEC_MODEL_BATCH &&
      tSimpleHashGetSize(pSup->pResultRowHashTable) > MAX_INTERVAL_TIME_WINDOW) {
    T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_TOO_MANY_TIMEWINDOW);
  }

  return pResult;
}

246
// A new buffer page is allocated for each table. TODO: optimize this design.
H
Haojun Liao 已提交
247
/**
 * Reserve `size` bytes in the paged buffer for a window result that has no storage yet.
 *
 * Does nothing when pWindowRes already has a page (pageId != -1). Otherwise the space is taken from the
 * last page in the buffer's page list, or from a freshly requested page when the list is empty or the
 * last page cannot hold another `size` bytes.
 *
 * @return 0 on success (including the "already allocated" case), -1 if a page could not be obtained
 */
static int32_t addNewWindowResultBuf(SResultRow* pWindowRes, SDiskbasedBuf* pResultBuf, uint32_t size) {
  if (pWindowRes->pageId != -1) {
    return 0;
  }

  SFilePage* pData = NULL;

  // in the first scan, new space needed for results
  int32_t pageId = -1;
  SArray* list = getDataBufPagesIdList(pResultBuf);

  if (taosArrayGetSize(list) == 0) {
    pData = getNewBufPage(pResultBuf, &pageId);
    if (pData == NULL) {  // must be checked before touching pData->num
      return -1;
    }

    pData->num = sizeof(SFilePage);
  } else {
    SPageInfo* pi = getLastPageInfo(list);
    pData = getBufPage(pResultBuf, getPageId(pi));
    if (pData == NULL) {  // must be checked before reading pData->num
      return -1;
    }

    pageId = getPageId(pi);

    if (pData->num + size > getBufPageSize(pResultBuf)) {
      // release current page first, and prepare the next one
      releaseBufPageInfo(pResultBuf, pi);

      pData = getNewBufPage(pResultBuf, &pageId);
      if (pData == NULL) {
        return -1;
      }

      pData->num = sizeof(SFilePage);
    }
  }

  // pWindowRes->pageId is still -1 here (checked on entry and not modified since), so the space is
  // always claimed at the current end of the used area of the page
  pWindowRes->pageId = pageId;
  pWindowRes->offset = (int32_t)pData->num;

  pData->num += size;
  assert(pWindowRes->pageId >= 0);

  return 0;
}

293
//  query_range_start, query_range_end, window_duration, window_start, window_end
294
/**
 * Populate a five-row timestamp column describing a query time window.
 *
 * Row layout: query_range_start, query_range_end, window_duration, window_start, window_end.
 * Both the query range and the window are initialised from pQueryWindow; the duration is written as 0.
 */
void initExecTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pQueryWindow) {
  pColData->info.type = TSDB_DATA_TYPE_TIMESTAMP;
  pColData->info.bytes = sizeof(int64_t);

  colInfoDataEnsureCapacity(pColData, 5, false);

  int64_t rows[5] = {
      pQueryWindow->skey,  // query range start
      pQueryWindow->ekey,  // query range end
      0,                   // window duration: this value may be variable in case of 'n' and 'y'.
      pQueryWindow->skey,  // window start
      pQueryWindow->ekey,  // window end
  };

  for (int32_t r = 0; r < 5; ++r) {
    colDataAppendInt64(pColData, r, &rows[r]);
  }
}

308 309 310 311 312 313 314
// Snapshot of the input fields of a SqlFunctionCtx that applyAggFunctionOnPartialTuples() overrides
// temporarily; filled by functionCtxSave() and written back by functionCtxRestore().
typedef struct {
  bool    hasAgg;       // copy of input.colDataSMAIsSet
  int32_t numOfRows;    // copy of input.numOfRows
  int32_t startOffset;  // copy of input.startRowIndex
} SFunctionCtxStatus;

// Capture the SMA flag, row count and start row index of pCtx->input into *pStatus.
static void functionCtxSave(SqlFunctionCtx* pCtx, SFunctionCtxStatus* pStatus) {
  *pStatus = (SFunctionCtxStatus){
      .hasAgg = pCtx->input.colDataSMAIsSet,
      .numOfRows = pCtx->input.numOfRows,
      .startOffset = pCtx->input.startRowIndex,
  };
}

// Write the values captured by functionCtxSave() back into pCtx->input.
static void functionCtxRestore(SqlFunctionCtx* pCtx, SFunctionCtxStatus* pStatus) {
  pCtx->input.startRowIndex = pStatus->startOffset;
  pCtx->input.numOfRows = pStatus->numOfRows;
  pCtx->input.colDataSMAIsSet = pStatus->hasAgg;
}

H
Haojun Liao 已提交
326 327
/**
 * Apply every function context to a slice [offset, offset + forwardStep) of the current input.
 *
 * For each context the input range is narrowed to the slice. Window pseudo-column functions are
 * evaluated through their scalar callback against pTimeWindowData (5 rows) and produce one result;
 * all other functions run their aggregate `process` callback (when they still need to execute), after
 * which the original input range is restored.
 *
 * NOTE(review): the saved input state is NOT restored on the pseudo-column path - confirm this is intended.
 *
 * @param taskInfo        owning task; receives the error code and is the long-jump target on failure
 * @param pCtx            array of numOfOutput function contexts
 * @param pTimeWindowData time window column handed to pseudo-column functions
 * @param offset          first row of the slice
 * @param forwardStep     number of rows in the slice
 * @param numOfTotal      total rows in the block; a smaller slice disables use of block SMA data
 * @param numOfOutput     number of entries in pCtx
 */
void applyAggFunctionOnPartialTuples(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, SColumnInfoData* pTimeWindowData,
                                     int32_t offset, int32_t forwardStep, int32_t numOfTotal, int32_t numOfOutput) {
  for (int32_t k = 0; k < numOfOutput; ++k) {
    SqlFunctionCtx* pCur = &pCtx[k];

    // keep it temporarily
    SFunctionCtxStatus saved = {0};
    functionCtxSave(pCur, &saved);

    pCur->input.startRowIndex = offset;
    pCur->input.numOfRows = forwardStep;

    // not a whole block involved in query processing, statistics data can not be used
    // NOTE: the original value of isSet have been changed here
    if (forwardStep < numOfTotal && pCur->input.colDataSMAIsSet) {
      pCur->input.colDataSMAIsSet = false;
    }

    if (fmIsWindowPseudoColumnFunc(pCur->functionId)) {
      SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCur);

      // the result is written straight into the entry's intermediate buffer as a BIGINT value
      SColumnInfoData resCol = {0};
      resCol.info.type = TSDB_DATA_TYPE_BIGINT;
      resCol.info.bytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes;
      resCol.pData = GET_ROWCELL_INTERBUF(pEntryInfo);

      SScalarParam out = {.columnData = &resCol};
      SScalarParam tw = {.numOfRows = 5, .columnData = pTimeWindowData};
      pCur->sfp.process(&tw, 1, &out);
      pEntryInfo->numOfRes = 1;
      continue;
    }

    if (functionNeedToExecute(pCur) && pCur->fpSet.process != NULL) {
      int32_t code = pCur->fpSet.process(pCur);
      if (code != TSDB_CODE_SUCCESS) {
        qError("%s apply functions error, code: %s", GET_TASKID(taskInfo), tstrerror(code));
        taskInfo->code = code;
        T_LONG_JMP(taskInfo->env, code);
      }
    }

    // restore it
    functionCtxRestore(pCur, &saved);
  }
}

374 375 376
// Attach pBlock to every function context of pExprSup using the block's SMA information:
// records the scan order and row count, lets setBlockSMAInfo() wire up the per-parameter
// aggregates, and remembers the source block.
static void doSetInputDataBlockInfo(SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t order) {
  int32_t idx = 0;

  while (idx < pExprSup->numOfExprs) {
    SqlFunctionCtx* pCur = &pExprSup->pCtx[idx];

    pCur->order = order;
    pCur->input.numOfRows = pBlock->info.rows;
    setBlockSMAInfo(pCur, &pExprSup->pExprInfo[idx], pBlock);
    pCur->pSrcBlock = pBlock;

    ++idx;
  }
}

384
/**
 * Bind a data block as the input of all function contexts in pExprSup.
 *
 * Blocks that carry SMA data (pBlockAgg != NULL) are bound through doSetInputDataBlockInfo();
 * all other blocks are bound through doSetInputDataBlock(), whose return code is not propagated.
 */
void setInputDataBlock(SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t order, int32_t scanFlag, bool createDummyCol) {
  if (pBlock->pBlockAgg == NULL) {
    doSetInputDataBlock(pExprSup, pBlock, order, scanFlag, createDummyCol);
    return;
  }

  doSetInputDataBlockInfo(pExprSup, pBlock, order);
}

L
Liu Jicong 已提交
392 393
/**
 * Materialise a constant function parameter as a column with numOfRows identical values.
 *
 * The column object for parameter slot paramIndex is created on first use (type and byte width taken
 * from the parameter) and reused afterwards. Only BIGINT/UBIGINT, DOUBLE and VARCHAR parameters are
 * filled; for any other type the column is left without appended values.
 *
 * @param pInput     input descriptor whose pData[paramIndex] receives the column
 * @param pFuncParam the constant parameter
 * @param paramIndex slot of the parameter
 * @param numOfRows  number of rows to generate
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY if an allocation fails
 */
static int32_t doCreateConstantValColumnInfo(SInputColumnInfoData* pInput, SFunctParam* pFuncParam, int32_t paramIndex,
                                             int32_t numOfRows) {
  SColumnInfoData* pColInfo = NULL;
  if (pInput->pData[paramIndex] == NULL) {
    pColInfo = taosMemoryCalloc(1, sizeof(SColumnInfoData));
    if (pColInfo == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    // Set the correct column info (data type and bytes)
    pColInfo->info.type = pFuncParam->param.nType;
    pColInfo->info.bytes = pFuncParam->param.nLen;

    pInput->pData[paramIndex] = pColInfo;
  } else {
    pColInfo = pInput->pData[paramIndex];
  }

  colInfoDataEnsureCapacity(pColInfo, numOfRows, false);

  int8_t type = pFuncParam->param.nType;
  if (type == TSDB_DATA_TYPE_BIGINT || type == TSDB_DATA_TYPE_UBIGINT) {
    int64_t v = pFuncParam->param.i;
    for (int32_t i = 0; i < numOfRows; ++i) {
      colDataAppendInt64(pColInfo, i, &v);
    }
  } else if (type == TSDB_DATA_TYPE_DOUBLE) {
    double v = pFuncParam->param.d;
    for (int32_t i = 0; i < numOfRows; ++i) {
      colDataAppendDouble(pColInfo, i, &v);
    }
  } else if (type == TSDB_DATA_TYPE_VARCHAR) {
    // build the var-string form (header + payload) once, then append it to every row
    char* tmp = taosMemoryMalloc(pFuncParam->param.nLen + VARSTR_HEADER_SIZE);
    if (tmp == NULL) {  // do not write through a failed allocation
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    STR_WITH_SIZE_TO_VARSTR(tmp, pFuncParam->param.pz, pFuncParam->param.nLen);
    for (int32_t i = 0; i < numOfRows; ++i) {
      colDataAppend(pColInfo, i, tmp, false);
    }
    taosMemoryFree(tmp);
  }

  return TSDB_CODE_SUCCESS;
}

435 436
/**
 * Bind the raw column data of pBlock as the input of every function context in pExprSup.
 *
 * Column parameters point directly at the matching column of the block. A constant parameter is
 * expanded into a dummy column only when createDummyCol is set and it is the function's sole parameter.
 *
 * @return TSDB_CODE_SUCCESS, or the error returned by doCreateConstantValColumnInfo()
 */
static int32_t doSetInputDataBlock(SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t order, int32_t scanFlag,
                                   bool createDummyCol) {
  SqlFunctionCtx* pCtxList = pExprSup->pCtx;

  for (int32_t i = 0; i < pExprSup->numOfExprs; ++i) {
    SqlFunctionCtx*       pCur = &pCtxList[i];
    SInputColumnInfoData* pInput = &pCur->input;

    pCur->order = order;
    pInput->numOfRows = pBlock->info.rows;

    pCur->pSrcBlock = pBlock;
    pCur->scanFlag = scanFlag;

    pInput->uid = pBlock->info.id.uid;
    pInput->colDataSMAIsSet = false;  // raw data path: block statistics are not used

    SExprInfo* pOneExpr = &pExprSup->pExprInfo[i];
    for (int32_t j = 0; j < pOneExpr->base.numOfParams; ++j) {
      SFunctParam* pFuncParam = &pOneExpr->base.pParam[j];

      if (pFuncParam->type == FUNC_PARAM_TYPE_COLUMN) {
        int32_t slotId = pFuncParam->pCol->slotId;
        pInput->pData[j] = taosArrayGet(pBlock->pDataBlock, slotId);
        pInput->totalRows = pBlock->info.rows;
        pInput->numOfRows = pBlock->info.rows;
        pInput->startRowIndex = 0;

        // NOTE: the last parameter is the primary timestamp column
        // todo: refactor this
        if (fmIsImplicitTsFunc(pCur->functionId) && (j == pOneExpr->base.numOfParams - 1)) {
          pInput->pPTS = pInput->pData[j];  // in case of merge function, this is not always the ts column data.
        }

        ASSERT(pInput->pData[j] != NULL);
        continue;
      }

      if (pFuncParam->type != FUNC_PARAM_TYPE_VALUE) {
        continue;
      }

      // todo avoid case: top(k, 12), 12 is the value parameter.
      // sum(11), 11 is also the value parameter.
      if (!createDummyCol || pOneExpr->base.numOfParams != 1) {
        continue;
      }

      pInput->totalRows = pBlock->info.rows;
      pInput->numOfRows = pBlock->info.rows;
      pInput->startRowIndex = 0;

      int32_t code = doCreateConstantValColumnInfo(pInput, pFuncParam, j, pBlock->info.rows);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }
    }
  }

  return TSDB_CODE_SUCCESS;
}

488
/**
 * Run the `process` callback of every function context of the operator that still needs to execute.
 *
 * @return TSDB_CODE_SUCCESS, or the first error code reported by a callback (also logged)
 */
static int32_t doAggregateImpl(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx) {
  const int32_t numOfExprs = pOperator->exprSupp.numOfExprs;

  for (int32_t k = 0; k < numOfExprs; ++k) {
    SqlFunctionCtx* pCur = &pCtx[k];

    if (!functionNeedToExecute(pCur)) {
      continue;
    }

    // todo add a dummy function to avoid process check
    if (pCur->fpSet.process == NULL) {
      continue;
    }

    int32_t code = pCur->fpSet.process(pCur);
    if (code != TSDB_CODE_SUCCESS) {
      qError("%s aggregate function error happens, code: %s", GET_TASKID(pOperator->pTaskInfo), tstrerror(code));
      return code;
    }
  }

  return TSDB_CODE_SUCCESS;
}

5
54liuyao 已提交
507
/**
 * Decide whether the function bound to pCtx has to be evaluated for the current input.
 *
 * @return false when functionId is -1 or the result entry is already completed; during a
 *         REPEAT_SCAN the answer is whatever fmIsRepeatScanFunc() reports; true otherwise
 */
bool functionNeedToExecute(SqlFunctionCtx* pCtx) {
  struct SResultRowEntryInfo* pEntry = GET_RES_INFO(pCtx);

  // in case of timestamp column, always generated results.
  if (pCtx->functionId == -1) {
    return false;
  }

  if (pCtx->scanFlag == REPEAT_SCAN) {
    return fmIsRepeatScanFunc(pCtx->functionId);
  }

  return isRowEntryCompleted(pEntry) ? false : true;
}

527 528 529 530 531 532 533
/**
 * Synthesise block-level statistics (min/max/sum/numOfNull) for a constant function parameter, so a
 * block that is processed via its SMA data can also feed functions with value parameters.
 *
 * The column descriptor and the statistics object of slot paramIndex are allocated on first use and
 * reused afterwards. Variable-length types are not supported (asserted).
 *
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY if an allocation fails
 */
static int32_t doCreateConstantValColumnAggInfo(SInputColumnInfoData* pInput, SFunctParam* pFuncParam, int32_t type,
                                                int32_t paramIndex, int32_t numOfRows) {
  if (pInput->pData[paramIndex] == NULL) {
    SColumnInfoData* pCol = taosMemoryCalloc(1, sizeof(SColumnInfoData));
    pInput->pData[paramIndex] = pCol;
    if (pCol == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    // Set the correct column info (data type and bytes)
    pCol->info.type = type;
    pCol->info.bytes = tDataTypes[type].bytes;
  }

  SColumnDataAgg* da = pInput->pColumnDataAgg[paramIndex];
  if (da == NULL) {
    da = taosMemoryCalloc(1, sizeof(SColumnDataAgg));
    pInput->pColumnDataAgg[paramIndex] = da;
    if (da == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  ASSERT(!IS_VAR_DATA_TYPE(type));

  switch (type) {
    case TSDB_DATA_TYPE_BIGINT: {
      int64_t v = pFuncParam->param.i;
      *da = (SColumnDataAgg){.numOfNull = 0, .min = v, .max = v, .sum = v * numOfRows};
      break;
    }

    case TSDB_DATA_TYPE_DOUBLE: {
      double v = pFuncParam->param.d;
      *da = (SColumnDataAgg){.numOfNull = 0};

      // the statistics fields are reused as storage for double values
      *(double*)&da->min = v;
      *(double*)&da->max = v;
      *(double*)&da->sum = v * numOfRows;
      break;
    }

    case TSDB_DATA_TYPE_BOOL: {  // todo validate this data type
      bool v = pFuncParam->param.i;

      *da = (SColumnDataAgg){.numOfNull = 0};
      // NOTE(review): min is written as 0 rather than v, unlike the other branches - confirm intended
      *(bool*)&da->min = 0;
      *(bool*)&da->max = v;
      *(bool*)&da->sum = v * numOfRows;
      break;
    }

    case TSDB_DATA_TYPE_TIMESTAMP:
      // do nothing
      break;

    default:
      ASSERT(0);
      break;
  }

  return TSDB_CODE_SUCCESS;
}
578

579
/**
 * Wire the block's SMA (pre-computed aggregate) data into the input of one function context.
 *
 * Always records the block row count. When the block has no SMA data, the input is simply flagged as
 * not having SMA. Otherwise every column parameter is pointed at its per-column aggregate (a missing
 * aggregate clears the SMA flag), and every value parameter gets synthesised statistics.
 */
void setBlockSMAInfo(SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, SSDataBlock* pBlock) {
  SInputColumnInfoData* pInput = &pCtx->input;

  int32_t rows = pBlock->info.rows;
  pInput->numOfRows = rows;
  pInput->totalRows = rows;

  if (pBlock->pBlockAgg == NULL) {
    pInput->colDataSMAIsSet = false;
    return;
  }

  pInput->colDataSMAIsSet = true;

  for (int32_t j = 0; j < pExprInfo->base.numOfParams; ++j) {
    SFunctParam* pParam = &pExprInfo->base.pParam[j];

    if (pParam->type == FUNC_PARAM_TYPE_COLUMN) {
      int32_t slotId = pParam->pCol->slotId;

      pInput->pColumnDataAgg[j] = pBlock->pBlockAgg[slotId];
      if (pInput->pColumnDataAgg[j] == NULL) {
        pInput->colDataSMAIsSet = false;
      }

      // Here we set the column info data since the data type for each column data is required, but
      // the data in the corresponding SColumnInfoData will not be used.
      pInput->pData[j] = taosArrayGet(pBlock->pDataBlock, slotId);
    } else if (pParam->type == FUNC_PARAM_TYPE_VALUE) {
      // the return code is not checked here
      doCreateConstantValColumnAggInfo(pInput, pParam, pParam->param.nType, j, pBlock->info.rows);
    }
  }
}

L
Liu Jicong 已提交
611
// A task counts as killed as soon as its code field is non-zero.
bool isTaskKilled(SExecTaskInfo* pTaskInfo) {
  if (pTaskInfo->code != 0) {
    return true;
  }

  return false;
}

D
dapan1121 已提交
615
// Store rspCode in the task's code field; isTaskKilled() reports true for any non-zero code.
void setTaskKilled(SExecTaskInfo* pTaskInfo, int32_t rspCode) {
  pTaskInfo->code = rspCode;
}
616 617

/////////////////////////////////////////////////////////////////////////////////////////////
618
/**
 * Compute the interval-aligned time window that contains `key`.
 *
 * skey is `key` truncated by taosTimeTruncate(); ekey is one tick before skey advanced by one interval.
 *
 * @return the aligned window; ekey is clamped to INT64_MAX when the computed end falls before the start
 */
STimeWindow getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int64_t key) {
  STimeWindow aligned = {0};

  aligned.skey = taosTimeTruncate(key, pInterval, precision);
  aligned.ekey = taosTimeAdd(aligned.skey, pInterval->interval, pInterval->intervalUnit, precision) - 1;

  /*
   * If skey > INT64_MAX - pInterval->interval, the span between skey and the real end key is shorter
   * than one interval, so the computed end lands before the start; in that case the window simply
   * extends to INT64_MAX and no further adjustment of the query range is needed.
   */
  if (aligned.ekey < aligned.skey) {
    aligned.ekey = INT64_MAX;
  }

  return aligned;
}

L
Liu Jicong 已提交
634 635
/**
 * Decide how much of a data block has to be loaded.
 *
 * In its current form this is a stub: it reports BLK_DATA_NOT_LOAD, clears the block's data and SMA
 * pointers and succeeds. The previous implementation, which was disabled with #if 0 and referred to
 * variables that no longer exist in this function (pQueryAttr, pRuntimeEnv, groupId, ascQuery), has
 * been removed; it remains available in version control.
 *
 * @param pTaskInfo      not used
 * @param pTableScanInfo not used
 * @param pBlock         block whose pDataBlock and pBlockAgg pointers are reset to NULL (nothing is freed here)
 * @param status         out: always set to BLK_DATA_NOT_LOAD
 * @return always TSDB_CODE_SUCCESS
 */
int32_t loadDataBlockOnDemand(SExecTaskInfo* pTaskInfo, STableScanInfo* pTableScanInfo, SSDataBlock* pBlock,
                              uint32_t* status) {
  (void)pTaskInfo;
  (void)pTableScanInfo;

  *status = BLK_DATA_NOT_LOAD;

  pBlock->pDataBlock = NULL;
  pBlock->pBlockAgg = NULL;

  return TSDB_CODE_SUCCESS;
}

L
Liu Jicong 已提交
777
/*
 * Update the status bits of a task.
 *
 * TASK_NOT_COMPLETED replaces the whole status word. Any other value is OR-ed
 * into the current status after the TASK_NOT_COMPLETED bit has been cleared,
 * because that flag may not coexist with the other status flags.
 */
void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status) {
  if (status == TASK_NOT_COMPLETED) {
    pTaskInfo->status = status;
    return;
  }

  // drop the "not completed" flag before merging the new status bits in
  CLEAR_QUERY_STATUS(pTaskInfo, TASK_NOT_COMPLETED);
  pTaskInfo->status |= status;
}

787
/*
 * Bind every function context in pCtx to its entry inside pResult and run the
 * per-function init callback for entries that have not been initialized yet.
 *
 * The resultInfo pointer is refreshed for all numOfOutput contexts. As soon as
 * one entry is found that is already initialized (and not skipped by the
 * earlier checks), the remaining contexts only get their pointer refreshed.
 */
void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowEntryInfoOffset) {
  bool rowAlreadyInitialized = false;

  for (int32_t idx = 0; idx < numOfOutput; ++idx) {
    SqlFunctionCtx* pOneCtx = &pCtx[idx];
    pOneCtx->resultInfo = getResultEntryInfo(pResult, idx, rowEntryInfoOffset);

    if (rowAlreadyInitialized) {
      continue;
    }

    struct SResultRowEntryInfo* pEntry = pOneCtx->resultInfo;

    // completed-and-initialized entries and window pseudo columns need no init call
    if ((isRowEntryCompleted(pEntry) && isRowEntryInitialized(pEntry)) ||
        fmIsWindowPseudoColumnFunc(pOneCtx->functionId)) {
      continue;
    }

    if (pEntry->initialized) {
      rowAlreadyInitialized = true;
      continue;
    }

    if (pOneCtx->functionId == -1) {
      // no function attached to this slot: just mark the entry as ready
      pEntry->initialized = true;
    } else {
      pOneCtx->fpSet.init(pOneCtx, pEntry);
    }
  }
}

H
Haojun Liao 已提交
816 817
/*
 * Apply pFilterInfo to pBlock and drop the rows that do not qualify.
 *
 * Nothing happens when there is no filter or the block is empty. After the
 * rows have been filtered, and when pColMatchInfo is supplied, the block's
 * timestamp window is refreshed from the first matched primary-key column
 * whose data type is TIMESTAMP. The indicator column produced by
 * filterExecute() is released before returning.
 */
void doFilter(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SColMatchInfo* pColMatchInfo) {
  if (pFilterInfo == NULL || pBlock->info.rows == 0) {
    return;
  }

  SFilterColumnParam colParam = {.numOfCols = taosArrayGetSize(pBlock->pDataBlock), .pDataBlock = pBlock->pDataBlock};
  // the result code is not acted upon: this function has no way to report it
  (void)filterSetDataFromSlotId(pFilterInfo, &colParam);

  SColumnInfoData* pIndicator = NULL;
  int32_t          filterStatus = 0;

  // todo the keep seems never to be True??
  bool keepAll = filterExecute(pFilterInfo, pBlock, &pIndicator, NULL, colParam.numOfCols, &filterStatus);
  extractQualifiedTupleByFilterResult(pBlock, pIndicator, keepAll, filterStatus);

  size_t numOfItems = (pColMatchInfo != NULL) ? taosArrayGetSize(pColMatchInfo->pList) : 0;
  for (int32_t i = 0; i < numOfItems; ++i) {
    SColMatchItem* pItem = taosArrayGet(pColMatchInfo->pList, i);
    if (pItem->colId != PRIMARYKEY_TIMESTAMP_COL_ID) {
      continue;
    }

    SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, pItem->dstSlotId);
    if (pTsCol->info.type != TSDB_DATA_TYPE_TIMESTAMP) {
      continue;
    }

    // only the first matching timestamp column is used to rebuild the window
    blockDataUpdateTsWindow(pBlock, pItem->dstSlotId);
    break;
  }

  colDataDestroy(pIndicator);
  taosMemoryFree(pIndicator);
}

849
/*
 * Compact pBlock in place so that only the rows accepted by the filter remain.
 *
 * pBlock  data block to compact; pBlock->info.rows is updated.
 * p       indicator column: p->pData is read as one int8_t per row, a zero
 *         value means the row is dropped. Only read when the filter result is
 *         neither "all qualified" nor "none qualified".
 * keep    when true the block is left untouched.
 * status  FILTER_RESULT_ALL_QUALIFIED keeps everything,
 *         FILTER_RESULT_NONE_QUALIFIED empties the block, any other value
 *         triggers the per-row compaction below.
 *
 * For every fixed-width column the original null bitmap is saved, the
 * column's bitmap is cleared, and each surviving row is moved to the next
 * free slot (or re-marked NULL there).
 */
void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const SColumnInfoData* p, bool keep, int32_t status) {
  if (keep) {
    return;
  }

  int32_t totalRows = pBlock->info.rows;

  if (status == FILTER_RESULT_ALL_QUALIFIED) {
    // here nothing needs to be done
  } else if (status == FILTER_RESULT_NONE_QUALIFIED) {
    pBlock->info.rows = 0;
  } else {
    const int8_t* pIndicator = (const int8_t*)p->pData;
    int32_t       bmLen = BitmapLen(totalRows);
    char*         pBitmap = NULL;  // scratch copy of a column's null bitmap, shared by all columns

    size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
    for (int32_t i = 0; i < numOfCols; ++i) {
      SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, i);
      // it is a reserved column for scalar function, and no data in this column yet.
      if (pDst->pData == NULL) {
        continue;
      }

      int32_t numOfRows = 0;

      switch (pDst->info.type) {
        case TSDB_DATA_TYPE_VARCHAR:
        case TSDB_DATA_TYPE_NCHAR:
          // NOTE(review): variable-length columns are not compacted here, so numOfRows stays 0 for
          // them and feeds the row-count bookkeeping below - confirm how such columns are expected
          // to be handled on this path.
          break;
        default: {
          // NOTE(review): the allocation result is not checked; this function has no error channel.
          if (pBitmap == NULL) {
            pBitmap = taosMemoryCalloc(1, bmLen);
          }
          memcpy(pBitmap, pDst->nullbitmap, bmLen);
          memset(pDst->nullbitmap, 0, bmLen);

          // In every loop below j walks the source rows and numOfRows is the next destination
          // slot; both advance exactly once per surviving row.
          switch (pDst->info.type) {
            case TSDB_DATA_TYPE_BIGINT:
            case TSDB_DATA_TYPE_UBIGINT:
            case TSDB_DATA_TYPE_DOUBLE:
            case TSDB_DATA_TYPE_TIMESTAMP:
              for (int32_t j = 0; j < totalRows; ++j) {
                if (pIndicator[j] == 0) {
                  continue;
                }

                if (colDataIsNull_f(pBitmap, j)) {
                  colDataAppendNULL(pDst, numOfRows);
                } else {
                  ((int64_t*)pDst->pData)[numOfRows] = ((int64_t*)pDst->pData)[j];
                }
                numOfRows += 1;
              }
              break;
            case TSDB_DATA_TYPE_FLOAT:
            case TSDB_DATA_TYPE_INT:
            case TSDB_DATA_TYPE_UINT:
              for (int32_t j = 0; j < totalRows; ++j) {
                if (pIndicator[j] == 0) {
                  continue;
                }

                if (colDataIsNull_f(pBitmap, j)) {
                  colDataAppendNULL(pDst, numOfRows);
                } else {
                  ((int32_t*)pDst->pData)[numOfRows] = ((int32_t*)pDst->pData)[j];
                }
                numOfRows += 1;
              }
              break;
            case TSDB_DATA_TYPE_SMALLINT:
            case TSDB_DATA_TYPE_USMALLINT:
              for (int32_t j = 0; j < totalRows; ++j) {
                if (pIndicator[j] == 0) {
                  continue;
                }

                if (colDataIsNull_f(pBitmap, j)) {
                  colDataAppendNULL(pDst, numOfRows);
                } else {
                  ((int16_t*)pDst->pData)[numOfRows] = ((int16_t*)pDst->pData)[j];
                }
                numOfRows += 1;
              }
              break;
            case TSDB_DATA_TYPE_BOOL:
            case TSDB_DATA_TYPE_TINYINT:
            case TSDB_DATA_TYPE_UTINYINT:
              for (int32_t j = 0; j < totalRows; ++j) {
                if (pIndicator[j] == 0) {
                  continue;
                }

                if (colDataIsNull_f(pBitmap, j)) {
                  colDataAppendNULL(pDst, numOfRows);
                } else {
                  ((int8_t*)pDst->pData)[numOfRows] = ((int8_t*)pDst->pData)[j];
                }
                numOfRows += 1;
              }
              break;
          }
          break;
        }
      }

      // todo this value can be assigned directly
      // the first processed column fixes the new row count; later columns must agree with it
      if (pBlock->info.rows == totalRows) {
        pBlock->info.rows = numOfRows;
      } else {
        ASSERT(pBlock->info.rows == numOfRows);
      }
    }

    // release the scratch bitmap (remains NULL when no fixed-width column was processed)
    taosMemoryFree(pBitmap);
  }
}

966
/*
 * Select (creating it if necessary) the result row that belongs to groupId
 * and bind the operator's function contexts to it.
 *
 * For a simple group-by query without interval all tables of one group share
 * a single result row, keyed by the group id itself. The function returns
 * without binding the contexts when a new result buffer cannot be added.
 */
void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId) {
  SAggOperatorInfo* pAgg = pOperator->info;
  SExecTaskInfo*    pTask = pOperator->pTaskInfo;
  SResultRowInfo*   pRowInfo = &pAgg->binfo.resultRowInfo;

  SResultRow* pRow = doSetResultOutBufByKey(pAgg->aggSup.pResultBuf, pRowInfo, (char*)&groupId, sizeof(groupId), true,
                                            groupId, pTask, false, &pAgg->aggSup);
  assert(pRow != NULL);

  /*
   * not assign result buffer yet, add new result buffer
   * all group belong to one result set, and each group result has different group id so set the id to be one
   */
  if (pRow->pageId == -1 &&
      addNewWindowResultBuf(pRow, pAgg->aggSup.pResultBuf, pAgg->binfo.pRes->info.rowSize) != TSDB_CODE_SUCCESS) {
    return;
  }

  setResultRowInitCtx(pRow, pOperator->exprSupp.pCtx, numOfOutput, pOperator->exprSupp.rowEntryInfoOffset);
}

993 994 995
/*
 * Switch the aggregate operator to the output buffer of groupId.
 *
 * The switch is skipped when the operator is already working on that group
 * (UINT64_MAX is never treated as a matching active group).
 */
static void setExecutionContext(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId) {
  SAggOperatorInfo* pAgg = pOperator->info;

  bool sameGroup = (pAgg->groupId != UINT64_MAX) && (pAgg->groupId == groupId);
  if (!sameGroup) {
    doSetTableGroupOutputBuf(pOperator, numOfOutput, groupId);

    // remember which group is active now
    pAgg->groupId = groupId;
  }
}

H
Haojun Liao 已提交
1005
/*
 * Raise pRow->numOfRows to the largest numOfRes among the initialized result
 * entries of the row.
 *
 * If the row still has zero rows afterwards, it is forced to one row unless
 * one of the initialized entries belongs to a function for which
 * fmIsNotNullOutputFunc() is true.
 */
void doUpdateNumOfRows(SqlFunctionCtx* pCtx, SResultRow* pRow, int32_t numOfExprs, const int32_t* rowEntryOffset) {
  bool needNotNullOutput = false;

  for (int32_t idx = 0; idx < numOfExprs; ++idx) {
    SResultRowEntryInfo* pEntry = getResultEntryInfo(pRow, idx, rowEntryOffset);

    if (isRowEntryInitialized(pEntry)) {
      if (pEntry->numOfRes > pRow->numOfRows) {
        pRow->numOfRows = pEntry->numOfRes;
      }

      if (fmIsNotNullOutputFunc(pCtx[idx].functionId)) {
        needNotNullOutput = true;
      }
    }
  }

  // if all expr skips all blocks, e.g. all null inputs for max function, output one row in final result.
  //  except for first/last, which require not null output, output no rows
  if (!needNotNullOutput && pRow->numOfRows == 0) {
    pRow->numOfRows = 1;
  }
}

H
Haojun Liao 已提交
1028 1029
/*
 * Write the results held in pRow into pBlock, one expression at a time.
 *
 * - Expressions with a finalize callback emit their own output through it; a
 *   failure is logged and raised through T_LONG_JMP on pTaskInfo->env.
 * - "_select_value" expressions without a finalize callback are skipped.
 * - Every other expression has its single intermediate value repeated for
 *   pRow->numOfRows rows, starting at the current end of the block.
 *
 * pBlock->info.rows is not modified here.
 */
void copyResultrowToDataBlock(SExprInfo* pExprInfo, int32_t numOfExprs, SResultRow* pRow, SqlFunctionCtx* pCtx,
                              SSDataBlock* pBlock, const int32_t* rowEntryOffset, SExecTaskInfo* pTaskInfo) {
  for (int32_t idx = 0; idx < numOfExprs; ++idx) {
    SqlFunctionCtx* pOneCtx = &pCtx[idx];
    int32_t         dstSlot = pExprInfo[idx].base.resSchema.slotId;
    const char*     funcName = pOneCtx->pExpr->pExpr->_function.functionName;

    pOneCtx->resultInfo = getResultEntryInfo(pRow, idx, rowEntryOffset);

    if (pOneCtx->fpSet.finalize) {
      // for groupkey along with functions that output multiple lines(e.g. Histogram)
      // need to match groupkey result for each output row of that function.
      if (strcmp(funcName, "_group_key") == 0 && pOneCtx->resultInfo->numOfRes != 0) {
        pOneCtx->resultInfo->numOfRes = pRow->numOfRows;
      }

      int32_t code = pOneCtx->fpSet.finalize(pOneCtx, pBlock);
      if (TAOS_FAILED(code)) {
        qError("%s build result data block error, code %s", GET_TASKID(pTaskInfo), tstrerror(code));
        T_LONG_JMP(pTaskInfo->env, code);
      }
      continue;
    }

    if (strcmp(funcName, "_select_value") == 0) {
      continue;  // nothing to emit for this expression
    }

    // expand the result into multiple rows. E.g., _wstart, top(k, 20)
    // the _wstart needs to copy to 20 following rows, since the results of top-k expands to 20 different rows.
    SColumnInfoData* pDstCol = taosArrayGet(pBlock->pDataBlock, dstSlot);
    char*            pValue = GET_ROWCELL_INTERBUF(pOneCtx->resultInfo);
    for (int32_t r = 0; r < pRow->numOfRows; ++r) {
      colDataAppend(pDstCol, pBlock->info.rows + r, pValue, pOneCtx->resultInfo->isNullRes);
    }
  }
}

1062 1063 1064
// todo refactor. SResultRow has direct pointer in miainfo
/*
 * Finalize the result row stored at resultRowPosition inside pBuf and append
 * its output to pBlock.
 *
 * The row's row count is refreshed first; a row with no output is skipped.
 * Otherwise the block capacity is grown (by 25% steps) until the new rows fit,
 * the row is copied, and pBlock->info.rows is advanced.
 *
 * Returns 0. A failure to grow the block is logged and raised through
 * T_LONG_JMP on pTaskInfo->env after the buffer page has been released.
 */
int32_t finalizeResultRows(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPosition, SExprSupp* pSup,
                           SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo) {
  SFilePage*  page = getBufPage(pBuf, resultRowPosition->pageId);
  SResultRow* pRow = (SResultRow*)((char*)page + resultRowPosition->offset);

  SqlFunctionCtx* pCtx = pSup->pCtx;
  SExprInfo*      pExprInfo = pSup->pExprInfo;
  const int32_t*  rowEntryOffset = pSup->rowEntryInfoOffset;

  doUpdateNumOfRows(pCtx, pRow, pSup->numOfExprs, rowEntryOffset);
  if (pRow->numOfRows == 0) {
    releaseBufPage(pBuf, page);
    return 0;
  }

  int32_t required = pBlock->info.rows + pRow->numOfRows;
  int32_t size = pBlock->info.capacity;
  while (size < required) {
    // Grow by 25%. For capacities below 4 (including 0) the truncated product does not
    // increase, so jump straight to the required size instead of looping forever.
    int32_t grown = (int32_t)(size * 1.25);
    size = (grown > size) ? grown : required;
  }

  int32_t code = blockDataEnsureCapacity(pBlock, size);
  if (TAOS_FAILED(code)) {
    releaseBufPage(pBuf, page);
    qError("%s ensure result data capacity failed, code %s", GET_TASKID(pTaskInfo), tstrerror(code));
    T_LONG_JMP(pTaskInfo->env, code);
  }

  copyResultrowToDataBlock(pExprInfo, pSup->numOfExprs, pRow, pCtx, pBlock, rowEntryOffset, pTaskInfo);

  releaseBufPage(pBuf, page);
  pBlock->info.rows += pRow->numOfRows;
  return 0;
}

1097 1098 1099 1100 1101 1102 1103
/*
 * Copy pending group results into pBlock, starting at pGroupResInfo->index.
 *
 * Rows are consumed in order until one of the following happens:
 *  - all results have been consumed,
 *  - the next result belongs to a different group than the one already in
 *    the block (a block carries a single group id),
 *  - the next result would not fit into the block's capacity.
 * Results that produce no rows are consumed and skipped. pGroupResInfo->index
 * is advanced past every consumed result. Always returns 0.
 */
int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprSupp* pSup, SDiskbasedBuf* pBuf,
                           SGroupResInfo* pGroupResInfo) {
  SqlFunctionCtx* pCtx = pSup->pCtx;
  SExprInfo*      pExprInfo = pSup->pExprInfo;
  int32_t*        rowEntryOffset = pSup->rowEntryInfoOffset;
  int32_t         numOfExprs = pSup->numOfExprs;

  const int32_t total = getNumOfTotalRes(pGroupResInfo);

  // pGroupResInfo->index is the cursor: it moves forward once per consumed result
  while (pGroupResInfo->index < total) {
    SResKeyPos* pKeyPos = taosArrayGetP(pGroupResInfo->pRows, pGroupResInfo->index);
    SFilePage*  pPage = getBufPage(pBuf, pKeyPos->pos.pageId);
    SResultRow* pResRow = (SResultRow*)((char*)pPage + pKeyPos->pos.offset);

    doUpdateNumOfRows(pCtx, pResRow, numOfExprs, rowEntryOffset);

    // no results, continue to check the next one
    if (pResRow->numOfRows == 0) {
      pGroupResInfo->index += 1;
      releaseBufPage(pBuf, pPage);
      continue;
    }

    if (pBlock->info.id.groupId == 0) {
      pBlock->info.id.groupId = pKeyPos->groupId;
    } else if (pBlock->info.id.groupId != pKeyPos->groupId) {
      // current value belongs to different group, it can't be packed into one datablock
      releaseBufPage(pBuf, pPage);
      break;
    }

    if (pBlock->info.rows + pResRow->numOfRows > pBlock->info.capacity) {
      ASSERT(pBlock->info.rows > 0);
      releaseBufPage(pBuf, pPage);
      break;
    }

    pGroupResInfo->index += 1;
    copyResultrowToDataBlock(pExprInfo, numOfExprs, pResRow, pCtx, pBlock, rowEntryOffset, pTaskInfo);

    releaseBufPage(pBuf, pPage);
    pBlock->info.rows += pResRow->numOfRows;
  }

  qDebug("%s result generated, rows:%d, groupId:%" PRIu64, GET_TASKID(pTaskInfo), pBlock->info.rows,
         pBlock->info.id.groupId);
  pBlock->info.dataLoad = 1;
  blockDataUpdateTsWindow(pBlock, 0);
  return 0;
}

1151 1152 1153 1154 1155 1156 1157 1158 1159 1160 1161 1162 1163 1164
/*
 * Fill the operator's result block with the next batch of stream results.
 *
 * The block is stamped with the task version and cleaned. If results remain,
 * one group's worth of rows is copied in, and the partition table name
 * registered for that group id is copied into the block info (an empty name
 * is stored when the lookup fails).
 */
void doBuildStreamResBlock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo,
                           SDiskbasedBuf* pBuf) {
  SSDataBlock*   pRes = pbInfo->pRes;
  SExecTaskInfo* pTask = pOperator->pTaskInfo;

  // set output datablock version
  pRes->info.version = pTask->version;

  blockDataCleanup(pRes);
  if (!hasRemainResults(pGroupResInfo)) {
    return;
  }

  // clear the existed group id
  pRes->info.id.groupId = 0;
  ASSERT(!pbInfo->mergeResultBlock);
  doCopyToSDataBlock(pTask, pRes, &pOperator->exprSupp, pBuf, pGroupResInfo);

  void* pParName = NULL;
  if (streamStateGetParName(pTask->streamInfo.pState, pRes->info.id.groupId, &pParName) >= 0) {
    memcpy(pRes->info.parTbName, pParName, TSDB_TABLE_NAME_LEN);
  } else {
    pRes->info.parTbName[0] = 0;
  }
  tdbFree(pParName);
}

X
Xiaoyu Wang 已提交
1178 1179
/*
 * Fill the operator's result block with the next batch of group results.
 *
 * Without result merging, a single call copies the rows of one group. With
 * merging enabled, groups are copied back to back until either no results
 * remain or the block reaches the operator's row threshold; the group id of
 * the block is cleared in that mode.
 */
void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo,
                            SDiskbasedBuf* pBuf) {
  SSDataBlock*   pRes = pbInfo->pRes;
  SExecTaskInfo* pTask = pOperator->pTaskInfo;

  // set output datablock version
  pRes->info.version = pTask->version;

  blockDataCleanup(pRes);
  if (!hasRemainResults(pGroupResInfo)) {
    return;
  }

  // clear the existed group id
  pRes->info.id.groupId = 0;

  if (!pbInfo->mergeResultBlock) {
    doCopyToSDataBlock(pTask, pRes, &pOperator->exprSupp, pBuf, pGroupResInfo);
    return;
  }

  while (hasRemainResults(pGroupResInfo)) {
    doCopyToSDataBlock(pTask, pRes, &pOperator->exprSupp, pBuf, pGroupResInfo);
    if (pRes->info.rows >= pOperator->resultInfo.threshold) {
      break;
    }

    // clearing group id to continue to merge data that belong to different groups
    pRes->info.id.groupId = 0;
  }

  // clear the group id info in SSDataBlock, since the client does not need it
  pRes->info.id.groupId = 0;
}

L
Liu Jicong 已提交
1211 1212 1213 1214
// static TSKEY doSkipIntervalProcess(STaskRuntimeEnv* pRuntimeEnv, STimeWindow* win, SDataBlockInfo* pBlockInfo,
// STableQueryInfo* pTableQueryInfo) {
//   STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
//   SResultRowInfo *pWindowResInfo = &pRuntimeEnv->resultRowInfo;
1215
//
L
Liu Jicong 已提交
1216 1217 1218
//   assert(pQueryAttr->limit.offset == 0);
//   STimeWindow tw = *win;
//   getNextTimeWindow(pQueryAttr, &tw);
1219
//
L
Liu Jicong 已提交
1220 1221
//   if ((tw.skey <= pBlockInfo->window.ekey && QUERY_IS_ASC_QUERY(pQueryAttr)) ||
//       (tw.ekey >= pBlockInfo->window.skey && !QUERY_IS_ASC_QUERY(pQueryAttr))) {
1222
//
L
Liu Jicong 已提交
1223 1224 1225 1226
//     // load the data block and check data remaining in current data block
//     // TODO optimize performance
//     SArray *         pDataBlock = tsdbRetrieveDataBlock(pRuntimeEnv->pTsdbReadHandle, NULL);
//     SColumnInfoData *pColInfoData = taosArrayGet(pDataBlock, 0);
1227
//
L
Liu Jicong 已提交
1228 1229 1230 1231
//     tw = *win;
//     int32_t startPos =
//         getNextQualifiedWindow(pQueryAttr, &tw, pBlockInfo, pColInfoData->pData, binarySearchForKey, -1);
//     assert(startPos >= 0);
1232
//
L
Liu Jicong 已提交
1233 1234
//     // set the abort info
//     pQueryAttr->pos = startPos;
1235
//
L
Liu Jicong 已提交
1236 1237 1238 1239
//     // reset the query start timestamp
//     pTableQueryInfo->win.skey = ((TSKEY *)pColInfoData->pData)[startPos];
//     pQueryAttr->window.skey = pTableQueryInfo->win.skey;
//     TSKEY key = pTableQueryInfo->win.skey;
1240
//
L
Liu Jicong 已提交
1241 1242
//     pWindowResInfo->prevSKey = tw.skey;
//     int32_t index = pRuntimeEnv->resultRowInfo.curIndex;
1243
//
L
Liu Jicong 已提交
1244 1245
//     int32_t numOfRes = tableApplyFunctionsOnBlock(pRuntimeEnv, pBlockInfo, NULL, binarySearchForKey, pDataBlock);
//     pRuntimeEnv->resultRowInfo.curIndex = index;  // restore the window index
1246
//
L
Liu Jicong 已提交
1247 1248 1249 1250
//     //qDebug("QInfo:0x%"PRIx64" check data block, brange:%" PRId64 "-%" PRId64 ", numOfRows:%d, numOfRes:%d,
//     lastKey:%" PRId64,
//            GET_TASKID(pRuntimeEnv), pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows, numOfRes,
//            pQueryAttr->current->lastKey);
1251
//
L
Liu Jicong 已提交
1252 1253 1254 1255 1256
//     return key;
//   } else {  // do nothing
//     pQueryAttr->window.skey      = tw.skey;
//     pWindowResInfo->prevSKey = tw.skey;
//     pTableQueryInfo->lastKey = tw.skey;
1257
//
L
Liu Jicong 已提交
1258 1259
//     return tw.skey;
//   }
1260
//
L
Liu Jicong 已提交
1261 1262 1263 1264 1265 1266 1267 1268 1269 1270
//   return true;
// }

// static bool skipTimeInterval(STaskRuntimeEnv *pRuntimeEnv, TSKEY* start) {
//   STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
//   if (QUERY_IS_ASC_QUERY(pQueryAttr)) {
//     assert(*start <= pRuntimeEnv->current->lastKey);
//   } else {
//     assert(*start >= pRuntimeEnv->current->lastKey);
//   }
1271
//
L
Liu Jicong 已提交
1272 1273 1274 1275 1276
//   // if queried with value filter, do NOT forward query start position
//   if (pQueryAttr->limit.offset <= 0 || pQueryAttr->numOfFilterCols > 0 || pRuntimeEnv->pTsBuf != NULL ||
//   pRuntimeEnv->pFillInfo != NULL) {
//     return true;
//   }
1277
//
L
Liu Jicong 已提交
1278 1279 1280 1281 1282 1283 1284
//   /*
//    * 1. for interval without interpolation query we forward pQueryAttr->interval.interval at a time for
//    *    pQueryAttr->limit.offset times. Since hole exists, pQueryAttr->interval.interval*pQueryAttr->limit.offset
//    value is
//    *    not valid. otherwise, we only forward pQueryAttr->limit.offset number of points
//    */
//   assert(pRuntimeEnv->resultRowInfo.prevSKey == TSKEY_INITIAL_VAL);
1285
//
L
Liu Jicong 已提交
1286 1287
//   STimeWindow w = TSWINDOW_INITIALIZER;
//   bool ascQuery = QUERY_IS_ASC_QUERY(pQueryAttr);
1288
//
L
Liu Jicong 已提交
1289 1290
//   SResultRowInfo *pWindowResInfo = &pRuntimeEnv->resultRowInfo;
//   STableQueryInfo *pTableQueryInfo = pRuntimeEnv->current;
1291
//
L
Liu Jicong 已提交
1292 1293 1294
//   SDataBlockInfo blockInfo = SDATA_BLOCK_INITIALIZER;
//   while (tsdbNextDataBlock(pRuntimeEnv->pTsdbReadHandle)) {
//     tsdbRetrieveDataBlockInfo(pRuntimeEnv->pTsdbReadHandle, &blockInfo);
1295
//
L
Liu Jicong 已提交
1296 1297 1298 1299 1300 1301 1302 1303 1304
//     if (QUERY_IS_ASC_QUERY(pQueryAttr)) {
//       if (pWindowResInfo->prevSKey == TSKEY_INITIAL_VAL) {
//         getAlignQueryTimeWindow(pQueryAttr, blockInfo.window.skey, blockInfo.window.skey, pQueryAttr->window.ekey,
//         &w); pWindowResInfo->prevSKey = w.skey;
//       }
//     } else {
//       getAlignQueryTimeWindow(pQueryAttr, blockInfo.window.ekey, pQueryAttr->window.ekey, blockInfo.window.ekey, &w);
//       pWindowResInfo->prevSKey = w.skey;
//     }
1305
//
L
Liu Jicong 已提交
1306 1307
//     // the first time window
//     STimeWindow win = getActiveTimeWindow(pWindowResInfo, pWindowResInfo->prevSKey, pQueryAttr);
1308
//
L
Liu Jicong 已提交
1309 1310
//     while (pQueryAttr->limit.offset > 0) {
//       STimeWindow tw = win;
1311
//
L
Liu Jicong 已提交
1312 1313 1314
//       if ((win.ekey <= blockInfo.window.ekey && ascQuery) || (win.ekey >= blockInfo.window.skey && !ascQuery)) {
//         pQueryAttr->limit.offset -= 1;
//         pWindowResInfo->prevSKey = win.skey;
1315
//
L
Liu Jicong 已提交
1316 1317 1318 1319 1320 1321
//         // current time window is aligned with blockInfo.window.ekey
//         // restart it from next data block by set prevSKey to be TSKEY_INITIAL_VAL;
//         if ((win.ekey == blockInfo.window.ekey && ascQuery) || (win.ekey == blockInfo.window.skey && !ascQuery)) {
//           pWindowResInfo->prevSKey = TSKEY_INITIAL_VAL;
//         }
//       }
1322
//
L
Liu Jicong 已提交
1323 1324 1325 1326
//       if (pQueryAttr->limit.offset == 0) {
//         *start = doSkipIntervalProcess(pRuntimeEnv, &win, &blockInfo, pTableQueryInfo);
//         return true;
//       }
1327
//
L
Liu Jicong 已提交
1328 1329
//       // current window does not ended in current data block, try next data block
//       getNextTimeWindow(pQueryAttr, &tw);
1330
//
L
Liu Jicong 已提交
1331 1332 1333 1334 1335 1336 1337 1338 1339
//       /*
//        * If the next time window still starts from current data block,
//        * load the primary timestamp column first, and then find the start position for the next queried time window.
//        * Note that only the primary timestamp column is required.
//        * TODO: Optimize for this cases. All data blocks are not needed to be loaded, only if the first actually
//        required
//        * time window resides in current data block.
//        */
//       if ((tw.skey <= blockInfo.window.ekey && ascQuery) || (tw.ekey >= blockInfo.window.skey && !ascQuery)) {
1340
//
L
Liu Jicong 已提交
1341 1342
//         SArray *pDataBlock = tsdbRetrieveDataBlock(pRuntimeEnv->pTsdbReadHandle, NULL);
//         SColumnInfoData *pColInfoData = taosArrayGet(pDataBlock, 0);
1343
//
L
Liu Jicong 已提交
1344 1345 1346
//         if ((win.ekey > blockInfo.window.ekey && ascQuery) || (win.ekey < blockInfo.window.skey && !ascQuery)) {
//           pQueryAttr->limit.offset -= 1;
//         }
1347
//
L
Liu Jicong 已提交
1348 1349 1350 1351 1352 1353 1354 1355
//         if (pQueryAttr->limit.offset == 0) {
//           *start = doSkipIntervalProcess(pRuntimeEnv, &win, &blockInfo, pTableQueryInfo);
//           return true;
//         } else {
//           tw = win;
//           int32_t startPos =
//               getNextQualifiedWindow(pQueryAttr, &tw, &blockInfo, pColInfoData->pData, binarySearchForKey, -1);
//           assert(startPos >= 0);
1356
//
L
Liu Jicong 已提交
1357 1358 1359 1360 1361 1362 1363 1364 1365 1366 1367
//           // set the abort info
//           pQueryAttr->pos = startPos;
//           pTableQueryInfo->lastKey = ((TSKEY *)pColInfoData->pData)[startPos];
//           pWindowResInfo->prevSKey = tw.skey;
//           win = tw;
//         }
//       } else {
//         break;  // offset is not 0, and next time window begins or ends in the next block.
//       }
//     }
//   }
1368
//
L
Liu Jicong 已提交
1369 1370
//   // check for error
//   if (terrno != TSDB_CODE_SUCCESS) {
1371
//     T_LONG_JMP(pRuntimeEnv->env, terrno);
L
Liu Jicong 已提交
1372
//   }
1373
//
L
Liu Jicong 已提交
1374 1375
//   return true;
// }
1376

1377
/*
 * Install a copy of the given downstream operator array on operator p.
 *
 * p            operator whose downstream list is set.
 * pDownstream  array of num operator pointers; the pointers are copied, the
 *              array itself is not retained.
 * num          number of entries in pDownstream.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when the copy cannot
 * be allocated; in that case p is left unchanged. Despite the name, an
 * existing downstream array is replaced, not extended.
 */
int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t num) {
  if (p->pDownstream == NULL) {
    assert(p->numOfDownstream == 0);
  }

  // allocate into a temporary so a failure cannot leave p with a NULL array and a stale count
  SOperatorInfo** pCopy = taosMemoryCalloc(num, POINTER_BYTES);
  if (pCopy == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  memcpy(pCopy, pDownstream, num * POINTER_BYTES);

  // release the pointer array being replaced (a no-op in the usual case where none was set)
  taosMemoryFree(p->pDownstream);

  p->pDownstream = pCopy;
  p->numOfDownstream = num;
  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
1392
/*
 * Report the scan order and scan flag that apply to the data reaching
 * pOperator.
 *
 * Table scan and table merge scan operators report the values stored in their
 * own scan info. A fixed set of other source operators reports ascending
 * order with MAIN_SCAN. Any other operator delegates to its first downstream
 * operator; TSDB_CODE_INVALID_PARA is returned when there is none.
 */
int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scanFlag) {
  // todo add more information about exchange operation
  int32_t optrType = pOperator->operatorType;

  switch (optrType) {
    case QUERY_NODE_PHYSICAL_PLAN_EXCHANGE:
    case QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN:
    case QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN:
    case QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN:
    case QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN:
    case QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN:
    case QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN:
      *order = TSDB_ORDER_ASC;
      *scanFlag = MAIN_SCAN;
      return TSDB_CODE_SUCCESS;

    case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN: {
      STableScanInfo* pScan = pOperator->info;
      *order = pScan->base.cond.order;
      *scanFlag = pScan->base.scanFlag;
      return TSDB_CODE_SUCCESS;
    }

    case QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN: {
      STableMergeScanInfo* pMergeScan = pOperator->info;
      *order = pMergeScan->base.cond.order;
      *scanFlag = pMergeScan->base.scanFlag;
      return TSDB_CODE_SUCCESS;
    }

    default:
      break;
  }

  // not a recognized source operator: ask the first downstream operator
  if (pOperator->pDownstream == NULL || pOperator->pDownstream[0] == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  return getTableScanInfo(pOperator->pDownstream[0], order, scanFlag);
}
1420

1421 1422 1423 1424 1425
/*
 * Build a one-row, all-NULL data block that stands in for an empty input.
 *
 * A block is only produced when tsCountAlwaysReturnValue is set, the first
 * downstream operator is neither a partition operator nor a table scan with
 * group-by-tag, and at least one expression is count/hyperloglog (including
 * the partial/merge hyperloglog variants). The block gets enough NULL-typed
 * columns to cover every column slot referenced by the expressions'
 * parameters.
 *
 * On success *ppBlock receives the new block when one is produced and is left
 * untouched otherwise; the return value is TSDB_CODE_SUCCESS in both cases.
 * When the block or its column buffers cannot be allocated, *ppBlock is left
 * untouched and an error code is returned.
 */
static int32_t createDataBlockForEmptyInput(SOperatorInfo* pOperator, SSDataBlock **ppBlock) {
  if (!tsCountAlwaysReturnValue) {
    return TSDB_CODE_SUCCESS;
  }

  SOperatorInfo* downstream = pOperator->pDownstream[0];
  if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_PARTITION ||
      (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN &&
       ((STableScanInfo *)downstream->info)->hasGroupByTag == true)) {
    return TSDB_CODE_SUCCESS;
  }

  SqlFunctionCtx* pCtx = pOperator->exprSupp.pCtx;
  bool hasCountFunc = false;

  for (int32_t i = 0; i < pOperator->exprSupp.numOfExprs; ++i) {
    const char* pName = pCtx[i].pExpr->pExpr->_function.functionName;
    if ((strcmp(pName, "count") == 0) || (strcmp(pName, "hyperloglog") == 0) ||
        (strcmp(pName, "_hyperloglog_partial") == 0) || (strcmp(pName, "_hyperloglog_merge") == 0)) {
      hasCountFunc = true;
      break;
    }
  }

  if (!hasCountFunc) {
    return TSDB_CODE_SUCCESS;
  }

  SSDataBlock* pBlock = createDataBlock();
  if (pBlock == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  pBlock->info.rows = 1;
  pBlock->info.capacity = 0;

  for (int32_t i = 0; i < pOperator->exprSupp.numOfExprs; ++i) {
    // template for the placeholder columns: one-byte NULL-typed column
    SColumnInfoData colInfo = {0};
    colInfo.hasNull = true;
    colInfo.info.type = TSDB_DATA_TYPE_NULL;
    colInfo.info.bytes = 1;

    SExprInfo* pOneExpr = &pOperator->exprSupp.pExprInfo[i];
    for (int32_t j = 0; j < pOneExpr->base.numOfParams; ++j) {
      SFunctParam* pFuncParam = &pOneExpr->base.pParam[j];
      if (pFuncParam->type != FUNC_PARAM_TYPE_COLUMN) {
        continue;  // only column parameters reference a slot in the block
      }

      // make sure the block has a column for every slot up to the referenced one
      int32_t slotId = pFuncParam->pCol->slotId;
      int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
      if (slotId >= numOfCols) {
        taosArrayEnsureCap(pBlock->pDataBlock, slotId + 1);
        for (int32_t k = numOfCols; k < slotId + 1; ++k) {
          taosArrayPush(pBlock->pDataBlock, &colInfo);
        }
      }
    }
  }

  int32_t code = blockDataEnsureCapacity(pBlock, pBlock->info.rows);
  if (code != TSDB_CODE_SUCCESS) {
    // do not hand out a block whose column buffers could not be allocated
    blockDataDestroy(pBlock);
    return code;
  }

  *ppBlock = pBlock;
  return TSDB_CODE_SUCCESS;
}

/*
 * Release a data block that was synthesized for an empty input.
 *
 * blockAllocated  when false, nothing is done and *ppBlock is left untouched.
 * ppBlock         in/out: the block is destroyed and the pointer reset to NULL
 *                 when blockAllocated is true.
 */
static void destroyDataBlockForEmptyInput(bool blockAllocated, SSDataBlock **ppBlock) {
  if (blockAllocated) {
    blockDataDestroy(*ppBlock);
    *ppBlock = NULL;
  }
}

1492
// this is a blocking operator
L
Liu Jicong 已提交
1493
static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
H
Haojun Liao 已提交
1494 1495
  if (OPTR_IS_OPENED(pOperator)) {
    return TSDB_CODE_SUCCESS;
1496 1497
  }

H
Haojun Liao 已提交
1498
  SExecTaskInfo*    pTaskInfo = pOperator->pTaskInfo;
1499
  SAggOperatorInfo* pAggInfo = pOperator->info;
H
Haojun Liao 已提交
1500

1501 1502
  SExprSupp*     pSup = &pOperator->exprSupp;
  SOperatorInfo* downstream = pOperator->pDownstream[0];
1503

1504 1505
  int64_t st = taosGetTimestampUs();

1506 1507 1508
  int32_t order = TSDB_ORDER_ASC;
  int32_t scanFlag = MAIN_SCAN;

1509 1510 1511
  bool    hasValidBlock = false;
  bool    blockAllocated = false;

H
Haojun Liao 已提交
1512
  while (1) {
1513
    SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
1514
    if (pBlock == NULL) {
G
Ganlin Zhao 已提交
1515
      if (!hasValidBlock) {
1516 1517 1518 1519 1520 1521 1522 1523
        createDataBlockForEmptyInput(pOperator, &pBlock);
        if (pBlock == NULL) {
          break;
        }
        blockAllocated = true;
      } else {
        break;
      }
1524
    }
1525
    hasValidBlock = true;
1526

1527 1528
    int32_t code = getTableScanInfo(pOperator, &order, &scanFlag);
    if (code != TSDB_CODE_SUCCESS) {
1529
      destroyDataBlockForEmptyInput(blockAllocated, &pBlock);
1530
      T_LONG_JMP(pTaskInfo->env, code);
1531
    }
1532

1533
    // there is an scalar expression that needs to be calculated before apply the group aggregation.
G
Ganlin Zhao 已提交
1534
    if (pAggInfo->scalarExprSup.pExprInfo != NULL && !blockAllocated) {
1535 1536
      SExprSupp* pSup1 = &pAggInfo->scalarExprSup;
      code = projectApplyFunctions(pSup1->pExprInfo, pBlock, pBlock, pSup1->pCtx, pSup1->numOfExprs, NULL);
1537
      if (code != TSDB_CODE_SUCCESS) {
1538
        destroyDataBlockForEmptyInput(blockAllocated, &pBlock);
1539
        T_LONG_JMP(pTaskInfo->env, code);
1540
      }
1541 1542
    }

1543
    // the pDataBlock are always the same one, no need to call this again
H
Haojun Liao 已提交
1544
    setExecutionContext(pOperator, pOperator->exprSupp.numOfExprs, pBlock->info.id.groupId);
1545
    setInputDataBlock(pSup, pBlock, order, scanFlag, true);
1546
    code = doAggregateImpl(pOperator, pSup->pCtx);
1547
    if (code != 0) {
1548
      destroyDataBlockForEmptyInput(blockAllocated, &pBlock);
1549
      T_LONG_JMP(pTaskInfo->env, code);
1550
    }
1551 1552 1553

    destroyDataBlockForEmptyInput(blockAllocated, &pBlock);

1554 1555
  }

1556 1557 1558 1559 1560
  // the downstream operator may return with error code, so let's check the code before generating results.
  if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
    T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
  }

1561
  initGroupedResultInfo(&pAggInfo->groupResInfo, pAggInfo->aggSup.pResultRowHashTable, 0);
H
Haojun Liao 已提交
1562
  OPTR_SET_OPENED(pOperator);
1563

1564
  pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
1565
  return pTaskInfo->code;
H
Haojun Liao 已提交
1566 1567
}

1568
/*
 * Produce the next result block of the aggregate operator.
 *
 * The open function is invoked first; if it reports an error the operator is
 * marked completed and NULL is returned. Result blocks are then built and
 * filtered repeatedly until either a block with at least one row is available
 * or no grouped results remain.
 *
 * Returns the operator's result block, or NULL when the operator is done or
 * the final block holds no rows.
 */
static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator) {
  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  SAggOperatorInfo* pAggInfo = pOperator->info;
  SOptrBasicInfo*   pInfo = &pAggInfo->binfo;
  SExecTaskInfo*    pTaskInfo = pOperator->pTaskInfo;

  pTaskInfo->code = pOperator->fpSet._openFn(pOperator);
  if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
    setOperatorCompleted(pOperator);
    return NULL;
  }

  blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);

  for (;;) {
    doBuildResultDatablock(pOperator, pInfo, &pAggInfo->groupResInfo, pAggInfo->aggSup.pResultBuf);
    doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);

    bool exhausted = !hasRemainResults(&pAggInfo->groupResInfo);
    if (exhausted) {
      // nothing left to emit after this block
      setOperatorCompleted(pOperator);
      break;
    }

    // keep building while the filter removed every row of the current block
    if (pInfo->pRes->info.rows > 0) {
      break;
    }
  }

  size_t numOfRows = blockDataGetNumOfRows(pInfo->pRes);
  pOperator->resultInfo.totalRows += numOfRows;

  if (numOfRows == 0) {
    return NULL;
  }

  return pInfo->pRes;
}

1604
/*
 * Release the resources held by each entry of an expression-info array.
 *
 * For every expression, column parameters have their pCol freed and cleared,
 * value parameters have their variant destroyed; afterwards the parameter
 * array and the pExpr member are freed. The array pExpr itself is NOT freed.
 *
 * pExpr       array of expression infos.
 * numOfExprs  number of entries in pExpr.
 */
void destroyExprInfo(SExprInfo* pExpr, int32_t numOfExprs) {
  for (int32_t exprIdx = 0; exprIdx < numOfExprs; ++exprIdx) {
    SExprInfo* pOne = &pExpr[exprIdx];

    for (int32_t paramIdx = 0; paramIdx < pOne->base.numOfParams; ++paramIdx) {
      SFunctParam* pParam = &pOne->base.pParam[paramIdx];

      if (pParam->type == FUNC_PARAM_TYPE_COLUMN) {
        taosMemoryFreeClear(pParam->pCol);
      } else if (pParam->type == FUNC_PARAM_TYPE_VALUE) {
        taosVariantDestroy(&pParam->param);
      }
    }

    taosMemoryFree(pOne->base.pParam);
    taosMemoryFree(pOne->pExpr);
  }
}

5
54liuyao 已提交
1620
/*
 * Destroy an operator together with all of its downstream operators.
 *
 * The operator's close function (if any) is called with pOperator->info, the
 * downstream operators are destroyed recursively, the expression support is
 * cleaned up and finally the operator itself is freed. NULL is accepted.
 */
void destroyOperatorInfo(SOperatorInfo* pOperator) {
  if (pOperator == NULL) {
    return;
  }

  if (pOperator->fpSet.closeFn != NULL) {
    pOperator->fpSet.closeFn(pOperator->info);
  }

  if (pOperator->pDownstream != NULL) {
    int32_t idx = 0;
    while (idx < pOperator->numOfDownstream) {
      destroyOperatorInfo(pOperator->pDownstream[idx]);
      idx += 1;
    }

    taosMemoryFreeClear(pOperator->pDownstream);
    pOperator->numOfDownstream = 0;
  }

  cleanupExprSupp(&pOperator->exprSupp);
  taosMemoryFreeClear(pOperator);
}

1642 1643 1644 1645
// Each operator should set its own function to report its total buffer cost.
// This default always reports 0; it asserts when used by a blocking operator.
int32_t optrDefaultBufFn(SOperatorInfo* pOperator) {
  if (pOperator->blocking) {
    ASSERT(0);
  }

  return 0;
}

1652 1653 1654 1655 1656 1657
/*
 * Compute the page size and total buffer size used for an operator's
 * disk-based result buffer.
 *
 * The page size starts at 4096 bytes and is doubled until it is at least four
 * times rowSize. The buffer size defaults to 10MB; when the page size reaches
 * that default, the buffer is enlarged to hold four pages.
 *
 * All arithmetic is done in 64 bits so that neither rowSize * 4 nor
 * pageSize * 4 can overflow or wrap.
 *
 * rowSize       size of one result row in bytes; must not be negative.
 * defaultPgsz   out: page size in bytes.
 * defaultBufsz  out: buffer size in bytes.
 *
 * Returns 0 on success. Returns -1 when rowSize is negative or so large that
 * four pages would not fit in a uint32_t; in that case the outputs are left at
 * the defaults (4096 and 10MB).
 */
int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* defaultBufsz) {
  const uint32_t minPageSize = 4096;

  // The default buffer for each operator in query is 10MB.
  // TODO: make this variable to be configurable.
  const uint32_t defaultBufSize = 4096 * 2560;

  *defaultPgsz = minPageSize;
  *defaultBufsz = defaultBufSize;

  if (rowSize < 0) {
    return -1;
  }

  uint64_t required = (uint64_t)rowSize * 4;
  uint64_t pageSize = minPageSize;
  while (pageSize < required) {
    pageSize <<= 1u;
  }

  // four pages must be representable in the 32-bit buffer size
  if (pageSize * 4 > UINT32_MAX) {
    return -1;
  }

  *defaultPgsz = (uint32_t)pageSize;

  // at least four pages need to be in buffer once a page no longer fits the default buffer
  if ((*defaultBufsz) <= (*defaultPgsz)) {
    (*defaultBufsz) = (*defaultPgsz) * 4;
  }

  return 0;
}

dengyihao's avatar
dengyihao 已提交
1669 1670
/*
 * Initialize the aggregate supporter: key buffer, result-row hash table and
 * the disk-based result buffer.
 *
 * pAggSup      supporter to initialize.
 * pCtx         function contexts used to compute the result row size.
 * numOfOutput  number of entries in pCtx.
 * keyBufSize   size of the group key; extra room for a pointer and an int64_t
 *              is added to the allocation.
 * pKey         identifier used for the disk buffer and in log messages.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY, TSDB_CODE_NO_AVAIL_DISK
 * or the error returned by createDiskbasedBuf(). Nothing allocated here is
 * released on failure; it stays in pAggSup (presumably for cleanupAggSup() to
 * release - confirm against callers).
 */
int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t numOfOutput, size_t keyBufSize,
                         const char* pKey) {
  int32_t    code = 0;
  _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);

  pAggSup->currentPageId = -1;
  pAggSup->resultRowSize = getResultRowSize(pCtx, numOfOutput);
  pAggSup->keyBuf = taosMemoryCalloc(1, keyBufSize + POINTER_BYTES + sizeof(int64_t));
  pAggSup->pResultRowHashTable = tSimpleHashInit(100, hashFn);

  if (pAggSup->keyBuf == NULL || pAggSup->pResultRowHashTable == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  uint32_t defaultPgsz = 0;
  uint32_t defaultBufsz = 0;
  getBufferPgSize(pAggSup->resultRowSize, &defaultPgsz, &defaultBufsz);

  if (!osTempSpaceAvailable()) {
    code = TSDB_CODE_NO_AVAIL_DISK;
    // format the code being returned, same as the createDiskbasedBuf() failure below
    qError("Init stream agg supporter failed since %s, %s", tstrerror(code), pKey);
    return code;
  }

  code = createDiskbasedBuf(&pAggSup->pResultBuf, defaultPgsz, defaultBufsz, pKey, tsTempDir);
  if (code != TSDB_CODE_SUCCESS) {
    qError("Create agg result buf failed since %s, %s", tstrerror(code), pKey);
    return code;
  }

  return code;
}

1702
/*
 * Release the key buffer, the result-row hash table and the disk-based result
 * buffer of an aggregate supporter.
 *
 * Every released member is reset to NULL so the supporter holds no dangling
 * pointers afterwards. The SAggSupporter itself is not freed.
 */
void cleanupAggSup(SAggSupporter* pAggSup) {
  taosMemoryFreeClear(pAggSup->keyBuf);

  tSimpleHashCleanup(pAggSup->pResultRowHashTable);
  pAggSup->pResultRowHashTable = NULL;

  destroyDiskbasedBuf(pAggSup->pResultBuf);
  pAggSup->pResultBuf = NULL;
}

H
Haojun Liao 已提交
1708
/*
 * Initialize the expression support and the aggregate supporter of an
 * operator, then point every function context at the shared result buffer.
 *
 * Returns TSDB_CODE_SUCCESS, or the first error reported by initExprSupp()
 * or doInitAggInfoSup().
 */
int32_t initAggSup(SExprSupp* pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, size_t keyBufSize,
                    const char* pkey) {
  int32_t code = initExprSupp(pSup, pExprInfo, numOfCols);
  if (code == TSDB_CODE_SUCCESS) {
    code = doInitAggInfoSup(pAggSup, pSup->pCtx, numOfCols, keyBufSize, pkey);
  }

  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  // all function contexts save their intermediate data into the same buffer
  SqlFunctionCtx* pCtxList = pSup->pCtx;
  for (int32_t idx = 0; idx < numOfCols; ++idx) {
    pCtxList[idx].saveHandle.pBuf = pAggSup->pResultBuf;
  }

  return TSDB_CODE_SUCCESS;
}

L
Liu Jicong 已提交
1727
/*
 * Set the capacity and threshold of a result info.
 *
 * capacity is set to numOfRows and threshold to 75% of it; when that product
 * ends up as 0 the threshold falls back to numOfRows.
 * numOfRows must not be 0 (asserted).
 */
void initResultSizeInfo(SResultInfo* pResultInfo, int32_t numOfRows) {
  ASSERT(numOfRows != 0);

  pResultInfo->capacity = numOfRows;

  pResultInfo->threshold = numOfRows * 0.75;
  if (pResultInfo->threshold == 0) {
    // a small row count truncates to 0: use the full row count instead
    pResultInfo->threshold = numOfRows;
  }
}

1737 1738 1739 1740 1741
/*
 * Attach the result block to an operator's basic info and initialize its
 * result-row info.
 *
 * pInfo   basic info to set up.
 * pBlock  result block; the pointer is stored as-is in pInfo->pRes (no copy
 *         is made here).
 */
void initBasicInfo(SOptrBasicInfo* pInfo, SSDataBlock* pBlock) {
  pInfo->pRes = pBlock;
  initResultRowInfo(&pInfo->resultRowInfo);
}

5
54liuyao 已提交
1742
/*
 * Destroy an array of function contexts.
 *
 * For each context the parameter variants are destroyed and the subsidiaries
 * context/buffer as well as the input data and column-aggregate arrays are
 * freed; finally the array itself is freed.
 *
 * Always returns NULL, so callers can write `p = destroySqlFunctionCtx(p, n)`.
 * A NULL pCtx is accepted.
 */
void* destroySqlFunctionCtx(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
  if (pCtx != NULL) {
    for (int32_t ctxIdx = 0; ctxIdx < numOfOutput; ++ctxIdx) {
      SqlFunctionCtx* pOne = &pCtx[ctxIdx];

      for (int32_t paramIdx = 0; paramIdx < pOne->numOfParams; ++paramIdx) {
        taosVariantDestroy(&pOne->param[paramIdx].param);
      }

      taosMemoryFreeClear(pOne->subsidiaries.pCtx);
      taosMemoryFreeClear(pOne->subsidiaries.buf);
      taosMemoryFree(pOne->input.pData);
      taosMemoryFree(pOne->input.pColumnDataAgg);
    }

    taosMemoryFreeClear(pCtx);
  }

  return NULL;
}

1762
/*
 * Bind an expression array to an expression support and create the matching
 * function contexts.
 *
 * pSup       support structure to fill in.
 * pExprInfo  expression array, stored as-is; may be NULL, in which case no
 *            function contexts are created.
 * numOfExpr  number of entries in pExprInfo.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when the function
 * contexts could not be created.
 */
int32_t initExprSupp(SExprSupp* pSup, SExprInfo* pExprInfo, int32_t numOfExpr) {
  pSup->pExprInfo = pExprInfo;
  pSup->numOfExprs = numOfExpr;

  if (pSup->pExprInfo == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  pSup->pCtx = createSqlFunctionCtx(pExprInfo, numOfExpr, &pSup->rowEntryInfoOffset);
  return (pSup->pCtx == NULL) ? TSDB_CODE_OUT_OF_MEMORY : TSDB_CODE_SUCCESS;
}

1775 1776 1777 1778
/*
 * Release everything owned by an expression support: the function contexts,
 * the expression array, the filter info and the row-entry offset array.
 *
 * Every released member is reset to NULL, so the structure holds no dangling
 * pointers afterwards. The SExprSupp itself is not freed.
 */
void cleanupExprSupp(SExprSupp* pSupp) {
  // destroySqlFunctionCtx() always returns NULL; store it to clear the pointer
  pSupp->pCtx = destroySqlFunctionCtx(pSupp->pCtx, pSupp->numOfExprs);

  if (pSupp->pExprInfo != NULL) {
    destroyExprInfo(pSupp->pExprInfo, pSupp->numOfExprs);
    taosMemoryFreeClear(pSupp->pExprInfo);
  }

  if (pSupp->pFilterInfo != NULL) {
    filterFreeInfo(pSupp->pFilterInfo);
    pSupp->pFilterInfo = NULL;
  }

  taosMemoryFreeClear(pSupp->rowEntryInfoOffset);
}

1790 1791
/*
 * Create the (non-grouped) aggregate operator on top of `downstream`.
 *
 * downstream  input operator; attached as the only downstream on success.
 * pAggNode    physical plan node describing the aggregate functions, the
 *             optional scalar expressions and the filter conditions.
 * pTaskInfo   owning task; receives the error code on failure.
 *
 * Returns the new operator, or NULL on failure with pTaskInfo->code set.
 * On failure the operator and its info are released here; `downstream` is
 * not destroyed by this function.
 */
SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pAggNode,
                                           SExecTaskInfo* pTaskInfo) {
  // declared before the first `goto _error` so the error path never reads an
  // indeterminate value
  int32_t code = TSDB_CODE_SUCCESS;

  SAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SAggOperatorInfo));
  SOperatorInfo*    pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _error;
  }

  SSDataBlock* pResBlock = createDataBlockFromDescNode(pAggNode->node.pOutputDataBlockDesc);
  initBasicInfo(&pInfo->binfo, pResBlock);

  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
  initResultSizeInfo(&pOperator->resultInfo, 4096);

  int32_t    num = 0;
  SExprInfo* pExprInfo = createExprInfo(pAggNode->pAggFuncs, pAggNode->pGroupKeys, &num);
  code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  // optional scalar expressions evaluated on each input block before aggregation
  int32_t    numOfScalarExpr = 0;
  SExprInfo* pScalarExprInfo = NULL;
  if (pAggNode->pExprs != NULL) {
    pScalarExprInfo = createExprInfo(pAggNode->pExprs, NULL, &numOfScalarExpr);
  }

  code = initExprSupp(&pInfo->scalarExprSup, pScalarExprInfo, numOfScalarExpr);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  code = filterInitFromNode((SNode*)pAggNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  pInfo->binfo.mergeResultBlock = pAggNode->mergeDataBlock;
  pInfo->groupId = UINT64_MAX;

  setOperatorInfo(pOperator, "TableAggregate", QUERY_NODE_PHYSICAL_PLAN_HASH_AGG, true, OP_NOT_OPENED, pInfo,
                  pTaskInfo);
  pOperator->fpSet = createOperatorFpSet(doOpenAggregateOptr, getAggregateResult, NULL, destroyAggOperatorInfo,
                                         optrDefaultBufFn, NULL);

  // give a table-scan downstream access to this operator's expression and aggregate supporters
  if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
    STableScanInfo* pTableScanInfo = downstream->info;
    pTableScanInfo->base.pdInfo.pExprSup = &pOperator->exprSupp;
    pTableScanInfo->base.pdInfo.pAggSup = &pInfo->aggSup;
  }

  code = appendDownstream(pOperator, &downstream, 1);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  return pOperator;

_error:
  if (pInfo != NULL) {
    destroyAggOperatorInfo(pInfo);
  }

  if (pOperator != NULL) {
    cleanupExprSupp(&pOperator->exprSupp);
  }

  taosMemoryFreeClear(pOperator);
  pTaskInfo->code = code;
  return NULL;
}

1861
/*
 * Destroy the result block held by an operator's basic info and store the
 * value returned by blockDataDestroy() back into pInfo->pRes.
 * pInfo must not be NULL (asserted).
 */
void cleanupBasicInfo(SOptrBasicInfo* pInfo) {
  assert(pInfo != NULL);

  SSDataBlock* pResBlock = pInfo->pRes;
  pInfo->pRes = blockDataDestroy(pResBlock);
}

H
Haojun Liao 已提交
1866 1867 1868 1869 1870 1871 1872
/*
 * Free the allocation referenced by an element that stores a pointer.
 *
 * pItem  address of the stored pointer; the pointed-to memory is freed and
 *        the stored pointer is cleared. A stored NULL is left alone.
 */
static void freeItem(void* pItem) {
  void** ppSlot = (void**)pItem;
  if (*ppSlot == NULL) {
    return;
  }

  taosMemoryFreeClear(*ppSlot);
}

1873
void destroyAggOperatorInfo(void* param) {
L
Liu Jicong 已提交
1874
  SAggOperatorInfo* pInfo = (SAggOperatorInfo*)param;
L
Liu Jicong 已提交
1875 1876
  cleanupBasicInfo(&pInfo->binfo);

H
Haojun Liao 已提交
1877
  cleanupAggSup(&pInfo->aggSup);
S
shenglian zhou 已提交
1878
  cleanupExprSupp(&pInfo->scalarExprSup);
H
Haojun Liao 已提交
1879
  cleanupGroupResInfo(&pInfo->groupResInfo);
D
dapan1121 已提交
1880
  taosMemoryFreeClear(param);
1881
}
1882

D
dapan1121 已提交
1883
/*
 * Allocate and initialize a task info object.
 *
 * queryId  query identifier, stored in the task id and in the id string.
 * taskId   task identifier, only used to build the id string.
 * model    execution model of the task.
 * dbFName  database name; a copy is stored in the schema info.
 *
 * Returns the new task info, or NULL with terrno set to
 * TSDB_CODE_OUT_OF_MEMORY when the task info or its id string cannot be
 * allocated.
 */
static SExecTaskInfo* createExecTaskInfo(uint64_t queryId, uint64_t taskId, EOPTR_EXEC_MODEL model, char* dbFName) {
  const int32_t idStrLen = 128;

  SExecTaskInfo* pTaskInfo = taosMemoryCalloc(1, sizeof(SExecTaskInfo));
  if (pTaskInfo == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  // Allocate the id string first: a failure here can be unwound by freeing
  // only pTaskInfo, and the buffer is never written through a NULL pointer.
  char* p = taosMemoryCalloc(1, idStrLen);
  if (p == NULL) {
    taosMemoryFree(pTaskInfo);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  snprintf(p, idStrLen, "TID:0x%" PRIx64 " QID:0x%" PRIx64, taskId, queryId);
  pTaskInfo->id.str = p;

  setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED);

  pTaskInfo->schemaInfo.dbname = strdup(dbFName);
  pTaskInfo->id.queryId = queryId;
  pTaskInfo->execModel = model;
  pTaskInfo->pTableInfoList = tableListCreate();
  pTaskInfo->stopInfo.pStopInfo = taosArrayInit(4, sizeof(SExchangeOpStopInfo));
  pTaskInfo->pResultBlockList = taosArrayInit(128, POINTER_BYTES);

  return pTaskInfo;
}
H
Haojun Liao 已提交
1905

H
Haojun Liao 已提交
1906 1907
SSchemaWrapper* extractQueriedColumnSchema(SScanPhysiNode* pScanNode);

1908
/*
 * Load the schema of the scanned table into pTaskInfo->schemaInfo.
 *
 * The table entry is looked up by pScanNode->uid. For a super table its row
 * schema is cloned; for a child table the super table entry is fetched and its
 * row schema cloned; for any other table type the table's own row schema is
 * cloned. The queried-column schema (qsw) is built from the scan node.
 *
 * Returns TSDB_CODE_SUCCESS, or a non-zero code when a meta lookup fails
 * (terrno when it is set, otherwise the lookup's own return value). The meta
 * reader is cleared on every path.
 */
int32_t extractTableSchemaInfo(SReadHandle* pHandle, SScanPhysiNode* pScanNode, SExecTaskInfo* pTaskInfo) {
  SMetaReader mr = {0};
  metaReaderInit(&mr, pHandle->meta, 0);
  int32_t code = metaGetTableEntryByUidCache(&mr, pScanNode->uid);
  if (code != TSDB_CODE_SUCCESS) {
    qError("failed to get the table meta, uid:0x%" PRIx64 ", suid:0x%" PRIx64 ", %s", pScanNode->uid, pScanNode->suid,
           GET_TASKID(pTaskInfo));

    metaReaderClear(&mr);
    // never report success for a failed lookup, even if terrno was not set
    return (terrno != TSDB_CODE_SUCCESS) ? terrno : code;
  }

  SSchemaInfo* pSchemaInfo = &pTaskInfo->schemaInfo;
  pSchemaInfo->tablename = strdup(mr.me.name);

  if (mr.me.type == TSDB_SUPER_TABLE) {
    pSchemaInfo->sw = tCloneSSchemaWrapper(&mr.me.stbEntry.schemaRow);
    pSchemaInfo->tversion = mr.me.stbEntry.schemaTag.version;
  } else if (mr.me.type == TSDB_CHILD_TABLE) {
    tDecoderClear(&mr.coder);

    // the schema of a child table lives in its super table entry
    tb_uid_t suid = mr.me.ctbEntry.suid;
    code = metaGetTableEntryByUidCache(&mr, suid);
    if (code != TSDB_CODE_SUCCESS) {
      // without this check the schema would be cloned from stale entry data
      qError("failed to get the table meta, uid:0x%" PRIx64 ", suid:0x%" PRIx64 ", %s", pScanNode->uid,
             pScanNode->suid, GET_TASKID(pTaskInfo));

      metaReaderClear(&mr);
      return (terrno != TSDB_CODE_SUCCESS) ? terrno : code;
    }

    pSchemaInfo->sw = tCloneSSchemaWrapper(&mr.me.stbEntry.schemaRow);
    pSchemaInfo->tversion = mr.me.stbEntry.schemaTag.version;
  } else {
    pSchemaInfo->sw = tCloneSSchemaWrapper(&mr.me.ntbEntry.schemaRow);
  }

  metaReaderClear(&mr);

  pSchemaInfo->qsw = extractQueriedColumnSchema(pScanNode);
  return TSDB_CODE_SUCCESS;
}

SSchemaWrapper* extractQueriedColumnSchema(SScanPhysiNode* pScanNode) {
1944 1945 1946
  int32_t numOfCols = LIST_LENGTH(pScanNode->pScanCols);
  int32_t numOfTags = LIST_LENGTH(pScanNode->pScanPseudoCols);

1947
  SSchemaWrapper* pqSw = taosMemoryCalloc(1, sizeof(SSchemaWrapper));
1948
  pqSw->pSchema = taosMemoryCalloc(numOfCols + numOfTags, sizeof(SSchema));
1949

L
Liu Jicong 已提交
1950
  for (int32_t i = 0; i < numOfCols; ++i) {
H
Haojun Liao 已提交
1951
    STargetNode* pNode = (STargetNode*)nodesListGetNode(pScanNode->pScanCols, i);
1952 1953
    SColumnNode* pColNode = (SColumnNode*)pNode->pExpr;

H
Haojun Liao 已提交
1954 1955 1956
    SSchema* pSchema = &pqSw->pSchema[pqSw->nCols++];
    pSchema->colId = pColNode->colId;
    pSchema->type = pColNode->node.resType.type;
H
Haojun Liao 已提交
1957 1958
    pSchema->bytes = pColNode->node.resType.bytes;
    tstrncpy(pSchema->name, pColNode->colName, tListLen(pSchema->name));
1959 1960
  }

1961
  // this the tags and pseudo function columns, we only keep the tag columns
1962
  for (int32_t i = 0; i < numOfTags; ++i) {
1963 1964 1965 1966 1967 1968 1969 1970 1971
    STargetNode* pNode = (STargetNode*)nodesListGetNode(pScanNode->pScanPseudoCols, i);

    int32_t type = nodeType(pNode->pExpr);
    if (type == QUERY_NODE_COLUMN) {
      SColumnNode* pColNode = (SColumnNode*)pNode->pExpr;

      SSchema* pSchema = &pqSw->pSchema[pqSw->nCols++];
      pSchema->colId = pColNode->colId;
      pSchema->type = pColNode->node.resType.type;
H
Haojun Liao 已提交
1972
      pSchema->bytes = pColNode->node.resType.bytes;
H
Haojun Liao 已提交
1973
      tstrncpy(pSchema->name, pColNode->colName, tListLen(pSchema->name));
1974 1975 1976
    }
  }

H
Haojun Liao 已提交
1977
  return pqSw;
1978 1979
}

1980 1981
/*
 * Release the database name, table name and both schema wrappers held by a
 * schema info. Every released member is reset to NULL so no dangling pointer
 * is left behind. The SSchemaInfo itself is not freed.
 */
static void cleanupTableSchemaInfo(SSchemaInfo* pSchemaInfo) {
  taosMemoryFreeClear(pSchemaInfo->dbname);
  taosMemoryFreeClear(pSchemaInfo->tablename);

  tDeleteSSchemaWrapper(pSchemaInfo->sw);
  pSchemaInfo->sw = NULL;

  tDeleteSSchemaWrapper(pSchemaInfo->qsw);
  pSchemaInfo->qsw = NULL;
}

1987
// Release the schema wrapper held by the stream task info.
static void cleanupStreamInfo(SStreamTaskInfo* pStreamInfo) {
  tDeleteSSchemaWrapper(pStreamInfo->schema);
}
1988

1989
/*
 * Tell whether a group/partition list consists of exactly one entry that is
 * the function named "tbname" (partition by tbname / group by tbname).
 */
bool groupbyTbname(SNodeList* pGroupList) {
  if (LIST_LENGTH(pGroupList) != 1) {
    return false;
  }

  SNode* pFirst = nodesListGetNode(pGroupList, 0);
  if (pFirst->type != QUERY_NODE_FUNCTION) {
    return false;
  }

  // partition by tbname/group by tbname
  return strcmp(((struct SFunctionNode*)pFirst)->functionName, "tbname") == 0;
}

2002 2003
SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, SNode* pTagCond,
                                  SNode* pTagIndexCond, const char* pUser) {
2004
  int32_t         type = nodeType(pPhyNode);
2005
  STableListInfo* pTableListInfo = pTaskInfo->pTableInfoList;
2006
  const char*     idstr = GET_TASKID(pTaskInfo);
2007

X
Xiaoyu Wang 已提交
2008
  if (pPhyNode->pChildren == NULL || LIST_LENGTH(pPhyNode->pChildren) == 0) {
2009
    SOperatorInfo* pOperator = NULL;
H
Haojun Liao 已提交
2010
    if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == type) {
dengyihao's avatar
dengyihao 已提交
2011
      STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode;
H
Haojun Liao 已提交
2012

2013 2014 2015 2016 2017 2018
      // NOTE: this is an patch to fix the physical plan
      // TODO remove it later
      if (pTableScanNode->scan.node.pLimit != NULL) {
        pTableScanNode->groupSort = true;
      }

L
Liu Jicong 已提交
2019 2020
      int32_t code =
          createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, pTableScanNode->groupSort, pHandle,
H
Haojun Liao 已提交
2021
                                  pTableListInfo, pTagCond, pTagIndexCond, pTaskInfo);
2022
      if (code) {
wmmhello's avatar
wmmhello 已提交
2023
        pTaskInfo->code = code;
2024
        qError("failed to createScanTableListInfo, code:%s, %s", tstrerror(code), idstr);
D
dapan1121 已提交
2025 2026
        return NULL;
      }
wmmhello's avatar
wmmhello 已提交
2027

2028
      code = extractTableSchemaInfo(pHandle, &pTableScanNode->scan, pTaskInfo);
S
slzhou 已提交
2029
      if (code) {
2030
        pTaskInfo->code = terrno;
wmmhello's avatar
wmmhello 已提交
2031 2032 2033
        return NULL;
      }

2034
      pOperator = createTableScanOperatorInfo(pTableScanNode, pHandle, pTaskInfo);
D
dapan1121 已提交
2035 2036 2037 2038 2039
      if (NULL == pOperator) {
        pTaskInfo->code = terrno;
        return NULL;
      }

2040
      STableScanInfo* pScanInfo = pOperator->info;
H
Haojun Liao 已提交
2041
      pTaskInfo->cost.pRecoder = &pScanInfo->base.readRecorder;
S
slzhou 已提交
2042 2043
    } else if (QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN == type) {
      STableMergeScanPhysiNode* pTableScanNode = (STableMergeScanPhysiNode*)pPhyNode;
H
Haojun Liao 已提交
2044 2045 2046

      int32_t code = createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, true, pHandle,
                                             pTableListInfo, pTagCond, pTagIndexCond, pTaskInfo);
L
Liu Jicong 已提交
2047
      if (code) {
wmmhello's avatar
wmmhello 已提交
2048
        pTaskInfo->code = code;
H
Haojun Liao 已提交
2049
        qError("failed to createScanTableListInfo, code: %s", tstrerror(code));
wmmhello's avatar
wmmhello 已提交
2050 2051
        return NULL;
      }
2052

2053
      code = extractTableSchemaInfo(pHandle, &pTableScanNode->scan, pTaskInfo);
wmmhello's avatar
wmmhello 已提交
2054 2055 2056 2057
      if (code) {
        pTaskInfo->code = terrno;
        return NULL;
      }
wmmhello's avatar
wmmhello 已提交
2058

H
Haojun Liao 已提交
2059
      pOperator = createTableMergeScanOperatorInfo(pTableScanNode, pHandle, pTaskInfo);
D
dapan1121 已提交
2060 2061 2062 2063
      if (NULL == pOperator) {
        pTaskInfo->code = terrno;
        return NULL;
      }
wmmhello's avatar
wmmhello 已提交
2064

2065
      STableScanInfo* pScanInfo = pOperator->info;
H
Haojun Liao 已提交
2066
      pTaskInfo->cost.pRecoder = &pScanInfo->base.readRecorder;
H
Haojun Liao 已提交
2067
    } else if (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == type) {
2068 2069
      pOperator = createExchangeOperatorInfo(pHandle ? pHandle->pMsgCb->clientRpc : NULL, (SExchangePhysiNode*)pPhyNode,
                                             pTaskInfo);
H
Haojun Liao 已提交
2070
    } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN == type) {
5
54liuyao 已提交
2071
      STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode;
5
54liuyao 已提交
2072
      if (pHandle->vnode) {
L
Liu Jicong 已提交
2073 2074
        int32_t code =
            createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, pTableScanNode->groupSort,
H
Haojun Liao 已提交
2075
                                    pHandle, pTableListInfo, pTagCond, pTagIndexCond, pTaskInfo);
L
Liu Jicong 已提交
2076
        if (code) {
wmmhello's avatar
wmmhello 已提交
2077
          pTaskInfo->code = code;
H
Haojun Liao 已提交
2078
          qError("failed to createScanTableListInfo, code: %s", tstrerror(code));
wmmhello's avatar
wmmhello 已提交
2079 2080
          return NULL;
        }
L
Liu Jicong 已提交
2081 2082

#ifndef NDEBUG
H
Haojun Liao 已提交
2083
        int32_t sz = tableListGetSize(pTableListInfo);
H
Haojun Liao 已提交
2084 2085
        qDebug("create stream task, total:%d", sz);

L
Liu Jicong 已提交
2086
        for (int32_t i = 0; i < sz; i++) {
H
Haojun Liao 已提交
2087
          STableKeyInfo* pKeyInfo = tableListGetInfo(pTableListInfo, i);
2088
          qDebug("add table uid:%" PRIu64 ", gid:%" PRIu64, pKeyInfo->uid, pKeyInfo->groupId);
L
Liu Jicong 已提交
2089 2090
        }
#endif
2091
      }
2092

H
Haojun Liao 已提交
2093
      pTaskInfo->schemaInfo.qsw = extractQueriedColumnSchema(&pTableScanNode->scan);
2094
      pOperator = createStreamScanOperatorInfo(pHandle, pTableScanNode, pTagCond, pTaskInfo);
H
Haojun Liao 已提交
2095
    } else if (QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN == type) {
L
Liu Jicong 已提交
2096
      SSystemTableScanPhysiNode* pSysScanPhyNode = (SSystemTableScanPhysiNode*)pPhyNode;
2097
      pOperator = createSysTableScanOperatorInfo(pHandle, pSysScanPhyNode, pUser, pTaskInfo);
S
shenglian zhou 已提交
2098 2099 2100
    } else if (QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN == type) {
      STableCountScanPhysiNode* pTblCountScanNode = (STableCountScanPhysiNode*)pPhyNode;
      pOperator = createTableCountScanOperatorInfo(pHandle, pTblCountScanNode, pTaskInfo);
2101
    } else if (QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN == type) {
X
Xiaoyu Wang 已提交
2102
      STagScanPhysiNode* pScanPhyNode = (STagScanPhysiNode*)pPhyNode;
2103 2104

      int32_t code = createScanTableListInfo(pScanPhyNode, NULL, false, pHandle, pTableListInfo, pTagCond,
H
Haojun Liao 已提交
2105
                                             pTagIndexCond, pTaskInfo);
2106
      if (code != TSDB_CODE_SUCCESS) {
2107
        pTaskInfo->code = code;
H
Haojun Liao 已提交
2108
        qError("failed to getTableList, code: %s", tstrerror(code));
2109 2110 2111
        return NULL;
      }

H
Haojun Liao 已提交
2112
      pOperator = createTagScanOperatorInfo(pHandle, pScanPhyNode, pTaskInfo);
2113
    } else if (QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN == type) {
2114
      SBlockDistScanPhysiNode* pBlockNode = (SBlockDistScanPhysiNode*)pPhyNode;
2115 2116

      if (pBlockNode->tableType == TSDB_SUPER_TABLE) {
H
Haojun Liao 已提交
2117 2118
        SArray* pList = taosArrayInit(4, sizeof(STableKeyInfo));
        int32_t code = vnodeGetAllTableList(pHandle->vnode, pBlockNode->uid, pList);
2119 2120 2121 2122
        if (code != TSDB_CODE_SUCCESS) {
          pTaskInfo->code = terrno;
          return NULL;
        }
H
Haojun Liao 已提交
2123

H
Haojun Liao 已提交
2124 2125
        size_t num = taosArrayGetSize(pList);
        for (int32_t i = 0; i < num; ++i) {
H
Haojun Liao 已提交
2126
          STableKeyInfo* p = taosArrayGet(pList, i);
H
Haojun Liao 已提交
2127
          tableListAddTableInfo(pTableListInfo, p->uid, 0);
H
Haojun Liao 已提交
2128
        }
H
Haojun Liao 已提交
2129

H
Haojun Liao 已提交
2130
        taosArrayDestroy(pList);
2131
      } else {  // Create group with only one table
H
Haojun Liao 已提交
2132
        tableListAddTableInfo(pTableListInfo, pBlockNode->uid, 0);
2133 2134
      }

2135
      pOperator = createDataBlockInfoScanOperator(pHandle, pBlockNode, pTaskInfo);
H
Haojun Liao 已提交
2136 2137 2138
    } else if (QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN == type) {
      SLastRowScanPhysiNode* pScanNode = (SLastRowScanPhysiNode*)pPhyNode;

L
Liu Jicong 已提交
2139
      int32_t code = createScanTableListInfo(&pScanNode->scan, pScanNode->pGroupTags, true, pHandle, pTableListInfo,
H
Haojun Liao 已提交
2140
                                             pTagCond, pTagIndexCond, pTaskInfo);
2141 2142 2143 2144
      if (code != TSDB_CODE_SUCCESS) {
        pTaskInfo->code = code;
        return NULL;
      }
2145

2146
      code = extractTableSchemaInfo(pHandle, &pScanNode->scan, pTaskInfo);
2147 2148 2149
      if (code != TSDB_CODE_SUCCESS) {
        pTaskInfo->code = code;
        return NULL;
H
Haojun Liao 已提交
2150 2151
      }

2152
      pOperator = createCacherowsScanOperator(pScanNode, pHandle, pTaskInfo);
2153
    } else if (QUERY_NODE_PHYSICAL_PLAN_PROJECT == type) {
2154
      pOperator = createProjectOperatorInfo(NULL, (SProjectPhysiNode*)pPhyNode, pTaskInfo);
H
Haojun Liao 已提交
2155
    }
2156 2157 2158 2159 2160

    if (pOperator != NULL) {
      pOperator->resultDataBlockId = pPhyNode->pOutputDataBlockDesc->dataBlockId;
    }

2161
    return pOperator;
H
Haojun Liao 已提交
2162 2163
  }

2164
  size_t          size = LIST_LENGTH(pPhyNode->pChildren);
2165
  SOperatorInfo** ops = taosMemoryCalloc(size, POINTER_BYTES);
2166 2167 2168 2169
  if (ops == NULL) {
    return NULL;
  }

dengyihao's avatar
dengyihao 已提交
2170
  for (int32_t i = 0; i < size; ++i) {
2171
    SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, i);
2172
    ops[i] = createOperatorTree(pChildNode, pTaskInfo, pHandle, pTagCond, pTagIndexCond, pUser);
2173
    if (ops[i] == NULL) {
H
Haojun Liao 已提交
2174
      taosMemoryFree(ops);
2175 2176
      return NULL;
    }
2177
  }
H
Haojun Liao 已提交
2178

2179
  SOperatorInfo* pOptr = NULL;
H
Haojun Liao 已提交
2180
  if (QUERY_NODE_PHYSICAL_PLAN_PROJECT == type) {
2181
    pOptr = createProjectOperatorInfo(ops[0], (SProjectPhysiNode*)pPhyNode, pTaskInfo);
2182
  } else if (QUERY_NODE_PHYSICAL_PLAN_HASH_AGG == type) {
H
Haojun Liao 已提交
2183 2184
    SAggPhysiNode* pAggNode = (SAggPhysiNode*)pPhyNode;
    if (pAggNode->pGroupKeys != NULL) {
H
Haojun Liao 已提交
2185
      pOptr = createGroupOperatorInfo(ops[0], pAggNode, pTaskInfo);
H
Haojun Liao 已提交
2186
    } else {
H
Haojun Liao 已提交
2187
      pOptr = createAggregateOperatorInfo(ops[0], pAggNode, pTaskInfo);
H
Haojun Liao 已提交
2188
    }
2189
  } else if (QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL == type) {
H
Haojun Liao 已提交
2190
    SIntervalPhysiNode* pIntervalPhyNode = (SIntervalPhysiNode*)pPhyNode;
H
Haojun Liao 已提交
2191

H
Haojun Liao 已提交
2192 2193
    bool isStream = (QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL == type);
    pOptr = createIntervalOperatorInfo(ops[0], pIntervalPhyNode, pTaskInfo, isStream);
2194
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL == type) {
2195
    pOptr = createStreamIntervalOperatorInfo(ops[0], pPhyNode, pTaskInfo);
2196 2197
  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL == type) {
    SMergeAlignedIntervalPhysiNode* pIntervalPhyNode = (SMergeAlignedIntervalPhysiNode*)pPhyNode;
2198
    pOptr = createMergeAlignedIntervalOperatorInfo(ops[0], pIntervalPhyNode, pTaskInfo);
S
shenglian zhou 已提交
2199
  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL == type) {
X
Xiaoyu Wang 已提交
2200
    SMergeIntervalPhysiNode* pIntervalPhyNode = (SMergeIntervalPhysiNode*)pPhyNode;
2201
    pOptr = createMergeIntervalOperatorInfo(ops[0], pIntervalPhyNode, pTaskInfo);
5
54liuyao 已提交
2202
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL == type) {
2203
    int32_t children = 0;
5
54liuyao 已提交
2204 2205
    pOptr = createStreamFinalIntervalOperatorInfo(ops[0], pPhyNode, pTaskInfo, children);
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL == type) {
5
54liuyao 已提交
2206
    int32_t children = pHandle->numOfVgroups;
5
54liuyao 已提交
2207
    pOptr = createStreamFinalIntervalOperatorInfo(ops[0], pPhyNode, pTaskInfo, children);
H
Haojun Liao 已提交
2208
  } else if (QUERY_NODE_PHYSICAL_PLAN_SORT == type) {
2209
    pOptr = createSortOperatorInfo(ops[0], (SSortPhysiNode*)pPhyNode, pTaskInfo);
S
shenglian zhou 已提交
2210 2211
  } else if (QUERY_NODE_PHYSICAL_PLAN_GROUP_SORT == type) {
    pOptr = createGroupSortOperatorInfo(ops[0], (SGroupSortPhysiNode*)pPhyNode, pTaskInfo);
X
Xiaoyu Wang 已提交
2212
  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE == type) {
2213
    SMergePhysiNode* pMergePhyNode = (SMergePhysiNode*)pPhyNode;
2214
    pOptr = createMultiwayMergeOperatorInfo(ops, size, pMergePhyNode, pTaskInfo);
2215
  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION == type) {
H
Haojun Liao 已提交
2216
    SSessionWinodwPhysiNode* pSessionNode = (SSessionWinodwPhysiNode*)pPhyNode;
H
Haojun Liao 已提交
2217
    pOptr = createSessionAggOperatorInfo(ops[0], pSessionNode, pTaskInfo);
2218
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION == type) {
2219 2220 2221 2222 2223
    pOptr = createStreamSessionAggOperatorInfo(ops[0], pPhyNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION == type) {
    int32_t children = 0;
    pOptr = createStreamFinalSessionAggOperatorInfo(ops[0], pPhyNode, pTaskInfo, children);
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION == type) {
2224
    int32_t children = pHandle->numOfVgroups;
2225
    pOptr = createStreamFinalSessionAggOperatorInfo(ops[0], pPhyNode, pTaskInfo, children);
H
Haojun Liao 已提交
2226
  } else if (QUERY_NODE_PHYSICAL_PLAN_PARTITION == type) {
2227
    pOptr = createPartitionOperatorInfo(ops[0], (SPartitionPhysiNode*)pPhyNode, pTaskInfo);
2228
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION == type) {
2229
    pOptr = createStreamPartitionOperatorInfo(ops[0], (SStreamPartitionPhysiNode*)pPhyNode, pTaskInfo);
2230
  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE == type) {
dengyihao's avatar
dengyihao 已提交
2231
    SStateWinodwPhysiNode* pStateNode = (SStateWinodwPhysiNode*)pPhyNode;
2232
    pOptr = createStatewindowOperatorInfo(ops[0], pStateNode, pTaskInfo);
2233
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE == type) {
5
54liuyao 已提交
2234
    pOptr = createStreamStateAggOperatorInfo(ops[0], pPhyNode, pTaskInfo);
2235
  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN == type) {
2236
    pOptr = createMergeJoinOperatorInfo(ops, size, (SSortMergeJoinPhysiNode*)pPhyNode, pTaskInfo);
2237
  } else if (QUERY_NODE_PHYSICAL_PLAN_FILL == type) {
H
Haojun Liao 已提交
2238
    pOptr = createFillOperatorInfo(ops[0], (SFillPhysiNode*)pPhyNode, pTaskInfo);
5
54liuyao 已提交
2239 2240
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_FILL == type) {
    pOptr = createStreamFillOperatorInfo(ops[0], (SStreamFillPhysiNode*)pPhyNode, pTaskInfo);
H
Haojun Liao 已提交
2241 2242
  } else if (QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC == type) {
    pOptr = createIndefinitOutputOperatorInfo(ops[0], pPhyNode, pTaskInfo);
2243 2244
  } else if (QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC == type) {
    pOptr = createTimeSliceOperatorInfo(ops[0], pPhyNode, pTaskInfo);
H
Haojun Liao 已提交
2245 2246
  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_EVENT == type) {
    pOptr = createEventwindowOperatorInfo(ops[0], pPhyNode, pTaskInfo);
H
Haojun Liao 已提交
2247
  }
2248

2249
  taosMemoryFree(ops);
2250 2251 2252 2253
  if (pOptr) {
    pOptr->resultDataBlockId = pPhyNode->pOutputDataBlockDesc->dataBlockId;
  }

2254
  return pOptr;
2255
}
H
Haojun Liao 已提交
2256

L
Liu Jicong 已提交
2257 2258 2259 2260
/*
 * Walk down a single-child operator chain until the stream scan operator is reached and
 * hand back the info of the table scan operator it wraps.
 *
 * pOperator: operator to start the walk from.
 * ppInfo:    receives the `info` of the wrapped table scan operator on success.
 *
 * Returns 0 on success, TSDB_CODE_APP_ERROR when the chain ends without a stream scan
 * operator or when an operator on the way has more than one downstream.
 */
static int32_t extractTbscanInStreamOpTree(SOperatorInfo* pOperator, STableScanInfo** ppInfo) {
  SOperatorInfo* pCur = pOperator;

  while (pCur->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
    if (pCur->numOfDownstream == 0) {
      qError("failed to find stream scan operator");
      return TSDB_CODE_APP_ERROR;
    }

    if (pCur->numOfDownstream > 1) {
      qError("join not supported for stream block scan");
      return TSDB_CODE_APP_ERROR;
    }

    // exactly one child: keep descending
    pCur = pCur->pDownstream[0];
  }

  SStreamScanInfo* pStreamScan = pCur->info;
  ASSERT(pStreamScan->pTableScanOp->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN);
  *ppInfo = pStreamScan->pTableScanOp->info;
  return 0;
}

2277 2278 2279 2280 2281 2282 2283
/*
 * Follow a single-child physical plan chain down to its leaf and return that leaf as a
 * table scan node.
 *
 * pNode:  plan node to start from.
 * ppNode: receives the leaf node, cast to STableScanPhysiNode*, on success.
 *
 * Returns 0 on success. Returns -1 with terrno set to TSDB_CODE_APP_ERROR when a node on
 * the way has more than one child or when the leaf is not a table scan node.
 */
int32_t extractTableScanNode(SPhysiNode* pNode, STableScanPhysiNode** ppNode) {
  SPhysiNode* pCur = pNode;

  // descend while the current node still has children
  while (pCur->pChildren != NULL && LIST_LENGTH(pCur->pChildren) != 0) {
    if (LIST_LENGTH(pCur->pChildren) != 1) {
      ASSERT(0);
      terrno = TSDB_CODE_APP_ERROR;
      return -1;
    }

    pCur = (SPhysiNode*)nodesListGetNode(pCur->pChildren, 0);
  }

  if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN != pCur->type) {
    ASSERT(0);
    terrno = TSDB_CODE_APP_ERROR;
    return -1;
  }

  *ppNode = (STableScanPhysiNode*)pCur;
  return 0;
}

2299
#if 0
L
Liu Jicong 已提交
2300 2301 2302 2303 2304
/*
 * Close the data reader of the table scan found below pOperator and create a new one
 * from the table scan node of the given plan.
 *
 * pOperator: root of the stream operator tree to search.
 * plan:      sub-plan whose node chain ends in the table scan node.
 * pHandle:   read handle passed on to doCreateDataReader.
 * uid, ts:   currently unused (see the TODO at the end).
 *
 * Returns 0 on success, -1 when the table scan operator or plan node cannot be located,
 * TSDB_CODE_APP_ERROR when the new reader cannot be created.
 */
int32_t rebuildReader(SOperatorInfo* pOperator, SSubplan* plan, SReadHandle* pHandle, int64_t uid, int64_t ts) {
  STableScanInfo* pTableScanInfo = NULL;
  if (extractTbscanInStreamOpTree(pOperator, &pTableScanInfo) < 0) {
    return -1;
  }

  STableScanPhysiNode* pNode = NULL;
  if (extractTableScanNode(plan->pNode, &pNode) < 0) {
    ASSERT(0);
    // do not go on with a NULL plan node: the existing reader is left untouched
    return -1;
  }

  tsdbReaderClose(pTableScanInfo->dataReader);

  STableListInfo info = {0};
  pTableScanInfo->dataReader = doCreateDataReader(pNode, pHandle, &info, NULL);
  if (pTableScanInfo->dataReader == NULL) {
    ASSERT(0);
    qError("failed to create data reader");
    return TSDB_CODE_APP_ERROR;
  }
  // TODO: set uid and ts to data reader
  return 0;
}
2323
#endif
2324

D
dapan1121 已提交
2325
/*
 * Allocate the parameter object that matches the type of the data sink node.
 *
 * pNode:      data sink node; only its type is inspected here.
 * pParam:     receives the allocated SInserterParam (query insert) or SDeleterParam
 *             (delete). It is left untouched for every other node type.
 * pTaskInfo:  pointer to the task handle; the task's table list supplies the super table
 *             uid and the table uids for the delete case.
 * readHandle: stored in the inserter parameter.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when an allocation fails; on
 * failure everything allocated here is freed again and *pParam is not written.
 */
int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, qTaskInfo_t* pTaskInfo, SReadHandle* readHandle) {
  SExecTaskInfo* pTask = *(SExecTaskInfo**)pTaskInfo;

  switch (pNode->type) {
    case QUERY_NODE_PHYSICAL_PLAN_QUERY_INSERT: {
      SInserterParam* pInserterParam = taosMemoryCalloc(1, sizeof(SInserterParam));
      if (NULL == pInserterParam) {
        return TSDB_CODE_OUT_OF_MEMORY;
      }
      pInserterParam->readHandle = readHandle;

      *pParam = pInserterParam;
      break;
    }
    case QUERY_NODE_PHYSICAL_PLAN_DELETE: {
      SDeleterParam* pDeleterParam = taosMemoryCalloc(1, sizeof(SDeleterParam));
      if (NULL == pDeleterParam) {
        return TSDB_CODE_OUT_OF_MEMORY;
      }

      int32_t tbNum = tableListGetSize(pTask->pTableInfoList);
      pDeleterParam->suid = tableListGetSuid(pTask->pTableInfoList);

      // TODO extract uid list
      pDeleterParam->pUidList = taosArrayInit(tbNum, sizeof(uint64_t));
      if (NULL == pDeleterParam->pUidList) {
        taosMemoryFree(pDeleterParam);
        return TSDB_CODE_OUT_OF_MEMORY;
      }

      for (int32_t i = 0; i < tbNum; ++i) {
        STableKeyInfo* pTable = tableListGetInfo(pTask->pTableInfoList, i);
        // a failed push must not leave a silently truncated uid list behind
        if (NULL == taosArrayPush(pDeleterParam->pUidList, &pTable->uid)) {
          taosArrayDestroy(pDeleterParam->pUidList);
          taosMemoryFree(pDeleterParam);
          return TSDB_CODE_OUT_OF_MEMORY;
        }
      }

      *pParam = pDeleterParam;
      break;
    }
    default:
      break;
  }

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
2369
/*
 * Create an execution task for a sub-plan and build its operator tree.
 *
 * pPlan:     sub-plan to execute; stored in the task as pSubplan.
 * pTaskInfo: receives the new task. On failure it is set to NULL.
 * pHandle:   optional read handle; when it carries a state backend, that backend becomes
 *            the task's stream state.
 * taskId:    id of the task within the query.
 * sql:       heap-allocated statement text. Ownership is taken by this function: it is
 *            attached to the task once the task exists and freed here if no task could
 *            be created.
 * model:     execution model passed on to createExecTaskInfo.
 *
 * Returns TSDB_CODE_SUCCESS, or terrno on failure. When the operator tree cannot be
 * built, terrno is first set from the task's own error code and the task is destroyed.
 */
int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId,
                               char* sql, EOPTR_EXEC_MODEL model) {
  uint64_t queryId = pPlan->id.queryId;

  *pTaskInfo = createExecTaskInfo(queryId, taskId, model, pPlan->dbFName);
  if (*pTaskInfo == NULL) {
    goto _complete;
  }

  if (pHandle) {
    /*(*pTaskInfo)->streamInfo.fillHistoryVer1 = pHandle->fillHistoryVer1;*/
    if (pHandle->pStateBackend) {
      (*pTaskInfo)->streamInfo.pState = pHandle->pStateBackend;
    }
  }

  // from here on the task owns the statement text; clear the local so that the
  // failure path below does not free it a second time
  (*pTaskInfo)->sql = sql;
  sql = NULL;

  (*pTaskInfo)->pSubplan = pPlan;
  (*pTaskInfo)->pRoot =
      createOperatorTree(pPlan->pNode, *pTaskInfo, pHandle, pPlan->pTagCond, pPlan->pTagIndexCond, pPlan->user);

  if (NULL == (*pTaskInfo)->pRoot) {
    terrno = (*pTaskInfo)->code;
    goto _complete;
  }

  (*pTaskInfo)->cost.created = taosGetTimestampUs();
  return TSDB_CODE_SUCCESS;

_complete:
  taosMemoryFree(sql);
  // the task may not exist at all (allocation failure); only tear down a real task and
  // never leave the caller with a pointer to freed memory
  if (*pTaskInfo != NULL) {
    doDestroyTask(*pTaskInfo);
    *pTaskInfo = NULL;
  }
  return terrno;
}

H
Haojun Liao 已提交
2406 2407 2408 2409 2410
// Element destructor for arrays that store SSDataBlock pointers: pParam points at the
// array slot, so it is dereferenced once to reach the block to destroy.
static void freeBlock(void* pParam) {
  SSDataBlock** ppBlock = (SSDataBlock**)pParam;
  blockDataDestroy(*ppBlock);
}

L
Liu Jicong 已提交
2411
/*
 * Release everything owned by an execution task and the task object itself.
 *
 * Destroys the table list, the operator tree, the schema and stream info, the result
 * block list, the stop info array, the statement text and the id string. The sub-plan is
 * destroyed as well unless the task is marked as locally executed.
 *
 * Passing NULL is a no-op, so error paths that never obtained a task can call this
 * unconditionally.
 */
void doDestroyTask(SExecTaskInfo* pTaskInfo) {
  if (pTaskInfo == NULL) {
    return;
  }

  qDebug("%s execTask is freed", GET_TASKID(pTaskInfo));

  pTaskInfo->pTableInfoList = tableListDestroy(pTaskInfo->pTableInfoList);
  destroyOperatorInfo(pTaskInfo->pRoot);
  cleanupTableSchemaInfo(&pTaskInfo->schemaInfo);
  cleanupStreamInfo(&pTaskInfo->streamInfo);

  if (!pTaskInfo->localFetch.localExec) {
    nodesDestroyNode((SNode*)pTaskInfo->pSubplan);
  }

  taosArrayDestroyEx(pTaskInfo->pResultBlockList, freeBlock);
  taosArrayDestroy(pTaskInfo->stopInfo.pStopInfo);
  taosMemoryFreeClear(pTaskInfo->sql);
  taosMemoryFreeClear(pTaskInfo->id.str);
  taosMemoryFreeClear(pTaskInfo);
}

// Estimate the query buffer charged for numOfTables tables: 1.5 times the size of one
// STableQueryInfo per table, truncated to int64_t.
// NOTE(review): an earlier comment mentions sizeof(STableCheckInfo) as buffer consumption
// in tsdb; it is not part of this estimate.
static int64_t getQuerySupportBufSize(size_t numOfTables) {
  size_t perTable = sizeof(STableQueryInfo);
  return (int64_t)(perTable * 1.5 * numOfTables);
}

/*
 * Reserve query buffer for numOfTables tables from the global budget
 * tsQueryBufferSizeBytes.
 *
 * A negative budget means no accounting is done and the call always succeeds. A budget
 * of zero disables query processing. Otherwise the estimated size is subtracted with a
 * compare-and-swap that is retried until it either succeeds or the budget is too small.
 *
 * Returns TSDB_CODE_SUCCESS or TSDB_CODE_QRY_NOT_ENOUGH_BUFFER.
 */
int32_t checkForQueryBuf(size_t numOfTables) {
  int64_t required = getQuerySupportBufSize(numOfTables);

  if (tsQueryBufferSizeBytes < 0) {
    return TSDB_CODE_SUCCESS;
  }

  if (tsQueryBufferSizeBytes <= 0) {
    // disable query processing if the value of tsQueryBufferSize is zero.
    return TSDB_CODE_QRY_NOT_ENOUGH_BUFFER;
  }

  for (;;) {
    int64_t current = tsQueryBufferSizeBytes;
    int64_t left = current - required;
    if (left < 0) {
      return TSDB_CODE_QRY_NOT_ENOUGH_BUFFER;
    }

    // another thread may have changed the budget since it was read; retry in that case
    if (atomic_val_compare_exchange_64(&tsQueryBufferSizeBytes, current, left) == current) {
      return TSDB_CODE_SUCCESS;
    }
  }
}

// Give the buffer estimated for numOfTables tables back to the global budget
// tsQueryBufferSizeBytes. Nothing is done while the budget is negative, i.e. while
// checkForQueryBuf does no accounting either.
void releaseQueryBuf(size_t numOfTables) {
  if (tsQueryBufferSizeBytes >= 0) {
    int64_t bytes = getQuerySupportBufSize(numOfTables);

    // add the amount back atomically
    atomic_add_fetch_64(&tsQueryBufferSizeBytes, bytes);
  }
}
D
dapan1121 已提交
2468

H
Haojun Liao 已提交
2469
/*
 * Append the execution statistics of an operator and, recursively, of all its downstream
 * operators to pExecInfoList (pre-order: the operator first, then its children).
 *
 * operatorInfo:  operator whose statistics are collected.
 * pExecInfoList: array of SExplainExecInfo that receives one entry per operator.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY when the entry cannot be appended,
 * or the error code reported by an operator's getExplainFn / by a downstream operator.
 */
int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SArray* pExecInfoList) {
  SExplainExecInfo  execInfo = {0};
  SExplainExecInfo* pExplainInfo = taosArrayPush(pExecInfoList, &execInfo);
  if (pExplainInfo == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  pExplainInfo->numOfRows = operatorInfo->resultInfo.totalRows;
  pExplainInfo->startupCost = operatorInfo->cost.openCost;
  pExplainInfo->totalCost = operatorInfo->cost.totalCost;
  pExplainInfo->verboseLen = 0;
  pExplainInfo->verboseInfo = NULL;

  int32_t code = TSDB_CODE_SUCCESS;
  if (operatorInfo->fpSet.getExplainFn) {
    code = operatorInfo->fpSet.getExplainFn(operatorInfo, &pExplainInfo->verboseInfo, &pExplainInfo->verboseLen);
    if (code) {
      qError("%s operator getExplainFn failed, code:%s", GET_TASKID(operatorInfo->pTaskInfo), tstrerror(code));
      return code;
    }
  }

  // pExplainInfo is not used past this point: the recursive calls push to the same array
  for (int32_t i = 0; i < operatorInfo->numOfDownstream; ++i) {
    code = getOperatorExplainExecInfo(operatorInfo->pDownstream[i], pExecInfoList);
    if (code != TSDB_CODE_SUCCESS) {
      // report the real failure of the downstream operator instead of masking it
      return code;
    }
  }

  return TSDB_CODE_SUCCESS;
}
5
54liuyao 已提交
2499

2500 2501
/*
 * Fetch (or create) the result row stored in the stream state for a time window of a
 * table group and prepare the function contexts to write into it.
 *
 * pState:             stream state that stores the result rows.
 * win:                time window; its skey is part of the lookup key and the whole window
 *                     is copied into the result row.
 * pResult:            receives the result row.
 * tableGroupId:       group id, the other part of the lookup key.
 * pCtx, numOfOutput,
 * rowEntryInfoOffset: passed on to setResultRowInitCtx.
 * pAggSup:            supplies resultRowSize, the size requested for the row.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when the state lookup fails.
 */
int32_t setOutputBuf(SStreamState* pState, STimeWindow* win, SResultRow** pResult, int64_t tableGroupId,
                     SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowEntryInfoOffset, SAggSupporter* pAggSup) {
  SWinKey winKey = {
      .ts = win->skey,
      .groupId = tableGroupId,
  };

  char*   pBuf = NULL;
  int32_t bufLen = pAggSup->resultRowSize;
  int32_t rc = streamStateAddIfNotExist(pState, &winKey, (void**)&pBuf, &bufLen);
  if (rc < 0) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  SResultRow* pRow = (SResultRow*)pBuf;
  *pResult = pRow;
  ASSERT(*pResult);

  // set time window for current result
  pRow->win = *win;
  setResultRowInitCtx(pRow, pCtx, numOfOutput, rowEntryInfoOffset);
  return TSDB_CODE_SUCCESS;
}

2520 2521
// Hand a result row buffer back to the stream state via streamStateReleaseBuf.
// Always returns TSDB_CODE_SUCCESS.
int32_t releaseOutputBuf(SStreamState* pState, SWinKey* pKey, SResultRow* pResult) {
  int32_t code = TSDB_CODE_SUCCESS;
  streamStateReleaseBuf(pState, pKey, pResult);
  return code;
}

2525 2526
// Store resSize bytes of a result row in the stream state under pKey via streamStatePut.
// The outcome of the put is not inspected; TSDB_CODE_SUCCESS is always returned.
int32_t saveOutputBuf(SStreamState* pState, SWinKey* pKey, SResultRow* pResult, int32_t resSize) {
  int32_t code = TSDB_CODE_SUCCESS;
  streamStatePut(pState, pKey, pResult, resSize);
  return code;
}

2530
int32_t buildDataBlockFromGroupRes(SOperatorInfo* pOperator, SStreamState* pState, SSDataBlock* pBlock, SExprSupp* pSup,
2531
                                   SGroupResInfo* pGroupResInfo) {
2532
  SExecTaskInfo*  pTaskInfo = pOperator->pTaskInfo;
2533 2534 2535 2536 2537 2538 2539 2540 2541 2542 2543 2544
  SExprInfo*      pExprInfo = pSup->pExprInfo;
  int32_t         numOfExprs = pSup->numOfExprs;
  int32_t*        rowEntryOffset = pSup->rowEntryInfoOffset;
  SqlFunctionCtx* pCtx = pSup->pCtx;

  int32_t numOfRows = getNumOfTotalRes(pGroupResInfo);

  for (int32_t i = pGroupResInfo->index; i < numOfRows; i += 1) {
    SResKeyPos* pPos = taosArrayGetP(pGroupResInfo->pRows, i);
    int32_t     size = 0;
    void*       pVal = NULL;
    SWinKey     key = {
2545 2546
            .ts = *(TSKEY*)pPos->key,
            .groupId = pPos->groupId,
2547
    };
2548
    int32_t code = streamStateGet(pState, &key, &pVal, &size);
2549 2550 2551 2552 2553 2554
    ASSERT(code == 0);
    SResultRow* pRow = (SResultRow*)pVal;
    doUpdateNumOfRows(pCtx, pRow, numOfExprs, rowEntryOffset);
    // no results, continue to check the next one
    if (pRow->numOfRows == 0) {
      pGroupResInfo->index += 1;
2555
      releaseOutputBuf(pState, &key, pRow);
2556 2557 2558
      continue;
    }

H
Haojun Liao 已提交
2559 2560
    if (pBlock->info.id.groupId == 0) {
      pBlock->info.id.groupId = pPos->groupId;
2561
      void* tbname = NULL;
H
Haojun Liao 已提交
2562
      if (streamStateGetParName(pTaskInfo->streamInfo.pState, pBlock->info.id.groupId, &tbname) < 0) {
L
Liu Jicong 已提交
2563
        pBlock->info.parTbName[0] = 0;
2564 2565
      } else {
        memcpy(pBlock->info.parTbName, tbname, TSDB_TABLE_NAME_LEN);
2566
      }
2567
      tdbFree(tbname);
2568 2569
    } else {
      // current value belongs to different group, it can't be packed into one datablock
H
Haojun Liao 已提交
2570
      if (pBlock->info.id.groupId != pPos->groupId) {
2571
        releaseOutputBuf(pState, &key, pRow);
2572 2573 2574 2575 2576 2577
        break;
      }
    }

    if (pBlock->info.rows + pRow->numOfRows > pBlock->info.capacity) {
      ASSERT(pBlock->info.rows > 0);
2578
      releaseOutputBuf(pState, &key, pRow);
2579 2580 2581 2582 2583 2584 2585 2586 2587 2588
      break;
    }

    pGroupResInfo->index += 1;

    for (int32_t j = 0; j < numOfExprs; ++j) {
      int32_t slotId = pExprInfo[j].base.resSchema.slotId;

      pCtx[j].resultInfo = getResultEntryInfo(pRow, j, rowEntryOffset);
      if (pCtx[j].fpSet.finalize) {
2589 2590 2591 2592
        int32_t code1 = pCtx[j].fpSet.finalize(&pCtx[j], pBlock);
        if (TAOS_FAILED(code1)) {
          qError("%s build result data block error, code %s", GET_TASKID(pTaskInfo), tstrerror(code1));
          T_LONG_JMP(pTaskInfo->env, code1);
2593 2594 2595 2596 2597 2598 2599 2600 2601 2602 2603 2604 2605
        }
      } else if (strcmp(pCtx[j].pExpr->pExpr->_function.functionName, "_select_value") == 0) {
        // do nothing, todo refactor
      } else {
        // expand the result into multiple rows. E.g., _wstart, top(k, 20)
        // the _wstart needs to copy to 20 following rows, since the results of top-k expands to 20 different rows.
        SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, slotId);
        char*            in = GET_ROWCELL_INTERBUF(pCtx[j].resultInfo);
        for (int32_t k = 0; k < pRow->numOfRows; ++k) {
          colDataAppend(pColInfoData, pBlock->info.rows + k, in, pCtx[j].resultInfo->isNullRes);
        }
      }
    }
5
54liuyao 已提交
2606

2607
    pBlock->info.rows += pRow->numOfRows;
2608
    releaseOutputBuf(pState, &key, pRow);
2609
  }
2610
  pBlock->info.dataLoad = 1;
2611 2612 2613
  blockDataUpdateTsWindow(pBlock, 0);
  return TSDB_CODE_SUCCESS;
}
5
54liuyao 已提交
2614 2615 2616 2617 2618 2619 2620

// Store a session result buffer of `size` bytes in the stream state under `key`, then
// release the buffer (no window key is passed for the release).
// Always returns TSDB_CODE_SUCCESS.
int32_t saveSessionDiscBuf(SStreamState* pState, SSessionKey* key, void* buf, int32_t size) {
  const void* pData = (const void*)buf;
  streamStateSessionPut(pState, key, pData, size);

  SResultRow* pRow = (SResultRow*)buf;
  releaseOutputBuf(pState, NULL, pRow);
  return TSDB_CODE_SUCCESS;
}

2621
int32_t buildSessionResultDataBlock(SOperatorInfo* pOperator, SStreamState* pState, SSDataBlock* pBlock,
5
54liuyao 已提交
2622
                                    SExprSupp* pSup, SGroupResInfo* pGroupResInfo) {
2623
  SExecTaskInfo*  pTaskInfo = pOperator->pTaskInfo;
5
54liuyao 已提交
2624 2625 2626 2627 2628 2629 2630 2631 2632 2633 2634 2635 2636
  SExprInfo*      pExprInfo = pSup->pExprInfo;
  int32_t         numOfExprs = pSup->numOfExprs;
  int32_t*        rowEntryOffset = pSup->rowEntryInfoOffset;
  SqlFunctionCtx* pCtx = pSup->pCtx;

  int32_t numOfRows = getNumOfTotalRes(pGroupResInfo);

  for (int32_t i = pGroupResInfo->index; i < numOfRows; i += 1) {
    SSessionKey* pKey = taosArrayGet(pGroupResInfo->pRows, i);
    int32_t      size = 0;
    void*        pVal = NULL;
    int32_t      code = streamStateSessionGet(pState, pKey, &pVal, &size);
    ASSERT(code == 0);
2637 2638
    if (code == -1) {
      // coverity scan
5
54liuyao 已提交
2639
      pGroupResInfo->index += 1;
2640 2641
      continue;
    }
5
54liuyao 已提交
2642 2643 2644 2645 2646 2647 2648 2649 2650
    SResultRow* pRow = (SResultRow*)pVal;
    doUpdateNumOfRows(pCtx, pRow, numOfExprs, rowEntryOffset);
    // no results, continue to check the next one
    if (pRow->numOfRows == 0) {
      pGroupResInfo->index += 1;
      releaseOutputBuf(pState, NULL, pRow);
      continue;
    }

H
Haojun Liao 已提交
2651 2652
    if (pBlock->info.id.groupId == 0) {
      pBlock->info.id.groupId = pKey->groupId;
2653

2654
      void* tbname = NULL;
H
Haojun Liao 已提交
2655
      if (streamStateGetParName(pTaskInfo->streamInfo.pState, pBlock->info.id.groupId, &tbname) < 0) {
2656
        pBlock->info.parTbName[0] = 0;
2657
      } else {
2658
        memcpy(pBlock->info.parTbName, tbname, TSDB_TABLE_NAME_LEN);
2659
      }
2660
      tdbFree(tbname);
5
54liuyao 已提交
2661 2662
    } else {
      // current value belongs to different group, it can't be packed into one datablock
H
Haojun Liao 已提交
2663
      if (pBlock->info.id.groupId != pKey->groupId) {
5
54liuyao 已提交
2664 2665 2666 2667 2668 2669 2670 2671 2672 2673 2674 2675 2676 2677 2678 2679 2680 2681 2682 2683 2684 2685 2686 2687 2688 2689 2690 2691 2692 2693 2694 2695 2696 2697 2698 2699
        releaseOutputBuf(pState, NULL, pRow);
        break;
      }
    }

    if (pBlock->info.rows + pRow->numOfRows > pBlock->info.capacity) {
      ASSERT(pBlock->info.rows > 0);
      releaseOutputBuf(pState, NULL, pRow);
      break;
    }

    pGroupResInfo->index += 1;

    for (int32_t j = 0; j < numOfExprs; ++j) {
      int32_t slotId = pExprInfo[j].base.resSchema.slotId;

      pCtx[j].resultInfo = getResultEntryInfo(pRow, j, rowEntryOffset);
      if (pCtx[j].fpSet.finalize) {
        int32_t code1 = pCtx[j].fpSet.finalize(&pCtx[j], pBlock);
        if (TAOS_FAILED(code1)) {
          qError("%s build result data block error, code %s", GET_TASKID(pTaskInfo), tstrerror(code1));
          T_LONG_JMP(pTaskInfo->env, code1);
        }
      } else if (strcmp(pCtx[j].pExpr->pExpr->_function.functionName, "_select_value") == 0) {
        // do nothing, todo refactor
      } else {
        // expand the result into multiple rows. E.g., _wstart, top(k, 20)
        // the _wstart needs to copy to 20 following rows, since the results of top-k expands to 20 different rows.
        SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, slotId);
        char*            in = GET_ROWCELL_INTERBUF(pCtx[j].resultInfo);
        for (int32_t k = 0; k < pRow->numOfRows; ++k) {
          colDataAppend(pColInfoData, pBlock->info.rows + k, in, pCtx[j].resultInfo->isNullRes);
        }
      }
    }

2700
    pBlock->info.dataLoad = 1;
5
54liuyao 已提交
2701 2702 2703 2704 2705 2706
    pBlock->info.rows += pRow->numOfRows;
    // saveSessionDiscBuf(pState, pKey, pVal, size);
    releaseOutputBuf(pState, NULL, pRow);
  }
  blockDataUpdateTsWindow(pBlock, 0);
  return TSDB_CODE_SUCCESS;
2707
}