executorimpl.c 180.7 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
15

16
#include <executorimpl.h>
H
Haojun Liao 已提交
17 18
#include "filter.h"
#include "function.h"
19 20
#include "functionMgt.h"
#include "os.h"
H
Haojun Liao 已提交
21
#include "querynodes.h"
22
#include "tfill.h"
dengyihao's avatar
dengyihao 已提交
23
#include "tname.h"
X
Xiaoyu Wang 已提交
24
#include "tref.h"
25

H
Haojun Liao 已提交
26
#include "tdatablock.h"
27
#include "tglobal.h"
H
Haojun Liao 已提交
28
#include "tmsg.h"
H
Haojun Liao 已提交
29
#include "tsort.h"
30
#include "ttime.h"
H
Haojun Liao 已提交
31

32
#include "executorimpl.h"
dengyihao's avatar
dengyihao 已提交
33
#include "index.h"
34
#include "query.h"
35 36
#include "tcompare.h"
#include "tcompression.h"
H
Haojun Liao 已提交
37
#include "thash.h"
38
#include "ttypes.h"
dengyihao's avatar
dengyihao 已提交
39
#include "vnode.h"
40

H
Haojun Liao 已提交
41
#define IS_MAIN_SCAN(runtime)          ((runtime)->scanFlag == MAIN_SCAN)
42 43 44 45 46 47
#define SET_REVERSE_SCAN_FLAG(runtime) ((runtime)->scanFlag = REVERSE_SCAN)

#define GET_FORWARD_DIRECTION_FACTOR(ord) (((ord) == TSDB_ORDER_ASC) ? QUERY_ASC_FORWARD_STEP : QUERY_DESC_FORWARD_STEP)

#if 0
static UNUSED_FUNC void *u_malloc (size_t __size) {
wafwerar's avatar
wafwerar 已提交
48
  uint32_t v = taosRand();
49 50 51 52

  if (v % 1000 <= 0) {
    return NULL;
  } else {
wafwerar's avatar
wafwerar 已提交
53
    return taosMemoryMalloc(__size);
54 55 56 57
  }
}

static UNUSED_FUNC void* u_calloc(size_t num, size_t __size) {
wafwerar's avatar
wafwerar 已提交
58
  uint32_t v = taosRand();
59 60 61
  if (v % 1000 <= 0) {
    return NULL;
  } else {
wafwerar's avatar
wafwerar 已提交
62
    return taosMemoryCalloc(num, __size);
63 64 65 66
  }
}

static UNUSED_FUNC void* u_realloc(void* p, size_t __size) {
wafwerar's avatar
wafwerar 已提交
67
  uint32_t v = taosRand();
68 69 70
  if (v % 5 <= 1) {
    return NULL;
  } else {
wafwerar's avatar
wafwerar 已提交
71
    return taosMemoryRealloc(p, __size);
72 73 74 75 76 77 78 79
  }
}

#define calloc  u_calloc
#define malloc  u_malloc
#define realloc u_realloc
#endif

X
Xiaoyu Wang 已提交
80
#define CLEAR_QUERY_STATUS(q, st)   ((q)->status &= (~(st)))
81 82
#define QUERY_IS_INTERVAL_QUERY(_q) ((_q)->interval.interval > 0)

L
Liu Jicong 已提交
83 84 85
/**
 * Maximum idle duration, computed as twice the global tsShellActivityTimer.
 * NOTE(review): the function name suggests the unit is seconds - confirm
 * against the definition of tsShellActivityTimer.
 */
int32_t getMaximumIdleDurationSec() {
  const int32_t idleFactor = 2;
  return tsShellActivityTimer * idleFactor;
}

/**
 * Placeholder lookup of the function id attached to an expression.
 *
 * The body only validates its argument (non-NULL expression whose node type
 * is TEXPR_UNARYEXPR_NODE) and then unconditionally yields 0.
 *
 * @param pExprInfo expression descriptor to inspect
 * @return always 0
 */
static int32_t getExprFunctionId(SExprInfo* pExprInfo) {
  assert(pExprInfo != NULL && pExprInfo->pExpr != NULL && pExprInfo->pExpr->nodeType == TEXPR_UNARYEXPR_NODE);

  const int32_t functionId = 0;
  return functionId;
}

static void doSetTagValueToResultBuf(char* output, const char* val, int16_t type, int16_t bytes);

92
static void setBlockStatisInfo(SqlFunctionCtx* pCtx, SExprInfo* pExpr, SSDataBlock* pSDataBlock);
93

X
Xiaoyu Wang 已提交
94
static void releaseQueryBuf(size_t numOfTables);
95 96 97 98 99

static void destroySFillOperatorInfo(void* param, int32_t numOfOutput);
static void destroyProjectOperatorInfo(void* param, int32_t numOfOutput);
static void destroyOrderOperatorInfo(void* param, int32_t numOfOutput);
static void destroyAggOperatorInfo(void* param, int32_t numOfOutput);
X
Xiaoyu Wang 已提交
100

H
Haojun Liao 已提交
101
static void destroyIntervalOperatorInfo(void* param, int32_t numOfOutput);
H
Haojun Liao 已提交
102 103
static void destroyExchangeOperatorInfo(void* param, int32_t numOfOutput);

104 105
static void destroyOperatorInfo(SOperatorInfo* pOperator);

106
/**
 * Mark an operator as finished.
 *
 * The operator status is always set to OP_EXEC_DONE. When the operator is
 * attached to a task, its total cost is recorded and the task is flagged as
 * TASK_COMPLETED. When no task is attached, only the status is updated.
 *
 * @param pOperator operator that has produced all of its results
 */
void doSetOperatorCompleted(SOperatorInfo* pOperator) {
  pOperator->status = OP_EXEC_DONE;

  // The task pointer may be NULL; it must not be dereferenced for the cost
  // computation before it has been checked.
  if (pOperator->pTaskInfo == NULL) {
    return;
  }

  // cost.start is scaled by 1000 before being subtracted from the microsecond
  // timestamp (presumably it is kept in milliseconds - TODO confirm); the
  // difference is then divided by 1000.0.
  pOperator->cost.totalCost = (taosGetTimestampUs() - pOperator->pTaskInfo->cost.start * 1000) / 1000.0;
  setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED);
}
114

H
Haojun Liao 已提交
115
/**
 * No-op "open" callback for operators that need no preparation.
 *
 * Flags the operator as opened via OPTR_SET_OPENED and records an open cost
 * of zero.
 *
 * @param pOperator operator being opened
 * @return TSDB_CODE_SUCCESS, unconditionally
 */
int32_t operatorDummyOpenFn(SOperatorInfo* pOperator) {
  OPTR_SET_OPENED(pOperator);

  // nothing was done, so nothing was spent
  pOperator->cost.openCost = 0;

  return TSDB_CODE_SUCCESS;
}

121
SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn, __optr_fn_t streamFn,
L
Liu Jicong 已提交
122
                                   __optr_fn_t cleanup, __optr_close_fn_t closeFn, __optr_encode_fn_t encode,
123
                                   __optr_decode_fn_t decode, __optr_explain_fn_t explain) {
124 125 126 127 128 129 130 131 132 133 134 135 136 137
  SOperatorFpSet fpSet = {
      ._openFn = openFn,
      .getNextFn = nextFn,
      .getStreamResFn = streamFn,
      .cleanupFn = cleanup,
      .closeFn = closeFn,
      .encodeResultRow = encode,
      .decodeResultRow = decode,
      .getExplainFn = explain,
  };

  return fpSet;
}

H
Haojun Liao 已提交
138
/**
 * No-op "close" callback for operators that own nothing to release.
 * Both arguments are ignored.
 */
void operatorDummyCloseFn(void* param, int32_t numOfCols) {
  (void)param;
  (void)numOfCols;
}
H
Haojun Liao 已提交
139

X
Xiaoyu Wang 已提交
140 141 142
static int32_t doCopyToSDataBlock(SExecTaskInfo* taskInfo, SSDataBlock* pBlock, SExprInfo* pExprInfo,
                                  SDiskbasedBuf* pBuf, SGroupResInfo* pGroupResInfo, const int32_t* rowCellOffset,
                                  SqlFunctionCtx* pCtx, int32_t numOfExprs);
H
Haojun Liao 已提交
143

144
static void initCtxOutputBuffer(SqlFunctionCtx* pCtx, int32_t size);
L
Liu Jicong 已提交
145 146
static void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, SAggOperatorInfo* pAggInfo, int32_t numOfOutput,
                                     uint64_t groupId);
147

L
Liu Jicong 已提交
148 149
// setup the output buffer for each operator
/**
 * Tell whether a column may contain NULL values.
 *
 * Tag columns, user-defined columns and the primary timestamp column are
 * reported as never containing NULL. For any other column the answer is
 * "no" only when block statistics are supplied and they report zero NULLs;
 * without statistics the function conservatively answers "yes".
 *
 * @param pColumn column descriptor
 * @param pStatis pre-computed statistics for the column, may be NULL
 */
static bool hasNull(SColumn* pColumn, SColumnDataAgg* pStatis) {
  const bool neverNull = TSDB_COL_IS_TAG(pColumn->flag) || TSDB_COL_IS_UD_COL(pColumn->flag) ||
                         pColumn->colId == PRIMARYKEY_TIMESTAMP_COL_ID;
  if (neverNull) {
    return false;
  }

  // no statistics -> unknown -> assume NULLs may be present
  return (pStatis == NULL) || (pStatis->numOfNull != 0);
}

162
#if 0
L
Liu Jicong 已提交
163 164
static bool chkResultRowFromKey(STaskRuntimeEnv* pRuntimeEnv, SResultRowInfo* pResultRowInfo, char* pData,
                                int16_t bytes, bool masterscan, uint64_t uid) {
165 166 167
  bool existed = false;
  SET_RES_WINDOW_KEY(pRuntimeEnv->keyBuf, pData, bytes, uid);

L
Liu Jicong 已提交
168 169
  SResultRow** p1 =
      (SResultRow**)taosHashGet(pRuntimeEnv->pResultRowHashTable, pRuntimeEnv->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes));
170 171 172 173 174 175 176 177 178 179 180

  // in case of repeat scan/reverse scan, no new time window added.
  if (QUERY_IS_INTERVAL_QUERY(pRuntimeEnv->pQueryAttr)) {
    if (!masterscan) {  // the *p1 may be NULL in case of sliding+offset exists.
      return p1 != NULL;
    }

    if (p1 != NULL) {
      if (pResultRowInfo->size == 0) {
        existed = false;
      } else if (pResultRowInfo->size == 1) {
dengyihao's avatar
dengyihao 已提交
181
        //        existed = (pResultRowInfo->pResult[0] == (*p1));
182 183
      } else {  // check if current pResultRowInfo contains the existed pResultRow
        SET_RES_EXT_WINDOW_KEY(pRuntimeEnv->keyBuf, pData, bytes, uid, pResultRowInfo);
L
Liu Jicong 已提交
184 185
        int64_t* index =
            taosHashGet(pRuntimeEnv->pResultRowListSet, pRuntimeEnv->keyBuf, GET_RES_EXT_WINDOW_KEY_LEN(bytes));
186 187 188 189 190 191 192 193 194 195 196 197 198
        if (index != NULL) {
          existed = true;
        } else {
          existed = false;
        }
      }
    }

    return existed;
  }

  return p1 != NULL;
}
199
#endif
200

201
/**
 * Reserve room for one result row inside the paged result buffer of a group.
 *
 * The row is appended to the last page that belongs to tableGroupId. A fresh
 * page is requested when the group has no page yet, or when the last page
 * cannot hold interBufSize more bytes. The page is marked dirty and its used
 * size (pData->num) is advanced by interBufSize.
 *
 * @param pResultBuf   paged buffer that stores result rows
 * @param tableGroupId group whose page list is used
 * @param interBufSize number of bytes to reserve for the row
 * @return the reserved row with pageId/offset filled in, or NULL when no page
 *         could be obtained
 */
SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int64_t tableGroupId, int32_t interBufSize) {
  SFilePage* pData = NULL;

  // in the first scan, new space needed for results
  int32_t pageId = -1;
  SIDList list = getDataBufPagesIdList(pResultBuf, tableGroupId);

  if (taosArrayGetSize(list) == 0) {
    pData = getNewBufPage(pResultBuf, tableGroupId, &pageId);
    // getNewBufPage() may fail; only initialize the page header when a page was returned
    if (pData != NULL) {
      pData->num = sizeof(SFilePage);
    }
  } else {
    SPageInfo* pi = getLastPageInfo(list);
    pData = getBufPage(pResultBuf, getPageId(pi));
    pageId = getPageId(pi);

    if (pData->num + interBufSize > getBufPageSize(pResultBuf)) {
      // release current page first, and prepare the next one
      releaseBufPageInfo(pResultBuf, pi);

      pData = getNewBufPage(pResultBuf, tableGroupId, &pageId);
      if (pData != NULL) {
        pData->num = sizeof(SFilePage);
      }
    }
  }

  if (pData == NULL) {
    return NULL;
  }

  setBufPageDirty(pData, true);

  // the row starts at the current end of the used area of the page
  SResultRow* pResultRow = (SResultRow*)((char*)pData + pData->num);
  pResultRow->pageId = pageId;
  pResultRow->offset = (int32_t)pData->num;

  pData->num += interBufSize;

  return pResultRow;
}

243 244 245 246 247 248 249
/**
 * the struct of key in hash table
 * +----------+---------------+
 * | group id |   key data    |
 * | 8 bytes  | actual length |
 * +----------+---------------+
 */
250 251 252
/**
 * Locate, or create, the result row identified by (groupId, key data) and make
 * it the currently active row of pResultRowInfo.
 *
 * The key is built into pSup->keyBuf and looked up in pSup->pResultRowHashTable.
 * For interval queries an existing row is reused only when masterscan is true;
 * for other queries any existing row is reused. When no row is reused, a new
 * one is reserved in pResultBuf, initialized, and registered in the hash table.
 *
 * Error handling is non-local: the function longjmp()s to pTaskInfo->env with
 * TSDB_CODE_OUT_OF_MEMORY when no buffer space can be reserved for a new row,
 * and with TSDB_CODE_QRY_TOO_MANY_TIMEWINDOW when the hash table grows beyond
 * MAX_INTERVAL_TIME_WINDOW entries.
 *
 * @return the result row for the key (never NULL on normal return)
 */
SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pResultRowInfo, char* pData,
                                   int16_t bytes, bool masterscan, uint64_t groupId, SExecTaskInfo* pTaskInfo,
                                   bool isIntervalQuery, SAggSupporter* pSup) {
  SET_RES_WINDOW_KEY(pSup->keyBuf, pData, bytes, groupId);

  SResultRowPosition* p1 =
      (SResultRowPosition*)taosHashGet(pSup->pResultRowHashTable, pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes));

  SResultRow* pResult = NULL;

  // in case of repeat scan/reverse scan, no new time window added.
  if (isIntervalQuery) {
    if (masterscan && p1 != NULL) {  // p1 may be NULL when sliding+offset exists.
      pResult = getResultRowByPos(pResultBuf, p1);
      ASSERT(pResult->pageId == p1->pageId && pResult->offset == p1->offset);
    }
  } else {
    // In case of a group by column query, the required SResultRow object must already exist in the
    // pResultRowInfo object.
    if (p1 != NULL) {
      // todo
      pResult = getResultRowByPos(pResultBuf, p1);
      ASSERT(pResult->pageId == p1->pageId && pResult->offset == p1->offset);
    }
  }

  // 1. close current opened time window
  if (pResultRowInfo->cur.pageId != -1 && ((pResult == NULL) || (pResult->pageId != pResultRowInfo->cur.pageId))) {
#ifdef BUF_PAGE_DEBUG
    qDebug("page_1");
#endif
    SResultRowPosition pos = pResultRowInfo->cur;
    SFilePage*         pPage = getBufPage(pResultBuf, pos.pageId);
    releaseBufPage(pResultBuf, pPage);
  }

  // allocate a new buffer page
  if (pResult == NULL) {
#ifdef BUF_PAGE_DEBUG
    qDebug("page_2");
#endif
    ASSERT(pSup->resultRowSize > 0);
    pResult = getNewResultRow(pResultBuf, groupId, pSup->resultRowSize);

    // getNewResultRow() returns NULL when no buffer page is available; abort the
    // task instead of dereferencing a NULL row below.
    if (pResult == NULL) {
      longjmp(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
    }

    initResultRow(pResult);

    // add a new result set for a new group
    SResultRowPosition pos = {.pageId = pResult->pageId, .offset = pResult->offset};
    taosHashPut(pSup->pResultRowHashTable, pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes), &pos,
                sizeof(SResultRowPosition));
  }

  // 2. set the new time window to be the new active time window
  pResultRowInfo->cur = (SResultRowPosition){.pageId = pResult->pageId, .offset = pResult->offset};

  // too many time window in query
  if (taosHashGetSize(pSup->pResultRowHashTable) > MAX_INTERVAL_TIME_WINDOW) {
    longjmp(pTaskInfo->env, TSDB_CODE_QRY_TOO_MANY_TIMEWINDOW);
  }

  return pResult;
}

313
// a new buffer page for each table. Needs to opt this design
L
Liu Jicong 已提交
314
/**
 * Bind a window result row to space in the paged result buffer.
 *
 * Does nothing when the row already has a page (pageId != -1). Otherwise the
 * row is placed at the end of the last page that belongs to tid; a fresh page
 * is requested when tid has no page yet or the last page cannot hold size
 * more bytes.
 *
 * @param pWindowRes row to bind; pageId/offset are filled in on success
 * @param pResultBuf paged buffer that stores result rows
 * @param tid        id whose page list is used
 * @param size       number of bytes to reserve
 * @return 0 on success (or when already bound), -1 when no page could be obtained
 */
static int32_t addNewWindowResultBuf(SResultRow* pWindowRes, SDiskbasedBuf* pResultBuf, int32_t tid, uint32_t size) {
  if (pWindowRes->pageId != -1) {
    return 0;
  }

  SFilePage* pData = NULL;

  // in the first scan, new space needed for results
  int32_t pageId = -1;
  SIDList list = getDataBufPagesIdList(pResultBuf, tid);

  if (taosArrayGetSize(list) == 0) {
    pData = getNewBufPage(pResultBuf, tid, &pageId);
    // getNewBufPage() may fail; only initialize the page header when a page was returned
    if (pData != NULL) {
      pData->num = sizeof(SFilePage);
    }
  } else {
    SPageInfo* pi = getLastPageInfo(list);
    pData = getBufPage(pResultBuf, getPageId(pi));
    pageId = getPageId(pi);

    if (pData->num + size > getBufPageSize(pResultBuf)) {
      // release current page first, and prepare the next one
      releaseBufPageInfo(pResultBuf, pi);

      pData = getNewBufPage(pResultBuf, tid, &pageId);
      if (pData != NULL) {
        pData->num = sizeof(SFilePage);
      }
    }
  }

  if (pData == NULL) {
    return -1;
  }

  // The row is still unbound here (the early return above handled the bound
  // case), so claim the space at the current end of the page.
  pWindowRes->pageId = pageId;
  pWindowRes->offset = (int32_t)pData->num;

  pData->num += size;
  assert(pWindowRes->pageId >= 0);

  return 0;
}

360
//  query_range_start, query_range_end, window_duration, window_start, window_end
361
/**
 * Prepare the 5-row timestamp column that carries time-window information.
 *
 * Row layout written here:
 *   0: pQueryWindow->skey   1: pQueryWindow->ekey   2: 0 (window duration)
 *   3: pQueryWindow->skey   4: pQueryWindow->ekey
 *
 * @param pColData     column to configure (type TIMESTAMP, 8 bytes) and fill
 * @param pQueryWindow query time range used as the initial values
 */
void initExecTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pQueryWindow) {
  pColData->info.type = TSDB_DATA_TYPE_TIMESTAMP;
  pColData->info.bytes = sizeof(int64_t);

  colInfoDataEnsureCapacity(pColData, 5);

  // this value may be variable in case of 'n' and 'y'.
  int64_t interval = 0;

  int64_t* rowValues[5] = {
      &pQueryWindow->skey, &pQueryWindow->ekey, &interval, &pQueryWindow->skey, &pQueryWindow->ekey,
  };

  for (int32_t row = 0; row < 5; ++row) {
    colDataAppendInt64(pColData, row, rowValues[row]);
  }
}

X
Xiaoyu Wang 已提交
375 376 377
/**
 * Run every output function over the row range [offset, offset + forwardStep)
 * of the current input.
 *
 * For each context the input range is temporarily narrowed to the requested
 * rows. Pre-computed block statistics are disabled when the range does not
 * cover the whole block (forwardStep < numOfTotal).
 *
 * - Window pseudo-column functions are evaluated through sfp.process() against
 *   pTimeWindowData and produce exactly one result. The input range is NOT
 *   restored on this path.
 * - All other functions are executed through fpSet.process() when
 *   functionNeedToExecute() allows it, after which the saved input range and
 *   statistics flag are restored.
 *
 * On a processing error the code is stored in taskInfo->code and control
 * leaves via longjmp(taskInfo->env, code).
 *
 * pWin, tsCol and order are not referenced by this implementation.
 */
void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, STimeWindow* pWin,
                      SColumnInfoData* pTimeWindowData, int32_t offset, int32_t forwardStep, TSKEY* tsCol,
                      int32_t numOfTotal, int32_t numOfOutput, int32_t order) {
  for (int32_t k = 0; k < numOfOutput; ++k) {
    SqlFunctionCtx*       pOne = &pCtx[k];
    SInputColumnInfoData* pIn = &pOne->input;

    // remember the current input state so it can be put back afterwards
    // todo no need this??
    const bool    savedAggIsSet = pIn->colDataAggIsSet;
    const int32_t savedNumOfRows = pIn->numOfRows;
    const int32_t savedStartRow = pIn->startRowIndex;

    pIn->startRowIndex = offset;
    pIn->numOfRows = forwardStep;

    // not a whole block involved in query processing, statistics data can not be used
    if (pIn->colDataAggIsSet && forwardStep < numOfTotal) {
      pIn->colDataAggIsSet = false;
    }

    if (fmIsWindowPseudoColumnFunc(pOne->functionId)) {
      SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pOne);

      // the result is written straight into the intermediate buffer of the entry
      SColumnInfoData idata = {0};
      idata.info.type = TSDB_DATA_TYPE_BIGINT;
      idata.info.bytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes;
      idata.pData = GET_ROWCELL_INTERBUF(pEntryInfo);

      SScalarParam out = {.columnData = &idata};
      SScalarParam tw = {.numOfRows = 5, .columnData = pTimeWindowData};
      pOne->sfp.process(&tw, 1, &out);
      pEntryInfo->numOfRes = 1;
      continue;
    }

    if (functionNeedToExecute(pOne) && pOne->fpSet.process != NULL) {
      int32_t code = pOne->fpSet.process(pOne);
      if (code != TSDB_CODE_SUCCESS) {
        qError("%s apply functions error, code: %s", GET_TASKID(taskInfo), tstrerror(code));
        taskInfo->code = code;
        longjmp(taskInfo->env, code);
      }
    }

    // restore it
    pIn->colDataAggIsSet = savedAggIsSet;
    pIn->startRowIndex = savedStartRow;
    pIn->numOfRows = savedNumOfRows;
  }
}

dengyihao's avatar
dengyihao 已提交
428
static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order,
429
                                   int32_t scanFlag, bool createDummyCol);
430

dengyihao's avatar
dengyihao 已提交
431 432
/**
 * Bind a data block that carries pre-computed block statistics to every
 * function context of the operator.
 *
 * For each expression the scan order and the row count are stored, then
 * setBlockStatisInfo() attaches the statistics of the block.
 */
static void doSetInputDataBlockInfo(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock,
                                    int32_t order) {
  for (int32_t idx = 0; idx < pOperator->exprSupp.numOfExprs; ++idx) {
    SqlFunctionCtx* pOne = &pCtx[idx];

    pOne->order = order;
    pOne->input.numOfRows = pBlock->info.rows;

    setBlockStatisInfo(pOne, &pOperator->exprSupp.pExprInfo[idx], pBlock);
  }
}

X
Xiaoyu Wang 已提交
440 441
/**
 * Attach an input block to the function contexts of an operator.
 *
 * Blocks without pre-computed statistics (pBlockAgg == NULL) are bound column
 * by column through doSetInputDataBlock(); its return code is not propagated.
 * Blocks with statistics are bound through doSetInputDataBlockInfo().
 */
void setInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order,
                       int32_t scanFlag, bool createDummyCol) {
  if (pBlock->pBlockAgg == NULL) {
    doSetInputDataBlock(pOperator, pCtx, pBlock, order, scanFlag, createDummyCol);
    return;
  }

  doSetInputDataBlockInfo(pOperator, pCtx, pBlock, order);
}

L
Liu Jicong 已提交
449 450
/**
 * Materialize a constant function parameter as a column of numOfRows
 * identical values, stored in pInput->pData[paramIndex].
 *
 * The column object is allocated on first use and reused afterwards. BIGINT /
 * UBIGINT, DOUBLE and VARCHAR constants are filled in; any other type leaves
 * the column content untouched.
 *
 * @param pInput     input descriptor that owns the column
 * @param pFuncParam parameter holding the constant value
 * @param paramIndex slot in pInput->pData to fill
 * @param numOfRows  number of rows to generate
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when an allocation fails
 */
static int32_t doCreateConstantValColumnInfo(SInputColumnInfoData* pInput, SFunctParam* pFuncParam, int32_t paramIndex,
                                             int32_t numOfRows) {
  SColumnInfoData* pColInfo = NULL;
  if (pInput->pData[paramIndex] == NULL) {
    pColInfo = taosMemoryCalloc(1, sizeof(SColumnInfoData));
    if (pColInfo == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    // Set the correct column info (data type and bytes)
    pColInfo->info.type = pFuncParam->param.nType;
    pColInfo->info.bytes = pFuncParam->param.nLen;

    pInput->pData[paramIndex] = pColInfo;
  } else {
    pColInfo = pInput->pData[paramIndex];
  }

  colInfoDataEnsureCapacity(pColInfo, numOfRows);

  int8_t type = pFuncParam->param.nType;
  if (type == TSDB_DATA_TYPE_BIGINT || type == TSDB_DATA_TYPE_UBIGINT) {
    int64_t v = pFuncParam->param.i;
    for (int32_t i = 0; i < numOfRows; ++i) {
      colDataAppendInt64(pColInfo, i, &v);
    }
  } else if (type == TSDB_DATA_TYPE_DOUBLE) {
    double v = pFuncParam->param.d;
    for (int32_t i = 0; i < numOfRows; ++i) {
      colDataAppendDouble(pColInfo, i, &v);
    }
  } else if (type == TSDB_DATA_TYPE_VARCHAR) {
    // scratch buffer holding the constant in var-string form (header + payload)
    char* tmp = taosMemoryMalloc(pFuncParam->param.nLen + VARSTR_HEADER_SIZE);
    if (tmp == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    STR_WITH_SIZE_TO_VARSTR(tmp, pFuncParam->param.pz, pFuncParam->param.nLen);
    for (int32_t i = 0; i < numOfRows; ++i) {
      colDataAppend(pColInfo, i, tmp, false);
    }

    // NOTE(review): relies on colDataAppend() copying the payload into the
    // column buffer rather than keeping the pointer - confirm in tdatablock.
    taosMemoryFree(tmp);
  }

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
491
/**
 * Bind the columns of pBlock to the input slots of every function context.
 *
 * Per expression: order, row count, source block, scan flag and block uid are
 * recorded and the "statistics available" flag is cleared. Then each parameter
 * is bound:
 *  - column parameters point at the matching column of the block; for timeline
 *    functions the last parameter is additionally stored as pPTS;
 *  - value parameters are expanded into a constant column only when
 *    createDummyCol is set and the expression has exactly one parameter.
 *
 * @return TSDB_CODE_SUCCESS, or the error reported while building a constant column
 */
static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order,
                                   int32_t scanFlag, bool createDummyCol) {
  for (int32_t i = 0; i < pOperator->exprSupp.numOfExprs; ++i) {
    SqlFunctionCtx*       pOne = &pCtx[i];
    SInputColumnInfoData* pInput = &pOne->input;

    pOne->order = order;
    pInput->numOfRows = pBlock->info.rows;
    pOne->pSrcBlock = pBlock;
    pOne->scanFlag = scanFlag;

    pInput->uid = pBlock->info.uid;
    pInput->colDataAggIsSet = false;

    SExprInfo* pOneExpr = &pOperator->exprSupp.pExprInfo[i];
    for (int32_t j = 0; j < pOneExpr->base.numOfParams; ++j) {
      SFunctParam* pFuncParam = &pOneExpr->base.pParam[j];

      if (pFuncParam->type == FUNC_PARAM_TYPE_COLUMN) {
        int32_t slotId = pFuncParam->pCol->slotId;

        pInput->pData[j] = taosArrayGet(pBlock->pDataBlock, slotId);
        pInput->totalRows = pBlock->info.rows;
        pInput->numOfRows = pBlock->info.rows;
        pInput->startRowIndex = 0;

        // NOTE: the last parameter is the primary timestamp column
        // todo: refactor this
        bool isLastParam = (j == pOneExpr->base.numOfParams - 1);
        if (fmIsTimelineFunc(pOne->functionId) && isLastParam) {
          // in case of merge function, this is not always the ts column data.
          pInput->pPTS = pInput->pData[j];
        }

        ASSERT(pInput->pData[j] != NULL);
        continue;
      }

      if (pFuncParam->type != FUNC_PARAM_TYPE_VALUE) {
        continue;
      }

      // todo avoid case: top(k, 12), 12 is the value parameter.
      // sum(11), 11 is also the value parameter.
      if (!createDummyCol || pOneExpr->base.numOfParams != 1) {
        continue;
      }

      pInput->totalRows = pBlock->info.rows;
      pInput->numOfRows = pBlock->info.rows;
      pInput->startRowIndex = 0;

      int32_t code = doCreateConstantValColumnInfo(pInput, pFuncParam, j, pBlock->info.rows);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }
    }
  }

  return TSDB_CODE_SUCCESS;
}

543
/**
 * Execute the process callback of every aggregate function of the operator.
 *
 * A function is skipped when functionNeedToExecute() rejects it or when it has
 * no process callback. The first failing callback stops the loop.
 *
 * @return TSDB_CODE_SUCCESS, or the error code of the first failing function
 */
static int32_t doAggregateImpl(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx) {
  for (int32_t idx = 0; idx < pOperator->exprSupp.numOfExprs; ++idx) {
    SqlFunctionCtx* pOne = &pCtx[idx];

    // todo add a dummy function to avoid process check
    if (!functionNeedToExecute(pOne) || pOne->fpSet.process == NULL) {
      continue;
    }

    int32_t code = pOne->fpSet.process(pOne);
    if (code == TSDB_CODE_SUCCESS) {
      continue;
    }

    qError("%s aggregate function error happens, code: %s", GET_TASKID(pOperator->pTaskInfo), tstrerror(code));
    return code;
  }

  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
562
/**
 * Point the output of the first N function contexts at the first N columns of
 * the result block, where N is the size of pPseudoList (0 when it is NULL).
 */
static void setPseudoOutputColInfo(SSDataBlock* pResult, SqlFunctionCtx* pCtx, SArray* pPseudoList) {
  size_t count = 0;
  if (pPseudoList != NULL) {
    count = taosArrayGetSize(pPseudoList);
  }

  for (int32_t col = 0; col < count; ++col) {
    pCtx[col].pOutput = taosArrayGet(pResult->pDataBlock, col);
  }
}

569
int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pSrcBlock, SqlFunctionCtx* pCtx,
X
Xiaoyu Wang 已提交
570
                              int32_t numOfOutput, SArray* pPseudoList) {
H
Haojun Liao 已提交
571
  setPseudoOutputColInfo(pResult, pCtx, pPseudoList);
H
Haojun Liao 已提交
572
  pResult->info.groupId = pSrcBlock->info.groupId;
H
Haojun Liao 已提交
573

574 575
  // if the source equals to the destination, it is to create a new column as the result of scalar
  // function or some operators.
576 577
  bool createNewColModel = (pResult == pSrcBlock);

578 579
  int32_t numOfRows = 0;

580
  for (int32_t k = 0; k < numOfOutput; ++k) {
581 582
    int32_t               outputSlotId = pExpr[k].base.resSchema.slotId;
    SqlFunctionCtx*       pfCtx = &pCtx[k];
583
    SInputColumnInfoData* pInputData = &pfCtx->input;
584

L
Liu Jicong 已提交
585
    if (pExpr[k].pExpr->nodeType == QUERY_NODE_COLUMN) {  // it is a project query
586
      SColumnInfoData* pColInfoData = taosArrayGet(pResult->pDataBlock, outputSlotId);
587
      if (pResult->info.rows > 0 && !createNewColModel) {
588 589
        colDataMergeCol(pColInfoData, pResult->info.rows, &pResult->info.capacity, pInputData->pData[0],
                        pInputData->numOfRows);
590
      } else {
591
        colDataAssign(pColInfoData, pInputData->pData[0], pInputData->numOfRows, &pResult->info);
592
      }
593

594
      numOfRows = pInputData->numOfRows;
595
    } else if (pExpr[k].pExpr->nodeType == QUERY_NODE_VALUE) {
596
      SColumnInfoData* pColInfoData = taosArrayGet(pResult->pDataBlock, outputSlotId);
597

dengyihao's avatar
dengyihao 已提交
598
      int32_t offset = createNewColModel ? 0 : pResult->info.rows;
599 600 601 602 603 604 605 606

      int32_t type = pExpr[k].base.pParam[0].param.nType;
      if (TSDB_DATA_TYPE_NULL == type) {
        colDataAppendNNULL(pColInfoData, offset, pSrcBlock->info.rows);
      } else {
        for (int32_t i = 0; i < pSrcBlock->info.rows; ++i) {
          colDataAppend(pColInfoData, i + offset, taosVariantGet(&pExpr[k].base.pParam[0].param, type), false);
        }
607
      }
608 609

      numOfRows = pSrcBlock->info.rows;
H
Haojun Liao 已提交
610
    } else if (pExpr[k].pExpr->nodeType == QUERY_NODE_OPERATOR) {
611 612 613
      SArray* pBlockList = taosArrayInit(4, POINTER_BYTES);
      taosArrayPush(pBlockList, &pSrcBlock);

614
      SColumnInfoData* pResColData = taosArrayGet(pResult->pDataBlock, outputSlotId);
615
      SColumnInfoData  idata = {.info = pResColData->info, .hasNull = true};
616

617
      SScalarParam dest = {.columnData = &idata};
X
Xiaoyu Wang 已提交
618
      int32_t      code = scalarCalculate(pExpr[k].pExpr->_optrRoot.pRootNode, pBlockList, &dest);
619 620 621 622
      if (code != TSDB_CODE_SUCCESS) {
        taosArrayDestroy(pBlockList);
        return code;
      }
623

dengyihao's avatar
dengyihao 已提交
624
      int32_t startOffset = createNewColModel ? 0 : pResult->info.rows;
625
      ASSERT(pResult->info.capacity > 0);
626
      colDataMergeCol(pResColData, startOffset, &pResult->info.capacity, &idata, dest.numOfRows);
D
dapan1121 已提交
627 628
      colDataDestroy(&idata);
      
629
      numOfRows = dest.numOfRows;
630 631
      taosArrayDestroy(pBlockList);
    } else if (pExpr[k].pExpr->nodeType == QUERY_NODE_FUNCTION) {
632 633
      // _rowts/_c0, not tbname column
      if (fmIsPseudoColumnFunc(pfCtx->functionId) && (!fmIsScanPseudoColumnFunc(pfCtx->functionId))) {
H
Haojun Liao 已提交
634
        // do nothing
635
      } else if (fmIsIndefiniteRowsFunc(pfCtx->functionId)) {
636 637
        SResultRowEntryInfo* pResInfo = GET_RES_INFO(pfCtx);
        pfCtx->fpSet.init(pfCtx, pResInfo);
638 639 640 641 642 643 644 645 646 647 648

        pfCtx->pOutput = taosArrayGet(pResult->pDataBlock, outputSlotId);
        pfCtx->offset = createNewColModel ? 0 : pResult->info.rows;  // set the start offset

        // set the timestamp(_rowts) output buffer
        if (taosArrayGetSize(pPseudoList) > 0) {
          int32_t* outputColIndex = taosArrayGet(pPseudoList, 0);
          pfCtx->pTsOutput = (SColumnInfoData*)pCtx[*outputColIndex].pOutput;
        }

        numOfRows = pfCtx->fpSet.process(pfCtx);
H
Haojun Liao 已提交
649
      } else if (fmIsAggFunc(pfCtx->functionId)) {
650 651
        // _group_key function for "partition by tbname" + csum(col_name) query
        SColumnInfoData* pOutput = taosArrayGet(pResult->pDataBlock, outputSlotId);
652
        int32_t          slotId = pfCtx->param[0].pCol->slotId;
653 654 655

        // todo handle the json tag
        SColumnInfoData* pInput = taosArrayGet(pSrcBlock->pDataBlock, slotId);
656
        for (int32_t f = 0; f < pSrcBlock->info.rows; ++f) {
657 658 659 660 661 662 663 664 665
          bool isNull = colDataIsNull_s(pInput, f);
          if (isNull) {
            colDataAppendNULL(pOutput, pResult->info.rows + f);
          } else {
            char* data = colDataGetData(pInput, f);
            colDataAppend(pOutput, pResult->info.rows + f, data, isNull);
          }
        }

H
Haojun Liao 已提交
666 667 668
      } else {
        SArray* pBlockList = taosArrayInit(4, POINTER_BYTES);
        taosArrayPush(pBlockList, &pSrcBlock);
G
Ganlin Zhao 已提交
669

670
        SColumnInfoData* pResColData = taosArrayGet(pResult->pDataBlock, outputSlotId);
671
        SColumnInfoData  idata = {.info = pResColData->info, .hasNull = true};
H
Haojun Liao 已提交
672

673
        SScalarParam dest = {.columnData = &idata};
X
Xiaoyu Wang 已提交
674
        int32_t      code = scalarCalculate((SNode*)pExpr[k].pExpr->_function.pFunctNode, pBlockList, &dest);
675 676 677 678
        if (code != TSDB_CODE_SUCCESS) {
          taosArrayDestroy(pBlockList);
          return code;
        }
679

dengyihao's avatar
dengyihao 已提交
680
        int32_t startOffset = createNewColModel ? 0 : pResult->info.rows;
681
        ASSERT(pResult->info.capacity > 0);
682
        colDataMergeCol(pResColData, startOffset, &pResult->info.capacity, &idata, dest.numOfRows);
D
dapan1121 已提交
683
        colDataDestroy(&idata);
684 685

        numOfRows = dest.numOfRows;
H
Haojun Liao 已提交
686 687
        taosArrayDestroy(pBlockList);
      }
688
    } else {
689
      ASSERT(0);
690 691
    }
  }
692

693 694 695
  if (!createNewColModel) {
    pResult->info.rows += numOfRows;
  }
696 697

  return TSDB_CODE_SUCCESS;
698 699
}

5
54liuyao 已提交
700
/**
 * Decide whether a function context should process the current input.
 *
 * Rules, in order:
 *  1. functionId == -1            -> never executed;
 *  2. scanFlag == REPEAT_SCAN     -> executed only if fmIsRepeatScanFunc() says so;
 *  3. result entry already complete -> not executed;
 *  4. otherwise                   -> executed.
 */
bool functionNeedToExecute(SqlFunctionCtx* pCtx) {
  struct SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);

  if (pCtx->functionId == -1) {
    return false;
  }

  if (pCtx->scanFlag == REPEAT_SCAN) {
    return fmIsRepeatScanFunc(pCtx->functionId);
  }

  return !isRowEntryCompleted(pResInfo);
}

720 721 722 723 724 725 726
/*
 * Fill in the pre-aggregated statistics (SColumnDataAgg) of a constant-value function parameter.
 *
 * On first use a zeroed SColumnInfoData carrying only the type/bytes of the constant is allocated for
 * pInput->pData[paramIndex], and a SColumnDataAgg is allocated for pInput->pColumnDataAgg[paramIndex];
 * on later calls the existing objects are reused and the statistics are overwritten.
 *
 * @param pInput      input column info of the function context
 * @param pFuncParam  the constant parameter; its value is read from param.i (BIGINT/BOOL) or param.d (DOUBLE)
 * @param type        data type of the constant; variable-length types are not allowed
 * @param paramIndex  slot of the parameter in pInput->pData and pInput->pColumnDataAgg
 * @param numOfRows   number of rows the constant stands for, used to derive the sum
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when an allocation fails
 */
static int32_t doCreateConstantValColumnAggInfo(SInputColumnInfoData* pInput, SFunctParam* pFuncParam, int32_t type,
                                                int32_t paramIndex, int32_t numOfRows) {
  if (pInput->pData[paramIndex] == NULL) {
    pInput->pData[paramIndex] = taosMemoryCalloc(1, sizeof(SColumnInfoData));
    if (pInput->pData[paramIndex] == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    // Set the correct column info (data type and bytes)
    pInput->pData[paramIndex]->info.type = type;
    pInput->pData[paramIndex]->info.bytes = tDataTypes[type].bytes;
  }

  SColumnDataAgg* da = pInput->pColumnDataAgg[paramIndex];
  if (da == NULL) {
    da = taosMemoryCalloc(1, sizeof(SColumnDataAgg));
    pInput->pColumnDataAgg[paramIndex] = da;
    if (da == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  ASSERT(!IS_VAR_DATA_TYPE(type));

  if (type == TSDB_DATA_TYPE_BIGINT) {
    int64_t v = pFuncParam->param.i;
    *da = (SColumnDataAgg){.numOfNull = 0, .min = v, .max = v, .sum = v * numOfRows};
  } else if (type == TSDB_DATA_TYPE_DOUBLE) {
    double v = pFuncParam->param.d;
    *da = (SColumnDataAgg){.numOfNull = 0};

    // the statistic fields are reused as storage for the double representation
    *(double*)&da->min = v;
    *(double*)&da->max = v;
    *(double*)&da->sum = v * numOfRows;
  } else if (type == TSDB_DATA_TYPE_BOOL) {  // todo validate this data type
    bool v = pFuncParam->param.i;

    *da = (SColumnDataAgg){.numOfNull = 0};
    // every row holds the same constant, so min and max are both the constant itself
    // (min used to be hard-coded to 0, which is wrong for a constant `true`)
    *(bool*)&da->min = v;
    *(bool*)&da->max = v;
    // NOTE(review): the sum is written through a bool lvalue, so it only records whether the product is
    // non-zero rather than the number of true rows -- confirm this is what the consumers expect.
    *(bool*)&da->sum = v * numOfRows;
  } else if (type == TSDB_DATA_TYPE_TIMESTAMP) {
    // do nothing: the statistics are left as they are
  } else {
    ASSERT(0);
  }

  return TSDB_CODE_SUCCESS;
}
771 772 773 774 775 776 777 778 779 780 781

/*
 * Bind the pre-aggregated statistics of a data block to the input of a function context.
 *
 * The row counters of the input are always refreshed from the block. When the block carries no
 * statistics (pBlockAgg == NULL) the input is flagged accordingly and nothing else is touched.
 *
 * @param pCtx       function context whose input is updated
 * @param pExprInfo  expression describing the parameters of the function
 * @param pBlock     data block providing rows, columns and (optionally) statistics
 */
void setBlockStatisInfo(SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, SSDataBlock* pBlock) {
  SInputColumnInfoData* pInput = &pCtx->input;
  int32_t               rows = pBlock->info.rows;

  pInput->numOfRows = rows;
  pInput->totalRows = rows;

  if (pBlock->pBlockAgg == NULL) {
    pInput->colDataAggIsSet = false;
    return;
  }

  pInput->colDataAggIsSet = true;

  for (int32_t idx = 0; idx < pExprInfo->base.numOfParams; ++idx) {
    SFunctParam* pParam = &pExprInfo->base.pParam[idx];

    if (pParam->type == FUNC_PARAM_TYPE_VALUE) {
      // NOTE(review): the return code is ignored; after an allocation failure pColumnDataAgg[idx] may be
      // NULL while colDataAggIsSet stays true -- confirm that the callers can cope with that.
      doCreateConstantValColumnAggInfo(pInput, pParam, pParam->param.nType, idx, pBlock->info.rows);
      continue;
    }

    if (pParam->type != FUNC_PARAM_TYPE_COLUMN) {
      continue;
    }

    int32_t slotId = pParam->pCol->slotId;

    // a column without statistics disables the use of statistics for the whole input
    pInput->pColumnDataAgg[idx] = pBlock->pBlockAgg[slotId];
    if (pInput->pColumnDataAgg[idx] == NULL) {
      pInput->colDataAggIsSet = false;
    }

    // The column info data is still attached because the data type of each column is required,
    // but the data in the corresponding SColumnInfoData will not be used.
    pInput->pData[idx] = taosArrayGet(pBlock->pDataBlock, slotId);
  }

  // set the statistics data for primary time stamp column
  //  if (pCtx->functionId == FUNCTION_SPREAD && pColumn->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
  //    pCtx->isAggSet = true;
  //    pCtx->agg.min = pBlock->info.window.skey;
  //    pCtx->agg.max = pBlock->info.window.ekey;
  //  }
}

L
Liu Jicong 已提交
811
/*
 * Report whether the task should be aborted.
 *
 * The idle-timeout based abort is currently disabled (its `return true` is commented out), so this
 * function always returns false. The timeout condition is still evaluated for owned tasks and only
 * guards an assertion on the recorded start time.
 */
bool isTaskKilled(SExecTaskInfo* pTaskInfo) {
  if (pTaskInfo->owner == 0) {
    return false;
  }

  // query has been executed more than tsShellActivityTimer, and the retrieve has not arrived:
  // this is where the current query execution would be aborted.
  if ((taosGetTimestampSec() - pTaskInfo->cost.start / 1000) > 10 * getMaximumIdleDurationSec()
      /*(!needBuildResAfterQueryComplete(pTaskInfo))*/) {
    assert(pTaskInfo->cost.start != 0);
    //    qDebug("QInfo:%" PRIu64 " retrieve not arrive beyond %d ms, abort current query execution, start:%" PRId64
    //           ", current:%d", pQInfo->qId, 1, pQInfo->startExecTs, taosGetTimestampSec());
    //    return true;
  }

  return false;
}

L
Liu Jicong 已提交
826
// Mark the task as cancelled by storing TSDB_CODE_TSC_QUERY_CANCELLED as its result code.
void setTaskKilled(SExecTaskInfo* pTaskInfo) {
  pTaskInfo->code = TSDB_CODE_TSC_QUERY_CANCELLED;
}
827 828

/////////////////////////////////////////////////////////////////////////////////////////////
L
Liu Jicong 已提交
829
// todo refactor : return window
830
/*
 * Compute the interval-aligned time window that contains `key`.
 *
 * @param pInterval  interval definition used for truncation and for the window length
 * @param precision  timestamp precision forwarded to the time helpers
 * @param key        timestamp to align
 * @param win        [out] receives the window; ekey is inclusive
 */
void getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int64_t key, STimeWindow* win) {
  win->skey = taosTimeTruncate(key, pInterval, precision);

  int64_t ekey = taosTimeAdd(win->skey, pInterval->interval, pInterval->intervalUnit, precision) - 1;

  // An end key that falls before the start key is treated as an overflow near INT64_MAX:
  // the window is then simply extended to the largest representable timestamp.
  win->ekey = (ekey < win->skey) ? INT64_MAX : ekey;
}

843
#if 0
// Disabled legacy code, kept for reference only.
// Refines the load status of a block depending on whether the query only contains first/last functions.
static int32_t updateBlockLoadStatus(STaskAttr* pQuery, int32_t status) {
  // a block that must be loaded or has already been filtered out keeps its status
  if (status == BLK_DATA_DATA_LOAD || status == BLK_DATA_FILTEROUT) {
    return status;
  }

  bool seenFirstLast = false;
  bool seenOther = false;

  for (int32_t idx = 0; idx < pQuery->numOfOutput; ++idx) {
    int32_t funcId = getExprFunctionId(&pQuery->pExpr1[idx]);

    // timestamp and tag pseudo functions do not influence the decision
    bool isTsOrTag = (funcId == FUNCTION_TS) || (funcId == FUNCTION_TS_DUMMY) || (funcId == FUNCTION_TAG) ||
                     (funcId == FUNCTION_TAG_DUMMY);
    if (isTsOrTag) {
      continue;
    }

    if (funcId == FUNCTION_FIRST_DST || funcId == FUNCTION_LAST_DST) {
      seenFirstLast = true;
    } else {
      seenOther = true;
    }
  }

  if (seenFirstLast && status == BLK_DATA_NOT_LOAD) {
    return seenOther ? BLK_DATA_DATA_LOAD : BLK_DATA_FILTEROUT;
  }

  return status;
}

#endif

L
Liu Jicong 已提交
882 883
// static void updateDataCheckOrder(SQInfo *pQInfo, SQueryTableReq* pQueryMsg, bool stableQuery) {
//   STaskAttr* pQueryAttr = pQInfo->runtimeEnv.pQueryAttr;
//
//   // in case of point-interpolation query, use asc order scan
//   char msg[] = "QInfo:0x%"PRIx64" scan order changed for %s query, old:%d, new:%d, qrange exchanged, old qrange:%"
//   PRId64
//                "-%" PRId64 ", new qrange:%" PRId64 "-%" PRId64;
//
//   // todo handle the case where an order-irrelevant query type is mixed up with an order-critical query type
//   // descending order query for last_row query
//   if (isFirstLastRowQuery(pQueryAttr)) {
//     //qDebug("QInfo:0x%"PRIx64" scan order changed for last_row query, old:%d, new:%d", pQInfo->qId,
//     pQueryAttr->order.order, TSDB_ORDER_ASC);
//
//     pQueryAttr->order.order = TSDB_ORDER_ASC;
//     if (pQueryAttr->window.skey > pQueryAttr->window.ekey) {
//       TSWAP(pQueryAttr->window.skey, pQueryAttr->window.ekey);
//     }
//
//     pQueryAttr->needReverseScan = false;
//     return;
//   }
//
//   if (pQueryAttr->groupbyColumn && pQueryAttr->order.order == TSDB_ORDER_DESC) {
//     pQueryAttr->order.order = TSDB_ORDER_ASC;
//     if (pQueryAttr->window.skey > pQueryAttr->window.ekey) {
//       TSWAP(pQueryAttr->window.skey, pQueryAttr->window.ekey);
//     }
//
//     pQueryAttr->needReverseScan = false;
//     doUpdateLastKey(pQueryAttr);
//     return;
//   }
//
//   if (pQueryAttr->pointInterpQuery && pQueryAttr->interval.interval == 0) {
//     if (!QUERY_IS_ASC_QUERY(pQueryAttr)) {
//       //qDebug(msg, pQInfo->qId, "interp", pQueryAttr->order.order, TSDB_ORDER_ASC, pQueryAttr->window.skey,
//       pQueryAttr->window.ekey, pQueryAttr->window.ekey, pQueryAttr->window.skey); TSWAP(pQueryAttr->window.skey,
//       pQueryAttr->window.ekey, TSKEY);
//     }
//
//     pQueryAttr->order.order = TSDB_ORDER_ASC;
//     return;
//   }
//
//   if (pQueryAttr->interval.interval == 0) {
//     if (onlyFirstQuery(pQueryAttr)) {
//       if (!QUERY_IS_ASC_QUERY(pQueryAttr)) {
//         //qDebug(msg, pQInfo->qId, "only-first", pQueryAttr->order.order, TSDB_ORDER_ASC, pQueryAttr->window.skey,
////               pQueryAttr->window.ekey, pQueryAttr->window.ekey, pQueryAttr->window.skey);
//
//        TSWAP(pQueryAttr->window.skey, pQueryAttr->window.ekey);
//        doUpdateLastKey(pQueryAttr);
//      }
//
//      pQueryAttr->order.order = TSDB_ORDER_ASC;
//      pQueryAttr->needReverseScan = false;
//    } else if (onlyLastQuery(pQueryAttr) && notContainSessionOrStateWindow(pQueryAttr)) {
//      if (QUERY_IS_ASC_QUERY(pQueryAttr)) {
//        //qDebug(msg, pQInfo->qId, "only-last", pQueryAttr->order.order, TSDB_ORDER_DESC, pQueryAttr->window.skey,
////               pQueryAttr->window.ekey, pQueryAttr->window.ekey, pQueryAttr->window.skey);
//
//        TSWAP(pQueryAttr->window.skey, pQueryAttr->window.ekey);
//        doUpdateLastKey(pQueryAttr);
//      }
//
//      pQueryAttr->order.order = TSDB_ORDER_DESC;
//      pQueryAttr->needReverseScan = false;
//    }
//
//  } else {  // interval query
//    if (stableQuery) {
//      if (onlyFirstQuery(pQueryAttr)) {
//        if (!QUERY_IS_ASC_QUERY(pQueryAttr)) {
//          //qDebug(msg, pQInfo->qId, "only-first stable", pQueryAttr->order.order, TSDB_ORDER_ASC,
////                 pQueryAttr->window.skey, pQueryAttr->window.ekey, pQueryAttr->window.ekey,
////                 pQueryAttr->window.skey);
//
//          TSWAP(pQueryAttr->window.skey, pQueryAttr->window.ekey);
//          doUpdateLastKey(pQueryAttr);
//        }
//
//        pQueryAttr->order.order = TSDB_ORDER_ASC;
//        pQueryAttr->needReverseScan = false;
//      } else if (onlyLastQuery(pQueryAttr)) {
//        if (QUERY_IS_ASC_QUERY(pQueryAttr)) {
//          //qDebug(msg, pQInfo->qId, "only-last stable", pQueryAttr->order.order, TSDB_ORDER_DESC,
////                 pQueryAttr->window.skey, pQueryAttr->window.ekey, pQueryAttr->window.ekey,
////                 pQueryAttr->window.skey);
//
//          TSWAP(pQueryAttr->window.skey, pQueryAttr->window.ekey);
//          doUpdateLastKey(pQueryAttr);
//        }
//
//        pQueryAttr->order.order = TSDB_ORDER_DESC;
//        pQueryAttr->needReverseScan = false;
//      }
//    }
//  }
//}
982

L
Liu Jicong 已提交
983 984 985
// static FORCE_INLINE bool doFilterByBlockStatistics(STaskRuntimeEnv* pRuntimeEnv, SDataStatis *pDataStatis,
// SqlFunctionCtx *pCtx, int32_t numOfRows) {
//   STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
//
//   if (pDataStatis == NULL || pQueryAttr->pFilters == NULL) {
//     return true;
//   }
//
//   return filterRangeExecute(pQueryAttr->pFilters, pDataStatis, pQueryAttr->numOfCols, numOfRows);
// }
993
#if 0
// Disabled legacy code, kept for reference only.
// Both window helpers (getAlignQueryTimeWindow / getNextTimeWindow) are commented out below, so `w` is never
// updated after its zero initialization; the function has to be revisited before it can be re-enabled.
static bool overlapWithTimeWindow(STaskAttr* pQueryAttr, SDataBlockInfo* pBlockInfo) {
  STimeWindow w = {0};

  TSKEY sk = TMIN(pQueryAttr->window.skey, pQueryAttr->window.ekey);
  TSKEY ek = TMAX(pQueryAttr->window.skey, pQueryAttr->window.ekey);

  if (true) {
    //    getAlignQueryTimeWindow(pQueryAttr, pBlockInfo->window.skey, sk, ek, &w);
    assert(w.ekey >= pBlockInfo->window.skey);

    if (w.ekey < pBlockInfo->window.ekey) {
      return true;
    }

    for (;;) {
      //      getNextTimeWindow(pQueryAttr, &w);
      if (w.skey > pBlockInfo->window.ekey) {
        break;
      }

      assert(w.ekey > pBlockInfo->window.ekey);
      if (w.skey <= pBlockInfo->window.ekey && w.skey > pBlockInfo->window.skey) {
        return true;
      }
    }
  } else {
    //    getAlignQueryTimeWindow(pQueryAttr, pBlockInfo->window.ekey, sk, ek, &w);
    assert(w.skey <= pBlockInfo->window.ekey);

    if (w.skey > pBlockInfo->window.skey) {
      return true;
    }

    for (;;) {
      //      getNextTimeWindow(pQueryAttr, &w);
      if (w.ekey < pBlockInfo->window.skey) {
        break;
      }

      assert(w.skey < pBlockInfo->window.skey);
      if (w.ekey < pBlockInfo->window.ekey && w.ekey >= pBlockInfo->window.skey) {
        return true;
      }
    }
  }

  return false;
}
#endif
1043 1044

/*
 * Placeholder for the block-level time window filter.
 *
 * The original implementation is compiled out; as it stands the function ignores both arguments
 * and always returns 0.
 */
static uint32_t doFilterByBlockTimeWindow(STableScanInfo* pTableScanInfo, SSDataBlock* pBlock) {
#if 0
  SqlFunctionCtx* pCtx = pTableScanInfo->pCtx;
  uint32_t        status = BLK_DATA_NOT_LOAD;

  int32_t numOfOutput = 0;  // pTableScanInfo->numOfOutput;
  for (int32_t i = 0; i < numOfOutput; ++i) {
    int32_t functionId = pCtx[i].functionId;
    int32_t colId = pTableScanInfo->pExpr[i].base.pParam[0].pCol->colId;

    // group by + first/last should not apply the first/last block filter
    if (functionId < 0) {
      status |= BLK_DATA_DATA_LOAD;
      return status;
    }

    //      status |= aAggs[functionId].dataReqFunc(&pTableScanInfo->pCtx[i], &pBlock->info.window, colId);
    //      if ((status & BLK_DATA_DATA_LOAD) == BLK_DATA_DATA_LOAD) {
    //        return status;
    //      }
  }

  return status;
#endif
  return 0;
}

L
Liu Jicong 已提交
1071 1072
/*
 * Decide how much of a data block has to be loaded for the current scan.
 *
 * The decision logic is compiled out at the moment. The active code only resets the outputs:
 * *status becomes BLK_DATA_NOT_LOAD, the data and statistics pointers of pBlock are cleared,
 * and TSDB_CODE_SUCCESS is returned.
 *
 * @param pTaskInfo       task the scan belongs to (only its cost counters are referenced)
 * @param pTableScanInfo  table scan operator info (used by the disabled code only)
 * @param pBlock          block whose pDataBlock/pBlockAgg are reset
 * @param status          [out] load status of the block
 * @return TSDB_CODE_SUCCESS
 */
int32_t loadDataBlockOnDemand(SExecTaskInfo* pTaskInfo, STableScanInfo* pTableScanInfo, SSDataBlock* pBlock,
                              uint32_t* status) {
  *status = BLK_DATA_NOT_LOAD;

  pBlock->pBlockAgg = NULL;
  pBlock->pDataBlock = NULL;

  //  int64_t groupId = pRuntimeEnv->current->groupIndex;
  //  bool    ascQuery = QUERY_IS_ASC_QUERY(pQueryAttr);

  // only referenced by the disabled code below
  STaskCostInfo* pCost = &pTaskInfo->cost;

//  pCost->totalBlocks += 1;
//  pCost->totalRows += pBlock->info.rows;
#if 0
  // Calculate all time windows that are overlapping or contain current data block.
  // If current data block is contained by all possible time window, do not load current data block.
  if (/*pQueryAttr->pFilters || */pQueryAttr->groupbyColumn || pQueryAttr->sw.gap > 0 ||
      (QUERY_IS_INTERVAL_QUERY(pQueryAttr) && overlapWithTimeWindow(pTaskInfo, &pBlock->info))) {
    (*status) = BLK_DATA_DATA_LOAD;
  }

  // check if this data block is required to load
  if ((*status) != BLK_DATA_DATA_LOAD) {
    bool needFilter = true;

    // the pCtx[i] result is belonged to previous time window since the outputBuf has not been set yet,
    // the filter result may be incorrect. So in case of interval query, we need to set the correct time output buffer
    if (QUERY_IS_INTERVAL_QUERY(pQueryAttr)) {
      SResultRow* pResult = NULL;

      bool  masterScan = IS_MAIN_SCAN(pRuntimeEnv);
      TSKEY k = ascQuery? pBlock->info.window.skey : pBlock->info.window.ekey;

      STimeWindow win = getActiveTimeWindow(pTableScanInfo->pResultRowInfo, k, pQueryAttr);
      if (pQueryAttr->pointInterpQuery) {
        needFilter = chkWindowOutputBufByKey(pRuntimeEnv, pTableScanInfo->pResultRowInfo, &win, masterScan, &pResult, groupId,
                                    pTableScanInfo->pCtx, pTableScanInfo->numOfOutput,
                                    pTableScanInfo->rowEntryInfoOffset);
      } else {
        if (setResultOutputBufByKey(pRuntimeEnv, pTableScanInfo->pResultRowInfo, pBlock->info.uid, &win, masterScan, &pResult, groupId,
                                    pTableScanInfo->pCtx, pTableScanInfo->numOfOutput,
                                    pTableScanInfo->rowEntryInfoOffset) != TSDB_CODE_SUCCESS) {
          longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
        }
      }
    } else if (pQueryAttr->stableQuery && (!pQueryAttr->tsCompQuery) && (!pQueryAttr->diffQuery)) { // stable aggregate, not interval aggregate or normal column aggregate
      doSetTableGroupOutputBuf(pRuntimeEnv, pTableScanInfo->pResultRowInfo, pTableScanInfo->pCtx,
                               pTableScanInfo->rowEntryInfoOffset, pTableScanInfo->numOfOutput,
                               pRuntimeEnv->current->groupIndex);
    }

    if (needFilter) {
      (*status) = doFilterByBlockTimeWindow(pTableScanInfo, pBlock);
    } else {
      (*status) = BLK_DATA_DATA_LOAD;
    }
  }

  SDataBlockInfo* pBlockInfo = &pBlock->info;
//  *status = updateBlockLoadStatus(pRuntimeEnv->pQueryAttr, *status);

  if ((*status) == BLK_DATA_NOT_LOAD || (*status) == BLK_DATA_FILTEROUT) {
    //qDebug("QInfo:0x%"PRIx64" data block discard, brange:%" PRId64 "-%" PRId64 ", rows:%d", pQInfo->qId, pBlockInfo->window.skey,
//           pBlockInfo->window.ekey, pBlockInfo->rows);
    pCost->skipBlocks += 1;
  } else if ((*status) == BLK_DATA_SMA_LOAD) {
    // this function never returns error?
    pCost->loadBlockStatis += 1;
//    tsdbRetrieveDatablockSMA(pTableScanInfo->pTsdbReadHandle, &pBlock->pBlockAgg);

    if (pBlock->pBlockAgg == NULL) {  // data block statistics does not exist, load data block
//      pBlock->pDataBlock = tsdbRetrieveDataBlock(pTableScanInfo->pTsdbReadHandle, NULL);
      pCost->totalCheckedRows += pBlock->info.rows;
    }
  } else {
    assert((*status) == BLK_DATA_DATA_LOAD);

    // load the data block statistics to perform further filter
    pCost->loadBlockStatis += 1;
//    tsdbRetrieveDatablockSMA(pTableScanInfo->pTsdbReadHandle, &pBlock->pBlockAgg);

    if (pQueryAttr->topBotQuery && pBlock->pBlockAgg != NULL) {
      { // set previous window
        if (QUERY_IS_INTERVAL_QUERY(pQueryAttr)) {
          SResultRow* pResult = NULL;

          bool  masterScan = IS_MAIN_SCAN(pRuntimeEnv);
          TSKEY k = ascQuery? pBlock->info.window.skey : pBlock->info.window.ekey;

          STimeWindow win = getActiveTimeWindow(pTableScanInfo->pResultRowInfo, k, pQueryAttr);
          if (setResultOutputBufByKey(pRuntimeEnv, pTableScanInfo->pResultRowInfo, pBlock->info.uid, &win, masterScan, &pResult, groupId,
                                      pTableScanInfo->pCtx, pTableScanInfo->numOfOutput,
                                      pTableScanInfo->rowEntryInfoOffset) != TSDB_CODE_SUCCESS) {
            longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
          }
        }
      }
      bool load = false;
      for (int32_t i = 0; i < pQueryAttr->numOfOutput; ++i) {
        int32_t functionId = pTableScanInfo->pCtx[i].functionId;
        if (functionId == FUNCTION_TOP || functionId == FUNCTION_BOTTOM) {
//          load = topbot_datablock_filter(&pTableScanInfo->pCtx[i], (char*)&(pBlock->pBlockAgg[i].min),
//                                         (char*)&(pBlock->pBlockAgg[i].max));
          if (!load) { // current block has been discard due to filter applied
            pCost->skipBlocks += 1;
            //qDebug("QInfo:0x%"PRIx64" data block discard, brange:%" PRId64 "-%" PRId64 ", rows:%d", pQInfo->qId,
//                   pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
            (*status) = BLK_DATA_FILTEROUT;
            return TSDB_CODE_SUCCESS;
          }
        }
      }
    }

    // current block has been discard due to filter applied
//    if (!doFilterByBlockStatistics(pRuntimeEnv, pBlock->pBlockAgg, pTableScanInfo->pCtx, pBlockInfo->rows)) {
//      pCost->skipBlocks += 1;
//      qDebug("QInfo:0x%"PRIx64" data block discard, brange:%" PRId64 "-%" PRId64 ", rows:%d", pQInfo->qId, pBlockInfo->window.skey,
//             pBlockInfo->window.ekey, pBlockInfo->rows);
//      (*status) = BLK_DATA_FILTEROUT;
//      return TSDB_CODE_SUCCESS;
//    }

    pCost->totalCheckedRows += pBlockInfo->rows;
    pCost->loadBlocks += 1;
//    pBlock->pDataBlock = tsdbRetrieveDataBlock(pTableScanInfo->pTsdbReadHandle, NULL);
//    if (pBlock->pDataBlock == NULL) {
//      return terrno;
//    }

//    if (pQueryAttr->pFilters != NULL) {
//      filterSetColFieldData(pQueryAttr->pFilters, taosArrayGetSize(pBlock->pDataBlock), pBlock->pDataBlock);
//    }

//    if (pQueryAttr->pFilters != NULL || pRuntimeEnv->pTsBuf != NULL) {
//      filterColRowsInDataBlock(pRuntimeEnv, pBlock, ascQuery);
//    }
  }
#endif
  return TSDB_CODE_SUCCESS;
}

L
Liu Jicong 已提交
1214
// Prepare the per-table query info for a reverse scan.
// All of the actual updates are commented out, so the function currently has no effect.
static void updateTableQueryInfoForReverseScan(STableQueryInfo* pTableQueryInfo) {
  if (pTableQueryInfo != NULL) {
    //  TSWAP(pTableQueryInfo->win.skey, pTableQueryInfo->win.ekey);
    //  pTableQueryInfo->lastKey = pTableQueryInfo->win.skey;

    //  SWITCH_ORDER(pTableQueryInfo->cur.order);
    //  pTableQueryInfo->cur.vgroupIndex = -1;

    // set the index to be the end slot of result rows array
    //  SResultRowInfo* pResultRowInfo = &pTableQueryInfo->resInfo;
    //  if (pResultRowInfo->size > 0) {
    //    pResultRowInfo->curPos = pResultRowInfo->size - 1;
    //  } else {
    //    pResultRowInfo->curPos = -1;
    //  }
  }
}

H
Haojun Liao 已提交
1234
// Initialize a result row. The only statement is commented out, so this is currently a no-op.
void initResultRow(SResultRow* pResultRow) {
  (void)pResultRow;
  //  pResultRow->pEntryInfo = (struct SResultRowEntryInfo*)((char*)pResultRow + sizeof(SResultRow));
}

/*
 * The start of the SResultRowEntryInfo of each result column is denoted by rowEntryInfoOffset.
 * Note that in case of a top/bottom query, all the result rows are treated as a single row of results.
 * +------------+-----------------result column 1------------+------------------result column 2-----------+
 * | SResultRow | SResultRowEntryInfo | intermediate buffer1 | SResultRowEntryInfo | intermediate buffer 2|
 * +------------+--------------------------------------------+--------------------------------------------+
 *           offset[0]                                  offset[1]                                   offset[2]
 */
1246
// TODO refactor: some function move away
L
Liu Jicong 已提交
1247 1248 1249
/*
 * Set up the single output result row of an operator and attach every function context to it.
 *
 * @param pOperator   operator whose expression contexts (exprSupp) are bound
 * @param pInfo       basic operator info holding the result row info, which is re-initialized
 * @param pSup        aggregate supporter providing the result buffer
 * @param stage       scan flag stored into every function context
 * @param numOfExprs  number of function contexts to bind
 */
void setFunctionResultOutput(SOperatorInfo* pOperator, SOptrBasicInfo* pInfo, SAggSupporter* pSup, int32_t stage,
                             int32_t numOfExprs) {
  SqlFunctionCtx* pFuncCtx = pOperator->exprSupp.pCtx;
  int32_t*        entryOffsets = pOperator->exprSupp.rowEntryInfoOffset;

  initResultRowInfo(&pInfo->resultRowInfo);

  // the result row is looked up with a zero key in group 0
  int64_t     key = 0;
  int64_t     group = 0;
  SResultRow* pResultRow = doSetResultOutBufByKey(pSup->pResultBuf, &pInfo->resultRowInfo, (char*)&key, sizeof(key),
                                                  true, group, pOperator->pTaskInfo, false, pSup);

  for (int32_t idx = 0; idx < numOfExprs; ++idx) {
    SqlFunctionCtx* pCur = &pFuncCtx[idx];

    // every context gets its own, freshly cleaned, entry of the result row
    pCur->resultInfo = getResultEntryInfo(pResultRow, idx, entryOffsets);
    cleanupResultRowEntry(pCur->resultInfo);

    pCur->scanFlag = stage;
  }

  initCtxOutputBuffer(pFuncCtx, numOfExprs);
}

H
Haojun Liao 已提交
1272
/*
 * Run the init callback of every function context whose result entry still needs initialization.
 *
 * Contexts are skipped when their entry is already initialized, when they have no function (-1),
 * or when the function is a pseudo column or a scalar function.
 *
 * @param pCtx  array of function contexts
 * @param size  number of elements in pCtx
 */
void initCtxOutputBuffer(SqlFunctionCtx* pCtx, int32_t size) {
  for (int32_t idx = 0; idx < size; ++idx) {
    SqlFunctionCtx* pCur = &pCtx[idx];

    bool needInit = !isRowEntryInitialized(GET_RES_INFO(pCur)) && !fmIsPseudoColumnFunc(pCur->functionId) &&
                    pCur->functionId != -1 && !fmIsScalarFunc(pCur->functionId);
    if (needInit) {
      pCur->fpSet.init(pCur, pCur->resultInfo);
    }
  }
}

L
Liu Jicong 已提交
1284
/*
 * Update the status of a task.
 *
 * TASK_NOT_COMPLETED replaces the whole status; any other value is OR-ed into the status
 * after the TASK_NOT_COMPLETED flag has been cleared.
 */
void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status) {
  if (status != TASK_NOT_COMPLETED) {
    // TASK_NOT_COMPLETED is not compatible with any other status, so clear it before adding the new one
    CLEAR_QUERY_STATUS(pTaskInfo, TASK_NOT_COMPLETED);
    pTaskInfo->status |= status;
    return;
  }

  pTaskInfo->status = status;
}

L
Liu Jicong 已提交
1294
// Release the resources owned by a table query info.
// The cleanup calls are commented out, so the function currently has no effect.
void destroyTableQueryInfoImpl(STableQueryInfo* pTableQueryInfo) {
  if (pTableQueryInfo != NULL) {
    //  taosVariantDestroy(&pTableQueryInfo->tag);
    //  cleanupResultRowInfo(&pTableQueryInfo->resInfo);
  }
}

1303
/*
 * Attach every function context to its entry in the given result row and initialize the entries
 * that have not been initialized yet.
 *
 * @param pResult             result row providing the entries
 * @param pCtx                array of function contexts
 * @param numOfOutput         number of elements in pCtx
 * @param rowEntryInfoOffset  offsets used to locate each entry inside the result row
 */
void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowEntryInfoOffset) {
  for (int32_t idx = 0; idx < numOfOutput; ++idx) {
    SqlFunctionCtx*             pCur = &pCtx[idx];
    struct SResultRowEntryInfo* pEntry = getResultEntryInfo(pResult, idx, rowEntryInfoOffset);

    pCur->resultInfo = pEntry;

    // nothing to do for entries that are both completed and initialized, nor for window pseudo columns
    bool finished = isRowEntryCompleted(pEntry) && isRowEntryInitialized(pEntry);
    if (finished || fmIsWindowPseudoColumnFunc(pCur->functionId)) {
      continue;
    }

    if (pEntry->initialized) {
      continue;
    }

    if (pCur->functionId == -1) {
      // no function to run: just flag the entry as initialized
      pEntry->initialized = true;
    } else {
      pCur->fpSet.init(pCur, pEntry);
    }
  }
}

H
Haojun Liao 已提交
1326
static void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const int8_t* rowRes, bool keep);
1327

1328
/*
 * Apply a filter condition to a data block in place.
 *
 * A filter is built from pFilterNode, executed against the block, and the rows are then reduced
 * according to the per-row result; finally the timestamp window of the block is refreshed.
 *
 * @param pFilterNode  filter condition; NULL means there is nothing to filter
 * @param pBlock       data block that is filtered in place
 */
void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock) {
  if (pFilterNode == NULL) {
    return;
  }

  // todo move the construction of the filter to the initialization function
  // NOTE(review): the return codes of filterInitFromNode/filterSetDataFromSlotId are stored but never
  // checked -- confirm how a failure should be reported through this void interface.
  SFilterInfo* pFilterInfo = NULL;
  int32_t      code = filterInitFromNode((SNode*)pFilterNode, &pFilterInfo, 0);

  SFilterColumnParam colParam = {.numOfCols = taosArrayGetSize(pBlock->pDataBlock), .pDataBlock = pBlock->pDataBlock};
  code = filterSetDataFromSlotId(pFilterInfo, &colParam);

  // todo the keep seems never to be True??
  int8_t* pRowResult = NULL;
  bool    keepAll = filterExecute(pFilterInfo, pBlock, &pRowResult, NULL, colParam.numOfCols);
  filterFreeInfo(pFilterInfo);

  extractQualifiedTupleByFilterResult(pBlock, pRowResult, keepAll);
  blockDataUpdateTsWindow(pBlock, 0);

  taosMemoryFree(pRowResult);
}

H
Haojun Liao 已提交
1354
/*
 * Reduce a data block to the rows that passed the filter.
 *
 * @param pBlock  data block that is compacted in place
 * @param rowRes  per-row filter result, a non-zero value keeps the row; NULL drops every row
 * @param keep    when true the block is left untouched
 */
void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const int8_t* rowRes, bool keep) {
  if (keep) {
    return;
  }

  // without a per-row result no row qualifies
  if (rowRes == NULL) {
    pBlock->info.rows = 0;
    return;
  }

  int32_t      rowsBefore = pBlock->info.rows;
  SSDataBlock* pCopy = createOneDataBlock(pBlock, true);  // source of the rows while pBlock is rewritten

  size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
  for (int32_t col = 0; col < numOfCols; ++col) {
    SColumnInfoData* pFrom = taosArrayGet(pCopy->pDataBlock, col);
    SColumnInfoData* pTo = taosArrayGet(pBlock->pDataBlock, col);

    // it is a reserved column for scalar function, and no data in this column yet.
    if (pTo->pData == NULL || pFrom->pData == NULL) {
      continue;
    }

    colInfoDataCleanup(pTo, pBlock->info.rows);

    // copy the qualified rows back, packed at the beginning of the column
    int32_t kept = 0;
    for (int32_t row = 0; row < rowsBefore; ++row) {
      if (rowRes[row] != 0) {
        if (colDataIsNull_s(pFrom, row)) {
          colDataAppendNULL(pTo, kept);
        } else {
          colDataAppend(pTo, kept, colDataGetData(pFrom, row), false);
        }
        kept += 1;
      }
    }

    // the first rewritten column fixes the new row count, the others must agree with it
    if (pBlock->info.rows == rowsBefore) {
      pBlock->info.rows = kept;
    } else {
      ASSERT(pBlock->info.rows == kept);
    }
  }

  blockDataDestroy(pCopy);
}

L
Liu Jicong 已提交
1402 1403
/*
 * Select the result row of the given group and bind the function contexts of the operator to it.
 * For a simple group by query without interval, all the tables belong to one group result.
 *
 * @param pOperator    operator owning the function contexts (exprSupp)
 * @param pAggInfo     aggregate operator info providing the result buffer and result row info
 * @param numOfOutput  number of function contexts
 * @param groupId      id of the group, also used as the key of the result row
 */
void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, SAggOperatorInfo* pAggInfo, int32_t numOfOutput,
                              uint64_t groupId) {
  SAggSupporter* pAggSup = &pAggInfo->aggSup;

  SResultRow* pRow = doSetResultOutBufByKey(pAggSup->pResultBuf, &pAggInfo->binfo.resultRowInfo, (char*)&groupId,
                                            sizeof(groupId), true, groupId, pOperator->pTaskInfo, false, pAggSup);
  assert(pRow != NULL);

  /*
   * not assign result buffer yet, add new result buffer
   * all group belong to one result set, and each group result has different group id so set the id to be one
   */
  if (pRow->pageId == -1) {
    int32_t rowSize = pAggInfo->binfo.pRes->info.rowSize;
    if (addNewWindowResultBuf(pRow, pAggSup->pResultBuf, groupId, rowSize) != TSDB_CODE_SUCCESS) {
      return;
    }
  }

  setResultRowInitCtx(pRow, pOperator->exprSupp.pCtx, numOfOutput, pOperator->exprSupp.rowEntryInfoOffset);
}

1429
/*
 * Switch the aggregate operator's output buffer to `groupId`, unless that group is already active.
 *
 * The switch is skipped only when pAggInfo->groupId is not INT32_MIN and equals groupId
 * (INT32_MIN looks like the "no active group yet" marker -- confirm against the initializer).
 */
void setExecutionContext(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId, SAggOperatorInfo* pAggInfo) {
  const bool needSwitch = (pAggInfo->groupId == INT32_MIN) || (pAggInfo->groupId != groupId);
  if (needSwitch) {
#ifdef BUF_PAGE_DEBUG
    qDebug("page_setbuf, groupId:%" PRIu64, groupId);
#endif
    doSetTableGroupOutputBuf(pOperator, pAggInfo, numOfOutput, groupId);

    // remember which group the output buffer now belongs to
    pAggInfo->groupId = groupId;
  }
}

1442 1443
/*
 * Raise pRow->numOfRows to the largest numOfRes found among the row's initialized entries.
 * Entries that are not initialized are ignored; numOfRows is never lowered.
 */
static void doUpdateNumOfRows(SResultRow* pRow, int32_t numOfExprs, const int32_t* rowCellOffset) {
  int32_t idx = 0;
  while (idx < numOfExprs) {
    struct SResultRowEntryInfo* pEntry = getResultEntryInfo(pRow, idx, rowCellOffset);
    idx += 1;

    if (isRowEntryInitialized(pEntry) && pRow->numOfRows < pEntry->numOfRes) {
      pRow->numOfRows = pEntry->numOfRes;
    }
  }
}

1455
int32_t finalizeResultRowIntoResultDataBlock(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPosition,
S
shenglian zhou 已提交
1456 1457 1458
                                             SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, int32_t numOfExprs,
                                             const int32_t* rowCellOffset, SSDataBlock* pBlock,
                                             SExecTaskInfo* pTaskInfo) {
1459 1460 1461 1462 1463 1464 1465 1466 1467
  SFilePage*  page = getBufPage(pBuf, resultRowPosition->pageId);
  SResultRow* pRow = (SResultRow*)((char*)page + resultRowPosition->offset);

  doUpdateNumOfRows(pRow, numOfExprs, rowCellOffset);
  if (pRow->numOfRows == 0) {
    releaseBufPage(pBuf, page);
    return 0;
  }

1468 1469 1470 1471 1472 1473 1474
  while (pBlock->info.rows + pRow->numOfRows > pBlock->info.capacity) {
    int32_t code = blockDataEnsureCapacity(pBlock, pBlock->info.capacity * 1.25);
    if (TAOS_FAILED(code)) {
      releaseBufPage(pBuf, page);
      qError("%s ensure result data capacity failed, code %s", GET_TASKID(pTaskInfo), tstrerror(code));
      longjmp(pTaskInfo->env, code);
    }
1475 1476 1477 1478 1479
  }

  for (int32_t j = 0; j < numOfExprs; ++j) {
    int32_t slotId = pExprInfo[j].base.resSchema.slotId;

1480
    pCtx[j].resultInfo = getResultEntryInfo(pRow, j, rowCellOffset);
1481 1482 1483 1484 1485 1486 1487 1488 1489
    if (pCtx[j].fpSet.finalize) {
      int32_t code = pCtx[j].fpSet.finalize(&pCtx[j], pBlock);
      if (TAOS_FAILED(code)) {
        qError("%s build result data block error, code %s", GET_TASKID(pTaskInfo), tstrerror(code));
        longjmp(pTaskInfo->env, code);
      }
    } else if (strcmp(pCtx[j].pExpr->pExpr->_function.functionName, "_select_value") == 0) {
      // do nothing, todo refactor
    } else {
1490 1491
      // expand the result into multiple rows. E.g., _wstart, top(k, 20)
      // the _wstart needs to copy to 20 following rows, since the results of top-k expands to 20 different rows.
1492 1493 1494 1495 1496 1497 1498 1499 1500
      SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, slotId);
      char*            in = GET_ROWCELL_INTERBUF(pCtx[j].resultInfo);
      for (int32_t k = 0; k < pRow->numOfRows; ++k) {
        colDataAppend(pColInfoData, pBlock->info.rows + k, in, pCtx[j].resultInfo->isNullRes);
      }
    }
  }

  releaseBufPage(pBuf, page);
1501
  pBlock->info.rows += pRow->numOfRows;
1502 1503 1504 1505

  return 0;
}

X
Xiaoyu Wang 已提交
1506 1507 1508
/*
 * Copy finalized group results into pBlock, starting at pGroupResInfo->index.
 *
 * Result rows are consumed in order until one of the following happens:
 *   - all rows reported by getNumOfTotalRes() have been visited;
 *   - the next row belongs to a different group than the one already stored in pBlock
 *     (a block carries a single group id);
 *   - the next row would not fit into the remaining capacity of pBlock.
 * Rows that produce no output are skipped. pGroupResInfo->index is advanced for every row
 * that is skipped or copied, so a later call resumes where this one stopped.
 *
 * Returns 0. A failing finalize callback is reported by longjmp-ing to pTaskInfo->env; the
 * buffer page is released before the jump.
 *
 * NOTE(review): a row whose output alone exceeds pBlock->info.capacity stops the loop without
 * advancing the index -- confirm callers cannot spin on that case.
 */
int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprInfo* pExprInfo, SDiskbasedBuf* pBuf,
                           SGroupResInfo* pGroupResInfo, const int32_t* rowCellOffset, SqlFunctionCtx* pCtx,
                           int32_t numOfExprs) {
  int32_t numOfRows = getNumOfTotalRes(pGroupResInfo);
  int32_t start = pGroupResInfo->index;

  for (int32_t i = start; i < numOfRows; i += 1) {
    SResKeyPos* pPos = taosArrayGetP(pGroupResInfo->pRows, i);
    SFilePage*  page = getBufPage(pBuf, pPos->pos.pageId);

    SResultRow* pRow = (SResultRow*)((char*)page + pPos->pos.offset);

    doUpdateNumOfRows(pRow, numOfExprs, rowCellOffset);
    if (pRow->numOfRows == 0) {
      pGroupResInfo->index += 1;
      releaseBufPage(pBuf, page);
      continue;
    }

    if (pBlock->info.groupId == 0) {
      pBlock->info.groupId = pPos->groupId;
    } else if (pBlock->info.groupId != pPos->groupId) {
      // current value belongs to different group, it can't be packed into one datablock
      releaseBufPage(pBuf, page);
      break;
    }

    if (pBlock->info.rows + pRow->numOfRows > pBlock->info.capacity) {
      releaseBufPage(pBuf, page);
      break;
    }

    pGroupResInfo->index += 1;

    for (int32_t j = 0; j < numOfExprs; ++j) {
      int32_t slotId = pExprInfo[j].base.resSchema.slotId;

      pCtx[j].resultInfo = getResultEntryInfo(pRow, j, rowCellOffset);
      if (pCtx[j].fpSet.finalize) {
#ifdef BUF_PAGE_DEBUG
        qDebug("\npage_finalize %d", numOfExprs);
#endif
        int32_t code = pCtx[j].fpSet.finalize(&pCtx[j], pBlock);
        if (TAOS_FAILED(code)) {
          // do not leave the page pinned when bailing out
          releaseBufPage(pBuf, page);
          qError("%s build result data block error, code %s", GET_TASKID(pTaskInfo), tstrerror(code));
          longjmp(pTaskInfo->env, code);
        }
      } else if (strcmp(pCtx[j].pExpr->pExpr->_function.functionName, "_select_value") == 0) {
        // do nothing, todo refactor
      } else {
        // expand the result into multiple rows. E.g., _wstart, top(k, 20)
        // the _wstart needs to copy to 20 following rows, since the results of top-k expands to 20 different rows.
        SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, slotId);
        char*            in = GET_ROWCELL_INTERBUF(pCtx[j].resultInfo);
        if (pCtx[j].increase) {
          // every replicated row gets the previous value plus one
          int64_t ts = *(int64_t*)in;
          for (int32_t k = 0; k < pRow->numOfRows; ++k) {
            colDataAppend(pColInfoData, pBlock->info.rows + k, (const char*)&ts, pCtx[j].resultInfo->isNullRes);
            ts++;
          }
        } else {
          for (int32_t k = 0; k < pRow->numOfRows; ++k) {
            colDataAppend(pColInfoData, pBlock->info.rows + k, in, pCtx[j].resultInfo->isNullRes);
          }
        }
      }
    }

    // pRow lives inside the page: read it before the page is handed back
    pBlock->info.rows += pRow->numOfRows;
    releaseBufPage(pBuf, page);
  }

  qDebug("%s result generated, rows:%d, groupId:%" PRIu64, GET_TASKID(pTaskInfo), pBlock->info.rows,
         pBlock->info.groupId);
  blockDataUpdateTsWindow(pBlock, 0);
  return 0;
}

X
Xiaoyu Wang 已提交
1589 1590
/*
 * Rebuild the operator's output block (pbInfo->pRes) from the pending group results.
 *
 * The block is always cleaned first. When pGroupResInfo holds no more data the (now empty)
 * block is left as is; otherwise its group id is reset to 0 and doCopyToSDataBlock fills it.
 */
void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo,
                            SDiskbasedBuf* pBuf) {
  SSDataBlock* pOutput = pbInfo->pRes;

  blockDataCleanup(pOutput);
  if (hasDataInGroupInfo(pGroupResInfo)) {
    // clear the existed group id
    pOutput->info.groupId = 0;
    doCopyToSDataBlock(pOperator->pTaskInfo, pOutput, pOperator->exprSupp.pExprInfo, pBuf, pGroupResInfo,
                       pOperator->exprSupp.rowEntryInfoOffset, pOperator->exprSupp.pCtx,
                       pOperator->exprSupp.numOfExprs);
  }
}

L
Liu Jicong 已提交
1609
/*
 * Currently a no-op: the whole body is compiled out with `#if 0`.
 *
 * The disabled code walked every result row in pResultRowInfo and raised its numOfRows to the
 * largest numOfRes among its output cells, skipping the FUNCTION_TS / FUNCTION_TAG /
 * FUNCTION_TAGPRJ columns. An even older early-return for interval queries was already
 * commented out before the block was disabled.
 */
static void updateNumOfRowsInResultRows(SqlFunctionCtx* pCtx, int32_t numOfOutput, SResultRowInfo* pResultRowInfo,
                                        int32_t* rowEntryInfoOffset) {
#if 0
  for (int32_t i = 0; i < pResultRowInfo->size; ++i) {
    SResultRow* pResult = pResultRowInfo->pResult[i];

    for (int32_t j = 0; j < numOfOutput; ++j) {
      int32_t functionId = pCtx[j].functionId;
      if (functionId == FUNCTION_TS || functionId == FUNCTION_TAG || functionId == FUNCTION_TAGPRJ) {
        continue;
      }

      SResultRowEntryInfo* pCell = getResultEntryInfo(pResult, j, rowEntryInfoOffset);
      pResult->numOfRows = (uint16_t)(TMAX(pResult->numOfRows, pCell->numOfRes));
    }
  }
#endif
}

L
Liu Jicong 已提交
1632
/*
 * Compress the data of one result column into `data` using the compression routine registered
 * for the column's type in tDataTypes[].
 *
 * The input size is bytes-per-value * numOfRows; the output size passed to the routine is that
 * size plus COMP_OVERFLOW_BYTES. The routine's return value is returned unchanged.
 */
static int32_t compressQueryColData(SColumnInfoData* pColRes, int32_t numOfRows, char* data, int8_t compressed) {
  const int32_t rawBytes = pColRes->info.bytes * numOfRows;

  return tDataTypes[pColRes->info.type].compFunc(pColRes->pData, rawBytes, numOfRows, data,
                                                 rawBytes + COMP_OVERFLOW_BYTES, compressed, NULL, 0);
}

1638 1639 1640
/*
 * Let the fill module append rows to pBlock, limited to the room left before `capacity`
 * (capacity - pBlock->info.rows), and return the block's row count afterwards.
 *
 * The value returned by taosFillResultDataBlock() is intentionally not used: the result is
 * read back from pBlock->info.rows.
 */
int32_t doFillTimeIntervalGapsInResults(struct SFillInfo* pFillInfo, SSDataBlock* pBlock, int32_t capacity) {
  (void)taosFillResultDataBlock(pFillInfo, pBlock, capacity - pBlock->info.rows);
  return pBlock->info.rows;
}

L
Liu Jicong 已提交
1643 1644
/*
 * Finish the task's cost summary and log it.
 *
 * The first-stage merge time is added to the total elapsed time. The summary line is logged
 * only when a file-block load recorder is attached to the summary.
 *
 * Hash-table size, result-row pool and per-operator profiling statistics used to be collected
 * here as well; that code is currently disabled.
 */
void queryCostStatis(SExecTaskInfo* pTaskInfo) {
  STaskCostInfo* pCost = &pTaskInfo->cost;

  // add the merge time
  pCost->elapsedTime += pCost->firstStageMergeTime;

  SFileBlockLoadRecorder* pLoadStat = pCost->pRecoder;
  if (pLoadStat == NULL) {
    return;
  }

  qDebug("%s :cost summary: elapsed time:%" PRId64 " us, first merge:%" PRId64
         " us, total blocks:%d, "
         "load block statis:%d, load data block:%d, total rows:%" PRId64 ", check rows:%" PRId64,
         GET_TASKID(pTaskInfo), pCost->elapsedTime, pCost->firstStageMergeTime, pLoadStat->totalBlocks,
         pLoadStat->loadBlockStatis, pLoadStat->loadBlocks, pLoadStat->totalRows, pLoadStat->totalCheckedRows);
}

L
Liu Jicong 已提交
1677 1678 1679
// static void updateOffsetVal(STaskRuntimeEnv *pRuntimeEnv, SDataBlockInfo *pBlockInfo) {
//   STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
//   STableQueryInfo* pTableQueryInfo = pRuntimeEnv->current;
1680
//
L
Liu Jicong 已提交
1681
//   int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQueryAttr->order.order);
1682
//
L
Liu Jicong 已提交
1683 1684 1685 1686
//   if (pQueryAttr->limit.offset == pBlockInfo->rows) {  // current block will ignore completed
//     pTableQueryInfo->lastKey = QUERY_IS_ASC_QUERY(pQueryAttr) ? pBlockInfo->window.ekey + step :
//     pBlockInfo->window.skey + step; pQueryAttr->limit.offset = 0; return;
//   }
1687
//
L
Liu Jicong 已提交
1688 1689 1690 1691 1692
//   if (QUERY_IS_ASC_QUERY(pQueryAttr)) {
//     pQueryAttr->pos = (int32_t)pQueryAttr->limit.offset;
//   } else {
//     pQueryAttr->pos = pBlockInfo->rows - (int32_t)pQueryAttr->limit.offset - 1;
//   }
1693
//
L
Liu Jicong 已提交
1694
//   assert(pQueryAttr->pos >= 0 && pQueryAttr->pos <= pBlockInfo->rows - 1);
1695
//
L
Liu Jicong 已提交
1696 1697
//   SArray *         pDataBlock = tsdbRetrieveDataBlock(pRuntimeEnv->pTsdbReadHandle, NULL);
//   SColumnInfoData *pColInfoData = taosArrayGet(pDataBlock, 0);
1698
//
L
Liu Jicong 已提交
1699 1700
//   // update the pQueryAttr->limit.offset value, and pQueryAttr->pos value
//   TSKEY *keys = (TSKEY *) pColInfoData->pData;
1701
//
L
Liu Jicong 已提交
1702 1703 1704
//   // update the offset value
//   pTableQueryInfo->lastKey = keys[pQueryAttr->pos];
//   pQueryAttr->limit.offset = 0;
1705
//
L
Liu Jicong 已提交
1706
//   int32_t numOfRes = tableApplyFunctionsOnBlock(pRuntimeEnv, pBlockInfo, NULL, binarySearchForKey, pDataBlock);
1707
//
L
Liu Jicong 已提交
1708 1709 1710 1711
//   //qDebug("QInfo:0x%"PRIx64" check data block, brange:%" PRId64 "-%" PRId64 ", numBlocksOfStep:%d, numOfRes:%d,
//   lastKey:%"PRId64, GET_TASKID(pRuntimeEnv),
//          pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows, numOfRes, pQuery->current->lastKey);
// }
1712

L
Liu Jicong 已提交
1713 1714
// void skipBlocks(STaskRuntimeEnv *pRuntimeEnv) {
//   STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
1715
//
L
Liu Jicong 已提交
1716 1717 1718
//   if (pQueryAttr->limit.offset <= 0 || pQueryAttr->numOfFilterCols > 0) {
//     return;
//   }
1719
//
L
Liu Jicong 已提交
1720 1721
//   pQueryAttr->pos = 0;
//   int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQueryAttr->order.order);
1722
//
L
Liu Jicong 已提交
1723 1724
//   STableQueryInfo* pTableQueryInfo = pRuntimeEnv->current;
//   TsdbQueryHandleT pTsdbReadHandle = pRuntimeEnv->pTsdbReadHandle;
1725
//
L
Liu Jicong 已提交
1726 1727 1728 1729 1730
//   SDataBlockInfo blockInfo = SDATA_BLOCK_INITIALIZER;
//   while (tsdbNextDataBlock(pTsdbReadHandle)) {
//     if (isTaskKilled(pRuntimeEnv->qinfo)) {
//       longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED);
//     }
1731
//
L
Liu Jicong 已提交
1732
//     tsdbRetrieveDataBlockInfo(pTsdbReadHandle, &blockInfo);
1733
//
L
Liu Jicong 已提交
1734 1735 1736 1737
//     if (pQueryAttr->limit.offset > blockInfo.rows) {
//       pQueryAttr->limit.offset -= blockInfo.rows;
//       pTableQueryInfo->lastKey = (QUERY_IS_ASC_QUERY(pQueryAttr)) ? blockInfo.window.ekey : blockInfo.window.skey;
//       pTableQueryInfo->lastKey += step;
1738
//
L
Liu Jicong 已提交
1739 1740 1741 1742 1743 1744 1745
//       //qDebug("QInfo:0x%"PRIx64" skip rows:%d, offset:%" PRId64, GET_TASKID(pRuntimeEnv), blockInfo.rows,
//              pQuery->limit.offset);
//     } else {  // find the appropriated start position in current block
//       updateOffsetVal(pRuntimeEnv, &blockInfo);
//       break;
//     }
//   }
1746
//
L
Liu Jicong 已提交
1747 1748 1749 1750 1751 1752 1753 1754 1755
//   if (terrno != TSDB_CODE_SUCCESS) {
//     longjmp(pRuntimeEnv->env, terrno);
//   }
// }

// static TSKEY doSkipIntervalProcess(STaskRuntimeEnv* pRuntimeEnv, STimeWindow* win, SDataBlockInfo* pBlockInfo,
// STableQueryInfo* pTableQueryInfo) {
//   STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
//   SResultRowInfo *pWindowResInfo = &pRuntimeEnv->resultRowInfo;
1756
//
L
Liu Jicong 已提交
1757 1758 1759
//   assert(pQueryAttr->limit.offset == 0);
//   STimeWindow tw = *win;
//   getNextTimeWindow(pQueryAttr, &tw);
1760
//
L
Liu Jicong 已提交
1761 1762
//   if ((tw.skey <= pBlockInfo->window.ekey && QUERY_IS_ASC_QUERY(pQueryAttr)) ||
//       (tw.ekey >= pBlockInfo->window.skey && !QUERY_IS_ASC_QUERY(pQueryAttr))) {
1763
//
L
Liu Jicong 已提交
1764 1765 1766 1767
//     // load the data block and check data remaining in current data block
//     // TODO optimize performance
//     SArray *         pDataBlock = tsdbRetrieveDataBlock(pRuntimeEnv->pTsdbReadHandle, NULL);
//     SColumnInfoData *pColInfoData = taosArrayGet(pDataBlock, 0);
1768
//
L
Liu Jicong 已提交
1769 1770 1771 1772
//     tw = *win;
//     int32_t startPos =
//         getNextQualifiedWindow(pQueryAttr, &tw, pBlockInfo, pColInfoData->pData, binarySearchForKey, -1);
//     assert(startPos >= 0);
1773
//
L
Liu Jicong 已提交
1774 1775
//     // set the abort info
//     pQueryAttr->pos = startPos;
1776
//
L
Liu Jicong 已提交
1777 1778 1779 1780
//     // reset the query start timestamp
//     pTableQueryInfo->win.skey = ((TSKEY *)pColInfoData->pData)[startPos];
//     pQueryAttr->window.skey = pTableQueryInfo->win.skey;
//     TSKEY key = pTableQueryInfo->win.skey;
1781
//
L
Liu Jicong 已提交
1782 1783
//     pWindowResInfo->prevSKey = tw.skey;
//     int32_t index = pRuntimeEnv->resultRowInfo.curIndex;
1784
//
L
Liu Jicong 已提交
1785 1786
//     int32_t numOfRes = tableApplyFunctionsOnBlock(pRuntimeEnv, pBlockInfo, NULL, binarySearchForKey, pDataBlock);
//     pRuntimeEnv->resultRowInfo.curIndex = index;  // restore the window index
1787
//
L
Liu Jicong 已提交
1788 1789 1790 1791
//     //qDebug("QInfo:0x%"PRIx64" check data block, brange:%" PRId64 "-%" PRId64 ", numOfRows:%d, numOfRes:%d,
//     lastKey:%" PRId64,
//            GET_TASKID(pRuntimeEnv), pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows, numOfRes,
//            pQueryAttr->current->lastKey);
1792
//
L
Liu Jicong 已提交
1793 1794 1795 1796 1797
//     return key;
//   } else {  // do nothing
//     pQueryAttr->window.skey      = tw.skey;
//     pWindowResInfo->prevSKey = tw.skey;
//     pTableQueryInfo->lastKey = tw.skey;
1798
//
L
Liu Jicong 已提交
1799 1800
//     return tw.skey;
//   }
1801
//
L
Liu Jicong 已提交
1802 1803 1804 1805 1806 1807 1808 1809 1810 1811
//   return true;
// }

// static bool skipTimeInterval(STaskRuntimeEnv *pRuntimeEnv, TSKEY* start) {
//   STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
//   if (QUERY_IS_ASC_QUERY(pQueryAttr)) {
//     assert(*start <= pRuntimeEnv->current->lastKey);
//   } else {
//     assert(*start >= pRuntimeEnv->current->lastKey);
//   }
1812
//
L
Liu Jicong 已提交
1813 1814 1815 1816 1817
//   // if queried with value filter, do NOT forward query start position
//   if (pQueryAttr->limit.offset <= 0 || pQueryAttr->numOfFilterCols > 0 || pRuntimeEnv->pTsBuf != NULL ||
//   pRuntimeEnv->pFillInfo != NULL) {
//     return true;
//   }
1818
//
L
Liu Jicong 已提交
1819 1820 1821 1822 1823 1824 1825
//   /*
//    * 1. for interval without interpolation query we forward pQueryAttr->interval.interval at a time for
//    *    pQueryAttr->limit.offset times. Since hole exists, pQueryAttr->interval.interval*pQueryAttr->limit.offset
//    value is
//    *    not valid. otherwise, we only forward pQueryAttr->limit.offset number of points
//    */
//   assert(pRuntimeEnv->resultRowInfo.prevSKey == TSKEY_INITIAL_VAL);
1826
//
L
Liu Jicong 已提交
1827 1828
//   STimeWindow w = TSWINDOW_INITIALIZER;
//   bool ascQuery = QUERY_IS_ASC_QUERY(pQueryAttr);
1829
//
L
Liu Jicong 已提交
1830 1831
//   SResultRowInfo *pWindowResInfo = &pRuntimeEnv->resultRowInfo;
//   STableQueryInfo *pTableQueryInfo = pRuntimeEnv->current;
1832
//
L
Liu Jicong 已提交
1833 1834 1835
//   SDataBlockInfo blockInfo = SDATA_BLOCK_INITIALIZER;
//   while (tsdbNextDataBlock(pRuntimeEnv->pTsdbReadHandle)) {
//     tsdbRetrieveDataBlockInfo(pRuntimeEnv->pTsdbReadHandle, &blockInfo);
1836
//
L
Liu Jicong 已提交
1837 1838 1839 1840 1841 1842 1843 1844 1845
//     if (QUERY_IS_ASC_QUERY(pQueryAttr)) {
//       if (pWindowResInfo->prevSKey == TSKEY_INITIAL_VAL) {
//         getAlignQueryTimeWindow(pQueryAttr, blockInfo.window.skey, blockInfo.window.skey, pQueryAttr->window.ekey,
//         &w); pWindowResInfo->prevSKey = w.skey;
//       }
//     } else {
//       getAlignQueryTimeWindow(pQueryAttr, blockInfo.window.ekey, pQueryAttr->window.ekey, blockInfo.window.ekey, &w);
//       pWindowResInfo->prevSKey = w.skey;
//     }
1846
//
L
Liu Jicong 已提交
1847 1848
//     // the first time window
//     STimeWindow win = getActiveTimeWindow(pWindowResInfo, pWindowResInfo->prevSKey, pQueryAttr);
1849
//
L
Liu Jicong 已提交
1850 1851
//     while (pQueryAttr->limit.offset > 0) {
//       STimeWindow tw = win;
1852
//
L
Liu Jicong 已提交
1853 1854 1855
//       if ((win.ekey <= blockInfo.window.ekey && ascQuery) || (win.ekey >= blockInfo.window.skey && !ascQuery)) {
//         pQueryAttr->limit.offset -= 1;
//         pWindowResInfo->prevSKey = win.skey;
1856
//
L
Liu Jicong 已提交
1857 1858 1859 1860 1861 1862
//         // current time window is aligned with blockInfo.window.ekey
//         // restart it from next data block by set prevSKey to be TSKEY_INITIAL_VAL;
//         if ((win.ekey == blockInfo.window.ekey && ascQuery) || (win.ekey == blockInfo.window.skey && !ascQuery)) {
//           pWindowResInfo->prevSKey = TSKEY_INITIAL_VAL;
//         }
//       }
1863
//
L
Liu Jicong 已提交
1864 1865 1866 1867
//       if (pQueryAttr->limit.offset == 0) {
//         *start = doSkipIntervalProcess(pRuntimeEnv, &win, &blockInfo, pTableQueryInfo);
//         return true;
//       }
1868
//
L
Liu Jicong 已提交
1869 1870
//       // current window does not ended in current data block, try next data block
//       getNextTimeWindow(pQueryAttr, &tw);
1871
//
L
Liu Jicong 已提交
1872 1873 1874 1875 1876 1877 1878 1879 1880
//       /*
//        * If the next time window still starts from current data block,
//        * load the primary timestamp column first, and then find the start position for the next queried time window.
//        * Note that only the primary timestamp column is required.
//        * TODO: Optimize for this cases. All data blocks are not needed to be loaded, only if the first actually
//        required
//        * time window resides in current data block.
//        */
//       if ((tw.skey <= blockInfo.window.ekey && ascQuery) || (tw.ekey >= blockInfo.window.skey && !ascQuery)) {
1881
//
L
Liu Jicong 已提交
1882 1883
//         SArray *pDataBlock = tsdbRetrieveDataBlock(pRuntimeEnv->pTsdbReadHandle, NULL);
//         SColumnInfoData *pColInfoData = taosArrayGet(pDataBlock, 0);
1884
//
L
Liu Jicong 已提交
1885 1886 1887
//         if ((win.ekey > blockInfo.window.ekey && ascQuery) || (win.ekey < blockInfo.window.skey && !ascQuery)) {
//           pQueryAttr->limit.offset -= 1;
//         }
1888
//
L
Liu Jicong 已提交
1889 1890 1891 1892 1893 1894 1895 1896
//         if (pQueryAttr->limit.offset == 0) {
//           *start = doSkipIntervalProcess(pRuntimeEnv, &win, &blockInfo, pTableQueryInfo);
//           return true;
//         } else {
//           tw = win;
//           int32_t startPos =
//               getNextQualifiedWindow(pQueryAttr, &tw, &blockInfo, pColInfoData->pData, binarySearchForKey, -1);
//           assert(startPos >= 0);
1897
//
L
Liu Jicong 已提交
1898 1899 1900 1901 1902 1903 1904 1905 1906 1907 1908
//           // set the abort info
//           pQueryAttr->pos = startPos;
//           pTableQueryInfo->lastKey = ((TSKEY *)pColInfoData->pData)[startPos];
//           pWindowResInfo->prevSKey = tw.skey;
//           win = tw;
//         }
//       } else {
//         break;  // offset is not 0, and next time window begins or ends in the next block.
//       }
//     }
//   }
1909
//
L
Liu Jicong 已提交
1910 1911 1912 1913
//   // check for error
//   if (terrno != TSDB_CODE_SUCCESS) {
//     longjmp(pRuntimeEnv->env, terrno);
//   }
1914
//
L
Liu Jicong 已提交
1915 1916
//   return true;
// }
1917

1918
/*
 * Install `num` downstream operators on `p`.
 *
 * The pointers in pDownstream are copied into a newly allocated array owned by `p`; the
 * caller keeps ownership of the pDownstream array itself. Despite the name, the new list
 * replaces any list installed earlier: the previous array (not the operators it points to)
 * is freed.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when the array cannot be allocated,
 * in which case `p` is left unchanged.
 */
int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t num) {
  if (p->pDownstream == NULL) {
    assert(p->numOfDownstream == 0);
  }

  // build the new list aside so that a failed allocation does not clobber the current one
  SOperatorInfo** pList = taosMemoryCalloc(1, num * POINTER_BYTES);
  if (pList == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  // copy before freeing: pDownstream may be the very array that is being replaced
  memcpy(pList, pDownstream, num * POINTER_BYTES);

  if (p->pDownstream != NULL) {
    taosMemoryFree(p->pDownstream);
  }

  p->pDownstream = pList;
  p->numOfDownstream = num;
  return TSDB_CODE_SUCCESS;
}

wmmhello's avatar
wmmhello 已提交
1933
static void doDestroyTableList(STableListInfo* pTableqinfoList);
1934

1935
/*
 * Currently a no-op: the whole body is compiled out with `#if 0`.
 *
 * The disabled code asserted that a table's query window and last key are consistent with the
 * task's window for the given scan order.
 */
static void doTableQueryInfoTimeWindowCheck(SExecTaskInfo* pTaskInfo, STableQueryInfo* pTableQueryInfo, int32_t order) {
#if 0
  if (order == TSDB_ORDER_ASC) {
    assert(
        (pTableQueryInfo->win.skey <= pTableQueryInfo->win.ekey) &&
        (pTableQueryInfo->lastKey >= pTaskInfo->window.skey) &&
        (pTableQueryInfo->win.skey >= pTaskInfo->window.skey && pTableQueryInfo->win.ekey <= pTaskInfo->window.ekey));
  } else {
    assert(
        (pTableQueryInfo->win.skey >= pTableQueryInfo->win.ekey) &&
        (pTableQueryInfo->lastKey <= pTaskInfo->window.skey) &&
        (pTableQueryInfo->win.skey <= pTaskInfo->window.skey && pTableQueryInfo->win.ekey >= pTaskInfo->window.ekey));
  }
#endif
}

1951 1952 1953 1954
// Argument handed to loadRemoteDataCallback for one outstanding fetch request.
// It is heap-allocated by doSendFetchDataRequest and freed by the callback.
typedef struct SFetchRspHandleWrapper {
  uint32_t exchangeId;   // reference id (SExchangeInfo::self) resolved with taosAcquireRef(exchangeObjRefPool, ...)
  int32_t  sourceIndex;  // index of the fetched source in SExchangeInfo::pSourceDataInfo
} SFetchRspHandleWrapper;
1955

D
dapan1121 已提交
1956
/*
 * Response callback of a fetch request sent by doSendFetchDataRequest.
 *
 * param: the SFetchRspHandleWrapper created for the request; it is freed here on every path.
 * pMsg:  the response buffer. On success pMsg->pData is stored in the source's pRsp slot, with
 *        its numeric header fields converted from network to host byte order. On every other
 *        path the payload is freed here, since nothing else would release it.
 * code:  result of the request; a failure is recorded in the source's `code` field.
 *
 * The source is then marked EX_SOURCE_DATA_READY and the exchange operator's `ready` semaphore
 * is posted. When the exchange operator can no longer be acquired the response is dropped.
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code) {
  SFetchRspHandleWrapper* pWrapper = (SFetchRspHandleWrapper*)param;

  SExchangeInfo* pExchangeInfo = taosAcquireRef(exchangeObjRefPool, pWrapper->exchangeId);
  if (pExchangeInfo == NULL) {
    qWarn("failed to acquire exchange operator, since it may have been released");

    // nobody is left to consume the response: release the payload and the wrapper
    if (pMsg->pData != NULL) {
      taosMemoryFree(pMsg->pData);
      pMsg->pData = NULL;
    }
    taosMemoryFree(pWrapper);
    return TSDB_CODE_SUCCESS;
  }

  int32_t          index = pWrapper->sourceIndex;
  SSourceDataInfo* pSourceDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, index);

  if (code == TSDB_CODE_SUCCESS) {
    pSourceDataInfo->pRsp = pMsg->pData;

    SRetrieveTableRsp* pRsp = pSourceDataInfo->pRsp;
    ASSERT(pRsp != NULL);  // must hold before the header fields are touched

    pRsp->numOfRows = htonl(pRsp->numOfRows);
    pRsp->compLen = htonl(pRsp->compLen);
    pRsp->numOfCols = htonl(pRsp->numOfCols);
    pRsp->useconds = htobe64(pRsp->useconds);

    qDebug("%s fetch rsp received, index:%d, rows:%d", pSourceDataInfo->taskId, index, pRsp->numOfRows);
  } else {
    // the payload is not handed over on failure, so it has to be released here
    if (pMsg->pData != NULL) {
      taosMemoryFree(pMsg->pData);
      pMsg->pData = NULL;
    }

    pSourceDataInfo->code = code;
    qDebug("%s fetch rsp received, index:%d, error:%s", pSourceDataInfo->taskId, index, tstrerror(code));
  }

  pSourceDataInfo->status = EX_SOURCE_DATA_READY;

  tsem_post(&pExchangeInfo->ready);
  taosReleaseRef(exchangeObjRefPool, pWrapper->exchangeId);

  taosMemoryFree(pWrapper);
  return TSDB_CODE_SUCCESS;
}

/*
 * Free a send-message descriptor together with the request payload it carries.
 * The descriptor must not be NULL.
 */
static void destroySendMsgInfo(SMsgSendInfo* pMsgBody) {
  assert(NULL != pMsgBody);

  // payload first, then the descriptor that holds it
  taosMemoryFreeClear(pMsgBody->msgInfo.pData);
  taosMemoryFreeClear(pMsgBody);
}

D
dapan1121 已提交
1999
/*
 * Dispatch an RPC response to the callback registered in its send descriptor.
 *
 * The descriptor is taken from pMsg->info.ahandle. A non-empty response body is copied into a
 * freshly allocated buffer that is handed to the callback; if that allocation fails, terrno
 * and pMsg->code are set to TSDB_CODE_OUT_OF_MEMORY and the callback receives a NULL payload.
 * Afterwards the RPC content and the send descriptor are released.
 *
 * `parent` and `pEpSet` are not used.
 */
void qProcessRspMsg(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
  assert(pMsg->info.ahandle != NULL);
  SMsgSendInfo* pSendInfo = (SMsgSendInfo*)pMsg->info.ahandle;

  SDataBuf buf = {.len = pMsg->contLen, .pData = NULL};

  if (pMsg->contLen > 0) {
    buf.pData = taosMemoryCalloc(1, pMsg->contLen);
    if (buf.pData != NULL) {
      memcpy(buf.pData, pMsg->pCont, pMsg->contLen);
    } else {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      pMsg->code = TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  pSendInfo->fp(pSendInfo->param, &buf, pMsg->code);

  rpcFreeCont(pMsg->pCont);
  destroySendMsgInfo(pSendInfo);
}

L
Liu Jicong 已提交
2020
/*
 * Build a fetch request for the source at `sourceIndex` and send it asynchronously.
 *
 * The request (SResFetchReq), the send descriptor (SMsgSendInfo) and the callback argument
 * (SFetchRspHandleWrapper) are heap-allocated here. If any allocation fails, whatever was
 * allocated so far is freed, pTaskInfo->code is set to TSDB_CODE_QRY_OUT_OF_MEMORY and that
 * code is returned. The response is delivered to loadRemoteDataCallback.
 *
 * The source must be in EX_SOURCE_DATA_NOT_READY state.
 * Returns TSDB_CODE_SUCCESS once the request has been handed to asyncSendMsgToServer.
 */
static int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTaskInfo, int32_t sourceIndex) {
  size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources);

  SResFetchReq* pMsg = taosMemoryCalloc(1, sizeof(SResFetchReq));
  if (NULL == pMsg) {
    pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
    return pTaskInfo->code;
  }

  SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, sourceIndex);
  SSourceDataInfo*       pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, sourceIndex);

  ASSERT(pDataInfo->status == EX_SOURCE_DATA_NOT_READY);

  qDebug("%s build fetch msg and send to vgId:%d, ep:%s, taskId:0x%" PRIx64 ", execId:%d, %d/%" PRIzu,
         GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->addr.epSet.eps[0].fqdn, pSource->taskId, pSource->execId,
         sourceIndex, totalSources);

  // all request fields travel in network byte order
  pMsg->header.vgId = htonl(pSource->addr.nodeId);
  pMsg->sId = htobe64(pSource->schedId);
  pMsg->taskId = htobe64(pSource->taskId);
  pMsg->queryId = htobe64(pTaskInfo->id.queryId);
  pMsg->execId = htonl(pSource->execId);

  // send the fetch remote task result request
  SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (NULL == pMsgSendInfo) {
    taosMemoryFreeClear(pMsg);
    qError("%s prepare message %d failed", GET_TASKID(pTaskInfo), (int32_t)sizeof(SMsgSendInfo));
    pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
    return pTaskInfo->code;
  }

  SFetchRspHandleWrapper* pWrapper = taosMemoryCalloc(1, sizeof(SFetchRspHandleWrapper));
  if (NULL == pWrapper) {
    taosMemoryFreeClear(pMsg);
    taosMemoryFreeClear(pMsgSendInfo);
    qError("%s prepare message %d failed", GET_TASKID(pTaskInfo), (int32_t)sizeof(SFetchRspHandleWrapper));
    pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
    return pTaskInfo->code;
  }

  pWrapper->exchangeId = pExchangeInfo->self;
  pWrapper->sourceIndex = sourceIndex;

  pMsgSendInfo->param = pWrapper;
  pMsgSendInfo->msgInfo.pData = pMsg;
  pMsgSendInfo->msgInfo.len = sizeof(SResFetchReq);
  pMsgSendInfo->msgType = pSource->fetchMsgType;
  pMsgSendInfo->fp = loadRemoteDataCallback;

  int64_t transporterId = 0;
  int32_t code = asyncSendMsgToServer(pExchangeInfo->pTransporter, &pSource->addr.epSet, &transporterId, pMsgSendInfo);

  // NOTE(review): the send result has never been propagated by this function. Whether a failed
  // send still triggers the callback (and who then owns pMsgSendInfo) is not visible here --
  // confirm before turning it into an error return.
  (void)code;
  return TSDB_CODE_SUCCESS;
}

2068
/*
 * Decode the payload of a fetch response into pRes and update the load statistics.
 *
 * pColList == NULL: the payload is decoded straight into pRes (after cleaning it).
 * pColList != NULL: the payload starts with a column count followed by one SSysTableSchema per
 *                   column, all in network byte order (they are converted in place). The data
 *                   is decoded into a temporary block built from that schema and the columns
 *                   are then relocated into pRes according to pColList.
 *
 * Afterwards the timestamp window of pRes is refreshed and pLoadInfo (rows, size, elapsed time
 * since startTs) as well as the optional *total counter are updated.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY when the temporary block cannot be
 * created, or the error reported by blockDataEnsureCapacity. On failure neither the
 * statistics nor pRes->info.rows are updated.
 */
int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadInfo, int32_t numOfRows, char* pData,
                                     int32_t compLen, int32_t numOfOutput, int64_t startTs, uint64_t* total,
                                     SArray* pColList) {
  if (pColList == NULL) {  // data from other sources
    blockDataCleanup(pRes);
    //    blockDataEnsureCapacity(pRes, numOfRows);
    blockDecode(pRes, numOfOutput, numOfRows, pData);
  } else {  // extract data according to pColList
    ASSERT(numOfOutput == taosArrayGetSize(pColList));
    char* pStart = pData;

    int32_t numOfCols = htonl(*(int32_t*)pStart);
    pStart += sizeof(int32_t);

    // todo refactor:extract method
    // convert the schema entries to host byte order in place; pStart ends up at the column data
    SSysTableSchema* pSchema = (SSysTableSchema*)pStart;
    for (int32_t i = 0; i < numOfCols; ++i) {
      SSysTableSchema* p = (SSysTableSchema*)pStart;

      p->colId = htons(p->colId);
      p->bytes = htonl(p->bytes);
      pStart += sizeof(SSysTableSchema);
    }

    SSDataBlock* pBlock = createDataBlock();
    if (pBlock == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    for (int32_t i = 0; i < numOfCols; ++i) {
      SColumnInfoData idata = createColumnInfoData(pSchema[i].type, pSchema[i].bytes, pSchema[i].colId);
      blockDataAppendColInfo(pBlock, &idata);
    }

    blockDecode(pBlock, numOfCols, numOfRows, pStart);

    // the columns are relocated into pRes below, so it must be able to hold all the rows
    int32_t code = blockDataEnsureCapacity(pRes, numOfRows);
    if (code != TSDB_CODE_SUCCESS) {
      blockDataDestroy(pBlock);
      return code;
    }

    // data from mnode
    pRes->info.rows = numOfRows;
    relocateColumnData(pRes, pColList, pBlock->pDataBlock, false);
    blockDataDestroy(pBlock);
  }

  // todo move this to time window aggregator, since the primary timestamp may not be known by exchange operator.
  blockDataUpdateTsWindow(pRes, 0);

  int64_t el = taosGetTimestampUs() - startTs;

  pLoadInfo->totalRows += numOfRows;
  pLoadInfo->totalSize += compLen;

  if (total != NULL) {
    *total += numOfRows;
  }

  pLoadInfo->totalElapsed += el;
  return TSDB_CODE_SUCCESS;
}
2122

L
Liu Jicong 已提交
2123 2124
// Wrap up an exchange read once every source has been consumed: add the time elapsed since startTs to
// the load statistics, log the totals and hand the operator to doSetOperatorCompleted().
// Always returns NULL so that a caller can return the result of this call directly.
static void* setAllSourcesCompleted(SOperatorInfo* pOperator, int64_t startTs) {
  SExchangeInfo*       pExchange = pOperator->info;
  SExecTaskInfo*       pTask = pOperator->pTaskInfo;
  SLoadRemoteDataInfo* pStat = &pExchange->loadInfo;

  int64_t elapsedUs = taosGetTimestampUs() - startTs;
  pStat->totalElapsed += elapsedUs;

  size_t numOfSources = taosArrayGetSize(pExchange->pSources);
  qDebug("%s all %" PRIzu " sources are exhausted, total rows: %" PRIu64 " bytes:%" PRIu64 ", elapsed:%.2f ms",
         GET_TASKID(pTask), numOfSources, pStat->totalRows, pStat->totalSize, pStat->totalElapsed / 1000.0);

  doSetOperatorCompleted(pOperator);
  return NULL;
}

L
Liu Jicong 已提交
2141 2142
// Poll all downstream sources and return the first block that can be decoded from a ready response.
//
// Each pass over the sources counts the exhausted ones, skips those whose response is not ready yet and
// handles the first ready one:
//   - an error code recorded on the source aborts the read,
//   - an empty response marks the source exhausted and the scan moves on,
//   - otherwise the response is decoded into pExchangeInfo->pResult; unless the source reported completion,
//     the next fetch request for it is sent before the block is returned.
// If a pass finds every source exhausted the operator is completed and NULL is returned; otherwise the
// thread yields and polls again.
//
// On failure the error code is stored in pTaskInfo->code and NULL is returned.
static SSDataBlock* concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeInfo* pExchangeInfo,
                                                   SExecTaskInfo* pTaskInfo) {
  int32_t code = 0;
  int64_t startTs = taosGetTimestampUs();
  size_t  totalSources = taosArrayGetSize(pExchangeInfo->pSources);

  for (;;) {
    int32_t completed = 0;

    for (int32_t i = 0; i < totalSources; ++i) {
      SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, i);

      if (pDataInfo->status == EX_SOURCE_DATA_EXHAUSTED) {
        ++completed;
        continue;
      }

      if (pDataInfo->status != EX_SOURCE_DATA_READY) {
        continue;  // response for this source has not arrived yet
      }

      if (pDataInfo->code != TSDB_CODE_SUCCESS) {
        code = pDataInfo->code;
        goto _error;
      }

      SRetrieveTableRsp*     pFetchRsp = pDataInfo->pRsp;
      SDownstreamSourceNode* pSrcNode = taosArrayGet(pExchangeInfo->pSources, i);
      SSDataBlock*           pResBlock = pExchangeInfo->pResult;
      SLoadRemoteDataInfo*   pStat = &pExchangeInfo->loadInfo;

      // An empty response means this source has nothing more to deliver.
      if (pFetchRsp->numOfRows == 0) {
        qDebug("%s vgId:%d, taskId:0x%" PRIx64 " execId:%d index:%d completed, rowsOfSource:%" PRIu64
               ", totalRows:%" PRIu64 ", completed:%d try next %d/%" PRIzu,
               GET_TASKID(pTaskInfo), pSrcNode->addr.nodeId, pSrcNode->taskId, pSrcNode->execId, i,
               pDataInfo->totalRows, pExchangeInfo->loadInfo.totalRows, completed + 1, i + 1, totalSources);
        pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
        ++completed;
        taosMemoryFreeClear(pDataInfo->pRsp);
        continue;
      }

      code = extractDataBlockFromFetchRsp(pResBlock, pStat, pFetchRsp->numOfRows, pFetchRsp->data,
                                          pFetchRsp->compLen, pFetchRsp->numOfCols, startTs, &pDataInfo->totalRows,
                                          NULL);
      if (code != 0) {
        taosMemoryFreeClear(pDataInfo->pRsp);
        goto _error;
      }

      if (pFetchRsp->completed == 1) {
        qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64
               " execId:%d"
               " index:%d completed, numOfRows:%d, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 ", totalBytes:%" PRIu64
               ", completed:%d try next %d/%" PRIzu,
               GET_TASKID(pTaskInfo), pSrcNode->addr.nodeId, pSrcNode->taskId, pSrcNode->execId, i,
               pResBlock->info.rows, pDataInfo->totalRows, pStat->totalRows, pStat->totalSize, completed + 1, i + 1,
               totalSources);
        ++completed;
        pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
      } else {
        qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " execId:%d numOfRows:%d, totalRows:%" PRIu64
               ", totalBytes:%" PRIu64,
               GET_TASKID(pTaskInfo), pSrcNode->addr.nodeId, pSrcNode->taskId, pSrcNode->execId,
               pResBlock->info.rows, pStat->totalRows, pStat->totalSize);
      }

      // The response has been decoded into the result block and is no longer needed.
      taosMemoryFreeClear(pDataInfo->pRsp);

      // Source still has data: request the next batch before handing out the current block.
      if (pDataInfo->status != EX_SOURCE_DATA_EXHAUSTED) {
        pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
        code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, i);
        if (code != TSDB_CODE_SUCCESS) {
          taosMemoryFreeClear(pDataInfo->pRsp);
          goto _error;
        }
      }

      return pExchangeInfo->pResult;
    }

    if (completed == totalSources) {
      return setAllSourcesCompleted(pOperator, startTs);
    }

    // Nothing was ready in this pass; give up the CPU before polling again.
    sched_yield();
  }

_error:
  pTaskInfo->code = code;
  return NULL;
}

L
Liu Jicong 已提交
2232 2233 2234
// Send one fetch request to every downstream source, then wait on the exchange's "ready" semaphore.
// Returns TSDB_CODE_SUCCESS, or the first error reported by doSendFetchDataRequest(); in the error case
// the code is also stored in the task info and the remaining requests are not sent.
static int32_t prepareConcurrentlyLoad(SOperatorInfo* pOperator) {
  SExecTaskInfo* pTask = pOperator->pTaskInfo;
  SExchangeInfo* pExchange = pOperator->info;

  size_t  numOfSources = taosArrayGetSize(pExchange->pSources);
  int64_t beginUs = taosGetTimestampUs();

  // Asynchronously send all fetch requests to all sources.
  int32_t idx = 0;
  while (idx < numOfSources) {
    int32_t rc = doSendFetchDataRequest(pExchange, pTask, idx);
    if (rc != TSDB_CODE_SUCCESS) {
      pTask->code = rc;
      return rc;
    }
    ++idx;
  }

  int64_t sentUs = taosGetTimestampUs();
  qDebug("%s send all fetch requests to %" PRIzu " sources completed, elapsed:%.2fms", GET_TASKID(pTask),
         numOfSources, (sentUs - beginUs) / 1000.0);

  pOperator->status = OP_RES_TO_RETURN;
  pOperator->cost.openCost = taosGetTimestampUs() - beginUs;

  tsem_wait(&pExchange->ready);
  return TSDB_CODE_SUCCESS;
}

L
Liu Jicong 已提交
2259 2260 2261
// Read the downstream sources one after another, starting at pExchangeInfo->current.
//
// For the current source a fetch request is sent and the function waits on the "ready" semaphore for the
// response. An empty response marks the source exhausted and the loop advances to the next source; a
// non-empty one is decoded into pExchangeInfo->pResult, which is returned. When the response reports
// completion the source is marked exhausted so that the next call starts with the following source.
//
// Returns NULL when all sources are exhausted (the operator is then completed) or on failure, in which
// case the error code is stored in the task info.
static SSDataBlock* seqLoadRemoteData(SOperatorInfo* pOperator) {
  SExchangeInfo* pExchangeInfo = pOperator->info;
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;

  size_t  totalSources = taosArrayGetSize(pExchangeInfo->pSources);
  int64_t startTs = taosGetTimestampUs();

  while (1) {
    if (pExchangeInfo->current >= totalSources) {
      return setAllSourcesCompleted(pOperator, startTs);
    }

    // Do not wait for a response if the request could not be sent: presumably nothing would post the
    // semaphore in that case and the wait would never return. This matches the concurrent load path.
    int32_t code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, pExchangeInfo->current);
    if (code != TSDB_CODE_SUCCESS) {
      pTaskInfo->code = code;
      return NULL;
    }

    tsem_wait(&pExchangeInfo->ready);

    SSourceDataInfo*       pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, pExchangeInfo->current);
    SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, pExchangeInfo->current);

    if (pDataInfo->code != TSDB_CODE_SUCCESS) {
      qError("%s vgId:%d, taskID:0x%" PRIx64 " execId:%d error happens, code:%s", GET_TASKID(pTaskInfo),
             pSource->addr.nodeId, pSource->taskId, pSource->execId, tstrerror(pDataInfo->code));
      pOperator->pTaskInfo->code = pDataInfo->code;
      return NULL;
    }

    SRetrieveTableRsp*   pRsp = pDataInfo->pRsp;
    SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;

    // An empty response means the current source is drained; move on to the next one.
    if (pRsp->numOfRows == 0) {
      qDebug("%s vgId:%d, taskID:0x%" PRIx64 " execId:%d %d of total completed, rowsOfSource:%" PRIu64
             ", totalRows:%" PRIu64 " try next",
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pExchangeInfo->current + 1,
             pDataInfo->totalRows, pLoadInfo->totalRows);

      pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
      pExchangeInfo->current += 1;
      taosMemoryFreeClear(pDataInfo->pRsp);
      continue;
    }

    SSDataBlock* pRes = pExchangeInfo->pResult;
    code = extractDataBlockFromFetchRsp(pRes, pLoadInfo, pRsp->numOfRows, pRsp->data, pRsp->compLen, pRsp->numOfCols,
                                        startTs, &pDataInfo->totalRows, NULL);
    if (code != TSDB_CODE_SUCCESS) {
      // A block that failed to decode must not be returned; release the response and report the error.
      taosMemoryFreeClear(pDataInfo->pRsp);
      pTaskInfo->code = code;
      return NULL;
    }

    if (pRsp->completed == 1) {
      qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " execId:%d numOfRows:%d, rowsOfSource:%" PRIu64
             ", totalRows:%" PRIu64 ", totalBytes:%" PRIu64 " try next %d/%" PRIzu,
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pRes->info.rows,
             pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize, pExchangeInfo->current + 1,
             totalSources);

      pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
      pExchangeInfo->current += 1;
    } else {
      qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " execId:%d numOfRows:%d, totalRows:%" PRIu64
             ", totalBytes:%" PRIu64,
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pRes->info.rows,
             pLoadInfo->totalRows, pLoadInfo->totalSize);
    }

    pOperator->resultInfo.totalRows += pRes->info.rows;
    taosMemoryFreeClear(pDataInfo->pRsp);
    return pExchangeInfo->pResult;
  }
}

L
Liu Jicong 已提交
2326
// Open function of the exchange operator. It does nothing when the operator is already opened. In
// concurrent mode (seqLoadData is false) it issues the initial fetch requests through
// prepareConcurrentlyLoad(); in sequential mode there is nothing to prepare. On success the operator is
// flagged as opened and the time spent here is stored in cost.openCost.
static int32_t prepareLoadRemoteData(SOperatorInfo* pOperator) {
  if (OPTR_IS_OPENED(pOperator)) {
    return TSDB_CODE_SUCCESS;
  }

  int64_t        beginUs = taosGetTimestampUs();
  SExchangeInfo* pExchange = pOperator->info;

  int32_t rc = pExchange->seqLoadData ? TSDB_CODE_SUCCESS : prepareConcurrentlyLoad(pOperator);
  if (rc != TSDB_CODE_SUCCESS) {
    return rc;
  }

  OPTR_SET_OPENED(pOperator);
  pOperator->cost.openCost = (taosGetTimestampUs() - beginUs) / 1000.0;
  return TSDB_CODE_SUCCESS;
}

2346
// Fetch the next result block of the exchange operator.
// The operator's open function is invoked first and its return code is stored in the task info; a failure
// there yields NULL. An operator that is already done also yields NULL. Otherwise the block comes from
// the sequential or the concurrent loader, depending on seqLoadData.
static SSDataBlock* doLoadRemoteData(SOperatorInfo* pOperator) {
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
  SExchangeInfo* pExchangeInfo = pOperator->info;

  int32_t openCode = pOperator->fpSet._openFn(pOperator);
  pTaskInfo->code = openCode;
  if (openCode != TSDB_CODE_SUCCESS) {
    return NULL;
  }

  if (pOperator->status == OP_EXEC_DONE) {
    size_t               numOfSources = taosArrayGetSize(pExchangeInfo->pSources);
    SLoadRemoteDataInfo* pStat = &pExchangeInfo->loadInfo;

    qDebug("%s all %" PRIzu " source(s) are exhausted, total rows:%" PRIu64 " bytes:%" PRIu64 ", elapsed:%.2f ms",
           GET_TASKID(pTaskInfo), numOfSources, pStat->totalRows, pStat->totalSize, pStat->totalElapsed / 1000.0);
    return NULL;
  }

  return pExchangeInfo->seqLoadData ? seqLoadRemoteData(pOperator)
                                    : concurrentlyLoadRemoteDataImpl(pOperator, pExchangeInfo, pTaskInfo);
}
2371

2372
// Create pInfo->pSourceDataInfo with one SSourceDataInfo entry per source. Every entry starts in the
// EX_SOURCE_DATA_NOT_READY state and records the task id string and its own index.
// Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY if the array cannot be created or extended; in the
// latter case pInfo->pSourceDataInfo is NULL on return.
static int32_t initDataSource(int32_t numOfSources, SExchangeInfo* pInfo, const char* id) {
  pInfo->pSourceDataInfo = taosArrayInit(numOfSources, sizeof(SSourceDataInfo));
  if (pInfo->pSourceDataInfo == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  for (int32_t i = 0; i < numOfSources; ++i) {
    SSourceDataInfo dataInfo = {0};
    dataInfo.status = EX_SOURCE_DATA_NOT_READY;
    dataInfo.taskId = id;
    dataInfo.index = i;

    if (taosArrayPush(pInfo->pSourceDataInfo, &dataInfo) == NULL) {
      taosArrayDestroy(pInfo->pSourceDataInfo);
      // Clear the field so that a later cleanup of pInfo does not see a pointer to the destroyed array.
      pInfo->pSourceDataInfo = NULL;
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  return TSDB_CODE_SUCCESS;
}

2393
// Fill pInfo from the physical plan node: copy every source endpoint of pExNode into pInfo->pSources,
// register pInfo in exchangeObjRefPool and create the per-source state through initDataSource().
// Returns TSDB_CODE_INVALID_PARA if the plan node has no source, TSDB_CODE_OUT_OF_MEMORY if an array
// cannot be created or extended, otherwise the result of initDataSource(). Arrays already stored in
// pInfo are left in place on failure.
static int32_t initExchangeOperator(SExchangePhysiNode* pExNode, SExchangeInfo* pInfo, const char* id) {
  size_t numOfSources = LIST_LENGTH(pExNode->pSrcEndPoints);

  if (numOfSources == 0) {
    qError("%s invalid number: %d of sources in exchange operator", id, (int32_t)numOfSources);
    return TSDB_CODE_INVALID_PARA;
  }

  pInfo->pSources = taosArrayInit(numOfSources, sizeof(SDownstreamSourceNode));
  if (pInfo->pSources == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  for (int32_t i = 0; i < numOfSources; ++i) {
    SDownstreamSourceNode* pNode = (SDownstreamSourceNode*)nodesListGetNode((SNodeList*)pExNode->pSrcEndPoints, i);
    // A failed push would leave pSources shorter than numOfSources, while the per-source state created
    // below is indexed by the full count.
    if (taosArrayPush(pInfo->pSources, pNode) == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  pInfo->self = taosAddRef(exchangeObjRefPool, pInfo);

  return initDataSource(numOfSources, pInfo, id);
}

// Create the exchange operator described by pExNode.
// The operator is created in concurrent-load mode (seqLoadData = false), with prepareLoadRemoteData as
// its open function and doLoadRemoteData as the function that produces result blocks.
// Returns the new operator, or NULL on failure; in that case the error code is stored in pTaskInfo->code
// and everything allocated here has been released.
SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode* pExNode, SExecTaskInfo* pTaskInfo) {
  // Declared before the first jump to _error so that the label never reads an indeterminate value.
  int32_t code = TSDB_CODE_SUCCESS;

  SExchangeInfo* pInfo = taosMemoryCalloc(1, sizeof(SExchangeInfo));
  SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _error;
  }

  code = initExchangeOperator(pExNode, pInfo, GET_TASKID(pTaskInfo));
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  tsem_init(&pInfo->ready, 0, 0);

  pInfo->seqLoadData = false;
  pInfo->pTransporter = pTransporter;
  pInfo->pResult = createResDataBlock(pExNode->node.pOutputDataBlockDesc);
  if (pInfo->pResult == NULL) {
    // The result block is dereferenced right below, so stop here instead of crashing.
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _error;
  }

  pOperator->name = "ExchangeOperator";
  pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE;
  pOperator->blocking = false;
  pOperator->status = OP_NOT_OPENED;
  pOperator->info = pInfo;
  pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pResult->pDataBlock);
  pOperator->pTaskInfo = pTaskInfo;

  pOperator->fpSet = createOperatorFpSet(prepareLoadRemoteData, doLoadRemoteData, NULL, NULL,
                                         destroyExchangeOperatorInfo, NULL, NULL, NULL);
  return pOperator;

_error:
  if (pInfo != NULL) {
    doDestroyExchangeOperatorInfo(pInfo);
  }

  taosMemoryFreeClear(pOperator);
  pTaskInfo->code = code;
  return NULL;
}

dengyihao's avatar
dengyihao 已提交
2455 2456
// Forward declaration; the definition is not in this part of the file.
// createSortedMergeOperatorInfo() below calls it to set up its aggregate supporter (pInfo->aggSup).
static int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t numOfOutput, size_t keyBufSize,
                                const char* pKey);
2457

2458
static void destroySortedMergeOperatorInfo(void* param, int32_t numOfOutput) {
L
Liu Jicong 已提交
2459
  SSortedMergeOperatorInfo* pInfo = (SSortedMergeOperatorInfo*)param;
H
Haojun Liao 已提交
2460
  taosArrayDestroy(pInfo->pSortInfo);
2461 2462 2463
  taosArrayDestroy(pInfo->groupInfo);

  if (pInfo->pSortHandle != NULL) {
H
Haojun Liao 已提交
2464
    tsortDestroySortHandle(pInfo->pSortHandle);
2465 2466
  }

H
Haojun Liao 已提交
2467
  blockDataDestroy(pInfo->binfo.pRes);
H
Haojun Liao 已提交
2468
  cleanupAggSup(&pInfo->aggSup);
L
Liu Jicong 已提交
2469

D
dapan1121 已提交
2470
  taosMemoryFreeClear(param);
2471
}
H
Haojun Liao 已提交
2472

L
Liu Jicong 已提交
2473
// Decide whether row rowIndex of pBlock belongs to the group whose column values are saved in buf.
// groupInfo holds the block column index of every group-by column; buf[i] is the saved value of the i-th
// group-by column (NULL stands for a NULL value).
// Returns true when there are no group-by columns or when every group-by column of the row equals the
// saved value, false as soon as one column differs.
static bool needToMerge(SSDataBlock* pBlock, SArray* groupInfo, char** buf, int32_t rowIndex) {
  size_t size = taosArrayGetSize(groupInfo);
  if (size == 0) {
    return true;
  }

  for (int32_t i = 0; i < size; ++i) {
    int32_t* index = taosArrayGet(groupInfo, i);

    SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, *index);
    bool             isNull = colDataIsNull(pColInfo, rowIndex, pBlock->info.rows, NULL);

    if ((isNull && buf[i] != NULL) || (!isNull && buf[i] == NULL)) {
      return false;
    }

    // Both values are NULL: the column matches and there is no data to compare.
    if (isNull) {
      continue;
    }

    char* pCell = colDataGetData(pColInfo, rowIndex);
    if (IS_VAR_DATA_TYPE(pColInfo->info.type)) {
      if (varDataLen(pCell) != varDataLen(buf[i])) {
        return false;
      }

      if (memcmp(varDataVal(pCell), varDataVal(buf[i]), varDataLen(pCell)) != 0) {
        return false;
      }
    } else if (memcmp(pCell, buf[i], pColInfo->info.bytes) != 0) {
      return false;
    }
  }

  // Every group-by column matched, so the row is part of the current group.
  return true;
}

L
Liu Jicong 已提交
2508 2509 2510
// Placeholder for merging row rowIndex into the intermediate results of the numOfExpr function contexts.
// It currently has no effect.
// TODO: set the row index on every context and invoke the merge step of each function (including the UDF
// path for negative function ids) once those interfaces are available.
static void doMergeResultImpl(SSortedMergeOperatorInfo* pInfo, SqlFunctionCtx* pCtx, int32_t numOfExpr,
                              int32_t rowIndex) {
  (void)pInfo;
  (void)pCtx;
  (void)numOfExpr;
  (void)rowIndex;
}
2527

L
Liu Jicong 已提交
2528 2529
// Placeholder for finalizing the results of the numOfExpr function contexts. It currently has no effect.
// TODO: invoke the finalize step of each function (including the UDF path for negative function ids) once
// those interfaces are available.
static void doFinalizeResultImpl(SqlFunctionCtx* pCtx, int32_t numOfExpr) {
  (void)pCtx;
  (void)numOfExpr;
}
2542

2543
// Copy the values that row rowIndex of pBlock holds in the columns listed in pColumnList into the
// caller-provided buffers: the i-th listed column goes to rowColData[i]. pColumnList holds column indexes
// into pBlock. Always returns true.
static bool saveCurrentTuple(char** rowColData, SArray* pColumnList, SSDataBlock* pBlock, int32_t rowIndex) {
  int32_t numOfKeys = (int32_t)taosArrayGetSize(pColumnList);

  int32_t k = 0;
  while (k < numOfKeys) {
    int32_t*         pSlot = taosArrayGet(pColumnList, k);
    SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, *pSlot);

    char* pCell = colDataGetData(pCol, rowIndex);
    memcpy(rowColData[k], pCell, colDataGetLength(pCol, rowIndex));
    ++k;
  }

  return true;
}
2556

2557 2558
// Feed the rows of pBlock into the merge, one row at a time.
//   - If no group value has been saved yet, the row is merged and its group-by values are saved.
//   - If needToMerge() accepts the row, it is merged into the current group.
//   - Otherwise the current group is finalized, its result rows are added to the output block, process()
//     is invoked on every context with a non-negative function id, and the row is merged and saved as
//     the start of the next group.
static void doMergeImpl(SOperatorInfo* pOperator, int32_t numOfExpr, SSDataBlock* pBlock) {
  SSortedMergeOperatorInfo* pMerge = pOperator->info;
  SqlFunctionCtx*           pCtx = pOperator->exprSupp.pCtx;

  for (int32_t row = 0; row < pBlock->info.rows; ++row) {
    if (!pMerge->hasGroupVal) {
      ASSERT(row == 0);
      doMergeResultImpl(pMerge, pCtx, numOfExpr, row);
      pMerge->hasGroupVal = saveCurrentTuple(pMerge->groupVal, pMerge->groupInfo, pBlock, row);
      continue;
    }

    if (needToMerge(pBlock, pMerge->groupInfo, pMerge->groupVal, row)) {
      doMergeResultImpl(pMerge, pCtx, numOfExpr, row);
      continue;
    }

    // The row was not accepted for the current group: close that group first.
    doFinalizeResultImpl(pCtx, numOfExpr);
    int32_t produced = getNumOfResult(pOperator->exprSupp.pCtx, pOperator->exprSupp.numOfExprs, NULL);

    // TODO check for available buffer;

    // next group info data
    pMerge->binfo.pRes->info.rows += produced;
    for (int32_t k = 0; k < numOfExpr; ++k) {
      if (pCtx[k].functionId >= 0) {
        pCtx[k].fpSet.process(&pCtx[k]);
      }
    }

    doMergeResultImpl(pMerge, pCtx, numOfExpr, row);
    pMerge->hasGroupVal = saveCurrentTuple(pMerge->groupVal, pMerge->groupInfo, pBlock, row);
  }
}

2594 2595
// Drain the sort handle and run the merge over everything it delivers.
// Tuples are collected into a scratch block of at most resultInfo.capacity rows; each filled block is set
// as the operator's input and passed to doMergeImpl(). After the last tuple the remaining group is
// finalized and its rows are added to the output block.
// Returns the output block (pInfo->binfo.pRes) if it holds at least one row, otherwise NULL.
static SSDataBlock* doMerge(SOperatorInfo* pOperator) {
  SSortedMergeOperatorInfo* pInfo = pOperator->info;
  SSortHandle*              pHandle = pInfo->pSortHandle;

  SSDataBlock* pDataBlock = createOneDataBlock(pInfo->binfo.pRes, false);
  blockDataEnsureCapacity(pDataBlock, pOperator->resultInfo.capacity);

  while (1) {
    blockDataCleanup(pDataBlock);
    while (1) {
      STupleHandle* pTupleHandle = tsortNextTuple(pHandle);
      if (pTupleHandle == NULL) {
        break;
      }

      // build datablock for merge for one group
      appendOneRowToDataBlock(pDataBlock, pTupleHandle);
      if (pDataBlock->info.rows >= pOperator->resultInfo.capacity) {
        break;
      }
    }

    if (pDataBlock->info.rows == 0) {
      break;
    }

    setInputDataBlock(pOperator, pOperator->exprSupp.pCtx, pDataBlock, TSDB_ORDER_ASC, MAIN_SCAN, true);
    doMergeImpl(pOperator, pOperator->exprSupp.numOfExprs, pDataBlock);
    // flush to tuple store, and after all data have been handled, return to upstream node or sink node
  }

  doFinalizeResultImpl(pOperator->exprSupp.pCtx, pOperator->exprSupp.numOfExprs);
  int32_t numOfRows = getNumOfResult(pOperator->exprSupp.pCtx, pOperator->exprSupp.numOfExprs, NULL);

  // TODO check for available buffer;

  // next group info data
  pInfo->binfo.pRes->info.rows += numOfRows;

  // The scratch block is local to this call; release it once the merge no longer reads from it.
  // NOTE(review): assumes the function contexts keep no pointer into this block after this point - confirm.
  blockDataDestroy(pDataBlock);

  return (pInfo->binfo.pRes->info.rows > 0) ? pInfo->binfo.pRes : NULL;
}
2637

L
Liu Jicong 已提交
2638 2639
// Pull up to `capacity` sorted rows from pHandle and copy them into pDataBlock.
// pDataBlock is cleaned first. The rows are gathered in the block returned by tsortGetSortedDataBlock();
// if any were read, the columns listed in pColMatchInfo are copied from their source slot to their target
// slot in pDataBlock. The gathered block is destroyed before returning. pInfo is not used.
// Returns pDataBlock if it ends up with at least one row, otherwise NULL (also when no sorted block is
// available).
SSDataBlock* getSortedMergeBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity,
                                     SArray* pColMatchInfo, SSortedMergeOperatorInfo* pInfo) {
  blockDataCleanup(pDataBlock);

  SSDataBlock* pSorted = tsortGetSortedDataBlock(pHandle);
  if (pSorted == NULL) {
    return NULL;
  }

  blockDataEnsureCapacity(pSorted, capacity);

  // Stop at the end of the sorted stream or when the block is full, whichever comes first.
  for (STupleHandle* pTuple = tsortNextTuple(pHandle); pTuple != NULL; pTuple = tsortNextTuple(pHandle)) {
    appendOneRowToDataBlock(pSorted, pTuple);
    if (pSorted->info.rows >= capacity) {
      break;
    }
  }

  if (pSorted->info.rows > 0) {
    int32_t numOfMatched = taosArrayGetSize(pColMatchInfo);

    int32_t k = 0;
    while (k < numOfMatched) {
      SColMatchInfo* pMatch = taosArrayGet(pColMatchInfo, k);
      ASSERT(pMatch->matchType == COL_MATCH_FROM_SLOT_ID);

      SColumnInfoData* pFrom = taosArrayGet(pSorted->pDataBlock, pMatch->srcSlotId);
      SColumnInfoData* pTo = taosArrayGet(pDataBlock->pDataBlock, pMatch->targetSlotId);
      colDataAssign(pTo, pFrom, pSorted->info.rows, &pDataBlock->info);
      ++k;
    }

    pDataBlock->info.rows = pSorted->info.rows;
    pDataBlock->info.capacity = pSorted->info.rows;
  }

  blockDataDestroy(pSorted);

  if (pDataBlock->info.rows > 0) {
    return pDataBlock;
  }
  return NULL;
}

2680
// Produce the next block of the sorted-merge operator.
// The first call creates a multi-source merge sort handle, registers every downstream operator as a sort
// source, opens the handle and runs doMerge(). Later calls (status OP_RES_TO_RETURN) read the next block
// through getSortedMergeBlockData(). Returns NULL once the operator is done.
// Failures are reported by a longjmp to pTaskInfo->env.
static SSDataBlock* doSortedMerge(SOperatorInfo* pOperator) {
  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  SExecTaskInfo*            pTaskInfo = pOperator->pTaskInfo;
  SSortedMergeOperatorInfo* pInfo = pOperator->info;
  if (pOperator->status == OP_RES_TO_RETURN) {
    return getSortedMergeBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pOperator->resultInfo.capacity, NULL, pInfo);
  }

  int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;
  // Pass the task id string itself; it was previously given as the quoted macro text.
  pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_MULTISOURCE_MERGE, pInfo->bufPageSize, numOfBufPage,
                                             pInfo->binfo.pRes, GET_TASKID(pTaskInfo));

  tsortSetFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock, NULL, NULL);

  for (int32_t i = 0; i < pOperator->numOfDownstream; ++i) {
    SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource));
    if (ps == NULL) {
      // Without this check the assignment below would write through a NULL pointer.
      longjmp(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
    }

    ps->param = pOperator->pDownstream[i];
    tsortAddSource(pInfo->pSortHandle, ps);
  }

  int32_t code = tsortOpen(pInfo->pSortHandle);
  if (code != TSDB_CODE_SUCCESS) {
    longjmp(pTaskInfo->env, terrno);
  }

  pOperator->status = OP_RES_TO_RETURN;
  return doMerge(pOperator);
}
2711

L
Liu Jicong 已提交
2712 2713
// Prepare the group-by bookkeeping of a sorted-merge operator.
// For every column in pGroupInfo the expression whose result slot id equals the column id is looked up;
// its index is appended to pInfo->groupInfo. pInfo->groupVal is then allocated as one buffer: an array of
// one pointer per group-by column, followed by the value storage those pointers refer to (pCol->bytes per
// column).
// Returns 0 when there is nothing to group by, TSDB_CODE_OUT_OF_MEMORY on allocation failure and
// TSDB_CODE_SUCCESS otherwise.
static int32_t initGroupCol(SExprInfo* pExprInfo, int32_t numOfCols, SArray* pGroupInfo,
                            SSortedMergeOperatorInfo* pInfo) {
  if (pGroupInfo == NULL || taosArrayGetSize(pGroupInfo) == 0) {
    return 0;
  }

  int32_t len = 0;
  SArray* plist = taosArrayInit(3, sizeof(SColumn));
  pInfo->groupInfo = taosArrayInit(3, sizeof(int32_t));

  if (plist == NULL || pInfo->groupInfo == NULL) {
    // plist is local and would be lost; pInfo->groupInfo stays with pInfo for its owner to release.
    if (plist != NULL) {
      taosArrayDestroy(plist);
    }
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  // Iterate over the requested group-by columns; pInfo->groupInfo was created just above and is empty.
  size_t numOfGroupCol = taosArrayGetSize(pGroupInfo);
  for (int32_t i = 0; i < numOfGroupCol; ++i) {
    SColumn* pCol = taosArrayGet(pGroupInfo, i);
    for (int32_t j = 0; j < numOfCols; ++j) {
      SExprInfo* pe = &pExprInfo[j];
      if (pe->base.resSchema.slotId == pCol->colId) {
        taosArrayPush(plist, pCol);
        taosArrayPush(pInfo->groupInfo, &j);
        len += pCol->bytes;
        break;
      }
    }
  }

  ASSERT(taosArrayGetSize(pGroupInfo) == taosArrayGetSize(plist));

  pInfo->groupVal = taosMemoryCalloc(1, (POINTER_BYTES * numOfGroupCol + len));
  if (pInfo->groupVal == NULL) {
    taosArrayDestroy(plist);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  // The value storage starts right behind the pointer array. The offset is a byte count, so it has to be
  // applied to a char pointer; adding it to groupVal directly would scale it by the pointer size again.
  int32_t offset = 0;
  char*   start = (char*)pInfo->groupVal + POINTER_BYTES * numOfGroupCol;
  for (int32_t i = 0; i < numOfGroupCol; ++i) {
    pInfo->groupVal[i] = start + offset;
    SColumn* pCol = taosArrayGet(plist, i);
    offset += pCol->bytes;
  }

  taosArrayDestroy(plist);

  return TSDB_CODE_SUCCESS;
}
H
Haojun Liao 已提交
2760

L
Liu Jicong 已提交
2761 2762 2763
// Create a sorted-merge operator on top of the given downstream operators.
// pExprInfo/num describe the output expressions, pSortInfo the sort order and pGroupInfo the group-by
// columns. pSortInfo is stored in the operator info and destroyed together with it.
// Returns the new operator, or NULL on failure with terrno set to TSDB_CODE_QRY_OUT_OF_MEMORY.
SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t numOfDownstream, SExprInfo* pExprInfo,
                                             int32_t num, SArray* pSortInfo, SArray* pGroupInfo,
                                             SExecTaskInfo* pTaskInfo) {
  int32_t code = TSDB_CODE_SUCCESS;

  SSortedMergeOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SSortedMergeOperatorInfo));
  SOperatorInfo*            pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    goto _error;
  }

  code = initExprSupp(&pOperator->exprSupp, pExprInfo, num);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  initResultRowInfo(&pInfo->binfo.resultRowInfo);

  // NOTE(review): nothing in this function assigns pInfo->binfo.pRes before this check, so it looks like
  // the check always fails and the function always returns NULL - confirm where the result block should
  // be created.
  if (pOperator->exprSupp.pCtx == NULL || pInfo->binfo.pRes == NULL) {
    goto _error;
  }

  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
  code = doInitAggInfoSup(&pInfo->aggSup, pOperator->exprSupp.pCtx, num, keyBufSize, pTaskInfo->id.str);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  setFunctionResultOutput(pOperator, &pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, num);
  code = initGroupCol(pExprInfo, num, pGroupInfo, pInfo);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  pInfo->sortBufSize = 1024 * 16;  // 16KB
  pInfo->bufPageSize = 1024;
  pInfo->pSortInfo = pSortInfo;

  pOperator->resultInfo.capacity = blockDataGetCapacityInRow(pInfo->binfo.pRes, pInfo->bufPageSize);

  pOperator->name = "SortedMerge";
  // pOperator->operatorType = OP_SortedMerge;
  pOperator->blocking = true;
  pOperator->status = OP_NOT_OPENED;
  pOperator->info = pInfo;
  pOperator->pTaskInfo = pTaskInfo;

  pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doSortedMerge, NULL, NULL, destroySortedMergeOperatorInfo,
                                         NULL, NULL, NULL);
  code = appendDownstream(pOperator, downstream, numOfDownstream);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  return pOperator;

_error:
  if (pInfo != NULL) {
    // destroySortedMergeOperatorInfo() releases pInfo itself, so it must not be freed a second time here.
    destroySortedMergeOperatorInfo(pInfo, num);
    pInfo = NULL;
  }

  taosMemoryFreeClear(pOperator);
  terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
  return NULL;
}

X
Xiaoyu Wang 已提交
2828
// Report the scan order and scan flag that apply to pOperator.
// Starting at pOperator, the chain of first downstream operators is followed until a scan-like operator
// is found:
//   - exchange, system-table scan, stream scan, tag scan, block-dist scan and last-row scan report
//     TSDB_ORDER_ASC / MAIN_SCAN,
//   - a table scan reports the order of its query condition and its current scan flag.
// Returns TSDB_CODE_SUCCESS when such an operator is found and TSDB_CODE_INVALID_PARA when the chain ends
// first; *order and *scanFlag are only written on success.
int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scanFlag) {
  // todo add more information about exchange operation
  SOperatorInfo* pCur = pOperator;

  for (;;) {
    int32_t type = pCur->operatorType;

    switch (type) {
      case QUERY_NODE_PHYSICAL_PLAN_EXCHANGE:
      case QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN:
      case QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN:
      case QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN:
      case QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN:
      case QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN:
        *order = TSDB_ORDER_ASC;
        *scanFlag = MAIN_SCAN;
        return TSDB_CODE_SUCCESS;

      case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN: {
        STableScanInfo* pScan = pCur->info;
        *order = pScan->cond.order;
        *scanFlag = pScan->scanFlag;
        return TSDB_CODE_SUCCESS;
      }

      default:
        break;
    }

    // Not a scan-like operator: continue with its first downstream operator, if there is one.
    if (pCur->pDownstream == NULL || pCur->pDownstream[0] == NULL) {
      return TSDB_CODE_INVALID_PARA;
    }
    pCur = pCur->pDownstream[0];
  }
}
L
Liu Jicong 已提交
2850
#if 0
L
Liu Jicong 已提交
2851
/*
 * Position the table scan that sits below the stream scan operator at (uid, ts).
 *
 * Every operator visited on the way down is marked OP_OPENED. Operators other than the
 * stream scan forward the request to their single downstream operator; zero or multiple
 * downstream operators are reported as TSDB_CODE_QRY_APP_ERROR.
 */
int32_t doPrepareScan(SOperatorInfo* pOperator, uint64_t uid, int64_t ts) {
  uint8_t type = pOperator->operatorType;

  pOperator->status = OP_OPENED;

  if (type != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
    if (pOperator->numOfDownstream == 1) {
      return doPrepareScan(pOperator->pDownstream[0], uid, ts);
    }

    if (pOperator->numOfDownstream == 0) {
      qError("failed to find stream scan operator to set the input data block");
      return TSDB_CODE_QRY_APP_ERROR;
    }

    qError("join not supported for stream block scan");
    return TSDB_CODE_QRY_APP_ERROR;
  }

  SStreamScanInfo* pStreamScan = pOperator->info;
  pStreamScan->blockType = STREAM_INPUT__TABLE_SCAN;
  pStreamScan->pTableScanOp->status = OP_OPENED;

  STableScanInfo* pInfo = pStreamScan->pTableScanOp->info;
  ASSERT(pInfo->scanMode == TABLE_SCAN__TABLE_ORDER);

  // uid 0 means there is no table to scan
  if (uid == 0) {
    pInfo->noTable = 1;
    return TSDB_CODE_SUCCESS;
  }

  pInfo->noTable = 0;

  // nothing to do when the reader already stands at the requested position
  if (pInfo->lastStatus.uid == uid && pInfo->lastStatus.ts == ts) {
    return TSDB_CODE_SUCCESS;
  }

  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;

  int32_t tableSz = taosArrayGetSize(pTaskInfo->tableqinfoList.pTableList);
  bool    found = false;
  for (int32_t idx = 0; idx < tableSz; idx++) {
    STableKeyInfo* pKeyInfo = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, idx);
    if (pKeyInfo->uid == uid) {
      // no early exit: the last matching entry wins
      found = true;
      pInfo->currentTable = idx;
    }
  }

  // TODO after processing drop, found can be false
  ASSERT(found);

  tsdbSetTableId(pInfo->dataReader, uid);

  // reset the reader with a start key just past ts, then restore the original start key
  int64_t savedSkey = pInfo->cond.twindows.skey;
  pInfo->cond.twindows.skey = ts + 1;
  tsdbReaderReset(pInfo->dataReader, &pInfo->cond);
  pInfo->cond.twindows.skey = savedSkey;
  pInfo->scanTimes = 0;

  qDebug("tsdb reader offset seek to uid %" PRId64 " ts %" PRId64 ", table cur set to %d , all table num %d", uid, ts,
         pInfo->currentTable, tableSz);

  return TSDB_CODE_SUCCESS;
}

2918 2919 2920
/*
 * Fetch the last scanned (uid, ts) position recorded by the table scan that sits below the
 * stream scan operator.
 *
 * Operators other than the stream scan forward the request to their first downstream
 * operator. Returns TSDB_CODE_SUCCESS when *uid and *ts were filled in, or
 * TSDB_CODE_INVALID_PARA when the chain ends without reaching a stream scan operator.
 */
int32_t doGetScanStatus(SOperatorInfo* pOperator, uint64_t* uid, int64_t* ts) {
  int32_t type = pOperator->operatorType;
  if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
    SStreamScanInfo* pScanInfo = pOperator->info;
    STableScanInfo*  pSnapShotScanInfo = pScanInfo->pTableScanOp->info;
    *uid = pSnapShotScanInfo->lastStatus.uid;
    *ts = pSnapShotScanInfo->lastStatus.ts;
    return TSDB_CODE_SUCCESS;
  }

  // guard the array itself as well as its first slot before descending
  if (pOperator->pDownstream == NULL || pOperator->pDownstream[0] == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  // propagate the downstream result instead of unconditionally reporting success
  return doGetScanStatus(pOperator->pDownstream[0], uid, ts);
}
L
Liu Jicong 已提交
2935
#endif
2936

2937
// this is a blocking operator
L
Liu Jicong 已提交
2938
static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
H
Haojun Liao 已提交
2939 2940
  if (OPTR_IS_OPENED(pOperator)) {
    return TSDB_CODE_SUCCESS;
2941 2942
  }

H
Haojun Liao 已提交
2943
  SExecTaskInfo*    pTaskInfo = pOperator->pTaskInfo;
2944
  SAggOperatorInfo* pAggInfo = pOperator->info;
H
Haojun Liao 已提交
2945

2946 2947
  SExprSupp*     pSup = &pOperator->exprSupp;
  SOperatorInfo* downstream = pOperator->pDownstream[0];
2948

2949 2950
  int64_t st = taosGetTimestampUs();

2951 2952 2953
  int32_t order = TSDB_ORDER_ASC;
  int32_t scanFlag = MAIN_SCAN;

H
Haojun Liao 已提交
2954
  while (1) {
2955
    SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
2956 2957 2958 2959
    if (pBlock == NULL) {
      break;
    }

2960 2961 2962 2963
    int32_t code = getTableScanInfo(pOperator, &order, &scanFlag);
    if (code != TSDB_CODE_SUCCESS) {
      longjmp(pTaskInfo->env, code);
    }
2964

2965
    // there is an scalar expression that needs to be calculated before apply the group aggregation.
2966 2967 2968
    if (pAggInfo->scalarExprSup.pExprInfo != NULL) {
      SExprSupp* pSup1 = &pAggInfo->scalarExprSup;
      code = projectApplyFunctions(pSup1->pExprInfo, pBlock, pBlock, pSup1->pCtx, pSup1->numOfExprs, NULL);
2969
      if (code != TSDB_CODE_SUCCESS) {
2970
        longjmp(pTaskInfo->env, code);
2971
      }
2972 2973
    }

2974
    // the pDataBlock are always the same one, no need to call this again
2975 2976
    setExecutionContext(pOperator, pOperator->exprSupp.numOfExprs, pBlock->info.groupId, pAggInfo);
    setInputDataBlock(pOperator, pSup->pCtx, pBlock, order, scanFlag, true);
2977
    code = doAggregateImpl(pOperator, pSup->pCtx);
2978 2979 2980
    if (code != 0) {
      longjmp(pTaskInfo->env, code);
    }
2981 2982
  }

H
Haojun Liao 已提交
2983
  closeAllResultRows(&pAggInfo->binfo.resultRowInfo);
2984
  initGroupedResultInfo(&pAggInfo->groupResInfo, pAggInfo->aggSup.pResultRowHashTable, 0);
H
Haojun Liao 已提交
2985
  OPTR_SET_OPENED(pOperator);
2986

2987
  pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
H
Haojun Liao 已提交
2988 2989 2990
  return TSDB_CODE_SUCCESS;
}

2991
/*
 * getNext function of the aggregate operator.
 *
 * Runs the operator's open function, storing its return code in pTaskInfo->code, then
 * builds result blocks from the grouped results and applies the operator's condition.
 * It stops at the first non-empty block or when the grouped results are exhausted.
 * Returns NULL when the operator is done, the open step failed, or no row survived.
 */
static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator) {
  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  SAggOperatorInfo* pAggInfo = pOperator->info;
  SOptrBasicInfo*   pBasic = &pAggInfo->binfo;
  SExecTaskInfo*    pTaskInfo = pOperator->pTaskInfo;

  pTaskInfo->code = pOperator->fpSet._openFn(pOperator);
  if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
    doSetOperatorCompleted(pOperator);
    return NULL;
  }

  blockDataEnsureCapacity(pBasic->pRes, pOperator->resultInfo.capacity);

  for (;;) {
    doBuildResultDatablock(pOperator, pBasic, &pAggInfo->groupResInfo, pAggInfo->aggSup.pResultBuf);
    doFilter(pAggInfo->pCondition, pBasic->pRes);

    // no more grouped results: whatever is in the block now is the final output
    if (!hasDataInGroupInfo(&pAggInfo->groupResInfo)) {
      doSetOperatorCompleted(pOperator);
      break;
    }

    // something survived the filter, hand it out; otherwise build the next batch
    if (pBasic->pRes->info.rows > 0) {
      break;
    }
  }

  size_t numOfRows = blockDataGetNumOfRows(pBasic->pRes);
  pOperator->resultInfo.totalRows += numOfRows;

  if (numOfRows == 0) {
    return NULL;
  }

  return pBasic->pRes;
}

wmmhello's avatar
wmmhello 已提交
3026
/*
 * Serialize every result row referenced by the aggregate hash table into one heap buffer.
 *
 * Buffer layout, as written below:
 *   int32 total length | int32 number of hash entries |
 *   { int32 keyLen | key bytes | int32 resultRowSize | result row bytes } per entry
 *
 * On success *result owns the buffer (allocated with taosMemoryCalloc/Realloc) and *length
 * holds its used size. When the result buffer is empty, *result is NULL and *length is 0.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_TSC_INVALID_INPUT for NULL output pointers, or
 * TSDB_CODE_OUT_OF_MEMORY (in which case *result is NULL).
 */
int32_t aggEncodeResultRow(SOperatorInfo* pOperator, char** result, int32_t* length) {
  if (result == NULL || length == NULL) {
    return TSDB_CODE_TSC_INVALID_INPUT;
  }

  SOptrBasicInfo* pInfo = (SOptrBasicInfo*)(pOperator->info);
  SAggSupporter*  pSup = (SAggSupporter*)POINTER_SHIFT(pOperator->info, sizeof(SOptrBasicInfo));
  int32_t         size = taosHashGetSize(pSup->pResultRowHashTable);
  size_t          keyLen = sizeof(uint64_t) * 2;  // estimate the key length
  int32_t         totalSize =
      sizeof(int32_t) + sizeof(int32_t) + size * (sizeof(int32_t) + keyLen + sizeof(int32_t) + pSup->resultRowSize);

  // no result
  if (getTotalBufSize(pSup->pResultBuf) == 0) {
    *result = NULL;
    *length = 0;
    return TSDB_CODE_SUCCESS;
  }

  *result = (char*)taosMemoryCalloc(1, totalSize);
  if (*result == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  // the first int32 slot is reserved for the total length, filled in at the end
  int32_t offset = sizeof(int32_t);
  *(int32_t*)(*result + offset) = size;
  offset += sizeof(int32_t);

  // prepare memory: mark the page holding the current result row as dirty
  SResultRowPosition* pos = &pInfo->resultRowInfo.cur;
  void*               pPage = getBufPage(pSup->pResultBuf, pos->pageId);
  setBufPageDirty(pPage, true);
  releaseBufPage(pSup->pResultBuf, pPage);

  void* pIter = taosHashIterate(pSup->pResultRowHashTable, NULL);
  while (pIter) {
    void*               key = taosHashGetKey(pIter, &keyLen);
    SResultRowPosition* p1 = (SResultRowPosition*)pIter;

    pPage = (SFilePage*)getBufPage(pSup->pResultBuf, p1->pageId);
    SResultRow* pRow = (SResultRow*)((char*)pPage + p1->offset);
    setBufPageDirty(pPage, true);

    // recalculate the result size, the real key may be longer than the estimate
    int32_t realTotalSize = offset + sizeof(int32_t) + keyLen + sizeof(int32_t) + pSup->resultRowSize;
    if (realTotalSize > totalSize) {
      // grow geometrically and remember the new capacity, so the buffer is not
      // reallocated on every following entry
      int32_t newSize = realTotalSize;
      if (totalSize <= INT32_MAX / 2 && totalSize * 2 > newSize) {
        newSize = totalSize * 2;
      }

      char* tmp = (char*)taosMemoryRealloc(*result, newSize);
      if (tmp == NULL) {
        releaseBufPage(pSup->pResultBuf, pPage);
        taosMemoryFree(*result);
        *result = NULL;
        return TSDB_CODE_OUT_OF_MEMORY;
      }

      *result = tmp;
      totalSize = newSize;
    }

    // save key
    *(int32_t*)(*result + offset) = keyLen;
    offset += sizeof(int32_t);
    memcpy(*result + offset, key, keyLen);
    offset += keyLen;

    // save value
    *(int32_t*)(*result + offset) = pSup->resultRowSize;
    offset += sizeof(int32_t);
    memcpy(*result + offset, pRow, pSup->resultRowSize);
    offset += pSup->resultRowSize;

    // release the page only after the row has been copied out of it
    releaseBufPage(pSup->pResultBuf, pPage);

    pIter = taosHashIterate(pSup->pResultRowHashTable, pIter);
  }

  *(int32_t*)(*result) = offset;
  *length = offset;

  return TSDB_CODE_SUCCESS;
}

3103
/*
 * Rebuild the aggregate result rows from a buffer that uses the layout read below:
 *   int32 total length | int32 entry count |
 *   { int32 keyLen | key bytes | int32 valueLen | value bytes } per entry
 *
 * For every entry a new result row is allocated (the first 8 bytes of the key are read as
 * the table group id), registered in the result-row hash table under the key, and filled
 * with the stored bytes while keeping the page id and offset of the new row.
 *
 * Every field is checked against the encoded total length before it is read, so a
 * truncated or corrupted buffer yields TSDB_CODE_TSC_INVALID_INPUT rather than an
 * out-of-bounds read. The total length itself is taken from the buffer; the caller must
 * guarantee that at least that many bytes are readable.
 */
int32_t aggDecodeResultRow(SOperatorInfo* pOperator, char* result) {
  if (result == NULL) {
    return TSDB_CODE_TSC_INVALID_INPUT;
  }

  SOptrBasicInfo* pInfo = (SOptrBasicInfo*)(pOperator->info);
  SAggSupporter*  pSup = (SAggSupporter*)POINTER_SHIFT(pOperator->info, sizeof(SOptrBasicInfo));

  int32_t length = *(int32_t*)(result);
  int32_t offset = sizeof(int32_t);

  // the header (total length + entry count) must be covered by the declared length
  if (length < (int32_t)(2 * sizeof(int32_t))) {
    return TSDB_CODE_TSC_INVALID_INPUT;
  }

  int32_t count = *(int32_t*)(result + offset);
  offset += sizeof(int32_t);

  while (count-- > 0 && length > offset) {
    if (length - offset < (int32_t)sizeof(int32_t)) {
      return TSDB_CODE_TSC_INVALID_INPUT;
    }

    int32_t keyLen = *(int32_t*)(result + offset);
    offset += sizeof(int32_t);

    // the key, the group id read from its head, and the following valueLen field must all
    // lie inside the buffer
    int32_t remain = length - offset;
    if (keyLen < 0 || remain < (int32_t)sizeof(uint64_t) || keyLen > remain - (int32_t)sizeof(int32_t)) {
      return TSDB_CODE_TSC_INVALID_INPUT;
    }

    int32_t valueLen = *(int32_t*)(result + offset + keyLen);
    if (valueLen != pSup->resultRowSize || valueLen < 0 ||
        valueLen > length - offset - keyLen - (int32_t)sizeof(int32_t)) {
      return TSDB_CODE_TSC_INVALID_INPUT;
    }

    uint64_t    tableGroupId = *(uint64_t*)(result + offset);
    SResultRow* resultRow = getNewResultRow(pSup->pResultBuf, tableGroupId, pSup->resultRowSize);
    if (!resultRow) {
      return TSDB_CODE_TSC_INVALID_INPUT;
    }

    // add a new result set for a new group
    SResultRowPosition pos = {.pageId = resultRow->pageId, .offset = resultRow->offset};
    taosHashPut(pSup->pResultRowHashTable, result + offset, keyLen, &pos, sizeof(SResultRowPosition));

    offset += keyLen;
    offset += sizeof(int32_t);  // skip valueLen, validated above

    // the stored bytes overwrite the whole row, so keep the position of the new row
    int32_t pageId = resultRow->pageId;
    int32_t pOffset = resultRow->offset;
    memcpy(resultRow, result + offset, valueLen);
    resultRow->pageId = pageId;
    resultRow->offset = pOffset;
    offset += valueLen;

    initResultRow(resultRow);
    pInfo->resultRowInfo.cur = (SResultRowPosition){.pageId = resultRow->pageId, .offset = resultRow->offset};
  }

  if (offset != length) {
    return TSDB_CODE_TSC_INVALID_INPUT;
  }

  return TSDB_CODE_SUCCESS;
}

3154 3155
// Status codes returned by handleLimitOffset() to steer the projection loop.
enum {
  PROJECT_RETRIEVE_CONTINUE = 1 << 0,  // keep pulling blocks from the downstream operator
  PROJECT_RETRIEVE_DONE = 1 << 1,      // stop pulling and return the current result block
};

/*
 * Apply the soffset/slimit (per group) and offset/limit (per row) rules to the result
 * block of the project operator after pBlock has been projected into it.
 *
 * Returns PROJECT_RETRIEVE_CONTINUE when the caller should fetch the next input block
 * (the result block was discarded or is not full enough), and PROJECT_RETRIEVE_DONE when
 * the result block should be returned. pOperator->status is set to OP_EXEC_DONE once the
 * slimit has been reached.
 */
static int32_t handleLimitOffset(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
  SProjectOperatorInfo* pProj = pOperator->info;
  SSDataBlock*          pRes = pProj->binfo.pRes;

  // step 1: skip whole groups while the group offset has not been consumed
  if (pProj->curSOffset > 0) {
    if (pProj->groupId == 0) {  // it is the first group
      pProj->groupId = pBlock->info.groupId;
      blockDataCleanup(pRes);
      return PROJECT_RETRIEVE_CONTINUE;
    }

    if (pProj->groupId != pBlock->info.groupId) {
      pProj->curSOffset -= 1;

      // ignore data block in current group
      if (pProj->curSOffset > 0) {
        blockDataCleanup(pRes);
        return PROJECT_RETRIEVE_CONTINUE;
      }
    }

    // set current group id of the project operator
    pProj->groupId = pBlock->info.groupId;
  }

  // step 2: a new group starts, check the group limit and reset the per-group counters
  if (pProj->groupId != 0 && pProj->groupId != pBlock->info.groupId) {
    pProj->curGroupOutput += 1;

    if (pProj->slimit.limit > 0 && pProj->slimit.limit <= pProj->curGroupOutput) {
      pOperator->status = OP_EXEC_DONE;
      blockDataCleanup(pRes);
      return PROJECT_RETRIEVE_DONE;
    }

    // reset the value for a new group data
    pProj->curOffset = 0;
    pProj->curOutput = 0;
  }

  // here we reach the start position, according to the limit/offset requirements.
  pProj->groupId = pBlock->info.groupId;

  // step 3: consume the row offset
  if (pProj->curOffset >= pRes->info.rows) {
    pProj->curOffset -= pRes->info.rows;
    blockDataCleanup(pRes);
    return PROJECT_RETRIEVE_CONTINUE;
  }

  if (pProj->curOffset > 0) {  // the offset ends inside this block
    blockDataTrimFirstNRows(pRes, pProj->curOffset);
    pProj->curOffset = 0;
  }

  // step 4: check for the limitation in each group
  if (pProj->limit.limit >= 0 && pProj->curOutput + pRes->info.rows >= pProj->limit.limit) {
    int32_t keepRows = (int32_t)(pProj->limit.limit - pProj->curOutput);
    blockDataKeepFirstNRows(pRes, keepRows);

    if (pProj->slimit.limit > 0 && pProj->slimit.limit <= pProj->curGroupOutput) {
      pOperator->status = OP_EXEC_DONE;
    }

    return PROJECT_RETRIEVE_DONE;
  }

  // todo optimize performance
  // If there are slimit/soffset value exists, multi-round result can not be packed into one group, since the
  // they may not belong to the same group the limit/offset value is not valid in this case.
  if (pRes->info.rows >= pOperator->resultInfo.threshold) {
    return PROJECT_RETRIEVE_DONE;
  }

  if (pProj->slimit.offset != -1 || pProj->slimit.limit != -1) {
    return PROJECT_RETRIEVE_DONE;
  }

  // not full enough, continue to accumulate the output data in the buffer.
  return PROJECT_RETRIEVE_CONTINUE;
}

3233
/*
 * getNext function of the project operator.
 *
 * Pulls blocks from the first downstream operator, evaluates the projection expressions
 * into the operator's result block, applies limit/offset handling and then the filter,
 * and returns the result block once handleLimitOffset() reports PROJECT_RETRIEVE_DONE or
 * the downstream is exhausted. Blocks of type STREAM_RETRIEVE are passed through as is.
 * Errors are raised with longjmp() on pTaskInfo->env.
 *
 * Returns NULL when the operator is already done or the result block is empty.
 */
static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
  SProjectOperatorInfo* pProjectInfo = pOperator->info;
  SOptrBasicInfo*       pBasic = &pProjectInfo->binfo;
  SExprSupp*            pSup = &pOperator->exprSupp;

  SSDataBlock* pRes = pBasic->pRes;
  blockDataCleanup(pRes);

  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
  if (pOperator->status == OP_EXEC_DONE) {
    // in queue mode the operator is re-armed for the next round of input
    if (pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE) {
      pOperator->status = OP_OPENED;
    }
    return NULL;
  }

  int64_t startUs = 0;
  int32_t order = 0;
  int32_t scanFlag = 0;

  // the open cost is only measured while it has not been recorded yet
  if (pOperator->cost.openCost == 0) {
    startUs = taosGetTimestampUs();
  }

  SOperatorInfo* pChild = pOperator->pDownstream[0];

  for (;;) {
    // The downstream exec may change the value of the newgroup, so use a local variable instead.
    qDebug("projection call next");
    SSDataBlock* pBlock = pChild->fpSet.getNextFn(pChild);
    if (pBlock == NULL) {
      qDebug("projection get null");
      doSetOperatorCompleted(pOperator);
      break;
    }

    if (pBlock->info.type == STREAM_RETRIEVE) {
      // for stream interval
      return pBlock;
    }

    // the pDataBlock are always the same one, no need to call this again
    int32_t code = getTableScanInfo(pChild, &order, &scanFlag);
    if (code != TSDB_CODE_SUCCESS) {
      longjmp(pTaskInfo->env, code);
    }

    setInputDataBlock(pOperator, pSup->pCtx, pBlock, order, scanFlag, false);
    blockDataEnsureCapacity(pBasic->pRes, pBasic->pRes->info.rows + pBlock->info.rows);

    code = projectApplyFunctions(pSup->pExprInfo, pBasic->pRes, pBlock, pSup->pCtx, pSup->numOfExprs,
                                 pProjectInfo->pPseudoColInfo);
    if (code != TSDB_CODE_SUCCESS) {
      longjmp(pTaskInfo->env, code);
    }

    int32_t status = handleLimitOffset(pOperator, pBlock);

    // filter shall be applied after apply functions and limit/offset on the result
    doFilter(pProjectInfo->pFilterNode, pBasic->pRes);

    if (status == PROJECT_RETRIEVE_DONE) {
      break;
    }
    // PROJECT_RETRIEVE_CONTINUE: fetch the next block
  }

  pProjectInfo->curOutput += pBasic->pRes->info.rows;

  size_t numOfRows = pBasic->pRes->info.rows;
  pOperator->resultInfo.totalRows += numOfRows;

  if (pOperator->cost.openCost == 0) {
    pOperator->cost.openCost = (taosGetTimestampUs() - startUs) / 1000.0;
  }

  if (numOfRows > 0) {
    return pBasic->pRes;
  }

  return NULL;
}

H
Haojun Liao 已提交
3336
static void doHandleRemainBlockForNewGroupImpl(SFillOperatorInfo* pInfo, SResultInfo* pResultInfo,
L
Liu Jicong 已提交
3337
                                               SExecTaskInfo* pTaskInfo) {
3338
  pInfo->totalInputRows = pInfo->existNewGroupBlock->info.rows;
H
Haojun Liao 已提交
3339

L
Liu Jicong 已提交
3340 3341
  int64_t ekey =
      Q_STATUS_EQUAL(pTaskInfo->status, TASK_COMPLETED) ? pInfo->win.ekey : pInfo->existNewGroupBlock->info.window.ekey;
3342 3343
  taosResetFillInfo(pInfo->pFillInfo, getFillInfoStart(pInfo->pFillInfo));

3344
  taosFillSetStartInfo(pInfo->pFillInfo, pInfo->existNewGroupBlock->info.rows, ekey);
3345 3346
  taosFillSetInputDataBlock(pInfo->pFillInfo, pInfo->existNewGroupBlock);

H
Haojun Liao 已提交
3347 3348 3349
  int32_t numOfResultRows = pResultInfo->capacity - pInfo->pRes->info.rows;
  taosFillResultDataBlock(pInfo->pFillInfo, pInfo->pRes, numOfResultRows);

3350
  pInfo->curGroupId = pInfo->existNewGroupBlock->info.groupId;
3351 3352 3353
  pInfo->existNewGroupBlock = NULL;
}

H
Haojun Liao 已提交
3354
static void doHandleRemainBlockFromNewGroup(SFillOperatorInfo* pInfo, SResultInfo* pResultInfo,
L
Liu Jicong 已提交
3355
                                            SExecTaskInfo* pTaskInfo) {
3356
  if (taosFillHasMoreResults(pInfo->pFillInfo)) {
H
Haojun Liao 已提交
3357 3358 3359
    int32_t numOfResultRows = pResultInfo->capacity - pInfo->pRes->info.rows;
    taosFillResultDataBlock(pInfo->pFillInfo, pInfo->pRes, numOfResultRows);
    if (pInfo->pRes->info.rows > pResultInfo->threshold) {
3360 3361 3362 3363 3364 3365
      return;
    }
  }

  // handle the cached new group data block
  if (pInfo->existNewGroupBlock) {
H
Haojun Liao 已提交
3366
    doHandleRemainBlockForNewGroupImpl(pInfo, pResultInfo, pTaskInfo);
3367 3368 3369
  }
}

S
slzhou 已提交
3370
/*
 * Produce the next filled result block of the fill operator.
 *
 * Pending results (rest of the current group, cached block of the next group) are emitted
 * first. Otherwise blocks are pulled from the first downstream operator: blocks of the
 * current group are fed to the fill routine, while the first block of a different group
 * is cached and the fill of the current group is closed.
 *
 * Returns the operator's result block when it holds rows that should be handed out, and
 * NULL when there is nothing (more) to return. pOperator->status is set to OP_EXEC_DONE
 * when the downstream ends without any input row having been seen.
 */
static SSDataBlock* doFillImpl(SOperatorInfo* pOperator) {
  SFillOperatorInfo* pInfo = pOperator->info;
  SExecTaskInfo*     pTaskInfo = pOperator->pTaskInfo;
  SResultInfo*       pResultInfo = &pOperator->resultInfo;
  SSDataBlock*       pResBlock = pInfo->pRes;

  blockDataCleanup(pResBlock);

  doHandleRemainBlockFromNewGroup(pInfo, pResultInfo, pTaskInfo);
  if (pResBlock->info.rows > pResultInfo->threshold || pResBlock->info.rows > 0) {
    return pResBlock;
  }

  SOperatorInfo* pChild = pOperator->pDownstream[0];
  for (;;) {
    SSDataBlock* pBlock = pChild->fpSet.getNextFn(pChild);

    if (pBlock == NULL) {
      if (pInfo->totalInputRows == 0) {
        pOperator->status = OP_EXEC_DONE;
        return NULL;
      }

      // input exhausted: close the fill at the end of the operator's window
      taosFillSetStartInfo(pInfo->pFillInfo, 0, pInfo->win.ekey);
    } else {
      blockDataUpdateTsWindow(pBlock, pInfo->primaryTsCol);

      if (pInfo->curGroupId == 0 || pInfo->curGroupId == pBlock->info.groupId) {
        pInfo->curGroupId = pBlock->info.groupId;  // the first data block

        pInfo->totalInputRows += pBlock->info.rows;

        taosFillSetStartInfo(pInfo->pFillInfo, pBlock->info.rows, pBlock->info.window.ekey);
        taosFillSetInputDataBlock(pInfo->pFillInfo, pBlock);
      } else {  // the new group data block
        pInfo->existNewGroupBlock = pBlock;

        // Fill the previous group data block, before handle the data block of new group.
        // Close the fill operation for previous group data block
        taosFillSetStartInfo(pInfo->pFillInfo, 0, pInfo->win.ekey);
      }
    }

    blockDataEnsureCapacity(pResBlock, pResultInfo->capacity);

    int32_t freeRows = pResultInfo->capacity - pResBlock->info.rows;
    taosFillResultDataBlock(pInfo->pFillInfo, pResBlock, freeRows);

    if (pResBlock->info.rows > 0) {
      // 1. The result in current group not reach the threshold of output result, continue
      // 2. If multiple group results existing in one SSDataBlock is not allowed, return immediately
      if (pResBlock->info.rows > pResultInfo->threshold || pBlock == NULL || pInfo->existNewGroupBlock != NULL) {
        return pResBlock;
      }

      // pBlock is known to be non-NULL from here on
      doHandleRemainBlockFromNewGroup(pInfo, pResultInfo, pTaskInfo);
      if (pResBlock->info.rows >= pResultInfo->threshold) {
        return pResBlock;
      }

      continue;
    }

    // current group has no more result to return
    if (!pInfo->existNewGroupBlock) {
      return NULL;
    }

    // try next group
    assert(pBlock != NULL);
    doHandleRemainBlockForNewGroupImpl(pInfo, pResultInfo, pTaskInfo);
    if (pResBlock->info.rows > pResultInfo->threshold) {
      return pResBlock;
    }
  }
}

S
slzhou 已提交
3442 3443 3444 3445 3446 3447 3448 3449
/*
 * getNext function of the fill operator.
 *
 * Calls doFillImpl() and applies the operator's condition to each block until a block
 * with at least one row is left or doFillImpl() returns NULL. In the latter case the
 * operator is marked completed. The number of returned rows is added to the operator's
 * total row counter.
 */
static SSDataBlock* doFill(SOperatorInfo* pOperator) {
  SFillOperatorInfo* pInfo = pOperator->info;

  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  SSDataBlock* pFilled = NULL;
  for (;;) {
    pFilled = doFillImpl(pOperator);
    if (pFilled == NULL) {
      doSetOperatorCompleted(pOperator);
      break;
    }

    doFilter(pInfo->pCondition, pFilled);

    // stop as soon as some rows survived the filter
    if (pFilled->info.rows > 0) {
      break;
    }
  }

  if (pFilled != NULL) {
    size_t numOfRows = pFilled->info.rows;
    pOperator->resultInfo.totalRows += numOfRows;
  }

  return pFilled;
}

H
Haojun Liao 已提交
3475 3476 3477
/*
 * Release the memory owned by each of the numOfExprs entries of pExpr: the column object
 * of every column-type parameter, the parameter array and the expression object.
 * The pExpr array itself is not freed here.
 */
static void destroyExprInfo(SExprInfo* pExpr, int32_t numOfExprs) {
  for (int32_t idx = 0; idx < numOfExprs; ++idx) {
    SExprInfo* pOne = pExpr + idx;

    int32_t paramIdx = 0;
    while (paramIdx < pOne->base.numOfParams) {
      // only column-type parameters own a separately allocated column object
      if (pOne->base.pParam[paramIdx].type == FUNC_PARAM_TYPE_COLUMN) {
        taosMemoryFreeClear(pOne->base.pParam[paramIdx].pCol);
      }
      ++paramIdx;
    }

    taosMemoryFree(pOne->base.pParam);
    taosMemoryFree(pOne->pExpr);
  }
}

3489 3490 3491 3492 3493
/*
 * Destroy an operator together with its whole downstream tree.
 *
 * Order of work: the operator's close function (if set), every downstream operator
 * recursively, the downstream array, the expression support data and finally the operator
 * object itself. A NULL operator is ignored.
 */
static void destroyOperatorInfo(SOperatorInfo* pOperator) {
  if (pOperator == NULL) {
    return;
  }

  if (pOperator->fpSet.closeFn != NULL) {
    pOperator->fpSet.closeFn(pOperator->info, pOperator->exprSupp.numOfExprs);
  }

  if (pOperator->pDownstream != NULL) {
    int32_t childIdx = 0;
    while (childIdx < pOperator->numOfDownstream) {
      destroyOperatorInfo(pOperator->pDownstream[childIdx]);
      ++childIdx;
    }

    taosMemoryFreeClear(pOperator->pDownstream);
    pOperator->numOfDownstream = 0;
  }

  cleanupExprSupp(&pOperator->exprSupp);
  taosMemoryFreeClear(pOperator);
}

3511 3512 3513 3514 3515 3516 3517 3518 3519 3520 3521 3522 3523 3524 3525
/*
 * Choose the page size and the total in-memory buffer size for a result buffer.
 *
 * The page size starts at 4096 bytes and is doubled until one page can hold four rows of
 * rowSize bytes. The buffer size defaults to 4096 * 256 bytes; when that is not larger
 * than one page it becomes four pages.
 *
 * The row-size comparison is done in 64 bits and a non-positive rowSize is treated as 0,
 * so the doubling loop always terminates. The page size is capped at 2^29 bytes, the
 * largest power of two whose four-page buffer still fits into uint32_t.
 *
 * Always returns 0; the results are stored in *defaultPgsz and *defaultBufsz.
 */
int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* defaultBufsz) {
  const uint32_t minPgsz = 4096;
  const uint32_t maxPgsz = (uint32_t)1 << 29;
  const uint64_t required = (rowSize > 0) ? (uint64_t)rowSize * 4 : 0;

  uint32_t pgsz = minPgsz;
  while (pgsz < required && pgsz < maxPgsz) {
    pgsz <<= 1u;
  }

  // at least four pages need to be in buffer
  uint32_t bufsz = 4096 * 256;
  if (bufsz <= pgsz) {
    bufsz = pgsz * 4;
  }

  *defaultPgsz = pgsz;
  *defaultBufsz = bufsz;
  return 0;
}

dengyihao's avatar
dengyihao 已提交
3526 3527
/*
 * Initialize the aggregate supporter: result row size, key buffer, result-row hash table
 * and the disk based result buffer (created under TD_TMP_DIR_PATH with pKey as its id).
 *
 * Returns TSDB_CODE_OUT_OF_MEMORY when the key buffer or the hash table could not be
 * created, otherwise the return code of createDiskbasedBuf(). Members that were already
 * set up are left in place on failure.
 */
int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t numOfOutput, size_t keyBufSize,
                         const char* pKey) {
  _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);

  pAggSup->resultRowSize = getResultRowSize(pCtx, numOfOutput);
  pAggSup->keyBuf = taosMemoryCalloc(1, keyBufSize + POINTER_BYTES + sizeof(int64_t));
  pAggSup->pResultRowHashTable = taosHashInit(10, hashFn, true, HASH_NO_LOCK);

  if (pAggSup->keyBuf == NULL || pAggSup->pResultRowHashTable == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  // derive page and buffer size from the size of one result row
  uint32_t pageSize = 0;
  uint32_t bufSize = 0;
  getBufferPgSize(pAggSup->resultRowSize, &pageSize, &bufSize);

  int32_t code = createDiskbasedBuf(&pAggSup->pResultBuf, pageSize, bufSize, pKey, TD_TMP_DIR_PATH);
  return (code != TSDB_CODE_SUCCESS) ? code : TSDB_CODE_SUCCESS;
}

3550
/*
 * Release everything owned by the aggregate supporter: the key buffer, the result-row
 * hash table and the disk based result buffer. The supporter object itself is not freed.
 */
void cleanupAggSup(SAggSupporter* pAggSup) {
  taosMemoryFreeClear(pAggSup->keyBuf);              // key buffer
  taosHashCleanup(pAggSup->pResultRowHashTable);     // key -> result row position table
  destroyDiskbasedBuf(pAggSup->pResultBuf);          // pages that hold the result rows
}

L
Liu Jicong 已提交
3556 3557
/*
 * Initialize the expression support data and the aggregate supporter of an operator, and
 * attach the supporter's result buffer to every function context.
 *
 * Returns TSDB_CODE_SUCCESS, or the error code of the first initialization step that
 * failed. A failure of doInitAggInfoSup() is reported to the caller, and the function
 * contexts are not wired to a result buffer that was never created.
 */
int32_t initAggInfo(SExprSupp* pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, size_t keyBufSize,
                    const char* pkey) {
  int32_t code = initExprSupp(pSup, pExprInfo, numOfCols);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  code = doInitAggInfoSup(pAggSup, pSup->pCtx, numOfCols, keyBufSize, pkey);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  for (int32_t i = 0; i < numOfCols; ++i) {
    pSup->pCtx[i].pBuf = pAggSup->pResultBuf;
  }

  return TSDB_CODE_SUCCESS;
}

3571
/*
 * Set the capacity of the operator's result block to numOfRows and the output threshold
 * to 75% of it. If that product ends up as 0, the threshold falls back to numOfRows.
 * numOfRows must not be 0.
 */
void initResultSizeInfo(SOperatorInfo* pOperator, int32_t numOfRows) {
  ASSERT(numOfRows != 0);

  SResultInfo* pResInfo = &pOperator->resultInfo;
  pResInfo->capacity = numOfRows;
  pResInfo->threshold = numOfRows * 0.75;

  if (pResInfo->threshold == 0) {
    pResInfo->threshold = numOfRows;
  }
}

3581 3582 3583 3584 3585
/*
 * Attach pBlock as the result block of the basic operator info and initialize its
 * result row info.
 */
void initBasicInfo(SOptrBasicInfo* pInfo, SSDataBlock* pBlock) {
  pInfo->pRes = pBlock;  // the pointer is stored as is, no copy is made

  initResultRowInfo(&pInfo->resultRowInfo);
}

3586 3587 3588 3589 3590 3591 3592 3593 3594 3595 3596 3597 3598 3599 3600 3601 3602 3603 3604
/**
 * Destroy an array of function contexts together with the per-context
 * parameters, subsidiary context array and input buffers.
 *
 * @param pCtx         array to destroy; NULL is accepted
 * @param numOfOutput  number of entries in pCtx
 * @return always NULL
 */
static void* destroySqlFunctionCtx(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
  if (pCtx != NULL) {
    for (int32_t idx = 0; idx < numOfOutput; ++idx) {
      SqlFunctionCtx* pCur = &pCtx[idx];

      for (int32_t k = 0; k < pCur->numOfParams; ++k) {
        taosVariantDestroy(&pCur->param[k].param);
      }

      taosMemoryFreeClear(pCur->subsidiaries.pCtx);
      taosMemoryFree(pCur->input.pData);
      taosMemoryFree(pCur->input.pColumnDataAgg);
    }

    taosMemoryFreeClear(pCtx);
  }

  return NULL;
}

3605
/**
 * Attach an expression array to an expression support and, when the array
 * is not NULL, create the matching function contexts.
 *
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when the contexts
 *         could not be created
 */
int32_t initExprSupp(SExprSupp* pSup, SExprInfo* pExprInfo, int32_t numOfExpr) {
  pSup->pExprInfo = pExprInfo;
  pSup->numOfExprs = numOfExpr;

  // nothing to build when no expressions were supplied
  if (pSup->pExprInfo == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  pSup->pCtx = createSqlFunctionCtx(pExprInfo, numOfExpr, &pSup->rowEntryInfoOffset);
  return (pSup->pCtx == NULL) ? TSDB_CODE_OUT_OF_MEMORY : TSDB_CODE_SUCCESS;
}

3618 3619 3620 3621 3622 3623 3624 3625 3626 3627
/**
 * Release everything owned by an expression support: the function contexts,
 * the expression array and the row-entry offset table.
 *
 * All released pointers are reset to NULL so that the structure does not keep
 * dangling references and a second cleanup of the same structure is harmless.
 * The SExprSupp structure itself is not freed.
 */
void cleanupExprSupp(SExprSupp* pSupp) {
  // destroySqlFunctionCtx() always returns NULL, which clears the stale pointer
  pSupp->pCtx = destroySqlFunctionCtx(pSupp->pCtx, pSupp->numOfExprs);

  if (pSupp->pExprInfo != NULL) {
    destroyExprInfo(pSupp->pExprInfo, pSupp->numOfExprs);
  }

  taosMemoryFreeClear(pSupp->pExprInfo);
  taosMemoryFreeClear(pSupp->rowEntryInfoOffset);
}

L
Liu Jicong 已提交
3628
SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
S
slzhou 已提交
3629
                                           SSDataBlock* pResultBlock, SNode* pCondition, SExprInfo* pScalarExprInfo,
wmmhello's avatar
wmmhello 已提交
3630
                                           int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo) {
wafwerar's avatar
wafwerar 已提交
3631
  SAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SAggOperatorInfo));
L
Liu Jicong 已提交
3632
  SOperatorInfo*    pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
H
Haojun Liao 已提交
3633 3634 3635
  if (pInfo == NULL || pOperator == NULL) {
    goto _error;
  }
H
Haojun Liao 已提交
3636

3637
  int32_t numOfRows = 1024;
dengyihao's avatar
dengyihao 已提交
3638
  size_t  keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
3639 3640

  initResultSizeInfo(pOperator, numOfRows);
3641
  int32_t code = initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str);
L
Liu Jicong 已提交
3642
  if (code != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
3643 3644
    goto _error;
  }
H
Haojun Liao 已提交
3645

3646
  initBasicInfo(&pInfo->binfo, pResultBlock);
3647 3648 3649 3650
  code = initExprSupp(&pInfo->scalarExprSup, pScalarExprInfo, numOfScalarExpr);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }
3651

L
Liu Jicong 已提交
3652
  pInfo->groupId = INT32_MIN;
S
slzhou 已提交
3653
  pInfo->pCondition = pCondition;
dengyihao's avatar
dengyihao 已提交
3654
  pOperator->name = "TableAggregate";
3655
  pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_HASH_AGG;
3656
  pOperator->blocking = true;
dengyihao's avatar
dengyihao 已提交
3657 3658 3659
  pOperator->status = OP_NOT_OPENED;
  pOperator->info = pInfo;
  pOperator->pTaskInfo = pTaskInfo;
H
Haojun Liao 已提交
3660

3661 3662
  pOperator->fpSet = createOperatorFpSet(doOpenAggregateOptr, getAggregateResult, NULL, NULL, destroyAggOperatorInfo,
                                         aggEncodeResultRow, aggDecodeResultRow, NULL);
H
Haojun Liao 已提交
3663 3664 3665 3666 3667

  code = appendDownstream(pOperator, &downstream, 1);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }
3668 3669

  return pOperator;
L
Liu Jicong 已提交
3670
_error:
H
Haojun Liao 已提交
3671
  destroyAggOperatorInfo(pInfo, numOfCols);
wafwerar's avatar
wafwerar 已提交
3672 3673
  taosMemoryFreeClear(pInfo);
  taosMemoryFreeClear(pOperator);
H
Haojun Liao 已提交
3674 3675
  pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
  return NULL;
3676 3677
}

3678
/**
 * Release the result-row bookkeeping and the result block of a basic
 * operator info. pInfo must not be NULL; pInfo itself is not freed.
 */
void cleanupBasicInfo(SOptrBasicInfo* pInfo) {
  assert(pInfo != NULL);

  cleanupResultRowInfo(&pInfo->resultRowInfo);

  // store the return value of blockDataDestroy() back so pRes does not keep the old pointer
  pInfo->pRes = blockDataDestroy(pInfo->pRes);
}

H
Haojun Liao 已提交
3684
void destroyBasicOperatorInfo(void* param, int32_t numOfOutput) {
L
Liu Jicong 已提交
3685
  SOptrBasicInfo* pInfo = (SOptrBasicInfo*)param;
3686
  cleanupBasicInfo(pInfo);
L
Liu Jicong 已提交
3687

D
dapan1121 已提交
3688
  taosMemoryFreeClear(param);
3689
}
H
Haojun Liao 已提交
3690

H
Haojun Liao 已提交
3691 3692 3693 3694 3695 3696 3697 3698

/**
 * Element destructor for arrays that store pointers: pItem points at the
 * stored pointer, which is freed and reset when it is not NULL.
 */
static void freeItem(void* pItem) {
  void** ppElem = pItem;
  if (*ppElem == NULL) {
    return;
  }

  taosMemoryFreeClear(*ppElem);
}

H
Haojun Liao 已提交
3699
void destroyAggOperatorInfo(void* param, int32_t numOfOutput) {
L
Liu Jicong 已提交
3700
  SAggOperatorInfo* pInfo = (SAggOperatorInfo*)param;
L
Liu Jicong 已提交
3701 3702
  cleanupBasicInfo(&pInfo->binfo);

H
Haojun Liao 已提交
3703 3704
  cleanupAggSup(&pInfo->aggSup);
  taosArrayDestroyEx(pInfo->groupResInfo.pRows, freeItem);
D
dapan1121 已提交
3705
  taosMemoryFreeClear(param);
3706
}
3707

H
Haojun Liao 已提交
3708
void destroySFillOperatorInfo(void* param, int32_t numOfOutput) {
L
Liu Jicong 已提交
3709
  SFillOperatorInfo* pInfo = (SFillOperatorInfo*)param;
3710
  pInfo->pFillInfo = taosDestroyFillInfo(pInfo->pFillInfo);
H
Haojun Liao 已提交
3711
  pInfo->pRes = blockDataDestroy(pInfo->pRes);
wafwerar's avatar
wafwerar 已提交
3712
  taosMemoryFreeClear(pInfo->p);
L
Liu Jicong 已提交
3713

D
dapan1121 已提交
3714
  taosMemoryFreeClear(param);
3715 3716
}

H
Haojun Liao 已提交
3717
static void destroyProjectOperatorInfo(void* param, int32_t numOfOutput) {
D
fix bug  
dapan 已提交
3718 3719 3720
  if (NULL == param) {
    return;
  }
L
Liu Jicong 已提交
3721
  SProjectOperatorInfo* pInfo = (SProjectOperatorInfo*)param;
3722
  cleanupBasicInfo(&pInfo->binfo);
H
Haojun Liao 已提交
3723
  cleanupAggSup(&pInfo->aggSup);
H
Haojun Liao 已提交
3724
  taosArrayDestroy(pInfo->pPseudoColInfo);
L
Liu Jicong 已提交
3725

D
dapan1121 已提交
3726
  taosMemoryFreeClear(param);
3727 3728
}

H
Haojun Liao 已提交
3729
static void destroyIndefinitOperatorInfo(void* param, int32_t numOfOutput) {
3730
  SIndefOperatorInfo* pInfo = (SIndefOperatorInfo*)param;
3731
  cleanupBasicInfo(&pInfo->binfo);
H
Haojun Liao 已提交
3732 3733 3734

  taosArrayDestroy(pInfo->pPseudoColInfo);
  cleanupAggSup(&pInfo->aggSup);
3735
  cleanupExprSupp(&pInfo->scalarSup);
L
Liu Jicong 已提交
3736

D
dapan1121 已提交
3737
  taosMemoryFreeClear(param);
H
Haojun Liao 已提交
3738 3739
}

H
Haojun Liao 已提交
3740
void destroyExchangeOperatorInfo(void* param, int32_t numOfOutput) {
L
Liu Jicong 已提交
3741
  SExchangeInfo* pExInfo = (SExchangeInfo*)param;
3742 3743 3744 3745
  taosRemoveRef(exchangeObjRefPool, pExInfo->self);
}

void doDestroyExchangeOperatorInfo(void* param) {
X
Xiaoyu Wang 已提交
3746
  SExchangeInfo* pExInfo = (SExchangeInfo*)param;
3747

H
Haojun Liao 已提交
3748 3749 3750
  taosArrayDestroy(pExInfo->pSources);
  taosArrayDestroy(pExInfo->pSourceDataInfo);
  if (pExInfo->pResult != NULL) {
H
Haojun Liao 已提交
3751
    pExInfo->pResult = blockDataDestroy(pExInfo->pResult);
H
Haojun Liao 已提交
3752 3753 3754
  }

  tsem_destroy(&pExInfo->ready);
L
Liu Jicong 已提交
3755

D
dapan1121 已提交
3756
  taosMemoryFreeClear(param);
H
Haojun Liao 已提交
3757 3758
}

H
Haojun Liao 已提交
3759 3760
/**
 * Collect the indexes of all function contexts whose function is a
 * pseudo-column function.
 *
 * @param pCtx       function context array
 * @param numOfCols  number of entries in pCtx
 * @return a new array of int32_t indexes, in ascending order
 */
static SArray* setRowTsColumnOutputInfo(SqlFunctionCtx* pCtx, int32_t numOfCols) {
  SArray* pIndexes = taosArrayInit(4, sizeof(int32_t));

  for (int32_t col = 0; col < numOfCols; ++col) {
    if (!fmIsPseudoColumnFunc(pCtx[col].functionId)) {
      continue;
    }

    taosArrayPush(pIndexes, &col);
  }

  return pIndexes;
}

3770 3771 3772 3773
// Return the limit stored in a limit node, or -1 when no node is given.
static int64_t getLimit(SNode* pLimit) {
  if (pLimit == NULL) {
    return -1;
  }

  return ((SLimitNode*)pLimit)->limit;
}

// Return the offset stored in a limit node, or -1 when no node is given.
static int64_t getOffset(SNode* pLimit) {
  if (pLimit == NULL) {
    return -1;
  }

  return ((SLimitNode*)pLimit)->offset;
}

L
Liu Jicong 已提交
3774
SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhysiNode* pProjPhyNode,
dengyihao's avatar
dengyihao 已提交
3775
                                         SExecTaskInfo* pTaskInfo) {
wafwerar's avatar
wafwerar 已提交
3776
  SProjectOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SProjectOperatorInfo));
L
Liu Jicong 已提交
3777
  SOperatorInfo*        pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
H
Haojun Liao 已提交
3778 3779 3780
  if (pInfo == NULL || pOperator == NULL) {
    goto _error;
  }
3781

L
Liu Jicong 已提交
3782
  int32_t    numOfCols = 0;
3783 3784 3785
  SExprInfo* pExprInfo = createExprInfo(pProjPhyNode->pProjections, NULL, &numOfCols);

  SSDataBlock* pResBlock = createResDataBlock(pProjPhyNode->node.pOutputDataBlockDesc);
3786 3787
  SLimit       limit = {.limit = getLimit(pProjPhyNode->node.pLimit), .offset = getOffset(pProjPhyNode->node.pLimit)};
  SLimit slimit = {.limit = getLimit(pProjPhyNode->node.pSlimit), .offset = getOffset(pProjPhyNode->node.pSlimit)};
3788

L
Liu Jicong 已提交
3789 3790 3791 3792
  pInfo->limit = limit;
  pInfo->slimit = slimit;
  pInfo->curOffset = limit.offset;
  pInfo->curSOffset = slimit.offset;
H
Haojun Liao 已提交
3793
  pInfo->binfo.pRes = pResBlock;
3794
  pInfo->pFilterNode = pProjPhyNode->node.pConditions;
H
Haojun Liao 已提交
3795 3796

  int32_t numOfRows = 4096;
dengyihao's avatar
dengyihao 已提交
3797
  size_t  keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
3798

3799 3800 3801 3802 3803
  // Make sure the size of SSDataBlock will never exceed the size of 2MB.
  int32_t TWOMB = 2 * 1024 * 1024;
  if (numOfRows * pResBlock->info.rowSize > TWOMB) {
    numOfRows = TWOMB / pResBlock->info.rowSize;
  }
3804
  initResultSizeInfo(pOperator, numOfRows);
3805

3806 3807
  initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str);
  initBasicInfo(&pInfo->binfo, pResBlock);
3808
  setFunctionResultOutput(pOperator, &pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, numOfCols);
3809

3810
  pInfo->pPseudoColInfo = setRowTsColumnOutputInfo(pOperator->exprSupp.pCtx, numOfCols);
X
Xiaoyu Wang 已提交
3811
  pOperator->name = "ProjectOperator";
H
Haojun Liao 已提交
3812
  pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_PROJECT;
X
Xiaoyu Wang 已提交
3813 3814 3815 3816
  pOperator->blocking = false;
  pOperator->status = OP_NOT_OPENED;
  pOperator->info = pInfo;
  pOperator->pTaskInfo = pTaskInfo;
3817

L
Liu Jicong 已提交
3818 3819
  pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doProjectOperation, NULL, NULL,
                                         destroyProjectOperatorInfo, NULL, NULL, NULL);
L
Liu Jicong 已提交
3820

3821
  int32_t code = appendDownstream(pOperator, &downstream, 1);
H
Haojun Liao 已提交
3822
  if (code != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
3823 3824
    goto _error;
  }
3825 3826

  return pOperator;
H
Haojun Liao 已提交
3827

L
Liu Jicong 已提交
3828
_error:
H
Haojun Liao 已提交
3829 3830
  pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
  return NULL;
3831 3832
}

3833 3834
/**
 * Feed one input block through the indefinite-rows functions of pOperator
 * and append the output to the operator's result block.
 *
 * Steps: query the scan order/flag from downstream, evaluate the optional
 * scalar expressions in place on pBlock, bind pBlock as function input, grow
 * the result block, then apply the functions. Any failing step aborts with
 * longjmp(pTaskInfo->env, code).
 */
static void doHandleDataBlock(SOperatorInfo* pOperator, SSDataBlock* pBlock, SOperatorInfo* downstream,
                              SExecTaskInfo* pTaskInfo) {
  SIndefOperatorInfo* pIndef = pOperator->info;
  SOptrBasicInfo*     pBasic = &pIndef->binfo;
  SExprSupp*          pFuncSup = &pOperator->exprSupp;
  SExprSupp*          pScalar = &pIndef->scalarSup;

  int32_t order = 0;
  int32_t scanFlag = 0;

  // the pDataBlock are always the same one, no need to call this again
  int32_t rc = getTableScanInfo(downstream, &order, &scanFlag);
  if (rc != TSDB_CODE_SUCCESS) {
    longjmp(pTaskInfo->env, rc);
  }

  // scalar expressions, if any, are evaluated first and write back into the input block
  if (pScalar->pExprInfo != NULL) {
    rc = projectApplyFunctions(pScalar->pExprInfo, pBlock, pBlock, pScalar->pCtx, pScalar->numOfExprs,
                               pIndef->pPseudoColInfo);
    if (rc != TSDB_CODE_SUCCESS) {
      longjmp(pTaskInfo->env, rc);
    }
  }

  setInputDataBlock(pOperator, pFuncSup->pCtx, pBlock, order, scanFlag, false);
  blockDataEnsureCapacity(pBasic->pRes, pBasic->pRes->info.rows + pBlock->info.rows);

  rc = projectApplyFunctions(pFuncSup->pExprInfo, pBasic->pRes, pBlock, pFuncSup->pCtx, pFuncSup->numOfExprs,
                             pIndef->pPseudoColInfo);
  if (rc != TSDB_CODE_SUCCESS) {
    longjmp(pTaskInfo->env, rc);
  }
}

H
Haojun Liao 已提交
3868 3869
/**
 * getNext callback of the indefinite-rows function operator.
 *
 * Pulls blocks from the downstream operator, applies the functions and the
 * filter condition, and returns the result block once it holds at least one
 * row. A change of group id ends the current batch; the first block of the
 * next group is parked in pNextGroupRes and processed on the next pass.
 *
 * @return the result block, or NULL when the operator is done and no rows
 *         are left
 */
static SSDataBlock* doApplyIndefinitFunction(SOperatorInfo* pOperator) {
  SIndefOperatorInfo* pIndefInfo = pOperator->info;
  SOptrBasicInfo*     pInfo = &pIndefInfo->binfo;
  SExprSupp*          pSup = &pOperator->exprSupp;

  SSDataBlock* pRes = pInfo->pRes;
  blockDataCleanup(pRes);

  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  int64_t st = 0;

  if (pOperator->cost.openCost == 0) {
    st = taosGetTimestampUs();
  }

  SOperatorInfo* downstream = pOperator->pDownstream[0];
  bool           downstreamExhausted = false;

  while (1) {
    // here we need to handle the existed group results
    if (pIndefInfo->pNextGroupRes != NULL) {  // todo extract method
      for (int32_t k = 0; k < pSup->numOfExprs; ++k) {
        SqlFunctionCtx* pCtx = &pSup->pCtx[k];

        SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
        pResInfo->initialized = false;
        pCtx->pOutput = NULL;
      }

      doHandleDataBlock(pOperator, pIndefInfo->pNextGroupRes, downstream, pTaskInfo);
      pIndefInfo->pNextGroupRes = NULL;
    }

    if (pInfo->pRes->info.rows < pOperator->resultInfo.threshold) {
      while (1) {
        // The downstream exec may change the value of the newgroup, so use a local variable instead.
        SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
        if (pBlock == NULL) {
          doSetOperatorCompleted(pOperator);
          downstreamExhausted = true;
          break;
        }

        if (pIndefInfo->groupId == 0 && pBlock->info.groupId != 0) {
          pIndefInfo->groupId = pBlock->info.groupId;  // this is the initial group result
        } else {
          if (pIndefInfo->groupId != pBlock->info.groupId) {  // reset output buffer and computing status
            pIndefInfo->groupId = pBlock->info.groupId;
            pIndefInfo->pNextGroupRes = pBlock;
            break;
          }
        }

        doHandleDataBlock(pOperator, pBlock, downstream, pTaskInfo);
        if (pInfo->pRes->info.rows >= pOperator->resultInfo.threshold) {
          break;
        }
      }
    }

    doFilter(pIndefInfo->pCondition, pInfo->pRes);

    // Stop once there is something to return or there is no more input. The previous test
    // (an unsigned value >= 0) was always true, so a batch that the filter emptied returned
    // NULL while the downstream still had data, which ended the result stream too early.
    if (pInfo->pRes->info.rows > 0 || downstreamExhausted) {
      break;
    }

    // everything was filtered out: start the next pass with a clean result block
    blockDataCleanup(pInfo->pRes);
  }

  size_t rows = pInfo->pRes->info.rows;
  pOperator->resultInfo.totalRows += rows;

  if (pOperator->cost.openCost == 0) {
    pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
  }

  return (rows > 0) ? pInfo->pRes : NULL;
}

3947 3948
/**
 * Create the indefinite-rows function operator on top of downstream.
 *
 * The functions (pFuncs) and optional scalar expressions (pExprs) are taken
 * from the physical node. The output capacity starts at 4096 rows and is
 * reduced so that capacity * rowSize stays within 2MB.
 *
 * @return the new operator, or NULL on failure; in that case every resource
 *         created here is released and pTaskInfo->code holds the error
 */
SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pNode,
                                                 SExecTaskInfo* pTaskInfo) {
  // Declared before the first goto so the error path always reads initialised values.
  int32_t code = TSDB_CODE_OUT_OF_MEMORY;
  int32_t numOfExpr = 0;

  SIndefOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SIndefOperatorInfo));
  SOperatorInfo*      pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    goto _error;
  }

  SExprSupp* pSup = &pOperator->exprSupp;

  SIndefRowsFuncPhysiNode* pPhyNode = (SIndefRowsFuncPhysiNode*)pNode;

  SExprInfo* pExprInfo = createExprInfo(pPhyNode->pFuncs, NULL, &numOfExpr);
  // Attach the expressions right away so the error path can release them; initAggInfo()
  // stores the same values again later.
  pSup->pExprInfo = pExprInfo;
  pSup->numOfExprs = numOfExpr;

  if (pPhyNode->pExprs != NULL) {
    int32_t    num = 0;
    SExprInfo* pSExpr = createExprInfo(pPhyNode->pExprs, NULL, &num);
    code = initExprSupp(&pInfo->scalarSup, pSExpr, num);
    if (code != TSDB_CODE_SUCCESS) {
      goto _error;
    }
  }

  SSDataBlock* pResBlock = createResDataBlock(pPhyNode->node.pOutputDataBlockDesc);
  pInfo->binfo.pRes = pResBlock;  // owned by pInfo from here on

  int32_t numOfRows = 4096;
  size_t  keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;

  // Make sure the size of SSDataBlock will never exceed the size of 2MB.
  int32_t TWOMB = 2 * 1024 * 1024;
  if (numOfRows * pResBlock->info.rowSize > TWOMB) {
    numOfRows = TWOMB / pResBlock->info.rowSize;
  }

  initResultSizeInfo(pOperator, numOfRows);

  code = initAggInfo(pSup, &pInfo->aggSup, pExprInfo, numOfExpr, keyBufSize, pTaskInfo->id.str);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  initBasicInfo(&pInfo->binfo, pResBlock);

  setFunctionResultOutput(pOperator, &pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, numOfExpr);

  pInfo->binfo.pRes = pResBlock;
  pInfo->pCondition = pPhyNode->node.pConditions;
  pInfo->pPseudoColInfo = setRowTsColumnOutputInfo(pSup->pCtx, numOfExpr);

  pOperator->name = "IndefinitOperator";
  pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC;
  pOperator->blocking = false;
  pOperator->status = OP_NOT_OPENED;
  pOperator->info = pInfo;
  pOperator->pTaskInfo = pTaskInfo;

  pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doApplyIndefinitFunction, NULL, NULL,
                                         destroyIndefinitOperatorInfo, NULL, NULL, NULL);

  code = appendDownstream(pOperator, &downstream, 1);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  return pOperator;

_error:
  // A plain free of pInfo would leak the result block, the aggregate support, the scalar
  // expressions and the pseudo-column array; the destroy callback releases all of them.
  if (pInfo != NULL) {
    destroyIndefinitOperatorInfo(pInfo, numOfExpr);
  }

  if (pOperator != NULL) {
    cleanupExprSupp(&pOperator->exprSupp);
    taosMemoryFree(pOperator);
  }

  pTaskInfo->code = code;
  return NULL;
}

4017
static int32_t initFillInfo(SFillOperatorInfo* pInfo, SExprInfo* pExpr, int32_t numOfCols, SNodeListNode* pValNode,
L
Liu Jicong 已提交
4018
                            STimeWindow win, int32_t capacity, const char* id, SInterval* pInterval, int32_t fillType) {
4019
  SFillColInfo* pColInfo = createFillColInfo(pExpr, numOfCols, pValNode);
H
Haojun Liao 已提交
4020 4021

  STimeWindow w = TSWINDOW_INITIALIZER;
4022
  getAlignQueryTimeWindow(pInterval, pInterval->precision, win.skey, &w);
4023
  w = getFirstQualifiedTimeWindow(win.skey, &w, pInterval, TSDB_ORDER_ASC);
H
Haojun Liao 已提交
4024 4025

  int32_t order = TSDB_ORDER_ASC;
4026 4027
  pInfo->pFillInfo = taosCreateFillInfo(order, w.skey, 0, capacity, numOfCols, pInterval,
      fillType, pColInfo, pInfo->primaryTsCol, id);
H
Haojun Liao 已提交
4028

4029
  pInfo->win = win;
L
Liu Jicong 已提交
4030
  pInfo->p = taosMemoryCalloc(numOfCols, POINTER_BYTES);
4031

H
Haojun Liao 已提交
4032
  if (pInfo->pFillInfo == NULL || pInfo->p == NULL) {
H
Haojun Liao 已提交
4033 4034
    taosMemoryFree(pInfo->pFillInfo);
    taosMemoryFree(pInfo->p);
H
Haojun Liao 已提交
4035 4036 4037 4038 4039 4040
    return TSDB_CODE_OUT_OF_MEMORY;
  } else {
    return TSDB_CODE_SUCCESS;
  }
}

H
Haojun Liao 已提交
4041
/**
 * Create the fill operator on top of downstream.
 *
 * The interval is read from the downstream operator's info block, which is
 * interpreted as a merge-aligned interval operator or a plain interval
 * operator depending on downstream->operatorType.
 *
 * @return the new operator, or NULL on failure; in that case every resource
 *         created here is released and pTaskInfo->code holds the error
 */
SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* pPhyFillNode, SExecTaskInfo* pTaskInfo) {
  // Everything the error path looks at is declared and initialised before the first goto.
  int32_t      code = TSDB_CODE_OUT_OF_MEMORY;
  int32_t      num = 0;
  bool         fillInited = false;
  SSDataBlock* pResBlock = NULL;
  SExprInfo*   pExprInfo = NULL;
  SArray*      pColMatchColInfo = NULL;

  SFillOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SFillOperatorInfo));
  SOperatorInfo*     pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    goto _error;
  }

  pResBlock = createResDataBlock(pPhyFillNode->node.pOutputDataBlockDesc);
  pExprInfo = createExprInfo(pPhyFillNode->pTargets, NULL, &num);
  SInterval* pInterval =
      QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL == downstream->operatorType
            ? &((SMergeAlignedIntervalAggOperatorInfo*)downstream->info)->intervalAggOperatorInfo->interval
            : &((SIntervalAggOperatorInfo*)downstream->info)->interval;

  int32_t type = convertFillType(pPhyFillNode->mode);

  SResultInfo* pResultInfo = &pOperator->resultInfo;
  initResultSizeInfo(pOperator, 4096);
  pInfo->primaryTsCol = ((SColumnNode*)pPhyFillNode->pWStartTs)->slotId;

  int32_t numOfOutputCols = 0;
  pColMatchColInfo = extractColMatchInfo(pPhyFillNode->pTargets, pPhyFillNode->node.pOutputDataBlockDesc,
                                         &numOfOutputCols, COL_MATCH_FROM_SLOT_ID);

  code = initFillInfo(pInfo, pExprInfo, num, (SNodeListNode*)pPhyFillNode->pValues, pPhyFillNode->timeRange,
                      pResultInfo->capacity, pTaskInfo->id.str, pInterval, type);
  if (code != TSDB_CODE_SUCCESS) {
    // initFillInfo() has already released whatever it allocated
    goto _error;
  }
  fillInited = true;

  pInfo->pRes = pResBlock;
  pInfo->pCondition = pPhyFillNode->node.pConditions;
  pInfo->pColMatchColInfo = pColMatchColInfo;
  pOperator->name = "FillOperator";
  pOperator->blocking = false;
  pOperator->status = OP_NOT_OPENED;
  pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_FILL;
  pOperator->exprSupp.pExprInfo = pExprInfo;
  pOperator->exprSupp.numOfExprs = num;
  pOperator->info = pInfo;
  pOperator->pTaskInfo = pTaskInfo;

  pOperator->fpSet =
      createOperatorFpSet(operatorDummyOpenFn, doFill, NULL, NULL, destroySFillOperatorInfo, NULL, NULL, NULL);

  // an operator without its downstream is unusable, so this failure must not be ignored
  code = appendDownstream(pOperator, &downstream, 1);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  return pOperator;

_error:
  if (pInfo != NULL) {
    if (fillInited) {
      taosDestroyFillInfo(pInfo->pFillInfo);
      taosMemoryFree(pInfo->p);
    }
    taosMemoryFree(pInfo);
  }

  if (pResBlock != NULL) {
    blockDataDestroy(pResBlock);
  }

  if (pColMatchColInfo != NULL) {
    taosArrayDestroy(pColMatchColInfo);
  }

  // same two steps cleanupExprSupp() performs for an expression array
  if (pExprInfo != NULL) {
    destroyExprInfo(pExprInfo, num);
    taosMemoryFree(pExprInfo);
  }

  taosMemoryFreeClear(pOperator);
  pTaskInfo->code = code;
  return NULL;
}

D
dapan1121 已提交
4096
/**
 * Allocate and initialise a task info object.
 *
 * The task starts in TASK_NOT_COMPLETED state, records its creation time and
 * gets an id string of the form "TID:0x<taskId> QID:0x<queryId>".
 *
 * @param dbFName  database name, duplicated into the task; may be NULL
 * @return the new task info, or NULL (terrno set) when an allocation fails
 */
static SExecTaskInfo* createExecTaskInfo(uint64_t queryId, uint64_t taskId, EOPTR_EXEC_MODEL model, char* dbFName) {
  enum { TASK_ID_STR_LEN = 128 };

  SExecTaskInfo* pTaskInfo = taosMemoryCalloc(1, sizeof(SExecTaskInfo));
  if (pTaskInfo == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  // allocate the id string first so that a failure leaves nothing else to unwind
  char* p = taosMemoryCalloc(1, TASK_ID_STR_LEN);
  if (p == NULL) {
    taosMemoryFree(pTaskInfo);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED);

  // strdup(NULL) is undefined behaviour; a missing name is kept as NULL
  pTaskInfo->schemaVer.dbname = (dbFName != NULL) ? strdup(dbFName) : NULL;
  pTaskInfo->cost.created = taosGetTimestampMs();
  pTaskInfo->id.queryId = queryId;
  pTaskInfo->execModel = model;

  snprintf(p, TASK_ID_STR_LEN, "TID:0x%" PRIx64 " QID:0x%" PRIx64, taskId, queryId);
  pTaskInfo->id.str = p;

  return pTaskInfo;
}
H
Haojun Liao 已提交
4111

H
Hongze Cheng 已提交
4112
static STsdbReader* doCreateDataReader(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle,
H
Haojun Liao 已提交
4113
                                       STableListInfo* pTableListInfo, const char* idstr);
H
Haojun Liao 已提交
4114

H
Haojun Liao 已提交
4115
static SArray* extractColumnInfo(SNodeList* pNodeList);
4116

4117
/**
 * Load the name and schema of table uid into pTaskInfo->schemaVer.
 *
 * For a super table the schema and tag version are taken from the entry
 * itself; for a child table they are taken from its super table, which is
 * looked up in a second step; for any other table type the entry's own
 * schema is cloned.
 *
 * @return TSDB_CODE_SUCCESS, or an error code when a meta lookup fails
 */
int32_t extractTableSchemaInfo(SReadHandle* pHandle, uint64_t uid, SExecTaskInfo* pTaskInfo) {
  SMetaReader mr = {0};
  metaReaderInit(&mr, pHandle->meta, 0);
  int32_t code = metaGetTableEntryByUid(&mr, uid);
  if (code != TSDB_CODE_SUCCESS) {
    metaReaderClear(&mr);
    return terrno;
  }

  pTaskInfo->schemaVer.tablename = strdup(mr.me.name);

  if (mr.me.type == TSDB_SUPER_TABLE) {
    pTaskInfo->schemaVer.sw = tCloneSSchemaWrapper(&mr.me.stbEntry.schemaRow);
    pTaskInfo->schemaVer.tversion = mr.me.stbEntry.schemaTag.version;
  } else if (mr.me.type == TSDB_CHILD_TABLE) {
    tDecoderClear(&mr.coder);

    tb_uid_t suid = mr.me.ctbEntry.suid;
    code = metaGetTableEntryByUid(&mr, suid);
    if (code != TSDB_CODE_SUCCESS) {
      // Without the super-table entry mr.me.stbEntry is not valid; do not clone from it.
      taosMemoryFreeClear(pTaskInfo->schemaVer.tablename);
      metaReaderClear(&mr);
      return (terrno != TSDB_CODE_SUCCESS) ? terrno : code;
    }

    pTaskInfo->schemaVer.sw = tCloneSSchemaWrapper(&mr.me.stbEntry.schemaRow);
    pTaskInfo->schemaVer.tversion = mr.me.stbEntry.schemaTag.version;
  } else {
    pTaskInfo->schemaVer.sw = tCloneSSchemaWrapper(&mr.me.ntbEntry.schemaRow);
  }

  metaReaderClear(&mr);
  return TSDB_CODE_SUCCESS;
}

4146 4147 4148 4149 4150 4151 4152 4153 4154 4155 4156
/**
 * Free the database name, table name and schema wrapper stored in
 * pTaskInfo->schemaVer and reset the pointers.
 */
static void cleanupTableSchemaInfo(SExecTaskInfo* pTaskInfo) {
  taosMemoryFreeClear(pTaskInfo->schemaVer.dbname);

  // The table name is set independently of the schema wrapper, so it has to be released
  // even when no wrapper exists.
  taosMemoryFreeClear(pTaskInfo->schemaVer.tablename);

  if (pTaskInfo->schemaVer.sw == NULL) {
    return;
  }

  taosMemoryFree(pTaskInfo->schemaVer.sw->pSchema);
  taosMemoryFreeClear(pTaskInfo->schemaVer.sw);
}

4157
/**
 * Rebuild pTableListInfo->pGroupList so that tables are bucketed by group id.
 *
 * sortSupport holds the distinct group ids and is kept parallel to
 * pGroupList: the bucket at index i belongs to the group id at index i.
 * A new group id is inserted in front of the first larger id when one is
 * found, otherwise appended.
 *
 * @param groupNum  initial capacity hint for the group id array
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY or TSDB_CODE_QRY_APP_ERROR
 */
static int32_t sortTableGroup(STableListInfo* pTableListInfo, int32_t groupNum) {
  taosArrayClear(pTableListInfo->pGroupList);

  SArray* sortSupport = taosArrayInit(groupNum, sizeof(uint64_t));
  if (sortSupport == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  int32_t code = TSDB_CODE_SUCCESS;
  SArray* tGroup = NULL;  // bucket under construction that pGroupList does not own yet

  for (int32_t i = 0; i < taosArrayGetSize(pTableListInfo->pTableList); i++) {
    STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i);
    uint64_t*      groupId = taosHashGet(pTableListInfo->map, &info->uid, sizeof(uint64_t));
    if (groupId == NULL) {
      // the search helpers below would dereference the key
      qError("taos get group id of table error");
      code = TSDB_CODE_QRY_APP_ERROR;
      goto _end;
    }

    int32_t index = taosArraySearchIdx(sortSupport, groupId, compareUint64Val, TD_EQ);
    if (index != -1) {
      // known group: append the table to its bucket
      SArray* pBucket = (SArray*)taosArrayGetP(pTableListInfo->pGroupList, index);
      if (taosArrayPush(pBucket, info) == NULL) {
        qError("taos push uid array error");
        code = TSDB_CODE_QRY_APP_ERROR;
        goto _end;
      }
      continue;
    }

    // new group: locate the first larger group id, if any
    void* p = taosArraySearch(sortSupport, groupId, compareUint64Val, TD_GT);

    tGroup = taosArrayInit(8, sizeof(STableKeyInfo));
    if (tGroup == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _end;
    }

    if (taosArrayPush(tGroup, info) == NULL) {
      qError("taos push info array error");
      code = TSDB_CODE_QRY_APP_ERROR;
      goto _end;
    }

    if (p == NULL) {
      if (taosArrayPush(sortSupport, groupId) == NULL) {
        qError("taos push support array error");
        code = TSDB_CODE_QRY_APP_ERROR;
        goto _end;
      }
      if (taosArrayPush(pTableListInfo->pGroupList, &tGroup) == NULL) {
        qError("taos push group array error");
        code = TSDB_CODE_QRY_APP_ERROR;
        goto _end;
      }
    } else {
      int32_t pos = TARRAY_ELEM_IDX(sortSupport, p);
      if (taosArrayInsert(sortSupport, pos, groupId) == NULL) {
        qError("taos insert support array error");
        code = TSDB_CODE_QRY_APP_ERROR;
        goto _end;
      }
      if (taosArrayInsert(pTableListInfo->pGroupList, pos, &tGroup) == NULL) {
        qError("taos insert group array error");
        code = TSDB_CODE_QRY_APP_ERROR;
        goto _end;
      }
    }

    tGroup = NULL;  // ownership moved into pGroupList
  }

_end:
  // a bucket that never made it into pGroupList would otherwise leak
  if (tGroup != NULL) {
    taosArrayDestroy(tGroup);
  }
  taosArrayDestroy(sortSupport);
  return code;
}

wmmhello's avatar
wmmhello 已提交
4215 4216
/**
 * Compute a group id for every table in pTableListInfo->pTableList from the
 * group expressions and record it in pTableListInfo->map (uid -> group id)
 * and in the table entry itself.
 *
 * The key buffer starts with one null flag per group expression, followed by
 * the values of the non-null expressions; the group id is calcGroupId() over
 * the used part of the buffer. When needSortTableByGroupId is set the tables
 * are additionally bucketed through sortTableGroup().
 *
 * @param group  group expressions; NULL means no grouping and nothing is done
 * @return TSDB_CODE_SUCCESS or an error code
 */
int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle, SNodeList* group) {
  if (group == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  pTableListInfo->map = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
  if (pTableListInfo->map == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  int32_t keyLen = 0;
  void*   keyBuf = NULL;

  SNode* node;
  FOREACH(node, group) {
    SExprNode* pExpr = (SExprNode*)node;
    keyLen += pExpr->resType.bytes;
  }

  int32_t nullFlagSize = sizeof(int8_t) * LIST_LENGTH(group);
  keyLen += nullFlagSize;

  keyBuf = taosMemoryCalloc(1, keyLen);
  if (keyBuf == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  int32_t groupNum = 0;
  for (int32_t i = 0; i < taosArrayGetSize(pTableListInfo->pTableList); i++) {
    STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i);
    SMetaReader    mr = {0};
    metaReaderInit(&mr, pHandle->meta, 0);

    // The tag expressions below are evaluated against this entry; without it there is nothing
    // valid to evaluate, so a failed lookup must stop the computation.
    int32_t rc = metaGetTableEntryByUid(&mr, info->uid);
    if (rc != TSDB_CODE_SUCCESS) {
      taosMemoryFree(keyBuf);
      metaReaderClear(&mr);
      return (terrno != TSDB_CODE_SUCCESS) ? terrno : rc;
    }

    SNodeList* groupNew = nodesCloneList(group);

    nodesRewriteExprsPostOrder(groupNew, doTranslateTagExpr, &mr);
    char* isNull = (char*)keyBuf;
    char* pStart = (char*)keyBuf + nullFlagSize;

    SNode*  pNode;
    int32_t index = 0;
    FOREACH(pNode, groupNew) {
      SNode*  pNew = NULL;
      int32_t code = scalarCalculateConstants(pNode, &pNew);
      if (TSDB_CODE_SUCCESS == code) {
        REPLACE_NODE(pNew);
      } else {
        taosMemoryFree(keyBuf);
        nodesClearList(groupNew);
        metaReaderClear(&mr);
        return code;
      }

      ASSERT(nodeType(pNew) == QUERY_NODE_VALUE);
      SValueNode* pValue = (SValueNode*)pNew;

      if (pValue->node.resType.type == TSDB_DATA_TYPE_NULL || pValue->isNull) {
        isNull[index++] = 1;
        continue;
      } else {
        isNull[index++] = 0;
        char* data = nodesGetValueFromNode(pValue);
        if (pValue->node.resType.type == TSDB_DATA_TYPE_JSON) {
          if (tTagIsJson(data)) {
            terrno = TSDB_CODE_QRY_JSON_IN_GROUP_ERROR;
            taosMemoryFree(keyBuf);
            nodesClearList(groupNew);
            metaReaderClear(&mr);
            return terrno;
          }
          int32_t len = getJsonValueLen(data);
          memcpy(pStart, data, len);
          pStart += len;
        } else if (IS_VAR_DATA_TYPE(pValue->node.resType.type)) {
          memcpy(pStart, data, varDataTLen(data));
          pStart += varDataTLen(data);
        } else {
          memcpy(pStart, data, pValue->node.resType.bytes);
          pStart += pValue->node.resType.bytes;
        }
      }
    }

    // only the bytes written so far take part in the group id
    int32_t  len = (int32_t)(pStart - (char*)keyBuf);
    uint64_t groupId = calcGroupId(keyBuf, len);
    taosHashPut(pTableListInfo->map, &(info->uid), sizeof(uint64_t), &groupId, sizeof(uint64_t));
    info->groupId = groupId;
    groupNum++;

    // NOTE(review): groupNew is a clone; confirm that nodesClearList() also releases the cloned nodes
    nodesClearList(groupNew);
    metaReaderClear(&mr);
  }
  taosMemoryFree(keyBuf);

  if (pTableListInfo->needSortTableByGroupId) {
    return sortTableGroup(pTableListInfo, groupNum);
  }

  return TSDB_CODE_SUCCESS;
}

4316 4317 4318 4319 4320 4321 4322 4323 4324 4325 4326 4327 4328 4329 4330 4331 4332 4333 4334 4335 4336 4337 4338 4339
/*
 * Initialise `pCond` as the query condition used by the block-distribution scan.
 *
 * The condition is first cleared, then set up as an ascending scan over a single
 * timestamp column (column id 1) that spans the whole [INT64_MIN, INT64_MAX] time
 * range, with both version bounds set to -1.
 *
 * @param uid    value stored into pCond->suid
 * @param pCond  condition to fill in; its previous content is overwritten
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_QRY_OUT_OF_MEMORY (also stored in terrno)
 *         when the column list cannot be allocated
 */
static int32_t initTableblockDistQueryCond(uint64_t uid, SQueryTableDataCond* pCond) {
  memset(pCond, 0, sizeof(SQueryTableDataCond));

  // Only the primary timestamp column is requested.
  SColumnInfo* pTsCol = taosMemoryCalloc(1, sizeof(SColumnInfo));

  pCond->order = TSDB_ORDER_ASC;
  pCond->numOfCols = 1;
  pCond->colList = pTsCol;
  if (NULL == pTsCol) {
    terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
    return terrno;
  }

  pTsCol->colId = 1;
  pTsCol->type = TSDB_DATA_TYPE_TIMESTAMP;
  pTsCol->bytes = sizeof(TSKEY);

  // Unbounded time window; the remaining window fields stay zero from the memset above.
  pCond->twindows.skey = INT64_MIN;
  pCond->twindows.ekey = INT64_MAX;

  pCond->suid = uid;
  pCond->type = BLOCK_LOAD_OFFSET_ORDER;
  pCond->startVersion = -1;
  pCond->endVersion = -1;

  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
4340
/*
 * Recursively build the operator tree for the physical plan rooted at `pPhyNode`.
 *
 * A node without children becomes a source operator (table scan, table merge scan,
 * exchange, stream scan, system-table scan, tag scan, block-distribution scan or
 * last-row scan). For every other node the operators of all children are built first
 * and are then wrapped by the operator matching the node type.
 *
 * @param pPhyNode        plan node to convert
 * @param pTaskInfo       task being built; pTaskInfo->code receives the error code of the
 *                        failures detected directly in this function
 * @param pHandle         read handle forwarded to the scan builders (the stream-scan branch
 *                        tolerates a NULL handle)
 * @param queryId         query id forwarded to the table-list / scan builders
 * @param taskId          task id forwarded to the table-list / scan builders
 * @param pTableListInfo  table list populated by the scan branches
 * @param pUser           user name forwarded to the system-table scan builder
 * @return the operator built for `pPhyNode`, or NULL on failure
 */
SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle,
                                  uint64_t queryId, uint64_t taskId, STableListInfo* pTableListInfo,
                                  const char* pUser) {
  int32_t type = nodeType(pPhyNode);

  if (pPhyNode->pChildren == NULL || LIST_LENGTH(pPhyNode->pChildren) == 0) {
    if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == type) {
      STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode;

      int32_t code = createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags,
                                             pTableScanNode->groupSort, pHandle, pTableListInfo, queryId, taskId);
      if (code) {
        pTaskInfo->code = code;
        return NULL;
      }

      code = extractTableSchemaInfo(pHandle, pTableScanNode->scan.uid, pTaskInfo);
      if (code) {
        // Report the returned code (as the last-row branch does) so a failure is never recorded as 0.
        pTaskInfo->code = code;
        return NULL;
      }

      SOperatorInfo* pOperator = createTableScanOperatorInfo(pTableScanNode, pHandle, pTaskInfo);
      if (pOperator == NULL) {
        // Do not dereference a failed operator; pTaskInfo->code is left as the builder set it.
        return NULL;
      }

      STableScanInfo* pScanInfo = pOperator->info;
      pTaskInfo->cost.pRecoder = &pScanInfo->readRecorder;
      return pOperator;

    } else if (QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN == type) {
      STableMergeScanPhysiNode* pTableScanNode = (STableMergeScanPhysiNode*)pPhyNode;
      int32_t code = createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags,
                                             pTableScanNode->groupSort, pHandle, pTableListInfo, queryId, taskId);
      if (code) {
        pTaskInfo->code = code;
        return NULL;
      }

      code = extractTableSchemaInfo(pHandle, pTableScanNode->scan.uid, pTaskInfo);
      if (code) {
        pTaskInfo->code = code;
        return NULL;
      }

      SOperatorInfo* pOperator =
          createTableMergeScanOperatorInfo(pTableScanNode, pTableListInfo, pHandle, pTaskInfo, queryId, taskId);
      if (pOperator == NULL) {
        return NULL;
      }

      STableScanInfo* pScanInfo = pOperator->info;
      pTaskInfo->cost.pRecoder = &pScanInfo->readRecorder;
      return pOperator;

    } else if (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == type) {
      return createExchangeOperatorInfo(pHandle->pMsgCb->clientRpc, (SExchangePhysiNode*)pPhyNode, pTaskInfo);
    } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN == type) {
      STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode;
      STimeWindowAggSupp   twSup = {
            .waterMark = pTableScanNode->watermark,
            .calTrigger = pTableScanNode->triggerType,
            .maxTs = INT64_MIN,
      };

      // The table list is only built when a read handle is supplied.
      if (pHandle) {
        int32_t code = createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags,
                                               pTableScanNode->groupSort, pHandle, pTableListInfo, queryId, taskId);
        if (code) {
          pTaskInfo->code = code;
          return NULL;
        }
      }

      SOperatorInfo* pOperator =
          createStreamScanOperatorInfo(pHandle, pTableScanNode, pTaskInfo, &twSup, queryId, taskId);
      return pOperator;

    } else if (QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN == type) {
      SSystemTableScanPhysiNode* pSysScanPhyNode = (SSystemTableScanPhysiNode*)pPhyNode;
      return createSysTableScanOperatorInfo(pHandle, pSysScanPhyNode, pUser, pTaskInfo);
    } else if (QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN == type) {
      STagScanPhysiNode* pScanPhyNode = (STagScanPhysiNode*)pPhyNode;

      int32_t code = getTableList(pHandle->meta, pHandle->vnode, pScanPhyNode, pTableListInfo);
      if (code != TSDB_CODE_SUCCESS) {
        pTaskInfo->code = code;
        return NULL;
      }

      return createTagScanOperatorInfo(pHandle, pScanPhyNode, pTableListInfo, pTaskInfo);
    } else if (QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN == type) {
      SBlockDistScanPhysiNode* pBlockNode = (SBlockDistScanPhysiNode*)pPhyNode;
      pTableListInfo->pTableList = taosArrayInit(4, sizeof(STableKeyInfo));
      if (pTableListInfo->pTableList == NULL) {
        pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
        return NULL;
      }

      if (pBlockNode->tableType == TSDB_SUPER_TABLE) {
        int32_t code = vnodeGetAllTableList(pHandle->vnode, pBlockNode->uid, pTableListInfo->pTableList);
        if (code != TSDB_CODE_SUCCESS) {
          pTaskInfo->code = terrno;
          return NULL;
        }
      } else {  // Create one table group.
        STableKeyInfo info = {.lastKey = 0, .uid = pBlockNode->uid, .groupId = 0};
        taosArrayPush(pTableListInfo->pTableList, &info);
      }

      SQueryTableDataCond cond = {0};
      int32_t             code = initTableblockDistQueryCond(pBlockNode->suid, &cond);
      if (code != TSDB_CODE_SUCCESS) {
        // Without this the caller would see a NULL root together with a success code.
        pTaskInfo->code = code;
        return NULL;
      }

      STsdbReader* pReader = NULL;
      code = tsdbReaderOpen(pHandle->vnode, &cond, pTableListInfo->pTableList, &pReader, "");
      cleanupQueryTableDataCond(&cond);
      if (code != TSDB_CODE_SUCCESS) {
        pTaskInfo->code = code;
        return NULL;
      }

      return createDataBlockInfoScanOperator(pReader, pHandle, cond.suid, pBlockNode, pTaskInfo);
    } else if (QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN == type) {
      SLastRowScanPhysiNode* pScanNode = (SLastRowScanPhysiNode*)pPhyNode;

      int32_t code = createScanTableListInfo(&pScanNode->scan, pScanNode->pGroupTags, true, pHandle, pTableListInfo,
                                             queryId, taskId);
      if (code != TSDB_CODE_SUCCESS) {
        pTaskInfo->code = code;
        return NULL;
      }

      code = extractTableSchemaInfo(pHandle, pScanNode->scan.uid, pTaskInfo);
      if (code != TSDB_CODE_SUCCESS) {
        pTaskInfo->code = code;
        return NULL;
      }

      return createLastrowScanOperator(pScanNode, pHandle, pTaskInfo);
    } else {
      ASSERT(0);
    }
  }

  int32_t num = 0;
  size_t  size = LIST_LENGTH(pPhyNode->pChildren);

  SOperatorInfo** ops = taosMemoryCalloc(size, POINTER_BYTES);
  if (ops == NULL) {
    pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  for (int32_t i = 0; i < (int32_t)size; ++i) {
    SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, i);
    ops[i] = createOperatorTree(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableListInfo, pUser);
    if (ops[i] == NULL) {
      // Release the temporary child array; it used to leak on this path.
      // NOTE(review): children that were already built are still not destroyed here.
      taosMemoryFree(ops);
      return NULL;
    }
  }

  SOperatorInfo* pOptr = NULL;
  if (QUERY_NODE_PHYSICAL_PLAN_PROJECT == type) {
    pOptr = createProjectOperatorInfo(ops[0], (SProjectPhysiNode*)pPhyNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_HASH_AGG == type) {
    SAggPhysiNode* pAggNode = (SAggPhysiNode*)pPhyNode;
    SExprInfo*     pExprInfo = createExprInfo(pAggNode->pAggFuncs, pAggNode->pGroupKeys, &num);
    SSDataBlock*   pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);

    int32_t    numOfScalarExpr = 0;
    SExprInfo* pScalarExprInfo = NULL;
    if (pAggNode->pExprs != NULL) {
      pScalarExprInfo = createExprInfo(pAggNode->pExprs, NULL, &numOfScalarExpr);
    }

    // Group keys select the group-by operator; otherwise a plain aggregate is built.
    if (pAggNode->pGroupKeys != NULL) {
      SArray* pColList = extractColumnInfo(pAggNode->pGroupKeys);
      pOptr = createGroupOperatorInfo(ops[0], pExprInfo, num, pResBlock, pColList, pAggNode->node.pConditions,
                                      pScalarExprInfo, numOfScalarExpr, pTaskInfo);
    } else {
      pOptr = createAggregateOperatorInfo(ops[0], pExprInfo, num, pResBlock, pAggNode->node.pConditions,
                                          pScalarExprInfo, numOfScalarExpr, pTaskInfo);
    }
  } else if (QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL == type || QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL == type) {
    SIntervalPhysiNode* pIntervalPhyNode = (SIntervalPhysiNode*)pPhyNode;

    SExprInfo*   pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &num);
    SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);

    SInterval interval = {.interval = pIntervalPhyNode->interval,
                          .sliding = pIntervalPhyNode->sliding,
                          .intervalUnit = pIntervalPhyNode->intervalUnit,
                          .slidingUnit = pIntervalPhyNode->slidingUnit,
                          .offset = pIntervalPhyNode->offset,
                          .precision = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->node.resType.precision};

    STimeWindowAggSupp as = {
        .waterMark = pIntervalPhyNode->window.watermark,
        .calTrigger = pIntervalPhyNode->window.triggerType,
        .maxTs = INT64_MIN,
    };
    ASSERT(as.calTrigger != STREAM_TRIGGER_MAX_DELAY);

    int32_t tsSlotId = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId;
    bool    isStream = (QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL == type);
    pOptr = createIntervalOperatorInfo(ops[0], pExprInfo, num, pResBlock, &interval, tsSlotId, &as, pIntervalPhyNode,
                                       pTaskInfo, isStream);

  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL == type) {
    SMergeAlignedIntervalPhysiNode* pIntervalPhyNode = (SMergeAlignedIntervalPhysiNode*)pPhyNode;

    SExprInfo*   pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &num);
    SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);

    SInterval interval = {.interval = pIntervalPhyNode->interval,
                          .sliding = pIntervalPhyNode->sliding,
                          .intervalUnit = pIntervalPhyNode->intervalUnit,
                          .slidingUnit = pIntervalPhyNode->slidingUnit,
                          .offset = pIntervalPhyNode->offset,
                          .precision = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->node.resType.precision};

    int32_t tsSlotId = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId;
    pOptr = createMergeAlignedIntervalOperatorInfo(ops[0], pExprInfo, num, pResBlock, &interval, tsSlotId,
                                                   pPhyNode->pConditions, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL == type) {
    SMergeIntervalPhysiNode* pIntervalPhyNode = (SMergeIntervalPhysiNode*)pPhyNode;

    SExprInfo*   pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &num);
    SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);

    SInterval interval = {.interval = pIntervalPhyNode->interval,
                          .sliding = pIntervalPhyNode->sliding,
                          .intervalUnit = pIntervalPhyNode->intervalUnit,
                          .slidingUnit = pIntervalPhyNode->slidingUnit,
                          .offset = pIntervalPhyNode->offset,
                          .precision = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->node.resType.precision};

    int32_t tsSlotId = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId;
    pOptr = createMergeIntervalOperatorInfo(ops[0], pExprInfo, num, pResBlock, &interval, tsSlotId, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL == type) {
    int32_t children = 0;
    pOptr = createStreamFinalIntervalOperatorInfo(ops[0], pPhyNode, pTaskInfo, children);
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL == type) {
    int32_t children = 1;
    pOptr = createStreamFinalIntervalOperatorInfo(ops[0], pPhyNode, pTaskInfo, children);
  } else if (QUERY_NODE_PHYSICAL_PLAN_SORT == type) {
    pOptr = createSortOperatorInfo(ops[0], (SSortPhysiNode*)pPhyNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_GROUP_SORT == type) {
    pOptr = createGroupSortOperatorInfo(ops[0], (SGroupSortPhysiNode*)pPhyNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE == type) {
    SMergePhysiNode* pMergePhyNode = (SMergePhysiNode*)pPhyNode;
    pOptr = createMultiwayMergeOperatorInfo(ops, size, pMergePhyNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION == type) {
    SSessionWinodwPhysiNode* pSessionNode = (SSessionWinodwPhysiNode*)pPhyNode;

    STimeWindowAggSupp as = {.waterMark = pSessionNode->window.watermark,
                             .calTrigger = pSessionNode->window.triggerType};

    SExprInfo*   pExprInfo = createExprInfo(pSessionNode->window.pFuncs, NULL, &num);
    SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
    int32_t      tsSlotId = ((SColumnNode*)pSessionNode->window.pTspk)->slotId;

    pOptr = createSessionAggOperatorInfo(ops[0], pExprInfo, num, pResBlock, pSessionNode->gap, tsSlotId, &as,
                                         pPhyNode->pConditions, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION == type) {
    pOptr = createStreamSessionAggOperatorInfo(ops[0], pPhyNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION == type) {
    int32_t children = 0;
    pOptr = createStreamFinalSessionAggOperatorInfo(ops[0], pPhyNode, pTaskInfo, children);
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION == type) {
    int32_t children = 1;
    pOptr = createStreamFinalSessionAggOperatorInfo(ops[0], pPhyNode, pTaskInfo, children);
  } else if (QUERY_NODE_PHYSICAL_PLAN_PARTITION == type) {
    pOptr = createPartitionOperatorInfo(ops[0], (SPartitionPhysiNode*)pPhyNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE == type) {
    SStateWinodwPhysiNode* pStateNode = (SStateWinodwPhysiNode*)pPhyNode;

    STimeWindowAggSupp as = {.waterMark = pStateNode->window.watermark, .calTrigger = pStateNode->window.triggerType};

    SExprInfo*   pExprInfo = createExprInfo(pStateNode->window.pFuncs, NULL, &num);
    SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
    int32_t      tsSlotId = ((SColumnNode*)pStateNode->window.pTspk)->slotId;

    SColumnNode* pColNode = (SColumnNode*)((STargetNode*)pStateNode->pStateKey)->pExpr;
    SColumn      col = extractColumnFromColumnNode(pColNode);
    pOptr = createStatewindowOperatorInfo(ops[0], pExprInfo, num, pResBlock, &as, tsSlotId, &col, pPhyNode->pConditions,
                                          pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE == type) {
    pOptr = createStreamStateAggOperatorInfo(ops[0], pPhyNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN == type) {
    pOptr = createMergeJoinOperatorInfo(ops, size, (SJoinPhysiNode*)pPhyNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_FILL == type) {
    pOptr = createFillOperatorInfo(ops[0], (SFillPhysiNode*)pPhyNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC == type) {
    pOptr = createIndefinitOutputOperatorInfo(ops[0], pPhyNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC == type) {
    pOptr = createTimeSliceOperatorInfo(ops[0], pPhyNode, pTaskInfo);
  } else {
    ASSERT(0);
  }

  taosMemoryFree(ops);
  return pOptr;
}
H
Haojun Liao 已提交
4627

H
Haojun Liao 已提交
4628
/*
 * Build an array of SColumn descriptors from a list of target nodes.
 *
 * Targets whose expression is a column node are converted with
 * extractColumnFromColumnNode(). Targets whose expression is a value node get a
 * descriptor whose slot id and column id are both the target's slot id and whose
 * type/bytes/scale/precision come from the value's result type. Targets with any other
 * expression kind are skipped.
 *
 * @param pNodeList  list of STargetNode entries
 * @return a newly created array of SColumn, or NULL (terrno = TSDB_CODE_OUT_OF_MEMORY)
 *         when the array cannot be created
 */
SArray* extractColumnInfo(SNodeList* pNodeList) {
  size_t  numOfCols = LIST_LENGTH(pNodeList);
  SArray* pList = taosArrayInit(numOfCols, sizeof(SColumn));
  if (pList == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  for (int32_t i = 0; i < (int32_t)numOfCols; ++i) {
    STargetNode* pNode = (STargetNode*)nodesListGetNode(pNodeList, i);

    if (nodeType(pNode->pExpr) == QUERY_NODE_COLUMN) {
      SColumnNode* pColNode = (SColumnNode*)pNode->pExpr;

      SColumn c = extractColumnFromColumnNode(pColNode);
      taosArrayPush(pList, &c);
    } else if (nodeType(pNode->pExpr) == QUERY_NODE_VALUE) {
      SValueNode* pValNode = (SValueNode*)pNode->pExpr;
      SColumn     c = {0};
      c.slotId = pNode->slotId;
      c.colId = pNode->slotId;
      // The data type lives in resType, next to bytes/scale/precision; node.type is the
      // node kind (always QUERY_NODE_VALUE in this branch) and must not be used here.
      c.type = pValNode->node.resType.type;
      c.bytes = pValNode->node.resType.bytes;
      c.scale = pValNode->node.resType.scale;
      c.precision = pValNode->node.resType.precision;

      taosArrayPush(pList, &c);
    }
  }

  return pList;
}

L
Liu Jicong 已提交
4661 4662
/*
 * Create a tsdb reader for the tables selected by a table-scan plan node.
 *
 * The table list is resolved into `pTableListInfo`, a query condition is derived from
 * the scan node and a reader is opened over the resulting tables.
 *
 * @param pTableScanNode  table-scan plan node describing the scan
 * @param pHandle         read handle providing the meta and vnode objects
 * @param pTableListInfo  receives the list of tables to read
 * @param idstr           identifier string used for logging and handed to the reader
 * @return the opened reader, or NULL. On NULL, terrno holds the failure code; it is 0
 *         when no table qualifies for the query.
 */
STsdbReader* doCreateDataReader(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle,
                                STableListInfo* pTableListInfo, const char* idstr) {
  STsdbReader*        pReader = NULL;
  SQueryTableDataCond cond = {0};

  int32_t code = getTableList(pHandle->meta, pHandle->vnode, &pTableScanNode->scan, pTableListInfo);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  if (taosArrayGetSize(pTableListInfo->pTableList) == 0) {
    code = 0;
    // idstr may be NULL (rebuildReader passes NULL); never hand NULL to "%s".
    qDebug("no table qualified for query, %s", (idstr != NULL) ? idstr : "");
    goto _error;
  }

  code = initQueryTableDataCond(&cond, pTableScanNode);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  code = tsdbReaderOpen(pHandle->vnode, &cond, pTableListInfo->pTableList, &pReader, idstr);

  // The condition is released on both outcomes; it used to leak when the open failed.
  cleanupQueryTableDataCond(&cond);

  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  return pReader;

_error:
  terrno = code;
  return NULL;
}

L
Liu Jicong 已提交
4695 4696 4697 4698 4699 4700 4701 4702 4703 4704 4705 4706 4707
/*
 * Walk down a single-child operator chain until the stream-scan operator is reached and
 * return the info object of the table-scan operator embedded in it.
 *
 * @param pOperator  operator to start from
 * @param ppInfo     receives the table-scan info on success
 * @return 0 on success; TSDB_CODE_QRY_APP_ERROR when the chain ends without a stream
 *         scan or an operator on the way has more than one downstream
 */
static int32_t extractTbscanInStreamOpTree(SOperatorInfo* pOperator, STableScanInfo** ppInfo) {
  SOperatorInfo* pCur = pOperator;

  // Descend through the only downstream of every non-stream-scan operator.
  while (pCur->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
    if (pCur->numOfDownstream == 0) {
      qError("failed to find stream scan operator");
      return TSDB_CODE_QRY_APP_ERROR;
    }

    if (pCur->numOfDownstream > 1) {
      qError("join not supported for stream block scan");
      return TSDB_CODE_QRY_APP_ERROR;
    }

    pCur = pCur->pDownstream[0];
  }

  SStreamScanInfo* pStreamScan = pCur->info;
  ASSERT(pStreamScan->pTableScanOp->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN);
  *ppInfo = pStreamScan->pTableScanOp->info;
  return 0;
}

4715 4716 4717 4718 4719 4720 4721 4722 4723 4724 4725 4726 4727 4728 4729 4730 4731 4732 4733 4734 4735 4736
/*
 * Follow a chain of single-child plan nodes down to its leaf and return that leaf as a
 * table-scan node.
 *
 * @param pNode   plan node to start from
 * @param ppNode  receives the table-scan node on success
 * @return 0 on success; -1 (terrno = TSDB_CODE_QRY_APP_ERROR) when a node on the way has
 *         more than one child or the leaf is not a table scan
 */
int32_t extractTableScanNode(SPhysiNode* pNode, STableScanPhysiNode** ppNode) {
  SPhysiNode* pCur = pNode;

  // Descend while the current node still has children; exactly one child is allowed.
  while (!(pCur->pChildren == NULL || LIST_LENGTH(pCur->pChildren) == 0)) {
    if (LIST_LENGTH(pCur->pChildren) != 1) {
      ASSERT(0);
      terrno = TSDB_CODE_QRY_APP_ERROR;
      return -1;
    }

    pCur = (SPhysiNode*)nodesListGetNode(pCur->pChildren, 0);
  }

  if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN != pCur->type) {
    ASSERT(0);
    terrno = TSDB_CODE_QRY_APP_ERROR;
    return -1;
  }

  *ppNode = (STableScanPhysiNode*)pCur;
  return 0;
}

L
Liu Jicong 已提交
4737 4738 4739 4740 4741
/*
 * Replace the data reader of the table scan that feeds a stream operator tree.
 *
 * The table-scan info is located below `pOperator`, the table-scan plan node is located
 * below `plan->pNode`, the current reader is closed and a new one is created from the
 * plan node.
 *
 * @param pOperator  root of the stream operator tree
 * @param plan       sub-plan whose node chain ends in the table scan
 * @param pHandle    read handle used to create the new reader
 * @param uid        currently unused (see TODO below)
 * @param ts         currently unused (see TODO below)
 * @return 0 on success; -1 when the table-scan info or plan node cannot be located;
 *         TSDB_CODE_QRY_APP_ERROR when the new reader cannot be created
 */
int32_t rebuildReader(SOperatorInfo* pOperator, SSubplan* plan, SReadHandle* pHandle, int64_t uid, int64_t ts) {
  STableScanInfo* pTableScanInfo = NULL;
  // Any non-zero result is a failure; do not depend on the sign of the error code.
  if (extractTbscanInStreamOpTree(pOperator, &pTableScanInfo) != 0) {
    return -1;
  }

  STableScanPhysiNode* pNode = NULL;
  if (extractTableScanNode(plan->pNode, &pNode) < 0) {
    ASSERT(0);
    // With assertions compiled out, continuing would pass a NULL plan node to the reader
    // factory. Bail out before the current reader is closed.
    return -1;
  }

  tsdbReaderClose(pTableScanInfo->dataReader);

  STableListInfo info = {0};
  pTableScanInfo->dataReader = doCreateDataReader(pNode, pHandle, &info, NULL);
  if (pTableScanInfo->dataReader == NULL) {
    ASSERT(0);
    qError("failed to create data reader");
    return TSDB_CODE_QRY_APP_ERROR;
  }
  // TODO: set uid and ts to data reader
  return 0;
}

C
Cary Xu 已提交
4761
/*
 * Serialise the result rows of `ops` and of all its downstream operators into one buffer.
 *
 * Layout of *result: a leading int32_t holding the total buffer size (including that
 * header itself), followed by the encoded payload of every operator that produced data,
 * in visiting order (the operator first, then its downstreams).
 *
 * @param ops           operator to encode; operators without an encodeResultRow callback
 *                      contribute nothing themselves
 * @param result        in/out buffer; allocated on first use and grown afterwards. It is
 *                      freed and set to NULL when an encode callback fails or growing it
 *                      fails
 * @param length        receives the total size stored in the buffer header
 * @param nOptrWithVal  incremented once for every operator that produced a payload
 * @return TDB_CODE_SUCCESS, TSDB_CODE_TSC_INVALID_INPUT when an output pointer is NULL,
 *         TSDB_CODE_OUT_OF_MEMORY, or the code returned by an encode callback
 */
int32_t encodeOperator(SOperatorInfo* ops, char** result, int32_t* length, int32_t* nOptrWithVal) {
  if (ops->fpSet.encodeResultRow) {
    if (result == NULL || length == NULL || nOptrWithVal == NULL) {
      return TSDB_CODE_TSC_INVALID_INPUT;
    }

    char*   pEncoded = NULL;
    int32_t encodedLen = 0;
    int32_t rc = ops->fpSet.encodeResultRow(ops, &pEncoded, &encodedLen);
    if (rc != TDB_CODE_SUCCESS) {
      // Drop whatever has been accumulated so far.
      if (*result != NULL) {
        taosMemoryFree(*result);
        *result = NULL;
      }
      return rc;
    }

    if (encodedLen == 0) {
      // Nothing to append for this operator; only the downstreams are visited.
      ASSERT(!pEncoded);
    } else {
      ++(*nOptrWithVal);

      ASSERT(encodedLen >= 0);

      if (*result == NULL) {
        // First payload: allocate header + payload and record the total size.
        *result = (char*)taosMemoryCalloc(1, encodedLen + sizeof(int32_t));
        if (*result == NULL) {
          taosMemoryFree(pEncoded);
          return TSDB_CODE_OUT_OF_MEMORY;
        }
        memcpy(*result + sizeof(int32_t), pEncoded, encodedLen);
        *(int32_t*)(*result) = encodedLen + sizeof(int32_t);
      } else {
        // Later payloads: grow the buffer and append behind the bytes already used.
        int32_t usedLen = *(int32_t*)(*result);
        char*   pGrown = (char*)taosMemoryRealloc(*result, usedLen + encodedLen);
        if (pGrown == NULL) {
          taosMemoryFree(pEncoded);
          taosMemoryFree(*result);
          *result = NULL;
          return TSDB_CODE_OUT_OF_MEMORY;
        }
        *result = pGrown;
        memcpy(pGrown + usedLen, pEncoded, encodedLen);
        *(int32_t*)pGrown += encodedLen;
      }

      taosMemoryFree(pEncoded);
      *length = *(int32_t*)(*result);
    }
  }

  for (int32_t idx = 0; idx < ops->numOfDownstream; ++idx) {
    int32_t rc = encodeOperator(ops->pDownstream[idx], result, length, nOptrWithVal);
    if (rc != TDB_CODE_SUCCESS) {
      return rc;
    }
  }

  return TDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
4821
/*
 * Restore the result rows of `ops` and of its downstream operators from a buffer.
 *
 * The buffer starts with an int32_t total length, followed by the payload of the current
 * operator; the first int32_t of that payload is read as the payload's own length. After
 * an operator has consumed its payload, the buffer position is advanced and a new
 * total-length header is written in place, directly in front of the next payload.
 *
 * NOTE(review): the buffer is written through although `result` is declared const, so
 * the caller's memory is modified.
 *
 * @param ops     operator to decode into; operators without a decodeResultRow callback
 *                pass the buffer on to their downstreams unchanged
 * @param result  encoded buffer; must not be NULL when the operator has a decode callback
 * @param length  total length expected in the buffer header
 * @return TDB_CODE_SUCCESS, TSDB_CODE_TSC_INVALID_INPUT when `result` is NULL for an
 *         operator that needs it, or the code returned by a decode callback
 */
int32_t decodeOperator(SOperatorInfo* ops, const char* result, int32_t length) {
  if (ops->fpSet.decodeResultRow) {
    if (NULL == result) {
      return TSDB_CODE_TSC_INVALID_INPUT;
    }

    ASSERT(length == *(int32_t*)result);

    const char* payload = result + sizeof(int32_t);
    int32_t     rc = ops->fpSet.decodeResultRow(ops, (char*)payload);
    if (rc != TDB_CODE_SUCCESS) {
      return rc;
    }

    int32_t totalLength = *(int32_t*)result;
    int32_t dataLength = *(int32_t*)payload;

    if (totalLength == dataLength + sizeof(int32_t)) {  // the last data
      result = NULL;
      length = 0;
    } else {
      // Step over the consumed payload and re-create the header for what is left.
      int32_t remaining = totalLength - dataLength;
      result += dataLength;
      *(int32_t*)(result) = remaining;
      length = remaining;
    }
  }

  for (int32_t idx = 0; idx < ops->numOfDownstream; ++idx) {
    int32_t rc = decodeOperator(ops->pDownstream[idx], result, length);
    if (rc != TDB_CODE_SUCCESS) {
      return rc;
    }
  }

  return TDB_CODE_SUCCESS;
}

D
dapan1121 已提交
4858
/*
 * Create the parameter object required by a data sink node.
 *
 * - Query-insert sinks get an SInserterParam that carries the read handle.
 * - Delete sinks get an SDeleterParam holding the task's super-table uid and the uid of
 *   every table in the task's table list.
 * - For every other sink type *pParam is left untouched.
 *
 * @param pNode       data sink plan node
 * @param pParam      receives the newly allocated parameter object
 * @param pTaskInfo   pointer to the task handle
 * @param readHandle  read handle stored in the inserter parameter
 * @return TSDB_CODE_SUCCESS or TSDB_CODE_OUT_OF_MEMORY
 */
int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, qTaskInfo_t* pTaskInfo, SReadHandle* readHandle) {
  SExecTaskInfo* pTask = *(SExecTaskInfo**)pTaskInfo;

  if (QUERY_NODE_PHYSICAL_PLAN_QUERY_INSERT == pNode->type) {
    SInserterParam* pInserter = taosMemoryCalloc(1, sizeof(SInserterParam));
    if (pInserter == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    pInserter->readHandle = readHandle;
    *pParam = pInserter;
    return TSDB_CODE_SUCCESS;
  }

  if (QUERY_NODE_PHYSICAL_PLAN_DELETE == pNode->type) {
    SDeleterParam* pDeleter = taosMemoryCalloc(1, sizeof(SDeleterParam));
    if (pDeleter == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    STableListInfo* pTables = &pTask->tableqinfoList;
    int32_t         tbNum = taosArrayGetSize(pTables->pTableList);

    pDeleter->suid = pTables->suid;
    pDeleter->pUidList = taosArrayInit(tbNum, sizeof(uint64_t));
    if (pDeleter->pUidList == NULL) {
      taosMemoryFree(pDeleter);
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    // Copy the uid of every table known to the task.
    for (int32_t idx = 0; idx < tbNum; ++idx) {
      STableKeyInfo* pKey = taosArrayGet(pTables->pTableList, idx);
      taosArrayPush(pDeleter->pUidList, &pKey->uid);
    }

    *pParam = pDeleter;
    return TSDB_CODE_SUCCESS;
  }

  // Other sink types need no parameter object.
  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
4899
/*
 * Create an execution task for a sub-plan and build its operator tree.
 *
 * @param pPlan      sub-plan to execute; supplies the query id, database name, tag
 *                   conditions, plan root and user
 * @param pTaskInfo  receives the new task; set to NULL when the function fails
 * @param pHandle    read handle forwarded to the operator builders
 * @param taskId     task id
 * @param sql        SQL text stored in the task
 * @param model      execution model of the task
 * @return TSDB_CODE_SUCCESS, or an error code (also stored in terrno). A failure is never
 *         reported as success: when the operator tree cannot be built and no error code
 *         was recorded in the task, terrno or TSDB_CODE_QRY_APP_ERROR is returned.
 */
int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId,
                               const char* sql, EOPTR_EXEC_MODEL model) {
  uint64_t queryId = pPlan->id.queryId;

  int32_t code = TSDB_CODE_SUCCESS;
  *pTaskInfo = createExecTaskInfo(queryId, taskId, model, pPlan->dbFName);
  if (*pTaskInfo == NULL) {
    code = TSDB_CODE_QRY_OUT_OF_MEMORY;
    goto _complete;
  }

  (*pTaskInfo)->sql = sql;
  (*pTaskInfo)->tableqinfoList.pTagCond = pPlan->pTagCond;
  (*pTaskInfo)->tableqinfoList.pTagIndexCond = pPlan->pTagIndexCond;
  (*pTaskInfo)->pRoot = createOperatorTree(pPlan->pNode, *pTaskInfo, pHandle, queryId, taskId,
                                           &(*pTaskInfo)->tableqinfoList, pPlan->user);

  if (NULL == (*pTaskInfo)->pRoot) {
    code = (*pTaskInfo)->code;
    if (code == TSDB_CODE_SUCCESS) {
      // Some builder paths return NULL without recording a reason. Returning success
      // here would hand the caller a NULL task together with a success code.
      code = (terrno != TSDB_CODE_SUCCESS) ? terrno : TSDB_CODE_QRY_APP_ERROR;
    }
    goto _complete;
  }

  return code;

_complete:
  // NOTE(review): only the task struct itself is released here; resources attached to it
  // during the partial build (task id string, table list, schema info) look like they
  // are not released on this path - confirm ownership before switching to doDestroyTask.
  taosMemoryFreeClear(*pTaskInfo);
  terrno = code;
  return code;
}

wmmhello's avatar
wmmhello 已提交
4929 4930 4931
/*
 * Release every container owned by a table list: the per-group arrays (when the list is
 * sorted by group id), the group list, the table array and the uid map. All three
 * container pointers are reset to NULL afterwards.
 *
 * @param pTableqinfoList  table list whose containers are destroyed; the struct itself is
 *                         not freed
 */
static void doDestroyTableList(STableListInfo* pTableqinfoList) {
  // Destroy the groups while pTableList is still alive: a group entry may be the table
  // array itself, and the comparison below must not look at an already freed pointer.
  if (pTableqinfoList->needSortTableByGroupId) {
    int32_t numOfGroups = (int32_t)taosArrayGetSize(pTableqinfoList->pGroupList);
    for (int32_t i = 0; i < numOfGroups; i++) {
      SArray* pGroup = taosArrayGetP(pTableqinfoList->pGroupList, i);
      if (pGroup == pTableqinfoList->pTableList) {
        continue;  // released once, together with pTableList below
      }
      taosArrayDestroy(pGroup);
    }
  }
  taosArrayDestroy(pTableqinfoList->pGroupList);

  taosArrayDestroy(pTableqinfoList->pTableList);
  taosHashCleanup(pTableqinfoList->map);

  // Leave no dangling pointers behind; pGroupList used to keep its stale value.
  pTableqinfoList->pGroupList = NULL;
  pTableqinfoList->pTableList = NULL;
  pTableqinfoList->map = NULL;
}

L
Liu Jicong 已提交
4947
/**
 * Tear down an execution task and free the task object itself.
 *
 * In order: releases the task's table list, destroys the operator tree rooted
 * at pRoot, cleans up the table schema info, then frees the sql string, the
 * id string and finally the task structure.
 *
 * @param pTaskInfo task to destroy; NULL is accepted and ignored. The pointer
 *                  must not be used by the caller after this call.
 */
void doDestroyTask(SExecTaskInfo* pTaskInfo) {
  if (pTaskInfo == NULL) {
    return;
  }

  qDebug("%s execTask is freed", GET_TASKID(pTaskInfo));

  doDestroyTableList(&pTaskInfo->tableqinfoList);
  destroyOperatorInfo(pTaskInfo->pRoot);
  cleanupTableSchemaInfo(pTaskInfo);

  taosMemoryFreeClear(pTaskInfo->sql);
  taosMemoryFreeClear(pTaskInfo->id.str);
  // Only clears the local copy of the pointer; callers keep their own copy.
  taosMemoryFreeClear(pTaskInfo);
}

/**
 * Copy one tag value into a result buffer slot.
 *
 * @param output destination slot
 * @param val    source value, or NULL to store a null marker via setNull()
 * @param type   data type of the value
 * @param bytes  size of the destination slot in bytes
 */
static void doSetTagValueToResultBuf(char* output, const char* val, int16_t type, int16_t bytes) {
  if (val == NULL) {
    setNull(output, type, bytes);
    return;
  }

  // Fixed-length types are copied verbatim.
  if (!IS_VAR_DATA_TYPE(type)) {
    memcpy(output, val, bytes);
    return;
  }

  // The whole variable-length value (header included) fits into the slot.
  if (varDataTLen(val) <= bytes) {
    varDataCopy(output, val);
    return;
  }

  // The value is longer than the slot (the original author noted that the
  // cause of such overflow is unknown): keep only what fits after the header.
  int32_t payloadCapacity = bytes - VARSTR_HEADER_SIZE;
  int32_t copyLen = payloadCapacity;
  if (varDataLen(val) <= payloadCapacity) {
    copyLen = varDataLen(val);
  }

  memcpy(varDataVal(output), varDataVal(val), copyLen);
  varDataSetLen(output, copyLen);
}

/**
 * Estimate the query buffer needed for a number of tables.
 *
 * The estimate is 1.5 * sizeof(STableQueryInfo) per table, truncated to an
 * integer.
 *
 * NOTE(review): a commented-out line in the original referenced
 * sizeof(STableCheckInfo) as "buffer consumption in tsdb"; that amount is not
 * part of this estimate.
 *
 * @param numOfTables number of tables taking part in the query
 * @return estimated size in bytes
 */
static int64_t getQuerySupportBufSize(size_t numOfTables) {
  const size_t perTableBytes = sizeof(STableQueryInfo);
  return (int64_t)(perTableBytes * 1.5 * numOfTables);
}

/**
 * Try to reserve query buffer for the given number of tables.
 *
 * tsQueryBufferSizeBytes is used as the remaining budget:
 *   - negative: no accounting is done, the call always succeeds;
 *   - zero:     query processing is disabled, the call always fails;
 *   - positive: the estimated size is subtracted with a compare-and-swap
 *               that is retried until it succeeds or the budget is too small.
 *
 * @param numOfTables number of tables taking part in the query
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_QRY_NOT_ENOUGH_BUFFER when the
 *         reservation cannot be made
 */
int32_t checkForQueryBuf(size_t numOfTables) {
  int64_t required = getQuerySupportBufSize(numOfTables);

  if (tsQueryBufferSizeBytes < 0) {
    return TSDB_CODE_SUCCESS;
  }

  // disable query processing if the value of tsQueryBufferSize is zero.
  if (tsQueryBufferSizeBytes <= 0) {
    return TSDB_CODE_QRY_NOT_ENOUGH_BUFFER;
  }

  for (;;) {
    int64_t available = tsQueryBufferSizeBytes;
    int64_t left = available - required;
    if (left < 0) {
      return TSDB_CODE_QRY_NOT_ENOUGH_BUFFER;
    }

    // Commit only if nobody changed the budget since it was sampled.
    if (atomic_val_compare_exchange_64(&tsQueryBufferSizeBytes, available, left) == available) {
      return TSDB_CODE_SUCCESS;
    }
  }
}

/**
 * Give back the query buffer estimated for the given number of tables.
 *
 * Nothing is done when tsQueryBufferSizeBytes is negative; otherwise the
 * estimated size is atomically added back to it.
 *
 * @param numOfTables number of tables the estimate is computed for
 */
void releaseQueryBuf(size_t numOfTables) {
  if (tsQueryBufferSizeBytes >= 0) {
    int64_t bytes = getQuerySupportBufSize(numOfTables);
    atomic_add_fetch_64(&tsQueryBufferSizeBytes, bytes);
  }
}
D
dapan1121 已提交
5018

dengyihao's avatar
dengyihao 已提交
5019 5020
/**
 * Collect explain/execution statistics for an operator and, recursively, for
 * all of its downstream operators.
 *
 * One SExplainExecInfo entry is appended to the array *pRes per operator, the
 * current operator first and then each downstream operator in index order.
 * The array grows in steps of 10 entries when it is full.
 *
 * @param operatorInfo operator to report on
 * @param pRes         in/out: result array; may be reallocated
 * @param capacity     in/out: number of entries *pRes can hold
 * @param resNum       in/out: number of entries already filled
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_QRY_OUT_OF_MEMORY when the array cannot
 *         grow, or the error code reported by a getExplainFn callback.
 *
 * On an allocation failure or a downstream failure *pRes is freed and set to
 * NULL. When this operator's own getExplainFn fails, *pRes is left untouched
 * and *resNum is not incremented.
 */
int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SExplainExecInfo** pRes, int32_t* capacity,
                                   int32_t* resNum) {
  if (*resNum >= *capacity) {
    int32_t newCapacity = *capacity + 10;

    // Keep the old pointer until the reallocation is known to have succeeded,
    // otherwise the existing buffer would be leaked on failure.
    SExplainExecInfo* pNew = taosMemoryRealloc(*pRes, newCapacity * sizeof(SExplainExecInfo));
    if (NULL == pNew) {
      qError("malloc %d failed", newCapacity * (int32_t)sizeof(SExplainExecInfo));
      taosMemoryFreeClear(*pRes);
      return TSDB_CODE_QRY_OUT_OF_MEMORY;
    }

    *pRes = pNew;
    *capacity = newCapacity;
  }

  SExplainExecInfo* pInfo = &(*pRes)[*resNum];

  pInfo->numOfRows = operatorInfo->resultInfo.totalRows;
  pInfo->startupCost = operatorInfo->cost.openCost;
  pInfo->totalCost = operatorInfo->cost.totalCost;

  if (operatorInfo->fpSet.getExplainFn) {
    int32_t code = operatorInfo->fpSet.getExplainFn(operatorInfo, &pInfo->verboseInfo, &pInfo->verboseLen);
    if (code) {
      qError("%s operator getExplainFn failed, code:%s", GET_TASKID(operatorInfo->pTaskInfo), tstrerror(code));
      return code;
    }
  } else {
    pInfo->verboseLen = 0;
    pInfo->verboseInfo = NULL;
  }

  ++(*resNum);

  for (int32_t i = 0; i < operatorInfo->numOfDownstream; ++i) {
    int32_t code = getOperatorExplainExecInfo(operatorInfo->pDownstream[i], pRes, capacity, resNum);
    if (code) {
      taosMemoryFreeClear(*pRes);
      // Propagate the real cause instead of always reporting out-of-memory.
      return code;
    }
  }

  return TSDB_CODE_SUCCESS;
}
5
54liuyao 已提交
5061

L
Liu Jicong 已提交
5062
/**
 * Initialize the aggregate supporter used by stream operators.
 *
 * Sets the result-row size, key size and value size, allocates the key
 * buffer, the result-row hash table and the scan-window array, and creates
 * the disk-based result buffer. On success, every function context in pCtx is
 * pointed at that result buffer.
 *
 * @param pSup        supporter to initialize
 * @param pKey        key passed to createDiskbasedBuf for the result buffer
 * @param pCtx        array of function contexts
 * @param numOfOutput number of entries in pCtx
 * @param size        stored as pSup->valueSize
 * @return TSDB_CODE_OUT_OF_MEMORY when an allocation fails, otherwise the
 *         code returned by createDiskbasedBuf.
 *
 * Members that were already allocated stay in pSup when the function fails.
 * NOTE(review): presumably the caller's teardown of pSup releases them; confirm.
 */
int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, const char* pKey, SqlFunctionCtx* pCtx, int32_t numOfOutput,
                               int32_t size) {
  pSup->resultRowSize = getResultRowSize(pCtx, numOfOutput);
  pSup->keySize = sizeof(int64_t) + sizeof(TSKEY);
  pSup->pKeyBuf = taosMemoryCalloc(1, pSup->keySize);
  _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
  pSup->pResultRows = taosHashInit(1024, hashFn, false, HASH_NO_LOCK);
  if (pSup->pKeyBuf == NULL || pSup->pResultRows == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  pSup->valueSize = size;

  pSup->pScanWindow = taosArrayInit(4, sizeof(STimeWindow));
  if (pSup->pScanWindow == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  // Page size: smallest power-of-two multiple of 4096 that is at least four
  // result rows large.
  int32_t pageSize = 4096;
  while (pageSize < pSup->resultRowSize * 4) {
    pageSize <<= 1u;
  }
  // at least four pages need to be in buffer
  int32_t bufSize = 4096 * 256;
  if (bufSize <= pageSize) {
    bufSize = pageSize * 4;
  }

  int32_t code = createDiskbasedBuf(&pSup->pResultBuf, pageSize, bufSize, pKey, TD_TMP_DIR_PATH);
  if (code != TSDB_CODE_SUCCESS) {
    // Do not hand a result buffer that failed to initialize to the contexts.
    return code;
  }

  for (int32_t i = 0; i < numOfOutput; ++i) {
    pCtx[i].pBuf = pSup->pResultBuf;
  }
  return code;
}