executorimpl.c 94.7 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
15

H
Haojun Liao 已提交
16 17
#include "filter.h"
#include "function.h"
18 19
#include "functionMgt.h"
#include "os.h"
H
Haojun Liao 已提交
20
#include "querynodes.h"
21
#include "tfill.h"
dengyihao's avatar
dengyihao 已提交
22
#include "tname.h"
23

H
Haojun Liao 已提交
24
#include "tdatablock.h"
25
#include "tglobal.h"
H
Haojun Liao 已提交
26
#include "tmsg.h"
27
#include "ttime.h"
H
Haojun Liao 已提交
28

29
#include "executorimpl.h"
dengyihao's avatar
dengyihao 已提交
30
#include "index.h"
31
#include "query.h"
32
#include "tcompare.h"
H
Haojun Liao 已提交
33
#include "thash.h"
34
#include "ttypes.h"
dengyihao's avatar
dengyihao 已提交
35
#include "vnode.h"
36

H
Haojun Liao 已提交
37
// True when the runtime is in its main (first) scan pass.
#define IS_MAIN_SCAN(runtime)          (MAIN_SCAN == (runtime)->scanFlag)
// Switch the runtime into the reverse-scan pass.
#define SET_REVERSE_SCAN_FLAG(runtime) ((runtime)->scanFlag = REVERSE_SCAN)

// Step per row for a scan order: QUERY_ASC_FORWARD_STEP for TSDB_ORDER_ASC, QUERY_DESC_FORWARD_STEP otherwise.
#define GET_FORWARD_DIRECTION_FACTOR(ord) ((TSDB_ORDER_ASC == (ord)) ? QUERY_ASC_FORWARD_STEP : QUERY_DESC_FORWARD_STEP)

#if 0
// Fault-injection allocators (disabled). When this region is enabled, the #defines at its end redirect
// calloc/malloc/realloc for the rest of this translation unit to wrappers that randomly return NULL:
// about 1 call in 1000 for malloc/calloc and 2 in 5 for realloc (assuming taosRand() is roughly uniform).
static UNUSED_FUNC void* u_malloc(size_t size) {
  uint32_t roll = taosRand();
  return (roll % 1000 == 0) ? NULL : taosMemoryMalloc(size);
}

static UNUSED_FUNC void* u_calloc(size_t num, size_t size) {
  uint32_t roll = taosRand();
  return (roll % 1000 == 0) ? NULL : taosMemoryCalloc(num, size);
}

static UNUSED_FUNC void* u_realloc(void* p, size_t size) {
  uint32_t roll = taosRand();
  return (roll % 5 < 2) ? NULL : taosMemoryRealloc(p, size);
}

#define calloc  u_calloc
#define malloc  u_malloc
#define realloc u_realloc
#endif

X
Xiaoyu Wang 已提交
76
// Clear the bits given in `st` from (q)->status.
#define CLEAR_QUERY_STATUS(q, st)   ((q)->status &= ~(st))
// True when the query carries a positive interval length, i.e. it is an interval query.
#define QUERY_IS_INTERVAL_QUERY(_q) (0 < (_q)->interval.interval)

H
Haojun Liao 已提交
79 80 81 82 83 84 85 86 87
// Private state of the aggregate operator. Presumably released by destroyAggOperatorInfo (declared below) — confirm.
typedef struct SAggOperatorInfo {
  SOptrBasicInfo   binfo;          // basic operator info (output block / result row bookkeeping — assumed from type name)
  SAggSupporter    aggSup;         // aggregation support: result-row hash table, key buffer, page bookkeeping
  STableQueryInfo* current;
  uint64_t         groupId;        // group id currently being aggregated (assumed from name — confirm with callers)
  SGroupResInfo    groupResInfo;   // iteration state for emitting per-group results
  SExprSupp        scalarExprSup;  // scalar expressions; presumably applied to input before aggregation — confirm
} SAggOperatorInfo;

L
Liu Jicong 已提交
88 89
/* Maximum idle duration: twice tsShellActivityTimer (the name suggests both are in seconds — confirm). */
int32_t getMaximumIdleDurationSec(void) { return tsShellActivityTimer * 2; }

90
static void setBlockSMAInfo(SqlFunctionCtx* pCtx, SExprInfo* pExpr, SSDataBlock* pBlock);
91

X
Xiaoyu Wang 已提交
92
static void releaseQueryBuf(size_t numOfTables);
93

H
Haojun Liao 已提交
94 95 96 97 98 99 100 101 102 103
static void    destroyAggOperatorInfo(void* param);
static void    initCtxOutputBuffer(SqlFunctionCtx* pCtx, int32_t size);
static void    doSetTableGroupOutputBuf(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId);
static void    doApplyScalarCalculation(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32_t order, int32_t scanFlag);
static int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t numOfOutput, size_t keyBufSize,
                                const char* pKey);
static void    extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const SColumnInfoData* p, bool keep,
                                                   int32_t status);
static int32_t doSetInputDataBlock(SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t order, int32_t scanFlag,
                                   bool createDummyCol);
H
Haojun Liao 已提交
104

H
Haojun Liao 已提交
105
/*
 * Mark an operator as finished.
 *
 * Sets the operator status to OP_EXEC_DONE, records the elapsed total cost and flags the owning task as
 * TASK_COMPLETED. The operator must already be attached to a task (asserted).
 * NOTE(review): the cost formula multiplies task cost.start by 1000 before subtracting it from a microsecond
 * timestamp, so cost.start is presumably in milliseconds and totalCost ends up in milliseconds — confirm.
 */
void setOperatorCompleted(SOperatorInfo* pOperator) {
  SExecTaskInfo* pTask = pOperator->pTaskInfo;

  pOperator->status = OP_EXEC_DONE;
  ASSERT(pTask != NULL);

  pOperator->cost.totalCost = (taosGetTimestampUs() - pTask->cost.start * 1000) / 1000.0;
  setTaskStatus(pTask, TASK_COMPLETED);
}
112

H
Haojun Liao 已提交
113 114 115 116 117 118 119 120 121 122
/*
 * Populate the identity and bookkeeping fields every operator carries.
 * The name pointer is stored as-is (not copied).
 */
void setOperatorInfo(SOperatorInfo* pOperator, const char* name, int32_t type, bool blocking, int32_t status,
                     void* pInfo, SExecTaskInfo* pTaskInfo) {
  pOperator->pTaskInfo = pTaskInfo;
  pOperator->info = pInfo;
  pOperator->status = status;
  pOperator->blocking = blocking;
  pOperator->operatorType = type;
  pOperator->name = (char*)name;
}

H
Haojun Liao 已提交
123
/*
 * Open callback for operators that need no preparation: mark the operator opened,
 * record zero open cost and report success.
 */
int32_t operatorDummyOpenFn(SOperatorInfo* pOperator) {
  OPTR_SET_OPENED(pOperator);
  pOperator->cost.openCost = 0;

  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
129 130
/*
 * Bundle an operator's callbacks (open, next, cleanup, close, explain) into an SOperatorFpSet.
 * Any member not listed here is zero-initialized.
 */
SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn, __optr_fn_t cleanup,
                                   __optr_close_fn_t closeFn, __optr_explain_fn_t explain) {
  return (SOperatorFpSet){
      .getExplainFn = explain,
      .closeFn = closeFn,
      .cleanupFn = cleanup,
      .getNextFn = nextFn,
      ._openFn = openFn,
  };
}

142 143
static int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprSupp* pSup, SDiskbasedBuf* pBuf,
                                  SGroupResInfo* pGroupResInfo);
H
Haojun Liao 已提交
144

145
/*
 * Reserve `interBufSize` bytes for a new result row inside the paged result buffer.
 *
 * If *currentPageId is -1, a fresh page is allocated. Otherwise the current page is reused. When that page lacks
 * room for another row, it is released and a fresh page is allocated instead. The page is marked dirty, the row is
 * carved out at the page's current fill offset, and its pageId/offset are recorded in the row.
 *
 * @param pResultBuf    paged result buffer
 * @param currentPageId in: page currently being filled (-1 for none); out: page the new row lives on
 * @param interBufSize  size in bytes of one result row
 * @return the new row, or NULL if no buffer page could be obtained (in which case *currentPageId is unchanged)
 */
SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int32_t* currentPageId, int32_t interBufSize) {
  SFilePage* pData = NULL;
  int32_t    pageId = -1;

  if (*currentPageId == -1) {
    // first row: no page yet, allocate one
    pData = getNewBufPage(pResultBuf, &pageId);
    if (pData == NULL) {
      return NULL;
    }
    pData->num = sizeof(SFilePage);
  } else {
    pData = getBufPage(pResultBuf, *currentPageId);
    if (pData == NULL) {
      return NULL;
    }
    pageId = *currentPageId;

    if (pData->num + interBufSize > getBufPageSize(pResultBuf)) {
      // release current page first, and prepare the next one
      releaseBufPage(pResultBuf, pData);

      pData = getNewBufPage(pResultBuf, &pageId);
      if (pData == NULL) {
        return NULL;
      }
      pData->num = sizeof(SFilePage);
    }
  }

  setBufPageDirty(pData, true);

  // the new row starts at the page's current fill offset
  SResultRow* pResultRow = (SResultRow*)((char*)pData + pData->num);
  pResultRow->pageId = pageId;
  pResultRow->offset = (int32_t)pData->num;
  *currentPageId = pageId;

  pData->num += interBufSize;
  return pResultRow;
}

184 185 186 187 188 189 190
/**
 * the struct of key in hash table
 * +----------+---------------+
 * | group id |   key data    |
 * | 8 bytes  | actual length |
 * +----------+---------------+
 */
191 192 193
/*
 * Find or create the result row for (groupId, key data) and make it the active row.
 *
 * The lookup key is built in pSup->keyBuf and looked up in pSup->pResultRowHashTable.
 * - Interval queries reuse an existing row only during the main scan.
 * - Other queries always reuse an existing row.
 * If the active row lives on a different page than the row being activated, the previous page is released.
 * When no row exists, a new one is allocated and registered in the hash table.
 *
 * Errors are reported by long-jumping to pTaskInfo->env:
 * - TSDB_CODE_OUT_OF_MEMORY if no result row could be allocated;
 * - TSDB_CODE_QRY_TOO_MANY_TIMEWINDOW when a batch-mode task exceeds MAX_INTERVAL_TIME_WINDOW rows.
 *
 * @return the active result row (never NULL on normal return)
 */
SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pResultRowInfo, char* pData,
                                   int16_t bytes, bool masterscan, uint64_t groupId, SExecTaskInfo* pTaskInfo,
                                   bool isIntervalQuery, SAggSupporter* pSup) {
  SET_RES_WINDOW_KEY(pSup->keyBuf, pData, bytes, groupId);

  SResultRowPosition* p1 =
      (SResultRowPosition*)tSimpleHashGet(pSup->pResultRowHashTable, pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes));

  SResultRow* pResult = NULL;

  // in case of repeat scan/reverse scan, no new time window added.
  if (isIntervalQuery) {
    if (masterscan && p1 != NULL) {  // the *p1 may be NULL in case of sliding+offset exists.
      pResult = getResultRowByPos(pResultBuf, p1, true);
      ASSERT(pResult->pageId == p1->pageId && pResult->offset == p1->offset);
    }
  } else {
    // For group-by-column queries, an SResultRow already registered for this key must be reused.
    if (p1 != NULL) {
      // todo
      pResult = getResultRowByPos(pResultBuf, p1, true);
      ASSERT(pResult->pageId == p1->pageId && pResult->offset == p1->offset);
    }
  }

  // 1. close current opened time window
  if (pResultRowInfo->cur.pageId != -1 && ((pResult == NULL) || (pResult->pageId != pResultRowInfo->cur.pageId))) {
    SResultRowPosition pos = pResultRowInfo->cur;
    SFilePage*         pPage = getBufPage(pResultBuf, pos.pageId);
    releaseBufPage(pResultBuf, pPage);
  }

  // allocate a new buffer page
  if (pResult == NULL) {
    ASSERT(pSup->resultRowSize > 0);
    pResult = getNewResultRow(pResultBuf, &pSup->currentPageId, pSup->resultRowSize);
    if (pResult == NULL) {
      // no buffer page could be obtained; abort the task instead of dereferencing NULL below
      T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
    }

    // add a new result set for a new group
    SResultRowPosition pos = {.pageId = pResult->pageId, .offset = pResult->offset};
    tSimpleHashPut(pSup->pResultRowHashTable, pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes), &pos,
                   sizeof(SResultRowPosition));
  }

  // 2. set the new time window to be the new active time window
  pResultRowInfo->cur = (SResultRowPosition){.pageId = pResult->pageId, .offset = pResult->offset};

  // too many time window in query
  if (pTaskInfo->execModel == OPTR_EXEC_MODEL_BATCH &&
      tSimpleHashGetSize(pSup->pResultRowHashTable) > MAX_INTERVAL_TIME_WINDOW) {
    T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_TOO_MANY_TIMEWINDOW);
  }

  return pResult;
}

247
// a new buffer page for each table. Needs to opt this design
L
Liu Jicong 已提交
248
/*
 * Give `pWindowRes` a slot of `size` bytes in the result buffer if it has none yet (pageId == -1).
 *
 * The last page in the buffer's page list is reused when it has room. Otherwise a new page is allocated; this
 * also happens when the list is empty. `tid` is currently unused.
 *
 * @return 0 on success (or if the row already had a slot), -1 if no buffer page could be obtained
 */
static int32_t addNewWindowResultBuf(SResultRow* pWindowRes, SDiskbasedBuf* pResultBuf, int32_t tid, uint32_t size) {
  if (pWindowRes->pageId != -1) {
    return 0;
  }

  SFilePage* pData = NULL;
  int32_t    pageId = -1;
  SArray*    list = getDataBufPagesIdList(pResultBuf);

  if (taosArrayGetSize(list) == 0) {
    // first allocation: no page yet
    pData = getNewBufPage(pResultBuf, &pageId);
    if (pData == NULL) {
      return -1;
    }
    pData->num = sizeof(SFilePage);
  } else {
    SPageInfo* pi = getLastPageInfo(list);
    pageId = getPageId(pi);
    pData = getBufPage(pResultBuf, pageId);
    if (pData == NULL) {
      return -1;
    }

    if (pData->num + size > getBufPageSize(pResultBuf)) {
      // release current page first, and prepare the next one
      releaseBufPageInfo(pResultBuf, pi);

      pData = getNewBufPage(pResultBuf, &pageId);
      if (pData == NULL) {
        return -1;
      }
      pData->num = sizeof(SFilePage);
    }
  }

  // pWindowRes->pageId is known to be -1 here (early return above), so claim the slot unconditionally
  pWindowRes->pageId = pageId;
  pWindowRes->offset = (int32_t)pData->num;
  pData->num += size;
  ASSERT(pWindowRes->pageId >= 0);

  return 0;
}

294
//  query_range_start, query_range_end, window_duration, window_start, window_end
295
/*
 * Fill `pColData` as a 5-row TIMESTAMP column describing the query window:
 *   row 0: query range start, row 1: query range end,
 *   row 2: window duration (written as 0; may be variable for 'n'/'y' units),
 *   row 3: window start,      row 4: window end.
 * Rows 3 and 4 repeat the query range.
 */
void initExecTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pQueryWindow) {
  pColData->info.type = TSDB_DATA_TYPE_TIMESTAMP;
  pColData->info.bytes = sizeof(int64_t);

  int64_t       rows[] = {pQueryWindow->skey, pQueryWindow->ekey, 0, pQueryWindow->skey, pQueryWindow->ekey};
  const int32_t numOfRows = (int32_t)(sizeof(rows) / sizeof(rows[0]));

  colInfoDataEnsureCapacity(pColData, numOfRows, false);
  for (int32_t r = 0; r < numOfRows; ++r) {
    colDataAppendInt64(pColData, r, &rows[r]);
  }
}

309 310 311 312 313 314 315
// Snapshot of the SqlFunctionCtx input fields that partial-tuple processing temporarily overrides.
typedef struct {
  bool    hasAgg;       // input.colDataSMAIsSet
  int32_t numOfRows;    // input.numOfRows
  int32_t startOffset;  // input.startRowIndex
} SFunctionCtxStatus;

// Capture the overridable input fields of `pCtx` into `pStatus`.
static void functionCtxSave(SqlFunctionCtx* pCtx, SFunctionCtxStatus* pStatus) {
  const SInputColumnInfoData* pIn = &pCtx->input;

  pStatus->hasAgg = pIn->colDataSMAIsSet;
  pStatus->numOfRows = pIn->numOfRows;
  pStatus->startOffset = pIn->startRowIndex;
}

// Write the fields captured by functionCtxSave back into `pCtx`.
static void functionCtxRestore(SqlFunctionCtx* pCtx, SFunctionCtxStatus* pStatus) {
  SInputColumnInfoData* pIn = &pCtx->input;

  pIn->colDataSMAIsSet = pStatus->hasAgg;
  pIn->numOfRows = pStatus->numOfRows;
  pIn->startRowIndex = pStatus->startOffset;
}

H
Haojun Liao 已提交
327 328
/*
 * Apply each of the `numOfOutput` functions to rows [offset, offset + forwardStep) of the current input block.
 *
 * For every function context, the input window (start row, row count, SMA flag) is temporarily narrowed to the
 * slice. The snapshot is restored afterwards in every branch. SMA data is disabled when the slice does not cover
 * all `numOfTotal` rows.
 * - Window pseudo-column functions are evaluated via their scalar implementation against `pTimeWindowData`
 *   (a 5-row column) and produce exactly one result.
 * - All other functions run their aggregate `process` callback when functionNeedToExecute() allows it.
 *
 * On failure the error is stored in taskInfo->code and control long-jumps to taskInfo->env.
 */
void applyAggFunctionOnPartialTuples(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, SColumnInfoData* pTimeWindowData,
                                     int32_t offset, int32_t forwardStep, int32_t numOfTotal, int32_t numOfOutput) {
  for (int32_t k = 0; k < numOfOutput; ++k) {
    // keep it temporarily
    SFunctionCtxStatus status = {0};
    functionCtxSave(&pCtx[k], &status);

    pCtx[k].input.startRowIndex = offset;
    pCtx[k].input.numOfRows = forwardStep;

    // not a whole block involved in query processing, statistics data can not be used
    // NOTE: the original value of isSet have been changed here
    if (pCtx[k].input.colDataSMAIsSet && forwardStep < numOfTotal) {
      pCtx[k].input.colDataSMAIsSet = false;
    }

    if (fmIsWindowPseudoColumnFunc(pCtx[k].functionId)) {
      SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(&pCtx[k]);

      // the scalar function writes its single BIGINT result directly into the row's intermediate buffer
      char* p = GET_ROWCELL_INTERBUF(pEntryInfo);

      SColumnInfoData idata = {0};
      idata.info.type = TSDB_DATA_TYPE_BIGINT;
      idata.info.bytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes;
      idata.pData = p;

      SScalarParam out = {.columnData = &idata};
      SScalarParam tw = {.numOfRows = 5, .columnData = pTimeWindowData};
      int32_t      code = pCtx[k].sfp.process(&tw, 1, &out);
      if (code != TSDB_CODE_SUCCESS) {
        qError("%s apply window pseudo column function error, code: %s", GET_TASKID(taskInfo), tstrerror(code));
        taskInfo->code = code;
        T_LONG_JMP(taskInfo->env, code);
      }
      pEntryInfo->numOfRes = 1;
    } else if (functionNeedToExecute(&pCtx[k]) && pCtx[k].fpSet.process != NULL) {
      int32_t code = pCtx[k].fpSet.process(&pCtx[k]);
      if (code != TSDB_CODE_SUCCESS) {
        qError("%s apply functions error, code: %s", GET_TASKID(taskInfo), tstrerror(code));
        taskInfo->code = code;
        T_LONG_JMP(taskInfo->env, code);
      }
    }

    // restore it, for pseudo-column functions as well, so no narrowed input state leaks out of this call
    functionCtxRestore(&pCtx[k], &status);
  }
}

375 376 377
/*
 * Bind `pBlock` to every function context in `pExprSup` for SMA-based processing. For each context, set the scan
 * order and row count, attach the block's SMA info via setBlockSMAInfo, and record the source block.
 */
static void doSetInputDataBlockInfo(SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t order) {
  for (int32_t idx = 0; idx < pExprSup->numOfExprs; ++idx) {
    SqlFunctionCtx* pFnCtx = &pExprSup->pCtx[idx];

    pFnCtx->order = order;
    pFnCtx->input.numOfRows = pBlock->info.rows;
    setBlockSMAInfo(pFnCtx, &pExprSup->pExprInfo[idx], pBlock);
    pFnCtx->pSrcBlock = pBlock;
  }
}

385
/*
 * Attach `pBlock` as input to the expressions in `pExprSup`.
 * - Blocks without SMA (pBlockAgg == NULL) are bound column by column through doSetInputDataBlock.
 * - Blocks with SMA are bound through doSetInputDataBlockInfo.
 * NOTE(review): the status code returned by doSetInputDataBlock is discarded here.
 */
void setInputDataBlock(SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t order, int32_t scanFlag, bool createDummyCol) {
  if (pBlock->pBlockAgg == NULL) {
    doSetInputDataBlock(pExprSup, pBlock, order, scanFlag, createDummyCol);
    return;
  }

  doSetInputDataBlockInfo(pExprSup, pBlock, order);
}

L
Liu Jicong 已提交
393 394
/*
 * Materialize a constant function parameter as a column of `numOfRows` identical values.
 *
 * If pInput->pData[paramIndex] is NULL, a column is allocated, typed from the parameter, and stored there
 * (owned by pInput). BIGINT/UBIGINT, DOUBLE and VARCHAR constants are filled in; other types leave the column
 * contents untouched.
 *
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY, or the error reported by the column-data helpers
 */
static int32_t doCreateConstantValColumnInfo(SInputColumnInfoData* pInput, SFunctParam* pFuncParam, int32_t paramIndex,
                                             int32_t numOfRows) {
  SColumnInfoData* pColInfo = pInput->pData[paramIndex];
  if (pColInfo == NULL) {
    pColInfo = taosMemoryCalloc(1, sizeof(SColumnInfoData));
    if (pColInfo == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    // Set the correct column info (data type and bytes)
    pColInfo->info.type = pFuncParam->param.nType;
    pColInfo->info.bytes = pFuncParam->param.nLen;

    pInput->pData[paramIndex] = pColInfo;
  }

  int32_t code = colInfoDataEnsureCapacity(pColInfo, numOfRows, false);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  int8_t type = pFuncParam->param.nType;
  if (type == TSDB_DATA_TYPE_BIGINT || type == TSDB_DATA_TYPE_UBIGINT) {
    int64_t v = pFuncParam->param.i;
    for (int32_t i = 0; i < numOfRows; ++i) {
      colDataAppendInt64(pColInfo, i, &v);
    }
  } else if (type == TSDB_DATA_TYPE_DOUBLE) {
    double v = pFuncParam->param.d;
    for (int32_t i = 0; i < numOfRows; ++i) {
      colDataAppendDouble(pColInfo, i, &v);
    }
  } else if (type == TSDB_DATA_TYPE_VARCHAR) {
    // build the var-string (length header + payload) once and append it to every row
    char* tmp = taosMemoryMalloc(pFuncParam->param.nLen + VARSTR_HEADER_SIZE);
    if (tmp == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
    STR_WITH_SIZE_TO_VARSTR(tmp, pFuncParam->param.pz, pFuncParam->param.nLen);
    for (int32_t i = 0; i < numOfRows; ++i) {
      code = colDataAppend(pColInfo, i, tmp, false);
      if (code != TSDB_CODE_SUCCESS) {
        break;
      }
    }
    taosMemoryFree(tmp);
  }

  return code;
}

436 437
/*
 * Bind the raw columns of `pBlock` to every function context in `pExprSup` (no SMA).
 *
 * Every context gets the scan order, row count, source block, scan flag and block uid, and SMA is marked unset.
 * - Column parameters point at the block column selected by their slot id.
 * - For implicit-ts functions, the last column parameter also becomes input.pPTS.
 * - When `createDummyCol` is set, a sole constant parameter is materialized as a column of identical values.
 *
 * @return TSDB_CODE_SUCCESS, or the error from materializing a constant column
 */
static int32_t doSetInputDataBlock(SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t order, int32_t scanFlag,
                                   bool createDummyCol) {
  for (int32_t i = 0; i < pExprSup->numOfExprs; ++i) {
    SqlFunctionCtx*       pFnCtx = &pExprSup->pCtx[i];
    SInputColumnInfoData* pInput = &pFnCtx->input;
    SExprInfo*            pExpr = &pExprSup->pExprInfo[i];

    pFnCtx->order = order;
    pInput->numOfRows = pBlock->info.rows;
    pFnCtx->pSrcBlock = pBlock;
    pFnCtx->scanFlag = scanFlag;
    pInput->uid = pBlock->info.id.uid;
    pInput->colDataSMAIsSet = false;

    for (int32_t j = 0; j < pExpr->base.numOfParams; ++j) {
      SFunctParam* pParam = &pExpr->base.pParam[j];

      if (pParam->type == FUNC_PARAM_TYPE_COLUMN) {
        int32_t slotId = pParam->pCol->slotId;
        pInput->pData[j] = taosArrayGet(pBlock->pDataBlock, slotId);
        pInput->totalRows = pBlock->info.rows;
        pInput->numOfRows = pBlock->info.rows;
        pInput->startRowIndex = 0;

        // For implicit-ts functions the last parameter is also exposed as pPTS. For merge functions this column
        // is not necessarily the timestamp column.
        if (fmIsImplicitTsFunc(pFnCtx->functionId) && (j == pExpr->base.numOfParams - 1)) {
          pInput->pPTS = pInput->pData[j];
        }
        ASSERT(pInput->pData[j] != NULL);
        continue;
      }

      if (pParam->type != FUNC_PARAM_TYPE_VALUE) {
        continue;
      }

      // Only a sole constant argument (e.g. sum(11)) is materialized as a column. A constant that
      // accompanies other arguments (e.g. the 12 in top(k, 12)) is left alone.
      if (!createDummyCol || pExpr->base.numOfParams != 1) {
        continue;
      }

      pInput->totalRows = pBlock->info.rows;
      pInput->numOfRows = pBlock->info.rows;
      pInput->startRowIndex = 0;

      int32_t ret = doCreateConstantValColumnInfo(pInput, pParam, j, pBlock->info.rows);
      if (ret != TSDB_CODE_SUCCESS) {
        return ret;
      }
    }
  }

  return TSDB_CODE_SUCCESS;
}

489
/*
 * Run the aggregate `process` callback of every expression of `pOperator` whose context
 * functionNeedToExecute() accepts. Contexts without a process callback are skipped.
 *
 * @return TSDB_CODE_SUCCESS, or the first error returned by a process callback (which is also logged)
 */
static int32_t doAggregateImpl(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx) {
  for (int32_t k = 0; k < pOperator->exprSupp.numOfExprs; ++k) {
    SqlFunctionCtx* pFnCtx = pCtx + k;

    // todo: a dummy process function would make this NULL check unnecessary
    if (!functionNeedToExecute(pFnCtx) || pFnCtx->fpSet.process == NULL) {
      continue;
    }

    int32_t code = pFnCtx->fpSet.process(pFnCtx);
    if (code != TSDB_CODE_SUCCESS) {
      qError("%s aggregate function error happens, code: %s", GET_TASKID(pOperator->pTaskInfo), tstrerror(code));
      return code;
    }
  }

  return TSDB_CODE_SUCCESS;
}

5
54liuyao 已提交
508
/*
 * Decide whether the function bound to `pCtx` should process the current input.
 * - false when no function is attached (functionId == -1);
 * - during a REPEAT_SCAN, only functions accepted by fmIsRepeatScanFunc run;
 * - otherwise it runs unless its result row entry is already completed.
 */
bool functionNeedToExecute(SqlFunctionCtx* pCtx) {
  if (pCtx->functionId == -1) {
    return false;
  }

  if (pCtx->scanFlag == REPEAT_SCAN) {
    return fmIsRepeatScanFunc(pCtx->functionId);
  }

  return !isRowEntryCompleted(GET_RES_INFO(pCtx));
}

528 529 530 531 532 533 534
/*
 * Build SMA (pre-aggregate) info for a constant parameter repeated over `numOfRows` rows.
 *
 * A typed placeholder column is allocated in pInput->pData[paramIndex] if absent; only its type is needed.
 * The SColumnDataAgg in pInput->pColumnDataAgg[paramIndex] is allocated if absent and then filled.
 * Since every row holds the same value v: no nulls, min == max == v, sum == v * numOfRows.
 * - BIGINT, DOUBLE and BOOL are handled.
 * - TIMESTAMP leaves the agg untouched.
 * - Anything else hits ASSERT(0).
 *
 * @return TSDB_CODE_SUCCESS or TSDB_CODE_OUT_OF_MEMORY
 */
static int32_t doCreateConstantValColumnAggInfo(SInputColumnInfoData* pInput, SFunctParam* pFuncParam, int32_t type,
                                                int32_t paramIndex, int32_t numOfRows) {
  if (pInput->pData[paramIndex] == NULL) {
    pInput->pData[paramIndex] = taosMemoryCalloc(1, sizeof(SColumnInfoData));
    if (pInput->pData[paramIndex] == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    // Set the correct column info (data type and bytes)
    pInput->pData[paramIndex]->info.type = type;
    pInput->pData[paramIndex]->info.bytes = tDataTypes[type].bytes;
  }

  SColumnDataAgg* da = NULL;
  if (pInput->pColumnDataAgg[paramIndex] == NULL) {
    da = taosMemoryCalloc(1, sizeof(SColumnDataAgg));
    pInput->pColumnDataAgg[paramIndex] = da;
    if (da == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  } else {
    da = pInput->pColumnDataAgg[paramIndex];
  }

  ASSERT(!IS_VAR_DATA_TYPE(type));

  if (type == TSDB_DATA_TYPE_BIGINT) {
    int64_t v = pFuncParam->param.i;
    *da = (SColumnDataAgg){.numOfNull = 0, .min = v, .max = v, .sum = v * numOfRows};
  } else if (type == TSDB_DATA_TYPE_DOUBLE) {
    double v = pFuncParam->param.d;
    *da = (SColumnDataAgg){.numOfNull = 0};

    *(double*)&da->min = v;
    *(double*)&da->max = v;
    *(double*)&da->sum = v * numOfRows;
  } else if (type == TSDB_DATA_TYPE_BOOL) {  // todo validate this data type
    bool v = pFuncParam->param.i;

    *da = (SColumnDataAgg){.numOfNull = 0};
    // every row holds v, so both the minimum and the maximum are v
    *(bool*)&da->min = v;
    *(bool*)&da->max = v;
    // the sum counts the true rows; store it in the full-width field instead of squeezing it through a bool
    da->sum = v ? numOfRows : 0;
  } else if (type == TSDB_DATA_TYPE_TIMESTAMP) {
    // do nothing
  } else {
    ASSERT(0);
  }

  return TSDB_CODE_SUCCESS;
}
579

580
/*
 * Expose the SMA (pre-aggregate) data of `pBlock` to the function context `pCtx`.
 *
 * Row counts are always taken from the block. When the block carries no SMA, SMA is marked unset.
 * Otherwise SMA starts out marked as set:
 * - Column parameters take the block's per-slot SMA; a missing SMA entry marks SMA unset.
 * - The column itself is still bound, so its type information is available.
 * - Constant parameters get synthesized SMA info.
 * NOTE(review): the status code of doCreateConstantValColumnAggInfo is ignored here.
 */
void setBlockSMAInfo(SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, SSDataBlock* pBlock) {
  SInputColumnInfoData* pInput = &pCtx->input;
  int32_t               rows = pBlock->info.rows;

  pInput->numOfRows = rows;
  pInput->totalRows = rows;

  if (pBlock->pBlockAgg == NULL) {
    pInput->colDataSMAIsSet = false;
    return;
  }

  pInput->colDataSMAIsSet = true;

  for (int32_t j = 0; j < pExprInfo->base.numOfParams; ++j) {
    SFunctParam* pParam = &pExprInfo->base.pParam[j];

    switch (pParam->type) {
      case FUNC_PARAM_TYPE_COLUMN: {
        int32_t slotId = pParam->pCol->slotId;
        pInput->pColumnDataAgg[j] = pBlock->pBlockAgg[slotId];
        if (pInput->pColumnDataAgg[j] == NULL) {
          pInput->colDataSMAIsSet = false;
        }

        // only the column's type information is needed here; its data is not read
        pInput->pData[j] = taosArrayGet(pBlock->pDataBlock, slotId);
        break;
      }
      case FUNC_PARAM_TYPE_VALUE:
        doCreateConstantValColumnAggInfo(pInput, pParam, pParam->param.nType, j, pBlock->info.rows);
        break;
      default:
        break;
    }
  }
}

L
Liu Jicong 已提交
612
/* A task is reported as killed as soon as any non-zero code has been stored in pTaskInfo->code. */
bool isTaskKilled(SExecTaskInfo* pTaskInfo) { return pTaskInfo->code != 0; }

D
dapan1121 已提交
616
/* Store `rspCode` as the task's code; any non-zero value makes isTaskKilled() return true. */
void setTaskKilled(SExecTaskInfo* pTaskInfo, int32_t rspCode) {
  pTaskInfo->code = rspCode;
}
617 618

/////////////////////////////////////////////////////////////////////////////////////////////
619
/*
 * Return the interval-aligned window containing `key`.
 *
 * The start is `key` truncated to the interval grid. The end is one interval later, minus 1.
 * If adding the interval overflows (end < start), the end is clamped to INT64_MAX: near INT64_MAX the remaining
 * range is shorter than one interval, so no further adjustment is needed.
 */
STimeWindow getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int64_t key) {
  int64_t start = taosTimeTruncate(key, pInterval, precision);
  int64_t end = taosTimeAdd(start, pInterval->interval, pInterval->intervalUnit, precision) - 1;

  return (STimeWindow){.skey = start, .ekey = (end < start) ? INT64_MAX : end};
}

L
Liu Jicong 已提交
635 636
/*
 * Decide whether (and how) a data block has to be loaded.
 *
 * Current behavior: sets *status to BLK_DATA_NOT_LOAD, clears pBlock->pDataBlock and pBlock->pBlockAgg, and
 * returns TSDB_CODE_SUCCESS. pTaskInfo and pTableScanInfo are unused.
 *
 * The legacy decision logic is kept below under `#if 0` for reference. It references names not declared in this
 * function (pQueryAttr, pRuntimeEnv, groupId, ascQuery) and will not compile if re-enabled as-is. The cost-info
 * pointer it uses lives inside that region, so the live build has no unused local.
 */
int32_t loadDataBlockOnDemand(SExecTaskInfo* pTaskInfo, STableScanInfo* pTableScanInfo, SSDataBlock* pBlock,
                              uint32_t* status) {
  (void)pTaskInfo;
  (void)pTableScanInfo;

  *status = BLK_DATA_NOT_LOAD;

  pBlock->pDataBlock = NULL;
  pBlock->pBlockAgg = NULL;

#if 0
  //  int64_t groupId = pRuntimeEnv->current->groupIndex;
  //  bool    ascQuery = QUERY_IS_ASC_QUERY(pQueryAttr);

  STaskCostInfo* pCost = &pTaskInfo->cost;

//  pCost->totalBlocks += 1;
//  pCost->totalRows += pBlock->info.rows;

  // Calculate all time windows that are overlapping or contain current data block.
  // If current data block is contained by all possible time window, do not load current data block.
  if (/*pQueryAttr->pFilters || */pQueryAttr->groupbyColumn || pQueryAttr->sw.gap > 0 ||
      (QUERY_IS_INTERVAL_QUERY(pQueryAttr) && overlapWithTimeWindow(pTaskInfo, &pBlock->info))) {
    (*status) = BLK_DATA_DATA_LOAD;
  }

  // check if this data block is required to load
  if ((*status) != BLK_DATA_DATA_LOAD) {
    bool needFilter = true;

    // the pCtx[i] result is belonged to previous time window since the outputBuf has not been set yet,
    // the filter result may be incorrect. So in case of interval query, we need to set the correct time output buffer
    if (QUERY_IS_INTERVAL_QUERY(pQueryAttr)) {
      SResultRow* pResult = NULL;

      bool  masterScan = IS_MAIN_SCAN(pRuntimeEnv);
      TSKEY k = ascQuery? pBlock->info.window.skey : pBlock->info.window.ekey;

      STimeWindow win = getActiveTimeWindow(pTableScanInfo->pResultRowInfo, k, pQueryAttr);
      if (pQueryAttr->pointInterpQuery) {
        needFilter = chkWindowOutputBufByKey(pRuntimeEnv, pTableScanInfo->pResultRowInfo, &win, masterScan, &pResult, groupId,
                                    pTableScanInfo->pCtx, pTableScanInfo->numOfOutput,
                                    pTableScanInfo->rowEntryInfoOffset);
      } else {
        if (setResultOutputBufByKey(pRuntimeEnv, pTableScanInfo->pResultRowInfo, pBlock->info.id.uid, &win, masterScan, &pResult, groupId,
                                    pTableScanInfo->pCtx, pTableScanInfo->numOfOutput,
                                    pTableScanInfo->rowEntryInfoOffset) != TSDB_CODE_SUCCESS) {
          T_LONG_JMP(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
        }
      }
    } else if (pQueryAttr->stableQuery && (!pQueryAttr->tsCompQuery) && (!pQueryAttr->diffQuery)) { // stable aggregate, not interval aggregate or normal column aggregate
      doSetTableGroupOutputBuf(pRuntimeEnv, pTableScanInfo->pResultRowInfo, pTableScanInfo->pCtx,
                               pTableScanInfo->rowEntryInfoOffset, pTableScanInfo->numOfOutput,
                               pRuntimeEnv->current->groupIndex);
    }

    if (needFilter) {
      (*status) = doFilterByBlockTimeWindow(pTableScanInfo, pBlock);
    } else {
      (*status) = BLK_DATA_DATA_LOAD;
    }
  }

  SDataBlockInfo* pBlockInfo = &pBlock->info;
//  *status = updateBlockLoadStatus(pRuntimeEnv->pQueryAttr, *status);

  if ((*status) == BLK_DATA_NOT_LOAD || (*status) == BLK_DATA_FILTEROUT) {
    //qDebug("QInfo:0x%"PRIx64" data block discard, brange:%" PRId64 "-%" PRId64 ", rows:%d", pQInfo->qId, pBlockInfo->window.skey,
//           pBlockInfo->window.ekey, pBlockInfo->rows);
    pCost->skipBlocks += 1;
  } else if ((*status) == BLK_DATA_SMA_LOAD) {
    // this function never returns error?
    pCost->loadBlockStatis += 1;
//    tsdbRetrieveDatablockSMA(pTableScanInfo->pTsdbReadHandle, &pBlock->pBlockAgg);

    if (pBlock->pBlockAgg == NULL) {  // data block statistics does not exist, load data block
//      pBlock->pDataBlock = tsdbRetrieveDataBlock(pTableScanInfo->pTsdbReadHandle, NULL);
      pCost->totalCheckedRows += pBlock->info.rows;
    }
  } else {
    assert((*status) == BLK_DATA_DATA_LOAD);

    // load the data block statistics to perform further filter
    pCost->loadBlockStatis += 1;
//    tsdbRetrieveDatablockSMA(pTableScanInfo->pTsdbReadHandle, &pBlock->pBlockAgg);

    if (pQueryAttr->topBotQuery && pBlock->pBlockAgg != NULL) {
      { // set previous window
        if (QUERY_IS_INTERVAL_QUERY(pQueryAttr)) {
          SResultRow* pResult = NULL;

          bool  masterScan = IS_MAIN_SCAN(pRuntimeEnv);
          TSKEY k = ascQuery? pBlock->info.window.skey : pBlock->info.window.ekey;

          STimeWindow win = getActiveTimeWindow(pTableScanInfo->pResultRowInfo, k, pQueryAttr);
          if (setResultOutputBufByKey(pRuntimeEnv, pTableScanInfo->pResultRowInfo, pBlock->info.id.uid, &win, masterScan, &pResult, groupId,
                                      pTableScanInfo->pCtx, pTableScanInfo->numOfOutput,
                                      pTableScanInfo->rowEntryInfoOffset) != TSDB_CODE_SUCCESS) {
            T_LONG_JMP(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
          }
        }
      }
      bool load = false;
      for (int32_t i = 0; i < pQueryAttr->numOfOutput; ++i) {
        int32_t functionId = pTableScanInfo->pCtx[i].functionId;
        if (functionId == FUNCTION_TOP || functionId == FUNCTION_BOTTOM) {
//          load = topbot_datablock_filter(&pTableScanInfo->pCtx[i], (char*)&(pBlock->pBlockAgg[i].min),
//                                         (char*)&(pBlock->pBlockAgg[i].max));
          if (!load) { // current block has been discard due to filter applied
            pCost->skipBlocks += 1;
            //qDebug("QInfo:0x%"PRIx64" data block discard, brange:%" PRId64 "-%" PRId64 ", rows:%d", pQInfo->qId,
//                   pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
            (*status) = BLK_DATA_FILTEROUT;
            return TSDB_CODE_SUCCESS;
          }
        }
      }
    }

    // current block has been discard due to filter applied
//    if (!doFilterByBlockSMA(pRuntimeEnv, pBlock->pBlockAgg, pTableScanInfo->pCtx, pBlockInfo->rows)) {
//      pCost->skipBlocks += 1;
//      qDebug("QInfo:0x%"PRIx64" data block discard, brange:%" PRId64 "-%" PRId64 ", rows:%d", pQInfo->qId, pBlockInfo->window.skey,
//             pBlockInfo->window.ekey, pBlockInfo->rows);
//      (*status) = BLK_DATA_FILTEROUT;
//      return TSDB_CODE_SUCCESS;
//    }

    pCost->totalCheckedRows += pBlockInfo->rows;
    pCost->loadBlocks += 1;
//    pBlock->pDataBlock = tsdbRetrieveDataBlock(pTableScanInfo->pTsdbReadHandle, NULL);
//    if (pBlock->pDataBlock == NULL) {
//      return terrno;
//    }

//    if (pQueryAttr->pFilters != NULL) {
//      filterSetColFieldData(pQueryAttr->pFilters, taosArrayGetSize(pBlock->pDataBlock), pBlock->pDataBlock);
//    }

//    if (pQueryAttr->pFilters != NULL || pRuntimeEnv->pTsBuf != NULL) {
//      filterColRowsInDataBlock(pRuntimeEnv, pBlock, ascQuery);
//    }
  }
#endif
  return TSDB_CODE_SUCCESS;
}

L
Liu Jicong 已提交
778
/*
 * Update the execution status of a task.
 *
 * TASK_NOT_COMPLETED overwrites the whole status word. Any other status is OR-ed into the existing bits, after
 * the TASK_NOT_COMPLETED bit has been cleared, because "not completed" cannot coexist with other states.
 */
void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status) {
  if (status != TASK_NOT_COMPLETED) {
    CLEAR_QUERY_STATUS(pTaskInfo, TASK_NOT_COMPLETED);
    pTaskInfo->status |= status;
    return;
  }

  pTaskInfo->status = status;
}

788
/*
 * Point each function context pCtx[0..numOfOutput) at its result entry inside pResult, and initialize entries
 * that have not been initialized yet.
 *
 * Left alone:
 *   - entries that are both completed and initialized;
 *   - window pseudo-column functions.
 * An entry whose functionId is -1 is only marked initialized; its init callback is not called.
 * Once an entry is found already initialized, initialization stops and the remaining contexts are only bound
 * to their entries.
 */
void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowEntryInfoOffset) {
  bool skipInit = false;

  for (int32_t idx = 0; idx < numOfOutput; ++idx) {
    SqlFunctionCtx*             pOne = &pCtx[idx];
    struct SResultRowEntryInfo* pEntry = getResultEntryInfo(pResult, idx, rowEntryInfoOffset);
    pOne->resultInfo = pEntry;

    if (skipInit || (isRowEntryCompleted(pEntry) && isRowEntryInitialized(pEntry)) ||
        fmIsWindowPseudoColumnFunc(pOne->functionId)) {
      continue;
    }

    if (pEntry->initialized) {
      // already set up: from here on only bind the remaining contexts
      skipInit = true;
    } else if (pOne->functionId == -1) {
      pEntry->initialized = true;
    } else {
      pOne->fpSet.init(pOne, pEntry);
    }
  }
}

H
Haojun Liao 已提交
817 818
/*
 * Apply pFilterInfo to pBlock in place, dropping the rows that do not qualify.
 *
 * Does nothing when no filter is given or the block is empty. After filtering, if pColMatchInfo is given, the
 * block's time window is recomputed from the first matched primary-timestamp column whose data type is
 * TSDB_DATA_TYPE_TIMESTAMP.
 */
void doFilter(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SColMatchInfo* pColMatchInfo) {
  if (NULL == pFilterInfo || 0 == pBlock->info.rows) {
    return;
  }

  SFilterColumnParam param = {.numOfCols = taosArrayGetSize(pBlock->pDataBlock), .pDataBlock = pBlock->pDataBlock};
  // NOTE(review): the result of filterSetDataFromSlotId is not checked; confirm it cannot fail here.
  (void)filterSetDataFromSlotId(pFilterInfo, &param);

  // pQualified receives the per-row filter result. When keep is true,
  // extractQualifiedTupleByFilterResult leaves the block untouched.
  SColumnInfoData* pQualified = NULL;
  int32_t          filterStatus = 0;
  bool             keep = filterExecute(pFilterInfo, pBlock, &pQualified, NULL, param.numOfCols, &filterStatus);
  extractQualifiedTupleByFilterResult(pBlock, pQualified, keep, filterStatus);

  if (pColMatchInfo != NULL) {
    size_t numOfItems = taosArrayGetSize(pColMatchInfo->pList);
    for (size_t k = 0; k < numOfItems; ++k) {
      SColMatchItem* pItem = taosArrayGet(pColMatchInfo->pList, k);
      if (pItem->colId != PRIMARYKEY_TIMESTAMP_COL_ID) {
        continue;
      }

      SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, pItem->dstSlotId);
      if (pTsCol->info.type != TSDB_DATA_TYPE_TIMESTAMP) {
        continue;
      }

      blockDataUpdateTsWindow(pBlock, pItem->dstSlotId);
      break;
    }
  }

  colDataDestroy(pQualified);
  taosMemoryFree(pQualified);
}

850
/*
 * Compact pBlock so that only the rows selected by the filter remain.
 *
 *   keep == true or status == FILTER_RESULT_ALL_QUALIFIED : the block is left unchanged.
 *   status == FILTER_RESULT_NONE_QUALIFIED               : the block is emptied (rows = 0).
 *   otherwise                                            : p->pData is read as one int8 flag per row (0 = drop).
 *       Every column is rebuilt in place from a copy of the block, keeping only the flagged rows.
 *       Columns whose data buffer is still NULL (reserved for a scalar function) are skipped.
 */
void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const SColumnInfoData* p, bool keep, int32_t status) {
  if (keep || status == FILTER_RESULT_ALL_QUALIFIED) {
    return;
  }

  if (status == FILTER_RESULT_NONE_QUALIFIED) {
    pBlock->info.rows = 0;
    return;
  }

  int32_t      totalRows = pBlock->info.rows;
  SSDataBlock* pCopy = createOneDataBlock(pBlock, true);

  size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
  for (size_t col = 0; col < numOfCols; ++col) {
    SColumnInfoData* pSrc = taosArrayGet(pCopy->pDataBlock, col);
    SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, col);
    if (pDst->pData == NULL || pSrc->pData == NULL) {
      continue;
    }

    colInfoDataCleanup(pDst, pBlock->info.rows);

    int32_t numOfQualified = 0;
    for (int32_t row = 0; row < totalRows; ++row) {
      if (((int8_t*)p->pData)[row] == 0) {
        continue;
      }

      if (colDataIsNull_s(pSrc, row)) {
        colDataAppendNULL(pDst, numOfQualified);
      } else {
        colDataAppend(pDst, numOfQualified, colDataGetData(pSrc, row), false);
      }
      numOfQualified += 1;
    }

    // The first rebuilt column sets the new row count; every later column must agree with it.
    if (pBlock->info.rows != totalRows) {
      ASSERT(pBlock->info.rows == numOfQualified);
    } else {
      pBlock->info.rows = numOfQualified;
    }
  }

  blockDataDestroy(pCopy);
}

901
/*
 * Bind the aggregate operator's function contexts (pOperator->exprSupp.pCtx) to the result row of groupId.
 *
 * Steps:
 *   1. Look up, or create, the result row keyed by groupId in the operator's result buffer.
 *   2. If the row has no buffer page yet (pageId == -1), allocate one.
 *   3. Attach the row's per-expression entries to the contexts, initializing them if needed.
 *
 * A page-allocation failure is raised through T_LONG_JMP(pTaskInfo->env, ...). Returning silently would skip
 * setResultRowInitCtx and leave the contexts bound to whatever result row they pointed at before.
 */
void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId) {
  SExecTaskInfo*    pTaskInfo = pOperator->pTaskInfo;
  SAggOperatorInfo* pAggInfo = pOperator->info;

  SResultRowInfo* pResultRowInfo = &pAggInfo->binfo.resultRowInfo;
  SqlFunctionCtx* pCtx = pOperator->exprSupp.pCtx;
  int32_t*        rowEntryInfoOffset = pOperator->exprSupp.rowEntryInfoOffset;

  SResultRow* pResultRow = doSetResultOutBufByKey(pAggInfo->aggSup.pResultBuf, pResultRowInfo, (char*)&groupId,
                                                  sizeof(groupId), true, groupId, pTaskInfo, false, &pAggInfo->aggSup);
  assert(pResultRow != NULL);

  // No result buffer page assigned to this row yet: allocate one now.
  if (pResultRow->pageId == -1) {
    int32_t ret =
        addNewWindowResultBuf(pResultRow, pAggInfo->aggSup.pResultBuf, groupId, pAggInfo->binfo.pRes->info.rowSize);
    if (ret != TSDB_CODE_SUCCESS) {
      qError("%s failed to allocate result buffer for group:%" PRIu64 ", code %s", GET_TASKID(pTaskInfo), groupId,
             tstrerror(ret));
      T_LONG_JMP(pTaskInfo->env, ret);
    }
  }

  setResultRowInitCtx(pResultRow, pCtx, numOfOutput, rowEntryInfoOffset);
}

929 930 931
/*
 * Make groupId the aggregate operator's active group.
 *
 * The output buffer is rebound only when the group actually changes. A stored groupId of UINT64_MAX always
 * forces a rebind (presumably the "no active group yet" sentinel).
 */
static void setExecutionContext(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId) {
  SAggOperatorInfo* pAggInfo = pOperator->info;

  bool sameGroup = (pAggInfo->groupId != UINT64_MAX) && (pAggInfo->groupId == groupId);
  if (!sameGroup) {
    doSetTableGroupOutputBuf(pOperator, numOfOutput, groupId);
    pAggInfo->groupId = groupId;  // remember the group that is now active
  }
}

dengyihao's avatar
dengyihao 已提交
941 942
/*
 * Raise pRow->numOfRows to at least the largest numOfRes among the row's initialized result entries.
 *
 * If the row is still empty afterwards (every expression skipped all of its input, e.g. max() over only NULLs),
 * it is forced to one output row. The exception is when any expression is a not-null-output function
 * (such as first/last); then the row stays empty.
 */
static void doUpdateNumOfRows(SqlFunctionCtx* pCtx, SResultRow* pRow, int32_t numOfExprs,
                              const int32_t* rowCellOffset) {
  bool hasNotNullOutput = false;

  for (int32_t k = 0; k < numOfExprs; ++k) {
    struct SResultRowEntryInfo* pEntry = getResultEntryInfo(pRow, k, rowCellOffset);
    if (isRowEntryInitialized(pEntry)) {
      if (pEntry->numOfRes > pRow->numOfRows) {
        pRow->numOfRows = pEntry->numOfRes;
      }

      if (fmIsNotNullOutputFunc(pCtx[k].functionId)) {
        hasNotNullOutput = true;
      }
    }
  }

  if (!hasNotNullOutput && 0 == pRow->numOfRows) {
    pRow->numOfRows = 1;
  }
}

965 966
/*
 * Write the final values of result row pRow into pBlock, starting at row index pBlock->info.rows.
 *
 * - Expressions with a finalize callback write through it. A failure is logged and raised via
 *   T_LONG_JMP(pTaskInfo->env, ...).
 * - Expressions without one, except _select_value (skipped), have their intermediate value repeated
 *   pRow->numOfRows times. For example, _wstart next to top(k, 20) must fill all 20 output rows.
 *
 * pBlock->info.rows itself is not advanced here; that is left to the caller.
 */
static void doCopyResultToDataBlock(SExprInfo* pExprInfo, int32_t numOfExprs, SResultRow* pRow, SqlFunctionCtx* pCtx,
                                    SSDataBlock* pBlock, const int32_t* rowEntryOffset, SExecTaskInfo* pTaskInfo) {
  for (int32_t j = 0; j < numOfExprs; ++j) {
    SqlFunctionCtx* pOne = &pCtx[j];
    int32_t         slotId = pExprInfo[j].base.resSchema.slotId;

    pOne->resultInfo = getResultEntryInfo(pRow, j, rowEntryOffset);
    const char* funcName = pOne->pExpr->pExpr->_function.functionName;

    if (pOne->fpSet.finalize != NULL) {
      // A _group_key paired with a multi-row function (e.g. histogram) must emit one key per output row.
      if (strcmp(funcName, "_group_key") == 0 && pOne->resultInfo->numOfRes != 0) {
        pOne->resultInfo->numOfRes = pRow->numOfRows;
      }

      int32_t code = pOne->fpSet.finalize(pOne, pBlock);
      if (TAOS_FAILED(code)) {
        qError("%s build result data block error, code %s", GET_TASKID(pTaskInfo), tstrerror(code));
        T_LONG_JMP(pTaskInfo->env, code);
      }
    } else if (strcmp(funcName, "_select_value") != 0) {
      SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, slotId);
      char*            in = GET_ROWCELL_INTERBUF(pOne->resultInfo);
      for (int32_t k = 0; k < pRow->numOfRows; ++k) {
        colDataAppend(pColInfoData, pBlock->info.rows + k, in, pOne->resultInfo->isNullRes);
      }
    }
  }
}

999 1000 1001
/*
 * Finalize the result row stored at resultRowPosition in pBuf and append its output rows to pBlock.
 *
 * The block's capacity is grown in ~25% steps until the new rows fit. A capacity failure is logged and raised
 * via T_LONG_JMP(pTaskInfo->env, ...). The buffer page is released before every return and before that jump.
 * Returns 0.
 *
 * todo refactor. SResultRow has direct pointer in miainfo
 */
int32_t finalizeResultRows(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPosition, SExprSupp* pSup,
                           SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo) {
  SFilePage*  page = getBufPage(pBuf, resultRowPosition->pageId);
  SResultRow* pRow = (SResultRow*)((char*)page + resultRowPosition->offset);

  SqlFunctionCtx* pCtx = pSup->pCtx;
  SExprInfo*      pExprInfo = pSup->pExprInfo;
  const int32_t*  rowEntryOffset = pSup->rowEntryInfoOffset;

  doUpdateNumOfRows(pCtx, pRow, pSup->numOfExprs, rowEntryOffset);
  if (pRow->numOfRows == 0) {
    releaseBufPage(pBuf, page);
    return 0;
  }

  // Grow by ~25% per step. For capacities 0..3, size * 1.25 truncates back to size, which would make the loop
  // spin forever. In that case jump straight to the required size instead.
  int32_t required = pBlock->info.rows + pRow->numOfRows;
  int32_t size = pBlock->info.capacity;
  while (required > size) {
    int32_t grown = (int32_t)(size * 1.25);
    size = (grown > size) ? grown : required;
  }

  int32_t code = blockDataEnsureCapacity(pBlock, size);
  if (TAOS_FAILED(code)) {
    releaseBufPage(pBuf, page);
    qError("%s ensure result data capacity failed, code %s", GET_TASKID(pTaskInfo), tstrerror(code));
    T_LONG_JMP(pTaskInfo->env, code);
  }

  doCopyResultToDataBlock(pExprInfo, pSup->numOfExprs, pRow, pCtx, pBlock, rowEntryOffset, pTaskInfo);

  releaseBufPage(pBuf, page);
  pBlock->info.rows += pRow->numOfRows;
  return 0;
}

1034 1035 1036 1037 1038 1039 1040
/*
 * Copy finalized result rows from pGroupResInfo, starting at pGroupResInfo->index, into pBlock.
 *
 * Rules:
 *   - Rows that end up empty are skipped.
 *   - A block holds rows of a single group. A groupId of 0 on the block adopts the first row's group, and
 *     copying stops at the first row of a different group.
 *   - Copying also stops when the next row would exceed pBlock->info.capacity.
 *
 * pGroupResInfo->index is advanced past every row consumed or skipped. The block's time window is refreshed
 * from column 0 at the end. Always returns 0.
 */
int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprSupp* pSup, SDiskbasedBuf* pBuf,
                           SGroupResInfo* pGroupResInfo) {
  SExprInfo*      pExprInfo = pSup->pExprInfo;
  int32_t         numOfExprs = pSup->numOfExprs;
  int32_t*        rowEntryOffset = pSup->rowEntryInfoOffset;
  SqlFunctionCtx* pCtx = pSup->pCtx;

  int32_t total = getNumOfTotalRes(pGroupResInfo);

  while (pGroupResInfo->index < total) {
    SResKeyPos* pPos = taosArrayGetP(pGroupResInfo->pRows, pGroupResInfo->index);
    SFilePage*  page = getBufPage(pBuf, pPos->pos.pageId);
    SResultRow* pRow = (SResultRow*)((char*)page + pPos->pos.offset);

    doUpdateNumOfRows(pCtx, pRow, numOfExprs, rowEntryOffset);

    if (pRow->numOfRows == 0) {
      // nothing to emit for this row, move on
      pGroupResInfo->index += 1;
      releaseBufPage(pBuf, page);
      continue;
    }

    if (pBlock->info.id.groupId == 0) {
      pBlock->info.id.groupId = pPos->groupId;
    } else if (pBlock->info.id.groupId != pPos->groupId) {
      // rows of another group cannot share this block
      releaseBufPage(pBuf, page);
      break;
    }

    if (pBlock->info.rows + pRow->numOfRows > pBlock->info.capacity) {
      ASSERT(pBlock->info.rows > 0);
      releaseBufPage(pBuf, page);
      break;
    }

    pGroupResInfo->index += 1;
    doCopyResultToDataBlock(pExprInfo, numOfExprs, pRow, pCtx, pBlock, rowEntryOffset, pTaskInfo);

    releaseBufPage(pBuf, page);
    pBlock->info.rows += pRow->numOfRows;
  }

  qDebug("%s result generated, rows:%d, groupId:%" PRIu64, GET_TASKID(pTaskInfo), pBlock->info.rows,
         pBlock->info.id.groupId);

  blockDataUpdateTsWindow(pBlock, 0);
  return 0;
}

1088 1089 1090 1091 1092 1093 1094 1095 1096 1097 1098 1099 1100 1101
/*
 * Fill pbInfo->pRes with the next batch of stream results from pGroupResInfo.
 *
 * The block is stamped with the task version and then cleared. When results remain:
 *   - rows of a single group are copied in (merge mode is asserted off);
 *   - the partition table name stored in the stream state for that group is copied into parTbName;
 *   - if that lookup fails, parTbName is left as an empty string.
 */
void doBuildStreamResBlock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo,
                           SDiskbasedBuf* pBuf) {
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
  SSDataBlock*   pRes = pbInfo->pRes;

  pRes->info.version = pTaskInfo->version;
  blockDataCleanup(pRes);

  if (hasRemainResults(pGroupResInfo)) {
    // start with "no group" so that doCopyToSDataBlock adopts the group of the first row it copies
    pRes->info.id.groupId = 0;
    ASSERT(!pbInfo->mergeResultBlock);
    doCopyToSDataBlock(pTaskInfo, pRes, &pOperator->exprSupp, pBuf, pGroupResInfo);

    void*   pParName = NULL;
    int32_t code = streamStateGetParName(pTaskInfo->streamInfo.pState, pRes->info.id.groupId, &pParName);
    if (code >= 0) {
      memcpy(pRes->info.parTbName, pParName, TSDB_TABLE_NAME_LEN);
    } else {
      pRes->info.parTbName[0] = 0;
    }
    tdbFree(pParName);
  }
}

X
Xiaoyu Wang 已提交
1115 1116
/*
 * Fill pbInfo->pRes with the next batch of finalized results from pGroupResInfo.
 *
 * Without mergeResultBlock, one call copies rows of a single group. With mergeResultBlock, rows of consecutive
 * groups are packed into the same block until it reaches pOperator->resultInfo.threshold rows or the results
 * run out. Such a merged block may mix several groups, so its group id is reset to 0 (the client does not
 * need it).
 */
void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo,
                            SDiskbasedBuf* pBuf) {
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
  SSDataBlock*   pRes = pbInfo->pRes;
  SExprSupp*     pSup = &pOperator->exprSupp;

  pRes->info.version = pTaskInfo->version;
  blockDataCleanup(pRes);

  if (!hasRemainResults(pGroupResInfo)) {
    return;
  }

  pRes->info.id.groupId = 0;

  if (!pbInfo->mergeResultBlock) {
    doCopyToSDataBlock(pTaskInfo, pRes, pSup, pBuf, pGroupResInfo);
    return;
  }

  while (hasRemainResults(pGroupResInfo)) {
    doCopyToSDataBlock(pTaskInfo, pRes, pSup, pBuf, pGroupResInfo);
    if (pRes->info.rows >= pOperator->resultInfo.threshold) {
      break;
    }
    // reset so that the next group's rows can be appended to the same block
    pRes->info.id.groupId = 0;
  }

  pRes->info.id.groupId = 0;
}

L
Liu Jicong 已提交
1148 1149
// void skipBlocks(STaskRuntimeEnv *pRuntimeEnv) {
//   STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
1150
//
L
Liu Jicong 已提交
1151 1152 1153
//   if (pQueryAttr->limit.offset <= 0 || pQueryAttr->numOfFilterCols > 0) {
//     return;
//   }
1154
//
L
Liu Jicong 已提交
1155 1156
//   pQueryAttr->pos = 0;
//   int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQueryAttr->order.order);
1157
//
L
Liu Jicong 已提交
1158 1159
//   STableQueryInfo* pTableQueryInfo = pRuntimeEnv->current;
//   TsdbQueryHandleT pTsdbReadHandle = pRuntimeEnv->pTsdbReadHandle;
1160
//
L
Liu Jicong 已提交
1161 1162 1163
//   SDataBlockInfo blockInfo = SDATA_BLOCK_INITIALIZER;
//   while (tsdbNextDataBlock(pTsdbReadHandle)) {
//     if (isTaskKilled(pRuntimeEnv->qinfo)) {
1164
//       T_LONG_JMP(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED);
L
Liu Jicong 已提交
1165
//     }
1166
//
L
Liu Jicong 已提交
1167
//     tsdbRetrieveDataBlockInfo(pTsdbReadHandle, &blockInfo);
1168
//
L
Liu Jicong 已提交
1169 1170 1171 1172
//     if (pQueryAttr->limit.offset > blockInfo.rows) {
//       pQueryAttr->limit.offset -= blockInfo.rows;
//       pTableQueryInfo->lastKey = (QUERY_IS_ASC_QUERY(pQueryAttr)) ? blockInfo.window.ekey : blockInfo.window.skey;
//       pTableQueryInfo->lastKey += step;
1173
//
L
Liu Jicong 已提交
1174 1175 1176 1177 1178 1179 1180
//       //qDebug("QInfo:0x%"PRIx64" skip rows:%d, offset:%" PRId64, GET_TASKID(pRuntimeEnv), blockInfo.rows,
//              pQuery->limit.offset);
//     } else {  // find the appropriated start position in current block
//       updateOffsetVal(pRuntimeEnv, &blockInfo);
//       break;
//     }
//   }
1181
//
L
Liu Jicong 已提交
1182
//   if (terrno != TSDB_CODE_SUCCESS) {
1183
//     T_LONG_JMP(pRuntimeEnv->env, terrno);
L
Liu Jicong 已提交
1184 1185 1186 1187 1188 1189 1190
//   }
// }

// static TSKEY doSkipIntervalProcess(STaskRuntimeEnv* pRuntimeEnv, STimeWindow* win, SDataBlockInfo* pBlockInfo,
// STableQueryInfo* pTableQueryInfo) {
//   STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
//   SResultRowInfo *pWindowResInfo = &pRuntimeEnv->resultRowInfo;
1191
//
L
Liu Jicong 已提交
1192 1193 1194
//   assert(pQueryAttr->limit.offset == 0);
//   STimeWindow tw = *win;
//   getNextTimeWindow(pQueryAttr, &tw);
1195
//
L
Liu Jicong 已提交
1196 1197
//   if ((tw.skey <= pBlockInfo->window.ekey && QUERY_IS_ASC_QUERY(pQueryAttr)) ||
//       (tw.ekey >= pBlockInfo->window.skey && !QUERY_IS_ASC_QUERY(pQueryAttr))) {
1198
//
L
Liu Jicong 已提交
1199 1200 1201 1202
//     // load the data block and check data remaining in current data block
//     // TODO optimize performance
//     SArray *         pDataBlock = tsdbRetrieveDataBlock(pRuntimeEnv->pTsdbReadHandle, NULL);
//     SColumnInfoData *pColInfoData = taosArrayGet(pDataBlock, 0);
1203
//
L
Liu Jicong 已提交
1204 1205 1206 1207
//     tw = *win;
//     int32_t startPos =
//         getNextQualifiedWindow(pQueryAttr, &tw, pBlockInfo, pColInfoData->pData, binarySearchForKey, -1);
//     assert(startPos >= 0);
1208
//
L
Liu Jicong 已提交
1209 1210
//     // set the abort info
//     pQueryAttr->pos = startPos;
1211
//
L
Liu Jicong 已提交
1212 1213 1214 1215
//     // reset the query start timestamp
//     pTableQueryInfo->win.skey = ((TSKEY *)pColInfoData->pData)[startPos];
//     pQueryAttr->window.skey = pTableQueryInfo->win.skey;
//     TSKEY key = pTableQueryInfo->win.skey;
1216
//
L
Liu Jicong 已提交
1217 1218
//     pWindowResInfo->prevSKey = tw.skey;
//     int32_t index = pRuntimeEnv->resultRowInfo.curIndex;
1219
//
L
Liu Jicong 已提交
1220 1221
//     int32_t numOfRes = tableApplyFunctionsOnBlock(pRuntimeEnv, pBlockInfo, NULL, binarySearchForKey, pDataBlock);
//     pRuntimeEnv->resultRowInfo.curIndex = index;  // restore the window index
1222
//
L
Liu Jicong 已提交
1223 1224 1225 1226
//     //qDebug("QInfo:0x%"PRIx64" check data block, brange:%" PRId64 "-%" PRId64 ", numOfRows:%d, numOfRes:%d,
//     lastKey:%" PRId64,
//            GET_TASKID(pRuntimeEnv), pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows, numOfRes,
//            pQueryAttr->current->lastKey);
1227
//
L
Liu Jicong 已提交
1228 1229 1230 1231 1232
//     return key;
//   } else {  // do nothing
//     pQueryAttr->window.skey      = tw.skey;
//     pWindowResInfo->prevSKey = tw.skey;
//     pTableQueryInfo->lastKey = tw.skey;
1233
//
L
Liu Jicong 已提交
1234 1235
//     return tw.skey;
//   }
1236
//
L
Liu Jicong 已提交
1237 1238 1239 1240 1241 1242 1243 1244 1245 1246
//   return true;
// }

// static bool skipTimeInterval(STaskRuntimeEnv *pRuntimeEnv, TSKEY* start) {
//   STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
//   if (QUERY_IS_ASC_QUERY(pQueryAttr)) {
//     assert(*start <= pRuntimeEnv->current->lastKey);
//   } else {
//     assert(*start >= pRuntimeEnv->current->lastKey);
//   }
1247
//
L
Liu Jicong 已提交
1248 1249 1250 1251 1252
//   // if queried with value filter, do NOT forward query start position
//   if (pQueryAttr->limit.offset <= 0 || pQueryAttr->numOfFilterCols > 0 || pRuntimeEnv->pTsBuf != NULL ||
//   pRuntimeEnv->pFillInfo != NULL) {
//     return true;
//   }
1253
//
L
Liu Jicong 已提交
1254 1255 1256 1257 1258 1259 1260
//   /*
//    * 1. for interval without interpolation query we forward pQueryAttr->interval.interval at a time for
//    *    pQueryAttr->limit.offset times. Since hole exists, pQueryAttr->interval.interval*pQueryAttr->limit.offset
//    value is
//    *    not valid. otherwise, we only forward pQueryAttr->limit.offset number of points
//    */
//   assert(pRuntimeEnv->resultRowInfo.prevSKey == TSKEY_INITIAL_VAL);
1261
//
L
Liu Jicong 已提交
1262 1263
//   STimeWindow w = TSWINDOW_INITIALIZER;
//   bool ascQuery = QUERY_IS_ASC_QUERY(pQueryAttr);
1264
//
L
Liu Jicong 已提交
1265 1266
//   SResultRowInfo *pWindowResInfo = &pRuntimeEnv->resultRowInfo;
//   STableQueryInfo *pTableQueryInfo = pRuntimeEnv->current;
1267
//
L
Liu Jicong 已提交
1268 1269 1270
//   SDataBlockInfo blockInfo = SDATA_BLOCK_INITIALIZER;
//   while (tsdbNextDataBlock(pRuntimeEnv->pTsdbReadHandle)) {
//     tsdbRetrieveDataBlockInfo(pRuntimeEnv->pTsdbReadHandle, &blockInfo);
1271
//
L
Liu Jicong 已提交
1272 1273 1274 1275 1276 1277 1278 1279 1280
//     if (QUERY_IS_ASC_QUERY(pQueryAttr)) {
//       if (pWindowResInfo->prevSKey == TSKEY_INITIAL_VAL) {
//         getAlignQueryTimeWindow(pQueryAttr, blockInfo.window.skey, blockInfo.window.skey, pQueryAttr->window.ekey,
//         &w); pWindowResInfo->prevSKey = w.skey;
//       }
//     } else {
//       getAlignQueryTimeWindow(pQueryAttr, blockInfo.window.ekey, pQueryAttr->window.ekey, blockInfo.window.ekey, &w);
//       pWindowResInfo->prevSKey = w.skey;
//     }
1281
//
L
Liu Jicong 已提交
1282 1283
//     // the first time window
//     STimeWindow win = getActiveTimeWindow(pWindowResInfo, pWindowResInfo->prevSKey, pQueryAttr);
1284
//
L
Liu Jicong 已提交
1285 1286
//     while (pQueryAttr->limit.offset > 0) {
//       STimeWindow tw = win;
1287
//
L
Liu Jicong 已提交
1288 1289 1290
//       if ((win.ekey <= blockInfo.window.ekey && ascQuery) || (win.ekey >= blockInfo.window.skey && !ascQuery)) {
//         pQueryAttr->limit.offset -= 1;
//         pWindowResInfo->prevSKey = win.skey;
1291
//
L
Liu Jicong 已提交
1292 1293 1294 1295 1296 1297
//         // current time window is aligned with blockInfo.window.ekey
//         // restart it from next data block by set prevSKey to be TSKEY_INITIAL_VAL;
//         if ((win.ekey == blockInfo.window.ekey && ascQuery) || (win.ekey == blockInfo.window.skey && !ascQuery)) {
//           pWindowResInfo->prevSKey = TSKEY_INITIAL_VAL;
//         }
//       }
1298
//
L
Liu Jicong 已提交
1299 1300 1301 1302
//       if (pQueryAttr->limit.offset == 0) {
//         *start = doSkipIntervalProcess(pRuntimeEnv, &win, &blockInfo, pTableQueryInfo);
//         return true;
//       }
1303
//
L
Liu Jicong 已提交
1304 1305
//       // current window does not ended in current data block, try next data block
//       getNextTimeWindow(pQueryAttr, &tw);
1306
//
L
Liu Jicong 已提交
1307 1308 1309 1310 1311 1312 1313 1314 1315
//       /*
//        * If the next time window still starts from current data block,
//        * load the primary timestamp column first, and then find the start position for the next queried time window.
//        * Note that only the primary timestamp column is required.
//        * TODO: Optimize for this cases. All data blocks are not needed to be loaded, only if the first actually
//        required
//        * time window resides in current data block.
//        */
//       if ((tw.skey <= blockInfo.window.ekey && ascQuery) || (tw.ekey >= blockInfo.window.skey && !ascQuery)) {
1316
//
L
Liu Jicong 已提交
1317 1318
//         SArray *pDataBlock = tsdbRetrieveDataBlock(pRuntimeEnv->pTsdbReadHandle, NULL);
//         SColumnInfoData *pColInfoData = taosArrayGet(pDataBlock, 0);
1319
//
L
Liu Jicong 已提交
1320 1321 1322
//         if ((win.ekey > blockInfo.window.ekey && ascQuery) || (win.ekey < blockInfo.window.skey && !ascQuery)) {
//           pQueryAttr->limit.offset -= 1;
//         }
1323
//
L
Liu Jicong 已提交
1324 1325 1326 1327 1328 1329 1330 1331
//         if (pQueryAttr->limit.offset == 0) {
//           *start = doSkipIntervalProcess(pRuntimeEnv, &win, &blockInfo, pTableQueryInfo);
//           return true;
//         } else {
//           tw = win;
//           int32_t startPos =
//               getNextQualifiedWindow(pQueryAttr, &tw, &blockInfo, pColInfoData->pData, binarySearchForKey, -1);
//           assert(startPos >= 0);
1332
//
L
Liu Jicong 已提交
1333 1334 1335 1336 1337 1338 1339 1340 1341 1342 1343
//           // set the abort info
//           pQueryAttr->pos = startPos;
//           pTableQueryInfo->lastKey = ((TSKEY *)pColInfoData->pData)[startPos];
//           pWindowResInfo->prevSKey = tw.skey;
//           win = tw;
//         }
//       } else {
//         break;  // offset is not 0, and next time window begins or ends in the next block.
//       }
//     }
//   }
1344
//
L
Liu Jicong 已提交
1345 1346
//   // check for error
//   if (terrno != TSDB_CODE_SUCCESS) {
1347
//     T_LONG_JMP(pRuntimeEnv->env, terrno);
L
Liu Jicong 已提交
1348
//   }
1349
//
L
Liu Jicong 已提交
1350 1351
//   return true;
// }
1352

1353
/*
 * Set the downstream operators of p to a copy of pDownstream[0..num).
 *
 * Any previously installed downstream array is replaced, not extended, and is freed. Only the pointer array is
 * freed; the operators it pointed to are not. The new array is allocated before the old one is touched, so on
 * TSDB_CODE_OUT_OF_MEMORY both p->pDownstream and p->numOfDownstream are left unchanged.
 */
int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t num) {
  if (p->pDownstream == NULL) {
    assert(p->numOfDownstream == 0);
  }

  SOperatorInfo** pList = taosMemoryCalloc(num, POINTER_BYTES);
  if (pList == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  memcpy(pList, pDownstream, num * POINTER_BYTES);

  // Overwriting the pointer without freeing it would leak the previous array.
  taosMemoryFree(p->pDownstream);
  p->pDownstream = pList;
  p->numOfDownstream = num;
  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
1368
/*
 * Report the scan order and scan flag that govern the data flowing into pOperator.
 *
 * - Table scan and table-merge scan operators report their own cond.order and scanFlag.
 * - Exchange, system-table, stream, tag, block-distribution and last-row scans always report
 *   TSDB_ORDER_ASC / MAIN_SCAN.
 * - Any other operator defers to its first downstream operator. TSDB_CODE_INVALID_PARA is returned when that
 *   chain ends before reaching one of the operators above.
 */
int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scanFlag) {
  // todo add more information about exchange operation
  for (SOperatorInfo* pCur = pOperator;; pCur = pCur->pDownstream[0]) {
    int32_t type = pCur->operatorType;
    switch (type) {
      case QUERY_NODE_PHYSICAL_PLAN_EXCHANGE:
      case QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN:
      case QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN:
      case QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN:
      case QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN:
      case QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN:
        *order = TSDB_ORDER_ASC;
        *scanFlag = MAIN_SCAN;
        return TSDB_CODE_SUCCESS;

      case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN: {
        STableScanInfo* pScanInfo = pCur->info;
        *order = pScanInfo->base.cond.order;
        *scanFlag = pScanInfo->base.scanFlag;
        return TSDB_CODE_SUCCESS;
      }

      case QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN: {
        STableMergeScanInfo* pMergeScanInfo = pCur->info;
        *order = pMergeScanInfo->base.cond.order;
        *scanFlag = pMergeScanInfo->base.scanFlag;
        return TSDB_CODE_SUCCESS;
      }

      default:
        break;
    }

    // not a scan operator: continue with its first downstream operator, if there is one
    if (pCur->pDownstream == NULL || pCur->pDownstream[0] == NULL) {
      return TSDB_CODE_INVALID_PARA;
    }
  }
}
1395

1396 1397 1398 1399 1400
/*
 * Build a one-row placeholder input block for an aggregate whose downstream produced no data. This lets
 * count-like functions (count, hyperloglog, _hyperloglog_partial, _hyperloglog_merge) still produce a value when
 * tsCountAlwaysReturnValue is enabled.
 *
 * No block is built when:
 *   - tsCountAlwaysReturnValue is off;
 *   - the downstream is a partition operator, or a table scan grouped by tag;
 *   - no count-like function is present.
 *
 * Every column slot referenced by an expression parameter is filled with a 1-byte TSDB_DATA_TYPE_NULL column.
 *
 * On success with a new block, *ppBlock receives it and the caller owns it (see
 * destroyDataBlockForEmptyInput). In every other case *ppBlock is left untouched.
 *
 * Returns TSDB_CODE_SUCCESS, or an error code when building the block failed (in which case nothing leaks).
 */
static int32_t createDataBlockForEmptyInput(SOperatorInfo* pOperator, SSDataBlock **ppBlock) {
  if (!tsCountAlwaysReturnValue) {
    return TSDB_CODE_SUCCESS;
  }

  SOperatorInfo* downstream = pOperator->pDownstream[0];
  if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_PARTITION ||
      (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN &&
       ((STableScanInfo *)downstream->info)->hasGroupByTag == true)) {
    return TSDB_CODE_SUCCESS;
  }

  static const char* const countLikeFuncs[] = {"count", "hyperloglog", "_hyperloglog_partial", "_hyperloglog_merge"};
  const size_t             numOfCountLikeFuncs = sizeof(countLikeFuncs) / sizeof(countLikeFuncs[0]);

  SqlFunctionCtx* pCtx = pOperator->exprSupp.pCtx;
  bool            hasCountFunc = false;
  for (int32_t i = 0; i < pOperator->exprSupp.numOfExprs && !hasCountFunc; ++i) {
    const char* funcName = pCtx[i].pExpr->pExpr->_function.functionName;
    for (size_t k = 0; k < numOfCountLikeFuncs; ++k) {
      if (strcmp(funcName, countLikeFuncs[k]) == 0) {
        hasCountFunc = true;
        break;
      }
    }
  }

  if (!hasCountFunc) {
    return TSDB_CODE_SUCCESS;
  }

  SSDataBlock* pBlock = createDataBlock();
  if (pBlock == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  pBlock->info.rows = 1;
  pBlock->info.capacity = 0;

  SColumnInfoData colInfo = {0};
  colInfo.hasNull = true;
  colInfo.info.type = TSDB_DATA_TYPE_NULL;
  colInfo.info.bytes = 1;

  for (int32_t i = 0; i < pOperator->exprSupp.numOfExprs; ++i) {
    SExprInfo* pOneExpr = &pOperator->exprSupp.pExprInfo[i];
    for (int32_t j = 0; j < pOneExpr->base.numOfParams; ++j) {
      SFunctParam* pFuncParam = &pOneExpr->base.pParam[j];
      if (pFuncParam->type != FUNC_PARAM_TYPE_COLUMN) {
        continue;  // constant (FUNC_PARAM_TYPE_VALUE) parameters need no column
      }

      int32_t slotId = pFuncParam->pCol->slotId;
      int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
      if (slotId < numOfCols) {
        continue;
      }

      taosArrayEnsureCap(pBlock->pDataBlock, slotId + 1);
      for (int32_t k = numOfCols; k < slotId + 1; ++k) {
        if (taosArrayPush(pBlock->pDataBlock, &colInfo) == NULL) {
          blockDataDestroy(pBlock);
          return TSDB_CODE_OUT_OF_MEMORY;
        }
      }
    }
  }

  int32_t code = blockDataEnsureCapacity(pBlock, pBlock->info.rows);
  if (code != TSDB_CODE_SUCCESS) {
    blockDataDestroy(pBlock);
    return code;
  }

  *ppBlock = pBlock;
  return TSDB_CODE_SUCCESS;
}

/*
 * Release a placeholder block created for empty input.
 *
 * @param blockAllocated true when *ppBlock was allocated by the caller's empty-input path and must be
 *                       destroyed here; false when the block is not ours to free (nothing happens)
 * @param ppBlock        in/out block pointer; reset to NULL after the block is destroyed
 */
static void destroyDataBlockForEmptyInput(bool blockAllocated, SSDataBlock **ppBlock) {
  if (blockAllocated) {
    blockDataDestroy(*ppBlock);
    *ppBlock = NULL;
  }
}

// this is a blocking operator
L
Liu Jicong 已提交
1469
static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
H
Haojun Liao 已提交
1470 1471
  if (OPTR_IS_OPENED(pOperator)) {
    return TSDB_CODE_SUCCESS;
1472 1473
  }

H
Haojun Liao 已提交
1474
  SExecTaskInfo*    pTaskInfo = pOperator->pTaskInfo;
1475
  SAggOperatorInfo* pAggInfo = pOperator->info;
H
Haojun Liao 已提交
1476

1477 1478
  SExprSupp*     pSup = &pOperator->exprSupp;
  SOperatorInfo* downstream = pOperator->pDownstream[0];
1479

1480 1481
  int64_t st = taosGetTimestampUs();

1482 1483 1484
  int32_t order = TSDB_ORDER_ASC;
  int32_t scanFlag = MAIN_SCAN;

1485 1486 1487
  bool    hasValidBlock = false;
  bool    blockAllocated = false;

H
Haojun Liao 已提交
1488
  while (1) {
1489
    SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
1490
    if (pBlock == NULL) {
G
Ganlin Zhao 已提交
1491
      if (!hasValidBlock) {
1492 1493 1494 1495 1496 1497 1498 1499
        createDataBlockForEmptyInput(pOperator, &pBlock);
        if (pBlock == NULL) {
          break;
        }
        blockAllocated = true;
      } else {
        break;
      }
1500
    }
1501
    hasValidBlock = true;
1502

1503 1504
    int32_t code = getTableScanInfo(pOperator, &order, &scanFlag);
    if (code != TSDB_CODE_SUCCESS) {
1505
      destroyDataBlockForEmptyInput(blockAllocated, &pBlock);
1506
      T_LONG_JMP(pTaskInfo->env, code);
1507
    }
1508

1509
    // there is an scalar expression that needs to be calculated before apply the group aggregation.
G
Ganlin Zhao 已提交
1510
    if (pAggInfo->scalarExprSup.pExprInfo != NULL && !blockAllocated) {
1511 1512
      SExprSupp* pSup1 = &pAggInfo->scalarExprSup;
      code = projectApplyFunctions(pSup1->pExprInfo, pBlock, pBlock, pSup1->pCtx, pSup1->numOfExprs, NULL);
1513
      if (code != TSDB_CODE_SUCCESS) {
1514
        destroyDataBlockForEmptyInput(blockAllocated, &pBlock);
1515
        T_LONG_JMP(pTaskInfo->env, code);
1516
      }
1517 1518
    }

1519
    // the pDataBlock are always the same one, no need to call this again
H
Haojun Liao 已提交
1520
    setExecutionContext(pOperator, pOperator->exprSupp.numOfExprs, pBlock->info.id.groupId);
1521
    setInputDataBlock(pSup, pBlock, order, scanFlag, true);
1522
    code = doAggregateImpl(pOperator, pSup->pCtx);
1523
    if (code != 0) {
1524
      destroyDataBlockForEmptyInput(blockAllocated, &pBlock);
1525
      T_LONG_JMP(pTaskInfo->env, code);
1526
    }
1527 1528 1529

    destroyDataBlockForEmptyInput(blockAllocated, &pBlock);

1530 1531
  }

1532 1533 1534 1535 1536
  // the downstream operator may return with error code, so let's check the code before generating results.
  if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
    T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
  }

1537
  initGroupedResultInfo(&pAggInfo->groupResInfo, pAggInfo->aggSup.pResultRowHashTable, 0);
H
Haojun Liao 已提交
1538
  OPTR_SET_OPENED(pOperator);
1539

1540
  pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
1541
  return pTaskInfo->code;
H
Haojun Liao 已提交
1542 1543
}

1544
/*
 * Return the next page of aggregate results, or NULL when nothing is left.
 *
 * The first call opens the operator through fpSet._openFn (consuming all downstream input). If
 * opening fails, the error is stored in pTaskInfo->code, the operator is completed and NULL is
 * returned. Result pages are built from the grouped result buffer and filtered; pages the filter
 * empties completely are skipped until groups run out.
 */
static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator) {
  SAggOperatorInfo* pAggInfo = pOperator->info;
  SOptrBasicInfo*   pInfo = &pAggInfo->binfo;

  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
  pTaskInfo->code = pOperator->fpSet._openFn(pOperator);
  if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
    setOperatorCompleted(pOperator);
    return NULL;
  }

  blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);

  // Build pages until one has surviving rows or the grouped results are exhausted.
  bool moreGroups = true;
  do {
    doBuildResultDatablock(pOperator, pInfo, &pAggInfo->groupResInfo, pAggInfo->aggSup.pResultBuf);
    doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);

    if (!hasRemainResults(&pAggInfo->groupResInfo)) {
      setOperatorCompleted(pOperator);
      moreGroups = false;
    }
  } while (moreGroups && pInfo->pRes->info.rows <= 0);

  size_t produced = blockDataGetNumOfRows(pInfo->pRes);
  pOperator->resultInfo.totalRows += produced;

  if (produced == 0) {
    return NULL;
  }
  return pInfo->pRes;
}

/*
 * Free what each of the numOfExprs expressions in pExpr owns: column parameters, value-parameter
 * variants, the parameter array and the expression node. The pExpr array itself is not freed.
 */
void destroyExprInfo(SExprInfo* pExpr, int32_t numOfExprs) {
  int32_t idx = 0;
  while (idx < numOfExprs) {
    SExprInfo*   pCur = &pExpr[idx];
    SFunctParam* pParams = pCur->base.pParam;

    for (int32_t p = 0; p < pCur->base.numOfParams; ++p) {
      switch (pParams[p].type) {
        case FUNC_PARAM_TYPE_COLUMN:
          taosMemoryFreeClear(pParams[p].pCol);
          break;
        case FUNC_PARAM_TYPE_VALUE:
          taosVariantDestroy(&pParams[p].param);
          break;
        default:
          break;
      }
    }

    taosMemoryFree(pCur->base.pParam);
    taosMemoryFree(pCur->pExpr);
    ++idx;
  }
}

/*
 * Recursively destroy the operator tree rooted at pOperator; NULL is a no-op.
 *
 * Order: closeFn (when set) is handed pOperator->info, then every downstream operator is destroyed
 * and the downstream array freed, then the expression support, and finally pOperator itself.
 */
void destroyOperatorInfo(SOperatorInfo* pOperator) {
  if (pOperator == NULL) {
    return;
  }

  if (pOperator->fpSet.closeFn != NULL) {
    pOperator->fpSet.closeFn(pOperator->info);
  }

  if (pOperator->pDownstream != NULL) {
    int32_t child = 0;
    while (child < pOperator->numOfDownstream) {
      destroyOperatorInfo(pOperator->pDownstream[child]);
      child += 1;
    }

    taosMemoryFreeClear(pOperator->pDownstream);
    pOperator->numOfDownstream = 0;
  }

  cleanupExprSupp(&pOperator->exprSupp);
  taosMemoryFreeClear(pOperator);
}

/*
 * Choose the page size and in-memory buffer size for an operator's disk-based result buffer.
 *
 * The page size starts at 4KB and doubles until one page holds at least four rows of rowSize bytes
 * (capped at 2^31, the largest power of two a uint32_t can hold). The buffer defaults to 10MB and is
 * enlarged when needed so that it holds at least four pages (clamped to UINT32_MAX).
 *
 * @param rowSize      size of one result row in bytes; non-positive values give the minimum page
 * @param defaultPgsz  [out] chosen page size in bytes
 * @param defaultBufsz [out] chosen buffer size in bytes
 * @return always 0
 */
int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* defaultBufsz) {
  // TODO: make these sizes configurable.
  const uint32_t minPageSize = 4096;
  const uint32_t maxPageSize = 1u << 31;
  const uint64_t defaultBufferSize = 4096u * 2560u;  // 10MB per operator
  const uint64_t minPagesInBuffer = 4;

  // Compute in 64 bits: rowSize * 4 can overflow int32_t, and a negative rowSize must not be
  // converted to a huge unsigned target (which made the loop below spin forever).
  int64_t required = (rowSize > 0) ? (int64_t)rowSize * 4 : 0;

  uint32_t pgsz = minPageSize;
  while ((int64_t)pgsz < required && pgsz < maxPageSize) {
    pgsz <<= 1u;
  }

  // at least four pages need to be in buffer
  uint64_t bufsz = defaultBufferSize;
  if (bufsz < (uint64_t)pgsz * minPagesInBuffer) {
    bufsz = (uint64_t)pgsz * minPagesInBuffer;
  }
  if (bufsz > UINT32_MAX) {
    bufsz = UINT32_MAX;
  }

  *defaultPgsz = pgsz;
  *defaultBufsz = (uint32_t)bufsz;
  return 0;
}

/*
 * Initialise an aggregation supporter: the group-key buffer, the result-row hash table and the
 * disk-based buffer that stores result rows.
 *
 * @param pAggSup     supporter to initialise
 * @param pCtx        function contexts, used to compute the size of one result row
 * @param numOfOutput number of entries in pCtx
 * @param keyBufSize  size of the key buffer (room for one pointer and one int64 is added on top)
 * @param pKey        identifier passed to the paged buffer and printed in log messages
 * @return TSDB_CODE_SUCCESS or an error code. Members allocated before a failure are left in
 *         pAggSup for the caller to release (e.g. via cleanupAggSup).
 */
int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t numOfOutput, size_t keyBufSize,
                         const char* pKey) {
  int32_t    code = 0;
  _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);

  pAggSup->currentPageId = -1;
  pAggSup->resultRowSize = getResultRowSize(pCtx, numOfOutput);
  pAggSup->keyBuf = taosMemoryCalloc(1, keyBufSize + POINTER_BYTES + sizeof(int64_t));
  pAggSup->pResultRowHashTable = tSimpleHashInit(10, hashFn);

  if (pAggSup->keyBuf == NULL || pAggSup->pResultRowHashTable == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  uint32_t defaultPgsz = 0;
  uint32_t defaultBufsz = 0;
  getBufferPgSize(pAggSup->resultRowSize, &defaultPgsz, &defaultBufsz);

  if (!osTempSpaceAvailable()) {
    code = TSDB_CODE_NO_AVAIL_DISK;
    // Describe the code actually being returned (tstrerror), like the log message below does.
    qError("Init stream agg supporter failed since %s, %s", tstrerror(code), pKey);
    return code;
  }

  code = createDiskbasedBuf(&pAggSup->pResultBuf, defaultPgsz, defaultBufsz, pKey, tsTempDir);
  if (code != TSDB_CODE_SUCCESS) {
    qError("Create agg result buf failed since %s, %s", tstrerror(code), pKey);
  }

  return code;
}

/*
 * Release what doInitAggInfoSup() allocated for pAggSup: the key buffer (pointer reset to NULL),
 * the result-row hash table and the disk-based result buffer.
 */
void cleanupAggSup(SAggSupporter* pAggSup) {
  taosMemoryFreeClear(pAggSup->keyBuf);

  tSimpleHashCleanup(pAggSup->pResultRowHashTable);

  destroyDiskbasedBuf(pAggSup->pResultBuf);
}

/*
 * Set up both the expression support (function contexts for pExprInfo) and the aggregation
 * supporter, then point every context's save handle at the supporter's result buffer.
 *
 * @return TSDB_CODE_SUCCESS, or the first error reported by initExprSupp / doInitAggInfoSup
 */
int32_t initAggSup(SExprSupp* pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, size_t keyBufSize,
                   const char* pkey) {
  int32_t code = initExprSupp(pSup, pExprInfo, numOfCols);
  if (code == TSDB_CODE_SUCCESS) {
    code = doInitAggInfoSup(pAggSup, pSup->pCtx, numOfCols, keyBufSize, pkey);
  }

  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  for (int32_t col = 0; col < numOfCols; col++) {
    pSup->pCtx[col].saveHandle.pBuf = pAggSup->pResultBuf;
  }

  return TSDB_CODE_SUCCESS;
}

/*
 * Set the result capacity to numOfRows and the threshold to 75% of it; when that truncates to 0 the
 * threshold falls back to numOfRows. numOfRows must be non-zero (asserted).
 */
void initResultSizeInfo(SResultInfo* pResultInfo, int32_t numOfRows) {
  ASSERT(numOfRows != 0);

  pResultInfo->capacity = numOfRows;
  pResultInfo->threshold = numOfRows * 0.75;
  pResultInfo->threshold = (pResultInfo->threshold == 0) ? numOfRows : pResultInfo->threshold;
}

/* Initialise the result-row info of pInfo and attach pBlock as its result block. */
void initBasicInfo(SOptrBasicInfo* pInfo, SSDataBlock* pBlock) {
  initResultRowInfo(&pInfo->resultRowInfo);
  pInfo->pRes = pBlock;
}

/*
 * Free an array of numOfOutput function contexts together with what each one owns (parameter
 * variants, subsidiary context array and buffer, input data arrays). NULL is accepted.
 *
 * @return always NULL, so callers can write `p = destroySqlFunctionCtx(p, n);`
 */
void* destroySqlFunctionCtx(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
  if (pCtx != NULL) {
    for (int32_t idx = 0; idx < numOfOutput; ++idx) {
      SqlFunctionCtx* pOne = &pCtx[idx];

      for (int32_t p = 0; p < pOne->numOfParams; ++p) {
        taosVariantDestroy(&pOne->param[p].param);
      }

      taosMemoryFreeClear(pOne->subsidiaries.pCtx);
      taosMemoryFreeClear(pOne->subsidiaries.buf);
      taosMemoryFree(pOne->input.pData);
      taosMemoryFree(pOne->input.pColumnDataAgg);
    }

    taosMemoryFreeClear(pCtx);
  }

  return NULL;
}

/*
 * Attach numOfExpr expressions to pSup and, when pExprInfo is non-NULL, create their function
 * contexts (and row-entry offsets).
 *
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when the contexts cannot be created
 */
int32_t initExprSupp(SExprSupp* pSup, SExprInfo* pExprInfo, int32_t numOfExpr) {
  pSup->pExprInfo = pExprInfo;
  pSup->numOfExprs = numOfExpr;

  if (pExprInfo == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  pSup->pCtx = createSqlFunctionCtx(pExprInfo, numOfExpr, &pSup->rowEntryInfoOffset);
  return (pSup->pCtx != NULL) ? TSDB_CODE_SUCCESS : TSDB_CODE_OUT_OF_MEMORY;
}

/*
 * Release everything held by pSupp: function contexts, expression infos, the filter and the
 * row-entry offsets.
 *
 * Every freed pointer is reset to NULL, so calling this twice on the same support (or reading it
 * after cleanup) cannot double-free or touch freed memory.
 */
void cleanupExprSupp(SExprSupp* pSupp) {
  // destroySqlFunctionCtx always returns NULL; storing it clears the dangling pointer.
  pSupp->pCtx = destroySqlFunctionCtx(pSupp->pCtx, pSupp->numOfExprs);

  if (pSupp->pExprInfo != NULL) {
    destroyExprInfo(pSupp->pExprInfo, pSupp->numOfExprs);
    taosMemoryFreeClear(pSupp->pExprInfo);
  }

  if (pSupp->pFilterInfo != NULL) {
    filterFreeInfo(pSupp->pFilterInfo);
    pSupp->pFilterInfo = NULL;
  }

  taosMemoryFreeClear(pSupp->rowEntryInfoOffset);
}

/*
 * Create the hash aggregate operator (no group keys handled here by the caller) on top of downstream.
 *
 * Builds the result block from the plan's output descriptor, the aggregate expressions from
 * pAggFuncs/pGroupKeys, the optional scalar pre-projection from pExprs, and the filter from
 * node.pConditions. When downstream is a table scan, its pdInfo is pointed at this operator's
 * expression support and aggregation supporter.
 *
 * @return the new operator, or NULL with pTaskInfo->code set on failure
 */
SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pAggNode,
                                           SExecTaskInfo* pTaskInfo) {
  // Must be initialised here: the allocation-failure path below jumps to _error, which stores `code`
  // into pTaskInfo->code, before any later statement assigns it.
  int32_t code = TSDB_CODE_OUT_OF_MEMORY;

  SAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SAggOperatorInfo));
  SOperatorInfo*    pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    goto _error;
  }

  SSDataBlock* pResBlock = createDataBlockFromDescNode(pAggNode->node.pOutputDataBlockDesc);
  initBasicInfo(&pInfo->binfo, pResBlock);

  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
  initResultSizeInfo(&pOperator->resultInfo, 4096);

  int32_t    num = 0;
  SExprInfo* pExprInfo = createExprInfo(pAggNode->pAggFuncs, pAggNode->pGroupKeys, &num);
  code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  int32_t    numOfScalarExpr = 0;
  SExprInfo* pScalarExprInfo = NULL;
  if (pAggNode->pExprs != NULL) {
    pScalarExprInfo = createExprInfo(pAggNode->pExprs, NULL, &numOfScalarExpr);
  }

  code = initExprSupp(&pInfo->scalarExprSup, pScalarExprInfo, numOfScalarExpr);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  code = filterInitFromNode((SNode*)pAggNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  pInfo->binfo.mergeResultBlock = pAggNode->mergeDataBlock;
  pInfo->groupId = UINT64_MAX;

  setOperatorInfo(pOperator, "TableAggregate", QUERY_NODE_PHYSICAL_PLAN_HASH_AGG, true, OP_NOT_OPENED, pInfo,
                  pTaskInfo);
  pOperator->fpSet = createOperatorFpSet(doOpenAggregateOptr, getAggregateResult, NULL, destroyAggOperatorInfo, NULL);

  code = appendDownstream(pOperator, &downstream, 1);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  // Wire the scan to this operator only once creation can no longer fail, so the downstream never
  // keeps pointers into structures freed by the error path.
  if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
    STableScanInfo* pTableScanInfo = downstream->info;
    pTableScanInfo->base.pdInfo.pExprSup = &pOperator->exprSupp;
    pTableScanInfo->base.pdInfo.pAggSup = &pInfo->aggSup;
  }

  return pOperator;

_error:
  if (pInfo != NULL) {
    destroyAggOperatorInfo(pInfo);
  }

  if (pOperator != NULL) {
    cleanupExprSupp(&pOperator->exprSupp);
  }

  taosMemoryFreeClear(pOperator);
  pTaskInfo->code = code;
  return NULL;
}

/* Destroy the result block of pInfo (which must not be NULL); pRes takes blockDataDestroy's result. */
void cleanupBasicInfo(SOptrBasicInfo* pInfo) {
  assert(pInfo != NULL);

  SSDataBlock* pResult = pInfo->pRes;
  pInfo->pRes = blockDataDestroy(pResult);
}

/* pItem points at a heap pointer: free that pointer (if any) and reset it to NULL. */
static void freeItem(void* pItem) {
  void** ppSlot = (void**)pItem;
  if (*ppSlot == NULL) {
    return;
  }

  taosMemoryFreeClear(*ppSlot);
}

void destroyAggOperatorInfo(void* param) {
L
Liu Jicong 已提交
1840
  SAggOperatorInfo* pInfo = (SAggOperatorInfo*)param;
L
Liu Jicong 已提交
1841 1842
  cleanupBasicInfo(&pInfo->binfo);

H
Haojun Liao 已提交
1843
  cleanupAggSup(&pInfo->aggSup);
S
shenglian zhou 已提交
1844
  cleanupExprSupp(&pInfo->scalarExprSup);
H
Haojun Liao 已提交
1845
  cleanupGroupResInfo(&pInfo->groupResInfo);
D
dapan1121 已提交
1846
  taosMemoryFreeClear(param);
1847
}
1848

D
dapan1121 已提交
1849
/*
 * Allocate and initialise a task info for (queryId, taskId).
 *
 * The task starts as TASK_NOT_COMPLETED, keeps a copy of dbFName, records its creation time, and
 * gets an empty table list, stop-info array, result-block list and a printable id string of the form
 * "TID:0x<taskId> QID:0x<queryId>".
 *
 * @return the new task info, or NULL with terrno = TSDB_CODE_OUT_OF_MEMORY
 */
static SExecTaskInfo* createExecTaskInfo(uint64_t queryId, uint64_t taskId, EOPTR_EXEC_MODEL model, char* dbFName) {
  SExecTaskInfo* pTaskInfo = taosMemoryCalloc(1, sizeof(SExecTaskInfo));
  if (pTaskInfo == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED);

  pTaskInfo->schemaInfo.dbname = strdup(dbFName);
  pTaskInfo->cost.created = taosGetTimestampMs();
  pTaskInfo->id.queryId = queryId;
  pTaskInfo->execModel = model;
  pTaskInfo->pTableInfoList = tableListCreate();
  pTaskInfo->stopInfo.pStopInfo = taosArrayInit(4, sizeof(SExchangeOpStopInfo));
  pTaskInfo->pResultBlockList = taosArrayInit(128, POINTER_BYTES);

  const int32_t idBufLen = 128;
  char*         pIdStr = taosMemoryCalloc(1, idBufLen);
  snprintf(pIdStr, idBufLen, "TID:0x%" PRIx64 " QID:0x%" PRIx64, taskId, queryId);
  pTaskInfo->id.str = pIdStr;

  return pTaskInfo;
}

SSchemaWrapper* extractQueriedColumnSchema(SScanPhysiNode* pScanNode);

1875
int32_t extractTableSchemaInfo(SReadHandle* pHandle, SScanPhysiNode* pScanNode, SExecTaskInfo* pTaskInfo) {
1876 1877
  SMetaReader mr = {0};
  metaReaderInit(&mr, pHandle->meta, 0);
H
Haojun Liao 已提交
1878
  int32_t code = metaGetTableEntryByUidCache(&mr, pScanNode->uid);
1879
  if (code != TSDB_CODE_SUCCESS) {
L
Liu Jicong 已提交
1880 1881
    qError("failed to get the table meta, uid:0x%" PRIx64 ", suid:0x%" PRIx64 ", %s", pScanNode->uid, pScanNode->suid,
           GET_TASKID(pTaskInfo));
H
Haojun Liao 已提交
1882

D
dapan1121 已提交
1883
    metaReaderClear(&mr);
1884
    return terrno;
D
dapan1121 已提交
1885
  }
1886

1887 1888
  SSchemaInfo* pSchemaInfo = &pTaskInfo->schemaInfo;
  pSchemaInfo->tablename = strdup(mr.me.name);
1889 1890

  if (mr.me.type == TSDB_SUPER_TABLE) {
1891 1892
    pSchemaInfo->sw = tCloneSSchemaWrapper(&mr.me.stbEntry.schemaRow);
    pSchemaInfo->tversion = mr.me.stbEntry.schemaTag.version;
1893
  } else if (mr.me.type == TSDB_CHILD_TABLE) {
1894 1895
    tDecoderClear(&mr.coder);

1896
    tb_uid_t suid = mr.me.ctbEntry.suid;
H
Haojun Liao 已提交
1897
    metaGetTableEntryByUidCache(&mr, suid);
1898 1899
    pSchemaInfo->sw = tCloneSSchemaWrapper(&mr.me.stbEntry.schemaRow);
    pSchemaInfo->tversion = mr.me.stbEntry.schemaTag.version;
1900
  } else {
1901
    pSchemaInfo->sw = tCloneSSchemaWrapper(&mr.me.ntbEntry.schemaRow);
1902
  }
1903 1904

  metaReaderClear(&mr);
1905

H
Haojun Liao 已提交
1906 1907 1908 1909 1910
  pSchemaInfo->qsw = extractQueriedColumnSchema(pScanNode);
  return TSDB_CODE_SUCCESS;
}

/*
 * Build the schema of the columns a scan queries: every entry of pScanCols, followed by the entries
 * of pScanPseudoCols that are plain column nodes (tags); pseudo-function columns are skipped.
 *
 * @return a heap-allocated wrapper whose nCols may be below the allocated capacity, or NULL with
 *         terrno = TSDB_CODE_OUT_OF_MEMORY when an allocation fails
 */
SSchemaWrapper* extractQueriedColumnSchema(SScanPhysiNode* pScanNode) {
  int32_t numOfCols = LIST_LENGTH(pScanNode->pScanCols);
  int32_t numOfTags = LIST_LENGTH(pScanNode->pScanPseudoCols);

  SSchemaWrapper* pqSw = taosMemoryCalloc(1, sizeof(SSchemaWrapper));
  if (pqSw == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pqSw->pSchema = taosMemoryCalloc(numOfCols + numOfTags, sizeof(SSchema));
  // calloc of zero elements may legitimately return NULL; only a non-empty request can fail
  if (pqSw->pSchema == NULL && (numOfCols + numOfTags) > 0) {
    taosMemoryFree(pqSw);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  for (int32_t i = 0; i < numOfCols; ++i) {
    STargetNode* pNode = (STargetNode*)nodesListGetNode(pScanNode->pScanCols, i);
    SColumnNode* pColNode = (SColumnNode*)pNode->pExpr;

    SSchema* pSchema = &pqSw->pSchema[pqSw->nCols++];
    pSchema->colId = pColNode->colId;
    pSchema->type = pColNode->node.resType.type;
    pSchema->bytes = pColNode->node.resType.bytes;
    tstrncpy(pSchema->name, pColNode->colName, tListLen(pSchema->name));
  }

  // these are the tags and pseudo function columns; only the tag (column node) entries are kept
  for (int32_t i = 0; i < numOfTags; ++i) {
    STargetNode* pNode = (STargetNode*)nodesListGetNode(pScanNode->pScanPseudoCols, i);

    int32_t type = nodeType(pNode->pExpr);
    if (type == QUERY_NODE_COLUMN) {
      SColumnNode* pColNode = (SColumnNode*)pNode->pExpr;

      SSchema* pSchema = &pqSw->pSchema[pqSw->nCols++];
      pSchema->colId = pColNode->colId;
      pSchema->type = pColNode->node.resType.type;
      pSchema->bytes = pColNode->node.resType.bytes;
      tstrncpy(pSchema->name, pColNode->colName, tListLen(pSchema->name));
    }
  }

  return pqSw;
}

/*
 * Release what SSchemaInfo owns: the database and table names (pointers reset to NULL) and the
 * full-table and queried-column schema wrappers.
 */
static void cleanupTableSchemaInfo(SSchemaInfo* pSchemaInfo) {
  // names
  taosMemoryFreeClear(pSchemaInfo->dbname);
  taosMemoryFreeClear(pSchemaInfo->tablename);

  // schemas
  tDeleteSSchemaWrapper(pSchemaInfo->sw);
  tDeleteSSchemaWrapper(pSchemaInfo->qsw);
}

/* Release the schema wrapper held by the stream task info. */
static void cleanupStreamInfo(SStreamTaskInfo* pStreamInfo) {
  tDeleteSSchemaWrapper(pStreamInfo->schema);
}

/*
 * Return true when the group/partition key list consists of exactly one function node named "tbname"
 * (i.e. "partition by tbname" / "group by tbname").
 */
bool groupbyTbname(SNodeList* pGroupList) {
  if (LIST_LENGTH(pGroupList) != 1) {
    return false;
  }

  SNode* pFirst = nodesListGetNode(pGroupList, 0);
  if (pFirst->type != QUERY_NODE_FUNCTION) {
    return false;
  }

  return strcmp(((struct SFunctionNode*)pFirst)->functionName, "tbname") == 0;
}

/*
 * Recursively build the executor operator tree for the physical plan rooted at pPhyNode.
 *
 * Leaf nodes (no children) become scan-like operators. For table-based scans, the table list is
 * built first with createScanTableListInfo() and, where needed, the task's schema info is extracted.
 *
 * Inner nodes first build all of their children and then wrap them in the operator that matches the
 * node type. When a child fails to build, the children already built at this level are destroyed.
 *
 * @return the operator for pPhyNode, or NULL on failure (pTaskInfo->code is set on most failure paths)
 */
SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, SNode* pTagCond,
                                  SNode* pTagIndexCond, const char* pUser) {
  int32_t         type = nodeType(pPhyNode);
  STableListInfo* pTableListInfo = pTaskInfo->pTableInfoList;
  const char*     idstr = GET_TASKID(pTaskInfo);

  if (pPhyNode->pChildren == NULL || LIST_LENGTH(pPhyNode->pChildren) == 0) {
    SOperatorInfo* pOperator = NULL;
    if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == type) {
      STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode;

      // NOTE: this is an patch to fix the physical plan
      // TODO remove it later
      if (pTableScanNode->scan.node.pLimit != NULL) {
        pTableScanNode->groupSort = true;
      }

      int32_t code =
          createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, pTableScanNode->groupSort, pHandle,
                                  pTableListInfo, pTagCond, pTagIndexCond, pTaskInfo);
      if (code) {
        pTaskInfo->code = code;
        qError("failed to createScanTableListInfo, code:%s, %s", tstrerror(code), idstr);
        return NULL;
      }

      code = extractTableSchemaInfo(pHandle, &pTableScanNode->scan, pTaskInfo);
      if (code) {
        pTaskInfo->code = terrno;
        return NULL;
      }

      pOperator = createTableScanOperatorInfo(pTableScanNode, pHandle, pTaskInfo);
      if (NULL == pOperator) {
        pTaskInfo->code = terrno;
        return NULL;
      }

      STableScanInfo* pScanInfo = pOperator->info;
      pTaskInfo->cost.pRecoder = &pScanInfo->base.readRecorder;
    } else if (QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN == type) {
      STableMergeScanPhysiNode* pTableScanNode = (STableMergeScanPhysiNode*)pPhyNode;

      int32_t code = createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, true, pHandle,
                                             pTableListInfo, pTagCond, pTagIndexCond, pTaskInfo);
      if (code) {
        pTaskInfo->code = code;
        qError("failed to createScanTableListInfo, code: %s", tstrerror(code));
        return NULL;
      }

      code = extractTableSchemaInfo(pHandle, &pTableScanNode->scan, pTaskInfo);
      if (code) {
        pTaskInfo->code = terrno;
        return NULL;
      }

      pOperator = createTableMergeScanOperatorInfo(pTableScanNode, pHandle, pTaskInfo);
      if (NULL == pOperator) {
        pTaskInfo->code = terrno;
        return NULL;
      }

      STableScanInfo* pScanInfo = pOperator->info;
      pTaskInfo->cost.pRecoder = &pScanInfo->base.readRecorder;
    } else if (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == type) {
      pOperator = createExchangeOperatorInfo(pHandle ? pHandle->pMsgCb->clientRpc : NULL, (SExchangePhysiNode*)pPhyNode,
                                             pTaskInfo);
    } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN == type) {
      STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode;
      if (pHandle->vnode) {
        int32_t code =
            createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, pTableScanNode->groupSort,
                                    pHandle, pTableListInfo, pTagCond, pTagIndexCond, pTaskInfo);
        if (code) {
          pTaskInfo->code = code;
          qError("failed to createScanTableListInfo, code: %s", tstrerror(code));
          return NULL;
        }

#ifndef NDEBUG
        int32_t sz = tableListGetSize(pTableListInfo);
        qDebug("create stream task, total:%d", sz);

        for (int32_t i = 0; i < sz; i++) {
          STableKeyInfo* pKeyInfo = tableListGetInfo(pTableListInfo, i);
          qDebug("add table uid:%" PRIu64 ", gid:%" PRIu64, pKeyInfo->uid, pKeyInfo->groupId);
        }
#endif
      }

      pTaskInfo->schemaInfo.qsw = extractQueriedColumnSchema(&pTableScanNode->scan);
      pOperator = createStreamScanOperatorInfo(pHandle, pTableScanNode, pTagCond, pTaskInfo);
    } else if (QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN == type) {
      SSystemTableScanPhysiNode* pSysScanPhyNode = (SSystemTableScanPhysiNode*)pPhyNode;
      pOperator = createSysTableScanOperatorInfo(pHandle, pSysScanPhyNode, pUser, pTaskInfo);
    } else if (QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN == type) {
      STagScanPhysiNode* pScanPhyNode = (STagScanPhysiNode*)pPhyNode;

      int32_t code = createScanTableListInfo(pScanPhyNode, NULL, false, pHandle, pTableListInfo, pTagCond,
                                             pTagIndexCond, pTaskInfo);
      if (code != TSDB_CODE_SUCCESS) {
        pTaskInfo->code = code;
        qError("failed to getTableList, code: %s", tstrerror(code));
        return NULL;
      }

      pOperator = createTagScanOperatorInfo(pHandle, pScanPhyNode, pTaskInfo);
    } else if (QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN == type) {
      SBlockDistScanPhysiNode* pBlockNode = (SBlockDistScanPhysiNode*)pPhyNode;

      if (pBlockNode->tableType == TSDB_SUPER_TABLE) {
        SArray* pList = taosArrayInit(4, sizeof(STableKeyInfo));
        if (pList == NULL) {
          pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
          return NULL;
        }

        int32_t code = vnodeGetAllTableList(pHandle->vnode, pBlockNode->uid, pList);
        if (code != TSDB_CODE_SUCCESS) {
          pTaskInfo->code = terrno;
          taosArrayDestroy(pList);  // the temporary list must not leak on this path
          return NULL;
        }

        size_t num = taosArrayGetSize(pList);
        for (int32_t i = 0; i < num; ++i) {
          STableKeyInfo* p = taosArrayGet(pList, i);
          tableListAddTableInfo(pTableListInfo, p->uid, 0);
        }

        taosArrayDestroy(pList);
      } else {  // Create group with only one table
        tableListAddTableInfo(pTableListInfo, pBlockNode->uid, 0);
      }

      pOperator = createDataBlockInfoScanOperator(pHandle, pBlockNode, pTaskInfo);
    } else if (QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN == type) {
      SLastRowScanPhysiNode* pScanNode = (SLastRowScanPhysiNode*)pPhyNode;

      int32_t code = createScanTableListInfo(&pScanNode->scan, pScanNode->pGroupTags, true, pHandle, pTableListInfo,
                                             pTagCond, pTagIndexCond, pTaskInfo);
      if (code != TSDB_CODE_SUCCESS) {
        pTaskInfo->code = code;
        return NULL;
      }

      code = extractTableSchemaInfo(pHandle, &pScanNode->scan, pTaskInfo);
      if (code != TSDB_CODE_SUCCESS) {
        pTaskInfo->code = code;
        return NULL;
      }

      pOperator = createCacherowsScanOperator(pScanNode, pHandle, pTaskInfo);
    } else if (QUERY_NODE_PHYSICAL_PLAN_PROJECT == type) {
      pOperator = createProjectOperatorInfo(NULL, (SProjectPhysiNode*)pPhyNode, pTaskInfo);
    } else {
      ASSERT(0);
    }

    if (pOperator != NULL) {
      pOperator->resultDataBlockId = pPhyNode->pOutputDataBlockDesc->dataBlockId;
    }

    return pOperator;
  }

  size_t          size = LIST_LENGTH(pPhyNode->pChildren);
  SOperatorInfo** ops = taosMemoryCalloc(size, POINTER_BYTES);
  if (ops == NULL) {
    pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  for (int32_t i = 0; i < size; ++i) {
    SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, i);
    ops[i] = createOperatorTree(pChildNode, pTaskInfo, pHandle, pTagCond, pTagIndexCond, pUser);
    if (ops[i] == NULL) {
      // Destroy the sibling subtrees built so far instead of leaking them. A scan inside them may
      // have published its read recorder into the task; that pointer would now dangle.
      for (int32_t j = 0; j < i; ++j) {
        destroyOperatorInfo(ops[j]);
      }
      if (i > 0) {
        pTaskInfo->cost.pRecoder = NULL;
      }

      taosMemoryFree(ops);
      return NULL;
    }
  }

  SOperatorInfo* pOptr = NULL;
  if (QUERY_NODE_PHYSICAL_PLAN_PROJECT == type) {
    pOptr = createProjectOperatorInfo(ops[0], (SProjectPhysiNode*)pPhyNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_HASH_AGG == type) {
    SAggPhysiNode* pAggNode = (SAggPhysiNode*)pPhyNode;
    if (pAggNode->pGroupKeys != NULL) {
      pOptr = createGroupOperatorInfo(ops[0], pAggNode, pTaskInfo);
    } else {
      pOptr = createAggregateOperatorInfo(ops[0], pAggNode, pTaskInfo);
    }
  } else if (QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL == type) {
    SIntervalPhysiNode* pIntervalPhyNode = (SIntervalPhysiNode*)pPhyNode;

    bool isStream = (QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL == type);
    pOptr = createIntervalOperatorInfo(ops[0], pIntervalPhyNode, pTaskInfo, isStream);
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL == type) {
    pOptr = createStreamIntervalOperatorInfo(ops[0], pPhyNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL == type) {
    SMergeAlignedIntervalPhysiNode* pIntervalPhyNode = (SMergeAlignedIntervalPhysiNode*)pPhyNode;
    pOptr = createMergeAlignedIntervalOperatorInfo(ops[0], pIntervalPhyNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL == type) {
    SMergeIntervalPhysiNode* pIntervalPhyNode = (SMergeIntervalPhysiNode*)pPhyNode;
    pOptr = createMergeIntervalOperatorInfo(ops[0], pIntervalPhyNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL == type) {
    int32_t children = 0;
    pOptr = createStreamFinalIntervalOperatorInfo(ops[0], pPhyNode, pTaskInfo, children);
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL == type) {
    int32_t children = pHandle->numOfVgroups;
    pOptr = createStreamFinalIntervalOperatorInfo(ops[0], pPhyNode, pTaskInfo, children);
  } else if (QUERY_NODE_PHYSICAL_PLAN_SORT == type) {
    pOptr = createSortOperatorInfo(ops[0], (SSortPhysiNode*)pPhyNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_GROUP_SORT == type) {
    pOptr = createGroupSortOperatorInfo(ops[0], (SGroupSortPhysiNode*)pPhyNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE == type) {
    SMergePhysiNode* pMergePhyNode = (SMergePhysiNode*)pPhyNode;
    pOptr = createMultiwayMergeOperatorInfo(ops, size, pMergePhyNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION == type) {
    SSessionWinodwPhysiNode* pSessionNode = (SSessionWinodwPhysiNode*)pPhyNode;
    pOptr = createSessionAggOperatorInfo(ops[0], pSessionNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION == type) {
    pOptr = createStreamSessionAggOperatorInfo(ops[0], pPhyNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION == type) {
    int32_t children = 0;
    pOptr = createStreamFinalSessionAggOperatorInfo(ops[0], pPhyNode, pTaskInfo, children);
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION == type) {
    int32_t children = pHandle->numOfVgroups;
    pOptr = createStreamFinalSessionAggOperatorInfo(ops[0], pPhyNode, pTaskInfo, children);
  } else if (QUERY_NODE_PHYSICAL_PLAN_PARTITION == type) {
    pOptr = createPartitionOperatorInfo(ops[0], (SPartitionPhysiNode*)pPhyNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION == type) {
    pOptr = createStreamPartitionOperatorInfo(ops[0], (SStreamPartitionPhysiNode*)pPhyNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE == type) {
    SStateWinodwPhysiNode* pStateNode = (SStateWinodwPhysiNode*)pPhyNode;
    pOptr = createStatewindowOperatorInfo(ops[0], pStateNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE == type) {
    pOptr = createStreamStateAggOperatorInfo(ops[0], pPhyNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN == type) {
    pOptr = createMergeJoinOperatorInfo(ops, size, (SSortMergeJoinPhysiNode*)pPhyNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_FILL == type) {
    pOptr = createFillOperatorInfo(ops[0], (SFillPhysiNode*)pPhyNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_FILL == type) {
    pOptr = createStreamFillOperatorInfo(ops[0], (SStreamFillPhysiNode*)pPhyNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC == type) {
    pOptr = createIndefinitOutputOperatorInfo(ops[0], pPhyNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC == type) {
    pOptr = createTimeSliceOperatorInfo(ops[0], pPhyNode, pTaskInfo);
  } else {
    ASSERT(0);
  }

  taosMemoryFree(ops);
  if (pOptr) {
    pOptr->resultDataBlockId = pPhyNode->pOutputDataBlockDesc->dataBlockId;
  }

  return pOptr;
}

/*
 * Walk down a single-child operator chain until the stream scan operator is reached and return, through ppInfo,
 * the info of the table-scan operator held by that stream scan.
 *
 * Returns 0 on success, or TSDB_CODE_QRY_APP_ERROR when the chain ends before a stream scan is found or when an
 * operator on the way has more than one downstream (join).
 */
static int32_t extractTbscanInStreamOpTree(SOperatorInfo* pOperator, STableScanInfo** ppInfo) {
  SOperatorInfo* pCur = pOperator;

  while (pCur->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
    if (pCur->numOfDownstream == 0) {
      qError("failed to find stream scan operator");
      return TSDB_CODE_QRY_APP_ERROR;
    }
    if (pCur->numOfDownstream > 1) {
      qError("join not supported for stream block scan");
      return TSDB_CODE_QRY_APP_ERROR;
    }
    pCur = pCur->pDownstream[0];
  }

  SStreamScanInfo* pStreamScan = pCur->info;
  ASSERT(pStreamScan->pTableScanOp->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN);
  *ppInfo = pStreamScan->pTableScanOp->info;
  return 0;
}

2243 2244 2245 2246 2247 2248 2249 2250 2251 2252 2253 2254 2255 2256 2257 2258 2259 2260 2261 2262 2263 2264
/*
 * Follow the single-child chain of a physical plan down to its leaf and return that leaf as a table-scan node.
 *
 * Returns 0 and sets *ppNode on success. Returns -1 with terrno = TSDB_CODE_QRY_APP_ERROR when a node on the way has
 * more than one child or when the leaf is not a table-scan node.
 */
int32_t extractTableScanNode(SPhysiNode* pNode, STableScanPhysiNode** ppNode) {
  SPhysiNode* pCur = pNode;

  while (pCur->pChildren != NULL && LIST_LENGTH(pCur->pChildren) != 0) {
    if (LIST_LENGTH(pCur->pChildren) != 1) {
      ASSERT(0);
      terrno = TSDB_CODE_QRY_APP_ERROR;
      return -1;
    }
    pCur = (SPhysiNode*)nodesListGetNode(pCur->pChildren, 0);
  }

  if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN != pCur->type) {
    ASSERT(0);
    terrno = TSDB_CODE_QRY_APP_ERROR;
    return -1;
  }

  *ppNode = (STableScanPhysiNode*)pCur;
  return 0;
}

2265
/*
 * rebuildReader is compiled out (#if 0). As written, it finds the table-scan operator below the stream scan, closes
 * that operator's tsdb reader and opens a new one from the subplan's table-scan node. The uid and ts arguments are
 * not applied yet (see the TODO at the end of the function).
 */
#if 0
L
Liu Jicong 已提交
2266 2267 2268 2269 2270
int32_t rebuildReader(SOperatorInfo* pOperator, SSubplan* plan, SReadHandle* pHandle, int64_t uid, int64_t ts) {
  STableScanInfo* pTableScanInfo = NULL;
  if (extractTbscanInStreamOpTree(pOperator, &pTableScanInfo) < 0) {
    return -1;
  }
2271

L
Liu Jicong 已提交
2272 2273 2274 2275
  // NOTE(review): failure is only ASSERTed here; if ASSERT is inactive, pNode stays NULL and is passed on below.
  STableScanPhysiNode* pNode = NULL;
  if (extractTableScanNode(plan->pNode, &pNode) < 0) {
    ASSERT(0);
  }
2276

H
Haojun Liao 已提交
2277
  // drop the existing reader before replacing it
  tsdbReaderClose(pTableScanInfo->dataReader);
2278

L
Liu Jicong 已提交
2279
  STableListInfo info = {0};
H
Haojun Liao 已提交
2280
  pTableScanInfo->dataReader = doCreateDataReader(pNode, pHandle, &info, NULL);
L
Liu Jicong 已提交
2281 2282 2283 2284
  if (pTableScanInfo->dataReader == NULL) {
    ASSERT(0);
    qError("failed to create data reader");
    return TSDB_CODE_QRY_APP_ERROR;
2285
  }
L
Liu Jicong 已提交
2286
  // TODO: set uid and ts to data reader
2287 2288
  return 0;
}
2289
#endif
2290

D
dapan1121 已提交
2291
/*
 * Build the sink-specific parameter object for a data sink node.
 *
 * - QUERY_NODE_PHYSICAL_PLAN_QUERY_INSERT: *pParam receives a new SInserterParam carrying readHandle.
 * - QUERY_NODE_PHYSICAL_PLAN_DELETE: *pParam receives a new SDeleterParam holding the super-table uid and the uid of
 *   every table in the task's table list.
 * - any other node type: *pParam is left untouched.
 *
 * pTaskInfo is only dereferenced for delete sinks.
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when an allocation fails. Nothing allocated here is leaked
 * on failure, and *pParam is not modified in that case.
 */
int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, qTaskInfo_t* pTaskInfo, SReadHandle* readHandle) {
  switch (pNode->type) {
    case QUERY_NODE_PHYSICAL_PLAN_QUERY_INSERT: {
      SInserterParam* pInserterParam = taosMemoryCalloc(1, sizeof(SInserterParam));
      if (NULL == pInserterParam) {
        return TSDB_CODE_OUT_OF_MEMORY;
      }
      pInserterParam->readHandle = readHandle;

      *pParam = pInserterParam;
      break;
    }
    case QUERY_NODE_PHYSICAL_PLAN_DELETE: {
      // only the delete sink needs the task's table list, so resolve the task handle here
      SExecTaskInfo* pTask = *(SExecTaskInfo**)pTaskInfo;

      SDeleterParam* pDeleterParam = taosMemoryCalloc(1, sizeof(SDeleterParam));
      if (NULL == pDeleterParam) {
        return TSDB_CODE_OUT_OF_MEMORY;
      }

      int32_t tbNum = tableListGetSize(pTask->pTableInfoList);
      pDeleterParam->suid = tableListGetSuid(pTask->pTableInfoList);

      pDeleterParam->pUidList = taosArrayInit(tbNum, sizeof(uint64_t));
      if (NULL == pDeleterParam->pUidList) {
        taosMemoryFree(pDeleterParam);
        return TSDB_CODE_OUT_OF_MEMORY;
      }

      for (int32_t i = 0; i < tbNum; ++i) {
        STableKeyInfo* pTable = tableListGetInfo(pTask->pTableInfoList, i);
        if (NULL == taosArrayPush(pDeleterParam->pUidList, &pTable->uid)) {
          // never hand back a partially filled uid list; fail the whole setup instead
          taosArrayDestroy(pDeleterParam->pUidList);
          taosMemoryFree(pDeleterParam);
          return TSDB_CODE_OUT_OF_MEMORY;
        }
      }

      *pParam = pDeleterParam;
      break;
    }
    default:
      break;
  }

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
2335
/*
 * Create an execution task for pPlan and build its operator tree.
 *
 * On success *pTaskInfo holds the new task, and the task takes over sql and pPlan (both are stored in it).
 * On failure sql is freed, any partially built task is destroyed through doDestroyTask, and *pTaskInfo is reset to
 * NULL so the caller never sees a freed task.
 *
 * Returns TSDB_CODE_SUCCESS, or a non-zero error code (also left in terrno) on failure.
 */
int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId,
                               char* sql, EOPTR_EXEC_MODEL model) {
  uint64_t queryId = pPlan->id.queryId;

  *pTaskInfo = createExecTaskInfo(queryId, taskId, model, pPlan->dbFName);
  if (*pTaskInfo == NULL) {
    goto _complete;
  }

  if (pHandle) {
    if (pHandle->pStateBackend) {
      (*pTaskInfo)->streamInfo.pState = pHandle->pStateBackend;
    }
  }

  // the task takes over sql; clear the local so the error path below does not free it a second time
  (*pTaskInfo)->sql = sql;
  sql = NULL;

  (*pTaskInfo)->pSubplan = pPlan;
  (*pTaskInfo)->pRoot =
      createOperatorTree(pPlan->pNode, *pTaskInfo, pHandle, pPlan->pTagCond, pPlan->pTagIndexCond, pPlan->user);

  if (NULL == (*pTaskInfo)->pRoot) {
    terrno = (*pTaskInfo)->code;
    goto _complete;
  }

  return TSDB_CODE_SUCCESS;

_complete:
  taosMemoryFree(sql);
  if (*pTaskInfo != NULL) {
    doDestroyTask(*pTaskInfo);
    *pTaskInfo = NULL;  // do not leave the caller holding a pointer to freed memory
  }

  // this is the failure path: never report success, even if the failing callee did not record an error code
  if (terrno == TSDB_CODE_SUCCESS) {
    terrno = TSDB_CODE_QRY_APP_ERROR;
  }
  return terrno;
}

H
Haojun Liao 已提交
2371 2372 2373 2374 2375
/*
 * Element destructor for taosArrayDestroyEx: each array slot stores an SSDataBlock*, so pParam points at that
 * pointer. The slot is unwrapped and the block is destroyed.
 */
static void freeBlock(void* pParam) {
  SSDataBlock** ppBlock = (SSDataBlock**)pParam;
  blockDataDestroy(*ppBlock);
}

L
Liu Jicong 已提交
2376
/*
 * Destroy an execution task and free the task object itself.
 *
 * Cleans up the table list, operator tree, table schema info, stream info, result block list (each block destroyed),
 * stop-info array, sql text and task id string. The subplan is destroyed only when the task is not in local-exec
 * mode. A NULL task is ignored, so callers on error paths do not have to check first.
 */
void doDestroyTask(SExecTaskInfo* pTaskInfo) {
  if (pTaskInfo == NULL) {
    return;
  }

  qDebug("%s execTask is freed", GET_TASKID(pTaskInfo));

  pTaskInfo->pTableInfoList = tableListDestroy(pTaskInfo->pTableInfoList);
  destroyOperatorInfo(pTaskInfo->pRoot);
  cleanupTableSchemaInfo(&pTaskInfo->schemaInfo);
  cleanupStreamInfo(&pTaskInfo->streamInfo);

  // in local-exec mode the subplan is left alone (presumably owned by the caller of the local fetch)
  if (!pTaskInfo->localFetch.localExec) {
    nodesDestroyNode((SNode*)pTaskInfo->pSubplan);
  }

  taosArrayDestroyEx(pTaskInfo->pResultBlockList, freeBlock);
  taosArrayDestroy(pTaskInfo->stopInfo.pStopInfo);
  taosMemoryFreeClear(pTaskInfo->sql);
  taosMemoryFreeClear(pTaskInfo->id.str);
  taosMemoryFreeClear(pTaskInfo);
}

/*
 * Estimate, in bytes, the query-buffer budget charged for a query over numOfTables tables:
 * 1.5 x sizeof(STableQueryInfo) per table. Memory used on the tsdb side (e.g. STableCheckInfo) is not counted.
 */
static int64_t getQuerySupportBufSize(size_t numOfTables) {
  const double perTableBytes = sizeof(STableQueryInfo) * 1.5;
  return (int64_t)(perTableBytes * numOfTables);
}

/*
 * Reserve the estimated buffer budget for a query over numOfTables tables from the global tsQueryBufferSizeBytes pool.
 *
 * - pool < 0: unlimited, always succeeds and reserves nothing.
 * - pool == 0: query processing is disabled, always fails.
 * - otherwise the estimate is subtracted with a compare-and-swap loop, so concurrent callers cannot over-commit.
 *
 * The pool is modified concurrently via CAS/add, so every read of it here is an atomic load as well.
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_QRY_NOT_ENOUGH_BUFFER when the pool cannot cover the estimate.
 */
int32_t checkForQueryBuf(size_t numOfTables) {
  int64_t t = getQuerySupportBufSize(numOfTables);

  int64_t avail = atomic_load_64(&tsQueryBufferSizeBytes);
  if (avail < 0) {
    return TSDB_CODE_SUCCESS;
  }

  if (avail == 0) {
    // disable query processing if the value of tsQueryBufferSize is zero.
    return TSDB_CODE_QRY_NOT_ENOUGH_BUFFER;
  }

  while (1) {
    int64_t s = atomic_load_64(&tsQueryBufferSizeBytes);
    int64_t remain = s - t;
    if (remain < 0) {
      return TSDB_CODE_QRY_NOT_ENOUGH_BUFFER;
    }

    // retry if another thread changed the pool between the load and the swap
    if (atomic_val_compare_exchange_64(&tsQueryBufferSizeBytes, s, remain) == s) {
      return TSDB_CODE_SUCCESS;
    }
  }
}

/*
 * Return the buffer budget that checkForQueryBuf reserved for a query over numOfTables tables to the global pool.
 * Nothing is returned when the pool is unlimited (negative). The pool is read atomically because other threads
 * update it concurrently.
 */
void releaseQueryBuf(size_t numOfTables) {
  if (atomic_load_64(&tsQueryBufferSizeBytes) < 0) {
    return;
  }

  int64_t t = getQuerySupportBufSize(numOfTables);

  // give the reserved amount back to the shared pool
  atomic_add_fetch_64(&tsQueryBufferSizeBytes, t);
}
D
dapan1121 已提交
2433

H
Haojun Liao 已提交
2434
/*
 * Append an SExplainExecInfo entry for operatorInfo to pExecInfoList, then recurse into each downstream operator in
 * order (pre-order traversal).
 *
 * Each entry records total rows, open/total cost and, when the operator provides getExplainFn, its verbose info.
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_QRY_OUT_OF_MEMORY if the list cannot grow, or the first error reported by an
 * operator's getExplainFn (propagated unchanged from any depth).
 */
int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SArray* pExecInfoList) {
  SExplainExecInfo  execInfo = {0};
  SExplainExecInfo* pExplainInfo = taosArrayPush(pExecInfoList, &execInfo);
  if (pExplainInfo == NULL) {
    return TSDB_CODE_QRY_OUT_OF_MEMORY;
  }

  pExplainInfo->numOfRows = operatorInfo->resultInfo.totalRows;
  pExplainInfo->startupCost = operatorInfo->cost.openCost;
  pExplainInfo->totalCost = operatorInfo->cost.totalCost;
  pExplainInfo->verboseLen = 0;
  pExplainInfo->verboseInfo = NULL;

  if (operatorInfo->fpSet.getExplainFn) {
    int32_t code =
        operatorInfo->fpSet.getExplainFn(operatorInfo, &pExplainInfo->verboseInfo, &pExplainInfo->verboseLen);
    if (code) {
      qError("%s operator getExplainFn failed, code:%s", GET_TASKID(operatorInfo->pTaskInfo), tstrerror(code));
      return code;
    }
  }

  // pExplainInfo points into pExecInfoList; it may be invalidated if the children's pushes grow the array,
  // so it is not used past this point.
  for (int32_t i = 0; i < operatorInfo->numOfDownstream; ++i) {
    int32_t code = getOperatorExplainExecInfo(operatorInfo->pDownstream[i], pExecInfoList);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }
  }

  return TSDB_CODE_SUCCESS;
}
5
54liuyao 已提交
2464

2465 2466
/*
 * Get, creating it if absent, the stream-state result row for window `win` of group `tableGroupId`.
 *
 * The row is keyed by (win->skey, tableGroupId). A new row is requested with pAggSup->resultRowSize bytes. The row's
 * window is set to *win and its function contexts are initialised via setResultRowInitCtx.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_QRY_OUT_OF_MEMORY when streamStateAddIfNotExist fails.
 */
int32_t setOutputBuf(SStreamState* pState, STimeWindow* win, SResultRow** pResult, int64_t tableGroupId,
                     SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowEntryInfoOffset, SAggSupporter* pAggSup) {
  SWinKey winKey = {.ts = win->skey, .groupId = tableGroupId};
  int32_t valLen = pAggSup->resultRowSize;
  char*   pBuf = NULL;

  if (streamStateAddIfNotExist(pState, &winKey, (void**)&pBuf, &valLen) < 0) {
    return TSDB_CODE_QRY_OUT_OF_MEMORY;
  }

  SResultRow* pRow = (SResultRow*)pBuf;
  *pResult = pRow;
  ASSERT(pRow);

  pRow->win = *win;
  setResultRowInitCtx(pRow, pCtx, numOfOutput, rowEntryInfoOffset);
  return TSDB_CODE_SUCCESS;
}

2485 2486
/*
 * Hand a result row obtained from the stream state back via streamStateReleaseBuf.
 * The outcome of the release is not checked; this always returns TSDB_CODE_SUCCESS.
 */
int32_t releaseOutputBuf(SStreamState* pState, SWinKey* pKey, SResultRow* pResult) {
  (void)streamStateReleaseBuf(pState, pKey, pResult);
  return TSDB_CODE_SUCCESS;
}

2490 2491
/*
 * Write resSize bytes of the result row pResult into the stream state under pKey via streamStatePut.
 * The outcome of the write is not checked; this always returns TSDB_CODE_SUCCESS.
 */
int32_t saveOutputBuf(SStreamState* pState, SWinKey* pKey, SResultRow* pResult, int32_t resSize) {
  (void)streamStatePut(pState, pKey, pResult, resSize);
  return TSDB_CODE_SUCCESS;
}

2495
int32_t buildDataBlockFromGroupRes(SOperatorInfo* pOperator, SStreamState* pState, SSDataBlock* pBlock, SExprSupp* pSup,
2496
                                   SGroupResInfo* pGroupResInfo) {
2497
  SExecTaskInfo*  pTaskInfo = pOperator->pTaskInfo;
2498 2499 2500 2501 2502 2503 2504 2505 2506 2507 2508 2509
  SExprInfo*      pExprInfo = pSup->pExprInfo;
  int32_t         numOfExprs = pSup->numOfExprs;
  int32_t*        rowEntryOffset = pSup->rowEntryInfoOffset;
  SqlFunctionCtx* pCtx = pSup->pCtx;

  int32_t numOfRows = getNumOfTotalRes(pGroupResInfo);

  for (int32_t i = pGroupResInfo->index; i < numOfRows; i += 1) {
    SResKeyPos* pPos = taosArrayGetP(pGroupResInfo->pRows, i);
    int32_t     size = 0;
    void*       pVal = NULL;
    SWinKey     key = {
2510 2511
            .ts = *(TSKEY*)pPos->key,
            .groupId = pPos->groupId,
2512
    };
2513
    int32_t code = streamStateGet(pState, &key, &pVal, &size);
2514 2515 2516 2517 2518 2519
    ASSERT(code == 0);
    SResultRow* pRow = (SResultRow*)pVal;
    doUpdateNumOfRows(pCtx, pRow, numOfExprs, rowEntryOffset);
    // no results, continue to check the next one
    if (pRow->numOfRows == 0) {
      pGroupResInfo->index += 1;
2520
      releaseOutputBuf(pState, &key, pRow);
2521 2522 2523
      continue;
    }

H
Haojun Liao 已提交
2524 2525
    if (pBlock->info.id.groupId == 0) {
      pBlock->info.id.groupId = pPos->groupId;
2526
      void* tbname = NULL;
H
Haojun Liao 已提交
2527
      if (streamStateGetParName(pTaskInfo->streamInfo.pState, pBlock->info.id.groupId, &tbname) < 0) {
L
Liu Jicong 已提交
2528
        pBlock->info.parTbName[0] = 0;
2529 2530
      } else {
        memcpy(pBlock->info.parTbName, tbname, TSDB_TABLE_NAME_LEN);
2531
      }
2532
      tdbFree(tbname);
2533 2534
    } else {
      // current value belongs to different group, it can't be packed into one datablock
H
Haojun Liao 已提交
2535
      if (pBlock->info.id.groupId != pPos->groupId) {
2536
        releaseOutputBuf(pState, &key, pRow);
2537 2538 2539 2540 2541 2542
        break;
      }
    }

    if (pBlock->info.rows + pRow->numOfRows > pBlock->info.capacity) {
      ASSERT(pBlock->info.rows > 0);
2543
      releaseOutputBuf(pState, &key, pRow);
2544 2545 2546 2547 2548 2549 2550 2551 2552 2553
      break;
    }

    pGroupResInfo->index += 1;

    for (int32_t j = 0; j < numOfExprs; ++j) {
      int32_t slotId = pExprInfo[j].base.resSchema.slotId;

      pCtx[j].resultInfo = getResultEntryInfo(pRow, j, rowEntryOffset);
      if (pCtx[j].fpSet.finalize) {
2554 2555 2556 2557
        int32_t code1 = pCtx[j].fpSet.finalize(&pCtx[j], pBlock);
        if (TAOS_FAILED(code1)) {
          qError("%s build result data block error, code %s", GET_TASKID(pTaskInfo), tstrerror(code1));
          T_LONG_JMP(pTaskInfo->env, code1);
2558 2559 2560 2561 2562 2563 2564 2565 2566 2567 2568 2569 2570
        }
      } else if (strcmp(pCtx[j].pExpr->pExpr->_function.functionName, "_select_value") == 0) {
        // do nothing, todo refactor
      } else {
        // expand the result into multiple rows. E.g., _wstart, top(k, 20)
        // the _wstart needs to copy to 20 following rows, since the results of top-k expands to 20 different rows.
        SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, slotId);
        char*            in = GET_ROWCELL_INTERBUF(pCtx[j].resultInfo);
        for (int32_t k = 0; k < pRow->numOfRows; ++k) {
          colDataAppend(pColInfoData, pBlock->info.rows + k, in, pCtx[j].resultInfo->isNullRes);
        }
      }
    }
5
54liuyao 已提交
2571

2572
    pBlock->info.rows += pRow->numOfRows;
2573
    releaseOutputBuf(pState, &key, pRow);
2574 2575 2576 2577
  }
  blockDataUpdateTsWindow(pBlock, 0);
  return TSDB_CODE_SUCCESS;
}
5
54liuyao 已提交
2578 2579 2580 2581 2582 2583 2584

/*
 * Write `size` bytes of a session result buffer into the stream state under `key`, then release the buffer with
 * releaseOutputBuf (no window key). The outcome of the write is not checked; this always returns TSDB_CODE_SUCCESS.
 */
int32_t saveSessionDiscBuf(SStreamState* pState, SSessionKey* key, void* buf, int32_t size) {
  const void* pPayload = buf;
  (void)streamStateSessionPut(pState, key, pPayload, size);

  SResultRow* pRow = (SResultRow*)buf;
  releaseOutputBuf(pState, NULL, pRow);
  return TSDB_CODE_SUCCESS;
}

2585
int32_t buildSessionResultDataBlock(SOperatorInfo* pOperator, SStreamState* pState, SSDataBlock* pBlock,
5
54liuyao 已提交
2586
                                    SExprSupp* pSup, SGroupResInfo* pGroupResInfo) {
2587
  SExecTaskInfo*  pTaskInfo = pOperator->pTaskInfo;
5
54liuyao 已提交
2588 2589 2590 2591 2592 2593 2594 2595 2596 2597 2598 2599 2600
  SExprInfo*      pExprInfo = pSup->pExprInfo;
  int32_t         numOfExprs = pSup->numOfExprs;
  int32_t*        rowEntryOffset = pSup->rowEntryInfoOffset;
  SqlFunctionCtx* pCtx = pSup->pCtx;

  int32_t numOfRows = getNumOfTotalRes(pGroupResInfo);

  for (int32_t i = pGroupResInfo->index; i < numOfRows; i += 1) {
    SSessionKey* pKey = taosArrayGet(pGroupResInfo->pRows, i);
    int32_t      size = 0;
    void*        pVal = NULL;
    int32_t      code = streamStateSessionGet(pState, pKey, &pVal, &size);
    ASSERT(code == 0);
2601 2602
    if (code == -1) {
      // coverity scan
5
54liuyao 已提交
2603
      pGroupResInfo->index += 1;
2604 2605
      continue;
    }
5
54liuyao 已提交
2606 2607 2608 2609 2610 2611 2612 2613 2614
    SResultRow* pRow = (SResultRow*)pVal;
    doUpdateNumOfRows(pCtx, pRow, numOfExprs, rowEntryOffset);
    // no results, continue to check the next one
    if (pRow->numOfRows == 0) {
      pGroupResInfo->index += 1;
      releaseOutputBuf(pState, NULL, pRow);
      continue;
    }

H
Haojun Liao 已提交
2615 2616
    if (pBlock->info.id.groupId == 0) {
      pBlock->info.id.groupId = pKey->groupId;
2617

2618
      void* tbname = NULL;
H
Haojun Liao 已提交
2619
      if (streamStateGetParName(pTaskInfo->streamInfo.pState, pBlock->info.id.groupId, &tbname) < 0) {
2620
        pBlock->info.parTbName[0] = 0;
2621
      } else {
2622
        memcpy(pBlock->info.parTbName, tbname, TSDB_TABLE_NAME_LEN);
2623
      }
2624
      tdbFree(tbname);
5
54liuyao 已提交
2625 2626
    } else {
      // current value belongs to different group, it can't be packed into one datablock
H
Haojun Liao 已提交
2627
      if (pBlock->info.id.groupId != pKey->groupId) {
5
54liuyao 已提交
2628 2629 2630 2631 2632 2633 2634 2635 2636 2637 2638 2639 2640 2641 2642 2643 2644 2645 2646 2647 2648 2649 2650 2651 2652 2653 2654 2655 2656 2657 2658 2659 2660 2661 2662 2663 2664 2665 2666 2667 2668 2669
        releaseOutputBuf(pState, NULL, pRow);
        break;
      }
    }

    if (pBlock->info.rows + pRow->numOfRows > pBlock->info.capacity) {
      ASSERT(pBlock->info.rows > 0);
      releaseOutputBuf(pState, NULL, pRow);
      break;
    }

    pGroupResInfo->index += 1;

    for (int32_t j = 0; j < numOfExprs; ++j) {
      int32_t slotId = pExprInfo[j].base.resSchema.slotId;

      pCtx[j].resultInfo = getResultEntryInfo(pRow, j, rowEntryOffset);
      if (pCtx[j].fpSet.finalize) {
        int32_t code1 = pCtx[j].fpSet.finalize(&pCtx[j], pBlock);
        if (TAOS_FAILED(code1)) {
          qError("%s build result data block error, code %s", GET_TASKID(pTaskInfo), tstrerror(code1));
          T_LONG_JMP(pTaskInfo->env, code1);
        }
      } else if (strcmp(pCtx[j].pExpr->pExpr->_function.functionName, "_select_value") == 0) {
        // do nothing, todo refactor
      } else {
        // expand the result into multiple rows. E.g., _wstart, top(k, 20)
        // the _wstart needs to copy to 20 following rows, since the results of top-k expands to 20 different rows.
        SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, slotId);
        char*            in = GET_ROWCELL_INTERBUF(pCtx[j].resultInfo);
        for (int32_t k = 0; k < pRow->numOfRows; ++k) {
          colDataAppend(pColInfoData, pBlock->info.rows + k, in, pCtx[j].resultInfo->isNullRes);
        }
      }
    }

    pBlock->info.rows += pRow->numOfRows;
    // saveSessionDiscBuf(pState, pKey, pVal, size);
    releaseOutputBuf(pState, NULL, pRow);
  }
  blockDataUpdateTsWindow(pBlock, 0);
  return TSDB_CODE_SUCCESS;
2670
}