executorimpl.c 181.4 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
15

H
Haojun Liao 已提交
16 17
#include "filter.h"
#include "function.h"
18 19
#include "functionMgt.h"
#include "os.h"
H
Haojun Liao 已提交
20
#include "querynodes.h"
21
#include "tfill.h"
dengyihao's avatar
dengyihao 已提交
22
#include "tname.h"
X
Xiaoyu Wang 已提交
23
#include "tref.h"
24

H
Haojun Liao 已提交
25
#include "tdatablock.h"
26
#include "tglobal.h"
H
Haojun Liao 已提交
27
#include "tmsg.h"
H
Haojun Liao 已提交
28
#include "tsort.h"
29
#include "ttime.h"
H
Haojun Liao 已提交
30

31
#include "executorimpl.h"
dengyihao's avatar
dengyihao 已提交
32
#include "index.h"
33
#include "query.h"
34 35
#include "tcompare.h"
#include "tcompression.h"
H
Haojun Liao 已提交
36
#include "thash.h"
37
#include "ttypes.h"
dengyihao's avatar
dengyihao 已提交
38
#include "vnode.h"
39

H
Haojun Liao 已提交
40
/* Scan-flag helpers operating on anything with a `scanFlag` member. */
#define IS_MAIN_SCAN(runtime)          ((runtime)->scanFlag == MAIN_SCAN)
#define SET_REVERSE_SCAN_FLAG(runtime) ((runtime)->scanFlag = REVERSE_SCAN)

/* Forward-step constant that matches the requested scan order. */
#define GET_FORWARD_DIRECTION_FACTOR(ord) \
  (((ord) == TSDB_ORDER_ASC) ? QUERY_ASC_FORWARD_STEP : QUERY_DESC_FORWARD_STEP)

/* Bit flags describing the retrieve state of a project operator. */
enum {
  PROJECT_RETRIEVE_CONTINUE = 1 << 0,
  PROJECT_RETRIEVE_DONE = 1 << 1,
};

50 51
#if 0
/*
 * Fault-injection allocators (compiled out). When enabled they shadow calloc/malloc/realloc:
 * u_malloc and u_calloc return NULL when taosRand() % 1000 == 0, u_realloc when taosRand() % 5 <= 1;
 * otherwise the request is forwarded to the taosMemory* allocator.
 */
static UNUSED_FUNC void* u_malloc(size_t size) {
  uint32_t r = taosRand();
  return (r % 1000 == 0) ? NULL : taosMemoryMalloc(size);
}

static UNUSED_FUNC void* u_calloc(size_t num, size_t size) {
  uint32_t r = taosRand();
  return (r % 1000 == 0) ? NULL : taosMemoryCalloc(num, size);
}

static UNUSED_FUNC void* u_realloc(void* p, size_t size) {
  uint32_t r = taosRand();
  return (r % 5 <= 1) ? NULL : taosMemoryRealloc(p, size);
}

#define calloc  u_calloc
#define malloc  u_malloc
#define realloc u_realloc
#endif

X
Xiaoyu Wang 已提交
84
/* Clear the bits `bits` in the `status` word of `qry` (a pointer). */
#define CLEAR_QUERY_STATUS(qry, bits) ((qry)->status &= ~(bits))
/* Non-zero when the query pointed to by `qry` has a positive interval length. */
#define QUERY_IS_INTERVAL_QUERY(qry) (0 < (qry)->interval.interval)

L
Liu Jicong 已提交
87 88 89
// Maximum idle duration in seconds: twice the shell activity timer.
int32_t getMaximumIdleDurationSec() {
  int32_t doubled = tsShellActivityTimer * 2;
  return doubled;
}

// Stub: asserts that pExprInfo wraps a unary expression node and always reports function id 0.
static int32_t getExprFunctionId(SExprInfo* pExprInfo) {
  assert(pExprInfo != NULL && pExprInfo->pExpr != NULL && pExprInfo->pExpr->nodeType == TEXPR_UNARYEXPR_NODE);

  const int32_t functionId = 0;
  return functionId;
}

static void doSetTagValueToResultBuf(char* output, const char* val, int16_t type, int16_t bytes);

96
static void setBlockStatisInfo(SqlFunctionCtx* pCtx, SExprInfo* pExpr, SSDataBlock* pSDataBlock);
97

X
Xiaoyu Wang 已提交
98
static void releaseQueryBuf(size_t numOfTables);
99 100 101 102 103

static void destroySFillOperatorInfo(void* param, int32_t numOfOutput);
static void destroyProjectOperatorInfo(void* param, int32_t numOfOutput);
static void destroyOrderOperatorInfo(void* param, int32_t numOfOutput);
static void destroyAggOperatorInfo(void* param, int32_t numOfOutput);
X
Xiaoyu Wang 已提交
104

H
Haojun Liao 已提交
105
static void destroyIntervalOperatorInfo(void* param, int32_t numOfOutput);
H
Haojun Liao 已提交
106 107
static void destroyExchangeOperatorInfo(void* param, int32_t numOfOutput);

108 109
static void destroyOperatorInfo(SOperatorInfo* pOperator);

110
// Mark pOperator as finished (OP_EXEC_DONE). When the operator is attached to a task, also record its total cost
// (current time minus the task start time, scaled by 1000 as in the original formula) and flag the task as
// TASK_COMPLETED.
void doSetOperatorCompleted(SOperatorInfo* pOperator) {
  pOperator->status = OP_EXEC_DONE;

  // pTaskInfo may be NULL; it must be checked before it is dereferenced for the cost computation.
  if (pOperator->pTaskInfo != NULL) {
    pOperator->cost.totalCost = (taosGetTimestampUs() - pOperator->pTaskInfo->cost.start * 1000) / 1000.0;
    setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED);
  }
}
118

H
Haojun Liao 已提交
119
// Open callback for operators with nothing to prepare: flag the operator as opened and record a zero open cost.
int32_t operatorDummyOpenFn(SOperatorInfo* pOperator) {
  OPTR_SET_OPENED(pOperator);
  pOperator->cost.openCost = 0;

  return TSDB_CODE_SUCCESS;
}

125
SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn, __optr_fn_t streamFn,
L
Liu Jicong 已提交
126
                                   __optr_fn_t cleanup, __optr_close_fn_t closeFn, __optr_encode_fn_t encode,
127
                                   __optr_decode_fn_t decode, __optr_explain_fn_t explain) {
128 129 130 131 132 133 134 135 136 137 138 139 140 141
  SOperatorFpSet fpSet = {
      ._openFn = openFn,
      .getNextFn = nextFn,
      .getStreamResFn = streamFn,
      .cleanupFn = cleanup,
      .closeFn = closeFn,
      .encodeResultRow = encode,
      .decodeResultRow = decode,
      .getExplainFn = explain,
  };

  return fpSet;
}

H
Haojun Liao 已提交
142
// Close callback for operators that own nothing: intentionally does nothing.
void operatorDummyCloseFn(void* param, int32_t numOfCols) {
  (void)param;
  (void)numOfCols;
}
H
Haojun Liao 已提交
143

X
Xiaoyu Wang 已提交
144 145 146
static int32_t doCopyToSDataBlock(SExecTaskInfo* taskInfo, SSDataBlock* pBlock, SExprInfo* pExprInfo,
                                  SDiskbasedBuf* pBuf, SGroupResInfo* pGroupResInfo, const int32_t* rowCellOffset,
                                  SqlFunctionCtx* pCtx, int32_t numOfExprs);
H
Haojun Liao 已提交
147

148
static void initCtxOutputBuffer(SqlFunctionCtx* pCtx, int32_t size);
L
Liu Jicong 已提交
149 150
static void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, SAggOperatorInfo* pAggInfo, int32_t numOfOutput,
                                     uint64_t groupId);
151

L
Liu Jicong 已提交
152 153
// Report whether a column may contain NULL values.
// Returns false for tag columns, user-defined columns and the primary timestamp column. For any other column it
// returns false only when block statistics are present and report zero NULLs; otherwise true.
static bool hasNull(SColumn* pColumn, SColumnDataAgg* pStatis) {
  bool neverNull = TSDB_COL_IS_TAG(pColumn->flag) || TSDB_COL_IS_UD_COL(pColumn->flag) ||
                   pColumn->colId == PRIMARYKEY_TIMESTAMP_COL_ID;
  if (neverNull) {
    return false;
  }

  return (pStatis == NULL) || (pStatis->numOfNull != 0);
}

166
#if 0
// (Disabled) Check whether a result row for the key (uid, pData) already exists.
static bool chkResultRowFromKey(STaskRuntimeEnv* pRuntimeEnv, SResultRowInfo* pResultRowInfo, char* pData,
                                int16_t bytes, bool masterscan, uint64_t uid) {
  SET_RES_WINDOW_KEY(pRuntimeEnv->keyBuf, pData, bytes, uid);
  SResultRow** p1 =
      (SResultRow**)taosHashGet(pRuntimeEnv->pResultRowHashTable, pRuntimeEnv->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes));
  bool inHash = (p1 != NULL);

  // non-interval queries, and interval queries outside the master scan, only need the hash lookup.
  // (the *p1 may be NULL in case of sliding+offset exists.)
  if (!QUERY_IS_INTERVAL_QUERY(pRuntimeEnv->pQueryAttr) || !masterscan) {
    return inHash;
  }

  // in case of repeat scan/reverse scan, no new time window added.
  if (!inHash || pResultRowInfo->size == 0 || pResultRowInfo->size == 1) {
    // size == 1: the comparison against pResultRowInfo->pResult[0] is currently disabled.
    return false;
  }

  // check if current pResultRowInfo contains the existed pResultRow
  SET_RES_EXT_WINDOW_KEY(pRuntimeEnv->keyBuf, pData, bytes, uid, pResultRowInfo);
  int64_t* index =
      taosHashGet(pRuntimeEnv->pResultRowListSet, pRuntimeEnv->keyBuf, GET_RES_EXT_WINDOW_KEY_LEN(bytes));
  return index != NULL;
}
#endif
204

205
/*
 * Reserve `interBufSize` bytes for a new SResultRow in the paged result buffer of group `tableGroupId`.
 *
 * The row is carved from the group's last page when that page still has room. Otherwise, or when the group owns no
 * page yet, the last page is released and a fresh page is requested. The chosen page is marked dirty, and the row
 * records its pageId/offset.
 *
 * Returns the new row, or NULL when no page could be obtained.
 */
SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int64_t tableGroupId, int32_t interBufSize) {
  SFilePage* pData = NULL;

  // in the first scan, new space needed for results
  int32_t pageId = -1;
  SIDList list = getDataBufPagesIdList(pResultBuf, tableGroupId);

  if (taosArrayGetSize(list) == 0) {
    pData = getNewBufPage(pResultBuf, tableGroupId, &pageId);
    if (pData != NULL) {  // getNewBufPage may fail; never dereference a NULL page
      pData->num = sizeof(SFilePage);
    }
  } else {
    SPageInfo* pi = getLastPageInfo(list);
    pData = getBufPage(pResultBuf, getPageId(pi));
    pageId = getPageId(pi);

    if (pData != NULL && pData->num + interBufSize > getBufPageSize(pResultBuf)) {
      // release current page first, and prepare the next one
      releaseBufPageInfo(pResultBuf, pi);

      pData = getNewBufPage(pResultBuf, tableGroupId, &pageId);
      if (pData != NULL) {
        pData->num = sizeof(SFilePage);
      }
    }
  }

  if (pData == NULL) {
    return NULL;
  }

  setBufPageDirty(pData, true);

  // the new row starts at the current fill position of the page
  SResultRow* pResultRow = (SResultRow*)((char*)pData + pData->num);
  pResultRow->pageId = pageId;
  pResultRow->offset = (int32_t)pData->num;

  pData->num += interBufSize;

  return pResultRow;
}

247 248 249 250 251 252 253
/**
 * the struct of key in hash table
 * +----------+---------------+
 * | group id |   key data    |
 * | 8 bytes  | actual length |
 * +----------+---------------+
 */
254 255 256
SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pResultRowInfo, char* pData,
                                   int16_t bytes, bool masterscan, uint64_t groupId, SExecTaskInfo* pTaskInfo,
                                   bool isIntervalQuery, SAggSupporter* pSup) {
257
  SET_RES_WINDOW_KEY(pSup->keyBuf, pData, bytes, groupId);
H
Haojun Liao 已提交
258

dengyihao's avatar
dengyihao 已提交
259 260
  SResultRowPosition* p1 =
      (SResultRowPosition*)taosHashGet(pSup->pResultRowHashTable, pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes));
H
Haojun Liao 已提交
261

262 263
  SResultRow* pResult = NULL;

H
Haojun Liao 已提交
264 265
  // in case of repeat scan/reverse scan, no new time window added.
  if (isIntervalQuery) {
266 267
    if (masterscan && p1 != NULL) {  // the *p1 may be NULL in case of sliding+offset exists.
      pResult = getResultRowByPos(pResultBuf, p1);
268
      ASSERT(pResult->pageId == p1->pageId && pResult->offset == p1->offset);
H
Haojun Liao 已提交
269 270
    }
  } else {
dengyihao's avatar
dengyihao 已提交
271 272
    // In case of group by column query, the required SResultRow object must be existInCurrentResusltRowInfo in the
    // pResultRowInfo object.
H
Haojun Liao 已提交
273
    if (p1 != NULL) {
274
      // todo
275
      pResult = getResultRowByPos(pResultBuf, p1);
276
      ASSERT(pResult->pageId == p1->pageId && pResult->offset == p1->offset);
H
Haojun Liao 已提交
277 278 279
    }
  }

L
Liu Jicong 已提交
280
  // 1. close current opened time window
281
  if (pResultRowInfo->cur.pageId != -1 && ((pResult == NULL) || (pResult->pageId != pResultRowInfo->cur.pageId))) {
282
#ifdef BUF_PAGE_DEBUG
wmmhello's avatar
wmmhello 已提交
283
    qDebug("page_1");
284
#endif
285
    SResultRowPosition pos = pResultRowInfo->cur;
X
Xiaoyu Wang 已提交
286
    SFilePage*         pPage = getBufPage(pResultBuf, pos.pageId);
287 288 289 290 291
    releaseBufPage(pResultBuf, pPage);
  }

  // allocate a new buffer page
  if (pResult == NULL) {
292
#ifdef BUF_PAGE_DEBUG
wmmhello's avatar
wmmhello 已提交
293
    qDebug("page_2");
294
#endif
H
Haojun Liao 已提交
295
    ASSERT(pSup->resultRowSize > 0);
296 297
    pResult = getNewResultRow(pResultBuf, groupId, pSup->resultRowSize);

298
    initResultRow(pResult);
H
Haojun Liao 已提交
299

300 301
    // add a new result set for a new group
    SResultRowPosition pos = {.pageId = pResult->pageId, .offset = pResult->offset};
X
Xiaoyu Wang 已提交
302 303
    taosHashPut(pSup->pResultRowHashTable, pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes), &pos,
                sizeof(SResultRowPosition));
H
Haojun Liao 已提交
304 305
  }

306 307 308
  // 2. set the new time window to be the new active time window
  pResultRowInfo->cur = (SResultRowPosition){.pageId = pResult->pageId, .offset = pResult->offset};

H
Haojun Liao 已提交
309
  // too many time window in query
310
  if (taosHashGetSize(pSup->pResultRowHashTable) > MAX_INTERVAL_TIME_WINDOW) {
H
Haojun Liao 已提交
311 312 313
    longjmp(pTaskInfo->env, TSDB_CODE_QRY_TOO_MANY_TIMEWINDOW);
  }

H
Haojun Liao 已提交
314
  return pResult;
H
Haojun Liao 已提交
315 316
}

317
// a new buffer page for each table. Needs to opt this design
L
Liu Jicong 已提交
318
static int32_t addNewWindowResultBuf(SResultRow* pWindowRes, SDiskbasedBuf* pResultBuf, int32_t tid, uint32_t size) {
319 320 321 322
  if (pWindowRes->pageId != -1) {
    return 0;
  }

L
Liu Jicong 已提交
323
  SFilePage* pData = NULL;
324 325 326 327 328 329

  // in the first scan, new space needed for results
  int32_t pageId = -1;
  SIDList list = getDataBufPagesIdList(pResultBuf, tid);

  if (taosArrayGetSize(list) == 0) {
H
Haojun Liao 已提交
330
    pData = getNewBufPage(pResultBuf, tid, &pageId);
331
    pData->num = sizeof(SFilePage);
332 333
  } else {
    SPageInfo* pi = getLastPageInfo(list);
334
    pData = getBufPage(pResultBuf, getPageId(pi));
335
    pageId = getPageId(pi);
336

337
    if (pData->num + size > getBufPageSize(pResultBuf)) {
338
      // release current page first, and prepare the next one
339
      releaseBufPageInfo(pResultBuf, pi);
340

H
Haojun Liao 已提交
341
      pData = getNewBufPage(pResultBuf, tid, &pageId);
342
      if (pData != NULL) {
343
        pData->num = sizeof(SFilePage);
344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363
      }
    }
  }

  if (pData == NULL) {
    return -1;
  }

  // set the number of rows in current disk page
  if (pWindowRes->pageId == -1) {  // not allocated yet, allocate new buffer
    pWindowRes->pageId = pageId;
    pWindowRes->offset = (int32_t)pData->num;

    pData->num += size;
    assert(pWindowRes->pageId >= 0);
  }

  return 0;
}

364
//  query_range_start, query_range_end, window_duration, window_start, window_end
365
void initExecTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pQueryWindow) {
366 367 368
  pColData->info.type = TSDB_DATA_TYPE_TIMESTAMP;
  pColData->info.bytes = sizeof(int64_t);

369
  colInfoDataEnsureCapacity(pColData, 5);
370 371 372 373 374 375 376 377 378
  colDataAppendInt64(pColData, 0, &pQueryWindow->skey);
  colDataAppendInt64(pColData, 1, &pQueryWindow->ekey);

  int64_t interval = 0;
  colDataAppendInt64(pColData, 2, &interval);  // this value may be variable in case of 'n' and 'y'.
  colDataAppendInt64(pColData, 3, &pQueryWindow->skey);
  colDataAppendInt64(pColData, 4, &pQueryWindow->ekey);
}

H
Haojun Liao 已提交
379 380 381 382
// Release the buffers held by a time-window column (e.g. one filled by initExecTimeWindowInfo).
void cleanupExecTimeWindowInfo(SColumnInfoData* pColData) { colDataDestroy(pColData); }

X
Xiaoyu Wang 已提交
383 384 385
// Run every function context over rows [offset, offset + forwardStep) of its current input.
//
// For each context the input window (startRowIndex/numOfRows) is narrowed temporarily. When only part of the block
// (forwardStep < numOfTotal) is processed, pre-computed block statistics are disabled.
// - Window pseudo-column functions compute their value from pTimeWindowData into the row's inter-result buffer.
// - Other functions run fpSet.process; a failure is logged, stored in taskInfo->code, and raised via longjmp.
// NOTE(review): only the non-pseudo path restores the saved input fields; pWin, tsCol and order are unused here.
void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, STimeWindow* pWin,
                      SColumnInfoData* pTimeWindowData, int32_t offset, int32_t forwardStep, TSKEY* tsCol,
                      int32_t numOfTotal, int32_t numOfOutput, int32_t order) {
  for (int32_t idx = 0; idx < numOfOutput; ++idx) {
    SqlFunctionCtx*       pOne = &pCtx[idx];
    SInputColumnInfoData* pIn = &pOne->input;

    // snapshot the input description so it can be put back afterwards
    bool    savedAggIsSet = pIn->colDataAggIsSet;
    int32_t savedNumOfRows = pIn->numOfRows;
    int32_t savedStartRow = pIn->startRowIndex;

    pIn->startRowIndex = offset;
    pIn->numOfRows = forwardStep;

    // statistics describe the whole block; they are invalid for a partial range
    if (forwardStep < numOfTotal) {
      pIn->colDataAggIsSet = false;
    }

    if (fmIsWindowPseudoColumnFunc(pOne->functionId)) {
      SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pOne);

      SColumnInfoData idata = {0};
      idata.info.type = TSDB_DATA_TYPE_BIGINT;
      idata.info.bytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes;
      idata.pData = GET_ROWCELL_INTERBUF(pEntryInfo);

      SScalarParam out = {.columnData = &idata};
      SScalarParam tw = {.numOfRows = 5, .columnData = pTimeWindowData};
      pOne->sfp.process(&tw, 1, &out);
      pEntryInfo->numOfRes = 1;
      continue;
    }

    if (functionNeedToExecute(pOne) && pOne->fpSet.process != NULL) {
      int32_t code = pOne->fpSet.process(pOne);
      if (code != TSDB_CODE_SUCCESS) {
        qError("%s apply functions error, code: %s", GET_TASKID(taskInfo), tstrerror(code));
        taskInfo->code = code;
        longjmp(taskInfo->env, code);
      }
    }

    pIn->colDataAggIsSet = savedAggIsSet;
    pIn->startRowIndex = savedStartRow;
    pIn->numOfRows = savedNumOfRows;
  }
}

dengyihao's avatar
dengyihao 已提交
436
// Forward declaration: attach the columns of pBlock to the input of each function context of pOperator.
static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order,
                                   int32_t scanFlag, bool createDummyCol);
438

dengyihao's avatar
dengyihao 已提交
439 440
// Lightweight input setup: for every expression, set the scan order and row count on its function context and
// hand the block to setBlockStatisInfo. No column data is bound here.
static void doSetInputDataBlockInfo(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock,
                                    int32_t order) {
  SExprInfo* pExprs = pOperator->exprSupp.pExprInfo;

  for (int32_t i = 0; i < pOperator->exprSupp.numOfExprs; ++i) {
    SqlFunctionCtx* pOne = pCtx + i;

    pOne->order = order;
    pOne->input.numOfRows = pBlock->info.rows;
    setBlockStatisInfo(pOne, pExprs + i, pBlock);
  }
}

X
Xiaoyu Wang 已提交
448 449
// Prepare every function context of pOperator to consume pBlock.
// Blocks without pre-computed aggregates (pBlockAgg == NULL) get their column data bound. Otherwise only the row
// count and block statistics are installed. The result of doSetInputDataBlock is not propagated.
void setInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order,
                       int32_t scanFlag, bool createDummyCol) {
  if (pBlock->pBlockAgg == NULL) {
    doSetInputDataBlock(pOperator, pCtx, pBlock, order, scanFlag, createDummyCol);
    return;
  }

  doSetInputDataBlockInfo(pOperator, pCtx, pBlock, order);
}

L
Liu Jicong 已提交
457 458
// Materialize the constant value parameter pFuncParam as a column of numOfRows identical values in
// pInput->pData[paramIndex].
// The column is allocated on first use (type/bytes taken from the parameter) and reused afterwards.
// BIGINT/UBIGINT, DOUBLE and VARCHAR constants are filled; other types only get capacity reserved.
// Returns TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY, or the error of colInfoDataEnsureCapacity.
static int32_t doCreateConstantValColumnInfo(SInputColumnInfoData* pInput, SFunctParam* pFuncParam, int32_t paramIndex,
                                             int32_t numOfRows) {
  SColumnInfoData* pColInfo = pInput->pData[paramIndex];
  if (pColInfo == NULL) {
    pColInfo = taosMemoryCalloc(1, sizeof(SColumnInfoData));
    if (pColInfo == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    // Set the correct column info (data type and bytes)
    pColInfo->info.type = pFuncParam->param.nType;
    pColInfo->info.bytes = pFuncParam->param.nLen;

    pInput->pData[paramIndex] = pColInfo;
  }

  int32_t code = colInfoDataEnsureCapacity(pColInfo, numOfRows);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  int8_t type = pFuncParam->param.nType;
  if (type == TSDB_DATA_TYPE_BIGINT || type == TSDB_DATA_TYPE_UBIGINT) {
    int64_t v = pFuncParam->param.i;
    for (int32_t i = 0; i < numOfRows; ++i) {
      colDataAppendInt64(pColInfo, i, &v);
    }
  } else if (type == TSDB_DATA_TYPE_DOUBLE) {
    double v = pFuncParam->param.d;
    for (int32_t i = 0; i < numOfRows; ++i) {
      colDataAppendDouble(pColInfo, i, &v);
    }
  } else if (type == TSDB_DATA_TYPE_VARCHAR) {
    // temporary var-string (length header + payload); colDataAppend copies it into the column on each call
    char* tmp = taosMemoryMalloc(pFuncParam->param.nLen + VARSTR_HEADER_SIZE);
    if (tmp == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    STR_WITH_SIZE_TO_VARSTR(tmp, pFuncParam->param.pz, pFuncParam->param.nLen);
    for (int32_t i = 0; i < numOfRows; ++i) {
      colDataAppend(pColInfo, i, tmp, false);
    }

    taosMemoryFree(tmp);  // previously leaked on every call
  }

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
499
// Bind pBlock as the input of every function context of pOperator.
// Column parameters point at the matching block column (by slotId). For timeline functions, the last column
// parameter is also recorded as pPTS; in case of merge function, this is not always the ts column data.
// When createDummyCol is set, a single-parameter expression whose parameter is a constant value gets that constant
// materialized as a column (see doCreateConstantValColumnInfo).
// Returns TSDB_CODE_SUCCESS or the first error from building a constant column.
static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order,
                                   int32_t scanFlag, bool createDummyCol) {
  int32_t code = TSDB_CODE_SUCCESS;

  for (int32_t i = 0; i < pOperator->exprSupp.numOfExprs; ++i) {
    SqlFunctionCtx*       pOneCtx = &pCtx[i];
    SInputColumnInfoData* pInput = &pOneCtx->input;
    SExprInfo*            pOneExpr = &pOperator->exprSupp.pExprInfo[i];

    pOneCtx->order = order;
    pOneCtx->pSrcBlock = pBlock;
    pOneCtx->scanFlag = scanFlag;

    pInput->numOfRows = pBlock->info.rows;
    pInput->uid = pBlock->info.uid;
    pInput->colDataAggIsSet = false;

    for (int32_t j = 0; j < pOneExpr->base.numOfParams; ++j) {
      SFunctParam* pFuncParam = &pOneExpr->base.pParam[j];

      if (pFuncParam->type == FUNC_PARAM_TYPE_COLUMN) {
        pInput->pData[j] = taosArrayGet(pBlock->pDataBlock, pFuncParam->pCol->slotId);
        pInput->totalRows = pBlock->info.rows;
        pInput->numOfRows = pBlock->info.rows;
        pInput->startRowIndex = 0;

        // NOTE: the last parameter is the primary timestamp column
        // todo: refactor this
        if (fmIsTimelineFunc(pOneCtx->functionId) && (j == pOneExpr->base.numOfParams - 1)) {
          pInput->pPTS = pInput->pData[j];
        }

        ASSERT(pInput->pData[j] != NULL);
        continue;
      }

      // todo avoid case: top(k, 12), 12 is the value parameter.
      // sum(11), 11 is also the value parameter.
      bool needDummyCol =
          (pFuncParam->type == FUNC_PARAM_TYPE_VALUE) && createDummyCol && (pOneExpr->base.numOfParams == 1);
      if (!needDummyCol) {
        continue;
      }

      pInput->totalRows = pBlock->info.rows;
      pInput->numOfRows = pBlock->info.rows;
      pInput->startRowIndex = 0;

      code = doCreateConstantValColumnInfo(pInput, pFuncParam, j, pBlock->info.rows);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }
    }
  }

  return code;
}

551
// Feed the current input of every context of pOperator to its aggregate function.
// Contexts that do not need to run, or that have no process callback, are skipped.
// Returns the first error reported by a function (after logging it), otherwise TSDB_CODE_SUCCESS.
static int32_t doAggregateImpl(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx) {
  for (int32_t k = 0; k < pOperator->exprSupp.numOfExprs; ++k) {
    SqlFunctionCtx* pOne = &pCtx[k];

    // todo add a dummy funtion to avoid process check
    if (!functionNeedToExecute(pOne) || pOne->fpSet.process == NULL) {
      continue;
    }

    int32_t code = pOne->fpSet.process(pOne);
    if (code != TSDB_CODE_SUCCESS) {
      qError("%s aggregate function error happens, code: %s", GET_TASKID(pOperator->pTaskInfo), tstrerror(code));
      return code;
    }
  }

  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
570
// For the first N contexts, where N is the number of entries in pPseudoList (0 when the list is NULL), point
// pCtx[i].pOutput at column i of pResult.
static void setPseudoOutputColInfo(SSDataBlock* pResult, SqlFunctionCtx* pCtx, SArray* pPseudoList) {
  if (pPseudoList == NULL) {
    return;
  }

  size_t count = taosArrayGetSize(pPseudoList);
  for (size_t i = 0; i < count; ++i) {
    pCtx[i].pOutput = taosArrayGet(pResult->pDataBlock, i);
  }
}

577
int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pSrcBlock, SqlFunctionCtx* pCtx,
X
Xiaoyu Wang 已提交
578
                              int32_t numOfOutput, SArray* pPseudoList) {
H
Haojun Liao 已提交
579
  setPseudoOutputColInfo(pResult, pCtx, pPseudoList);
H
Haojun Liao 已提交
580
  pResult->info.groupId = pSrcBlock->info.groupId;
H
Haojun Liao 已提交
581

582 583
  // if the source equals to the destination, it is to create a new column as the result of scalar
  // function or some operators.
584 585
  bool createNewColModel = (pResult == pSrcBlock);

586 587
  int32_t numOfRows = 0;

588
  for (int32_t k = 0; k < numOfOutput; ++k) {
589 590
    int32_t               outputSlotId = pExpr[k].base.resSchema.slotId;
    SqlFunctionCtx*       pfCtx = &pCtx[k];
591
    SInputColumnInfoData* pInputData = &pfCtx->input;
592

L
Liu Jicong 已提交
593
    if (pExpr[k].pExpr->nodeType == QUERY_NODE_COLUMN) {  // it is a project query
594
      SColumnInfoData* pColInfoData = taosArrayGet(pResult->pDataBlock, outputSlotId);
595
      if (pResult->info.rows > 0 && !createNewColModel) {
596 597
        colDataMergeCol(pColInfoData, pResult->info.rows, &pResult->info.capacity, pInputData->pData[0],
                        pInputData->numOfRows);
598
      } else {
599
        colDataAssign(pColInfoData, pInputData->pData[0], pInputData->numOfRows, &pResult->info);
600
      }
601

602
      numOfRows = pInputData->numOfRows;
603
    } else if (pExpr[k].pExpr->nodeType == QUERY_NODE_VALUE) {
604
      SColumnInfoData* pColInfoData = taosArrayGet(pResult->pDataBlock, outputSlotId);
605

dengyihao's avatar
dengyihao 已提交
606
      int32_t offset = createNewColModel ? 0 : pResult->info.rows;
607 608 609 610 611 612 613 614

      int32_t type = pExpr[k].base.pParam[0].param.nType;
      if (TSDB_DATA_TYPE_NULL == type) {
        colDataAppendNNULL(pColInfoData, offset, pSrcBlock->info.rows);
      } else {
        for (int32_t i = 0; i < pSrcBlock->info.rows; ++i) {
          colDataAppend(pColInfoData, i + offset, taosVariantGet(&pExpr[k].base.pParam[0].param, type), false);
        }
615
      }
616 617

      numOfRows = pSrcBlock->info.rows;
H
Haojun Liao 已提交
618
    } else if (pExpr[k].pExpr->nodeType == QUERY_NODE_OPERATOR) {
619 620 621
      SArray* pBlockList = taosArrayInit(4, POINTER_BYTES);
      taosArrayPush(pBlockList, &pSrcBlock);

622
      SColumnInfoData* pResColData = taosArrayGet(pResult->pDataBlock, outputSlotId);
623
      SColumnInfoData  idata = {.info = pResColData->info, .hasNull = true};
624

625
      SScalarParam dest = {.columnData = &idata};
X
Xiaoyu Wang 已提交
626
      int32_t      code = scalarCalculate(pExpr[k].pExpr->_optrRoot.pRootNode, pBlockList, &dest);
627 628 629 630
      if (code != TSDB_CODE_SUCCESS) {
        taosArrayDestroy(pBlockList);
        return code;
      }
631

dengyihao's avatar
dengyihao 已提交
632
      int32_t startOffset = createNewColModel ? 0 : pResult->info.rows;
633
      ASSERT(pResult->info.capacity > 0);
634
      colDataMergeCol(pResColData, startOffset, &pResult->info.capacity, &idata, dest.numOfRows);
D
dapan1121 已提交
635 636
      colDataDestroy(&idata);
      
637
      numOfRows = dest.numOfRows;
638 639
      taosArrayDestroy(pBlockList);
    } else if (pExpr[k].pExpr->nodeType == QUERY_NODE_FUNCTION) {
640 641
      // _rowts/_c0, not tbname column
      if (fmIsPseudoColumnFunc(pfCtx->functionId) && (!fmIsScanPseudoColumnFunc(pfCtx->functionId))) {
H
Haojun Liao 已提交
642
        // do nothing
643
      } else if (fmIsIndefiniteRowsFunc(pfCtx->functionId)) {
644 645
        SResultRowEntryInfo* pResInfo = GET_RES_INFO(pfCtx);
        pfCtx->fpSet.init(pfCtx, pResInfo);
646 647 648 649 650 651 652 653 654 655 656

        pfCtx->pOutput = taosArrayGet(pResult->pDataBlock, outputSlotId);
        pfCtx->offset = createNewColModel ? 0 : pResult->info.rows;  // set the start offset

        // set the timestamp(_rowts) output buffer
        if (taosArrayGetSize(pPseudoList) > 0) {
          int32_t* outputColIndex = taosArrayGet(pPseudoList, 0);
          pfCtx->pTsOutput = (SColumnInfoData*)pCtx[*outputColIndex].pOutput;
        }

        numOfRows = pfCtx->fpSet.process(pfCtx);
H
Haojun Liao 已提交
657
      } else if (fmIsAggFunc(pfCtx->functionId)) {
658 659
        // _group_key function for "partition by tbname" + csum(col_name) query
        SColumnInfoData* pOutput = taosArrayGet(pResult->pDataBlock, outputSlotId);
660
        int32_t          slotId = pfCtx->param[0].pCol->slotId;
661 662 663

        // todo handle the json tag
        SColumnInfoData* pInput = taosArrayGet(pSrcBlock->pDataBlock, slotId);
664
        for (int32_t f = 0; f < pSrcBlock->info.rows; ++f) {
665 666 667 668 669 670 671 672 673
          bool isNull = colDataIsNull_s(pInput, f);
          if (isNull) {
            colDataAppendNULL(pOutput, pResult->info.rows + f);
          } else {
            char* data = colDataGetData(pInput, f);
            colDataAppend(pOutput, pResult->info.rows + f, data, isNull);
          }
        }

H
Haojun Liao 已提交
674 675 676
      } else {
        SArray* pBlockList = taosArrayInit(4, POINTER_BYTES);
        taosArrayPush(pBlockList, &pSrcBlock);
G
Ganlin Zhao 已提交
677

678
        SColumnInfoData* pResColData = taosArrayGet(pResult->pDataBlock, outputSlotId);
679
        SColumnInfoData  idata = {.info = pResColData->info, .hasNull = true};
H
Haojun Liao 已提交
680

681
        SScalarParam dest = {.columnData = &idata};
X
Xiaoyu Wang 已提交
682
        int32_t      code = scalarCalculate((SNode*)pExpr[k].pExpr->_function.pFunctNode, pBlockList, &dest);
683 684 685 686
        if (code != TSDB_CODE_SUCCESS) {
          taosArrayDestroy(pBlockList);
          return code;
        }
687

dengyihao's avatar
dengyihao 已提交
688
        int32_t startOffset = createNewColModel ? 0 : pResult->info.rows;
689
        ASSERT(pResult->info.capacity > 0);
690
        colDataMergeCol(pResColData, startOffset, &pResult->info.capacity, &idata, dest.numOfRows);
D
dapan1121 已提交
691
        colDataDestroy(&idata);
692 693

        numOfRows = dest.numOfRows;
H
Haojun Liao 已提交
694 695
        taosArrayDestroy(pBlockList);
      }
696
    } else {
697
      ASSERT(0);
698 699
    }
  }
700

701 702 703
  if (!createNewColModel) {
    pResult->info.rows += numOfRows;
  }
704 705

  return TSDB_CODE_SUCCESS;
706 707
}

5
54liuyao 已提交
708
// Decide whether the function bound to pCtx must run on the current input:
//  - functionId == -1: never;
//  - during a REPEAT_SCAN: only if the function participates in repeat scans;
//  - otherwise: as long as its result entry is not yet completed.
bool functionNeedToExecute(SqlFunctionCtx* pCtx) {
  if (pCtx->functionId == -1) {
    return false;
  }

  if (pCtx->scanFlag == REPEAT_SCAN) {
    return fmIsRepeatScanFunc(pCtx->functionId);
  }

  struct SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
  return !isRowEntryCompleted(pResInfo);
}

728 729 730 731 732 733 734
/*
 * Build (or refresh) the column statistics for a constant-value function parameter, so that it can be
 * consumed through the same statistics path as a real column.
 *
 * pInput:     input descriptor of the function context. pData[paramIndex] and pColumnDataAgg[paramIndex]
 *             are allocated on first use and reused on later calls.
 * pFuncParam: the constant parameter. The value is read from param.i (BIGINT, BOOL) or param.d (DOUBLE).
 * type:       data type of the constant. Var-length types are not supported (asserted).
 * paramIndex: slot of the parameter inside pInput.
 * numOfRows:  number of rows of the current block; used to derive the sum.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when an allocation fails.
 */
static int32_t doCreateConstantValColumnAggInfo(SInputColumnInfoData* pInput, SFunctParam* pFuncParam, int32_t type,
                                                int32_t paramIndex, int32_t numOfRows) {
  if (pInput->pData[paramIndex] == NULL) {
    pInput->pData[paramIndex] = taosMemoryCalloc(1, sizeof(SColumnInfoData));
    if (pInput->pData[paramIndex] == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    // Set the correct column info (data type and bytes)
    pInput->pData[paramIndex]->info.type = type;
    pInput->pData[paramIndex]->info.bytes = tDataTypes[type].bytes;
  }

  SColumnDataAgg* da = pInput->pColumnDataAgg[paramIndex];
  if (da == NULL) {
    da = taosMemoryCalloc(1, sizeof(SColumnDataAgg));
    if (da == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
    pInput->pColumnDataAgg[paramIndex] = da;
  }

  ASSERT(!IS_VAR_DATA_TYPE(type));

  if (type == TSDB_DATA_TYPE_BIGINT) {
    int64_t v = pFuncParam->param.i;
    // NOTE(review): v * numOfRows can overflow for very large constants
    *da = (SColumnDataAgg){.numOfNull = 0, .min = v, .max = v, .sum = v * numOfRows};
  } else if (type == TSDB_DATA_TYPE_DOUBLE) {
    double v = pFuncParam->param.d;
    double sum = v * numOfRows;
    *da = (SColumnDataAgg){.numOfNull = 0};

    // The raw double bytes are stored in the min/max/sum fields. Copy them with memcpy instead of writing
    // through a (double*) cast, which would break the strict-aliasing rule.
    memcpy(&da->min, &v, sizeof(v));
    memcpy(&da->max, &v, sizeof(v));
    memcpy(&da->sum, &sum, sizeof(sum));
  } else if (type == TSDB_DATA_TYPE_BOOL) {  // todo validate this data type
    bool v = pFuncParam->param.i;
    // kept as before: only records whether any row is true, not the count of true rows
    bool sum = v * numOfRows;
    *da = (SColumnDataAgg){.numOfNull = 0};

    // Every row holds the same constant, so min and max are both v. The previous code stored 0 as the
    // min, which was wrong for a constant `true`.
    memcpy(&da->min, &v, sizeof(v));
    memcpy(&da->max, &v, sizeof(v));
    memcpy(&da->sum, &sum, sizeof(sum));
  } else if (type == TSDB_DATA_TYPE_TIMESTAMP) {
    // do nothing: the statistics in `da` are left as they are
  } else {
    ASSERT(0);
  }

  return TSDB_CODE_SUCCESS;
}
779 780 781 782 783 784 785 786 787 788 789

/*
 * Attach the block's pre-computed column statistics to the input of pCtx.
 *
 * Sets the input row counts from pBlock. When pBlock carries statistics (pBlockAgg != NULL), every column
 * parameter of pExprInfo is bound to its slot's statistics and column info. Every constant parameter gets
 * synthesized statistics.
 *
 * colDataAggIsSet ends up true only if statistics are available for every parameter. It becomes false
 * when a column has no statistics or a constant's statistics cannot be allocated.
 */
void setBlockStatisInfo(SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, SSDataBlock* pBlock) {
  int32_t numOfRows = pBlock->info.rows;

  SInputColumnInfoData* pInput = &pCtx->input;
  pInput->numOfRows = numOfRows;
  pInput->totalRows = numOfRows;

  if (pBlock->pBlockAgg == NULL) {
    pInput->colDataAggIsSet = false;
    return;
  }

  pInput->colDataAggIsSet = true;

  for (int32_t j = 0; j < pExprInfo->base.numOfParams; ++j) {
    SFunctParam* pFuncParam = &pExprInfo->base.pParam[j];

    if (pFuncParam->type == FUNC_PARAM_TYPE_COLUMN) {
      int32_t slotId = pFuncParam->pCol->slotId;
      pInput->pColumnDataAgg[j] = pBlock->pBlockAgg[slotId];
      if (pInput->pColumnDataAgg[j] == NULL) {
        pInput->colDataAggIsSet = false;
      }

      // Here we set the column info data since the data type for each column data is required, but
      // the data in the corresponding SColumnInfoData will not be used.
      pInput->pData[j] = taosArrayGet(pBlock->pDataBlock, slotId);
    } else if (pFuncParam->type == FUNC_PARAM_TYPE_VALUE) {
      int32_t code = doCreateConstantValColumnAggInfo(pInput, pFuncParam, pFuncParam->param.nType, j, numOfRows);
      if (code != TSDB_CODE_SUCCESS) {
        // The statistics for this parameter may be missing (NULL). Do not report the statistics as usable,
        // otherwise consumers would dereference the missing entry.
        pInput->colDataAggIsSet = false;
      }
    }
  }
}

L
Liu Jicong 已提交
819
/*
 * Report whether the task should be regarded as killed.
 *
 * Currently always returns false. For an owned task (owner != 0) that has been idle longer than
 * 10 * getMaximumIdleDurationSec() seconds since cost.start, it only asserts that the start time was
 * recorded. The abort itself is disabled.
 */
bool isTaskKilled(SExecTaskInfo* pTaskInfo) {
  if (pTaskInfo->owner == 0) {
    return false;
  }

  bool idleTooLong = (taosGetTimestampSec() - pTaskInfo->cost.start / 1000) > 10 * getMaximumIdleDurationSec();
  if (idleTooLong) {
    assert(pTaskInfo->cost.start != 0);
  }

  return false;
}

L
Liu Jicong 已提交
834
/* Mark the task as cancelled by storing TSDB_CODE_TSC_QUERY_CANCELLED as its result code. */
void setTaskKilled(SExecTaskInfo* pTaskInfo) {
  pTaskInfo->code = TSDB_CODE_TSC_QUERY_CANCELLED;
}
835 836

/////////////////////////////////////////////////////////////////////////////////////////////
837 838 839
/*
 * Return the interval-aligned time window that contains key.
 *
 * skey is key truncated to the interval boundary. ekey is skey advanced by one interval, minus 1.
 * If that end ends up before skey (the addition wrapped around near INT64_MAX), ekey is clamped to
 * INT64_MAX. No further range adjustment is needed in that case, since the remaining range is shorter
 * than one interval.
 */
STimeWindow getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int64_t key) {
  int64_t start = taosTimeTruncate(key, pInterval, precision);
  int64_t end = taosTimeAdd(start, pInterval->interval, pInterval->intervalUnit, precision) - 1;

  if (end < start) {
    end = INT64_MAX;
  }

  STimeWindow win = {.skey = start, .ekey = end};
  return win;
}

853

L
Liu Jicong 已提交
892 893
// static void updateDataCheckOrder(SQInfo *pQInfo, SQueryTableReq* pQueryMsg, bool stableQuery) {
//   STaskAttr* pQueryAttr = pQInfo->runtimeEnv.pQueryAttr;
H
Haojun Liao 已提交
894
//
L
Liu Jicong 已提交
895 896 897 898
//   // in case of point-interpolation query, use asc order scan
//   char msg[] = "QInfo:0x%"PRIx64" scan order changed for %s query, old:%d, new:%d, qrange exchanged, old qrange:%"
//   PRId64
//                "-%" PRId64 ", new qrange:%" PRId64 "-%" PRId64;
H
Haojun Liao 已提交
899
//
L
Liu Jicong 已提交
900 901 902 903 904
//   // todo handle the case the the order irrelevant query type mixed up with order critical query type
//   // descending order query for last_row query
//   if (isFirstLastRowQuery(pQueryAttr)) {
//     //qDebug("QInfo:0x%"PRIx64" scan order changed for last_row query, old:%d, new:%d", pQInfo->qId,
//     pQueryAttr->order.order, TSDB_ORDER_ASC);
H
Haojun Liao 已提交
905
//
L
Liu Jicong 已提交
906 907
//     pQueryAttr->order.order = TSDB_ORDER_ASC;
//     if (pQueryAttr->window.skey > pQueryAttr->window.ekey) {
wafwerar's avatar
wafwerar 已提交
908
//       TSWAP(pQueryAttr->window.skey, pQueryAttr->window.ekey);
L
Liu Jicong 已提交
909
//     }
H
Haojun Liao 已提交
910
//
L
Liu Jicong 已提交
911 912 913
//     pQueryAttr->needReverseScan = false;
//     return;
//   }
H
Haojun Liao 已提交
914
//
L
Liu Jicong 已提交
915 916 917
//   if (pQueryAttr->groupbyColumn && pQueryAttr->order.order == TSDB_ORDER_DESC) {
//     pQueryAttr->order.order = TSDB_ORDER_ASC;
//     if (pQueryAttr->window.skey > pQueryAttr->window.ekey) {
wafwerar's avatar
wafwerar 已提交
918
//       TSWAP(pQueryAttr->window.skey, pQueryAttr->window.ekey);
L
Liu Jicong 已提交
919
//     }
H
Haojun Liao 已提交
920
//
L
Liu Jicong 已提交
921 922 923 924
//     pQueryAttr->needReverseScan = false;
//     doUpdateLastKey(pQueryAttr);
//     return;
//   }
H
Haojun Liao 已提交
925
//
L
Liu Jicong 已提交
926 927 928 929 930 931
//   if (pQueryAttr->pointInterpQuery && pQueryAttr->interval.interval == 0) {
//     if (!QUERY_IS_ASC_QUERY(pQueryAttr)) {
//       //qDebug(msg, pQInfo->qId, "interp", pQueryAttr->order.order, TSDB_ORDER_ASC, pQueryAttr->window.skey,
//       pQueryAttr->window.ekey, pQueryAttr->window.ekey, pQueryAttr->window.skey); TSWAP(pQueryAttr->window.skey,
//       pQueryAttr->window.ekey, TSKEY);
//     }
H
Haojun Liao 已提交
932
//
L
Liu Jicong 已提交
933 934 935
//     pQueryAttr->order.order = TSDB_ORDER_ASC;
//     return;
//   }
H
Haojun Liao 已提交
936
//
L
Liu Jicong 已提交
937 938 939 940
//   if (pQueryAttr->interval.interval == 0) {
//     if (onlyFirstQuery(pQueryAttr)) {
//       if (!QUERY_IS_ASC_QUERY(pQueryAttr)) {
//         //qDebug(msg, pQInfo->qId, "only-first", pQueryAttr->order.order, TSDB_ORDER_ASC, pQueryAttr->window.skey,
H
Haojun Liao 已提交
941 942
////               pQueryAttr->window.ekey, pQueryAttr->window.ekey, pQueryAttr->window.skey);
//
wafwerar's avatar
wafwerar 已提交
943
//        TSWAP(pQueryAttr->window.skey, pQueryAttr->window.ekey);
H
Haojun Liao 已提交
944 945 946 947 948 949 950 951 952 953
//        doUpdateLastKey(pQueryAttr);
//      }
//
//      pQueryAttr->order.order = TSDB_ORDER_ASC;
//      pQueryAttr->needReverseScan = false;
//    } else if (onlyLastQuery(pQueryAttr) && notContainSessionOrStateWindow(pQueryAttr)) {
//      if (QUERY_IS_ASC_QUERY(pQueryAttr)) {
//        //qDebug(msg, pQInfo->qId, "only-last", pQueryAttr->order.order, TSDB_ORDER_DESC, pQueryAttr->window.skey,
////               pQueryAttr->window.ekey, pQueryAttr->window.ekey, pQueryAttr->window.skey);
//
wafwerar's avatar
wafwerar 已提交
954
//        TSWAP(pQueryAttr->window.skey, pQueryAttr->window.ekey);
H
Haojun Liao 已提交
955 956 957 958 959 960 961 962 963 964 965 966
//        doUpdateLastKey(pQueryAttr);
//      }
//
//      pQueryAttr->order.order = TSDB_ORDER_DESC;
//      pQueryAttr->needReverseScan = false;
//    }
//
//  } else {  // interval query
//    if (stableQuery) {
//      if (onlyFirstQuery(pQueryAttr)) {
//        if (!QUERY_IS_ASC_QUERY(pQueryAttr)) {
//          //qDebug(msg, pQInfo->qId, "only-first stable", pQueryAttr->order.order, TSDB_ORDER_ASC,
L
Liu Jicong 已提交
967 968
////                 pQueryAttr->window.skey, pQueryAttr->window.ekey, pQueryAttr->window.ekey,
/// pQueryAttr->window.skey);
H
Haojun Liao 已提交
969
//
wafwerar's avatar
wafwerar 已提交
970
//          TSWAP(pQueryAttr->window.skey, pQueryAttr->window.ekey);
H
Haojun Liao 已提交
971 972 973 974 975 976 977 978
//          doUpdateLastKey(pQueryAttr);
//        }
//
//        pQueryAttr->order.order = TSDB_ORDER_ASC;
//        pQueryAttr->needReverseScan = false;
//      } else if (onlyLastQuery(pQueryAttr)) {
//        if (QUERY_IS_ASC_QUERY(pQueryAttr)) {
//          //qDebug(msg, pQInfo->qId, "only-last stable", pQueryAttr->order.order, TSDB_ORDER_DESC,
L
Liu Jicong 已提交
979 980
////                 pQueryAttr->window.skey, pQueryAttr->window.ekey, pQueryAttr->window.ekey,
/// pQueryAttr->window.skey);
H
Haojun Liao 已提交
981
//
wafwerar's avatar
wafwerar 已提交
982
//          TSWAP(pQueryAttr->window.skey, pQueryAttr->window.ekey);
H
Haojun Liao 已提交
983 984 985 986 987 988 989 990 991
//          doUpdateLastKey(pQueryAttr);
//        }
//
//        pQueryAttr->order.order = TSDB_ORDER_DESC;
//        pQueryAttr->needReverseScan = false;
//      }
//    }
//  }
//}
992

L
Liu Jicong 已提交
993 994 995
// static FORCE_INLINE bool doFilterByBlockStatistics(STaskRuntimeEnv* pRuntimeEnv, SDataStatis *pDataStatis,
// SqlFunctionCtx *pCtx, int32_t numOfRows) {
//   STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
996
//
L
Liu Jicong 已提交
997 998 999
//   if (pDataStatis == NULL || pQueryAttr->pFilters == NULL) {
//     return true;
//   }
1000
//
L
Liu Jicong 已提交
1001 1002
//   return filterRangeExecute(pQueryAttr->pFilters, pDataStatis, pQueryAttr->numOfCols, numOfRows);
// }
1003
1053 1054

/*
 * Placeholder for block-level pruning by time window.
 *
 * The former implementation was disabled. The function always returns 0 and does not inspect its
 * arguments.
 */
static uint32_t doFilterByBlockTimeWindow(STableScanInfo* pTableScanInfo, SSDataBlock* pBlock) {
  (void)pTableScanInfo;
  (void)pBlock;
  return 0;
}

L
Liu Jicong 已提交
1081 1082
/*
 * Prepare pBlock for on-demand loading.
 *
 * Sets *status to BLK_DATA_NOT_LOAD and clears the block's column-data and statistics pointers.
 * The former load/SMA/filter logic that used pTaskInfo and pTableScanInfo was disabled, so those
 * arguments are currently unused.
 *
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t loadDataBlockOnDemand(SExecTaskInfo* pTaskInfo, STableScanInfo* pTableScanInfo, SSDataBlock* pBlock,
                              uint32_t* status) {
  (void)pTaskInfo;
  (void)pTableScanInfo;

  *status = BLK_DATA_NOT_LOAD;

  // NOTE(review): the previous pointers are overwritten, not released; presumably they are owned
  // elsewhere -- confirm with the callers.
  pBlock->pDataBlock = NULL;
  pBlock->pBlockAgg = NULL;

  return TSDB_CODE_SUCCESS;
}

L
Liu Jicong 已提交
1224
/*
 * Hook for adjusting per-table query state before a reverse scan.
 * It currently performs no work; any argument, including NULL, is accepted.
 */
static void updateTableQueryInfoForReverseScan(STableQueryInfo* pTableQueryInfo) {
  (void)pTableQueryInfo;
}

H
Haojun Liao 已提交
1244
/* Initialize a result row. Currently a no-op. */
void initResultRow(SResultRow* pResultRow) {
  (void)pResultRow;
}

/*
 * The start of each column's SResultRowEntryInfo is given by its entry in the row-entry offset array
 * (rowEntryInfoOffset). Note that for a top/bottom query, the multiple result rows are treated as a
 * single row of results.
 * +------------+-----------------result column 1------------+------------------result column 2-----------+
 * | SResultRow | SResultRowEntryInfo | intermediate buffer1 | SResultRowEntryInfo | intermediate buffer 2|
 * +------------+--------------------------------------------+--------------------------------------------+
 *           offset[0]                                  offset[1]                                   offset[2]
 */
1256
// TODO refactor: move some of these functions to a more appropriate module
L
Liu Jicong 已提交
1257 1258 1259
/*
 * Bind every expression context of pOperator to a freshly reset result row.
 *
 * Re-initializes pInfo->resultRowInfo and obtains, from pSup's result buffer, one result row keyed by
 * table id 0 in group 0. For each of the numOfExprs expressions it then:
 *   - resets the row's entry,
 *   - attaches the entry to pCtx[i],
 *   - stamps pCtx[i] with the scan flag `stage`.
 * Finally it initializes the contexts' output buffers.
 */
void setFunctionResultOutput(SOperatorInfo* pOperator, SOptrBasicInfo* pInfo, SAggSupporter* pSup, int32_t stage,
                             int32_t numOfExprs) {
  SqlFunctionCtx* pCtx = pOperator->exprSupp.pCtx;
  int32_t*        offsets = pOperator->exprSupp.rowEntryInfoOffset;
  SResultRowInfo* pRowInfo = &pInfo->resultRowInfo;

  initResultRowInfo(pRowInfo);

  int64_t     tableId = 0;
  int64_t     groupId = 0;
  SResultRow* pRow = doSetResultOutBufByKey(pSup->pResultBuf, pRowInfo, (char*)&tableId, sizeof(tableId), true,
                                            groupId, pOperator->pTaskInfo, false, pSup);

  for (int32_t k = 0; k < numOfExprs; ++k) {
    struct SResultRowEntryInfo* pEntry = getResultEntryInfo(pRow, k, offsets);
    cleanupResultRowEntry(pEntry);

    SqlFunctionCtx* pc = &pCtx[k];
    pc->resultInfo = pEntry;
    pc->scanFlag = stage;
  }

  initCtxOutputBuffer(pCtx, numOfExprs);
}

H
Haojun Liao 已提交
1282
/*
 * Run the init callback of each of the `size` function contexts in pCtx.
 *
 * A context is skipped when any of these holds:
 *   - its result entry is already initialized,
 *   - it is a pseudo-column function,
 *   - no function is bound (functionId == -1),
 *   - it is a scalar function.
 */
void initCtxOutputBuffer(SqlFunctionCtx* pCtx, int32_t size) {
  for (int32_t i = 0; i < size; ++i) {
    SqlFunctionCtx*             pc = &pCtx[i];
    struct SResultRowEntryInfo* pResInfo = GET_RES_INFO(pc);

    bool skip = isRowEntryInitialized(pResInfo) || fmIsPseudoColumnFunc(pc->functionId) || pc->functionId == -1 ||
                fmIsScalarFunc(pc->functionId);
    if (!skip) {
      pc->fpSet.init(pc, pc->resultInfo);
    }
  }
}

L
Liu Jicong 已提交
1294
/*
 * Update the task status.
 *
 * TASK_NOT_COMPLETED replaces the whole status. Any other status is OR-ed in, after first clearing
 * TASK_NOT_COMPLETED.
 */
void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status) {
  if (status != TASK_NOT_COMPLETED) {
    // TASK_NOT_COMPLETED cannot coexist with any other status bit
    CLEAR_QUERY_STATUS(pTaskInfo, TASK_NOT_COMPLETED);
    pTaskInfo->status |= status;
    return;
  }

  pTaskInfo->status = status;
}

L
Liu Jicong 已提交
1304
/*
 * Release resources held by a table query info.
 * It currently performs no work; any argument, including NULL, is accepted.
 */
void destroyTableQueryInfoImpl(STableQueryInfo* pTableQueryInfo) {
  (void)pTableQueryInfo;
}

1313
/*
 * Point each of the numOfOutput contexts at its entry in pResult and initialize entries that need it.
 *
 * Initialization is skipped for:
 *   - entries already completed and initialized,
 *   - window pseudo-column functions,
 *   - entries already flagged as initialized.
 * For the remaining entries, a context without a bound function (functionId == -1) is only flagged as
 * initialized; otherwise the function's init callback is run.
 */
void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowEntryInfoOffset) {
  for (int32_t i = 0; i < numOfOutput; ++i) {
    SqlFunctionCtx* pc = &pCtx[i];
    pc->resultInfo = getResultEntryInfo(pResult, i, rowEntryInfoOffset);

    struct SResultRowEntryInfo* pEntry = pc->resultInfo;
    bool finished = isRowEntryCompleted(pEntry) && isRowEntryInitialized(pEntry);
    if (finished || fmIsWindowPseudoColumnFunc(pc->functionId) || pEntry->initialized) {
      continue;
    }

    if (pc->functionId == -1) {
      pEntry->initialized = true;
    } else {
      pc->fpSet.init(pc, pEntry);
    }
  }
}

H
Haojun Liao 已提交
1336
static void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const int8_t* rowRes, bool keep);
1337

1338
void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock) {
1339 1340 1341
  if (pFilterNode == NULL) {
    return;
  }
S
shenglian zhou 已提交
1342 1343 1344
  if (pBlock->info.rows == 0) {
    return;
  }
1345
  SFilterInfo* filter = NULL;
H
Haojun Liao 已提交
1346

H
Haojun Liao 已提交
1347
  // todo move to the initialization function
H
Haojun Liao 已提交
1348
  int32_t code = filterInitFromNode((SNode*)pFilterNode, &filter, 0);
1349

1350
  size_t             numOfCols = taosArrayGetSize(pBlock->pDataBlock);
1351
  SFilterColumnParam param1 = {.numOfCols = numOfCols, .pDataBlock = pBlock->pDataBlock};
1352 1353 1354
  code = filterSetDataFromSlotId(filter, &param1);

  int8_t* rowRes = NULL;
1355

1356
  // todo the keep seems never to be True??
1357
  bool keep = filterExecute(filter, pBlock, &rowRes, NULL, param1.numOfCols);
D
dapan1121 已提交
1358
  filterFreeInfo(filter);
1359

H
Haojun Liao 已提交
1360
  extractQualifiedTupleByFilterResult(pBlock, rowRes, keep);
1361
  blockDataUpdateTsWindow(pBlock, 0);
H
Haojun Liao 已提交
1362 1363

  taosMemoryFree(rowRes);
1364 1365
}

H
Haojun Liao 已提交
1366
/*
 * Compact pBlock in place so that it keeps only the rows flagged in rowRes.
 *
 * - keep == true: every row qualified, so the block is left untouched.
 * - rowRes == NULL (and keep == false): no row qualified, so the block's row count becomes 0.
 * - Otherwise rowRes holds one flag per row, and rows with a non-zero flag are kept.
 *
 * Columns whose data buffer is not allocated yet (reserved columns for scalar function output) are skipped.
 * If the working copy of the block cannot be created, the block is left unchanged and terrno is set.
 */
static void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const int8_t* rowRes, bool keep) {
  if (keep) {
    return;
  }

  if (rowRes == NULL) {
    pBlock->info.rows = 0;
    return;
  }

  int32_t totalRows = pBlock->info.rows;

  // snapshot of the original rows: the destination columns are rewritten in place below
  SSDataBlock* px = createOneDataBlock(pBlock, true);
  if (px == NULL) {
    qError("failed to copy data block for filtering, rows:%d", totalRows);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return;
  }

  size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
  for (size_t i = 0; i < numOfCols; ++i) {
    SColumnInfoData* pSrc = taosArrayGet(px->pDataBlock, i);
    SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, i);

    // it is a reserved column for scalar function, and no data in this column yet.
    if (pDst->pData == NULL || pSrc->pData == NULL) {
      continue;
    }

    colInfoDataCleanup(pDst, pBlock->info.rows);

    int32_t numOfRows = 0;
    for (int32_t j = 0; j < totalRows; ++j) {
      if (rowRes[j] == 0) {
        continue;
      }

      if (colDataIsNull_s(pSrc, j)) {
        colDataAppendNULL(pDst, numOfRows);
      } else {
        colDataAppend(pDst, numOfRows, colDataGetData(pSrc, j), false);
      }
      numOfRows += 1;
    }

    // the first processed column sets the new row count; all later columns must agree with it
    if (pBlock->info.rows == totalRows) {
      pBlock->info.rows = numOfRows;
    } else {
      ASSERT(pBlock->info.rows == numOfRows);
    }
  }

  blockDataDestroy(px);
}

L
Liu Jicong 已提交
1414 1415
/*
 * Select (creating it if needed) the result row of group `groupId` and bind the operator's function
 * contexts to it.
 *
 * The row is looked up by the group id in pAggInfo's result buffer. A row that has no page yet
 * (pageId == -1) gets a new result buffer. If that allocation fails, the function returns without
 * binding the contexts.
 */
void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, SAggOperatorInfo* pAggInfo, int32_t numOfOutput,
                              uint64_t groupId) {
  SqlFunctionCtx* pCtx = pOperator->exprSupp.pCtx;
  int32_t*        offsets = pOperator->exprSupp.rowEntryInfoOffset;
  SAggSupporter*  pSup = &pAggInfo->aggSup;
  SResultRowInfo* pRowInfo = &pAggInfo->binfo.resultRowInfo;

  // without interval, all tables of one group share a single result row keyed by the group id
  SResultRow* pResultRow = doSetResultOutBufByKey(pSup->pResultBuf, pRowInfo, (char*)&groupId, sizeof(groupId), true,
                                                  groupId, pOperator->pTaskInfo, false, pSup);
  assert(pResultRow != NULL);

  bool needNewBuf = (pResultRow->pageId == -1);
  if (needNewBuf &&
      addNewWindowResultBuf(pResultRow, pSup->pResultBuf, groupId, pAggInfo->binfo.pRes->info.rowSize) !=
          TSDB_CODE_SUCCESS) {
    return;
  }

  setResultRowInitCtx(pResultRow, pCtx, numOfOutput, offsets);
}

1441
/*
 * Make `groupId` the active group of the aggregate operator.
 *
 * If `groupId` is already the active group (and the active id is not the INT32_MIN sentinel), nothing
 * happens. Otherwise the group's output buffer is bound via doSetTableGroupOutputBuf and recorded as active.
 */
void setExecutionContext(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId, SAggOperatorInfo* pAggInfo) {
  bool alreadyActive = (pAggInfo->groupId != INT32_MIN) && (pAggInfo->groupId == groupId);
  if (!alreadyActive) {
#ifdef BUF_PAGE_DEBUG
    qDebug("page_setbuf, groupId:%" PRIu64, groupId);
#endif
    doSetTableGroupOutputBuf(pOperator, pAggInfo, numOfOutput, groupId);
    pAggInfo->groupId = groupId;  // remember the group that is now active
  }
}

1454 1455
/*
 * Raise pRow->numOfRows to the largest numOfRes reported by any initialized expression entry of the row.
 * Uninitialized entries are ignored, and the count is never lowered.
 */
static void doUpdateNumOfRows(SResultRow* pRow, int32_t numOfExprs, const int32_t* rowCellOffset) {
  for (int32_t i = 0; i < numOfExprs; ++i) {
    struct SResultRowEntryInfo* pEntry = getResultEntryInfo(pRow, i, rowCellOffset);
    if (isRowEntryInitialized(pEntry) && pEntry->numOfRes > pRow->numOfRows) {
      pRow->numOfRows = pEntry->numOfRes;
    }
  }
}

1467
int32_t finalizeResultRowIntoResultDataBlock(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPosition,
S
shenglian zhou 已提交
1468 1469 1470
                                             SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, int32_t numOfExprs,
                                             const int32_t* rowCellOffset, SSDataBlock* pBlock,
                                             SExecTaskInfo* pTaskInfo) {
1471 1472 1473 1474 1475 1476 1477 1478 1479
  SFilePage*  page = getBufPage(pBuf, resultRowPosition->pageId);
  SResultRow* pRow = (SResultRow*)((char*)page + resultRowPosition->offset);

  doUpdateNumOfRows(pRow, numOfExprs, rowCellOffset);
  if (pRow->numOfRows == 0) {
    releaseBufPage(pBuf, page);
    return 0;
  }

1480 1481 1482 1483 1484 1485 1486
  while (pBlock->info.rows + pRow->numOfRows > pBlock->info.capacity) {
    int32_t code = blockDataEnsureCapacity(pBlock, pBlock->info.capacity * 1.25);
    if (TAOS_FAILED(code)) {
      releaseBufPage(pBuf, page);
      qError("%s ensure result data capacity failed, code %s", GET_TASKID(pTaskInfo), tstrerror(code));
      longjmp(pTaskInfo->env, code);
    }
1487 1488 1489 1490 1491
  }

  for (int32_t j = 0; j < numOfExprs; ++j) {
    int32_t slotId = pExprInfo[j].base.resSchema.slotId;

1492
    pCtx[j].resultInfo = getResultEntryInfo(pRow, j, rowCellOffset);
1493 1494 1495 1496 1497 1498 1499 1500 1501
    if (pCtx[j].fpSet.finalize) {
      int32_t code = pCtx[j].fpSet.finalize(&pCtx[j], pBlock);
      if (TAOS_FAILED(code)) {
        qError("%s build result data block error, code %s", GET_TASKID(pTaskInfo), tstrerror(code));
        longjmp(pTaskInfo->env, code);
      }
    } else if (strcmp(pCtx[j].pExpr->pExpr->_function.functionName, "_select_value") == 0) {
      // do nothing, todo refactor
    } else {
1502 1503
      // expand the result into multiple rows. E.g., _wstart, top(k, 20)
      // the _wstart needs to copy to 20 following rows, since the results of top-k expands to 20 different rows.
1504 1505 1506 1507 1508 1509 1510 1511 1512
      SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, slotId);
      char*            in = GET_ROWCELL_INTERBUF(pCtx[j].resultInfo);
      for (int32_t k = 0; k < pRow->numOfRows; ++k) {
        colDataAppend(pColInfoData, pBlock->info.rows + k, in, pCtx[j].resultInfo->isNullRes);
      }
    }
  }

  releaseBufPage(pBuf, page);
1513
  pBlock->info.rows += pRow->numOfRows;
1514 1515 1516 1517

  return 0;
}

X
Xiaoyu Wang 已提交
1518 1519 1520
/*
 * Copy finalized group results, starting at pGroupResInfo->index, into `pBlock`.
 *
 * Rows with zero results are skipped. Copying stops before a row that would:
 *   - belong to a different group than the first row copied (a block with groupId 0 adopts that row's group), or
 *   - not fit into the block's remaining capacity.
 * pGroupResInfo->index is advanced past every row that is consumed. For non-finalized expressions with
 * `increase` set, the 64-bit value is incremented for each expanded output row.
 *
 * Returns 0. A finalize failure is logged and raised via longjmp(pTaskInfo->env, code) after releasing the page.
 */
int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprInfo* pExprInfo, SDiskbasedBuf* pBuf,
                           SGroupResInfo* pGroupResInfo, const int32_t* rowCellOffset, SqlFunctionCtx* pCtx,
                           int32_t numOfExprs) {
  int32_t numOfRows = getNumOfTotalRes(pGroupResInfo);
  int32_t start = pGroupResInfo->index;

  for (int32_t i = start; i < numOfRows; i += 1) {
    SResKeyPos* pPos = taosArrayGetP(pGroupResInfo->pRows, i);
    SFilePage*  page = getBufPage(pBuf, pPos->pos.pageId);

    SResultRow* pRow = (SResultRow*)((char*)page + pPos->pos.offset);

    doUpdateNumOfRows(pRow, numOfExprs, rowCellOffset);
    if (pRow->numOfRows == 0) {
      pGroupResInfo->index += 1;
      releaseBufPage(pBuf, page);
      continue;
    }

    if (pBlock->info.groupId == 0) {
      pBlock->info.groupId = pPos->groupId;
    } else if (pBlock->info.groupId != pPos->groupId) {
      // current value belongs to different group, it can't be packed into one datablock
      releaseBufPage(pBuf, page);
      break;
    }

    if (pBlock->info.rows + pRow->numOfRows > pBlock->info.capacity) {
      releaseBufPage(pBuf, page);
      break;
    }

    pGroupResInfo->index += 1;

    for (int32_t j = 0; j < numOfExprs; ++j) {
      int32_t slotId = pExprInfo[j].base.resSchema.slotId;

      pCtx[j].resultInfo = getResultEntryInfo(pRow, j, rowCellOffset);
      if (pCtx[j].fpSet.finalize) {
#ifdef BUF_PAGE_DEBUG
        qDebug("\npage_finalize %d", numOfExprs);
#endif
        int32_t code = pCtx[j].fpSet.finalize(&pCtx[j], pBlock);
        if (TAOS_FAILED(code)) {
          releaseBufPage(pBuf, page);  // do not leave the page pinned when jumping out
          qError("%s build result data block error, code %s", GET_TASKID(pTaskInfo), tstrerror(code));
          longjmp(pTaskInfo->env, code);
        }
      } else if (strcmp(pCtx[j].pExpr->pExpr->_function.functionName, "_select_value") == 0) {
        // do nothing, todo refactor
      } else {
        // expand the result into multiple rows. E.g., _wstart, top(k, 20)
        // the _wstart needs to copy to 20 following rows, since the results of top-k expands to 20 different rows.
        SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, slotId);
        char*            in = GET_ROWCELL_INTERBUF(pCtx[j].resultInfo);
        if (pCtx[j].increase) {
          int64_t ts = *(int64_t*)in;
          for (int32_t k = 0; k < pRow->numOfRows; ++k) {
            colDataAppend(pColInfoData, pBlock->info.rows + k, (const char*)&ts, pCtx[j].resultInfo->isNullRes);
            ts++;
          }
        } else {
          for (int32_t k = 0; k < pRow->numOfRows; ++k) {
            colDataAppend(pColInfoData, pBlock->info.rows + k, in, pCtx[j].resultInfo->isNullRes);
          }
        }
      }
    }

    // pRow points into `page`, so read its row count before the page is handed back.
    pBlock->info.rows += pRow->numOfRows;
    releaseBufPage(pBuf, page);
  }

  qDebug("%s result generated, rows:%d, groupId:%" PRIu64, GET_TASKID(pTaskInfo), pBlock->info.rows,
         pBlock->info.groupId);
  blockDataUpdateTsWindow(pBlock, 0);
  return 0;
}

X
Xiaoyu Wang 已提交
1601 1602
/*
 * Refill the operator's result block `pbInfo->pRes` from the grouped results in `pGroupResInfo`.
 *
 * The block is always cleaned first. If there is remaining group data, the block's groupId is reset to 0,
 * so that doCopyToSDataBlock adopts the group of the first row it copies, and the rows are copied in.
 */
void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo,
                            SDiskbasedBuf* pBuf) {
  SSDataBlock* pResBlock = pbInfo->pRes;
  blockDataCleanup(pResBlock);

  if (hasDataInGroupInfo(pGroupResInfo)) {
    pResBlock->info.groupId = 0;  // clear the existing group id
    doCopyToSDataBlock(pOperator->pTaskInfo, pResBlock, pOperator->exprSupp.pExprInfo, pBuf, pGroupResInfo,
                       pOperator->exprSupp.rowEntryInfoOffset, pOperator->exprSupp.pCtx,
                       pOperator->exprSupp.numOfExprs);
  }
}

L
Liu Jicong 已提交
1621
/*
 * Intended to raise each result row's numOfRows to the largest numOfRes of its expression cells,
 * skipping ts/tag projections.
 *
 * NOTE: the whole body is compiled out (#if 0), so this function currently does nothing. The disabled
 * logic is kept for reference; it uses fields (pResultRowInfo->size/pResult, pCtx[j].functionId) that
 * may no longer exist.
 */
static void updateNumOfRowsInResultRows(SqlFunctionCtx* pCtx, int32_t numOfOutput, SResultRowInfo* pResultRowInfo,
                                        int32_t* rowEntryInfoOffset) {
#if 0
  for (int32_t r = 0; r < pResultRowInfo->size; ++r) {
    SResultRow* pRes = pResultRowInfo->pResult[r];

    for (int32_t c = 0; c < numOfOutput; ++c) {
      int32_t fid = pCtx[c].functionId;
      if (fid != FUNCTION_TS && fid != FUNCTION_TAG && fid != FUNCTION_TAGPRJ) {
        SResultRowEntryInfo* pCell = getResultEntryInfo(pRes, c, rowEntryInfoOffset);
        pRes->numOfRows = (uint16_t)(TMAX(pRes->numOfRows, pCell->numOfRes));
      }
    }
  }
#endif
}

L
Liu Jicong 已提交
1644
/*
 * Compress the first `numOfRows` fixed-width values of `pColRes` into `data` using the compression
 * function registered for the column's type in tDataTypes.
 *
 * The output buffer is assumed to hold the raw size plus COMP_OVERFLOW_BYTES. Returns whatever the
 * type's compFunc returns (presumably the compressed length — confirm against tcompression).
 */
static int32_t compressQueryColData(SColumnInfoData* pColRes, int32_t numOfRows, char* data, int8_t compressed) {
  int32_t rawSize = pColRes->info.bytes * numOfRows;
  return tDataTypes[pColRes->info.type].compFunc(pColRes->pData, rawSize, numOfRows, data,
                                                 rawSize + COMP_OVERFLOW_BYTES, compressed, NULL, 0);
}

1650 1651 1652
/*
 * Let the fill engine append interpolated rows to `pBlock`, using at most the block's remaining room
 * (`capacity - pBlock->info.rows`).
 *
 * Returns pBlock->info.rows after the fill call. NOTE(review): this assumes taosFillResultDataBlock updates
 * info.rows itself; its own return value was never used, so it is intentionally discarded here.
 */
int32_t doFillTimeIntervalGapsInResults(struct SFillInfo* pFillInfo, SSDataBlock* pBlock, int32_t capacity) {
  (void)taosFillResultDataBlock(pFillInfo, pBlock, capacity - pBlock->info.rows);
  return pBlock->info.rows;
}

L
Liu Jicong 已提交
1655 1656
/*
 * Finalize the task's cost summary.
 *
 * The first-stage merge time is added into the total elapsed time. When a file-block load recorder is
 * attached, the block and row load counters are logged at debug level.
 *
 * Hash-table size, result-row pool size and per-operator profile statistics are not collected here.
 */
void queryCostStatis(SExecTaskInfo* pTaskInfo) {
  STaskCostInfo* pCost = &pTaskInfo->cost;

  // the merge stage is part of the task's total elapsed time
  pCost->elapsedTime += pCost->firstStageMergeTime;

  SFileBlockLoadRecorder* pRec = pCost->pRecoder;
  if (pRec == NULL) {
    return;
  }

  qDebug("%s :cost summary: elapsed time:%" PRId64 " us, first merge:%" PRId64
         " us, total blocks:%d, "
         "load block statis:%d, load data block:%d, total rows:%" PRId64 ", check rows:%" PRId64,
         GET_TASKID(pTaskInfo), pCost->elapsedTime, pCost->firstStageMergeTime, pRec->totalBlocks,
         pRec->loadBlockStatis, pRec->loadBlocks, pRec->totalRows, pRec->totalCheckedRows);
}

L
Liu Jicong 已提交
1689 1690 1691
// static void updateOffsetVal(STaskRuntimeEnv *pRuntimeEnv, SDataBlockInfo *pBlockInfo) {
//   STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
//   STableQueryInfo* pTableQueryInfo = pRuntimeEnv->current;
1692
//
L
Liu Jicong 已提交
1693
//   int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQueryAttr->order.order);
1694
//
L
Liu Jicong 已提交
1695 1696 1697 1698
//   if (pQueryAttr->limit.offset == pBlockInfo->rows) {  // current block will ignore completed
//     pTableQueryInfo->lastKey = QUERY_IS_ASC_QUERY(pQueryAttr) ? pBlockInfo->window.ekey + step :
//     pBlockInfo->window.skey + step; pQueryAttr->limit.offset = 0; return;
//   }
1699
//
L
Liu Jicong 已提交
1700 1701 1702 1703 1704
//   if (QUERY_IS_ASC_QUERY(pQueryAttr)) {
//     pQueryAttr->pos = (int32_t)pQueryAttr->limit.offset;
//   } else {
//     pQueryAttr->pos = pBlockInfo->rows - (int32_t)pQueryAttr->limit.offset - 1;
//   }
1705
//
L
Liu Jicong 已提交
1706
//   assert(pQueryAttr->pos >= 0 && pQueryAttr->pos <= pBlockInfo->rows - 1);
1707
//
L
Liu Jicong 已提交
1708 1709
//   SArray *         pDataBlock = tsdbRetrieveDataBlock(pRuntimeEnv->pTsdbReadHandle, NULL);
//   SColumnInfoData *pColInfoData = taosArrayGet(pDataBlock, 0);
1710
//
L
Liu Jicong 已提交
1711 1712
//   // update the pQueryAttr->limit.offset value, and pQueryAttr->pos value
//   TSKEY *keys = (TSKEY *) pColInfoData->pData;
1713
//
L
Liu Jicong 已提交
1714 1715 1716
//   // update the offset value
//   pTableQueryInfo->lastKey = keys[pQueryAttr->pos];
//   pQueryAttr->limit.offset = 0;
1717
//
L
Liu Jicong 已提交
1718
//   int32_t numOfRes = tableApplyFunctionsOnBlock(pRuntimeEnv, pBlockInfo, NULL, binarySearchForKey, pDataBlock);
1719
//
L
Liu Jicong 已提交
1720 1721 1722 1723
//   //qDebug("QInfo:0x%"PRIx64" check data block, brange:%" PRId64 "-%" PRId64 ", numBlocksOfStep:%d, numOfRes:%d,
//   lastKey:%"PRId64, GET_TASKID(pRuntimeEnv),
//          pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows, numOfRes, pQuery->current->lastKey);
// }
1724

L
Liu Jicong 已提交
1725 1726
// void skipBlocks(STaskRuntimeEnv *pRuntimeEnv) {
//   STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
1727
//
L
Liu Jicong 已提交
1728 1729 1730
//   if (pQueryAttr->limit.offset <= 0 || pQueryAttr->numOfFilterCols > 0) {
//     return;
//   }
1731
//
L
Liu Jicong 已提交
1732 1733
//   pQueryAttr->pos = 0;
//   int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQueryAttr->order.order);
1734
//
L
Liu Jicong 已提交
1735 1736
//   STableQueryInfo* pTableQueryInfo = pRuntimeEnv->current;
//   TsdbQueryHandleT pTsdbReadHandle = pRuntimeEnv->pTsdbReadHandle;
1737
//
L
Liu Jicong 已提交
1738 1739 1740 1741 1742
//   SDataBlockInfo blockInfo = SDATA_BLOCK_INITIALIZER;
//   while (tsdbNextDataBlock(pTsdbReadHandle)) {
//     if (isTaskKilled(pRuntimeEnv->qinfo)) {
//       longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED);
//     }
1743
//
L
Liu Jicong 已提交
1744
//     tsdbRetrieveDataBlockInfo(pTsdbReadHandle, &blockInfo);
1745
//
L
Liu Jicong 已提交
1746 1747 1748 1749
//     if (pQueryAttr->limit.offset > blockInfo.rows) {
//       pQueryAttr->limit.offset -= blockInfo.rows;
//       pTableQueryInfo->lastKey = (QUERY_IS_ASC_QUERY(pQueryAttr)) ? blockInfo.window.ekey : blockInfo.window.skey;
//       pTableQueryInfo->lastKey += step;
1750
//
L
Liu Jicong 已提交
1751 1752 1753 1754 1755 1756 1757
//       //qDebug("QInfo:0x%"PRIx64" skip rows:%d, offset:%" PRId64, GET_TASKID(pRuntimeEnv), blockInfo.rows,
//              pQuery->limit.offset);
//     } else {  // find the appropriated start position in current block
//       updateOffsetVal(pRuntimeEnv, &blockInfo);
//       break;
//     }
//   }
1758
//
L
Liu Jicong 已提交
1759 1760 1761 1762 1763 1764 1765 1766 1767
//   if (terrno != TSDB_CODE_SUCCESS) {
//     longjmp(pRuntimeEnv->env, terrno);
//   }
// }

// static TSKEY doSkipIntervalProcess(STaskRuntimeEnv* pRuntimeEnv, STimeWindow* win, SDataBlockInfo* pBlockInfo,
// STableQueryInfo* pTableQueryInfo) {
//   STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
//   SResultRowInfo *pWindowResInfo = &pRuntimeEnv->resultRowInfo;
1768
//
L
Liu Jicong 已提交
1769 1770 1771
//   assert(pQueryAttr->limit.offset == 0);
//   STimeWindow tw = *win;
//   getNextTimeWindow(pQueryAttr, &tw);
1772
//
L
Liu Jicong 已提交
1773 1774
//   if ((tw.skey <= pBlockInfo->window.ekey && QUERY_IS_ASC_QUERY(pQueryAttr)) ||
//       (tw.ekey >= pBlockInfo->window.skey && !QUERY_IS_ASC_QUERY(pQueryAttr))) {
1775
//
L
Liu Jicong 已提交
1776 1777 1778 1779
//     // load the data block and check data remaining in current data block
//     // TODO optimize performance
//     SArray *         pDataBlock = tsdbRetrieveDataBlock(pRuntimeEnv->pTsdbReadHandle, NULL);
//     SColumnInfoData *pColInfoData = taosArrayGet(pDataBlock, 0);
1780
//
L
Liu Jicong 已提交
1781 1782 1783 1784
//     tw = *win;
//     int32_t startPos =
//         getNextQualifiedWindow(pQueryAttr, &tw, pBlockInfo, pColInfoData->pData, binarySearchForKey, -1);
//     assert(startPos >= 0);
1785
//
L
Liu Jicong 已提交
1786 1787
//     // set the abort info
//     pQueryAttr->pos = startPos;
1788
//
L
Liu Jicong 已提交
1789 1790 1791 1792
//     // reset the query start timestamp
//     pTableQueryInfo->win.skey = ((TSKEY *)pColInfoData->pData)[startPos];
//     pQueryAttr->window.skey = pTableQueryInfo->win.skey;
//     TSKEY key = pTableQueryInfo->win.skey;
1793
//
L
Liu Jicong 已提交
1794 1795
//     pWindowResInfo->prevSKey = tw.skey;
//     int32_t index = pRuntimeEnv->resultRowInfo.curIndex;
1796
//
L
Liu Jicong 已提交
1797 1798
//     int32_t numOfRes = tableApplyFunctionsOnBlock(pRuntimeEnv, pBlockInfo, NULL, binarySearchForKey, pDataBlock);
//     pRuntimeEnv->resultRowInfo.curIndex = index;  // restore the window index
1799
//
L
Liu Jicong 已提交
1800 1801 1802 1803
//     //qDebug("QInfo:0x%"PRIx64" check data block, brange:%" PRId64 "-%" PRId64 ", numOfRows:%d, numOfRes:%d,
//     lastKey:%" PRId64,
//            GET_TASKID(pRuntimeEnv), pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows, numOfRes,
//            pQueryAttr->current->lastKey);
1804
//
L
Liu Jicong 已提交
1805 1806 1807 1808 1809
//     return key;
//   } else {  // do nothing
//     pQueryAttr->window.skey      = tw.skey;
//     pWindowResInfo->prevSKey = tw.skey;
//     pTableQueryInfo->lastKey = tw.skey;
1810
//
L
Liu Jicong 已提交
1811 1812
//     return tw.skey;
//   }
1813
//
L
Liu Jicong 已提交
1814 1815 1816 1817 1818 1819 1820 1821 1822 1823
//   return true;
// }

// static bool skipTimeInterval(STaskRuntimeEnv *pRuntimeEnv, TSKEY* start) {
//   STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
//   if (QUERY_IS_ASC_QUERY(pQueryAttr)) {
//     assert(*start <= pRuntimeEnv->current->lastKey);
//   } else {
//     assert(*start >= pRuntimeEnv->current->lastKey);
//   }
1824
//
L
Liu Jicong 已提交
1825 1826 1827 1828 1829
//   // if queried with value filter, do NOT forward query start position
//   if (pQueryAttr->limit.offset <= 0 || pQueryAttr->numOfFilterCols > 0 || pRuntimeEnv->pTsBuf != NULL ||
//   pRuntimeEnv->pFillInfo != NULL) {
//     return true;
//   }
1830
//
L
Liu Jicong 已提交
1831 1832 1833 1834 1835 1836 1837
//   /*
//    * 1. for interval without interpolation query we forward pQueryAttr->interval.interval at a time for
//    *    pQueryAttr->limit.offset times. Since hole exists, pQueryAttr->interval.interval*pQueryAttr->limit.offset
//    value is
//    *    not valid. otherwise, we only forward pQueryAttr->limit.offset number of points
//    */
//   assert(pRuntimeEnv->resultRowInfo.prevSKey == TSKEY_INITIAL_VAL);
1838
//
L
Liu Jicong 已提交
1839 1840
//   STimeWindow w = TSWINDOW_INITIALIZER;
//   bool ascQuery = QUERY_IS_ASC_QUERY(pQueryAttr);
1841
//
L
Liu Jicong 已提交
1842 1843
//   SResultRowInfo *pWindowResInfo = &pRuntimeEnv->resultRowInfo;
//   STableQueryInfo *pTableQueryInfo = pRuntimeEnv->current;
1844
//
L
Liu Jicong 已提交
1845 1846 1847
//   SDataBlockInfo blockInfo = SDATA_BLOCK_INITIALIZER;
//   while (tsdbNextDataBlock(pRuntimeEnv->pTsdbReadHandle)) {
//     tsdbRetrieveDataBlockInfo(pRuntimeEnv->pTsdbReadHandle, &blockInfo);
1848
//
L
Liu Jicong 已提交
1849 1850 1851 1852 1853 1854 1855 1856 1857
//     if (QUERY_IS_ASC_QUERY(pQueryAttr)) {
//       if (pWindowResInfo->prevSKey == TSKEY_INITIAL_VAL) {
//         getAlignQueryTimeWindow(pQueryAttr, blockInfo.window.skey, blockInfo.window.skey, pQueryAttr->window.ekey,
//         &w); pWindowResInfo->prevSKey = w.skey;
//       }
//     } else {
//       getAlignQueryTimeWindow(pQueryAttr, blockInfo.window.ekey, pQueryAttr->window.ekey, blockInfo.window.ekey, &w);
//       pWindowResInfo->prevSKey = w.skey;
//     }
1858
//
L
Liu Jicong 已提交
1859 1860
//     // the first time window
//     STimeWindow win = getActiveTimeWindow(pWindowResInfo, pWindowResInfo->prevSKey, pQueryAttr);
1861
//
L
Liu Jicong 已提交
1862 1863
//     while (pQueryAttr->limit.offset > 0) {
//       STimeWindow tw = win;
1864
//
L
Liu Jicong 已提交
1865 1866 1867
//       if ((win.ekey <= blockInfo.window.ekey && ascQuery) || (win.ekey >= blockInfo.window.skey && !ascQuery)) {
//         pQueryAttr->limit.offset -= 1;
//         pWindowResInfo->prevSKey = win.skey;
1868
//
L
Liu Jicong 已提交
1869 1870 1871 1872 1873 1874
//         // current time window is aligned with blockInfo.window.ekey
//         // restart it from next data block by set prevSKey to be TSKEY_INITIAL_VAL;
//         if ((win.ekey == blockInfo.window.ekey && ascQuery) || (win.ekey == blockInfo.window.skey && !ascQuery)) {
//           pWindowResInfo->prevSKey = TSKEY_INITIAL_VAL;
//         }
//       }
1875
//
L
Liu Jicong 已提交
1876 1877 1878 1879
//       if (pQueryAttr->limit.offset == 0) {
//         *start = doSkipIntervalProcess(pRuntimeEnv, &win, &blockInfo, pTableQueryInfo);
//         return true;
//       }
1880
//
L
Liu Jicong 已提交
1881 1882
//       // current window does not ended in current data block, try next data block
//       getNextTimeWindow(pQueryAttr, &tw);
1883
//
L
Liu Jicong 已提交
1884 1885 1886 1887 1888 1889 1890 1891 1892
//       /*
//        * If the next time window still starts from current data block,
//        * load the primary timestamp column first, and then find the start position for the next queried time window.
//        * Note that only the primary timestamp column is required.
//        * TODO: Optimize for this cases. All data blocks are not needed to be loaded, only if the first actually
//        required
//        * time window resides in current data block.
//        */
//       if ((tw.skey <= blockInfo.window.ekey && ascQuery) || (tw.ekey >= blockInfo.window.skey && !ascQuery)) {
1893
//
L
Liu Jicong 已提交
1894 1895
//         SArray *pDataBlock = tsdbRetrieveDataBlock(pRuntimeEnv->pTsdbReadHandle, NULL);
//         SColumnInfoData *pColInfoData = taosArrayGet(pDataBlock, 0);
1896
//
L
Liu Jicong 已提交
1897 1898 1899
//         if ((win.ekey > blockInfo.window.ekey && ascQuery) || (win.ekey < blockInfo.window.skey && !ascQuery)) {
//           pQueryAttr->limit.offset -= 1;
//         }
1900
//
L
Liu Jicong 已提交
1901 1902 1903 1904 1905 1906 1907 1908
//         if (pQueryAttr->limit.offset == 0) {
//           *start = doSkipIntervalProcess(pRuntimeEnv, &win, &blockInfo, pTableQueryInfo);
//           return true;
//         } else {
//           tw = win;
//           int32_t startPos =
//               getNextQualifiedWindow(pQueryAttr, &tw, &blockInfo, pColInfoData->pData, binarySearchForKey, -1);
//           assert(startPos >= 0);
1909
//
L
Liu Jicong 已提交
1910 1911 1912 1913 1914 1915 1916 1917 1918 1919 1920
//           // set the abort info
//           pQueryAttr->pos = startPos;
//           pTableQueryInfo->lastKey = ((TSKEY *)pColInfoData->pData)[startPos];
//           pWindowResInfo->prevSKey = tw.skey;
//           win = tw;
//         }
//       } else {
//         break;  // offset is not 0, and next time window begins or ends in the next block.
//       }
//     }
//   }
1921
//
L
Liu Jicong 已提交
1922 1923 1924 1925
//   // check for error
//   if (terrno != TSDB_CODE_SUCCESS) {
//     longjmp(pRuntimeEnv->env, terrno);
//   }
1926
//
L
Liu Jicong 已提交
1927 1928
//   return true;
// }
1929

1930
/*
 * Set the downstream operators of `p` to a private copy of the `num` pointers in `pDownstream`.
 *
 * The new pointer array is built before `p` is modified. This means:
 *   - on allocation failure `p` keeps its previous downstream list and TSDB_CODE_OUT_OF_MEMORY is returned;
 *   - passing p->pDownstream itself as the source is safe.
 * Any previously attached pointer array is freed; the operators it pointed to are not freed.
 *
 * Returns TSDB_CODE_SUCCESS or TSDB_CODE_OUT_OF_MEMORY.
 */
int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t num) {
  if (p->pDownstream == NULL) {
    assert(p->numOfDownstream == 0);
  }

  SOperatorInfo** pNewList = taosMemoryCalloc(1, num * POINTER_BYTES);
  if (pNewList == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  memcpy(pNewList, pDownstream, num * POINTER_BYTES);

  // Previously the old array was simply overwritten and leaked.
  taosMemoryFree(p->pDownstream);
  p->pDownstream = pNewList;
  p->numOfDownstream = num;
  return TSDB_CODE_SUCCESS;
}

wmmhello's avatar
wmmhello 已提交
1945
// Forward declaration of a file-local helper; being `static`, its definition must appear later in this file.
static void doDestroyTableList(STableListInfo* pTableqinfoList);
1946

1947
/*
 * Debug sanity check that a table's query window and last key lie inside the task's window for the
 * given scan order.
 *
 * NOTE: the checks are compiled out (#if 0), so this function currently has no effect.
 */
static void doTableQueryInfoTimeWindowCheck(SExecTaskInfo* pTaskInfo, STableQueryInfo* pTableQueryInfo, int32_t order) {
#if 0
  STimeWindow* pTask = &pTaskInfo->window;
  STimeWindow* pTbl = &pTableQueryInfo->win;

  if (order != TSDB_ORDER_ASC) {
    assert((pTbl->skey >= pTbl->ekey) && (pTableQueryInfo->lastKey <= pTask->skey) &&
           (pTbl->skey <= pTask->skey && pTbl->ekey >= pTask->ekey));
  } else {
    assert((pTbl->skey <= pTbl->ekey) && (pTableQueryInfo->lastKey >= pTask->skey) &&
           (pTbl->skey >= pTask->skey && pTbl->ekey <= pTask->ekey));
  }
#endif
}

1963 1964 1965 1966
/*
 * Per-request context handed to loadRemoteDataCallback for one outstanding fetch.
 *
 * exchangeId:  reference id of the exchange operator (pExchangeInfo->self), resolved in the callback via
 *              taosAcquireRef(exchangeObjRefPool, ...).
 * sourceIndex: index of the fetched source in pExchangeInfo->pSources / pSourceDataInfo.
 */
typedef struct SFetchRspHandleWrapper {
  int64_t exchangeId;  // ref ids are 64-bit (taosAcquireRef takes an int64_t rid); uint32_t could truncate them
  int32_t sourceIndex;
} SFetchRspHandleWrapper;
1967

D
dapan1121 已提交
1968
/*
 * Transport callback for a fetch request sent by doSendFetchDataRequest.
 *
 * `param` is the SFetchRspHandleWrapper of the request. The callback owns pMsg->pData:
 *   - on success the buffer is stored in the source's pRsp and its header fields are converted to host order;
 *   - on error, or when the exchange operator has already been released, the buffer is freed here.
 * In both of the first two cases the source is marked EX_SOURCE_DATA_READY and the exchange's `ready`
 * semaphore is posted.
 *
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code) {
  SFetchRspHandleWrapper* pWrapper = (SFetchRspHandleWrapper*)param;

  SExchangeInfo* pExchangeInfo = taosAcquireRef(exchangeObjRefPool, pWrapper->exchangeId);
  if (pExchangeInfo == NULL) {
    qWarn("failed to acquire exchange operator, since it may have been released");
    taosMemoryFree(pMsg->pData);  // no one is left to consume the response buffer
    return TSDB_CODE_SUCCESS;
  }

  int32_t          index = pWrapper->sourceIndex;
  SSourceDataInfo* pSourceDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, index);

  if (code == TSDB_CODE_SUCCESS) {
    pSourceDataInfo->pRsp = pMsg->pData;

    SRetrieveTableRsp* pRsp = pSourceDataInfo->pRsp;
    ASSERT(pRsp != NULL);  // must hold before the header fields below are dereferenced

    pRsp->numOfRows = htonl(pRsp->numOfRows);
    pRsp->compLen = htonl(pRsp->compLen);
    pRsp->numOfCols = htonl(pRsp->numOfCols);
    pRsp->useconds = htobe64(pRsp->useconds);

    qDebug("%s fetch rsp received, index:%d, rows:%d", pSourceDataInfo->taskId, index, pRsp->numOfRows);
  } else {
    taosMemoryFree(pMsg->pData);  // the error payload is not kept anywhere, so release it here
    pSourceDataInfo->code = code;
    qDebug("%s fetch rsp received, index:%d, error:%s", pSourceDataInfo->taskId, index, tstrerror(code));
  }

  pSourceDataInfo->status = EX_SOURCE_DATA_READY;

  tsem_post(&pExchangeInfo->ready);
  taosReleaseRef(exchangeObjRefPool, pWrapper->exchangeId);

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
2004
/*
 * Dispatch an RPC response to the callback registered in its SMsgSendInfo (carried in pMsg->info.ahandle).
 *
 * The payload is copied into a fresh buffer whose ownership passes to the callback. If that copy cannot be
 * allocated, the callback is invoked with code TSDB_CODE_OUT_OF_MEMORY and an empty buffer (len 0, pData NULL).
 * Afterwards the RPC content and the send info are released.
 */
void qProcessRspMsg(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
  SMsgSendInfo* pSendInfo = (SMsgSendInfo*)pMsg->info.ahandle;
  assert(pMsg->info.ahandle != NULL);

  SDataBuf buf = {.len = pMsg->contLen, .pData = NULL};

  if (pMsg->contLen > 0) {
    buf.pData = taosMemoryCalloc(1, pMsg->contLen);
    if (buf.pData == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      pMsg->code = TSDB_CODE_OUT_OF_MEMORY;
      buf.len = 0;  // keep the length consistent with the missing payload
    } else {
      memcpy(buf.pData, pMsg->pCont, pMsg->contLen);
    }
  }

  pSendInfo->fp(pSendInfo->param, &buf, pMsg->code);
  rpcFreeCont(pMsg->pCont);
  destroySendMsgInfo(pSendInfo);
}

L
Liu Jicong 已提交
2025
/*
 * Build a SResFetchReq for source `sourceIndex` and send it asynchronously; the response arrives in
 * loadRemoteDataCallback.
 *
 * All multi-byte request fields are converted to network order. The source must be in
 * EX_SOURCE_DATA_NOT_READY state.
 *
 * Returns:
 *   - TSDB_CODE_SUCCESS once the request has been handed to the transport;
 *   - TSDB_CODE_QRY_OUT_OF_MEMORY if a request object cannot be allocated (partially built objects are freed);
 *   - the transport's error code if sending fails.
 * Error codes are also stored in pTaskInfo->code.
 */
static int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTaskInfo, int32_t sourceIndex) {
  size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources);

  SResFetchReq* pMsg = taosMemoryCalloc(1, sizeof(SResFetchReq));
  if (NULL == pMsg) {
    pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
    return pTaskInfo->code;
  }

  SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, sourceIndex);
  SSourceDataInfo*       pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, sourceIndex);

  ASSERT(pDataInfo->status == EX_SOURCE_DATA_NOT_READY);

  qDebug("%s build fetch msg and send to vgId:%d, ep:%s, taskId:0x%" PRIx64 ", execId:%d, %d/%" PRIzu,
         GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->addr.epSet.eps[0].fqdn, pSource->taskId, pSource->execId,
         sourceIndex, totalSources);

  pMsg->header.vgId = htonl(pSource->addr.nodeId);
  pMsg->sId = htobe64(pSource->schedId);
  pMsg->taskId = htobe64(pSource->taskId);
  pMsg->queryId = htobe64(pTaskInfo->id.queryId);
  pMsg->execId = htonl(pSource->execId);

  // send the fetch remote task result request
  SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (NULL == pMsgSendInfo) {
    taosMemoryFreeClear(pMsg);
    qError("%s prepare message %d failed", GET_TASKID(pTaskInfo), (int32_t)sizeof(SMsgSendInfo));
    pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
    return pTaskInfo->code;
  }

  SFetchRspHandleWrapper* pWrapper = taosMemoryCalloc(1, sizeof(SFetchRspHandleWrapper));
  if (NULL == pWrapper) {
    taosMemoryFreeClear(pMsg);
    taosMemoryFreeClear(pMsgSendInfo);
    qError("%s prepare message %d failed", GET_TASKID(pTaskInfo), (int32_t)sizeof(SFetchRspHandleWrapper));
    pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
    return pTaskInfo->code;
  }

  pWrapper->exchangeId = pExchangeInfo->self;
  pWrapper->sourceIndex = sourceIndex;

  pMsgSendInfo->param = pWrapper;
  pMsgSendInfo->paramFreeFp = taosMemoryFree;
  pMsgSendInfo->msgInfo.pData = pMsg;
  pMsgSendInfo->msgInfo.len = sizeof(SResFetchReq);
  pMsgSendInfo->msgType = pSource->fetchMsgType;
  pMsgSendInfo->fp = loadRemoteDataCallback;

  int64_t transporterId = 0;
  int32_t code = asyncSendMsgToServer(pExchangeInfo->pTransporter, &pSource->addr.epSet, &transporterId, pMsgSendInfo);
  if (code != TSDB_CODE_SUCCESS) {
    // NOTE(review): pMsgSendInfo is not freed here; whether the transport already released it on failure
    // is decided inside asyncSendMsgToServer, and freeing it again could double free.
    qError("%s failed to send fetch msg to vgId:%d, code %s", GET_TASKID(pTaskInfo), pSource->addr.nodeId,
           tstrerror(code));
    pTaskInfo->code = code;
    return code;
  }

  return TSDB_CODE_SUCCESS;
}

2074
/*
 * Decode a fetch-response payload into `pRes` and update the load statistics.
 *
 * pColList == NULL (data from other sources): the payload is decoded straight into pRes with blockDecode.
 *
 * Otherwise (data from mnode): the payload layout is
 *   - a network-order column count;
 *   - that many SSysTableSchema headers, converted to host order in place;
 *   - the encoded columns.
 * The columns are decoded into a temporary block and relocated into pRes according to pColList.
 *
 * On success, the following are updated:
 *   - pLoadInfo->totalRows, totalSize and totalElapsed (time since `startTs`);
 *   - *total (when non-NULL).
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY if the temporary block cannot be created, or the
 * error from growing pRes.
 */
int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadInfo, int32_t numOfRows, char* pData,
                                     int32_t compLen, int32_t numOfOutput, int64_t startTs, uint64_t* total,
                                     SArray* pColList) {
  if (pColList == NULL) {  // data from other sources
    blockDataCleanup(pRes);
    blockDecode(pRes, numOfOutput, numOfRows, pData);
  } else {  // extract data according to pColList
    ASSERT(numOfOutput == taosArrayGetSize(pColList));
    char* pStart = pData;

    int32_t numOfCols = htonl(*(int32_t*)pStart);
    pStart += sizeof(int32_t);

    // todo refactor:extract method
    SSysTableSchema* pSchema = (SSysTableSchema*)pStart;
    for (int32_t i = 0; i < numOfCols; ++i) {
      SSysTableSchema* p = (SSysTableSchema*)pStart;

      p->colId = htons(p->colId);
      p->bytes = htonl(p->bytes);
      pStart += sizeof(SSysTableSchema);
    }

    SSDataBlock* pBlock = createDataBlock();
    if (pBlock == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    for (int32_t i = 0; i < numOfCols; ++i) {
      SColumnInfoData idata = createColumnInfoData(pSchema[i].type, pSchema[i].bytes, pSchema[i].colId);
      blockDataAppendColInfo(pBlock, &idata);
    }

    blockDecode(pBlock, numOfCols, numOfRows, pStart);

    int32_t code = blockDataEnsureCapacity(pRes, numOfRows);
    if (code != TSDB_CODE_SUCCESS) {
      blockDataDestroy(pBlock);
      return code;
    }

    // data from mnode
    pRes->info.rows = numOfRows;
    relocateColumnData(pRes, pColList, pBlock->pDataBlock, false);
    blockDataDestroy(pBlock);
  }

  // todo move this to time window aggregator, since the primary timestamp may not be known by exchange operator.
  blockDataUpdateTsWindow(pRes, 0);

  int64_t el = taosGetTimestampUs() - startTs;

  pLoadInfo->totalRows += numOfRows;
  pLoadInfo->totalSize += compLen;

  if (total != NULL) {
    *total += numOfRows;
  }

  pLoadInfo->totalElapsed += el;
  return TSDB_CODE_SUCCESS;
}
2128

L
Liu Jicong 已提交
2129 2130
/*
 * Finish the exchange operator once every downstream source is drained.
 *
 * Adds the time elapsed since `startTs` (microseconds) to the load statistics,
 * logs a summary, and marks the operator as completed.
 *
 * Always returns NULL so that callers can write `return setAllSourcesCompleted(...)`
 * to signal "no more data".
 */
static void* setAllSourcesCompleted(SOperatorInfo* pOperator, int64_t startTs) {
  SExecTaskInfo*       pTask = pOperator->pTaskInfo;
  SExchangeInfo*       pExchange = pOperator->info;
  SLoadRemoteDataInfo* pStat = &pExchange->loadInfo;

  pStat->totalElapsed += taosGetTimestampUs() - startTs;

  size_t numOfSources = taosArrayGetSize(pExchange->pSources);
  qDebug("%s all %" PRIzu " sources are exhausted, total rows: %" PRIu64 " bytes:%" PRIu64 ", elapsed:%.2f ms",
         GET_TASKID(pTask), numOfSources, pStat->totalRows, pStat->totalSize, pStat->totalElapsed / 1000.0);

  doSetOperatorCompleted(pOperator);
  return NULL;
}

L
Liu Jicong 已提交
2147 2148
/*
 * Poll every downstream source and return the first block that becomes available.
 *
 * Fetch requests are expected to be outstanding already (see prepareConcurrentlyLoad).
 * Each pass over the sources does the following:
 *   - counts sources that are already exhausted;
 *   - skips sources whose response has not arrived (status != READY);
 *   - for a ready response, decodes it into pExchangeInfo->pResult. Unless the source
 *     reported completion, it immediately issues the next fetch request to that source.
 *
 * When all sources are exhausted, the operator is marked completed and NULL is returned.
 * On error, pTaskInfo->code is set and NULL is returned.
 * Between passes the thread calls sched_yield() (busy polling).
 */
static SSDataBlock* concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeInfo* pExchangeInfo,
                                                   SExecTaskInfo* pTaskInfo) {
  int32_t code = 0;
  int64_t startTs = taosGetTimestampUs();
  size_t  numOfSources = taosArrayGetSize(pExchangeInfo->pSources);

  for (;;) {
    int32_t numDone = 0;

    for (int32_t idx = 0; idx < numOfSources; ++idx) {
      SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, idx);

      if (pDataInfo->status == EX_SOURCE_DATA_EXHAUSTED) {
        numDone += 1;
        continue;
      }

      // The response for this source has not arrived yet.
      if (pDataInfo->status != EX_SOURCE_DATA_READY) {
        continue;
      }

      if (pDataInfo->code != TSDB_CODE_SUCCESS) {
        code = pDataInfo->code;
        goto _error;
      }

      SRetrieveTableRsp*     pRsp = pDataInfo->pRsp;
      SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, idx);
      SSDataBlock*           pRes = pExchangeInfo->pResult;
      SLoadRemoteDataInfo*   pLoadInfo = &pExchangeInfo->loadInfo;

      // An empty response marks this source as exhausted.
      if (pRsp->numOfRows == 0) {
        qDebug("%s vgId:%d, taskId:0x%" PRIx64 " execId:%d index:%d completed, rowsOfSource:%" PRIu64
               ", totalRows:%" PRIu64 ", completed:%d try next %d/%" PRIzu,
               GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, idx,
               pDataInfo->totalRows, pExchangeInfo->loadInfo.totalRows, numDone + 1, idx + 1, numOfSources);
        pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
        numDone += 1;
        taosMemoryFreeClear(pDataInfo->pRsp);
        continue;
      }

      code = extractDataBlockFromFetchRsp(pExchangeInfo->pResult, pLoadInfo, pRsp->numOfRows, pRsp->data,
                                          pRsp->compLen, pRsp->numOfCols, startTs, &pDataInfo->totalRows, NULL);
      if (code != 0) {
        taosMemoryFreeClear(pDataInfo->pRsp);
        goto _error;
      }

      if (pRsp->completed != 1) {
        qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " execId:%d numOfRows:%d, totalRows:%" PRIu64
               ", totalBytes:%" PRIu64,
               GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pRes->info.rows,
               pLoadInfo->totalRows, pLoadInfo->totalSize);
      } else {
        qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64
               " execId:%d index:%d completed, numOfRows:%d, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64
               ", totalBytes:%" PRIu64 ", completed:%d try next %d/%" PRIzu,
               GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, idx, pRes->info.rows,
               pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize, numDone + 1, idx + 1,
               numOfSources);
        numDone += 1;
        pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
      }

      taosMemoryFreeClear(pDataInfo->pRsp);

      // The source still has data: request the next batch before handing this one back.
      if (pDataInfo->status != EX_SOURCE_DATA_EXHAUSTED) {
        pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
        code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, idx);
        if (code != TSDB_CODE_SUCCESS) {
          taosMemoryFreeClear(pDataInfo->pRsp);
          goto _error;
        }
      }

      return pExchangeInfo->pResult;
    }

    if (numDone == numOfSources) {
      return setAllSourcesCompleted(pOperator, startTs);
    }

    sched_yield();
  }

_error:
  pTaskInfo->code = code;
  return NULL;
}

L
Liu Jicong 已提交
2238 2239 2240
/*
 * Issue one asynchronous fetch request to every downstream source. Then block on
 * pExchangeInfo->ready until it is signalled.
 *
 * Returns TSDB_CODE_SUCCESS, or the first send error (also stored in pTaskInfo->code).
 * Requests already sent before a failure are not cancelled here.
 * Sets the operator status to OP_RES_TO_RETURN and records openCost (microseconds).
 */
static int32_t prepareConcurrentlyLoad(SOperatorInfo* pOperator) {
  SExecTaskInfo* pTask = pOperator->pTaskInfo;
  SExchangeInfo* pExchange = pOperator->info;
  size_t         numOfSources = taosArrayGetSize(pExchange->pSources);
  int64_t        begin = taosGetTimestampUs();

  for (int32_t idx = 0; idx < numOfSources; ++idx) {
    int32_t code = doSendFetchDataRequest(pExchange, pTask, idx);
    if (code != TSDB_CODE_SUCCESS) {
      pTask->code = code;
      return code;
    }
  }

  int64_t sentAt = taosGetTimestampUs();
  qDebug("%s send all fetch requests to %" PRIzu " sources completed, elapsed:%.2fms", GET_TASKID(pTask),
         numOfSources, (sentAt - begin) / 1000.0);

  pOperator->status = OP_RES_TO_RETURN;
  pOperator->cost.openCost = taosGetTimestampUs() - begin;

  tsem_wait(&pExchange->ready);
  return TSDB_CODE_SUCCESS;
}

L
Liu Jicong 已提交
2265 2266 2267
/*
 * Load data from the downstream sources one at a time, starting at
 * pExchangeInfo->current.
 *
 * For the current source, a fetch request is sent and the call blocks on
 * pExchangeInfo->ready. Then:
 *   - an empty response marks the source exhausted and moves on to the next one;
 *   - a non-empty response is decoded into pExchangeInfo->pResult and returned.
 *     If the source flagged `completed`, `current` advances.
 *
 * Returns NULL when all sources are exhausted (the operator is marked completed)
 * or on error (pTaskInfo->code is set).
 */
static SSDataBlock* seqLoadRemoteData(SOperatorInfo* pOperator) {
  SExchangeInfo* pExchangeInfo = pOperator->info;
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;

  size_t  totalSources = taosArrayGetSize(pExchangeInfo->pSources);
  int64_t startTs = taosGetTimestampUs();

  while (1) {
    if (pExchangeInfo->current >= totalSources) {
      return setAllSourcesCompleted(pOperator, startTs);
    }

    int32_t code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, pExchangeInfo->current);
    if (code != TSDB_CODE_SUCCESS) {
      // The request was not sent, so presumably no response will signal `ready`.
      // Waiting on it could block forever.
      pTaskInfo->code = code;
      return NULL;
    }
    tsem_wait(&pExchangeInfo->ready);

    SSourceDataInfo*       pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, pExchangeInfo->current);
    SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, pExchangeInfo->current);

    if (pDataInfo->code != TSDB_CODE_SUCCESS) {
      qError("%s vgId:%d, taskID:0x%" PRIx64 " execId:%d error happens, code:%s", GET_TASKID(pTaskInfo),
             pSource->addr.nodeId, pSource->taskId, pSource->execId, tstrerror(pDataInfo->code));
      pTaskInfo->code = pDataInfo->code;
      return NULL;
    }

    SRetrieveTableRsp*   pRsp = pDataInfo->pRsp;
    SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
    if (pRsp->numOfRows == 0) {
      qDebug("%s vgId:%d, taskID:0x%" PRIx64 " execId:%d %d of total completed, rowsOfSource:%" PRIu64
             ", totalRows:%" PRIu64 " try next",
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pExchangeInfo->current + 1,
             pDataInfo->totalRows, pLoadInfo->totalRows);

      pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
      pExchangeInfo->current += 1;
      taosMemoryFreeClear(pDataInfo->pRsp);
      continue;
    }

    SSDataBlock* pRes = pExchangeInfo->pResult;
    code = extractDataBlockFromFetchRsp(pExchangeInfo->pResult, pLoadInfo, pRsp->numOfRows, pRsp->data, pRsp->compLen,
                                        pRsp->numOfCols, startTs, &pDataInfo->totalRows, NULL);
    if (code != TSDB_CODE_SUCCESS) {
      // Do not hand back a partially decoded block. This matches the concurrent loader.
      taosMemoryFreeClear(pDataInfo->pRsp);
      pTaskInfo->code = code;
      return NULL;
    }

    if (pRsp->completed == 1) {
      qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " execId:%d numOfRows:%d, rowsOfSource:%" PRIu64
             ", totalRows:%" PRIu64 ", totalBytes:%" PRIu64 " try next %d/%" PRIzu,
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pRes->info.rows,
             pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize, pExchangeInfo->current + 1,
             totalSources);

      pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
      pExchangeInfo->current += 1;
    } else {
      qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " execId:%d numOfRows:%d, totalRows:%" PRIu64
             ", totalBytes:%" PRIu64,
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pRes->info.rows,
             pLoadInfo->totalRows, pLoadInfo->totalSize);
    }

    pOperator->resultInfo.totalRows += pRes->info.rows;
    taosMemoryFreeClear(pDataInfo->pRsp);
    return pExchangeInfo->pResult;
  }
}

L
Liu Jicong 已提交
2332
/*
 * Open function of the exchange operator. It returns immediately if the operator
 * is already opened.
 *
 * In concurrent mode (seqLoadData == false), it dispatches fetch requests to all
 * sources up front. In sequential mode nothing is sent here.
 * On success, marks the operator opened and records the open cost in milliseconds.
 */
static int32_t prepareLoadRemoteData(SOperatorInfo* pOperator) {
  if (OPTR_IS_OPENED(pOperator)) {
    return TSDB_CODE_SUCCESS;
  }

  int64_t        openStart = taosGetTimestampUs();
  SExchangeInfo* pExchange = pOperator->info;

  int32_t code = pExchange->seqLoadData ? TSDB_CODE_SUCCESS : prepareConcurrentlyLoad(pOperator);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  OPTR_SET_OPENED(pOperator);
  pOperator->cost.openCost = (taosGetTimestampUs() - openStart) / 1000.0;
  return TSDB_CODE_SUCCESS;
}

2352
/*
 * Open the operator if needed, then fetch the next block using either the
 * sequential or the concurrent strategy.
 *
 * Returns NULL if opening fails (pTaskInfo->code holds the error), or if the
 * operator has already finished.
 */
static SSDataBlock* doLoadRemoteDataImpl(SOperatorInfo* pOperator) {
  SExecTaskInfo* pTask = pOperator->pTaskInfo;
  SExchangeInfo* pExchange = pOperator->info;

  pTask->code = pOperator->fpSet._openFn(pOperator);
  if (pTask->code != TSDB_CODE_SUCCESS) {
    return NULL;
  }

  if (pOperator->status == OP_EXEC_DONE) {
    size_t               numOfSources = taosArrayGetSize(pExchange->pSources);
    SLoadRemoteDataInfo* pStat = &pExchange->loadInfo;
    qDebug("%s all %" PRIzu " source(s) are exhausted, total rows:%" PRIu64 " bytes:%" PRIu64 ", elapsed:%.2f ms",
           GET_TASKID(pTask), numOfSources, pStat->totalRows, pStat->totalSize, pStat->totalElapsed / 1000.0);
    return NULL;
  }

  return pExchange->seqLoadData ? seqLoadRemoteData(pOperator)
                                : concurrentlyLoadRemoteDataImpl(pOperator, pExchange, pTask);
}
2377

2378 2379 2380 2381 2382 2383 2384 2385 2386 2387 2388 2389 2390 2391 2392 2393 2394 2395 2396 2397 2398 2399 2400 2401 2402 2403 2404 2405 2406 2407 2408 2409 2410 2411 2412 2413 2414 2415
/*
 * "next" function of the exchange operator.
 *
 * Pulls blocks from doLoadRemoteDataImpl. If the exchange node carries
 * limit/offset information, each block is filtered with handleLimitOffset:
 *   - PROJECT_RETRIEVE_CONTINUE means fetch another block;
 *   - PROJECT_RETRIEVE_DONE means the surviving rows are counted and the block is
 *     returned. If no rows survived, the operator is marked completed and NULL
 *     is returned.
 */
static SSDataBlock* doLoadRemoteData(SOperatorInfo* pOperator) {
  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  SExchangeInfo* pExchange = pOperator->info;
  SLimitInfo*    pLimit = &pExchange->limitInfo;

  for (;;) {
    SSDataBlock* pFetched = doLoadRemoteDataImpl(pOperator);
    if (pFetched == NULL) {
      return NULL;
    }
    ASSERT(pFetched == pExchange->pResult);

    SSDataBlock* pRes = pExchange->pResult;
    if (!hasLimitOffsetInfo(pLimit)) {
      return pRes;
    }

    int32_t status = handleLimitOffset(pOperator, pLimit, pRes, false);
    if (status == PROJECT_RETRIEVE_CONTINUE) {
      continue;
    }

    if (status == PROJECT_RETRIEVE_DONE) {
      size_t rows = pRes->info.rows;
      pLimit->numOfOutputRows += rows;
      if (rows != 0) {
        return pRes;
      }

      doSetOperatorCompleted(pOperator);
      return NULL;
    }
    // Any other status: fall through and fetch the next block.
  }
}

2416
/*
 * Create pInfo->pSourceDataInfo with one SSourceDataInfo per source. Each entry
 * starts in EX_SOURCE_DATA_NOT_READY and is tagged with the task id and its index.
 *
 * Returns TSDB_CODE_OUT_OF_MEMORY on allocation failure. In that case
 * pInfo->pSourceDataInfo is left NULL (never dangling), so a later destroy
 * cannot double-free it.
 */
static int32_t initDataSource(int32_t numOfSources, SExchangeInfo* pInfo, const char* id) {
  pInfo->pSourceDataInfo = taosArrayInit(numOfSources, sizeof(SSourceDataInfo));
  if (pInfo->pSourceDataInfo == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  for (int32_t i = 0; i < numOfSources; ++i) {
    SSourceDataInfo dataInfo = {0};
    dataInfo.status = EX_SOURCE_DATA_NOT_READY;
    dataInfo.taskId = id;
    dataInfo.index = i;

    if (taosArrayPush(pInfo->pSourceDataInfo, &dataInfo) == NULL) {
      taosArrayDestroy(pInfo->pSourceDataInfo);
      pInfo->pSourceDataInfo = NULL;  // avoid a dangling pointer / double free in the destroy path
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  return TSDB_CODE_SUCCESS;
}

2437
/*
 * Populate the exchange info from the physical plan node:
 *   - copies every source node of pExNode->pSrcEndPoints into pInfo->pSources;
 *   - initializes limit/offset handling;
 *   - registers pInfo in exchangeObjRefPool (pInfo->self);
 *   - builds the per-source state array.
 *
 * Returns TSDB_CODE_INVALID_PARA when there are no sources, or
 * TSDB_CODE_OUT_OF_MEMORY on allocation failure.
 */
static int32_t initExchangeOperator(SExchangePhysiNode* pExNode, SExchangeInfo* pInfo, const char* id) {
  size_t numOfSources = LIST_LENGTH(pExNode->pSrcEndPoints);

  if (numOfSources == 0) {
    qError("%s invalid number: %d of sources in exchange operator", id, (int32_t)numOfSources);
    return TSDB_CODE_INVALID_PARA;
  }

  pInfo->pSources = taosArrayInit(numOfSources, sizeof(SDownstreamSourceNode));
  if (pInfo->pSources == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  for (int32_t i = 0; i < numOfSources; ++i) {
    SDownstreamSourceNode* pNode = (SDownstreamSourceNode*)nodesListGetNode((SNodeList*)pExNode->pSrcEndPoints, i);
    // A failed push would otherwise leave fewer entries than numOfSources, and later
    // per-index lookups would go out of range.
    if (taosArrayPush(pInfo->pSources, pNode) == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  initLimitInfo(pExNode->node.pLimit, pExNode->node.pSlimit, &pInfo->limitInfo);
  pInfo->self = taosAddRef(exchangeObjRefPool, pInfo);

  return initDataSource(numOfSources, pInfo, id);
}

/*
 * Create an exchange operator that pulls result blocks from the remote sources
 * listed in pExNode->pSrcEndPoints, using pTransporter.
 *
 * Returns the new operator, or NULL on failure. On failure pTaskInfo->code
 * holds the error and all partially built state is released.
 */
SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode* pExNode, SExecTaskInfo* pTaskInfo) {
  // Initialized up front: the first `goto _error` is taken before any other
  // assignment, and the error path reports `code` through pTaskInfo->code.
  int32_t        code = TSDB_CODE_OUT_OF_MEMORY;
  SExchangeInfo* pInfo = taosMemoryCalloc(1, sizeof(SExchangeInfo));
  SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    goto _error;
  }

  code = initExchangeOperator(pExNode, pInfo, GET_TASKID(pTaskInfo));
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  tsem_init(&pInfo->ready, 0, 0);

  pInfo->seqLoadData = false;
  pInfo->pTransporter = pTransporter;
  pInfo->pResult = createResDataBlock(pExNode->node.pOutputDataBlockDesc);
  if (pInfo->pResult == NULL) {
    // pResult->pDataBlock is dereferenced just below.
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _error;
  }

  pOperator->name = "ExchangeOperator";
  pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE;
  pOperator->blocking = false;
  pOperator->status = OP_NOT_OPENED;
  pOperator->info = pInfo;
  pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pResult->pDataBlock);
  pOperator->pTaskInfo = pTaskInfo;

  pOperator->fpSet = createOperatorFpSet(prepareLoadRemoteData, doLoadRemoteData, NULL, NULL,
                                         destroyExchangeOperatorInfo, NULL, NULL, NULL);
  return pOperator;

_error:
  if (pInfo != NULL) {
    doDestroyExchangeOperatorInfo(pInfo);
  }

  taosMemoryFreeClear(pOperator);
  pTaskInfo->code = code;
  return NULL;
}

dengyihao's avatar
dengyihao 已提交
2500 2501
// Forward declaration of a static helper defined elsewhere in this file
// (presumably below createSortedMergeOperatorInfo, which calls it).
static int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t numOfOutput,
                                size_t keyBufSize, const char* pKey);
2502

2503
static void destroySortedMergeOperatorInfo(void* param, int32_t numOfOutput) {
L
Liu Jicong 已提交
2504
  SSortedMergeOperatorInfo* pInfo = (SSortedMergeOperatorInfo*)param;
H
Haojun Liao 已提交
2505
  taosArrayDestroy(pInfo->pSortInfo);
2506 2507 2508
  taosArrayDestroy(pInfo->groupInfo);

  if (pInfo->pSortHandle != NULL) {
H
Haojun Liao 已提交
2509
    tsortDestroySortHandle(pInfo->pSortHandle);
2510 2511
  }

H
Haojun Liao 已提交
2512
  blockDataDestroy(pInfo->binfo.pRes);
H
Haojun Liao 已提交
2513
  cleanupAggSup(&pInfo->aggSup);
L
Liu Jicong 已提交
2514

D
dapan1121 已提交
2515
  taosMemoryFreeClear(param);
2516
}
H
Haojun Liao 已提交
2517

L
Liu Jicong 已提交
2518
/*
 * Decide whether row `rowIndex` of pBlock belongs to the current group.
 *
 * The row's group-by columns (slot indices in `groupInfo`) are compared with the
 * saved group key in `buf`. A row with no group-by columns always merges.
 *
 * Returns true when every group column is equal (both NULL, or identical bytes),
 * and false as soon as one differs.
 */
static bool needToMerge(SSDataBlock* pBlock, SArray* groupInfo, char** buf, int32_t rowIndex) {
  size_t size = taosArrayGetSize(groupInfo);
  if (size == 0) {
    return true;
  }

  for (int32_t i = 0; i < size; ++i) {
    int32_t*         index = taosArrayGet(groupInfo, i);
    SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, *index);
    bool             isNull = colDataIsNull(pColInfo, rowIndex, pBlock->info.rows, NULL);

    if ((isNull && buf[i] != NULL) || (!isNull && buf[i] == NULL)) {
      return false;
    }

    // Both sides are NULL: equal for grouping purposes. This also avoids a memcmp
    // against a NULL key buffer.
    if (isNull) {
      continue;
    }

    char* pCell = colDataGetData(pColInfo, rowIndex);
    if (IS_VAR_DATA_TYPE(pColInfo->info.type)) {
      if (varDataLen(pCell) != varDataLen(buf[i])) {
        return false;
      }
      if (memcmp(varDataVal(pCell), varDataVal(buf[i]), varDataLen(pCell)) != 0) {
        return false;
      }
    } else if (memcmp(pCell, buf[i], pColInfo->info.bytes) != 0) {
      return false;
    }
  }

  // All group columns matched. (Previously `return 0`, which meant rows never merged.)
  return true;
}

L
Liu Jicong 已提交
2553 2554 2555
/*
 * Merge row `rowIndex` into the running aggregate of each of the `numOfExpr`
 * expressions.
 *
 * NOTE(review): currently a no-op. The per-expression merge calls are
 * still a TODO.
 */
static void doMergeResultImpl(SSortedMergeOperatorInfo* pInfo, SqlFunctionCtx* pCtx, int32_t numOfExpr,
                              int32_t rowIndex) {
  // TODO: set pCtx[j].startRow = rowIndex, then merge each expression.
  //       UDFs go through doInvokeUdf(..., TSDB_UDF_FUNC_MERGE); built-ins use their merge function.
  (void)pInfo;
  (void)pCtx;
  (void)numOfExpr;
  (void)rowIndex;
}
2572

L
Liu Jicong 已提交
2573 2574
/*
 * Finalize the aggregate of each of the `numOfExpr` expressions for the current
 * group.
 *
 * NOTE(review): currently a no-op. The per-expression finalize calls are
 * still a TODO.
 */
static void doFinalizeResultImpl(SqlFunctionCtx* pCtx, int32_t numOfExpr) {
  // TODO: skip tag/ts dummy functions, then finalize each expression.
  //       UDFs go through doInvokeUdf(..., TSDB_UDF_FUNC_FINALIZE); built-ins use pCtx[j].fpSet.finalize.
  (void)pCtx;
  (void)numOfExpr;
}
2587

2588
/*
 * Copy the group-by column values of row `rowIndex` into the key buffers.
 *
 * `pColumnList` holds int32 slot indices into pBlock->pDataBlock. Value k is
 * copied into rowColData[k], using colDataGetLength bytes.
 *
 * Always returns true; callers store the result in `hasGroupVal`.
 */
static bool saveCurrentTuple(char** rowColData, SArray* pColumnList, SSDataBlock* pBlock, int32_t rowIndex) {
  int32_t numOfKeys = (int32_t)taosArrayGetSize(pColumnList);

  for (int32_t k = 0; k < numOfKeys; ++k) {
    int32_t          slot = *(int32_t*)taosArrayGet(pColumnList, k);
    SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slot);
    char*            pValue = colDataGetData(pCol, rowIndex);

    memcpy(rowColData[k], pValue, colDataGetLength(pCol, rowIndex));
  }

  return true;
}
2601

2602 2603
/*
 * Feed every row of pBlock into the group-by merge state.
 *
 * Each row is compared with the saved group key (pInfo->groupVal):
 *   - a matching row is merged into the current group;
 *   - otherwise the current group is closed first (finalize, add the result
 *     count to pInfo->binfo.pRes, call fpSet.process on each expression with a
 *     non-negative functionId), and the row is then merged and saved as the new key.
 * The very first row (no key saved yet) only starts a group.
 */
static void doMergeImpl(SOperatorInfo* pOperator, int32_t numOfExpr, SSDataBlock* pBlock) {
  SSortedMergeOperatorInfo* pInfo = pOperator->info;
  SqlFunctionCtx*           pCtx = pOperator->exprSupp.pCtx;

  for (int32_t row = 0; row < pBlock->info.rows; ++row) {
    bool hasKey = pInfo->hasGroupVal;

    // Same group as the saved key: just accumulate.
    if (hasKey && needToMerge(pBlock, pInfo->groupInfo, pInfo->groupVal, row)) {
      doMergeResultImpl(pInfo, pCtx, numOfExpr, row);
      continue;
    }

    if (!hasKey) {
      // The first row ever seen: there is no previous group to close.
      ASSERT(row == 0);
    } else {
      // Group boundary: close the current group.
      doFinalizeResultImpl(pCtx, numOfExpr);

      // TODO check for available buffer;
      int32_t numOfRows = getNumOfResult(pOperator->exprSupp.pCtx, pOperator->exprSupp.numOfExprs, NULL);
      pInfo->binfo.pRes->info.rows += numOfRows;

      for (int32_t j = 0; j < numOfExpr; ++j) {
        if (pCtx[j].functionId >= 0) {
          pCtx[j].fpSet.process(&pCtx[j]);
        }
      }
    }

    doMergeResultImpl(pInfo, pCtx, numOfExpr, row);
    pInfo->hasGroupVal = saveCurrentTuple(pInfo->groupVal, pInfo->groupInfo, pBlock, row);
  }
}

2639 2640
/*
 * Drain the sort handle in chunks of at most resultInfo.capacity rows and feed
 * each chunk through doMergeImpl. Then finalize the last group.
 *
 * Returns pInfo->binfo.pRes if it holds any rows, otherwise NULL.
 * The scratch block used to stage tuples is released before returning.
 * On allocation failure, control jumps to the task's error handler (longjmp),
 * the same way doSortedMerge reports tsortOpen failures.
 */
static SSDataBlock* doMerge(SOperatorInfo* pOperator) {
  SSortedMergeOperatorInfo* pInfo = pOperator->info;
  SSortHandle*              pHandle = pInfo->pSortHandle;

  SSDataBlock* pDataBlock = createOneDataBlock(pInfo->binfo.pRes, false);
  if (pDataBlock == NULL) {
    longjmp(pOperator->pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
  }
  blockDataEnsureCapacity(pDataBlock, pOperator->resultInfo.capacity);

  while (1) {
    blockDataCleanup(pDataBlock);
    while (1) {
      STupleHandle* pTupleHandle = tsortNextTuple(pHandle);
      if (pTupleHandle == NULL) {
        break;
      }

      // Build the data block that is merged for one group.
      appendOneRowToDataBlock(pDataBlock, pTupleHandle);
      if (pDataBlock->info.rows >= pOperator->resultInfo.capacity) {
        break;
      }
    }

    if (pDataBlock->info.rows == 0) {
      break;
    }

    setInputDataBlock(pOperator, pOperator->exprSupp.pCtx, pDataBlock, TSDB_ORDER_ASC, MAIN_SCAN, true);
    doMergeImpl(pOperator, pOperator->exprSupp.numOfExprs, pDataBlock);
    // TODO: flush to tuple store; after all data are handled, return to the upstream or sink node.
  }

  doFinalizeResultImpl(pOperator->exprSupp.pCtx, pOperator->exprSupp.numOfExprs);
  int32_t numOfRows = getNumOfResult(pOperator->exprSupp.pCtx, pOperator->exprSupp.numOfExprs, NULL);

  // TODO check for available buffer;
  pInfo->binfo.pRes->info.rows += numOfRows;

  // The staging block was previously leaked on every call.
  blockDataDestroy(pDataBlock);

  return (pInfo->binfo.pRes->info.rows > 0) ? pInfo->binfo.pRes : NULL;
}
2682

L
Liu Jicong 已提交
2683 2684
/*
 * Pull up to `capacity` sorted tuples from pHandle into a temporary block. Then
 * copy the columns selected by pColMatchInfo (srcSlotId -> targetSlotId) into
 * pDataBlock.
 *
 * pDataBlock is cleaned first. Returns pDataBlock if it ends up with rows,
 * otherwise NULL. `pInfo` is not used.
 */
SSDataBlock* getSortedMergeBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity,
                                     SArray* pColMatchInfo, SSortedMergeOperatorInfo* pInfo) {
  blockDataCleanup(pDataBlock);

  SSDataBlock* pSorted = tsortGetSortedDataBlock(pHandle);
  if (pSorted == NULL) {
    return NULL;
  }

  blockDataEnsureCapacity(pSorted, capacity);

  // Append tuples until the handle is drained or the block reaches capacity.
  for (STupleHandle* pTuple = tsortNextTuple(pHandle); pTuple != NULL; pTuple = tsortNextTuple(pHandle)) {
    appendOneRowToDataBlock(pSorted, pTuple);
    if (pSorted->info.rows >= capacity) {
      break;
    }
  }

  int32_t numOfSortedRows = pSorted->info.rows;
  if (numOfSortedRows > 0) {
    int32_t numOfMatched = taosArrayGetSize(pColMatchInfo);
    for (int32_t k = 0; k < numOfMatched; ++k) {
      SColMatchInfo* pMatch = taosArrayGet(pColMatchInfo, k);
      ASSERT(pMatch->matchType == COL_MATCH_FROM_SLOT_ID);

      SColumnInfoData* pFrom = taosArrayGet(pSorted->pDataBlock, pMatch->srcSlotId);
      SColumnInfoData* pTo = taosArrayGet(pDataBlock->pDataBlock, pMatch->targetSlotId);
      colDataAssign(pTo, pFrom, pSorted->info.rows, &pDataBlock->info);
    }

    pDataBlock->info.rows = pSorted->info.rows;
    pDataBlock->info.capacity = pSorted->info.rows;
  }

  blockDataDestroy(pSorted);
  return (pDataBlock->info.rows > 0) ? pDataBlock : NULL;
}

2725
/*
 * "next" function of the sorted-merge operator.
 *
 * On the first call, it builds a multi-source merge sort handle over all
 * downstream operators, opens it, and runs doMerge.
 * Later calls (status OP_RES_TO_RETURN) read the next sorted block via
 * getSortedMergeBlockData.
 * On failure, control jumps to the task's error handler (longjmp).
 */
static SSDataBlock* doSortedMerge(SOperatorInfo* pOperator) {
  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  SExecTaskInfo*            pTaskInfo = pOperator->pTaskInfo;
  SSortedMergeOperatorInfo* pInfo = pOperator->info;
  if (pOperator->status == OP_RES_TO_RETURN) {
    return getSortedMergeBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pOperator->resultInfo.capacity, NULL, pInfo);
  }

  int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;
  // Pass the real task id; previously the literal text "GET_TASKID(pTaskInfo)" was passed.
  pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_MULTISOURCE_MERGE, pInfo->bufPageSize, numOfBufPage,
                                             pInfo->binfo.pRes, GET_TASKID(pTaskInfo));

  tsortSetFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock, NULL, NULL);

  for (int32_t i = 0; i < pOperator->numOfDownstream; ++i) {
    SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource));
    if (ps == NULL) {
      longjmp(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
    }

    ps->param = pOperator->pDownstream[i];
    tsortAddSource(pInfo->pSortHandle, ps);
  }

  int32_t code = tsortOpen(pInfo->pSortHandle);
  if (code != TSDB_CODE_SUCCESS) {
    longjmp(pTaskInfo->env, terrno);
  }

  pOperator->status = OP_RES_TO_RETURN;
  return doMerge(pOperator);
}
2756

L
Liu Jicong 已提交
2757 2758
/*
 * Resolve the group-by columns (pGroupInfo, an array of SColumn) against the
 * expression list and allocate the group-key buffer.
 *
 * For each group column, the first expression whose resSchema.slotId equals
 * the column's colId is looked up. Its index is appended to pInfo->groupInfo.
 * pInfo->groupVal is then allocated as one buffer: a table of
 * `numOfMatched` char* pointers, followed by the packed value storage
 * (`bytes` per column). groupVal[i] points at column i's storage.
 *
 * Returns 0 when there is nothing to group by, TSDB_CODE_SUCCESS on success, or
 * TSDB_CODE_OUT_OF_MEMORY.
 */
static int32_t initGroupCol(SExprInfo* pExprInfo, int32_t numOfCols, SArray* pGroupInfo,
                            SSortedMergeOperatorInfo* pInfo) {
  if (pGroupInfo == NULL || taosArrayGetSize(pGroupInfo) == 0) {
    return 0;
  }

  int32_t len = 0;
  SArray* plist = taosArrayInit(3, sizeof(SColumn));
  pInfo->groupInfo = taosArrayInit(3, sizeof(int32_t));

  if (plist == NULL || pInfo->groupInfo == NULL) {
    if (plist != NULL) {
      taosArrayDestroy(plist);
    }
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  // Iterate the requested group columns. The old code read the size of the
  // freshly created (empty) pInfo->groupInfo, so nothing was ever matched.
  size_t numOfGroupCol = taosArrayGetSize(pGroupInfo);
  for (int32_t i = 0; i < numOfGroupCol; ++i) {
    SColumn* pCol = taosArrayGet(pGroupInfo, i);
    for (int32_t j = 0; j < numOfCols; ++j) {
      SExprInfo* pe = &pExprInfo[j];
      if (pe->base.resSchema.slotId == pCol->colId) {
        taosArrayPush(plist, pCol);
        taosArrayPush(pInfo->groupInfo, &j);
        len += pCol->bytes;
        break;
      }
    }
  }

  ASSERT(taosArrayGetSize(pGroupInfo) == taosArrayGetSize(plist));

  // Size the buffer by what was actually matched. plist and groupInfo are pushed
  // together, so groupVal has exactly one entry per groupInfo slot.
  size_t numOfMatched = taosArrayGetSize(plist);
  pInfo->groupVal = taosMemoryCalloc(1, (POINTER_BYTES * numOfMatched + len));
  if (pInfo->groupVal == NULL) {
    taosArrayDestroy(plist);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  // Value storage starts right after the pointer table. Cast to char* first:
  // adding a byte count to a char** would scale it by sizeof(char*) a second time.
  int32_t offset = 0;
  char*   start = (char*)pInfo->groupVal + POINTER_BYTES * numOfMatched;
  for (int32_t i = 0; i < numOfMatched; ++i) {
    pInfo->groupVal[i] = start + offset;
    SColumn* pCol = taosArrayGet(plist, i);
    offset += pCol->bytes;
  }

  taosArrayDestroy(plist);

  return TSDB_CODE_SUCCESS;
}
H
Haojun Liao 已提交
2805

L
Liu Jicong 已提交
2806 2807 2808
/*
 * Create a blocking sorted-merge operator over `numOfDownstream` downstream
 * operators.
 *
 * Ownership of pSortInfo passes to the operator; it is destroyed together with
 * the operator info.
 * Returns the operator, or NULL with terrno set on failure.
 */
SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t numOfDownstream, SExprInfo* pExprInfo,
                                             int32_t num, SArray* pSortInfo, SArray* pGroupInfo,
                                             SExecTaskInfo* pTaskInfo) {
  // Declared before the first goto so the error path always reads an initialized value.
  int32_t                   code = TSDB_CODE_QRY_OUT_OF_MEMORY;
  SSortedMergeOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SSortedMergeOperatorInfo));
  SOperatorInfo*            pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    goto _error;
  }

  code = initExprSupp(&pOperator->exprSupp, pExprInfo, num);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  initResultRowInfo(&pInfo->binfo.resultRowInfo);

  // NOTE(review): nothing in this function assigns pInfo->binfo.pRes, so this check
  // always fails as written. The result block still needs to be created here.
  if (pOperator->exprSupp.pCtx == NULL || pInfo->binfo.pRes == NULL) {
    code = TSDB_CODE_QRY_OUT_OF_MEMORY;
    goto _error;
  }

  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
  code = doInitAggInfoSup(&pInfo->aggSup, pOperator->exprSupp.pCtx, num, keyBufSize, pTaskInfo->id.str);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  setFunctionResultOutput(pOperator, &pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, num);
  code = initGroupCol(pExprInfo, num, pGroupInfo, pInfo);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  pInfo->sortBufSize = 1024 * 16;  // 16 KB in total
  pInfo->bufPageSize = 1024;
  pInfo->pSortInfo = pSortInfo;

  pOperator->resultInfo.capacity = blockDataGetCapacityInRow(pInfo->binfo.pRes, pInfo->bufPageSize);

  pOperator->name = "SortedMerge";
  // pOperator->operatorType = OP_SortedMerge;
  pOperator->blocking = true;
  pOperator->status = OP_NOT_OPENED;
  pOperator->info = pInfo;
  pOperator->pTaskInfo = pTaskInfo;

  pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doSortedMerge, NULL, NULL, destroySortedMergeOperatorInfo,
                                         NULL, NULL, NULL);
  code = appendDownstream(pOperator, downstream, numOfDownstream);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  return pOperator;

_error:
  if (pInfo != NULL) {
    // destroySortedMergeOperatorInfo frees pInfo itself. Forget the pointer so it
    // is not freed a second time (the old code called taosMemoryFreeClear(pInfo) here).
    destroySortedMergeOperatorInfo(pInfo, num);
    pInfo = NULL;
  }

  taosMemoryFreeClear(pOperator);
  terrno = (code != TSDB_CODE_SUCCESS) ? code : TSDB_CODE_QRY_OUT_OF_MEMORY;
  return NULL;
}

X
Xiaoyu Wang 已提交
2873
/*
 * Report the scan order and scan flag that feed pOperator.
 *
 * Walks down the first-downstream chain until it reaches a scan-like operator:
 *   - exchange / systable / stream / tag / block-dist / last-row scans report
 *     ascending order with MAIN_SCAN;
 *   - a table scan reports its own cond.order and scanFlag.
 * Returns TSDB_CODE_INVALID_PARA if the chain ends before reaching such an operator.
 */
int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scanFlag) {
  // todo add more information about exchange operation
  SOperatorInfo* pOp = pOperator;

  for (;;) {
    int32_t type = pOp->operatorType;

    switch (type) {
      case QUERY_NODE_PHYSICAL_PLAN_EXCHANGE:
      case QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN:
      case QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN:
      case QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN:
      case QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN:
      case QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN:
        *order = TSDB_ORDER_ASC;
        *scanFlag = MAIN_SCAN;
        return TSDB_CODE_SUCCESS;

      case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN: {
        STableScanInfo* pTableScanInfo = pOp->info;
        *order = pTableScanInfo->cond.order;
        *scanFlag = pTableScanInfo->scanFlag;
        return TSDB_CODE_SUCCESS;
      }

      default:
        if (pOp->pDownstream == NULL || pOp->pDownstream[0] == NULL) {
          return TSDB_CODE_INVALID_PARA;
        }
        pOp = pOp->pDownstream[0];
        break;
    }
  }
}
L
Liu Jicong 已提交
2895
#if 0
L
Liu Jicong 已提交
2896
/*
 * Position the stream scan reachable from pOperator so that it resumes right after (uid, ts).
 *
 * Operators other than the stream scan forward the request to their single downstream operator. For the stream scan:
 *  - uid == 0 marks that there is no table to scan;
 *  - otherwise, when (uid, ts) differs from the last recorded status, the reader is switched to table `uid` and reset
 *    so that it starts at ts + 1 (the original window start is restored afterwards).
 *
 * Returns TSDB_CODE_QRY_APP_ERROR when no stream scan can be reached through a single-downstream chain.
 */
int32_t doPrepareScan(SOperatorInfo* pOperator, uint64_t uid, int64_t ts) {
  // Keep the full width of the node type, as getTableScanInfo does. Narrowing it to uint8_t can truncate the ENodeType
  // value, so that it never equals QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN.
  int32_t type = pOperator->operatorType;

  pOperator->status = OP_OPENED;

  if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
    SStreamScanInfo* pScanInfo = pOperator->info;
    pScanInfo->blockType = STREAM_INPUT__TABLE_SCAN;

    pScanInfo->pTableScanOp->status = OP_OPENED;

    STableScanInfo* pInfo = pScanInfo->pTableScanOp->info;
    ASSERT(pInfo->scanMode == TABLE_SCAN__TABLE_ORDER);

    if (uid == 0) {
      pInfo->noTable = 1;
      return TSDB_CODE_SUCCESS;
    }

    pInfo->noTable = 0;

    if (pInfo->lastStatus.uid != uid || pInfo->lastStatus.ts != ts) {
      SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;

      int32_t tableSz = taosArrayGetSize(pTaskInfo->tableqinfoList.pTableList);
      bool    found = false;
      for (int32_t i = 0; i < tableSz; i++) {
        STableKeyInfo* pTableInfo = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, i);
        if (pTableInfo->uid == uid) {
          found = true;
          pInfo->currentTable = i;
        }
      }
      // TODO after processing drop, found can be false
      ASSERT(found);

      tsdbSetTableId(pInfo->dataReader, uid);

      // reset the reader so that it starts right after ts, then restore the configured window start
      int64_t oldSkey = pInfo->cond.twindows.skey;
      pInfo->cond.twindows.skey = ts + 1;
      tsdbReaderReset(pInfo->dataReader, &pInfo->cond);
      pInfo->cond.twindows.skey = oldSkey;
      pInfo->scanTimes = 0;

      qDebug("tsdb reader offset seek to uid %" PRIu64 " ts %" PRId64 ", table cur set to %d , all table num %d", uid,
             ts, pInfo->currentTable, tableSz);
    }

    return TSDB_CODE_SUCCESS;
  }

  if (pOperator->numOfDownstream == 1) {
    return doPrepareScan(pOperator->pDownstream[0], uid, ts);
  } else if (pOperator->numOfDownstream == 0) {
    qError("failed to find stream scan operator to set the input data block");
    return TSDB_CODE_QRY_APP_ERROR;
  } else {
    qError("join not supported for stream block scan");
    return TSDB_CODE_QRY_APP_ERROR;
  }
}

2963 2964 2965
/*
 * Report the last (uid, ts) position of the stream scan reachable from pOperator through first-downstream links.
 *
 * Returns TSDB_CODE_INVALID_PARA, leaving *uid and *ts untouched, when the chain ends without reaching a stream scan.
 * The error from a deeper level is propagated instead of being reported as success.
 */
int32_t doGetScanStatus(SOperatorInfo* pOperator, uint64_t* uid, int64_t* ts) {
  int32_t type = pOperator->operatorType;
  if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
    SStreamScanInfo* pScanInfo = pOperator->info;
    STableScanInfo*  pSnapShotScanInfo = pScanInfo->pTableScanOp->info;

    *uid = pSnapShotScanInfo->lastStatus.uid;
    *ts = pSnapShotScanInfo->lastStatus.ts;
    return TSDB_CODE_SUCCESS;
  }

  // a leaf operator has no downstream array at all; guard it before indexing
  if (pOperator->numOfDownstream == 0 || pOperator->pDownstream == NULL || pOperator->pDownstream[0] == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  return doGetScanStatus(pOperator->pDownstream[0], uid, ts);
}
L
Liu Jicong 已提交
2980
#endif
2981

2982
// this is a blocking operator
L
Liu Jicong 已提交
2983
static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
H
Haojun Liao 已提交
2984 2985
  if (OPTR_IS_OPENED(pOperator)) {
    return TSDB_CODE_SUCCESS;
2986 2987
  }

H
Haojun Liao 已提交
2988
  SExecTaskInfo*    pTaskInfo = pOperator->pTaskInfo;
2989
  SAggOperatorInfo* pAggInfo = pOperator->info;
H
Haojun Liao 已提交
2990

2991 2992
  SExprSupp*     pSup = &pOperator->exprSupp;
  SOperatorInfo* downstream = pOperator->pDownstream[0];
2993

2994 2995
  int64_t st = taosGetTimestampUs();

2996 2997 2998
  int32_t order = TSDB_ORDER_ASC;
  int32_t scanFlag = MAIN_SCAN;

H
Haojun Liao 已提交
2999
  while (1) {
3000
    SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
3001 3002 3003 3004
    if (pBlock == NULL) {
      break;
    }

3005 3006 3007 3008
    int32_t code = getTableScanInfo(pOperator, &order, &scanFlag);
    if (code != TSDB_CODE_SUCCESS) {
      longjmp(pTaskInfo->env, code);
    }
3009

3010
    // there is an scalar expression that needs to be calculated before apply the group aggregation.
3011 3012 3013
    if (pAggInfo->scalarExprSup.pExprInfo != NULL) {
      SExprSupp* pSup1 = &pAggInfo->scalarExprSup;
      code = projectApplyFunctions(pSup1->pExprInfo, pBlock, pBlock, pSup1->pCtx, pSup1->numOfExprs, NULL);
3014
      if (code != TSDB_CODE_SUCCESS) {
3015
        longjmp(pTaskInfo->env, code);
3016
      }
3017 3018
    }

3019
    // the pDataBlock are always the same one, no need to call this again
3020 3021
    setExecutionContext(pOperator, pOperator->exprSupp.numOfExprs, pBlock->info.groupId, pAggInfo);
    setInputDataBlock(pOperator, pSup->pCtx, pBlock, order, scanFlag, true);
3022
    code = doAggregateImpl(pOperator, pSup->pCtx);
3023 3024 3025
    if (code != 0) {
      longjmp(pTaskInfo->env, code);
    }
3026 3027
  }

H
Haojun Liao 已提交
3028
  closeAllResultRows(&pAggInfo->binfo.resultRowInfo);
3029
  initGroupedResultInfo(&pAggInfo->groupResInfo, pAggInfo->aggSup.pResultRowHashTable, 0);
H
Haojun Liao 已提交
3030
  OPTR_SET_OPENED(pOperator);
3031

3032
  pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
H
Haojun Liao 已提交
3033 3034 3035
  return TSDB_CODE_SUCCESS;
}

3036
/*
 * Return the next block of aggregated results, or NULL once every group has been returned.
 *
 * The operator is opened first through fpSet._openFn. An open error is stored in pTaskInfo->code and ends the
 * operator. Result blocks are built and filtered repeatedly until one keeps at least one row or no grouped results
 * remain. In the latter case the operator is marked completed.
 */
static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator) {
  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  SAggOperatorInfo* pAggInfo = pOperator->info;
  SOptrBasicInfo*   pInfo = &pAggInfo->binfo;
  SExecTaskInfo*    pTaskInfo = pOperator->pTaskInfo;

  pTaskInfo->code = pOperator->fpSet._openFn(pOperator);
  if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
    doSetOperatorCompleted(pOperator);
    return NULL;
  }

  blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);

  bool hasRemain = true;
  do {
    doBuildResultDatablock(pOperator, pInfo, &pAggInfo->groupResInfo, pAggInfo->aggSup.pResultBuf);
    doFilter(pAggInfo->pCondition, pInfo->pRes);
    hasRemain = hasDataInGroupInfo(&pAggInfo->groupResInfo);
  } while (hasRemain && pInfo->pRes->info.rows <= 0);

  if (!hasRemain) {
    doSetOperatorCompleted(pOperator);
  }

  size_t rows = blockDataGetNumOfRows(pInfo->pRes);
  pOperator->resultInfo.totalRows += rows;

  return (rows > 0) ? pInfo->pRes : NULL;
}

wmmhello's avatar
wmmhello 已提交
3071
/*
 * Serialize every result row of the aggregate operator into one heap buffer.
 *
 * Layout (int32_t fields are stored in host byte order and are not necessarily aligned):
 *   [int32 total length][int32 number of rows] { [int32 key length][key bytes][int32 row size][SResultRow bytes] }*
 *
 * NOTE(review): pOperator->info is assumed to start with an SOptrBasicInfo that is immediately followed by an
 * SAggSupporter — confirm for every caller.
 *
 * On success *result owns the buffer (release it with taosMemoryFree) and *length is its size. When the result buffer
 * is empty, *result is set to NULL and *length to 0.
 *
 * Returns TSDB_CODE_TSC_INVALID_INPUT for NULL out-parameters. Returns TSDB_CODE_OUT_OF_MEMORY when allocation fails
 * or the encoding would not fit into the int32_t length header; *result is NULL in both error cases.
 */
int32_t aggEncodeResultRow(SOperatorInfo* pOperator, char** result, int32_t* length) {
  if (result == NULL || length == NULL) {
    return TSDB_CODE_TSC_INVALID_INPUT;
  }

  SOptrBasicInfo* pInfo = (SOptrBasicInfo*)(pOperator->info);
  SAggSupporter*  pSup = (SAggSupporter*)POINTER_SHIFT(pOperator->info, sizeof(SOptrBasicInfo));

  // no result
  if (getTotalBufSize(pSup->pResultBuf) == 0) {
    *result = NULL;
    *length = 0;
    return TSDB_CODE_SUCCESS;
  }

  int32_t rowSize = pSup->resultRowSize;
  int32_t size = taosHashGetSize(pSup->pResultRowHashTable);
  size_t  keyLen = 0;

  // First pass: compute the exact encoded size. The buffer is then allocated once and never reallocated while the
  // hash table is being iterated, so no early exit from an iteration is ever needed.
  int64_t totalSize = 2 * sizeof(int32_t);
  void*   pIter = taosHashIterate(pSup->pResultRowHashTable, NULL);
  while (pIter != NULL) {
    taosHashGetKey(pIter, &keyLen);
    totalSize += (int64_t)(2 * sizeof(int32_t) + keyLen) + rowSize;
    pIter = taosHashIterate(pSup->pResultRowHashTable, pIter);
  }

  // the total length is stored in an int32_t header
  if (totalSize > INT32_MAX) {
    *result = NULL;
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  char* pBuf = (char*)taosMemoryCalloc(1, (size_t)totalSize);
  if (pBuf == NULL) {
    *result = NULL;
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  // NOTE(review): the page of the current result row is fetched and marked dirty here without being read. The intent
  // is not evident from this function; kept unchanged.
  SResultRowPosition* pCurPos = &pInfo->resultRowInfo.cur;
  void*               pPage = getBufPage(pSup->pResultBuf, pCurPos->pageId);
  setBufPageDirty(pPage, true);
  releaseBufPage(pSup->pResultBuf, pPage);

  // The first int32 slot receives the total length at the end. memcpy is used for all integers because the record
  // layout does not keep them aligned.
  int32_t offset = sizeof(int32_t);
  memcpy(pBuf + offset, &size, sizeof(int32_t));
  offset += sizeof(int32_t);

  // Second pass: write the records.
  pIter = taosHashIterate(pSup->pResultRowHashTable, NULL);
  while (pIter != NULL) {
    void*               key = taosHashGetKey(pIter, &keyLen);
    SResultRowPosition* pPos = (SResultRowPosition*)pIter;
    int32_t             keyLen32 = (int32_t)keyLen;

    // save key
    memcpy(pBuf + offset, &keyLen32, sizeof(int32_t));
    offset += sizeof(int32_t);
    memcpy(pBuf + offset, key, keyLen);
    offset += keyLen32;

    // save value: the row is copied while its page is still held, and the page is released only afterwards
    memcpy(pBuf + offset, &rowSize, sizeof(int32_t));
    offset += sizeof(int32_t);

    pPage = getBufPage(pSup->pResultBuf, pPos->pageId);
    memcpy(pBuf + offset, (char*)pPage + pPos->offset, rowSize);
    setBufPageDirty(pPage, true);
    releaseBufPage(pSup->pResultBuf, pPage);
    offset += rowSize;

    pIter = taosHashIterate(pSup->pResultRowHashTable, pIter);
  }

  memcpy(pBuf, &offset, sizeof(int32_t));
  *result = pBuf;
  *length = offset;

  return TSDB_CODE_SUCCESS;
}

3148
/*
 * Restore the result rows of the aggregate operator from a buffer produced by aggEncodeResultRow.
 *
 * Layout: [int32 total length][int32 count] { [int32 key length][key][int32 row size][SResultRow bytes] }*
 * The key starts with the 8-byte group id, which is used to allocate the new result row.
 *
 * Every record is bounds-checked against the total length stored in the header before it is read or any state is
 * changed. NOTE(review): the allocation behind `result` is assumed to really be that long; this function cannot verify
 * it.
 *
 * Returns TSDB_CODE_TSC_INVALID_INPUT for a NULL or malformed buffer, a row-size mismatch, or when a new result row
 * cannot be obtained.
 */
int32_t aggDecodeResultRow(SOperatorInfo* pOperator, char* result) {
  if (result == NULL) {
    return TSDB_CODE_TSC_INVALID_INPUT;
  }

  SOptrBasicInfo* pInfo = (SOptrBasicInfo*)(pOperator->info);
  SAggSupporter*  pSup = (SAggSupporter*)POINTER_SHIFT(pOperator->info, sizeof(SOptrBasicInfo));
  const int32_t   intSize = (int32_t)sizeof(int32_t);

  // integers are read with memcpy: the layout does not keep them aligned
  int32_t length = 0;
  memcpy(&length, result, sizeof(int32_t));
  if (length < 2 * intSize) {
    return TSDB_CODE_TSC_INVALID_INPUT;
  }

  int32_t offset = intSize;
  int32_t count = 0;
  memcpy(&count, result + offset, sizeof(int32_t));
  offset += intSize;

  while (count-- > 0 && length > offset) {
    // key length field
    if (length - offset < intSize) {
      return TSDB_CODE_TSC_INVALID_INPUT;
    }
    int32_t keyLen = 0;
    memcpy(&keyLen, result + offset, sizeof(int32_t));
    offset += intSize;

    // the key must hold the 8-byte group id and be followed by the value-length field
    if (keyLen < (int32_t)sizeof(uint64_t) || keyLen > length - offset - intSize) {
      return TSDB_CODE_TSC_INVALID_INPUT;
    }

    // validate the value before creating any result row or hash entry
    int32_t valueLen = 0;
    memcpy(&valueLen, result + offset + keyLen, sizeof(int32_t));
    if (valueLen != pSup->resultRowSize || valueLen > length - offset - keyLen - intSize) {
      return TSDB_CODE_TSC_INVALID_INPUT;
    }

    uint64_t tableGroupId = 0;
    memcpy(&tableGroupId, result + offset, sizeof(uint64_t));
    SResultRow* resultRow = getNewResultRow(pSup->pResultBuf, tableGroupId, pSup->resultRowSize);
    if (!resultRow) {
      return TSDB_CODE_TSC_INVALID_INPUT;
    }

    // add a new result set for a new group
    SResultRowPosition pos = {.pageId = resultRow->pageId, .offset = resultRow->offset};
    taosHashPut(pSup->pResultRowHashTable, result + offset, keyLen, &pos, sizeof(SResultRowPosition));

    offset += keyLen + intSize;

    // overwrite the new row with the serialized one, but keep the location assigned to it by the buffer
    int32_t pageId = resultRow->pageId;
    int32_t pOffset = resultRow->offset;
    memcpy(resultRow, result + offset, valueLen);
    resultRow->pageId = pageId;
    resultRow->offset = pOffset;
    offset += valueLen;

    initResultRow(resultRow);
    pInfo->resultRowInfo.cur = (SResultRowPosition){.pageId = resultRow->pageId, .offset = resultRow->offset};
  }

  if (offset != length) {
    return TSDB_CODE_TSC_INVALID_INPUT;
  }

  return TSDB_CODE_SUCCESS;
}

3199 3200 3201 3202 3203
/*
 * Apply the group-level (slimit/soffset) and row-level (limit/offset) restrictions to one projected block.
 *
 * pBlock is trimmed or emptied in place. The function does not add the block's rows to
 * pLimitInfo->numOfOutputRows; the caller does that (doProjectOperation adds them after each round).
 *
 * Returns:
 *  - PROJECT_RETRIEVE_CONTINUE when the caller should fetch more input before returning: the block was discarded, or
 *    holdDataInBuf allows more rows to accumulate in the same block.
 *  - PROJECT_RETRIEVE_DONE when the block should be returned now.
 * pOperator->status is set to OP_EXEC_DONE when the slimit checks below are satisfied.
 */
int32_t handleLimitOffset(SOperatorInfo* pOperator, SLimitInfo* pLimitInfo, SSDataBlock* pBlock, bool holdDataInBuf) {
  // 1. Skip the first `soffset` groups completely. Every block of a skipped group must be dropped, not only the first
  //    one, and the current group id must follow each new group while skipping.
  if (pLimitInfo->remainGroupOffset > 0) {
    if (pLimitInfo->currentGroupId == 0 || pLimitInfo->currentGroupId == pBlock->info.groupId) {
      // the first group, or another block of the group currently being skipped
      pLimitInfo->currentGroupId = pBlock->info.groupId;
      blockDataCleanup(pBlock);
      return PROJECT_RETRIEVE_CONTINUE;
    }

    // now it is the data from a new group: the previous group has been skipped completely
    pLimitInfo->remainGroupOffset -= 1;
    pLimitInfo->currentGroupId = pBlock->info.groupId;

    // ignore data block in current group
    if (pLimitInfo->remainGroupOffset > 0) {
      blockDataCleanup(pBlock);
      return PROJECT_RETRIEVE_CONTINUE;
    }
  }

  // 2. A new group begins: count the finished group, and stop once slimit groups have been produced.
  if (pLimitInfo->currentGroupId != 0 && pLimitInfo->currentGroupId != pBlock->info.groupId) {
    pLimitInfo->numOfOutputGroups += 1;
    if ((pLimitInfo->slimit.limit > 0) && (pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups)) {
      pOperator->status = OP_EXEC_DONE;
      blockDataCleanup(pBlock);

      return PROJECT_RETRIEVE_DONE;
    }

    // reset the value for a new group data
    pLimitInfo->numOfOutputRows = 0;
    pLimitInfo->remainOffset = pLimitInfo->limit.offset;
  }

  // here we reach the start position, according to the limit/offset requirements.

  // set current group id
  pLimitInfo->currentGroupId = pBlock->info.groupId;

  // 3. Apply the row offset within the current group.
  if (pLimitInfo->remainOffset >= pBlock->info.rows) {
    pLimitInfo->remainOffset -= pBlock->info.rows;
    blockDataCleanup(pBlock);
    return PROJECT_RETRIEVE_CONTINUE;
  } else if (pLimitInfo->remainOffset < pBlock->info.rows && pLimitInfo->remainOffset > 0) {
    blockDataTrimFirstNRows(pBlock, pLimitInfo->remainOffset);
    pLimitInfo->remainOffset = 0;
  }

  // 4. Check for the limitation in each group.
  if (pLimitInfo->limit.limit >= 0 && pLimitInfo->numOfOutputRows + pBlock->info.rows >= pLimitInfo->limit.limit) {
    int32_t keepRows = (int32_t)(pLimitInfo->limit.limit - pLimitInfo->numOfOutputRows);
    blockDataKeepFirstNRows(pBlock, keepRows);
    if (pLimitInfo->slimit.limit > 0 && pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups) {
      pOperator->status = OP_EXEC_DONE;
    }

    return PROJECT_RETRIEVE_DONE;
  }

  // todo optimize performance
  // When slimit/soffset is set, rows from several rounds cannot be packed into one block: they may belong to different
  // groups, and the per-group limit/offset would no longer be valid.
  if ((!holdDataInBuf) || (pBlock->info.rows >= pOperator->resultInfo.threshold) || pLimitInfo->slimit.offset != -1 ||
      pLimitInfo->slimit.limit != -1) {
    return PROJECT_RETRIEVE_DONE;
  } else {  // not full enough, continue to accumulate the output data in the buffer.
    return PROJECT_RETRIEVE_CONTINUE;
  }
}

3270
/*
 * Return the next block of projected rows, or NULL when no rows are produced.
 *
 * Each input block from the first downstream operator is processed as follows:
 *  - the projection expressions are evaluated into pInfo->pRes;
 *  - limit/offset is applied through handleLimitOffset, and the filter afterwards;
 *  - input keeps being accumulated until handleLimitOffset reports PROJECT_RETRIEVE_DONE or the input is exhausted.
 * STREAM_RETRIEVE blocks are passed through untouched. In queue execution mode a finished operator is switched back to
 * OP_OPENED so that it can consume the next batch. Evaluation errors are raised through longjmp to pTaskInfo->env.
 */
static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
  SProjectOperatorInfo* pProjectInfo = pOperator->info;
  SOptrBasicInfo*       pInfo = &pProjectInfo->binfo;
  SExprSupp*            pSup = &pOperator->exprSupp;
  SExecTaskInfo*        pTaskInfo = pOperator->pTaskInfo;

  blockDataCleanup(pInfo->pRes);

  if (pOperator->status == OP_EXEC_DONE) {
    if (pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE) {
      pOperator->status = OP_OPENED;
    }
    return NULL;
  }

  // the open cost is measured only while it is still unset
  int64_t st = (pOperator->cost.openCost == 0) ? taosGetTimestampUs() : 0;

  int32_t        order = 0;
  int32_t        scanFlag = 0;
  SOperatorInfo* pDownstream = pOperator->pDownstream[0];

  for (;;) {
    qDebug("projection call next");
    SSDataBlock* pBlock = pDownstream->fpSet.getNextFn(pDownstream);
    if (pBlock == NULL) {
      qDebug("projection get null");
      doSetOperatorCompleted(pOperator);
      break;
    }

    // for stream interval
    if (pBlock->info.type == STREAM_RETRIEVE) {
      return pBlock;
    }

    int32_t code = getTableScanInfo(pDownstream, &order, &scanFlag);
    if (code != TSDB_CODE_SUCCESS) {
      longjmp(pTaskInfo->env, code);
    }

    setInputDataBlock(pOperator, pSup->pCtx, pBlock, order, scanFlag, false);
    blockDataEnsureCapacity(pInfo->pRes, pInfo->pRes->info.rows + pBlock->info.rows);

    code = projectApplyFunctions(pSup->pExprInfo, pInfo->pRes, pBlock, pSup->pCtx, pSup->numOfExprs,
                                 pProjectInfo->pPseudoColInfo);
    if (code != TSDB_CODE_SUCCESS) {
      longjmp(pTaskInfo->env, code);
    }

    int32_t status = handleLimitOffset(pOperator, &pProjectInfo->limitInfo, pInfo->pRes, true);

    // filter shall be applied after apply functions and limit/offset on the result
    doFilter(pProjectInfo->pFilterNode, pInfo->pRes);

    if (status == PROJECT_RETRIEVE_DONE) {
      break;
    }
  }

  size_t rows = pInfo->pRes->info.rows;
  pProjectInfo->limitInfo.numOfOutputRows += rows;
  pOperator->resultInfo.totalRows += rows;

  if (pOperator->cost.openCost == 0) {
    pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
  }

  return (rows > 0) ? pInfo->pRes : NULL;
}

H
Haojun Liao 已提交
3373
static void doHandleRemainBlockForNewGroupImpl(SFillOperatorInfo* pInfo, SResultInfo* pResultInfo,
L
Liu Jicong 已提交
3374
                                               SExecTaskInfo* pTaskInfo) {
3375
  pInfo->totalInputRows = pInfo->existNewGroupBlock->info.rows;
H
Haojun Liao 已提交
3376

L
Liu Jicong 已提交
3377 3378
  int64_t ekey =
      Q_STATUS_EQUAL(pTaskInfo->status, TASK_COMPLETED) ? pInfo->win.ekey : pInfo->existNewGroupBlock->info.window.ekey;
3379 3380
  taosResetFillInfo(pInfo->pFillInfo, getFillInfoStart(pInfo->pFillInfo));

3381
  taosFillSetStartInfo(pInfo->pFillInfo, pInfo->existNewGroupBlock->info.rows, ekey);
3382 3383
  taosFillSetInputDataBlock(pInfo->pFillInfo, pInfo->existNewGroupBlock);

H
Haojun Liao 已提交
3384 3385 3386
  int32_t numOfResultRows = pResultInfo->capacity - pInfo->pRes->info.rows;
  taosFillResultDataBlock(pInfo->pFillInfo, pInfo->pRes, numOfResultRows);

3387
  pInfo->curGroupId = pInfo->existNewGroupBlock->info.groupId;
3388 3389 3390
  pInfo->existNewGroupBlock = NULL;
}

H
Haojun Liao 已提交
3391
static void doHandleRemainBlockFromNewGroup(SFillOperatorInfo* pInfo, SResultInfo* pResultInfo,
L
Liu Jicong 已提交
3392
                                            SExecTaskInfo* pTaskInfo) {
3393
  if (taosFillHasMoreResults(pInfo->pFillInfo)) {
H
Haojun Liao 已提交
3394 3395 3396
    int32_t numOfResultRows = pResultInfo->capacity - pInfo->pRes->info.rows;
    taosFillResultDataBlock(pInfo->pFillInfo, pInfo->pRes, numOfResultRows);
    if (pInfo->pRes->info.rows > pResultInfo->threshold) {
3397 3398 3399 3400 3401 3402
      return;
    }
  }

  // handle the cached new group data block
  if (pInfo->existNewGroupBlock) {
H
Haojun Liao 已提交
3403
    doHandleRemainBlockForNewGroupImpl(pInfo, pResultInfo, pTaskInfo);
3404 3405 3406
  }
}

S
slzhou 已提交
3407
/*
 * Produce the next block of fill output, or NULL when there is nothing more to return.
 *
 * Input is consumed group by group. When a block of a different group arrives, it is parked in
 * pInfo->existNewGroupBlock and the fill of the previous group is closed at the end of the query window. The parked
 * block is filled on a later step. The operator is marked OP_EXEC_DONE when the downstream yields nothing and no input
 * rows were seen.
 */
static SSDataBlock* doFillImpl(SOperatorInfo* pOperator) {
  SFillOperatorInfo* pInfo = pOperator->info;
  SExecTaskInfo*     pTaskInfo = pOperator->pTaskInfo;
  SResultInfo*       pResultInfo = &pOperator->resultInfo;
  SOperatorInfo*     pDownstream = pOperator->pDownstream[0];
  SSDataBlock*       pResBlock = pInfo->pRes;

  blockDataCleanup(pResBlock);

  // first drain what is left over from the previous call
  doHandleRemainBlockFromNewGroup(pInfo, pResultInfo, pTaskInfo);
  if (pResBlock->info.rows > 0) {
    return pResBlock;
  }

  for (;;) {
    SSDataBlock* pInput = pDownstream->fpSet.getNextFn(pDownstream);

    if (pInput == NULL) {
      if (pInfo->totalInputRows == 0) {
        pOperator->status = OP_EXEC_DONE;
        return NULL;
      }

      // no more input: fill the current group up to the end of the query window
      taosFillSetStartInfo(pInfo->pFillInfo, 0, pInfo->win.ekey);
    } else {
      blockDataUpdateTsWindow(pInput, pInfo->primaryTsCol);

      if (pInfo->curGroupId != 0 && pInfo->curGroupId != pInput->info.groupId) {
        // a block of a new group: park it, and close the fill of the previous group before handling it
        pInfo->existNewGroupBlock = pInput;
        taosFillSetStartInfo(pInfo->pFillInfo, 0, pInfo->win.ekey);
      } else {
        // the first data block, or another block of the current group
        pInfo->curGroupId = pInput->info.groupId;
        pInfo->totalInputRows += pInput->info.rows;

        taosFillSetStartInfo(pInfo->pFillInfo, pInput->info.rows, pInput->info.window.ekey);
        taosFillSetInputDataBlock(pInfo->pFillInfo, pInput);
      }
    }

    blockDataEnsureCapacity(pResBlock, pResultInfo->capacity);

    int32_t remainCapacity = pResultInfo->capacity - pResBlock->info.rows;
    taosFillResultDataBlock(pInfo->pFillInfo, pResBlock, remainCapacity);

    if (pResBlock->info.rows <= 0) {
      // current group has no more result to return
      if (pInfo->existNewGroupBlock == NULL) {
        return NULL;
      }

      // try next group
      assert(pInput != NULL);
      doHandleRemainBlockForNewGroupImpl(pInfo, pResultInfo, pTaskInfo);
      if (pResBlock->info.rows > pResultInfo->threshold) {
        return pResBlock;
      }
      continue;
    }

    // Return at once when the threshold is exceeded, the input is exhausted, or a new group is pending.
    if (pResBlock->info.rows > pResultInfo->threshold || pInput == NULL || pInfo->existNewGroupBlock != NULL) {
      return pResBlock;
    }

    doHandleRemainBlockFromNewGroup(pInfo, pResultInfo, pTaskInfo);
    if (pResBlock->info.rows >= pResultInfo->threshold) {
      return pResBlock;
    }
  }
}

S
slzhou 已提交
3479 3480 3481 3482 3483 3484 3485 3486
/*
 * Return the next non-empty block of filtered fill output.
 *
 * Blocks that become empty after the filter (pInfo->pCondition) are skipped. Once doFillImpl yields NULL, the operator
 * is marked completed and NULL is returned.
 */
static SSDataBlock* doFill(SOperatorInfo* pOperator) {
  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  SFillOperatorInfo* pInfo = pOperator->info;
  SSDataBlock*       pFilled = NULL;

  while ((pFilled = doFillImpl(pOperator)) != NULL) {
    doFilter(pInfo->pCondition, pFilled);

    if (pFilled->info.rows > 0) {
      size_t rows = pFilled->info.rows;
      pOperator->resultInfo.totalRows += rows;
      return pFilled;
    }
  }

  doSetOperatorCompleted(pOperator);
  return NULL;
}

H
Haojun Liao 已提交
3512 3513 3514
/*
 * Release the memory owned by each of the numOfExprs expressions in pExpr. For every expression this frees:
 *  - the column descriptors of its column-typed parameters;
 *  - its parameter array;
 *  - its pExpr node.
 * The pExpr array itself is not freed.
 */
static void destroyExprInfo(SExprInfo* pExpr, int32_t numOfExprs) {
  for (int32_t i = 0; i < numOfExprs; ++i) {
    SExprInfo* pCur = pExpr + i;

    for (int32_t j = 0; j < pCur->base.numOfParams; ++j) {
      if (pCur->base.pParam[j].type != FUNC_PARAM_TYPE_COLUMN) {
        continue;
      }
      taosMemoryFreeClear(pCur->base.pParam[j].pCol);
    }

    taosMemoryFree(pCur->base.pParam);
    taosMemoryFree(pCur->pExpr);
  }
}

3526 3527 3528 3529 3530
/*
 * Recursively release an operator tree, in this order:
 *  - the operator's info, through its close callback (if any);
 *  - every downstream operator and the downstream array;
 *  - the operator's expression support;
 *  - the operator itself.
 * A NULL operator is ignored.
 */
static void destroyOperatorInfo(SOperatorInfo* pOperator) {
  if (pOperator == NULL) {
    return;
  }

  if (pOperator->fpSet.closeFn != NULL) {
    pOperator->fpSet.closeFn(pOperator->info, pOperator->exprSupp.numOfExprs);
  }

  if (pOperator->pDownstream != NULL) {
    int32_t numOfChildren = pOperator->numOfDownstream;
    for (int32_t i = 0; i < numOfChildren; ++i) {
      destroyOperatorInfo(pOperator->pDownstream[i]);
    }

    taosMemoryFreeClear(pOperator->pDownstream);
    pOperator->numOfDownstream = 0;
  }

  cleanupExprSupp(&pOperator->exprSupp);
  taosMemoryFreeClear(pOperator);
}

3548 3549 3550 3551 3552 3553 3554 3555 3556 3557 3558 3559 3560 3561 3562
/*
 * Choose the page size and total size of a disk-based result buffer for rows of rowSize bytes.
 *
 * Page size: the smallest power of two, starting at 4KB, that holds at least four rows.
 * Buffer size: 1MB, enlarged when needed so that the buffer always holds at least four pages.
 *
 * A non-positive rowSize yields the minimum page size. The multiplication is done in 64 bits, so rowSize * 4 cannot
 * overflow. The page size is capped at 2GB, so the doubling can neither wrap to 0 nor loop forever. The buffer size is
 * clamped to the largest multiple of the page size that fits in uint32_t.
 *
 * Always returns 0.
 */
int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* defaultBufsz) {
  const uint32_t minPageSize = 4096;
  const uint32_t maxPageSize = UINT32_C(1) << 31;
  const uint64_t rowsPerPage = 4;
  const uint64_t minPagesInBuf = 4;
  const uint64_t defaultBufSize = 4096 * 256;

  uint64_t required = (rowSize > 0) ? (uint64_t)rowSize * rowsPerPage : 0;

  uint32_t pgsz = minPageSize;
  while (pgsz < required && pgsz < maxPageSize) {
    pgsz <<= 1u;
  }

  // at least four pages need to be in buffer
  uint64_t bufsz = (uint64_t)pgsz * minPagesInBuf;
  if (bufsz < defaultBufSize) {
    bufsz = defaultBufSize;
  }
  if (bufsz > UINT32_MAX) {
    bufsz = ((uint64_t)UINT32_MAX / pgsz) * pgsz;
  }

  *defaultPgsz = pgsz;
  *defaultBufsz = (uint32_t)bufsz;
  return 0;
}

dengyihao's avatar
dengyihao 已提交
3563 3564
/*
 * Initialize the aggregate supporter. This sets up:
 *  - the per-row result size of the numOfOutput function contexts in pCtx;
 *  - a zeroed key buffer of keyBufSize + POINTER_BYTES + sizeof(int64_t) bytes;
 *  - the result-row hash table;
 *  - the disk-based result buffer identified by pKey, created under TD_TMP_DIR_PATH.
 *
 * Returns TSDB_CODE_OUT_OF_MEMORY, or the error of createDiskbasedBuf, on failure. On failure the key buffer and the
 * hash table allocated here are released and reset to NULL, so they neither leak nor get freed a second time by a
 * later cleanup. NOTE(review): that relies on taosHashCleanup ignoring NULL — TODO confirm.
 */
int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t numOfOutput, size_t keyBufSize,
                         const char* pKey) {
  int32_t    code = TSDB_CODE_SUCCESS;
  uint32_t   defaultPgsz = 0;
  uint32_t   defaultBufsz = 0;
  _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);

  pAggSup->resultRowSize = getResultRowSize(pCtx, numOfOutput);
  pAggSup->keyBuf = taosMemoryCalloc(1, keyBufSize + POINTER_BYTES + sizeof(int64_t));
  pAggSup->pResultRowHashTable = taosHashInit(10, hashFn, true, HASH_NO_LOCK);

  if (pAggSup->keyBuf == NULL || pAggSup->pResultRowHashTable == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _error;
  }

  getBufferPgSize(pAggSup->resultRowSize, &defaultPgsz, &defaultBufsz);

  code = createDiskbasedBuf(&pAggSup->pResultBuf, defaultPgsz, defaultBufsz, pKey, TD_TMP_DIR_PATH);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  return TSDB_CODE_SUCCESS;

_error:
  // release what was allocated above; the pointers are reset so that a later cleanup cannot free them again
  taosMemoryFreeClear(pAggSup->keyBuf);
  if (pAggSup->pResultRowHashTable != NULL) {
    taosHashCleanup(pAggSup->pResultRowHashTable);
    pAggSup->pResultRowHashTable = NULL;
  }
  return code;
}

3587
/* Release the result-row hash table, the disk-based result buffer and the key buffer owned by pAggSup. */
void cleanupAggSup(SAggSupporter* pAggSup) {
  taosHashCleanup(pAggSup->pResultRowHashTable);
  destroyDiskbasedBuf(pAggSup->pResultBuf);
  taosMemoryFreeClear(pAggSup->keyBuf);
}

L
Liu Jicong 已提交
3593 3594
/*
 * Initialize the expression support for numOfCols expressions and the aggregate supporter that stores their result
 * rows, then point every function context at the shared result buffer.
 *
 * Returns the first error reported by initExprSupp or doInitAggInfoSup, otherwise TSDB_CODE_SUCCESS.
 */
int32_t initAggInfo(SExprSupp* pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, size_t keyBufSize,
                    const char* pkey) {
  int32_t code = initExprSupp(pSup, pExprInfo, numOfCols);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  // if the supporter could not be set up, the contexts below would be given an unusable result buffer
  code = doInitAggInfoSup(pAggSup, pSup->pCtx, numOfCols, keyBufSize, pkey);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  for (int32_t i = 0; i < numOfCols; ++i) {
    pSup->pCtx[i].pBuf = pAggSup->pResultBuf;
  }

  return TSDB_CODE_SUCCESS;
}

3608
/*
 * Set the output capacity to numOfRows rows and the output threshold to 75% of it. If the scaled value ends up as 0
 * (presumably because threshold is an integer field and numOfRows is tiny), the full capacity is used instead.
 */
void initResultSizeInfo(SResultInfo * pResultInfo, int32_t numOfRows) {
  ASSERT(numOfRows != 0);

  pResultInfo->threshold = numOfRows * 0.75;
  if (pResultInfo->threshold == 0) {
    pResultInfo->threshold = numOfRows;
  }

  pResultInfo->capacity = numOfRows;
}

3618 3619 3620 3621 3622
/* Initialize the result-row bookkeeping of pInfo and attach pBlock as its result block. */
void initBasicInfo(SOptrBasicInfo* pInfo, SSDataBlock* pBlock) {
  initResultRowInfo(&pInfo->resultRowInfo);
  pInfo->pRes = pBlock;
}

3623 3624 3625 3626 3627 3628 3629 3630 3631 3632 3633 3634 3635 3636 3637 3638 3639 3640 3641
/*
 * Free an array of function contexts: for each context its parameter variants,
 * the subsidiary context list and the input buffers; then the array itself.
 * Always returns NULL so a caller can clear its pointer in one statement.
 */
static void* destroySqlFunctionCtx(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
  if (pCtx != NULL) {
    for (int32_t k = 0; k < numOfOutput; ++k) {
      SqlFunctionCtx* pOne = &pCtx[k];

      for (int32_t p = 0; p < pOne->numOfParams; ++p) {
        taosVariantDestroy(&pOne->param[p].param);
      }

      taosMemoryFreeClear(pOne->subsidiaries.pCtx);
      taosMemoryFree(pOne->input.pData);
      taosMemoryFree(pOne->input.pColumnDataAgg);
    }

    taosMemoryFreeClear(pCtx);
  }

  return NULL;
}

3642
/*
 * Record the expressions in pSup and, when there are any, build the matching
 * function contexts together with their row-entry offsets.
 * Returns TSDB_CODE_OUT_OF_MEMORY if the contexts cannot be created.
 */
int32_t initExprSupp(SExprSupp* pSup, SExprInfo* pExprInfo, int32_t numOfExpr) {
  pSup->pExprInfo = pExprInfo;
  pSup->numOfExprs = numOfExpr;

  if (pSup->pExprInfo == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  pSup->pCtx = createSqlFunctionCtx(pExprInfo, numOfExpr, &pSup->rowEntryInfoOffset);
  return (pSup->pCtx == NULL) ? TSDB_CODE_OUT_OF_MEMORY : TSDB_CODE_SUCCESS;
}

3655 3656 3657 3658 3659 3660 3661 3662 3663 3664
/* Release the function contexts, the expression info and the row-entry offsets held by pSupp. */
void cleanupExprSupp(SExprSupp* pSupp) {
  destroySqlFunctionCtx(pSupp->pCtx, pSupp->numOfExprs);

  SExprInfo* pExprs = pSupp->pExprInfo;
  if (pExprs != NULL) {
    destroyExprInfo(pExprs, pSupp->numOfExprs);
  }
  taosMemoryFreeClear(pSupp->pExprInfo);

  taosMemoryFree(pSupp->rowEntryInfoOffset);
}

L
Liu Jicong 已提交
3665
SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
S
slzhou 已提交
3666
                                           SSDataBlock* pResultBlock, SNode* pCondition, SExprInfo* pScalarExprInfo,
wmmhello's avatar
wmmhello 已提交
3667
                                           int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo) {
wafwerar's avatar
wafwerar 已提交
3668
  SAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SAggOperatorInfo));
L
Liu Jicong 已提交
3669
  SOperatorInfo*    pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
H
Haojun Liao 已提交
3670 3671 3672
  if (pInfo == NULL || pOperator == NULL) {
    goto _error;
  }
H
Haojun Liao 已提交
3673

3674
  int32_t numOfRows = 1024;
dengyihao's avatar
dengyihao 已提交
3675
  size_t  keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
3676

3677
  initResultSizeInfo(&pOperator->resultInfo, numOfRows);
3678
  int32_t code = initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str);
L
Liu Jicong 已提交
3679
  if (code != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
3680 3681
    goto _error;
  }
H
Haojun Liao 已提交
3682

3683
  initBasicInfo(&pInfo->binfo, pResultBlock);
3684 3685 3686 3687
  code = initExprSupp(&pInfo->scalarExprSup, pScalarExprInfo, numOfScalarExpr);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }
3688

L
Liu Jicong 已提交
3689
  pInfo->groupId = INT32_MIN;
S
slzhou 已提交
3690
  pInfo->pCondition = pCondition;
dengyihao's avatar
dengyihao 已提交
3691
  pOperator->name = "TableAggregate";
3692
  pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_HASH_AGG;
3693
  pOperator->blocking = true;
dengyihao's avatar
dengyihao 已提交
3694 3695 3696
  pOperator->status = OP_NOT_OPENED;
  pOperator->info = pInfo;
  pOperator->pTaskInfo = pTaskInfo;
H
Haojun Liao 已提交
3697

3698 3699
  pOperator->fpSet = createOperatorFpSet(doOpenAggregateOptr, getAggregateResult, NULL, NULL, destroyAggOperatorInfo,
                                         aggEncodeResultRow, aggDecodeResultRow, NULL);
H
Haojun Liao 已提交
3700 3701 3702 3703 3704

  code = appendDownstream(pOperator, &downstream, 1);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }
3705 3706

  return pOperator;
L
Liu Jicong 已提交
3707
_error:
H
Haojun Liao 已提交
3708
  destroyAggOperatorInfo(pInfo, numOfCols);
wafwerar's avatar
wafwerar 已提交
3709 3710
  taosMemoryFreeClear(pInfo);
  taosMemoryFreeClear(pOperator);
H
Haojun Liao 已提交
3711 3712
  pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
  return NULL;
3713 3714
}

3715
/* Release the result-row bookkeeping and the result block owned by pInfo; pInfo->pRes ends up NULL. */
void cleanupBasicInfo(SOptrBasicInfo* pInfo) {
  assert(pInfo != NULL);

  cleanupResultRowInfo(&(pInfo->resultRowInfo));

  SSDataBlock* pBlock = pInfo->pRes;
  pInfo->pRes = blockDataDestroy(pBlock);
}

H
Haojun Liao 已提交
3721
void destroyBasicOperatorInfo(void* param, int32_t numOfOutput) {
L
Liu Jicong 已提交
3722
  SOptrBasicInfo* pInfo = (SOptrBasicInfo*)param;
3723
  cleanupBasicInfo(pInfo);
L
Liu Jicong 已提交
3724

D
dapan1121 已提交
3725
  taosMemoryFreeClear(param);
3726
}
H
Haojun Liao 已提交
3727

H
Haojun Liao 已提交
3728 3729 3730 3731 3732 3733 3734 3735

/* Free the heap object referenced by the pointer slot at pItem, then clear the slot. */
static void freeItem(void* pItem) {
  void** pSlot = (void**)pItem;
  if (*pSlot == NULL) {
    return;
  }

  taosMemoryFreeClear(*pSlot);
}

H
Haojun Liao 已提交
3736
void destroyAggOperatorInfo(void* param, int32_t numOfOutput) {
L
Liu Jicong 已提交
3737
  SAggOperatorInfo* pInfo = (SAggOperatorInfo*)param;
L
Liu Jicong 已提交
3738 3739
  cleanupBasicInfo(&pInfo->binfo);

H
Haojun Liao 已提交
3740
  cleanupAggSup(&pInfo->aggSup);
H
Haojun Liao 已提交
3741
  cleanupGroupResInfo(&pInfo->groupResInfo);
D
dapan1121 已提交
3742
  taosMemoryFreeClear(param);
3743
}
3744

H
Haojun Liao 已提交
3745
void destroySFillOperatorInfo(void* param, int32_t numOfOutput) {
L
Liu Jicong 已提交
3746
  SFillOperatorInfo* pInfo = (SFillOperatorInfo*)param;
3747
  pInfo->pFillInfo = taosDestroyFillInfo(pInfo->pFillInfo);
H
Haojun Liao 已提交
3748
  pInfo->pRes = blockDataDestroy(pInfo->pRes);
wafwerar's avatar
wafwerar 已提交
3749
  taosMemoryFreeClear(pInfo->p);
L
Liu Jicong 已提交
3750

D
dapan1121 已提交
3751
  taosMemoryFreeClear(param);
3752 3753
}

H
Haojun Liao 已提交
3754
static void destroyProjectOperatorInfo(void* param, int32_t numOfOutput) {
D
fix bug  
dapan 已提交
3755 3756 3757
  if (NULL == param) {
    return;
  }
L
Liu Jicong 已提交
3758
  SProjectOperatorInfo* pInfo = (SProjectOperatorInfo*)param;
3759
  cleanupBasicInfo(&pInfo->binfo);
H
Haojun Liao 已提交
3760
  cleanupAggSup(&pInfo->aggSup);
H
Haojun Liao 已提交
3761
  taosArrayDestroy(pInfo->pPseudoColInfo);
L
Liu Jicong 已提交
3762

D
dapan1121 已提交
3763
  taosMemoryFreeClear(param);
3764 3765
}

H
Haojun Liao 已提交
3766
static void destroyIndefinitOperatorInfo(void* param, int32_t numOfOutput) {
3767
  SIndefOperatorInfo* pInfo = (SIndefOperatorInfo*)param;
3768
  cleanupBasicInfo(&pInfo->binfo);
H
Haojun Liao 已提交
3769 3770 3771

  taosArrayDestroy(pInfo->pPseudoColInfo);
  cleanupAggSup(&pInfo->aggSup);
3772
  cleanupExprSupp(&pInfo->scalarSup);
L
Liu Jicong 已提交
3773

D
dapan1121 已提交
3774
  taosMemoryFreeClear(param);
H
Haojun Liao 已提交
3775 3776
}

H
Haojun Liao 已提交
3777
void destroyExchangeOperatorInfo(void* param, int32_t numOfOutput) {
L
Liu Jicong 已提交
3778
  SExchangeInfo* pExInfo = (SExchangeInfo*)param;
3779 3780 3781 3782
  taosRemoveRef(exchangeObjRefPool, pExInfo->self);
}

void doDestroyExchangeOperatorInfo(void* param) {
X
Xiaoyu Wang 已提交
3783
  SExchangeInfo* pExInfo = (SExchangeInfo*)param;
3784

H
Haojun Liao 已提交
3785 3786 3787
  taosArrayDestroy(pExInfo->pSources);
  taosArrayDestroy(pExInfo->pSourceDataInfo);
  if (pExInfo->pResult != NULL) {
H
Haojun Liao 已提交
3788
    pExInfo->pResult = blockDataDestroy(pExInfo->pResult);
H
Haojun Liao 已提交
3789 3790 3791
  }

  tsem_destroy(&pExInfo->ready);
L
Liu Jicong 已提交
3792

D
dapan1121 已提交
3793
  taosMemoryFreeClear(param);
H
Haojun Liao 已提交
3794 3795
}

H
Haojun Liao 已提交
3796 3797
/* Return an array holding the indices (int32_t) of the contexts whose function is a pseudo-column function. */
static SArray* setRowTsColumnOutputInfo(SqlFunctionCtx* pCtx, int32_t numOfCols) {
  SArray* pIndexList = taosArrayInit(4, sizeof(int32_t));

  int32_t idx = 0;
  while (idx < numOfCols) {
    if (fmIsPseudoColumnFunc(pCtx[idx].functionId)) {
      taosArrayPush(pIndexList, &idx);
    }
    idx += 1;
  }

  return pIndexList;
}

L
Liu Jicong 已提交
3807
SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhysiNode* pProjPhyNode,
dengyihao's avatar
dengyihao 已提交
3808
                                         SExecTaskInfo* pTaskInfo) {
wafwerar's avatar
wafwerar 已提交
3809
  SProjectOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SProjectOperatorInfo));
L
Liu Jicong 已提交
3810
  SOperatorInfo*        pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
H
Haojun Liao 已提交
3811 3812 3813
  if (pInfo == NULL || pOperator == NULL) {
    goto _error;
  }
3814

L
Liu Jicong 已提交
3815
  int32_t    numOfCols = 0;
3816 3817 3818
  SExprInfo* pExprInfo = createExprInfo(pProjPhyNode->pProjections, NULL, &numOfCols);

  SSDataBlock* pResBlock = createResDataBlock(pProjPhyNode->node.pOutputDataBlockDesc);
3819
  initLimitInfo(pProjPhyNode->node.pLimit, pProjPhyNode->node.pSlimit, &pInfo->limitInfo);
3820

H
Haojun Liao 已提交
3821
  pInfo->binfo.pRes = pResBlock;
3822
  pInfo->pFilterNode = pProjPhyNode->node.pConditions;
H
Haojun Liao 已提交
3823 3824

  int32_t numOfRows = 4096;
dengyihao's avatar
dengyihao 已提交
3825
  size_t  keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
3826

3827 3828 3829 3830 3831
  // Make sure the size of SSDataBlock will never exceed the size of 2MB.
  int32_t TWOMB = 2 * 1024 * 1024;
  if (numOfRows * pResBlock->info.rowSize > TWOMB) {
    numOfRows = TWOMB / pResBlock->info.rowSize;
  }
3832
  initResultSizeInfo(&pOperator->resultInfo, numOfRows);
3833

3834 3835
  initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str);
  initBasicInfo(&pInfo->binfo, pResBlock);
3836
  setFunctionResultOutput(pOperator, &pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, numOfCols);
3837

3838
  pInfo->pPseudoColInfo = setRowTsColumnOutputInfo(pOperator->exprSupp.pCtx, numOfCols);
X
Xiaoyu Wang 已提交
3839
  pOperator->name = "ProjectOperator";
H
Haojun Liao 已提交
3840
  pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_PROJECT;
X
Xiaoyu Wang 已提交
3841 3842 3843 3844
  pOperator->blocking = false;
  pOperator->status = OP_NOT_OPENED;
  pOperator->info = pInfo;
  pOperator->pTaskInfo = pTaskInfo;
3845

L
Liu Jicong 已提交
3846 3847
  pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doProjectOperation, NULL, NULL,
                                         destroyProjectOperatorInfo, NULL, NULL, NULL);
L
Liu Jicong 已提交
3848

3849
  int32_t code = appendDownstream(pOperator, &downstream, 1);
H
Haojun Liao 已提交
3850
  if (code != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
3851 3852
    goto _error;
  }
3853 3854

  return pOperator;
H
Haojun Liao 已提交
3855

L
Liu Jicong 已提交
3856
_error:
H
Haojun Liao 已提交
3857 3858
  pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
  return NULL;
3859 3860
}

3861 3862
/*
 * Apply the indefinite-rows functions to one input block and append their output to
 * the operator's result block.
 *
 * When scalar expressions are configured, they are evaluated on pBlock in place first.
 * Any error is raised with longjmp(pTaskInfo->env, code) instead of being returned.
 */
static void doHandleDataBlock(SOperatorInfo* pOperator, SSDataBlock* pBlock, SOperatorInfo* downstream,
                              SExecTaskInfo* pTaskInfo) {
  SIndefOperatorInfo* pIndefInfo = pOperator->info;
  SOptrBasicInfo*     pInfo = &pIndefInfo->binfo;
  SExprSupp*          pSup = &pOperator->exprSupp;
  SExprSupp*          pScalarSup = &pIndefInfo->scalarSup;

  int32_t order = 0;
  int32_t scanFlag = 0;

  // the pDataBlock are always the same one, no need to call this again
  int32_t code = getTableScanInfo(downstream, &order, &scanFlag);
  if (code != TSDB_CODE_SUCCESS) {
    longjmp(pTaskInfo->env, code);
  }

  // scalar expressions must be computed on the input block before the indefinite-rows functions see it
  if (pScalarSup->pExprInfo != NULL) {
    code = projectApplyFunctions(pScalarSup->pExprInfo, pBlock, pBlock, pScalarSup->pCtx, pScalarSup->numOfExprs,
                                 pIndefInfo->pPseudoColInfo);
    if (code != TSDB_CODE_SUCCESS) {
      longjmp(pTaskInfo->env, code);
    }
  }

  setInputDataBlock(pOperator, pSup->pCtx, pBlock, order, scanFlag, false);

  SSDataBlock* pOut = pInfo->pRes;
  blockDataEnsureCapacity(pOut, pOut->info.rows + pBlock->info.rows);

  code = projectApplyFunctions(pSup->pExprInfo, pOut, pBlock, pSup->pCtx, pSup->numOfExprs,
                               pIndefInfo->pPseudoColInfo);
  if (code != TSDB_CODE_SUCCESS) {
    longjmp(pTaskInfo->env, code);
  }
}

H
Haojun Liao 已提交
3896 3897
/**
 * getNextFn of the indefinite-rows function operator.
 *
 * Pulls blocks from the downstream operator, applies the functions through doHandleDataBlock()
 * and filters the output with pCondition. Input is accumulated until one of these happens:
 *   - the result block reaches the threshold;
 *   - downstream is exhausted;
 *   - a block with a different group id arrives.
 * In the last case that block is parked in pNextGroupRes and processed first on the next
 * call, after the function contexts are reset.
 *
 * @return the result block, or NULL when no more rows are produced.
 */
static SSDataBlock* doApplyIndefinitFunction(SOperatorInfo* pOperator) {
  SIndefOperatorInfo* pIndefInfo = pOperator->info;
  SOptrBasicInfo*     pInfo = &pIndefInfo->binfo;
  SExprSupp*          pSup = &pOperator->exprSupp;

  SSDataBlock* pRes = pInfo->pRes;
  blockDataCleanup(pRes);

  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  int64_t st = 0;

  if (pOperator->cost.openCost == 0) {
    st = taosGetTimestampUs();
  }

  SOperatorInfo* downstream = pOperator->pDownstream[0];

  while (1) {
    // first process the block of a new group that was held back by the previous round
    if (pIndefInfo->pNextGroupRes != NULL) {  // todo extract method
      // a new group starts: reset the per-function result state before feeding it
      for (int32_t k = 0; k < pSup->numOfExprs; ++k) {
        SqlFunctionCtx* pCtx = &pSup->pCtx[k];

        SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
        pResInfo->initialized = false;
        pCtx->pOutput = NULL;
      }

      doHandleDataBlock(pOperator, pIndefInfo->pNextGroupRes, downstream, pTaskInfo);
      pIndefInfo->pNextGroupRes = NULL;
    }

    if (pInfo->pRes->info.rows < pOperator->resultInfo.threshold) {
      while (1) {
        // The downstream exec may change the value of the newgroup, so use a local variable instead.
        SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
        if (pBlock == NULL) {
          doSetOperatorCompleted(pOperator);
          break;
        }

        if (pIndefInfo->groupId == 0 && pBlock->info.groupId != 0) {
          pIndefInfo->groupId = pBlock->info.groupId;  // this is the initial group result
        } else {
          if (pIndefInfo->groupId != pBlock->info.groupId) {  // reset output buffer and computing status
            pIndefInfo->groupId = pBlock->info.groupId;
            pIndefInfo->pNextGroupRes = pBlock;
            break;
          }
        }

        doHandleDataBlock(pOperator, pBlock, downstream, pTaskInfo);
        if (pInfo->pRes->info.rows >= pOperator->resultInfo.threshold) {
          break;
        }
      }
    }

    doFilter(pIndefInfo->pCondition, pInfo->pRes);

    // rows is unsigned, so a `rows >= 0` test is always true. Returning an empty block here would be reported
    // as NULL (end of data) although downstream may still produce rows, so keep pulling until something
    // survives the filter or the operator has been completed.
    size_t rows = pInfo->pRes->info.rows;
    if (rows > 0 || pOperator->status == OP_EXEC_DONE) {
      break;
    }
  }

  size_t rows = pInfo->pRes->info.rows;
  pOperator->resultInfo.totalRows += rows;

  if (pOperator->cost.openCost == 0) {
    pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
  }

  return (rows > 0) ? pInfo->pRes : NULL;
}

3975 3976
/**
 * Create the indefinite-rows function operator for an SIndefRowsFuncPhysiNode.
 *
 * If the node carries pExprs, they are set up as scalar expressions; doHandleDataBlock
 * evaluates them on each input block before the functions in pFuncs run. The output
 * block is sized to at most 4096 rows, and further capped so that one block does not
 * exceed 2MB.
 *
 * @return the new operator, or NULL on failure with pTaskInfo->code set to the failing code.
 */
SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pNode,
                                                 SExecTaskInfo* pTaskInfo) {
  int32_t             code = TSDB_CODE_OUT_OF_MEMORY;
  SIndefOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SIndefOperatorInfo));
  SOperatorInfo*      pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    goto _error;
  }

  SExprSupp* pSup = &pOperator->exprSupp;

  SIndefRowsFuncPhysiNode* pPhyNode = (SIndefRowsFuncPhysiNode*)pNode;

  int32_t    numOfExpr = 0;
  SExprInfo* pExprInfo = createExprInfo(pPhyNode->pFuncs, NULL, &numOfExpr);

  if (pPhyNode->pExprs != NULL) {
    int32_t    num = 0;
    SExprInfo* pSExpr = createExprInfo(pPhyNode->pExprs, NULL, &num);
    code = initExprSupp(&pInfo->scalarSup, pSExpr, num);
    if (code != TSDB_CODE_SUCCESS) {
      goto _error;
    }
  }

  SSDataBlock* pResBlock = createResDataBlock(pPhyNode->node.pOutputDataBlockDesc);
  if (pResBlock == NULL) {
    // the row-size computation below would dereference a NULL block
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _error;
  }

  int32_t numOfRows = 4096;
  size_t  keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;

  // Make sure the size of SSDataBlock will never exceed the size of 2MB.
  int32_t TWOMB = 2 * 1024 * 1024;
  if (numOfRows * pResBlock->info.rowSize > TWOMB) {
    numOfRows = TWOMB / pResBlock->info.rowSize;
  }

  initResultSizeInfo(&pOperator->resultInfo, numOfRows);

  code = initAggInfo(pSup, &pInfo->aggSup, pExprInfo, numOfExpr, keyBufSize, pTaskInfo->id.str);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  initBasicInfo(&pInfo->binfo, pResBlock);

  setFunctionResultOutput(pOperator, &pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, numOfExpr);

  pInfo->binfo.pRes = pResBlock;
  pInfo->pCondition = pPhyNode->node.pConditions;
  pInfo->pPseudoColInfo = setRowTsColumnOutputInfo(pSup->pCtx, numOfExpr);

  pOperator->name = "IndefinitOperator";
  pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC;
  pOperator->blocking = false;
  pOperator->status = OP_NOT_OPENED;
  pOperator->info = pInfo;
  pOperator->pTaskInfo = pTaskInfo;

  pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doApplyIndefinitFunction, NULL, NULL,
                                         destroyIndefinitOperatorInfo, NULL, NULL, NULL);

  code = appendDownstream(pOperator, &downstream, 1);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  return pOperator;

_error:
  taosMemoryFree(pInfo);
  taosMemoryFree(pOperator);
  pTaskInfo->code = code;
  return NULL;
}

4045
static int32_t initFillInfo(SFillOperatorInfo* pInfo, SExprInfo* pExpr, int32_t numOfCols, SNodeListNode* pValNode,
L
Liu Jicong 已提交
4046
                            STimeWindow win, int32_t capacity, const char* id, SInterval* pInterval, int32_t fillType) {
4047
  SFillColInfo* pColInfo = createFillColInfo(pExpr, numOfCols, pValNode);
H
Haojun Liao 已提交
4048

4049
  STimeWindow w = getAlignQueryTimeWindow(pInterval, pInterval->precision, win.skey);
4050
  w = getFirstQualifiedTimeWindow(win.skey, &w, pInterval, TSDB_ORDER_ASC);
H
Haojun Liao 已提交
4051 4052

  int32_t order = TSDB_ORDER_ASC;
4053 4054
  pInfo->pFillInfo = taosCreateFillInfo(order, w.skey, 0, capacity, numOfCols, pInterval,
      fillType, pColInfo, pInfo->primaryTsCol, id);
H
Haojun Liao 已提交
4055

4056
  pInfo->win = win;
L
Liu Jicong 已提交
4057
  pInfo->p = taosMemoryCalloc(numOfCols, POINTER_BYTES);
4058

H
Haojun Liao 已提交
4059
  if (pInfo->pFillInfo == NULL || pInfo->p == NULL) {
H
Haojun Liao 已提交
4060 4061
    taosMemoryFree(pInfo->pFillInfo);
    taosMemoryFree(pInfo->p);
H
Haojun Liao 已提交
4062 4063 4064 4065 4066 4067
    return TSDB_CODE_OUT_OF_MEMORY;
  } else {
    return TSDB_CODE_SUCCESS;
  }
}

H
Haojun Liao 已提交
4068
/**
 * Create the fill operator on top of an interval or merge-aligned interval operator.
 * The interval definition is taken from the downstream operator's info.
 *
 * @return the new operator, or NULL on failure with pTaskInfo->code set to the failing code.
 */
SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* pPhyFillNode, SExecTaskInfo* pTaskInfo) {
  int32_t            code = TSDB_CODE_OUT_OF_MEMORY;
  SFillOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SFillOperatorInfo));
  SOperatorInfo*     pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    goto _error;
  }

  int32_t      num = 0;
  SSDataBlock* pResBlock = createResDataBlock(pPhyFillNode->node.pOutputDataBlockDesc);
  SExprInfo*   pExprInfo = createExprInfo(pPhyFillNode->pTargets, NULL, &num);
  // the info type of the downstream operator depends on its kind
  SInterval*   pInterval =
      QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL == downstream->operatorType
            ? &((SMergeAlignedIntervalAggOperatorInfo*)downstream->info)->intervalAggOperatorInfo->interval
            : &((SIntervalAggOperatorInfo*)downstream->info)->interval;

  int32_t type = convertFillType(pPhyFillNode->mode);

  SResultInfo* pResultInfo = &pOperator->resultInfo;
  initResultSizeInfo(&pOperator->resultInfo, 4096);
  pInfo->primaryTsCol = ((SColumnNode*)pPhyFillNode->pWStartTs)->slotId;

  int32_t numOfOutputCols = 0;
  SArray* pColMatchColInfo = extractColMatchInfo(pPhyFillNode->pTargets, pPhyFillNode->node.pOutputDataBlockDesc,
                                                 &numOfOutputCols, COL_MATCH_FROM_SLOT_ID);

  code = initFillInfo(pInfo, pExprInfo, num, (SNodeListNode*)pPhyFillNode->pValues, pPhyFillNode->timeRange,
                      pResultInfo->capacity, pTaskInfo->id.str, pInterval, type);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  pInfo->pRes = pResBlock;
  pInfo->pCondition = pPhyFillNode->node.pConditions;
  pInfo->pColMatchColInfo = pColMatchColInfo;
  pOperator->name = "FillOperator";
  pOperator->blocking = false;
  pOperator->status = OP_NOT_OPENED;
  pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_FILL;
  pOperator->exprSupp.pExprInfo = pExprInfo;
  pOperator->exprSupp.numOfExprs = num;
  pOperator->info = pInfo;
  pOperator->pTaskInfo = pTaskInfo;

  pOperator->fpSet =
      createOperatorFpSet(operatorDummyOpenFn, doFill, NULL, NULL, destroySFillOperatorInfo, NULL, NULL, NULL);

  // A failed attach must not yield an operator without its downstream.
  code = appendDownstream(pOperator, &downstream, 1);
  if (code != TSDB_CODE_SUCCESS) {
    // pInfo is fully built here: release it together with everything it owns (this also frees pInfo).
    destroySFillOperatorInfo(pInfo, num);
    pInfo = NULL;
    goto _error;
  }

  return pOperator;

_error:
  taosMemoryFreeClear(pOperator);
  taosMemoryFreeClear(pInfo);
  pTaskInfo->code = code;
  return NULL;
}

D
dapan1121 已提交
4123
/**
 * Allocate and initialize a task info object.
 *
 * The task starts in status TASK_NOT_COMPLETED. Its creation time is recorded, and
 * id.str is set to "TID:0x<taskId> QID:0x<queryId>".
 *
 * @param dbFName  database name; copied, may be NULL
 * @return the new task info, or NULL with terrno set when an allocation fails.
 */
static SExecTaskInfo* createExecTaskInfo(uint64_t queryId, uint64_t taskId, EOPTR_EXEC_MODEL model, char* dbFName) {
  SExecTaskInfo* pTaskInfo = taosMemoryCalloc(1, sizeof(SExecTaskInfo));
  if (pTaskInfo == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED);

  // strdup(NULL) is undefined behaviour; a task without a database name keeps dbname == NULL.
  pTaskInfo->schemaInfo.dbname = (dbFName != NULL) ? strdup(dbFName) : NULL;
  pTaskInfo->cost.created = taosGetTimestampMs();
  pTaskInfo->id.queryId = queryId;
  pTaskInfo->execModel = model;

  char* p = taosMemoryCalloc(1, 128);
  if (p == NULL) {
    taosMemoryFree(pTaskInfo->schemaInfo.dbname);
    taosMemoryFree(pTaskInfo);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  snprintf(p, 128, "TID:0x%" PRIx64 " QID:0x%" PRIx64, taskId, queryId);
  pTaskInfo->id.str = p;

  return pTaskInfo;
}
H
Haojun Liao 已提交
4138

H
Hongze Cheng 已提交
4139
static STsdbReader* doCreateDataReader(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle,
H
Haojun Liao 已提交
4140
                                       STableListInfo* pTableListInfo, const char* idstr);
H
Haojun Liao 已提交
4141

H
Haojun Liao 已提交
4142
static SArray* extractColumnInfo(SNodeList* pNodeList);
4143

4144
/**
 * Load the name and column schema of table `uid` from the meta store into pTaskInfo->schemaInfo.
 *
 * For super and child tables, the schema and the tag-schema version come from the super-table
 * entry. For a child table, that entry is looked up through its suid.
 *
 * @return TSDB_CODE_SUCCESS, or terrno when a meta lookup fails.
 */
int32_t extractTableSchemaInfo(SReadHandle* pHandle, uint64_t uid, SExecTaskInfo* pTaskInfo) {
  SMetaReader mr = {0};
  metaReaderInit(&mr, pHandle->meta, 0);
  int32_t code = metaGetTableEntryByUid(&mr, uid);
  if (code != TSDB_CODE_SUCCESS) {
    metaReaderClear(&mr);
    return terrno;
  }

  pTaskInfo->schemaInfo.tablename = strdup(mr.me.name);

  if (mr.me.type == TSDB_SUPER_TABLE) {
    pTaskInfo->schemaInfo.sw = tCloneSSchemaWrapper(&mr.me.stbEntry.schemaRow);
    pTaskInfo->schemaInfo.tversion = mr.me.stbEntry.schemaTag.version;
  } else if (mr.me.type == TSDB_CHILD_TABLE) {
    tDecoderClear(&mr.coder);

    tb_uid_t suid = mr.me.ctbEntry.suid;
    code = metaGetTableEntryByUid(&mr, suid);
    if (code != TSDB_CODE_SUCCESS) {
      // without the super-table entry, stbEntry below would be read from an entry that was never decoded
      metaReaderClear(&mr);
      return terrno;
    }

    pTaskInfo->schemaInfo.sw = tCloneSSchemaWrapper(&mr.me.stbEntry.schemaRow);
    pTaskInfo->schemaInfo.tversion = mr.me.stbEntry.schemaTag.version;
  } else {
    pTaskInfo->schemaInfo.sw = tCloneSSchemaWrapper(&mr.me.ntbEntry.schemaRow);
  }

  metaReaderClear(&mr);
  return TSDB_CODE_SUCCESS;
}

4173 4174 4175
/*
 * Release the database name, table name and column schema held by pSchemaInfo,
 * and clear the corresponding pointers.
 */
static void cleanupTableSchemaInfo(SSchemaInfo* pSchemaInfo) {
  taosMemoryFreeClear(pSchemaInfo->dbname);

  // tablename can be set while sw is still NULL (e.g. when the schema was never loaded or
  // could not be cloned), so release it before the early return below to avoid leaking it.
  taosMemoryFreeClear(pSchemaInfo->tablename);

  if (pSchemaInfo->sw == NULL) {
    return;
  }

  taosMemoryFree(pSchemaInfo->sw->pSchema);
  taosMemoryFreeClear(pSchemaInfo->sw);
}

4184
/**
 * Rebuild pTableListInfo->pGroupList so that tables are bucketed by group id, with the
 * buckets ordered by ascending group id.
 *
 * Each element of pGroupList is an SArray of STableKeyInfo. A side array of group ids
 * (sortSupport) is kept sorted and is used to find each bucket's position.
 *
 * @param pTableListInfo  table list whose `map` maps every table uid to its group id
 * @param groupNum        initial capacity of the group-id index
 * @return TDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY or TSDB_CODE_QRY_APP_ERROR
 */
static int32_t sortTableGroup(STableListInfo* pTableListInfo, int32_t groupNum) {
  taosArrayClear(pTableListInfo->pGroupList);
  SArray* sortSupport = taosArrayInit(groupNum, sizeof(uint64_t));
  if (sortSupport == NULL) return TSDB_CODE_OUT_OF_MEMORY;

  int32_t code = TDB_CODE_SUCCESS;
  for (int32_t i = 0; i < taosArrayGetSize(pTableListInfo->pTableList); i++) {
    STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i);
    uint64_t*      groupId = taosHashGet(pTableListInfo->map, &info->uid, sizeof(uint64_t));
    if (groupId == NULL) {
      // the searches below would dereference a NULL key
      qError("taos get group id error");
      code = TSDB_CODE_QRY_APP_ERROR;
      goto _end;
    }

    int32_t index = taosArraySearchIdx(sortSupport, groupId, compareUint64Val, TD_EQ);
    if (index != -1) {
      // existing group: append the table to its bucket
      SArray* tGroup = (SArray*)taosArrayGetP(pTableListInfo->pGroupList, index);
      if (taosArrayPush(tGroup, info) == NULL) {
        qError("taos push uid array error");
        code = TSDB_CODE_QRY_APP_ERROR;
        goto _end;
      }
      continue;
    }

    // New group: create its bucket and insert it before the first larger group id (or append when none).
    // tGroup is owned locally until it is stored in pGroupList, so every failure before that destroys it.
    void*   p = taosArraySearch(sortSupport, groupId, compareUint64Val, TD_GT);
    SArray* tGroup = taosArrayInit(8, sizeof(STableKeyInfo));
    if (tGroup == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _end;
    }

    if (taosArrayPush(tGroup, info) == NULL) {
      qError("taos push info array error");
      taosArrayDestroy(tGroup);
      code = TSDB_CODE_QRY_APP_ERROR;
      goto _end;
    }

    if (p == NULL) {
      if (taosArrayPush(sortSupport, groupId) == NULL) {
        qError("taos push support array error");
        taosArrayDestroy(tGroup);
        code = TSDB_CODE_QRY_APP_ERROR;
        goto _end;
      }
      if (taosArrayPush(pTableListInfo->pGroupList, &tGroup) == NULL) {
        qError("taos push group array error");
        taosArrayDestroy(tGroup);
        code = TSDB_CODE_QRY_APP_ERROR;
        goto _end;
      }
    } else {
      int32_t pos = TARRAY_ELEM_IDX(sortSupport, p);
      if (taosArrayInsert(sortSupport, pos, groupId) == NULL) {
        qError("taos insert support array error");
        taosArrayDestroy(tGroup);
        code = TSDB_CODE_QRY_APP_ERROR;
        goto _end;
      }
      if (taosArrayInsert(pTableListInfo->pGroupList, pos, &tGroup) == NULL) {
        qError("taos insert group array error");
        taosArrayDestroy(tGroup);
        code = TSDB_CODE_QRY_APP_ERROR;
        goto _end;
      }
    }
  }

_end:
  taosArrayDestroy(sortSupport);
  return code;
}

wmmhello's avatar
wmmhello 已提交
4242 4243
/**
 * Compute the group id of every table in pTableListInfo->pTableList from the group-by expressions `group`.
 *
 * For each table:
 *   - the expressions are evaluated against the table's meta entry (via doTranslateTagExpr
 *     and scalarCalculateConstants);
 *   - the resulting values are serialized into a key laid out as
 *     [one null flag per expression][packed non-null values];
 *   - the key is hashed with calcGroupId().
 * The id is stored in pTableListInfo->map (uid -> group id) and in the table's STableKeyInfo.
 * When needSortTableByGroupId is set, pGroupList is then rebuilt by sortTableGroup().
 *
 * @return TDB_CODE_SUCCESS (also when `group` is NULL), or an error code.
 */
int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle, SNodeList* group) {
  if (group == NULL) {
    return TDB_CODE_SUCCESS;
  }

  pTableListInfo->map = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
  if (pTableListInfo->map == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  // key buffer size: the declared width of every expression plus one null flag per expression
  int32_t keyLen = 0;
  SNode*  node;
  FOREACH(node, group) {
    SExprNode* pExpr = (SExprNode*)node;
    keyLen += pExpr->resType.bytes;
  }

  int32_t nullFlagSize = sizeof(int8_t) * LIST_LENGTH(group);
  keyLen += nullFlagSize;

  void* keyBuf = taosMemoryCalloc(1, keyLen);
  if (keyBuf == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  int32_t groupNum = 0;
  for (int32_t i = 0; i < taosArrayGetSize(pTableListInfo->pTableList); i++) {
    STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i);
    SMetaReader    mr = {0};
    metaReaderInit(&mr, pHandle->meta, 0);
    if (metaGetTableEntryByUid(&mr, info->uid) != TSDB_CODE_SUCCESS) {
      // the tag translation below would otherwise read an entry that was never decoded
      taosMemoryFree(keyBuf);
      metaReaderClear(&mr);
      return terrno;
    }

    SNodeList* groupNew = nodesCloneList(group);
    if (groupNew == NULL) {
      // an empty list would silently give every table the same (empty) key, i.e. one single group
      taosMemoryFree(keyBuf);
      metaReaderClear(&mr);
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    nodesRewriteExprsPostOrder(groupNew, doTranslateTagExpr, &mr);
    char* isNull = (char*)keyBuf;
    char* pStart = (char*)keyBuf + nullFlagSize;

    SNode*  pNode;
    int32_t index = 0;
    FOREACH(pNode, groupNew) {
      SNode*  pNew = NULL;
      int32_t code = scalarCalculateConstants(pNode, &pNew);
      if (TSDB_CODE_SUCCESS == code) {
        REPLACE_NODE(pNew);
      } else {
        taosMemoryFree(keyBuf);
        nodesClearList(groupNew);
        metaReaderClear(&mr);
        return code;
      }

      ASSERT(nodeType(pNew) == QUERY_NODE_VALUE);
      SValueNode* pValue = (SValueNode*)pNew;

      if (pValue->node.resType.type == TSDB_DATA_TYPE_NULL || pValue->isNull) {
        isNull[index++] = 1;
        continue;
      } else {
        isNull[index++] = 0;
        char* data = nodesGetValueFromNode(pValue);
        if (pValue->node.resType.type == TSDB_DATA_TYPE_JSON) {
          if (tTagIsJson(data)) {
            terrno = TSDB_CODE_QRY_JSON_IN_GROUP_ERROR;
            taosMemoryFree(keyBuf);
            nodesClearList(groupNew);
            metaReaderClear(&mr);
            return terrno;
          }
          int32_t len = getJsonValueLen(data);
          memcpy(pStart, data, len);
          pStart += len;
        } else if (IS_VAR_DATA_TYPE(pValue->node.resType.type)) {
          memcpy(pStart, data, varDataTLen(data));
          pStart += varDataTLen(data);
        } else {
          memcpy(pStart, data, pValue->node.resType.bytes);
          pStart += pValue->node.resType.bytes;
        }
      }
    }

    int32_t  len = (int32_t)(pStart - (char*)keyBuf);
    uint64_t groupId = calcGroupId(keyBuf, len);
    taosHashPut(pTableListInfo->map, &(info->uid), sizeof(uint64_t), &groupId, sizeof(uint64_t));
    info->groupId = groupId;
    groupNum++;

    nodesClearList(groupNew);
    metaReaderClear(&mr);
  }
  taosMemoryFree(keyBuf);

  if (pTableListInfo->needSortTableByGroupId) {
    return sortTableGroup(pTableListInfo, groupNum);
  }

  return TDB_CODE_SUCCESS;
}

4343 4344 4345 4346 4347 4348 4349 4350 4351 4352 4353 4354 4355 4356 4357 4358 4359 4360 4361 4362 4363 4364 4365 4366
/*
 * Prepare the query condition used by the block-distribution scan.
 *
 * The condition reads a single timestamp column (colId 1) over the full
 * INT64_MIN..INT64_MAX window in ascending order, loads blocks in offset
 * order, and sets both version bounds to -1.
 *
 * @param uid   stored in pCond->suid
 * @param pCond output; zeroed first, then populated
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_QRY_OUT_OF_MEMORY (also stored in
 *         terrno) when the column list cannot be allocated; pCond->colList is
 *         NULL in that case.
 */
static int32_t initTableblockDistQueryCond(uint64_t uid, SQueryTableDataCond* pCond) {
  memset(pCond, 0, sizeof(SQueryTableDataCond));
  pCond->order = TSDB_ORDER_ASC;
  pCond->numOfCols = 1;

  SColumnInfo* pTsCol = taosMemoryCalloc(1, sizeof(SColumnInfo));
  pCond->colList = pTsCol;
  if (NULL == pTsCol) {
    terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
    return terrno;
  }

  // calloc already zeroed the remaining fields of the column descriptor
  pTsCol->colId = 1;
  pTsCol->type = TSDB_DATA_TYPE_TIMESTAMP;
  pTsCol->bytes = sizeof(TSKEY);

  pCond->twindows.skey = INT64_MIN;
  pCond->twindows.ekey = INT64_MAX;
  pCond->suid = uid;
  pCond->type = BLOCK_LOAD_OFFSET_ORDER;
  pCond->startVersion = -1;
  pCond->endVersion = -1;

  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
4367
SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle,
4368
                                  STableListInfo* pTableListInfo, SNode* pTagCond, SNode* pTagIndexCond, const char* pUser) {
4369 4370
  int32_t type = nodeType(pPhyNode);

X
Xiaoyu Wang 已提交
4371
  if (pPhyNode->pChildren == NULL || LIST_LENGTH(pPhyNode->pChildren) == 0) {
H
Haojun Liao 已提交
4372
    if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == type) {
dengyihao's avatar
dengyihao 已提交
4373
      STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode;
H
Haojun Liao 已提交
4374

4375
      int32_t code = createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags,
4376
                                             pTableScanNode->groupSort, pHandle, pTableListInfo, pTagCond, pTagIndexCond, GET_TASKID(pTaskInfo));
4377
      if (code) {
wmmhello's avatar
wmmhello 已提交
4378
        pTaskInfo->code = code;
D
dapan1121 已提交
4379 4380
        return NULL;
      }
wmmhello's avatar
wmmhello 已提交
4381

4382
      code = extractTableSchemaInfo(pHandle, pTableScanNode->scan.uid, pTaskInfo);
S
slzhou 已提交
4383
      if (code) {
4384
        pTaskInfo->code = terrno;
wmmhello's avatar
wmmhello 已提交
4385 4386 4387
        return NULL;
      }

H
Haojun Liao 已提交
4388
      SOperatorInfo*  pOperator = createTableScanOperatorInfo(pTableScanNode, pHandle, pTaskInfo);
4389 4390
      STableScanInfo* pScanInfo = pOperator->info;
      pTaskInfo->cost.pRecoder = &pScanInfo->readRecorder;
S
slzhou 已提交
4391
      return pOperator;
L
Liu Jicong 已提交
4392

S
slzhou 已提交
4393 4394
    } else if (QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN == type) {
      STableMergeScanPhysiNode* pTableScanNode = (STableMergeScanPhysiNode*)pPhyNode;
4395
      int32_t code = createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags,
4396
                                             pTableScanNode->groupSort, pHandle, pTableListInfo, pTagCond, pTagIndexCond, GET_TASKID(pTaskInfo));
L
Liu Jicong 已提交
4397
      if (code) {
wmmhello's avatar
wmmhello 已提交
4398
        pTaskInfo->code = code;
wmmhello's avatar
wmmhello 已提交
4399 4400
        return NULL;
      }
4401

4402
      code = extractTableSchemaInfo(pHandle, pTableScanNode->scan.uid, pTaskInfo);
wmmhello's avatar
wmmhello 已提交
4403 4404 4405 4406
      if (code) {
        pTaskInfo->code = terrno;
        return NULL;
      }
wmmhello's avatar
wmmhello 已提交
4407

4408
      SOperatorInfo* pOperator =
4409
          createTableMergeScanOperatorInfo(pTableScanNode, pTableListInfo, pHandle, pTaskInfo);
wmmhello's avatar
wmmhello 已提交
4410

4411 4412 4413
      STableScanInfo* pScanInfo = pOperator->info;
      pTaskInfo->cost.pRecoder = &pScanInfo->readRecorder;
      return pOperator;
L
Liu Jicong 已提交
4414

H
Haojun Liao 已提交
4415
    } else if (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == type) {
4416
      return createExchangeOperatorInfo(pHandle->pMsgCb->clientRpc, (SExchangePhysiNode*)pPhyNode, pTaskInfo);
H
Haojun Liao 已提交
4417
    } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN == type) {
5
54liuyao 已提交
4418
      STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode;
4419
      STimeWindowAggSupp   twSup = {
L
Liu Jicong 已提交
4420 4421 4422 4423
            .waterMark = pTableScanNode->watermark,
            .calTrigger = pTableScanNode->triggerType,
            .maxTs = INT64_MIN,
      };
5
54liuyao 已提交
4424
      if (pHandle->vnode) {
4425
        int32_t code = createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags,
4426
                                               pTableScanNode->groupSort, pHandle, pTableListInfo, pTagCond, pTagIndexCond, GET_TASKID(pTaskInfo));
L
Liu Jicong 已提交
4427
        if (code) {
wmmhello's avatar
wmmhello 已提交
4428 4429 4430
          pTaskInfo->code = code;
          return NULL;
        }
5
54liuyao 已提交
4431
      }
4432

4433
      SOperatorInfo* pOperator = createStreamScanOperatorInfo(pHandle, pTableScanNode, pTagCond, pTaskInfo, &twSup);
H
Haojun Liao 已提交
4434
      return pOperator;
L
Liu Jicong 已提交
4435

H
Haojun Liao 已提交
4436
    } else if (QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN == type) {
L
Liu Jicong 已提交
4437
      SSystemTableScanPhysiNode* pSysScanPhyNode = (SSystemTableScanPhysiNode*)pPhyNode;
4438
      return createSysTableScanOperatorInfo(pHandle, pSysScanPhyNode, pUser, pTaskInfo);
4439
    } else if (QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN == type) {
X
Xiaoyu Wang 已提交
4440
      STagScanPhysiNode* pScanPhyNode = (STagScanPhysiNode*)pPhyNode;
4441

4442
      int32_t code = getTableList(pHandle->meta, pHandle->vnode, pScanPhyNode, pTagCond, pTagIndexCond, pTableListInfo);
4443
      if (code != TSDB_CODE_SUCCESS) {
4444
        pTaskInfo->code = terrno;
4445 4446 4447
        return NULL;
      }

4448
      return createTagScanOperatorInfo(pHandle, pScanPhyNode, pTableListInfo, pTaskInfo);
4449
    } else if (QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN == type) {
4450
      SBlockDistScanPhysiNode* pBlockNode = (SBlockDistScanPhysiNode*)pPhyNode;
4451 4452 4453
      pTableListInfo->pTableList = taosArrayInit(4, sizeof(STableKeyInfo));

      if (pBlockNode->tableType == TSDB_SUPER_TABLE) {
4454
        int32_t code = vnodeGetAllTableList(pHandle->vnode, pBlockNode->uid, pTableListInfo->pTableList);
4455 4456 4457 4458 4459
        if (code != TSDB_CODE_SUCCESS) {
          pTaskInfo->code = terrno;
          return NULL;
        }
      } else {  // Create one table group.
S
slzhou 已提交
4460
        STableKeyInfo info = {.lastKey = 0, .uid = pBlockNode->uid, .groupId = 0};
4461 4462 4463 4464
        taosArrayPush(pTableListInfo->pTableList, &info);
      }

      SQueryTableDataCond cond = {0};
4465 4466 4467
      int32_t code = initTableblockDistQueryCond(pBlockNode->suid, &cond);
      if (code != TSDB_CODE_SUCCESS) {
        return NULL;
4468
      }
H
Haojun Liao 已提交
4469 4470 4471

      STsdbReader* pReader = NULL;
      tsdbReaderOpen(pHandle->vnode, &cond, pTableListInfo->pTableList, &pReader, "");
4472 4473
      cleanupQueryTableDataCond(&cond);

4474
      return createDataBlockInfoScanOperator(pReader, pHandle, cond.suid, pBlockNode, pTaskInfo);
H
Haojun Liao 已提交
4475 4476 4477
    } else if (QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN == type) {
      SLastRowScanPhysiNode* pScanNode = (SLastRowScanPhysiNode*)pPhyNode;

4478
      int32_t code = createScanTableListInfo(&pScanNode->scan, pScanNode->pGroupTags, true, pHandle, pTableListInfo, pTagCond, pTagIndexCond, GET_TASKID(pTaskInfo));
4479 4480 4481 4482
      if (code != TSDB_CODE_SUCCESS) {
        pTaskInfo->code = code;
        return NULL;
      }
4483

4484 4485 4486 4487
      code = extractTableSchemaInfo(pHandle, pScanNode->scan.uid, pTaskInfo);
      if (code != TSDB_CODE_SUCCESS) {
        pTaskInfo->code = code;
        return NULL;
H
Haojun Liao 已提交
4488 4489
      }

4490
      return createLastrowScanOperator(pScanNode, pHandle, pTaskInfo);
H
Haojun Liao 已提交
4491 4492
    } else {
      ASSERT(0);
H
Haojun Liao 已提交
4493 4494 4495
    }
  }

4496 4497
  int32_t num = 0;
  size_t  size = LIST_LENGTH(pPhyNode->pChildren);
H
Haojun Liao 已提交
4498

4499
  SOperatorInfo** ops = taosMemoryCalloc(size, POINTER_BYTES);
dengyihao's avatar
dengyihao 已提交
4500
  for (int32_t i = 0; i < size; ++i) {
4501
    SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, i);
4502
    ops[i] = createOperatorTree(pChildNode, pTaskInfo, pHandle, pTableListInfo, pTagCond, pTagIndexCond, pUser);
4503 4504 4505
    if (ops[i] == NULL) {
      return NULL;
    }
4506
  }
H
Haojun Liao 已提交
4507

4508
  SOperatorInfo* pOptr = NULL;
H
Haojun Liao 已提交
4509
  if (QUERY_NODE_PHYSICAL_PLAN_PROJECT == type) {
4510
    pOptr = createProjectOperatorInfo(ops[0], (SProjectPhysiNode*)pPhyNode, pTaskInfo);
4511
  } else if (QUERY_NODE_PHYSICAL_PLAN_HASH_AGG == type) {
H
Haojun Liao 已提交
4512 4513
    SAggPhysiNode* pAggNode = (SAggPhysiNode*)pPhyNode;
    SExprInfo*     pExprInfo = createExprInfo(pAggNode->pAggFuncs, pAggNode->pGroupKeys, &num);
4514
    SSDataBlock*   pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
H
Haojun Liao 已提交
4515

dengyihao's avatar
dengyihao 已提交
4516
    int32_t    numOfScalarExpr = 0;
4517 4518 4519 4520 4521
    SExprInfo* pScalarExprInfo = NULL;
    if (pAggNode->pExprs != NULL) {
      pScalarExprInfo = createExprInfo(pAggNode->pExprs, NULL, &numOfScalarExpr);
    }

H
Haojun Liao 已提交
4522 4523
    if (pAggNode->pGroupKeys != NULL) {
      SArray* pColList = extractColumnInfo(pAggNode->pGroupKeys);
dengyihao's avatar
dengyihao 已提交
4524
      pOptr = createGroupOperatorInfo(ops[0], pExprInfo, num, pResBlock, pColList, pAggNode->node.pConditions,
wmmhello's avatar
wmmhello 已提交
4525
                                      pScalarExprInfo, numOfScalarExpr, pTaskInfo);
H
Haojun Liao 已提交
4526
    } else {
L
Liu Jicong 已提交
4527 4528
      pOptr = createAggregateOperatorInfo(ops[0], pExprInfo, num, pResBlock, pAggNode->node.pConditions,
                                          pScalarExprInfo, numOfScalarExpr, pTaskInfo);
H
Haojun Liao 已提交
4529
    }
X
Xiaoyu Wang 已提交
4530
  } else if (QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL == type || QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL == type) {
H
Haojun Liao 已提交
4531
    SIntervalPhysiNode* pIntervalPhyNode = (SIntervalPhysiNode*)pPhyNode;
H
Haojun Liao 已提交
4532

H
Haojun Liao 已提交
4533
    SExprInfo*   pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &num);
4534
    SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
H
Haojun Liao 已提交
4535

dengyihao's avatar
dengyihao 已提交
4536 4537 4538 4539 4540 4541
    SInterval interval = {.interval = pIntervalPhyNode->interval,
                          .sliding = pIntervalPhyNode->sliding,
                          .intervalUnit = pIntervalPhyNode->intervalUnit,
                          .slidingUnit = pIntervalPhyNode->slidingUnit,
                          .offset = pIntervalPhyNode->offset,
                          .precision = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->node.resType.precision};
H
Haojun Liao 已提交
4542

X
Xiaoyu Wang 已提交
4543 4544 4545 4546 4547
    STimeWindowAggSupp as = {
        .waterMark = pIntervalPhyNode->window.watermark,
        .calTrigger = pIntervalPhyNode->window.triggerType,
        .maxTs = INT64_MIN,
    };
4548
    ASSERT(as.calTrigger != STREAM_TRIGGER_MAX_DELAY);
4549

4550
    int32_t tsSlotId = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId;
L
Liu Jicong 已提交
4551
    bool    isStream = (QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL == type);
4552 4553
    pOptr = createIntervalOperatorInfo(ops[0], pExprInfo, num, pResBlock, &interval, tsSlotId, &as, pIntervalPhyNode,
                                       pTaskInfo, isStream);
4554

4555 4556
  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL == type) {
    SMergeAlignedIntervalPhysiNode* pIntervalPhyNode = (SMergeAlignedIntervalPhysiNode*)pPhyNode;
S
shenglian zhou 已提交
4557 4558 4559 4560 4561 4562 4563 4564 4565 4566

    SExprInfo*   pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &num);
    SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);

    SInterval interval = {.interval = pIntervalPhyNode->interval,
                          .sliding = pIntervalPhyNode->sliding,
                          .intervalUnit = pIntervalPhyNode->intervalUnit,
                          .slidingUnit = pIntervalPhyNode->slidingUnit,
                          .offset = pIntervalPhyNode->offset,
                          .precision = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->node.resType.precision};
4567

S
shenglian zhou 已提交
4568
    int32_t tsSlotId = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId;
L
Liu Jicong 已提交
4569 4570
    pOptr = createMergeAlignedIntervalOperatorInfo(ops[0], pExprInfo, num, pResBlock, &interval, tsSlotId,
                                                   pPhyNode->pConditions, pTaskInfo);
S
shenglian zhou 已提交
4571
  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL == type) {
X
Xiaoyu Wang 已提交
4572
    SMergeIntervalPhysiNode* pIntervalPhyNode = (SMergeIntervalPhysiNode*)pPhyNode;
S
shenglian zhou 已提交
4573 4574 4575 4576 4577 4578 4579 4580 4581 4582

    SExprInfo*   pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &num);
    SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);

    SInterval interval = {.interval = pIntervalPhyNode->interval,
                          .sliding = pIntervalPhyNode->sliding,
                          .intervalUnit = pIntervalPhyNode->intervalUnit,
                          .slidingUnit = pIntervalPhyNode->slidingUnit,
                          .offset = pIntervalPhyNode->offset,
                          .precision = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->node.resType.precision};
4583

S
shenglian zhou 已提交
4584 4585
    int32_t tsSlotId = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId;
    pOptr = createMergeIntervalOperatorInfo(ops[0], pExprInfo, num, pResBlock, &interval, tsSlotId, pTaskInfo);
5
54liuyao 已提交
4586
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL == type) {
4587
    int32_t children = 0;
5
54liuyao 已提交
4588 4589
    pOptr = createStreamFinalIntervalOperatorInfo(ops[0], pPhyNode, pTaskInfo, children);
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL == type) {
5
54liuyao 已提交
4590
    int32_t children = pHandle->numOfVgroups;
5
54liuyao 已提交
4591
    pOptr = createStreamFinalIntervalOperatorInfo(ops[0], pPhyNode, pTaskInfo, children);
H
Haojun Liao 已提交
4592
  } else if (QUERY_NODE_PHYSICAL_PLAN_SORT == type) {
4593
    pOptr = createSortOperatorInfo(ops[0], (SSortPhysiNode*)pPhyNode, pTaskInfo);
S
shenglian zhou 已提交
4594 4595
  } else if (QUERY_NODE_PHYSICAL_PLAN_GROUP_SORT == type) {
    pOptr = createGroupSortOperatorInfo(ops[0], (SGroupSortPhysiNode*)pPhyNode, pTaskInfo);
X
Xiaoyu Wang 已提交
4596
  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE == type) {
4597
    SMergePhysiNode* pMergePhyNode = (SMergePhysiNode*)pPhyNode;
4598
    pOptr = createMultiwayMergeOperatorInfo(ops, size, pMergePhyNode, pTaskInfo);
4599
  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION == type) {
H
Haojun Liao 已提交
4600 4601
    SSessionWinodwPhysiNode* pSessionNode = (SSessionWinodwPhysiNode*)pPhyNode;

X
Xiaoyu Wang 已提交
4602 4603
    STimeWindowAggSupp as = {.waterMark = pSessionNode->window.watermark,
                             .calTrigger = pSessionNode->window.triggerType};
4604

H
Haojun Liao 已提交
4605
    SExprInfo*   pExprInfo = createExprInfo(pSessionNode->window.pFuncs, NULL, &num);
4606
    SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
4607 4608
    int32_t      tsSlotId = ((SColumnNode*)pSessionNode->window.pTspk)->slotId;

L
Liu Jicong 已提交
4609 4610
    pOptr = createSessionAggOperatorInfo(ops[0], pExprInfo, num, pResBlock, pSessionNode->gap, tsSlotId, &as,
                                         pPhyNode->pConditions, pTaskInfo);
4611
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION == type) {
4612 4613 4614 4615 4616 4617 4618
    pOptr = createStreamSessionAggOperatorInfo(ops[0], pPhyNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION == type) {
    int32_t children = 0;
    pOptr = createStreamFinalSessionAggOperatorInfo(ops[0], pPhyNode, pTaskInfo, children);
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION == type) {
    int32_t children = 1;
    pOptr = createStreamFinalSessionAggOperatorInfo(ops[0], pPhyNode, pTaskInfo, children);
H
Haojun Liao 已提交
4619
  } else if (QUERY_NODE_PHYSICAL_PLAN_PARTITION == type) {
4620
    pOptr = createPartitionOperatorInfo(ops[0], (SPartitionPhysiNode*)pPhyNode, pTaskInfo);
4621
  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE == type) {
dengyihao's avatar
dengyihao 已提交
4622
    SStateWinodwPhysiNode* pStateNode = (SStateWinodwPhysiNode*)pPhyNode;
4623

4624 4625
    STimeWindowAggSupp as = {.waterMark = pStateNode->window.watermark, .calTrigger = pStateNode->window.triggerType};

dengyihao's avatar
dengyihao 已提交
4626
    SExprInfo*   pExprInfo = createExprInfo(pStateNode->window.pFuncs, NULL, &num);
4627
    SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
4628 4629
    int32_t      tsSlotId = ((SColumnNode*)pStateNode->window.pTspk)->slotId;

4630
    SColumnNode* pColNode = (SColumnNode*)((STargetNode*)pStateNode->pStateKey)->pExpr;
X
Xiaoyu Wang 已提交
4631
    SColumn      col = extractColumnFromColumnNode(pColNode);
L
Liu Jicong 已提交
4632 4633
    pOptr = createStatewindowOperatorInfo(ops[0], pExprInfo, num, pResBlock, &as, tsSlotId, &col, pPhyNode->pConditions,
                                          pTaskInfo);
4634
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE == type) {
5
54liuyao 已提交
4635
    pOptr = createStreamStateAggOperatorInfo(ops[0], pPhyNode, pTaskInfo);
4636
  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN == type) {
4637
    pOptr = createMergeJoinOperatorInfo(ops, size, (SJoinPhysiNode*)pPhyNode, pTaskInfo);
4638
  } else if (QUERY_NODE_PHYSICAL_PLAN_FILL == type) {
H
Haojun Liao 已提交
4639
    pOptr = createFillOperatorInfo(ops[0], (SFillPhysiNode*)pPhyNode, pTaskInfo);
H
Haojun Liao 已提交
4640 4641
  } else if (QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC == type) {
    pOptr = createIndefinitOutputOperatorInfo(ops[0], pPhyNode, pTaskInfo);
4642 4643
  } else if (QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC == type) {
    pOptr = createTimeSliceOperatorInfo(ops[0], pPhyNode, pTaskInfo);
H
Haojun Liao 已提交
4644 4645
  } else {
    ASSERT(0);
H
Haojun Liao 已提交
4646
  }
4647 4648 4649

  taosMemoryFree(ops);
  return pOptr;
4650
}
H
Haojun Liao 已提交
4651

H
Haojun Liao 已提交
4652
/*
 * Build an SColumn description for each target node in pNodeList.
 *
 * Column references are converted with extractColumnFromColumnNode(). Constant
 * (value) nodes are described by their result data type, bytes, scale and
 * precision, and are identified by the target's slot id. Targets of any other
 * node type are skipped.
 *
 * @return a new SArray of SColumn owned by the caller, or NULL with terrno set
 *         to TSDB_CODE_OUT_OF_MEMORY if the array cannot be allocated.
 */
SArray* extractColumnInfo(SNodeList* pNodeList) {
  size_t  numOfCols = LIST_LENGTH(pNodeList);
  SArray* pList = taosArrayInit(numOfCols, sizeof(SColumn));
  if (pList == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  for (int32_t i = 0; i < numOfCols; ++i) {
    STargetNode* pNode = (STargetNode*)nodesListGetNode(pNodeList, i);

    if (nodeType(pNode->pExpr) == QUERY_NODE_COLUMN) {
      SColumnNode* pColNode = (SColumnNode*)pNode->pExpr;

      SColumn c = extractColumnFromColumnNode(pColNode);
      taosArrayPush(pList, &c);
    } else if (nodeType(pNode->pExpr) == QUERY_NODE_VALUE) {
      SValueNode* pValNode = (SValueNode*)pNode->pExpr;
      SColumn     c = {0};
      c.slotId = pNode->slotId;
      // A value node has no column id here; the slot id is reused for it.
      c.colId = pNode->slotId;
      // node.type is the node kind (QUERY_NODE_VALUE); the data type lives in resType.
      c.type = pValNode->node.resType.type;
      c.bytes = pValNode->node.resType.bytes;
      c.scale = pValNode->node.resType.scale;
      c.precision = pValNode->node.resType.precision;

      taosArrayPush(pList, &c);
    }
  }

  return pList;
}

4685
/*
 * Compiled-out helper. It builds the table list for a table-scan plan node and
 * opens a tsdb reader over that list.
 *
 * NOTE(review): this code is disabled with #if 0. Its getTableList() call
 * passes four arguments, but the live call in createOperatorTree() passes six
 * (meta, vnode, scan node, tag cond, tag index cond, table list). It would need
 * updating before being re-enabled.
 */
#if 0
L
Liu Jicong 已提交
4686 4687
STsdbReader* doCreateDataReader(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle,
                                STableListInfo* pTableListInfo, const char* idstr) {
4688
  int32_t code = getTableList(pHandle->meta, pHandle->vnode, &pTableScanNode->scan, pTableListInfo);
wmmhello's avatar
wmmhello 已提交
4689 4690 4691 4692 4693 4694
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  // An empty table list is not an error: the _error path sets terrno to 0 and returns NULL.
  if (taosArrayGetSize(pTableListInfo->pTableList) == 0) {
    code = 0;
H
Haojun Liao 已提交
4695
    qDebug("no table qualified for query, %s", idstr);
wmmhello's avatar
wmmhello 已提交
4696 4697 4698
    goto _error;
  }

4699
  SQueryTableDataCond cond = {0};
wmmhello's avatar
wmmhello 已提交
4700
  code = initQueryTableDataCond(&cond, pTableScanNode);
4701
  if (code != TSDB_CODE_SUCCESS) {
wmmhello's avatar
wmmhello 已提交
4702
    goto _error;
X
Xiaoyu Wang 已提交
4703
  }
4704

H
Hongze Cheng 已提交
4705
  STsdbReader* pReader;
H
Haojun Liao 已提交
4706
  code = tsdbReaderOpen(pHandle->vnode, &cond, pTableListInfo->pTableList, &pReader, idstr);
H
Haojun Liao 已提交
4707 4708 4709 4710
  if (code != TSDB_CODE_SUCCESS) {
    // NOTE(review): cond is not cleaned up on this path (the _error label does not call cleanupQueryTableDataCond).
    goto _error;
  }

4711
  cleanupQueryTableDataCond(&cond);
H
Haojun Liao 已提交
4712 4713

  return pReader;
wmmhello's avatar
wmmhello 已提交
4714 4715 4716 4717

_error:
  terrno = code;
  return NULL;
H
Haojun Liao 已提交
4718
}
4719
#endif
H
Haojun Liao 已提交
4720

L
Liu Jicong 已提交
4721 4722 4723 4724 4725 4726 4727 4728 4729 4730 4731 4732 4733
/*
 * Walk down a chain of single-downstream operators until the stream scan
 * operator is reached, then return the table scan info owned by its inner
 * table-scan operator.
 *
 * @param pOperator operator to start the search from
 * @param ppInfo    receives the table scan info on success
 * @return 0 on success; TSDB_CODE_QRY_APP_ERROR if the chain ends without a
 *         stream scan or an operator has more than one downstream.
 */
static int32_t extractTbscanInStreamOpTree(SOperatorInfo* pOperator, STableScanInfo** ppInfo) {
  SOperatorInfo* pCursor = pOperator;

  while (pCursor->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
    if (pCursor->numOfDownstream == 0) {
      qError("failed to find stream scan operator");
      return TSDB_CODE_QRY_APP_ERROR;
    }

    if (pCursor->numOfDownstream > 1) {
      qError("join not supported for stream block scan");
      return TSDB_CODE_QRY_APP_ERROR;
    }

    pCursor = pCursor->pDownstream[0];
  }

  SStreamScanInfo* pStreamScan = pCursor->info;
  ASSERT(pStreamScan->pTableScanOp->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN);
  *ppInfo = pStreamScan->pTableScanOp->info;
  return 0;
}

4741 4742 4743 4744 4745 4746 4747 4748 4749 4750 4751 4752 4753 4754 4755 4756 4757 4758 4759 4760 4761 4762
/*
 * Descend through a chain of single-child physical plan nodes and return the
 * table-scan node at its leaf.
 *
 * @param pNode  plan node to start from
 * @param ppNode receives the table-scan node on success
 * @return 0 on success; -1 with terrno = TSDB_CODE_QRY_APP_ERROR if some node
 *         has more than one child or the leaf is not a table scan.
 */
int32_t extractTableScanNode(SPhysiNode* pNode, STableScanPhysiNode** ppNode) {
  SPhysiNode* pCursor = pNode;

  while (pCursor->pChildren != NULL && LIST_LENGTH(pCursor->pChildren) != 0) {
    if (LIST_LENGTH(pCursor->pChildren) != 1) {
      ASSERT(0);
      terrno = TSDB_CODE_QRY_APP_ERROR;
      return -1;
    }
    pCursor = (SPhysiNode*)nodesListGetNode(pCursor->pChildren, 0);
  }

  if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN != pCursor->type) {
    ASSERT(0);
    terrno = TSDB_CODE_QRY_APP_ERROR;
    return -1;
  }

  *ppNode = (STableScanPhysiNode*)pCursor;
  return 0;
}

4763
/*
 * Compiled-out helper. It appears to replace the tsdb reader of the table scan
 * found beneath a stream scan operator, rebuilding the reader from the
 * subplan's table-scan node. The uid and ts arguments are not used yet (see
 * the TODO below).
 *
 * NOTE(review): this calls doCreateDataReader(), which is also inside an #if 0
 * region. A failure of extractTableScanNode() is only ASSERTed, so in a
 * release build pNode could still be NULL when it is passed on.
 */
#if 0
L
Liu Jicong 已提交
4764 4765 4766 4767 4768
int32_t rebuildReader(SOperatorInfo* pOperator, SSubplan* plan, SReadHandle* pHandle, int64_t uid, int64_t ts) {
  STableScanInfo* pTableScanInfo = NULL;
  if (extractTbscanInStreamOpTree(pOperator, &pTableScanInfo) < 0) {
    return -1;
  }
4769

L
Liu Jicong 已提交
4770 4771 4772 4773
  STableScanPhysiNode* pNode = NULL;
  if (extractTableScanNode(plan->pNode, &pNode) < 0) {
    ASSERT(0);
  }
4774

H
Haojun Liao 已提交
4775
  // The old reader is closed before the replacement is created.
  tsdbReaderClose(pTableScanInfo->dataReader);
4776

L
Liu Jicong 已提交
4777
  STableListInfo info = {0};
H
Haojun Liao 已提交
4778
  pTableScanInfo->dataReader = doCreateDataReader(pNode, pHandle, &info, NULL);
L
Liu Jicong 已提交
4779 4780 4781 4782
  if (pTableScanInfo->dataReader == NULL) {
    ASSERT(0);
    qError("failed to create data reader");
    return TSDB_CODE_QRY_APP_ERROR;
4783
  }
L
Liu Jicong 已提交
4784
  // TODO: set uid and ts to data reader
4785 4786
  return 0;
}
4787
#endif
4788

C
Cary Xu 已提交
4789
/*
 * Serialize the result-row state of an operator tree into a single buffer.
 *
 * Layout of *result: an int32 total length (header included), followed by the
 * segments produced by each operator's encodeResultRow callback. Segments are
 * appended in pre-order: the operator itself, then each downstream operator in
 * order. Operators without an encoder, or whose encoder yields zero bytes, add
 * no segment.
 *
 * @param ops          root of the (sub)tree to encode
 * @param result       in/out buffer; allocated on the first append, grown afterwards
 * @param length       set to the current total length after each append
 * @param nOptrWithVal incremented once per appended segment
 * @return TDB_CODE_SUCCESS; TSDB_CODE_TSC_INVALID_INPUT when an encoding
 *         operator is reached with a NULL out-parameter; TSDB_CODE_OUT_OF_MEMORY;
 *         or the encoder's own error. On encoder or allocation failure *result
 *         is freed and reset to NULL.
 */
int32_t encodeOperator(SOperatorInfo* ops, char** result, int32_t* length, int32_t* nOptrWithVal) {
  if (ops->fpSet.encodeResultRow != NULL) {
    if (result == NULL || length == NULL || nOptrWithVal == NULL) {
      return TSDB_CODE_TSC_INVALID_INPUT;
    }

    char*   pSegment = NULL;
    int32_t segLen = 0;
    int32_t rc = ops->fpSet.encodeResultRow(ops, &pSegment, &segLen);
    if (rc != TDB_CODE_SUCCESS) {
      // Abort the whole serialization and drop what was accumulated so far.
      if (*result != NULL) {
        taosMemoryFree(*result);
        *result = NULL;
      }
      return rc;
    }

    if (segLen == 0) {
      // Nothing to save for this operator; the encoder must not have allocated.
      ASSERT(!pSegment);
    } else {
      ++(*nOptrWithVal);
      ASSERT(segLen >= 0);

      if (*result != NULL) {
        int32_t used = *(int32_t*)(*result);
        char*   pGrown = (char*)taosMemoryRealloc(*result, used + segLen);
        if (pGrown == NULL) {
          taosMemoryFree(pSegment);
          taosMemoryFree(*result);
          *result = NULL;
          return TSDB_CODE_OUT_OF_MEMORY;
        }
        *result = pGrown;
        memcpy(*result + used, pSegment, segLen);
        *(int32_t*)(*result) += segLen;
      } else {
        // First segment: reserve room for the int32 total-length header.
        *result = (char*)taosMemoryCalloc(1, segLen + sizeof(int32_t));
        if (*result == NULL) {
          taosMemoryFree(pSegment);
          return TSDB_CODE_OUT_OF_MEMORY;
        }
        memcpy(*result + sizeof(int32_t), pSegment, segLen);
        *(int32_t*)(*result) = segLen + sizeof(int32_t);
      }

      taosMemoryFree(pSegment);
      *length = *(int32_t*)(*result);
    }
  }

  for (int32_t i = 0; i < ops->numOfDownstream; ++i) {
    int32_t rc = encodeOperator(ops->pDownstream[i], result, length, nOptrWithVal);
    if (rc != TDB_CODE_SUCCESS) {
      return rc;
    }
  }

  return TDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
4849
/*
 * Decode the operator tree rooted at ops from the read cursor (*ppData,
 * *pRemain).
 *
 * The operator is visited first, then each downstream operator in order. This
 * matches the order in which encodeOperator() appends segments. The cursor is
 * shared, so sibling subtrees each consume their own segments. The buffer is
 * only read.
 *
 * @param ppData  next unread byte; NULL once every byte has been consumed
 * @param pRemain number of unread bytes from *ppData to the end of the buffer
 */
static int32_t doDecodeOperatorState(SOperatorInfo* ops, const char** ppData, int32_t* pRemain) {
  if (ops->fpSet.decodeResultRow) {
    if (*ppData == NULL) {
      return TSDB_CODE_TSC_INVALID_INPUT;
    }

    // Each segment begins with an int32 holding the segment's full length.
    // memcpy avoids an unaligned or aliasing read.
    int32_t dataLength = 0;
    memcpy(&dataLength, *ppData, sizeof(int32_t));
    if (dataLength <= 0 || dataLength > *pRemain) {
      return TSDB_CODE_TSC_INVALID_INPUT;
    }

    int32_t code = ops->fpSet.decodeResultRow(ops, (char*)(*ppData));
    if (code != TDB_CODE_SUCCESS) {
      return code;
    }

    *pRemain -= dataLength;
    *ppData = (*pRemain == 0) ? NULL : (*ppData + dataLength);
  }

  for (int32_t i = 0; i < ops->numOfDownstream; ++i) {
    int32_t code = doDecodeOperatorState(ops->pDownstream[i], ppData, pRemain);
    if (code != TDB_CODE_SUCCESS) {
      return code;
    }
  }

  return TDB_CODE_SUCCESS;
}

/*
 * Restore the result-row state of an operator tree from a buffer produced by
 * encodeOperator().
 *
 * The buffer starts with an int32 total length (header included). The rest is
 * consumed segment by segment in pre-order: an operator with a decoder takes
 * the next segment, then its downstream operators continue from there. The
 * input buffer is never modified.
 *
 * NOTE(review): encodeOperator() skips operators whose encoder produced zero
 * bytes, but every operator with a decoder consumes a segment here. Such a
 * tree would still decode misaligned.
 *
 * @param ops    root of the operator tree
 * @param result encoded buffer, or NULL when there is nothing to restore
 * @param length total length of result (must equal its int32 header)
 * @return TDB_CODE_SUCCESS; TSDB_CODE_TSC_INVALID_INPUT when a decoding
 *         operator finds no data left or a segment length is inconsistent;
 *         otherwise the decoder's own error code.
 */
int32_t decodeOperator(SOperatorInfo* ops, const char* result, int32_t length) {
  const char* pData = NULL;
  int32_t     remain = 0;

  if (result != NULL) {
    int32_t totalLength = 0;
    memcpy(&totalLength, result, sizeof(int32_t));
    ASSERT(length == totalLength);

    if (totalLength > (int32_t)sizeof(int32_t)) {
      pData = result + sizeof(int32_t);
      remain = totalLength - (int32_t)sizeof(int32_t);
    }
  }

  return doDecodeOperatorState(ops, &pData, &remain);
}

D
dapan1121 已提交
4886
/*
 * Build the sink-specific parameter object for a data sink node.
 *
 * - Insert sinks get an SInserterParam carrying the read handle.
 * - Delete sinks get an SDeleterParam with the task's suid and the uid of
 *   every table in the task's table list.
 * - Any other sink type leaves *pParam untouched.
 *
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when an allocation
 *         fails (in that case nothing is leaked and *pParam is not written).
 */
int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, qTaskInfo_t* pTaskInfo, SReadHandle* readHandle) {
  SExecTaskInfo* pTask = *(SExecTaskInfo**)pTaskInfo;

  if (pNode->type == QUERY_NODE_PHYSICAL_PLAN_QUERY_INSERT) {
    SInserterParam* pInserter = taosMemoryCalloc(1, sizeof(SInserterParam));
    if (pInserter == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    pInserter->readHandle = readHandle;
    *pParam = pInserter;
  } else if (pNode->type == QUERY_NODE_PHYSICAL_PLAN_DELETE) {
    SDeleterParam* pDeleter = taosMemoryCalloc(1, sizeof(SDeleterParam));
    if (pDeleter == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    SArray* pTables = pTask->tableqinfoList.pTableList;
    int32_t numOfTables = taosArrayGetSize(pTables);

    pDeleter->suid = pTask->tableqinfoList.suid;
    pDeleter->pUidList = taosArrayInit(numOfTables, sizeof(uint64_t));
    if (pDeleter->pUidList == NULL) {
      taosMemoryFree(pDeleter);
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    for (int32_t idx = 0; idx < numOfTables; ++idx) {
      STableKeyInfo* pKey = taosArrayGet(pTables, idx);
      taosArrayPush(pDeleter->pUidList, &pKey->uid);
    }

    *pParam = pDeleter;
  }

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
4927
/*
 * Create an execution task for a subplan and build its operator tree.
 *
 * @param pPlan     subplan to execute; stored in the task as pSubplan
 * @param pTaskInfo receives the new task, or NULL on failure
 * @param pHandle   read handle forwarded to createOperatorTree()
 * @param taskId    task id
 * @param sql       SQL text stored in the task
 * @param model     operator execution model
 * @return TSDB_CODE_SUCCESS, or a non-zero error code (also stored in terrno).
 *         A failure is never reported as success: if operator-tree creation
 *         returns NULL without recording an error, terrno is used, and
 *         TSDB_CODE_QRY_APP_ERROR if terrno is also unset.
 *
 * NOTE(review): on failure only the task struct itself is freed. Members that
 * doDestroyTask() releases (for example id.str) appear to leak on this path.
 */
int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId,
                               const char* sql, EOPTR_EXEC_MODEL model) {
  uint64_t queryId = pPlan->id.queryId;

  int32_t code = TSDB_CODE_SUCCESS;
  *pTaskInfo = createExecTaskInfo(queryId, taskId, model, pPlan->dbFName);
  if (*pTaskInfo == NULL) {
    code = TSDB_CODE_QRY_OUT_OF_MEMORY;
    goto _complete;
  }

  (*pTaskInfo)->sql = sql;
  (*pTaskInfo)->pSubplan = pPlan;
  (*pTaskInfo)->pRoot = createOperatorTree(pPlan->pNode, *pTaskInfo, pHandle, &(*pTaskInfo)->tableqinfoList,
                                           pPlan->pTagCond, pPlan->pTagIndexCond, pPlan->user);

  if (NULL == (*pTaskInfo)->pRoot) {
    code = (*pTaskInfo)->code;
    if (code == TSDB_CODE_SUCCESS) {
      // Without this, the caller would see success together with a NULL task.
      code = (terrno != TSDB_CODE_SUCCESS) ? terrno : TSDB_CODE_QRY_APP_ERROR;
    }
    goto _complete;
  }

  return code;

_complete:
  taosMemoryFreeClear(*pTaskInfo);
  terrno = code;
  return code;
}

wmmhello's avatar
wmmhello 已提交
4955 4956 4957
/*
 * Free the table array, group arrays and uid hash map held by a task's table
 * list, and reset every freed pointer to NULL.
 *
 * When tables were sorted by group id, each group array is destroyed except
 * one that is the same object as pTableList; that array is destroyed once,
 * afterwards. The group arrays are released before pTableList so the identity
 * comparison never uses the value of an already-freed pointer.
 */
static void doDestroyTableList(STableListInfo* pTableqinfoList) {
  if (pTableqinfoList->needSortTableByGroupId) {
    for (int32_t i = 0; i < taosArrayGetSize(pTableqinfoList->pGroupList); i++) {
      SArray* tmp = taosArrayGetP(pTableqinfoList->pGroupList, i);
      if (tmp == pTableqinfoList->pTableList) {
        continue;  // shared with pTableList, destroyed below
      }
      taosArrayDestroy(tmp);
    }
  }
  taosArrayDestroy(pTableqinfoList->pGroupList);
  taosArrayDestroy(pTableqinfoList->pTableList);
  taosHashCleanup(pTableqinfoList->map);

  pTableqinfoList->pGroupList = NULL;
  pTableqinfoList->pTableList = NULL;
  pTableqinfoList->map = NULL;
}

L
Liu Jicong 已提交
4973
/*
 * Release an execution task together with the resources it references: its table list, operator
 * tree, table schema info, subplan node, SQL text and task id string, and finally the task record.
 * Passing NULL is a no-op.
 */
void doDestroyTask(SExecTaskInfo* pTaskInfo) {
  if (pTaskInfo == NULL) {
    return;
  }

  qDebug("%s execTask is freed", GET_TASKID(pTaskInfo));

  doDestroyTableList(&pTaskInfo->tableqinfoList);
  destroyOperatorInfo(pTaskInfo->pRoot);
  cleanupTableSchemaInfo(&pTaskInfo->schemaInfo);

  nodesDestroyNode((SNode*)pTaskInfo->pSubplan);

  // The id string is still needed by the debug log above, so it is freed last among the members.
  taosMemoryFreeClear(pTaskInfo->sql);
  taosMemoryFreeClear(pTaskInfo->id.str);
  taosMemoryFreeClear(pTaskInfo);
}

/*
 * Write one tag value into `output`, whose capacity is `bytes`.
 * - NULL value: a type-appropriate null marker is written.
 * - Fixed-size types: exactly `bytes` bytes are copied.
 * - Variable-length types: copied whole when they fit; otherwise the payload is cut down to
 *   `bytes - VARSTR_HEADER_SIZE` bytes and the length header is rewritten to match.
 */
static void doSetTagValueToResultBuf(char* output, const char* val, int16_t type, int16_t bytes) {
  if (val == NULL) {
    setNull(output, type, bytes);
  } else if (!IS_VAR_DATA_TYPE(type)) {
    memcpy(output, val, bytes);
  } else if (varDataTLen(val) <= bytes) {
    varDataCopy(output, val);
  } else {
    // Oversized variable-length value: keep only what fits behind the length header.
    int32_t capacity = bytes - VARSTR_HEADER_SIZE;
    int32_t copyLen = varDataLen(val);
    if (copyLen > capacity) {
      copyLen = capacity;
    }
    memcpy(varDataVal(output), varDataVal(val), copyLen);
    varDataSetLen(output, copyLen);
  }
}

/*
 * Estimated query-buffer cost, in bytes, for `numOfTables` tables: 1.5x the size of one
 * STableQueryInfo per table. Buffer consumed inside tsdb (e.g. STableCheckInfo) is not counted.
 */
static int64_t getQuerySupportBufSize(size_t numOfTables) {
  const size_t perTableBytes = sizeof(STableQueryInfo);
  double estimate = perTableBytes * 1.5;
  estimate *= numOfTables;
  return (int64_t)estimate;
}

/*
 * Try to reserve query buffer for `numOfTables` tables from the global budget tsQueryBufferSizeBytes.
 *
 * - Budget < 0: unlimited; always succeeds without reserving anything.
 * - Budget == 0: query processing is disabled; returns TSDB_CODE_QRY_NOT_ENOUGH_BUFFER.
 * - Budget > 0: atomically subtracts the estimate; fails with TSDB_CODE_QRY_NOT_ENOUGH_BUFFER if the
 *   remaining budget is too small. A successful reservation is returned via releaseQueryBuf.
 */
int32_t checkForQueryBuf(size_t numOfTables) {
  int64_t t = getQuerySupportBufSize(numOfTables);

  // The budget is modified concurrently with atomic operations, so it must also be read atomically;
  // a plain read here is a data race.
  int64_t s = atomic_load_64(&tsQueryBufferSizeBytes);
  if (s < 0) {
    return TSDB_CODE_SUCCESS;
  }

  if (s == 0) {
    // disable query processing if the value of tsQueryBufferSize is zero.
    return TSDB_CODE_QRY_NOT_ENOUGH_BUFFER;
  }

  while (1) {
    int64_t remain = s - t;
    if (remain < 0) {
      return TSDB_CODE_QRY_NOT_ENOUGH_BUFFER;
    }

    int64_t prev = atomic_val_compare_exchange_64(&tsQueryBufferSizeBytes, s, remain);
    if (prev == s) {
      return TSDB_CODE_SUCCESS;
    }

    // Another thread changed the budget in between; retry against the value it left behind.
    s = prev;
  }
}

/*
 * Return the buffer reserved by checkForQueryBuf for `numOfTables` tables to the global budget.
 * No-op when the budget is unlimited (negative).
 */
void releaseQueryBuf(size_t numOfTables) {
  // Read atomically: the budget is concurrently updated with atomic operations.
  if (atomic_load_64(&tsQueryBufferSizeBytes) < 0) {
    return;
  }

  int64_t t = getQuerySupportBufSize(numOfTables);

  // Give the reserved amount back to the shared budget.
  atomic_add_fetch_64(&tsQueryBufferSizeBytes, t);
}
D
dapan1121 已提交
5046

dengyihao's avatar
dengyihao 已提交
5047 5048
/*
 * Append explain/runtime statistics for `operatorInfo` and then, depth first, for all of its
 * downstream operators to the growable array *pRes.
 *
 * *pRes holds *capacity entries of which *resNum are in use; the array grows by 10 entries when full,
 * and *capacity is updated only after the reallocation succeeds.
 * Returns TSDB_CODE_SUCCESS or the first error encountered. If growing the array or processing a
 * downstream operator fails, *pRes is freed and set to NULL.
 */
int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SExplainExecInfo** pRes, int32_t* capacity,
                                   int32_t* resNum) {
  if (*resNum >= *capacity) {
    int32_t newCapacity = *capacity + 10;

    // Never assign realloc's result straight back: on failure the original block is still allocated.
    SExplainExecInfo* pNew = taosMemoryRealloc(*pRes, newCapacity * sizeof(SExplainExecInfo));
    if (NULL == pNew) {
      qError("malloc %d failed", newCapacity * (int32_t)sizeof(SExplainExecInfo));
      taosMemoryFreeClear(*pRes);
      return TSDB_CODE_QRY_OUT_OF_MEMORY;
    }

    *pRes = pNew;
    *capacity = newCapacity;
  }

  SExplainExecInfo* pInfo = &(*pRes)[*resNum];

  pInfo->numOfRows = operatorInfo->resultInfo.totalRows;
  pInfo->startupCost = operatorInfo->cost.openCost;
  pInfo->totalCost = operatorInfo->cost.totalCost;

  if (operatorInfo->fpSet.getExplainFn) {
    int32_t code = operatorInfo->fpSet.getExplainFn(operatorInfo, &pInfo->verboseInfo, &pInfo->verboseLen);
    if (code) {
      qError("%s operator getExplainFn failed, code:%s", GET_TASKID(operatorInfo->pTaskInfo), tstrerror(code));
      return code;
    }
  } else {
    pInfo->verboseLen = 0;
    pInfo->verboseInfo = NULL;
  }

  ++(*resNum);

  for (int32_t i = 0; i < operatorInfo->numOfDownstream; ++i) {
    int32_t code = getOperatorExplainExecInfo(operatorInfo->pDownstream[i], pRes, capacity, resNum);
    if (code != TSDB_CODE_SUCCESS) {
      taosMemoryFreeClear(*pRes);
      // Propagate the downstream failure as-is instead of reporting every error as out-of-memory.
      return code;
    }
  }

  return TSDB_CODE_SUCCESS;
}
5
54liuyao 已提交
5089

L
Liu Jicong 已提交
5090
/*
 * Initialize the state shared by the stream aggregate operators:
 * - a key buffer of sizeof(int64_t) + sizeof(TSKEY) bytes;
 * - a hash map of result rows;
 * - an array of scan windows;
 * - a disk-based result buffer, named by `pKey`, that every function ctx in pCtx[0..numOfOutput) points at.
 *
 * `size` is stored as pSup->valueSize.
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY, or the error reported by createDiskbasedBuf.
 * Members allocated before a failure are left in pSup for the caller's cleanup.
 */
int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, const char* pKey, SqlFunctionCtx* pCtx, int32_t numOfOutput,
                               int32_t size) {
  pSup->resultRowSize = getResultRowSize(pCtx, numOfOutput);
  pSup->keySize = sizeof(int64_t) + sizeof(TSKEY);
  pSup->pKeyBuf = taosMemoryCalloc(1, pSup->keySize);
  _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
  pSup->pResultRows = taosHashInit(1024, hashFn, false, HASH_NO_LOCK);
  if (pSup->pKeyBuf == NULL || pSup->pResultRows == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  pSup->valueSize = size;

  pSup->pScanWindow = taosArrayInit(4, sizeof(STimeWindow));
  if (pSup->pScanWindow == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  // Page size: the smallest power-of-two multiple of 4096 that holds at least four result rows.
  int32_t pageSize = 4096;
  while (pageSize < pSup->resultRowSize * 4) {
    pageSize <<= 1u;
  }
  // at least four pages need to be in buffer
  int32_t bufSize = 4096 * 256;
  if (bufSize <= pageSize) {
    bufSize = pageSize * 4;
  }

  int32_t code = createDiskbasedBuf(&pSup->pResultBuf, pageSize, bufSize, pKey, TD_TMP_DIR_PATH);
  if (code != TSDB_CODE_SUCCESS) {
    // Do not hand a buffer handle that failed to initialize to the function contexts.
    return code;
  }

  for (int32_t i = 0; i < numOfOutput; ++i) {
    pCtx[i].pBuf = pSup->pResultBuf;
  }
  return code;
}