executorimpl.c 178.8 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
15

16
#include <executorimpl.h>
H
Haojun Liao 已提交
17 18
#include "filter.h"
#include "function.h"
19 20
#include "functionMgt.h"
#include "os.h"
H
Haojun Liao 已提交
21
#include "querynodes.h"
22
#include "tfill.h"
dengyihao's avatar
dengyihao 已提交
23
#include "tname.h"
X
Xiaoyu Wang 已提交
24
#include "tref.h"
25

H
Haojun Liao 已提交
26
#include "tdatablock.h"
27
#include "tglobal.h"
H
Haojun Liao 已提交
28
#include "tmsg.h"
H
Haojun Liao 已提交
29
#include "tsort.h"
30
#include "ttime.h"
H
Haojun Liao 已提交
31

32
#include "executorimpl.h"
dengyihao's avatar
dengyihao 已提交
33
#include "index.h"
34
#include "query.h"
35 36
#include "tcompare.h"
#include "tcompression.h"
H
Haojun Liao 已提交
37
#include "thash.h"
38
#include "ttypes.h"
dengyihao's avatar
dengyihao 已提交
39
#include "vnode.h"
40

H
Haojun Liao 已提交
41
/* Scan-phase helpers; `runtime` is any pointer to a struct with a `scanFlag` member. */
#define IS_MAIN_SCAN(runtime)          (MAIN_SCAN == (runtime)->scanFlag)
#define SET_REVERSE_SCAN_FLAG(runtime) ((runtime)->scanFlag = REVERSE_SCAN)

/* Row step for walking data in order `ord`: QUERY_ASC_FORWARD_STEP for TSDB_ORDER_ASC, otherwise QUERY_DESC_FORWARD_STEP. */
#define GET_FORWARD_DIRECTION_FACTOR(ord) \
  ((TSDB_ORDER_ASC == (ord)) ? QUERY_ASC_FORWARD_STEP : QUERY_DESC_FORWARD_STEP)

#if 0
/*
 * Randomly failing allocators (disabled). When this block is enabled, the macros at its end redirect
 * calloc/malloc/realloc to these wrappers, apparently to exercise out-of-memory handling:
 *   - u_malloc / u_calloc return NULL when taosRand() % 1000 == 0;
 *   - u_realloc returns NULL when taosRand() % 5 is 0 or 1.
 */
static UNUSED_FUNC void* u_malloc(size_t size) {
  uint32_t dice = taosRand();
  return (dice % 1000 == 0) ? NULL : taosMemoryMalloc(size);
}

static UNUSED_FUNC void* u_calloc(size_t num, size_t size) {
  uint32_t dice = taosRand();
  return (dice % 1000 == 0) ? NULL : taosMemoryCalloc(num, size);
}

static UNUSED_FUNC void* u_realloc(void* p, size_t size) {
  uint32_t dice = taosRand();
  return (dice % 5 <= 1) ? NULL : taosMemoryRealloc(p, size);
}

#define calloc  u_calloc
#define malloc  u_malloc
#define realloc u_realloc
#endif

X
Xiaoyu Wang 已提交
80
/* Clear the bits `st` in `(q)->status`. */
#define CLEAR_QUERY_STATUS(q, st) ((q)->status &= ~(st))

/* True when `q` has a positive `interval.interval`, i.e. it is an interval (time-window) query. */
#define QUERY_IS_INTERVAL_QUERY(q) ((q)->interval.interval > 0)

L
Liu Jicong 已提交
83 84 85
/*
 * Maximum idle duration (seconds, per the name): twice tsShellActivityTimer.
 * Declared with (void) so the definition is a real prototype; an empty `()` leaves the parameters unchecked.
 */
int32_t getMaximumIdleDurationSec(void) { return tsShellActivityTimer * 2; }

/*
 * Placeholder lookup: asserts that pExprInfo wraps a unary expression node and always reports function id 0.
 */
static int32_t getExprFunctionId(SExprInfo* pExprInfo) {
  assert(pExprInfo != NULL && pExprInfo->pExpr != NULL && pExprInfo->pExpr->nodeType == TEXPR_UNARYEXPR_NODE);

  // The id is not resolved yet; every expression maps to 0.
  int32_t functionId = 0;
  return functionId;
}

static void doSetTagValueToResultBuf(char* output, const char* val, int16_t type, int16_t bytes);

92
static void setBlockStatisInfo(SqlFunctionCtx* pCtx, SExprInfo* pExpr, SSDataBlock* pSDataBlock);
93

X
Xiaoyu Wang 已提交
94
static void releaseQueryBuf(size_t numOfTables);
95 96 97 98 99

static void destroySFillOperatorInfo(void* param, int32_t numOfOutput);
static void destroyProjectOperatorInfo(void* param, int32_t numOfOutput);
static void destroyOrderOperatorInfo(void* param, int32_t numOfOutput);
static void destroyAggOperatorInfo(void* param, int32_t numOfOutput);
X
Xiaoyu Wang 已提交
100

H
Haojun Liao 已提交
101
static void destroyIntervalOperatorInfo(void* param, int32_t numOfOutput);
H
Haojun Liao 已提交
102 103
static void destroyExchangeOperatorInfo(void* param, int32_t numOfOutput);

104 105
static void destroyOperatorInfo(SOperatorInfo* pOperator);

106
/*
 * Mark pOperator as finished (OP_EXEC_DONE). If the operator is attached to a task, also record the operator's
 * total cost and set the task status to TASK_COMPLETED.
 *
 * pTaskInfo may be NULL, so it is checked before any dereference. That includes the cost computation, which
 * reads the task's start time.
 */
void doSetOperatorCompleted(SOperatorInfo* pOperator) {
  pOperator->status = OP_EXEC_DONE;

  if (pOperator->pTaskInfo == NULL) {
    return;
  }

  // NOTE(review): cost.start is scaled by 1000 to match the microsecond clock, so it appears to be in
  // milliseconds and totalCost comes out in milliseconds -- confirm.
  pOperator->cost.totalCost = (taosGetTimestampUs() - pOperator->pTaskInfo->cost.start * 1000) / 1000.0;
  setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED);
}
114

H
Haojun Liao 已提交
115
/*
 * Open callback for operators with nothing to prepare.
 * Flags the operator as opened, reports a zero open cost, and always returns TSDB_CODE_SUCCESS.
 */
int32_t operatorDummyOpenFn(SOperatorInfo* pOperator) {
  OPTR_SET_OPENED(pOperator);
  pOperator->cost.openCost = 0;
  return TSDB_CODE_SUCCESS;
}

121
SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn, __optr_fn_t streamFn,
L
Liu Jicong 已提交
122
                                   __optr_fn_t cleanup, __optr_close_fn_t closeFn, __optr_encode_fn_t encode,
123
                                   __optr_decode_fn_t decode, __optr_explain_fn_t explain) {
124 125 126 127 128 129 130 131 132 133 134 135 136 137
  SOperatorFpSet fpSet = {
      ._openFn = openFn,
      .getNextFn = nextFn,
      .getStreamResFn = streamFn,
      .cleanupFn = cleanup,
      .closeFn = closeFn,
      .encodeResultRow = encode,
      .decodeResultRow = decode,
      .getExplainFn = explain,
  };

  return fpSet;
}

H
Haojun Liao 已提交
138
/* Close callback for operators that own nothing to release: intentionally a no-op. */
void operatorDummyCloseFn(void* param, int32_t numOfCols) {
  (void)param;
  (void)numOfCols;
}
H
Haojun Liao 已提交
139

X
Xiaoyu Wang 已提交
140 141 142
static int32_t doCopyToSDataBlock(SExecTaskInfo* taskInfo, SSDataBlock* pBlock, SExprInfo* pExprInfo,
                                  SDiskbasedBuf* pBuf, SGroupResInfo* pGroupResInfo, const int32_t* rowCellOffset,
                                  SqlFunctionCtx* pCtx, int32_t numOfExprs);
H
Haojun Liao 已提交
143

144
static void initCtxOutputBuffer(SqlFunctionCtx* pCtx, int32_t size);
L
Liu Jicong 已提交
145 146
static void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, SAggOperatorInfo* pAggInfo, int32_t numOfOutput,
                                     uint64_t groupId);
147

L
Liu Jicong 已提交
148 149
/*
 * Report whether a column may contain NULL values.
 *
 * Tag columns, TSDB_COL_IS_UD_COL columns and the primary timestamp column are reported as NULL-free.
 * For other columns, available statistics with numOfNull == 0 also mean NULL-free.
 * Without statistics the answer is conservatively true.
 */
static bool hasNull(SColumn* pColumn, SColumnDataAgg* pStatis) {
  bool neverNull = TSDB_COL_IS_TAG(pColumn->flag) || TSDB_COL_IS_UD_COL(pColumn->flag) ||
                   pColumn->colId == PRIMARYKEY_TIMESTAMP_COL_ID;
  if (neverNull) {
    return false;
  }

  return pStatis == NULL || pStatis->numOfNull != 0;
}

162
#if 0
/*
 * Disabled legacy helper: report whether a result row already exists for the key (uid, pData[0..bytes)).
 * For interval queries during the main scan, a hash hit counts only when pResultRowInfo holds more than one row
 * and the extended key is also present in pResultRowListSet.
 */
static bool chkResultRowFromKey(STaskRuntimeEnv* pRuntimeEnv, SResultRowInfo* pResultRowInfo, char* pData,
                                int16_t bytes, bool masterscan, uint64_t uid) {
  SET_RES_WINDOW_KEY(pRuntimeEnv->keyBuf, pData, bytes, uid);

  SResultRow** p1 =
      (SResultRow**)taosHashGet(pRuntimeEnv->pResultRowHashTable, pRuntimeEnv->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes));

  // Outside interval queries, and in repeat/reverse scans (where *p1 may be NULL with sliding+offset),
  // only the hash lookup matters.
  if (!QUERY_IS_INTERVAL_QUERY(pRuntimeEnv->pQueryAttr) || !masterscan) {
    return p1 != NULL;
  }

  if (p1 == NULL || pResultRowInfo->size == 0 || pResultRowInfo->size == 1) {
    return false;
  }

  // check whether the current pResultRowInfo already contains the existing row
  SET_RES_EXT_WINDOW_KEY(pRuntimeEnv->keyBuf, pData, bytes, uid, pResultRowInfo);
  int64_t* index =
      taosHashGet(pRuntimeEnv->pResultRowListSet, pRuntimeEnv->keyBuf, GET_RES_EXT_WINDOW_KEY_LEN(bytes));
  return index != NULL;
}
#endif
200

201
/*
 * Reserve interBufSize bytes for a new SResultRow in the buffer pages of group tableGroupId.
 *
 * The group's last page is reused when the row still fits. Otherwise that page is released and a fresh page is
 * taken. The returned row has pageId/offset filled in, and its page is marked dirty.
 *
 * Returns NULL when no page can be obtained. Every page lookup/allocation is checked before its `num` field is
 * touched.
 */
SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int64_t tableGroupId, int32_t interBufSize) {
  SFilePage* pData = NULL;
  int32_t    pageId = -1;

  SIDList list = getDataBufPagesIdList(pResultBuf, tableGroupId);
  if (taosArrayGetSize(list) == 0) {
    // first row of this group: start a new page
    pData = getNewBufPage(pResultBuf, tableGroupId, &pageId);
    if (pData != NULL) {
      pData->num = sizeof(SFilePage);
    }
  } else {
    SPageInfo* pi = getLastPageInfo(list);
    pData = getBufPage(pResultBuf, getPageId(pi));
    pageId = getPageId(pi);
    if (pData == NULL) {
      return NULL;
    }

    if (pData->num + interBufSize > getBufPageSize(pResultBuf)) {
      // release current page first, and prepare the next one
      releaseBufPageInfo(pResultBuf, pi);

      pData = getNewBufPage(pResultBuf, tableGroupId, &pageId);
      if (pData != NULL) {
        pData->num = sizeof(SFilePage);
      }
    }
  }

  if (pData == NULL) {
    return NULL;
  }

  setBufPageDirty(pData, true);

  // the new row starts at the page's current fill position
  SResultRow* pResultRow = (SResultRow*)((char*)pData + pData->num);
  pResultRow->pageId = pageId;
  pResultRow->offset = (int32_t)pData->num;

  pData->num += interBufSize;
  return pResultRow;
}

243 244 245 246 247 248 249
/**
 * the struct of key in hash table
 * +----------+---------------+
 * | group id |   key data    |
 * | 8 bytes  | actual length |
 * +----------+---------------+
 */
250 251 252
SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pResultRowInfo, char* pData,
                                   int16_t bytes, bool masterscan, uint64_t groupId, SExecTaskInfo* pTaskInfo,
                                   bool isIntervalQuery, SAggSupporter* pSup) {
253
  SET_RES_WINDOW_KEY(pSup->keyBuf, pData, bytes, groupId);
H
Haojun Liao 已提交
254

dengyihao's avatar
dengyihao 已提交
255 256
  SResultRowPosition* p1 =
      (SResultRowPosition*)taosHashGet(pSup->pResultRowHashTable, pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes));
H
Haojun Liao 已提交
257

258 259
  SResultRow* pResult = NULL;

H
Haojun Liao 已提交
260 261
  // in case of repeat scan/reverse scan, no new time window added.
  if (isIntervalQuery) {
262 263
    if (masterscan && p1 != NULL) {  // the *p1 may be NULL in case of sliding+offset exists.
      pResult = getResultRowByPos(pResultBuf, p1);
264
      ASSERT(pResult->pageId == p1->pageId && pResult->offset == p1->offset);
H
Haojun Liao 已提交
265 266
    }
  } else {
dengyihao's avatar
dengyihao 已提交
267 268
    // In case of group by column query, the required SResultRow object must be existInCurrentResusltRowInfo in the
    // pResultRowInfo object.
H
Haojun Liao 已提交
269
    if (p1 != NULL) {
270
      // todo
271
      pResult = getResultRowByPos(pResultBuf, p1);
272
      ASSERT(pResult->pageId == p1->pageId && pResult->offset == p1->offset);
H
Haojun Liao 已提交
273 274 275
    }
  }

L
Liu Jicong 已提交
276
  // 1. close current opened time window
277
  if (pResultRowInfo->cur.pageId != -1 && ((pResult == NULL) || (pResult->pageId != pResultRowInfo->cur.pageId))) {
278
#ifdef BUF_PAGE_DEBUG
wmmhello's avatar
wmmhello 已提交
279
    qDebug("page_1");
280
#endif
281
    SResultRowPosition pos = pResultRowInfo->cur;
X
Xiaoyu Wang 已提交
282
    SFilePage*         pPage = getBufPage(pResultBuf, pos.pageId);
283 284 285 286 287
    releaseBufPage(pResultBuf, pPage);
  }

  // allocate a new buffer page
  if (pResult == NULL) {
288
#ifdef BUF_PAGE_DEBUG
wmmhello's avatar
wmmhello 已提交
289
    qDebug("page_2");
290
#endif
H
Haojun Liao 已提交
291
    ASSERT(pSup->resultRowSize > 0);
292 293
    pResult = getNewResultRow(pResultBuf, groupId, pSup->resultRowSize);

294
    initResultRow(pResult);
H
Haojun Liao 已提交
295

296 297
    // add a new result set for a new group
    SResultRowPosition pos = {.pageId = pResult->pageId, .offset = pResult->offset};
X
Xiaoyu Wang 已提交
298 299
    taosHashPut(pSup->pResultRowHashTable, pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes), &pos,
                sizeof(SResultRowPosition));
H
Haojun Liao 已提交
300 301
  }

302 303 304
  // 2. set the new time window to be the new active time window
  pResultRowInfo->cur = (SResultRowPosition){.pageId = pResult->pageId, .offset = pResult->offset};

H
Haojun Liao 已提交
305
  // too many time window in query
306
  if (taosHashGetSize(pSup->pResultRowHashTable) > MAX_INTERVAL_TIME_WINDOW) {
H
Haojun Liao 已提交
307 308 309
    longjmp(pTaskInfo->env, TSDB_CODE_QRY_TOO_MANY_TIMEWINDOW);
  }

H
Haojun Liao 已提交
310
  return pResult;
H
Haojun Liao 已提交
311 312
}

313
// a new buffer page for each table. Needs to opt this design
L
Liu Jicong 已提交
314
static int32_t addNewWindowResultBuf(SResultRow* pWindowRes, SDiskbasedBuf* pResultBuf, int32_t tid, uint32_t size) {
315 316 317 318
  if (pWindowRes->pageId != -1) {
    return 0;
  }

L
Liu Jicong 已提交
319
  SFilePage* pData = NULL;
320 321 322 323 324 325

  // in the first scan, new space needed for results
  int32_t pageId = -1;
  SIDList list = getDataBufPagesIdList(pResultBuf, tid);

  if (taosArrayGetSize(list) == 0) {
H
Haojun Liao 已提交
326
    pData = getNewBufPage(pResultBuf, tid, &pageId);
327
    pData->num = sizeof(SFilePage);
328 329
  } else {
    SPageInfo* pi = getLastPageInfo(list);
330
    pData = getBufPage(pResultBuf, getPageId(pi));
331
    pageId = getPageId(pi);
332

333
    if (pData->num + size > getBufPageSize(pResultBuf)) {
334
      // release current page first, and prepare the next one
335
      releaseBufPageInfo(pResultBuf, pi);
336

H
Haojun Liao 已提交
337
      pData = getNewBufPage(pResultBuf, tid, &pageId);
338
      if (pData != NULL) {
339
        pData->num = sizeof(SFilePage);
340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359
      }
    }
  }

  if (pData == NULL) {
    return -1;
  }

  // set the number of rows in current disk page
  if (pWindowRes->pageId == -1) {  // not allocated yet, allocate new buffer
    pWindowRes->pageId = pageId;
    pWindowRes->offset = (int32_t)pData->num;

    pData->num += size;
    assert(pWindowRes->pageId >= 0);
  }

  return 0;
}

360
//  query_range_start, query_range_end, window_duration, window_start, window_end
361
void initExecTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pQueryWindow) {
362 363 364
  pColData->info.type = TSDB_DATA_TYPE_TIMESTAMP;
  pColData->info.bytes = sizeof(int64_t);

365
  colInfoDataEnsureCapacity(pColData, 5);
366 367 368 369 370 371 372 373 374
  colDataAppendInt64(pColData, 0, &pQueryWindow->skey);
  colDataAppendInt64(pColData, 1, &pQueryWindow->ekey);

  int64_t interval = 0;
  colDataAppendInt64(pColData, 2, &interval);  // this value may be variable in case of 'n' and 'y'.
  colDataAppendInt64(pColData, 3, &pQueryWindow->skey);
  colDataAppendInt64(pColData, 4, &pQueryWindow->ekey);
}

X
Xiaoyu Wang 已提交
375 376 377
/*
 * Evaluate a window pseudo-column function: run its scalar process on the 5-row time-window descriptor
 * pTimeWindowData, writing a single BIGINT into the ctx's row-cell buffer, and mark one result.
 */
static void doApplyWindowPseudoColFunc(SqlFunctionCtx* pCtx, SColumnInfoData* pTimeWindowData) {
  SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx);
  char*                pCell = GET_ROWCELL_INTERBUF(pEntryInfo);

  SColumnInfoData idata = {0};
  idata.info.type = TSDB_DATA_TYPE_BIGINT;
  idata.info.bytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes;
  idata.pData = pCell;

  SScalarParam out = {.columnData = &idata};
  SScalarParam tw = {.numOfRows = 5, .columnData = pTimeWindowData};
  pCtx->sfp.process(&tw, 1, &out);

  pEntryInfo->numOfRes = 1;
}

/*
 * Apply every function context in pCtx[0..numOfOutput) to rows [offset, offset + forwardStep) of its input.
 *
 * Block statistics are disabled when only part of the block (forwardStep < numOfTotal) is processed. For regular
 * functions the input window (startRowIndex, numOfRows, colDataAggIsSet) is restored after evaluation.
 * A failing process callback is logged, stored in taskInfo->code and raised via longjmp(taskInfo->env, code).
 * pWin, tsCol and order are not used here.
 */
void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, STimeWindow* pWin,
                      SColumnInfoData* pTimeWindowData, int32_t offset, int32_t forwardStep, TSKEY* tsCol,
                      int32_t numOfTotal, int32_t numOfOutput, int32_t order) {
  for (int32_t k = 0; k < numOfOutput; ++k) {
    SqlFunctionCtx*       pc = &pCtx[k];
    SInputColumnInfoData* pIn = &pc->input;

    // snapshot the input window so it can be put back after evaluation
    const bool    savedAggIsSet = pIn->colDataAggIsSet;
    const int32_t savedRows = pIn->numOfRows;
    const int32_t savedStart = pIn->startRowIndex;

    pIn->startRowIndex = offset;
    pIn->numOfRows = forwardStep;

    // statistics describe a whole block only; they are unusable for a partial range
    if (forwardStep < numOfTotal) {
      pIn->colDataAggIsSet = false;
    }

    if (fmIsWindowPseudoColumnFunc(pc->functionId)) {
      // NOTE(review): the input window is left modified on this path (no restore) -- confirm this is intended.
      doApplyWindowPseudoColFunc(pc, pTimeWindowData);
      continue;
    }

    if (functionNeedToExecute(pc) && pc->fpSet.process != NULL) {
      int32_t code = pc->fpSet.process(pc);
      if (code != TSDB_CODE_SUCCESS) {
        qError("%s apply functions error, code: %s", GET_TASKID(taskInfo), tstrerror(code));
        taskInfo->code = code;
        longjmp(taskInfo->env, code);
      }
    }

    pIn->colDataAggIsSet = savedAggIsSet;
    pIn->startRowIndex = savedStart;
    pIn->numOfRows = savedRows;
  }
}

dengyihao's avatar
dengyihao 已提交
428
// Bind the columns (or constant parameters) of pBlock as the input of each function context; defined below.
static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock,
                                   int32_t order, int32_t scanFlag, bool createDummyCol);
430

dengyihao's avatar
dengyihao 已提交
431 432
/*
 * Bind a block that carries pre-computed statistics (pBlockAgg).
 * For every expression of pOperator this sets the ctx's order and row count, then lets setBlockStatisInfo attach
 * the block statistics as input.
 */
static void doSetInputDataBlockInfo(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock,
                                    int32_t order) {
  for (int32_t i = 0; i < pOperator->exprSupp.numOfExprs; ++i) {
    SqlFunctionCtx* pc = pCtx + i;

    pc->order = order;
    pc->input.numOfRows = pBlock->info.rows;
    setBlockStatisInfo(pc, pOperator->exprSupp.pExprInfo + i, pBlock);
  }
}

X
Xiaoyu Wang 已提交
440 441
/*
 * Attach pBlock as the input of the function contexts of pOperator.
 * Blocks with pre-computed statistics (pBlockAgg != NULL) are bound through those statistics; otherwise the raw
 * columns (and, if createDummyCol, constant columns) are bound.
 *
 * NOTE(review): the return code of doSetInputDataBlock (e.g. TSDB_CODE_OUT_OF_MEMORY) is discarded here.
 */
void setInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order,
                       int32_t scanFlag, bool createDummyCol) {
  if (pBlock->pBlockAgg == NULL) {
    doSetInputDataBlock(pOperator, pCtx, pBlock, order, scanFlag, createDummyCol);
    return;
  }

  doSetInputDataBlockInfo(pOperator, pCtx, pBlock, order);
}

L
Liu Jicong 已提交
449 450
/*
 * Materialize the constant parameter pFuncParam as a column of numOfRows identical values in
 * pInput->pData[paramIndex].
 *
 * The SColumnInfoData is allocated on first use and stored in pInput->pData[paramIndex]. BIGINT/UBIGINT, DOUBLE
 * and VARCHAR constants are filled in; for any other type the column is only sized.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY, or the error reported by colInfoDataEnsureCapacity.
 */
static int32_t doCreateConstantValColumnInfo(SInputColumnInfoData* pInput, SFunctParam* pFuncParam, int32_t paramIndex,
                                             int32_t numOfRows) {
  SColumnInfoData* pColInfo = pInput->pData[paramIndex];
  if (pColInfo == NULL) {
    pColInfo = taosMemoryCalloc(1, sizeof(SColumnInfoData));
    if (pColInfo == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    // Set the correct column info (data type and bytes)
    pColInfo->info.type = pFuncParam->param.nType;
    pColInfo->info.bytes = pFuncParam->param.nLen;

    pInput->pData[paramIndex] = pColInfo;
  }

  int32_t code = colInfoDataEnsureCapacity(pColInfo, numOfRows);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  int8_t type = pFuncParam->param.nType;
  if (type == TSDB_DATA_TYPE_BIGINT || type == TSDB_DATA_TYPE_UBIGINT) {
    int64_t v = pFuncParam->param.i;
    for (int32_t i = 0; i < numOfRows; ++i) {
      colDataAppendInt64(pColInfo, i, &v);
    }
  } else if (type == TSDB_DATA_TYPE_DOUBLE) {
    double v = pFuncParam->param.d;
    for (int32_t i = 0; i < numOfRows; ++i) {
      colDataAppendDouble(pColInfo, i, &v);
    }
  } else if (type == TSDB_DATA_TYPE_VARCHAR) {
    char* tmp = taosMemoryMalloc(pFuncParam->param.nLen + VARSTR_HEADER_SIZE);
    if (tmp == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    STR_WITH_SIZE_TO_VARSTR(tmp, pFuncParam->param.pz, pFuncParam->param.nLen);
    for (int32_t i = 0; i < numOfRows; ++i) {
      colDataAppend(pColInfo, i, tmp, false);
    }

    // colDataAppend copies the value into the column, so the temporary varstr is released here;
    // it previously leaked on every call.
    taosMemoryFree(tmp);
  }

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
491
/*
 * Bind pBlock as the raw input of every function context of pOperator.
 *
 * Per ctx, this records order, source block, scan flag and uid, and clears the statistics flag. Then, per
 * parameter:
 *   - column parameters point pInput->pData[j] at the block column of their slot. For timeline functions the
 *     last parameter is also the primary timestamp column (pPTS).
 *   - a value parameter, when createDummyCol is set and it is the only parameter, is materialized as a constant
 *     column of pBlock->info.rows rows.
 *
 * Returns TSDB_CODE_SUCCESS, or the first error from creating a constant column.
 */
static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order,
                                   int32_t scanFlag, bool createDummyCol) {
  for (int32_t i = 0; i < pOperator->exprSupp.numOfExprs; ++i) {
    SqlFunctionCtx*       pc = &pCtx[i];
    SInputColumnInfoData* pInput = &pc->input;
    SExprInfo*            pOneExpr = &pOperator->exprSupp.pExprInfo[i];

    pc->order = order;
    pc->pSrcBlock = pBlock;
    pc->scanFlag = scanFlag;

    pInput->numOfRows = pBlock->info.rows;
    pInput->uid = pBlock->info.uid;
    pInput->colDataAggIsSet = false;

    for (int32_t j = 0; j < pOneExpr->base.numOfParams; ++j) {
      SFunctParam* pFuncParam = &pOneExpr->base.pParam[j];

      switch (pFuncParam->type) {
        case FUNC_PARAM_TYPE_COLUMN: {
          int32_t slotId = pFuncParam->pCol->slotId;
          pInput->pData[j] = taosArrayGet(pBlock->pDataBlock, slotId);
          pInput->totalRows = pBlock->info.rows;
          pInput->numOfRows = pBlock->info.rows;
          pInput->startRowIndex = 0;

          // NOTE: the last parameter is the primary timestamp column
          if (fmIsTimelineFunc(pc->functionId) && (j == pOneExpr->base.numOfParams - 1)) {
            pInput->pPTS = pInput->pData[j];
          }
          ASSERT(pInput->pData[j] != NULL);
          break;
        }

        case FUNC_PARAM_TYPE_VALUE: {
          // todo avoid case: top(k, 12), 12 is the value parameter; sum(11), 11 is also the value parameter.
          if (!createDummyCol || pOneExpr->base.numOfParams != 1) {
            break;
          }

          pInput->totalRows = pBlock->info.rows;
          pInput->numOfRows = pBlock->info.rows;
          pInput->startRowIndex = 0;

          int32_t code = doCreateConstantValColumnInfo(pInput, pFuncParam, j, pBlock->info.rows);
          if (code != TSDB_CODE_SUCCESS) {
            return code;
          }
          break;
        }

        default:
          break;
      }
    }
  }

  return TSDB_CODE_SUCCESS;
}

541
/*
 * Run the process callback of every function context of pOperator that still needs to execute.
 * Contexts without a process callback are skipped. Returns the first non-success code (after logging it), or
 * TSDB_CODE_SUCCESS. startTs is not used.
 */
static int32_t doAggregateImpl(SOperatorInfo* pOperator, TSKEY startTs, SqlFunctionCtx* pCtx) {
  for (int32_t k = 0; k < pOperator->exprSupp.numOfExprs; ++k) {
    SqlFunctionCtx* pc = &pCtx[k];

    // todo add a dummy function to avoid the process check
    if (!functionNeedToExecute(pc) || pc->fpSet.process == NULL) {
      continue;
    }

#ifdef BUF_PAGE_DEBUG
    qDebug("page_process");
#endif
    int32_t code = pc->fpSet.process(pc);
    if (code != TSDB_CODE_SUCCESS) {
      qError("%s aggregate function error happens, code: %s", GET_TASKID(pOperator->pTaskInfo), tstrerror(code));
      return code;
    }
  }

  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
562
/*
 * Point the output of the first taosArrayGetSize(pPseudoList) contexts at the correspondingly numbered columns
 * of pResult. Does nothing when pPseudoList is NULL.
 *
 * NOTE(review): callers store ctx indices in pPseudoList, but this maps positions 0..n-1 directly rather than the
 * stored indices -- confirm this is intended.
 */
static void setPseudoOutputColInfo(SSDataBlock* pResult, SqlFunctionCtx* pCtx, SArray* pPseudoList) {
  if (pPseudoList == NULL) {
    return;
  }

  size_t count = taosArrayGetSize(pPseudoList);
  for (size_t i = 0; i < count; ++i) {
    pCtx[i].pOutput = taosArrayGet(pResult->pDataBlock, i);
  }
}

569
int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pSrcBlock, SqlFunctionCtx* pCtx,
X
Xiaoyu Wang 已提交
570
                              int32_t numOfOutput, SArray* pPseudoList) {
H
Haojun Liao 已提交
571
  setPseudoOutputColInfo(pResult, pCtx, pPseudoList);
H
Haojun Liao 已提交
572
  pResult->info.groupId = pSrcBlock->info.groupId;
H
Haojun Liao 已提交
573

dengyihao's avatar
dengyihao 已提交
574 575
  // if the source equals to the destination, it is to create a new column as the result of scalar function or some
  // operators.
576 577
  bool createNewColModel = (pResult == pSrcBlock);

578 579
  int32_t numOfRows = 0;

580
  for (int32_t k = 0; k < numOfOutput; ++k) {
dengyihao's avatar
dengyihao 已提交
581
    int32_t         outputSlotId = pExpr[k].base.resSchema.slotId;
582 583
    SqlFunctionCtx* pfCtx = &pCtx[k];

L
Liu Jicong 已提交
584
    if (pExpr[k].pExpr->nodeType == QUERY_NODE_COLUMN) {  // it is a project query
585
      SColumnInfoData* pColInfoData = taosArrayGet(pResult->pDataBlock, outputSlotId);
586
      if (pResult->info.rows > 0 && !createNewColModel) {
X
Xiaoyu Wang 已提交
587 588
        colDataMergeCol(pColInfoData, pResult->info.rows, &pResult->info.capacity, pfCtx->input.pData[0],
                        pfCtx->input.numOfRows);
589
      } else {
590
        colDataAssign(pColInfoData, pfCtx->input.pData[0], pfCtx->input.numOfRows, &pResult->info);
591
      }
592

593
      numOfRows = pfCtx->input.numOfRows;
594
    } else if (pExpr[k].pExpr->nodeType == QUERY_NODE_VALUE) {
595
      SColumnInfoData* pColInfoData = taosArrayGet(pResult->pDataBlock, outputSlotId);
596

dengyihao's avatar
dengyihao 已提交
597
      int32_t offset = createNewColModel ? 0 : pResult->info.rows;
598
      for (int32_t i = 0; i < pSrcBlock->info.rows; ++i) {
dengyihao's avatar
dengyihao 已提交
599 600 601
        colDataAppend(pColInfoData, i + offset,
                      taosVariantGet(&pExpr[k].base.pParam[0].param, pExpr[k].base.pParam[0].param.nType),
                      TSDB_DATA_TYPE_NULL == pExpr[k].base.pParam[0].param.nType);
602
      }
603 604

      numOfRows = pSrcBlock->info.rows;
H
Haojun Liao 已提交
605
    } else if (pExpr[k].pExpr->nodeType == QUERY_NODE_OPERATOR) {
606 607 608
      SArray* pBlockList = taosArrayInit(4, POINTER_BYTES);
      taosArrayPush(pBlockList, &pSrcBlock);

609
      SColumnInfoData* pResColData = taosArrayGet(pResult->pDataBlock, outputSlotId);
610
      SColumnInfoData  idata = {.info = pResColData->info, .hasNull = true};
611

612
      SScalarParam dest = {.columnData = &idata};
X
Xiaoyu Wang 已提交
613
      int32_t      code = scalarCalculate(pExpr[k].pExpr->_optrRoot.pRootNode, pBlockList, &dest);
614 615 616 617
      if (code != TSDB_CODE_SUCCESS) {
        taosArrayDestroy(pBlockList);
        return code;
      }
618

dengyihao's avatar
dengyihao 已提交
619
      int32_t startOffset = createNewColModel ? 0 : pResult->info.rows;
620
      ASSERT(pResult->info.capacity > 0);
621
      colDataMergeCol(pResColData, startOffset, &pResult->info.capacity, &idata, dest.numOfRows);
622 623

      numOfRows = dest.numOfRows;
624 625
      taosArrayDestroy(pBlockList);
    } else if (pExpr[k].pExpr->nodeType == QUERY_NODE_FUNCTION) {
626
      ASSERT(!fmIsAggFunc(pfCtx->functionId));
627

628 629
      // _rowts/_c0, not tbname column
      if (fmIsPseudoColumnFunc(pfCtx->functionId) && (!fmIsScanPseudoColumnFunc(pfCtx->functionId))) {
H
Haojun Liao 已提交
630
        // do nothing
631 632 633 634 635 636 637 638 639 640 641 642 643 644
      } else if (fmIsIndefiniteRowsFunc(pfCtx->functionId)) {
        SResultRowEntryInfo* pResInfo = GET_RES_INFO(&pCtx[k]);
        pfCtx->fpSet.init(&pCtx[k], pResInfo);

        pfCtx->pOutput = taosArrayGet(pResult->pDataBlock, outputSlotId);
        pfCtx->offset = createNewColModel ? 0 : pResult->info.rows;  // set the start offset

        // set the timestamp(_rowts) output buffer
        if (taosArrayGetSize(pPseudoList) > 0) {
          int32_t* outputColIndex = taosArrayGet(pPseudoList, 0);
          pfCtx->pTsOutput = (SColumnInfoData*)pCtx[*outputColIndex].pOutput;
        }

        numOfRows = pfCtx->fpSet.process(pfCtx);
H
Haojun Liao 已提交
645 646 647
      } else {
        SArray* pBlockList = taosArrayInit(4, POINTER_BYTES);
        taosArrayPush(pBlockList, &pSrcBlock);
G
Ganlin Zhao 已提交
648

649
        SColumnInfoData* pResColData = taosArrayGet(pResult->pDataBlock, outputSlotId);
650
        SColumnInfoData  idata = {.info = pResColData->info, .hasNull = true};
H
Haojun Liao 已提交
651

652
        SScalarParam dest = {.columnData = &idata};
X
Xiaoyu Wang 已提交
653
        int32_t      code = scalarCalculate((SNode*)pExpr[k].pExpr->_function.pFunctNode, pBlockList, &dest);
654 655 656 657
        if (code != TSDB_CODE_SUCCESS) {
          taosArrayDestroy(pBlockList);
          return code;
        }
658

dengyihao's avatar
dengyihao 已提交
659
        int32_t startOffset = createNewColModel ? 0 : pResult->info.rows;
660
        ASSERT(pResult->info.capacity > 0);
661
        colDataMergeCol(pResColData, startOffset, &pResult->info.capacity, &idata, dest.numOfRows);
662 663

        numOfRows = dest.numOfRows;
H
Haojun Liao 已提交
664 665
        taosArrayDestroy(pBlockList);
      }
666
    } else {
667
      ASSERT(0);
668 669
    }
  }
670

671 672 673
  if (!createNewColModel) {
    pResult->info.rows += numOfRows;
  }
674 675

  return TSDB_CODE_SUCCESS;
676 677
}

678 679 680
/*
 * Record the key pData (of column type `type`) in pResultRow.
 * For fixed-length types, the value read as int64 becomes both win.skey and win.ekey.
 * Variable-length keys are currently not stored.
 */
static void setResultRowKey(SResultRow* pResultRow, char* pData, int16_t type) {
  if (IS_VAR_DATA_TYPE(type)) {
    // todo: storing variable-length keys is disabled for now
    return;
  }

  int64_t key = -1;
  GET_TYPED_DATA(key, int64_t, type, pData);

  pResultRow->win.skey = key;
  pResultRow->win.ekey = key;
}

5
54liuyao 已提交
697
/*
 * Decide whether the function of pCtx must run on the current input:
 *   - never when the ctx has no function (functionId == -1);
 *   - during a REPEAT_SCAN, only repeat-scan functions run;
 *   - otherwise, only while the ctx's result entry is not yet complete.
 */
bool functionNeedToExecute(SqlFunctionCtx* pCtx) {
  struct SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);

  if (pCtx->functionId == -1) {
    return false;
  }

  if (pCtx->scanFlag == REPEAT_SCAN) {
    return fmIsRepeatScanFunc(pCtx->functionId);
  }

  return !isRowEntryCompleted(pResInfo);
}

717 718 719 720 721 722 723
/*
 * Build the block statistics of a constant parameter repeated over numOfRows rows.
 *
 * pInput->pData[paramIndex] and pInput->pColumnDataAgg[paramIndex] are allocated on first use. Since every row
 * holds the same value v: numOfNull = 0, min = max = v, and sum = v * numOfRows.
 *   - DOUBLE statistics are written bit-wise into the 64-bit min/max/sum fields.
 *   - BOOL statistics are written as 0/1 integers. min previously was always 0, and sum was truncated to a bool.
 *   - TIMESTAMP leaves the statistics untouched.
 *   - Any other type hits ASSERT(0).
 *
 * Returns TSDB_CODE_SUCCESS or TSDB_CODE_OUT_OF_MEMORY.
 */
static int32_t doCreateConstantValColumnAggInfo(SInputColumnInfoData* pInput, SFunctParam* pFuncParam, int32_t type,
                                                int32_t paramIndex, int32_t numOfRows) {
  if (pInput->pData[paramIndex] == NULL) {
    pInput->pData[paramIndex] = taosMemoryCalloc(1, sizeof(SColumnInfoData));
    if (pInput->pData[paramIndex] == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    // Set the correct column info (data type and bytes)
    pInput->pData[paramIndex]->info.type = type;
    pInput->pData[paramIndex]->info.bytes = tDataTypes[type].bytes;
  }

  SColumnDataAgg* da = pInput->pColumnDataAgg[paramIndex];
  if (da == NULL) {
    da = taosMemoryCalloc(1, sizeof(SColumnDataAgg));
    if (da == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
    pInput->pColumnDataAgg[paramIndex] = da;
  }

  ASSERT(!IS_VAR_DATA_TYPE(type));

  if (type == TSDB_DATA_TYPE_BIGINT) {
    int64_t v = pFuncParam->param.i;
    *da = (SColumnDataAgg){.numOfNull = 0, .min = v, .max = v, .sum = v * numOfRows};
  } else if (type == TSDB_DATA_TYPE_DOUBLE) {
    double v = pFuncParam->param.d;
    *da = (SColumnDataAgg){.numOfNull = 0};

    *(double*)&da->min = v;
    *(double*)&da->max = v;
    *(double*)&da->sum = v * numOfRows;
  } else if (type == TSDB_DATA_TYPE_BOOL) {
    // a constant column: min == max == v, and the sum counts the rows that are true
    int64_t v = (pFuncParam->param.i != 0) ? 1 : 0;
    *da = (SColumnDataAgg){.numOfNull = 0, .min = v, .max = v, .sum = v * numOfRows};
  } else if (type == TSDB_DATA_TYPE_TIMESTAMP) {
    // do nothing
  } else {
    ASSERT(0);
  }

  return TSDB_CODE_SUCCESS;
}
768 769 770 771 772 773 774 775 776 777 778

void setBlockStatisInfo(SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, SSDataBlock* pBlock) {
  int32_t numOfRows = pBlock->info.rows;

  SInputColumnInfoData* pInput = &pCtx->input;
  pInput->numOfRows = numOfRows;
  pInput->totalRows = numOfRows;

  if (pBlock->pBlockAgg != NULL) {
    pInput->colDataAggIsSet = true;

779 780
    for (int32_t j = 0; j < pExprInfo->base.numOfParams; ++j) {
      SFunctParam* pFuncParam = &pExprInfo->base.pParam[j];
781

782 783
      if (pFuncParam->type == FUNC_PARAM_TYPE_COLUMN) {
        int32_t slotId = pFuncParam->pCol->slotId;
784 785 786 787
        pInput->pColumnDataAgg[j] = pBlock->pBlockAgg[slotId];
        if (pInput->pColumnDataAgg[j] == NULL) {
          pInput->colDataAggIsSet = false;
        }
788 789 790 791

        // Here we set the column info data since the data type for each column data is required, but
        // the data in the corresponding SColumnInfoData will not be used.
        pInput->pData[j] = taosArrayGet(pBlock->pDataBlock, slotId);
792 793
      } else if (pFuncParam->type == FUNC_PARAM_TYPE_VALUE) {
        doCreateConstantValColumnAggInfo(pInput, pFuncParam, pFuncParam->param.nType, j, pBlock->info.rows);
794 795
      }
    }
796
  } else {
797
    pInput->colDataAggIsSet = false;
798 799 800
  }

  // set the statistics data for primary time stamp column
801 802 803 804 805
  //  if (pCtx->functionId == FUNCTION_SPREAD && pColumn->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
  //    pCtx->isAggSet = true;
  //    pCtx->agg.min = pBlock->info.window.skey;
  //    pCtx->agg.max = pBlock->info.window.ekey;
  //  }
806 807
}

L
Liu Jicong 已提交
808
bool isTaskKilled(SExecTaskInfo* pTaskInfo) {
809 810
  // query has been executed more than tsShellActivityTimer, and the retrieve has not arrived
  // abort current query execution.
L
Liu Jicong 已提交
811 812
  if (pTaskInfo->owner != 0 &&
      ((taosGetTimestampSec() - pTaskInfo->cost.start / 1000) > 10 * getMaximumIdleDurationSec())
813 814
      /*(!needBuildResAfterQueryComplete(pTaskInfo))*/) {
    assert(pTaskInfo->cost.start != 0);
L
Liu Jicong 已提交
815 816 817
    //    qDebug("QInfo:%" PRIu64 " retrieve not arrive beyond %d ms, abort current query execution, start:%" PRId64
    //           ", current:%d", pQInfo->qId, 1, pQInfo->startExecTs, taosGetTimestampSec());
    //    return true;
818 819 820 821 822
  }

  return false;
}

L
Liu Jicong 已提交
823
void setTaskKilled(SExecTaskInfo* pTaskInfo) { pTaskInfo->code = TSDB_CODE_TSC_QUERY_CANCELLED; }
824 825

/////////////////////////////////////////////////////////////////////////////////////////////
L
Liu Jicong 已提交
826
// todo refactor : return window
827
void getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int64_t key, STimeWindow* win) {
H
Haojun Liao 已提交
828
  win->skey = taosTimeTruncate(key, pInterval, precision);
829 830

  /*
H
Haojun Liao 已提交
831
   * if the realSkey > INT64_MAX - pInterval->interval, the query duration between
832 833
   * realSkey and realEkey must be less than one interval.Therefore, no need to adjust the query ranges.
   */
834 835
  win->ekey = taosTimeAdd(win->skey, pInterval->interval, pInterval->intervalUnit, precision) - 1;
  if (win->ekey < win->skey) {
836 837 838 839
    win->ekey = INT64_MAX;
  }
}

#if 0
/*
 * Disabled legacy helper (not compiled). Refines a block-load status using the query's functions.
 * Statuses DATA_LOAD / FILTEROUT are returned unchanged. For NOT_LOAD, a query containing first/last-style
 * functions yields FILTEROUT when those are its only real functions, and DATA_LOAD otherwise.
 * Timestamp/tag pseudo functions are ignored.
 */
static int32_t updateBlockLoadStatus(STaskAttr* pQuery, int32_t status) {
  if (status == BLK_DATA_DATA_LOAD || status == BLK_DATA_FILTEROUT) {
    return status;
  }

  bool seenFirstLast = false;
  bool seenOther = false;

  for (int32_t k = 0; k < pQuery->numOfOutput; ++k) {
    int32_t fid = getExprFunctionId(&pQuery->pExpr1[k]);

    bool isPseudo = (fid == FUNCTION_TS || fid == FUNCTION_TS_DUMMY || fid == FUNCTION_TAG || fid == FUNCTION_TAG_DUMMY);
    if (isPseudo) {
      continue;
    }

    if (fid == FUNCTION_FIRST_DST || fid == FUNCTION_LAST_DST) {
      seenFirstLast = true;
    } else {
      seenOther = true;
    }
  }

  if (!seenFirstLast || status != BLK_DATA_NOT_LOAD) {
    return status;
  }

  return seenOther ? BLK_DATA_DATA_LOAD : BLK_DATA_FILTEROUT;
}

#endif

// static void updateDataCheckOrder(SQInfo *pQInfo, SQueryTableReq* pQueryMsg, bool stableQuery) {
//   STaskAttr* pQueryAttr = pQInfo->runtimeEnv.pQueryAttr;
//
//   // in case of point-interpolation query, use asc order scan
//   char msg[] = "QInfo:0x%"PRIx64" scan order changed for %s query, old:%d, new:%d, qrange exchanged, old qrange:%"
//                PRId64 "-%" PRId64 ", new qrange:%" PRId64 "-%" PRId64;
//
//   // todo handle the case where an order-irrelevant query type is mixed up with an order-critical query type
//   // descending order query for last_row query
//   if (isFirstLastRowQuery(pQueryAttr)) {
//     //qDebug("QInfo:0x%"PRIx64" scan order changed for last_row query, old:%d, new:%d", pQInfo->qId,
//     //       pQueryAttr->order.order, TSDB_ORDER_ASC);
//
//     pQueryAttr->order.order = TSDB_ORDER_ASC;
//     if (pQueryAttr->window.skey > pQueryAttr->window.ekey) {
//       TSWAP(pQueryAttr->window.skey, pQueryAttr->window.ekey);
//     }
//
//     pQueryAttr->needReverseScan = false;
//     return;
//   }
//
//   if (pQueryAttr->groupbyColumn && pQueryAttr->order.order == TSDB_ORDER_DESC) {
//     pQueryAttr->order.order = TSDB_ORDER_ASC;
//     if (pQueryAttr->window.skey > pQueryAttr->window.ekey) {
//       TSWAP(pQueryAttr->window.skey, pQueryAttr->window.ekey);
//     }
//
//     pQueryAttr->needReverseScan = false;
//     doUpdateLastKey(pQueryAttr);
//     return;
//   }
//
//   if (pQueryAttr->pointInterpQuery && pQueryAttr->interval.interval == 0) {
//     if (!QUERY_IS_ASC_QUERY(pQueryAttr)) {
//       //qDebug(msg, pQInfo->qId, "interp", pQueryAttr->order.order, TSDB_ORDER_ASC, pQueryAttr->window.skey,
//       //       pQueryAttr->window.ekey, pQueryAttr->window.ekey, pQueryAttr->window.skey);
//       TSWAP(pQueryAttr->window.skey, pQueryAttr->window.ekey, TSKEY);
//     }
//
//     pQueryAttr->order.order = TSDB_ORDER_ASC;
//     return;
//   }
//
//   if (pQueryAttr->interval.interval == 0) {
//     if (onlyFirstQuery(pQueryAttr)) {
//       if (!QUERY_IS_ASC_QUERY(pQueryAttr)) {
//         //qDebug(msg, pQInfo->qId, "only-first", pQueryAttr->order.order, TSDB_ORDER_ASC, pQueryAttr->window.skey,
//         //       pQueryAttr->window.ekey, pQueryAttr->window.ekey, pQueryAttr->window.skey);
//
//        TSWAP(pQueryAttr->window.skey, pQueryAttr->window.ekey);
//        doUpdateLastKey(pQueryAttr);
//      }
//
//      pQueryAttr->order.order = TSDB_ORDER_ASC;
//      pQueryAttr->needReverseScan = false;
//    } else if (onlyLastQuery(pQueryAttr) && notContainSessionOrStateWindow(pQueryAttr)) {
//      if (QUERY_IS_ASC_QUERY(pQueryAttr)) {
//        //qDebug(msg, pQInfo->qId, "only-last", pQueryAttr->order.order, TSDB_ORDER_DESC, pQueryAttr->window.skey,
//        //       pQueryAttr->window.ekey, pQueryAttr->window.ekey, pQueryAttr->window.skey);
//
//        TSWAP(pQueryAttr->window.skey, pQueryAttr->window.ekey);
//        doUpdateLastKey(pQueryAttr);
//      }
//
//      pQueryAttr->order.order = TSDB_ORDER_DESC;
//      pQueryAttr->needReverseScan = false;
//    }
//
//  } else {  // interval query
//    if (stableQuery) {
//      if (onlyFirstQuery(pQueryAttr)) {
//        if (!QUERY_IS_ASC_QUERY(pQueryAttr)) {
//          //qDebug(msg, pQInfo->qId, "only-first stable", pQueryAttr->order.order, TSDB_ORDER_ASC,
//          //       pQueryAttr->window.skey, pQueryAttr->window.ekey, pQueryAttr->window.ekey,
//          //       pQueryAttr->window.skey);
//
//          TSWAP(pQueryAttr->window.skey, pQueryAttr->window.ekey);
//          doUpdateLastKey(pQueryAttr);
//        }
//
//        pQueryAttr->order.order = TSDB_ORDER_ASC;
//        pQueryAttr->needReverseScan = false;
//      } else if (onlyLastQuery(pQueryAttr)) {
//        if (QUERY_IS_ASC_QUERY(pQueryAttr)) {
//          //qDebug(msg, pQInfo->qId, "only-last stable", pQueryAttr->order.order, TSDB_ORDER_DESC,
//          //       pQueryAttr->window.skey, pQueryAttr->window.ekey, pQueryAttr->window.ekey,
//          //       pQueryAttr->window.skey);
//
//          TSWAP(pQueryAttr->window.skey, pQueryAttr->window.ekey);
//          doUpdateLastKey(pQueryAttr);
//        }
//
//        pQueryAttr->order.order = TSDB_ORDER_DESC;
//        pQueryAttr->needReverseScan = false;
//      }
//    }
//  }
//}

// static FORCE_INLINE bool doFilterByBlockStatistics(STaskRuntimeEnv* pRuntimeEnv, SDataStatis *pDataStatis,
//                                                    SqlFunctionCtx *pCtx, int32_t numOfRows) {
//   STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
//
//   if (pDataStatis == NULL || pQueryAttr->pFilters == NULL) {
//     return true;
//   }
//
//   return filterRangeExecute(pQueryAttr->pFilters, pDataStatis, pQueryAttr->numOfCols, numOfRows);
// }
#if 0
/*
 * Disabled legacy helper (not compiled). It appears intended to report whether the block's time range
 * [skey, ekey] crosses an aligned time-window boundary of the query.
 * NOTE(review): the calls that position and advance `w` (getAlignQueryTimeWindow / getNextTimeWindow) are
 * commented out. `w` therefore stays zeroed, and the for(;;) loops below never make progress. Fix this before
 * re-enabling the code. `sk` and `ek` are computed but currently unused.
 */
static bool overlapWithTimeWindow(STaskAttr* pQueryAttr, SDataBlockInfo* pBlockInfo) {
  STimeWindow w = {0};

  TSKEY sk = TMIN(pQueryAttr->window.skey, pQueryAttr->window.ekey);
  TSKEY ek = TMAX(pQueryAttr->window.skey, pQueryAttr->window.ekey);

  const bool ascending = true;  // the descending branch below is kept for reference only
  if (ascending) {
    //    getAlignQueryTimeWindow(pQueryAttr, pBlockInfo->window.skey, sk, ek, &w);
    assert(w.ekey >= pBlockInfo->window.skey);

    if (w.ekey < pBlockInfo->window.ekey) {
      return true;
    }

    for (;;) {
      //      getNextTimeWindow(pQueryAttr, &w);
      if (w.skey > pBlockInfo->window.ekey) {
        break;
      }

      assert(w.ekey > pBlockInfo->window.ekey);
      if (w.skey <= pBlockInfo->window.ekey && w.skey > pBlockInfo->window.skey) {
        return true;
      }
    }

    return false;
  }

  //    getAlignQueryTimeWindow(pQueryAttr, pBlockInfo->window.ekey, sk, ek, &w);
  assert(w.skey <= pBlockInfo->window.ekey);

  if (w.skey > pBlockInfo->window.skey) {
    return true;
  }

  for (;;) {
    //      getNextTimeWindow(pQueryAttr, &w);
    if (w.ekey < pBlockInfo->window.skey) {
      break;
    }

    assert(w.skey < pBlockInfo->window.skey);
    if (w.ekey < pBlockInfo->window.ekey && w.ekey >= pBlockInfo->window.skey) {
      return true;
    }
  }

  return false;
}
#endif
1040 1041

static uint32_t doFilterByBlockTimeWindow(STableScanInfo* pTableScanInfo, SSDataBlock* pBlock) {
H
Haojun Liao 已提交
1042
#if 0
H
Haojun Liao 已提交
1043
  SqlFunctionCtx* pCtx = pTableScanInfo->pCtx;
1044
  uint32_t        status = BLK_DATA_NOT_LOAD;
1045

L
Liu Jicong 已提交
1046
  int32_t numOfOutput = 0;  // pTableScanInfo->numOfOutput;
1047 1048
  for (int32_t i = 0; i < numOfOutput; ++i) {
    int32_t functionId = pCtx[i].functionId;
H
Haojun Liao 已提交
1049
    int32_t colId = pTableScanInfo->pExpr[i].base.pParam[0].pCol->colId;
1050 1051 1052

    // group by + first/last should not apply the first/last block filter
    if (functionId < 0) {
1053
      status |= BLK_DATA_DATA_LOAD;
1054 1055
      return status;
    } else {
L
Liu Jicong 已提交
1056
      //      status |= aAggs[functionId].dataReqFunc(&pTableScanInfo->pCtx[i], &pBlock->info.window, colId);
1057
      //      if ((status & BLK_DATA_DATA_LOAD) == BLK_DATA_DATA_LOAD) {
L
Liu Jicong 已提交
1058 1059
      //        return status;
      //      }
1060 1061 1062 1063
    }
  }

  return status;
H
Haojun Liao 已提交
1064 1065
#endif
  return 0;
1066 1067
}

L
Liu Jicong 已提交
1068 1069
int32_t loadDataBlockOnDemand(SExecTaskInfo* pTaskInfo, STableScanInfo* pTableScanInfo, SSDataBlock* pBlock,
                              uint32_t* status) {
1070
  *status = BLK_DATA_NOT_LOAD;
1071

H
Haojun Liao 已提交
1072
  pBlock->pDataBlock = NULL;
L
Liu Jicong 已提交
1073
  pBlock->pBlockAgg = NULL;
H
Haojun Liao 已提交
1074

L
Liu Jicong 已提交
1075 1076
  //  int64_t groupId = pRuntimeEnv->current->groupIndex;
  //  bool    ascQuery = QUERY_IS_ASC_QUERY(pQueryAttr);
1077

H
Haojun Liao 已提交
1078
  STaskCostInfo* pCost = &pTaskInfo->cost;
1079

1080 1081
//  pCost->totalBlocks += 1;
//  pCost->totalRows += pBlock->info.rows;
H
Haojun Liao 已提交
1082
#if 0
1083 1084 1085
  // Calculate all time windows that are overlapping or contain current data block.
  // If current data block is contained by all possible time window, do not load current data block.
  if (/*pQueryAttr->pFilters || */pQueryAttr->groupbyColumn || pQueryAttr->sw.gap > 0 ||
H
Haojun Liao 已提交
1086
      (QUERY_IS_INTERVAL_QUERY(pQueryAttr) && overlapWithTimeWindow(pTaskInfo, &pBlock->info))) {
1087
    (*status) = BLK_DATA_DATA_LOAD;
1088 1089 1090
  }

  // check if this data block is required to load
1091
  if ((*status) != BLK_DATA_DATA_LOAD) {
1092 1093 1094 1095 1096 1097 1098
    bool needFilter = true;

    // the pCtx[i] result is belonged to previous time window since the outputBuf has not been set yet,
    // the filter result may be incorrect. So in case of interval query, we need to set the correct time output buffer
    if (QUERY_IS_INTERVAL_QUERY(pQueryAttr)) {
      SResultRow* pResult = NULL;

H
Haojun Liao 已提交
1099
      bool  masterScan = IS_MAIN_SCAN(pRuntimeEnv);
1100 1101 1102 1103 1104 1105
      TSKEY k = ascQuery? pBlock->info.window.skey : pBlock->info.window.ekey;

      STimeWindow win = getActiveTimeWindow(pTableScanInfo->pResultRowInfo, k, pQueryAttr);
      if (pQueryAttr->pointInterpQuery) {
        needFilter = chkWindowOutputBufByKey(pRuntimeEnv, pTableScanInfo->pResultRowInfo, &win, masterScan, &pResult, groupId,
                                    pTableScanInfo->pCtx, pTableScanInfo->numOfOutput,
1106
                                    pTableScanInfo->rowEntryInfoOffset);
1107 1108 1109
      } else {
        if (setResultOutputBufByKey(pRuntimeEnv, pTableScanInfo->pResultRowInfo, pBlock->info.uid, &win, masterScan, &pResult, groupId,
                                    pTableScanInfo->pCtx, pTableScanInfo->numOfOutput,
1110
                                    pTableScanInfo->rowEntryInfoOffset) != TSDB_CODE_SUCCESS) {
1111 1112 1113 1114 1115
          longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
        }
      }
    } else if (pQueryAttr->stableQuery && (!pQueryAttr->tsCompQuery) && (!pQueryAttr->diffQuery)) { // stable aggregate, not interval aggregate or normal column aggregate
      doSetTableGroupOutputBuf(pRuntimeEnv, pTableScanInfo->pResultRowInfo, pTableScanInfo->pCtx,
1116
                               pTableScanInfo->rowEntryInfoOffset, pTableScanInfo->numOfOutput,
1117 1118 1119 1120 1121 1122
                               pRuntimeEnv->current->groupIndex);
    }

    if (needFilter) {
      (*status) = doFilterByBlockTimeWindow(pTableScanInfo, pBlock);
    } else {
1123
      (*status) = BLK_DATA_DATA_LOAD;
1124 1125 1126 1127
    }
  }

  SDataBlockInfo* pBlockInfo = &pBlock->info;
H
Haojun Liao 已提交
1128
//  *status = updateBlockLoadStatus(pRuntimeEnv->pQueryAttr, *status);
1129

1130
  if ((*status) == BLK_DATA_NOT_LOAD || (*status) == BLK_DATA_FILTEROUT) {
1131 1132
    //qDebug("QInfo:0x%"PRIx64" data block discard, brange:%" PRId64 "-%" PRId64 ", rows:%d", pQInfo->qId, pBlockInfo->window.skey,
//           pBlockInfo->window.ekey, pBlockInfo->rows);
1133
    pCost->skipBlocks += 1;
1134
  } else if ((*status) == BLK_DATA_SMA_LOAD) {
1135 1136
    // this function never returns error?
    pCost->loadBlockStatis += 1;
1137
//    tsdbRetrieveDatablockSMA(pTableScanInfo->pTsdbReadHandle, &pBlock->pBlockAgg);
1138 1139

    if (pBlock->pBlockAgg == NULL) {  // data block statistics does not exist, load data block
1140
//      pBlock->pDataBlock = tsdbRetrieveDataBlock(pTableScanInfo->pTsdbReadHandle, NULL);
1141 1142 1143
      pCost->totalCheckedRows += pBlock->info.rows;
    }
  } else {
1144
    assert((*status) == BLK_DATA_DATA_LOAD);
1145 1146 1147

    // load the data block statistics to perform further filter
    pCost->loadBlockStatis += 1;
1148
//    tsdbRetrieveDatablockSMA(pTableScanInfo->pTsdbReadHandle, &pBlock->pBlockAgg);
1149 1150 1151 1152 1153 1154

    if (pQueryAttr->topBotQuery && pBlock->pBlockAgg != NULL) {
      { // set previous window
        if (QUERY_IS_INTERVAL_QUERY(pQueryAttr)) {
          SResultRow* pResult = NULL;

H
Haojun Liao 已提交
1155
          bool  masterScan = IS_MAIN_SCAN(pRuntimeEnv);
1156 1157 1158 1159 1160
          TSKEY k = ascQuery? pBlock->info.window.skey : pBlock->info.window.ekey;

          STimeWindow win = getActiveTimeWindow(pTableScanInfo->pResultRowInfo, k, pQueryAttr);
          if (setResultOutputBufByKey(pRuntimeEnv, pTableScanInfo->pResultRowInfo, pBlock->info.uid, &win, masterScan, &pResult, groupId,
                                      pTableScanInfo->pCtx, pTableScanInfo->numOfOutput,
1161
                                      pTableScanInfo->rowEntryInfoOffset) != TSDB_CODE_SUCCESS) {
1162 1163 1164 1165 1166 1167 1168 1169 1170 1171 1172
            longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
          }
        }
      }
      bool load = false;
      for (int32_t i = 0; i < pQueryAttr->numOfOutput; ++i) {
        int32_t functionId = pTableScanInfo->pCtx[i].functionId;
        if (functionId == FUNCTION_TOP || functionId == FUNCTION_BOTTOM) {
//          load = topbot_datablock_filter(&pTableScanInfo->pCtx[i], (char*)&(pBlock->pBlockAgg[i].min),
//                                         (char*)&(pBlock->pBlockAgg[i].max));
          if (!load) { // current block has been discard due to filter applied
1173
            pCost->skipBlocks += 1;
1174 1175
            //qDebug("QInfo:0x%"PRIx64" data block discard, brange:%" PRId64 "-%" PRId64 ", rows:%d", pQInfo->qId,
//                   pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
1176
            (*status) = BLK_DATA_FILTEROUT;
1177 1178 1179 1180 1181 1182 1183 1184
            return TSDB_CODE_SUCCESS;
          }
        }
      }
    }

    // current block has been discard due to filter applied
//    if (!doFilterByBlockStatistics(pRuntimeEnv, pBlock->pBlockAgg, pTableScanInfo->pCtx, pBlockInfo->rows)) {
1185
//      pCost->skipBlocks += 1;
1186 1187
//      qDebug("QInfo:0x%"PRIx64" data block discard, brange:%" PRId64 "-%" PRId64 ", rows:%d", pQInfo->qId, pBlockInfo->window.skey,
//             pBlockInfo->window.ekey, pBlockInfo->rows);
1188
//      (*status) = BLK_DATA_FILTEROUT;
1189 1190 1191 1192 1193
//      return TSDB_CODE_SUCCESS;
//    }

    pCost->totalCheckedRows += pBlockInfo->rows;
    pCost->loadBlocks += 1;
1194
//    pBlock->pDataBlock = tsdbRetrieveDataBlock(pTableScanInfo->pTsdbReadHandle, NULL);
1195 1196 1197 1198 1199
//    if (pBlock->pDataBlock == NULL) {
//      return terrno;
//    }

//    if (pQueryAttr->pFilters != NULL) {
1200
//      filterSetColFieldData(pQueryAttr->pFilters, taosArrayGetSize(pBlock->pDataBlock), pBlock->pDataBlock);
1201
//    }
1202

1203 1204 1205 1206
//    if (pQueryAttr->pFilters != NULL || pRuntimeEnv->pTsBuf != NULL) {
//      filterColRowsInDataBlock(pRuntimeEnv, pBlock, ascQuery);
//    }
  }
H
Haojun Liao 已提交
1207
#endif
1208 1209 1210
  return TSDB_CODE_SUCCESS;
}

L
Liu Jicong 已提交
1211
static void updateTableQueryInfoForReverseScan(STableQueryInfo* pTableQueryInfo) {
1212 1213 1214 1215
  if (pTableQueryInfo == NULL) {
    return;
  }

wafwerar's avatar
wafwerar 已提交
1216
  //  TSWAP(pTableQueryInfo->win.skey, pTableQueryInfo->win.ekey);
L
Liu Jicong 已提交
1217
  //  pTableQueryInfo->lastKey = pTableQueryInfo->win.skey;
1218

L
Liu Jicong 已提交
1219 1220
  //  SWITCH_ORDER(pTableQueryInfo->cur.order);
  //  pTableQueryInfo->cur.vgroupIndex = -1;
1221 1222

  // set the index to be the end slot of result rows array
dengyihao's avatar
dengyihao 已提交
1223 1224 1225 1226 1227 1228
  //  SResultRowInfo* pResultRowInfo = &pTableQueryInfo->resInfo;
  //  if (pResultRowInfo->size > 0) {
  //    pResultRowInfo->curPos = pResultRowInfo->size - 1;
  //  } else {
  //    pResultRowInfo->curPos = -1;
  //  }
1229 1230
}

H
Haojun Liao 已提交
1231
void initResultRow(SResultRow* pResultRow) {
X
Xiaoyu Wang 已提交
1232
  //  pResultRow->pEntryInfo = (struct SResultRowEntryInfo*)((char*)pResultRow + sizeof(SResultRow));
1233 1234 1235 1236 1237
}

/*
 * The start of each column SResultRowEntryInfo is denote by RowCellInfoOffset.
 * Note that in case of top/bottom query, the whole multiple rows of result is treated as only one row of results.
H
Haojun Liao 已提交
1238 1239 1240
 * +------------+-----------------result column 1------------+------------------result column 2-----------+
 * | SResultRow | SResultRowEntryInfo | intermediate buffer1 | SResultRowEntryInfo | intermediate buffer 2|
 * +------------+--------------------------------------------+--------------------------------------------+
1241 1242
 *           offset[0]                                  offset[1]                                   offset[2]
 */
1243
// TODO refactor: some function move away
L
Liu Jicong 已提交
1244 1245 1246
void setFunctionResultOutput(SOperatorInfo* pOperator, SOptrBasicInfo* pInfo, SAggSupporter* pSup, int32_t stage,
                             int32_t numOfExprs) {
  SExecTaskInfo*  pTaskInfo = pOperator->pTaskInfo;
1247 1248
  SqlFunctionCtx* pCtx = pOperator->exprSupp.pCtx;
  int32_t*        rowEntryInfoOffset = pOperator->exprSupp.rowEntryInfoOffset;
H
Haojun Liao 已提交
1249

H
Haojun Liao 已提交
1250
  SResultRowInfo* pResultRowInfo = &pInfo->resultRowInfo;
1251
  initResultRowInfo(pResultRowInfo);
H
Haojun Liao 已提交
1252

L
Liu Jicong 已提交
1253 1254
  int64_t     tid = 0;
  int64_t     groupId = 0;
1255 1256
  SResultRow* pRow = doSetResultOutBufByKey(pSup->pResultBuf, pResultRowInfo, (char*)&tid, sizeof(tid), true, groupId,
                                            pTaskInfo, false, pSup);
H
Haojun Liao 已提交
1257

1258
  for (int32_t i = 0; i < numOfExprs; ++i) {
1259
    struct SResultRowEntryInfo* pEntry = getResultEntryInfo(pRow, i, rowEntryInfoOffset);
H
Haojun Liao 已提交
1260 1261
    cleanupResultRowEntry(pEntry);

L
Liu Jicong 已提交
1262
    pCtx[i].resultInfo = pEntry;
1263
    pCtx[i].scanFlag = stage;
H
Haojun Liao 已提交
1264 1265
  }

1266
  initCtxOutputBuffer(pCtx, numOfExprs);
H
Haojun Liao 已提交
1267 1268
}

H
Haojun Liao 已提交
1269
void initCtxOutputBuffer(SqlFunctionCtx* pCtx, int32_t size) {
1270 1271
  for (int32_t j = 0; j < size; ++j) {
    struct SResultRowEntryInfo* pResInfo = GET_RES_INFO(&pCtx[j]);
dengyihao's avatar
dengyihao 已提交
1272 1273
    if (isRowEntryInitialized(pResInfo) || fmIsPseudoColumnFunc(pCtx[j].functionId) || pCtx[j].functionId == -1 ||
        fmIsScalarFunc(pCtx[j].functionId)) {
1274 1275 1276
      continue;
    }

H
Haojun Liao 已提交
1277
    pCtx[j].fpSet.init(&pCtx[j], pCtx[j].resultInfo);
1278 1279 1280
  }
}

L
Liu Jicong 已提交
1281
void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status) {
1282
  if (status == TASK_NOT_COMPLETED) {
H
Haojun Liao 已提交
1283
    pTaskInfo->status = status;
1284 1285
  } else {
    // QUERY_NOT_COMPLETED is not compatible with any other status, so clear its position first
1286
    CLEAR_QUERY_STATUS(pTaskInfo, TASK_NOT_COMPLETED);
H
Haojun Liao 已提交
1287
    pTaskInfo->status |= status;
1288 1289 1290
  }
}

L
Liu Jicong 已提交
1291
void destroyTableQueryInfoImpl(STableQueryInfo* pTableQueryInfo) {
1292 1293 1294 1295
  if (pTableQueryInfo == NULL) {
    return;
  }

L
Liu Jicong 已提交
1296
  //  taosVariantDestroy(&pTableQueryInfo->tag);
dengyihao's avatar
dengyihao 已提交
1297
  //  cleanupResultRowInfo(&pTableQueryInfo->resInfo);
1298 1299
}

1300
void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowEntryInfoOffset) {
1301
  for (int32_t i = 0; i < numOfOutput; ++i) {
1302
    pCtx[i].resultInfo = getResultEntryInfo(pResult, i, rowEntryInfoOffset);
1303 1304 1305 1306 1307

    struct SResultRowEntryInfo* pResInfo = pCtx[i].resultInfo;
    if (isRowEntryCompleted(pResInfo) && isRowEntryInitialized(pResInfo)) {
      continue;
    }
1308 1309 1310 1311 1312

    if (fmIsWindowPseudoColumnFunc(pCtx[i].functionId)) {
      continue;
    }

1313 1314 1315 1316 1317 1318
    if (!pResInfo->initialized) {
      if (pCtx[i].functionId != -1) {
        pCtx[i].fpSet.init(&pCtx[i], pResInfo);
      } else {
        pResInfo->initialized = true;
      }
1319 1320 1321 1322
    }
  }
}

H
Haojun Liao 已提交
1323
static void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const int8_t* rowRes, bool keep);
1324

1325
void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock) {
1326 1327 1328 1329 1330
  if (pFilterNode == NULL) {
    return;
  }

  SFilterInfo* filter = NULL;
H
Haojun Liao 已提交
1331

H
Haojun Liao 已提交
1332
  // todo move to the initialization function
H
Haojun Liao 已提交
1333
  int32_t code = filterInitFromNode((SNode*)pFilterNode, &filter, 0);
1334

1335
  size_t             numOfCols = taosArrayGetSize(pBlock->pDataBlock);
1336
  SFilterColumnParam param1 = {.numOfCols = numOfCols, .pDataBlock = pBlock->pDataBlock};
1337 1338 1339
  code = filterSetDataFromSlotId(filter, &param1);

  int8_t* rowRes = NULL;
1340

1341
  // todo the keep seems never to be True??
1342
  bool keep = filterExecute(filter, pBlock, &rowRes, NULL, param1.numOfCols);
D
dapan1121 已提交
1343
  filterFreeInfo(filter);
1344

H
Haojun Liao 已提交
1345
  extractQualifiedTupleByFilterResult(pBlock, rowRes, keep);
1346
  blockDataUpdateTsWindow(pBlock, 0);
H
Haojun Liao 已提交
1347 1348

  taosMemoryFree(rowRes);
1349 1350
}

H
Haojun Liao 已提交
1351
void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const int8_t* rowRes, bool keep) {
1352 1353 1354 1355 1356
  if (keep) {
    return;
  }

  if (rowRes != NULL) {
L
Liu Jicong 已提交
1357
    int32_t      totalRows = pBlock->info.rows;
1358
    SSDataBlock* px = createOneDataBlock(pBlock, true);
1359

1360 1361
    size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
    for (int32_t i = 0; i < numOfCols; ++i) {
1362 1363
      SColumnInfoData* pSrc = taosArrayGet(px->pDataBlock, i);
      SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, i);
1364
      // it is a reserved column for scalar function, and no data in this column yet.
1365
      if (pDst->pData == NULL || pSrc->pData == NULL) {
1366 1367 1368
        continue;
      }

1369 1370
      colInfoDataCleanup(pDst, pBlock->info.rows);

1371
      int32_t numOfRows = 0;
1372
      for (int32_t j = 0; j < totalRows; ++j) {
D
dapan1121 已提交
1373 1374 1375
        if (rowRes[j] == 0) {
          continue;
        }
1376

D
dapan1121 已提交
1377
        if (colDataIsNull_s(pSrc, j)) {
1378
          colDataAppendNULL(pDst, numOfRows);
D
dapan1121 已提交
1379
        } else {
1380
          colDataAppend(pDst, numOfRows, colDataGetData(pSrc, j), false);
D
dapan1121 已提交
1381
        }
1382
        numOfRows += 1;
H
Haojun Liao 已提交
1383
      }
1384

1385 1386 1387 1388 1389
      if (pBlock->info.rows == totalRows) {
        pBlock->info.rows = numOfRows;
      } else {
        ASSERT(pBlock->info.rows == numOfRows);
      }
1390
    }
1391

dengyihao's avatar
dengyihao 已提交
1392
    blockDataDestroy(px);  // fix memory leak
1393 1394 1395
  } else {
    // do nothing
    pBlock->info.rows = 0;
1396 1397 1398
  }
}

L
Liu Jicong 已提交
1399 1400
void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, SAggOperatorInfo* pAggInfo, int32_t numOfOutput,
                              uint64_t groupId) {
1401
  // for simple group by query without interval, all the tables belong to one group result.
L
Liu Jicong 已提交
1402
  SExecTaskInfo*  pTaskInfo = pOperator->pTaskInfo;
1403
  SResultRowInfo* pResultRowInfo = &pAggInfo->binfo.resultRowInfo;
1404 1405
  SqlFunctionCtx* pCtx = pOperator->exprSupp.pCtx;
  int32_t*        rowEntryInfoOffset = pOperator->exprSupp.rowEntryInfoOffset;
1406

1407
  SResultRow* pResultRow = doSetResultOutBufByKey(pAggInfo->aggSup.pResultBuf, pResultRowInfo, (char*)&groupId,
L
Liu Jicong 已提交
1408
                                                  sizeof(groupId), true, groupId, pTaskInfo, false, &pAggInfo->aggSup);
L
Liu Jicong 已提交
1409
  assert(pResultRow != NULL);
1410 1411 1412 1413 1414 1415

  /*
   * not assign result buffer yet, add new result buffer
   * all group belong to one result set, and each group result has different group id so set the id to be one
   */
  if (pResultRow->pageId == -1) {
dengyihao's avatar
dengyihao 已提交
1416 1417
    int32_t ret =
        addNewWindowResultBuf(pResultRow, pAggInfo->aggSup.pResultBuf, groupId, pAggInfo->binfo.pRes->info.rowSize);
1418 1419 1420 1421 1422
    if (ret != TSDB_CODE_SUCCESS) {
      return;
    }
  }

1423
  setResultRowInitCtx(pResultRow, pCtx, numOfOutput, rowEntryInfoOffset);
1424 1425
}

1426
void setExecutionContext(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId, SAggOperatorInfo* pAggInfo) {
H
Haojun Liao 已提交
1427
  if (pAggInfo->groupId != INT32_MIN && pAggInfo->groupId == groupId) {
1428 1429
    return;
  }
1430
#ifdef BUF_PAGE_DEBUG
L
Liu Jicong 已提交
1431
  qDebug("page_setbuf, groupId:%" PRIu64, groupId);
1432
#endif
1433
  doSetTableGroupOutputBuf(pOperator, pAggInfo, numOfOutput, groupId);
1434 1435

  // record the current active group id
H
Haojun Liao 已提交
1436
  pAggInfo->groupId = groupId;
1437 1438
}

1439 1440
/*
 * Raise pRow->numOfRows to the largest numOfRes of any initialised result cell
 * in the row. Uninitialised cells are skipped; the count is never lowered.
 */
static void doUpdateNumOfRows(SResultRow* pRow, int32_t numOfExprs, const int32_t* rowCellOffset) {
  int32_t idx = 0;
  while (idx < numOfExprs) {
    struct SResultRowEntryInfo* pEntry = getResultEntryInfo(pRow, idx, rowCellOffset);
    if (isRowEntryInitialized(pEntry) && pEntry->numOfRes > pRow->numOfRows) {
      pRow->numOfRows = pEntry->numOfRes;
    }
    ++idx;
  }
}

1452
int32_t finalizeResultRowIntoResultDataBlock(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPosition,
S
shenglian zhou 已提交
1453 1454 1455
                                             SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, int32_t numOfExprs,
                                             const int32_t* rowCellOffset, SSDataBlock* pBlock,
                                             SExecTaskInfo* pTaskInfo) {
1456 1457 1458 1459 1460 1461 1462 1463 1464
  SFilePage*  page = getBufPage(pBuf, resultRowPosition->pageId);
  SResultRow* pRow = (SResultRow*)((char*)page + resultRowPosition->offset);

  doUpdateNumOfRows(pRow, numOfExprs, rowCellOffset);
  if (pRow->numOfRows == 0) {
    releaseBufPage(pBuf, page);
    return 0;
  }

1465 1466 1467 1468 1469 1470 1471
  while (pBlock->info.rows + pRow->numOfRows > pBlock->info.capacity) {
    int32_t code = blockDataEnsureCapacity(pBlock, pBlock->info.capacity * 1.25);
    if (TAOS_FAILED(code)) {
      releaseBufPage(pBuf, page);
      qError("%s ensure result data capacity failed, code %s", GET_TASKID(pTaskInfo), tstrerror(code));
      longjmp(pTaskInfo->env, code);
    }
1472 1473 1474 1475 1476
  }

  for (int32_t j = 0; j < numOfExprs; ++j) {
    int32_t slotId = pExprInfo[j].base.resSchema.slotId;

1477
    pCtx[j].resultInfo = getResultEntryInfo(pRow, j, rowCellOffset);
1478 1479 1480 1481 1482 1483 1484 1485 1486 1487 1488 1489 1490 1491 1492 1493 1494 1495 1496 1497
    if (pCtx[j].fpSet.finalize) {
      int32_t code = pCtx[j].fpSet.finalize(&pCtx[j], pBlock);
      if (TAOS_FAILED(code)) {
        qError("%s build result data block error, code %s", GET_TASKID(pTaskInfo), tstrerror(code));
        longjmp(pTaskInfo->env, code);
      }
    } else if (strcmp(pCtx[j].pExpr->pExpr->_function.functionName, "_select_value") == 0) {
      // do nothing, todo refactor
    } else {
      // expand the result into multiple rows. E.g., _wstartts, top(k, 20)
      // the _wstartts needs to copy to 20 following rows, since the results of top-k expands to 20 different rows.
      SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, slotId);
      char*            in = GET_ROWCELL_INTERBUF(pCtx[j].resultInfo);
      for (int32_t k = 0; k < pRow->numOfRows; ++k) {
        colDataAppend(pColInfoData, pBlock->info.rows + k, in, pCtx[j].resultInfo->isNullRes);
      }
    }
  }

  releaseBufPage(pBuf, page);
1498
  pBlock->info.rows += pRow->numOfRows;
1499 1500 1501 1502

  return 0;
}

X
Xiaoyu Wang 已提交
1503 1504 1505
/*
 * Copy finalized result rows into `pBlock`, starting at pGroupResInfo->index.
 *
 * All rows in one block must share a group id. Copying stops when:
 *   - the next row belongs to another group, or
 *   - the next row does not fit into a block that already has rows.
 * An empty block is grown so that at least one result row always fits; otherwise
 * no row would ever be consumed. pGroupResInfo->index is advanced past every row
 * consumed, including rows that produce no output.
 *
 * Returns 0. Errors are logged and raised via longjmp(pTaskInfo->env, code) after
 * releasing the pinned buffer page.
 */
int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprInfo* pExprInfo, SDiskbasedBuf* pBuf,
                           SGroupResInfo* pGroupResInfo, const int32_t* rowCellOffset, SqlFunctionCtx* pCtx,
                           int32_t numOfExprs) {
  int32_t numOfRows = getNumOfTotalRes(pGroupResInfo);
  int32_t start = pGroupResInfo->index;
#ifdef BUF_PAGE_DEBUG
  qDebug("\npage_copytoblock rows:%d", numOfRows);
#endif

  for (int32_t i = start; i < numOfRows; i += 1) {
    SResKeyPos* pPos = taosArrayGetP(pGroupResInfo->pRows, i);
    SFilePage*  page = getBufPage(pBuf, pPos->pos.pageId);
#ifdef BUF_PAGE_DEBUG
    qDebug("page_copytoblock pos pageId:%d, offset:%d", pPos->pos.pageId, pPos->pos.offset);
#endif
    SResultRow* pRow = (SResultRow*)((char*)page + pPos->pos.offset);

    doUpdateNumOfRows(pRow, numOfExprs, rowCellOffset);
    if (pRow->numOfRows == 0) {
      pGroupResInfo->index += 1;
      releaseBufPage(pBuf, page);
      continue;
    }

    // The block's group counts as assigned once the block holds rows or carries a
    // non-zero id. Relying on groupId == 0 alone would let rows of group 0 be followed
    // by rows of another group in the same block.
    bool groupAssigned = (pBlock->info.rows > 0) || (pBlock->info.groupId != 0);
    if (!groupAssigned) {
      pBlock->info.groupId = pPos->groupId;
    } else if (pBlock->info.groupId != pPos->groupId) {
      // current value belongs to different group, it can't be packed into one datablock
      releaseBufPage(pBuf, page);
      break;
    }

    if (pBlock->info.rows + pRow->numOfRows > pBlock->info.capacity) {
      if (pBlock->info.rows > 0) {
        // block is full: this row is picked up by the next call
        releaseBufPage(pBuf, page);
        break;
      }

      // An empty block that cannot hold this row would return no rows without advancing
      // pGroupResInfo->index, stalling on the same row forever; make room for it instead.
      int32_t code = blockDataEnsureCapacity(pBlock, pRow->numOfRows);
      if (TAOS_FAILED(code)) {
        releaseBufPage(pBuf, page);
        qError("%s ensure result data capacity failed, code %s", GET_TASKID(pTaskInfo), tstrerror(code));
        longjmp(pTaskInfo->env, code);
      }
    }

    pGroupResInfo->index += 1;

    for (int32_t j = 0; j < numOfExprs; ++j) {
      int32_t slotId = pExprInfo[j].base.resSchema.slotId;

      pCtx[j].resultInfo = getResultEntryInfo(pRow, j, rowCellOffset);
      if (pCtx[j].fpSet.finalize) {
#ifdef BUF_PAGE_DEBUG
        qDebug("\npage_finalize %d", numOfExprs);
#endif
        int32_t code = pCtx[j].fpSet.finalize(&pCtx[j], pBlock);
        if (TAOS_FAILED(code)) {
          // the page is pinned by getBufPage; release it before unwinding
          releaseBufPage(pBuf, page);
          qError("%s build result data block error, code %s", GET_TASKID(pTaskInfo), tstrerror(code));
          longjmp(pTaskInfo->env, code);
        }
      } else if (strcmp(pCtx[j].pExpr->pExpr->_function.functionName, "_select_value") == 0) {
        // do nothing, todo refactor
      } else {
        // expand the result into multiple rows. E.g., _wstartts, top(k, 20)
        // the _wstartts needs to copy to 20 following rows, since the results of top-k expands to 20 different rows.
        SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, slotId);
        char*            in = GET_ROWCELL_INTERBUF(pCtx[j].resultInfo);
        if (pCtx[j].increase) {
          // `increase` columns emit consecutive int64 values starting at the stored one
          int64_t ts = *(int64_t*)in;
          for (int32_t k = 0; k < pRow->numOfRows; ++k) {
            colDataAppend(pColInfoData, pBlock->info.rows + k, (const char*)&ts, pCtx[j].resultInfo->isNullRes);
            ts++;
          }
        } else {
          for (int32_t k = 0; k < pRow->numOfRows; ++k) {
            colDataAppend(pColInfoData, pBlock->info.rows + k, in, pCtx[j].resultInfo->isNullRes);
          }
        }
      }
    }

    releaseBufPage(pBuf, page);
    pBlock->info.rows += pRow->numOfRows;
  }

  qDebug("%s result generated, rows:%d, groupId:%" PRIu64, GET_TASKID(pTaskInfo), pBlock->info.rows,
         pBlock->info.groupId);
  // refresh the block's time window from column slot 0
  blockDataUpdateTsWindow(pBlock, 0);
  return 0;
}

X
Xiaoyu Wang 已提交
1590 1591
/*
 * Reset the operator's output block (pbInfo->pRes) and, if grouped results remain
 * in pGroupResInfo, fill it with the next batch via doCopyToSDataBlock.
 */
void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo,
                            SDiskbasedBuf* pBuf) {
  SSDataBlock* pOut = pbInfo->pRes;
  blockDataCleanup(pOut);

  if (hasDataInGroupInfo(pGroupResInfo)) {
    // zero the group id so the first copied row decides this block's group
    pOut->info.groupId = 0;
    doCopyToSDataBlock(pOperator->pTaskInfo, pOut, pOperator->exprSupp.pExprInfo, pBuf, pGroupResInfo,
                       pOperator->exprSupp.rowEntryInfoOffset, pOperator->exprSupp.pCtx,
                       pOperator->exprSupp.numOfExprs);
  }
}

L
Liu Jicong 已提交
1610
/*
 * Currently a no-op: the per-row update logic that used to live here is
 * compiled out. The parameters are accepted and ignored.
 */
static void updateNumOfRowsInResultRows(SqlFunctionCtx* pCtx, int32_t numOfOutput, SResultRowInfo* pResultRowInfo,
                                        int32_t* rowEntryInfoOffset) {
  (void)pCtx;
  (void)numOfOutput;
  (void)pResultRowInfo;
  (void)rowEntryInfoOffset;
}

L
Liu Jicong 已提交
1633
/*
 * Compress the first `numOfRows` values of column `pColRes` into `data` using
 * the type-specific compressor from tDataTypes. The output capacity passed to
 * the compressor is the raw size plus COMP_OVERFLOW_BYTES. Returns the
 * compressor's result.
 */
static int32_t compressQueryColData(SColumnInfoData* pColRes, int32_t numOfRows, char* data, int8_t compressed) {
  int32_t rawLen = pColRes->info.bytes * numOfRows;
  int32_t outCap = rawLen + COMP_OVERFLOW_BYTES;
  return tDataTypes[pColRes->info.type].compFunc(pColRes->pData, rawLen, numOfRows, data, outCap, compressed, NULL,
                                                 0);
}

1639 1640 1641
/*
 * Append fill rows produced by `pFillInfo` to `pBlock`, using at most the room
 * left below `capacity`, and return the block's new total row count.
 */
int32_t doFillTimeIntervalGapsInResults(struct SFillInfo* pFillInfo, SSDataBlock* pBlock, int32_t capacity) {
  const int32_t room = capacity - pBlock->info.rows;
  const int32_t filled = (int32_t)taosFillResultDataBlock(pFillInfo, pBlock, room);

  pBlock->info.rows += filled;
  return pBlock->info.rows;
}

L
Liu Jicong 已提交
1646 1647
/*
 * Add the first-stage merge time to the task's elapsed time and, when a
 * file-block load recorder is attached, log a cost summary at debug level.
 */
void queryCostStatis(SExecTaskInfo* pTaskInfo) {
  STaskCostInfo* pCost = &pTaskInfo->cost;
  pCost->elapsedTime += pCost->firstStageMergeTime;

  SFileBlockLoadRecorder* pRec = pCost->pRecoder;
  if (pRec == NULL) {
    return;
  }

  qDebug("%s :cost summary: elapsed time:%" PRId64 " us, first merge:%" PRId64
         " us, total blocks:%d, "
         "load block statis:%d, load data block:%d, total rows:%" PRId64 ", check rows:%" PRId64,
         GET_TASKID(pTaskInfo), pCost->elapsedTime, pCost->firstStageMergeTime, pRec->totalBlocks,
         pRec->loadBlockStatis, pRec->loadBlocks, pRec->totalRows, pRec->totalCheckedRows);
}

L
Liu Jicong 已提交
1680 1681 1682
// static void updateOffsetVal(STaskRuntimeEnv *pRuntimeEnv, SDataBlockInfo *pBlockInfo) {
//   STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
//   STableQueryInfo* pTableQueryInfo = pRuntimeEnv->current;
1683
//
L
Liu Jicong 已提交
1684
//   int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQueryAttr->order.order);
1685
//
L
Liu Jicong 已提交
1686 1687 1688 1689
//   if (pQueryAttr->limit.offset == pBlockInfo->rows) {  // current block will ignore completed
//     pTableQueryInfo->lastKey = QUERY_IS_ASC_QUERY(pQueryAttr) ? pBlockInfo->window.ekey + step :
//     pBlockInfo->window.skey + step; pQueryAttr->limit.offset = 0; return;
//   }
1690
//
L
Liu Jicong 已提交
1691 1692 1693 1694 1695
//   if (QUERY_IS_ASC_QUERY(pQueryAttr)) {
//     pQueryAttr->pos = (int32_t)pQueryAttr->limit.offset;
//   } else {
//     pQueryAttr->pos = pBlockInfo->rows - (int32_t)pQueryAttr->limit.offset - 1;
//   }
1696
//
L
Liu Jicong 已提交
1697
//   assert(pQueryAttr->pos >= 0 && pQueryAttr->pos <= pBlockInfo->rows - 1);
1698
//
L
Liu Jicong 已提交
1699 1700
//   SArray *         pDataBlock = tsdbRetrieveDataBlock(pRuntimeEnv->pTsdbReadHandle, NULL);
//   SColumnInfoData *pColInfoData = taosArrayGet(pDataBlock, 0);
1701
//
L
Liu Jicong 已提交
1702 1703
//   // update the pQueryAttr->limit.offset value, and pQueryAttr->pos value
//   TSKEY *keys = (TSKEY *) pColInfoData->pData;
1704
//
L
Liu Jicong 已提交
1705 1706 1707
//   // update the offset value
//   pTableQueryInfo->lastKey = keys[pQueryAttr->pos];
//   pQueryAttr->limit.offset = 0;
1708
//
L
Liu Jicong 已提交
1709
//   int32_t numOfRes = tableApplyFunctionsOnBlock(pRuntimeEnv, pBlockInfo, NULL, binarySearchForKey, pDataBlock);
1710
//
L
Liu Jicong 已提交
1711 1712 1713 1714
//   //qDebug("QInfo:0x%"PRIx64" check data block, brange:%" PRId64 "-%" PRId64 ", numBlocksOfStep:%d, numOfRes:%d,
//   lastKey:%"PRId64, GET_TASKID(pRuntimeEnv),
//          pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows, numOfRes, pQuery->current->lastKey);
// }
1715

L
Liu Jicong 已提交
1716 1717
// void skipBlocks(STaskRuntimeEnv *pRuntimeEnv) {
//   STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
1718
//
L
Liu Jicong 已提交
1719 1720 1721
//   if (pQueryAttr->limit.offset <= 0 || pQueryAttr->numOfFilterCols > 0) {
//     return;
//   }
1722
//
L
Liu Jicong 已提交
1723 1724
//   pQueryAttr->pos = 0;
//   int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQueryAttr->order.order);
1725
//
L
Liu Jicong 已提交
1726 1727
//   STableQueryInfo* pTableQueryInfo = pRuntimeEnv->current;
//   TsdbQueryHandleT pTsdbReadHandle = pRuntimeEnv->pTsdbReadHandle;
1728
//
L
Liu Jicong 已提交
1729 1730 1731 1732 1733
//   SDataBlockInfo blockInfo = SDATA_BLOCK_INITIALIZER;
//   while (tsdbNextDataBlock(pTsdbReadHandle)) {
//     if (isTaskKilled(pRuntimeEnv->qinfo)) {
//       longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED);
//     }
1734
//
L
Liu Jicong 已提交
1735
//     tsdbRetrieveDataBlockInfo(pTsdbReadHandle, &blockInfo);
1736
//
L
Liu Jicong 已提交
1737 1738 1739 1740
//     if (pQueryAttr->limit.offset > blockInfo.rows) {
//       pQueryAttr->limit.offset -= blockInfo.rows;
//       pTableQueryInfo->lastKey = (QUERY_IS_ASC_QUERY(pQueryAttr)) ? blockInfo.window.ekey : blockInfo.window.skey;
//       pTableQueryInfo->lastKey += step;
1741
//
L
Liu Jicong 已提交
1742 1743 1744 1745 1746 1747 1748
//       //qDebug("QInfo:0x%"PRIx64" skip rows:%d, offset:%" PRId64, GET_TASKID(pRuntimeEnv), blockInfo.rows,
//              pQuery->limit.offset);
//     } else {  // find the appropriated start position in current block
//       updateOffsetVal(pRuntimeEnv, &blockInfo);
//       break;
//     }
//   }
1749
//
L
Liu Jicong 已提交
1750 1751 1752 1753 1754 1755 1756 1757 1758
//   if (terrno != TSDB_CODE_SUCCESS) {
//     longjmp(pRuntimeEnv->env, terrno);
//   }
// }

// static TSKEY doSkipIntervalProcess(STaskRuntimeEnv* pRuntimeEnv, STimeWindow* win, SDataBlockInfo* pBlockInfo,
// STableQueryInfo* pTableQueryInfo) {
//   STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
//   SResultRowInfo *pWindowResInfo = &pRuntimeEnv->resultRowInfo;
1759
//
L
Liu Jicong 已提交
1760 1761 1762
//   assert(pQueryAttr->limit.offset == 0);
//   STimeWindow tw = *win;
//   getNextTimeWindow(pQueryAttr, &tw);
1763
//
L
Liu Jicong 已提交
1764 1765
//   if ((tw.skey <= pBlockInfo->window.ekey && QUERY_IS_ASC_QUERY(pQueryAttr)) ||
//       (tw.ekey >= pBlockInfo->window.skey && !QUERY_IS_ASC_QUERY(pQueryAttr))) {
1766
//
L
Liu Jicong 已提交
1767 1768 1769 1770
//     // load the data block and check data remaining in current data block
//     // TODO optimize performance
//     SArray *         pDataBlock = tsdbRetrieveDataBlock(pRuntimeEnv->pTsdbReadHandle, NULL);
//     SColumnInfoData *pColInfoData = taosArrayGet(pDataBlock, 0);
1771
//
L
Liu Jicong 已提交
1772 1773 1774 1775
//     tw = *win;
//     int32_t startPos =
//         getNextQualifiedWindow(pQueryAttr, &tw, pBlockInfo, pColInfoData->pData, binarySearchForKey, -1);
//     assert(startPos >= 0);
1776
//
L
Liu Jicong 已提交
1777 1778
//     // set the abort info
//     pQueryAttr->pos = startPos;
1779
//
L
Liu Jicong 已提交
1780 1781 1782 1783
//     // reset the query start timestamp
//     pTableQueryInfo->win.skey = ((TSKEY *)pColInfoData->pData)[startPos];
//     pQueryAttr->window.skey = pTableQueryInfo->win.skey;
//     TSKEY key = pTableQueryInfo->win.skey;
1784
//
L
Liu Jicong 已提交
1785 1786
//     pWindowResInfo->prevSKey = tw.skey;
//     int32_t index = pRuntimeEnv->resultRowInfo.curIndex;
1787
//
L
Liu Jicong 已提交
1788 1789
//     int32_t numOfRes = tableApplyFunctionsOnBlock(pRuntimeEnv, pBlockInfo, NULL, binarySearchForKey, pDataBlock);
//     pRuntimeEnv->resultRowInfo.curIndex = index;  // restore the window index
1790
//
L
Liu Jicong 已提交
1791 1792 1793 1794
//     //qDebug("QInfo:0x%"PRIx64" check data block, brange:%" PRId64 "-%" PRId64 ", numOfRows:%d, numOfRes:%d,
//     lastKey:%" PRId64,
//            GET_TASKID(pRuntimeEnv), pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows, numOfRes,
//            pQueryAttr->current->lastKey);
1795
//
L
Liu Jicong 已提交
1796 1797 1798 1799 1800
//     return key;
//   } else {  // do nothing
//     pQueryAttr->window.skey      = tw.skey;
//     pWindowResInfo->prevSKey = tw.skey;
//     pTableQueryInfo->lastKey = tw.skey;
1801
//
L
Liu Jicong 已提交
1802 1803
//     return tw.skey;
//   }
1804
//
L
Liu Jicong 已提交
1805 1806 1807 1808 1809 1810 1811 1812 1813 1814
//   return true;
// }

// static bool skipTimeInterval(STaskRuntimeEnv *pRuntimeEnv, TSKEY* start) {
//   STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
//   if (QUERY_IS_ASC_QUERY(pQueryAttr)) {
//     assert(*start <= pRuntimeEnv->current->lastKey);
//   } else {
//     assert(*start >= pRuntimeEnv->current->lastKey);
//   }
1815
//
L
Liu Jicong 已提交
1816 1817 1818 1819 1820
//   // if queried with value filter, do NOT forward query start position
//   if (pQueryAttr->limit.offset <= 0 || pQueryAttr->numOfFilterCols > 0 || pRuntimeEnv->pTsBuf != NULL ||
//   pRuntimeEnv->pFillInfo != NULL) {
//     return true;
//   }
1821
//
L
Liu Jicong 已提交
1822 1823 1824 1825 1826 1827 1828
//   /*
//    * 1. for interval without interpolation query we forward pQueryAttr->interval.interval at a time for
//    *    pQueryAttr->limit.offset times. Since hole exists, pQueryAttr->interval.interval*pQueryAttr->limit.offset
//    value is
//    *    not valid. otherwise, we only forward pQueryAttr->limit.offset number of points
//    */
//   assert(pRuntimeEnv->resultRowInfo.prevSKey == TSKEY_INITIAL_VAL);
1829
//
L
Liu Jicong 已提交
1830 1831
//   STimeWindow w = TSWINDOW_INITIALIZER;
//   bool ascQuery = QUERY_IS_ASC_QUERY(pQueryAttr);
1832
//
L
Liu Jicong 已提交
1833 1834
//   SResultRowInfo *pWindowResInfo = &pRuntimeEnv->resultRowInfo;
//   STableQueryInfo *pTableQueryInfo = pRuntimeEnv->current;
1835
//
L
Liu Jicong 已提交
1836 1837 1838
//   SDataBlockInfo blockInfo = SDATA_BLOCK_INITIALIZER;
//   while (tsdbNextDataBlock(pRuntimeEnv->pTsdbReadHandle)) {
//     tsdbRetrieveDataBlockInfo(pRuntimeEnv->pTsdbReadHandle, &blockInfo);
1839
//
L
Liu Jicong 已提交
1840 1841 1842 1843 1844 1845 1846 1847 1848
//     if (QUERY_IS_ASC_QUERY(pQueryAttr)) {
//       if (pWindowResInfo->prevSKey == TSKEY_INITIAL_VAL) {
//         getAlignQueryTimeWindow(pQueryAttr, blockInfo.window.skey, blockInfo.window.skey, pQueryAttr->window.ekey,
//         &w); pWindowResInfo->prevSKey = w.skey;
//       }
//     } else {
//       getAlignQueryTimeWindow(pQueryAttr, blockInfo.window.ekey, pQueryAttr->window.ekey, blockInfo.window.ekey, &w);
//       pWindowResInfo->prevSKey = w.skey;
//     }
1849
//
L
Liu Jicong 已提交
1850 1851
//     // the first time window
//     STimeWindow win = getActiveTimeWindow(pWindowResInfo, pWindowResInfo->prevSKey, pQueryAttr);
1852
//
L
Liu Jicong 已提交
1853 1854
//     while (pQueryAttr->limit.offset > 0) {
//       STimeWindow tw = win;
1855
//
L
Liu Jicong 已提交
1856 1857 1858
//       if ((win.ekey <= blockInfo.window.ekey && ascQuery) || (win.ekey >= blockInfo.window.skey && !ascQuery)) {
//         pQueryAttr->limit.offset -= 1;
//         pWindowResInfo->prevSKey = win.skey;
1859
//
L
Liu Jicong 已提交
1860 1861 1862 1863 1864 1865
//         // current time window is aligned with blockInfo.window.ekey
//         // restart it from next data block by set prevSKey to be TSKEY_INITIAL_VAL;
//         if ((win.ekey == blockInfo.window.ekey && ascQuery) || (win.ekey == blockInfo.window.skey && !ascQuery)) {
//           pWindowResInfo->prevSKey = TSKEY_INITIAL_VAL;
//         }
//       }
1866
//
L
Liu Jicong 已提交
1867 1868 1869 1870
//       if (pQueryAttr->limit.offset == 0) {
//         *start = doSkipIntervalProcess(pRuntimeEnv, &win, &blockInfo, pTableQueryInfo);
//         return true;
//       }
1871
//
L
Liu Jicong 已提交
1872 1873
//       // current window does not ended in current data block, try next data block
//       getNextTimeWindow(pQueryAttr, &tw);
1874
//
L
Liu Jicong 已提交
1875 1876 1877 1878 1879 1880 1881 1882 1883
//       /*
//        * If the next time window still starts from current data block,
//        * load the primary timestamp column first, and then find the start position for the next queried time window.
//        * Note that only the primary timestamp column is required.
//        * TODO: Optimize for this cases. All data blocks are not needed to be loaded, only if the first actually
//        required
//        * time window resides in current data block.
//        */
//       if ((tw.skey <= blockInfo.window.ekey && ascQuery) || (tw.ekey >= blockInfo.window.skey && !ascQuery)) {
1884
//
L
Liu Jicong 已提交
1885 1886
//         SArray *pDataBlock = tsdbRetrieveDataBlock(pRuntimeEnv->pTsdbReadHandle, NULL);
//         SColumnInfoData *pColInfoData = taosArrayGet(pDataBlock, 0);
1887
//
L
Liu Jicong 已提交
1888 1889 1890
//         if ((win.ekey > blockInfo.window.ekey && ascQuery) || (win.ekey < blockInfo.window.skey && !ascQuery)) {
//           pQueryAttr->limit.offset -= 1;
//         }
1891
//
L
Liu Jicong 已提交
1892 1893 1894 1895 1896 1897 1898 1899
//         if (pQueryAttr->limit.offset == 0) {
//           *start = doSkipIntervalProcess(pRuntimeEnv, &win, &blockInfo, pTableQueryInfo);
//           return true;
//         } else {
//           tw = win;
//           int32_t startPos =
//               getNextQualifiedWindow(pQueryAttr, &tw, &blockInfo, pColInfoData->pData, binarySearchForKey, -1);
//           assert(startPos >= 0);
1900
//
L
Liu Jicong 已提交
1901 1902 1903 1904 1905 1906 1907 1908 1909 1910 1911
//           // set the abort info
//           pQueryAttr->pos = startPos;
//           pTableQueryInfo->lastKey = ((TSKEY *)pColInfoData->pData)[startPos];
//           pWindowResInfo->prevSKey = tw.skey;
//           win = tw;
//         }
//       } else {
//         break;  // offset is not 0, and next time window begins or ends in the next block.
//       }
//     }
//   }
1912
//
L
Liu Jicong 已提交
1913 1914 1915 1916
//   // check for error
//   if (terrno != TSDB_CODE_SUCCESS) {
//     longjmp(pRuntimeEnv->env, terrno);
//   }
1917
//
L
Liu Jicong 已提交
1918 1919
//   return true;
// }
1920

1921
/*
 * Install `num` downstream operators on `p` by copying the pointers in
 * `pDownstream` into a newly allocated array owned by `p`. Despite the name,
 * a previously installed array is replaced (and freed), not extended.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY if the array cannot be
 * allocated; in that case `p` is left unchanged.
 */
int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t num) {
  if (p->pDownstream == NULL) {
    assert(p->numOfDownstream == 0);
  }

  // Allocate into a temporary so a failure neither leaks nor drops the current array.
  SOperatorInfo** pList = taosMemoryCalloc(1, num * POINTER_BYTES);
  if (pList == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  memcpy(pList, pDownstream, num * POINTER_BYTES);

  taosMemoryFree(p->pDownstream);  // release any previously installed array
  p->pDownstream = pList;
  p->numOfDownstream = num;
  return TSDB_CODE_SUCCESS;
}

wmmhello's avatar
wmmhello 已提交
1936
static void doDestroyTableList(STableListInfo* pTableqinfoList);
1937

1938
/*
 * Currently a no-op: the time-window consistency assertions that used to live
 * here are compiled out. The parameters are accepted and ignored.
 */
static void doTableQueryInfoTimeWindowCheck(SExecTaskInfo* pTaskInfo, STableQueryInfo* pTableQueryInfo, int32_t order) {
  (void)pTaskInfo;
  (void)pTableQueryInfo;
  (void)order;
}

1954 1955 1956 1957
/*
 * Per-request context passed as SMsgSendInfo.param to loadRemoteDataCallback.
 * It is heap-allocated in doSendFetchDataRequest and freed inside
 * loadRemoteDataCallback.
 */
typedef struct SFetchRspHandleWrapper {
  // Ref id of the exchange operator in exchangeObjRefPool. Kept as int64_t because it
  // is copied from the 64-bit pExchangeInfo->self and passed to taosAcquireRef /
  // taosReleaseRef; a 32-bit field would truncate large ref ids.
  int64_t exchangeId;
  int32_t sourceIndex;  // index into pExchangeInfo->pSources / pSourceDataInfo
} SFetchRspHandleWrapper;
1958

D
dapan1121 已提交
1959
/*
 * Response callback for a fetch request issued by doSendFetchDataRequest.
 *
 * param - SFetchRspHandleWrapper* identifying the exchange operator (by ref id)
 *         and the source index; always freed here
 * pMsg  - response buffer; on success its pData is handed over to
 *         pSourceDataInfo->pRsp, otherwise it is freed here
 * code  - transport/result code of the request
 *
 * On success the SRetrieveTableRsp header fields are converted from network
 * byte order in place. The source is then marked EX_SOURCE_DATA_READY and the
 * exchange's `ready` semaphore is posted. If the exchange operator has already
 * been released, the response is dropped. Always returns TSDB_CODE_SUCCESS.
 */
int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code) {
  SFetchRspHandleWrapper* pWrapper = (SFetchRspHandleWrapper*)param;

  SExchangeInfo* pExchangeInfo = taosAcquireRef(exchangeObjRefPool, pWrapper->exchangeId);
  if (pExchangeInfo == NULL) {
    qWarn("failed to acquire exchange operator, since it may have been released");
    // nobody will consume the response or the wrapper any more; release both
    if (pMsg != NULL) {
      taosMemoryFreeClear(pMsg->pData);
    }
    taosMemoryFree(pWrapper);
    return TSDB_CODE_SUCCESS;
  }

  int32_t          index = pWrapper->sourceIndex;
  SSourceDataInfo* pSourceDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, index);

  if (code == TSDB_CODE_SUCCESS) {
    ASSERT(pMsg->pData != NULL);  // check before the header is dereferenced below
    pSourceDataInfo->pRsp = pMsg->pData;

    SRetrieveTableRsp* pRsp = pSourceDataInfo->pRsp;
    pRsp->numOfRows = htonl(pRsp->numOfRows);
    pRsp->compLen = htonl(pRsp->compLen);
    pRsp->numOfCols = htonl(pRsp->numOfCols);
    pRsp->useconds = htobe64(pRsp->useconds);

    qDebug("%s fetch rsp received, index:%d, rows:%d", pSourceDataInfo->taskId, index, pRsp->numOfRows);
  } else {
    // the payload is not retained on failure, so it must be released here
    if (pMsg != NULL) {
      taosMemoryFreeClear(pMsg->pData);
    }
    pSourceDataInfo->code = code;
  }

  pSourceDataInfo->status = EX_SOURCE_DATA_READY;

  tsem_post(&pExchangeInfo->ready);
  taosReleaseRef(exchangeObjRefPool, pWrapper->exchangeId);

  taosMemoryFree(pWrapper);
  return TSDB_CODE_SUCCESS;
}

/* Free an SMsgSendInfo together with the request payload in msgInfo.pData. */
static void destroySendMsgInfo(SMsgSendInfo* pMsgBody) {
  assert(pMsgBody != NULL);

  taosMemoryFreeClear(pMsgBody->msgInfo.pData);  // payload first, then the envelope
  taosMemoryFree(pMsgBody);
}

D
dapan1121 已提交
2001
/*
 * Dispatch an RPC response to the SMsgSendInfo stored in pMsg->info.ahandle.
 *
 * The response body is copied into a freshly allocated buffer passed to the send
 * info's callback (which then owns it). If the copy cannot be allocated, the
 * callback receives a NULL buffer and TSDB_CODE_OUT_OF_MEMORY. Afterwards the RPC
 * content and the send info are released.
 */
void qProcessRspMsg(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
  assert(pMsg->info.ahandle != NULL);
  SMsgSendInfo* pSendInfo = (SMsgSendInfo*)pMsg->info.ahandle;

  SDataBuf buf = {.len = pMsg->contLen, .pData = NULL};
  if (pMsg->contLen > 0) {
    void* pCopy = taosMemoryCalloc(1, pMsg->contLen);
    if (pCopy != NULL) {
      memcpy(pCopy, pMsg->pCont, pMsg->contLen);
      buf.pData = pCopy;
    } else {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      pMsg->code = TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  pSendInfo->fp(pSendInfo->param, &buf, pMsg->code);
  rpcFreeCont(pMsg->pCont);
  destroySendMsgInfo(pSendInfo);
}

L
Liu Jicong 已提交
2022
/*
 * Build an SResFetchReq for source `sourceIndex` of the exchange operator and
 * send it asynchronously; the reply is handled by loadRemoteDataCallback.
 *
 * Returns TSDB_CODE_SUCCESS once the request has been handed to the transport.
 * Failures set pTaskInfo->code and return that code:
 *   - an allocation failure returns TSDB_CODE_QRY_OUT_OF_MEMORY;
 *   - a send failure returns the code from asyncSendMsgToServer.
 */
static int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTaskInfo, int32_t sourceIndex) {
  size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources);

  SResFetchReq* pMsg = taosMemoryCalloc(1, sizeof(SResFetchReq));
  if (NULL == pMsg) {
    pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
    return pTaskInfo->code;
  }

  SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, sourceIndex);
  SSourceDataInfo*       pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, sourceIndex);

  ASSERT(pDataInfo->status == EX_SOURCE_DATA_NOT_READY);

  qDebug("%s build fetch msg and send to vgId:%d, ep:%s, taskId:0x%" PRIx64 ", execId:%d, %d/%" PRIzu,
         GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->addr.epSet.eps[0].fqdn, pSource->taskId, pSource->execId,
         sourceIndex, totalSources);

  // request fields are sent in network byte order
  pMsg->header.vgId = htonl(pSource->addr.nodeId);
  pMsg->sId = htobe64(pSource->schedId);
  pMsg->taskId = htobe64(pSource->taskId);
  pMsg->queryId = htobe64(pTaskInfo->id.queryId);
  pMsg->execId = htonl(pSource->execId);

  // send the fetch remote task result request
  SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (NULL == pMsgSendInfo) {
    taosMemoryFreeClear(pMsg);
    qError("%s prepare message %d failed", GET_TASKID(pTaskInfo), (int32_t)sizeof(SMsgSendInfo));
    pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
    return pTaskInfo->code;
  }

  SFetchRspHandleWrapper* pWrapper = taosMemoryCalloc(1, sizeof(SFetchRspHandleWrapper));
  if (NULL == pWrapper) {
    // previously unchecked: the wrapper was dereferenced even when the allocation failed
    taosMemoryFreeClear(pMsg);
    taosMemoryFreeClear(pMsgSendInfo);
    qError("%s prepare message %d failed", GET_TASKID(pTaskInfo), (int32_t)sizeof(SFetchRspHandleWrapper));
    pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
    return pTaskInfo->code;
  }

  pWrapper->exchangeId = pExchangeInfo->self;
  pWrapper->sourceIndex = sourceIndex;

  pMsgSendInfo->param = pWrapper;
  pMsgSendInfo->msgInfo.pData = pMsg;
  pMsgSendInfo->msgInfo.len = sizeof(SResFetchReq);
  pMsgSendInfo->msgType = pSource->fetchMsgType;
  pMsgSendInfo->fp = loadRemoteDataCallback;

  int64_t transporterId = 0;
  int32_t code = asyncSendMsgToServer(pExchangeInfo->pTransporter, &pSource->addr.epSet, &transporterId, pMsgSendInfo);
  if (code != TSDB_CODE_SUCCESS) {
    // Ownership of pMsgSendInfo after a failed send is up to the transport layer, so it
    // is not freed here; the failure is reported instead of being silently dropped.
    qError("%s failed to send fetch msg to vgId:%d, code %s", GET_TASKID(pTaskInfo), pSource->addr.nodeId,
           tstrerror(code));
    pTaskInfo->code = code;
    return code;
  }

  return TSDB_CODE_SUCCESS;
}

2070
/*
 * Decode a fetched result payload into `pRes` and update the load statistics.
 *
 * pRes        - destination block (cleaned/overwritten)
 * pLoadInfo   - statistics: totalRows, totalSize (+= compLen), totalElapsed
 * numOfRows   - number of rows in the payload
 * pData       - encoded payload
 * compLen     - payload length added to pLoadInfo->totalSize
 * numOfOutput - number of columns expected in pRes
 * startTs     - start timestamp (us) used to accumulate elapsed time
 * total       - optional running row counter, incremented by numOfRows
 * pColList    - NULL: payload is decoded directly into pRes.
 *               non-NULL: payload carries its own column schema header
 *               (count + SSysTableSchema entries, network byte order, converted in
 *               place); it is decoded into a temporary block and relocated into
 *               pRes according to pColList.
 *
 * Returns TSDB_CODE_SUCCESS. Returns TSDB_CODE_OUT_OF_MEMORY if the temporary block
 * cannot be created, or the error from blockDataEnsureCapacity if pRes cannot grow.
 */
int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadInfo, int32_t numOfRows, char* pData,
                                     int32_t compLen, int32_t numOfOutput, int64_t startTs, uint64_t* total,
                                     SArray* pColList) {
  if (pColList == NULL) {  // data from other sources
    blockDataCleanup(pRes);
    blockDecode(pRes, numOfOutput, numOfRows, pData);
  } else {  // extract data according to pColList
    ASSERT(numOfOutput == taosArrayGetSize(pColList));
    char* pStart = pData;

    int32_t numOfCols = htonl(*(int32_t*)pStart);
    pStart += sizeof(int32_t);

    // todo refactor:extract method
    SSysTableSchema* pSchema = (SSysTableSchema*)pStart;
    for (int32_t i = 0; i < numOfCols; ++i) {
      SSysTableSchema* p = (SSysTableSchema*)pStart;

      p->colId = htons(p->colId);
      p->bytes = htonl(p->bytes);
      pStart += sizeof(SSysTableSchema);
    }

    SSDataBlock* pBlock = createDataBlock();
    if (pBlock == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    for (int32_t i = 0; i < numOfCols; ++i) {
      SColumnInfoData idata = createColumnInfoData(pSchema[i].type, pSchema[i].bytes, pSchema[i].colId);
      blockDataAppendColInfo(pBlock, &idata);
    }

    blockDecode(pBlock, numOfCols, numOfRows, pStart);

    int32_t code = blockDataEnsureCapacity(pRes, numOfRows);
    if (code != TSDB_CODE_SUCCESS) {
      blockDataDestroy(pBlock);
      return code;
    }

    // data from mnode
    pRes->info.rows = numOfRows;
    relocateColumnData(pRes, pColList, pBlock->pDataBlock, false);
    blockDataDestroy(pBlock);
  }

  // todo move this to time window aggregator, since the primary timestamp may not be known by exchange operator.
  blockDataUpdateTsWindow(pRes, 0);

  int64_t el = taosGetTimestampUs() - startTs;

  pLoadInfo->totalRows += numOfRows;
  pLoadInfo->totalSize += compLen;

  if (total != NULL) {
    *total += numOfRows;
  }

  pLoadInfo->totalElapsed += el;
  return TSDB_CODE_SUCCESS;
}
2124

L
Liu Jicong 已提交
2125 2126
/*
 * Mark the exchange operator finished once every downstream source has been drained.
 * Adds the time elapsed since `startTs` (microseconds) to the load statistics, logs a
 * summary line and always returns NULL, so callers can write `return setAllSourcesCompleted(...)`.
 */
static void* setAllSourcesCompleted(SOperatorInfo* pOperator, int64_t startTs) {
  SExchangeInfo*       pExchange = pOperator->info;
  SLoadRemoteDataInfo* pStat = &pExchange->loadInfo;

  pStat->totalElapsed += taosGetTimestampUs() - startTs;

  qDebug("%s all %" PRIzu " sources are exhausted, total rows: %" PRIu64 " bytes:%" PRIu64 ", elapsed:%.2f ms",
         GET_TASKID(pOperator->pTaskInfo), taosArrayGetSize(pExchange->pSources), pStat->totalRows,
         pStat->totalSize, pStat->totalElapsed / 1000.0);

  doSetOperatorCompleted(pOperator);
  return NULL;
}

L
Liu Jicong 已提交
2143 2144
/*
 * Concurrent load strategy for the exchange operator: fetch requests for all sources are already
 * in flight, so scan the per-source state and consume the first response that is READY.
 *
 * For the consumed source:
 *  - an empty response marks the source exhausted;
 *  - otherwise the payload is decoded into pExchangeInfo->pResult, and unless the response says the
 *    source has completed, the next fetch request is sent for it.
 *
 * Returns pExchangeInfo->pResult for a decoded block, or NULL when every source is exhausted or on
 * error (pTaskInfo->code is set).
 *
 * NOTE(review): when no source is READY and not all are exhausted, the outer loop re-scans without
 * blocking on anything — confirm whether a wait on pExchangeInfo->ready is intended here.
 */
static SSDataBlock* concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeInfo* pExchangeInfo,
                                                   SExecTaskInfo* pTaskInfo) {
  int32_t              code = TSDB_CODE_SUCCESS;
  int64_t              startTs = taosGetTimestampUs();
  size_t               totalSources = taosArrayGetSize(pExchangeInfo->pSources);
  SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
  SSDataBlock*         pRes = pExchangeInfo->pResult;

  for (;;) {
    int32_t numOfDone = 0;

    for (int32_t idx = 0; idx < totalSources; ++idx) {
      SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, idx);

      if (pDataInfo->status == EX_SOURCE_DATA_EXHAUSTED) {
        ++numOfDone;
        continue;
      }

      // No response delivered for this source yet.
      if (pDataInfo->status != EX_SOURCE_DATA_READY) {
        continue;
      }

      if (pDataInfo->code != TSDB_CODE_SUCCESS) {
        code = pDataInfo->code;
        goto _error;
      }

      SRetrieveTableRsp*     pRsp = pDataInfo->pRsp;
      SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, idx);

      // An empty response means the source has nothing more to send.
      if (pRsp->numOfRows == 0) {
        qDebug("%s vgId:%d, taskId:0x%" PRIx64 " execId:%d index:%d completed, rowsOfSource:%" PRIu64
               ", totalRows:%" PRIu64 ", completed:%d try next %d/%" PRIzu,
               GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, idx,
               pDataInfo->totalRows, pLoadInfo->totalRows, numOfDone + 1, idx + 1, totalSources);
        pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
        ++numOfDone;
        taosMemoryFreeClear(pDataInfo->pRsp);
        continue;
      }

      code = extractDataBlockFromFetchRsp(pExchangeInfo->pResult, pLoadInfo, pRsp->numOfRows, pRsp->data,
                                          pRsp->compLen, pRsp->numOfCols, startTs, &pDataInfo->totalRows, NULL);
      if (code != 0) {
        taosMemoryFreeClear(pDataInfo->pRsp);
        goto _error;
      }

      if (pRsp->completed != 1) {
        qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " execId:%d numOfRows:%d, totalRows:%" PRIu64
               ", totalBytes:%" PRIu64,
               GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pRes->info.rows,
               pLoadInfo->totalRows, pLoadInfo->totalSize);
      } else {
        qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64
               " execId:%d"
               " index:%d completed, numOfRows:%d, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 ", totalBytes:%" PRIu64
               ", completed:%d try next %d/%" PRIzu,
               GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, idx, pRes->info.rows,
               pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize, numOfDone + 1, idx + 1,
               totalSources);
        ++numOfDone;
        pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
      }

      taosMemoryFreeClear(pDataInfo->pRsp);

      // The source still has data: re-arm it with a new fetch request.
      if (pDataInfo->status != EX_SOURCE_DATA_EXHAUSTED) {
        pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
        code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, idx);
        if (code != TSDB_CODE_SUCCESS) {
          taosMemoryFreeClear(pDataInfo->pRsp);
          goto _error;
        }
      }

      return pExchangeInfo->pResult;
    }

    if (numOfDone == totalSources) {
      return setAllSourcesCompleted(pOperator, startTs);
    }
  }

_error:
  pTaskInfo->code = code;
  return NULL;
}

L
Liu Jicong 已提交
2232 2233 2234
/*
 * Concurrent-mode open step of the exchange operator: send a fetch request to every downstream
 * source without waiting for replies, mark the operator OP_RES_TO_RETURN, then block on
 * pExchangeInfo->ready once.
 *
 * Returns TSDB_CODE_SUCCESS, or the first send error (also stored in pTaskInfo->code).
 */
static int32_t prepareConcurrentlyLoad(SOperatorInfo* pOperator) {
  SExchangeInfo* pExchangeInfo = pOperator->info;
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;

  size_t  totalSources = taosArrayGetSize(pExchangeInfo->pSources);
  int64_t startTs = taosGetTimestampUs();

  // Asynchronously send all fetch requests to all sources.
  for (int32_t i = 0; i < totalSources; ++i) {
    int32_t code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, i);
    if (code != TSDB_CODE_SUCCESS) {
      pTaskInfo->code = code;
      return code;
    }
  }

  int64_t endTs = taosGetTimestampUs();
  qDebug("%s send all fetch requests to %" PRIzu " sources completed, elapsed:%.2fms", GET_TASKID(pTaskInfo),
         totalSources, (endTs - startTs) / 1000.0);

  pOperator->status = OP_RES_TO_RETURN;
  // Milliseconds, matching the unit prepareLoadRemoteData() uses for openCost
  // (the value used to be stored here in raw microseconds).
  pOperator->cost.openCost = (taosGetTimestampUs() - startTs) / 1000.0;

  tsem_wait(&pExchangeInfo->ready);
  return TSDB_CODE_SUCCESS;
}

L
Liu Jicong 已提交
2259 2260 2261
/*
 * Sequential load strategy for the exchange operator: sources are drained one at a time, in index
 * order, starting from pExchangeInfo->current. For the current source a fetch request is sent and
 * the call blocks on pExchangeInfo->ready until the response has been delivered.
 *
 * Returns the next decoded block (pExchangeInfo->pResult), or NULL when every source is exhausted
 * or on error (pTaskInfo->code holds the error).
 */
static SSDataBlock* seqLoadRemoteData(SOperatorInfo* pOperator) {
  SExchangeInfo* pExchangeInfo = pOperator->info;
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;

  size_t  totalSources = taosArrayGetSize(pExchangeInfo->pSources);
  int64_t startTs = taosGetTimestampUs();

  while (1) {
    if (pExchangeInfo->current >= totalSources) {
      return setAllSourcesCompleted(pOperator, startTs);
    }

    // If the request was not sent, presumably nothing will ever post `ready`; report the
    // error instead of blocking on the semaphore.
    int32_t code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, pExchangeInfo->current);
    if (code != TSDB_CODE_SUCCESS) {
      pTaskInfo->code = code;
      return NULL;
    }
    tsem_wait(&pExchangeInfo->ready);

    SSourceDataInfo*       pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, pExchangeInfo->current);
    SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, pExchangeInfo->current);

    if (pDataInfo->code != TSDB_CODE_SUCCESS) {
      qError("%s vgId:%d, taskID:0x%" PRIx64 " execId:%d error happens, code:%s", GET_TASKID(pTaskInfo),
             pSource->addr.nodeId, pSource->taskId, pSource->execId, tstrerror(pDataInfo->code));
      pOperator->pTaskInfo->code = pDataInfo->code;
      return NULL;
    }

    SRetrieveTableRsp*   pRsp = pDataInfo->pRsp;
    SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;

    // An empty response means this source is drained; move on to the next one.
    if (pRsp->numOfRows == 0) {
      qDebug("%s vgId:%d, taskID:0x%" PRIx64 " execId:%d %d of total completed, rowsOfSource:%" PRIu64
             ", totalRows:%" PRIu64 " try next",
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pExchangeInfo->current + 1,
             pDataInfo->totalRows, pLoadInfo->totalRows);

      pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
      pExchangeInfo->current += 1;
      taosMemoryFreeClear(pDataInfo->pRsp);
      continue;
    }

    SSDataBlock* pRes = pExchangeInfo->pResult;
    code = extractDataBlockFromFetchRsp(pExchangeInfo->pResult, pLoadInfo, pRsp->numOfRows, pRsp->data, pRsp->compLen,
                                        pRsp->numOfCols, startTs, &pDataInfo->totalRows, NULL);
    if (code != TSDB_CODE_SUCCESS) {
      // Decoding failed: do not hand a partially filled block upstream.
      taosMemoryFreeClear(pDataInfo->pRsp);
      pTaskInfo->code = code;
      return NULL;
    }

    if (pRsp->completed == 1) {
      qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " execId:%d numOfRows:%d, rowsOfSource:%" PRIu64
             ", totalRows:%" PRIu64 ", totalBytes:%" PRIu64 " try next %d/%" PRIzu,
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pRes->info.rows,
             pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize, pExchangeInfo->current + 1,
             totalSources);

      pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
      pExchangeInfo->current += 1;
    } else {
      qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " execId:%d numOfRows:%d, totalRows:%" PRIu64
             ", totalBytes:%" PRIu64,
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pRes->info.rows,
             pLoadInfo->totalRows, pLoadInfo->totalSize);
    }

    pOperator->resultInfo.totalRows += pRes->info.rows;
    taosMemoryFreeClear(pDataInfo->pRsp);
    return pExchangeInfo->pResult;
  }
}

L
Liu Jicong 已提交
2326
/*
 * Open function of the exchange operator; it does its work only on the first call.
 * In concurrent mode it dispatches all fetch requests up front (prepareConcurrentlyLoad).
 * In sequential mode nothing is sent here; seqLoadRemoteData() issues requests as it goes.
 * The time spent is stored in pOperator->cost.openCost, in milliseconds.
 */
static int32_t prepareLoadRemoteData(SOperatorInfo* pOperator) {
  if (OPTR_IS_OPENED(pOperator)) {
    return TSDB_CODE_SUCCESS;
  }

  int64_t        openStartTs = taosGetTimestampUs();
  SExchangeInfo* pExchangeInfo = pOperator->info;

  int32_t code = pExchangeInfo->seqLoadData ? TSDB_CODE_SUCCESS : prepareConcurrentlyLoad(pOperator);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  OPTR_SET_OPENED(pOperator);
  pOperator->cost.openCost = (taosGetTimestampUs() - openStartTs) / 1000.0;
  return TSDB_CODE_SUCCESS;
}

2346
/*
 * next() function of the exchange operator.
 * Runs the open function first (a no-op after the first call). It then returns NULL if the operator
 * is already done; otherwise it pulls the next block with the sequential or the concurrent strategy.
 * On an open error, pTaskInfo->code is set and NULL is returned.
 */
static SSDataBlock* doLoadRemoteData(SOperatorInfo* pOperator) {
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
  SExchangeInfo* pExchangeInfo = pOperator->info;

  pTaskInfo->code = pOperator->fpSet._openFn(pOperator);
  if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
    return NULL;
  }

  if (pOperator->status == OP_EXEC_DONE) {
    SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
    qDebug("%s all %" PRIzu " source(s) are exhausted, total rows:%" PRIu64 " bytes:%" PRIu64 ", elapsed:%.2f ms",
           GET_TASKID(pTaskInfo), taosArrayGetSize(pExchangeInfo->pSources), pLoadInfo->totalRows,
           pLoadInfo->totalSize, pLoadInfo->totalElapsed / 1000.0);
    return NULL;
  }

  return pExchangeInfo->seqLoadData ? seqLoadRemoteData(pOperator)
                                    : concurrentlyLoadRemoteDataImpl(pOperator, pExchangeInfo, pTaskInfo);
}
2371

2372
/*
 * Build pInfo->pSourceDataInfo: one SSourceDataInfo per downstream source. Each entry starts in
 * state EX_SOURCE_DATA_NOT_READY and records the owning task id and its own index.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY. On failure pInfo->pSourceDataInfo is NULL.
 */
static int32_t initDataSource(int32_t numOfSources, SExchangeInfo* pInfo, const char* id) {
  pInfo->pSourceDataInfo = taosArrayInit(numOfSources, sizeof(SSourceDataInfo));
  if (pInfo->pSourceDataInfo == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  for (int32_t i = 0; i < numOfSources; ++i) {
    SSourceDataInfo dataInfo = {0};
    dataInfo.status = EX_SOURCE_DATA_NOT_READY;
    dataInfo.taskId = id;
    dataInfo.index = i;

    if (taosArrayPush(pInfo->pSourceDataInfo, &dataInfo) == NULL) {
      // Clear the field after destroying the array. A later cleanup of pInfo
      // (e.g. doDestroyExchangeOperatorInfo on the create error path) must not see a dangling pointer.
      taosArrayDestroy(pInfo->pSourceDataInfo);
      pInfo->pSourceDataInfo = NULL;
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  return TSDB_CODE_SUCCESS;
}

2393
/*
 * Initialise the exchange state from the physical plan node:
 *  - copy the downstream endpoints of pExNode->pSrcEndPoints into pInfo->pSources;
 *  - register pInfo in exchangeObjRefPool (handle stored in pInfo->self);
 *  - create the per-source bookkeeping array via initDataSource().
 * Returns TSDB_CODE_INVALID_PARA when the node lists no sources, TSDB_CODE_OUT_OF_MEMORY on
 * allocation failure, otherwise the result of initDataSource().
 */
static int32_t initExchangeOperator(SExchangePhysiNode* pExNode, SExchangeInfo* pInfo, const char* id) {
  size_t numOfSources = LIST_LENGTH(pExNode->pSrcEndPoints);
  if (numOfSources == 0) {
    qError("%s invalid number: %d of sources in exchange operator", id, (int32_t)numOfSources);
    return TSDB_CODE_INVALID_PARA;
  }

  SArray* pSources = taosArrayInit(numOfSources, sizeof(SDownstreamSourceNode));
  pInfo->pSources = pSources;
  if (pSources == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  SNodeList* pEndPoints = (SNodeList*)pExNode->pSrcEndPoints;
  for (int32_t idx = 0; idx < numOfSources; ++idx) {
    SDownstreamSourceNode* pEndPoint = (SDownstreamSourceNode*)nodesListGetNode(pEndPoints, idx);
    taosArrayPush(pSources, pEndPoint);
  }

  pInfo->self = taosAddRef(exchangeObjRefPool, pInfo);
  return initDataSource(numOfSources, pInfo, id);
}

/*
 * Create an exchange operator that pulls result blocks from the downstream sources listed in
 * pExNode->pSrcEndPoints, using pTransporter for the fetch requests. Sources are loaded
 * concurrently (seqLoadData = false).
 *
 * Returns the new operator, or NULL on failure with pTaskInfo->code set.
 */
SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode* pExNode, SExecTaskInfo* pTaskInfo) {
  // Initialised up front: the allocation-failure goto below jumps past the assignment from
  // initExchangeOperator(), and the error path reports `code` to the task. Previously it was
  // read uninitialised there.
  int32_t        code = TSDB_CODE_OUT_OF_MEMORY;
  SExchangeInfo* pInfo = taosMemoryCalloc(1, sizeof(SExchangeInfo));
  SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    goto _error;
  }

  code = initExchangeOperator(pExNode, pInfo, GET_TASKID(pTaskInfo));
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  tsem_init(&pInfo->ready, 0, 0);

  pInfo->seqLoadData = false;
  pInfo->pTransporter = pTransporter;
  pInfo->pResult = createResDataBlock(pExNode->node.pOutputDataBlockDesc);

  pOperator->name = "ExchangeOperator";
  pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE;
  pOperator->blocking = false;
  pOperator->status = OP_NOT_OPENED;
  pOperator->info = pInfo;
  pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pResult->pDataBlock);
  pOperator->pTaskInfo = pTaskInfo;

  pOperator->fpSet = createOperatorFpSet(prepareLoadRemoteData, doLoadRemoteData, NULL, NULL,
                                         destroyExchangeOperatorInfo, NULL, NULL, NULL);
  return pOperator;

_error:
  if (pInfo != NULL) {
    // NOTE(review): if doDestroyExchangeOperatorInfo() also frees pInfo itself, the
    // taosMemoryFreeClear(pInfo) below is a double free — confirm its ownership contract.
    doDestroyExchangeOperatorInfo(pInfo);
  }

  taosMemoryFreeClear(pInfo);
  taosMemoryFreeClear(pOperator);
  pTaskInfo->code = code;
  return NULL;
}

dengyihao's avatar
dengyihao 已提交
2456 2457
// Forward declaration of a file-local helper that is defined further down in this file;
// createSortedMergeOperatorInfo() calls it before that definition.
static int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t numOfOutput,
                                size_t keyBufSize, const char* pKey);
2458

2459
static void destroySortedMergeOperatorInfo(void* param, int32_t numOfOutput) {
L
Liu Jicong 已提交
2460
  SSortedMergeOperatorInfo* pInfo = (SSortedMergeOperatorInfo*)param;
H
Haojun Liao 已提交
2461
  taosArrayDestroy(pInfo->pSortInfo);
2462 2463 2464
  taosArrayDestroy(pInfo->groupInfo);

  if (pInfo->pSortHandle != NULL) {
H
Haojun Liao 已提交
2465
    tsortDestroySortHandle(pInfo->pSortHandle);
2466 2467
  }

H
Haojun Liao 已提交
2468
  blockDataDestroy(pInfo->binfo.pRes);
H
Haojun Liao 已提交
2469
  cleanupAggSup(&pInfo->aggSup);
L
Liu Jicong 已提交
2470

D
dapan1121 已提交
2471
  taosMemoryFreeClear(param);
2472
}
H
Haojun Liao 已提交
2473

L
Liu Jicong 已提交
2474
/*
 * Decide whether row `rowIndex` of pBlock belongs to the group currently being accumulated.
 *
 * groupInfo holds int32_t column indices into pBlock->pDataBlock. buf[i] holds the saved key
 * value for the i-th of those columns; a NULL buf[i] stands for a NULL key.
 * With no group columns every row belongs to the same group.
 *
 * Returns true if the row matches the saved key on every group column (merge into the current
 * group), and false as soon as one column differs (the row starts a new group).
 */
static bool needToMerge(SSDataBlock* pBlock, SArray* groupInfo, char** buf, int32_t rowIndex) {
  size_t size = taosArrayGetSize(groupInfo);
  if (size == 0) {
    return true;
  }

  for (int32_t i = 0; i < size; ++i) {
    int32_t*         index = taosArrayGet(groupInfo, i);
    SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, *index);
    bool             isNull = colDataIsNull(pColInfo, rowIndex, pBlock->info.rows, NULL);

    // NULL-ness must agree before values can be compared.
    if ((isNull && buf[i] != NULL) || (!isNull && buf[i] == NULL)) {
      return false;
    }

    // Both NULL: the column matches; do not read the (null) cell.
    if (isNull) {
      continue;
    }

    char* pCell = colDataGetData(pColInfo, rowIndex);
    if (IS_VAR_DATA_TYPE(pColInfo->info.type)) {
      if (varDataLen(pCell) != varDataLen(buf[i]) ||
          memcmp(varDataVal(pCell), varDataVal(buf[i]), varDataLen(pCell)) != 0) {
        return false;
      }
    } else if (memcmp(pCell, buf[i], pColInfo->info.bytes) != 0) {
      return false;
    }
  }

  // Every group column matched. Previously this returned 0 (false), which made every row
  // start a new group.
  return true;
}

L
Liu Jicong 已提交
2509 2510 2511
/*
 * Merge row `rowIndex` into the running aggregate of the current group.
 *
 * TODO: not implemented — this is currently a no-op. Presumably each pCtx[j] (j < numOfExpr)
 * should be positioned at rowIndex and have its merge function invoked (UDFs via pInfo).
 * The previous body was an empty loop plus an unused local; it was removed as dead code.
 */
static void doMergeResultImpl(SSortedMergeOperatorInfo* pInfo, SqlFunctionCtx* pCtx, int32_t numOfExpr,
                              int32_t rowIndex) {
  (void)pInfo;
  (void)pCtx;
  (void)numOfExpr;
  (void)rowIndex;
}
2528

L
Liu Jicong 已提交
2529 2530
/*
 * Finalise the aggregates of the current group for the first numOfExpr contexts in pCtx.
 *
 * TODO: not implemented — this is currently a no-op. Presumably each pCtx[j] should have its
 * finalize function invoked (skipping tag/ts dummy functions, UDFs handled separately).
 * The previous body only computed an unused local; it was removed as dead code.
 */
static void doFinalizeResultImpl(SqlFunctionCtx* pCtx, int32_t numOfExpr) {
  (void)pCtx;
  (void)numOfExpr;
}
2543

2544
/*
 * Copy the group-by values of row `rowIndex` into the per-column buffers rowColData[i].
 * pColumnList holds int32_t column indices into pBlock->pDataBlock, one per buffer.
 * Each copy is colDataGetLength() bytes long.
 * Always returns true, so callers can record that a group key is now held.
 */
static bool saveCurrentTuple(char** rowColData, SArray* pColumnList, SSDataBlock* pBlock, int32_t rowIndex) {
  int32_t numOfCols = (int32_t)taosArrayGetSize(pColumnList);

  for (int32_t col = 0; col < numOfCols; ++col) {
    int32_t          slot = *(int32_t*)taosArrayGet(pColumnList, col);
    SColumnInfoData* pSrcCol = taosArrayGet(pBlock->pDataBlock, slot);
    char*            pValue = colDataGetData(pSrcCol, rowIndex);
    int32_t          valueLen = colDataGetLength(pSrcCol, rowIndex);

    memcpy(rowColData[col], pValue, valueLen);
  }

  return true;
}
2557

2558 2559
/*
 * Feed every row of pBlock through the group-aware merge.
 *  - First row seen: open the first group and save its key.
 *  - Row with the same key: merge it into the current group.
 *  - Row with a different key: finalise the current group, add its result count to
 *    pInfo->binfo.pRes->info.rows, run each non-negative-id function's process() callback,
 *    then open a new group with this row.
 */
static void doMergeImpl(SOperatorInfo* pOperator, int32_t numOfExpr, SSDataBlock* pBlock) {
  SSortedMergeOperatorInfo* pInfo = pOperator->info;
  SqlFunctionCtx*           pCtx = pOperator->exprSupp.pCtx;

  for (int32_t row = 0; row < pBlock->info.rows; ++row) {
    if (!pInfo->hasGroupVal) {
      // No group key held yet: this must be the first row.
      ASSERT(row == 0);
      doMergeResultImpl(pInfo, pCtx, numOfExpr, row);
      pInfo->hasGroupVal = saveCurrentTuple(pInfo->groupVal, pInfo->groupInfo, pBlock, row);
      continue;
    }

    if (needToMerge(pBlock, pInfo->groupInfo, pInfo->groupVal, row)) {
      doMergeResultImpl(pInfo, pCtx, numOfExpr, row);
      continue;
    }

    // Group boundary: close the current group.
    doFinalizeResultImpl(pCtx, numOfExpr);
    int32_t numOfRows = getNumOfResult(pOperator->exprSupp.pCtx, pOperator->exprSupp.numOfExprs, NULL);

    // TODO check for available buffer;
    pInfo->binfo.pRes->info.rows += numOfRows;
    for (int32_t j = 0; j < numOfExpr; ++j) {
      if (pCtx[j].functionId >= 0) {
        pCtx[j].fpSet.process(&pCtx[j]);
      }
    }

    // Open the next group with the current row.
    doMergeResultImpl(pInfo, pCtx, numOfExpr, row);
    pInfo->hasGroupVal = saveCurrentTuple(pInfo->groupVal, pInfo->groupInfo, pBlock, row);
  }
}

2595 2596
/*
 * Drain the sort handle in chunks of at most pOperator->resultInfo.capacity rows. Each chunk is
 * fed through the aggregate contexts (doMergeImpl); after the last chunk the open group is finalised.
 * Returns pInfo->binfo.pRes if it holds any rows, otherwise NULL.
 */
static SSDataBlock* doMerge(SOperatorInfo* pOperator) {
  SSortedMergeOperatorInfo* pInfo = pOperator->info;
  SSortHandle*              pHandle = pInfo->pSortHandle;

  // Scratch block with the result schema, reused for every chunk read from the sort handle.
  SSDataBlock* pDataBlock = createOneDataBlock(pInfo->binfo.pRes, false);
  blockDataEnsureCapacity(pDataBlock, pOperator->resultInfo.capacity);

  while (1) {
    blockDataCleanup(pDataBlock);
    while (1) {
      STupleHandle* pTupleHandle = tsortNextTuple(pHandle);
      if (pTupleHandle == NULL) {
        break;
      }

      // build datablock for merge for one group
      appendOneRowToDataBlock(pDataBlock, pTupleHandle);
      if (pDataBlock->info.rows >= pOperator->resultInfo.capacity) {
        break;
      }
    }

    if (pDataBlock->info.rows == 0) {
      break;
    }

    setInputDataBlock(pOperator, pOperator->exprSupp.pCtx, pDataBlock, TSDB_ORDER_ASC, MAIN_SCAN, true);
    doMergeImpl(pOperator, pOperator->exprSupp.numOfExprs, pDataBlock);
    // flush to tuple store, and after all data have been handled, return to upstream node or sink node
  }

  doFinalizeResultImpl(pOperator->exprSupp.pCtx, pOperator->exprSupp.numOfExprs);
  int32_t numOfRows = getNumOfResult(pOperator->exprSupp.pCtx, pOperator->exprSupp.numOfExprs, NULL);

  // TODO check for available buffer;
  pInfo->binfo.pRes->info.rows += numOfRows;

  // The scratch block is owned by this function and was previously leaked on every call.
  // NOTE(review): the contexts' input was pointed at this block by setInputDataBlock(); this
  // assumes nothing reads that input after doMerge() returns — confirm.
  blockDataDestroy(pDataBlock);

  return (pInfo->binfo.pRes->info.rows > 0) ? pInfo->binfo.pRes : NULL;
}
2638

L
Liu Jicong 已提交
2639 2640
/*
 * Produce the next output block of at most `capacity` rows from the sort handle.
 * Rows are collected into a temporary block obtained from tsortGetSortedDataBlock(). The columns
 * listed in pColMatchInfo (srcSlotId -> targetSlotId) are then copied into pDataBlock.
 * The temporary block is destroyed before returning.
 * Returns pDataBlock when it holds rows, otherwise NULL. `pInfo` is not used.
 */
SSDataBlock* getSortedMergeBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity,
                                     SArray* pColMatchInfo, SSortedMergeOperatorInfo* pInfo) {
  blockDataCleanup(pDataBlock);

  SSDataBlock* pTmp = tsortGetSortedDataBlock(pHandle);
  if (pTmp == NULL) {
    return NULL;
  }
  blockDataEnsureCapacity(pTmp, capacity);

  STupleHandle* pTuple = NULL;
  while ((pTuple = tsortNextTuple(pHandle)) != NULL) {
    appendOneRowToDataBlock(pTmp, pTuple);
    if (pTmp->info.rows >= capacity) {
      break;
    }
  }

  if (pTmp->info.rows > 0) {
    int32_t numOfCols = taosArrayGetSize(pColMatchInfo);
    for (int32_t k = 0; k < numOfCols; ++k) {
      SColMatchInfo* pMatch = taosArrayGet(pColMatchInfo, k);
      ASSERT(pMatch->matchType == COL_MATCH_FROM_SLOT_ID);

      SColumnInfoData* pFrom = taosArrayGet(pTmp->pDataBlock, pMatch->srcSlotId);
      SColumnInfoData* pTo = taosArrayGet(pDataBlock->pDataBlock, pMatch->targetSlotId);
      colDataAssign(pTo, pFrom, pTmp->info.rows, &pDataBlock->info);
    }

    pDataBlock->info.rows = pTmp->info.rows;
    pDataBlock->info.capacity = pTmp->info.rows;
  }

  blockDataDestroy(pTmp);
  return (pDataBlock->info.rows > 0) ? pDataBlock : NULL;
}

2681
/*
 * next() function of the sorted-merge operator.
 * First call: build a multi-source sort handle over all downstream operators, open it and run
 * the group merge (doMerge). Later calls return further sorted blocks via getSortedMergeBlockData().
 * Errors are raised with longjmp(pTaskInfo->env, code).
 */
static SSDataBlock* doSortedMerge(SOperatorInfo* pOperator) {
  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  SExecTaskInfo*            pTaskInfo = pOperator->pTaskInfo;
  SSortedMergeOperatorInfo* pInfo = pOperator->info;
  if (pOperator->status == OP_RES_TO_RETURN) {
    return getSortedMergeBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pOperator->resultInfo.capacity, NULL, pInfo);
  }

  int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;
  // Pass the task's actual id string. Previously the macro name was passed as a string literal.
  pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_MULTISOURCE_MERGE, pInfo->bufPageSize, numOfBufPage,
                                             pInfo->binfo.pRes, GET_TASKID(pTaskInfo));

  tsortSetFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock, NULL, NULL);

  for (int32_t i = 0; i < pOperator->numOfDownstream; ++i) {
    SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource));
    if (ps == NULL) {
      longjmp(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
    }

    ps->param = pOperator->pDownstream[i];
    tsortAddSource(pInfo->pSortHandle, ps);
  }

  int32_t code = tsortOpen(pInfo->pSortHandle);
  if (code != TSDB_CODE_SUCCESS) {
    // Report the code tsortOpen returned, rather than whatever terrno happens to hold.
    longjmp(pTaskInfo->env, code);
  }

  pOperator->status = OP_RES_TO_RETURN;
  return doMerge(pOperator);
}
2712

L
Liu Jicong 已提交
2713 2714
/*
 * Resolve the group-by columns and allocate buffers for the current group's key values.
 * pGroupInfo is an array of SColumn. A column is resolved to the index of the first expression
 * in pExprInfo (numOfCols entries) whose result slot id equals the column's colId.
 *
 * On success:
 *  - pInfo->groupInfo holds the resolved expression index (int32_t) of each group column;
 *  - pInfo->groupVal is one allocation: a table of numOfGroupCol char* pointers followed by
 *    the value buffers, with groupVal[i] pointing at a region of pCol->bytes bytes.
 *
 * Returns 0 when there are no group columns, TSDB_CODE_OUT_OF_MEMORY on allocation failure,
 * otherwise TSDB_CODE_SUCCESS. On failure, pInfo->groupInfo/groupVal are left for the
 * operator's destroy function.
 */
static int32_t initGroupCol(SExprInfo* pExprInfo, int32_t numOfCols, SArray* pGroupInfo,
                            SSortedMergeOperatorInfo* pInfo) {
  if (pGroupInfo == NULL || taosArrayGetSize(pGroupInfo) == 0) {
    return 0;
  }

  int32_t len = 0;
  SArray* plist = taosArrayInit(3, sizeof(SColumn));
  pInfo->groupInfo = taosArrayInit(3, sizeof(int32_t));

  if (plist == NULL || pInfo->groupInfo == NULL) {
    // Release plist to avoid leaking it. pInfo->groupInfo is released by destroySortedMergeOperatorInfo().
    if (plist != NULL) {
      taosArrayDestroy(plist);
    }
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  // Iterate over the requested group columns. The previous code iterated over the freshly
  // created (empty) pInfo->groupInfo, so no column was ever resolved.
  size_t numOfRequested = taosArrayGetSize(pGroupInfo);
  for (int32_t i = 0; i < numOfRequested; ++i) {
    SColumn* pCol = taosArrayGet(pGroupInfo, i);
    for (int32_t j = 0; j < numOfCols; ++j) {
      SExprInfo* pe = &pExprInfo[j];
      if (pe->base.resSchema.slotId == pCol->colId) {
        taosArrayPush(plist, pCol);
        taosArrayPush(pInfo->groupInfo, &j);
        len += pCol->bytes;
        break;
      }
    }
  }

  ASSERT(taosArrayGetSize(pGroupInfo) == taosArrayGetSize(plist));

  size_t numOfGroupCol = taosArrayGetSize(plist);
  pInfo->groupVal = taosMemoryCalloc(1, (POINTER_BYTES * numOfGroupCol + len));
  if (pInfo->groupVal == NULL) {
    taosArrayDestroy(plist);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  // The value buffers start right after the pointer table. The offset must be computed in
  // bytes: adding POINTER_BYTES * n to a char** advances by that many *pointers* and would
  // point far past the end of the allocation.
  char*   start = (char*)pInfo->groupVal + POINTER_BYTES * numOfGroupCol;
  int32_t offset = 0;
  for (int32_t i = 0; i < numOfGroupCol; ++i) {
    pInfo->groupVal[i] = start + offset;
    SColumn* pCol = taosArrayGet(plist, i);
    offset += pCol->bytes;
  }

  taosArrayDestroy(plist);
  return TSDB_CODE_SUCCESS;
}
H
Haojun Liao 已提交
2761

L
Liu Jicong 已提交
2762 2763 2764
/*
 * Create the sorted-merge operator over `numOfDownstream` downstream operators.
 * Inputs: `num` expressions in pExprInfo, sort order pSortInfo, group-by columns pGroupInfo.
 *
 * Returns the operator, or NULL on failure. On failure terrno is set to the failing code and
 * everything allocated here is released.
 *
 * NOTE(review): pInfo->binfo.pRes is never assigned between the calloc and the
 * `pInfo->binfo.pRes == NULL` check below, so that check always fails as written — confirm
 * where the result block is meant to be created.
 */
SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t numOfDownstream, SExprInfo* pExprInfo,
                                             int32_t num, SArray* pSortInfo, SArray* pGroupInfo,
                                             SExecTaskInfo* pTaskInfo) {
  int32_t                   code = TSDB_CODE_QRY_OUT_OF_MEMORY;
  SSortedMergeOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SSortedMergeOperatorInfo));
  SOperatorInfo*            pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    goto _error;
  }

  code = initExprSupp(&pOperator->exprSupp, pExprInfo, num);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  initResultRowInfo(&pInfo->binfo.resultRowInfo);

  if (pOperator->exprSupp.pCtx == NULL || pInfo->binfo.pRes == NULL) {
    code = TSDB_CODE_QRY_OUT_OF_MEMORY;
    goto _error;
  }

  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
  code = doInitAggInfoSup(&pInfo->aggSup, pOperator->exprSupp.pCtx, num, keyBufSize, pTaskInfo->id.str);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  setFunctionResultOutput(pOperator, &pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, num);
  code = initGroupCol(pExprInfo, num, pGroupInfo, pInfo);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  pInfo->sortBufSize = 1024 * 16;  // 16KB
  pInfo->bufPageSize = 1024;
  pInfo->pSortInfo = pSortInfo;

  pOperator->resultInfo.capacity = blockDataGetCapacityInRow(pInfo->binfo.pRes, pInfo->bufPageSize);

  pOperator->name = "SortedMerge";
  // pOperator->operatorType = OP_SortedMerge;
  pOperator->blocking = true;
  pOperator->status = OP_NOT_OPENED;
  pOperator->info = pInfo;
  pOperator->pTaskInfo = pTaskInfo;

  pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doSortedMerge, NULL, NULL, destroySortedMergeOperatorInfo,
                                         NULL, NULL, NULL);
  code = appendDownstream(pOperator, downstream, numOfDownstream);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  return pOperator;

_error:
  if (pInfo != NULL) {
    // destroySortedMergeOperatorInfo() frees pInfo itself, so drop our pointer.
    // Without this, pInfo was freed a second time below.
    destroySortedMergeOperatorInfo(pInfo, num);
    pInfo = NULL;
  }

  taosMemoryFreeClear(pOperator);
  terrno = code;
  return NULL;
}

X
Xiaoyu Wang 已提交
2829
/*
 * Report the scan order and scan flag that feed pOperator.
 *  - Exchange, system-table, stream, tag, block-distribution and last-row scans always report
 *    TSDB_ORDER_ASC / MAIN_SCAN.
 *  - A table scan reports its own cond.order and scanFlag.
 *  - Any other operator defers to its first downstream operator.
 * Returns TSDB_CODE_INVALID_PARA if the chain ends before reaching one of those operators.
 */
int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scanFlag) {
  // todo add more information about exchange operation
  for (SOperatorInfo* pCur = pOperator;; pCur = pCur->pDownstream[0]) {
    switch (pCur->operatorType) {
      case QUERY_NODE_PHYSICAL_PLAN_EXCHANGE:
      case QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN:
      case QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN:
      case QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN:
      case QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN:
      case QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN:
        *order = TSDB_ORDER_ASC;
        *scanFlag = MAIN_SCAN;
        return TSDB_CODE_SUCCESS;

      case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN: {
        STableScanInfo* pTableScanInfo = pCur->info;
        *order = pTableScanInfo->cond.order;
        *scanFlag = pTableScanInfo->scanFlag;
        return TSDB_CODE_SUCCESS;
      }

      default:
        break;
    }

    if (pCur->pDownstream == NULL || pCur->pDownstream[0] == NULL) {
      return TSDB_CODE_INVALID_PARA;
    }
  }
}
L
Liu Jicong 已提交
2851
#if 0
// NOTE(review): disabled code kept for reference; if re-enabled, verify that the referenced fields
// (e.g. tableqinfoList, lastStatus, noTable) still exist.
int32_t doPrepareScan(SOperatorInfo* pOperator, uint64_t uid, int64_t ts) {
  uint8_t type = pOperator->operatorType;

  pOperator->status = OP_OPENED;

  if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
    SStreamScanInfo* pScanInfo = pOperator->info;
    pScanInfo->blockType = STREAM_INPUT__TABLE_SCAN;

    pScanInfo->pTableScanOp->status = OP_OPENED;

    STableScanInfo* pInfo = pScanInfo->pTableScanOp->info;
    ASSERT(pInfo->scanMode == TABLE_SCAN__TABLE_ORDER);

    if (uid == 0) {
      pInfo->noTable = 1;
      return TSDB_CODE_SUCCESS;
    }

    /*if (pSnapShotScanInfo->dataReader == NULL) {*/
    /*pSnapShotScanInfo->dataReader = tsdbReaderOpen(pHandle->vnode, &pSTInfo->cond, tableList, 0, 0);*/
    /*pSnapShotScanInfo->scanMode = TABLE_SCAN__TABLE_ORDER;*/
    /*}*/

    pInfo->noTable = 0;

    if (pInfo->lastStatus.uid != uid || pInfo->lastStatus.ts != ts) {
      SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;

      int32_t tableSz = taosArrayGetSize(pTaskInfo->tableqinfoList.pTableList);
      bool    found = false;
      for (int32_t i = 0; i < tableSz; i++) {
        STableKeyInfo* pTableInfo = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, i);
        if (pTableInfo->uid == uid) {
          found = true;
          pInfo->currentTable = i;
        }
      }
      // TODO after processing drop, found can be false
      ASSERT(found);

      tsdbSetTableId(pInfo->dataReader, uid);
      int64_t oldSkey = pInfo->cond.twindows.skey;
      pInfo->cond.twindows.skey = ts + 1;
      tsdbReaderReset(pInfo->dataReader, &pInfo->cond);
      pInfo->cond.twindows.skey = oldSkey;
      pInfo->scanTimes = 0;

      qDebug("tsdb reader offset seek to uid %" PRId64 " ts %" PRId64 ", table cur set to %d , all table num %d", uid, ts,
             pInfo->currentTable, tableSz);
    }

    return TSDB_CODE_SUCCESS;

  } else {
    if (pOperator->numOfDownstream == 1) {
      return doPrepareScan(pOperator->pDownstream[0], uid, ts);
    } else if (pOperator->numOfDownstream == 0) {
      qError("failed to find stream scan operator to set the input data block");
      return TSDB_CODE_QRY_APP_ERROR;
    } else {
      qError("join not supported for stream block scan");
      return TSDB_CODE_QRY_APP_ERROR;
    }
  }
}

// Report the last (uid, ts) position of the stream scan found along the first-child chain.
int32_t doGetScanStatus(SOperatorInfo* pOperator, uint64_t* uid, int64_t* ts) {
  int32_t type = pOperator->operatorType;
  if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
    SStreamScanInfo* pScanInfo = pOperator->info;
    STableScanInfo*  pSnapShotScanInfo = pScanInfo->pTableScanOp->info;
    *uid = pSnapShotScanInfo->lastStatus.uid;
    *ts = pSnapShotScanInfo->lastStatus.ts;
    return TSDB_CODE_SUCCESS;
  }

  if (pOperator->pDownstream == NULL || pOperator->pDownstream[0] == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  // propagate the downstream result instead of discarding it
  return doGetScanStatus(pOperator->pDownstream[0], uid, ts);
}
#endif
2937

2938
// this is a blocking operator
L
Liu Jicong 已提交
2939
static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
H
Haojun Liao 已提交
2940 2941
  if (OPTR_IS_OPENED(pOperator)) {
    return TSDB_CODE_SUCCESS;
2942 2943
  }

H
Haojun Liao 已提交
2944
  SExecTaskInfo*    pTaskInfo = pOperator->pTaskInfo;
2945
  SAggOperatorInfo* pAggInfo = pOperator->info;
H
Haojun Liao 已提交
2946

2947 2948
  SExprSupp*     pSup = &pOperator->exprSupp;
  SOperatorInfo* downstream = pOperator->pDownstream[0];
2949

2950 2951
  int64_t st = taosGetTimestampUs();

2952 2953 2954
  int32_t order = TSDB_ORDER_ASC;
  int32_t scanFlag = MAIN_SCAN;

H
Haojun Liao 已提交
2955
  while (1) {
2956
    SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
2957 2958 2959 2960
    if (pBlock == NULL) {
      break;
    }

2961 2962 2963 2964
    int32_t code = getTableScanInfo(pOperator, &order, &scanFlag);
    if (code != TSDB_CODE_SUCCESS) {
      longjmp(pTaskInfo->env, code);
    }
2965

2966
    // there is an scalar expression that needs to be calculated before apply the group aggregation.
2967 2968 2969
    if (pAggInfo->scalarExprSup.pExprInfo != NULL) {
      SExprSupp* pSup1 = &pAggInfo->scalarExprSup;
      code = projectApplyFunctions(pSup1->pExprInfo, pBlock, pBlock, pSup1->pCtx, pSup1->numOfExprs, NULL);
2970
      if (code != TSDB_CODE_SUCCESS) {
2971
        longjmp(pTaskInfo->env, code);
2972
      }
2973 2974
    }

2975
    // the pDataBlock are always the same one, no need to call this again
2976 2977 2978
    setExecutionContext(pOperator, pOperator->exprSupp.numOfExprs, pBlock->info.groupId, pAggInfo);
    setInputDataBlock(pOperator, pSup->pCtx, pBlock, order, scanFlag, true);
    code = doAggregateImpl(pOperator, 0, pSup->pCtx);
2979 2980 2981
    if (code != 0) {
      longjmp(pTaskInfo->env, code);
    }
2982

dengyihao's avatar
dengyihao 已提交
2983
#if 0  // test for encode/decode result info
2984
    if(pOperator->fpSet.encodeResultRow){
2985 2986
      char *result = NULL;
      int32_t length = 0;
2987 2988
      pOperator->fpSet.encodeResultRow(pOperator, &result, &length);
      SAggSupporter* pSup = &pAggInfo->aggSup;
2989 2990
      taosHashClear(pSup->pResultRowHashTable);
      pInfo->resultRowInfo.size = 0;
2991
      pOperator->fpSet.decodeResultRow(pOperator, result);
2992 2993 2994
      if(result){
        taosMemoryFree(result);
      }
2995
    }
2996
#endif
2997 2998
  }

H
Haojun Liao 已提交
2999
  closeAllResultRows(&pAggInfo->binfo.resultRowInfo);
3000
  initGroupedResultInfo(&pAggInfo->groupResInfo, pAggInfo->aggSup.pResultRowHashTable, 0);
H
Haojun Liao 已提交
3001
  OPTR_SET_OPENED(pOperator);
3002

3003
  pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
H
Haojun Liao 已提交
3004 3005 3006
  return TSDB_CODE_SUCCESS;
}

3007
// Return the next block of aggregated (and filtered) results, or NULL when nothing is left.
// The operator is opened first, which consumes all downstream input; an open failure is stored in
// pTaskInfo->code and completes the operator.
static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator) {
  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  SAggOperatorInfo* pAgg = pOperator->info;
  SOptrBasicInfo*   pBasic = &pAgg->binfo;
  SExecTaskInfo*    pTask = pOperator->pTaskInfo;

  pTask->code = pOperator->fpSet._openFn(pOperator);
  if (pTask->code != TSDB_CODE_SUCCESS) {
    doSetOperatorCompleted(pOperator);
    return NULL;
  }

  blockDataEnsureCapacity(pBasic->pRes, pOperator->resultInfo.capacity);

  // build blocks until one keeps rows after filtering, or the grouped results are exhausted
  for (;;) {
    doBuildResultDatablock(pOperator, pBasic, &pAgg->groupResInfo, pAgg->aggSup.pResultBuf);
    doFilter(pAgg->pCondition, pBasic->pRes);

    if (!hasDataInGroupInfo(&pAgg->groupResInfo)) {
      doSetOperatorCompleted(pOperator);
      break;
    }

    if (pBasic->pRes->info.rows > 0) {
      break;
    }
  }

  size_t numOfRows = blockDataGetNumOfRows(pBasic->pRes);
  pOperator->resultInfo.totalRows += numOfRows;

  if (numOfRows == 0) {
    return NULL;
  }
  return pBasic->pRes;
}

wmmhello's avatar
wmmhello 已提交
3042
/**
 * Serialize every row in the aggregate operator's result-row hash table into a new buffer.
 *
 * Layout (int32 fields in host byte order, written via memcpy so no alignment is assumed):
 *   [int32 total length][int32 row count] then per row: [int32 keyLen][key][int32 valueLen][row bytes]
 *
 * @param pOperator aggregate operator. NOTE(review): its info is assumed to begin with an
 *                  SOptrBasicInfo immediately followed by an SAggSupporter (see POINTER_SHIFT).
 * @param result    [out] buffer allocated here; the caller frees it. NULL when there is no result.
 * @param length    [out] number of bytes used in *result.
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_TSC_INVALID_INPUT for NULL outputs, or TSDB_CODE_OUT_OF_MEMORY.
 */
int32_t aggEncodeResultRow(SOperatorInfo* pOperator, char** result, int32_t* length) {
  if (result == NULL || length == NULL) {
    return TSDB_CODE_TSC_INVALID_INPUT;
  }

  SAggSupporter* pSup = (SAggSupporter*)POINTER_SHIFT(pOperator->info, sizeof(SOptrBasicInfo));
  int32_t        size = taosHashGetSize(pSup->pResultRowHashTable);
  int32_t        rowSize = pSup->resultRowSize;
  size_t         keyLen = sizeof(uint64_t) * 2;  // estimate the key length
  int32_t        totalSize =
      sizeof(int32_t) + sizeof(int32_t) + size * (sizeof(int32_t) + keyLen + sizeof(int32_t) + pSup->resultRowSize);

  // no result
  if (getTotalBufSize(pSup->pResultBuf) == 0) {
    *result = NULL;
    *length = 0;
    return TSDB_CODE_SUCCESS;
  }

  *result = (char*)taosMemoryCalloc(1, totalSize);
  if (*result == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  // header: total length (written at the end) followed by the number of rows
  int32_t offset = sizeof(int32_t);
  memcpy(*result + offset, &size, sizeof(int32_t));
  offset += sizeof(int32_t);

  void* pIter = taosHashIterate(pSup->pResultRowHashTable, NULL);
  while (pIter) {
    void*               key = taosHashGetKey(pIter, &keyLen);
    SResultRowPosition* p1 = (SResultRowPosition*)pIter;

    // make room for this entry; grow geometrically so that many rows do not each trigger a realloc
    int64_t required = (int64_t)offset + sizeof(int32_t) + keyLen + sizeof(int32_t) + rowSize;
    if (required > totalSize) {
      int64_t newSize = (int64_t)totalSize * 2;
      if (newSize < required) {
        newSize = required;
      }
      if (newSize > INT32_MAX) {
        newSize = required;  // no headroom left near the int32 limit
      }

      char* tmp = (newSize > INT32_MAX) ? NULL : (char*)taosMemoryRealloc(*result, (int32_t)newSize);
      if (tmp == NULL) {
        taosHashCancelIterate(pSup->pResultRowHashTable, pIter);
        taosMemoryFree(*result);
        *result = NULL;
        *length = 0;
        return TSDB_CODE_OUT_OF_MEMORY;
      }
      *result = tmp;
      totalSize = (int32_t)newSize;
    }

    // save key
    int32_t keyLen32 = (int32_t)keyLen;
    memcpy(*result + offset, &keyLen32, sizeof(int32_t));
    offset += sizeof(int32_t);
    memcpy(*result + offset, key, keyLen);
    offset += keyLen;

    // save value; the row is copied while its page is still held and released afterwards
    memcpy(*result + offset, &rowSize, sizeof(int32_t));
    offset += sizeof(int32_t);

    void*       pPage = getBufPage(pSup->pResultBuf, p1->pageId);
    SResultRow* pRow = (SResultRow*)((char*)pPage + p1->offset);
    memcpy(*result + offset, pRow, rowSize);
    setBufPageDirty(pPage, true);
    releaseBufPage(pSup->pResultBuf, pPage);
    offset += rowSize;

    pIter = taosHashIterate(pSup->pResultRowHashTable, pIter);
  }

  memcpy(*result, &offset, sizeof(int32_t));
  *length = offset;

  return TSDB_CODE_SUCCESS;
}

3119
/**
 * Rebuild the aggregate operator's result rows from a buffer produced by aggEncodeResultRow.
 *
 * Every length field is validated against the buffer's recorded total length before it is used, so
 * a truncated or corrupted buffer is rejected instead of being read out of bounds. Fields are read
 * via memcpy because they are not necessarily aligned.
 *
 * @param pOperator aggregate operator. NOTE(review): its info is assumed to begin with an
 *                  SOptrBasicInfo immediately followed by an SAggSupporter (see POINTER_SHIFT).
 * @param result    serialized buffer
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_INVALID_INPUT for NULL/malformed input or when a
 *         result row cannot be allocated.
 */
int32_t aggDecodeResultRow(SOperatorInfo* pOperator, char* result) {
  if (result == NULL) {
    return TSDB_CODE_TSC_INVALID_INPUT;
  }
  SOptrBasicInfo* pInfo = (SOptrBasicInfo*)(pOperator->info);
  SAggSupporter*  pSup = (SAggSupporter*)POINTER_SHIFT(pOperator->info, sizeof(SOptrBasicInfo));

  // header: total length followed by the number of rows
  int32_t length = 0;
  memcpy(&length, result, sizeof(int32_t));
  if (length < (int32_t)(2 * sizeof(int32_t))) {
    return TSDB_CODE_TSC_INVALID_INPUT;
  }

  int32_t offset = sizeof(int32_t);
  int32_t count = 0;
  memcpy(&count, result + offset, sizeof(int32_t));
  offset += sizeof(int32_t);

  while (count-- > 0 && length > offset) {
    if (length - offset < (int32_t)sizeof(int32_t)) {
      return TSDB_CODE_TSC_INVALID_INPUT;
    }
    int32_t keyLen = 0;
    memcpy(&keyLen, result + offset, sizeof(int32_t));
    offset += sizeof(int32_t);

    // the key starts with the group id read below and must leave room for the value-length field
    if (keyLen < (int32_t)sizeof(uint64_t) || keyLen > length - offset - (int32_t)sizeof(int32_t)) {
      return TSDB_CODE_TSC_INVALID_INPUT;
    }

    // the value must match the current row size and fit in the remaining buffer
    int32_t valueLen = 0;
    memcpy(&valueLen, result + offset + keyLen, sizeof(int32_t));
    if (valueLen != pSup->resultRowSize || valueLen > length - offset - keyLen - (int32_t)sizeof(int32_t)) {
      return TSDB_CODE_TSC_INVALID_INPUT;
    }

    uint64_t tableGroupId = 0;
    memcpy(&tableGroupId, result + offset, sizeof(uint64_t));
    SResultRow* resultRow = getNewResultRow(pSup->pResultBuf, tableGroupId, pSup->resultRowSize);
    if (!resultRow) {
      return TSDB_CODE_TSC_INVALID_INPUT;
    }

    // add a new result set for a new group
    SResultRowPosition pos = {.pageId = resultRow->pageId, .offset = resultRow->offset};
    taosHashPut(pSup->pResultRowHashTable, result + offset, keyLen, &pos, sizeof(SResultRowPosition));
    offset += keyLen + sizeof(int32_t);

    // overwrite the freshly allocated row with the serialized one, keeping its own page position
    int32_t pageId = resultRow->pageId;
    int32_t pOffset = resultRow->offset;
    memcpy(resultRow, result + offset, valueLen);
    resultRow->pageId = pageId;
    resultRow->offset = pOffset;
    offset += valueLen;

    initResultRow(resultRow);
    pInfo->resultRowInfo.cur = (SResultRowPosition){.pageId = resultRow->pageId, .offset = resultRow->offset};
  }

  if (offset != length) {
    return TSDB_CODE_TSC_INVALID_INPUT;
  }
  return TSDB_CODE_SUCCESS;
}

3170 3171
/* Return codes of handleLimitOffset: keep pulling input, or hand the accumulated block upward. */
enum {
  PROJECT_RETRIEVE_CONTINUE = 1,
  PROJECT_RETRIEVE_DONE = 2,
};

/**
 * Apply the group offset/limit (soffset/slimit) and the per-group row offset/limit to the rows
 * accumulated in the project operator's result block. Only the group id of pBlock is read.
 *
 * @return PROJECT_RETRIEVE_CONTINUE when more input should be pulled first, PROJECT_RETRIEVE_DONE
 *         when the accumulated result block should be returned now. When the group limit is reached,
 *         pOperator->status is also set to OP_EXEC_DONE.
 *
 * NOTE(review): a group id of 0 doubles as the "no group seen yet" marker, so a real group id of 0
 * is not distinguished from the initial state.
 */
static int32_t handleLimitOffset(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
  SProjectOperatorInfo* pProjectInfo = pOperator->info;
  SOptrBasicInfo*       pInfo = &pProjectInfo->binfo;
  SSDataBlock*          pRes = pInfo->pRes;

  // skip the first curSOffset groups entirely, whatever number of blocks each of them spans
  if (pProjectInfo->curSOffset > 0) {
    if (pProjectInfo->groupId == 0) {  // it is the first group
      pProjectInfo->groupId = pBlock->info.groupId;
      blockDataCleanup(pInfo->pRes);
      return PROJECT_RETRIEVE_CONTINUE;
    } else if (pProjectInfo->groupId != pBlock->info.groupId) {
      // a new group starts: one more group has been skipped completely
      pProjectInfo->curSOffset -= 1;
      // track the new group so that its following blocks are not counted as yet another group
      pProjectInfo->groupId = pBlock->info.groupId;

      // ignore data block in current group
      if (pProjectInfo->curSOffset > 0) {
        blockDataCleanup(pInfo->pRes);
        return PROJECT_RETRIEVE_CONTINUE;
      }
    } else {
      // another block of a group that is still being skipped
      blockDataCleanup(pInfo->pRes);
      return PROJECT_RETRIEVE_CONTINUE;
    }

    // set current group id of the project operator
    pProjectInfo->groupId = pBlock->info.groupId;
  }

  if (pProjectInfo->groupId != 0 && pProjectInfo->groupId != pBlock->info.groupId) {
    pProjectInfo->curGroupOutput += 1;
    if ((pProjectInfo->slimit.limit > 0) && (pProjectInfo->slimit.limit <= pProjectInfo->curGroupOutput)) {
      pOperator->status = OP_EXEC_DONE;
      blockDataCleanup(pRes);

      return PROJECT_RETRIEVE_DONE;
    }

    // reset the value for a new group data: the row offset and limit apply to each group
    pProjectInfo->curOffset = (pProjectInfo->limit.offset > 0) ? pProjectInfo->limit.offset : 0;
    pProjectInfo->curOutput = 0;
  }

  // here we reach the start position, according to the limit/offset requirements.

  // set current group id
  pProjectInfo->groupId = pBlock->info.groupId;

  if (pProjectInfo->curOffset >= pRes->info.rows) {
    pProjectInfo->curOffset -= pRes->info.rows;
    blockDataCleanup(pRes);
    return PROJECT_RETRIEVE_CONTINUE;
  } else if (pProjectInfo->curOffset < pRes->info.rows && pProjectInfo->curOffset > 0) {
    blockDataTrimFirstNRows(pRes, pProjectInfo->curOffset);
    pProjectInfo->curOffset = 0;
  }

  // check for the limitation in each group
  if (pProjectInfo->limit.limit >= 0 && pProjectInfo->curOutput + pRes->info.rows >= pProjectInfo->limit.limit) {
    int32_t keepRows = (int32_t)(pProjectInfo->limit.limit - pProjectInfo->curOutput);
    blockDataKeepFirstNRows(pRes, keepRows);
    if (pProjectInfo->slimit.limit > 0 && pProjectInfo->slimit.limit <= pProjectInfo->curGroupOutput) {
      pOperator->status = OP_EXEC_DONE;
    }

    return PROJECT_RETRIEVE_DONE;
  }

  // todo optimize performance
  // If slimit/soffset values exist, results from multiple rounds cannot be packed into one block,
  // since they may belong to different groups and the limit/offset values would not be valid then.
  if (pRes->info.rows >= pOperator->resultInfo.threshold || pProjectInfo->slimit.offset != -1 ||
      pProjectInfo->slimit.limit != -1) {
    return PROJECT_RETRIEVE_DONE;
  } else {  // not full enough, continue to accumulate the output data in the buffer.
    return PROJECT_RETRIEVE_CONTINUE;
  }
}

3249
/**
 * Pull blocks from the single downstream operator and evaluate the projection expressions into the
 * result block, then apply limit/offset handling and the filter condition.
 *
 * Blocks of type STREAM_RETRIEVE are returned unchanged. Evaluation errors are reported through
 * longjmp(pTaskInfo->env, code). Returns NULL when no rows are produced.
 */
static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
  SProjectOperatorInfo* pProjectInfo = pOperator->info;
  SOptrBasicInfo*       pInfo = &pProjectInfo->binfo;

  SExprSupp*   pSup = &pOperator->exprSupp;
  SSDataBlock* pRes = pInfo->pRes;
  blockDataCleanup(pRes);

  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  int64_t st = 0;
  int32_t order = 0;
  int32_t scanFlag = 0;

  // the open cost is recorded only while it is still unset (zero)
  if (pOperator->cost.openCost == 0) {
    st = taosGetTimestampUs();
  }

  SOperatorInfo* downstream = pOperator->pDownstream[0];

  while (1) {
    // The downstream exec may change the value of the newgroup, so use a local variable instead.
    SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
    if (pBlock == NULL) {
      // TODO optimize
      /*if (pTaskInfo->execModel != OPTR_EXEC_MODEL_STREAM) {*/
      doSetOperatorCompleted(pOperator);
      /*}*/
      break;
    }
    if (pBlock->info.type == STREAM_RETRIEVE) {
      // for stream interval
      return pBlock;
    }

    // the pDataBlock are always the same one, no need to call this again
    int32_t code = getTableScanInfo(downstream, &order, &scanFlag);
    if (code != TSDB_CODE_SUCCESS) {
      longjmp(pTaskInfo->env, code);
    }

    setInputDataBlock(pOperator, pSup->pCtx, pBlock, order, scanFlag, false);
    blockDataEnsureCapacity(pInfo->pRes, pInfo->pRes->info.rows + pBlock->info.rows);

    code = projectApplyFunctions(pSup->pExprInfo, pInfo->pRes, pBlock, pSup->pCtx, pSup->numOfExprs,
                                 pProjectInfo->pPseudoColInfo);
    if (code != TSDB_CODE_SUCCESS) {
      longjmp(pTaskInfo->env, code);
    }

    int32_t status = handleLimitOffset(pOperator, pBlock);

    // filter shall be applied after apply functions and limit/offset on the result
    doFilter(pProjectInfo->pFilterNode, pInfo->pRes);

    if (status == PROJECT_RETRIEVE_CONTINUE) {
      continue;
    } else if (status == PROJECT_RETRIEVE_DONE) {
      break;
    }
  }

  pProjectInfo->curOutput += pInfo->pRes->info.rows;

  size_t rows = pInfo->pRes->info.rows;
  pOperator->resultInfo.totalRows += rows;

  if (pOperator->cost.openCost == 0) {
    pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
  }

  return (rows > 0) ? pInfo->pRes : NULL;
}

L
Liu Jicong 已提交
3344 3345
// Feed the cached first block of the next group (existNewGroupBlock) into the fill engine and
// generate fill output into pInfo->pRes. Afterwards the cache is cleared and *newgroup is set.
// When the task has completed, filling runs up to pInfo->win.ekey; otherwise up to the block's end key.
static void doHandleRemainBlockForNewGroupImpl(SFillOperatorInfo* pInfo, SResultInfo* pResultInfo, bool* newgroup,
                                               SExecTaskInfo* pTaskInfo) {
  SSDataBlock* pPending = pInfo->existNewGroupBlock;
  pInfo->totalInputRows = pPending->info.rows;

  int64_t endKey = pPending->info.window.ekey;
  if (Q_STATUS_EQUAL(pTaskInfo->status, TASK_COMPLETED)) {
    endKey = pInfo->win.ekey;
  }

  taosResetFillInfo(pInfo->pFillInfo, getFillInfoStart(pInfo->pFillInfo));
  taosFillSetStartInfo(pInfo->pFillInfo, pPending->info.rows, endKey);
  taosFillSetInputDataBlock(pInfo->pFillInfo, pPending);
  doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, pResultInfo->capacity);

  pInfo->existNewGroupBlock = NULL;
  *newgroup = true;
}

L
Liu Jicong 已提交
3360 3361
// If the fill engine still has pending output, emit it into pInfo->pRes first. Stop there when the
// result block exceeds the threshold or multi-group output is disabled. Otherwise continue with the
// cached first block of the next group, if there is one.
static void doHandleRemainBlockFromNewGroup(SFillOperatorInfo* pInfo, SResultInfo* pResultInfo, bool* newgroup,
                                            SExecTaskInfo* pTaskInfo) {
  if (taosFillHasMoreResults(pInfo->pFillInfo)) {
    *newgroup = false;
    doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, (int32_t)pResultInfo->capacity);

    bool overThreshold = pInfo->pRes->info.rows > pResultInfo->threshold;
    if (overThreshold || !pInfo->multigroupResult) {
      return;
    }
  }

  // handle the cached new group data block
  if (pInfo->existNewGroupBlock == NULL) {
    return;
  }
  doHandleRemainBlockForNewGroupImpl(pInfo, pResultInfo, newgroup, pTaskInfo);
}

S
slzhou 已提交
3376
// Produce the next block of interpolated (filled) rows from the downstream data.
// Returns the result block once it exceeds the threshold, once input is exhausted, or once it has
// rows and multi-group output is disabled. Returns NULL when nothing more can be produced.
//
// NOTE(review): isNewGroup starts false on every call and only becomes true after a cached
// existNewGroupBlock is processed, while existNewGroupBlock is only assigned when isNewGroup is
// already true. Unless it is set elsewhere, the new-group path looks unreachable — confirm.
static SSDataBlock* doFillImpl(SOperatorInfo* pOperator) {
  SFillOperatorInfo* pFill = pOperator->info;
  SExecTaskInfo*     pTask = pOperator->pTaskInfo;
  SResultInfo*       pResInfo = &pOperator->resultInfo;
  SSDataBlock*       pOut = pFill->pRes;

  blockDataCleanup(pOut);

  // todo handle different group data interpolation
  bool isNewGroup = false;
  doHandleRemainBlockFromNewGroup(pFill, pResInfo, &isNewGroup, pTask);
  if (pOut->info.rows > pResInfo->threshold || (!pFill->multigroupResult && pOut->info.rows > 0)) {
    return pOut;
  }

  SOperatorInfo* pChild = pOperator->pDownstream[0];
  for (;;) {
    SSDataBlock* pInput = pChild->fpSet.getNextFn(pChild);
    if (isNewGroup) {
      assert(pInput != NULL);
    }

    if (isNewGroup && pFill->totalInputRows > 0) {
      // data of the current group has been processed: cache the new group's block and close the
      // fill operation for the previous group before handling it
      pFill->existNewGroupBlock = pInput;
      isNewGroup = false;
      taosFillSetStartInfo(pFill->pFillInfo, 0, pFill->win.ekey);
    } else if (pInput == NULL) {
      if (pFill->totalInputRows == 0) {
        pOperator->status = OP_EXEC_DONE;
        return NULL;
      }
      taosFillSetStartInfo(pFill->pFillInfo, 0, pFill->win.ekey);
    } else {
      pFill->totalInputRows += pInput->info.rows;
      taosFillSetStartInfo(pFill->pFillInfo, pInput->info.rows, pInput->info.window.ekey);
      taosFillSetInputDataBlock(pFill->pFillInfo, pInput);
    }

    blockDataEnsureCapacity(pOut, pResInfo->capacity);
    doFillTimeIntervalGapsInResults(pFill->pFillInfo, pOut, pResInfo->capacity);

    if (pOut->info.rows > 0) {
      // return when the block is full enough, input is exhausted, or groups may not be mixed
      if (pOut->info.rows > pResInfo->threshold || pInput == NULL || !pFill->multigroupResult) {
        return pOut;
      }

      doHandleRemainBlockFromNewGroup(pFill, pResInfo, &isNewGroup, pTask);
      if (pOut->info.rows > pResInfo->threshold || pInput == NULL) {
        return pOut;
      }
    } else if (pFill->existNewGroupBlock != NULL) {  // try next group
      assert(pInput != NULL);
      doHandleRemainBlockForNewGroupImpl(pFill, pResInfo, &isNewGroup, pTask);
      if (pOut->info.rows > pResInfo->threshold) {
        return pOut;
      }
    } else {
      return NULL;
    }
  }
}

S
slzhou 已提交
3449 3450 3451 3452 3453 3454 3455 3456
// Return the next filled block that still has rows after applying the fill operator's filter
// condition, or NULL (marking the operator completed) once doFillImpl produces nothing more.
static SSDataBlock* doFill(SOperatorInfo* pOperator) {
  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  SFillOperatorInfo* pFill = pOperator->info;
  SSDataBlock*       pResult = NULL;

  do {
    pResult = doFillImpl(pOperator);
    if (pResult == NULL) {
      doSetOperatorCompleted(pOperator);
      return NULL;
    }
    doFilter(pFill->pCondition, pResult);
  } while (pResult->info.rows == 0);  // everything was filtered out: try the next block

  pOperator->resultInfo.totalRows += (size_t)pResult->info.rows;
  return pResult;
}

H
Haojun Liao 已提交
3482 3483 3484 3485 3486 3487 3488 3489 3490 3491 3492
// Release the heap-owned parts of pExpr[0 .. numOfExprs); the array itself is not freed here.
// NOTE(review): only pParam[0].pCol of plain column expressions is released; column parameters of
// other expression kinds are not freed here — confirm who owns them.
static void destroyExprInfo(SExprInfo* pExpr, int32_t numOfExprs) {
  int32_t idx = 0;
  while (idx < numOfExprs) {
    SExprInfo* pItem = &pExpr[idx++];

    bool isColumnRef = (pItem->pExpr->nodeType == QUERY_NODE_COLUMN);
    if (isColumnRef) {
      taosMemoryFree(pItem->base.pParam[0].pCol);
    }

    taosMemoryFree(pItem->base.pParam);
    taosMemoryFree(pItem->pExpr);
  }
}

3493 3494 3495 3496 3497
// Recursively destroy an operator tree. The steps, in order:
//   1. run the operator's close callback on its private info;
//   2. destroy every downstream operator and free the downstream array;
//   3. release the expression array;
//   4. free the operator itself.
// A NULL operator is ignored.
static void destroyOperatorInfo(SOperatorInfo* pOperator) {
  if (pOperator == NULL) {
    return;
  }

  if (pOperator->fpSet.closeFn) {
    pOperator->fpSet.closeFn(pOperator->info, pOperator->exprSupp.numOfExprs);
  }

  if (pOperator->pDownstream) {
    for (int32_t k = 0; k < pOperator->numOfDownstream; k++) {
      destroyOperatorInfo(pOperator->pDownstream[k]);
    }
    taosMemoryFreeClear(pOperator->pDownstream);
    pOperator->numOfDownstream = 0;
  }

  SExprInfo* pExprs = pOperator->exprSupp.pExprInfo;
  if (pExprs) {
    destroyExprInfo(pExprs, pOperator->exprSupp.numOfExprs);
  }
  taosMemoryFreeClear(pOperator->exprSupp.pExprInfo);

  taosMemoryFreeClear(pOperator);
}

3519 3520 3521 3522 3523 3524 3525 3526 3527 3528 3529 3530 3531 3532 3533
/**
 * Choose the page size and the total in-memory buffer size of a disk-based result buffer whose
 * rows are rowSize bytes long.
 *
 * Page size: the smallest power of two, not below 4096, that holds at least four rows.
 * Buffer size: 1 MiB, or four pages when that is larger, so that at least four pages fit.
 *
 * Arithmetic is done in 64 bits so that large row sizes neither overflow nor loop forever.
 *
 * @param rowSize      size of one row in bytes
 * @param defaultPgsz  [out] page size
 * @param defaultBufsz [out] buffer size
 * @return 0 on success; -1 when rowSize is negative or the sizes do not fit in uint32_t, in which
 *         case both outputs are set to 0.
 */
int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* defaultBufsz) {
  const uint64_t minPageSize = 4096;
  const uint64_t rowsPerPage = 4;
  const uint64_t minPagesInBuf = 4;
  const uint64_t defaultBufSize = 4096 * 256;

  *defaultPgsz = 0;
  *defaultBufsz = 0;
  if (rowSize < 0) {
    return -1;
  }

  uint64_t required = (uint64_t)rowSize * rowsPerPage;
  uint64_t pgsz = minPageSize;
  while (pgsz < required) {
    pgsz <<= 1u;
  }

  // at least four pages need to be in buffer
  uint64_t bufsz = defaultBufSize;
  if (bufsz < pgsz * minPagesInBuf) {
    bufsz = pgsz * minPagesInBuf;
  }

  if (bufsz > UINT32_MAX) {
    return -1;
  }

  *defaultPgsz = (uint32_t)pgsz;
  *defaultBufsz = (uint32_t)bufsz;
  return 0;
}

dengyihao's avatar
dengyihao 已提交
3534 3535
/**
 * Initialize an aggregate supporter. This function:
 *   - computes the result-row size for pCtx;
 *   - allocates the key buffer;
 *   - creates the result-row hash table;
 *   - creates the disk-based result buffer identified by pKey.
 *
 * On failure the key buffer and hash table created here are released and their fields reset to
 * NULL, so they are not leaked and a later cleanupAggSup() does not see dangling pointers.
 *
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY, TSDB_CODE_INVALID_PARA when no buffer
 *         geometry can be derived from the row size, or the error from createDiskbasedBuf.
 */
int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t numOfOutput, size_t keyBufSize,
                         const char* pKey) {
  _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
  int32_t    code = TSDB_CODE_SUCCESS;
  uint32_t   defaultPgsz = 0;
  uint32_t   defaultBufsz = 0;

  pAggSup->resultRowSize = getResultRowSize(pCtx, numOfOutput);
  pAggSup->keyBuf = taosMemoryCalloc(1, keyBufSize + POINTER_BYTES + sizeof(int64_t));
  pAggSup->pResultRowHashTable = taosHashInit(10, hashFn, true, HASH_NO_LOCK);

  if (pAggSup->keyBuf == NULL || pAggSup->pResultRowHashTable == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _error;
  }

  if (getBufferPgSize(pAggSup->resultRowSize, &defaultPgsz, &defaultBufsz) != 0) {
    code = TSDB_CODE_INVALID_PARA;
    goto _error;
  }

  code = createDiskbasedBuf(&pAggSup->pResultBuf, defaultPgsz, defaultBufsz, pKey, TD_TMP_DIR_PATH);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  return TSDB_CODE_SUCCESS;

_error:
  taosMemoryFreeClear(pAggSup->keyBuf);
  if (pAggSup->pResultRowHashTable != NULL) {
    taosHashCleanup(pAggSup->pResultRowHashTable);
    pAggSup->pResultRowHashTable = NULL;
  }
  return code;
}

3558
// Release the key buffer, result-row hash table and disk-based result buffer created by
// doInitAggInfoSup. Every pointer is reset to NULL afterwards so the supporter never keeps
// dangling references. NOTE(review): a repeated call relies on taosHashCleanup and
// destroyDiskbasedBuf accepting NULL — confirm.
void cleanupAggSup(SAggSupporter* pAggSup) {
  taosMemoryFreeClear(pAggSup->keyBuf);

  taosHashCleanup(pAggSup->pResultRowHashTable);
  pAggSup->pResultRowHashTable = NULL;

  destroyDiskbasedBuf(pAggSup->pResultBuf);
  pAggSup->pResultBuf = NULL;
}

L
Liu Jicong 已提交
3564 3565
/**
 * Initialize the expression support (function contexts) and the aggregate supporter, then point
 * every function context at the supporter's result buffer.
 *
 * @return TSDB_CODE_SUCCESS, or the first error from initExprSupp / doInitAggInfoSup. Previously the
 *         doInitAggInfoSup result was ignored, so callers continued with a half-initialized supporter.
 */
int32_t initAggInfo(SExprSupp* pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, size_t keyBufSize,
                    const char* pkey) {
  int32_t code = initExprSupp(pSup, pExprInfo, numOfCols);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  code = doInitAggInfoSup(pAggSup, pSup->pCtx, numOfCols, keyBufSize, pkey);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  for (int32_t i = 0; i < numOfCols; ++i) {
    pSup->pCtx[i].pBuf = pAggSup->pResultBuf;
  }

  return TSDB_CODE_SUCCESS;
}

3579
// Set the operator's output block capacity to numOfRows and its threshold to numOfRows * 0.75.
// If that threshold evaluates to zero, the full capacity is used instead.
void initResultSizeInfo(SOperatorInfo* pOperator, int32_t numOfRows) {
  ASSERT(numOfRows != 0);

  SResultInfo* pResInfo = &pOperator->resultInfo;
  pResInfo->capacity = numOfRows;
  pResInfo->threshold = numOfRows * 0.75;

  if (pResInfo->threshold == 0) {
    pResInfo->threshold = numOfRows;
  }
}

3589 3590 3591 3592 3593
/* Reset the result-row bookkeeping of a basic operator info and attach pBlock as its result block. */
void initBasicInfo(SOptrBasicInfo* pInfo, SSDataBlock* pBlock) {
  initResultRowInfo(&pInfo->resultRowInfo);
  pInfo->pRes = pBlock;
}

3594
/*
 * Bind an expression array to pSup and, when expressions are present, create one function context per expression.
 * Returns TSDB_CODE_OUT_OF_MEMORY when the contexts cannot be created, TSDB_CODE_SUCCESS otherwise.
 */
int32_t initExprSupp(SExprSupp* pSup, SExprInfo* pExprInfo, int32_t numOfExpr) {
  pSup->pExprInfo = pExprInfo;
  pSup->numOfExprs = numOfExpr;

  if (pExprInfo == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  pSup->pCtx = createSqlFunctionCtx(pExprInfo, numOfExpr, &pSup->rowEntryInfoOffset);
  return (pSup->pCtx != NULL) ? TSDB_CODE_SUCCESS : TSDB_CODE_OUT_OF_MEMORY;
}

L
Liu Jicong 已提交
3607
SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
S
slzhou 已提交
3608
                                           SSDataBlock* pResultBlock, SNode* pCondition, SExprInfo* pScalarExprInfo,
wmmhello's avatar
wmmhello 已提交
3609
                                           int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo) {
wafwerar's avatar
wafwerar 已提交
3610
  SAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SAggOperatorInfo));
L
Liu Jicong 已提交
3611
  SOperatorInfo*    pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
H
Haojun Liao 已提交
3612 3613 3614
  if (pInfo == NULL || pOperator == NULL) {
    goto _error;
  }
H
Haojun Liao 已提交
3615

3616
  int32_t numOfRows = 1024;
dengyihao's avatar
dengyihao 已提交
3617
  size_t  keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
3618 3619

  initResultSizeInfo(pOperator, numOfRows);
3620
  int32_t code = initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str);
L
Liu Jicong 已提交
3621
  if (code != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
3622 3623
    goto _error;
  }
H
Haojun Liao 已提交
3624

3625
  initBasicInfo(&pInfo->binfo, pResultBlock);
3626 3627 3628 3629
  code = initExprSupp(&pInfo->scalarExprSup, pScalarExprInfo, numOfScalarExpr);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }
3630

L
Liu Jicong 已提交
3631
  pInfo->groupId = INT32_MIN;
S
slzhou 已提交
3632
  pInfo->pCondition = pCondition;
dengyihao's avatar
dengyihao 已提交
3633
  pOperator->name = "TableAggregate";
3634
  pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_HASH_AGG;
3635
  pOperator->blocking = true;
dengyihao's avatar
dengyihao 已提交
3636 3637 3638
  pOperator->status = OP_NOT_OPENED;
  pOperator->info = pInfo;
  pOperator->pTaskInfo = pTaskInfo;
H
Haojun Liao 已提交
3639

3640 3641
  pOperator->fpSet = createOperatorFpSet(doOpenAggregateOptr, getAggregateResult, NULL, NULL, destroyAggOperatorInfo,
                                         aggEncodeResultRow, aggDecodeResultRow, NULL);
H
Haojun Liao 已提交
3642 3643 3644 3645 3646

  code = appendDownstream(pOperator, &downstream, 1);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }
3647 3648

  return pOperator;
L
Liu Jicong 已提交
3649
_error:
H
Haojun Liao 已提交
3650
  destroyAggOperatorInfo(pInfo, numOfCols);
wafwerar's avatar
wafwerar 已提交
3651 3652
  taosMemoryFreeClear(pInfo);
  taosMemoryFreeClear(pOperator);
H
Haojun Liao 已提交
3653 3654
  pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
  return NULL;
3655 3656
}

3657 3658 3659 3660 3661 3662 3663 3664 3665 3666 3667 3668 3669 3670 3671 3672 3673 3674 3675
/*
 * Free an array of numOfOutput function contexts and the array itself. For each context this releases its parameter
 * variants, its subsidiary context array and its input data / column-aggregate buffers.
 * Accepts NULL; always returns NULL.
 */
static void* destroySqlFunctionCtx(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
  if (pCtx != NULL) {
    for (int32_t ctxIdx = 0; ctxIdx < numOfOutput; ++ctxIdx) {
      SqlFunctionCtx* pOne = &pCtx[ctxIdx];

      for (int32_t paramIdx = 0; paramIdx < pOne->numOfParams; ++paramIdx) {
        taosVariantDestroy(&pOne->param[paramIdx].param);
      }

      taosMemoryFreeClear(pOne->subsidiaries.pCtx);
      taosMemoryFree(pOne->input.pData);
      taosMemoryFree(pOne->input.pColumnDataAgg);
    }

    taosMemoryFreeClear(pCtx);
  }

  return NULL;
}

3676
/* Release the result-row bookkeeping and the result block held by a basic operator info; pRes ends up as returned by blockDataDestroy. */
void cleanupBasicInfo(SOptrBasicInfo* pInfo) {
  assert(pInfo != NULL);

  SResultRowInfo* pRowInfo = &pInfo->resultRowInfo;
  cleanupResultRowInfo(pRowInfo);

  SSDataBlock* pBlock = pInfo->pRes;
  pInfo->pRes = blockDataDestroy(pBlock);
}

H
Haojun Liao 已提交
3682
void destroyBasicOperatorInfo(void* param, int32_t numOfOutput) {
L
Liu Jicong 已提交
3683
  SOptrBasicInfo* pInfo = (SOptrBasicInfo*)param;
3684
  cleanupBasicInfo(pInfo);
L
Liu Jicong 已提交
3685

D
dapan1121 已提交
3686
  taosMemoryFreeClear(param);
3687
}
H
Haojun Liao 已提交
3688 3689

void destroyAggOperatorInfo(void* param, int32_t numOfOutput) {
L
Liu Jicong 已提交
3690
  SAggOperatorInfo* pInfo = (SAggOperatorInfo*)param;
L
Liu Jicong 已提交
3691 3692
  cleanupBasicInfo(&pInfo->binfo);

D
dapan1121 已提交
3693
  taosMemoryFreeClear(param);
3694
}
3695

H
Haojun Liao 已提交
3696
void destroySFillOperatorInfo(void* param, int32_t numOfOutput) {
L
Liu Jicong 已提交
3697
  SFillOperatorInfo* pInfo = (SFillOperatorInfo*)param;
3698
  pInfo->pFillInfo = taosDestroyFillInfo(pInfo->pFillInfo);
H
Haojun Liao 已提交
3699
  pInfo->pRes = blockDataDestroy(pInfo->pRes);
wafwerar's avatar
wafwerar 已提交
3700
  taosMemoryFreeClear(pInfo->p);
L
Liu Jicong 已提交
3701

D
dapan1121 已提交
3702
  taosMemoryFreeClear(param);
3703 3704
}

H
Haojun Liao 已提交
3705
static void destroyProjectOperatorInfo(void* param, int32_t numOfOutput) {
D
fix bug  
dapan 已提交
3706 3707 3708
  if (NULL == param) {
    return;
  }
L
Liu Jicong 已提交
3709
  SProjectOperatorInfo* pInfo = (SProjectOperatorInfo*)param;
3710
  cleanupBasicInfo(&pInfo->binfo);
H
Haojun Liao 已提交
3711
  cleanupAggSup(&pInfo->aggSup);
H
Haojun Liao 已提交
3712
  taosArrayDestroy(pInfo->pPseudoColInfo);
L
Liu Jicong 已提交
3713

D
dapan1121 已提交
3714
  taosMemoryFreeClear(param);
3715 3716
}

3717
/*
 * Release an SExprSupp: destroy its function contexts, call destroyExprInfo() on its expressions and free the
 * row-entry offset array.
 */
void cleanupExprSupp(SExprSupp* pSupp) {
  int32_t numOfExprs = pSupp->numOfExprs;

  destroySqlFunctionCtx(pSupp->pCtx, numOfExprs);
  destroyExprInfo(pSupp->pExprInfo, numOfExprs);
  taosMemoryFree(pSupp->rowEntryInfoOffset);
}

H
Haojun Liao 已提交
3724
static void destroyIndefinitOperatorInfo(void* param, int32_t numOfOutput) {
3725
  SIndefOperatorInfo* pInfo = (SIndefOperatorInfo*)param;
3726
  cleanupBasicInfo(&pInfo->binfo);
H
Haojun Liao 已提交
3727 3728 3729

  taosArrayDestroy(pInfo->pPseudoColInfo);
  cleanupAggSup(&pInfo->aggSup);
3730
  cleanupExprSupp(&pInfo->scalarSup);
L
Liu Jicong 已提交
3731

D
dapan1121 已提交
3732
  taosMemoryFreeClear(param);
H
Haojun Liao 已提交
3733 3734
}

H
Haojun Liao 已提交
3735
void destroyExchangeOperatorInfo(void* param, int32_t numOfOutput) {
L
Liu Jicong 已提交
3736
  SExchangeInfo* pExInfo = (SExchangeInfo*)param;
3737 3738 3739 3740
  taosRemoveRef(exchangeObjRefPool, pExInfo->self);
}

void doDestroyExchangeOperatorInfo(void* param) {
X
Xiaoyu Wang 已提交
3741
  SExchangeInfo* pExInfo = (SExchangeInfo*)param;
3742

H
Haojun Liao 已提交
3743 3744 3745 3746 3747 3748 3749
  taosArrayDestroy(pExInfo->pSources);
  taosArrayDestroy(pExInfo->pSourceDataInfo);
  if (pExInfo->pResult != NULL) {
    blockDataDestroy(pExInfo->pResult);
  }

  tsem_destroy(&pExInfo->ready);
L
Liu Jicong 已提交
3750

D
dapan1121 已提交
3751
  taosMemoryFreeClear(param);
H
Haojun Liao 已提交
3752 3753
}

H
Haojun Liao 已提交
3754 3755
/*
 * Collect the indices (int32_t) of the function contexts whose function is a pseudo-column function.
 * Returns a new SArray owned by the caller, or NULL when the array cannot be allocated.
 */
static SArray* setRowTsColumnOutputInfo(SqlFunctionCtx* pCtx, int32_t numOfCols) {
  SArray* pList = taosArrayInit(4, sizeof(int32_t));
  if (pList == NULL) {
    return NULL;
  }

  for (int32_t i = 0; i < numOfCols; ++i) {
    if (fmIsPseudoColumnFunc(pCtx[i].functionId)) {
      taosArrayPush(pList, &i);
    }
  }

  return pList;
}

3765 3766 3767 3768
/* Limit value of a plan limit node, or -1 when the plan carries no limit node. */
static int64_t getLimit(SNode* pLimit) {
  if (pLimit == NULL) {
    return -1;
  }
  return ((SLimitNode*)pLimit)->limit;
}

/* Offset value of a plan limit node, or -1 when the plan carries no limit node. */
static int64_t getOffset(SNode* pLimit) {
  if (pLimit == NULL) {
    return -1;
  }
  return ((SLimitNode*)pLimit)->offset;
}

L
Liu Jicong 已提交
3769
SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhysiNode* pProjPhyNode,
dengyihao's avatar
dengyihao 已提交
3770
                                         SExecTaskInfo* pTaskInfo) {
wafwerar's avatar
wafwerar 已提交
3771
  SProjectOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SProjectOperatorInfo));
L
Liu Jicong 已提交
3772
  SOperatorInfo*        pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
H
Haojun Liao 已提交
3773 3774 3775
  if (pInfo == NULL || pOperator == NULL) {
    goto _error;
  }
3776

L
Liu Jicong 已提交
3777
  int32_t    numOfCols = 0;
3778 3779 3780
  SExprInfo* pExprInfo = createExprInfo(pProjPhyNode->pProjections, NULL, &numOfCols);

  SSDataBlock* pResBlock = createResDataBlock(pProjPhyNode->node.pOutputDataBlockDesc);
3781 3782
  SLimit       limit = {.limit = getLimit(pProjPhyNode->node.pLimit), .offset = getOffset(pProjPhyNode->node.pLimit)};
  SLimit slimit = {.limit = getLimit(pProjPhyNode->node.pSlimit), .offset = getOffset(pProjPhyNode->node.pSlimit)};
3783

L
Liu Jicong 已提交
3784 3785 3786 3787
  pInfo->limit = limit;
  pInfo->slimit = slimit;
  pInfo->curOffset = limit.offset;
  pInfo->curSOffset = slimit.offset;
H
Haojun Liao 已提交
3788
  pInfo->binfo.pRes = pResBlock;
3789
  pInfo->pFilterNode = pProjPhyNode->node.pConditions;
H
Haojun Liao 已提交
3790 3791

  int32_t numOfRows = 4096;
dengyihao's avatar
dengyihao 已提交
3792
  size_t  keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
3793

3794 3795 3796 3797 3798
  // Make sure the size of SSDataBlock will never exceed the size of 2MB.
  int32_t TWOMB = 2 * 1024 * 1024;
  if (numOfRows * pResBlock->info.rowSize > TWOMB) {
    numOfRows = TWOMB / pResBlock->info.rowSize;
  }
3799
  initResultSizeInfo(pOperator, numOfRows);
3800

3801 3802
  initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str);
  initBasicInfo(&pInfo->binfo, pResBlock);
3803
  setFunctionResultOutput(pOperator, &pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, numOfCols);
3804

3805
  pInfo->pPseudoColInfo = setRowTsColumnOutputInfo(pOperator->exprSupp.pCtx, numOfCols);
X
Xiaoyu Wang 已提交
3806
  pOperator->name = "ProjectOperator";
H
Haojun Liao 已提交
3807
  pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_PROJECT;
X
Xiaoyu Wang 已提交
3808 3809 3810 3811
  pOperator->blocking = false;
  pOperator->status = OP_NOT_OPENED;
  pOperator->info = pInfo;
  pOperator->pTaskInfo = pTaskInfo;
3812

L
Liu Jicong 已提交
3813 3814
  pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doProjectOperation, NULL, NULL,
                                         destroyProjectOperatorInfo, NULL, NULL, NULL);
L
Liu Jicong 已提交
3815

3816
  int32_t code = appendDownstream(pOperator, &downstream, 1);
H
Haojun Liao 已提交
3817
  if (code != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
3818 3819
    goto _error;
  }
3820 3821

  return pOperator;
H
Haojun Liao 已提交
3822

L
Liu Jicong 已提交
3823
_error:
H
Haojun Liao 已提交
3824 3825
  pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
  return NULL;
3826 3827
}

H
Haojun Liao 已提交
3828 3829
/*
 * Next-function of the indefinite-rows operator.
 *
 * Pulls every block from the downstream operator. For each block it first evaluates the optional scalar
 * pre-expressions in place, then applies the operator's functions and appends their output to the result block.
 * Once the downstream is exhausted the filter condition is applied. Returns the accumulated result block, or NULL
 * when no rows remain or the operator is already done. Errors are raised with longjmp on pTaskInfo->env.
 */
static SSDataBlock* doApplyIndefinitFunction(SOperatorInfo* pOperator) {
  SIndefOperatorInfo* pIndefInfo = pOperator->info;
  SOptrBasicInfo*     pInfo = &pIndefInfo->binfo;
  SExprSupp*          pSup = &pOperator->exprSupp;

  SSDataBlock* pRes = pInfo->pRes;
  blockDataCleanup(pRes);

  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  int64_t st = 0;
  int32_t order = 0;
  int32_t scanFlag = 0;

  if (pOperator->cost.openCost == 0) {
    st = taosGetTimestampUs();
  }

  SOperatorInfo* downstream = pOperator->pDownstream[0];

  while (1) {
    SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
    if (pBlock == NULL) {
      doSetOperatorCompleted(pOperator);
      break;
    }

    // order and scan flag of the underlying scan are needed to bind this block to the function contexts below
    int32_t code = getTableScanInfo(pOperator->pDownstream[0], &order, &scanFlag);
    if (code != TSDB_CODE_SUCCESS) {
      longjmp(pTaskInfo->env, code);
    }

    // there is an scalar expression that needs to be calculated before apply the group aggregation.
    SExprSupp* pScalarSup = &pIndefInfo->scalarSup;
    if (pScalarSup->pExprInfo != NULL) {
      code = projectApplyFunctions(pScalarSup->pExprInfo, pBlock, pBlock, pScalarSup->pCtx, pScalarSup->numOfExprs,
                                   pIndefInfo->pPseudoColInfo);
      if (code != TSDB_CODE_SUCCESS) {
        longjmp(pTaskInfo->env, code);
      }
    }

    setInputDataBlock(pOperator, pSup->pCtx, pBlock, order, scanFlag, false);

    // output of every downstream block is appended to pRes; if it cannot grow, writing the new rows would overflow it
    code = blockDataEnsureCapacity(pInfo->pRes, pInfo->pRes->info.rows + pBlock->info.rows);
    if (code != TSDB_CODE_SUCCESS) {
      longjmp(pTaskInfo->env, code);
    }

    code = projectApplyFunctions(pOperator->exprSupp.pExprInfo, pInfo->pRes, pBlock, pSup->pCtx,
                                 pOperator->exprSupp.numOfExprs, pIndefInfo->pPseudoColInfo);
    if (code != TSDB_CODE_SUCCESS) {
      longjmp(pTaskInfo->env, code);
    }
  }

  doFilter(pIndefInfo->pCondition, pInfo->pRes);

  size_t rows = pInfo->pRes->info.rows;
  pOperator->resultInfo.totalRows += rows;

  if (pOperator->cost.openCost == 0) {
    pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
  }

  return (rows > 0) ? pInfo->pRes : NULL;
}

3897 3898
/*
 * Create the indefinite-rows function operator for pNode (an SIndefRowsFuncPhysiNode) on top of downstream.
 *
 * pPhyNode->pFuncs become the operator's main expressions; the optional pPhyNode->pExprs are scalar expressions kept
 * in pInfo->scalarSup. The result block holds at most 4096 rows and is capped so rows * rowSize stays within 2MB.
 *
 * Returns the new operator, or NULL with pTaskInfo->code set to the failure code.
 */
SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pNode,
                                                 SExecTaskInfo* pTaskInfo) {
  int32_t             code = TSDB_CODE_OUT_OF_MEMORY;
  SIndefOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SIndefOperatorInfo));
  SOperatorInfo*      pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    goto _error;
  }

  SExprSupp* pSup = &pOperator->exprSupp;

  SIndefRowsFuncPhysiNode* pPhyNode = (SIndefRowsFuncPhysiNode*)pNode;

  int32_t    numOfExpr = 0;
  SExprInfo* pExprInfo = createExprInfo(pPhyNode->pFuncs, NULL, &numOfExpr);

  if (pPhyNode->pExprs != NULL) {
    int32_t    num = 0;
    SExprInfo* pSExpr = createExprInfo(pPhyNode->pExprs, NULL, &num);
    code = initExprSupp(&pInfo->scalarSup, pSExpr, num);
    if (code != TSDB_CODE_SUCCESS) {
      goto _error;
    }
  }

  SSDataBlock* pResBlock = createResDataBlock(pPhyNode->node.pOutputDataBlockDesc);
  if (pResBlock == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _error;
  }

  int32_t numOfRows = 4096;
  size_t  keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;

  // Make sure the size of SSDataBlock will never exceed the size of 2MB.
  int32_t TWOMB = 2 * 1024 * 1024;
  if (numOfRows * pResBlock->info.rowSize > TWOMB) {
    numOfRows = TWOMB / pResBlock->info.rowSize;
  }
  initResultSizeInfo(pOperator, numOfRows);

  // attach the result block first so the error path below releases it together with pInfo
  initBasicInfo(&pInfo->binfo, pResBlock);

  // initAggInfo also binds pExprInfo/numOfExpr into pOperator->exprSupp
  code = initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfExpr, keyBufSize, pTaskInfo->id.str);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  setFunctionResultOutput(pOperator, &pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, numOfExpr);

  pInfo->pPseudoColInfo = setRowTsColumnOutputInfo(pSup->pCtx, numOfExpr);
  pInfo->pCondition = pPhyNode->node.pConditions;

  pOperator->name = "IndefinitOperator";
  pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_PROJECT;
  pOperator->blocking = false;
  pOperator->status = OP_NOT_OPENED;
  pOperator->info = pInfo;
  pOperator->pTaskInfo = pTaskInfo;

  pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doApplyIndefinitFunction, NULL, NULL,
                                         destroyIndefinitOperatorInfo, NULL, NULL, NULL);

  code = appendDownstream(pOperator, &downstream, 1);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  return pOperator;

_error:
  if (pInfo != NULL) {
    // releases the basic info, pseudo-column list, aggregate supporter, scalar support and pInfo itself
    destroyIndefinitOperatorInfo(pInfo, 0);
  }
  taosMemoryFree(pOperator);
  pTaskInfo->code = code;
  return NULL;
}

3968
static int32_t initFillInfo(SFillOperatorInfo* pInfo, SExprInfo* pExpr, int32_t numOfCols, SNodeListNode* pValNode,
L
Liu Jicong 已提交
3969
                            STimeWindow win, int32_t capacity, const char* id, SInterval* pInterval, int32_t fillType) {
3970
  SFillColInfo* pColInfo = createFillColInfo(pExpr, numOfCols, pValNode);
H
Haojun Liao 已提交
3971 3972

  STimeWindow w = TSWINDOW_INITIALIZER;
3973
  getAlignQueryTimeWindow(pInterval, pInterval->precision, win.skey, &w);
H
Haojun Liao 已提交
3974 3975

  int32_t order = TSDB_ORDER_ASC;
3976
  pInfo->pFillInfo = taosCreateFillInfo(order, w.skey, 0, capacity, numOfCols, pInterval, fillType, pColInfo, id);
H
Haojun Liao 已提交
3977

3978
  pInfo->win = win;
L
Liu Jicong 已提交
3979
  pInfo->p = taosMemoryCalloc(numOfCols, POINTER_BYTES);
H
Haojun Liao 已提交
3980
  if (pInfo->pFillInfo == NULL || pInfo->p == NULL) {
H
Haojun Liao 已提交
3981 3982
    taosMemoryFree(pInfo->pFillInfo);
    taosMemoryFree(pInfo->p);
H
Haojun Liao 已提交
3983 3984 3985 3986 3987 3988
    return TSDB_CODE_OUT_OF_MEMORY;
  } else {
    return TSDB_CODE_SUCCESS;
  }
}

3989 3990 3991 3992 3993 3994 3995 3996
/*
 * Create the fill operator for pPhyFillNode on top of downstream.
 *
 * The fill interval is taken from the downstream operator, which must be a (merge-aligned) interval operator.
 *
 * Returns the new operator, or NULL with pTaskInfo->code set to the failure code.
 */
SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* pPhyFillNode, bool multigroupResult,
                                      SExecTaskInfo* pTaskInfo) {
  int32_t            code = TSDB_CODE_OUT_OF_MEMORY;
  SFillOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SFillOperatorInfo));
  SOperatorInfo*     pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    goto _error;
  }

  int32_t      num = 0;
  SSDataBlock* pResBlock = createResDataBlock(pPhyFillNode->node.pOutputDataBlockDesc);
  SExprInfo*   pExprInfo = createExprInfo(pPhyFillNode->pTargets, NULL, &num);
  SInterval*   pInterval =
      QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL == downstream->operatorType
            ? &((SMergeAlignedIntervalAggOperatorInfo*)downstream->info)->intervalAggOperatorInfo->interval
            : &((SIntervalAggOperatorInfo*)downstream->info)->interval;

  int32_t type = convertFillType(pPhyFillNode->mode);

  SResultInfo* pResultInfo = &pOperator->resultInfo;
  initResultSizeInfo(pOperator, 4096);
  pInfo->primaryTsCol = ((SColumnNode*)pPhyFillNode->pWStartTs)->slotId;

  int32_t numOfOutputCols = 0;
  SArray* pColMatchColInfo =
      extractColMatchInfo(pPhyFillNode->pTargets, pPhyFillNode->node.pOutputDataBlockDesc, &numOfOutputCols, COL_MATCH_FROM_SLOT_ID);

  code = initFillInfo(pInfo, pExprInfo, num, (SNodeListNode*)pPhyFillNode->pValues, pPhyFillNode->timeRange,
                      pResultInfo->capacity, pTaskInfo->id.str, pInterval, type);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  pInfo->pRes                    = pResBlock;
  pInfo->multigroupResult        = multigroupResult;
  pInfo->pCondition              = pPhyFillNode->node.pConditions;
  pInfo->pColMatchColInfo        = pColMatchColInfo;
  pOperator->name                = "FillOperator";
  pOperator->blocking            = false;
  pOperator->status              = OP_NOT_OPENED;
  pOperator->operatorType        = QUERY_NODE_PHYSICAL_PLAN_FILL;
  pOperator->exprSupp.pExprInfo  = pExprInfo;
  pOperator->exprSupp.numOfExprs = num;
  pOperator->info                = pInfo;
  pOperator->pTaskInfo           = pTaskInfo;

  pOperator->fpSet =
      createOperatorFpSet(operatorDummyOpenFn, doFill, NULL, NULL, destroySFillOperatorInfo, NULL, NULL, NULL);

  code = appendDownstream(pOperator, &downstream, 1);
  if (code != TSDB_CODE_SUCCESS) {
    // pInfo is fully initialised here; its destructor releases the fill info, result block and pointer array and
    // frees pInfo itself, so clear the local to keep the common error path from freeing it again
    destroySFillOperatorInfo(pInfo, num);
    pInfo = NULL;
    goto _error;
  }

  return pOperator;

_error:
  taosMemoryFreeClear(pOperator);
  taosMemoryFreeClear(pInfo);
  pTaskInfo->code = code;
  return NULL;
}

D
dapan1121 已提交
4046
/*
 * Allocate a task info in TASK_NOT_COMPLETED state.
 *
 * The task info receives a copy of dbFName (NULL stays NULL), its creation time, the query id, the execution model
 * and an id string of the form "TID:0x<taskId> QID:0x<queryId>" used as a log prefix.
 *
 * Returns NULL when memory cannot be allocated; callers must be prepared for that.
 */
static SExecTaskInfo* createExecTaskInfo(uint64_t queryId, uint64_t taskId, EOPTR_EXEC_MODEL model, char* dbFName) {
  SExecTaskInfo* pTaskInfo = taosMemoryCalloc(1, sizeof(SExecTaskInfo));
  if (pTaskInfo == NULL) {
    return NULL;
  }

  // allocate the id string up front so a failure leaves nothing else to undo
  char* p = taosMemoryCalloc(1, 128);
  if (p == NULL) {
    taosMemoryFree(pTaskInfo);
    return NULL;
  }

  setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED);

  // strdup(NULL) is undefined behaviour
  pTaskInfo->schemaVer.dbname = (dbFName != NULL) ? strdup(dbFName) : NULL;
  pTaskInfo->cost.created = taosGetTimestampMs();
  pTaskInfo->id.queryId = queryId;
  pTaskInfo->execModel = model;

  snprintf(p, 128, "TID:0x%" PRIx64 " QID:0x%" PRIx64, taskId, queryId);
  pTaskInfo->id.str = p;

  return pTaskInfo;
}
H
Haojun Liao 已提交
4061

H
Hongze Cheng 已提交
4062
static STsdbReader* doCreateDataReader(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle,
H
Haojun Liao 已提交
4063
                                       STableListInfo* pTableListInfo, const char* idstr);
H
Haojun Liao 已提交
4064

H
Haojun Liao 已提交
4065
static SArray* extractColumnInfo(SNodeList* pNodeList);
4066

4067
/*
 * Record the name and schema of the table identified by uid in pTaskInfo->schemaVer.
 *
 * A super table supplies its own row schema and tag-schema version. For a child table both are read from its super
 * table's entry. Any other table contributes its normal-table row schema only.
 *
 * Returns TSDB_CODE_SUCCESS, or terrno when a meta lookup fails.
 */
int32_t extractTableSchemaInfo(SReadHandle* pHandle, uint64_t uid, SExecTaskInfo* pTaskInfo) {
  SMetaReader mr = {0};
  metaReaderInit(&mr, pHandle->meta, 0);
  int32_t code = metaGetTableEntryByUid(&mr, uid);
  if (code != TSDB_CODE_SUCCESS) {
    metaReaderClear(&mr);
    return terrno;
  }

  pTaskInfo->schemaVer.tablename = strdup(mr.me.name);

  if (mr.me.type == TSDB_SUPER_TABLE) {
    pTaskInfo->schemaVer.sw = tCloneSSchemaWrapper(&mr.me.stbEntry.schemaRow);
    pTaskInfo->schemaVer.tversion = mr.me.stbEntry.schemaTag.version;
  } else if (mr.me.type == TSDB_CHILD_TABLE) {
    // reload the reader with the parent super table, whose entry supplies the schema
    tb_uid_t suid = mr.me.ctbEntry.suid;
    code = metaGetTableEntryByUid(&mr, suid);
    if (code != TSDB_CODE_SUCCESS) {
      // the stbEntry fields below are meaningless if the super table entry failed to load
      metaReaderClear(&mr);
      return terrno;
    }
    pTaskInfo->schemaVer.sw = tCloneSSchemaWrapper(&mr.me.stbEntry.schemaRow);
    pTaskInfo->schemaVer.tversion = mr.me.stbEntry.schemaTag.version;
  } else {
    pTaskInfo->schemaVer.sw = tCloneSSchemaWrapper(&mr.me.ntbEntry.schemaRow);
  }

  metaReaderClear(&mr);
  return TSDB_CODE_SUCCESS;
}

4094 4095 4096 4097 4098 4099 4100 4101 4102 4103 4104
/*
 * Release the database name, table name and schema wrapper captured in pTaskInfo->schemaVer.
 */
static void cleanupTableSchemaInfo(SExecTaskInfo* pTaskInfo) {
  taosMemoryFreeClear(pTaskInfo->schemaVer.dbname);

  // the table name is allocated independently of the schema wrapper, so it must be released even when no schema
  // was captured (e.g. the schema clone failed)
  taosMemoryFreeClear(pTaskInfo->schemaVer.tablename);

  if (pTaskInfo->schemaVer.sw == NULL) {
    return;
  }

  taosMemoryFree(pTaskInfo->schemaVer.sw->pSchema);
  taosMemoryFreeClear(pTaskInfo->schemaVer.sw);
}

4105
/*
 * Rebuild pTableListInfo->pGroupList so that tables sharing a group id (looked up in pTableListInfo->map by uid) are
 * collected into one SArray of STableKeyInfo. The group arrays are ordered by ascending group id.
 *
 * sortSupport mirrors pGroupList: element k holds the group id of the k-th group array and is kept sorted (a new id
 * is inserted before the first larger one), which the taosArraySearch* lookups rely on.
 *
 * groupNum is only the initial capacity of sortSupport.
 * Returns TDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY or TSDB_CODE_QRY_APP_ERROR. On failure, no group array that was
 * not yet stored in pGroupList is leaked.
 */
static int32_t sortTableGroup(STableListInfo* pTableListInfo, int32_t groupNum) {
  taosArrayClear(pTableListInfo->pGroupList);
  SArray* sortSupport = taosArrayInit(groupNum, sizeof(uint64_t));
  if (sortSupport == NULL) return TSDB_CODE_OUT_OF_MEMORY;

  int32_t code = TDB_CODE_SUCCESS;
  for (int32_t i = 0; i < taosArrayGetSize(pTableListInfo->pTableList); i++) {
    STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i);
    uint64_t*      groupId = taosHashGet(pTableListInfo->map, &info->uid, sizeof(uint64_t));
    if (groupId == NULL) {
      // every table is expected to have been assigned a group id before sorting
      qError("taos get group id error");
      code = TSDB_CODE_QRY_APP_ERROR;
      break;
    }

    int32_t groupIdx = taosArraySearchIdx(sortSupport, groupId, compareUint64Val, TD_EQ);
    if (groupIdx != -1) {
      // known group: append the table to its existing group array
      SArray* tGroup = (SArray*)taosArrayGetP(pTableListInfo->pGroupList, groupIdx);
      if (taosArrayPush(tGroup, info) == NULL) {
        qError("taos push uid array error");
        code = TSDB_CODE_QRY_APP_ERROR;
        break;
      }
      continue;
    }

    // new group: build its array, then place it before the first group with a larger id (or at the end)
    void*   p = taosArraySearch(sortSupport, groupId, compareUint64Val, TD_GT);
    SArray* tGroup = taosArrayInit(8, sizeof(STableKeyInfo));
    if (tGroup == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      break;
    }

    if (taosArrayPush(tGroup, info) == NULL) {
      qError("taos push info array error");
      taosArrayDestroy(tGroup);
      code = TSDB_CODE_QRY_APP_ERROR;
      break;
    }

    if (p == NULL) {
      if (taosArrayPush(sortSupport, groupId) == NULL) {
        qError("taos push support array error");
        taosArrayDestroy(tGroup);
        code = TSDB_CODE_QRY_APP_ERROR;
        break;
      }
      if (taosArrayPush(pTableListInfo->pGroupList, &tGroup) == NULL) {
        qError("taos push group array error");
        taosArrayDestroy(tGroup);
        code = TSDB_CODE_QRY_APP_ERROR;
        break;
      }
    } else {
      int32_t pos = TARRAY_ELEM_IDX(sortSupport, p);
      if (taosArrayInsert(sortSupport, pos, groupId) == NULL) {
        qError("taos insert support array error");
        taosArrayDestroy(tGroup);
        code = TSDB_CODE_QRY_APP_ERROR;
        break;
      }
      if (taosArrayInsert(pTableListInfo->pGroupList, pos, &tGroup) == NULL) {
        qError("taos insert group array error");
        taosArrayDestroy(tGroup);
        code = TSDB_CODE_QRY_APP_ERROR;
        break;
      }
    }
  }

  taosArrayDestroy(sortSupport);
  return code;
}

wmmhello's avatar
wmmhello 已提交
4163 4164
int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle, SNodeList* group) {
  if (group == NULL) {
wmmhello's avatar
wmmhello 已提交
4165 4166 4167 4168 4169 4170 4171 4172
    return TDB_CODE_SUCCESS;
  }

  pTableListInfo->map = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
  if (pTableListInfo->map == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  int32_t keyLen = 0;
X
Xiaoyu Wang 已提交
4173
  void*   keyBuf = NULL;
wmmhello's avatar
wmmhello 已提交
4174

4175
  SNode* node;
wmmhello's avatar
wmmhello 已提交
4176
  FOREACH(node, group) {
4177
    SExprNode* pExpr = (SExprNode*)node;
wmmhello's avatar
wmmhello 已提交
4178
    keyLen += pExpr->resType.bytes;
wmmhello's avatar
wmmhello 已提交
4179 4180
  }

wmmhello's avatar
wmmhello 已提交
4181
  int32_t nullFlagSize = sizeof(int8_t) * LIST_LENGTH(group);
wmmhello's avatar
wmmhello 已提交
4182 4183 4184 4185 4186 4187 4188
  keyLen += nullFlagSize;

  keyBuf = taosMemoryCalloc(1, keyLen);
  if (keyBuf == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

4189
  int32_t groupNum = 0;
X
Xiaoyu Wang 已提交
4190 4191 4192
  for (int32_t i = 0; i < taosArrayGetSize(pTableListInfo->pTableList); i++) {
    STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i);
    SMetaReader    mr = {0};
wmmhello's avatar
wmmhello 已提交
4193 4194 4195
    metaReaderInit(&mr, pHandle->meta, 0);
    metaGetTableEntryByUid(&mr, info->uid);

4196
    SNodeList* groupNew = nodesCloneList(group);
wmmhello's avatar
wmmhello 已提交
4197

wmmhello's avatar
wmmhello 已提交
4198
    nodesRewriteExprsPostOrder(groupNew, doTranslateTagExpr, &mr);
wmmhello's avatar
wmmhello 已提交
4199
    char* isNull = (char*)keyBuf;
wmmhello's avatar
wmmhello 已提交
4200 4201
    char* pStart = (char*)keyBuf + nullFlagSize;

4202
    SNode*  pNode;
wmmhello's avatar
wmmhello 已提交
4203
    int32_t index = 0;
4204
    FOREACH(pNode, groupNew) {
wmmhello's avatar
wmmhello 已提交
4205 4206 4207 4208
      SNode*  pNew = NULL;
      int32_t code = scalarCalculateConstants(pNode, &pNew);
      if (TSDB_CODE_SUCCESS == code) {
        REPLACE_NODE(pNew);
X
Xiaoyu Wang 已提交
4209
      } else {
4210
        taosMemoryFree(keyBuf);
wmmhello's avatar
wmmhello 已提交
4211
        nodesClearList(groupNew);
4212
        metaReaderClear(&mr);
wmmhello's avatar
wmmhello 已提交
4213
        return code;
wmmhello's avatar
wmmhello 已提交
4214
      }
4215

wmmhello's avatar
wmmhello 已提交
4216
      ASSERT(nodeType(pNew) == QUERY_NODE_VALUE);
4217
      SValueNode* pValue = (SValueNode*)pNew;
4218

wmmhello's avatar
wmmhello 已提交
4219
      if (pValue->node.resType.type == TSDB_DATA_TYPE_NULL || pValue->isNull) {
wmmhello's avatar
wmmhello 已提交
4220 4221 4222 4223
        isNull[index++] = 1;
        continue;
      } else {
        isNull[index++] = 0;
4224
        char* data = nodesGetValueFromNode(pValue);
L
Liu Jicong 已提交
4225 4226
        if (pValue->node.resType.type == TSDB_DATA_TYPE_JSON) {
          if (tTagIsJson(data)) {
wmmhello's avatar
wmmhello 已提交
4227 4228 4229
            terrno = TSDB_CODE_QRY_JSON_IN_GROUP_ERROR;
            taosMemoryFree(keyBuf);
            nodesClearList(groupNew);
4230
            metaReaderClear(&mr);
wmmhello's avatar
wmmhello 已提交
4231 4232
            return terrno;
          }
wmmhello's avatar
wmmhello 已提交
4233
          int32_t len = getJsonValueLen(data);
wmmhello's avatar
wmmhello 已提交
4234 4235 4236
          memcpy(pStart, data, len);
          pStart += len;
        } else if (IS_VAR_DATA_TYPE(pValue->node.resType.type)) {
wmmhello's avatar
wmmhello 已提交
4237 4238
          memcpy(pStart, data, varDataTLen(data));
          pStart += varDataTLen(data);
wmmhello's avatar
wmmhello 已提交
4239
        } else {
wmmhello's avatar
wmmhello 已提交
4240 4241
          memcpy(pStart, data, pValue->node.resType.bytes);
          pStart += pValue->node.resType.bytes;
wmmhello's avatar
wmmhello 已提交
4242 4243 4244
        }
      }
    }
4245
    int32_t  len = (int32_t)(pStart - (char*)keyBuf);
4246 4247
    uint64_t groupId = calcGroupId(keyBuf, len);
    taosHashPut(pTableListInfo->map, &(info->uid), sizeof(uint64_t), &groupId, sizeof(uint64_t));
S
slzhou 已提交
4248
    info->groupId = groupId;
4249
    groupNum++;
wmmhello's avatar
wmmhello 已提交
4250

wmmhello's avatar
wmmhello 已提交
4251
    nodesClearList(groupNew);
wmmhello's avatar
wmmhello 已提交
4252 4253 4254
    metaReaderClear(&mr);
  }
  taosMemoryFree(keyBuf);
4255

4256
  if (pTableListInfo->needSortTableByGroupId) {
wmmhello's avatar
wmmhello 已提交
4257
    return sortTableGroup(pTableListInfo, groupNum);
4258 4259
  }

wmmhello's avatar
wmmhello 已提交
4260 4261 4262
  return TDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
4263
/**
 * Build the executor operator tree for a physical plan node, recursing into its children.
 *
 * Leaf nodes become scan / exchange operators (preparing pTableListInfo where needed); inner
 * nodes first build every child and then wrap them in the operator matching the node type.
 *
 * @param pPhyNode       physical plan node to translate
 * @param pTaskInfo      owning task; pTaskInfo->code receives the error code for failures detected here
 * @param pHandle        read handle (meta, vnode, message callbacks) used by scan operators
 * @param queryId        query id, forwarded to scan operators
 * @param taskId         task id, forwarded to scan operators
 * @param pTableListInfo task-wide table list, filled in by the scan branches
 * @param pUser          user name, forwarded to the system table scan
 * @return the operator built for pPhyNode, or NULL on failure
 */
SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle,
                                  uint64_t queryId, uint64_t taskId, STableListInfo* pTableListInfo,
                                  const char* pUser) {
  int32_t type = nodeType(pPhyNode);

  if (pPhyNode->pChildren == NULL || LIST_LENGTH(pPhyNode->pChildren) == 0) {
    if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == type) {
      STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode;

      int32_t code = createScanTableListInfo(pTableScanNode, pHandle, pTableListInfo, queryId, taskId);
      if (code) {
        pTaskInfo->code = code;
        return NULL;
      }

      code = extractTableSchemaInfo(pHandle, pTableScanNode->scan.uid, pTaskInfo);
      if (code) {
        pTaskInfo->code = code;
        return NULL;
      }

      SOperatorInfo* pOperator = createTableScanOperatorInfo(pTableScanNode, pHandle, pTaskInfo);
      if (pOperator == NULL) {  // do not dereference a failed creation
        return NULL;
      }

      STableScanInfo* pScanInfo = pOperator->info;
      pTaskInfo->cost.pRecoder = &pScanInfo->readRecorder;
      return pOperator;

    } else if (QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN == type) {
      STableMergeScanPhysiNode* pTableScanNode = (STableMergeScanPhysiNode*)pPhyNode;

      int32_t code = createScanTableListInfo(pTableScanNode, pHandle, pTableListInfo, queryId, taskId);
      if (code) {
        pTaskInfo->code = code;
        return NULL;
      }

      code = extractTableSchemaInfo(pHandle, pTableScanNode->scan.uid, pTaskInfo);
      if (code) {
        pTaskInfo->code = code;
        return NULL;
      }

      SOperatorInfo* pOperator =
          createTableMergeScanOperatorInfo(pTableScanNode, pTableListInfo, pHandle, pTaskInfo, queryId, taskId);
      if (pOperator == NULL) {  // do not dereference a failed creation
        return NULL;
      }

      STableScanInfo* pScanInfo = pOperator->info;
      pTaskInfo->cost.pRecoder = &pScanInfo->readRecorder;
      return pOperator;

    } else if (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == type) {
      return createExchangeOperatorInfo(pHandle->pMsgCb->clientRpc, (SExchangePhysiNode*)pPhyNode, pTaskInfo);

    } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN == type) {
      STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode;
      STimeWindowAggSupp   twSup = {
            .waterMark = pTableScanNode->watermark,
            .calTrigger = pTableScanNode->triggerType,
            .maxTs = INT64_MIN,
      };
      if (pHandle) {
        int32_t code = createScanTableListInfo(pTableScanNode, pHandle, pTableListInfo, queryId, taskId);
        if (code) {
          pTaskInfo->code = code;
          return NULL;
        }
      }

      SOperatorInfo* pOperator =
          createStreamScanOperatorInfo(pHandle, pTableScanNode, pTaskInfo, &twSup, queryId, taskId);
      return pOperator;

    } else if (QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN == type) {
      SSystemTableScanPhysiNode* pSysScanPhyNode = (SSystemTableScanPhysiNode*)pPhyNode;
      return createSysTableScanOperatorInfo(pHandle, pSysScanPhyNode, pUser, pTaskInfo);
    } else if (QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN == type) {
      STagScanPhysiNode* pScanPhyNode = (STagScanPhysiNode*)pPhyNode;

      int32_t code = getTableList(pHandle->meta, pHandle->vnode, pScanPhyNode, pTableListInfo);
      if (code != TSDB_CODE_SUCCESS) {
        pTaskInfo->code = terrno;
        return NULL;
      }

      return createTagScanOperatorInfo(pHandle, pScanPhyNode, pTableListInfo, pTaskInfo);
    } else if (QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN == type) {
      SBlockDistScanPhysiNode* pBlockNode = (SBlockDistScanPhysiNode*)pPhyNode;
      pTableListInfo->pTableList = taosArrayInit(4, sizeof(STableKeyInfo));

      if (pBlockNode->tableType == TSDB_SUPER_TABLE) {
        int32_t code = vnodeGetAllTableList(pHandle->vnode, pBlockNode->uid, pTableListInfo->pTableList);
        if (code != TSDB_CODE_SUCCESS) {
          pTaskInfo->code = terrno;
          return NULL;
        }
      } else {  // Create one table group.
        STableKeyInfo info = {.lastKey = 0, .uid = pBlockNode->uid, .groupId = 0};
        taosArrayPush(pTableListInfo->pTableList, &info);
      }

      SQueryTableDataCond cond = {0};

      {
        cond.order = TSDB_ORDER_ASC;
        cond.numOfCols = 1;
        cond.colList = taosMemoryCalloc(1, sizeof(SColumnInfo));
        if (cond.colList == NULL) {
          terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
          // the caller reads the failure reason from pTaskInfo->code, so it must be set here too
          pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
          return NULL;
        }

        cond.colList->colId = 1;
        cond.colList->type = TSDB_DATA_TYPE_TIMESTAMP;
        cond.colList->bytes = sizeof(TSKEY);

        cond.twindows = (STimeWindow){.skey = INT64_MIN, .ekey = INT64_MAX};
        cond.suid = pBlockNode->suid;
        cond.type = BLOCK_LOAD_OFFSET_ORDER;
      }

      STsdbReader* pReader = NULL;
      int32_t      code = tsdbReaderOpen(pHandle->vnode, &cond, pTableListInfo->pTableList, &pReader, "");
      cleanupQueryTableDataCond(&cond);
      if (code != TSDB_CODE_SUCCESS) {  // never build the operator on top of a reader that failed to open
        pTaskInfo->code = code;
        return NULL;
      }

      return createDataBlockInfoScanOperator(pReader, pHandle, pBlockNode->suid, pBlockNode, pTaskInfo);
    } else if (QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN == type) {
      SLastRowScanPhysiNode* pScanNode = (SLastRowScanPhysiNode*)pPhyNode;

      //      int32_t code = createScanTableListInfo(pTableScanNode, pHandle, pTableListInfo, queryId, taskId);
      //      if (code) {
      //        pTaskInfo->code = code;
      //        return NULL;
      //      }

      int32_t code = extractTableSchemaInfo(pHandle, pScanNode->uid, pTaskInfo);
      if (code != TSDB_CODE_SUCCESS) {
        pTaskInfo->code = code;
        return NULL;
      }

      pTableListInfo->pTableList = taosArrayInit(4, sizeof(STableKeyInfo));
      if (pScanNode->tableType == TSDB_SUPER_TABLE) {
        code = vnodeGetAllTableList(pHandle->vnode, pScanNode->uid, pTableListInfo->pTableList);
        if (code != TSDB_CODE_SUCCESS) {
          pTaskInfo->code = terrno;
          return NULL;
        }
      } else {  // Create one table group.
        STableKeyInfo info = {.lastKey = 0, .uid = pScanNode->uid, .groupId = 0};
        taosArrayPush(pTableListInfo->pTableList, &info);
      }

      return createLastrowScanOperator(pScanNode, pHandle, pTableListInfo->pTableList, pTaskInfo);
    } else {
      ASSERT(0);
      // A leaf of unknown type must not fall through to the inner-node path below: it would
      // read ops[0] from a zero-sized allocation.
      pTaskInfo->code = TSDB_CODE_QRY_APP_ERROR;
      return NULL;
    }
  }

  int32_t num = 0;
  size_t  size = LIST_LENGTH(pPhyNode->pChildren);

  SOperatorInfo** ops = taosMemoryCalloc(size, POINTER_BYTES);
  if (ops == NULL) {
    pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
    return NULL;
  }

  for (int32_t i = 0; i < size; ++i) {
    SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, i);
    ops[i] = createOperatorTree(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableListInfo, pUser);
    if (ops[i] == NULL) {
      // release the sibling subtrees built so far and the child array itself
      for (int32_t j = 0; j < i; ++j) {
        destroyOperatorInfo(ops[j]);
      }
      taosMemoryFree(ops);
      return NULL;
    }
  }

  SOperatorInfo* pOptr = NULL;
  if (QUERY_NODE_PHYSICAL_PLAN_PROJECT == type) {
    pOptr = createProjectOperatorInfo(ops[0], (SProjectPhysiNode*)pPhyNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_HASH_AGG == type) {
    SAggPhysiNode* pAggNode = (SAggPhysiNode*)pPhyNode;
    SExprInfo*     pExprInfo = createExprInfo(pAggNode->pAggFuncs, pAggNode->pGroupKeys, &num);
    SSDataBlock*   pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);

    int32_t    numOfScalarExpr = 0;
    SExprInfo* pScalarExprInfo = NULL;
    if (pAggNode->pExprs != NULL) {
      pScalarExprInfo = createExprInfo(pAggNode->pExprs, NULL, &numOfScalarExpr);
    }

    if (pAggNode->pGroupKeys != NULL) {
      SArray* pColList = extractColumnInfo(pAggNode->pGroupKeys);
      pOptr = createGroupOperatorInfo(ops[0], pExprInfo, num, pResBlock, pColList, pAggNode->node.pConditions,
                                      pScalarExprInfo, numOfScalarExpr, pTaskInfo);
    } else {
      pOptr = createAggregateOperatorInfo(ops[0], pExprInfo, num, pResBlock, pAggNode->node.pConditions,
                                          pScalarExprInfo, numOfScalarExpr, pTaskInfo);
    }
  } else if (QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL == type || QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL == type) {
    SIntervalPhysiNode* pIntervalPhyNode = (SIntervalPhysiNode*)pPhyNode;

    SExprInfo*   pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &num);
    SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);

    SInterval interval = {.interval = pIntervalPhyNode->interval,
                          .sliding = pIntervalPhyNode->sliding,
                          .intervalUnit = pIntervalPhyNode->intervalUnit,
                          .slidingUnit = pIntervalPhyNode->slidingUnit,
                          .offset = pIntervalPhyNode->offset,
                          .precision = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->node.resType.precision};

    STimeWindowAggSupp as = {
        .waterMark = pIntervalPhyNode->window.watermark,
        .calTrigger = pIntervalPhyNode->window.triggerType,
        .maxTs = INT64_MIN,
    };
    ASSERT(as.calTrigger != STREAM_TRIGGER_MAX_DELAY);

    int32_t tsSlotId = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId;
    bool    isStream = (QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL == type);
    pOptr = createIntervalOperatorInfo(ops[0], pExprInfo, num, pResBlock, &interval, tsSlotId, &as, pIntervalPhyNode,
                                       pTaskInfo, isStream);

  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL == type) {
    SMergeAlignedIntervalPhysiNode* pIntervalPhyNode = (SMergeAlignedIntervalPhysiNode*)pPhyNode;

    SExprInfo*   pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &num);
    SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);

    SInterval interval = {.interval = pIntervalPhyNode->interval,
                          .sliding = pIntervalPhyNode->sliding,
                          .intervalUnit = pIntervalPhyNode->intervalUnit,
                          .slidingUnit = pIntervalPhyNode->slidingUnit,
                          .offset = pIntervalPhyNode->offset,
                          .precision = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->node.resType.precision};

    int32_t tsSlotId = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId;
    pOptr = createMergeAlignedIntervalOperatorInfo(ops[0], pExprInfo, num, pResBlock, &interval, tsSlotId,
                                                   pPhyNode->pConditions, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL == type) {
    SMergeIntervalPhysiNode* pIntervalPhyNode = (SMergeIntervalPhysiNode*)pPhyNode;

    SExprInfo*   pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &num);
    SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);

    SInterval interval = {.interval = pIntervalPhyNode->interval,
                          .sliding = pIntervalPhyNode->sliding,
                          .intervalUnit = pIntervalPhyNode->intervalUnit,
                          .slidingUnit = pIntervalPhyNode->slidingUnit,
                          .offset = pIntervalPhyNode->offset,
                          .precision = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->node.resType.precision};

    int32_t tsSlotId = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId;
    pOptr = createMergeIntervalOperatorInfo(ops[0], pExprInfo, num, pResBlock, &interval, tsSlotId, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL == type) {
    int32_t children = 0;
    pOptr = createStreamFinalIntervalOperatorInfo(ops[0], pPhyNode, pTaskInfo, children);
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL == type) {
    int32_t children = 1;
    pOptr = createStreamFinalIntervalOperatorInfo(ops[0], pPhyNode, pTaskInfo, children);
  } else if (QUERY_NODE_PHYSICAL_PLAN_SORT == type) {
    pOptr = createSortOperatorInfo(ops[0], (SSortPhysiNode*)pPhyNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_GROUP_SORT == type) {
    pOptr = createGroupSortOperatorInfo(ops[0], (SGroupSortPhysiNode*)pPhyNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE == type) {
    SMergePhysiNode* pMergePhyNode = (SMergePhysiNode*)pPhyNode;
    pOptr = createMultiwayMergeOperatorInfo(ops, size, pMergePhyNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION == type) {
    SSessionWinodwPhysiNode* pSessionNode = (SSessionWinodwPhysiNode*)pPhyNode;

    STimeWindowAggSupp as = {.waterMark = pSessionNode->window.watermark,
                             .calTrigger = pSessionNode->window.triggerType};

    SExprInfo*   pExprInfo = createExprInfo(pSessionNode->window.pFuncs, NULL, &num);
    SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
    int32_t      tsSlotId = ((SColumnNode*)pSessionNode->window.pTspk)->slotId;

    pOptr = createSessionAggOperatorInfo(ops[0], pExprInfo, num, pResBlock, pSessionNode->gap, tsSlotId, &as,
                                         pPhyNode->pConditions, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION == type) {
    pOptr = createStreamSessionAggOperatorInfo(ops[0], pPhyNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION == type) {
    int32_t children = 0;
    pOptr = createStreamFinalSessionAggOperatorInfo(ops[0], pPhyNode, pTaskInfo, children);
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION == type) {
    int32_t children = 1;
    pOptr = createStreamFinalSessionAggOperatorInfo(ops[0], pPhyNode, pTaskInfo, children);
  } else if (QUERY_NODE_PHYSICAL_PLAN_PARTITION == type) {
    pOptr = createPartitionOperatorInfo(ops[0], (SPartitionPhysiNode*)pPhyNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE == type) {
    SStateWinodwPhysiNode* pStateNode = (SStateWinodwPhysiNode*)pPhyNode;

    STimeWindowAggSupp as = {.waterMark = pStateNode->window.watermark, .calTrigger = pStateNode->window.triggerType};

    SExprInfo*   pExprInfo = createExprInfo(pStateNode->window.pFuncs, NULL, &num);
    SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
    int32_t      tsSlotId = ((SColumnNode*)pStateNode->window.pTspk)->slotId;

    SColumnNode* pColNode = (SColumnNode*)((STargetNode*)pStateNode->pStateKey)->pExpr;
    SColumn      col = extractColumnFromColumnNode(pColNode);
    pOptr = createStatewindowOperatorInfo(ops[0], pExprInfo, num, pResBlock, &as, tsSlotId, &col, pPhyNode->pConditions,
                                          pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE == type) {
    pOptr = createStreamStateAggOperatorInfo(ops[0], pPhyNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN == type) {
    pOptr = createMergeJoinOperatorInfo(ops, size, (SJoinPhysiNode*)pPhyNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_FILL == type) {
    pOptr = createFillOperatorInfo(ops[0], (SFillPhysiNode*)pPhyNode, false, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC == type) {
    pOptr = createIndefinitOutputOperatorInfo(ops[0], pPhyNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC == type) {
    pOptr = createTimeSliceOperatorInfo(ops[0], pPhyNode, pTaskInfo);
  } else {
    ASSERT(0);
    // Unknown inner node: report an error and release the children, since no parent took them over.
    pTaskInfo->code = TSDB_CODE_QRY_APP_ERROR;
    for (int32_t i = 0; i < size; ++i) {
      destroyOperatorInfo(ops[i]);
    }
  }

  taosMemoryFree(ops);
  return pOptr;
}
H
Haojun Liao 已提交
4572

H
Haojun Liao 已提交
4573
/**
 * Convert a list of target nodes (e.g. group keys) into an array of SColumn descriptors.
 *
 * Column expressions are converted with extractColumnFromColumnNode(). Constant (value)
 * expressions are described by their output slot and result type. Other expression kinds
 * are skipped.
 *
 * @param pNodeList list of STargetNode
 * @return new SArray of SColumn (caller owns it), or NULL with terrno set when allocation fails
 */
SArray* extractColumnInfo(SNodeList* pNodeList) {
  size_t  numOfCols = LIST_LENGTH(pNodeList);
  SArray* pList = taosArrayInit(numOfCols, sizeof(SColumn));
  if (pList == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  for (int32_t i = 0; i < numOfCols; ++i) {
    STargetNode* pNode = (STargetNode*)nodesListGetNode(pNodeList, i);

    if (nodeType(pNode->pExpr) == QUERY_NODE_COLUMN) {
      SColumnNode* pColNode = (SColumnNode*)pNode->pExpr;

      SColumn c = extractColumnFromColumnNode(pColNode);
      taosArrayPush(pList, &c);
    } else if (nodeType(pNode->pExpr) == QUERY_NODE_VALUE) {
      SValueNode* pValNode = (SValueNode*)pNode->pExpr;
      SColumn     c = {0};
      c.slotId = pNode->slotId;
      c.colId = pNode->slotId;
      // node.type is the expression's node kind (QUERY_NODE_VALUE); the column data type lives in resType.
      c.type = pValNode->node.resType.type;
      c.bytes = pValNode->node.resType.bytes;
      c.scale = pValNode->node.resType.scale;
      c.precision = pValNode->node.resType.precision;

      taosArrayPush(pList, &c);
    }
  }

  return pList;
}

L
Liu Jicong 已提交
4606 4607
/**
 * Resolve the tables targeted by a table scan node and open a tsdb reader over them.
 *
 * @param pTableScanNode table scan plan node
 * @param pHandle        read handle supplying meta and vnode
 * @param pTableListInfo out: receives the qualified table list
 * @param idstr          identifier used in log messages
 * @return the opened reader, or NULL. On NULL, terrno holds the error code, or 0 when no
 *         table qualified for the query.
 */
STsdbReader* doCreateDataReader(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle,
                                STableListInfo* pTableListInfo, const char* idstr) {
  STsdbReader*        pReader = NULL;
  SQueryTableDataCond cond = {0};

  int32_t code = getTableList(pHandle->meta, pHandle->vnode, &pTableScanNode->scan, pTableListInfo);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  if (taosArrayGetSize(pTableListInfo->pTableList) == 0) {
    code = 0;
    qDebug("no table qualified for query, %s", idstr);
    goto _error;
  }

  code = initQueryTableDataCond(&cond, pTableScanNode);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  code = tsdbReaderOpen(pHandle->vnode, &cond, pTableListInfo->pTableList, &pReader, idstr);
  // The condition is no longer needed once the open call returns (the success path always released
  // it here); release it on the failure path as well so it does not leak.
  cleanupQueryTableDataCond(&cond);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  return pReader;

_error:
  terrno = code;
  return NULL;
}

L
Liu Jicong 已提交
4640 4641 4642 4643 4644 4645 4646 4647 4648 4649 4650 4651 4652
/*
 * Walk down a single-child operator chain until the stream scan operator is reached, then hand
 * back (through ppInfo) the info of the table scan operator wrapped by that stream scan.
 * Returns TSDB_CODE_QRY_APP_ERROR when the chain ends without a stream scan or branches.
 */
static int32_t extractTbscanInStreamOpTree(SOperatorInfo* pOperator, STableScanInfo** ppInfo) {
  SOperatorInfo* pCur = pOperator;

  while (pCur->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
    if (pCur->numOfDownstream == 0) {
      qError("failed to find stream scan operator");
      return TSDB_CODE_QRY_APP_ERROR;
    }
    if (pCur->numOfDownstream > 1) {
      qError("join not supported for stream block scan");
      return TSDB_CODE_QRY_APP_ERROR;
    }
    pCur = pCur->pDownstream[0];
  }

  SStreamScanInfo* pStreamScan = pCur->info;
  ASSERT(pStreamScan->pTableScanOp->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN);
  *ppInfo = pStreamScan->pTableScanOp->info;
  return 0;
}

4660 4661 4662 4663 4664 4665 4666 4667 4668 4669 4670 4671 4672 4673 4674 4675 4676 4677 4678 4679 4680 4681
/*
 * Follow the single-child chain below pNode down to its leaf and return that leaf through
 * ppNode if it is a table scan node.
 * Returns 0 on success. Returns -1 with terrno = TSDB_CODE_QRY_APP_ERROR when a node has more
 * than one child or the leaf is not a table scan.
 */
int32_t extractTableScanNode(SPhysiNode* pNode, STableScanPhysiNode** ppNode) {
  SPhysiNode* pCur = pNode;

  while (pCur->pChildren != NULL && LIST_LENGTH(pCur->pChildren) != 0) {
    if (LIST_LENGTH(pCur->pChildren) != 1) {
      ASSERT(0);
      terrno = TSDB_CODE_QRY_APP_ERROR;
      return -1;
    }
    pCur = (SPhysiNode*)nodesListGetNode(pCur->pChildren, 0);
  }

  if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN != pCur->type) {
    ASSERT(0);
    terrno = TSDB_CODE_QRY_APP_ERROR;
    return -1;
  }

  *ppNode = (STableScanPhysiNode*)pCur;
  return 0;
}

L
Liu Jicong 已提交
4682 4683 4684 4685 4686
/**
 * Close and re-open the table-scan data reader found under a stream operator tree.
 *
 * @param pOperator root of the stream operator tree
 * @param plan      subplan whose (single-chain) physical tree ends in a table scan node
 * @param pHandle   read handle used to open the new reader
 * @param uid       currently unused (see TODO)
 * @param ts        currently unused (see TODO)
 * @return 0 on success; -1 if no stream scan is found; TSDB_CODE_QRY_APP_ERROR otherwise
 */
int32_t rebuildReader(SOperatorInfo* pOperator, SSubplan* plan, SReadHandle* pHandle, int64_t uid, int64_t ts) {
  STableScanInfo* pTableScanInfo = NULL;
  if (extractTbscanInStreamOpTree(pOperator, &pTableScanInfo) < 0) {
    return -1;
  }

  STableScanPhysiNode* pNode = NULL;
  if (extractTableScanNode(plan->pNode, &pNode) < 0) {
    ASSERT(0);
    // pNode is NULL here; continuing would dereference it inside doCreateDataReader
    return TSDB_CODE_QRY_APP_ERROR;
  }

  tsdbReaderClose(pTableScanInfo->dataReader);

  // NOTE(review): `info` is filled by doCreateDataReader (table list, possibly a group map) and is
  // never released here; confirm whether the reader keeps references into it before freeing it.
  STableListInfo info = {0};
  pTableScanInfo->dataReader = doCreateDataReader(pNode, pHandle, &info, NULL);
  if (pTableScanInfo->dataReader == NULL) {
    ASSERT(0);
    qError("failed to create data reader");
    return TSDB_CODE_QRY_APP_ERROR;
  }

  // TODO: set uid and ts to data reader
  return 0;
}

C
Cary Xu 已提交
4706
/**
 * Serialize the state of an operator subtree, in pre-order, into one growing buffer.
 *
 * Buffer layout: [int32 total length, counting itself][record of op #1][record of op #2]...
 * Operators that report a zero-length state contribute no record.
 *
 * @param ops          root of the subtree to encode
 * @param result       in/out buffer: allocated on the first record and grown for later ones;
 *                     freed and set to NULL when an operator fails to encode or memory runs out
 * @param length       out: total buffer length after the most recently appended record
 * @param nOptrWithVal in/out: incremented once for every operator that appended a record
 * @return TDB_CODE_SUCCESS or an error code
 */
int32_t encodeOperator(SOperatorInfo* ops, char** result, int32_t* length, int32_t* nOptrWithVal) {
  if (ops->fpSet.encodeResultRow != NULL) {
    if (result == NULL || length == NULL || nOptrWithVal == NULL) {
      return TSDB_CODE_TSC_INVALID_INPUT;
    }

    char*   pState = NULL;
    int32_t stateLen = 0;
    int32_t ret = ops->fpSet.encodeResultRow(ops, &pState, &stateLen);
    if (ret != TDB_CODE_SUCCESS) {
      if (*result != NULL) {
        taosMemoryFree(*result);
        *result = NULL;
      }
      return ret;
    }

    if (stateLen == 0) {
      // nothing to persist for this operator; continue with its children
      ASSERT(!pState);
    } else {
      ++(*nOptrWithVal);
      ASSERT(stateLen >= 0);

      if (*result != NULL) {
        // append after the bytes already in the buffer and bump the header
        int32_t used = *(int32_t*)(*result);
        char*   pGrown = (char*)taosMemoryRealloc(*result, used + stateLen);
        if (pGrown == NULL) {
          taosMemoryFree(pState);
          taosMemoryFree(*result);
          *result = NULL;
          return TSDB_CODE_OUT_OF_MEMORY;
        }
        *result = pGrown;
        memcpy(pGrown + used, pState, stateLen);
        *(int32_t*)pGrown += stateLen;
      } else {
        // first record: reserve room for the leading length header
        char* pFresh = (char*)taosMemoryCalloc(1, stateLen + sizeof(int32_t));
        if (pFresh == NULL) {
          taosMemoryFree(pState);
          return TSDB_CODE_OUT_OF_MEMORY;
        }
        *result = pFresh;
        memcpy(pFresh + sizeof(int32_t), pState, stateLen);
        *(int32_t*)pFresh = stateLen + sizeof(int32_t);
      }

      taosMemoryFree(pState);
      *length = *(int32_t*)(*result);
    }
  }

  for (int32_t i = 0; i < ops->numOfDownstream; ++i) {
    int32_t ret = encodeOperator(ops->pDownstream[i], result, length, nOptrWithVal);
    if (ret != TDB_CODE_SUCCESS) {
      return ret;
    }
  }
  return TDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
4766
/**
 * Restore the state of one operator and then its subtree, consuming records in the same
 * pre-order that encodeOperator() appends them.
 *
 * @param ops     operator whose state is restored
 * @param pData   in/out: next record to consume (its first int32 is the record length, itself
 *                included); set to NULL once the last record has been consumed
 * @param pRemain in/out: number of bytes available starting at *pData
 * @return TDB_CODE_SUCCESS or an error code
 */
static int32_t doDecodeOperatorState(SOperatorInfo* ops, const char** pData, int32_t* pRemain) {
  if (ops->fpSet.decodeResultRow) {
    if (*pData == NULL || *pRemain < (int32_t)sizeof(int32_t)) {
      return TSDB_CODE_TSC_INVALID_INPUT;
    }

    int32_t dataLength = *(const int32_t*)(*pData);
    if (dataLength <= 0 || dataLength > *pRemain) {  // corrupted record length
      return TSDB_CODE_TSC_INVALID_INPUT;
    }

    int32_t code = ops->fpSet.decodeResultRow(ops, (char*)(*pData));
    if (code != TDB_CODE_SUCCESS) {
      return code;
    }

    if (dataLength == *pRemain) {  // the last data
      *pData = NULL;
      *pRemain = 0;
    } else {
      *pData += dataLength;
      *pRemain -= dataLength;
    }
  }

  // The cursor is shared by all downstreams, so siblings consume consecutive records
  // instead of re-reading the same one.
  for (int32_t i = 0; i < ops->numOfDownstream; ++i) {
    int32_t code = doDecodeOperatorState(ops->pDownstream[i], pData, pRemain);
    if (code != TDB_CODE_SUCCESS) {
      return code;
    }
  }
  return TDB_CODE_SUCCESS;
}

/**
 * Restore operator states from a buffer produced by encodeOperator().
 *
 * The input buffer is only read, never modified.
 *
 * @param ops    root of the operator subtree to restore
 * @param result serialized buffer ([int32 total length][records...]), or NULL when there is no state
 * @param length total buffer length; must match the buffer's header
 * @return TDB_CODE_SUCCESS, TSDB_CODE_TSC_INVALID_INPUT for a missing/corrupted buffer, or the
 *         error returned by an operator's decoder
 */
int32_t decodeOperator(SOperatorInfo* ops, const char* result, int32_t length) {
  const char* pData = NULL;
  int32_t     remain = 0;

  if (result != NULL) {
    ASSERT(length == *(const int32_t*)result);
    int32_t totalLength = *(const int32_t*)result;
    pData = result + sizeof(int32_t);
    remain = totalLength - (int32_t)sizeof(int32_t);
  }

  return doDecodeOperatorState(ops, &pData, &remain);
}

D
dapan1121 已提交
4803
/**
 * Build the sink-specific parameter object for a data sink node.
 *
 * Insert sinks receive an SInserterParam that carries the read handle. Delete sinks receive an
 * SDeleterParam holding the task's suid and the uid of every table in the task's table list.
 * For any other sink type *pParam is left untouched.
 *
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when an allocation fails
 */
int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, qTaskInfo_t* pTaskInfo, SReadHandle* readHandle) {
  SExecTaskInfo* pTask = *(SExecTaskInfo**)pTaskInfo;

  if (pNode->type == QUERY_NODE_PHYSICAL_PLAN_QUERY_INSERT) {
    SInserterParam* pInsert = taosMemoryCalloc(1, sizeof(SInserterParam));
    if (pInsert == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
    pInsert->readHandle = readHandle;
    *pParam = pInsert;
  } else if (pNode->type == QUERY_NODE_PHYSICAL_PLAN_DELETE) {
    SDeleterParam* pDelete = taosMemoryCalloc(1, sizeof(SDeleterParam));
    if (pDelete == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    STableListInfo* pTables = &pTask->tableqinfoList;
    int32_t         numOfTables = taosArrayGetSize(pTables->pTableList);

    pDelete->suid = pTables->suid;
    pDelete->pUidList = taosArrayInit(numOfTables, sizeof(uint64_t));
    if (pDelete->pUidList == NULL) {
      taosMemoryFree(pDelete);
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    for (int32_t idx = 0; idx < numOfTables; ++idx) {
      STableKeyInfo* pKey = taosArrayGet(pTables->pTableList, idx);
      taosArrayPush(pDelete->pUidList, &pKey->uid);
    }

    *pParam = pDelete;
  }

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
4844
/**
 * Create an execution task for a subplan and build its operator tree.
 *
 * @param pPlan     subplan to execute (query id, db name, tag conditions, root node, user)
 * @param pTaskInfo out: the created task, or NULL on failure
 * @param pHandle   read handle forwarded to the operator tree
 * @param taskId    task id
 * @param sql       SQL text stored in the task
 * @param model     execution model
 * @return TSDB_CODE_SUCCESS, or a non-zero error code (also stored in terrno)
 */
int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId,
                               const char* sql, EOPTR_EXEC_MODEL model) {
  uint64_t queryId = pPlan->id.queryId;

  int32_t code = TSDB_CODE_SUCCESS;
  *pTaskInfo = createExecTaskInfo(queryId, taskId, model, pPlan->dbFName);
  if (*pTaskInfo == NULL) {
    code = TSDB_CODE_QRY_OUT_OF_MEMORY;
    goto _complete;
  }

  (*pTaskInfo)->sql = sql;
  (*pTaskInfo)->tableqinfoList.pTagCond = pPlan->pTagCond;
  (*pTaskInfo)->tableqinfoList.pTagIndexCond = pPlan->pTagIndexCond;
  (*pTaskInfo)->pRoot = createOperatorTree(pPlan->pNode, *pTaskInfo, pHandle, queryId, taskId,
                                           &(*pTaskInfo)->tableqinfoList, pPlan->user);

  if (NULL == (*pTaskInfo)->pRoot) {
    code = (*pTaskInfo)->code;
    if (code == TSDB_CODE_SUCCESS) {
      // Some failure paths only set terrno (or nothing at all). Never report success while
      // freeing the task and handing back a NULL *pTaskInfo.
      code = (terrno != TSDB_CODE_SUCCESS) ? terrno : TSDB_CODE_QRY_APP_ERROR;
    }
    goto _complete;
  }

  return code;

_complete:
  // NOTE(review): only the task struct itself is released here; resources already attached to it
  // (id string, table list, schema info) are not. Confirm ownership of `sql` before switching to a
  // full task teardown.
  taosMemoryFreeClear(*pTaskInfo);
  terrno = code;
  return code;
}

wmmhello's avatar
wmmhello 已提交
4874 4875 4876
/*
 * Release the table list, the uid -> group id map and the per-group table lists owned by a
 * STableListInfo, and reset every released pointer to NULL so a repeated call is harmless.
 */
static void doDestroyTableList(STableListInfo* pTableqinfoList) {
  taosArrayDestroy(pTableqinfoList->pTableList);
  taosHashCleanup(pTableqinfoList->map);

  // per-group arrays are only populated when tables are sorted by group id
  if (pTableqinfoList->needSortTableByGroupId) {
    for (int32_t i = 0; i < taosArrayGetSize(pTableqinfoList->pGroupList); i++) {
      SArray* tmp = taosArrayGetP(pTableqinfoList->pGroupList, i);
      taosArrayDestroy(tmp);
    }
  }
  taosArrayDestroy(pTableqinfoList->pGroupList);

  pTableqinfoList->pTableList = NULL;
  pTableqinfoList->map = NULL;
  pTableqinfoList->pGroupList = NULL;
}

L
Liu Jicong 已提交
4889
/**
 * Release an execution task and everything it owns: the operator tree, the table list, the
 * cached table schema info, the SQL text, the task id string and the task struct itself.
 */
void doDestroyTask(SExecTaskInfo* pTaskInfo) {
  qDebug("%s execTask is freed", GET_TASKID(pTaskInfo));

  // Destroy the operators before the table list. Scan operators are created with pointers into
  // tableqinfoList (e.g. the tag scan and last-row scan), so the list must outlive their teardown.
  destroyOperatorInfo(pTaskInfo->pRoot);
  doDestroyTableList(&pTaskInfo->tableqinfoList);
  cleanupTableSchemaInfo(pTaskInfo);

  taosMemoryFreeClear(pTaskInfo->sql);
  taosMemoryFreeClear(pTaskInfo->id.str);
  taosMemoryFreeClear(pTaskInfo);
}

/*
 * Write one tag value into a result slot of `bytes` bytes.
 *
 * - val == NULL: the slot is filled with the NULL marker for `type`.
 * - fixed-width type: exactly `bytes` bytes are copied.
 * - variable-length type: the value (header + payload) is copied; if it does
 *   not fit into `bytes`, the payload is truncated to bytes - VARSTR_HEADER_SIZE
 *   and the length header is updated accordingly.
 */
static void doSetTagValueToResultBuf(char* output, const char* val, int16_t type, int16_t bytes) {
  if (val == NULL) {
    setNull(output, type, bytes);
    return;
  }

  if (!IS_VAR_DATA_TYPE(type)) {
    memcpy(output, val, bytes);
    return;
  }

  if (varDataTLen(val) <= bytes) {
    varDataCopy(output, val);
    return;
  }

  // The stored value is longer than the output slot; keep only what fits
  // after the length header.
  int32_t capacity = bytes - VARSTR_HEADER_SIZE;
  int32_t copyLen = varDataLen(val);
  if (copyLen > capacity) {
    copyLen = capacity;
  }
  memcpy(varDataVal(output), varDataVal(val), copyLen);
  varDataSetLen(output, copyLen);
}

/*
 * Estimated query buffer quota, in bytes, for numOfTables tables:
 * 1.5 x sizeof(STableQueryInfo) per table. Buffers consumed inside tsdb
 * (e.g. STableCheckInfo) are not included in this estimate.
 */
static int64_t getQuerySupportBufSize(size_t numOfTables) {
  const double perTableBytes = (double)sizeof(STableQueryInfo) * 1.5;
  return (int64_t)(perTableBytes * numOfTables);
}

/*
 * Reserve query buffer quota for numOfTables tables from the global budget
 * tsQueryBufferSizeBytes.
 *
 *   tsQueryBufferSizeBytes <  0 : unlimited; succeeds without reserving.
 *   tsQueryBufferSizeBytes == 0 : query processing disabled; always fails.
 *   tsQueryBufferSizeBytes >  0 : the required amount is subtracted with a
 *                                 CAS loop; fails if the remaining budget
 *                                 would become negative.
 *
 * Returns TSDB_CODE_SUCCESS or TSDB_CODE_QRY_NOT_ENOUGH_BUFFER. Quota obtained
 * here is given back with releaseQueryBuf(numOfTables).
 */
int32_t checkForQueryBuf(size_t numOfTables) {
  int64_t t = getQuerySupportBufSize(numOfTables);

  // The budget is modified concurrently through CAS, so it is only ever read
  // atomically (a plain read racing with the CAS is a data race).
  int64_t s = atomic_load_64(&tsQueryBufferSizeBytes);
  if (s < 0) {
    return TSDB_CODE_SUCCESS;
  }

  // disable query processing if the value of tsQueryBufferSize is zero.
  if (s == 0) {
    return TSDB_CODE_QRY_NOT_ENOUGH_BUFFER;
  }

  while (1) {
    int64_t remain = s - t;
    if (remain < 0) {
      return TSDB_CODE_QRY_NOT_ENOUGH_BUFFER;
    }

    int64_t prev = atomic_val_compare_exchange_64(&tsQueryBufferSizeBytes, s, remain);
    if (prev == s) {
      return TSDB_CODE_SUCCESS;
    }

    // Another thread changed the budget first; retry with the value it installed.
    s = prev;
  }
}

/*
 * Give back quota previously reserved by a successful
 * checkForQueryBuf(numOfTables) call. No-op when the budget is unlimited
 * (tsQueryBufferSizeBytes < 0).
 */
void releaseQueryBuf(size_t numOfTables) {
  // Read the shared budget atomically; other threads update it concurrently.
  if (atomic_load_64(&tsQueryBufferSizeBytes) < 0) {
    return;
  }

  int64_t t = getQuerySupportBufSize(numOfTables);

  // Return the reserved amount to the shared budget.
  atomic_add_fetch_64(&tsQueryBufferSizeBytes, t);
}
D
dapan1121 已提交
4960

dengyihao's avatar
dengyihao 已提交
4961 4962
/*
 * Append the execution statistics of operatorInfo and, recursively, of all its
 * downstream operators (pre-order) to the growable array *pRes.
 *
 * pRes     [in/out] array of SExplainExecInfo; grown 10 entries at a time.
 * capacity [in/out] number of entries currently allocated in *pRes.
 * resNum   [in/out] number of entries already filled; +1 per visited operator.
 *
 * Returns:
 *   TSDB_CODE_SUCCESS on success;
 *   TSDB_CODE_QRY_OUT_OF_MEMORY if growing the array fails (the old array is
 *     freed and *pRes set to NULL);
 *   the error code of this operator's getExplainFn if it fails (*pRes is left
 *     allocated in that case);
 *   the error code of a failing downstream operator (*pRes is freed and set
 *     to NULL).
 */
int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SExplainExecInfo** pRes, int32_t* capacity,
                                   int32_t* resNum) {
  if (*resNum >= *capacity) {
    int32_t newCapacity = *capacity + 10;

    // Grow through a temporary: assigning realloc's result straight to *pRes
    // would leak the existing buffer when the allocation fails.
    SExplainExecInfo* pNew = taosMemoryRealloc(*pRes, newCapacity * sizeof(SExplainExecInfo));
    if (NULL == pNew) {
      qError("malloc %d failed", newCapacity * (int32_t)sizeof(SExplainExecInfo));
      taosMemoryFreeClear(*pRes);
      return TSDB_CODE_QRY_OUT_OF_MEMORY;
    }

    *pRes = pNew;
    *capacity = newCapacity;
  }

  SExplainExecInfo* pInfo = &(*pRes)[*resNum];

  pInfo->numOfRows = operatorInfo->resultInfo.totalRows;
  pInfo->startupCost = operatorInfo->cost.openCost;
  pInfo->totalCost = operatorInfo->cost.totalCost;

  if (operatorInfo->fpSet.getExplainFn) {
    int32_t code = operatorInfo->fpSet.getExplainFn(operatorInfo, &pInfo->verboseInfo, &pInfo->verboseLen);
    if (code) {
      qError("%s operator getExplainFn failed, code:%s", GET_TASKID(operatorInfo->pTaskInfo), tstrerror(code));
      return code;
    }
  } else {
    pInfo->verboseLen = 0;
    pInfo->verboseInfo = NULL;
  }

  ++(*resNum);

  for (int32_t i = 0; i < operatorInfo->numOfDownstream; ++i) {
    int32_t code = getOperatorExplainExecInfo(operatorInfo->pDownstream[i], pRes, capacity, resNum);
    if (code != TSDB_CODE_SUCCESS) {
      taosMemoryFreeClear(*pRes);
      // Propagate the real cause instead of reporting every failure as OOM.
      return code;
    }
  }

  return TSDB_CODE_SUCCESS;
}
5
54liuyao 已提交
5003

L
Liu Jicong 已提交
5004
/*
 * Initialize a stream aggregation supporter.
 *
 * Allocates the key buffer, the result-row hash map and the scan-window array.
 * It then creates a disk-based result buffer (named by pKey, under
 * TD_TMP_DIR_PATH) and attaches that buffer to each of the numOfOutput
 * function contexts in pCtx.
 *
 * pSup        supporter to initialize.
 * pKey        identifier passed to createDiskbasedBuf.
 * pCtx        function contexts; used to size a result row and receive pBuf.
 * numOfOutput number of entries in pCtx.
 * size        stored as pSup->valueSize.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY if an allocation fails,
 * or the error returned by createDiskbasedBuf. Resources already assigned to
 * pSup are not released here on failure.
 */
int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, const char* pKey, SqlFunctionCtx* pCtx, int32_t numOfOutput,
                               int32_t size) {
  pSup->resultRowSize = getResultRowSize(pCtx, numOfOutput);
  pSup->keySize = sizeof(int64_t) + sizeof(TSKEY);
  pSup->pKeyBuf = taosMemoryCalloc(1, pSup->keySize);
  _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
  pSup->pResultRows = taosHashInit(1024, hashFn, false, HASH_NO_LOCK);
  if (pSup->pKeyBuf == NULL || pSup->pResultRows == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  pSup->valueSize = size;

  pSup->pScanWindow = taosArrayInit(4, sizeof(STimeWindow));
  if (pSup->pScanWindow == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  // Smallest power-of-two page (>= 4KB) that holds at least four result rows.
  int32_t pageSize = 4096;
  while (pageSize < pSup->resultRowSize * 4) {
    pageSize <<= 1u;
  }

  // At least four pages need to be in the buffer. The default 1MB buffer
  // already satisfies this for pages up to 256KB.
  int32_t bufSize = 4096 * 256;
  if (bufSize < pageSize * 4) {
    bufSize = pageSize * 4;
  }

  int32_t code = createDiskbasedBuf(&pSup->pResultBuf, pageSize, bufSize, pKey, TD_TMP_DIR_PATH);
  if (code != TSDB_CODE_SUCCESS) {
    // Do not hand a buffer that failed to initialize to the function contexts.
    return code;
  }

  for (int32_t i = 0; i < numOfOutput; ++i) {
    pCtx[i].pBuf = pSup->pResultBuf;
  }
  return TSDB_CODE_SUCCESS;
}