executorimpl.c 180.4 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
15

16
#include <executorimpl.h>
H
Haojun Liao 已提交
17 18
#include "filter.h"
#include "function.h"
19 20
#include "functionMgt.h"
#include "os.h"
H
Haojun Liao 已提交
21
#include "querynodes.h"
22
#include "tfill.h"
dengyihao's avatar
dengyihao 已提交
23
#include "tname.h"
X
Xiaoyu Wang 已提交
24
#include "tref.h"
25

H
Haojun Liao 已提交
26
#include "tdatablock.h"
27
#include "tglobal.h"
H
Haojun Liao 已提交
28
#include "tmsg.h"
H
Haojun Liao 已提交
29
#include "tsort.h"
30
#include "ttime.h"
H
Haojun Liao 已提交
31

32
#include "executorimpl.h"
dengyihao's avatar
dengyihao 已提交
33
#include "index.h"
34
#include "query.h"
35 36
#include "tcompare.h"
#include "tcompression.h"
H
Haojun Liao 已提交
37
#include "thash.h"
38
#include "ttypes.h"
dengyihao's avatar
dengyihao 已提交
39
#include "vnode.h"
40

H
Haojun Liao 已提交
41
/* Scan-phase helpers; the argument is a pointer to any struct with a `scanFlag` member. */
#define IS_MAIN_SCAN(rt)          (MAIN_SCAN == (rt)->scanFlag)
#define SET_REVERSE_SCAN_FLAG(rt) ((rt)->scanFlag = REVERSE_SCAN)

/* Cursor step for an order: QUERY_ASC_FORWARD_STEP when ascending, QUERY_DESC_FORWARD_STEP otherwise. */
#define GET_FORWARD_DIRECTION_FACTOR(ord) ((TSDB_ORDER_ASC == (ord)) ? QUERY_ASC_FORWARD_STEP : QUERY_DESC_FORWARD_STEP)

#if 0
/*
 * Fault-injection allocators (compiled out). When enabled they shadow calloc/malloc/realloc
 * and fail at random: u_malloc/u_calloc return NULL when taosRand() % 1000 == 0, and
 * u_realloc returns NULL when taosRand() % 5 is 0 or 1.
 */
static UNUSED_FUNC void* u_malloc(size_t size) {
  uint32_t r = taosRand();
  return (r % 1000 == 0) ? NULL : taosMemoryMalloc(size);
}

static UNUSED_FUNC void* u_calloc(size_t num, size_t size) {
  uint32_t r = taosRand();
  return (r % 1000 == 0) ? NULL : taosMemoryCalloc(num, size);
}

static UNUSED_FUNC void* u_realloc(void* p, size_t size) {
  uint32_t r = taosRand();
  return (r % 5 < 2) ? NULL : taosMemoryRealloc(p, size);
}

#define calloc  u_calloc
#define malloc  u_malloc
#define realloc u_realloc
#endif

X
Xiaoyu Wang 已提交
80
/* Clear the status bits `st` of query `pQuery` (a pointer to a struct with a `status` member). */
#define CLEAR_QUERY_STATUS(pQuery, st)   ((pQuery)->status &= ~(st))
/* True when the query defines a positive interval length. */
#define QUERY_IS_INTERVAL_QUERY(pQuery) (0 < (pQuery)->interval.interval)

L
Liu Jicong 已提交
83 84 85
/*
 * Maximum idle duration in seconds: twice the shell activity timer (tsShellActivityTimer).
 * Declared with (void) so the definition is a real prototype; an empty () parameter list
 * declares a function without parameter type information.
 */
int32_t getMaximumIdleDurationSec(void) { return tsShellActivityTimer * 2; }

/*
 * Placeholder function-id lookup: asserts that pExprInfo wraps a unary-expression node and
 * always reports id 0.
 */
static int32_t getExprFunctionId(SExprInfo* pExprInfo) {
  assert(pExprInfo != NULL && pExprInfo->pExpr != NULL && pExprInfo->pExpr->nodeType == TEXPR_UNARYEXPR_NODE);
  const int32_t functionId = 0;
  return functionId;
}

static void doSetTagValueToResultBuf(char* output, const char* val, int16_t type, int16_t bytes);

92
static void setBlockStatisInfo(SqlFunctionCtx* pCtx, SExprInfo* pExpr, SSDataBlock* pSDataBlock);
93

X
Xiaoyu Wang 已提交
94
static void releaseQueryBuf(size_t numOfTables);
95 96 97 98 99

static void destroySFillOperatorInfo(void* param, int32_t numOfOutput);
static void destroyProjectOperatorInfo(void* param, int32_t numOfOutput);
static void destroyOrderOperatorInfo(void* param, int32_t numOfOutput);
static void destroyAggOperatorInfo(void* param, int32_t numOfOutput);
X
Xiaoyu Wang 已提交
100

H
Haojun Liao 已提交
101
static void destroyIntervalOperatorInfo(void* param, int32_t numOfOutput);
H
Haojun Liao 已提交
102 103
static void destroyExchangeOperatorInfo(void* param, int32_t numOfOutput);

104 105
static void destroyOperatorInfo(SOperatorInfo* pOperator);

106
/*
 * Mark an operator as done (OP_EXEC_DONE). When the operator belongs to a task, also record
 * the operator's total cost and set the task status to TASK_COMPLETED.
 *
 * The original code read pOperator->pTaskInfo->cost.start before checking pTaskInfo for NULL;
 * the cost computation now happens only after that check.
 * NOTE(review): cost.start is multiplied by 1000 before being subtracted from a microsecond
 * timestamp, so it is presumably in milliseconds and totalCost ends up in ms — confirm.
 */
void doSetOperatorCompleted(SOperatorInfo* pOperator) {
  pOperator->status = OP_EXEC_DONE;

  if (pOperator->pTaskInfo == NULL) {
    return;
  }

  pOperator->cost.totalCost = (taosGetTimestampUs() - pOperator->pTaskInfo->cost.start * 1000) / 1000.0;
  setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED);
}
114

H
Haojun Liao 已提交
115
/*
 * Open callback for operators that need no preparation: flags the operator as opened,
 * reports an open cost of zero and always succeeds.
 */
int32_t operatorDummyOpenFn(SOperatorInfo* pOperator) {
  pOperator->cost.openCost = 0;
  OPTR_SET_OPENED(pOperator);
  return TSDB_CODE_SUCCESS;
}

121
SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn, __optr_fn_t streamFn,
L
Liu Jicong 已提交
122
                                   __optr_fn_t cleanup, __optr_close_fn_t closeFn, __optr_encode_fn_t encode,
123
                                   __optr_decode_fn_t decode, __optr_explain_fn_t explain) {
124 125 126 127 128 129 130 131 132 133 134 135 136 137
  SOperatorFpSet fpSet = {
      ._openFn = openFn,
      .getNextFn = nextFn,
      .getStreamResFn = streamFn,
      .cleanupFn = cleanup,
      .closeFn = closeFn,
      .encodeResultRow = encode,
      .decodeResultRow = decode,
      .getExplainFn = explain,
  };

  return fpSet;
}

H
Haojun Liao 已提交
138
void operatorDummyCloseFn(void* param, int32_t numOfCols) {}
H
Haojun Liao 已提交
139

X
Xiaoyu Wang 已提交
140 141 142
static int32_t doCopyToSDataBlock(SExecTaskInfo* taskInfo, SSDataBlock* pBlock, SExprInfo* pExprInfo,
                                  SDiskbasedBuf* pBuf, SGroupResInfo* pGroupResInfo, const int32_t* rowCellOffset,
                                  SqlFunctionCtx* pCtx, int32_t numOfExprs);
H
Haojun Liao 已提交
143

144
static void initCtxOutputBuffer(SqlFunctionCtx* pCtx, int32_t size);
L
Liu Jicong 已提交
145 146
static void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, SAggOperatorInfo* pAggInfo, int32_t numOfOutput,
                                     uint64_t groupId);
147

L
Liu Jicong 已提交
148 149
/*
 * Report whether a column may contain NULL values.
 *
 * Tag columns, columns flagged by TSDB_COL_IS_UD_COL and the primary timestamp column are
 * treated as never NULL. Any other column is considered nullable unless statistics are
 * supplied (pStatis != NULL) and report zero NULLs.
 */
static bool hasNull(SColumn* pColumn, SColumnDataAgg* pStatis) {
  bool neverNull = TSDB_COL_IS_TAG(pColumn->flag) || TSDB_COL_IS_UD_COL(pColumn->flag) ||
                   pColumn->colId == PRIMARYKEY_TIMESTAMP_COL_ID;
  if (neverNull) {
    return false;
  }

  return (pStatis == NULL) || (pStatis->numOfNull != 0);
}

162
#if 0
/*
 * Disabled legacy helper (uses the old STaskRuntimeEnv API).
 *
 * Reports whether a result row for key (uid, pData[0..bytes)) exists. For non-interval
 * queries, and for interval queries outside the master scan, only the hash table entry is
 * checked (*p1 may be NULL when sliding + offset is used). During the master scan of an
 * interval query, the row must also be listed in pResultRowInfo. A pResultRowInfo holding
 * zero or one rows never counts as containing the row, because the single-row comparison was
 * disabled in the original code.
 */
static bool chkResultRowFromKey(STaskRuntimeEnv* pRuntimeEnv, SResultRowInfo* pResultRowInfo, char* pData,
                                int16_t bytes, bool masterscan, uint64_t uid) {
  SET_RES_WINDOW_KEY(pRuntimeEnv->keyBuf, pData, bytes, uid);

  SResultRow** p1 =
      (SResultRow**)taosHashGet(pRuntimeEnv->pResultRowHashTable, pRuntimeEnv->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes));

  if (!QUERY_IS_INTERVAL_QUERY(pRuntimeEnv->pQueryAttr) || !masterscan) {
    return p1 != NULL;
  }

  if (p1 == NULL || pResultRowInfo->size == 0 || pResultRowInfo->size == 1) {
    return false;
  }

  // check whether the current pResultRowInfo already lists this result row
  SET_RES_EXT_WINDOW_KEY(pRuntimeEnv->keyBuf, pData, bytes, uid, pResultRowInfo);
  int64_t* index = taosHashGet(pRuntimeEnv->pResultRowListSet, pRuntimeEnv->keyBuf, GET_RES_EXT_WINDOW_KEY_LEN(bytes));
  return index != NULL;
}
#endif
200

201
/*
 * Reserve interBufSize bytes for a new result row in the paged result buffer of group
 * tableGroupId.
 *
 * The row is placed on the group's last page when it still fits. Otherwise that page is
 * released and a new page is requested. A fresh page starts with its header
 * (pData->num = sizeof(SFilePage)). The page that receives the row is marked dirty.
 *
 * Returns the row with pageId/offset filled in, or NULL when no page could be obtained.
 * (Previously a NULL page from getNewBufPage()/getBufPage() was dereferenced.)
 */
SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int64_t tableGroupId, int32_t interBufSize) {
  SFilePage* pData = NULL;
  int32_t    pageId = -1;
  SIDList    list = getDataBufPagesIdList(pResultBuf, tableGroupId);

  if (taosArrayGetSize(list) == 0) {
    // first row of this group: start a new page
    pData = getNewBufPage(pResultBuf, tableGroupId, &pageId);
    if (pData != NULL) {
      pData->num = sizeof(SFilePage);
    }
  } else {
    SPageInfo* pi = getLastPageInfo(list);
    pData = getBufPage(pResultBuf, getPageId(pi));
    pageId = getPageId(pi);

    if (pData != NULL && pData->num + interBufSize > getBufPageSize(pResultBuf)) {
      // release current page first, and prepare the next one
      releaseBufPageInfo(pResultBuf, pi);

      pData = getNewBufPage(pResultBuf, tableGroupId, &pageId);
      if (pData != NULL) {
        pData->num = sizeof(SFilePage);
      }
    }
  }

  if (pData == NULL) {
    return NULL;
  }

  setBufPageDirty(pData, true);

  // the row lives at the current fill position of the page
  SResultRow* pResultRow = (SResultRow*)((char*)pData + pData->num);
  pResultRow->pageId = pageId;
  pResultRow->offset = (int32_t)pData->num;

  pData->num += interBufSize;

  return pResultRow;
}

243 244 245 246 247 248 249
/**
 * the struct of key in hash table
 * +----------+---------------+
 * | group id |   key data    |
 * | 8 bytes  | actual length |
 * +----------+---------------+
 */
250 251 252
SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pResultRowInfo, char* pData,
                                   int16_t bytes, bool masterscan, uint64_t groupId, SExecTaskInfo* pTaskInfo,
                                   bool isIntervalQuery, SAggSupporter* pSup) {
253
  SET_RES_WINDOW_KEY(pSup->keyBuf, pData, bytes, groupId);
H
Haojun Liao 已提交
254

dengyihao's avatar
dengyihao 已提交
255 256
  SResultRowPosition* p1 =
      (SResultRowPosition*)taosHashGet(pSup->pResultRowHashTable, pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes));
H
Haojun Liao 已提交
257

258 259
  SResultRow* pResult = NULL;

H
Haojun Liao 已提交
260 261
  // in case of repeat scan/reverse scan, no new time window added.
  if (isIntervalQuery) {
262 263
    if (masterscan && p1 != NULL) {  // the *p1 may be NULL in case of sliding+offset exists.
      pResult = getResultRowByPos(pResultBuf, p1);
264
      ASSERT(pResult->pageId == p1->pageId && pResult->offset == p1->offset);
H
Haojun Liao 已提交
265 266
    }
  } else {
dengyihao's avatar
dengyihao 已提交
267 268
    // In case of group by column query, the required SResultRow object must be existInCurrentResusltRowInfo in the
    // pResultRowInfo object.
H
Haojun Liao 已提交
269
    if (p1 != NULL) {
270
      // todo
271
      pResult = getResultRowByPos(pResultBuf, p1);
272
      ASSERT(pResult->pageId == p1->pageId && pResult->offset == p1->offset);
H
Haojun Liao 已提交
273 274 275
    }
  }

L
Liu Jicong 已提交
276
  // 1. close current opened time window
277
  if (pResultRowInfo->cur.pageId != -1 && ((pResult == NULL) || (pResult->pageId != pResultRowInfo->cur.pageId))) {
278
#ifdef BUF_PAGE_DEBUG
wmmhello's avatar
wmmhello 已提交
279
    qDebug("page_1");
280
#endif
281
    SResultRowPosition pos = pResultRowInfo->cur;
X
Xiaoyu Wang 已提交
282
    SFilePage*         pPage = getBufPage(pResultBuf, pos.pageId);
283 284 285 286 287
    releaseBufPage(pResultBuf, pPage);
  }

  // allocate a new buffer page
  if (pResult == NULL) {
288
#ifdef BUF_PAGE_DEBUG
wmmhello's avatar
wmmhello 已提交
289
    qDebug("page_2");
290
#endif
H
Haojun Liao 已提交
291
    ASSERT(pSup->resultRowSize > 0);
292 293
    pResult = getNewResultRow(pResultBuf, groupId, pSup->resultRowSize);

294
    initResultRow(pResult);
H
Haojun Liao 已提交
295

296 297
    // add a new result set for a new group
    SResultRowPosition pos = {.pageId = pResult->pageId, .offset = pResult->offset};
X
Xiaoyu Wang 已提交
298 299
    taosHashPut(pSup->pResultRowHashTable, pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes), &pos,
                sizeof(SResultRowPosition));
H
Haojun Liao 已提交
300 301
  }

302 303 304
  // 2. set the new time window to be the new active time window
  pResultRowInfo->cur = (SResultRowPosition){.pageId = pResult->pageId, .offset = pResult->offset};

H
Haojun Liao 已提交
305
  // too many time window in query
306
  if (taosHashGetSize(pSup->pResultRowHashTable) > MAX_INTERVAL_TIME_WINDOW) {
H
Haojun Liao 已提交
307 308 309
    longjmp(pTaskInfo->env, TSDB_CODE_QRY_TOO_MANY_TIMEWINDOW);
  }

H
Haojun Liao 已提交
310
  return pResult;
H
Haojun Liao 已提交
311 312
}

313
// a new buffer page for each table. Needs to opt this design
L
Liu Jicong 已提交
314
static int32_t addNewWindowResultBuf(SResultRow* pWindowRes, SDiskbasedBuf* pResultBuf, int32_t tid, uint32_t size) {
315 316 317 318
  if (pWindowRes->pageId != -1) {
    return 0;
  }

L
Liu Jicong 已提交
319
  SFilePage* pData = NULL;
320 321 322 323 324 325

  // in the first scan, new space needed for results
  int32_t pageId = -1;
  SIDList list = getDataBufPagesIdList(pResultBuf, tid);

  if (taosArrayGetSize(list) == 0) {
H
Haojun Liao 已提交
326
    pData = getNewBufPage(pResultBuf, tid, &pageId);
327
    pData->num = sizeof(SFilePage);
328 329
  } else {
    SPageInfo* pi = getLastPageInfo(list);
330
    pData = getBufPage(pResultBuf, getPageId(pi));
331
    pageId = getPageId(pi);
332

333
    if (pData->num + size > getBufPageSize(pResultBuf)) {
334
      // release current page first, and prepare the next one
335
      releaseBufPageInfo(pResultBuf, pi);
336

H
Haojun Liao 已提交
337
      pData = getNewBufPage(pResultBuf, tid, &pageId);
338
      if (pData != NULL) {
339
        pData->num = sizeof(SFilePage);
340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359
      }
    }
  }

  if (pData == NULL) {
    return -1;
  }

  // set the number of rows in current disk page
  if (pWindowRes->pageId == -1) {  // not allocated yet, allocate new buffer
    pWindowRes->pageId = pageId;
    pWindowRes->offset = (int32_t)pData->num;

    pData->num += size;
    assert(pWindowRes->pageId >= 0);
  }

  return 0;
}

360
//  query_range_start, query_range_end, window_duration, window_start, window_end
361
void initExecTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pQueryWindow) {
362 363 364
  pColData->info.type = TSDB_DATA_TYPE_TIMESTAMP;
  pColData->info.bytes = sizeof(int64_t);

365
  colInfoDataEnsureCapacity(pColData, 5);
366 367 368 369 370 371 372 373 374
  colDataAppendInt64(pColData, 0, &pQueryWindow->skey);
  colDataAppendInt64(pColData, 1, &pQueryWindow->ekey);

  int64_t interval = 0;
  colDataAppendInt64(pColData, 2, &interval);  // this value may be variable in case of 'n' and 'y'.
  colDataAppendInt64(pColData, 3, &pQueryWindow->skey);
  colDataAppendInt64(pColData, 4, &pQueryWindow->ekey);
}

X
Xiaoyu Wang 已提交
375 376 377
/*
 * Run every output function in pCtx[0..numOfOutput) over rows [offset, offset + forwardStep)
 * of the current input.
 *
 * Each context's input range (startRowIndex/numOfRows) and colDataAggIsSet flag are
 * temporarily narrowed to the window and then restored. The restore now also happens for
 * window pseudo-column functions, which previously left the narrowed values behind.
 * Block-level statistics are disabled when the window covers only part of the block
 * (forwardStep < numOfTotal).
 *
 * Window pseudo-column functions get their value computed from pTimeWindowData (5 rows)
 * into the row's intermediate buffer. Other functions run through fpSet.process when
 * functionNeedToExecute() allows it; on failure the error is stored in taskInfo->code and
 * raised via longjmp(taskInfo->env, code).
 *
 * pWin, tsCol and order are not used by this implementation.
 */
void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, STimeWindow* pWin,
                      SColumnInfoData* pTimeWindowData, int32_t offset, int32_t forwardStep, TSKEY* tsCol,
                      int32_t numOfTotal, int32_t numOfOutput, int32_t order) {
  for (int32_t k = 0; k < numOfOutput; ++k) {
    SInputColumnInfoData* pInput = &pCtx[k].input;

    // save the input range so it can be restored once this window has been processed
    bool    hasAgg = pInput->colDataAggIsSet;
    int32_t numOfRows = pInput->numOfRows;
    int32_t startOffset = pInput->startRowIndex;

    pInput->startRowIndex = offset;
    pInput->numOfRows = forwardStep;

    // not a whole block involved in query processing, statistics data can not be used
    if (pInput->colDataAggIsSet && forwardStep < numOfTotal) {
      pInput->colDataAggIsSet = false;
    }

    if (fmIsWindowPseudoColumnFunc(pCtx[k].functionId)) {
      SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(&pCtx[k]);
      char*                p = GET_ROWCELL_INTERBUF(pEntryInfo);

      SColumnInfoData idata = {0};
      idata.info.type = TSDB_DATA_TYPE_BIGINT;
      idata.info.bytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes;
      idata.pData = p;

      SScalarParam out = {.columnData = &idata};
      SScalarParam tw = {.numOfRows = 5, .columnData = pTimeWindowData};
      pCtx[k].sfp.process(&tw, 1, &out);
      pEntryInfo->numOfRes = 1;
    } else if (functionNeedToExecute(&pCtx[k]) && pCtx[k].fpSet.process != NULL) {
      int32_t code = pCtx[k].fpSet.process(&pCtx[k]);
      if (code != TSDB_CODE_SUCCESS) {
        qError("%s apply functions error, code: %s", GET_TASKID(taskInfo), tstrerror(code));
        taskInfo->code = code;
        longjmp(taskInfo->env, code);
      }
    }

    // restore it, for every kind of function
    pInput->colDataAggIsSet = hasAgg;
    pInput->startRowIndex = startOffset;
    pInput->numOfRows = numOfRows;
  }
}

dengyihao's avatar
dengyihao 已提交
428
/* Bind pBlock's columns to the function contexts; defined further below. */
static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock,
                                   int32_t order, int32_t scanFlag, bool createDummyCol);
430

dengyihao's avatar
dengyihao 已提交
431 432
/*
 * For each expression of the operator, set the scan order and row count on its function
 * context, then let setBlockStatisInfo() bind the block's statistics (pBlockAgg) and columns.
 */
static void doSetInputDataBlockInfo(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock,
                                    int32_t order) {
  SExprInfo* pExprs = pOperator->exprSupp.pExprInfo;

  for (int32_t i = 0, n = pOperator->exprSupp.numOfExprs; i < n; ++i) {
    SqlFunctionCtx* pOne = &pCtx[i];
    pOne->order = order;
    pOne->input.numOfRows = pBlock->info.rows;
    setBlockStatisInfo(pOne, &pExprs[i], pBlock);
  }
}

X
Xiaoyu Wang 已提交
440 441
/*
 * Bind pBlock as the input of every function context of the operator. Blocks carrying
 * precomputed statistics (pBlockAgg) go through doSetInputDataBlockInfo(); all others go
 * through doSetInputDataBlock(), whose return code is ignored here.
 */
void setInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order,
                       int32_t scanFlag, bool createDummyCol) {
  bool hasBlockAgg = (pBlock->pBlockAgg != NULL);
  if (!hasBlockAgg) {
    doSetInputDataBlock(pOperator, pCtx, pBlock, order, scanFlag, createDummyCol);
    return;
  }

  doSetInputDataBlockInfo(pOperator, pCtx, pBlock, order);
}

L
Liu Jicong 已提交
449 450
/*
 * Materialise the constant value parameter pFuncParam as a column of numOfRows identical
 * values in pInput->pData[paramIndex].
 *
 * The SColumnInfoData is allocated on first use and stored in pInput->pData[paramIndex];
 * this function does not free it. BIGINT/UBIGINT, DOUBLE and VARCHAR constants are filled;
 * for other types the column is only sized.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY on allocation failure, or the error
 * reported by colInfoDataEnsureCapacity().
 * (Previously the VARCHAR staging buffer was never freed or NULL-checked.)
 */
static int32_t doCreateConstantValColumnInfo(SInputColumnInfoData* pInput, SFunctParam* pFuncParam, int32_t paramIndex,
                                             int32_t numOfRows) {
  SColumnInfoData* pColInfo = pInput->pData[paramIndex];
  if (pColInfo == NULL) {
    pColInfo = taosMemoryCalloc(1, sizeof(SColumnInfoData));
    if (pColInfo == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    // Set the correct column info (data type and bytes)
    pColInfo->info.type = pFuncParam->param.nType;
    pColInfo->info.bytes = pFuncParam->param.nLen;

    pInput->pData[paramIndex] = pColInfo;
  }

  int32_t code = colInfoDataEnsureCapacity(pColInfo, numOfRows);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  int8_t type = pFuncParam->param.nType;
  if (type == TSDB_DATA_TYPE_BIGINT || type == TSDB_DATA_TYPE_UBIGINT) {
    int64_t v = pFuncParam->param.i;
    for (int32_t i = 0; i < numOfRows; ++i) {
      colDataAppendInt64(pColInfo, i, &v);
    }
  } else if (type == TSDB_DATA_TYPE_DOUBLE) {
    double v = pFuncParam->param.d;
    for (int32_t i = 0; i < numOfRows; ++i) {
      colDataAppendDouble(pColInfo, i, &v);
    }
  } else if (type == TSDB_DATA_TYPE_VARCHAR) {
    // stage the value once as a var-string (length header + payload); it is appended to every row
    char* tmp = taosMemoryMalloc(pFuncParam->param.nLen + VARSTR_HEADER_SIZE);
    if (tmp == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    STR_WITH_SIZE_TO_VARSTR(tmp, pFuncParam->param.pz, pFuncParam->param.nLen);
    for (int32_t i = 0; i < numOfRows; ++i) {
      colDataAppend(pColInfo, i, tmp, false);
    }
    taosMemoryFree(tmp);
  }

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
491
/*
 * Bind the raw columns of pBlock as the input of every function context of the operator.
 *
 * For each expression: record order, source block, scan flag and block uid on its context,
 * and clear colDataAggIsSet. Then, per parameter:
 *  - column parameters point pInput->pData[j] at the block column in the parameter's slot.
 *    For timeline functions, the last parameter is also recorded as the primary timestamp
 *    column (pPTS);
 *  - value parameters of single-parameter expressions are expanded into a constant column
 *    when createDummyCol is set.
 *
 * Returns TSDB_CODE_SUCCESS, or the first error from doCreateConstantValColumnInfo().
 */
static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order,
                                   int32_t scanFlag, bool createDummyCol) {
  int32_t numOfExprs = pOperator->exprSupp.numOfExprs;

  for (int32_t i = 0; i < numOfExprs; ++i) {
    SqlFunctionCtx*       pFuncCtx = &pCtx[i];
    SInputColumnInfoData* pInput = &pFuncCtx->input;
    SExprInfo*            pOneExpr = &pOperator->exprSupp.pExprInfo[i];

    pFuncCtx->order = order;
    pFuncCtx->pSrcBlock = pBlock;
    pFuncCtx->scanFlag = scanFlag;

    pInput->numOfRows = pBlock->info.rows;
    pInput->uid = pBlock->info.uid;
    pInput->colDataAggIsSet = false;

    int32_t numOfParams = pOneExpr->base.numOfParams;
    for (int32_t j = 0; j < numOfParams; ++j) {
      SFunctParam* pFuncParam = &pOneExpr->base.pParam[j];

      switch (pFuncParam->type) {
        case FUNC_PARAM_TYPE_COLUMN: {
          pInput->pData[j] = taosArrayGet(pBlock->pDataBlock, pFuncParam->pCol->slotId);
          pInput->totalRows = pBlock->info.rows;
          pInput->numOfRows = pBlock->info.rows;
          pInput->startRowIndex = 0;

          // NOTE: the last parameter is the primary timestamp column
          if (fmIsTimelineFunc(pFuncCtx->functionId) && j == numOfParams - 1) {
            pInput->pPTS = pInput->pData[j];
          }
          ASSERT(pInput->pData[j] != NULL);
          break;
        }

        case FUNC_PARAM_TYPE_VALUE: {
          // todo avoid case: top(k, 12), 12 is the value parameter; sum(11), 11 is also the value parameter.
          if (!createDummyCol || numOfParams != 1) {
            break;
          }

          pInput->totalRows = pBlock->info.rows;
          pInput->numOfRows = pBlock->info.rows;
          pInput->startRowIndex = 0;

          int32_t code = doCreateConstantValColumnInfo(pInput, pFuncParam, j, pBlock->info.rows);
          if (code != TSDB_CODE_SUCCESS) {
            return code;
          }
          break;
        }

        default:
          break;
      }
    }
  }

  return TSDB_CODE_SUCCESS;
}

541
/*
 * Feed the current input to every aggregate function of the operator. Functions are skipped
 * when functionNeedToExecute() says so or when they have no process callback.
 *
 * Returns TSDB_CODE_SUCCESS, or the first error returned by a function's process callback.
 * startTs is not used.
 */
static int32_t doAggregateImpl(SOperatorInfo* pOperator, TSKEY startTs, SqlFunctionCtx* pCtx) {
  (void)startTs;

  for (int32_t k = 0; k < pOperator->exprSupp.numOfExprs; ++k) {
    SqlFunctionCtx* pFnCtx = &pCtx[k];

    // todo add a dummy function to avoid the process check
    if (!functionNeedToExecute(pFnCtx) || pFnCtx->fpSet.process == NULL) {
      continue;
    }

    int32_t code = pFnCtx->fpSet.process(pFnCtx);
    if (code != TSDB_CODE_SUCCESS) {
      qError("%s aggregate function error happens, code: %s", GET_TASKID(pOperator->pTaskInfo), tstrerror(code));
      return code;
    }
  }

  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
560
/*
 * For each entry of pPseudoList, point pCtx[i].pOutput at column i of pResult.
 * Nothing happens when pPseudoList is NULL.
 */
static void setPseudoOutputColInfo(SSDataBlock* pResult, SqlFunctionCtx* pCtx, SArray* pPseudoList) {
  if (pPseudoList == NULL) {
    return;
  }

  size_t numOfPseudo = taosArrayGetSize(pPseudoList);
  for (size_t i = 0; i < numOfPseudo; ++i) {
    pCtx[i].pOutput = taosArrayGet(pResult->pDataBlock, i);
  }
}

567
int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pSrcBlock, SqlFunctionCtx* pCtx,
X
Xiaoyu Wang 已提交
568
                              int32_t numOfOutput, SArray* pPseudoList) {
H
Haojun Liao 已提交
569
  setPseudoOutputColInfo(pResult, pCtx, pPseudoList);
H
Haojun Liao 已提交
570
  pResult->info.groupId = pSrcBlock->info.groupId;
H
Haojun Liao 已提交
571

572 573
  // if the source equals to the destination, it is to create a new column as the result of scalar
  // function or some operators.
574 575
  bool createNewColModel = (pResult == pSrcBlock);

576 577
  int32_t numOfRows = 0;

578
  for (int32_t k = 0; k < numOfOutput; ++k) {
dengyihao's avatar
dengyihao 已提交
579
    int32_t         outputSlotId = pExpr[k].base.resSchema.slotId;
580
    SqlFunctionCtx* pfCtx = &pCtx[k];
581
    SInputColumnInfoData* pInputData = &pfCtx->input;
582

L
Liu Jicong 已提交
583
    if (pExpr[k].pExpr->nodeType == QUERY_NODE_COLUMN) {  // it is a project query
584
      SColumnInfoData* pColInfoData = taosArrayGet(pResult->pDataBlock, outputSlotId);
585
      if (pResult->info.rows > 0 && !createNewColModel) {
586
        colDataMergeCol(pColInfoData, pResult->info.rows, &pResult->info.capacity, pInputData->pData[0], pInputData->numOfRows);
587
      } else {
588
        colDataAssign(pColInfoData, pInputData->pData[0], pInputData->numOfRows, &pResult->info);
589
      }
590

591
      numOfRows = pInputData->numOfRows;
592
    } else if (pExpr[k].pExpr->nodeType == QUERY_NODE_VALUE) {
593
      SColumnInfoData* pColInfoData = taosArrayGet(pResult->pDataBlock, outputSlotId);
594

dengyihao's avatar
dengyihao 已提交
595
      int32_t offset = createNewColModel ? 0 : pResult->info.rows;
596
      for (int32_t i = 0; i < pSrcBlock->info.rows; ++i) {
dengyihao's avatar
dengyihao 已提交
597 598 599
        colDataAppend(pColInfoData, i + offset,
                      taosVariantGet(&pExpr[k].base.pParam[0].param, pExpr[k].base.pParam[0].param.nType),
                      TSDB_DATA_TYPE_NULL == pExpr[k].base.pParam[0].param.nType);
600
      }
601 602

      numOfRows = pSrcBlock->info.rows;
H
Haojun Liao 已提交
603
    } else if (pExpr[k].pExpr->nodeType == QUERY_NODE_OPERATOR) {
604 605 606
      SArray* pBlockList = taosArrayInit(4, POINTER_BYTES);
      taosArrayPush(pBlockList, &pSrcBlock);

607
      SColumnInfoData* pResColData = taosArrayGet(pResult->pDataBlock, outputSlotId);
608
      SColumnInfoData  idata = {.info = pResColData->info, .hasNull = true};
609

610
      SScalarParam dest = {.columnData = &idata};
X
Xiaoyu Wang 已提交
611
      int32_t      code = scalarCalculate(pExpr[k].pExpr->_optrRoot.pRootNode, pBlockList, &dest);
612 613 614 615
      if (code != TSDB_CODE_SUCCESS) {
        taosArrayDestroy(pBlockList);
        return code;
      }
616

dengyihao's avatar
dengyihao 已提交
617
      int32_t startOffset = createNewColModel ? 0 : pResult->info.rows;
618
      ASSERT(pResult->info.capacity > 0);
619
      colDataMergeCol(pResColData, startOffset, &pResult->info.capacity, &idata, dest.numOfRows);
620 621

      numOfRows = dest.numOfRows;
622 623
      taosArrayDestroy(pBlockList);
    } else if (pExpr[k].pExpr->nodeType == QUERY_NODE_FUNCTION) {
624 625
      // _rowts/_c0, not tbname column
      if (fmIsPseudoColumnFunc(pfCtx->functionId) && (!fmIsScanPseudoColumnFunc(pfCtx->functionId))) {
H
Haojun Liao 已提交
626
        // do nothing
627
      } else if (fmIsIndefiniteRowsFunc(pfCtx->functionId)) {
628 629
        SResultRowEntryInfo* pResInfo = GET_RES_INFO(pfCtx);
        pfCtx->fpSet.init(pfCtx, pResInfo);
630 631 632 633 634 635 636 637 638 639 640

        pfCtx->pOutput = taosArrayGet(pResult->pDataBlock, outputSlotId);
        pfCtx->offset = createNewColModel ? 0 : pResult->info.rows;  // set the start offset

        // set the timestamp(_rowts) output buffer
        if (taosArrayGetSize(pPseudoList) > 0) {
          int32_t* outputColIndex = taosArrayGet(pPseudoList, 0);
          pfCtx->pTsOutput = (SColumnInfoData*)pCtx[*outputColIndex].pOutput;
        }

        numOfRows = pfCtx->fpSet.process(pfCtx);
H
Haojun Liao 已提交
641
      } else if (fmIsAggFunc(pfCtx->functionId)) {
642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657
        // _group_key function for "partition by tbname" + csum(col_name) query
        SColumnInfoData* pOutput = taosArrayGet(pResult->pDataBlock, outputSlotId);
        int32_t slotId = pfCtx->param[0].pCol->slotId;

        // todo handle the json tag
        SColumnInfoData* pInput = taosArrayGet(pSrcBlock->pDataBlock, slotId);
        for(int32_t f = 0; f < pSrcBlock->info.rows; ++f) {
          bool isNull = colDataIsNull_s(pInput, f);
          if (isNull) {
            colDataAppendNULL(pOutput, pResult->info.rows + f);
          } else {
            char* data = colDataGetData(pInput, f);
            colDataAppend(pOutput, pResult->info.rows + f, data, isNull);
          }
        }

H
Haojun Liao 已提交
658 659 660
      } else {
        SArray* pBlockList = taosArrayInit(4, POINTER_BYTES);
        taosArrayPush(pBlockList, &pSrcBlock);
G
Ganlin Zhao 已提交
661

662
        SColumnInfoData* pResColData = taosArrayGet(pResult->pDataBlock, outputSlotId);
663
        SColumnInfoData  idata = {.info = pResColData->info, .hasNull = true};
H
Haojun Liao 已提交
664

665
        SScalarParam dest = {.columnData = &idata};
X
Xiaoyu Wang 已提交
666
        int32_t      code = scalarCalculate((SNode*)pExpr[k].pExpr->_function.pFunctNode, pBlockList, &dest);
667 668 669 670
        if (code != TSDB_CODE_SUCCESS) {
          taosArrayDestroy(pBlockList);
          return code;
        }
671

dengyihao's avatar
dengyihao 已提交
672
        int32_t startOffset = createNewColModel ? 0 : pResult->info.rows;
673
        ASSERT(pResult->info.capacity > 0);
674
        colDataMergeCol(pResColData, startOffset, &pResult->info.capacity, &idata, dest.numOfRows);
675 676

        numOfRows = dest.numOfRows;
H
Haojun Liao 已提交
677 678
        taosArrayDestroy(pBlockList);
      }
679
    } else {
680
      ASSERT(0);
681 682
    }
  }
683

684 685 686
  if (!createNewColModel) {
    pResult->info.rows += numOfRows;
  }
687 688

  return TSDB_CODE_SUCCESS;
689 690
}

5
54liuyao 已提交
691
/*
 * Decide whether the function bound to pCtx should process the current input:
 *  - never when functionId is -1;
 *  - during a REPEAT_SCAN, only if fmIsRepeatScanFunc() accepts the function;
 *  - otherwise, as long as its result entry is not yet complete (isRowEntryCompleted()).
 */
bool functionNeedToExecute(SqlFunctionCtx* pCtx) {
  if (pCtx->functionId == -1) {
    return false;
  }

  if (pCtx->scanFlag == REPEAT_SCAN) {
    return fmIsRepeatScanFunc(pCtx->functionId);
  }

  return !isRowEntryCompleted(GET_RES_INFO(pCtx));
}

711 712 713 714 715 716 717
/*
 * Build block statistics (SColumnDataAgg) for constant value parameter pFuncParam, as if it
 * were a column of numOfRows identical values. Column info and statistics are allocated on
 * first use and stored in pInput->pData[paramIndex] / pInput->pColumnDataAgg[paramIndex].
 *
 * BIGINT:    min = max = v, sum = v * numOfRows.
 * DOUBLE:    the same statistics computed in double precision, stored bit-for-bit in the
 *            64-bit min/max/sum fields. A union is used instead of the former *(double*)
 *            casts, which violated strict aliasing.
 * BOOL:      v normalised to 0/1; min = max = v, sum = v * numOfRows. The original wrote
 *            through *(bool*) casts: it fixed min at 0 and truncated sum to a bool.
 * TIMESTAMP: statistics are left untouched.
 *
 * Returns TSDB_CODE_SUCCESS or TSDB_CODE_OUT_OF_MEMORY.
 */
static int32_t doCreateConstantValColumnAggInfo(SInputColumnInfoData* pInput, SFunctParam* pFuncParam, int32_t type,
                                                int32_t paramIndex, int32_t numOfRows) {
  if (pInput->pData[paramIndex] == NULL) {
    pInput->pData[paramIndex] = taosMemoryCalloc(1, sizeof(SColumnInfoData));
    if (pInput->pData[paramIndex] == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    // Set the correct column info (data type and bytes)
    pInput->pData[paramIndex]->info.type = type;
    pInput->pData[paramIndex]->info.bytes = tDataTypes[type].bytes;
  }

  SColumnDataAgg* da = pInput->pColumnDataAgg[paramIndex];
  if (da == NULL) {
    da = taosMemoryCalloc(1, sizeof(SColumnDataAgg));
    if (da == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
    pInput->pColumnDataAgg[paramIndex] = da;
  }

  ASSERT(!IS_VAR_DATA_TYPE(type));

  if (type == TSDB_DATA_TYPE_BIGINT) {
    int64_t v = pFuncParam->param.i;
    *da = (SColumnDataAgg){.numOfNull = 0, .min = v, .max = v, .sum = v * numOfRows};
  } else if (type == TSDB_DATA_TYPE_DOUBLE) {
    // the statistics fields are 64-bit integers; carry the double bit pattern through a union
    union {
      double  d;
      int64_t i;
    } value = {.d = pFuncParam->param.d}, total = {.d = pFuncParam->param.d * numOfRows};

    *da = (SColumnDataAgg){.numOfNull = 0, .min = value.i, .max = value.i, .sum = total.i};
  } else if (type == TSDB_DATA_TYPE_BOOL) {  // todo validate this data type
    int64_t v = (pFuncParam->param.i != 0) ? 1 : 0;
    *da = (SColumnDataAgg){.numOfNull = 0, .min = v, .max = v, .sum = v * numOfRows};
  } else if (type == TSDB_DATA_TYPE_TIMESTAMP) {
    // do nothing
  } else {
    ASSERT(0);
  }

  return TSDB_CODE_SUCCESS;
}
762 763 764 765 766 767 768 769 770 771 772

/*
 * Attach the block-level statistics (SMA) of `pBlock` to the input of function context `pCtx`.
 *
 * The input row counts are always set to the block's row count. When the block carries no statistics
 * (pBlockAgg == NULL), colDataAggIsSet is cleared and nothing else is done. Otherwise, for each parameter of
 * the expression:
 *   - column parameter: its SColumnDataAgg and SColumnInfoData are taken from the block by slot id, and
 *     colDataAggIsSet is cleared if that column has no statistics;
 *   - constant parameter: synthetic statistics are built by doCreateConstantValColumnAggInfo.
 *     NOTE(review): that helper's return code is ignored here, so an allocation failure goes unnoticed.
 */
void setBlockStatisInfo(SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, SSDataBlock* pBlock) {
  int32_t               rows = pBlock->info.rows;
  SInputColumnInfoData* pIn = &pCtx->input;

  pIn->numOfRows = rows;
  pIn->totalRows = rows;

  if (pBlock->pBlockAgg == NULL) {
    pIn->colDataAggIsSet = false;
    return;
  }

  pIn->colDataAggIsSet = true;

  for (int32_t k = 0; k < pExprInfo->base.numOfParams; ++k) {
    SFunctParam* pParam = &pExprInfo->base.pParam[k];

    switch (pParam->type) {
      case FUNC_PARAM_TYPE_COLUMN: {
        int32_t slot = pParam->pCol->slotId;

        pIn->pColumnDataAgg[k] = pBlock->pBlockAgg[slot];
        if (pIn->pColumnDataAgg[k] == NULL) {
          pIn->colDataAggIsSet = false;
        }

        // Only the column's type information is needed here; its data is not read through this pointer.
        pIn->pData[k] = taosArrayGet(pBlock->pDataBlock, slot);
        break;
      }
      case FUNC_PARAM_TYPE_VALUE:
        doCreateConstantValColumnAggInfo(pIn, pParam, pParam->param.nType, k, rows);
        break;
      default:
        break;
    }
  }
}

L
Liu Jicong 已提交
802
/*
 * Report whether the task should be treated as killed.
 *
 * NOTE(review): the idle-timeout abort is not active — its body is commented out — so this function always
 * returns false. The timeout condition (owned task idle for more than 10x the maximum idle duration) is still
 * evaluated only to run a sanity assert on cost.start.
 */
bool isTaskKilled(SExecTaskInfo* pTaskInfo) {
  if (pTaskInfo->owner == 0) {
    return false;
  }

  // cost.start is divided by 1000 to compare against a seconds timestamp (presumably it is in ms — confirm)
  bool idleTooLong = (taosGetTimestampSec() - pTaskInfo->cost.start / 1000) > 10 * getMaximumIdleDurationSec();
  if (idleTooLong) {
    assert(pTaskInfo->cost.start != 0);
    // TODO: abort the query here once retrieve-timeout handling is re-enabled
  }

  return false;
}

L
Liu Jicong 已提交
817
/* Mark the task as cancelled by recording TSDB_CODE_TSC_QUERY_CANCELLED as its result code. */
void setTaskKilled(SExecTaskInfo* pTaskInfo) {
  pTaskInfo->code = TSDB_CODE_TSC_QUERY_CANCELLED;
}
818 819

/////////////////////////////////////////////////////////////////////////////////////////////
L
Liu Jicong 已提交
820
// todo refactor : return window
821
void getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int64_t key, STimeWindow* win) {
H
Haojun Liao 已提交
822
  win->skey = taosTimeTruncate(key, pInterval, precision);
823 824

  /*
H
Haojun Liao 已提交
825
   * if the realSkey > INT64_MAX - pInterval->interval, the query duration between
826 827
   * realSkey and realEkey must be less than one interval.Therefore, no need to adjust the query ranges.
   */
828 829
  win->ekey = taosTimeAdd(win->skey, pInterval->interval, pInterval->intervalUnit, precision) - 1;
  if (win->ekey < win->skey) {
830 831 832 833
    win->ekey = INT64_MAX;
  }
}

834
#if 0
/*
 * Disabled legacy helper: refine a block's load status based on the output functions.
 *
 * A status of BLK_DATA_DATA_LOAD or BLK_DATA_FILTEROUT is returned unchanged. Otherwise, if the status is
 * BLK_DATA_NOT_LOAD and a first/last function is present, the result is BLK_DATA_DATA_LOAD when other
 * (non ts/tag) functions are also present, and BLK_DATA_FILTEROUT when they are not.
 */
static int32_t updateBlockLoadStatus(STaskAttr* pQuery, int32_t status) {
  if (status == BLK_DATA_DATA_LOAD || status == BLK_DATA_FILTEROUT) {
    return status;
  }

  bool firstLastFound = false;
  bool otherFound = false;

  for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
    int32_t fid = getExprFunctionId(&pQuery->pExpr1[i]);

    bool isTsOrTag =
        (fid == FUNCTION_TS || fid == FUNCTION_TS_DUMMY || fid == FUNCTION_TAG || fid == FUNCTION_TAG_DUMMY);
    if (isTsOrTag) {
      continue;
    }

    if (fid == FUNCTION_FIRST_DST || fid == FUNCTION_LAST_DST) {
      firstLastFound = true;
    } else {
      otherFound = true;
    }
  }

  if (!firstLastFound || status != BLK_DATA_NOT_LOAD) {
    return status;
  }

  return otherFound ? BLK_DATA_DATA_LOAD : BLK_DATA_FILTEROUT;
}
#endif

L
Liu Jicong 已提交
873 874
// static void updateDataCheckOrder(SQInfo *pQInfo, SQueryTableReq* pQueryMsg, bool stableQuery) {
//   STaskAttr* pQueryAttr = pQInfo->runtimeEnv.pQueryAttr;
H
Haojun Liao 已提交
875
//
L
Liu Jicong 已提交
876 877 878 879
//   // in case of point-interpolation query, use asc order scan
//   char msg[] = "QInfo:0x%"PRIx64" scan order changed for %s query, old:%d, new:%d, qrange exchanged, old qrange:%"
//   PRId64
//                "-%" PRId64 ", new qrange:%" PRId64 "-%" PRId64;
H
Haojun Liao 已提交
880
//
L
Liu Jicong 已提交
881 882 883 884 885
//   // todo handle the case the the order irrelevant query type mixed up with order critical query type
//   // descending order query for last_row query
//   if (isFirstLastRowQuery(pQueryAttr)) {
//     //qDebug("QInfo:0x%"PRIx64" scan order changed for last_row query, old:%d, new:%d", pQInfo->qId,
//     pQueryAttr->order.order, TSDB_ORDER_ASC);
H
Haojun Liao 已提交
886
//
L
Liu Jicong 已提交
887 888
//     pQueryAttr->order.order = TSDB_ORDER_ASC;
//     if (pQueryAttr->window.skey > pQueryAttr->window.ekey) {
wafwerar's avatar
wafwerar 已提交
889
//       TSWAP(pQueryAttr->window.skey, pQueryAttr->window.ekey);
L
Liu Jicong 已提交
890
//     }
H
Haojun Liao 已提交
891
//
L
Liu Jicong 已提交
892 893 894
//     pQueryAttr->needReverseScan = false;
//     return;
//   }
H
Haojun Liao 已提交
895
//
L
Liu Jicong 已提交
896 897 898
//   if (pQueryAttr->groupbyColumn && pQueryAttr->order.order == TSDB_ORDER_DESC) {
//     pQueryAttr->order.order = TSDB_ORDER_ASC;
//     if (pQueryAttr->window.skey > pQueryAttr->window.ekey) {
wafwerar's avatar
wafwerar 已提交
899
//       TSWAP(pQueryAttr->window.skey, pQueryAttr->window.ekey);
L
Liu Jicong 已提交
900
//     }
H
Haojun Liao 已提交
901
//
L
Liu Jicong 已提交
902 903 904 905
//     pQueryAttr->needReverseScan = false;
//     doUpdateLastKey(pQueryAttr);
//     return;
//   }
H
Haojun Liao 已提交
906
//
L
Liu Jicong 已提交
907 908 909 910 911 912
//   if (pQueryAttr->pointInterpQuery && pQueryAttr->interval.interval == 0) {
//     if (!QUERY_IS_ASC_QUERY(pQueryAttr)) {
//       //qDebug(msg, pQInfo->qId, "interp", pQueryAttr->order.order, TSDB_ORDER_ASC, pQueryAttr->window.skey,
//       pQueryAttr->window.ekey, pQueryAttr->window.ekey, pQueryAttr->window.skey); TSWAP(pQueryAttr->window.skey,
//       pQueryAttr->window.ekey, TSKEY);
//     }
H
Haojun Liao 已提交
913
//
L
Liu Jicong 已提交
914 915 916
//     pQueryAttr->order.order = TSDB_ORDER_ASC;
//     return;
//   }
H
Haojun Liao 已提交
917
//
L
Liu Jicong 已提交
918 919 920 921
//   if (pQueryAttr->interval.interval == 0) {
//     if (onlyFirstQuery(pQueryAttr)) {
//       if (!QUERY_IS_ASC_QUERY(pQueryAttr)) {
//         //qDebug(msg, pQInfo->qId, "only-first", pQueryAttr->order.order, TSDB_ORDER_ASC, pQueryAttr->window.skey,
H
Haojun Liao 已提交
922 923
////               pQueryAttr->window.ekey, pQueryAttr->window.ekey, pQueryAttr->window.skey);
//
wafwerar's avatar
wafwerar 已提交
924
//        TSWAP(pQueryAttr->window.skey, pQueryAttr->window.ekey);
H
Haojun Liao 已提交
925 926 927 928 929 930 931 932 933 934
//        doUpdateLastKey(pQueryAttr);
//      }
//
//      pQueryAttr->order.order = TSDB_ORDER_ASC;
//      pQueryAttr->needReverseScan = false;
//    } else if (onlyLastQuery(pQueryAttr) && notContainSessionOrStateWindow(pQueryAttr)) {
//      if (QUERY_IS_ASC_QUERY(pQueryAttr)) {
//        //qDebug(msg, pQInfo->qId, "only-last", pQueryAttr->order.order, TSDB_ORDER_DESC, pQueryAttr->window.skey,
////               pQueryAttr->window.ekey, pQueryAttr->window.ekey, pQueryAttr->window.skey);
//
wafwerar's avatar
wafwerar 已提交
935
//        TSWAP(pQueryAttr->window.skey, pQueryAttr->window.ekey);
H
Haojun Liao 已提交
936 937 938 939 940 941 942 943 944 945 946 947
//        doUpdateLastKey(pQueryAttr);
//      }
//
//      pQueryAttr->order.order = TSDB_ORDER_DESC;
//      pQueryAttr->needReverseScan = false;
//    }
//
//  } else {  // interval query
//    if (stableQuery) {
//      if (onlyFirstQuery(pQueryAttr)) {
//        if (!QUERY_IS_ASC_QUERY(pQueryAttr)) {
//          //qDebug(msg, pQInfo->qId, "only-first stable", pQueryAttr->order.order, TSDB_ORDER_ASC,
L
Liu Jicong 已提交
948 949
////                 pQueryAttr->window.skey, pQueryAttr->window.ekey, pQueryAttr->window.ekey,
/// pQueryAttr->window.skey);
H
Haojun Liao 已提交
950
//
wafwerar's avatar
wafwerar 已提交
951
//          TSWAP(pQueryAttr->window.skey, pQueryAttr->window.ekey);
H
Haojun Liao 已提交
952 953 954 955 956 957 958 959
//          doUpdateLastKey(pQueryAttr);
//        }
//
//        pQueryAttr->order.order = TSDB_ORDER_ASC;
//        pQueryAttr->needReverseScan = false;
//      } else if (onlyLastQuery(pQueryAttr)) {
//        if (QUERY_IS_ASC_QUERY(pQueryAttr)) {
//          //qDebug(msg, pQInfo->qId, "only-last stable", pQueryAttr->order.order, TSDB_ORDER_DESC,
L
Liu Jicong 已提交
960 961
////                 pQueryAttr->window.skey, pQueryAttr->window.ekey, pQueryAttr->window.ekey,
/// pQueryAttr->window.skey);
H
Haojun Liao 已提交
962
//
wafwerar's avatar
wafwerar 已提交
963
//          TSWAP(pQueryAttr->window.skey, pQueryAttr->window.ekey);
H
Haojun Liao 已提交
964 965 966 967 968 969 970 971 972
//          doUpdateLastKey(pQueryAttr);
//        }
//
//        pQueryAttr->order.order = TSDB_ORDER_DESC;
//        pQueryAttr->needReverseScan = false;
//      }
//    }
//  }
//}
973

L
Liu Jicong 已提交
974 975 976
// static FORCE_INLINE bool doFilterByBlockStatistics(STaskRuntimeEnv* pRuntimeEnv, SDataStatis *pDataStatis,
// SqlFunctionCtx *pCtx, int32_t numOfRows) {
//   STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
977
//
L
Liu Jicong 已提交
978 979 980
//   if (pDataStatis == NULL || pQueryAttr->pFilters == NULL) {
//     return true;
//   }
981
//
L
Liu Jicong 已提交
982 983
//   return filterRangeExecute(pQueryAttr->pFilters, pDataStatis, pQueryAttr->numOfCols, numOfRows);
// }
984
#if 0
/*
 * Disabled legacy helper: report whether the interval windows around a data block split it, i.e. whether the
 * block overlaps more than one time window.
 *
 * NOTE(review): the calls that position and advance `w` (getAlignQueryTimeWindow / getNextTimeWindow) are
 * commented out. As a result `w` stays zeroed, `sk`/`ek` are unused, and the loops below never advance the
 * window. This must be reworked before the block is re-enabled.
 */
static bool overlapWithTimeWindow(STaskAttr* pQueryAttr, SDataBlockInfo* pBlockInfo) {
  STimeWindow w = {0};

  TSKEY sk = TMIN(pQueryAttr->window.skey, pQueryAttr->window.ekey);
  TSKEY ek = TMAX(pQueryAttr->window.skey, pQueryAttr->window.ekey);

  const bool  ascending = true;  // only the ascending branch is reachable
  const TSKEY bs = pBlockInfo->window.skey;
  const TSKEY be = pBlockInfo->window.ekey;

  if (ascending) {
    //    getAlignQueryTimeWindow(pQueryAttr, bs, sk, ek, &w);
    assert(w.ekey >= bs);
    if (w.ekey < be) {
      return true;
    }

    for (;;) {
      //      getNextTimeWindow(pQueryAttr, &w);
      if (w.skey > be) {
        break;
      }

      assert(w.ekey > be);
      if (w.skey <= be && w.skey > bs) {
        return true;
      }
    }
  } else {
    //    getAlignQueryTimeWindow(pQueryAttr, be, sk, ek, &w);
    assert(w.skey <= be);
    if (w.skey > bs) {
      return true;
    }

    for (;;) {
      //      getNextTimeWindow(pQueryAttr, &w);
      if (w.ekey < bs) {
        break;
      }

      assert(w.skey < bs);
      if (w.ekey < be && w.ekey >= bs) {
        return true;
      }
    }
  }

  return false;
}
#endif
1034 1035

/*
 * Decide the load status of a data block from its time window.
 *
 * Currently a stub: no block-level filtering is performed and 0 is always returned. Both parameters are unused.
 */
static uint32_t doFilterByBlockTimeWindow(STableScanInfo* pTableScanInfo, SSDataBlock* pBlock) {
  (void)pTableScanInfo;
  (void)pBlock;

  return 0;
}

L
Liu Jicong 已提交
1062 1063
/*
 * Prepare `pBlock` for on-demand loading and report how much of it has to be loaded.
 *
 * Current behaviour:
 *   - *status is set to BLK_DATA_NOT_LOAD;
 *   - the block's column-data (pDataBlock) and statistics (pBlockAgg) pointers are reset to NULL;
 *   - TSDB_CODE_SUCCESS is returned.
 * The SMA / time-window based load decision is not implemented in this version; pTaskInfo and pTableScanInfo
 * are currently unused.
 *
 * NOTE(review): pDataBlock / pBlockAgg are overwritten without being released here — confirm the caller owns
 * and frees whatever they pointed to.
 */
int32_t loadDataBlockOnDemand(SExecTaskInfo* pTaskInfo, STableScanInfo* pTableScanInfo, SSDataBlock* pBlock,
                              uint32_t* status) {
  (void)pTaskInfo;
  (void)pTableScanInfo;

  *status = BLK_DATA_NOT_LOAD;

  pBlock->pDataBlock = NULL;
  pBlock->pBlockAgg = NULL;

  return TSDB_CODE_SUCCESS;
}

L
Liu Jicong 已提交
1205
/*
 * Placeholder for adjusting per-table query state before a reverse scan.
 *
 * Currently a no-op for every input, including NULL.
 * TODO: the intended steps (all disabled) are: swap win.skey / win.ekey, reset lastKey to the new skey, flip
 * cur.order, reset cur.vgroupIndex, and point the result-row cursor at the last result row.
 */
static void updateTableQueryInfoForReverseScan(STableQueryInfo* pTableQueryInfo) {
  (void)pTableQueryInfo;
}

H
Haojun Liao 已提交
1225
/*
 * Initialise a result row. Currently a no-op: the row is not modified.
 * (Entry infos of a row are located through getResultEntryInfo with per-column offsets.)
 */
void initResultRow(SResultRow* pResultRow) {
  (void)pResultRow;
}

/*
 * The start of each column's SResultRowEntryInfo is denoted by its RowCellInfoOffset.
 * Note that in case of a top/bottom query, the whole set of result rows is treated as a single result row.
 * +------------+-----------------result column 1------------+------------------result column 2-----------+
 * | SResultRow | SResultRowEntryInfo | intermediate buffer1 | SResultRowEntryInfo | intermediate buffer 2|
 * +------------+--------------------------------------------+--------------------------------------------+
 *           offset[0]                                  offset[1]                                   offset[2]
 */
1237
// TODO refactor: some function move away
L
Liu Jicong 已提交
1238 1239 1240
void setFunctionResultOutput(SOperatorInfo* pOperator, SOptrBasicInfo* pInfo, SAggSupporter* pSup, int32_t stage,
                             int32_t numOfExprs) {
  SExecTaskInfo*  pTaskInfo = pOperator->pTaskInfo;
1241 1242
  SqlFunctionCtx* pCtx = pOperator->exprSupp.pCtx;
  int32_t*        rowEntryInfoOffset = pOperator->exprSupp.rowEntryInfoOffset;
H
Haojun Liao 已提交
1243

H
Haojun Liao 已提交
1244
  SResultRowInfo* pResultRowInfo = &pInfo->resultRowInfo;
1245
  initResultRowInfo(pResultRowInfo);
H
Haojun Liao 已提交
1246

L
Liu Jicong 已提交
1247 1248
  int64_t     tid = 0;
  int64_t     groupId = 0;
1249 1250
  SResultRow* pRow = doSetResultOutBufByKey(pSup->pResultBuf, pResultRowInfo, (char*)&tid, sizeof(tid), true, groupId,
                                            pTaskInfo, false, pSup);
H
Haojun Liao 已提交
1251

1252
  for (int32_t i = 0; i < numOfExprs; ++i) {
1253
    struct SResultRowEntryInfo* pEntry = getResultEntryInfo(pRow, i, rowEntryInfoOffset);
H
Haojun Liao 已提交
1254 1255
    cleanupResultRowEntry(pEntry);

L
Liu Jicong 已提交
1256
    pCtx[i].resultInfo = pEntry;
1257
    pCtx[i].scanFlag = stage;
H
Haojun Liao 已提交
1258 1259
  }

1260
  initCtxOutputBuffer(pCtx, numOfExprs);
H
Haojun Liao 已提交
1261 1262
}

H
Haojun Liao 已提交
1263
/*
 * Run the init callback of every function context whose result entry has not been initialised yet.
 *
 * Skipped contexts:
 *   - entries that are already initialised;
 *   - pseudo-column functions;
 *   - contexts without a function (functionId == -1);
 *   - scalar functions.
 */
void initCtxOutputBuffer(SqlFunctionCtx* pCtx, int32_t size) {
  for (int32_t i = 0; i < size; ++i) {
    SqlFunctionCtx*             pc = &pCtx[i];
    struct SResultRowEntryInfo* pResInfo = GET_RES_INFO(pc);

    bool skip = isRowEntryInitialized(pResInfo) || fmIsPseudoColumnFunc(pc->functionId) || pc->functionId == -1 ||
                fmIsScalarFunc(pc->functionId);
    if (!skip) {
      pc->fpSet.init(pc, pc->resultInfo);
    }
  }
}

L
Liu Jicong 已提交
1275
/*
 * Update the task status.
 *
 * TASK_NOT_COMPLETED replaces the whole status word. Any other status clears the TASK_NOT_COMPLETED bit and is
 * then OR-ed into the existing status.
 */
void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status) {
  if (status != TASK_NOT_COMPLETED) {
    // TASK_NOT_COMPLETED is not compatible with any other status, so drop it before adding the new bit
    CLEAR_QUERY_STATUS(pTaskInfo, TASK_NOT_COMPLETED);
    pTaskInfo->status |= status;
    return;
  }

  pTaskInfo->status = status;
}

L
Liu Jicong 已提交
1285
/*
 * Release resources held by a STableQueryInfo.
 *
 * Currently a no-op (NULL is accepted).
 * TODO: the tag variant and result-row info cleanup are disabled.
 */
void destroyTableQueryInfoImpl(STableQueryInfo* pTableQueryInfo) {
  (void)pTableQueryInfo;
}

1294
/*
 * Point each function context at its entry in result row `pResult`, and initialise entries that are not yet
 * initialised.
 *
 * Entries are left untouched when:
 *   - they are both completed and initialised;
 *   - they belong to window pseudo-column functions;
 *   - they are already initialised.
 * Contexts without a function (functionId == -1) are only flagged as initialised; all others run their init
 * callback.
 */
void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowEntryInfoOffset) {
  for (int32_t k = 0; k < numOfOutput; ++k) {
    SqlFunctionCtx* pc = &pCtx[k];
    pc->resultInfo = getResultEntryInfo(pResult, k, rowEntryInfoOffset);

    struct SResultRowEntryInfo* pEntry = pc->resultInfo;

    bool finished = isRowEntryCompleted(pEntry) && isRowEntryInitialized(pEntry);
    if (finished || fmIsWindowPseudoColumnFunc(pc->functionId) || pEntry->initialized) {
      continue;
    }

    if (pc->functionId == -1) {
      pEntry->initialized = true;
    } else {
      pc->fpSet.init(pc, pEntry);
    }
  }
}

H
Haojun Liao 已提交
1317
static void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const int8_t* rowRes, bool keep);
1318

1319
/*
 * Apply the filter condition `pFilterNode` to `pBlock` in place.
 *
 * Rows that do not satisfy the condition are removed and the block's timestamp window is recomputed. A NULL
 * filter leaves the block untouched.
 *
 * If the filter cannot be created or bound to the block's columns, the error is logged and the block is left
 * unfiltered. The function returns void, so the failure cannot be propagated to the caller; previously the
 * error codes were ignored and execution continued with an unusable filter.
 */
void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock) {
  if (pFilterNode == NULL) {
    return;
  }

  // todo move to the initialization function
  SFilterInfo* filter = NULL;
  int32_t      code = filterInitFromNode((SNode*)pFilterNode, &filter, 0);
  if (code != TSDB_CODE_SUCCESS) {
    qError("failed to init filter from node, code:%s", tstrerror(code));
    return;
  }

  size_t             numOfCols = taosArrayGetSize(pBlock->pDataBlock);
  SFilterColumnParam param1 = {.numOfCols = numOfCols, .pDataBlock = pBlock->pDataBlock};

  code = filterSetDataFromSlotId(filter, &param1);
  if (code != TSDB_CODE_SUCCESS) {
    qError("failed to bind filter to block columns, code:%s", tstrerror(code));
    filterFreeInfo(filter);
    return;
  }

  int8_t* rowRes = NULL;

  // todo the keep seems never to be True??
  bool keep = filterExecute(filter, pBlock, &rowRes, NULL, param1.numOfCols);
  filterFreeInfo(filter);

  extractQualifiedTupleByFilterResult(pBlock, rowRes, keep);
  blockDataUpdateTsWindow(pBlock, 0);

  taosMemoryFree(rowRes);
}

H
Haojun Liao 已提交
1345
/*
 * Compact `pBlock` in place so that only the rows selected by a filter result remain.
 *
 * @param pBlock block to compact
 * @param rowRes per-row filter result; rowRes[j] != 0 means row j qualifies (may be NULL)
 * @param keep   when true, every row qualifies and the block is left untouched
 *
 * With keep == false:
 *   - rowRes == NULL: no row qualifies and the block is emptied (info.rows = 0);
 *   - otherwise: the qualified rows of every column are copied, in order, to the front of that column, and
 *     info.rows is set to the number of qualified rows.
 * Columns without data (reserved for scalar functions whose output has not been produced yet) are skipped.
 */
void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const int8_t* rowRes, bool keep) {
  if (keep) {
    return;
  }

  if (rowRes == NULL) {
    // no row qualified: drop all rows
    pBlock->info.rows = 0;
    return;
  }

  int32_t totalRows = pBlock->info.rows;

  // Count the qualified rows once. Every column is then compacted to exactly this length, and info.rows is
  // updated even when all columns are skipped below.
  int32_t numOfQualified = 0;
  for (int32_t j = 0; j < totalRows; ++j) {
    if (rowRes[j] != 0) {
      numOfQualified += 1;
    }
  }

  // untouched copy of the original data, read from while the block's own columns are rewritten
  SSDataBlock* px = createOneDataBlock(pBlock, true);
  if (px == NULL) {
    qError("failed to copy data block for filtering, rows:%d", totalRows);
    return;
  }

  size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
  for (size_t i = 0; i < numOfCols; ++i) {
    SColumnInfoData* pSrc = taosArrayGet(px->pDataBlock, i);
    SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, i);

    // it is a reserved column for scalar function, and no data in this column yet.
    if (pDst->pData == NULL || pSrc->pData == NULL) {
      continue;
    }

    // reset the whole original extent of the destination column before refilling it
    colInfoDataCleanup(pDst, totalRows);

    int32_t numOfRows = 0;
    for (int32_t j = 0; j < totalRows; ++j) {
      if (rowRes[j] == 0) {
        continue;
      }

      if (colDataIsNull_s(pSrc, j)) {
        colDataAppendNULL(pDst, numOfRows);
      } else {
        colDataAppend(pDst, numOfRows, colDataGetData(pSrc, j), false);
      }
      numOfRows += 1;
    }

    ASSERT(numOfRows == numOfQualified);
  }

  pBlock->info.rows = numOfQualified;
  blockDataDestroy(px);
}

L
Liu Jicong 已提交
1393 1394
/*
 * Make the result row of group `groupId` the current output target of the operator's function contexts.
 *
 * For a simple group-by query without interval, all tables of one group share a single result row keyed by
 * the group id. If that row has no result page yet (pageId == -1), a new result buffer is attached. When that
 * allocation fails, the function returns without binding the contexts.
 */
void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, SAggOperatorInfo* pAggInfo, int32_t numOfOutput,
                              uint64_t groupId) {
  SExecTaskInfo*  pTask = pOperator->pTaskInfo;
  SAggSupporter*  pSup = &pAggInfo->aggSup;
  SResultRowInfo* pRowInfo = &pAggInfo->binfo.resultRowInfo;
  SqlFunctionCtx* pCtx = pOperator->exprSupp.pCtx;
  int32_t*        offsets = pOperator->exprSupp.rowEntryInfoOffset;

  SResultRow* pRow = doSetResultOutBufByKey(pSup->pResultBuf, pRowInfo, (char*)&groupId, sizeof(groupId), true,
                                            groupId, pTask, false, pSup);
  assert(pRow != NULL);

  bool needsBuffer = (pRow->pageId == -1);
  if (needsBuffer &&
      addNewWindowResultBuf(pRow, pSup->pResultBuf, groupId, pAggInfo->binfo.pRes->info.rowSize) != TSDB_CODE_SUCCESS) {
    return;
  }

  setResultRowInitCtx(pRow, pCtx, numOfOutput, offsets);
}

1420
/*
 * Switch the aggregate operator's output to the result row of group `groupId`, unless that group is already
 * the active one. INT32_MIN in pAggInfo->groupId marks "no active group yet".
 */
void setExecutionContext(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId, SAggOperatorInfo* pAggInfo) {
  bool alreadyActive = (pAggInfo->groupId != INT32_MIN) && (pAggInfo->groupId == groupId);
  if (alreadyActive) {
    return;
  }

#ifdef BUF_PAGE_DEBUG
  qDebug("page_setbuf, groupId:%" PRIu64, groupId);
#endif

  doSetTableGroupOutputBuf(pOperator, pAggInfo, numOfOutput, groupId);

  // remember which group the contexts are now bound to
  pAggInfo->groupId = groupId;
}

1433 1434
// Raise pRow->numOfRows to the largest numOfRes among the row's initialized result entries (one entry per
// expression, located through rowCellOffset). Uninitialized entries are ignored, and the count is never lowered.
static void doUpdateNumOfRows(SResultRow* pRow, int32_t numOfExprs, const int32_t* rowCellOffset) {
  for (int32_t i = 0; i < numOfExprs; i++) {
    struct SResultRowEntryInfo* pEntry = getResultEntryInfo(pRow, i, rowCellOffset);
    if (isRowEntryInitialized(pEntry) && pEntry->numOfRes > pRow->numOfRows) {
      pRow->numOfRows = pEntry->numOfRes;
    }
  }
}

1446
int32_t finalizeResultRowIntoResultDataBlock(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPosition,
S
shenglian zhou 已提交
1447 1448 1449
                                             SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, int32_t numOfExprs,
                                             const int32_t* rowCellOffset, SSDataBlock* pBlock,
                                             SExecTaskInfo* pTaskInfo) {
1450 1451 1452 1453 1454 1455 1456 1457 1458
  SFilePage*  page = getBufPage(pBuf, resultRowPosition->pageId);
  SResultRow* pRow = (SResultRow*)((char*)page + resultRowPosition->offset);

  doUpdateNumOfRows(pRow, numOfExprs, rowCellOffset);
  if (pRow->numOfRows == 0) {
    releaseBufPage(pBuf, page);
    return 0;
  }

1459 1460 1461 1462 1463 1464 1465
  while (pBlock->info.rows + pRow->numOfRows > pBlock->info.capacity) {
    int32_t code = blockDataEnsureCapacity(pBlock, pBlock->info.capacity * 1.25);
    if (TAOS_FAILED(code)) {
      releaseBufPage(pBuf, page);
      qError("%s ensure result data capacity failed, code %s", GET_TASKID(pTaskInfo), tstrerror(code));
      longjmp(pTaskInfo->env, code);
    }
1466 1467 1468 1469 1470
  }

  for (int32_t j = 0; j < numOfExprs; ++j) {
    int32_t slotId = pExprInfo[j].base.resSchema.slotId;

1471
    pCtx[j].resultInfo = getResultEntryInfo(pRow, j, rowCellOffset);
1472 1473 1474 1475 1476 1477 1478 1479 1480 1481 1482 1483 1484 1485 1486 1487 1488 1489 1490 1491
    if (pCtx[j].fpSet.finalize) {
      int32_t code = pCtx[j].fpSet.finalize(&pCtx[j], pBlock);
      if (TAOS_FAILED(code)) {
        qError("%s build result data block error, code %s", GET_TASKID(pTaskInfo), tstrerror(code));
        longjmp(pTaskInfo->env, code);
      }
    } else if (strcmp(pCtx[j].pExpr->pExpr->_function.functionName, "_select_value") == 0) {
      // do nothing, todo refactor
    } else {
      // expand the result into multiple rows. E.g., _wstartts, top(k, 20)
      // the _wstartts needs to copy to 20 following rows, since the results of top-k expands to 20 different rows.
      SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, slotId);
      char*            in = GET_ROWCELL_INTERBUF(pCtx[j].resultInfo);
      for (int32_t k = 0; k < pRow->numOfRows; ++k) {
        colDataAppend(pColInfoData, pBlock->info.rows + k, in, pCtx[j].resultInfo->isNullRes);
      }
    }
  }

  releaseBufPage(pBuf, page);
1492
  pBlock->info.rows += pRow->numOfRows;
1493 1494 1495 1496

  return 0;
}

X
Xiaoyu Wang 已提交
1497 1498 1499
/*
 * Copy finalized result rows from pGroupResInfo into pBlock, starting at pGroupResInfo->index.
 *
 * All rows written into one block belong to a single group. If pBlock->info.groupId is 0, the block takes the
 * group id of the first non-empty result row. Copying stops at the first row of a different group, or at the
 * first row that does not fit into the remaining capacity. pGroupResInfo->index is advanced past every consumed
 * row, including rows without output, so the next call resumes where this one stopped.
 *
 * For each expression:
 *   - with a finalize callback, the callback writes the value;
 *   - "_select_value" entries are left untouched;
 *   - otherwise the intermediate value is copied into every output row of the result row. When pCtx[j].increase
 *     is set, the value is read as an int64_t and incremented by one per output row.
 *
 * A finalize failure releases the current page and longjmps to pTaskInfo->env. Returns 0.
 */
int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprInfo* pExprInfo, SDiskbasedBuf* pBuf,
                           SGroupResInfo* pGroupResInfo, const int32_t* rowCellOffset, SqlFunctionCtx* pCtx,
                           int32_t numOfExprs) {
  int32_t numOfRows = getNumOfTotalRes(pGroupResInfo);
  int32_t start = pGroupResInfo->index;
#ifdef BUF_PAGE_DEBUG
  qDebug("\npage_copytoblock rows:%d", numOfRows);
#endif

  for (int32_t i = start; i < numOfRows; i += 1) {
    SResKeyPos* pPos = taosArrayGetP(pGroupResInfo->pRows, i);
    SFilePage*  page = getBufPage(pBuf, pPos->pos.pageId);
#ifdef BUF_PAGE_DEBUG
    qDebug("page_copytoblock pos pageId:%d, offset:%d", pPos->pos.pageId, pPos->pos.offset);
#endif
    SResultRow* pRow = (SResultRow*)((char*)page + pPos->pos.offset);

    doUpdateNumOfRows(pRow, numOfExprs, rowCellOffset);
    if (pRow->numOfRows == 0) {
      pGroupResInfo->index += 1;
      releaseBufPage(pBuf, page);
      continue;
    }

    if (pBlock->info.groupId == 0) {
      pBlock->info.groupId = pPos->groupId;
    } else if (pBlock->info.groupId != pPos->groupId) {
      // current value belongs to different group, it can't be packed into one datablock
      releaseBufPage(pBuf, page);
      break;
    }

    // pRow lives inside `page`; copy the row count so it is never read after the page has been released.
    int32_t numOfRes = pRow->numOfRows;

    if (pBlock->info.rows + numOfRes > pBlock->info.capacity) {
      releaseBufPage(pBuf, page);
      break;
    }

    pGroupResInfo->index += 1;

    for (int32_t j = 0; j < numOfExprs; ++j) {
      int32_t slotId = pExprInfo[j].base.resSchema.slotId;

      pCtx[j].resultInfo = getResultEntryInfo(pRow, j, rowCellOffset);
      if (pCtx[j].fpSet.finalize) {
#ifdef BUF_PAGE_DEBUG
        qDebug("\npage_finalize %d", numOfExprs);
#endif
        int32_t code = pCtx[j].fpSet.finalize(&pCtx[j], pBlock);
        if (TAOS_FAILED(code)) {
          releaseBufPage(pBuf, page);  // do not leave the page pinned when bailing out
          qError("%s build result data block error, code %s", GET_TASKID(pTaskInfo), tstrerror(code));
          longjmp(pTaskInfo->env, code);
        }
      } else if (strcmp(pCtx[j].pExpr->pExpr->_function.functionName, "_select_value") == 0) {
        // do nothing, todo refactor
      } else {
        // expand the result into multiple rows. E.g., _wstartts, top(k, 20)
        // the _wstartts needs to copy to 20 following rows, since the results of top-k expands to 20 different rows.
        SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, slotId);
        char*            in = GET_ROWCELL_INTERBUF(pCtx[j].resultInfo);
        if (pCtx[j].increase) {
          // copy instead of casting: the interbuf is not guaranteed to be int64-aligned
          int64_t ts = 0;
          memcpy(&ts, in, sizeof(ts));
          for (int32_t k = 0; k < numOfRes; ++k) {
            colDataAppend(pColInfoData, pBlock->info.rows + k, (const char*)&ts, pCtx[j].resultInfo->isNullRes);
            ts++;
          }
        } else {
          for (int32_t k = 0; k < numOfRes; ++k) {
            colDataAppend(pColInfoData, pBlock->info.rows + k, in, pCtx[j].resultInfo->isNullRes);
          }
        }
      }
    }

    releaseBufPage(pBuf, page);
    pBlock->info.rows += numOfRes;
  }

  qDebug("%s result generated, rows:%d, groupId:%" PRIu64, GET_TASKID(pTaskInfo), pBlock->info.rows,
         pBlock->info.groupId);
  blockDataUpdateTsWindow(pBlock, 0);
  return 0;
}

X
Xiaoyu Wang 已提交
1584 1585
/*
 * Refill pbInfo->pRes with the next batch of finalized rows from pGroupResInfo.
 *
 * The result block is always cleared first. If pGroupResInfo has no more data, the block stays empty.
 * Otherwise its group id is reset to 0, so doCopyToSDataBlock adopts the group of the first row it copies.
 */
void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo,
                            SDiskbasedBuf* pBuf) {
  SSDataBlock* pRes = pbInfo->pRes;
  blockDataCleanup(pRes);

  if (hasDataInGroupInfo(pGroupResInfo)) {
    pRes->info.groupId = 0;  // clear the existed group id
    doCopyToSDataBlock(pOperator->pTaskInfo, pRes, pOperator->exprSupp.pExprInfo, pBuf, pGroupResInfo,
                       pOperator->exprSupp.rowEntryInfoOffset, pOperator->exprSupp.pCtx,
                       pOperator->exprSupp.numOfExprs);
  }
}

L
Liu Jicong 已提交
1604
/*
 * Kept for call compatibility only. The per-window row-count update this function used to perform was compiled
 * out (#if 0), so it currently has no effect.
 */
static void updateNumOfRowsInResultRows(SqlFunctionCtx* pCtx, int32_t numOfOutput, SResultRowInfo* pResultRowInfo,
                                        int32_t* rowEntryInfoOffset) {
  (void)pCtx;
  (void)numOfOutput;
  (void)pResultRowInfo;
  (void)rowEntryInfoOffset;
}

L
Liu Jicong 已提交
1627
// Compress the first numOfRows fixed-size values of pColRes into `data`, using the compression routine that
// tDataTypes registers for the column's type. The routine is told the output buffer holds
// colSize + COMP_OVERFLOW_BYTES bytes (presumably guaranteed by the caller; confirm). Its result is returned as-is.
static int32_t compressQueryColData(SColumnInfoData* pColRes, int32_t numOfRows, char* data, int8_t compressed) {
  int32_t srcLen = pColRes->info.bytes * numOfRows;
  return tDataTypes[pColRes->info.type].compFunc(pColRes->pData, srcLen, numOfRows, data,
                                                  srcLen + COMP_OVERFLOW_BYTES, compressed, NULL, 0);
}

1633 1634 1635
// Let pFillInfo append fill rows to pBlock, using at most the room left below `capacity`, and return the block's
// resulting row count.
int32_t doFillTimeIntervalGapsInResults(struct SFillInfo* pFillInfo, SSDataBlock* pBlock, int32_t capacity) {
  int32_t room = capacity - pBlock->info.rows;
  int32_t filled = (int32_t)taosFillResultDataBlock(pFillInfo, pBlock, room);

  pBlock->info.rows += filled;
  return pBlock->info.rows;
}

L
Liu Jicong 已提交
1640 1641
/*
 * Finish the cost summary of a task.
 *
 * The first-stage merge time is added to the elapsed time. When a file-block load recorder is attached, the
 * summary is also written to the debug log.
 */
void queryCostStatis(SExecTaskInfo* pTaskInfo) {
  STaskCostInfo* pSummary = &pTaskInfo->cost;

  // add the merge time
  pSummary->elapsedTime += pSummary->firstStageMergeTime;

  SFileBlockLoadRecorder* pRecorder = pSummary->pRecoder;
  if (pRecorder == NULL) {
    return;
  }

  qDebug("%s :cost summary: elapsed time:%" PRId64 " us, first merge:%" PRId64
         " us, total blocks:%d, "
         "load block statis:%d, load data block:%d, total rows:%" PRId64 ", check rows:%" PRId64,
         GET_TASKID(pTaskInfo), pSummary->elapsedTime, pSummary->firstStageMergeTime, pRecorder->totalBlocks,
         pRecorder->loadBlockStatis, pRecorder->loadBlocks, pRecorder->totalRows, pRecorder->totalCheckedRows);
}

L
Liu Jicong 已提交
1674 1675 1676
// static void updateOffsetVal(STaskRuntimeEnv *pRuntimeEnv, SDataBlockInfo *pBlockInfo) {
//   STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
//   STableQueryInfo* pTableQueryInfo = pRuntimeEnv->current;
1677
//
L
Liu Jicong 已提交
1678
//   int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQueryAttr->order.order);
1679
//
L
Liu Jicong 已提交
1680 1681 1682 1683
//   if (pQueryAttr->limit.offset == pBlockInfo->rows) {  // current block will ignore completed
//     pTableQueryInfo->lastKey = QUERY_IS_ASC_QUERY(pQueryAttr) ? pBlockInfo->window.ekey + step :
//     pBlockInfo->window.skey + step; pQueryAttr->limit.offset = 0; return;
//   }
1684
//
L
Liu Jicong 已提交
1685 1686 1687 1688 1689
//   if (QUERY_IS_ASC_QUERY(pQueryAttr)) {
//     pQueryAttr->pos = (int32_t)pQueryAttr->limit.offset;
//   } else {
//     pQueryAttr->pos = pBlockInfo->rows - (int32_t)pQueryAttr->limit.offset - 1;
//   }
1690
//
L
Liu Jicong 已提交
1691
//   assert(pQueryAttr->pos >= 0 && pQueryAttr->pos <= pBlockInfo->rows - 1);
1692
//
L
Liu Jicong 已提交
1693 1694
//   SArray *         pDataBlock = tsdbRetrieveDataBlock(pRuntimeEnv->pTsdbReadHandle, NULL);
//   SColumnInfoData *pColInfoData = taosArrayGet(pDataBlock, 0);
1695
//
L
Liu Jicong 已提交
1696 1697
//   // update the pQueryAttr->limit.offset value, and pQueryAttr->pos value
//   TSKEY *keys = (TSKEY *) pColInfoData->pData;
1698
//
L
Liu Jicong 已提交
1699 1700 1701
//   // update the offset value
//   pTableQueryInfo->lastKey = keys[pQueryAttr->pos];
//   pQueryAttr->limit.offset = 0;
1702
//
L
Liu Jicong 已提交
1703
//   int32_t numOfRes = tableApplyFunctionsOnBlock(pRuntimeEnv, pBlockInfo, NULL, binarySearchForKey, pDataBlock);
1704
//
L
Liu Jicong 已提交
1705 1706 1707 1708
//   //qDebug("QInfo:0x%"PRIx64" check data block, brange:%" PRId64 "-%" PRId64 ", numBlocksOfStep:%d, numOfRes:%d,
//   lastKey:%"PRId64, GET_TASKID(pRuntimeEnv),
//          pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows, numOfRes, pQuery->current->lastKey);
// }
1709

L
Liu Jicong 已提交
1710 1711
// void skipBlocks(STaskRuntimeEnv *pRuntimeEnv) {
//   STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
1712
//
L
Liu Jicong 已提交
1713 1714 1715
//   if (pQueryAttr->limit.offset <= 0 || pQueryAttr->numOfFilterCols > 0) {
//     return;
//   }
1716
//
L
Liu Jicong 已提交
1717 1718
//   pQueryAttr->pos = 0;
//   int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQueryAttr->order.order);
1719
//
L
Liu Jicong 已提交
1720 1721
//   STableQueryInfo* pTableQueryInfo = pRuntimeEnv->current;
//   TsdbQueryHandleT pTsdbReadHandle = pRuntimeEnv->pTsdbReadHandle;
1722
//
L
Liu Jicong 已提交
1723 1724 1725 1726 1727
//   SDataBlockInfo blockInfo = SDATA_BLOCK_INITIALIZER;
//   while (tsdbNextDataBlock(pTsdbReadHandle)) {
//     if (isTaskKilled(pRuntimeEnv->qinfo)) {
//       longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED);
//     }
1728
//
L
Liu Jicong 已提交
1729
//     tsdbRetrieveDataBlockInfo(pTsdbReadHandle, &blockInfo);
1730
//
L
Liu Jicong 已提交
1731 1732 1733 1734
//     if (pQueryAttr->limit.offset > blockInfo.rows) {
//       pQueryAttr->limit.offset -= blockInfo.rows;
//       pTableQueryInfo->lastKey = (QUERY_IS_ASC_QUERY(pQueryAttr)) ? blockInfo.window.ekey : blockInfo.window.skey;
//       pTableQueryInfo->lastKey += step;
1735
//
L
Liu Jicong 已提交
1736 1737 1738 1739 1740 1741 1742
//       //qDebug("QInfo:0x%"PRIx64" skip rows:%d, offset:%" PRId64, GET_TASKID(pRuntimeEnv), blockInfo.rows,
//              pQuery->limit.offset);
//     } else {  // find the appropriated start position in current block
//       updateOffsetVal(pRuntimeEnv, &blockInfo);
//       break;
//     }
//   }
1743
//
L
Liu Jicong 已提交
1744 1745 1746 1747 1748 1749 1750 1751 1752
//   if (terrno != TSDB_CODE_SUCCESS) {
//     longjmp(pRuntimeEnv->env, terrno);
//   }
// }

// static TSKEY doSkipIntervalProcess(STaskRuntimeEnv* pRuntimeEnv, STimeWindow* win, SDataBlockInfo* pBlockInfo,
// STableQueryInfo* pTableQueryInfo) {
//   STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
//   SResultRowInfo *pWindowResInfo = &pRuntimeEnv->resultRowInfo;
1753
//
L
Liu Jicong 已提交
1754 1755 1756
//   assert(pQueryAttr->limit.offset == 0);
//   STimeWindow tw = *win;
//   getNextTimeWindow(pQueryAttr, &tw);
1757
//
L
Liu Jicong 已提交
1758 1759
//   if ((tw.skey <= pBlockInfo->window.ekey && QUERY_IS_ASC_QUERY(pQueryAttr)) ||
//       (tw.ekey >= pBlockInfo->window.skey && !QUERY_IS_ASC_QUERY(pQueryAttr))) {
1760
//
L
Liu Jicong 已提交
1761 1762 1763 1764
//     // load the data block and check data remaining in current data block
//     // TODO optimize performance
//     SArray *         pDataBlock = tsdbRetrieveDataBlock(pRuntimeEnv->pTsdbReadHandle, NULL);
//     SColumnInfoData *pColInfoData = taosArrayGet(pDataBlock, 0);
1765
//
L
Liu Jicong 已提交
1766 1767 1768 1769
//     tw = *win;
//     int32_t startPos =
//         getNextQualifiedWindow(pQueryAttr, &tw, pBlockInfo, pColInfoData->pData, binarySearchForKey, -1);
//     assert(startPos >= 0);
1770
//
L
Liu Jicong 已提交
1771 1772
//     // set the abort info
//     pQueryAttr->pos = startPos;
1773
//
L
Liu Jicong 已提交
1774 1775 1776 1777
//     // reset the query start timestamp
//     pTableQueryInfo->win.skey = ((TSKEY *)pColInfoData->pData)[startPos];
//     pQueryAttr->window.skey = pTableQueryInfo->win.skey;
//     TSKEY key = pTableQueryInfo->win.skey;
1778
//
L
Liu Jicong 已提交
1779 1780
//     pWindowResInfo->prevSKey = tw.skey;
//     int32_t index = pRuntimeEnv->resultRowInfo.curIndex;
1781
//
L
Liu Jicong 已提交
1782 1783
//     int32_t numOfRes = tableApplyFunctionsOnBlock(pRuntimeEnv, pBlockInfo, NULL, binarySearchForKey, pDataBlock);
//     pRuntimeEnv->resultRowInfo.curIndex = index;  // restore the window index
1784
//
L
Liu Jicong 已提交
1785 1786 1787 1788
//     //qDebug("QInfo:0x%"PRIx64" check data block, brange:%" PRId64 "-%" PRId64 ", numOfRows:%d, numOfRes:%d,
//     lastKey:%" PRId64,
//            GET_TASKID(pRuntimeEnv), pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows, numOfRes,
//            pQueryAttr->current->lastKey);
1789
//
L
Liu Jicong 已提交
1790 1791 1792 1793 1794
//     return key;
//   } else {  // do nothing
//     pQueryAttr->window.skey      = tw.skey;
//     pWindowResInfo->prevSKey = tw.skey;
//     pTableQueryInfo->lastKey = tw.skey;
1795
//
L
Liu Jicong 已提交
1796 1797
//     return tw.skey;
//   }
1798
//
L
Liu Jicong 已提交
1799 1800 1801 1802 1803 1804 1805 1806 1807 1808
//   return true;
// }

// static bool skipTimeInterval(STaskRuntimeEnv *pRuntimeEnv, TSKEY* start) {
//   STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
//   if (QUERY_IS_ASC_QUERY(pQueryAttr)) {
//     assert(*start <= pRuntimeEnv->current->lastKey);
//   } else {
//     assert(*start >= pRuntimeEnv->current->lastKey);
//   }
1809
//
L
Liu Jicong 已提交
1810 1811 1812 1813 1814
//   // if queried with value filter, do NOT forward query start position
//   if (pQueryAttr->limit.offset <= 0 || pQueryAttr->numOfFilterCols > 0 || pRuntimeEnv->pTsBuf != NULL ||
//   pRuntimeEnv->pFillInfo != NULL) {
//     return true;
//   }
1815
//
L
Liu Jicong 已提交
1816 1817 1818 1819 1820 1821 1822
//   /*
//    * 1. for interval without interpolation query we forward pQueryAttr->interval.interval at a time for
//    *    pQueryAttr->limit.offset times. Since hole exists, pQueryAttr->interval.interval*pQueryAttr->limit.offset
//    value is
//    *    not valid. otherwise, we only forward pQueryAttr->limit.offset number of points
//    */
//   assert(pRuntimeEnv->resultRowInfo.prevSKey == TSKEY_INITIAL_VAL);
1823
//
L
Liu Jicong 已提交
1824 1825
//   STimeWindow w = TSWINDOW_INITIALIZER;
//   bool ascQuery = QUERY_IS_ASC_QUERY(pQueryAttr);
1826
//
L
Liu Jicong 已提交
1827 1828
//   SResultRowInfo *pWindowResInfo = &pRuntimeEnv->resultRowInfo;
//   STableQueryInfo *pTableQueryInfo = pRuntimeEnv->current;
1829
//
L
Liu Jicong 已提交
1830 1831 1832
//   SDataBlockInfo blockInfo = SDATA_BLOCK_INITIALIZER;
//   while (tsdbNextDataBlock(pRuntimeEnv->pTsdbReadHandle)) {
//     tsdbRetrieveDataBlockInfo(pRuntimeEnv->pTsdbReadHandle, &blockInfo);
1833
//
L
Liu Jicong 已提交
1834 1835 1836 1837 1838 1839 1840 1841 1842
//     if (QUERY_IS_ASC_QUERY(pQueryAttr)) {
//       if (pWindowResInfo->prevSKey == TSKEY_INITIAL_VAL) {
//         getAlignQueryTimeWindow(pQueryAttr, blockInfo.window.skey, blockInfo.window.skey, pQueryAttr->window.ekey,
//         &w); pWindowResInfo->prevSKey = w.skey;
//       }
//     } else {
//       getAlignQueryTimeWindow(pQueryAttr, blockInfo.window.ekey, pQueryAttr->window.ekey, blockInfo.window.ekey, &w);
//       pWindowResInfo->prevSKey = w.skey;
//     }
1843
//
L
Liu Jicong 已提交
1844 1845
//     // the first time window
//     STimeWindow win = getActiveTimeWindow(pWindowResInfo, pWindowResInfo->prevSKey, pQueryAttr);
1846
//
L
Liu Jicong 已提交
1847 1848
//     while (pQueryAttr->limit.offset > 0) {
//       STimeWindow tw = win;
1849
//
L
Liu Jicong 已提交
1850 1851 1852
//       if ((win.ekey <= blockInfo.window.ekey && ascQuery) || (win.ekey >= blockInfo.window.skey && !ascQuery)) {
//         pQueryAttr->limit.offset -= 1;
//         pWindowResInfo->prevSKey = win.skey;
1853
//
L
Liu Jicong 已提交
1854 1855 1856 1857 1858 1859
//         // current time window is aligned with blockInfo.window.ekey
//         // restart it from next data block by set prevSKey to be TSKEY_INITIAL_VAL;
//         if ((win.ekey == blockInfo.window.ekey && ascQuery) || (win.ekey == blockInfo.window.skey && !ascQuery)) {
//           pWindowResInfo->prevSKey = TSKEY_INITIAL_VAL;
//         }
//       }
1860
//
L
Liu Jicong 已提交
1861 1862 1863 1864
//       if (pQueryAttr->limit.offset == 0) {
//         *start = doSkipIntervalProcess(pRuntimeEnv, &win, &blockInfo, pTableQueryInfo);
//         return true;
//       }
1865
//
L
Liu Jicong 已提交
1866 1867
//       // current window does not ended in current data block, try next data block
//       getNextTimeWindow(pQueryAttr, &tw);
1868
//
L
Liu Jicong 已提交
1869 1870 1871 1872 1873 1874 1875 1876 1877
//       /*
//        * If the next time window still starts from current data block,
//        * load the primary timestamp column first, and then find the start position for the next queried time window.
//        * Note that only the primary timestamp column is required.
//        * TODO: Optimize for this cases. All data blocks are not needed to be loaded, only if the first actually
//        required
//        * time window resides in current data block.
//        */
//       if ((tw.skey <= blockInfo.window.ekey && ascQuery) || (tw.ekey >= blockInfo.window.skey && !ascQuery)) {
1878
//
L
Liu Jicong 已提交
1879 1880
//         SArray *pDataBlock = tsdbRetrieveDataBlock(pRuntimeEnv->pTsdbReadHandle, NULL);
//         SColumnInfoData *pColInfoData = taosArrayGet(pDataBlock, 0);
1881
//
L
Liu Jicong 已提交
1882 1883 1884
//         if ((win.ekey > blockInfo.window.ekey && ascQuery) || (win.ekey < blockInfo.window.skey && !ascQuery)) {
//           pQueryAttr->limit.offset -= 1;
//         }
1885
//
L
Liu Jicong 已提交
1886 1887 1888 1889 1890 1891 1892 1893
//         if (pQueryAttr->limit.offset == 0) {
//           *start = doSkipIntervalProcess(pRuntimeEnv, &win, &blockInfo, pTableQueryInfo);
//           return true;
//         } else {
//           tw = win;
//           int32_t startPos =
//               getNextQualifiedWindow(pQueryAttr, &tw, &blockInfo, pColInfoData->pData, binarySearchForKey, -1);
//           assert(startPos >= 0);
1894
//
L
Liu Jicong 已提交
1895 1896 1897 1898 1899 1900 1901 1902 1903 1904 1905
//           // set the abort info
//           pQueryAttr->pos = startPos;
//           pTableQueryInfo->lastKey = ((TSKEY *)pColInfoData->pData)[startPos];
//           pWindowResInfo->prevSKey = tw.skey;
//           win = tw;
//         }
//       } else {
//         break;  // offset is not 0, and next time window begins or ends in the next block.
//       }
//     }
//   }
1906
//
L
Liu Jicong 已提交
1907 1908 1909 1910
//   // check for error
//   if (terrno != TSDB_CODE_SUCCESS) {
//     longjmp(pRuntimeEnv->env, terrno);
//   }
1911
//
L
Liu Jicong 已提交
1912 1913
//   return true;
// }
1914

1915
/*
 * Install `num` downstream operators on operator p.
 *
 * A new array of num pointers is allocated and the entries of pDownstream are copied into it; the operators
 * themselves are not copied. An array p already owned is freed once the new one is in place, so a repeated call
 * no longer leaks it. The call still replaces the list rather than appending to it.
 * On allocation failure, p is left unchanged.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY if the array cannot be allocated.
 */
int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t num) {
  if (p->pDownstream == NULL) {
    assert(p->numOfDownstream == 0);
  }

  SOperatorInfo** pList = taosMemoryCalloc(num, POINTER_BYTES);
  if (pList == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  memcpy(pList, pDownstream, num * POINTER_BYTES);

  // Release the previous array only after the copy, in case the caller passed p->pDownstream itself.
  taosMemoryFree(p->pDownstream);
  p->pDownstream = pList;
  p->numOfDownstream = num;
  return TSDB_CODE_SUCCESS;
}

wmmhello's avatar
wmmhello 已提交
1930
static void doDestroyTableList(STableListInfo* pTableqinfoList);
1931

1932
/*
 * Consistency check of a table's query window against the task window, for the given scan order.
 * The assertions it used to contain are compiled out (#if 0), so this function currently has no effect.
 */
static void doTableQueryInfoTimeWindowCheck(SExecTaskInfo* pTaskInfo, STableQueryInfo* pTableQueryInfo, int32_t order) {
  (void)pTaskInfo;
  (void)pTableQueryInfo;
  (void)order;
}

1948 1949 1950 1951
// Per-request context handed to loadRemoteDataCallback as `param`. It is allocated in doSendFetchDataRequest and
// freed by the callback once the response has been handled.
typedef struct SFetchRspHandleWrapper {
  uint32_t exchangeId;   // ref id of the exchange operator (pExchangeInfo->self), resolved via taosAcquireRef;
                         // NOTE(review): confirm `self` always fits in 32 bits
  int32_t  sourceIndex;  // index of the source in pSources / pSourceDataInfo
} SFetchRspHandleWrapper;
1952

D
dapan1121 已提交
1953
/*
 * Response callback of a fetch request sent by doSendFetchDataRequest.
 *
 * `param` is the request's SFetchRspHandleWrapper. It and pMsg->pData are not freed by the transport caller
 * (qProcessRspMsg), so this function owns both.
 *
 * On success, the response buffer becomes the source's pRsp, with its header fields converted from network byte
 * order in place. On failure, the error code is recorded in the source and the unused response body is freed.
 * In both cases the source is marked EX_SOURCE_DATA_READY and the operator's `ready` semaphore is posted.
 *
 * If the exchange operator can no longer be acquired from exchangeObjRefPool, the response is discarded.
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code) {
  SFetchRspHandleWrapper* pWrapper = (SFetchRspHandleWrapper*)param;

  SExchangeInfo* pExchangeInfo = taosAcquireRef(exchangeObjRefPool, pWrapper->exchangeId);
  if (pExchangeInfo == NULL) {
    qWarn("failed to acquire exchange operator, since it may have been released");
    // nobody else frees these once the operator is gone
    taosMemoryFreeClear(pMsg->pData);
    taosMemoryFree(pWrapper);
    return TSDB_CODE_SUCCESS;
  }

  int32_t          index = pWrapper->sourceIndex;
  SSourceDataInfo* pSourceDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, index);

  if (code == TSDB_CODE_SUCCESS) {
    SRetrieveTableRsp* pRsp = pMsg->pData;
    ASSERT(pRsp != NULL);  // must hold before the header below is touched
    pSourceDataInfo->pRsp = pRsp;

    pRsp->numOfRows = htonl(pRsp->numOfRows);
    pRsp->compLen = htonl(pRsp->compLen);
    pRsp->numOfCols = htonl(pRsp->numOfCols);
    pRsp->useconds = htobe64(pRsp->useconds);

    qDebug("%s fetch rsp received, index:%d, rows:%d", pSourceDataInfo->taskId, index, pRsp->numOfRows);
  } else {
    pSourceDataInfo->code = code;
    // the response body (if any) is not kept on failure; release it instead of leaking it
    taosMemoryFreeClear(pMsg->pData);
  }

  pSourceDataInfo->status = EX_SOURCE_DATA_READY;

  tsem_post(&pExchangeInfo->ready);
  taosReleaseRef(exchangeObjRefPool, pWrapper->exchangeId);

  taosMemoryFree(pWrapper);
  return TSDB_CODE_SUCCESS;
}

// Free a send-info object together with the request payload it owns (msgInfo.pData). pMsgBody must not be NULL.
static void destroySendMsgInfo(SMsgSendInfo* pMsgBody) {
  assert(pMsgBody != NULL);

  // detach the payload first, then release both allocations
  void* pPayload = pMsgBody->msgInfo.pData;
  pMsgBody->msgInfo.pData = NULL;

  taosMemoryFree(pPayload);
  taosMemoryFree(pMsgBody);
}

D
dapan1121 已提交
1995
/*
 * Transport callback for responses to requests whose ahandle is an SMsgSendInfo.
 *
 * The response body is copied into a newly allocated buffer and handed, with the response code, to the request's
 * callback (pSendInfo->fp). That buffer is not freed here, so the callback must handle it.
 * If the copy cannot be allocated, the callback receives TSDB_CODE_OUT_OF_MEMORY and an empty buffer (len 0).
 * Afterwards the transport buffer and the send info, including its request payload, are released.
 */
void qProcessRspMsg(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
  assert(pMsg->info.ahandle != NULL);
  SMsgSendInfo* pSendInfo = (SMsgSendInfo*)pMsg->info.ahandle;

  SDataBuf buf = {.len = pMsg->contLen, .pData = NULL};

  if (pMsg->contLen > 0) {
    buf.pData = taosMemoryCalloc(1, pMsg->contLen);
    if (buf.pData == NULL) {
      // report an empty buffer instead of a non-zero length with no data behind it
      buf.len = 0;
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      pMsg->code = TSDB_CODE_OUT_OF_MEMORY;
    } else {
      memcpy(buf.pData, pMsg->pCont, pMsg->contLen);
    }
  }

  pSendInfo->fp(pSendInfo->param, &buf, pMsg->code);
  rpcFreeCont(pMsg->pCont);
  destroySendMsgInfo(pSendInfo);
}

L
Liu Jicong 已提交
2016
/*
 * Send an asynchronous fetch request to the source at `sourceIndex` of the exchange operator.
 *
 * The source's data must not be ready yet (asserted). The response is delivered to loadRemoteDataCallback,
 * together with an SFetchRspHandleWrapper naming this operator (by ref id) and the source index.
 *
 * Returns TSDB_CODE_SUCCESS once the request has been handed to the transport. Otherwise it returns the error
 * code, which is also stored in pTaskInfo->code.
 */
static int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTaskInfo, int32_t sourceIndex) {
  size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources);

  SResFetchReq* pMsg = taosMemoryCalloc(1, sizeof(SResFetchReq));
  if (NULL == pMsg) {
    pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
    return pTaskInfo->code;
  }

  SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, sourceIndex);
  SSourceDataInfo*       pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, sourceIndex);

  ASSERT(pDataInfo->status == EX_SOURCE_DATA_NOT_READY);

  qDebug("%s build fetch msg and send to vgId:%d, ep:%s, taskId:0x%" PRIx64 ", execId:%d, %d/%" PRIzu,
         GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->addr.epSet.eps[0].fqdn, pSource->taskId, pSource->execId,
         sourceIndex, totalSources);

  pMsg->header.vgId = htonl(pSource->addr.nodeId);
  pMsg->sId = htobe64(pSource->schedId);
  pMsg->taskId = htobe64(pSource->taskId);
  pMsg->queryId = htobe64(pTaskInfo->id.queryId);
  pMsg->execId = htonl(pSource->execId);

  // send the fetch remote task result request
  SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (NULL == pMsgSendInfo) {
    taosMemoryFreeClear(pMsg);
    qError("%s prepare message %d failed", GET_TASKID(pTaskInfo), (int32_t)sizeof(SMsgSendInfo));
    pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
    return pTaskInfo->code;
  }

  // this allocation used to be dereferenced without a NULL check
  SFetchRspHandleWrapper* pWrapper = taosMemoryCalloc(1, sizeof(SFetchRspHandleWrapper));
  if (NULL == pWrapper) {
    taosMemoryFreeClear(pMsg);
    taosMemoryFreeClear(pMsgSendInfo);
    qError("%s prepare message %d failed", GET_TASKID(pTaskInfo), (int32_t)sizeof(SFetchRspHandleWrapper));
    pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
    return pTaskInfo->code;
  }

  pWrapper->exchangeId = pExchangeInfo->self;
  pWrapper->sourceIndex = sourceIndex;

  pMsgSendInfo->param = pWrapper;
  pMsgSendInfo->msgInfo.pData = pMsg;
  pMsgSendInfo->msgInfo.len = sizeof(SResFetchReq);
  pMsgSendInfo->msgType = pSource->fetchMsgType;
  pMsgSendInfo->fp = loadRemoteDataCallback;

  int64_t transporterId = 0;
  int32_t code = asyncSendMsgToServer(pExchangeInfo->pTransporter, &pSource->addr.epSet, &transporterId, pMsgSendInfo);
  if (code != TSDB_CODE_SUCCESS) {
    // The send result used to be discarded, so a failed send was reported as success.
    // NOTE(review): ownership of pMsgSendInfo/pWrapper after a failed send is decided by asyncSendMsgToServer,
    // so they are deliberately not freed here; confirm to rule out a leak or a double free.
    qError("%s failed to send fetch msg to vgId:%d, code %s", GET_TASKID(pTaskInfo), pSource->addr.nodeId,
           tstrerror(code));
    pTaskInfo->code = code;
    return code;
  }

  return TSDB_CODE_SUCCESS;
}

2064
/*
 * Decode a fetch-response payload into pRes and update the load statistics.
 *
 * When pColList is NULL (data from other sources), pRes is cleared and the payload is decoded straight into it as
 * numOfOutput columns.
 *
 * Otherwise (data from mnode), the payload layout is:
 *   - a network-order int32 column count,
 *   - that many SSysTableSchema entries, which are byte-swapped in place here,
 *   - the encoded columns.
 * The columns are decoded into a temporary block and then relocated into pRes according to pColList.
 *
 * pLoadInfo accumulates the row count, the compressed size (compLen) and the time elapsed since startTs;
 * *total (when not NULL) accumulates the row count.
 *
 * Returns TSDB_CODE_SUCCESS. Returns an error code if the temporary block cannot be created or pRes cannot be
 * sized for numOfRows rows; in that case the statistics are not updated.
 */
int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadInfo, int32_t numOfRows, char* pData,
                                     int32_t compLen, int32_t numOfOutput, int64_t startTs, uint64_t* total,
                                     SArray* pColList) {
  if (pColList == NULL) {  // data from other sources
    blockDataCleanup(pRes);
    blockDecode(pRes, numOfOutput, numOfRows, pData);
  } else {  // extract data according to pColList
    ASSERT(numOfOutput == taosArrayGetSize(pColList));
    char* pStart = pData;

    // the payload is not guaranteed to be int32-aligned, so copy instead of dereferencing a cast pointer
    int32_t numOfCols = 0;
    memcpy(&numOfCols, pStart, sizeof(numOfCols));
    numOfCols = htonl(numOfCols);
    pStart += sizeof(int32_t);

    // todo refactor:extract method
    SSysTableSchema* pSchema = (SSysTableSchema*)pStart;
    for (int32_t i = 0; i < numOfCols; ++i) {
      SSysTableSchema* p = (SSysTableSchema*)pStart;

      p->colId = htons(p->colId);
      p->bytes = htonl(p->bytes);
      pStart += sizeof(SSysTableSchema);
    }

    SSDataBlock* pBlock = createDataBlock();
    if (pBlock == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    for (int32_t i = 0; i < numOfCols; ++i) {
      SColumnInfoData idata = createColumnInfoData(pSchema[i].type, pSchema[i].bytes, pSchema[i].colId);
      blockDataAppendColInfo(pBlock, &idata);
    }

    blockDecode(pBlock, numOfCols, numOfRows, pStart);

    // relocateColumnData below writes numOfRows rows into pRes, so a failed resize must not be ignored
    int32_t code = blockDataEnsureCapacity(pRes, numOfRows);
    if (code != TSDB_CODE_SUCCESS) {
      blockDataDestroy(pBlock);
      return code;
    }

    // data from mnode
    pRes->info.rows = numOfRows;
    relocateColumnData(pRes, pColList, pBlock->pDataBlock, false);
    blockDataDestroy(pBlock);
  }

  // todo move this to time window aggregator, since the primary timestamp may not be known by exchange operator.
  blockDataUpdateTsWindow(pRes, 0);

  int64_t el = taosGetTimestampUs() - startTs;

  pLoadInfo->totalRows += numOfRows;
  pLoadInfo->totalSize += compLen;

  if (total != NULL) {
    *total += numOfRows;
  }

  pLoadInfo->totalElapsed += el;
  return TSDB_CODE_SUCCESS;
}
2118

L
Liu Jicong 已提交
2119 2120
/*
 * Finish the exchange operator once every downstream source has been drained.
 *
 * Adds the time elapsed since `startTs` to the operator's load statistics, logs a summary line and marks
 * the operator completed through doSetOperatorCompleted(). Always returns NULL so callers can return its
 * result directly as "no more data".
 */
static void* setAllSourcesCompleted(SOperatorInfo* pOperator, int64_t startTs) {
  SExchangeInfo*       pExchange = pOperator->info;
  SExecTaskInfo*       pTask = pOperator->pTaskInfo;
  SLoadRemoteDataInfo* pStat = &pExchange->loadInfo;

  pStat->totalElapsed += taosGetTimestampUs() - startTs;

  size_t numOfSources = taosArrayGetSize(pExchange->pSources);
  qDebug("%s all %" PRIzu " sources are exhausted, total rows: %" PRIu64 " bytes:%" PRIu64 ", elapsed:%.2f ms",
         GET_TASKID(pTask), numOfSources, pStat->totalRows, pStat->totalSize, pStat->totalElapsed / 1000.0);

  doSetOperatorCompleted(pOperator);
  return NULL;
}

L
Liu Jicong 已提交
2137 2138
/*
 * Poll the per-source slots of a concurrently loading exchange operator and return the first batch that
 * is ready.
 *
 * Each pass walks every source:
 *  - exhausted sources are counted;
 *  - sources whose response has not arrived yet are skipped;
 *  - the first READY source is consumed. Its rows are decoded into pExchangeInfo->pResult. If the source
 *    still has data, a follow-up fetch request is sent before returning.
 *
 * When every source is exhausted the operator is marked completed. When nothing is ready, the outer loop
 * simply re-scans; this function itself never blocks on a semaphore.
 *
 * Returns pExchangeInfo->pResult holding the decoded rows, or NULL when all sources are finished or an
 * error occurred. In the error case pTaskInfo->code carries the error.
 */
static SSDataBlock* concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeInfo* pExchangeInfo,
                                                   SExecTaskInfo* pTaskInfo) {
  int64_t              startTs = taosGetTimestampUs();
  size_t               numOfSources = taosArrayGetSize(pExchangeInfo->pSources);
  SLoadRemoteDataInfo* pStat = &pExchangeInfo->loadInfo;
  SSDataBlock*         pResBlock = pExchangeInfo->pResult;
  int32_t              code = TSDB_CODE_SUCCESS;

  for (;;) {
    int32_t numOfDone = 0;

    for (int32_t idx = 0; idx < numOfSources; ++idx) {
      SSourceDataInfo* pSrcData = taosArrayGet(pExchangeInfo->pSourceDataInfo, idx);

      if (pSrcData->status == EX_SOURCE_DATA_EXHAUSTED) {
        numOfDone += 1;
        continue;
      }

      // The response for this source has not been delivered yet.
      if (pSrcData->status != EX_SOURCE_DATA_READY) {
        continue;
      }

      if (pSrcData->code != TSDB_CODE_SUCCESS) {
        code = pSrcData->code;
        goto _error;
      }

      SRetrieveTableRsp*     pRsp = pSrcData->pRsp;
      SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, idx);

      // An empty response means this source has nothing more to deliver.
      if (pRsp->numOfRows == 0) {
        qDebug("%s vgId:%d, taskId:0x%" PRIx64 " execId:%d index:%d completed, rowsOfSource:%" PRIu64
               ", totalRows:%" PRIu64 ", completed:%d try next %d/%" PRIzu,
               GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, idx,
               pSrcData->totalRows, pStat->totalRows, numOfDone + 1, idx + 1, numOfSources);
        pSrcData->status = EX_SOURCE_DATA_EXHAUSTED;
        numOfDone += 1;
        taosMemoryFreeClear(pSrcData->pRsp);
        continue;
      }

      code = extractDataBlockFromFetchRsp(pResBlock, pStat, pRsp->numOfRows, pRsp->data, pRsp->compLen,
                                          pRsp->numOfCols, startTs, &pSrcData->totalRows, NULL);
      if (code != TSDB_CODE_SUCCESS) {
        taosMemoryFreeClear(pSrcData->pRsp);
        goto _error;
      }

      if (pRsp->completed == 1) {
        qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64
               " execId:%d"
               " index:%d completed, numOfRows:%d, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 ", totalBytes:%" PRIu64
               ", completed:%d try next %d/%" PRIzu,
               GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, idx,
               pResBlock->info.rows, pSrcData->totalRows, pStat->totalRows, pStat->totalSize, numOfDone + 1,
               idx + 1, numOfSources);
        pSrcData->status = EX_SOURCE_DATA_EXHAUSTED;
      } else {
        qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " execId:%d numOfRows:%d, totalRows:%" PRIu64
               ", totalBytes:%" PRIu64,
               GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId,
               pResBlock->info.rows, pStat->totalRows, pStat->totalSize);
      }

      taosMemoryFreeClear(pSrcData->pRsp);

      if (pSrcData->status == EX_SOURCE_DATA_EXHAUSTED) {
        return pResBlock;
      }

      // The source still has data: ask for the next batch before handing this one upstream.
      pSrcData->status = EX_SOURCE_DATA_NOT_READY;
      code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, idx);
      if (code != TSDB_CODE_SUCCESS) {
        taosMemoryFreeClear(pSrcData->pRsp);
        goto _error;
      }

      return pResBlock;
    }

    if (numOfDone == numOfSources) {
      return setAllSourcesCompleted(pOperator, startTs);
    }
  }

_error:
  pTaskInfo->code = code;
  return NULL;
}

L
Liu Jicong 已提交
2226 2227 2228
/*
 * Open-phase work for concurrent loading.
 *
 * Sends one fetch request to every downstream source, switches the operator to OP_RES_TO_RETURN and
 * records the open cost. It then blocks once on pExchangeInfo->ready before returning.
 *
 * Returns TSDB_CODE_SUCCESS, or the first send error. The send error is also stored in pTaskInfo->code.
 */
static int32_t prepareConcurrentlyLoad(SOperatorInfo* pOperator) {
  SExchangeInfo* pExchange = pOperator->info;
  SExecTaskInfo* pTask = pOperator->pTaskInfo;
  size_t         numOfSources = taosArrayGetSize(pExchange->pSources);
  int64_t        beginUs = taosGetTimestampUs();

  // Fire off all fetch requests asynchronously; stop at the first one that cannot be sent.
  for (int32_t idx = 0; idx < numOfSources; ++idx) {
    int32_t code = doSendFetchDataRequest(pExchange, pTask, idx);
    if (code != TSDB_CODE_SUCCESS) {
      pTask->code = code;
      return code;
    }
  }

  int64_t sentUs = taosGetTimestampUs();
  qDebug("%s send all fetch requests to %" PRIzu " sources completed, elapsed:%.2fms", GET_TASKID(pTask),
         numOfSources, (sentUs - beginUs) / 1000.0);

  pOperator->status = OP_RES_TO_RETURN;
  pOperator->cost.openCost = taosGetTimestampUs() - beginUs;

  // Wait once on the semaphore; presumably it is posted by a fetch-response callback.
  tsem_wait(&pExchange->ready);
  return TSDB_CODE_SUCCESS;
}

L
Liu Jicong 已提交
2253 2254 2255
/*
 * Fetch data from the downstream sources one at a time, in index order (pExchangeInfo->current).
 *
 * For the current source, a fetch request is sent and the call blocks on pExchangeInfo->ready until the
 * response is available. Empty responses advance to the next source. A response flagged `completed` is
 * returned and also advances `current`. Once `current` passes the last source, the operator is marked
 * completed.
 *
 * Returns pExchangeInfo->pResult with the decoded rows, or NULL when all sources are exhausted or an
 * error occurred. In the error case pTaskInfo->code is set.
 */
static SSDataBlock* seqLoadRemoteData(SOperatorInfo* pOperator) {
  SExchangeInfo* pExchangeInfo = pOperator->info;
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;

  size_t  totalSources = taosArrayGetSize(pExchangeInfo->pSources);
  int64_t startTs = taosGetTimestampUs();

  while (1) {
    if (pExchangeInfo->current >= totalSources) {
      return setAllSourcesCompleted(pOperator, startTs);
    }

    // Do not wait on the semaphore after a failed send: presumably no response will ever arrive to post
    // it, so the thread would block forever.
    int32_t code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, pExchangeInfo->current);
    if (code != TSDB_CODE_SUCCESS) {
      pTaskInfo->code = code;
      return NULL;
    }
    tsem_wait(&pExchangeInfo->ready);

    SSourceDataInfo*       pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, pExchangeInfo->current);
    SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, pExchangeInfo->current);

    if (pDataInfo->code != TSDB_CODE_SUCCESS) {
      qError("%s vgId:%d, taskID:0x%" PRIx64 " execId:%d error happens, code:%s", GET_TASKID(pTaskInfo),
             pSource->addr.nodeId, pSource->taskId, pSource->execId, tstrerror(pDataInfo->code));
      pTaskInfo->code = pDataInfo->code;
      return NULL;
    }

    SRetrieveTableRsp*   pRsp = pDataInfo->pRsp;
    SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
    if (pRsp->numOfRows == 0) {
      qDebug("%s vgId:%d, taskID:0x%" PRIx64 " execId:%d %d of total completed, rowsOfSource:%" PRIu64
             ", totalRows:%" PRIu64 " try next",
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pExchangeInfo->current + 1,
             pDataInfo->totalRows, pLoadInfo->totalRows);

      pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
      pExchangeInfo->current += 1;
      taosMemoryFreeClear(pDataInfo->pRsp);
      continue;
    }

    SSDataBlock* pRes = pExchangeInfo->pResult;
    code = extractDataBlockFromFetchRsp(pRes, pLoadInfo, pRsp->numOfRows, pRsp->data, pRsp->compLen, pRsp->numOfCols,
                                        startTs, &pDataInfo->totalRows, NULL);
    if (code != TSDB_CODE_SUCCESS) {
      // Report the decode failure instead of handing a partially filled block upstream.
      taosMemoryFreeClear(pDataInfo->pRsp);
      pTaskInfo->code = code;
      return NULL;
    }

    if (pRsp->completed == 1) {
      qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " execId:%d numOfRows:%d, rowsOfSource:%" PRIu64
             ", totalRows:%" PRIu64 ", totalBytes:%" PRIu64 " try next %d/%" PRIzu,
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pRes->info.rows,
             pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize, pExchangeInfo->current + 1,
             totalSources);

      pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
      pExchangeInfo->current += 1;
    } else {
      qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " execId:%d numOfRows:%d, totalRows:%" PRIu64
             ", totalBytes:%" PRIu64,
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pRes->info.rows,
             pLoadInfo->totalRows, pLoadInfo->totalSize);
    }

    pOperator->resultInfo.totalRows += pRes->info.rows;
    taosMemoryFreeClear(pDataInfo->pRsp);
    return pExchangeInfo->pResult;
  }
}

L
Liu Jicong 已提交
2320
/*
 * Open function of the exchange operator; it returns immediately if the operator is already open.
 *
 * In concurrent mode it fans out all fetch requests via prepareConcurrentlyLoad(). In sequential mode
 * nothing is sent here, because seqLoadRemoteData() issues one request per call. On success it marks the
 * operator opened and records the open cost in milliseconds.
 */
static int32_t prepareLoadRemoteData(SOperatorInfo* pOperator) {
  if (OPTR_IS_OPENED(pOperator)) {
    return TSDB_CODE_SUCCESS;
  }

  int64_t        openStartUs = taosGetTimestampUs();
  SExchangeInfo* pExchange = pOperator->info;

  int32_t code = pExchange->seqLoadData ? TSDB_CODE_SUCCESS : prepareConcurrentlyLoad(pOperator);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  OPTR_SET_OPENED(pOperator);
  pOperator->cost.openCost = (taosGetTimestampUs() - openStartUs) / 1000.0;
  return TSDB_CODE_SUCCESS;
}

2340
/*
 * next() function of the exchange operator.
 *
 * First runs the operator's open function. For exchange operators this is prepareLoadRemoteData, which
 * returns immediately once the operator is open. Any open error is stored in pTaskInfo->code.
 *
 * Then returns NULL if the operator is already done. Otherwise it dispatches to the sequential or the
 * concurrent loader.
 */
static SSDataBlock* doLoadRemoteData(SOperatorInfo* pOperator) {
  SExchangeInfo* pExchange = pOperator->info;
  SExecTaskInfo* pTask = pOperator->pTaskInfo;

  pTask->code = pOperator->fpSet._openFn(pOperator);
  if (pTask->code != TSDB_CODE_SUCCESS) {
    return NULL;
  }

  if (pOperator->status == OP_EXEC_DONE) {
    size_t               numOfSources = taosArrayGetSize(pExchange->pSources);
    SLoadRemoteDataInfo* pStat = &pExchange->loadInfo;
    qDebug("%s all %" PRIzu " source(s) are exhausted, total rows:%" PRIu64 " bytes:%" PRIu64 ", elapsed:%.2f ms",
           GET_TASKID(pTask), numOfSources, pStat->totalRows, pStat->totalSize, pStat->totalElapsed / 1000.0);
    return NULL;
  }

  return pExchange->seqLoadData ? seqLoadRemoteData(pOperator)
                                : concurrentlyLoadRemoteDataImpl(pOperator, pExchange, pTask);
}
2365

2366
/*
 * Allocate pInfo->pSourceDataInfo with one SSourceDataInfo slot per source.
 *
 * Each slot starts in EX_SOURCE_DATA_NOT_READY and carries the task id string and its own index.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY. On failure pInfo->pSourceDataInfo is left NULL.
 */
static int32_t initDataSource(int32_t numOfSources, SExchangeInfo* pInfo, const char* id) {
  pInfo->pSourceDataInfo = taosArrayInit(numOfSources, sizeof(SSourceDataInfo));
  if (pInfo->pSourceDataInfo == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  for (int32_t i = 0; i < numOfSources; ++i) {
    SSourceDataInfo dataInfo = {0};
    dataInfo.status = EX_SOURCE_DATA_NOT_READY;
    dataInfo.taskId = id;
    dataInfo.index = i;

    if (taosArrayPush(pInfo->pSourceDataInfo, &dataInfo) == NULL) {
      // Clear the field as well, so that a later cleanup of pInfo (e.g. the error path of
      // createExchangeOperatorInfo) cannot touch the destroyed array again.
      taosArrayDestroy(pInfo->pSourceDataInfo);
      pInfo->pSourceDataInfo = NULL;
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  return TSDB_CODE_SUCCESS;
}

2387
/*
 * Populate an SExchangeInfo from the physical exchange node.
 *
 * Copies every source end point into pInfo->pSources and registers pInfo in exchangeObjRefPool (the ref id
 * is stored in pInfo->self). It then builds the per-source state through initDataSource().
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_INVALID_PARA when the node lists no sources, or
 * TSDB_CODE_OUT_OF_MEMORY. On failure pInfo may be partially initialised; the caller cleans it up.
 */
static int32_t initExchangeOperator(SExchangePhysiNode* pExNode, SExchangeInfo* pInfo, const char* id) {
  size_t numOfSources = LIST_LENGTH(pExNode->pSrcEndPoints);

  if (numOfSources == 0) {
    qError("%s invalid number: %d of sources in exchange operator", id, (int32_t)numOfSources);
    return TSDB_CODE_INVALID_PARA;
  }

  pInfo->pSources = taosArrayInit(numOfSources, sizeof(SDownstreamSourceNode));
  if (pInfo->pSources == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  for (int32_t i = 0; i < numOfSources; ++i) {
    SDownstreamSourceNode* pNode = (SDownstreamSourceNode*)nodesListGetNode((SNodeList*)pExNode->pSrcEndPoints, i);
    // A silently dropped source would shift every later index relative to pSourceDataInfo.
    if (taosArrayPush(pInfo->pSources, pNode) == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  pInfo->self = taosAddRef(exchangeObjRefPool, pInfo);

  return initDataSource(numOfSources, pInfo, id);
}

/*
 * Create an exchange operator that pulls result blocks through `pTransporter` from the remote sources
 * listed in pExNode->pSrcEndPoints.
 *
 * The operator loads concurrently from all sources (seqLoadData = false). Its open and next functions
 * are prepareLoadRemoteData and doLoadRemoteData.
 *
 * Returns the new operator, or NULL on failure, with the error code stored in pTaskInfo->code.
 */
SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode* pExNode, SExecTaskInfo* pTaskInfo) {
  // Initialised here because the first goto below reaches _error before any call has produced a code,
  // and _error copies `code` into pTaskInfo->code.
  int32_t        code = TSDB_CODE_OUT_OF_MEMORY;
  SExchangeInfo* pInfo = taosMemoryCalloc(1, sizeof(SExchangeInfo));
  SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    goto _error;
  }

  code = initExchangeOperator(pExNode, pInfo, GET_TASKID(pTaskInfo));
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  tsem_init(&pInfo->ready, 0, 0);

  pInfo->seqLoadData = false;
  pInfo->pTransporter = pTransporter;
  pInfo->pResult = createResDataBlock(pExNode->node.pOutputDataBlockDesc);
  if (pInfo->pResult == NULL) {
    // numOfExprs below dereferences pResult.
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _error;
  }

  pOperator->name = "ExchangeOperator";
  pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE;
  pOperator->blocking = false;
  pOperator->status = OP_NOT_OPENED;
  pOperator->info = pInfo;
  pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pResult->pDataBlock);
  pOperator->pTaskInfo = pTaskInfo;

  pOperator->fpSet = createOperatorFpSet(prepareLoadRemoteData, doLoadRemoteData, NULL, NULL,
                                         destroyExchangeOperatorInfo, NULL, NULL, NULL);
  return pOperator;

_error:
  if (pInfo != NULL) {
    doDestroyExchangeOperatorInfo(pInfo);
  }

  taosMemoryFreeClear(pOperator);
  pTaskInfo->code = code;
  return NULL;
}

dengyihao's avatar
dengyihao 已提交
2449 2450
// Forward declaration of a static helper defined elsewhere in this file. createSortedMergeOperatorInfo
// calls it to set up pAggSup for its expression contexts, passing the task id string as `pKey`.
static int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t numOfOutput, size_t keyBufSize,
                                const char* pKey);
2451

2452
static void destroySortedMergeOperatorInfo(void* param, int32_t numOfOutput) {
L
Liu Jicong 已提交
2453
  SSortedMergeOperatorInfo* pInfo = (SSortedMergeOperatorInfo*)param;
H
Haojun Liao 已提交
2454
  taosArrayDestroy(pInfo->pSortInfo);
2455 2456 2457
  taosArrayDestroy(pInfo->groupInfo);

  if (pInfo->pSortHandle != NULL) {
H
Haojun Liao 已提交
2458
    tsortDestroySortHandle(pInfo->pSortHandle);
2459 2460
  }

H
Haojun Liao 已提交
2461
  blockDataDestroy(pInfo->binfo.pRes);
H
Haojun Liao 已提交
2462
  cleanupAggSup(&pInfo->aggSup);
L
Liu Jicong 已提交
2463

D
dapan1121 已提交
2464
  taosMemoryFreeClear(param);
2465
}
H
Haojun Liao 已提交
2466

L
Liu Jicong 已提交
2467
/*
 * Decide whether row `rowIndex` of pBlock belongs to the same group as the saved tuple in `buf`.
 *
 * groupInfo holds the slot indices of the group columns, and buf[i] holds the saved value of the i-th
 * group column (NULL standing for a NULL value). Variable-length columns are compared by length and
 * payload; fixed-length columns are compared over info.bytes.
 *
 * Returns true when there are no group columns or every group column matches; otherwise false.
 */
static bool needToMerge(SSDataBlock* pBlock, SArray* groupInfo, char** buf, int32_t rowIndex) {
  size_t size = taosArrayGetSize(groupInfo);
  if (size == 0) {
    return true;
  }

  for (int32_t i = 0; i < size; ++i) {
    int32_t*         pSlot = taosArrayGet(groupInfo, i);
    SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, *pSlot);
    bool             isNull = colDataIsNull(pColInfo, rowIndex, pBlock->info.rows, NULL);

    // Exactly one side NULL: different groups.
    if (isNull != (buf[i] == NULL)) {
      return false;
    }

    // Both NULL: this column matches, and there is no value to compare (buf[i] must not be dereferenced).
    if (isNull) {
      continue;
    }

    char* pCell = colDataGetData(pColInfo, rowIndex);
    if (IS_VAR_DATA_TYPE(pColInfo->info.type)) {
      if (varDataLen(pCell) != varDataLen(buf[i]) ||
          memcmp(varDataVal(pCell), varDataVal(buf[i]), varDataLen(pCell)) != 0) {
        return false;
      }
    } else if (memcmp(pCell, buf[i], pColInfo->info.bytes) != 0) {
      return false;
    }
  }

  // Every group column matched the saved tuple, so the row belongs to the current group.
  return true;
}

L
Liu Jicong 已提交
2502 2503 2504
/*
 * Placeholder for merging row `rowIndex` into the aggregate state of each of the `numOfExpr`
 * expression contexts.
 *
 * The merge calls are not wired up yet, so this function currently has no effect.
 */
static void doMergeResultImpl(SSortedMergeOperatorInfo* pInfo, SqlFunctionCtx* pCtx, int32_t numOfExpr,
                              int32_t rowIndex) {
  // TODO: for each expression, set the start row to rowIndex and invoke its merge step. Negative function
  // ids are UDFs (merged via TSDB_UDF_FUNC_MERGE); the others use the built-in merge function.
  (void)pInfo;
  (void)pCtx;
  (void)numOfExpr;
  (void)rowIndex;
}
2521

L
Liu Jicong 已提交
2522 2523
/*
 * Placeholder for finalizing the result of each of the `numOfExpr` expression contexts at the end of a
 * group.
 *
 * The finalize calls are not wired up yet, so this function currently has no effect.
 */
static void doFinalizeResultImpl(SqlFunctionCtx* pCtx, int32_t numOfExpr) {
  // TODO: for each expression, skip tag/ts dummy functions and invoke its finalize step. UDFs (negative
  // function ids) use TSDB_UDF_FUNC_FINALIZE; the others use fpSet.finalize.
  (void)pCtx;
  (void)numOfExpr;
}
2536

2537
/*
 * Copy the group-column values of row `rowIndex` of pBlock into the preallocated buffers in rowColData.
 *
 * pColumnList holds the slot index of each group column. The i-th value is copied into rowColData[i],
 * using colDataGetLength() bytes.
 *
 * Always returns true; callers store the result as "a group value is now saved".
 */
static bool saveCurrentTuple(char** rowColData, SArray* pColumnList, SSDataBlock* pBlock, int32_t rowIndex) {
  int32_t numOfGroupCols = (int32_t)taosArrayGetSize(pColumnList);

  for (int32_t col = 0; col < numOfGroupCols; ++col) {
    int32_t          slotId = *(int32_t*)taosArrayGet(pColumnList, col);
    SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, slotId);
    char*            pSrc = colDataGetData(pColData, rowIndex);

    memcpy(rowColData[col], pSrc, colDataGetLength(pColData, rowIndex));
  }

  return true;
}
2550

2551 2552
/*
 * Feed every row of pBlock through the group-merge logic of the sorted-merge operator.
 *
 * The very first row opens a group. A row that needToMerge() assigns to the current group is merged into
 * it. Otherwise the current group is finalized: its result rows are added to binfo.pRes and `process` is
 * invoked for each non-UDF context. The row then opens a new group, whose values are saved via
 * saveCurrentTuple().
 */
static void doMergeImpl(SOperatorInfo* pOperator, int32_t numOfExpr, SSDataBlock* pBlock) {
  SSortedMergeOperatorInfo* pInfo = pOperator->info;
  SqlFunctionCtx*           pCtx = pOperator->exprSupp.pCtx;

  for (int32_t row = 0; row < pBlock->info.rows; ++row) {
    if (!pInfo->hasGroupVal) {
      // No group saved yet: this must be the first row, and it opens the first group.
      ASSERT(row == 0);
    } else if (needToMerge(pBlock, pInfo->groupInfo, pInfo->groupVal, row)) {
      doMergeResultImpl(pInfo, pCtx, numOfExpr, row);
      continue;
    } else {
      // Group boundary: close the group that just ended.
      doFinalizeResultImpl(pCtx, numOfExpr);

      int32_t numOfRows = getNumOfResult(pOperator->exprSupp.pCtx, pOperator->exprSupp.numOfExprs, NULL);
      // TODO check for available buffer;
      pInfo->binfo.pRes->info.rows += numOfRows;

      for (int32_t j = 0; j < numOfExpr; ++j) {
        if (pCtx[j].functionId >= 0) {
          pCtx[j].fpSet.process(&pCtx[j]);
        }
      }
    }

    // Start a new group with this row.
    doMergeResultImpl(pInfo, pCtx, numOfExpr, row);
    pInfo->hasGroupVal = saveCurrentTuple(pInfo->groupVal, pInfo->groupInfo, pBlock, row);
  }
}

2588 2589
/*
 * Drain the sort handle and finalize the last open group.
 *
 * Merged tuples are fed through doMergeImpl() in batches of at most resultInfo.capacity rows.
 *
 * Returns pInfo->binfo.pRes when it holds at least one row, otherwise NULL. If the scratch block cannot
 * be allocated, the error is raised with longjmp on the task's env, consistent with doSortedMerge.
 */
static SSDataBlock* doMerge(SOperatorInfo* pOperator) {
  SSortedMergeOperatorInfo* pInfo = pOperator->info;
  SSortHandle*              pHandle = pInfo->pSortHandle;

  // Scratch block with the result block's schema; owned and released by this function.
  SSDataBlock* pDataBlock = createOneDataBlock(pInfo->binfo.pRes, false);
  if (pDataBlock == NULL) {
    longjmp(pOperator->pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
  }
  blockDataEnsureCapacity(pDataBlock, pOperator->resultInfo.capacity);

  while (1) {
    blockDataCleanup(pDataBlock);

    // Build one batch of merged tuples.
    while (1) {
      STupleHandle* pTupleHandle = tsortNextTuple(pHandle);
      if (pTupleHandle == NULL) {
        break;
      }

      appendOneRowToDataBlock(pDataBlock, pTupleHandle);
      if (pDataBlock->info.rows >= pOperator->resultInfo.capacity) {
        break;
      }
    }

    if (pDataBlock->info.rows == 0) {
      break;
    }

    setInputDataBlock(pOperator, pOperator->exprSupp.pCtx, pDataBlock, TSDB_ORDER_ASC, MAIN_SCAN, true);
    doMergeImpl(pOperator, pOperator->exprSupp.numOfExprs, pDataBlock);
    // flush to tuple store, and after all data have been handled, return to upstream node or sink node
  }

  doFinalizeResultImpl(pOperator->exprSupp.pCtx, pOperator->exprSupp.numOfExprs);
  int32_t numOfRows = getNumOfResult(pOperator->exprSupp.pCtx, pOperator->exprSupp.numOfExprs, NULL);

  // TODO check for available buffer;
  pInfo->binfo.pRes->info.rows += numOfRows;

  // All tuples have been consumed; the scratch block is no longer needed (it previously leaked).
  blockDataDestroy(pDataBlock);

  return (pInfo->binfo.pRes->info.rows > 0) ? pInfo->binfo.pRes : NULL;
}
2631

L
Liu Jicong 已提交
2632 2633
/*
 * Pull up to `capacity` sorted tuples from pHandle and project them into pDataBlock.
 *
 * The tuples are appended to a temporary block obtained from tsortGetSortedDataBlock(). Each entry of
 * pColMatchInfo then copies one column from its source slot to its target slot in pDataBlock. The
 * temporary block is destroyed before returning. `pInfo` is unused.
 *
 * Returns pDataBlock when it holds rows, otherwise NULL.
 */
SSDataBlock* getSortedMergeBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity,
                                     SArray* pColMatchInfo, SSortedMergeOperatorInfo* pInfo) {
  blockDataCleanup(pDataBlock);

  SSDataBlock* pSorted = tsortGetSortedDataBlock(pHandle);
  if (pSorted == NULL) {
    return NULL;
  }

  blockDataEnsureCapacity(pSorted, capacity);

  // Drain tuples until the sorter runs dry or the temporary block is full.
  for (STupleHandle* pTuple = tsortNextTuple(pHandle); pTuple != NULL; pTuple = tsortNextTuple(pHandle)) {
    appendOneRowToDataBlock(pSorted, pTuple);
    if (pSorted->info.rows >= capacity) {
      break;
    }
  }

  int32_t numOfRows = pSorted->info.rows;
  if (numOfRows > 0) {
    int32_t numOfCols = taosArrayGetSize(pColMatchInfo);
    for (int32_t k = 0; k < numOfCols; ++k) {
      SColMatchInfo* pMatch = taosArrayGet(pColMatchInfo, k);
      ASSERT(pMatch->matchType == COL_MATCH_FROM_SLOT_ID);

      SColumnInfoData* pSrc = taosArrayGet(pSorted->pDataBlock, pMatch->srcSlotId);
      SColumnInfoData* pDst = taosArrayGet(pDataBlock->pDataBlock, pMatch->targetSlotId);
      colDataAssign(pDst, pSrc, numOfRows, &pDataBlock->info);
    }

    pDataBlock->info.rows = numOfRows;
    pDataBlock->info.capacity = numOfRows;
  }

  blockDataDestroy(pSorted);
  return (pDataBlock->info.rows > 0) ? pDataBlock : NULL;
}

2674
/*
 * next() function of the sorted-merge operator.
 *
 * On the first call it builds a multi-source merge sort handle over all downstream operators, opens it,
 * switches to OP_RES_TO_RETURN and runs doMerge(). Later calls read further sorted blocks through
 * getSortedMergeBlockData().
 *
 * Errors are raised with longjmp on the task's env.
 */
static SSDataBlock* doSortedMerge(SOperatorInfo* pOperator) {
  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  SExecTaskInfo*            pTaskInfo = pOperator->pTaskInfo;
  SSortedMergeOperatorInfo* pInfo = pOperator->info;
  if (pOperator->status == OP_RES_TO_RETURN) {
    return getSortedMergeBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pOperator->resultInfo.capacity, NULL, pInfo);
  }

  int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;
  // Pass the actual task id string (this used to be the literal text "GET_TASKID(pTaskInfo)").
  pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_MULTISOURCE_MERGE, pInfo->bufPageSize, numOfBufPage,
                                             pInfo->binfo.pRes, GET_TASKID(pTaskInfo));
  if (pInfo->pSortHandle == NULL) {
    longjmp(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
  }

  tsortSetFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock, NULL, NULL);

  for (int32_t i = 0; i < pOperator->numOfDownstream; ++i) {
    SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource));
    if (ps == NULL) {
      longjmp(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
    }

    ps->param = pOperator->pDownstream[i];
    tsortAddSource(pInfo->pSortHandle, ps);
  }

  int32_t code = tsortOpen(pInfo->pSortHandle);
  if (code != TSDB_CODE_SUCCESS) {
    longjmp(pTaskInfo->env, terrno);
  }

  pOperator->status = OP_RES_TO_RETURN;
  return doMerge(pOperator);
}
2705

L
Liu Jicong 已提交
2706 2707
/*
 * Resolve the group-by columns of a sorted-merge operator and allocate storage for one saved group tuple.
 *
 * For every SColumn in pGroupInfo, the output expression whose result slot id equals the column id is
 * found, and that expression's index is appended to pInfo->groupInfo.
 *
 * pInfo->groupVal is then allocated as one block:
 *   [ one char* per matched column ][ value buffers, pCol->bytes each ]
 * Each pointer is set to its column's value buffer.
 *
 * Returns 0 when there are no group columns, TSDB_CODE_SUCCESS on success, or TSDB_CODE_OUT_OF_MEMORY.
 */
static int32_t initGroupCol(SExprInfo* pExprInfo, int32_t numOfCols, SArray* pGroupInfo,
                            SSortedMergeOperatorInfo* pInfo) {
  if (pGroupInfo == NULL || taosArrayGetSize(pGroupInfo) == 0) {
    return 0;
  }

  int32_t len = 0;
  SArray* plist = taosArrayInit(3, sizeof(SColumn));
  pInfo->groupInfo = taosArrayInit(3, sizeof(int32_t));

  if (plist == NULL || pInfo->groupInfo == NULL) {
    taosArrayDestroy(plist);  // do not leak plist when only the second allocation failed
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  // Iterate over the requested group columns. pInfo->groupInfo was just created and is still empty here,
  // so it must not be used as the loop bound.
  size_t numOfGroupCol = taosArrayGetSize(pGroupInfo);
  for (int32_t i = 0; i < numOfGroupCol; ++i) {
    SColumn* pCol = taosArrayGet(pGroupInfo, i);
    for (int32_t j = 0; j < numOfCols; ++j) {
      SExprInfo* pe = &pExprInfo[j];
      if (pe->base.resSchema.slotId == pCol->colId) {
        taosArrayPush(plist, pCol);
        taosArrayPush(pInfo->groupInfo, &j);
        len += pCol->bytes;
        break;
      }
    }
  }

  ASSERT(taosArrayGetSize(pGroupInfo) == taosArrayGetSize(plist));

  // Size the pointer table by the columns actually matched, so it agrees with pInfo->groupInfo.
  size_t numOfMatched = taosArrayGetSize(plist);
  pInfo->groupVal = taosMemoryCalloc(1, (POINTER_BYTES * numOfMatched + len));
  if (pInfo->groupVal == NULL) {
    taosArrayDestroy(plist);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  // groupVal is char**, so the value area must be located with byte arithmetic. Adding
  // POINTER_BYTES * n to a char** would advance by that many *pointers* and run past the allocation.
  char*   start = (char*)pInfo->groupVal + POINTER_BYTES * numOfMatched;
  int32_t offset = 0;
  for (int32_t i = 0; i < numOfMatched; ++i) {
    pInfo->groupVal[i] = start + offset;
    SColumn* pCol = taosArrayGet(plist, i);
    offset += pCol->bytes;
  }

  taosArrayDestroy(plist);

  return TSDB_CODE_SUCCESS;
}
H
Haojun Liao 已提交
2754

L
Liu Jicong 已提交
2755 2756 2757
/*
 * Create a sorted-merge operator over `numOfDownstream` downstream operators.
 *
 * The operator takes ownership of pSortInfo, which destroySortedMergeOperatorInfo releases. Its next()
 * function is doSortedMerge.
 *
 * Returns the new operator, or NULL with terrno set on failure.
 */
SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t numOfDownstream, SExprInfo* pExprInfo,
                                             int32_t num, SArray* pSortInfo, SArray* pGroupInfo,
                                             SExecTaskInfo* pTaskInfo) {
  int32_t                   code = TSDB_CODE_QRY_OUT_OF_MEMORY;
  SSortedMergeOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SSortedMergeOperatorInfo));
  SOperatorInfo*            pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    goto _error;
  }

  code = initExprSupp(&pOperator->exprSupp, pExprInfo, num);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  initResultRowInfo(&pInfo->binfo.resultRowInfo);

  // NOTE(review): nothing visible here assigns pInfo->binfo.pRes (pInfo is zero-filled by calloc), so this
  // check appears to always fail. Confirm how the result block is meant to be created.
  if (pOperator->exprSupp.pCtx == NULL || pInfo->binfo.pRes == NULL) {
    code = TSDB_CODE_QRY_OUT_OF_MEMORY;
    goto _error;
  }

  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
  code = doInitAggInfoSup(&pInfo->aggSup, pOperator->exprSupp.pCtx, num, keyBufSize, pTaskInfo->id.str);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  setFunctionResultOutput(pOperator, &pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, num);
  code = initGroupCol(pExprInfo, num, pGroupInfo, pInfo);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  //  pInfo->resultRowFactor = (int32_t)(getRowNumForMultioutput(pRuntimeEnv->pQueryAttr,
  //      pRuntimeEnv->pQueryAttr->topBotQuery, false));
  pInfo->sortBufSize = 1024 * 16;  // 16 KB in total, i.e. 16 sort pages of bufPageSize bytes
  pInfo->bufPageSize = 1024;
  pInfo->pSortInfo = pSortInfo;

  pOperator->resultInfo.capacity = blockDataGetCapacityInRow(pInfo->binfo.pRes, pInfo->bufPageSize);

  pOperator->name = "SortedMerge";
  // pOperator->operatorType = OP_SortedMerge;
  pOperator->blocking = true;
  pOperator->status = OP_NOT_OPENED;
  pOperator->info = pInfo;
  pOperator->pTaskInfo = pTaskInfo;

  pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doSortedMerge, NULL, NULL, destroySortedMergeOperatorInfo,
                                         NULL, NULL, NULL);
  code = appendDownstream(pOperator, downstream, numOfDownstream);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  return pOperator;

_error:
  if (pInfo != NULL) {
    // destroySortedMergeOperatorInfo frees pInfo itself; freeing it again here was a double free.
    destroySortedMergeOperatorInfo(pInfo, num);
    pInfo = NULL;
  }

  taosMemoryFreeClear(pOperator);
  terrno = code;
  return NULL;
}

X
Xiaoyu Wang 已提交
2822
/*
 * Report the scan order and scan flag that govern the data produced by `pOperator`.
 *
 * The following operators always report TSDB_ORDER_ASC with MAIN_SCAN: exchange, system-table scan,
 * stream scan, tag scan, block-distribution scan and last-row scan. A table scan reports its configured
 * order and its current scan flag. Any other operator defers to its first downstream operator; the
 * chain is walked until one of the above is reached.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_INVALID_PARA when the chain ends before such an operator.
 */
int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scanFlag) {
  // todo add more information about exchange operation
  SOperatorInfo* pCurrent = pOperator;

  for (;;) {
    switch (pCurrent->operatorType) {
      case QUERY_NODE_PHYSICAL_PLAN_EXCHANGE:
      case QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN:
      case QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN:
      case QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN:
      case QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN:
      case QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN:
        *order = TSDB_ORDER_ASC;
        *scanFlag = MAIN_SCAN;
        return TSDB_CODE_SUCCESS;

      case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN: {
        STableScanInfo* pTableScanInfo = pCurrent->info;
        *order = pTableScanInfo->cond.order;
        *scanFlag = pTableScanInfo->scanFlag;
        return TSDB_CODE_SUCCESS;
      }

      default:
        break;
    }

    if (pCurrent->pDownstream == NULL || pCurrent->pDownstream[0] == NULL) {
      return TSDB_CODE_INVALID_PARA;
    }
    pCurrent = pCurrent->pDownstream[0];
  }
}
L
Liu Jicong 已提交
2844
#if 0
/*
 * Disabled (not compiled): reposition the stream scan found beneath pOperator so that the next
 * read resumes after (uid, ts).
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_QRY_APP_ERROR when no single-downstream path leads to
 * a stream scan.
 */
int32_t doPrepareScan(SOperatorInfo* pOperator, uint64_t uid, int64_t ts) {
  // Physical-plan node types do not fit in uint8_t; keep the full width so the comparison below
  // can actually match.
  int32_t type = pOperator->operatorType;

  pOperator->status = OP_OPENED;

  if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
    SStreamScanInfo* pScanInfo = pOperator->info;
    pScanInfo->blockType = STREAM_INPUT__TABLE_SCAN;

    pScanInfo->pTableScanOp->status = OP_OPENED;

    STableScanInfo* pInfo = pScanInfo->pTableScanOp->info;
    ASSERT(pInfo->scanMode == TABLE_SCAN__TABLE_ORDER);

    if (uid == 0) {
      pInfo->noTable = 1;
      return TSDB_CODE_SUCCESS;
    }

    /*if (pSnapShotScanInfo->dataReader == NULL) {*/
    /*pSnapShotScanInfo->dataReader = tsdbReaderOpen(pHandle->vnode, &pSTInfo->cond, tableList, 0, 0);*/
    /*pSnapShotScanInfo->scanMode = TABLE_SCAN__TABLE_ORDER;*/
    /*}*/

    pInfo->noTable = 0;

    if (pInfo->lastStatus.uid != uid || pInfo->lastStatus.ts != ts) {
      SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;

      int32_t tableSz = taosArrayGetSize(pTaskInfo->tableqinfoList.pTableList);
      bool    found = false;
      for (int32_t i = 0; i < tableSz; i++) {
        STableKeyInfo* pTableInfo = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, i);
        if (pTableInfo->uid == uid) {
          found = true;
          pInfo->currentTable = i;
          break;
        }
      }

      // TODO after processing drop, found can be false
      ASSERT(found);

      tsdbSetTableId(pInfo->dataReader, uid);
      int64_t oldSkey = pInfo->cond.twindows.skey;
      pInfo->cond.twindows.skey = ts + 1;
      tsdbReaderReset(pInfo->dataReader, &pInfo->cond);
      pInfo->cond.twindows.skey = oldSkey;
      pInfo->scanTimes = 0;

      qDebug("tsdb reader offset seek to uid %" PRIu64 " ts %" PRId64 ", table cur set to %d , all table num %d", uid, ts,
             pInfo->currentTable, tableSz);
    }

    return TSDB_CODE_SUCCESS;
  }

  if (pOperator->numOfDownstream == 1) {
    return doPrepareScan(pOperator->pDownstream[0], uid, ts);
  } else if (pOperator->numOfDownstream == 0) {
    qError("failed to find stream scan operator to set the input data block");
    return TSDB_CODE_QRY_APP_ERROR;
  } else {
    qError("join not supported for stream block scan");
    return TSDB_CODE_QRY_APP_ERROR;
  }
}

/*
 * Disabled (not compiled): report the last (uid, ts) position of the stream scan beneath
 * pOperator. Returns TSDB_CODE_INVALID_PARA when no stream scan is reachable.
 */
int32_t doGetScanStatus(SOperatorInfo* pOperator, uint64_t* uid, int64_t* ts) {
  int32_t type = pOperator->operatorType;
  if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
    SStreamScanInfo* pScanInfo = pOperator->info;
    STableScanInfo*  pSnapShotScanInfo = pScanInfo->pTableScanOp->info;
    *uid = pSnapShotScanInfo->lastStatus.uid;
    *ts = pSnapShotScanInfo->lastStatus.ts;
    return TSDB_CODE_SUCCESS;
  }

  if (pOperator->pDownstream == NULL || pOperator->pDownstream[0] == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  // Propagate the downstream result instead of discarding it.
  return doGetScanStatus(pOperator->pDownstream[0], uid, ts);
}
#endif
2930

2931
// this is a blocking operator
L
Liu Jicong 已提交
2932
static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
H
Haojun Liao 已提交
2933 2934
  if (OPTR_IS_OPENED(pOperator)) {
    return TSDB_CODE_SUCCESS;
2935 2936
  }

H
Haojun Liao 已提交
2937
  SExecTaskInfo*    pTaskInfo = pOperator->pTaskInfo;
2938
  SAggOperatorInfo* pAggInfo = pOperator->info;
H
Haojun Liao 已提交
2939

2940 2941
  SExprSupp*     pSup = &pOperator->exprSupp;
  SOperatorInfo* downstream = pOperator->pDownstream[0];
2942

2943 2944
  int64_t st = taosGetTimestampUs();

2945 2946 2947
  int32_t order = TSDB_ORDER_ASC;
  int32_t scanFlag = MAIN_SCAN;

H
Haojun Liao 已提交
2948
  while (1) {
2949
    SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
2950 2951 2952 2953
    if (pBlock == NULL) {
      break;
    }

2954 2955 2956 2957
    int32_t code = getTableScanInfo(pOperator, &order, &scanFlag);
    if (code != TSDB_CODE_SUCCESS) {
      longjmp(pTaskInfo->env, code);
    }
2958

2959
    // there is an scalar expression that needs to be calculated before apply the group aggregation.
2960 2961 2962
    if (pAggInfo->scalarExprSup.pExprInfo != NULL) {
      SExprSupp* pSup1 = &pAggInfo->scalarExprSup;
      code = projectApplyFunctions(pSup1->pExprInfo, pBlock, pBlock, pSup1->pCtx, pSup1->numOfExprs, NULL);
2963
      if (code != TSDB_CODE_SUCCESS) {
2964
        longjmp(pTaskInfo->env, code);
2965
      }
2966 2967
    }

2968
    // the pDataBlock are always the same one, no need to call this again
2969 2970 2971
    setExecutionContext(pOperator, pOperator->exprSupp.numOfExprs, pBlock->info.groupId, pAggInfo);
    setInputDataBlock(pOperator, pSup->pCtx, pBlock, order, scanFlag, true);
    code = doAggregateImpl(pOperator, 0, pSup->pCtx);
2972 2973 2974
    if (code != 0) {
      longjmp(pTaskInfo->env, code);
    }
2975

dengyihao's avatar
dengyihao 已提交
2976
#if 0  // test for encode/decode result info
2977
    if(pOperator->fpSet.encodeResultRow){
2978 2979
      char *result = NULL;
      int32_t length = 0;
2980 2981
      pOperator->fpSet.encodeResultRow(pOperator, &result, &length);
      SAggSupporter* pSup = &pAggInfo->aggSup;
2982 2983
      taosHashClear(pSup->pResultRowHashTable);
      pInfo->resultRowInfo.size = 0;
2984
      pOperator->fpSet.decodeResultRow(pOperator, result);
2985 2986 2987
      if(result){
        taosMemoryFree(result);
      }
2988
    }
2989
#endif
2990 2991
  }

H
Haojun Liao 已提交
2992
  closeAllResultRows(&pAggInfo->binfo.resultRowInfo);
2993
  initGroupedResultInfo(&pAggInfo->groupResInfo, pAggInfo->aggSup.pResultRowHashTable, 0);
H
Haojun Liao 已提交
2994
  OPTR_SET_OPENED(pOperator);
2995

2996
  pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
H
Haojun Liao 已提交
2997 2998 2999
  return TSDB_CODE_SUCCESS;
}

3000
/*
 * Next-block function of the aggregate operator.
 *
 * Opens the operator on first use (which consumes all input), then materializes grouped result
 * rows into binfo.pRes and applies the operator's filter. Blocks that become empty after
 * filtering are skipped while grouped results remain. The operator is marked completed once the
 * grouped results are exhausted or opening failed.
 *
 * Returns the result block, or NULL when no rows are left.
 */
static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator) {
  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  SAggOperatorInfo* pAggInfo = pOperator->info;
  SOptrBasicInfo*   pInfo = &pAggInfo->binfo;
  SSDataBlock*      pRes = pInfo->pRes;
  SExecTaskInfo*    pTaskInfo = pOperator->pTaskInfo;

  pTaskInfo->code = pOperator->fpSet._openFn(pOperator);
  if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
    doSetOperatorCompleted(pOperator);
    return NULL;
  }

  blockDataEnsureCapacity(pRes, pOperator->resultInfo.capacity);

  do {
    doBuildResultDatablock(pOperator, pInfo, &pAggInfo->groupResInfo, pAggInfo->aggSup.pResultBuf);
    doFilter(pAggInfo->pCondition, pRes);

    if (!hasDataInGroupInfo(&pAggInfo->groupResInfo)) {
      doSetOperatorCompleted(pOperator);
      break;
    }
  } while (pRes->info.rows <= 0);

  size_t numOfRows = blockDataGetNumOfRows(pRes);
  pOperator->resultInfo.totalRows += numOfRows;

  if (numOfRows == 0) {
    return NULL;
  }
  return pRes;
}

wmmhello's avatar
wmmhello 已提交
3035
/*
 * Serialize all result rows of an aggregate operator into one heap buffer.
 *
 * Layout (native-endian int32_t fields, written with memcpy so no alignment is assumed):
 *   [total length][number of rows] { [keyLen][key bytes][rowSize][row bytes] } ...
 *
 * On success *result owns a taosMemoryCalloc'd/taosMemoryRealloc'd buffer (the caller frees it)
 * and *length is the number of bytes used. When the result buffer is empty, *result is NULL and
 * *length is 0.
 *
 * Returns TSDB_CODE_TSC_INVALID_INPUT if result/length is NULL, or TSDB_CODE_OUT_OF_MEMORY on
 * allocation failure (in which case *result is NULL and nothing is leaked).
 */
int32_t aggEncodeResultRow(SOperatorInfo* pOperator, char** result, int32_t* length) {
  if (result == NULL || length == NULL) {
    return TSDB_CODE_TSC_INVALID_INPUT;
  }

  // NOTE(review): assumes pOperator->info starts with SOptrBasicInfo immediately followed by an
  // SAggSupporter - confirm for every operator that installs this encoder.
  SOptrBasicInfo* pInfo = (SOptrBasicInfo*)(pOperator->info);
  SAggSupporter*  pSup = (SAggSupporter*)POINTER_SHIFT(pOperator->info, sizeof(SOptrBasicInfo));
  int32_t         size = taosHashGetSize(pSup->pResultRowHashTable);
  size_t          keyLen = sizeof(uint64_t) * 2;  // estimate the key length
  int32_t         totalSize =
      sizeof(int32_t) + sizeof(int32_t) + size * (sizeof(int32_t) + keyLen + sizeof(int32_t) + pSup->resultRowSize);

  // no result
  if (getTotalBufSize(pSup->pResultBuf) == 0) {
    *result = NULL;
    *length = 0;
    return TSDB_CODE_SUCCESS;
  }

  *result = (char*)taosMemoryCalloc(1, totalSize);
  if (*result == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  // The first int32_t receives the final length once every row has been written.
  int32_t offset = sizeof(int32_t);
  int32_t field = size;
  memcpy(*result + offset, &field, sizeof(int32_t));
  offset += sizeof(int32_t);

  // prepare memory
  SResultRowPosition* pos = &pInfo->resultRowInfo.cur;
  void*               pPage = getBufPage(pSup->pResultBuf, pos->pageId);
  setBufPageDirty(pPage, true);
  releaseBufPage(pSup->pResultBuf, pPage);

  void* pIter = taosHashIterate(pSup->pResultRowHashTable, NULL);
  while (pIter) {
    void*               key = taosHashGetKey(pIter, &keyLen);
    SResultRowPosition* p1 = (SResultRowPosition*)pIter;

    // Recalculate the result size. Grow geometrically so that an underestimated key length
    // does not force a realloc for every remaining row.
    int32_t realTotalSize = offset + sizeof(int32_t) + keyLen + sizeof(int32_t) + pSup->resultRowSize;
    if (realTotalSize > totalSize) {
      int32_t newSize = (totalSize <= INT32_MAX / 2) ? totalSize * 2 : realTotalSize;
      if (newSize < realTotalSize) {
        newSize = realTotalSize;
      }

      char* tmp = (char*)taosMemoryRealloc(*result, newSize);
      if (tmp == NULL) {
        // The iteration is abandoned early, so release the iterator's hold on the current entry.
        taosHashCancelIterate(pSup->pResultRowHashTable, pIter);
        taosMemoryFree(*result);
        *result = NULL;
        return TSDB_CODE_OUT_OF_MEMORY;
      }
      *result = tmp;
      totalSize = newSize;
    }

    // save key
    field = (int32_t)keyLen;
    memcpy(*result + offset, &field, sizeof(int32_t));
    offset += sizeof(int32_t);
    memcpy(*result + offset, key, keyLen);
    offset += keyLen;

    // save value
    field = pSup->resultRowSize;
    memcpy(*result + offset, &field, sizeof(int32_t));
    offset += sizeof(int32_t);

    // Copy the row while its page is still held, i.e. before releaseBufPage().
    pPage = getBufPage(pSup->pResultBuf, p1->pageId);
    SResultRow* pRow = (SResultRow*)((char*)pPage + p1->offset);
    memcpy(*result + offset, pRow, pSup->resultRowSize);
    setBufPageDirty(pPage, true);
    releaseBufPage(pSup->pResultBuf, pPage);
    offset += pSup->resultRowSize;

    pIter = taosHashIterate(pSup->pResultRowHashTable, pIter);
  }

  memcpy(*result, &offset, sizeof(int32_t));
  *length = offset;

  return TSDB_CODE_SUCCESS;
}

3112
/*
 * Restore result rows produced by aggEncodeResultRow() into the operator's result buffer and
 * result-row hash table.
 *
 * Expected layout (native-endian int32_t fields, read with memcpy so no alignment is assumed):
 *   [total length][number of rows] { [keyLen][key bytes][rowSize][row bytes] } ...
 * Each key must begin with the uint64_t group id used to allocate the row.
 *
 * Every field is bounds-checked against the embedded total length before it is read.
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_INVALID_INPUT when result is NULL, the payload is
 * truncated or inconsistent, a row size does not match, or a result row cannot be allocated.
 */
int32_t aggDecodeResultRow(SOperatorInfo* pOperator, char* result) {
  if (result == NULL) {
    return TSDB_CODE_TSC_INVALID_INPUT;
  }

  // NOTE(review): assumes pOperator->info starts with SOptrBasicInfo immediately followed by an
  // SAggSupporter - same assumption as aggEncodeResultRow().
  SOptrBasicInfo* pInfo = (SOptrBasicInfo*)(pOperator->info);
  SAggSupporter*  pSup = (SAggSupporter*)POINTER_SHIFT(pOperator->info, sizeof(SOptrBasicInfo));

  const int32_t fieldSize = (int32_t)sizeof(int32_t);

  int32_t length = 0;
  memcpy(&length, result, sizeof(int32_t));
  if (length < 2 * fieldSize) {  // must at least hold [total length][number of rows]
    return TSDB_CODE_TSC_INVALID_INPUT;
  }

  int32_t offset = fieldSize;
  int32_t count = 0;
  memcpy(&count, result + offset, sizeof(int32_t));
  offset += fieldSize;

  for (int32_t i = 0; i < count && length > offset; ++i) {
    if (length - offset < fieldSize) {
      return TSDB_CODE_TSC_INVALID_INPUT;
    }
    int32_t keyLen = 0;
    memcpy(&keyLen, result + offset, sizeof(int32_t));
    offset += fieldSize;

    // The key starts with the group id, and the value-length field must still fit after it.
    if (keyLen < (int32_t)sizeof(uint64_t) || keyLen > length - offset - fieldSize) {
      return TSDB_CODE_TSC_INVALID_INPUT;
    }

    const char* pKey = result + offset;
    uint64_t    tableGroupId = 0;
    memcpy(&tableGroupId, pKey, sizeof(uint64_t));
    offset += keyLen;

    int32_t valueLen = 0;
    memcpy(&valueLen, result + offset, sizeof(int32_t));
    offset += fieldSize;

    // Validate the value before allocating a row or touching the hash table.
    if (valueLen != pSup->resultRowSize || valueLen > length - offset) {
      return TSDB_CODE_TSC_INVALID_INPUT;
    }

    SResultRow* resultRow = getNewResultRow(pSup->pResultBuf, tableGroupId, pSup->resultRowSize);
    if (!resultRow) {
      return TSDB_CODE_TSC_INVALID_INPUT;
    }

    // add a new result set for a new group
    SResultRowPosition pos = {.pageId = resultRow->pageId, .offset = resultRow->offset};
    taosHashPut(pSup->pResultRowHashTable, pKey, keyLen, &pos, sizeof(SResultRowPosition));

    // The serialized row carries the encoder's page location; keep the freshly allocated one.
    int32_t pageId = resultRow->pageId;
    int32_t pOffset = resultRow->offset;
    memcpy(resultRow, result + offset, valueLen);
    resultRow->pageId = pageId;
    resultRow->offset = pOffset;
    offset += valueLen;

    initResultRow(resultRow);
    pInfo->resultRowInfo.cur = (SResultRowPosition){.pageId = resultRow->pageId, .offset = resultRow->offset};
  }

  if (offset != length) {
    return TSDB_CODE_TSC_INVALID_INPUT;
  }
  return TSDB_CODE_SUCCESS;
}

3163 3164
/* Outcome of handleLimitOffset(): what the projection loop should do next. */
enum {
  PROJECT_RETRIEVE_CONTINUE = 1,  // pull more input and keep accumulating into the result block
  PROJECT_RETRIEVE_DONE = 2,      // stop pulling; the result block is ready to be returned
};

/*
 * Apply the group-level SOFFSET/SLIMIT and the row-level OFFSET/LIMIT of a projection to the
 * rows accumulated in pProjectInfo->binfo.pRes after pBlock has been projected into it.
 *
 * Returns PROJECT_RETRIEVE_CONTINUE when the caller should fetch more input (the rows were
 * discarded, or the buffer is not full yet), or PROJECT_RETRIEVE_DONE when the result block should
 * be returned now. Sets pOperator->status to OP_EXEC_DONE once SLIMIT groups have been produced.
 */
static int32_t handleLimitOffset(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
  SProjectOperatorInfo* pProjectInfo = pOperator->info;
  SOptrBasicInfo*       pInfo = &pProjectInfo->binfo;
  SSDataBlock*          pRes = pInfo->pRes;

  // SOFFSET: discard whole groups until the group offset has been consumed.
  if (pProjectInfo->curSOffset > 0) {
    // NOTE(review): groupId == 0 doubles as "no group seen yet", so a real first group with id 0
    // cannot be distinguished from the initial state.
    if (pProjectInfo->groupId == 0) {  // it is the first group
      pProjectInfo->groupId = pBlock->info.groupId;
      blockDataCleanup(pRes);
      return PROJECT_RETRIEVE_CONTINUE;
    }

    if (pProjectInfo->groupId == pBlock->info.groupId) {
      // Another block of a group that is being skipped: it must be discarded too, not emitted.
      blockDataCleanup(pRes);
      return PROJECT_RETRIEVE_CONTINUE;
    }

    // Data from a new group, so the previous group has been skipped entirely. Record the new
    // group id right away; otherwise each of its following blocks would count as another
    // group switch and consume the offset too early.
    pProjectInfo->curSOffset -= 1;
    pProjectInfo->groupId = pBlock->info.groupId;

    // ignore data block in current group
    if (pProjectInfo->curSOffset > 0) {
      blockDataCleanup(pRes);
      return PROJECT_RETRIEVE_CONTINUE;
    }
  }

  if (pProjectInfo->groupId != 0 && pProjectInfo->groupId != pBlock->info.groupId) {
    pProjectInfo->curGroupOutput += 1;
    if ((pProjectInfo->slimit.limit > 0) && (pProjectInfo->slimit.limit <= pProjectInfo->curGroupOutput)) {
      pOperator->status = OP_EXEC_DONE;
      blockDataCleanup(pRes);

      return PROJECT_RETRIEVE_DONE;
    }

    // reset the value for a new group data
    // NOTE(review): the row offset is reset to 0 rather than to limit.offset, so OFFSET only
    // applies to the first group - confirm whether per-group OFFSET is intended.
    pProjectInfo->curOffset = 0;
    pProjectInfo->curOutput = 0;
  }

  // here we reach the start position, according to the limit/offset requirements.

  // set current group id
  pProjectInfo->groupId = pBlock->info.groupId;

  if (pProjectInfo->curOffset >= pRes->info.rows) {
    pProjectInfo->curOffset -= pRes->info.rows;
    blockDataCleanup(pRes);
    return PROJECT_RETRIEVE_CONTINUE;
  } else if (pProjectInfo->curOffset < pRes->info.rows && pProjectInfo->curOffset > 0) {
    blockDataTrimFirstNRows(pRes, pProjectInfo->curOffset);
    pProjectInfo->curOffset = 0;
  }

  // check for the limitation in each group
  if (pProjectInfo->limit.limit >= 0 && pProjectInfo->curOutput + pRes->info.rows >= pProjectInfo->limit.limit) {
    int32_t keepRows = (int32_t)(pProjectInfo->limit.limit - pProjectInfo->curOutput);
    blockDataKeepFirstNRows(pRes, keepRows);
    if (pProjectInfo->slimit.limit > 0 && pProjectInfo->slimit.limit <= pProjectInfo->curGroupOutput) {
      pOperator->status = OP_EXEC_DONE;
    }

    return PROJECT_RETRIEVE_DONE;
  }

  // todo optimize performance
  // If slimit/soffset values exist, multi-round results cannot be packed into one block, since
  // they may not belong to the same group and the limit/offset value is not valid in that case.
  if (pRes->info.rows >= pOperator->resultInfo.threshold || pProjectInfo->slimit.offset != -1 ||
      pProjectInfo->slimit.limit != -1) {
    return PROJECT_RETRIEVE_DONE;
  } else {  // not full enough, continue to accumulate the output data in the buffer.
    return PROJECT_RETRIEVE_CONTINUE;
  }
}

3242
/*
 * Next-block function of the projection operator.
 *
 * Pulls blocks from pDownstream[0], evaluates the projection expressions into binfo.pRes,
 * applies limit/offset via handleLimitOffset(), then applies the filter. STREAM_RETRIEVE blocks
 * are passed through unchanged.
 *
 * A round that yields no rows (for example, all rows filtered out, or a group whose LIMIT is
 * already satisfied) does not end the operator: returning NULL would tell the parent there is no
 * more data. An empty result is returned only when downstream is exhausted or the operator itself
 * reached OP_EXEC_DONE.
 *
 * Returns the result block, a pass-through STREAM_RETRIEVE block, or NULL when done. Errors are
 * raised via longjmp(pTaskInfo->env, code).
 */
static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
  SProjectOperatorInfo* pProjectInfo = pOperator->info;
  SOptrBasicInfo*       pInfo = &pProjectInfo->binfo;

  SExprSupp*   pSup = &pOperator->exprSupp;
  SSDataBlock* pRes = pInfo->pRes;
  blockDataCleanup(pRes);

  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  int64_t st = 0;
  int32_t order = 0;
  int32_t scanFlag = 0;

  if (pOperator->cost.openCost == 0) {
    st = taosGetTimestampUs();
  }

  SOperatorInfo* downstream = pOperator->pDownstream[0];

  while (1) {
    // The downstream exec may change the value of the newgroup, so use a local variable instead.
    SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
    if (pBlock == NULL) {
      // TODO optimize
      /*if (pTaskInfo->execModel != OPTR_EXEC_MODEL_STREAM) {*/
      doSetOperatorCompleted(pOperator);
      /*}*/
      break;
    }
    if (pBlock->info.type == STREAM_RETRIEVE) {
      // for stream interval
      return pBlock;
    }

    // the pDataBlock are always the same one, no need to call this again
    int32_t code = getTableScanInfo(downstream, &order, &scanFlag);
    if (code != TSDB_CODE_SUCCESS) {
      longjmp(pTaskInfo->env, code);
    }

    setInputDataBlock(pOperator, pSup->pCtx, pBlock, order, scanFlag, false);
    blockDataEnsureCapacity(pRes, pRes->info.rows + pBlock->info.rows);

    code = projectApplyFunctions(pSup->pExprInfo, pRes, pBlock, pSup->pCtx, pSup->numOfExprs,
                                 pProjectInfo->pPseudoColInfo);
    if (code != TSDB_CODE_SUCCESS) {
      longjmp(pTaskInfo->env, code);
    }

    int32_t status = handleLimitOffset(pOperator, pBlock);

    // filter shall be applied after apply functions and limit/offset on the result
    doFilter(pProjectInfo->pFilterNode, pRes);

    if (status == PROJECT_RETRIEVE_CONTINUE) {
      continue;
    }

    // PROJECT_RETRIEVE_DONE: hand the block over, unless it is empty while more input may follow.
    if (pRes->info.rows > 0 || pOperator->status == OP_EXEC_DONE) {
      break;
    }
  }

  pProjectInfo->curOutput += pRes->info.rows;

  size_t rows = pRes->info.rows;
  pOperator->resultInfo.totalRows += rows;

  if (pOperator->cost.openCost == 0) {
    pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
  }

  return (rows > 0) ? pRes : NULL;
}

L
Liu Jicong 已提交
3337 3338
/*
 * Start filling the cached first block of a new group (pInfo->existNewGroupBlock).
 *
 * Resets the fill state and feeds the cached block to the fill engine. The window end is the
 * operator's window end once the task is completed, otherwise the block's own window end. Gap
 * rows are generated into pInfo->pRes. The cache is then cleared and *newgroup is set to true.
 */
static void doHandleRemainBlockForNewGroupImpl(SFillOperatorInfo* pInfo, SResultInfo* pResultInfo, bool* newgroup,
                                               SExecTaskInfo* pTaskInfo) {
  SSDataBlock* pPending = pInfo->existNewGroupBlock;
  pInfo->totalInputRows = pPending->info.rows;

  int64_t ekey = pPending->info.window.ekey;
  if (Q_STATUS_EQUAL(pTaskInfo->status, TASK_COMPLETED)) {
    ekey = pInfo->win.ekey;
  }

  SFillInfo* pFillInfo = pInfo->pFillInfo;
  taosResetFillInfo(pFillInfo, getFillInfoStart(pFillInfo));
  taosFillSetStartInfo(pFillInfo, pPending->info.rows, ekey);
  taosFillSetInputDataBlock(pFillInfo, pPending);
  doFillTimeIntervalGapsInResults(pFillInfo, pInfo->pRes, pResultInfo->capacity);

  pInfo->existNewGroupBlock = NULL;
  *newgroup = true;
}

L
Liu Jicong 已提交
3353 3354
/*
 * Drain fill output that is still pending before new input is read.
 *
 * If the fill engine has more rows for the current group, they are generated into pInfo->pRes
 * and *newgroup is cleared. The function stops there when the block exceeds the output threshold
 * or multi-group results may not share a block. Otherwise, a cached first block of the next
 * group (if any) is started via doHandleRemainBlockForNewGroupImpl().
 */
static void doHandleRemainBlockFromNewGroup(SFillOperatorInfo* pInfo, SResultInfo* pResultInfo, bool* newgroup,
                                            SExecTaskInfo* pTaskInfo) {
  if (taosFillHasMoreResults(pInfo->pFillInfo)) {
    *newgroup = false;
    doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, (int32_t)pResultInfo->capacity);

    bool aboveThreshold = pInfo->pRes->info.rows > pResultInfo->threshold;
    if (aboveThreshold || !pInfo->multigroupResult) {
      return;
    }
  }

  // handle the cached new group data block
  if (pInfo->existNewGroupBlock != NULL) {
    doHandleRemainBlockForNewGroupImpl(pInfo, pResultInfo, newgroup, pTaskInfo);
  }
}

S
slzhou 已提交
3369
/*
 * Produce the next filled result block of the fill operator into pInfo->pRes.
 *
 * First drains output still pending from a previous call. Then it repeatedly pulls a block from
 * pDownstream[0], hands it to the fill engine, and generates gap rows. The block is returned when:
 *   - it exceeds the output threshold,
 *   - the input is exhausted, or
 *   - multi-group results may not share one block.
 * A block from a new group is parked in pInfo->existNewGroupBlock while the previous group is
 * closed.
 *
 * Returns NULL when no more output can be produced. If the input ends without any rows having
 * been seen, the operator status is also set to OP_EXEC_DONE.
 */
static SSDataBlock* doFillImpl(SOperatorInfo* pOperator) {
  SFillOperatorInfo* pInfo = pOperator->info;
  SExecTaskInfo*     pTaskInfo = pOperator->pTaskInfo;
  SResultInfo*       pResultInfo = &pOperator->resultInfo;
  SSDataBlock*       pOut = pInfo->pRes;

  blockDataCleanup(pOut);

  // todo handle different group data interpolation
  bool newGroup = false;
  doHandleRemainBlockFromNewGroup(pInfo, pResultInfo, &newGroup, pTaskInfo);

  bool singleGroupReady = !pInfo->multigroupResult && pOut->info.rows > 0;
  if (pOut->info.rows > pResultInfo->threshold || singleGroupReady) {
    return pOut;
  }

  SOperatorInfo* pChild = pOperator->pDownstream[0];
  for (;;) {
    SSDataBlock* pInput = pChild->fpSet.getNextFn(pChild);
    assert(!newGroup || pInput != NULL);

    blockDataUpdateTsWindow(pInput, pInfo->primaryTsCol);

    if (newGroup && pInfo->totalInputRows > 0) {
      // Data of the current group was already processed: park the new group's block and close
      // the fill operation of the previous group before handling it.
      pInfo->existNewGroupBlock = pInput;
      newGroup = false;
      taosFillSetStartInfo(pInfo->pFillInfo, 0, pInfo->win.ekey);
    } else if (pInput == NULL) {
      if (pInfo->totalInputRows == 0) {
        pOperator->status = OP_EXEC_DONE;
        return NULL;
      }
      taosFillSetStartInfo(pInfo->pFillInfo, 0, pInfo->win.ekey);
    } else {
      pInfo->totalInputRows += pInput->info.rows;
      taosFillSetStartInfo(pInfo->pFillInfo, pInput->info.rows, pInput->info.window.ekey);
      taosFillSetInputDataBlock(pInfo->pFillInfo, pInput);
    }

    blockDataEnsureCapacity(pOut, pResultInfo->capacity);
    doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pOut, pResultInfo->capacity);

    if (pOut->info.rows <= 0) {
      // current group has no more result to return
      if (pInfo->existNewGroupBlock == NULL) {
        return NULL;
      }

      // try next group
      assert(pInput != NULL);
      doHandleRemainBlockForNewGroupImpl(pInfo, pResultInfo, &newGroup, pTaskInfo);
      if (pOut->info.rows > pResultInfo->threshold) {
        return pOut;
      }
      continue;
    }

    // 1. The result in current group not reach the threshold of output result, continue
    // 2. If multiple group results existing in one SSDataBlock is not allowed, return immediately
    if (pOut->info.rows > pResultInfo->threshold || pInput == NULL || !pInfo->multigroupResult) {
      return pOut;
    }

    doHandleRemainBlockFromNewGroup(pInfo, pResultInfo, &newGroup, pTaskInfo);
    if (pOut->info.rows > pResultInfo->threshold) {
      return pOut;
    }
  }
}

S
slzhou 已提交
3444 3445 3446 3447 3448 3449 3450 3451
/*
 * Next-block function of the fill operator.
 *
 * Calls doFillImpl() and applies the operator's filter, repeating until a block keeps at least
 * one row. When doFillImpl() has nothing more, the operator is marked completed and NULL is
 * returned. The returned rows are added to resultInfo.totalRows.
 */
static SSDataBlock* doFill(SOperatorInfo* pOperator) {
  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  SFillOperatorInfo* pInfo = pOperator->info;
  SSDataBlock*       pBlock = NULL;

  for (;;) {
    pBlock = doFillImpl(pOperator);
    if (pBlock == NULL) {
      doSetOperatorCompleted(pOperator);
      return NULL;
    }

    doFilter(pInfo->pCondition, pBlock);
    if (pBlock->info.rows > 0) {
      break;
    }
  }

  size_t numOfRows = pBlock->info.rows;
  pOperator->resultInfo.totalRows += numOfRows;
  return pBlock;
}

H
Haojun Liao 已提交
3477 3478 3479 3480 3481 3482 3483 3484 3485 3486 3487
/*
 * Release what an array of SExprInfo owns: each expression's parameter array, the SColumn of each
 * column parameter, and the expression node. The SExprInfo array itself is not freed; the
 * caller owns it.
 */
static void destroyExprInfo(SExprInfo* pExpr, int32_t numOfExprs) {
  for (int32_t i = 0; i < numOfExprs; ++i) {
    SExprInfo* pExprInfo = &pExpr[i];
    if (pExprInfo->pExpr->nodeType == QUERY_NODE_COLUMN) {
      taosMemoryFreeClear(pExprInfo->base.pParam[0].pCol);
    } else {
      // Function expressions own one SColumn per column-typed parameter as well; releasing only
      // the plain-column case leaked them.
      for (int32_t j = 0; pExprInfo->base.pParam != NULL && j < pExprInfo->base.numOfParams; ++j) {
        if (pExprInfo->base.pParam[j].type == FUNC_PARAM_TYPE_COLUMN) {
          taosMemoryFreeClear(pExprInfo->base.pParam[j].pCol);
        }
      }
    }
    taosMemoryFree(pExprInfo->base.pParam);
    taosMemoryFree(pExprInfo->pExpr);
  }
}

3488 3489 3490 3491 3492
/*
 * Recursively destroy an operator tree rooted at pOperator.
 *
 * The operator's closeFn (if any) releases its private info first. Then every downstream
 * operator is destroyed, the downstream array and the operator's expression info are freed, and
 * finally the operator itself. NULL is accepted and ignored.
 */
static void destroyOperatorInfo(SOperatorInfo* pOperator) {
  if (pOperator == NULL) {
    return;
  }

  if (pOperator->fpSet.closeFn) {
    pOperator->fpSet.closeFn(pOperator->info, pOperator->exprSupp.numOfExprs);
  }

  SOperatorInfo** pChildren = pOperator->pDownstream;
  if (pChildren != NULL) {
    int32_t numOfChildren = pOperator->numOfDownstream;
    for (int32_t i = 0; i < numOfChildren; ++i) {
      destroyOperatorInfo(pChildren[i]);
    }

    taosMemoryFreeClear(pOperator->pDownstream);
    pOperator->numOfDownstream = 0;
  }

  SExprSupp* pExprSup = &pOperator->exprSupp;
  if (pExprSup->pExprInfo != NULL) {
    destroyExprInfo(pExprSup->pExprInfo, pExprSup->numOfExprs);
  }
  taosMemoryFreeClear(pExprSup->pExprInfo);

  taosMemoryFreeClear(pOperator);
}

3514 3515 3516 3517 3518 3519 3520 3521 3522 3523 3524 3525 3526 3527 3528
/*
 * Choose the page size and the total size of a disk-based result buffer.
 *
 * The page size starts at 4096 bytes and is doubled until a page can hold at least four rows of
 * rowSize bytes. It is capped at 2^31 so it always fits in uint32_t. The buffer size is
 * 4096 * 256 bytes, or four pages when a page is already at least that large. That value is
 * clamped to UINT32_MAX.
 *
 * @param rowSize      size of one result row in bytes; values <= 0 impose no minimum
 * @param defaultPgsz  [out] chosen page size
 * @param defaultBufsz [out] chosen buffer size
 * @return always 0
 */
int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* defaultBufsz) {
  // Widen before multiplying: rowSize * 4 in int32_t arithmetic can overflow (UB). A negative
  // rowSize previously became a huge unsigned target, and the doubling loop wrapped to 0 and never
  // ended.
  const uint64_t minPgsz = (rowSize > 0) ? (uint64_t)rowSize * 4u : 0u;

  uint64_t pgsz = 4096;
  while (pgsz < minPgsz && pgsz <= UINT32_MAX / 2) {
    pgsz <<= 1u;
  }

  // at least four pages need to be in buffer
  uint64_t bufsz = 4096u * 256u;
  if (bufsz <= pgsz) {
    bufsz = pgsz * 4u;
    if (bufsz > UINT32_MAX) {
      bufsz = UINT32_MAX;
    }
  }

  *defaultPgsz = (uint32_t)pgsz;
  *defaultBufsz = (uint32_t)bufsz;
  return 0;
}

dengyihao's avatar
dengyihao 已提交
3529 3530
/*
 * Initialize an aggregate supporter for numOfOutput function contexts.
 *
 * Computes the result-row size and allocates:
 *   - the group key buffer (keyBufSize plus room for a pointer and an int64_t),
 *   - the result-row hash table (binary-key hash, no locking),
 *   - the disk-based result buffer, sized by getBufferPgSize() and backed by TD_TMP_DIR_PATH
 *     under the id pKey.
 *
 * Returns TSDB_CODE_OUT_OF_MEMORY if the key buffer or hash table cannot be allocated, otherwise
 * the code returned by createDiskbasedBuf().
 */
int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t numOfOutput, size_t keyBufSize,
                         const char* pKey) {
  pAggSup->resultRowSize = getResultRowSize(pCtx, numOfOutput);
  pAggSup->keyBuf = taosMemoryCalloc(1, keyBufSize + POINTER_BYTES + sizeof(int64_t));
  pAggSup->pResultRowHashTable =
      taosHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);

  if (pAggSup->pResultRowHashTable == NULL || pAggSup->keyBuf == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  uint32_t pageSize = 0;
  uint32_t bufSize = 0;
  getBufferPgSize(pAggSup->resultRowSize, &pageSize, &bufSize);

  return createDiskbasedBuf(&pAggSup->pResultBuf, pageSize, bufSize, pKey, TD_TMP_DIR_PATH);
}

3553
/*
 * Release everything owned by an aggregate supporter: the key buffer, the result-row hash table
 * and the disk-based result buffer.
 *
 * Every released pointer is reset to NULL, so a second call is harmless instead of a double free.
 * keyBuf was already cleared this way; the hash table and buffer were left dangling.
 */
void cleanupAggSup(SAggSupporter* pAggSup) {
  taosMemoryFreeClear(pAggSup->keyBuf);

  taosHashCleanup(pAggSup->pResultRowHashTable);
  pAggSup->pResultRowHashTable = NULL;

  destroyDiskbasedBuf(pAggSup->pResultBuf);
  pAggSup->pResultBuf = NULL;
}

L
Liu Jicong 已提交
3559 3560
/*
 * Initialize the expression supporter and the aggregate supporter of an aggregate-style operator,
 * then attach the shared result buffer to every function context.
 *
 * Returns TSDB_CODE_SUCCESS, or the first error from initExprSupp() / doInitAggInfoSup().
 * Previously a failure of doInitAggInfoSup() (OOM, temp-file creation) was ignored. The contexts
 * then received a NULL result buffer while the caller was told everything succeeded.
 */
int32_t initAggInfo(SExprSupp* pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, size_t keyBufSize,
                    const char* pkey) {
  int32_t code = initExprSupp(pSup, pExprInfo, numOfCols);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  code = doInitAggInfoSup(pAggSup, pSup->pCtx, numOfCols, keyBufSize, pkey);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  for (int32_t i = 0; i < numOfCols; ++i) {
    pSup->pCtx[i].pBuf = pAggSup->pResultBuf;
  }

  return TSDB_CODE_SUCCESS;
}

3574
/*
 * Set the result block capacity of pOperator to numOfRows and its output threshold to 75% of
 * that capacity. When 75% truncates to 0 (very small capacities), the full capacity is used.
 * numOfRows must be non-zero.
 */
void initResultSizeInfo(SOperatorInfo* pOperator, int32_t numOfRows) {
  ASSERT(numOfRows != 0);

  SResultInfo* pResultInfo = &pOperator->resultInfo;
  pResultInfo->capacity = numOfRows;

  pResultInfo->threshold = numOfRows * 0.75;
  if (pResultInfo->threshold == 0) {
    pResultInfo->threshold = numOfRows;
  }
}

3584 3585 3586 3587 3588
/*
 * Bind the output block to the operator's basic info and (re)initialize its
 * result-row bookkeeping.
 */
void initBasicInfo(SOptrBasicInfo* pInfo, SSDataBlock* pBlock) {
  pInfo->pRes = pBlock;

  SResultRowInfo* pRowInfo = &pInfo->resultRowInfo;
  initResultRowInfo(pRowInfo);
}

3589
/*
 * Record the expression list in pSup and, when there is one, build the
 * matching function contexts (filling pSup->rowEntryInfoOffset).
 *
 * Returns TSDB_CODE_OUT_OF_MEMORY if the contexts cannot be created,
 * TSDB_CODE_SUCCESS otherwise (including when pExprInfo is NULL).
 */
int32_t initExprSupp(SExprSupp* pSup, SExprInfo* pExprInfo, int32_t numOfExpr) {
  pSup->pExprInfo = pExprInfo;
  pSup->numOfExprs = numOfExpr;

  // no expressions -> nothing to build, pCtx is left untouched
  if (pExprInfo == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  pSup->pCtx = createSqlFunctionCtx(pExprInfo, numOfExpr, &pSup->rowEntryInfoOffset);
  return (pSup->pCtx == NULL) ? TSDB_CODE_OUT_OF_MEMORY : TSDB_CODE_SUCCESS;
}

L
Liu Jicong 已提交
3602
SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
S
slzhou 已提交
3603
                                           SSDataBlock* pResultBlock, SNode* pCondition, SExprInfo* pScalarExprInfo,
wmmhello's avatar
wmmhello 已提交
3604
                                           int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo) {
wafwerar's avatar
wafwerar 已提交
3605
  SAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SAggOperatorInfo));
L
Liu Jicong 已提交
3606
  SOperatorInfo*    pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
H
Haojun Liao 已提交
3607 3608 3609
  if (pInfo == NULL || pOperator == NULL) {
    goto _error;
  }
H
Haojun Liao 已提交
3610

3611
  int32_t numOfRows = 1024;
dengyihao's avatar
dengyihao 已提交
3612
  size_t  keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
3613 3614

  initResultSizeInfo(pOperator, numOfRows);
3615
  int32_t code = initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str);
L
Liu Jicong 已提交
3616
  if (code != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
3617 3618
    goto _error;
  }
H
Haojun Liao 已提交
3619

3620
  initBasicInfo(&pInfo->binfo, pResultBlock);
3621 3622 3623 3624
  code = initExprSupp(&pInfo->scalarExprSup, pScalarExprInfo, numOfScalarExpr);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }
3625

L
Liu Jicong 已提交
3626
  pInfo->groupId = INT32_MIN;
S
slzhou 已提交
3627
  pInfo->pCondition = pCondition;
dengyihao's avatar
dengyihao 已提交
3628
  pOperator->name = "TableAggregate";
3629
  pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_HASH_AGG;
3630
  pOperator->blocking = true;
dengyihao's avatar
dengyihao 已提交
3631 3632 3633
  pOperator->status = OP_NOT_OPENED;
  pOperator->info = pInfo;
  pOperator->pTaskInfo = pTaskInfo;
H
Haojun Liao 已提交
3634

3635 3636
  pOperator->fpSet = createOperatorFpSet(doOpenAggregateOptr, getAggregateResult, NULL, NULL, destroyAggOperatorInfo,
                                         aggEncodeResultRow, aggDecodeResultRow, NULL);
H
Haojun Liao 已提交
3637 3638 3639 3640 3641

  code = appendDownstream(pOperator, &downstream, 1);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }
3642 3643

  return pOperator;
L
Liu Jicong 已提交
3644
_error:
H
Haojun Liao 已提交
3645
  destroyAggOperatorInfo(pInfo, numOfCols);
wafwerar's avatar
wafwerar 已提交
3646 3647
  taosMemoryFreeClear(pInfo);
  taosMemoryFreeClear(pOperator);
H
Haojun Liao 已提交
3648 3649
  pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
  return NULL;
3650 3651
}

3652 3653 3654 3655 3656 3657 3658 3659 3660 3661 3662 3663 3664 3665 3666 3667 3668 3669 3670
/*
 * Free an array of numOfOutput function contexts. For each context this
 * destroys every parameter variant and frees the subsidiary context array,
 * the input data array and the input column aggregate array, then frees the
 * array itself. A NULL pCtx is ignored. Always returns NULL.
 */
static void* destroySqlFunctionCtx(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
  if (pCtx != NULL) {
    for (int32_t iCtx = 0; iCtx < numOfOutput; ++iCtx) {
      SqlFunctionCtx* pOne = &pCtx[iCtx];

      for (int32_t iParam = 0; iParam < pOne->numOfParams; ++iParam) {
        taosVariantDestroy(&pOne->param[iParam].param);
      }

      taosMemoryFreeClear(pOne->subsidiaries.pCtx);
      taosMemoryFree(pOne->input.pData);
      taosMemoryFree(pOne->input.pColumnDataAgg);
    }

    taosMemoryFreeClear(pCtx);
  }

  return NULL;
}

3671
/*
 * Release the result-row bookkeeping and the output block of an operator's
 * basic info. pInfo itself is not freed and must not be NULL.
 */
void cleanupBasicInfo(SOptrBasicInfo* pInfo) {
  assert(pInfo != NULL);

  cleanupResultRowInfo(&pInfo->resultRowInfo);

  // blockDataDestroy's return value is stored back (presumably NULL, clearing the pointer)
  SSDataBlock* pBlock = pInfo->pRes;
  pInfo->pRes = blockDataDestroy(pBlock);
}

H
Haojun Liao 已提交
3677
void destroyBasicOperatorInfo(void* param, int32_t numOfOutput) {
L
Liu Jicong 已提交
3678
  SOptrBasicInfo* pInfo = (SOptrBasicInfo*)param;
3679
  cleanupBasicInfo(pInfo);
L
Liu Jicong 已提交
3680

D
dapan1121 已提交
3681
  taosMemoryFreeClear(param);
3682
}
H
Haojun Liao 已提交
3683 3684

void destroyAggOperatorInfo(void* param, int32_t numOfOutput) {
L
Liu Jicong 已提交
3685
  SAggOperatorInfo* pInfo = (SAggOperatorInfo*)param;
L
Liu Jicong 已提交
3686 3687
  cleanupBasicInfo(&pInfo->binfo);

D
dapan1121 已提交
3688
  taosMemoryFreeClear(param);
3689
}
3690

H
Haojun Liao 已提交
3691
void destroySFillOperatorInfo(void* param, int32_t numOfOutput) {
L
Liu Jicong 已提交
3692
  SFillOperatorInfo* pInfo = (SFillOperatorInfo*)param;
3693
  pInfo->pFillInfo = taosDestroyFillInfo(pInfo->pFillInfo);
H
Haojun Liao 已提交
3694
  pInfo->pRes = blockDataDestroy(pInfo->pRes);
wafwerar's avatar
wafwerar 已提交
3695
  taosMemoryFreeClear(pInfo->p);
L
Liu Jicong 已提交
3696

D
dapan1121 已提交
3697
  taosMemoryFreeClear(param);
3698 3699
}

H
Haojun Liao 已提交
3700
static void destroyProjectOperatorInfo(void* param, int32_t numOfOutput) {
D
fix bug  
dapan 已提交
3701 3702 3703
  if (NULL == param) {
    return;
  }
L
Liu Jicong 已提交
3704
  SProjectOperatorInfo* pInfo = (SProjectOperatorInfo*)param;
3705
  cleanupBasicInfo(&pInfo->binfo);
H
Haojun Liao 已提交
3706
  cleanupAggSup(&pInfo->aggSup);
H
Haojun Liao 已提交
3707
  taosArrayDestroy(pInfo->pPseudoColInfo);
L
Liu Jicong 已提交
3708

D
dapan1121 已提交
3709
  taosMemoryFreeClear(param);
3710 3711
}

3712
/*
 * Release what an expression support owns: its function contexts, its
 * expression infos and the row-entry offset array. The pointers inside pSupp
 * are not reset.
 */
void cleanupExprSupp(SExprSupp* pSupp) {
  int32_t numOfExprs = pSupp->numOfExprs;

  destroySqlFunctionCtx(pSupp->pCtx, numOfExprs);
  destroyExprInfo(pSupp->pExprInfo, numOfExprs);
  taosMemoryFree(pSupp->rowEntryInfoOffset);
}

H
Haojun Liao 已提交
3719
static void destroyIndefinitOperatorInfo(void* param, int32_t numOfOutput) {
3720
  SIndefOperatorInfo* pInfo = (SIndefOperatorInfo*)param;
3721
  cleanupBasicInfo(&pInfo->binfo);
H
Haojun Liao 已提交
3722 3723 3724

  taosArrayDestroy(pInfo->pPseudoColInfo);
  cleanupAggSup(&pInfo->aggSup);
3725
  cleanupExprSupp(&pInfo->scalarSup);
L
Liu Jicong 已提交
3726

D
dapan1121 已提交
3727
  taosMemoryFreeClear(param);
H
Haojun Liao 已提交
3728 3729
}

H
Haojun Liao 已提交
3730
void destroyExchangeOperatorInfo(void* param, int32_t numOfOutput) {
L
Liu Jicong 已提交
3731
  SExchangeInfo* pExInfo = (SExchangeInfo*)param;
3732 3733 3734 3735
  taosRemoveRef(exchangeObjRefPool, pExInfo->self);
}

void doDestroyExchangeOperatorInfo(void* param) {
X
Xiaoyu Wang 已提交
3736
  SExchangeInfo* pExInfo = (SExchangeInfo*)param;
3737

H
Haojun Liao 已提交
3738 3739 3740 3741 3742 3743 3744
  taosArrayDestroy(pExInfo->pSources);
  taosArrayDestroy(pExInfo->pSourceDataInfo);
  if (pExInfo->pResult != NULL) {
    blockDataDestroy(pExInfo->pResult);
  }

  tsem_destroy(&pExInfo->ready);
L
Liu Jicong 已提交
3745

D
dapan1121 已提交
3746
  taosMemoryFreeClear(param);
H
Haojun Liao 已提交
3747 3748
}

H
Haojun Liao 已提交
3749 3750
/*
 * Collect the indices (int32_t) of the function contexts among the first
 * numOfCols whose function is a pseudo-column function (fmIsPseudoColumnFunc).
 * The caller owns the returned SArray.
 */
static SArray* setRowTsColumnOutputInfo(SqlFunctionCtx* pCtx, int32_t numOfCols) {
  SArray* pSlots = taosArrayInit(4, sizeof(int32_t));

  for (int32_t slot = 0; slot < numOfCols; ++slot) {
    if (!fmIsPseudoColumnFunc(pCtx[slot].functionId)) {
      continue;
    }
    taosArrayPush(pSlots, &slot);
  }

  return pSlots;
}

3760 3761 3762 3763
/* Limit value of an optional SLimitNode; -1 when no limit node is supplied. */
static int64_t getLimit(SNode* pLimit) {
  if (pLimit == NULL) {
    return -1;
  }
  return ((SLimitNode*)pLimit)->limit;
}

/* Offset value of an optional SLimitNode; -1 when no limit node is supplied. */
static int64_t getOffset(SNode* pLimit) {
  if (pLimit == NULL) {
    return -1;
  }
  return ((SLimitNode*)pLimit)->offset;
}

L
Liu Jicong 已提交
3764
SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhysiNode* pProjPhyNode,
dengyihao's avatar
dengyihao 已提交
3765
                                         SExecTaskInfo* pTaskInfo) {
wafwerar's avatar
wafwerar 已提交
3766
  SProjectOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SProjectOperatorInfo));
L
Liu Jicong 已提交
3767
  SOperatorInfo*        pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
H
Haojun Liao 已提交
3768 3769 3770
  if (pInfo == NULL || pOperator == NULL) {
    goto _error;
  }
3771

L
Liu Jicong 已提交
3772
  int32_t    numOfCols = 0;
3773 3774 3775
  SExprInfo* pExprInfo = createExprInfo(pProjPhyNode->pProjections, NULL, &numOfCols);

  SSDataBlock* pResBlock = createResDataBlock(pProjPhyNode->node.pOutputDataBlockDesc);
3776 3777
  SLimit       limit = {.limit = getLimit(pProjPhyNode->node.pLimit), .offset = getOffset(pProjPhyNode->node.pLimit)};
  SLimit slimit = {.limit = getLimit(pProjPhyNode->node.pSlimit), .offset = getOffset(pProjPhyNode->node.pSlimit)};
3778

L
Liu Jicong 已提交
3779 3780 3781 3782
  pInfo->limit = limit;
  pInfo->slimit = slimit;
  pInfo->curOffset = limit.offset;
  pInfo->curSOffset = slimit.offset;
H
Haojun Liao 已提交
3783
  pInfo->binfo.pRes = pResBlock;
3784
  pInfo->pFilterNode = pProjPhyNode->node.pConditions;
H
Haojun Liao 已提交
3785 3786

  int32_t numOfRows = 4096;
dengyihao's avatar
dengyihao 已提交
3787
  size_t  keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
3788

3789 3790 3791 3792 3793
  // Make sure the size of SSDataBlock will never exceed the size of 2MB.
  int32_t TWOMB = 2 * 1024 * 1024;
  if (numOfRows * pResBlock->info.rowSize > TWOMB) {
    numOfRows = TWOMB / pResBlock->info.rowSize;
  }
3794
  initResultSizeInfo(pOperator, numOfRows);
3795

3796 3797
  initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str);
  initBasicInfo(&pInfo->binfo, pResBlock);
3798
  setFunctionResultOutput(pOperator, &pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, numOfCols);
3799

3800
  pInfo->pPseudoColInfo = setRowTsColumnOutputInfo(pOperator->exprSupp.pCtx, numOfCols);
X
Xiaoyu Wang 已提交
3801
  pOperator->name = "ProjectOperator";
H
Haojun Liao 已提交
3802
  pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_PROJECT;
X
Xiaoyu Wang 已提交
3803 3804 3805 3806
  pOperator->blocking = false;
  pOperator->status = OP_NOT_OPENED;
  pOperator->info = pInfo;
  pOperator->pTaskInfo = pTaskInfo;
3807

L
Liu Jicong 已提交
3808 3809
  pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doProjectOperation, NULL, NULL,
                                         destroyProjectOperatorInfo, NULL, NULL, NULL);
L
Liu Jicong 已提交
3810

3811
  int32_t code = appendDownstream(pOperator, &downstream, 1);
H
Haojun Liao 已提交
3812
  if (code != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
3813 3814
    goto _error;
  }
3815 3816

  return pOperator;
H
Haojun Liao 已提交
3817

L
Liu Jicong 已提交
3818
_error:
H
Haojun Liao 已提交
3819 3820
  pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
  return NULL;
3821 3822
}

3823 3824 3825 3826 3827 3828 3829 3830 3831 3832 3833 3834 3835 3836 3837 3838 3839 3840 3841 3842 3843 3844 3845 3846 3847 3848 3849 3850 3851 3852 3853 3854 3855 3856
/*
 * Push one input block through the indefinite-rows operator:
 *   1. fetch the scan order and scan flag from `downstream`,
 *   2. if scalar pre-projection expressions exist, evaluate them in place on pBlock,
 *   3. evaluate the operator's expressions, appending the output rows to the
 *      operator's result block (grown to fit first).
 *
 * Errors are not returned; they are raised with longjmp() to pTaskInfo->env.
 */
static void doHandleDataBlock(SOperatorInfo* pOperator, SSDataBlock* pBlock, SOperatorInfo* downstream,
                              SExecTaskInfo* pTaskInfo) {
  SIndefOperatorInfo* pIndefInfo = pOperator->info;
  SExprSupp*          pSup = &pOperator->exprSupp;
  SExprSupp*          pScalarSup = &pIndefInfo->scalarSup;

  int32_t order = 0;
  int32_t scanFlag = 0;

  // the input data block is always the same object, so this lookup does not need repeating per block
  int32_t code = getTableScanInfo(downstream, &order, &scanFlag);
  if (code != TSDB_CODE_SUCCESS) {
    longjmp(pTaskInfo->env, code);
  }

  // scalar expressions that must be computed before the operator's own functions run
  if (pScalarSup->pExprInfo != NULL) {
    code = projectApplyFunctions(pScalarSup->pExprInfo, pBlock, pBlock, pScalarSup->pCtx, pScalarSup->numOfExprs,
                                 pIndefInfo->pPseudoColInfo);
    if (code != TSDB_CODE_SUCCESS) {
      longjmp(pTaskInfo->env, code);
    }
  }

  SSDataBlock* pOutput = pIndefInfo->binfo.pRes;

  setInputDataBlock(pOperator, pSup->pCtx, pBlock, order, scanFlag, false);
  blockDataEnsureCapacity(pOutput, pOutput->info.rows + pBlock->info.rows);

  code = projectApplyFunctions(pSup->pExprInfo, pOutput, pBlock, pSup->pCtx, pSup->numOfExprs,
                               pIndefInfo->pPseudoColInfo);
  if (code != TSDB_CODE_SUCCESS) {
    longjmp(pTaskInfo->env, code);
  }
}

H
Haojun Liao 已提交
3857 3858
/*
 * getNextFn of the indefinite-rows operator.
 *
 * It pulls blocks from the single downstream operator and runs each through
 * doHandleDataBlock(). It stops when any of these happens:
 *   - the result block reaches resultInfo.threshold,
 *   - the group id of the input changes,
 *   - downstream returns NULL (the operator is then marked completed).
 * The filter is applied after that.
 *
 * A block that starts a new group is parked in pIndefInfo->pNextGroupRes. It is
 * processed first on the next iteration or call, after every function's result
 * entry has been reset.
 *
 * Returns the result block when it holds rows, NULL otherwise.
 */
static SSDataBlock* doApplyIndefinitFunction(SOperatorInfo* pOperator) {
  SIndefOperatorInfo* pIndefInfo = pOperator->info;
  SOptrBasicInfo*     pInfo = &pIndefInfo->binfo;
  SExprSupp*          pSup = &pOperator->exprSupp;

  SSDataBlock* pRes = pInfo->pRes;
  blockDataCleanup(pRes);

  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  int64_t st = 0;

  if (pOperator->cost.openCost == 0) {
    st = taosGetTimestampUs();
  }

  SOperatorInfo* downstream = pOperator->pDownstream[0];

  while (1) {
    // here we need to handle the existing group results
    if (pIndefInfo->pNextGroupRes != NULL) {  // todo extract method
      for (int32_t k = 0; k < pSup->numOfExprs; ++k) {
        SqlFunctionCtx* pCtx = &pSup->pCtx[k];

        SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
        pResInfo->initialized = false;
        pCtx->pOutput = NULL;
      }

      doHandleDataBlock(pOperator, pIndefInfo->pNextGroupRes, downstream, pTaskInfo);
      pIndefInfo->pNextGroupRes = NULL;
    }

    if (pInfo->pRes->info.rows < pOperator->resultInfo.threshold) {
      while (1) {
        // The downstream exec may change the value of the newgroup, so use a local variable instead.
        SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
        if (pBlock == NULL) {
          doSetOperatorCompleted(pOperator);
          break;
        }

        if (pIndefInfo->groupId == 0 && pBlock->info.groupId != 0) {
          pIndefInfo->groupId = pBlock->info.groupId;  // this is the initial group result
        } else {
          if (pIndefInfo->groupId != pBlock->info.groupId) {  // reset output buffer and computing status
            pIndefInfo->groupId = pBlock->info.groupId;
            pIndefInfo->pNextGroupRes = pBlock;
            break;
          }
        }

        doHandleDataBlock(pOperator, pBlock, downstream, pTaskInfo);
        if (pInfo->pRes->info.rows >= pOperator->resultInfo.threshold) {
          break;
        }
      }
    }

    doFilter(pIndefInfo->pCondition, pInfo->pRes);

    // Stop only when rows survived the filter or the input is exhausted. The former
    // "size_t rows >= 0" test was always true, so a fully filtered block made this function return NULL,
    // which callers treat as end-of-data (compare the pBlock == NULL handling above).
    if (pInfo->pRes->info.rows > 0 || pOperator->status == OP_EXEC_DONE) {
      break;
    }

    // everything was filtered out but input remains: drop the empty block and keep pulling
    blockDataCleanup(pInfo->pRes);
  }

  size_t rows = pInfo->pRes->info.rows;
  pOperator->resultInfo.totalRows += rows;

  if (pOperator->cost.openCost == 0) {
    pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
  }

  return (rows > 0) ? pInfo->pRes : NULL;
}

3936 3937
/*
 * Build the indefinite-rows function operator on top of `downstream` from an
 * SIndefRowsFuncPhysiNode.
 *   - pFuncs become the operator's expressions.
 *   - The optional pExprs become scalar expressions evaluated on each input block first.
 *   - The node's conditions are stored as the filter.
 *
 * The output block is sized so that one block stays under 2MB. On failure
 * pTaskInfo->code holds the error and NULL is returned. The partially built
 * info is released through destroyIndefinitOperatorInfo().
 */
SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pNode,
                                                 SExecTaskInfo* pTaskInfo) {
  // Declared first so the error path always sees an initialized value.
  int32_t code = TSDB_CODE_OUT_OF_MEMORY;

  SIndefOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SIndefOperatorInfo));
  SOperatorInfo*      pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    goto _error;
  }

  SExprSupp* pSup = &pOperator->exprSupp;

  SIndefRowsFuncPhysiNode* pPhyNode = (SIndefRowsFuncPhysiNode*)pNode;

  int32_t    numOfExpr = 0;
  SExprInfo* pExprInfo = createExprInfo(pPhyNode->pFuncs, NULL, &numOfExpr);

  if (pPhyNode->pExprs != NULL) {
    int32_t    num = 0;
    SExprInfo* pSExpr = createExprInfo(pPhyNode->pExprs, NULL, &num);
    code = initExprSupp(&pInfo->scalarSup, pSExpr, num);
    if (code != TSDB_CODE_SUCCESS) {
      goto _error;
    }
  }

  SSDataBlock* pResBlock = createResDataBlock(pPhyNode->node.pOutputDataBlockDesc);
  if (pResBlock == NULL) {  // rowSize is read below
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _error;
  }
  // Attached right away so the error path releases the block together with pInfo.
  pInfo->binfo.pRes = pResBlock;

  int32_t numOfRows = 4096;
  size_t  keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;

  // Make sure the size of SSDataBlock will never exceed the size of 2MB.
  int32_t TWOMB = 2 * 1024 * 1024;
  if (numOfRows * pResBlock->info.rowSize > TWOMB) {
    numOfRows = TWOMB / pResBlock->info.rowSize;
  }

  initResultSizeInfo(pOperator, numOfRows);

  code = initAggInfo(pSup, &pInfo->aggSup, pExprInfo, numOfExpr, keyBufSize, pTaskInfo->id.str);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  initBasicInfo(&pInfo->binfo, pResBlock);

  setFunctionResultOutput(pOperator, &pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, numOfExpr);

  pInfo->pCondition = pPhyNode->node.pConditions;
  pInfo->pPseudoColInfo = setRowTsColumnOutputInfo(pSup->pCtx, numOfExpr);

  pOperator->name = "IndefinitOperator";
  pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC;
  pOperator->blocking = false;
  pOperator->status = OP_NOT_OPENED;
  pOperator->info = pInfo;
  pOperator->pTaskInfo = pTaskInfo;

  pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doApplyIndefinitFunction, NULL, NULL,
                                         destroyIndefinitOperatorInfo, NULL, NULL, NULL);

  code = appendDownstream(pOperator, &downstream, 1);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  return pOperator;

_error:
  // destroyIndefinitOperatorInfo() dereferences its argument and frees pInfo itself, along with the
  // result block, aggregation supporter, pseudo-column list and scalar expressions. Its
  // numOfOutput argument is unused.
  if (pInfo != NULL) {
    destroyIndefinitOperatorInfo(pInfo, 0);
  }
  taosMemoryFreeClear(pOperator);
  pTaskInfo->code = code;
  return NULL;
}

4006
static int32_t initFillInfo(SFillOperatorInfo* pInfo, SExprInfo* pExpr, int32_t numOfCols, SNodeListNode* pValNode,
L
Liu Jicong 已提交
4007
                            STimeWindow win, int32_t capacity, const char* id, SInterval* pInterval, int32_t fillType) {
4008
  SFillColInfo* pColInfo = createFillColInfo(pExpr, numOfCols, pValNode);
H
Haojun Liao 已提交
4009 4010

  STimeWindow w = TSWINDOW_INITIALIZER;
4011
  getAlignQueryTimeWindow(pInterval, pInterval->precision, win.skey, &w);
4012
  w = getFirstQualifiedTimeWindow(win.skey, &w, pInterval, TSDB_ORDER_ASC);
H
Haojun Liao 已提交
4013 4014

  int32_t order = TSDB_ORDER_ASC;
4015
  pInfo->pFillInfo = taosCreateFillInfo(order, w.skey, 0, capacity, numOfCols, pInterval, fillType, pColInfo, id);
H
Haojun Liao 已提交
4016

4017
  pInfo->win = win;
L
Liu Jicong 已提交
4018
  pInfo->p = taosMemoryCalloc(numOfCols, POINTER_BYTES);
H
Haojun Liao 已提交
4019
  if (pInfo->pFillInfo == NULL || pInfo->p == NULL) {
H
Haojun Liao 已提交
4020 4021
    taosMemoryFree(pInfo->pFillInfo);
    taosMemoryFree(pInfo->p);
H
Haojun Liao 已提交
4022 4023 4024 4025 4026 4027
    return TSDB_CODE_OUT_OF_MEMORY;
  } else {
    return TSDB_CODE_SUCCESS;
  }
}

4028 4029 4030 4031 4032 4033 4034 4035
/*
 * Build the fill operator on top of `downstream`.
 *
 * `downstream` must be an interval operator. When it is the merge-aligned
 * interval operator, its nested interval operator's SInterval is used.
 *
 * The fill targets become the operator's expressions, and the fill mode is
 * converted with convertFillType().
 *
 * On failure pTaskInfo->code holds the error and NULL is returned. The result
 * block and the column-match list created here are released on that path.
 * NOTE(review): pExprInfo still leaks on failure, because its ownership is not
 * visible here.
 */
SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* pPhyFillNode, bool multigroupResult,
                                      SExecTaskInfo* pTaskInfo) {
  // Declared first so the error path always sees initialized values.
  int32_t      code = TSDB_CODE_OUT_OF_MEMORY;
  SSDataBlock* pResBlock = NULL;
  SArray*      pColMatchColInfo = NULL;

  SFillOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SFillOperatorInfo));
  SOperatorInfo*     pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    goto _error;
  }

  int32_t num = 0;
  pResBlock = createResDataBlock(pPhyFillNode->node.pOutputDataBlockDesc);
  SExprInfo* pExprInfo = createExprInfo(pPhyFillNode->pTargets, NULL, &num);
  SInterval* pInterval =
      QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL == downstream->operatorType
          ? &((SMergeAlignedIntervalAggOperatorInfo*)downstream->info)->intervalAggOperatorInfo->interval
          : &((SIntervalAggOperatorInfo*)downstream->info)->interval;

  int32_t type = convertFillType(pPhyFillNode->mode);

  SResultInfo* pResultInfo = &pOperator->resultInfo;
  initResultSizeInfo(pOperator, 4096);
  pInfo->primaryTsCol = ((SColumnNode*)pPhyFillNode->pWStartTs)->slotId;

  int32_t numOfOutputCols = 0;
  pColMatchColInfo = extractColMatchInfo(pPhyFillNode->pTargets, pPhyFillNode->node.pOutputDataBlockDesc,
                                         &numOfOutputCols, COL_MATCH_FROM_SLOT_ID);

  code = initFillInfo(pInfo, pExprInfo, num, (SNodeListNode*)pPhyFillNode->pValues, pPhyFillNode->timeRange,
                      pResultInfo->capacity, pTaskInfo->id.str, pInterval, type);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  pInfo->pRes                    = pResBlock;
  pInfo->multigroupResult        = multigroupResult;
  pInfo->pCondition              = pPhyFillNode->node.pConditions;
  pInfo->pColMatchColInfo        = pColMatchColInfo;
  pOperator->name                = "FillOperator";
  pOperator->blocking            = false;
  pOperator->status              = OP_NOT_OPENED;
  pOperator->operatorType        = QUERY_NODE_PHYSICAL_PLAN_FILL;
  pOperator->exprSupp.pExprInfo  = pExprInfo;
  pOperator->exprSupp.numOfExprs = num;
  pOperator->info                = pInfo;
  pOperator->pTaskInfo           = pTaskInfo;

  pOperator->fpSet =
      createOperatorFpSet(operatorDummyOpenFn, doFill, NULL, NULL, destroySFillOperatorInfo, NULL, NULL, NULL);

  // The result was previously ignored, returning an operator without its downstream.
  code = appendDownstream(pOperator, &downstream, 1);
  if (code != TSDB_CODE_SUCCESS) {
    // initFillInfo() succeeded, so the fill info and pointer array are valid and owned only by pInfo.
    taosDestroyFillInfo(pInfo->pFillInfo);
    pInfo->pFillInfo = NULL;
    taosMemoryFreeClear(pInfo->p);
    goto _error;
  }

  return pOperator;

_error:
  if (pResBlock != NULL) {
    blockDataDestroy(pResBlock);
  }
  if (pColMatchColInfo != NULL) {
    taosArrayDestroy(pColMatchColInfo);
  }
  taosMemoryFreeClear(pOperator);
  taosMemoryFreeClear(pInfo);
  pTaskInfo->code = code;
  return NULL;
}

D
dapan1121 已提交
4085
/*
 * Allocate and initialize a task info. It sets:
 *   - status TASK_NOT_COMPLETED,
 *   - a copy of the database name (may be NULL),
 *   - the creation timestamp, query id and execution model,
 *   - an id string "TID:0x<taskId> QID:0x<queryId>".
 *
 * Returns NULL if the task info or its id string cannot be allocated.
 */
static SExecTaskInfo* createExecTaskInfo(uint64_t queryId, uint64_t taskId, EOPTR_EXEC_MODEL model, char* dbFName) {
  const int32_t idStrLen = 128;

  SExecTaskInfo* pTaskInfo = taosMemoryCalloc(1, sizeof(SExecTaskInfo));
  if (pTaskInfo == NULL) {
    return NULL;
  }

  // Allocate the id string up front so a failure leaves nothing else to undo.
  char* p = taosMemoryCalloc(1, idStrLen);
  if (p == NULL) {
    taosMemoryFree(pTaskInfo);
    return NULL;
  }

  setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED);

  // strdup(NULL) is undefined behaviour; a NULL dbname is tolerated by cleanupTableSchemaInfo().
  pTaskInfo->schemaVer.dbname = (dbFName != NULL) ? strdup(dbFName) : NULL;
  pTaskInfo->cost.created = taosGetTimestampMs();
  pTaskInfo->id.queryId = queryId;
  pTaskInfo->execModel = model;

  snprintf(p, idStrLen, "TID:0x%" PRIx64 " QID:0x%" PRIx64, taskId, queryId);
  pTaskInfo->id.str = p;

  return pTaskInfo;
}
H
Haojun Liao 已提交
4100

H
Hongze Cheng 已提交
4101
static STsdbReader* doCreateDataReader(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle,
H
Haojun Liao 已提交
4102
                                       STableListInfo* pTableListInfo, const char* idstr);
H
Haojun Liao 已提交
4103

H
Haojun Liao 已提交
4104
static SArray* extractColumnInfo(SNodeList* pNodeList);
4105

4106
/*
 * Load the meta entry of table `uid` and record in pTaskInfo->schemaVer:
 *   - the table name (copied),
 *   - a cloned column schema wrapper,
 *   - for super/child tables, the tag schema version.
 * For a child table, both come from its super table's entry (ctbEntry.suid).
 *
 * Returns TSDB_CODE_SUCCESS or a non-zero error code. Callers test the
 * return value.
 */
int32_t extractTableSchemaInfo(SReadHandle* pHandle, uint64_t uid, SExecTaskInfo* pTaskInfo) {
  SMetaReader mr = {0};
  metaReaderInit(&mr, pHandle->meta, 0);

  int32_t code = metaGetTableEntryByUid(&mr, uid);
  if (code != TSDB_CODE_SUCCESS) {
    metaReaderClear(&mr);
    // never let a failed lookup look like success if terrno happens to be unset
    return (terrno != TSDB_CODE_SUCCESS) ? terrno : code;
  }

  pTaskInfo->schemaVer.tablename = strdup(mr.me.name);

  if (mr.me.type == TSDB_SUPER_TABLE) {
    pTaskInfo->schemaVer.sw = tCloneSSchemaWrapper(&mr.me.stbEntry.schemaRow);
    pTaskInfo->schemaVer.tversion = mr.me.stbEntry.schemaTag.version;
  } else if (mr.me.type == TSDB_CHILD_TABLE) {
    tb_uid_t suid = mr.me.ctbEntry.suid;
    // The super-table lookup result used to be ignored, reading a stale or garbage entry on failure.
    code = metaGetTableEntryByUid(&mr, suid);
    if (code != TSDB_CODE_SUCCESS) {
      taosMemoryFreeClear(pTaskInfo->schemaVer.tablename);
      metaReaderClear(&mr);
      return (terrno != TSDB_CODE_SUCCESS) ? terrno : code;
    }

    pTaskInfo->schemaVer.sw = tCloneSSchemaWrapper(&mr.me.stbEntry.schemaRow);
    pTaskInfo->schemaVer.tversion = mr.me.stbEntry.schemaTag.version;
  } else {
    pTaskInfo->schemaVer.sw = tCloneSSchemaWrapper(&mr.me.ntbEntry.schemaRow);
  }

  metaReaderClear(&mr);
  return TSDB_CODE_SUCCESS;
}

4133 4134 4135 4136 4137 4138 4139 4140 4141 4142 4143
/*
 * Release the schema bookkeeping held in pTaskInfo->schemaVer: the database
 * name, the table name and the cloned schema wrapper with its column array.
 * Every pointer is reset to NULL, so calling this twice is harmless.
 */
static void cleanupTableSchemaInfo(SExecTaskInfo* pTaskInfo) {
  taosMemoryFreeClear(pTaskInfo->schemaVer.dbname);
  // tablename is assigned independently of sw (see extractTableSchemaInfo), so it must be freed
  // even when no schema wrapper was cloned; it previously leaked in that case.
  taosMemoryFreeClear(pTaskInfo->schemaVer.tablename);

  if (pTaskInfo->schemaVer.sw != NULL) {
    taosMemoryFree(pTaskInfo->schemaVer.sw->pSchema);
    taosMemoryFreeClear(pTaskInfo->schemaVer.sw);
  }
}

4144
/*
 * Rebuild pTableListInfo->pGroupList from pTableListInfo->map (uid -> group id):
 *   - tables sharing a group id are collected into one SArray of STableKeyInfo,
 *   - the groups are ordered by ascending group id.
 * groupNum only sizes the temporary group-id index.
 *
 * Returns one of:
 *   - TDB_CODE_SUCCESS,
 *   - TSDB_CODE_OUT_OF_MEMORY,
 *   - TSDB_CODE_QRY_APP_ERROR, when an array operation fails or a table has no
 *     group id in the map.
 * On error, groups already inserted remain in pGroupList.
 */
static int32_t sortTableGroup(STableListInfo* pTableListInfo, int32_t groupNum) {
  taosArrayClear(pTableListInfo->pGroupList);

  // sorted distinct group ids; element k is the id of the group stored at pGroupList[k]
  SArray* sortSupport = taosArrayInit(groupNum, sizeof(uint64_t));
  if (sortSupport == NULL) return TSDB_CODE_OUT_OF_MEMORY;

  int32_t code = TDB_CODE_SUCCESS;
  for (int32_t i = 0; i < taosArrayGetSize(pTableListInfo->pTableList); i++) {
    STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i);
    uint64_t*      groupId = taosHashGet(pTableListInfo->map, &info->uid, sizeof(uint64_t));
    if (groupId == NULL) {  // would otherwise be dereferenced by the search comparators below
      qError("failed to find group id of table in group map");
      code = TSDB_CODE_QRY_APP_ERROR;
      goto _end;
    }

    // existing group: append the table to it
    int32_t index = taosArraySearchIdx(sortSupport, groupId, compareUint64Val, TD_EQ);
    if (index != -1) {
      SArray* tGroup = (SArray*)taosArrayGetP(pTableListInfo->pGroupList, index);
      if (taosArrayPush(tGroup, info) == NULL) {
        qError("taos push uid array error");
        code = TSDB_CODE_QRY_APP_ERROR;
        goto _end;
      }
      continue;
    }

    // new group: create it and insert it before the first larger group id (or append)
    void*   p = taosArraySearch(sortSupport, groupId, compareUint64Val, TD_GT);
    SArray* tGroup = taosArrayInit(8, sizeof(STableKeyInfo));
    if (tGroup == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _end;
    }

    // Until tGroup is stored in pGroupList it is owned here and must be destroyed on every failure.
    if (taosArrayPush(tGroup, info) == NULL) {
      qError("taos push info array error");
      taosArrayDestroy(tGroup);
      code = TSDB_CODE_QRY_APP_ERROR;
      goto _end;
    }

    if (p == NULL) {
      if (taosArrayPush(sortSupport, groupId) == NULL) {
        qError("taos push support array error");
        taosArrayDestroy(tGroup);
        code = TSDB_CODE_QRY_APP_ERROR;
        goto _end;
      }
      if (taosArrayPush(pTableListInfo->pGroupList, &tGroup) == NULL) {
        qError("taos push group array error");
        taosArrayDestroy(tGroup);
        code = TSDB_CODE_QRY_APP_ERROR;
        goto _end;
      }
    } else {
      int32_t pos = TARRAY_ELEM_IDX(sortSupport, p);
      if (taosArrayInsert(sortSupport, pos, groupId) == NULL) {
        qError("taos insert support array error");
        taosArrayDestroy(tGroup);
        code = TSDB_CODE_QRY_APP_ERROR;
        goto _end;
      }
      if (taosArrayInsert(pTableListInfo->pGroupList, pos, &tGroup) == NULL) {
        qError("taos insert group array error");
        taosArrayDestroy(tGroup);
        code = TSDB_CODE_QRY_APP_ERROR;
        goto _end;
      }
    }
  }

_end:
  taosArrayDestroy(sortSupport);
  return code;
}

wmmhello's avatar
wmmhello 已提交
4202 4203
/*
 * When `group` is non-NULL, compute a group id for every table in
 * pTableListInfo->pTableList.
 *
 * The group expressions are evaluated against the table's meta entry
 * (doTranslateTagExpr, then scalarCalculateConstants) down to value nodes.
 * These are packed into a key: one null-flag byte per expression, then the
 * non-null values back to back. calcGroupId() turns that key into the group id.
 *
 * The id is stored in pTableListInfo->map (uid -> group id) and in
 * info->groupId. If needSortTableByGroupId is set, the table list is then
 * regrouped by sortTableGroup().
 *
 * Returns TDB_CODE_SUCCESS or an error code. On error, map entries already
 * added are kept.
 */
int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle, SNodeList* group) {
  if (group == NULL) {
    return TDB_CODE_SUCCESS;
  }

  pTableListInfo->map = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
  if (pTableListInfo->map == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  // key size: the declared byte width of every group expression plus one null flag per expression
  int32_t keyLen = 0;
  SNode*  node;
  FOREACH(node, group) {
    SExprNode* pExpr = (SExprNode*)node;
    keyLen += pExpr->resType.bytes;
  }

  int32_t nullFlagSize = sizeof(int8_t) * LIST_LENGTH(group);
  keyLen += nullFlagSize;

  void* keyBuf = taosMemoryCalloc(1, keyLen);
  if (keyBuf == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  int32_t groupNum = 0;
  for (int32_t i = 0; i < taosArrayGetSize(pTableListInfo->pTableList); i++) {
    STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i);
    SMetaReader    mr = {0};
    metaReaderInit(&mr, pHandle->meta, 0);
    // NOTE(review): the lookup result is not checked; a missing entry is translated as-is.
    metaGetTableEntryByUid(&mr, info->uid);

    SNodeList* groupNew = nodesCloneList(group);
    if (groupNew == NULL && LIST_LENGTH(group) > 0) {  // clone failed although there was something to clone
      taosMemoryFree(keyBuf);
      metaReaderClear(&mr);
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    nodesRewriteExprsPostOrder(groupNew, doTranslateTagExpr, &mr);
    char* isNull = (char*)keyBuf;
    char* pStart = (char*)keyBuf + nullFlagSize;

    SNode*  pNode;
    int32_t index = 0;
    FOREACH(pNode, groupNew) {
      SNode*  pNew = NULL;
      int32_t code = scalarCalculateConstants(pNode, &pNew);
      if (TSDB_CODE_SUCCESS == code) {
        REPLACE_NODE(pNew);
      } else {
        taosMemoryFree(keyBuf);
        nodesClearList(groupNew);
        metaReaderClear(&mr);
        return code;
      }

      ASSERT(nodeType(pNew) == QUERY_NODE_VALUE);
      SValueNode* pValue = (SValueNode*)pNew;

      if (pValue->node.resType.type == TSDB_DATA_TYPE_NULL || pValue->isNull) {
        isNull[index++] = 1;
        continue;
      } else {
        isNull[index++] = 0;
        char* data = nodesGetValueFromNode(pValue);
        if (pValue->node.resType.type == TSDB_DATA_TYPE_JSON) {
          if (tTagIsJson(data)) {
            terrno = TSDB_CODE_QRY_JSON_IN_GROUP_ERROR;
            taosMemoryFree(keyBuf);
            nodesClearList(groupNew);
            metaReaderClear(&mr);
            return terrno;
          }
          int32_t len = getJsonValueLen(data);
          memcpy(pStart, data, len);
          pStart += len;
        } else if (IS_VAR_DATA_TYPE(pValue->node.resType.type)) {
          memcpy(pStart, data, varDataTLen(data));
          pStart += varDataTLen(data);
        } else {
          memcpy(pStart, data, pValue->node.resType.bytes);
          pStart += pValue->node.resType.bytes;
        }
      }
    }

    int32_t  len = (int32_t)(pStart - (char*)keyBuf);
    uint64_t groupId = calcGroupId(keyBuf, len);
    int32_t  putCode = taosHashPut(pTableListInfo->map, &(info->uid), sizeof(uint64_t), &groupId, sizeof(uint64_t));

    nodesClearList(groupNew);
    metaReaderClear(&mr);

    // A missing map entry would later make sortTableGroup() read a NULL group id.
    if (putCode != TSDB_CODE_SUCCESS) {
      taosMemoryFree(keyBuf);
      return (terrno != TSDB_CODE_SUCCESS) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
    }

    info->groupId = groupId;
    groupNum++;
  }
  taosMemoryFree(keyBuf);

  if (pTableListInfo->needSortTableByGroupId) {
    return sortTableGroup(pTableListInfo, groupNum);
  }

  return TDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
4302
/*
 * Recursively build the executor operator tree for the physical plan rooted at pPhyNode.
 *
 * Leaf nodes (no children) become scan/source operators. Scans that read table data first
 * populate pTableListInfo with the qualifying tables. Non-leaf nodes build every child
 * subtree first and then wrap the children in the operator matching the node type.
 *
 * Returns the new operator, or NULL on failure. Failures detected here store an error code
 * in pTaskInfo->code. When an operator constructor returns NULL without having set
 * pTaskInfo->code, terrno is recorded instead so the caller never sees "NULL + success".
 */
SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle,
                                  uint64_t queryId, uint64_t taskId, STableListInfo* pTableListInfo,
                                  const char* pUser) {
  int32_t type = nodeType(pPhyNode);

  if (pPhyNode->pChildren == NULL || LIST_LENGTH(pPhyNode->pChildren) == 0) {
    if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == type) {
      STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode;

      int32_t code = createScanTableListInfo(pTableScanNode, pHandle, pTableListInfo, queryId, taskId);
      if (code) {
        pTaskInfo->code = code;
        return NULL;
      }

      code = extractTableSchemaInfo(pHandle, pTableScanNode->scan.uid, pTaskInfo);
      if (code) {
        pTaskInfo->code = terrno;
        return NULL;
      }

      SOperatorInfo* pOperator = createTableScanOperatorInfo(pTableScanNode, pHandle, pTaskInfo);
      if (pOperator == NULL) {  // must not dereference pOperator->info below
        if (pTaskInfo->code == TSDB_CODE_SUCCESS) {
          pTaskInfo->code = terrno;
        }
        return NULL;
      }

      STableScanInfo* pScanInfo = pOperator->info;
      pTaskInfo->cost.pRecoder = &pScanInfo->readRecorder;
      return pOperator;

    } else if (QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN == type) {
      STableMergeScanPhysiNode* pTableScanNode = (STableMergeScanPhysiNode*)pPhyNode;

      int32_t code = createScanTableListInfo(pTableScanNode, pHandle, pTableListInfo, queryId, taskId);
      if (code) {
        pTaskInfo->code = code;
        return NULL;
      }

      code = extractTableSchemaInfo(pHandle, pTableScanNode->scan.uid, pTaskInfo);
      if (code) {
        pTaskInfo->code = terrno;
        return NULL;
      }

      SOperatorInfo* pOperator =
          createTableMergeScanOperatorInfo(pTableScanNode, pTableListInfo, pHandle, pTaskInfo, queryId, taskId);
      if (pOperator == NULL) {  // must not dereference pOperator->info below
        if (pTaskInfo->code == TSDB_CODE_SUCCESS) {
          pTaskInfo->code = terrno;
        }
        return NULL;
      }

      STableScanInfo* pScanInfo = pOperator->info;
      pTaskInfo->cost.pRecoder = &pScanInfo->readRecorder;
      return pOperator;

    } else if (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == type) {
      return createExchangeOperatorInfo(pHandle->pMsgCb->clientRpc, (SExchangePhysiNode*)pPhyNode, pTaskInfo);

    } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN == type) {
      STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode;
      STimeWindowAggSupp   twSup = {
            .waterMark = pTableScanNode->watermark,
            .calTrigger = pTableScanNode->triggerType,
            .maxTs = INT64_MIN,
      };

      if (pHandle) {
        int32_t code = createScanTableListInfo(pTableScanNode, pHandle, pTableListInfo, queryId, taskId);
        if (code) {
          pTaskInfo->code = code;
          return NULL;
        }
      }

      SOperatorInfo* pOperator =
          createStreamScanOperatorInfo(pHandle, pTableScanNode, pTaskInfo, &twSup, queryId, taskId);
      return pOperator;

    } else if (QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN == type) {
      SSystemTableScanPhysiNode* pSysScanPhyNode = (SSystemTableScanPhysiNode*)pPhyNode;
      return createSysTableScanOperatorInfo(pHandle, pSysScanPhyNode, pUser, pTaskInfo);

    } else if (QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN == type) {
      STagScanPhysiNode* pScanPhyNode = (STagScanPhysiNode*)pPhyNode;

      int32_t code = getTableList(pHandle->meta, pHandle->vnode, pScanPhyNode, pTableListInfo);
      if (code != TSDB_CODE_SUCCESS) {
        pTaskInfo->code = terrno;
        return NULL;
      }

      return createTagScanOperatorInfo(pHandle, pScanPhyNode, pTableListInfo, pTaskInfo);

    } else if (QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN == type) {
      SBlockDistScanPhysiNode* pBlockNode = (SBlockDistScanPhysiNode*)pPhyNode;
      int32_t                  code = TSDB_CODE_SUCCESS;
      pTableListInfo->pTableList = taosArrayInit(4, sizeof(STableKeyInfo));

      if (pBlockNode->tableType == TSDB_SUPER_TABLE) {
        code = vnodeGetAllTableList(pHandle->vnode, pBlockNode->uid, pTableListInfo->pTableList);
        if (code != TSDB_CODE_SUCCESS) {
          pTaskInfo->code = terrno;
          return NULL;
        }
      } else {  // Create one table group.
        STableKeyInfo info = {.lastKey = 0, .uid = pBlockNode->uid, .groupId = 0};
        taosArrayPush(pTableListInfo->pTableList, &info);
      }

      SQueryTableDataCond cond = {0};

      {
        cond.order = TSDB_ORDER_ASC;
        cond.numOfCols = 1;
        cond.colList = taosMemoryCalloc(1, sizeof(SColumnInfo));
        if (cond.colList == NULL) {
          terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
          pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY;  // the caller reads the task code on NULL
          return NULL;
        }

        cond.colList->colId = 1;
        cond.colList->type = TSDB_DATA_TYPE_TIMESTAMP;
        cond.colList->bytes = sizeof(TSKEY);

        cond.twindows = (STimeWindow){.skey = INT64_MIN, .ekey = INT64_MAX};
        cond.suid = pBlockNode->suid;
        cond.type = BLOCK_LOAD_OFFSET_ORDER;
      }

      STsdbReader* pReader = NULL;
      code = tsdbReaderOpen(pHandle->vnode, &cond, pTableListInfo->pTableList, &pReader, "");
      cleanupQueryTableDataCond(&cond);
      if (code != TSDB_CODE_SUCCESS) {  // do not hand a NULL reader to the operator
        pTaskInfo->code = code;
        return NULL;
      }

      return createDataBlockInfoScanOperator(pReader, pHandle, cond.suid, pBlockNode, pTaskInfo);

    } else if (QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN == type) {
      SLastRowScanPhysiNode* pScanNode = (SLastRowScanPhysiNode*)pPhyNode;

      int32_t code = extractTableSchemaInfo(pHandle, pScanNode->uid, pTaskInfo);
      if (code != TSDB_CODE_SUCCESS) {
        pTaskInfo->code = code;
        return NULL;
      }

      pTableListInfo->pTableList = taosArrayInit(4, sizeof(STableKeyInfo));
      if (pScanNode->tableType == TSDB_SUPER_TABLE) {
        code = vnodeGetAllTableList(pHandle->vnode, pScanNode->uid, pTableListInfo->pTableList);
        if (code != TSDB_CODE_SUCCESS) {
          pTaskInfo->code = terrno;
          return NULL;
        }
      } else {  // Create one table group.
        STableKeyInfo info = {.lastKey = 0, .uid = pScanNode->uid, .groupId = 0};
        taosArrayPush(pTableListInfo->pTableList, &info);
      }

      return createLastrowScanOperator(pScanNode, pHandle, pTableListInfo->pTableList, pTaskInfo);

    } else {
      ASSERT(0);
      // Without this, release builds fell through to the non-leaf path and returned NULL
      // while leaving pTaskInfo->code at success.
      pTaskInfo->code = TSDB_CODE_QRY_APP_ERROR;
      return NULL;
    }
  }

  int32_t num = 0;
  size_t  size = LIST_LENGTH(pPhyNode->pChildren);

  SOperatorInfo** ops = taosMemoryCalloc(size, POINTER_BYTES);
  if (ops == NULL) {
    pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
    return NULL;
  }

  for (int32_t i = 0; i < size; ++i) {
    SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, i);
    ops[i] = createOperatorTree(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableListInfo, pUser);
    if (ops[i] == NULL) {
      // The failing child has already recorded its error code.
      // NOTE(review): children built before index i are not released here.
      taosMemoryFree(ops);
      return NULL;
    }
  }

  SOperatorInfo* pOptr = NULL;
  if (QUERY_NODE_PHYSICAL_PLAN_PROJECT == type) {
    pOptr = createProjectOperatorInfo(ops[0], (SProjectPhysiNode*)pPhyNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_HASH_AGG == type) {
    SAggPhysiNode* pAggNode = (SAggPhysiNode*)pPhyNode;
    SExprInfo*     pExprInfo = createExprInfo(pAggNode->pAggFuncs, pAggNode->pGroupKeys, &num);
    SSDataBlock*   pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);

    int32_t    numOfScalarExpr = 0;
    SExprInfo* pScalarExprInfo = NULL;
    if (pAggNode->pExprs != NULL) {
      pScalarExprInfo = createExprInfo(pAggNode->pExprs, NULL, &numOfScalarExpr);
    }

    if (pAggNode->pGroupKeys != NULL) {
      SArray* pColList = extractColumnInfo(pAggNode->pGroupKeys);
      pOptr = createGroupOperatorInfo(ops[0], pExprInfo, num, pResBlock, pColList, pAggNode->node.pConditions,
                                      pScalarExprInfo, numOfScalarExpr, pTaskInfo);
    } else {
      pOptr = createAggregateOperatorInfo(ops[0], pExprInfo, num, pResBlock, pAggNode->node.pConditions,
                                          pScalarExprInfo, numOfScalarExpr, pTaskInfo);
    }
  } else if (QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL == type || QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL == type) {
    SIntervalPhysiNode* pIntervalPhyNode = (SIntervalPhysiNode*)pPhyNode;

    SExprInfo*   pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &num);
    SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);

    SInterval interval = {.interval = pIntervalPhyNode->interval,
                          .sliding = pIntervalPhyNode->sliding,
                          .intervalUnit = pIntervalPhyNode->intervalUnit,
                          .slidingUnit = pIntervalPhyNode->slidingUnit,
                          .offset = pIntervalPhyNode->offset,
                          .precision = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->node.resType.precision};

    STimeWindowAggSupp as = {
        .waterMark = pIntervalPhyNode->window.watermark,
        .calTrigger = pIntervalPhyNode->window.triggerType,
        .maxTs = INT64_MIN,
    };
    ASSERT(as.calTrigger != STREAM_TRIGGER_MAX_DELAY);

    int32_t tsSlotId = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId;
    bool    isStream = (QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL == type);
    pOptr = createIntervalOperatorInfo(ops[0], pExprInfo, num, pResBlock, &interval, tsSlotId, &as, pIntervalPhyNode,
                                       pTaskInfo, isStream);

  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL == type) {
    SMergeAlignedIntervalPhysiNode* pIntervalPhyNode = (SMergeAlignedIntervalPhysiNode*)pPhyNode;

    SExprInfo*   pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &num);
    SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);

    SInterval interval = {.interval = pIntervalPhyNode->interval,
                          .sliding = pIntervalPhyNode->sliding,
                          .intervalUnit = pIntervalPhyNode->intervalUnit,
                          .slidingUnit = pIntervalPhyNode->slidingUnit,
                          .offset = pIntervalPhyNode->offset,
                          .precision = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->node.resType.precision};

    int32_t tsSlotId = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId;
    pOptr = createMergeAlignedIntervalOperatorInfo(ops[0], pExprInfo, num, pResBlock, &interval, tsSlotId,
                                                   pPhyNode->pConditions, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL == type) {
    SMergeIntervalPhysiNode* pIntervalPhyNode = (SMergeIntervalPhysiNode*)pPhyNode;

    SExprInfo*   pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &num);
    SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);

    SInterval interval = {.interval = pIntervalPhyNode->interval,
                          .sliding = pIntervalPhyNode->sliding,
                          .intervalUnit = pIntervalPhyNode->intervalUnit,
                          .slidingUnit = pIntervalPhyNode->slidingUnit,
                          .offset = pIntervalPhyNode->offset,
                          .precision = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->node.resType.precision};

    int32_t tsSlotId = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId;
    pOptr = createMergeIntervalOperatorInfo(ops[0], pExprInfo, num, pResBlock, &interval, tsSlotId, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL == type) {
    int32_t children = 0;
    pOptr = createStreamFinalIntervalOperatorInfo(ops[0], pPhyNode, pTaskInfo, children);
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL == type) {
    int32_t children = 1;
    pOptr = createStreamFinalIntervalOperatorInfo(ops[0], pPhyNode, pTaskInfo, children);
  } else if (QUERY_NODE_PHYSICAL_PLAN_SORT == type) {
    pOptr = createSortOperatorInfo(ops[0], (SSortPhysiNode*)pPhyNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_GROUP_SORT == type) {
    pOptr = createGroupSortOperatorInfo(ops[0], (SGroupSortPhysiNode*)pPhyNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE == type) {
    SMergePhysiNode* pMergePhyNode = (SMergePhysiNode*)pPhyNode;
    pOptr = createMultiwayMergeOperatorInfo(ops, size, pMergePhyNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION == type) {
    SSessionWinodwPhysiNode* pSessionNode = (SSessionWinodwPhysiNode*)pPhyNode;

    STimeWindowAggSupp as = {.waterMark = pSessionNode->window.watermark,
                             .calTrigger = pSessionNode->window.triggerType};

    SExprInfo*   pExprInfo = createExprInfo(pSessionNode->window.pFuncs, NULL, &num);
    SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
    int32_t      tsSlotId = ((SColumnNode*)pSessionNode->window.pTspk)->slotId;

    pOptr = createSessionAggOperatorInfo(ops[0], pExprInfo, num, pResBlock, pSessionNode->gap, tsSlotId, &as,
                                         pPhyNode->pConditions, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION == type) {
    pOptr = createStreamSessionAggOperatorInfo(ops[0], pPhyNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION == type) {
    int32_t children = 0;
    pOptr = createStreamFinalSessionAggOperatorInfo(ops[0], pPhyNode, pTaskInfo, children);
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION == type) {
    int32_t children = 1;
    pOptr = createStreamFinalSessionAggOperatorInfo(ops[0], pPhyNode, pTaskInfo, children);
  } else if (QUERY_NODE_PHYSICAL_PLAN_PARTITION == type) {
    pOptr = createPartitionOperatorInfo(ops[0], (SPartitionPhysiNode*)pPhyNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE == type) {
    SStateWinodwPhysiNode* pStateNode = (SStateWinodwPhysiNode*)pPhyNode;

    STimeWindowAggSupp as = {.waterMark = pStateNode->window.watermark, .calTrigger = pStateNode->window.triggerType};

    SExprInfo*   pExprInfo = createExprInfo(pStateNode->window.pFuncs, NULL, &num);
    SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
    int32_t      tsSlotId = ((SColumnNode*)pStateNode->window.pTspk)->slotId;

    SColumnNode* pColNode = (SColumnNode*)((STargetNode*)pStateNode->pStateKey)->pExpr;
    SColumn      col = extractColumnFromColumnNode(pColNode);
    pOptr = createStatewindowOperatorInfo(ops[0], pExprInfo, num, pResBlock, &as, tsSlotId, &col, pPhyNode->pConditions,
                                          pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE == type) {
    pOptr = createStreamStateAggOperatorInfo(ops[0], pPhyNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN == type) {
    pOptr = createMergeJoinOperatorInfo(ops, size, (SJoinPhysiNode*)pPhyNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_FILL == type) {
    pOptr = createFillOperatorInfo(ops[0], (SFillPhysiNode*)pPhyNode, false, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC == type) {
    pOptr = createIndefinitOutputOperatorInfo(ops[0], pPhyNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC == type) {
    pOptr = createTimeSliceOperatorInfo(ops[0], pPhyNode, pTaskInfo);
  } else {
    ASSERT(0);
    pTaskInfo->code = TSDB_CODE_QRY_APP_ERROR;
  }

  // Never report NULL with a success code to the caller.
  if (pOptr == NULL && pTaskInfo->code == TSDB_CODE_SUCCESS) {
    pTaskInfo->code = terrno;
  }

  taosMemoryFree(ops);
  return pOptr;
}
H
Haojun Liao 已提交
4611

H
Haojun Liao 已提交
4612
/*
 * Build an array of SColumn, one per target in pNodeList.
 *
 * Targets whose expression is a column node are converted with extractColumnFromColumnNode().
 * Targets whose expression is a constant value get an SColumn with colId/slotId set to the
 * target slot and the value's data type, bytes, scale and precision. Other expression kinds
 * are skipped.
 *
 * Returns the new array (caller owns it), or NULL with terrno set on allocation failure.
 */
SArray* extractColumnInfo(SNodeList* pNodeList) {
  size_t  numOfCols = LIST_LENGTH(pNodeList);
  SArray* pList = taosArrayInit(numOfCols, sizeof(SColumn));
  if (pList == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  for (int32_t i = 0; i < numOfCols; ++i) {
    STargetNode* pNode = (STargetNode*)nodesListGetNode(pNodeList, i);
    SColumn      c = {0};

    if (nodeType(pNode->pExpr) == QUERY_NODE_COLUMN) {
      c = extractColumnFromColumnNode((SColumnNode*)pNode->pExpr);
    } else if (nodeType(pNode->pExpr) == QUERY_NODE_VALUE) {
      SValueNode* pValNode = (SValueNode*)pNode->pExpr;
      c.slotId = pNode->slotId;
      c.colId = pNode->slotId;
      // node.type is the node kind (QUERY_NODE_VALUE); the data type lives in resType.
      c.type = pValNode->node.resType.type;
      c.bytes = pValNode->node.resType.bytes;
      c.scale = pValNode->node.resType.scale;
      c.precision = pValNode->node.resType.precision;
    } else {
      continue;
    }

    if (taosArrayPush(pList, &c) == NULL) {
      taosArrayDestroy(pList);
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return NULL;
    }
  }

  return pList;
}

L
Liu Jicong 已提交
4645 4646
/*
 * Collect the tables qualifying for pTableScanNode into pTableListInfo and open a tsdb reader
 * over them.
 *
 * Returns the reader, or NULL. terrno holds the error code on failure. It is 0 when no
 * table qualified, which is not treated as an error.
 * idstr is only used for logging/identification and may be NULL.
 */
STsdbReader* doCreateDataReader(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle,
                                STableListInfo* pTableListInfo, const char* idstr) {
  STsdbReader*        pReader = NULL;
  SQueryTableDataCond cond = {0};

  int32_t code = getTableList(pHandle->meta, pHandle->vnode, &pTableScanNode->scan, pTableListInfo);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  if (taosArrayGetSize(pTableListInfo->pTableList) == 0) {
    code = 0;
    // idstr may be NULL (e.g. when called from rebuildReader); passing NULL to %s is undefined.
    qDebug("no table qualified for query, %s", (idstr != NULL) ? idstr : "");
    goto _error;
  }

  code = initQueryTableDataCond(&cond, pTableScanNode);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  code = tsdbReaderOpen(pHandle->vnode, &cond, pTableListInfo->pTableList, &pReader, idstr);
  // The condition is only needed while opening the reader; release it on success AND failure.
  cleanupQueryTableDataCond(&cond);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  return pReader;

_error:
  terrno = code;
  return NULL;
}

L
Liu Jicong 已提交
4679 4680 4681 4682 4683 4684 4685 4686 4687 4688 4689 4690 4691
/*
 * Walk down a single-path operator chain until the stream scan operator is reached, and
 * return the table scan info owned by that stream scan through *ppInfo.
 *
 * Fails with TSDB_CODE_QRY_APP_ERROR if the chain ends without a stream scan, or if any
 * operator on the way has more than one downstream (joins are not supported here).
 */
static int32_t extractTbscanInStreamOpTree(SOperatorInfo* pOperator, STableScanInfo** ppInfo) {
  SOperatorInfo* pCursor = pOperator;

  while (pCursor->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
    if (pCursor->numOfDownstream == 0) {
      qError("failed to find stream scan operator");
      return TSDB_CODE_QRY_APP_ERROR;
    }
    if (pCursor->numOfDownstream > 1) {
      qError("join not supported for stream block scan");
      return TSDB_CODE_QRY_APP_ERROR;
    }
    pCursor = pCursor->pDownstream[0];
  }

  SStreamScanInfo* pStreamScan = pCursor->info;
  ASSERT(pStreamScan->pTableScanOp->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN);
  *ppInfo = pStreamScan->pTableScanOp->info;
  return 0;
}

4699 4700 4701 4702 4703 4704 4705 4706 4707 4708 4709 4710 4711 4712 4713 4714 4715 4716 4717 4718 4719 4720
/*
 * Follow the single-child chain below pNode down to its leaf and return that leaf through
 * *ppNode when it is a table scan node.
 *
 * Returns 0 on success. Returns -1 with terrno = TSDB_CODE_QRY_APP_ERROR when a node has more
 * than one child or the leaf is not a table scan.
 */
int32_t extractTableScanNode(SPhysiNode* pNode, STableScanPhysiNode** ppNode) {
  SPhysiNode* pLeaf = pNode;

  while (pLeaf->pChildren != NULL && LIST_LENGTH(pLeaf->pChildren) != 0) {
    if (LIST_LENGTH(pLeaf->pChildren) != 1) {
      ASSERT(0);
      terrno = TSDB_CODE_QRY_APP_ERROR;
      return -1;
    }
    pLeaf = (SPhysiNode*)nodesListGetNode(pLeaf->pChildren, 0);
  }

  if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN != pLeaf->type) {
    ASSERT(0);
    terrno = TSDB_CODE_QRY_APP_ERROR;
    return -1;
  }

  *ppNode = (STableScanPhysiNode*)pLeaf;
  return 0;
}

L
Liu Jicong 已提交
4721 4722 4723 4724 4725
/*
 * Replace the data reader of the table scan that sits under the stream scan in pOperator's
 * tree with a fresh reader built from the table scan node of `plan`.
 *
 * Returns 0 on success. Returns -1 when the scan operator or plan node cannot be located.
 * Returns TSDB_CODE_QRY_APP_ERROR when the new reader cannot be created.
 * uid and ts are currently unused (see TODO).
 */
int32_t rebuildReader(SOperatorInfo* pOperator, SSubplan* plan, SReadHandle* pHandle, int64_t uid, int64_t ts) {
  STableScanInfo* pTableScanInfo = NULL;
  if (extractTbscanInStreamOpTree(pOperator, &pTableScanInfo) < 0) {
    return -1;
  }

  STableScanPhysiNode* pNode = NULL;
  if (extractTableScanNode(plan->pNode, &pNode) < 0) {
    ASSERT(0);
    // Release builds previously continued with pNode == NULL and crashed in doCreateDataReader.
    return -1;
  }

  tsdbReaderClose(pTableScanInfo->dataReader);

  // NOTE(review): the table list collected into `info` is not released after the reader is
  // built; confirm whether tsdbReaderOpen keeps a reference to it before freeing it here.
  STableListInfo info = {0};
  pTableScanInfo->dataReader = doCreateDataReader(pNode, pHandle, &info, NULL);
  if (pTableScanInfo->dataReader == NULL) {
    ASSERT(0);
    qError("failed to create data reader");
    return TSDB_CODE_QRY_APP_ERROR;
  }

  // TODO: set uid and ts to data reader
  return 0;
}

C
Cary Xu 已提交
4745
/*
 * Serialize the result-row state of `ops` and then, recursively, of each downstream operator
 * (pre-order) into one growing buffer.
 *
 * Buffer layout: a leading int32_t with the total byte length of the buffer (header included),
 * followed by the bytes produced by each operator's encodeResultRow callback, back to back.
 * Operators without the callback, or whose callback yields zero bytes, add nothing.
 *
 * result       in/out: buffer (NULL to start a new one). It is freed and reset to NULL when an
 *              encode callback fails or when growing an existing buffer fails.
 * length       out: total buffer length, updated after every operator that appended data.
 * nOptrWithVal in/out: incremented once per operator that appended data.
 */
int32_t encodeOperator(SOperatorInfo* ops, char** result, int32_t* length, int32_t* nOptrWithVal) {
  if (ops->fpSet.encodeResultRow != NULL) {
    if (result == NULL || length == NULL || nOptrWithVal == NULL) {
      return TSDB_CODE_TSC_INVALID_INPUT;
    }

    char*   pChunk = NULL;
    int32_t chunkLen = 0;
    int32_t ret = ops->fpSet.encodeResultRow(ops, &pChunk, &chunkLen);

    if (ret != TDB_CODE_SUCCESS) {
      // discard everything accumulated so far
      if (*result != NULL) {
        taosMemoryFree(*result);
        *result = NULL;
      }
      return ret;
    }

    if (chunkLen == 0) {
      // this operator has no state to persist
      ASSERT(!pChunk);
    } else {
      ++(*nOptrWithVal);
      ASSERT(chunkLen >= 0);

      if (*result != NULL) {
        // append after the existing content and bump the header
        int32_t oldSize = *(int32_t*)(*result);
        char*   pGrown = (char*)taosMemoryRealloc(*result, oldSize + chunkLen);
        if (pGrown == NULL) {
          taosMemoryFree(pChunk);
          taosMemoryFree(*result);
          *result = NULL;
          return TSDB_CODE_OUT_OF_MEMORY;
        }
        *result = pGrown;
        memcpy(*result + oldSize, pChunk, chunkLen);
        *(int32_t*)(*result) += chunkLen;
      } else {
        // first chunk: reserve room for the length header in front of it
        *result = (char*)taosMemoryCalloc(1, chunkLen + sizeof(int32_t));
        if (*result == NULL) {
          taosMemoryFree(pChunk);
          return TSDB_CODE_OUT_OF_MEMORY;
        }
        memcpy(*result + sizeof(int32_t), pChunk, chunkLen);
        *(int32_t*)(*result) = chunkLen + sizeof(int32_t);
      }

      taosMemoryFree(pChunk);
      *length = *(int32_t*)(*result);
    }
  }

  for (int32_t i = 0; i < ops->numOfDownstream; ++i) {
    int32_t ret = encodeOperator(ops->pDownstream[i], result, length, nOptrWithVal);
    if (ret != TDB_CODE_SUCCESS) {
      return ret;
    }
  }

  return TDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
4805
/*
 * Restore the state of `ops` and its downstream operators (pre-order, matching the order
 * used by encodeOperator) from the blobs between *ppCur and pEnd.
 *
 * Each blob is expected to start with an int32_t holding the blob's own total length. *ppCur
 * is advanced past every blob consumed, so sibling subtrees read consecutive blobs instead of
 * all re-reading the same one.
 *
 * Returns TSDB_CODE_TSC_INVALID_INPUT when an operator with a decode callback finds no data
 * left or a blob length that does not fit the buffer.
 */
static int32_t decodeOperatorFromCursor(SOperatorInfo* ops, const char** ppCur, const char* pEnd) {
  if (ops->fpSet.decodeResultRow) {
    const char* pCur = *ppCur;
    if (pCur == NULL || (size_t)(pEnd - pCur) < sizeof(int32_t)) {
      return TSDB_CODE_TSC_INVALID_INPUT;
    }

    // memcpy avoids an unaligned / type-punned read of the length prefix
    int32_t blobLen = 0;
    memcpy(&blobLen, pCur, sizeof(int32_t));
    if (blobLen < (int32_t)sizeof(int32_t) || blobLen > pEnd - pCur) {
      return TSDB_CODE_TSC_INVALID_INPUT;
    }

    int32_t code = ops->fpSet.decodeResultRow(ops, (char*)pCur);
    if (code != TDB_CODE_SUCCESS) {
      return code;
    }

    *ppCur = pCur + blobLen;
  }

  for (int32_t i = 0; i < ops->numOfDownstream; ++i) {
    int32_t code = decodeOperatorFromCursor(ops->pDownstream[i], ppCur, pEnd);
    if (code != TDB_CODE_SUCCESS) {
      return code;
    }
  }

  return TDB_CODE_SUCCESS;
}

/*
 * Restore operator state from a buffer produced by encodeOperator().
 *
 * result: [int32_t total length][blob 1][blob 2]... ; length: total byte length of result.
 * The buffer is only read, never modified. Operators without a decodeResultRow callback are
 * skipped. An operator that has one but finds no remaining data yields
 * TSDB_CODE_TSC_INVALID_INPUT.
 */
int32_t decodeOperator(SOperatorInfo* ops, const char* result, int32_t length) {
  const char* pCur = NULL;
  const char* pEnd = NULL;

  if (result != NULL && length >= (int32_t)sizeof(int32_t)) {
    ASSERT(length == *(int32_t*)result);
    pCur = result + sizeof(int32_t);
    pEnd = result + length;
  }

  return decodeOperatorFromCursor(ops, &pCur, pEnd);
}

D
dapan1121 已提交
4842
/*
 * Allocate the sink-specific parameter object for pNode and return it through *pParam.
 *
 *  - insert sink: an SInserterParam that carries readHandle;
 *  - delete sink: an SDeleterParam with the task's super-table uid and the uid of every
 *    table in the task's table list.
 * Any other sink type needs no parameter: *pParam is left untouched and success is returned.
 * Returns TSDB_CODE_OUT_OF_MEMORY if an allocation fails.
 */
int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, qTaskInfo_t* pTaskInfo, SReadHandle* readHandle) {
  SExecTaskInfo* pTask = *(SExecTaskInfo**)pTaskInfo;

  if (QUERY_NODE_PHYSICAL_PLAN_QUERY_INSERT == pNode->type) {
    SInserterParam* pInsert = taosMemoryCalloc(1, sizeof(SInserterParam));
    if (pInsert == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    pInsert->readHandle = readHandle;
    *pParam = pInsert;
  } else if (QUERY_NODE_PHYSICAL_PLAN_DELETE == pNode->type) {
    SDeleterParam* pDelete = taosMemoryCalloc(1, sizeof(SDeleterParam));
    if (pDelete == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    STableListInfo* pTables = &pTask->tableqinfoList;
    int32_t         numOfTables = taosArrayGetSize(pTables->pTableList);

    pDelete->suid = pTables->suid;
    pDelete->pUidList = taosArrayInit(numOfTables, sizeof(uint64_t));
    if (pDelete->pUidList == NULL) {
      taosMemoryFree(pDelete);
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    for (int32_t j = 0; j < numOfTables; ++j) {
      STableKeyInfo* pKey = taosArrayGet(pTables->pTableList, j);
      taosArrayPush(pDelete->pUidList, &pKey->uid);
    }

    *pParam = pDelete;
  }

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
4883
/*
 * Create the execution task for pPlan and build its operator tree.
 *
 * On success *pTaskInfo holds the new task, which stores `sql`, and TSDB_CODE_SUCCESS is
 * returned. On failure *pTaskInfo is freed and set to NULL, terrno is set, and a non-zero
 * error code is returned.
 */
int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId,
                               const char* sql, EOPTR_EXEC_MODEL model) {
  uint64_t queryId = pPlan->id.queryId;

  int32_t code = TSDB_CODE_SUCCESS;
  *pTaskInfo = createExecTaskInfo(queryId, taskId, model, pPlan->dbFName);
  if (*pTaskInfo == NULL) {
    code = TSDB_CODE_QRY_OUT_OF_MEMORY;
    goto _complete;
  }

  (*pTaskInfo)->sql = sql;
  (*pTaskInfo)->tableqinfoList.pTagCond = pPlan->pTagCond;
  (*pTaskInfo)->tableqinfoList.pTagIndexCond = pPlan->pTagIndexCond;
  (*pTaskInfo)->pRoot = createOperatorTree(pPlan->pNode, *pTaskInfo, pHandle, queryId, taskId,
                                           &(*pTaskInfo)->tableqinfoList, pPlan->user);

  if (NULL == (*pTaskInfo)->pRoot) {
    code = (*pTaskInfo)->code;
    if (code == TSDB_CODE_SUCCESS) {
      // Some failure paths only set terrno. Never free the task and then report success.
      code = (terrno != TSDB_CODE_SUCCESS) ? terrno : TSDB_CODE_QRY_APP_ERROR;
    }
    goto _complete;
  }

  return code;

_complete:
  // NOTE(review): only the task struct is released here; members filled in while building the
  // operator tree (e.g. the table list) are not. Confirm ownership before using doDestroyTask().
  taosMemoryFreeClear(*pTaskInfo);
  terrno = code;
  return code;
}

wmmhello's avatar
wmmhello 已提交
4913 4914 4915
/*
 * Release the containers held by a task's table list.
 *
 * The STableListInfo itself is not freed: callers such as doDestroyTask pass a
 * member embedded in another struct. Every released pointer is reset to NULL,
 * so the structure is left in an empty state and calling this twice is safe.
 */
static void doDestroyTableList(STableListInfo* pTableqinfoList) {
  taosArrayDestroy(pTableqinfoList->pTableList);
  taosHashCleanup(pTableqinfoList->map);

  // When tables are sorted by group id, pGroupList holds one SArray* per group;
  // those inner arrays must be destroyed before the outer array.
  if (pTableqinfoList->needSortTableByGroupId) {
    for (int32_t i = 0; i < taosArrayGetSize(pTableqinfoList->pGroupList); i++) {
      SArray* tmp = taosArrayGetP(pTableqinfoList->pGroupList, i);
      taosArrayDestroy(tmp);
    }
  }
  taosArrayDestroy(pTableqinfoList->pGroupList);

  pTableqinfoList->pTableList = NULL;
  pTableqinfoList->map = NULL;
  // Previously left dangling, which made a repeated destroy a double free.
  pTableqinfoList->pGroupList = NULL;
}

L
Liu Jicong 已提交
4928
/*
 * Release an execution task and everything it owns: its table list, the operator
 * tree rooted at pRoot, its table schema info, the SQL text and the task id
 * string, then the task struct itself.
 *
 * Passing NULL is a no-op, mirroring free(), so error paths can call it
 * unconditionally.
 */
void doDestroyTask(SExecTaskInfo* pTaskInfo) {
  if (pTaskInfo == NULL) {
    return;
  }

  qDebug("%s execTask is freed", GET_TASKID(pTaskInfo));

  doDestroyTableList(&pTaskInfo->tableqinfoList);
  destroyOperatorInfo(pTaskInfo->pRoot);
  cleanupTableSchemaInfo(pTaskInfo);

  taosMemoryFreeClear(pTaskInfo->sql);
  taosMemoryFreeClear(pTaskInfo->id.str);
  taosMemoryFreeClear(pTaskInfo);
}

/*
 * Write a single tag value into `output`, which holds `bytes` bytes.
 *
 * - NULL value: the slot is filled with the null marker for `type`.
 * - Fixed-size type: exactly `bytes` bytes are copied.
 * - Variable-length type: the value is copied with its length header. If the
 *   whole value (header included) would not fit in `bytes`, the payload is
 *   truncated to `bytes - VARSTR_HEADER_SIZE` and the header adjusted.
 */
static void doSetTagValueToResultBuf(char* output, const char* val, int16_t type, int16_t bytes) {
  if (val == NULL) {
    setNull(output, type, bytes);
    return;
  }

  if (!IS_VAR_DATA_TYPE(type)) {
    memcpy(output, val, bytes);
    return;
  }

  if (varDataTLen(val) <= bytes) {
    varDataCopy(output, val);
    return;
  }

  // Oversized var-data value (the cause is unknown): keep only what fits.
  int32_t payloadCap = bytes - VARSTR_HEADER_SIZE;
  int32_t copyLen = varDataLen(val);
  if (copyLen > payloadCap) {
    copyLen = payloadCap;
  }

  memcpy(varDataVal(output), varDataVal(val), copyLen);
  varDataSetLen(output, copyLen);
}

/*
 * Estimate the query-buffer budget consumed by a query over `numOfTables`
 * tables: one and a half STableQueryInfo structures per table.
 * Buffers consumed inside tsdb are deliberately not part of this estimate.
 */
static int64_t getQuerySupportBufSize(size_t numOfTables) {
  const double perTableBytes = sizeof(STableQueryInfo) * 1.5;
  return (int64_t)(perTableBytes * numOfTables);
}

/*
 * Try to reserve query buffer for `numOfTables` tables from the global budget
 * tsQueryBufferSizeBytes.
 *
 * - Negative budget: no limit; succeeds without reserving anything.
 * - Zero budget: query processing is disabled; always fails.
 * - Positive budget: getQuerySupportBufSize(numOfTables) is subtracted with a
 *   compare-and-swap loop, so concurrent callers cannot overdraw the budget.
 *   releaseQueryBuf() with the same table count gives the amount back.
 *
 * @return TSDB_CODE_SUCCESS or TSDB_CODE_QRY_NOT_ENOUGH_BUFFER.
 */
int32_t checkForQueryBuf(size_t numOfTables) {
  int64_t t = getQuerySupportBufSize(numOfTables);

  // The budget is updated concurrently via CAS; read it atomically as well
  // instead of with plain loads, which would be a data race.
  int64_t s = atomic_load_64(&tsQueryBufferSizeBytes);
  if (s < 0) {
    return TSDB_CODE_SUCCESS;
  }
  if (s == 0) {
    // disable query processing if the value of tsQueryBufferSize is zero.
    return TSDB_CODE_QRY_NOT_ENOUGH_BUFFER;
  }

  while (1) {
    int64_t remain = s - t;
    if (remain < 0) {
      return TSDB_CODE_QRY_NOT_ENOUGH_BUFFER;
    }

    int64_t prev = atomic_val_compare_exchange_64(&tsQueryBufferSizeBytes, s, remain);
    if (prev == s) {
      return TSDB_CODE_SUCCESS;
    }

    // Another thread changed the budget first; retry against the value it left.
    s = prev;
  }
}

/*
 * Give query buffer back to the global budget tsQueryBufferSizeBytes.
 *
 * Intended to undo a successful checkForQueryBuf() for the same table count;
 * both size the amount with getQuerySupportBufSize(). A negative budget means
 * the limit is disabled (nothing was reserved), so nothing is added back.
 */
void releaseQueryBuf(size_t numOfTables) {
  if (tsQueryBufferSizeBytes >= 0) {
    atomic_add_fetch_64(&tsQueryBufferSizeBytes, getQuerySupportBufSize(numOfTables));
  }
}
D
dapan1121 已提交
4999

dengyihao's avatar
dengyihao 已提交
5000 5001
/*
 * Append explain/profiling info for `operatorInfo` and then, recursively, for each
 * of its downstream operators (pre-order) to the growable array *pRes.
 *
 * @param operatorInfo  operator whose row count, costs and verbose info are recorded
 * @param pRes          in/out: heap array of entries, grown by 10 slots when full
 * @param capacity      in/out: number of slots allocated in *pRes
 * @param resNum        in/out: number of slots filled in *pRes
 * @return TSDB_CODE_SUCCESS, or an error code. On ANY error *pRes is freed and set
 *         to NULL. NOTE(review): verboseInfo buffers already stored in earlier entries
 *         are not released on that path — confirm who owns them.
 */
int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SExplainExecInfo** pRes, int32_t* capacity,
                                   int32_t* resNum) {
  if (*resNum >= *capacity) {
    int32_t           newCapacity = *capacity + 10;
    SExplainExecInfo* pNew = taosMemoryRealloc(*pRes, newCapacity * sizeof(SExplainExecInfo));
    if (NULL == pNew) {
      qError("malloc %d failed", newCapacity * (int32_t)sizeof(SExplainExecInfo));
      // realloc leaves the old block untouched on failure: free it rather than
      // leaking it by overwriting *pRes with NULL.
      taosMemoryFreeClear(*pRes);
      return TSDB_CODE_QRY_OUT_OF_MEMORY;
    }

    *pRes = pNew;
    *capacity = newCapacity;
  }

  SExplainExecInfo* pInfo = &(*pRes)[*resNum];

  pInfo->numOfRows = operatorInfo->resultInfo.totalRows;
  pInfo->startupCost = operatorInfo->cost.openCost;
  pInfo->totalCost = operatorInfo->cost.totalCost;

  if (operatorInfo->fpSet.getExplainFn) {
    int32_t code = operatorInfo->fpSet.getExplainFn(operatorInfo, &pInfo->verboseInfo, &pInfo->verboseLen);
    if (code) {
      qError("%s operator getExplainFn failed, code:%s", GET_TASKID(operatorInfo->pTaskInfo), tstrerror(code));
      taosMemoryFreeClear(*pRes);
      return code;
    }
  } else {
    pInfo->verboseLen = 0;
    pInfo->verboseInfo = NULL;
  }

  ++(*resNum);

  for (int32_t i = 0; i < operatorInfo->numOfDownstream; ++i) {
    int32_t code = getOperatorExplainExecInfo(operatorInfo->pDownstream[i], pRes, capacity, resNum);
    if (code != TSDB_CODE_SUCCESS) {
      // The recursive call has normally released *pRes already; FreeClear is a no-op then.
      taosMemoryFreeClear(*pRes);
      // Propagate the real failure instead of always reporting out-of-memory.
      return code;
    }
  }

  return TSDB_CODE_SUCCESS;
}
5
54liuyao 已提交
5042

L
Liu Jicong 已提交
5043
/*
 * Initialise the state used by a stream aggregation operator: a key buffer, a
 * result-row hash table, a list of scan windows and a disk-based result buffer
 * that every function context in pCtx is pointed at.
 *
 * @param pSup         supporter to initialise
 * @param pKey         identifier forwarded to createDiskbasedBuf (presumably names the buffer — confirm)
 * @param pCtx         function contexts; each gets pBuf set to the result buffer on success
 * @param numOfOutput  number of entries in pCtx
 * @param size         stored as pSup->valueSize
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY, or the error from createDiskbasedBuf.
 *         NOTE(review): members allocated before a failure are not released here;
 *         presumably the caller's cleanup handles them — confirm.
 */
int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, const char* pKey, SqlFunctionCtx* pCtx, int32_t numOfOutput,
                               int32_t size) {
  pSup->resultRowSize = getResultRowSize(pCtx, numOfOutput);
  pSup->keySize = sizeof(int64_t) + sizeof(TSKEY);
  pSup->pKeyBuf = taosMemoryCalloc(1, pSup->keySize);
  _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
  pSup->pResultRows = taosHashInit(1024, hashFn, false, HASH_NO_LOCK);
  if (pSup->pKeyBuf == NULL || pSup->pResultRows == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  pSup->valueSize = size;

  pSup->pScanWindow = taosArrayInit(4, sizeof(STimeWindow));
  if (pSup->pScanWindow == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  // Smallest power-of-two page size (starting at 4 KiB) that holds four result rows.
  int32_t pageSize = 4096;
  while (pageSize < pSup->resultRowSize * 4) {
    pageSize <<= 1u;
  }

  // Default to a 1 MiB buffer, but at least four pages need to be in buffer.
  // (The old `bufSize <= pageSize` test only guaranteed more than one page.)
  int32_t bufSize = 4096 * 256;
  if (bufSize < pageSize * 4) {
    bufSize = pageSize * 4;
  }

  int32_t code = createDiskbasedBuf(&pSup->pResultBuf, pageSize, bufSize, pKey, TD_TMP_DIR_PATH);
  if (code != TSDB_CODE_SUCCESS) {
    // Do not hand a buffer that failed to initialise to the function contexts.
    return code;
  }

  for (int32_t i = 0; i < numOfOutput; ++i) {
    pCtx[i].pBuf = pSup->pResultBuf;
  }
  return TSDB_CODE_SUCCESS;
}