executorimpl.c 93.7 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
15

H
Haojun Liao 已提交
16 17
#include "filter.h"
#include "function.h"
18 19
#include "functionMgt.h"
#include "os.h"
H
Haojun Liao 已提交
20
#include "querynodes.h"
21
#include "tfill.h"
dengyihao's avatar
dengyihao 已提交
22
#include "tname.h"
23

H
Haojun Liao 已提交
24
#include "tdatablock.h"
25
#include "tglobal.h"
H
Haojun Liao 已提交
26
#include "tmsg.h"
27
#include "ttime.h"
H
Haojun Liao 已提交
28

29
#include "executorimpl.h"
dengyihao's avatar
dengyihao 已提交
30
#include "index.h"
31
#include "query.h"
32
#include "tcompare.h"
H
Haojun Liao 已提交
33
#include "thash.h"
34
#include "ttypes.h"
dengyihao's avatar
dengyihao 已提交
35
#include "vnode.h"
36

H
Haojun Liao 已提交
37
// True when the runtime's scanFlag equals MAIN_SCAN.
#define IS_MAIN_SCAN(runtime)          ((runtime)->scanFlag == MAIN_SCAN)
38 39 40 41 42 43
// Assigns REVERSE_SCAN to the runtime's scanFlag; the expression evaluates to the assigned value.
#define SET_REVERSE_SCAN_FLAG(runtime) ((runtime)->scanFlag = REVERSE_SCAN)

// Yields QUERY_ASC_FORWARD_STEP when ord is TSDB_ORDER_ASC and QUERY_DESC_FORWARD_STEP for any other value.
#define GET_FORWARD_DIRECTION_FACTOR(ord) (((ord) == TSDB_ORDER_ASC) ? QUERY_ASC_FORWARD_STEP : QUERY_DESC_FORWARD_STEP)

#if 0
// Allocation fault-injection shims (compiled out). When enabled, malloc/calloc/realloc in this file are
// redirected to wrappers that return NULL for a fraction of the calls, decided by taosRand().

// Returns NULL when the random draw is a multiple of 1000, otherwise forwards to taosMemoryMalloc.
static UNUSED_FUNC void* u_malloc(size_t size) {
  uint32_t draw = taosRand();
  return (draw % 1000 == 0) ? NULL : taosMemoryMalloc(size);
}

// Returns NULL when the random draw is a multiple of 1000, otherwise forwards to taosMemoryCalloc.
static UNUSED_FUNC void* u_calloc(size_t num, size_t size) {
  uint32_t draw = taosRand();
  return (draw % 1000 == 0) ? NULL : taosMemoryCalloc(num, size);
}

// Returns NULL when the random draw modulo 5 is 0 or 1, otherwise forwards to taosMemoryRealloc.
static UNUSED_FUNC void* u_realloc(void* p, size_t size) {
  uint32_t draw = taosRand();
  return (draw % 5 < 2) ? NULL : taosMemoryRealloc(p, size);
}

#define calloc  u_calloc
#define malloc  u_malloc
#define realloc u_realloc
#endif

X
Xiaoyu Wang 已提交
76
// Clears the bits given by st from (q)->status.
#define CLEAR_QUERY_STATUS(q, st)   ((q)->status &= (~(st)))
77 78
// True when the query's interval.interval field is greater than zero.
#define QUERY_IS_INTERVAL_QUERY(_q) ((_q)->interval.interval > 0)

H
Haojun Liao 已提交
79 80 81 82 83 84 85 86 87
// NOTE(review): presumably the private state of the aggregate operator (going by the name and the
// destroyAggOperatorInfo() declaration in this file) — confirm against the code that creates it.
// The roles of the individual fields are not visible in this part of the file.
typedef struct SAggOperatorInfo {
  SOptrBasicInfo   binfo;
  SAggSupporter    aggSup;
  STableQueryInfo* current;
  uint64_t         groupId;
  SGroupResInfo    groupResInfo;
  SExprSupp        scalarExprSup;
} SAggOperatorInfo;

88
// Forward declaration; the definition appears further down in this file.
static void setBlockSMAInfo(SqlFunctionCtx* pCtx, SExprInfo* pExpr, SSDataBlock* pBlock);
89

X
Xiaoyu Wang 已提交
90
// Forward declaration of a file-local (static) helper.
static void releaseQueryBuf(size_t numOfTables);
91

H
Haojun Liao 已提交
92 93 94 95 96 97 98 99 100 101
// Forward declarations of file-local (static) helpers.
static void    destroyAggOperatorInfo(void* param);
static void    initCtxOutputBuffer(SqlFunctionCtx* pCtx, int32_t size);
static void    doSetTableGroupOutputBuf(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId);
static void    doApplyScalarCalculation(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32_t order, int32_t scanFlag);
static int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t numOfOutput, size_t keyBufSize,
                                const char* pKey);
static void    extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const SColumnInfoData* p, bool keep,
                                                   int32_t status);
static int32_t doSetInputDataBlock(SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t order, int32_t scanFlag,
                                   bool createDummyCol);
H
Haojun Liao 已提交
102

H
Haojun Liao 已提交
103
/**
 * Flag an operator as finished and move its task to the completed state.
 *
 * Sets the operator status to OP_EXEC_DONE, stores the time elapsed since the task's recorded
 * start (timestamp delta divided by 1000.0) in cost.totalCost, and calls setTaskStatus() with
 * TASK_COMPLETED.
 *
 * @param pOperator operator to finalize; its pTaskInfo must not be NULL (asserted).
 */
void setOperatorCompleted(SOperatorInfo* pOperator) {
  pOperator->status = OP_EXEC_DONE;

  SExecTaskInfo* pTask = pOperator->pTaskInfo;
  ASSERT(pTask != NULL);

  // NOTE(review): the "Us" suffix suggests microseconds in, so totalCost would be milliseconds — confirm.
  pOperator->cost.totalCost = (taosGetTimestampUs() - pTask->cost.start) / 1000.0;

  setTaskStatus(pTask, TASK_COMPLETED);
}
110

H
Haojun Liao 已提交
111 112 113 114 115 116 117 118 119 120
/**
 * Fill in the descriptive fields of an operator in one call.
 *
 * @param pOperator operator being initialised
 * @param name      operator name; only the pointer is stored, the string is not copied
 * @param type      value stored in operatorType
 * @param blocking  value stored in the blocking flag
 * @param status    initial status value
 * @param pInfo     opaque pointer stored in the info field
 * @param pTaskInfo task pointer stored in the pTaskInfo field
 */
void setOperatorInfo(SOperatorInfo* pOperator, const char* name, int32_t type, bool blocking, int32_t status,
                     void* pInfo, SExecTaskInfo* pTaskInfo) {
  // identity
  pOperator->operatorType = type;
  pOperator->name = (char*)name;  // the cast drops const to match the destination field

  // execution attributes
  pOperator->status = status;
  pOperator->blocking = blocking;

  // attached state
  pOperator->pTaskInfo = pTaskInfo;
  pOperator->info = pInfo;
}

121
/**
 * Open callback that does no real work: marks the operator as opened through OPTR_SET_OPENED,
 * records an open cost of zero and reports success.
 *
 * @return always TSDB_CODE_SUCCESS
 */
int32_t optrDummyOpenFn(SOperatorInfo* pOperator) {
  OPTR_SET_OPENED(pOperator);
  pOperator->cost.openCost = 0;

  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
127
SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn, __optr_fn_t cleanup,
128 129
                                   __optr_close_fn_t closeFn, __optr_reqBuf_fn_t reqBufFn,
                                   __optr_explain_fn_t explain) {
130 131 132 133 134
  SOperatorFpSet fpSet = {
      ._openFn = openFn,
      .getNextFn = nextFn,
      .cleanupFn = cleanup,
      .closeFn = closeFn,
135
      .reqBufFn = reqBufFn,
136 137 138 139 140 141
      .getExplainFn = explain,
  };

  return fpSet;
}

142 143
// Forward declaration of a file-local (static) helper.
static int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprSupp* pSup, SDiskbasedBuf* pBuf,
                                  SGroupResInfo* pGroupResInfo);
H
Haojun Liao 已提交
144

145
/**
 * Reserve space for one result row inside the paged result buffer.
 *
 * The row is carved out of the page identified by *currentPageId. A new page is requested when no
 * page has been used yet (*currentPageId == -1) or when the current page cannot hold another
 * interBufSize bytes. The chosen page is marked dirty.
 *
 * @param pResultBuf    paged buffer that owns the pages
 * @param currentPageId in/out: page currently being filled, -1 if none; updated to the page used
 * @param interBufSize  number of bytes to reserve for the row
 * @return pointer to the reserved row with pageId/offset filled in, or NULL when a page could not
 *         be obtained (in that case *currentPageId is left unchanged)
 */
SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int32_t* currentPageId, int32_t interBufSize) {
  SFilePage* pData = NULL;
  int32_t    pageId = -1;

  if (*currentPageId == -1) {
    // nothing allocated yet: start the first page
    pData = getNewBufPage(pResultBuf, &pageId);
    if (pData == NULL) {
      return NULL;
    }

    pData->num = sizeof(SFilePage);
  } else {
    pData = getBufPage(pResultBuf, *currentPageId);
    if (pData == NULL) {
      return NULL;
    }

    pageId = *currentPageId;

    if (pData->num + interBufSize > getBufPageSize(pResultBuf)) {
      // the current page is full: release it and continue on a fresh one
      releaseBufPage(pResultBuf, pData);

      pData = getNewBufPage(pResultBuf, &pageId);
      if (pData == NULL) {
        return NULL;
      }

      pData->num = sizeof(SFilePage);
    }
  }

  setBufPageDirty(pData, true);

  // the row starts at the current fill mark of the page
  SResultRow* pResultRow = (SResultRow*)((char*)pData + pData->num);
  pResultRow->pageId = pageId;
  pResultRow->offset = (int32_t)pData->num;
  *currentPageId = pageId;

  pData->num += interBufSize;
  return pResultRow;
}

184 185 186 187 188 189 190
/**
 * the struct of key in hash table
 * +----------+---------------+
 * | group id |   key data    |
 * | 8 bytes  | actual length |
 * +----------+---------------+
 */
191 192 193
/**
 * Locate, or create, the result row for the key (groupId, pData[0..bytes)) and make it the
 * active row of pResultRowInfo.
 *
 * An existing row is reused when the key is already in the hash table; for interval queries this
 * happens only while masterscan is true. Otherwise a new row is reserved through getNewResultRow()
 * and registered in the hash table.
 *
 * Error handling is by non-local jump through pTaskInfo->env:
 *  - TSDB_CODE_OUT_OF_MEMORY when no buffer space could be obtained for a new row;
 *  - TSDB_CODE_QRY_TOO_MANY_TIMEWINDOW when, in batch execution mode, the hash table holds more
 *    than MAX_INTERVAL_TIME_WINDOW entries.
 *
 * @return the result row for the key (never NULL on normal return)
 */
SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pResultRowInfo, char* pData,
                                   int16_t bytes, bool masterscan, uint64_t groupId, SExecTaskInfo* pTaskInfo,
                                   bool isIntervalQuery, SAggSupporter* pSup) {
  SET_RES_WINDOW_KEY(pSup->keyBuf, pData, bytes, groupId);

  SResultRowPosition* p1 =
      (SResultRowPosition*)tSimpleHashGet(pSup->pResultRowHashTable, pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes));

  SResultRow* pResult = NULL;

  // Interval queries only look up an existing window during the master scan (p1 may be NULL when
  // sliding and offset are combined); all other queries reuse the row whenever the key is known.
  if (p1 != NULL && (!isIntervalQuery || masterscan)) {
    pResult = getResultRowByPos(pResultBuf, p1, true);
    ASSERT(pResult->pageId == p1->pageId && pResult->offset == p1->offset);
  }

  // 1. close the currently opened window if the target row is on a different page (or is new)
  if (pResultRowInfo->cur.pageId != -1 && ((pResult == NULL) || (pResult->pageId != pResultRowInfo->cur.pageId))) {
    SResultRowPosition pos = pResultRowInfo->cur;
    SFilePage*         pPage = getBufPage(pResultBuf, pos.pageId);
    releaseBufPage(pResultBuf, pPage);
  }

  // reserve a new row when none exists for this key
  if (pResult == NULL) {
    ASSERT(pSup->resultRowSize > 0);
    pResult = getNewResultRow(pResultBuf, &pSup->currentPageId, pSup->resultRowSize);
    if (pResult == NULL) {
      // getNewResultRow() reports page-allocation failure with NULL; unwind instead of dereferencing it
      T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
    }

    // remember where the row for this key lives
    SResultRowPosition pos = {.pageId = pResult->pageId, .offset = pResult->offset};
    tSimpleHashPut(pSup->pResultRowHashTable, pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes), &pos,
                   sizeof(SResultRowPosition));
  }

  // 2. the row found or created above becomes the active one
  pResultRowInfo->cur = (SResultRowPosition){.pageId = pResult->pageId, .offset = pResult->offset};

  // too many time windows in the query
  if (pTaskInfo->execModel == OPTR_EXEC_MODEL_BATCH &&
      tSimpleHashGetSize(pSup->pResultRowHashTable) > MAX_INTERVAL_TIME_WINDOW) {
    T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_TOO_MANY_TIMEWINDOW);
  }

  return pResult;
}

247
// a new buffer page for each table. Needs to opt this design
L
Liu Jicong 已提交
248
/**
 * Give pWindowRes a location (pageId/offset) inside the paged result buffer if it has none yet.
 *
 * Space is taken from the last page in the buffer's page list; a new page is requested when the
 * list is empty or the last page cannot hold another `size` bytes.
 *
 * @param pWindowRes row to place; left untouched when its pageId is already set (!= -1)
 * @param pResultBuf paged buffer that owns the pages
 * @param tid        not used by this function
 * @param size       number of bytes to reserve
 * @return 0 on success (including the already-placed case), -1 when no page could be obtained
 */
static int32_t addNewWindowResultBuf(SResultRow* pWindowRes, SDiskbasedBuf* pResultBuf, int32_t tid, uint32_t size) {
  if (pWindowRes->pageId != -1) {
    return 0;
  }

  SFilePage* pData = NULL;
  int32_t    pageId = -1;
  SArray*    list = getDataBufPagesIdList(pResultBuf);

  if (taosArrayGetSize(list) == 0) {
    // no page exists yet: start the first one
    pData = getNewBufPage(pResultBuf, &pageId);
    if (pData == NULL) {
      return -1;
    }

    pData->num = sizeof(SFilePage);
  } else {
    SPageInfo* pi = getLastPageInfo(list);
    pData = getBufPage(pResultBuf, getPageId(pi));
    if (pData == NULL) {
      return -1;
    }

    pageId = getPageId(pi);

    if (pData->num + size > getBufPageSize(pResultBuf)) {
      // the last page is full: release it and continue on a fresh one
      releaseBufPageInfo(pResultBuf, pi);

      pData = getNewBufPage(pResultBuf, &pageId);
      if (pData == NULL) {
        return -1;
      }

      pData->num = sizeof(SFilePage);
    }
  }

  // pWindowRes->pageId is still -1 here (checked on entry), so the row is always placed
  pWindowRes->pageId = pageId;
  pWindowRes->offset = (int32_t)pData->num;

  pData->num += size;
  assert(pWindowRes->pageId >= 0);

  return 0;
}

294
//  query_range_start, query_range_end, window_duration, window_start, window_end
295
/**
 * Set up pColData as a five-row timestamp column describing the query time window.
 *
 * Row layout: 0 = query range start, 1 = query range end, 2 = window duration (written as 0),
 * 3 = window start, 4 = window end. Rows 3 and 4 are seeded with the query range boundaries.
 */
void initExecTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pQueryWindow) {
  enum { NUM_WINDOW_CELLS = 5 };

  pColData->info.type = TSDB_DATA_TYPE_TIMESTAMP;
  pColData->info.bytes = sizeof(int64_t);
  colInfoDataEnsureCapacity(pColData, NUM_WINDOW_CELLS, false);

  int64_t interval = 0;  // this value may be variable in case of 'n' and 'y'.

  int64_t* cells[NUM_WINDOW_CELLS] = {
      &pQueryWindow->skey, &pQueryWindow->ekey, &interval, &pQueryWindow->skey, &pQueryWindow->ekey,
  };

  for (int32_t row = 0; row < NUM_WINDOW_CELLS; ++row) {
    colDataAppendInt64(pColData, row, cells[row]);
  }
}

309 310 311 312 313 314 315
// Snapshot of the SqlFunctionCtx input fields that functionCtxSave() captures and
// functionCtxRestore() writes back.
typedef struct {
  bool    hasAgg;       // copy of input.colDataSMAIsSet
  int32_t numOfRows;    // copy of input.numOfRows
  int32_t startOffset;  // copy of input.startRowIndex
} SFunctionCtxStatus;

// Copy the row range and the SMA flag of pCtx->input into pStatus so they can be put back later.
static void functionCtxSave(SqlFunctionCtx* pCtx, SFunctionCtxStatus* pStatus) {
  const SInputColumnInfoData* pIn = &pCtx->input;

  pStatus->startOffset = pIn->startRowIndex;
  pStatus->numOfRows = pIn->numOfRows;
  pStatus->hasAgg = pIn->colDataSMAIsSet;
}

// Write the row range and the SMA flag stored in pStatus back into pCtx->input.
static void functionCtxRestore(SqlFunctionCtx* pCtx, SFunctionCtxStatus* pStatus) {
  SInputColumnInfoData* pIn = &pCtx->input;

  pIn->startRowIndex = pStatus->startOffset;
  pIn->numOfRows = pStatus->numOfRows;
  pIn->colDataSMAIsSet = pStatus->hasAgg;
}

H
Haojun Liao 已提交
327 328
/**
 * Run every function context over the row range [offset, offset + forwardStep) of the current input.
 *
 * For each context the input range is narrowed to that slice. Block-level statistics are switched
 * off when the slice does not cover the whole block (forwardStep < numOfTotal).
 *  - Window pseudo-column functions are evaluated through their scalar callback against
 *    pTimeWindowData and produce exactly one result.
 *  - All other functions run their process callback if functionNeedToExecute() allows it; a failure
 *    is logged, stored in taskInfo->code and raised with T_LONG_JMP.
 *
 * @param taskInfo        task used for logging and the error jump
 * @param pCtx            array of numOfOutput function contexts
 * @param pTimeWindowData time-window column passed to pseudo-column functions
 * @param offset          first row of the slice
 * @param forwardStep     number of rows in the slice
 * @param numOfTotal      number of rows in the whole block
 * @param numOfOutput     number of entries in pCtx
 */
void applyAggFunctionOnPartialTuples(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, SColumnInfoData* pTimeWindowData,
                                     int32_t offset, int32_t forwardStep, int32_t numOfTotal, int32_t numOfOutput) {
  for (int32_t k = 0; k < numOfOutput; ++k) {
    SqlFunctionCtx* pOne = &pCtx[k];

    // remember the input state so it can be put back after the call
    SFunctionCtxStatus saved = {0};
    functionCtxSave(pOne, &saved);

    pOne->input.startRowIndex = offset;
    pOne->input.numOfRows = forwardStep;

    // only part of the block is processed, so the block statistics cannot be used
    if (pOne->input.colDataSMAIsSet && forwardStep < numOfTotal) {
      pOne->input.colDataSMAIsSet = false;
    }

    if (fmIsWindowPseudoColumnFunc(pOne->functionId)) {
      SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pOne);

      // wrap the row cell's intermediate buffer as a BIGINT output column
      SColumnInfoData idata = {0};
      idata.info.type = TSDB_DATA_TYPE_BIGINT;
      idata.info.bytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes;
      idata.pData = GET_ROWCELL_INTERBUF(pEntryInfo);

      SScalarParam out = {.columnData = &idata};
      SScalarParam tw = {.numOfRows = 5, .columnData = pTimeWindowData};
      pOne->sfp.process(&tw, 1, &out);
      pEntryInfo->numOfRes = 1;

      // NOTE(review): the saved input state is not restored on this path — confirm that is intended
      continue;
    }

    if (functionNeedToExecute(pOne) && pOne->fpSet.process != NULL) {
      int32_t code = pOne->fpSet.process(pOne);
      if (code != TSDB_CODE_SUCCESS) {
        qError("%s apply functions error, code: %s", GET_TASKID(taskInfo), tstrerror(code));
        taskInfo->code = code;
        T_LONG_JMP(taskInfo->env, code);
      }
    }

    functionCtxRestore(pOne, &saved);
  }
}

375 376 377
// Attach pBlock to every function context of pExprSup using the block's SMA information:
// records the scan order and row count, delegates the SMA wiring to setBlockSMAInfo() and
// finally stores the block as the context's source block.
static void doSetInputDataBlockInfo(SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t order) {
  SqlFunctionCtx* pCtxList = pExprSup->pCtx;

  for (int32_t i = 0; i < pExprSup->numOfExprs; ++i) {
    SqlFunctionCtx* pOne = &pCtxList[i];

    pOne->order = order;
    pOne->input.numOfRows = pBlock->info.rows;
    setBlockSMAInfo(pOne, &pExprSup->pExprInfo[i], pBlock);
    pOne->pSrcBlock = pBlock;
  }
}

385
/**
 * Bind pBlock as the input of all function contexts in pExprSup.
 *
 * When the block has no pre-computed aggregates (pBlockAgg == NULL) the column data itself is
 * wired in through doSetInputDataBlock(); otherwise the aggregate information is used through
 * doSetInputDataBlockInfo(). The status code of doSetInputDataBlock() is not propagated.
 */
void setInputDataBlock(SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t order, int32_t scanFlag, bool createDummyCol) {
  if (pBlock->pBlockAgg == NULL) {
    doSetInputDataBlock(pExprSup, pBlock, order, scanFlag, createDummyCol);
    return;
  }

  doSetInputDataBlockInfo(pExprSup, pBlock, order);
}

L
Liu Jicong 已提交
393 394
/**
 * Materialise a constant function parameter as a column of numOfRows identical values.
 *
 * The column object in pInput->pData[paramIndex] is created on first use (type and byte width
 * taken from the parameter) and reused afterwards. Only BIGINT/UBIGINT, DOUBLE and VARCHAR
 * parameters are filled; for any other type the column is left without appended values.
 *
 * @param pInput     input descriptor whose pData[paramIndex] slot receives the column
 * @param pFuncParam constant parameter to replicate
 * @param paramIndex slot index inside pInput->pData
 * @param numOfRows  number of rows to generate
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when an allocation fails
 */
static int32_t doCreateConstantValColumnInfo(SInputColumnInfoData* pInput, SFunctParam* pFuncParam, int32_t paramIndex,
                                             int32_t numOfRows) {
  SColumnInfoData* pColInfo = NULL;
  if (pInput->pData[paramIndex] == NULL) {
    pColInfo = taosMemoryCalloc(1, sizeof(SColumnInfoData));
    if (pColInfo == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    // Set the correct column info (data type and bytes)
    pColInfo->info.type = pFuncParam->param.nType;
    pColInfo->info.bytes = pFuncParam->param.nLen;

    pInput->pData[paramIndex] = pColInfo;
  } else {
    pColInfo = pInput->pData[paramIndex];
  }

  colInfoDataEnsureCapacity(pColInfo, numOfRows, false);

  int8_t type = pFuncParam->param.nType;
  if (type == TSDB_DATA_TYPE_BIGINT || type == TSDB_DATA_TYPE_UBIGINT) {
    int64_t v = pFuncParam->param.i;
    for (int32_t i = 0; i < numOfRows; ++i) {
      colDataAppendInt64(pColInfo, i, &v);
    }
  } else if (type == TSDB_DATA_TYPE_DOUBLE) {
    double v = pFuncParam->param.d;
    for (int32_t i = 0; i < numOfRows; ++i) {
      colDataAppendDouble(pColInfo, i, &v);
    }
  } else if (type == TSDB_DATA_TYPE_VARCHAR) {
    // build one var-string (header + payload) and append it to every row
    char* tmp = taosMemoryMalloc(pFuncParam->param.nLen + VARSTR_HEADER_SIZE);
    if (tmp == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    STR_WITH_SIZE_TO_VARSTR(tmp, pFuncParam->param.pz, pFuncParam->param.nLen);
    for (int32_t i = 0; i < numOfRows; ++i) {
      colDataAppend(pColInfo, i, tmp, false);
    }
    taosMemoryFree(tmp);
  }

  return TSDB_CODE_SUCCESS;
}

436 437
/**
 * Bind the column data of pBlock to every function context of pExprSup.
 *
 * Per context: scan order, scan flag, source block, table uid and row count are recorded and the
 * SMA flag is cleared. Per parameter:
 *  - column parameters point pData[j] at the block column with the parameter's slot id; for
 *    functions with an implicit timestamp the last parameter is also stored as pPTS;
 *  - value parameters are expanded into a constant column only when createDummyCol is set and the
 *    expression has exactly one parameter.
 *
 * @return TSDB_CODE_SUCCESS, or the error reported by doCreateConstantValColumnInfo()
 */
static int32_t doSetInputDataBlock(SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t order, int32_t scanFlag,
                                   bool createDummyCol) {
  SqlFunctionCtx* pCtx = pExprSup->pCtx;

  for (int32_t i = 0; i < pExprSup->numOfExprs; ++i) {
    SqlFunctionCtx*       pOne = &pCtx[i];
    SInputColumnInfoData* pInput = &pOne->input;
    SExprInfo*            pOneExpr = &pExprSup->pExprInfo[i];

    pOne->order = order;
    pOne->pSrcBlock = pBlock;
    pOne->scanFlag = scanFlag;

    pInput->numOfRows = pBlock->info.rows;
    pInput->uid = pBlock->info.id.uid;
    pInput->colDataSMAIsSet = false;

    for (int32_t j = 0; j < pOneExpr->base.numOfParams; ++j) {
      SFunctParam* pFuncParam = &pOneExpr->base.pParam[j];

      if (pFuncParam->type == FUNC_PARAM_TYPE_VALUE) {
        // todo avoid case: top(k, 12), 12 is the value parameter.
        // sum(11), 11 is also the value parameter.
        if (!createDummyCol || pOneExpr->base.numOfParams != 1) {
          continue;
        }

        pInput->totalRows = pBlock->info.rows;
        pInput->numOfRows = pBlock->info.rows;
        pInput->startRowIndex = 0;

        int32_t code = doCreateConstantValColumnInfo(pInput, pFuncParam, j, pBlock->info.rows);
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }

        continue;
      }

      if (pFuncParam->type != FUNC_PARAM_TYPE_COLUMN) {
        continue;
      }

      int32_t slotId = pFuncParam->pCol->slotId;
      pInput->pData[j] = taosArrayGet(pBlock->pDataBlock, slotId);
      pInput->totalRows = pBlock->info.rows;
      pInput->numOfRows = pBlock->info.rows;
      pInput->startRowIndex = 0;

      // NOTE: the last parameter is the primary timestamp column
      // todo: refactor this
      if (fmIsImplicitTsFunc(pOne->functionId) && (j == pOneExpr->base.numOfParams - 1)) {
        pInput->pPTS = pInput->pData[j];  // in case of merge function, this is not always the ts column data.
      }

      ASSERT(pInput->pData[j] != NULL);
    }
  }

  return TSDB_CODE_SUCCESS;
}

489
/**
 * Invoke the process callback of every function context of the operator.
 *
 * Contexts rejected by functionNeedToExecute() and contexts without a process callback are
 * skipped. The first failing callback is logged and its code is returned immediately.
 *
 * @return TSDB_CODE_SUCCESS, or the first error code reported by a process callback
 */
static int32_t doAggregateImpl(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx) {
  for (int32_t k = 0; k < pOperator->exprSupp.numOfExprs; ++k) {
    SqlFunctionCtx* pOne = &pCtx[k];

    if (!functionNeedToExecute(pOne)) {
      continue;
    }

    // todo add a dummy function to avoid the process check
    if (pOne->fpSet.process == NULL) {
      continue;
    }

    int32_t code = pOne->fpSet.process(pOne);
    if (code != TSDB_CODE_SUCCESS) {
      qError("%s aggregate function error happens, code: %s", GET_TASKID(pOperator->pTaskInfo), tstrerror(code));
      return code;
    }
  }

  return TSDB_CODE_SUCCESS;
}

5
54liuyao 已提交
508
/**
 * Decide whether the function bound to pCtx should run on the current input.
 *
 *  - a context whose functionId is -1 never runs;
 *  - during a repeat scan the answer is whatever fmIsRepeatScanFunc() reports for the function;
 *  - otherwise the function runs unless its result entry is already completed.
 */
bool functionNeedToExecute(SqlFunctionCtx* pCtx) {
  if (pCtx->functionId == -1) {
    return false;
  }

  if (pCtx->scanFlag == REPEAT_SCAN) {
    return fmIsRepeatScanFunc(pCtx->functionId);
  }

  return !isRowEntryCompleted(GET_RES_INFO(pCtx));
}

528 529 530 531 532 533 534
/**
 * Produce block-level aggregates (min/max/sum, no nulls) for a constant function parameter.
 *
 * The column descriptor pInput->pData[paramIndex] and the aggregate record
 * pInput->pColumnDataAgg[paramIndex] are allocated on first use and reused afterwards.
 * Variable-length types are not supported (asserted). TIMESTAMP leaves the aggregate record
 * untouched; any type other than BIGINT, DOUBLE, BOOL or TIMESTAMP trips ASSERT(0).
 *
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when an allocation fails
 */
static int32_t doCreateConstantValColumnAggInfo(SInputColumnInfoData* pInput, SFunctParam* pFuncParam, int32_t type,
                                                int32_t paramIndex, int32_t numOfRows) {
  if (pInput->pData[paramIndex] == NULL) {
    SColumnInfoData* pCol = taosMemoryCalloc(1, sizeof(SColumnInfoData));
    pInput->pData[paramIndex] = pCol;
    if (pCol == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    // Set the correct column info (data type and bytes)
    pCol->info.type = type;
    pCol->info.bytes = tDataTypes[type].bytes;
  }

  SColumnDataAgg* da = pInput->pColumnDataAgg[paramIndex];
  if (da == NULL) {
    da = taosMemoryCalloc(1, sizeof(SColumnDataAgg));
    pInput->pColumnDataAgg[paramIndex] = da;
    if (da == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  ASSERT(!IS_VAR_DATA_TYPE(type));

  switch (type) {
    case TSDB_DATA_TYPE_BIGINT: {
      int64_t v = pFuncParam->param.i;
      *da = (SColumnDataAgg){.numOfNull = 0, .min = v, .max = v, .sum = v * numOfRows};
      break;
    }

    case TSDB_DATA_TYPE_DOUBLE: {
      double v = pFuncParam->param.d;
      *da = (SColumnDataAgg){.numOfNull = 0};

      // the double values are stored in place over the min/max/sum fields
      *(double*)&da->min = v;
      *(double*)&da->max = v;
      *(double*)&da->sum = v * numOfRows;
      break;
    }

    case TSDB_DATA_TYPE_BOOL: {  // todo validate this data type
      bool v = pFuncParam->param.i;
      *da = (SColumnDataAgg){.numOfNull = 0};

      // NOTE(review): min is written as 0 regardless of v — confirm this is intended for a constant true
      *(bool*)&da->min = 0;
      *(bool*)&da->max = v;
      *(bool*)&da->sum = v * numOfRows;
      break;
    }

    case TSDB_DATA_TYPE_TIMESTAMP:
      // do nothing
      break;

    default:
      ASSERT(0);
      break;
  }

  return TSDB_CODE_SUCCESS;
}
579

580
/**
 * Wire the block's pre-computed aggregates (pBlockAgg) into a function context.
 *
 * The input row counts are always set from the block. Without pBlockAgg the SMA flag is cleared
 * and nothing else happens. With pBlockAgg the flag starts out set and each parameter is handled:
 *  - column parameters take the aggregate record and the column descriptor of their slot id; a
 *    missing aggregate record clears the SMA flag;
 *  - value parameters get synthetic aggregates from doCreateConstantValColumnAggInfo(), whose
 *    status code is not checked here.
 */
void setBlockSMAInfo(SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, SSDataBlock* pBlock) {
  SInputColumnInfoData* pInput = &pCtx->input;

  int32_t numOfRows = pBlock->info.rows;
  pInput->numOfRows = numOfRows;
  pInput->totalRows = numOfRows;

  if (pBlock->pBlockAgg == NULL) {
    pInput->colDataSMAIsSet = false;
    return;
  }

  pInput->colDataSMAIsSet = true;

  for (int32_t j = 0; j < pExprInfo->base.numOfParams; ++j) {
    SFunctParam* pFuncParam = &pExprInfo->base.pParam[j];

    if (pFuncParam->type == FUNC_PARAM_TYPE_VALUE) {
      doCreateConstantValColumnAggInfo(pInput, pFuncParam, pFuncParam->param.nType, j, pBlock->info.rows);
      continue;
    }

    if (pFuncParam->type != FUNC_PARAM_TYPE_COLUMN) {
      continue;
    }

    int32_t slotId = pFuncParam->pCol->slotId;

    pInput->pColumnDataAgg[j] = pBlock->pBlockAgg[slotId];
    if (pInput->pColumnDataAgg[j] == NULL) {
      pInput->colDataSMAIsSet = false;
    }

    // The column descriptor is still attached because the data type of each column is required,
    // but the data in the corresponding SColumnInfoData will not be used.
    pInput->pData[j] = taosArrayGet(pBlock->pDataBlock, slotId);
  }
}

L
Liu Jicong 已提交
612
// A task counts as killed as soon as its code field is non-zero.
bool isTaskKilled(SExecTaskInfo* pTaskInfo) {
  return pTaskInfo->code != 0;
}

D
dapan1121 已提交
616
// Store rspCode in the task's code field; isTaskKilled() reports true for any non-zero code.
void setTaskKilled(SExecTaskInfo* pTaskInfo, int32_t rspCode) {
  pTaskInfo->code = rspCode;
}
617 618

/////////////////////////////////////////////////////////////////////////////////////////////
619
/**
 * Compute the interval-aligned time window that contains `key`.
 *
 * The start is `key` truncated by taosTimeTruncate(); the end is one interval later minus one.
 * If that end falls before the start (the addition wrapped around), the end is clamped to
 * INT64_MAX: the remaining range is shorter than one interval, so no further adjustment is needed.
 */
STimeWindow getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int64_t key) {
  STimeWindow aligned = {0};

  aligned.skey = taosTimeTruncate(key, pInterval, precision);
  aligned.ekey = taosTimeAdd(aligned.skey, pInterval->interval, pInterval->intervalUnit, precision) - 1;

  if (aligned.ekey < aligned.skey) {
    aligned.ekey = INT64_MAX;
  }

  return aligned;
}

L
Liu Jicong 已提交
635 636
/**
 * Reset the load state of a data block.
 *
 * In its current form the function only reports BLK_DATA_NOT_LOAD through *status and clears the
 * block's data and aggregate pointers. The earlier on-demand load/filter logic was already
 * compiled out with `#if 0` and referenced identifiers that are not declared in this function
 * (pQueryAttr, pRuntimeEnv, groupId, ascQuery); it has been removed here and can be recovered
 * from version control if it is ever ported.
 *
 * @param pTaskInfo      not used
 * @param pTableScanInfo not used
 * @param pBlock         block whose pDataBlock and pBlockAgg are set to NULL
 * @param status         out: always BLK_DATA_NOT_LOAD
 * @return always TSDB_CODE_SUCCESS
 */
int32_t loadDataBlockOnDemand(SExecTaskInfo* pTaskInfo, STableScanInfo* pTableScanInfo, SSDataBlock* pBlock,
                              uint32_t* status) {
  (void)pTaskInfo;
  (void)pTableScanInfo;

  *status = BLK_DATA_NOT_LOAD;

  pBlock->pDataBlock = NULL;
  pBlock->pBlockAgg = NULL;

  return TSDB_CODE_SUCCESS;
}

L
Liu Jicong 已提交
778
// Update the status flags kept in pTaskInfo->status.
//   - TASK_NOT_COMPLETED replaces the whole status field.
//   - Any other value is OR-ed into the field, after TASK_NOT_COMPLETED has been cleared from it,
//     because "not completed" is not compatible with any other status.
void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status) {
  if (status == TASK_NOT_COMPLETED) {
    pTaskInfo->status = status;
    return;
  }

  CLEAR_QUERY_STATUS(pTaskInfo, TASK_NOT_COMPLETED);
  pTaskInfo->status |= status;
}

788
// Bind each of the numOfOutput function contexts to its entry inside pResult and initialize the
// entries that have not been initialized yet.
//
// Every context always gets its resultInfo pointer refreshed. Initialization is attempted only until
// the first entry is found that is already initialized (and was not skipped by the checks below);
// from that point on the remaining contexts are bound but never initialized.
void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowEntryInfoOffset) {
  bool bindOnly = false;

  for (int32_t idx = 0; idx < numOfOutput; ++idx) {
    SqlFunctionCtx* pFuncCtx = &pCtx[idx];

    pFuncCtx->resultInfo = getResultEntryInfo(pResult, idx, rowEntryInfoOffset);
    if (bindOnly) {
      continue;
    }

    struct SResultRowEntryInfo* pEntry = pFuncCtx->resultInfo;

    // completed and initialized entries need no further work
    if (isRowEntryCompleted(pEntry) && isRowEntryInitialized(pEntry)) {
      continue;
    }

    // window pseudo columns are not initialized here
    if (fmIsWindowPseudoColumnFunc(pFuncCtx->functionId)) {
      continue;
    }

    if (pEntry->initialized) {
      bindOnly = true;
    } else if (pFuncCtx->functionId == -1) {
      // no function attached to this slot: just flag the entry as initialized
      pEntry->initialized = true;
    } else {
      pFuncCtx->fpSet.init(pFuncCtx, pEntry);
    }
  }
}

H
Haojun Liao 已提交
817 818
// Apply pFilterInfo to pBlock in place.
//
// Nothing happens when there is no filter or the block has no rows. Otherwise the filter is bound to
// the block columns, executed, and the rows are compacted by extractQualifiedTupleByFilterResult().
// When pColMatchInfo is given, the first matched item that maps the primary timestamp column onto a
// TSDB_DATA_TYPE_TIMESTAMP column is used to refresh the block's time window.
// The filter result column produced by filterExecute() is destroyed and freed before returning.
void doFilter(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SColMatchInfo* pColMatchInfo) {
  if (pFilterInfo == NULL || pBlock->info.rows == 0) {
    return;
  }

  SFilterColumnParam colParam = {.numOfCols = taosArrayGetSize(pBlock->pDataBlock), .pDataBlock = pBlock->pDataBlock};

  // NOTE: the return code of filterSetDataFromSlotId() is not acted upon here
  (void)filterSetDataFromSlotId(pFilterInfo, &colParam);

  SColumnInfoData* pFilterRes = NULL;
  int32_t          filterStatus = 0;

  // todo the keep seems never to be True??
  bool keepAll = filterExecute(pFilterInfo, pBlock, &pFilterRes, NULL, colParam.numOfCols, &filterStatus);
  extractQualifiedTupleByFilterResult(pBlock, pFilterRes, keepAll, filterStatus);

  if (pColMatchInfo != NULL) {
    size_t numOfItems = taosArrayGetSize(pColMatchInfo->pList);
    for (int32_t i = 0; i < numOfItems; ++i) {
      SColMatchItem* pItem = taosArrayGet(pColMatchInfo->pList, i);
      if (pItem->colId != PRIMARYKEY_TIMESTAMP_COL_ID) {
        continue;
      }

      SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, pItem->dstSlotId);
      if (pTsCol->info.type != TSDB_DATA_TYPE_TIMESTAMP) {
        continue;
      }

      blockDataUpdateTsWindow(pBlock, pItem->dstSlotId);
      break;
    }
  }

  colDataDestroy(pFilterRes);
  taosMemoryFree(pFilterRes);
}

850
// Compact pBlock according to a filter result.
//
//   keep == true                         : block is left untouched
//   status == FILTER_RESULT_ALL_QUALIFIED : block is left untouched
//   status == FILTER_RESULT_NONE_QUALIFIED: the row count is set to 0
//   any other status                     : p->pData is read as one int8_t flag per row; rows whose
//                                          flag is 0 are dropped, the others are copied (in order)
//                                          from a temporary copy of the block back into pBlock.
void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const SColumnInfoData* p, bool keep, int32_t status) {
  if (keep || status == FILTER_RESULT_ALL_QUALIFIED) {
    return;
  }

  if (status == FILTER_RESULT_NONE_QUALIFIED) {
    pBlock->info.rows = 0;
    return;
  }

  int32_t rowsBefore = pBlock->info.rows;

  // work from a full copy so that the destination columns can be rewritten in place
  SSDataBlock* pCopy = createOneDataBlock(pBlock, true);

  size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
  for (int32_t col = 0; col < numOfCols; ++col) {
    SColumnInfoData* pFrom = taosArrayGet(pCopy->pDataBlock, col);
    SColumnInfoData* pTo = taosArrayGet(pBlock->pDataBlock, col);

    // it is a reserved column for scalar function, and no data in this column yet.
    if (pTo->pData == NULL || pFrom->pData == NULL) {
      continue;
    }

    colInfoDataCleanup(pTo, pBlock->info.rows);

    int32_t kept = 0;
    for (int32_t row = 0; row < rowsBefore; ++row) {
      if (((int8_t*)p->pData)[row] != 0) {
        if (colDataIsNull_s(pFrom, row)) {
          colDataAppendNULL(pTo, kept);
        } else {
          colDataAppend(pTo, kept, colDataGetData(pFrom, row), false);
        }
        kept += 1;
      }
    }

    // todo this value can be assigned directly
    // the first rewritten column fixes the new row count; the following ones must agree with it
    if (pBlock->info.rows == rowsBefore) {
      pBlock->info.rows = kept;
    } else {
      ASSERT(pBlock->info.rows == kept);
    }
  }

  blockDataDestroy(pCopy);  // release the temporary copy
}

901
// Select (creating it when needed) the result row that belongs to groupId and bind the operator's
// function contexts to it.
//
// For a simple group by query without interval, all the tables belong to one group result; the row
// is looked up with the raw bytes of groupId as the key.
// If the row has no page assigned yet and a new result buffer cannot be added, the function returns
// without binding the contexts.
void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId) {
  SAggOperatorInfo* pAgg = pOperator->info;
  SExecTaskInfo*    pTask = pOperator->pTaskInfo;

  SqlFunctionCtx* pFuncCtx = pOperator->exprSupp.pCtx;
  int32_t*        entryOffset = pOperator->exprSupp.rowEntryInfoOffset;
  SResultRowInfo* pRowInfo = &pAgg->binfo.resultRowInfo;

  SResultRow* pRow = doSetResultOutBufByKey(pAgg->aggSup.pResultBuf, pRowInfo, (char*)&groupId, sizeof(groupId), true,
                                            groupId, pTask, false, &pAgg->aggSup);
  assert(pRow != NULL);

  /*
   * not assign result buffer yet, add new result buffer
   * all group belong to one result set, and each group result has different group id so set the id to be one
   */
  if (pRow->pageId == -1 &&
      addNewWindowResultBuf(pRow, pAgg->aggSup.pResultBuf, groupId, pAgg->binfo.pRes->info.rowSize) !=
          TSDB_CODE_SUCCESS) {
    return;
  }

  setResultRowInitCtx(pRow, pFuncCtx, numOfOutput, entryOffset);
}

929 930 931
// Switch the aggregate operator's output buffer to the one of groupId, unless that group is already
// the active one. UINT64_MAX stored in pAggInfo->groupId never counts as "already active".
static void setExecutionContext(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId) {
  SAggOperatorInfo* pAgg = pOperator->info;

  bool alreadyActive = (pAgg->groupId != UINT64_MAX) && (pAgg->groupId == groupId);
  if (!alreadyActive) {
    doSetTableGroupOutputBuf(pOperator, numOfOutput, groupId);

    // remember which group the output buffer now belongs to
    pAgg->groupId = groupId;
  }
}

dengyihao's avatar
dengyihao 已提交
941
// Raise pRow->numOfRows to the largest numOfRes found among the initialized entries of the row.
//
// If all expr skips all blocks, e.g. all null inputs for max function, one row is still emitted in
// the final result (numOfRows is forced to 1) - except when one of the initialized entries belongs
// to a function for which fmIsNotNullOutputFunc() is true (e.g. first/last, which require not null
// output); in that case the row count stays 0.
static void doUpdateNumOfRows(SqlFunctionCtx* pCtx, SResultRow* pRow, int32_t numOfExprs,
                              const int32_t* rowEntryOffset) {
  bool needNotNullOutput = false;

  for (int32_t idx = 0; idx < numOfExprs; ++idx) {
    SResultRowEntryInfo* pEntry = getResultEntryInfo(pRow, idx, rowEntryOffset);
    if (isRowEntryInitialized(pEntry)) {
      if (pEntry->numOfRes > pRow->numOfRows) {
        pRow->numOfRows = pEntry->numOfRes;
      }

      if (fmIsNotNullOutputFunc(pCtx[idx].functionId)) {
        needNotNullOutput = true;
      }
    }
  }

  if (!needNotNullOutput && pRow->numOfRows == 0) {
    pRow->numOfRows = 1;
  }
}

965 966
// Materialize one result row into pBlock, starting at row index pBlock->info.rows.
//
// For every expression the function context is first re-bound to its entry in pRow, then:
//   - if the function has a finalize callback it is invoked; a failure is logged and reported with
//     T_LONG_JMP on pTaskInfo->env. For "_group_key" a non-zero numOfRes is first widened to
//     pRow->numOfRows so that the key is repeated for every output row;
//   - "_select_value" without a finalize callback is left alone;
//   - anything else has its intermediate buffer copied pRow->numOfRows times into the output column.
// pBlock->info.rows itself is not modified here.
static void doCopyResultToDataBlock(SExprInfo* pExprInfo, int32_t numOfExprs, SResultRow* pRow, SqlFunctionCtx* pCtx,
                                    SSDataBlock* pBlock, const int32_t* rowEntryOffset, SExecTaskInfo* pTaskInfo) {
  for (int32_t idx = 0; idx < numOfExprs; ++idx) {
    SqlFunctionCtx* pFuncCtx = &pCtx[idx];
    pFuncCtx->resultInfo = getResultEntryInfo(pRow, idx, rowEntryOffset);

    const char* funcName = pFuncCtx->pExpr->pExpr->_function.functionName;

    if (pFuncCtx->fpSet.finalize) {
      // for groupkey along with functions that output multiple lines(e.g. Histogram)
      // need to match groupkey result for each output row of that function.
      if (strcmp(funcName, "_group_key") == 0 && pFuncCtx->resultInfo->numOfRes != 0) {
        pFuncCtx->resultInfo->numOfRes = pRow->numOfRows;
      }

      int32_t code = pFuncCtx->fpSet.finalize(pFuncCtx, pBlock);
      if (TAOS_FAILED(code)) {
        qError("%s build result data block error, code %s", GET_TASKID(pTaskInfo), tstrerror(code));
        T_LONG_JMP(pTaskInfo->env, code);
      }
      continue;
    }

    if (strcmp(funcName, "_select_value") == 0) {
      continue;  // nothing to copy for this one
    }

    // expand the result into multiple rows. E.g., _wstart, top(k, 20)
    // the _wstart needs to copy to 20 following rows, since the results of top-k expands to 20 different rows.
    int32_t          slotId = pExprInfo[idx].base.resSchema.slotId;
    SColumnInfoData* pOutCol = taosArrayGet(pBlock->pDataBlock, slotId);
    char*            pValue = GET_ROWCELL_INTERBUF(pFuncCtx->resultInfo);
    for (int32_t r = 0; r < pRow->numOfRows; ++r) {
      colDataAppend(pOutCol, pBlock->info.rows + r, pValue, pFuncCtx->resultInfo->isNullRes);
    }
  }
}

999 1000 1001
// todo refactor. SResultRow has direct pointer in miainfo
//
// Finalize the result row stored at resultRowPosition inside pBuf and append its output rows to
// pBlock, growing the block when it is too small.
//
// Returns 0. Failures (page cannot be loaded, block cannot be grown, finalize callback fails) are
// logged and reported through T_LONG_JMP on pTaskInfo->env; the buffer page is released before
// jumping whenever this function still holds it.
int32_t finalizeResultRows(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPosition, SExprSupp* pSup,
                           SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo) {
  SFilePage* page = getBufPage(pBuf, resultRowPosition->pageId);
  if (page == NULL) {
    // never hand a zero code to the long jump
    int32_t err = (terrno != TSDB_CODE_SUCCESS) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
    qError("%s failed to load result buffer page, code %s", GET_TASKID(pTaskInfo), tstrerror(err));
    T_LONG_JMP(pTaskInfo->env, err);
  }

  SResultRow* pRow = (SResultRow*)((char*)page + resultRowPosition->offset);

  SqlFunctionCtx* pCtx = pSup->pCtx;
  SExprInfo*      pExprInfo = pSup->pExprInfo;
  const int32_t*  rowEntryOffset = pSup->rowEntryInfoOffset;

  doUpdateNumOfRows(pCtx, pRow, pSup->numOfExprs, rowEntryOffset);
  if (pRow->numOfRows == 0) {
    releaseBufPage(pBuf, page);
    return 0;
  }

  // Grow by 25% per step until the new rows fit. size + size / 4 equals the truncated value of
  // size * 1.25 for non-negative sizes; for capacities below 4 that step makes no progress, so jump
  // straight to the required size instead of looping forever.
  int64_t required = (int64_t)pBlock->info.rows + (int64_t)pRow->numOfRows;
  int64_t size = pBlock->info.capacity;
  while (size < required) {
    int64_t grown = size + size / 4;
    size = (grown > size) ? grown : required;
  }
  if (size > INT32_MAX) {
    size = INT32_MAX;
  }

  int32_t code = blockDataEnsureCapacity(pBlock, (int32_t)size);
  if (TAOS_FAILED(code)) {
    releaseBufPage(pBuf, page);
    qError("%s ensure result data capacity failed, code %s", GET_TASKID(pTaskInfo), tstrerror(code));
    T_LONG_JMP(pTaskInfo->env, code);
  }

  doCopyResultToDataBlock(pExprInfo, pSup->numOfExprs, pRow, pCtx, pBlock, rowEntryOffset, pTaskInfo);

  releaseBufPage(pBuf, page);
  pBlock->info.rows += pRow->numOfRows;
  return 0;
}

1034 1035 1036 1037 1038 1039 1040
// Copy pending group results into pBlock, starting at pGroupResInfo->index.
//
// Result rows are appended until one of the following happens:
//   - all results have been consumed;
//   - the next result belongs to a group different from the one already stored in the block
//     (pBlock->info.id.groupId == 0 means "no group chosen yet");
//   - the next result does not fit into the remaining capacity of a non-empty block.
// pGroupResInfo->index is advanced past every consumed or empty result. When the block is still
// empty and a single result needs more rows than the block capacity, the block is grown so that the
// caller always makes progress.
//
// Returns 0. Failures (page cannot be loaded, block cannot be grown, finalize callback fails) are
// logged and reported through T_LONG_JMP on pTaskInfo->env.
int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprSupp* pSup, SDiskbasedBuf* pBuf,
                           SGroupResInfo* pGroupResInfo) {
  SExprInfo*      pExprInfo = pSup->pExprInfo;
  int32_t         numOfExprs = pSup->numOfExprs;
  int32_t*        rowEntryOffset = pSup->rowEntryInfoOffset;
  SqlFunctionCtx* pCtx = pSup->pCtx;

  int32_t numOfRows = getNumOfTotalRes(pGroupResInfo);

  for (int32_t i = pGroupResInfo->index; i < numOfRows; i += 1) {
    SResKeyPos* pPos = taosArrayGetP(pGroupResInfo->pRows, i);
    SFilePage*  page = getBufPage(pBuf, pPos->pos.pageId);
    if (page == NULL) {
      // never hand a zero code to the long jump
      int32_t err = (terrno != TSDB_CODE_SUCCESS) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
      qError("%s failed to load result buffer page, code %s", GET_TASKID(pTaskInfo), tstrerror(err));
      T_LONG_JMP(pTaskInfo->env, err);
    }

    SResultRow* pRow = (SResultRow*)((char*)page + pPos->pos.offset);

    doUpdateNumOfRows(pCtx, pRow, numOfExprs, rowEntryOffset);

    // no results, continue to check the next one
    if (pRow->numOfRows == 0) {
      pGroupResInfo->index += 1;
      releaseBufPage(pBuf, page);
      continue;
    }

    if (pBlock->info.id.groupId == 0) {
      pBlock->info.id.groupId = pPos->groupId;
    } else {
      // current value belongs to different group, it can't be packed into one datablock
      if (pBlock->info.id.groupId != pPos->groupId) {
        releaseBufPage(pBuf, page);
        break;
      }
    }

    if (pBlock->info.rows + pRow->numOfRows > pBlock->info.capacity) {
      if (pBlock->info.rows > 0) {
        // the block already carries data: hand it out and resume from this result next time
        releaseBufPage(pBuf, page);
        break;
      }

      // An empty block that cannot hold even one result would never make progress (the index is
      // not advanced on this path), so make room for this result instead of giving up.
      int32_t code = blockDataEnsureCapacity(pBlock, pRow->numOfRows);
      if (TAOS_FAILED(code)) {
        releaseBufPage(pBuf, page);
        qError("%s ensure result data capacity failed, code %s", GET_TASKID(pTaskInfo), tstrerror(code));
        T_LONG_JMP(pTaskInfo->env, code);
      }
    }

    pGroupResInfo->index += 1;
    doCopyResultToDataBlock(pExprInfo, numOfExprs, pRow, pCtx, pBlock, rowEntryOffset, pTaskInfo);

    releaseBufPage(pBuf, page);
    pBlock->info.rows += pRow->numOfRows;
  }

  qDebug("%s result generated, rows:%d, groupId:%" PRIu64, GET_TASKID(pTaskInfo), pBlock->info.rows,
         pBlock->info.id.groupId);

  blockDataUpdateTsWindow(pBlock, 0);
  return 0;
}

1088 1089 1090 1091 1092 1093 1094 1095 1096 1097 1098 1099 1100 1101
// Build the next output block of a stream operator into pbInfo->pRes.
//
// The block is stamped with the task version and cleaned first. If results remain, they are copied
// into the block (merged result blocks are not expected here) and the block's parTbName is filled
// from the name streamStateGetParName() returns for the block's group id; when that lookup reports
// a negative value the name is set to the empty string.
void doBuildStreamResBlock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo,
                           SDiskbasedBuf* pBuf) {
  SExecTaskInfo* pTask = pOperator->pTaskInfo;
  SSDataBlock*   pRes = pbInfo->pRes;

  // set output datablock version
  pRes->info.version = pTask->version;

  blockDataCleanup(pRes);
  if (hasRemainResults(pGroupResInfo)) {
    // clear the existed group id
    pRes->info.id.groupId = 0;
    ASSERT(!pbInfo->mergeResultBlock);
    doCopyToSDataBlock(pTask, pRes, &pOperator->exprSupp, pBuf, pGroupResInfo);

    void* pName = NULL;
    if (streamStateGetParName(pTask->streamInfo.pState, pRes->info.id.groupId, &pName) >= 0) {
      memcpy(pRes->info.parTbName, pName, TSDB_TABLE_NAME_LEN);
    } else {
      pRes->info.parTbName[0] = 0;
    }
    tdbFree(pName);
  }
}

X
Xiaoyu Wang 已提交
1115 1116
// Build the next output block of an operator into pbInfo->pRes.
//
// The block is stamped with the task version and cleaned first. Without mergeResultBlock a single
// copy pass is done, so the block holds results of one group. With mergeResultBlock, results of
// successive groups are packed into the same block until either no result remains or the block
// reaches pOperator->resultInfo.threshold rows; the group id of such a block is reset to 0.
void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo,
                            SDiskbasedBuf* pBuf) {
  SExecTaskInfo* pTask = pOperator->pTaskInfo;
  SSDataBlock*   pRes = pbInfo->pRes;

  // set output datablock version
  pRes->info.version = pTask->version;

  blockDataCleanup(pRes);
  if (!hasRemainResults(pGroupResInfo)) {
    return;
  }

  // clear the existed group id
  pRes->info.id.groupId = 0;

  if (!pbInfo->mergeResultBlock) {
    doCopyToSDataBlock(pTask, pRes, &pOperator->exprSupp, pBuf, pGroupResInfo);
    return;
  }

  while (hasRemainResults(pGroupResInfo)) {
    doCopyToSDataBlock(pTask, pRes, &pOperator->exprSupp, pBuf, pGroupResInfo);
    if (pRes->info.rows >= pOperator->resultInfo.threshold) {
      break;
    }

    // clearing group id to continue to merge data that belong to different groups
    pRes->info.id.groupId = 0;
  }

  // clear the group id info in SSDataBlock, since the client does not need it
  pRes->info.id.groupId = 0;
}

L
Liu Jicong 已提交
1148 1149 1150 1151
// static TSKEY doSkipIntervalProcess(STaskRuntimeEnv* pRuntimeEnv, STimeWindow* win, SDataBlockInfo* pBlockInfo,
// STableQueryInfo* pTableQueryInfo) {
//   STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
//   SResultRowInfo *pWindowResInfo = &pRuntimeEnv->resultRowInfo;
1152
//
L
Liu Jicong 已提交
1153 1154 1155
//   assert(pQueryAttr->limit.offset == 0);
//   STimeWindow tw = *win;
//   getNextTimeWindow(pQueryAttr, &tw);
1156
//
L
Liu Jicong 已提交
1157 1158
//   if ((tw.skey <= pBlockInfo->window.ekey && QUERY_IS_ASC_QUERY(pQueryAttr)) ||
//       (tw.ekey >= pBlockInfo->window.skey && !QUERY_IS_ASC_QUERY(pQueryAttr))) {
1159
//
L
Liu Jicong 已提交
1160 1161 1162 1163
//     // load the data block and check data remaining in current data block
//     // TODO optimize performance
//     SArray *         pDataBlock = tsdbRetrieveDataBlock(pRuntimeEnv->pTsdbReadHandle, NULL);
//     SColumnInfoData *pColInfoData = taosArrayGet(pDataBlock, 0);
1164
//
L
Liu Jicong 已提交
1165 1166 1167 1168
//     tw = *win;
//     int32_t startPos =
//         getNextQualifiedWindow(pQueryAttr, &tw, pBlockInfo, pColInfoData->pData, binarySearchForKey, -1);
//     assert(startPos >= 0);
1169
//
L
Liu Jicong 已提交
1170 1171
//     // set the abort info
//     pQueryAttr->pos = startPos;
1172
//
L
Liu Jicong 已提交
1173 1174 1175 1176
//     // reset the query start timestamp
//     pTableQueryInfo->win.skey = ((TSKEY *)pColInfoData->pData)[startPos];
//     pQueryAttr->window.skey = pTableQueryInfo->win.skey;
//     TSKEY key = pTableQueryInfo->win.skey;
1177
//
L
Liu Jicong 已提交
1178 1179
//     pWindowResInfo->prevSKey = tw.skey;
//     int32_t index = pRuntimeEnv->resultRowInfo.curIndex;
1180
//
L
Liu Jicong 已提交
1181 1182
//     int32_t numOfRes = tableApplyFunctionsOnBlock(pRuntimeEnv, pBlockInfo, NULL, binarySearchForKey, pDataBlock);
//     pRuntimeEnv->resultRowInfo.curIndex = index;  // restore the window index
1183
//
L
Liu Jicong 已提交
1184 1185 1186 1187
//     //qDebug("QInfo:0x%"PRIx64" check data block, brange:%" PRId64 "-%" PRId64 ", numOfRows:%d, numOfRes:%d,
//     lastKey:%" PRId64,
//            GET_TASKID(pRuntimeEnv), pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows, numOfRes,
//            pQueryAttr->current->lastKey);
1188
//
L
Liu Jicong 已提交
1189 1190 1191 1192 1193
//     return key;
//   } else {  // do nothing
//     pQueryAttr->window.skey      = tw.skey;
//     pWindowResInfo->prevSKey = tw.skey;
//     pTableQueryInfo->lastKey = tw.skey;
1194
//
L
Liu Jicong 已提交
1195 1196
//     return tw.skey;
//   }
1197
//
L
Liu Jicong 已提交
1198 1199 1200 1201 1202 1203 1204 1205 1206 1207
//   return true;
// }

// static bool skipTimeInterval(STaskRuntimeEnv *pRuntimeEnv, TSKEY* start) {
//   STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
//   if (QUERY_IS_ASC_QUERY(pQueryAttr)) {
//     assert(*start <= pRuntimeEnv->current->lastKey);
//   } else {
//     assert(*start >= pRuntimeEnv->current->lastKey);
//   }
1208
//
L
Liu Jicong 已提交
1209 1210 1211 1212 1213
//   // if queried with value filter, do NOT forward query start position
//   if (pQueryAttr->limit.offset <= 0 || pQueryAttr->numOfFilterCols > 0 || pRuntimeEnv->pTsBuf != NULL ||
//   pRuntimeEnv->pFillInfo != NULL) {
//     return true;
//   }
1214
//
L
Liu Jicong 已提交
1215 1216 1217 1218 1219 1220 1221
//   /*
//    * 1. for interval without interpolation query we forward pQueryAttr->interval.interval at a time for
//    *    pQueryAttr->limit.offset times. Since hole exists, pQueryAttr->interval.interval*pQueryAttr->limit.offset
//    value is
//    *    not valid. otherwise, we only forward pQueryAttr->limit.offset number of points
//    */
//   assert(pRuntimeEnv->resultRowInfo.prevSKey == TSKEY_INITIAL_VAL);
1222
//
L
Liu Jicong 已提交
1223 1224
//   STimeWindow w = TSWINDOW_INITIALIZER;
//   bool ascQuery = QUERY_IS_ASC_QUERY(pQueryAttr);
1225
//
L
Liu Jicong 已提交
1226 1227
//   SResultRowInfo *pWindowResInfo = &pRuntimeEnv->resultRowInfo;
//   STableQueryInfo *pTableQueryInfo = pRuntimeEnv->current;
1228
//
L
Liu Jicong 已提交
1229 1230 1231
//   SDataBlockInfo blockInfo = SDATA_BLOCK_INITIALIZER;
//   while (tsdbNextDataBlock(pRuntimeEnv->pTsdbReadHandle)) {
//     tsdbRetrieveDataBlockInfo(pRuntimeEnv->pTsdbReadHandle, &blockInfo);
1232
//
L
Liu Jicong 已提交
1233 1234 1235 1236 1237 1238 1239 1240 1241
//     if (QUERY_IS_ASC_QUERY(pQueryAttr)) {
//       if (pWindowResInfo->prevSKey == TSKEY_INITIAL_VAL) {
//         getAlignQueryTimeWindow(pQueryAttr, blockInfo.window.skey, blockInfo.window.skey, pQueryAttr->window.ekey,
//         &w); pWindowResInfo->prevSKey = w.skey;
//       }
//     } else {
//       getAlignQueryTimeWindow(pQueryAttr, blockInfo.window.ekey, pQueryAttr->window.ekey, blockInfo.window.ekey, &w);
//       pWindowResInfo->prevSKey = w.skey;
//     }
1242
//
L
Liu Jicong 已提交
1243 1244
//     // the first time window
//     STimeWindow win = getActiveTimeWindow(pWindowResInfo, pWindowResInfo->prevSKey, pQueryAttr);
1245
//
L
Liu Jicong 已提交
1246 1247
//     while (pQueryAttr->limit.offset > 0) {
//       STimeWindow tw = win;
1248
//
L
Liu Jicong 已提交
1249 1250 1251
//       if ((win.ekey <= blockInfo.window.ekey && ascQuery) || (win.ekey >= blockInfo.window.skey && !ascQuery)) {
//         pQueryAttr->limit.offset -= 1;
//         pWindowResInfo->prevSKey = win.skey;
1252
//
L
Liu Jicong 已提交
1253 1254 1255 1256 1257 1258
//         // current time window is aligned with blockInfo.window.ekey
//         // restart it from next data block by set prevSKey to be TSKEY_INITIAL_VAL;
//         if ((win.ekey == blockInfo.window.ekey && ascQuery) || (win.ekey == blockInfo.window.skey && !ascQuery)) {
//           pWindowResInfo->prevSKey = TSKEY_INITIAL_VAL;
//         }
//       }
1259
//
L
Liu Jicong 已提交
1260 1261 1262 1263
//       if (pQueryAttr->limit.offset == 0) {
//         *start = doSkipIntervalProcess(pRuntimeEnv, &win, &blockInfo, pTableQueryInfo);
//         return true;
//       }
1264
//
L
Liu Jicong 已提交
1265 1266
//       // current window does not ended in current data block, try next data block
//       getNextTimeWindow(pQueryAttr, &tw);
1267
//
L
Liu Jicong 已提交
1268 1269 1270 1271 1272 1273 1274 1275 1276
//       /*
//        * If the next time window still starts from current data block,
//        * load the primary timestamp column first, and then find the start position for the next queried time window.
//        * Note that only the primary timestamp column is required.
//        * TODO: Optimize for this cases. All data blocks are not needed to be loaded, only if the first actually
//        required
//        * time window resides in current data block.
//        */
//       if ((tw.skey <= blockInfo.window.ekey && ascQuery) || (tw.ekey >= blockInfo.window.skey && !ascQuery)) {
1277
//
L
Liu Jicong 已提交
1278 1279
//         SArray *pDataBlock = tsdbRetrieveDataBlock(pRuntimeEnv->pTsdbReadHandle, NULL);
//         SColumnInfoData *pColInfoData = taosArrayGet(pDataBlock, 0);
1280
//
L
Liu Jicong 已提交
1281 1282 1283
//         if ((win.ekey > blockInfo.window.ekey && ascQuery) || (win.ekey < blockInfo.window.skey && !ascQuery)) {
//           pQueryAttr->limit.offset -= 1;
//         }
1284
//
L
Liu Jicong 已提交
1285 1286 1287 1288 1289 1290 1291 1292
//         if (pQueryAttr->limit.offset == 0) {
//           *start = doSkipIntervalProcess(pRuntimeEnv, &win, &blockInfo, pTableQueryInfo);
//           return true;
//         } else {
//           tw = win;
//           int32_t startPos =
//               getNextQualifiedWindow(pQueryAttr, &tw, &blockInfo, pColInfoData->pData, binarySearchForKey, -1);
//           assert(startPos >= 0);
1293
//
L
Liu Jicong 已提交
1294 1295 1296 1297 1298 1299 1300 1301 1302 1303 1304
//           // set the abort info
//           pQueryAttr->pos = startPos;
//           pTableQueryInfo->lastKey = ((TSKEY *)pColInfoData->pData)[startPos];
//           pWindowResInfo->prevSKey = tw.skey;
//           win = tw;
//         }
//       } else {
//         break;  // offset is not 0, and next time window begins or ends in the next block.
//       }
//     }
//   }
1305
//
L
Liu Jicong 已提交
1306 1307
//   // check for error
//   if (terrno != TSDB_CODE_SUCCESS) {
1308
//     T_LONG_JMP(pRuntimeEnv->env, terrno);
L
Liu Jicong 已提交
1309
//   }
1310
//
L
Liu Jicong 已提交
1311 1312
//   return true;
// }
1313

1314
// Attach a private copy of the num operator pointers in pDownstream to p.
//
// Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when the pointer array cannot be allocated.
// On failure p is left exactly as it was. On success a previously attached pointer array is
// released (only the array itself, not the operators it referenced) instead of being leaked.
int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t num) {
  if (p->pDownstream == NULL) {
    assert(p->numOfDownstream == 0);
  }

  SOperatorInfo** pList = taosMemoryCalloc(1, num * POINTER_BYTES);
  if (pList == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  // copy before releasing the old array, in case the caller passed p->pDownstream itself
  memcpy(pList, pDownstream, num * POINTER_BYTES);

  taosMemoryFree(p->pDownstream);
  p->pDownstream = pList;
  p->numOfDownstream = num;
  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
1329
// Report the scan order and scan flag that apply to pOperator by walking down the first downstream
// operator of each level until a scan-like operator is reached.
//
//   - exchange / system table / stream / tag / block-dist / last-row / table-count scans always
//     report TSDB_ORDER_ASC and MAIN_SCAN;
//   - table scan and table merge scan report the values stored in their own scan info;
//   - any other operator delegates to its first downstream operator.
// Returns TSDB_CODE_SUCCESS, or TSDB_CODE_INVALID_PARA when the chain ends (no downstream array or a
// NULL first downstream) before a scan operator is found.
int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scanFlag) {
  // todo add more information about exchange operation
  for (SOperatorInfo* pCur = pOperator;; pCur = pCur->pDownstream[0]) {
    int32_t type = pCur->operatorType;

    switch (type) {
      case QUERY_NODE_PHYSICAL_PLAN_EXCHANGE:
      case QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN:
      case QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN:
      case QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN:
      case QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN:
      case QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN:
      case QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN:
        *order = TSDB_ORDER_ASC;
        *scanFlag = MAIN_SCAN;
        return TSDB_CODE_SUCCESS;

      case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN: {
        STableScanInfo* pScan = pCur->info;
        *order = pScan->base.cond.order;
        *scanFlag = pScan->base.scanFlag;
        return TSDB_CODE_SUCCESS;
      }

      case QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN: {
        STableMergeScanInfo* pMergeScan = pCur->info;
        *order = pMergeScan->base.cond.order;
        *scanFlag = pMergeScan->base.scanFlag;
        return TSDB_CODE_SUCCESS;
      }

      default:
        // not a scan operator: continue with its first child, if there is one
        if (pCur->pDownstream == NULL || pCur->pDownstream[0] == NULL) {
          return TSDB_CODE_INVALID_PARA;
        }
        break;
    }
  }
}
1357

1358 1359 1360 1361 1362
// Build a one-row placeholder block so that count-like functions still produce a value when the
// downstream operator delivered no data at all.
//
// No block is created (and *ppBlock is left untouched) when:
//   - tsCountAlwaysReturnValue is off;
//   - the first downstream operator is a partition operator, or a table scan with hasGroupByTag set;
//   - none of the expressions is "count", "hyperloglog", "_hyperloglog_partial" or
//     "_hyperloglog_merge".
// Otherwise the block gets one TSDB_DATA_TYPE_NULL column for every slot up to the largest slot id
// referenced by a column parameter, and is stored in *ppBlock; the caller owns it.
//
// Returns TSDB_CODE_SUCCESS in all the cases above. On an allocation failure the partially built
// block is destroyed, *ppBlock is left untouched and an error code is returned.
static int32_t createDataBlockForEmptyInput(SOperatorInfo* pOperator, SSDataBlock** ppBlock) {
  if (!tsCountAlwaysReturnValue) {
    return TSDB_CODE_SUCCESS;
  }

  SOperatorInfo* downstream = pOperator->pDownstream[0];
  if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_PARTITION ||
      (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN &&
       ((STableScanInfo*)downstream->info)->hasGroupByTag == true)) {
    return TSDB_CODE_SUCCESS;
  }

  SqlFunctionCtx* pCtx = pOperator->exprSupp.pCtx;
  bool            hasCountFunc = false;

  for (int32_t i = 0; i < pOperator->exprSupp.numOfExprs; ++i) {
    const char* pName = pCtx[i].pExpr->pExpr->_function.functionName;
    if ((strcmp(pName, "count") == 0) || (strcmp(pName, "hyperloglog") == 0) ||
        (strcmp(pName, "_hyperloglog_partial") == 0) || (strcmp(pName, "_hyperloglog_merge") == 0)) {
      hasCountFunc = true;
      break;
    }
  }

  if (!hasCountFunc) {
    return TSDB_CODE_SUCCESS;
  }

  SSDataBlock* pBlock = createDataBlock();
  if (pBlock == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  pBlock->info.rows = 1;
  pBlock->info.capacity = 0;

  for (int32_t i = 0; i < pOperator->exprSupp.numOfExprs; ++i) {
    // template of the placeholder column appended for every missing slot
    SColumnInfoData colInfo = {0};
    colInfo.hasNull = true;
    colInfo.info.type = TSDB_DATA_TYPE_NULL;
    colInfo.info.bytes = 1;

    SExprInfo* pOneExpr = &pOperator->exprSupp.pExprInfo[i];
    for (int32_t j = 0; j < pOneExpr->base.numOfParams; ++j) {
      SFunctParam* pFuncParam = &pOneExpr->base.pParam[j];
      if (pFuncParam->type == FUNC_PARAM_TYPE_COLUMN) {
        int32_t slotId = pFuncParam->pCol->slotId;
        int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
        if (slotId >= numOfCols) {
          taosArrayEnsureCap(pBlock->pDataBlock, slotId + 1);
          for (int32_t k = numOfCols; k < slotId + 1; ++k) {
            if (taosArrayPush(pBlock->pDataBlock, &colInfo) == NULL) {
              blockDataDestroy(pBlock);
              return TSDB_CODE_OUT_OF_MEMORY;
            }
          }
        }
      } else if (pFuncParam->type == FUNC_PARAM_TYPE_VALUE) {
        // do nothing
      }
    }
  }

  int32_t code = blockDataEnsureCapacity(pBlock, pBlock->info.rows);
  if (code != TSDB_CODE_SUCCESS) {
    blockDataDestroy(pBlock);
    return code;
  }

  *ppBlock = pBlock;
  return TSDB_CODE_SUCCESS;
}

// Destroy *ppBlock and reset the pointer to NULL, but only when blockAllocated says the block was
// created locally; otherwise the block is left alone.
static void destroyDataBlockForEmptyInput(bool blockAllocated, SSDataBlock** ppBlock) {
  if (blockAllocated) {
    blockDataDestroy(*ppBlock);
    *ppBlock = NULL;
  }
}

1429
// this is a blocking operator
L
Liu Jicong 已提交
1430
static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
H
Haojun Liao 已提交
1431 1432
  if (OPTR_IS_OPENED(pOperator)) {
    return TSDB_CODE_SUCCESS;
1433 1434
  }

H
Haojun Liao 已提交
1435
  SExecTaskInfo*    pTaskInfo = pOperator->pTaskInfo;
1436
  SAggOperatorInfo* pAggInfo = pOperator->info;
H
Haojun Liao 已提交
1437

1438 1439
  SExprSupp*     pSup = &pOperator->exprSupp;
  SOperatorInfo* downstream = pOperator->pDownstream[0];
1440

1441 1442
  int64_t st = taosGetTimestampUs();

1443 1444 1445
  int32_t order = TSDB_ORDER_ASC;
  int32_t scanFlag = MAIN_SCAN;

1446 1447 1448
  bool    hasValidBlock = false;
  bool    blockAllocated = false;

H
Haojun Liao 已提交
1449
  while (1) {
1450
    SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
1451
    if (pBlock == NULL) {
G
Ganlin Zhao 已提交
1452
      if (!hasValidBlock) {
1453 1454 1455 1456 1457 1458 1459 1460
        createDataBlockForEmptyInput(pOperator, &pBlock);
        if (pBlock == NULL) {
          break;
        }
        blockAllocated = true;
      } else {
        break;
      }
1461
    }
1462
    hasValidBlock = true;
1463

1464 1465
    int32_t code = getTableScanInfo(pOperator, &order, &scanFlag);
    if (code != TSDB_CODE_SUCCESS) {
1466
      destroyDataBlockForEmptyInput(blockAllocated, &pBlock);
1467
      T_LONG_JMP(pTaskInfo->env, code);
1468
    }
1469

1470
    // there is an scalar expression that needs to be calculated before apply the group aggregation.
G
Ganlin Zhao 已提交
1471
    if (pAggInfo->scalarExprSup.pExprInfo != NULL && !blockAllocated) {
1472 1473
      SExprSupp* pSup1 = &pAggInfo->scalarExprSup;
      code = projectApplyFunctions(pSup1->pExprInfo, pBlock, pBlock, pSup1->pCtx, pSup1->numOfExprs, NULL);
1474
      if (code != TSDB_CODE_SUCCESS) {
1475
        destroyDataBlockForEmptyInput(blockAllocated, &pBlock);
1476
        T_LONG_JMP(pTaskInfo->env, code);
1477
      }
1478 1479
    }

1480
    // the pDataBlock are always the same one, no need to call this again
H
Haojun Liao 已提交
1481
    setExecutionContext(pOperator, pOperator->exprSupp.numOfExprs, pBlock->info.id.groupId);
1482
    setInputDataBlock(pSup, pBlock, order, scanFlag, true);
1483
    code = doAggregateImpl(pOperator, pSup->pCtx);
1484
    if (code != 0) {
1485
      destroyDataBlockForEmptyInput(blockAllocated, &pBlock);
1486
      T_LONG_JMP(pTaskInfo->env, code);
1487
    }
1488 1489 1490

    destroyDataBlockForEmptyInput(blockAllocated, &pBlock);

1491 1492
  }

1493 1494 1495 1496 1497
  // the downstream operator may return with error code, so let's check the code before generating results.
  if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
    T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
  }

1498
  initGroupedResultInfo(&pAggInfo->groupResInfo, pAggInfo->aggSup.pResultRowHashTable, 0);
H
Haojun Liao 已提交
1499
  OPTR_SET_OPENED(pOperator);
1500

1501
  pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
1502
  return pTaskInfo->code;
H
Haojun Liao 已提交
1503 1504
}

1505
// Produce the next result block of the aggregate operator.
//
// Returns NULL when the operator is already in OP_EXEC_DONE state, when the
// open callback reports a non-success code (the code is stored in
// pTaskInfo->code and the operator is marked completed), or when the block
// that was built contains no rows. Otherwise returns the operator's own
// result block (binfo.pRes).
static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator) {
  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  SAggOperatorInfo* pAggInfo = pOperator->info;
  SOptrBasicInfo*   pBasic = &pAggInfo->binfo;
  SExecTaskInfo*    pTaskInfo = pOperator->pTaskInfo;

  // the result of the open callback is recorded as the task code before it is inspected
  pTaskInfo->code = pOperator->fpSet._openFn(pOperator);
  if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
    setOperatorCompleted(pOperator);
    return NULL;
  }

  blockDataEnsureCapacity(pBasic->pRes, pOperator->resultInfo.capacity);

  // keep building blocks until one still has rows after filtering, or until
  // the grouped results are exhausted
  do {
    doBuildResultDatablock(pOperator, pBasic, &pAggInfo->groupResInfo, pAggInfo->aggSup.pResultBuf);
    doFilter(pBasic->pRes, pOperator->exprSupp.pFilterInfo, NULL);

    if (!hasRemainResults(&pAggInfo->groupResInfo)) {
      setOperatorCompleted(pOperator);
      break;
    }
  } while (!(pBasic->pRes->info.rows > 0));

  size_t numOfRows = blockDataGetNumOfRows(pBasic->pRes);
  pOperator->resultInfo.totalRows += numOfRows;

  if (numOfRows == 0) {
    return NULL;
  }

  return pBasic->pRes;
}

1541
// Release the memory owned by each of the first numOfExprs entries of pExpr.
// For every entry, column parameters have their pCol freed, value parameters
// have their variant destroyed, and then the parameter array and the pExpr
// member are freed. The pExpr array itself is NOT freed here.
void destroyExprInfo(SExprInfo* pExpr, int32_t numOfExprs) {
  for (int32_t idx = 0; idx < numOfExprs; ++idx) {
    SExprInfo* pCur = pExpr + idx;

    for (int32_t k = 0; k < pCur->base.numOfParams; ++k) {
      switch (pCur->base.pParam[k].type) {
        case FUNC_PARAM_TYPE_COLUMN:
          taosMemoryFreeClear(pCur->base.pParam[k].pCol);
          break;
        case FUNC_PARAM_TYPE_VALUE:
          taosVariantDestroy(&pCur->base.pParam[k].param);
          break;
        default:
          // other parameter kinds own nothing that is released here
          break;
      }
    }

    taosMemoryFree(pCur->base.pParam);
    taosMemoryFree(pCur->pExpr);
  }
}

5
54liuyao 已提交
1557
// Destroy an operator together with all of its downstream operators.
// A NULL operator is accepted and ignored. The operator's close callback (if
// any) is invoked on its private info first, then every downstream operator is
// destroyed recursively, then the expression support and the operator struct
// itself are released.
void destroyOperatorInfo(SOperatorInfo* pOperator) {
  if (NULL == pOperator) {
    return;
  }

  if (NULL != pOperator->fpSet.closeFn) {
    pOperator->fpSet.closeFn(pOperator->info);
  }

  if (NULL != pOperator->pDownstream) {
    int32_t idx = 0;
    while (idx < pOperator->numOfDownstream) {
      destroyOperatorInfo(pOperator->pDownstream[idx]);
      idx += 1;
    }

    taosMemoryFreeClear(pOperator->pDownstream);
    pOperator->numOfDownstream = 0;
  }

  cleanupExprSupp(&pOperator->exprSupp);
  taosMemoryFreeClear(pOperator);
}

1579 1580 1581 1582
// Default buffer-size callback: always reports 0.
// Each operator is expected to install its own function that returns its total
// buffer cost; a blocking operator reaching this default trips ASSERT(0).
int32_t optrDefaultBufFn(SOperatorInfo* pOperator) {
  if (pOperator->blocking) {
    ASSERT(0);
  }

  return 0;
}

1589 1590 1591 1592 1593 1594
// Compute the page size and the total buffer size used for a disk-based result buffer.
//
// The page size starts at 4096 bytes and is doubled until it is at least
// rowSize * 4. The buffer size defaults to 10MB and is raised to four pages
// when the default does not exceed a single page.
//
// rowSize       size in bytes of one result row; non-positive values select
//               the minimum page size
// defaultPgsz   [out] chosen page size in bytes
// defaultBufsz  [out] chosen buffer size in bytes
//
// Returns 0 on success. Returns -1 when rowSize is so large that the required
// page size cannot be represented; in that case the outputs hold the largest
// supported page size (2^29) and four times that as the buffer size.
int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* defaultBufsz) {
  const uint32_t minPageSize = 4096;
  // largest page for which "page * 4" is still representable in uint32_t
  const uint32_t maxPageSize = UINT32_C(1) << 29;

  // widen before multiplying: rowSize * 4 must neither overflow a signed int
  // nor be converted to a huge unsigned value when rowSize is negative
  uint64_t required = (rowSize > 0) ? ((uint64_t)rowSize * 4u) : 0u;

  int32_t  code = 0;
  uint32_t pgsz = minPageSize;
  while (pgsz < required) {
    if (pgsz >= maxPageSize) {
      // doubling again would eventually wrap to 0 and never terminate
      code = -1;
      break;
    }
    pgsz <<= 1u;
  }

  *defaultPgsz = pgsz;

  // The default buffer for each operator in query is 10MB.
  // at least four pages need to be in buffer
  // TODO: make this variable to be configurable.
  *defaultBufsz = 4096 * 2560;
  if ((*defaultBufsz) <= pgsz) {
    (*defaultBufsz) = pgsz * 4;
  }

  return code;
}

dengyihao's avatar
dengyihao 已提交
1606 1607
// Initialize the aggregate supporter: key buffer, result-row hash table and
// the disk-based result buffer.
//
// pAggSup      supporter to initialize
// pCtx         function contexts, used to compute the result row size
// numOfOutput  number of entries in pCtx
// keyBufSize   size of the group key; POINTER_BYTES + sizeof(int64_t) extra
//              bytes are added to the allocation
// pKey         identifier string passed to the disk buffer and used in logs
//
// Returns TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY when the key buffer or
// hash table cannot be allocated, TSDB_CODE_NO_AVAIL_DISK when no temp space
// is available, or the code reported by createDiskbasedBuf. Anything already
// allocated is left in pAggSup on failure (cleanupAggSup releases it).
int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t numOfOutput, size_t keyBufSize,
                         const char* pKey) {
  int32_t    code = 0;
  _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);

  pAggSup->currentPageId = -1;
  pAggSup->resultRowSize = getResultRowSize(pCtx, numOfOutput);
  pAggSup->keyBuf = taosMemoryCalloc(1, keyBufSize + POINTER_BYTES + sizeof(int64_t));
  pAggSup->pResultRowHashTable = tSimpleHashInit(100, hashFn);

  if (pAggSup->keyBuf == NULL || pAggSup->pResultRowHashTable == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  uint32_t defaultPgsz = 0;
  uint32_t defaultBufsz = 0;
  getBufferPgSize(pAggSup->resultRowSize, &defaultPgsz, &defaultBufsz);

  if (!osTempSpaceAvailable()) {
    code = TSDB_CODE_NO_AVAIL_DISK;
    // report the code being returned; it has not been stored in terrno here
    qError("Init stream agg supporter failed since %s, %s", tstrerror(code), pKey);
    return code;
  }

  code = createDiskbasedBuf(&pAggSup->pResultBuf, defaultPgsz, defaultBufsz, pKey, tsTempDir);
  if (code != TSDB_CODE_SUCCESS) {
    qError("Create agg result buf failed since %s, %s", tstrerror(code), pKey);
    return code;
  }

  return code;
}

1639
// Release the resources held by an aggregate supporter: the group key buffer,
// the result-row hash table and the disk-based result buffer.
// The SAggSupporter struct itself is not freed.
void cleanupAggSup(SAggSupporter* pAggSup) {
  SAggSupporter* pSup = pAggSup;

  taosMemoryFreeClear(pSup->keyBuf);
  tSimpleHashCleanup(pSup->pResultRowHashTable);
  destroyDiskbasedBuf(pSup->pResultBuf);
}

H
Haojun Liao 已提交
1645
// Set up the expression support and the aggregate supporter of an operator,
// then point the save handle of every function context at the supporter's
// result buffer.
//
// Returns TSDB_CODE_SUCCESS, or the first non-success code reported by
// initExprSupp / doInitAggInfoSup (the second step is skipped when the first
// one fails).
int32_t initAggSup(SExprSupp* pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, size_t keyBufSize,
                   const char* pkey) {
  int32_t code = initExprSupp(pSup, pExprInfo, numOfCols);
  if (code == TSDB_CODE_SUCCESS) {
    code = doInitAggInfoSup(pAggSup, pSup->pCtx, numOfCols, keyBufSize, pkey);
  }

  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  int32_t idx = 0;
  while (idx < numOfCols) {
    pSup->pCtx[idx].saveHandle.pBuf = pAggSup->pResultBuf;
    ++idx;
  }

  return TSDB_CODE_SUCCESS;
}

L
Liu Jicong 已提交
1664
// Initialize the capacity and threshold of a result-size descriptor.
// The capacity is numOfRows (asserted to be non-zero); the threshold is 75% of
// it, falling back to numOfRows itself when that product ends up as zero.
void initResultSizeInfo(SResultInfo* pResultInfo, int32_t numOfRows) {
  ASSERT(numOfRows != 0);

  pResultInfo->threshold = numOfRows * 0.75;
  pResultInfo->capacity = numOfRows;

  if (0 == pResultInfo->threshold) {
    pResultInfo->threshold = numOfRows;
  }
}

1674 1675 1676 1677 1678
// Attach the result block to the operator's basic info and initialize its
// result-row bookkeeping.
void initBasicInfo(SOptrBasicInfo* pInfo, SSDataBlock* pBlock) {
  initResultRowInfo(&pInfo->resultRowInfo);
  pInfo->pRes = pBlock;
}

5
54liuyao 已提交
1679
// Free an array of numOfOutput function contexts together with the memory each
// context owns (parameter variants, subsidiary contexts and buffer, input data
// pointers). A NULL array is accepted.
//
// Always returns NULL.
void* destroySqlFunctionCtx(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
  if (NULL == pCtx) {
    return NULL;
  }

  for (int32_t idx = 0; idx < numOfOutput; ++idx) {
    SqlFunctionCtx* pCur = &pCtx[idx];

    int32_t k = 0;
    while (k < pCur->numOfParams) {
      taosVariantDestroy(&pCur->param[k].param);
      ++k;
    }

    taosMemoryFreeClear(pCur->subsidiaries.pCtx);
    taosMemoryFreeClear(pCur->subsidiaries.buf);
    taosMemoryFree(pCur->input.pData);
    taosMemoryFree(pCur->input.pColumnDataAgg);
  }

  taosMemoryFreeClear(pCtx);
  return NULL;
}

1699
// Record the expression array in pSup and, when the array is non-NULL, create
// the matching function contexts.
//
// Returns TSDB_CODE_OUT_OF_MEMORY when the contexts cannot be created,
// TSDB_CODE_SUCCESS otherwise (including the case of a NULL expression array,
// where no contexts are created).
int32_t initExprSupp(SExprSupp* pSup, SExprInfo* pExprInfo, int32_t numOfExpr) {
  pSup->numOfExprs = numOfExpr;
  pSup->pExprInfo = pExprInfo;

  if (pSup->pExprInfo == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  pSup->pCtx = createSqlFunctionCtx(pExprInfo, numOfExpr, &pSup->rowEntryInfoOffset);
  return (pSup->pCtx == NULL) ? TSDB_CODE_OUT_OF_MEMORY : TSDB_CODE_SUCCESS;
}

1712 1713 1714 1715
// Release everything owned by an expression support structure: the function
// contexts, the expression array, the filter info and the row-entry offset
// table. Every released pointer is reset to NULL so that the structure does not
// keep dangling references. The SExprSupp struct itself is not freed.
void cleanupExprSupp(SExprSupp* pSupp) {
  // destroySqlFunctionCtx always returns NULL; storing it clears the stale pointer
  pSupp->pCtx = destroySqlFunctionCtx(pSupp->pCtx, pSupp->numOfExprs);

  if (pSupp->pExprInfo != NULL) {
    destroyExprInfo(pSupp->pExprInfo, pSupp->numOfExprs);
    taosMemoryFreeClear(pSupp->pExprInfo);
  }

  if (pSupp->pFilterInfo != NULL) {
    filterFreeInfo(pSupp->pFilterInfo);
    pSupp->pFilterInfo = NULL;
  }

  taosMemoryFreeClear(pSupp->rowEntryInfoOffset);
}

1727 1728
// Create the (non-grouped) aggregate operator on top of the given downstream
// operator.
//
// downstream  the single input operator; when it is a table-scan operator its
//             push-down info is pointed at this operator's expression support
//             and aggregate supporter
// pAggNode    physical plan node describing aggregate functions, optional
//             scalar pre-expressions, output block layout and filter conditions
// pTaskInfo   owning task; on failure pTaskInfo->code receives the error code
//
// Returns the new operator, or NULL on failure after releasing what was
// allocated here.
SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pAggNode,
                                           SExecTaskInfo* pTaskInfo) {
  // declared and initialized ahead of the first goto, so that the _error path
  // never stores an indeterminate value into pTaskInfo->code
  int32_t code = TSDB_CODE_SUCCESS;

  SAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SAggOperatorInfo));
  SOperatorInfo*    pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _error;
  }

  SSDataBlock* pResBlock = createDataBlockFromDescNode(pAggNode->node.pOutputDataBlockDesc);
  initBasicInfo(&pInfo->binfo, pResBlock);

  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
  initResultSizeInfo(&pOperator->resultInfo, 4096);

  int32_t    num = 0;
  SExprInfo* pExprInfo = createExprInfo(pAggNode->pAggFuncs, pAggNode->pGroupKeys, &num);
  code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  // optional scalar expressions that are evaluated before the aggregation
  int32_t    numOfScalarExpr = 0;
  SExprInfo* pScalarExprInfo = NULL;
  if (pAggNode->pExprs != NULL) {
    pScalarExprInfo = createExprInfo(pAggNode->pExprs, NULL, &numOfScalarExpr);
  }

  code = initExprSupp(&pInfo->scalarExprSup, pScalarExprInfo, numOfScalarExpr);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  code = filterInitFromNode((SNode*)pAggNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  pInfo->binfo.mergeResultBlock = pAggNode->mergeDataBlock;
  pInfo->groupId = UINT64_MAX;

  setOperatorInfo(pOperator, "TableAggregate", QUERY_NODE_PHYSICAL_PLAN_HASH_AGG, true, OP_NOT_OPENED, pInfo,
                  pTaskInfo);
  pOperator->fpSet = createOperatorFpSet(doOpenAggregateOptr, getAggregateResult, NULL, destroyAggOperatorInfo,
                                         optrDefaultBufFn, NULL);

  if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
    STableScanInfo* pTableScanInfo = downstream->info;
    pTableScanInfo->base.pdInfo.pExprSup = &pOperator->exprSupp;
    pTableScanInfo->base.pdInfo.pAggSup = &pInfo->aggSup;
  }

  code = appendDownstream(pOperator, &downstream, 1);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  return pOperator;

_error:
  if (pInfo != NULL) {
    destroyAggOperatorInfo(pInfo);
  }

  if (pOperator != NULL) {
    cleanupExprSupp(&pOperator->exprSupp);
  }

  taosMemoryFreeClear(pOperator);
  pTaskInfo->code = code;
  return NULL;
}

1798
// Destroy the result block held by the operator's basic info and store the
// value returned by blockDataDestroy back into pRes.
// pInfo must not be NULL (checked with assert).
void cleanupBasicInfo(SOptrBasicInfo* pInfo) {
  assert(pInfo != NULL);

  SSDataBlock* pOld = pInfo->pRes;
  pInfo->pRes = blockDataDestroy(pOld);
}

H
Haojun Liao 已提交
1803 1804 1805 1806 1807 1808 1809
// Element destructor for containers that store pointers: pItem is the address
// of a stored pointer; the pointed-to memory is freed and the slot is cleared.
static void freeItem(void* pItem) {
  void** ppElem = (void**)pItem;
  if (*ppElem == NULL) {
    return;
  }

  taosMemoryFreeClear(*ppElem);
}

1810
void destroyAggOperatorInfo(void* param) {
L
Liu Jicong 已提交
1811
  SAggOperatorInfo* pInfo = (SAggOperatorInfo*)param;
L
Liu Jicong 已提交
1812 1813
  cleanupBasicInfo(&pInfo->binfo);

H
Haojun Liao 已提交
1814
  cleanupAggSup(&pInfo->aggSup);
S
shenglian zhou 已提交
1815
  cleanupExprSupp(&pInfo->scalarExprSup);
H
Haojun Liao 已提交
1816
  cleanupGroupResInfo(&pInfo->groupResInfo);
D
dapan1121 已提交
1817
  taosMemoryFreeClear(param);
1818
}
1819

D
dapan1121 已提交
1820
// Allocate and initialize a task info object.
//
// queryId  stored in id.queryId and embedded in the id string
// taskId   embedded in the id string
// model    execution model, stored in execModel
// dbFName  database name; a duplicate is stored in schemaInfo.dbname
//
// Returns the new task info, or NULL with terrno set to
// TSDB_CODE_OUT_OF_MEMORY when the struct or its id string cannot be allocated.
static SExecTaskInfo* createExecTaskInfo(uint64_t queryId, uint64_t taskId, EOPTR_EXEC_MODEL model, char* dbFName) {
  enum { TASK_ID_STR_LEN = 128 };

  SExecTaskInfo* pTaskInfo = taosMemoryCalloc(1, sizeof(SExecTaskInfo));
  if (pTaskInfo == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  // Build the id string first: nothing else has been allocated yet, so a
  // failure here can be unwound by freeing the struct alone.
  char* p = taosMemoryCalloc(1, TASK_ID_STR_LEN);
  if (p == NULL) {
    taosMemoryFree(pTaskInfo);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  snprintf(p, TASK_ID_STR_LEN, "TID:0x%" PRIx64 " QID:0x%" PRIx64, taskId, queryId);
  pTaskInfo->id.str = p;

  setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED);

  pTaskInfo->schemaInfo.dbname = strdup(dbFName);
  pTaskInfo->id.queryId = queryId;
  pTaskInfo->execModel = model;
  pTaskInfo->pTableInfoList = tableListCreate();
  pTaskInfo->stopInfo.pStopInfo = taosArrayInit(4, sizeof(SExchangeOpStopInfo));
  pTaskInfo->pResultBlockList = taosArrayInit(128, POINTER_BYTES);

  return pTaskInfo;
}
H
Haojun Liao 已提交
1842

H
Haojun Liao 已提交
1843 1844
SSchemaWrapper* extractQueriedColumnSchema(SScanPhysiNode* pScanNode);

1845
int32_t extractTableSchemaInfo(SReadHandle* pHandle, SScanPhysiNode* pScanNode, SExecTaskInfo* pTaskInfo) {
1846 1847
  SMetaReader mr = {0};
  metaReaderInit(&mr, pHandle->meta, 0);
H
Haojun Liao 已提交
1848
  int32_t code = metaGetTableEntryByUidCache(&mr, pScanNode->uid);
1849
  if (code != TSDB_CODE_SUCCESS) {
L
Liu Jicong 已提交
1850 1851
    qError("failed to get the table meta, uid:0x%" PRIx64 ", suid:0x%" PRIx64 ", %s", pScanNode->uid, pScanNode->suid,
           GET_TASKID(pTaskInfo));
H
Haojun Liao 已提交
1852

D
dapan1121 已提交
1853
    metaReaderClear(&mr);
1854
    return terrno;
D
dapan1121 已提交
1855
  }
1856

1857 1858
  SSchemaInfo* pSchemaInfo = &pTaskInfo->schemaInfo;
  pSchemaInfo->tablename = strdup(mr.me.name);
1859 1860

  if (mr.me.type == TSDB_SUPER_TABLE) {
1861 1862
    pSchemaInfo->sw = tCloneSSchemaWrapper(&mr.me.stbEntry.schemaRow);
    pSchemaInfo->tversion = mr.me.stbEntry.schemaTag.version;
1863
  } else if (mr.me.type == TSDB_CHILD_TABLE) {
1864 1865
    tDecoderClear(&mr.coder);

1866
    tb_uid_t suid = mr.me.ctbEntry.suid;
H
Haojun Liao 已提交
1867
    metaGetTableEntryByUidCache(&mr, suid);
1868 1869
    pSchemaInfo->sw = tCloneSSchemaWrapper(&mr.me.stbEntry.schemaRow);
    pSchemaInfo->tversion = mr.me.stbEntry.schemaTag.version;
1870
  } else {
1871
    pSchemaInfo->sw = tCloneSSchemaWrapper(&mr.me.ntbEntry.schemaRow);
1872
  }
1873 1874

  metaReaderClear(&mr);
1875

H
Haojun Liao 已提交
1876 1877 1878 1879 1880
  pSchemaInfo->qsw = extractQueriedColumnSchema(pScanNode);
  return TSDB_CODE_SUCCESS;
}

// Build a schema wrapper describing the columns referenced by a scan node.
//
// Every entry of pScanCols contributes one schema element. Entries of
// pScanPseudoCols contribute one only when their expression is a column node
// (tags); pseudo function columns are skipped.
//
// Returns a newly allocated wrapper owned by the caller, or NULL with terrno
// set to TSDB_CODE_OUT_OF_MEMORY when an allocation fails.
SSchemaWrapper* extractQueriedColumnSchema(SScanPhysiNode* pScanNode) {
  int32_t numOfCols = LIST_LENGTH(pScanNode->pScanCols);
  int32_t numOfTags = LIST_LENGTH(pScanNode->pScanPseudoCols);

  SSchemaWrapper* pqSw = taosMemoryCalloc(1, sizeof(SSchemaWrapper));
  if (pqSw == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pqSw->pSchema = taosMemoryCalloc(numOfCols + numOfTags, sizeof(SSchema));
  // a zero-sized request may legitimately yield NULL; only a real request counts as a failure
  if (pqSw->pSchema == NULL && (numOfCols + numOfTags) > 0) {
    taosMemoryFree(pqSw);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  for (int32_t i = 0; i < numOfCols; ++i) {
    STargetNode* pNode = (STargetNode*)nodesListGetNode(pScanNode->pScanCols, i);
    SColumnNode* pColNode = (SColumnNode*)pNode->pExpr;

    SSchema* pSchema = &pqSw->pSchema[pqSw->nCols++];
    pSchema->colId = pColNode->colId;
    pSchema->type = pColNode->node.resType.type;
    pSchema->bytes = pColNode->node.resType.bytes;
    tstrncpy(pSchema->name, pColNode->colName, tListLen(pSchema->name));
  }

  // these are the tags and pseudo function columns; only the tag columns are kept
  for (int32_t i = 0; i < numOfTags; ++i) {
    STargetNode* pNode = (STargetNode*)nodesListGetNode(pScanNode->pScanPseudoCols, i);

    int32_t type = nodeType(pNode->pExpr);
    if (type == QUERY_NODE_COLUMN) {
      SColumnNode* pColNode = (SColumnNode*)pNode->pExpr;

      SSchema* pSchema = &pqSw->pSchema[pqSw->nCols++];
      pSchema->colId = pColNode->colId;
      pSchema->type = pColNode->node.resType.type;
      pSchema->bytes = pColNode->node.resType.bytes;
      tstrncpy(pSchema->name, pColNode->colName, tListLen(pSchema->name));
    }
  }

  return pqSw;
}

1917 1918
// Release the strings and schema wrappers stored in a SSchemaInfo.
// The SSchemaInfo struct itself is not freed.
static void cleanupTableSchemaInfo(SSchemaInfo* pSchemaInfo) {
  SSchemaInfo* pInfo = pSchemaInfo;

  // names
  taosMemoryFreeClear(pInfo->dbname);
  taosMemoryFreeClear(pInfo->tablename);

  // full table schema, then the queried-column schema
  tDeleteSSchemaWrapper(pInfo->sw);
  tDeleteSSchemaWrapper(pInfo->qsw);
}

1924
// Release the schema wrapper held by the stream task info.
static void cleanupStreamInfo(SStreamTaskInfo* pStreamInfo) {
  tDeleteSSchemaWrapper(pStreamInfo->schema);
}
1925

1926
// Tell whether the group list consists of exactly one key and that key is the
// function "tbname" (partition by tbname / group by tbname).
bool groupbyTbname(SNodeList* pGroupList) {
  if (LIST_LENGTH(pGroupList) != 1) {
    return false;
  }

  SNode* pKey = nodesListGetNode(pGroupList, 0);
  if (pKey->type != QUERY_NODE_FUNCTION) {
    return false;
  }

  return strcmp(((struct SFunctionNode*)pKey)->functionName, "tbname") == 0;
}

1939 1940
// Recursively build the operator tree for a physical plan node.
//
// A node without children is turned into a source operator (scan, exchange,
// ...); for scan nodes the table list of the task is populated first. A node
// with children has one operator built per child, which are then handed to the
// operator matching the node type.
//
// pPhyNode       plan node to translate
// pTaskInfo      owning task; receives the error code on the failure paths
//                handled here
// pHandle        read handle passed to the scan operators
// pTagCond       tag filter condition used when building the table list
// pTagIndexCond  tag index condition used when building the table list
// pUser          user name, passed to the system-table scan
//
// Returns the operator for pPhyNode, or NULL on failure. When a child operator
// cannot be built, the sibling operators already built for this node are
// destroyed before returning.
SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, SNode* pTagCond,
                                  SNode* pTagIndexCond, const char* pUser) {
  int32_t         type = nodeType(pPhyNode);
  STableListInfo* pTableListInfo = pTaskInfo->pTableInfoList;
  const char*     idstr = GET_TASKID(pTaskInfo);

  if (pPhyNode->pChildren == NULL || LIST_LENGTH(pPhyNode->pChildren) == 0) {
    SOperatorInfo* pOperator = NULL;
    if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == type) {
      STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode;

      // NOTE: this is a patch to fix the physical plan
      // TODO remove it later
      if (pTableScanNode->scan.node.pLimit != NULL) {
        pTableScanNode->groupSort = true;
      }

      int32_t code =
          createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, pTableScanNode->groupSort, pHandle,
                                  pTableListInfo, pTagCond, pTagIndexCond, pTaskInfo);
      if (code) {
        pTaskInfo->code = code;
        qError("failed to createScanTableListInfo, code:%s, %s", tstrerror(code), idstr);
        return NULL;
      }

      code = extractTableSchemaInfo(pHandle, &pTableScanNode->scan, pTaskInfo);
      if (code) {
        pTaskInfo->code = terrno;
        return NULL;
      }

      pOperator = createTableScanOperatorInfo(pTableScanNode, pHandle, pTaskInfo);
      if (NULL == pOperator) {
        pTaskInfo->code = terrno;
        return NULL;
      }

      STableScanInfo* pScanInfo = pOperator->info;
      pTaskInfo->cost.pRecoder = &pScanInfo->base.readRecorder;
    } else if (QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN == type) {
      STableMergeScanPhysiNode* pTableScanNode = (STableMergeScanPhysiNode*)pPhyNode;

      int32_t code = createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, true, pHandle,
                                             pTableListInfo, pTagCond, pTagIndexCond, pTaskInfo);
      if (code) {
        pTaskInfo->code = code;
        qError("failed to createScanTableListInfo, code: %s", tstrerror(code));
        return NULL;
      }

      code = extractTableSchemaInfo(pHandle, &pTableScanNode->scan, pTaskInfo);
      if (code) {
        pTaskInfo->code = terrno;
        return NULL;
      }

      pOperator = createTableMergeScanOperatorInfo(pTableScanNode, pHandle, pTaskInfo);
      if (NULL == pOperator) {
        pTaskInfo->code = terrno;
        return NULL;
      }

      // NOTE(review): the info of a merge-scan operator is read through STableScanInfo here;
      // confirm that the merge-scan info places `base` at the same offset.
      STableScanInfo* pScanInfo = pOperator->info;
      pTaskInfo->cost.pRecoder = &pScanInfo->base.readRecorder;
    } else if (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == type) {
      pOperator = createExchangeOperatorInfo(pHandle ? pHandle->pMsgCb->clientRpc : NULL, (SExchangePhysiNode*)pPhyNode,
                                             pTaskInfo);
    } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN == type) {
      STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode;
      if (pHandle->vnode) {
        int32_t code =
            createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, pTableScanNode->groupSort,
                                    pHandle, pTableListInfo, pTagCond, pTagIndexCond, pTaskInfo);
        if (code) {
          pTaskInfo->code = code;
          qError("failed to createScanTableListInfo, code: %s", tstrerror(code));
          return NULL;
        }

#ifndef NDEBUG
        int32_t sz = tableListGetSize(pTableListInfo);
        qDebug("create stream task, total:%d", sz);

        for (int32_t i = 0; i < sz; i++) {
          STableKeyInfo* pKeyInfo = tableListGetInfo(pTableListInfo, i);
          qDebug("add table uid:%" PRIu64 ", gid:%" PRIu64, pKeyInfo->uid, pKeyInfo->groupId);
        }
#endif
      }

      pTaskInfo->schemaInfo.qsw = extractQueriedColumnSchema(&pTableScanNode->scan);
      pOperator = createStreamScanOperatorInfo(pHandle, pTableScanNode, pTagCond, pTaskInfo);
    } else if (QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN == type) {
      SSystemTableScanPhysiNode* pSysScanPhyNode = (SSystemTableScanPhysiNode*)pPhyNode;
      pOperator = createSysTableScanOperatorInfo(pHandle, pSysScanPhyNode, pUser, pTaskInfo);
    } else if (QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN == type) {
      STableCountScanPhysiNode* pTblCountScanNode = (STableCountScanPhysiNode*)pPhyNode;
      pOperator = createTableCountScanOperatorInfo(pHandle, pTblCountScanNode, pTaskInfo);
    } else if (QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN == type) {
      STagScanPhysiNode* pScanPhyNode = (STagScanPhysiNode*)pPhyNode;

      int32_t code = createScanTableListInfo(pScanPhyNode, NULL, false, pHandle, pTableListInfo, pTagCond,
                                             pTagIndexCond, pTaskInfo);
      if (code != TSDB_CODE_SUCCESS) {
        pTaskInfo->code = code;
        qError("failed to getTableList, code: %s", tstrerror(code));
        return NULL;
      }

      pOperator = createTagScanOperatorInfo(pHandle, pScanPhyNode, pTaskInfo);
    } else if (QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN == type) {
      SBlockDistScanPhysiNode* pBlockNode = (SBlockDistScanPhysiNode*)pPhyNode;

      if (pBlockNode->tableType == TSDB_SUPER_TABLE) {
        SArray* pList = taosArrayInit(4, sizeof(STableKeyInfo));
        int32_t code = vnodeGetAllTableList(pHandle->vnode, pBlockNode->uid, pList);
        if (code != TSDB_CODE_SUCCESS) {
          // the temporary list is owned by this branch and must not outlive the failure
          taosArrayDestroy(pList);
          pTaskInfo->code = terrno;
          return NULL;
        }

        size_t num = taosArrayGetSize(pList);
        for (int32_t i = 0; i < num; ++i) {
          STableKeyInfo* p = taosArrayGet(pList, i);
          tableListAddTableInfo(pTableListInfo, p->uid, 0);
        }

        taosArrayDestroy(pList);
      } else {  // Create group with only one table
        tableListAddTableInfo(pTableListInfo, pBlockNode->uid, 0);
      }

      pOperator = createDataBlockInfoScanOperator(pHandle, pBlockNode, pTaskInfo);
    } else if (QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN == type) {
      SLastRowScanPhysiNode* pScanNode = (SLastRowScanPhysiNode*)pPhyNode;

      int32_t code = createScanTableListInfo(&pScanNode->scan, pScanNode->pGroupTags, true, pHandle, pTableListInfo,
                                             pTagCond, pTagIndexCond, pTaskInfo);
      if (code != TSDB_CODE_SUCCESS) {
        pTaskInfo->code = code;
        return NULL;
      }

      code = extractTableSchemaInfo(pHandle, &pScanNode->scan, pTaskInfo);
      if (code != TSDB_CODE_SUCCESS) {
        pTaskInfo->code = code;
        return NULL;
      }

      pOperator = createCacherowsScanOperator(pScanNode, pHandle, pTaskInfo);
    } else if (QUERY_NODE_PHYSICAL_PLAN_PROJECT == type) {
      pOperator = createProjectOperatorInfo(NULL, (SProjectPhysiNode*)pPhyNode, pTaskInfo);
    } else {
      ASSERT(0);
    }

    if (pOperator != NULL) {
      pOperator->resultDataBlockId = pPhyNode->pOutputDataBlockDesc->dataBlockId;
    }

    return pOperator;
  }

  size_t          size = LIST_LENGTH(pPhyNode->pChildren);
  SOperatorInfo** ops = taosMemoryCalloc(size, POINTER_BYTES);
  if (ops == NULL) {
    // record the failure so that the NULL return is not paired with a success code
    pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  for (int32_t i = 0; i < size; ++i) {
    SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, i);
    ops[i] = createOperatorTree(pChildNode, pTaskInfo, pHandle, pTagCond, pTagIndexCond, pUser);
    if (ops[i] == NULL) {
      // the siblings built so far become unreachable once ops is freed; destroy them here
      for (int32_t j = 0; j < i; ++j) {
        destroyOperatorInfo(ops[j]);
      }

      if (i > 0) {
        // a destroyed scan operator may have been the target of cost.pRecoder
        pTaskInfo->cost.pRecoder = NULL;
      }

      taosMemoryFree(ops);
      return NULL;
    }
  }

  SOperatorInfo* pOptr = NULL;
  if (QUERY_NODE_PHYSICAL_PLAN_PROJECT == type) {
    pOptr = createProjectOperatorInfo(ops[0], (SProjectPhysiNode*)pPhyNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_HASH_AGG == type) {
    SAggPhysiNode* pAggNode = (SAggPhysiNode*)pPhyNode;
    if (pAggNode->pGroupKeys != NULL) {
      pOptr = createGroupOperatorInfo(ops[0], pAggNode, pTaskInfo);
    } else {
      pOptr = createAggregateOperatorInfo(ops[0], pAggNode, pTaskInfo);
    }
  } else if (QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL == type) {
    SIntervalPhysiNode* pIntervalPhyNode = (SIntervalPhysiNode*)pPhyNode;

    // evaluates to false inside this branch, since type is the hash-interval node type here
    bool isStream = (QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL == type);
    pOptr = createIntervalOperatorInfo(ops[0], pIntervalPhyNode, pTaskInfo, isStream);
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL == type) {
    pOptr = createStreamIntervalOperatorInfo(ops[0], pPhyNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL == type) {
    SMergeAlignedIntervalPhysiNode* pIntervalPhyNode = (SMergeAlignedIntervalPhysiNode*)pPhyNode;
    pOptr = createMergeAlignedIntervalOperatorInfo(ops[0], pIntervalPhyNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL == type) {
    SMergeIntervalPhysiNode* pIntervalPhyNode = (SMergeIntervalPhysiNode*)pPhyNode;
    pOptr = createMergeIntervalOperatorInfo(ops[0], pIntervalPhyNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL == type) {
    int32_t children = 0;
    pOptr = createStreamFinalIntervalOperatorInfo(ops[0], pPhyNode, pTaskInfo, children);
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL == type) {
    int32_t children = pHandle->numOfVgroups;
    pOptr = createStreamFinalIntervalOperatorInfo(ops[0], pPhyNode, pTaskInfo, children);
  } else if (QUERY_NODE_PHYSICAL_PLAN_SORT == type) {
    pOptr = createSortOperatorInfo(ops[0], (SSortPhysiNode*)pPhyNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_GROUP_SORT == type) {
    pOptr = createGroupSortOperatorInfo(ops[0], (SGroupSortPhysiNode*)pPhyNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE == type) {
    SMergePhysiNode* pMergePhyNode = (SMergePhysiNode*)pPhyNode;
    pOptr = createMultiwayMergeOperatorInfo(ops, size, pMergePhyNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION == type) {
    SSessionWinodwPhysiNode* pSessionNode = (SSessionWinodwPhysiNode*)pPhyNode;
    pOptr = createSessionAggOperatorInfo(ops[0], pSessionNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION == type) {
    pOptr = createStreamSessionAggOperatorInfo(ops[0], pPhyNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION == type) {
    int32_t children = 0;
    pOptr = createStreamFinalSessionAggOperatorInfo(ops[0], pPhyNode, pTaskInfo, children);
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION == type) {
    int32_t children = pHandle->numOfVgroups;
    pOptr = createStreamFinalSessionAggOperatorInfo(ops[0], pPhyNode, pTaskInfo, children);
  } else if (QUERY_NODE_PHYSICAL_PLAN_PARTITION == type) {
    pOptr = createPartitionOperatorInfo(ops[0], (SPartitionPhysiNode*)pPhyNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION == type) {
    pOptr = createStreamPartitionOperatorInfo(ops[0], (SStreamPartitionPhysiNode*)pPhyNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE == type) {
    SStateWinodwPhysiNode* pStateNode = (SStateWinodwPhysiNode*)pPhyNode;
    pOptr = createStatewindowOperatorInfo(ops[0], pStateNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE == type) {
    pOptr = createStreamStateAggOperatorInfo(ops[0], pPhyNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN == type) {
    pOptr = createMergeJoinOperatorInfo(ops, size, (SSortMergeJoinPhysiNode*)pPhyNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_FILL == type) {
    pOptr = createFillOperatorInfo(ops[0], (SFillPhysiNode*)pPhyNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_FILL == type) {
    pOptr = createStreamFillOperatorInfo(ops[0], (SStreamFillPhysiNode*)pPhyNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC == type) {
    pOptr = createIndefinitOutputOperatorInfo(ops[0], pPhyNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC == type) {
    pOptr = createTimeSliceOperatorInfo(ops[0], pPhyNode, pTaskInfo);
  } else {
    ASSERT(0);
  }

  // only the temporary pointer array is released; the child operators are handed to pOptr above
  taosMemoryFree(ops);
  if (pOptr) {
    pOptr->resultDataBlockId = pPhyNode->pOutputDataBlockDesc->dataBlockId;
  }

  return pOptr;
}
H
Haojun Liao 已提交
2195

L
Liu Jicong 已提交
2196 2197 2198 2199
/*
 * Descend a single-child operator chain until the stream-scan operator is
 * reached, then hand back the info of the table-scan operator it wraps.
 *
 * @param pOperator  operator to start the search from
 * @param ppInfo     [out] receives pTableScanOp->info of the stream scan
 * @return 0 on success; TSDB_CODE_APP_ERROR if a leaf is reached without
 *         finding a stream scan, or if an operator has more than one downstream.
 */
static int32_t extractTbscanInStreamOpTree(SOperatorInfo* pOperator, STableScanInfo** ppInfo) {
  SOperatorInfo* pCur = pOperator;

  while (pCur->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
    if (pCur->numOfDownstream == 0) {
      qError("failed to find stream scan operator");
      return TSDB_CODE_APP_ERROR;
    }

    if (pCur->numOfDownstream > 1) {
      qError("join not supported for stream block scan");
      return TSDB_CODE_APP_ERROR;
    }

    // exactly one downstream: keep walking
    pCur = pCur->pDownstream[0];
  }

  SStreamScanInfo* pScanInfo = pCur->info;
  ASSERT(pScanInfo->pTableScanOp->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN);
  *ppInfo = pScanInfo->pTableScanOp->info;
  return 0;
}

2216 2217 2218 2219 2220 2221 2222
/*
 * Follow a single-child physical plan chain down to its leaf and return that
 * leaf as a table-scan node.
 *
 * @param pNode   physical plan node to start from
 * @param ppNode  [out] receives the leaf when it is a table-scan node
 * @return 0 on success; -1 with terrno = TSDB_CODE_APP_ERROR when a node has
 *         more than one child or the leaf is not a table scan (both cases also
 *         trip ASSERT(0)).
 */
int32_t extractTableScanNode(SPhysiNode* pNode, STableScanPhysiNode** ppNode) {
  SPhysiNode* pCur = pNode;

  // walk down while the current node still has children
  while (pCur->pChildren != NULL && LIST_LENGTH(pCur->pChildren) != 0) {
    if (LIST_LENGTH(pCur->pChildren) != 1) {
      ASSERT(0);
      terrno = TSDB_CODE_APP_ERROR;
      return -1;
    }
    pCur = (SPhysiNode*)nodesListGetNode(pCur->pChildren, 0);
  }

  // pCur is a leaf now; it has to be the table scan
  if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN != pCur->type) {
    ASSERT(0);
    terrno = TSDB_CODE_APP_ERROR;
    return -1;
  }

  *ppNode = (STableScanPhysiNode*)pCur;
  return 0;
}

2238
#if 0
// Disabled code: closes the data reader of the table scan found under pOperator
// and creates a new one from the table-scan node of the given sub-plan.
// uid and ts are accepted but not applied yet (see TODO below).
int32_t rebuildReader(SOperatorInfo* pOperator, SSubplan* plan, SReadHandle* pHandle, int64_t uid, int64_t ts) {
  STableScanInfo* pScanInfo = NULL;
  if (extractTbscanInStreamOpTree(pOperator, &pScanInfo) < 0) {
    return -1;
  }

  STableScanPhysiNode* pScanNode = NULL;
  if (extractTableScanNode(plan->pNode, &pScanNode) < 0) {
    ASSERT(0);
  }

  // drop the old reader before building its replacement
  tsdbReaderClose(pScanInfo->dataReader);

  STableListInfo tableList = {0};
  pScanInfo->dataReader = doCreateDataReader(pScanNode, pHandle, &tableList, NULL);
  if (NULL == pScanInfo->dataReader) {
    ASSERT(0);
    qError("failed to create data reader");
    return TSDB_CODE_APP_ERROR;
  }

  // TODO: set uid and ts to data reader
  return 0;
}
#endif
2263

D
dapan1121 已提交
2264
/*
 * Allocate the parameter object that matches the type of the data sink node.
 *
 * - QUERY_NODE_PHYSICAL_PLAN_QUERY_INSERT: an SInserterParam carrying readHandle.
 * - QUERY_NODE_PHYSICAL_PLAN_DELETE: an SDeleterParam carrying the suid and the
 *   uid of every table in the task's table list.
 * - any other type: *pParam is left untouched.
 *
 * @param pNode       data sink node whose type selects the parameter kind
 * @param pParam      [out] receives the allocated parameter object
 * @param pTaskInfo   pointer to the task handle (dereferenced as SExecTaskInfo**)
 * @param readHandle  read handle stored into the inserter parameter
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when an allocation or
 *         an array push fails (nothing is leaked and *pParam is not written).
 */
int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, qTaskInfo_t* pTaskInfo, SReadHandle* readHandle) {
  SExecTaskInfo* pTask = *(SExecTaskInfo**)pTaskInfo;

  switch (pNode->type) {
    case QUERY_NODE_PHYSICAL_PLAN_QUERY_INSERT: {
      SInserterParam* pInserterParam = taosMemoryCalloc(1, sizeof(SInserterParam));
      if (NULL == pInserterParam) {
        return TSDB_CODE_OUT_OF_MEMORY;
      }
      pInserterParam->readHandle = readHandle;

      *pParam = pInserterParam;
      break;
    }
    case QUERY_NODE_PHYSICAL_PLAN_DELETE: {
      SDeleterParam* pDeleterParam = taosMemoryCalloc(1, sizeof(SDeleterParam));
      if (NULL == pDeleterParam) {
        return TSDB_CODE_OUT_OF_MEMORY;
      }

      int32_t tbNum = tableListGetSize(pTask->pTableInfoList);
      pDeleterParam->suid = tableListGetSuid(pTask->pTableInfoList);

      // TODO extract uid list
      pDeleterParam->pUidList = taosArrayInit(tbNum, sizeof(uint64_t));
      if (NULL == pDeleterParam->pUidList) {
        taosMemoryFree(pDeleterParam);
        return TSDB_CODE_OUT_OF_MEMORY;
      }

      for (int32_t i = 0; i < tbNum; ++i) {
        STableKeyInfo* pTable = tableListGetInfo(pTask->pTableInfoList, i);
        // a failed push would otherwise hand back an incomplete uid list as success
        if (NULL == taosArrayPush(pDeleterParam->pUidList, &pTable->uid)) {
          taosArrayDestroy(pDeleterParam->pUidList);
          taosMemoryFree(pDeleterParam);
          return TSDB_CODE_OUT_OF_MEMORY;
        }
      }

      *pParam = pDeleterParam;
      break;
    }
    default:
      break;
  }

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
2308
/*
 * Create an execution task for a sub-plan and build its operator tree.
 *
 * On success the task takes over `sql` and keeps a pointer to `pPlan`.
 * On failure `sql` is freed (directly, or through the task if it was already
 * attached), the partially built task is destroyed and *pTaskInfo is set to
 * NULL so the caller never holds a dangling pointer.
 *
 * @param pPlan      sub-plan to execute
 * @param pTaskInfo  [out] receives the new task, or NULL on failure
 * @param pHandle    optional read handle; its pStateBackend, when set, becomes
 *                   the stream state of the task
 * @param taskId     id of the task
 * @param sql        SQL text, ownership is transferred to this function
 * @param model      execution model passed to createExecTaskInfo
 * @return TSDB_CODE_SUCCESS, otherwise the value of terrno
 */
int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId,
                               char* sql, EOPTR_EXEC_MODEL model) {
  uint64_t queryId = pPlan->id.queryId;

  *pTaskInfo = createExecTaskInfo(queryId, taskId, model, pPlan->dbFName);
  if (*pTaskInfo == NULL) {
    goto _complete;
  }

  if (pHandle) {
    /*(*pTaskInfo)->streamInfo.fillHistoryVer1 = pHandle->fillHistoryVer1;*/
    if (pHandle->pStateBackend) {
      (*pTaskInfo)->streamInfo.pState = pHandle->pStateBackend;
    }
  }

  // the task owns the sql text from here on; clear the local so it is not freed twice
  (*pTaskInfo)->sql = sql;
  sql = NULL;

  (*pTaskInfo)->pSubplan = pPlan;
  (*pTaskInfo)->pRoot =
      createOperatorTree(pPlan->pNode, *pTaskInfo, pHandle, pPlan->pTagCond, pPlan->pTagIndexCond, pPlan->user);

  if (NULL == (*pTaskInfo)->pRoot) {
    terrno = (*pTaskInfo)->code;
    goto _complete;
  }

  (*pTaskInfo)->cost.created = taosGetTimestampUs();
  return TSDB_CODE_SUCCESS;

_complete:
  taosMemoryFree(sql);
  // the task may never have been created; doDestroyTask dereferences its argument
  if (*pTaskInfo != NULL) {
    doDestroyTask(*pTaskInfo);
    *pTaskInfo = NULL;
  }
  return terrno;
}

H
Haojun Liao 已提交
2345 2346 2347 2348 2349
// Element destructor for an array of SSDataBlock pointers: pParam addresses one
// array slot, and the block stored in that slot is destroyed.
static void freeBlock(void* pParam) {
  SSDataBlock** ppBlock = (SSDataBlock**)pParam;
  blockDataDestroy(*ppBlock);
}

L
Liu Jicong 已提交
2350
/*
 * Release everything owned by an execution task and the task object itself:
 * table list, operator tree, schema info, stream info, the sub-plan (unless the
 * task runs as a local fetch), result blocks, stop info, sql text and id string.
 *
 * @param pTaskInfo  task to destroy; NULL is accepted and ignored, because
 *                   createExecTaskInfoImpl reaches its cleanup path before a
 *                   task object exists when creation fails.
 */
void doDestroyTask(SExecTaskInfo* pTaskInfo) {
  if (pTaskInfo == NULL) {
    return;
  }

  qDebug("%s execTask is freed", GET_TASKID(pTaskInfo));

  pTaskInfo->pTableInfoList = tableListDestroy(pTaskInfo->pTableInfoList);
  destroyOperatorInfo(pTaskInfo->pRoot);
  cleanupTableSchemaInfo(&pTaskInfo->schemaInfo);
  cleanupStreamInfo(&pTaskInfo->streamInfo);

  // the sub-plan is only destroyed here when the task is not a local execution
  if (!pTaskInfo->localFetch.localExec) {
    nodesDestroyNode((SNode*)pTaskInfo->pSubplan);
  }

  taosArrayDestroyEx(pTaskInfo->pResultBlockList, freeBlock);
  taosArrayDestroy(pTaskInfo->stopInfo.pStopInfo);
  taosMemoryFreeClear(pTaskInfo->sql);
  taosMemoryFreeClear(pTaskInfo->id.str);
  taosMemoryFreeClear(pTaskInfo);
}

// Estimated query buffer demand for numOfTables tables:
// 1.5 * sizeof(STableQueryInfo) per table, truncated to an integer.
static int64_t getQuerySupportBufSize(size_t numOfTables) {
  // sizeof(STableCheckInfo), the buffer consumption in tsdb, is not counted
  const size_t perTable = sizeof(STableQueryInfo);
  return (int64_t)(perTable * 1.5 * numOfTables);
}

/*
 * Try to reserve query buffer for numOfTables tables out of the global
 * tsQueryBufferSizeBytes budget.
 *
 * @return TSDB_CODE_SUCCESS when the budget is negative (no reservation is
 *         made) or when the reservation was taken from the budget;
 *         TSDB_CODE_QRY_NOT_ENOUGH_BUFFER when the budget is zero or smaller
 *         than the required amount.
 */
int32_t checkForQueryBuf(size_t numOfTables) {
  int64_t required = getQuerySupportBufSize(numOfTables);

  if (tsQueryBufferSizeBytes < 0) {
    return TSDB_CODE_SUCCESS;
  }

  if (tsQueryBufferSizeBytes <= 0) {
    // disable query processing if the value of tsQueryBufferSize is zero.
    return TSDB_CODE_QRY_NOT_ENOUGH_BUFFER;
  }

  // retry the compare-and-swap until the reservation lands or the budget runs out
  for (;;) {
    int64_t avail = tsQueryBufferSizeBytes;
    int64_t left = avail - required;
    if (left < 0) {
      return TSDB_CODE_QRY_NOT_ENOUGH_BUFFER;
    }

    if (atomic_val_compare_exchange_64(&tsQueryBufferSizeBytes, avail, left) == avail) {
      return TSDB_CODE_SUCCESS;
    }
  }
}

// Give the buffer estimated for numOfTables tables back to the global
// tsQueryBufferSizeBytes budget. Nothing is done while the budget is negative.
void releaseQueryBuf(size_t numOfTables) {
  if (tsQueryBufferSizeBytes >= 0) {
    int64_t reserved = getQuerySupportBufSize(numOfTables);

    // add the previously reserved amount back atomically
    atomic_add_fetch_64(&tsQueryBufferSizeBytes, reserved);
  }
}
D
dapan1121 已提交
2407

H
Haojun Liao 已提交
2408
/*
 * Append the explain/execution statistics of an operator and, recursively, of
 * all its downstream operators to pExecInfoList (pre-order).
 *
 * @param operatorInfo   operator to report on
 * @param pExecInfoList  array of SExplainExecInfo that receives one entry per operator
 * @return TSDB_CODE_SUCCESS; TSDB_CODE_OUT_OF_MEMORY when the entry cannot be
 *         appended; otherwise the error code reported by an operator's
 *         getExplainFn, propagated unchanged from any depth.
 */
int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SArray* pExecInfoList) {
  SExplainExecInfo  execInfo = {0};
  SExplainExecInfo* pExplainInfo = taosArrayPush(pExecInfoList, &execInfo);
  if (pExplainInfo == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  pExplainInfo->numOfRows = operatorInfo->resultInfo.totalRows;
  pExplainInfo->startupCost = operatorInfo->cost.openCost;
  pExplainInfo->totalCost = operatorInfo->cost.totalCost;
  pExplainInfo->verboseLen = 0;
  pExplainInfo->verboseInfo = NULL;

  int32_t code = TSDB_CODE_SUCCESS;
  if (operatorInfo->fpSet.getExplainFn) {
    code = operatorInfo->fpSet.getExplainFn(operatorInfo, &pExplainInfo->verboseInfo, &pExplainInfo->verboseLen);
    if (code) {
      qError("%s operator getExplainFn failed, code:%s", GET_TASKID(operatorInfo->pTaskInfo), tstrerror(code));
      return code;
    }
  }

  // NOTE(review): pExplainInfo presumably points into the array's storage, so it
  // is not touched again once the recursive calls below start pushing entries.
  for (int32_t i = 0; i < operatorInfo->numOfDownstream; ++i) {
    code = getOperatorExplainExecInfo(operatorInfo->pDownstream[i], pExecInfoList);
    if (code != TSDB_CODE_SUCCESS) {
      // report the real downstream failure instead of masking it
      return code;
    }
  }

  return TSDB_CODE_SUCCESS;
}
5
54liuyao 已提交
2438

2439 2440
/*
 * Fetch (or create) the result row stored in the stream state for the window
 * start / group id pair and prepare it as the output row of the functions.
 *
 * @param pState              stream state holding the result rows
 * @param win                 time window; win->skey is the key timestamp
 * @param pResult             [out] receives the result row
 * @param tableGroupId        group id part of the key
 * @param pCtx                function contexts to bind to the row
 * @param numOfOutput         number of entries in pCtx
 * @param rowEntryInfoOffset  per-function entry offsets inside the row
 * @param pAggSup             supplies resultRowSize, the requested value size
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when the state lookup fails
 */
int32_t setOutputBuf(SStreamState* pState, STimeWindow* win, SResultRow** pResult, int64_t tableGroupId,
                     SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowEntryInfoOffset, SAggSupporter* pAggSup) {
  char*   pBuf = NULL;
  int32_t bufLen = pAggSup->resultRowSize;
  SWinKey winKey = {.ts = win->skey, .groupId = tableGroupId};

  if (streamStateAddIfNotExist(pState, &winKey, (void**)&pBuf, &bufLen) < 0) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  *pResult = (SResultRow*)pBuf;
  ASSERT(*pResult);

  // set time window for current result
  (*pResult)->win = (*win);
  setResultRowInitCtx(*pResult, pCtx, numOfOutput, rowEntryInfoOffset);
  return TSDB_CODE_SUCCESS;
}

2459 2460
// Hand a result row buffer back to the stream state. Always reports success.
int32_t releaseOutputBuf(SStreamState* pState, SWinKey* pKey, SResultRow* pResult) {
  int32_t status = TSDB_CODE_SUCCESS;
  streamStateReleaseBuf(pState, pKey, pResult);
  return status;
}

2464 2465
// Store resSize bytes of a result row in the stream state under pKey.
// The outcome of the put is not inspected; success is always reported.
int32_t saveOutputBuf(SStreamState* pState, SWinKey* pKey, SResultRow* pResult, int32_t resSize) {
  int32_t status = TSDB_CODE_SUCCESS;
  streamStatePut(pState, pKey, pResult, resSize);
  return status;
}

2469
int32_t buildDataBlockFromGroupRes(SOperatorInfo* pOperator, SStreamState* pState, SSDataBlock* pBlock, SExprSupp* pSup,
2470
                                   SGroupResInfo* pGroupResInfo) {
2471
  SExecTaskInfo*  pTaskInfo = pOperator->pTaskInfo;
2472 2473 2474 2475 2476 2477 2478 2479 2480 2481 2482 2483
  SExprInfo*      pExprInfo = pSup->pExprInfo;
  int32_t         numOfExprs = pSup->numOfExprs;
  int32_t*        rowEntryOffset = pSup->rowEntryInfoOffset;
  SqlFunctionCtx* pCtx = pSup->pCtx;

  int32_t numOfRows = getNumOfTotalRes(pGroupResInfo);

  for (int32_t i = pGroupResInfo->index; i < numOfRows; i += 1) {
    SResKeyPos* pPos = taosArrayGetP(pGroupResInfo->pRows, i);
    int32_t     size = 0;
    void*       pVal = NULL;
    SWinKey     key = {
2484 2485
            .ts = *(TSKEY*)pPos->key,
            .groupId = pPos->groupId,
2486
    };
2487
    int32_t code = streamStateGet(pState, &key, &pVal, &size);
2488 2489 2490 2491 2492 2493
    ASSERT(code == 0);
    SResultRow* pRow = (SResultRow*)pVal;
    doUpdateNumOfRows(pCtx, pRow, numOfExprs, rowEntryOffset);
    // no results, continue to check the next one
    if (pRow->numOfRows == 0) {
      pGroupResInfo->index += 1;
2494
      releaseOutputBuf(pState, &key, pRow);
2495 2496 2497
      continue;
    }

H
Haojun Liao 已提交
2498 2499
    if (pBlock->info.id.groupId == 0) {
      pBlock->info.id.groupId = pPos->groupId;
2500
      void* tbname = NULL;
H
Haojun Liao 已提交
2501
      if (streamStateGetParName(pTaskInfo->streamInfo.pState, pBlock->info.id.groupId, &tbname) < 0) {
L
Liu Jicong 已提交
2502
        pBlock->info.parTbName[0] = 0;
2503 2504
      } else {
        memcpy(pBlock->info.parTbName, tbname, TSDB_TABLE_NAME_LEN);
2505
      }
2506
      tdbFree(tbname);
2507 2508
    } else {
      // current value belongs to different group, it can't be packed into one datablock
H
Haojun Liao 已提交
2509
      if (pBlock->info.id.groupId != pPos->groupId) {
2510
        releaseOutputBuf(pState, &key, pRow);
2511 2512 2513 2514 2515 2516
        break;
      }
    }

    if (pBlock->info.rows + pRow->numOfRows > pBlock->info.capacity) {
      ASSERT(pBlock->info.rows > 0);
2517
      releaseOutputBuf(pState, &key, pRow);
2518 2519 2520 2521 2522 2523 2524 2525 2526 2527
      break;
    }

    pGroupResInfo->index += 1;

    for (int32_t j = 0; j < numOfExprs; ++j) {
      int32_t slotId = pExprInfo[j].base.resSchema.slotId;

      pCtx[j].resultInfo = getResultEntryInfo(pRow, j, rowEntryOffset);
      if (pCtx[j].fpSet.finalize) {
2528 2529 2530 2531
        int32_t code1 = pCtx[j].fpSet.finalize(&pCtx[j], pBlock);
        if (TAOS_FAILED(code1)) {
          qError("%s build result data block error, code %s", GET_TASKID(pTaskInfo), tstrerror(code1));
          T_LONG_JMP(pTaskInfo->env, code1);
2532 2533 2534 2535 2536 2537 2538 2539 2540 2541 2542 2543 2544
        }
      } else if (strcmp(pCtx[j].pExpr->pExpr->_function.functionName, "_select_value") == 0) {
        // do nothing, todo refactor
      } else {
        // expand the result into multiple rows. E.g., _wstart, top(k, 20)
        // the _wstart needs to copy to 20 following rows, since the results of top-k expands to 20 different rows.
        SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, slotId);
        char*            in = GET_ROWCELL_INTERBUF(pCtx[j].resultInfo);
        for (int32_t k = 0; k < pRow->numOfRows; ++k) {
          colDataAppend(pColInfoData, pBlock->info.rows + k, in, pCtx[j].resultInfo->isNullRes);
        }
      }
    }
5
54liuyao 已提交
2545

2546
    pBlock->info.rows += pRow->numOfRows;
2547
    releaseOutputBuf(pState, &key, pRow);
2548 2549 2550 2551
  }
  blockDataUpdateTsWindow(pBlock, 0);
  return TSDB_CODE_SUCCESS;
}
5
54liuyao 已提交
2552 2553 2554 2555 2556 2557 2558

// Write a session result buffer into the stream state under the session key,
// then release the buffer. The outcome of the put is not inspected; success is
// always reported.
int32_t saveSessionDiscBuf(SStreamState* pState, SSessionKey* key, void* buf, int32_t size) {
  const void* pData = (const void*)buf;
  streamStateSessionPut(pState, key, pData, size);

  SResultRow* pRow = (SResultRow*)buf;
  releaseOutputBuf(pState, NULL, pRow);
  return TSDB_CODE_SUCCESS;
}

2559
int32_t buildSessionResultDataBlock(SOperatorInfo* pOperator, SStreamState* pState, SSDataBlock* pBlock,
5
54liuyao 已提交
2560
                                    SExprSupp* pSup, SGroupResInfo* pGroupResInfo) {
2561
  SExecTaskInfo*  pTaskInfo = pOperator->pTaskInfo;
5
54liuyao 已提交
2562 2563 2564 2565 2566 2567 2568 2569 2570 2571 2572 2573 2574
  SExprInfo*      pExprInfo = pSup->pExprInfo;
  int32_t         numOfExprs = pSup->numOfExprs;
  int32_t*        rowEntryOffset = pSup->rowEntryInfoOffset;
  SqlFunctionCtx* pCtx = pSup->pCtx;

  int32_t numOfRows = getNumOfTotalRes(pGroupResInfo);

  for (int32_t i = pGroupResInfo->index; i < numOfRows; i += 1) {
    SSessionKey* pKey = taosArrayGet(pGroupResInfo->pRows, i);
    int32_t      size = 0;
    void*        pVal = NULL;
    int32_t      code = streamStateSessionGet(pState, pKey, &pVal, &size);
    ASSERT(code == 0);
2575 2576
    if (code == -1) {
      // coverity scan
5
54liuyao 已提交
2577
      pGroupResInfo->index += 1;
2578 2579
      continue;
    }
5
54liuyao 已提交
2580 2581 2582 2583 2584 2585 2586 2587 2588
    SResultRow* pRow = (SResultRow*)pVal;
    doUpdateNumOfRows(pCtx, pRow, numOfExprs, rowEntryOffset);
    // no results, continue to check the next one
    if (pRow->numOfRows == 0) {
      pGroupResInfo->index += 1;
      releaseOutputBuf(pState, NULL, pRow);
      continue;
    }

H
Haojun Liao 已提交
2589 2590
    if (pBlock->info.id.groupId == 0) {
      pBlock->info.id.groupId = pKey->groupId;
2591

2592
      void* tbname = NULL;
H
Haojun Liao 已提交
2593
      if (streamStateGetParName(pTaskInfo->streamInfo.pState, pBlock->info.id.groupId, &tbname) < 0) {
2594
        pBlock->info.parTbName[0] = 0;
2595
      } else {
2596
        memcpy(pBlock->info.parTbName, tbname, TSDB_TABLE_NAME_LEN);
2597
      }
2598
      tdbFree(tbname);
5
54liuyao 已提交
2599 2600
    } else {
      // current value belongs to different group, it can't be packed into one datablock
H
Haojun Liao 已提交
2601
      if (pBlock->info.id.groupId != pKey->groupId) {
5
54liuyao 已提交
2602 2603 2604 2605 2606 2607 2608 2609 2610 2611 2612 2613 2614 2615 2616 2617 2618 2619 2620 2621 2622 2623 2624 2625 2626 2627 2628 2629 2630 2631 2632 2633 2634 2635 2636 2637 2638 2639 2640 2641 2642 2643
        releaseOutputBuf(pState, NULL, pRow);
        break;
      }
    }

    if (pBlock->info.rows + pRow->numOfRows > pBlock->info.capacity) {
      ASSERT(pBlock->info.rows > 0);
      releaseOutputBuf(pState, NULL, pRow);
      break;
    }

    pGroupResInfo->index += 1;

    for (int32_t j = 0; j < numOfExprs; ++j) {
      int32_t slotId = pExprInfo[j].base.resSchema.slotId;

      pCtx[j].resultInfo = getResultEntryInfo(pRow, j, rowEntryOffset);
      if (pCtx[j].fpSet.finalize) {
        int32_t code1 = pCtx[j].fpSet.finalize(&pCtx[j], pBlock);
        if (TAOS_FAILED(code1)) {
          qError("%s build result data block error, code %s", GET_TASKID(pTaskInfo), tstrerror(code1));
          T_LONG_JMP(pTaskInfo->env, code1);
        }
      } else if (strcmp(pCtx[j].pExpr->pExpr->_function.functionName, "_select_value") == 0) {
        // do nothing, todo refactor
      } else {
        // expand the result into multiple rows. E.g., _wstart, top(k, 20)
        // the _wstart needs to copy to 20 following rows, since the results of top-k expands to 20 different rows.
        SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, slotId);
        char*            in = GET_ROWCELL_INTERBUF(pCtx[j].resultInfo);
        for (int32_t k = 0; k < pRow->numOfRows; ++k) {
          colDataAppend(pColInfoData, pBlock->info.rows + k, in, pCtx[j].resultInfo->isNullRes);
        }
      }
    }

    pBlock->info.rows += pRow->numOfRows;
    // saveSessionDiscBuf(pState, pKey, pVal, size);
    releaseOutputBuf(pState, NULL, pRow);
  }
  blockDataUpdateTsWindow(pBlock, 0);
  return TSDB_CODE_SUCCESS;
2644
}