executorimpl.c 181.6 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
15

H
Haojun Liao 已提交
16 17
#include "filter.h"
#include "function.h"
18 19
#include "functionMgt.h"
#include "os.h"
H
Haojun Liao 已提交
20
#include "querynodes.h"
21
#include "tfill.h"
dengyihao's avatar
dengyihao 已提交
22
#include "tname.h"
X
Xiaoyu Wang 已提交
23
#include "tref.h"
24

H
Haojun Liao 已提交
25
#include "tdatablock.h"
26
#include "tglobal.h"
H
Haojun Liao 已提交
27
#include "tmsg.h"
H
Haojun Liao 已提交
28
#include "tsort.h"
29
#include "ttime.h"
H
Haojun Liao 已提交
30

31
#include "executorimpl.h"
dengyihao's avatar
dengyihao 已提交
32
#include "index.h"
33
#include "query.h"
34 35
#include "tcompare.h"
#include "tcompression.h"
H
Haojun Liao 已提交
36
#include "thash.h"
37
#include "ttypes.h"
dengyihao's avatar
dengyihao 已提交
38
#include "vnode.h"
39

H
Haojun Liao 已提交
40
#define IS_MAIN_SCAN(runtime)          ((runtime)->scanFlag == MAIN_SCAN)
41 42 43 44
#define SET_REVERSE_SCAN_FLAG(runtime) ((runtime)->scanFlag = REVERSE_SCAN)

#define GET_FORWARD_DIRECTION_FACTOR(ord) (((ord) == TSDB_ORDER_ASC) ? QUERY_ASC_FORWARD_STEP : QUERY_DESC_FORWARD_STEP)

45 46 47 48 49
enum {
  PROJECT_RETRIEVE_CONTINUE = 0x1,
  PROJECT_RETRIEVE_DONE = 0x2,
};

50 51
#if 0
static UNUSED_FUNC void *u_malloc (size_t __size) {
wafwerar's avatar
wafwerar 已提交
52
  uint32_t v = taosRand();
53 54 55 56

  if (v % 1000 <= 0) {
    return NULL;
  } else {
wafwerar's avatar
wafwerar 已提交
57
    return taosMemoryMalloc(__size);
58 59 60 61
  }
}

static UNUSED_FUNC void* u_calloc(size_t num, size_t __size) {
wafwerar's avatar
wafwerar 已提交
62
  uint32_t v = taosRand();
63 64 65
  if (v % 1000 <= 0) {
    return NULL;
  } else {
wafwerar's avatar
wafwerar 已提交
66
    return taosMemoryCalloc(num, __size);
67 68 69 70
  }
}

static UNUSED_FUNC void* u_realloc(void* p, size_t __size) {
wafwerar's avatar
wafwerar 已提交
71
  uint32_t v = taosRand();
72 73 74
  if (v % 5 <= 1) {
    return NULL;
  } else {
wafwerar's avatar
wafwerar 已提交
75
    return taosMemoryRealloc(p, __size);
76 77 78 79 80 81 82 83
  }
}

#define calloc  u_calloc
#define malloc  u_malloc
#define realloc u_realloc
#endif

X
Xiaoyu Wang 已提交
84
#define CLEAR_QUERY_STATUS(q, st)   ((q)->status &= (~(st)))
85 86
#define QUERY_IS_INTERVAL_QUERY(_q) ((_q)->interval.interval > 0)

L
Liu Jicong 已提交
87 88 89
int32_t getMaximumIdleDurationSec() { return tsShellActivityTimer * 2; }

static int32_t getExprFunctionId(SExprInfo* pExprInfo) {
90
  assert(pExprInfo != NULL && pExprInfo->pExpr != NULL && pExprInfo->pExpr->nodeType == TEXPR_UNARYEXPR_NODE);
91
  return 0;
92 93 94 95
}

static void doSetTagValueToResultBuf(char* output, const char* val, int16_t type, int16_t bytes);

96
static void setBlockStatisInfo(SqlFunctionCtx* pCtx, SExprInfo* pExpr, SSDataBlock* pSDataBlock);
97

X
Xiaoyu Wang 已提交
98
static void releaseQueryBuf(size_t numOfTables);
99 100 101 102 103

static void destroySFillOperatorInfo(void* param, int32_t numOfOutput);
static void destroyProjectOperatorInfo(void* param, int32_t numOfOutput);
static void destroyOrderOperatorInfo(void* param, int32_t numOfOutput);
static void destroyAggOperatorInfo(void* param, int32_t numOfOutput);
X
Xiaoyu Wang 已提交
104

H
Haojun Liao 已提交
105
static void destroyIntervalOperatorInfo(void* param, int32_t numOfOutput);
H
Haojun Liao 已提交
106 107
static void destroyExchangeOperatorInfo(void* param, int32_t numOfOutput);

108 109
static void destroyOperatorInfo(SOperatorInfo* pOperator);

110
void doSetOperatorCompleted(SOperatorInfo* pOperator) {
111
  pOperator->status = OP_EXEC_DONE;
112

113
  pOperator->cost.totalCost = (taosGetTimestampUs() - pOperator->pTaskInfo->cost.start * 1000) / 1000.0;
H
Haojun Liao 已提交
114
  if (pOperator->pTaskInfo != NULL) {
115
    setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED);
116 117
  }
}
118

H
Haojun Liao 已提交
119
int32_t operatorDummyOpenFn(SOperatorInfo* pOperator) {
120
  OPTR_SET_OPENED(pOperator);
121
  pOperator->cost.openCost = 0;
H
Haojun Liao 已提交
122
  return TSDB_CODE_SUCCESS;
H
Haojun Liao 已提交
123 124
}

125
SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn, __optr_fn_t streamFn,
L
Liu Jicong 已提交
126
                                   __optr_fn_t cleanup, __optr_close_fn_t closeFn, __optr_encode_fn_t encode,
127
                                   __optr_decode_fn_t decode, __optr_explain_fn_t explain) {
128 129 130 131 132 133 134 135 136 137 138 139 140 141
  SOperatorFpSet fpSet = {
      ._openFn = openFn,
      .getNextFn = nextFn,
      .getStreamResFn = streamFn,
      .cleanupFn = cleanup,
      .closeFn = closeFn,
      .encodeResultRow = encode,
      .decodeResultRow = decode,
      .getExplainFn = explain,
  };

  return fpSet;
}

H
Haojun Liao 已提交
142
void operatorDummyCloseFn(void* param, int32_t numOfCols) {}
H
Haojun Liao 已提交
143

X
Xiaoyu Wang 已提交
144 145 146
static int32_t doCopyToSDataBlock(SExecTaskInfo* taskInfo, SSDataBlock* pBlock, SExprInfo* pExprInfo,
                                  SDiskbasedBuf* pBuf, SGroupResInfo* pGroupResInfo, const int32_t* rowCellOffset,
                                  SqlFunctionCtx* pCtx, int32_t numOfExprs);
H
Haojun Liao 已提交
147

148
static void initCtxOutputBuffer(SqlFunctionCtx* pCtx, int32_t size);
L
Liu Jicong 已提交
149 150
static void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, SAggOperatorInfo* pAggInfo, int32_t numOfOutput,
                                     uint64_t groupId);
151

L
Liu Jicong 已提交
152 153
// setup the output buffer for each operator
static bool hasNull(SColumn* pColumn, SColumnDataAgg* pStatis) {
dengyihao's avatar
dengyihao 已提交
154 155
  if (TSDB_COL_IS_TAG(pColumn->flag) || TSDB_COL_IS_UD_COL(pColumn->flag) ||
      pColumn->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
156 157 158 159 160 161 162 163 164 165
    return false;
  }

  if (pStatis != NULL && pStatis->numOfNull == 0) {
    return false;
  }

  return true;
}

166
#if 0
L
Liu Jicong 已提交
167 168
static bool chkResultRowFromKey(STaskRuntimeEnv* pRuntimeEnv, SResultRowInfo* pResultRowInfo, char* pData,
                                int16_t bytes, bool masterscan, uint64_t uid) {
169 170 171
  bool existed = false;
  SET_RES_WINDOW_KEY(pRuntimeEnv->keyBuf, pData, bytes, uid);

L
Liu Jicong 已提交
172 173
  SResultRow** p1 =
      (SResultRow**)taosHashGet(pRuntimeEnv->pResultRowHashTable, pRuntimeEnv->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes));
174 175 176 177 178 179 180 181 182 183 184

  // in case of repeat scan/reverse scan, no new time window added.
  if (QUERY_IS_INTERVAL_QUERY(pRuntimeEnv->pQueryAttr)) {
    if (!masterscan) {  // the *p1 may be NULL in case of sliding+offset exists.
      return p1 != NULL;
    }

    if (p1 != NULL) {
      if (pResultRowInfo->size == 0) {
        existed = false;
      } else if (pResultRowInfo->size == 1) {
dengyihao's avatar
dengyihao 已提交
185
        //        existed = (pResultRowInfo->pResult[0] == (*p1));
186 187
      } else {  // check if current pResultRowInfo contains the existed pResultRow
        SET_RES_EXT_WINDOW_KEY(pRuntimeEnv->keyBuf, pData, bytes, uid, pResultRowInfo);
L
Liu Jicong 已提交
188 189
        int64_t* index =
            taosHashGet(pRuntimeEnv->pResultRowListSet, pRuntimeEnv->keyBuf, GET_RES_EXT_WINDOW_KEY_LEN(bytes));
190 191 192 193 194 195 196 197 198 199 200 201 202
        if (index != NULL) {
          existed = true;
        } else {
          existed = false;
        }
      }
    }

    return existed;
  }

  return p1 != NULL;
}
203
#endif
204

205
SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int64_t tableGroupId, int32_t interBufSize) {
L
Liu Jicong 已提交
206
  SFilePage* pData = NULL;
207 208 209 210 211 212 213 214 215 216 217 218 219

  // in the first scan, new space needed for results
  int32_t pageId = -1;
  SIDList list = getDataBufPagesIdList(pResultBuf, tableGroupId);

  if (taosArrayGetSize(list) == 0) {
    pData = getNewBufPage(pResultBuf, tableGroupId, &pageId);
    pData->num = sizeof(SFilePage);
  } else {
    SPageInfo* pi = getLastPageInfo(list);
    pData = getBufPage(pResultBuf, getPageId(pi));
    pageId = getPageId(pi);

wmmhello's avatar
wmmhello 已提交
220
    if (pData->num + interBufSize > getBufPageSize(pResultBuf)) {
221 222 223 224 225 226 227 228 229 230 231 232 233 234
      // release current page first, and prepare the next one
      releaseBufPageInfo(pResultBuf, pi);

      pData = getNewBufPage(pResultBuf, tableGroupId, &pageId);
      if (pData != NULL) {
        pData->num = sizeof(SFilePage);
      }
    }
  }

  if (pData == NULL) {
    return NULL;
  }

235 236
  setBufPageDirty(pData, true);

237 238 239 240 241
  // set the number of rows in current disk page
  SResultRow* pResultRow = (SResultRow*)((char*)pData + pData->num);
  pResultRow->pageId = pageId;
  pResultRow->offset = (int32_t)pData->num;

wmmhello's avatar
wmmhello 已提交
242
  pData->num += interBufSize;
243 244 245 246

  return pResultRow;
}

247 248 249 250 251 252 253
/**
 * the struct of key in hash table
 * +----------+---------------+
 * | group id |   key data    |
 * | 8 bytes  | actual length |
 * +----------+---------------+
 */
254 255 256
SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pResultRowInfo, char* pData,
                                   int16_t bytes, bool masterscan, uint64_t groupId, SExecTaskInfo* pTaskInfo,
                                   bool isIntervalQuery, SAggSupporter* pSup) {
257
  SET_RES_WINDOW_KEY(pSup->keyBuf, pData, bytes, groupId);
H
Haojun Liao 已提交
258

dengyihao's avatar
dengyihao 已提交
259 260
  SResultRowPosition* p1 =
      (SResultRowPosition*)taosHashGet(pSup->pResultRowHashTable, pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes));
H
Haojun Liao 已提交
261

262 263
  SResultRow* pResult = NULL;

H
Haojun Liao 已提交
264 265
  // in case of repeat scan/reverse scan, no new time window added.
  if (isIntervalQuery) {
266 267
    if (masterscan && p1 != NULL) {  // the *p1 may be NULL in case of sliding+offset exists.
      pResult = getResultRowByPos(pResultBuf, p1);
268
      ASSERT(pResult->pageId == p1->pageId && pResult->offset == p1->offset);
H
Haojun Liao 已提交
269 270
    }
  } else {
dengyihao's avatar
dengyihao 已提交
271 272
    // In case of group by column query, the required SResultRow object must be existInCurrentResusltRowInfo in the
    // pResultRowInfo object.
H
Haojun Liao 已提交
273
    if (p1 != NULL) {
274
      // todo
275
      pResult = getResultRowByPos(pResultBuf, p1);
276
      ASSERT(pResult->pageId == p1->pageId && pResult->offset == p1->offset);
H
Haojun Liao 已提交
277 278 279
    }
  }

L
Liu Jicong 已提交
280
  // 1. close current opened time window
281
  if (pResultRowInfo->cur.pageId != -1 && ((pResult == NULL) || (pResult->pageId != pResultRowInfo->cur.pageId))) {
282
#ifdef BUF_PAGE_DEBUG
wmmhello's avatar
wmmhello 已提交
283
    qDebug("page_1");
284
#endif
285
    SResultRowPosition pos = pResultRowInfo->cur;
X
Xiaoyu Wang 已提交
286
    SFilePage*         pPage = getBufPage(pResultBuf, pos.pageId);
287 288 289 290 291
    releaseBufPage(pResultBuf, pPage);
  }

  // allocate a new buffer page
  if (pResult == NULL) {
292
#ifdef BUF_PAGE_DEBUG
wmmhello's avatar
wmmhello 已提交
293
    qDebug("page_2");
294
#endif
H
Haojun Liao 已提交
295
    ASSERT(pSup->resultRowSize > 0);
296 297
    pResult = getNewResultRow(pResultBuf, groupId, pSup->resultRowSize);

298
    initResultRow(pResult);
H
Haojun Liao 已提交
299

300 301
    // add a new result set for a new group
    SResultRowPosition pos = {.pageId = pResult->pageId, .offset = pResult->offset};
X
Xiaoyu Wang 已提交
302 303
    taosHashPut(pSup->pResultRowHashTable, pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes), &pos,
                sizeof(SResultRowPosition));
H
Haojun Liao 已提交
304 305
  }

306 307 308
  // 2. set the new time window to be the new active time window
  pResultRowInfo->cur = (SResultRowPosition){.pageId = pResult->pageId, .offset = pResult->offset};

H
Haojun Liao 已提交
309
  // too many time window in query
310
  if (taosHashGetSize(pSup->pResultRowHashTable) > MAX_INTERVAL_TIME_WINDOW) {
H
Haojun Liao 已提交
311 312 313
    longjmp(pTaskInfo->env, TSDB_CODE_QRY_TOO_MANY_TIMEWINDOW);
  }

H
Haojun Liao 已提交
314
  return pResult;
H
Haojun Liao 已提交
315 316
}

317
// a new buffer page for each table. Needs to opt this design
L
Liu Jicong 已提交
318
static int32_t addNewWindowResultBuf(SResultRow* pWindowRes, SDiskbasedBuf* pResultBuf, int32_t tid, uint32_t size) {
319 320 321 322
  if (pWindowRes->pageId != -1) {
    return 0;
  }

L
Liu Jicong 已提交
323
  SFilePage* pData = NULL;
324 325 326 327 328 329

  // in the first scan, new space needed for results
  int32_t pageId = -1;
  SIDList list = getDataBufPagesIdList(pResultBuf, tid);

  if (taosArrayGetSize(list) == 0) {
H
Haojun Liao 已提交
330
    pData = getNewBufPage(pResultBuf, tid, &pageId);
331
    pData->num = sizeof(SFilePage);
332 333
  } else {
    SPageInfo* pi = getLastPageInfo(list);
334
    pData = getBufPage(pResultBuf, getPageId(pi));
335
    pageId = getPageId(pi);
336

337
    if (pData->num + size > getBufPageSize(pResultBuf)) {
338
      // release current page first, and prepare the next one
339
      releaseBufPageInfo(pResultBuf, pi);
340

H
Haojun Liao 已提交
341
      pData = getNewBufPage(pResultBuf, tid, &pageId);
342
      if (pData != NULL) {
343
        pData->num = sizeof(SFilePage);
344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363
      }
    }
  }

  if (pData == NULL) {
    return -1;
  }

  // set the number of rows in current disk page
  if (pWindowRes->pageId == -1) {  // not allocated yet, allocate new buffer
    pWindowRes->pageId = pageId;
    pWindowRes->offset = (int32_t)pData->num;

    pData->num += size;
    assert(pWindowRes->pageId >= 0);
  }

  return 0;
}

364
//  query_range_start, query_range_end, window_duration, window_start, window_end
365
void initExecTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pQueryWindow) {
366 367 368
  pColData->info.type = TSDB_DATA_TYPE_TIMESTAMP;
  pColData->info.bytes = sizeof(int64_t);

369
  colInfoDataEnsureCapacity(pColData, 5);
370 371 372 373 374 375 376 377 378
  colDataAppendInt64(pColData, 0, &pQueryWindow->skey);
  colDataAppendInt64(pColData, 1, &pQueryWindow->ekey);

  int64_t interval = 0;
  colDataAppendInt64(pColData, 2, &interval);  // this value may be variable in case of 'n' and 'y'.
  colDataAppendInt64(pColData, 3, &pQueryWindow->skey);
  colDataAppendInt64(pColData, 4, &pQueryWindow->ekey);
}

H
Haojun Liao 已提交
379 380 381 382
void cleanupExecTimeWindowInfo(SColumnInfoData* pColData) {
  colDataDestroy(pColData);
}

X
Xiaoyu Wang 已提交
383 384 385
void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, STimeWindow* pWin,
                      SColumnInfoData* pTimeWindowData, int32_t offset, int32_t forwardStep, TSKEY* tsCol,
                      int32_t numOfTotal, int32_t numOfOutput, int32_t order) {
386
  for (int32_t k = 0; k < numOfOutput; ++k) {
H
Haojun Liao 已提交
387
    // keep it temporarily
388
    // todo no need this??
dengyihao's avatar
dengyihao 已提交
389 390
    bool    hasAgg = pCtx[k].input.colDataAggIsSet;
    int32_t numOfRows = pCtx[k].input.numOfRows;
H
Haojun Liao 已提交
391
    int32_t startOffset = pCtx[k].input.startRowIndex;
392

393
    pCtx[k].input.startRowIndex = offset;
394
    pCtx[k].input.numOfRows = forwardStep;
395 396 397

    // not a whole block involved in query processing, statistics data can not be used
    // NOTE: the original value of isSet have been changed here
398 399
    if (pCtx[k].input.colDataAggIsSet && forwardStep < numOfTotal) {
      pCtx[k].input.colDataAggIsSet = false;
400 401
    }

402 403
    if (fmIsWindowPseudoColumnFunc(pCtx[k].functionId)) {
      SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(&pCtx[k]);
404 405

      char* p = GET_ROWCELL_INTERBUF(pEntryInfo);
406

407
      SColumnInfoData idata = {0};
dengyihao's avatar
dengyihao 已提交
408
      idata.info.type = TSDB_DATA_TYPE_BIGINT;
409
      idata.info.bytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes;
dengyihao's avatar
dengyihao 已提交
410
      idata.pData = p;
411 412 413 414

      SScalarParam out = {.columnData = &idata};
      SScalarParam tw = {.numOfRows = 5, .columnData = pTimeWindowData};
      pCtx[k].sfp.process(&tw, 1, &out);
415
      pEntryInfo->numOfRes = 1;
416 417 418 419 420 421 422 423 424 425
    } else {
      int32_t code = TSDB_CODE_SUCCESS;
      if (functionNeedToExecute(&pCtx[k]) && pCtx[k].fpSet.process != NULL) {
        code = pCtx[k].fpSet.process(&pCtx[k]);

        if (code != TSDB_CODE_SUCCESS) {
          qError("%s apply functions error, code: %s", GET_TASKID(taskInfo), tstrerror(code));
          taskInfo->code = code;
          longjmp(taskInfo->env, code);
        }
426
      }
427

428 429 430 431 432
      // restore it
      pCtx[k].input.colDataAggIsSet = hasAgg;
      pCtx[k].input.startRowIndex = startOffset;
      pCtx[k].input.numOfRows = numOfRows;
    }
433 434 435
  }
}

dengyihao's avatar
dengyihao 已提交
436
static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order,
437
                                   int32_t scanFlag, bool createDummyCol);
438

dengyihao's avatar
dengyihao 已提交
439 440
static void doSetInputDataBlockInfo(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock,
                                    int32_t order) {
441
  for (int32_t i = 0; i < pOperator->exprSupp.numOfExprs; ++i) {
442
    pCtx[i].order = order;
443
    pCtx[i].input.numOfRows = pBlock->info.rows;
444
    setBlockStatisInfo(&pCtx[i], &pOperator->exprSupp.pExprInfo[i], pBlock);
445 446 447
  }
}

X
Xiaoyu Wang 已提交
448 449
void setInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order,
                       int32_t scanFlag, bool createDummyCol) {
450
  if (pBlock->pBlockAgg != NULL) {
H
Haojun Liao 已提交
451
    doSetInputDataBlockInfo(pOperator, pCtx, pBlock, order);
452
  } else {
453
    doSetInputDataBlock(pOperator, pCtx, pBlock, order, scanFlag, createDummyCol);
H
Haojun Liao 已提交
454
  }
455 456
}

L
Liu Jicong 已提交
457 458
static int32_t doCreateConstantValColumnInfo(SInputColumnInfoData* pInput, SFunctParam* pFuncParam, int32_t paramIndex,
                                             int32_t numOfRows) {
459 460 461 462 463 464 465 466
  SColumnInfoData* pColInfo = NULL;
  if (pInput->pData[paramIndex] == NULL) {
    pColInfo = taosMemoryCalloc(1, sizeof(SColumnInfoData));
    if (pColInfo == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    // Set the correct column info (data type and bytes)
467 468
    pColInfo->info.type = pFuncParam->param.nType;
    pColInfo->info.bytes = pFuncParam->param.nLen;
469 470

    pInput->pData[paramIndex] = pColInfo;
471 472
  } else {
    pColInfo = pInput->pData[paramIndex];
473 474
  }

475
  colInfoDataEnsureCapacity(pColInfo, numOfRows);
476

477
  int8_t type = pFuncParam->param.nType;
478 479
  if (type == TSDB_DATA_TYPE_BIGINT || type == TSDB_DATA_TYPE_UBIGINT) {
    int64_t v = pFuncParam->param.i;
dengyihao's avatar
dengyihao 已提交
480
    for (int32_t i = 0; i < numOfRows; ++i) {
481 482 483 484
      colDataAppendInt64(pColInfo, i, &v);
    }
  } else if (type == TSDB_DATA_TYPE_DOUBLE) {
    double v = pFuncParam->param.d;
dengyihao's avatar
dengyihao 已提交
485
    for (int32_t i = 0; i < numOfRows; ++i) {
486 487
      colDataAppendDouble(pColInfo, i, &v);
    }
488
  } else if (type == TSDB_DATA_TYPE_VARCHAR) {
L
Liu Jicong 已提交
489
    char* tmp = taosMemoryMalloc(pFuncParam->param.nLen + VARSTR_HEADER_SIZE);
490
    STR_WITH_SIZE_TO_VARSTR(tmp, pFuncParam->param.pz, pFuncParam->param.nLen);
L
Liu Jicong 已提交
491
    for (int32_t i = 0; i < numOfRows; ++i) {
492 493
      colDataAppend(pColInfo, i, tmp, false);
    }
494 495 496 497 498
  }

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
499
static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order,
X
Xiaoyu Wang 已提交
500
                                   int32_t scanFlag, bool createDummyCol) {
501 502
  int32_t code = TSDB_CODE_SUCCESS;

503
  for (int32_t i = 0; i < pOperator->exprSupp.numOfExprs; ++i) {
L
Liu Jicong 已提交
504
    pCtx[i].order = order;
505 506
    pCtx[i].input.numOfRows = pBlock->info.rows;

L
Liu Jicong 已提交
507
    pCtx[i].pSrcBlock = pBlock;
X
Xiaoyu Wang 已提交
508
    pCtx[i].scanFlag = scanFlag;
H
Haojun Liao 已提交
509

510
    SInputColumnInfoData* pInput = &pCtx[i].input;
511
    pInput->uid = pBlock->info.uid;
C
Cary Xu 已提交
512
    pInput->colDataAggIsSet = false;
513

514
    SExprInfo* pOneExpr = &pOperator->exprSupp.pExprInfo[i];
515
    for (int32_t j = 0; j < pOneExpr->base.numOfParams; ++j) {
dengyihao's avatar
dengyihao 已提交
516
      SFunctParam* pFuncParam = &pOneExpr->base.pParam[j];
G
Ganlin Zhao 已提交
517 518
      if (pFuncParam->type == FUNC_PARAM_TYPE_COLUMN) {
        int32_t slotId = pFuncParam->pCol->slotId;
dengyihao's avatar
dengyihao 已提交
519
        pInput->pData[j] = taosArrayGet(pBlock->pDataBlock, slotId);
520 521 522
        pInput->totalRows = pBlock->info.rows;
        pInput->numOfRows = pBlock->info.rows;
        pInput->startRowIndex = 0;
523

524
        // NOTE: the last parameter is the primary timestamp column
H
Haojun Liao 已提交
525 526 527 528
        // todo: refactor this
        if (fmIsTimelineFunc(pCtx[i].functionId) && (j == pOneExpr->base.numOfParams - 1)) {
          pInput->pPTS = pInput->pData[j];   // in case of merge function, this is not always the ts column data.
//          ASSERT(pInput->pPTS->info.type == TSDB_DATA_TYPE_TIMESTAMP);
529
        }
530 531
        ASSERT(pInput->pData[j] != NULL);
      } else if (pFuncParam->type == FUNC_PARAM_TYPE_VALUE) {
532 533 534
        // todo avoid case: top(k, 12), 12 is the value parameter.
        // sum(11), 11 is also the value parameter.
        if (createDummyCol && pOneExpr->base.numOfParams == 1) {
535 536 537 538
          pInput->totalRows = pBlock->info.rows;
          pInput->numOfRows = pBlock->info.rows;
          pInput->startRowIndex = 0;

539
          code = doCreateConstantValColumnInfo(pInput, pFuncParam, j, pBlock->info.rows);
540 541 542
          if (code != TSDB_CODE_SUCCESS) {
            return code;
          }
543
        }
G
Ganlin Zhao 已提交
544 545
      }
    }
H
Haojun Liao 已提交
546
  }
547 548

  return code;
H
Haojun Liao 已提交
549 550
}

551
static int32_t doAggregateImpl(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx) {
552
  for (int32_t k = 0; k < pOperator->exprSupp.numOfExprs; ++k) {
H
Haojun Liao 已提交
553
    if (functionNeedToExecute(&pCtx[k])) {
554
      // todo add a dummy funtion to avoid process check
555 556 557
      if (pCtx[k].fpSet.process == NULL) {
        continue;
      }
H
Haojun Liao 已提交
558

559 560 561 562
      int32_t code = pCtx[k].fpSet.process(&pCtx[k]);
      if (code != TSDB_CODE_SUCCESS) {
        qError("%s aggregate function error happens, code: %s", GET_TASKID(pOperator->pTaskInfo), tstrerror(code));
        return code;
563
      }
564 565
    }
  }
566 567

  return TSDB_CODE_SUCCESS;
568 569
}

H
Haojun Liao 已提交
570
static void setPseudoOutputColInfo(SSDataBlock* pResult, SqlFunctionCtx* pCtx, SArray* pPseudoList) {
dengyihao's avatar
dengyihao 已提交
571
  size_t num = (pPseudoList != NULL) ? taosArrayGetSize(pPseudoList) : 0;
H
Haojun Liao 已提交
572 573 574 575 576
  for (int32_t i = 0; i < num; ++i) {
    pCtx[i].pOutput = taosArrayGet(pResult->pDataBlock, i);
  }
}

577
int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pSrcBlock, SqlFunctionCtx* pCtx,
X
Xiaoyu Wang 已提交
578
                              int32_t numOfOutput, SArray* pPseudoList) {
H
Haojun Liao 已提交
579
  setPseudoOutputColInfo(pResult, pCtx, pPseudoList);
H
Haojun Liao 已提交
580
  pResult->info.groupId = pSrcBlock->info.groupId;
H
Haojun Liao 已提交
581

582 583
  // if the source equals to the destination, it is to create a new column as the result of scalar
  // function or some operators.
584 585
  bool createNewColModel = (pResult == pSrcBlock);

586 587
  int32_t numOfRows = 0;

588
  for (int32_t k = 0; k < numOfOutput; ++k) {
589 590
    int32_t               outputSlotId = pExpr[k].base.resSchema.slotId;
    SqlFunctionCtx*       pfCtx = &pCtx[k];
591
    SInputColumnInfoData* pInputData = &pfCtx->input;
592

L
Liu Jicong 已提交
593
    if (pExpr[k].pExpr->nodeType == QUERY_NODE_COLUMN) {  // it is a project query
594
      SColumnInfoData* pColInfoData = taosArrayGet(pResult->pDataBlock, outputSlotId);
595
      if (pResult->info.rows > 0 && !createNewColModel) {
596 597
        colDataMergeCol(pColInfoData, pResult->info.rows, &pResult->info.capacity, pInputData->pData[0],
                        pInputData->numOfRows);
598
      } else {
599
        colDataAssign(pColInfoData, pInputData->pData[0], pInputData->numOfRows, &pResult->info);
600
      }
601

602
      numOfRows = pInputData->numOfRows;
603
    } else if (pExpr[k].pExpr->nodeType == QUERY_NODE_VALUE) {
604
      SColumnInfoData* pColInfoData = taosArrayGet(pResult->pDataBlock, outputSlotId);
605

dengyihao's avatar
dengyihao 已提交
606
      int32_t offset = createNewColModel ? 0 : pResult->info.rows;
607 608 609 610 611 612 613 614

      int32_t type = pExpr[k].base.pParam[0].param.nType;
      if (TSDB_DATA_TYPE_NULL == type) {
        colDataAppendNNULL(pColInfoData, offset, pSrcBlock->info.rows);
      } else {
        for (int32_t i = 0; i < pSrcBlock->info.rows; ++i) {
          colDataAppend(pColInfoData, i + offset, taosVariantGet(&pExpr[k].base.pParam[0].param, type), false);
        }
615
      }
616 617

      numOfRows = pSrcBlock->info.rows;
H
Haojun Liao 已提交
618
    } else if (pExpr[k].pExpr->nodeType == QUERY_NODE_OPERATOR) {
619 620 621
      SArray* pBlockList = taosArrayInit(4, POINTER_BYTES);
      taosArrayPush(pBlockList, &pSrcBlock);

622
      SColumnInfoData* pResColData = taosArrayGet(pResult->pDataBlock, outputSlotId);
623
      SColumnInfoData  idata = {.info = pResColData->info, .hasNull = true};
624

625
      SScalarParam dest = {.columnData = &idata};
X
Xiaoyu Wang 已提交
626
      int32_t      code = scalarCalculate(pExpr[k].pExpr->_optrRoot.pRootNode, pBlockList, &dest);
627 628 629 630
      if (code != TSDB_CODE_SUCCESS) {
        taosArrayDestroy(pBlockList);
        return code;
      }
631

dengyihao's avatar
dengyihao 已提交
632
      int32_t startOffset = createNewColModel ? 0 : pResult->info.rows;
633
      ASSERT(pResult->info.capacity > 0);
634
      colDataMergeCol(pResColData, startOffset, &pResult->info.capacity, &idata, dest.numOfRows);
D
dapan1121 已提交
635 636
      colDataDestroy(&idata);
      
637
      numOfRows = dest.numOfRows;
638 639
      taosArrayDestroy(pBlockList);
    } else if (pExpr[k].pExpr->nodeType == QUERY_NODE_FUNCTION) {
640 641
      // _rowts/_c0, not tbname column
      if (fmIsPseudoColumnFunc(pfCtx->functionId) && (!fmIsScanPseudoColumnFunc(pfCtx->functionId))) {
H
Haojun Liao 已提交
642
        // do nothing
643
      } else if (fmIsIndefiniteRowsFunc(pfCtx->functionId)) {
644 645
        SResultRowEntryInfo* pResInfo = GET_RES_INFO(pfCtx);
        pfCtx->fpSet.init(pfCtx, pResInfo);
646 647 648 649 650 651 652 653 654 655 656

        pfCtx->pOutput = taosArrayGet(pResult->pDataBlock, outputSlotId);
        pfCtx->offset = createNewColModel ? 0 : pResult->info.rows;  // set the start offset

        // set the timestamp(_rowts) output buffer
        if (taosArrayGetSize(pPseudoList) > 0) {
          int32_t* outputColIndex = taosArrayGet(pPseudoList, 0);
          pfCtx->pTsOutput = (SColumnInfoData*)pCtx[*outputColIndex].pOutput;
        }

        numOfRows = pfCtx->fpSet.process(pfCtx);
H
Haojun Liao 已提交
657
      } else if (fmIsAggFunc(pfCtx->functionId)) {
658 659
        // _group_key function for "partition by tbname" + csum(col_name) query
        SColumnInfoData* pOutput = taosArrayGet(pResult->pDataBlock, outputSlotId);
660
        int32_t          slotId = pfCtx->param[0].pCol->slotId;
661 662 663

        // todo handle the json tag
        SColumnInfoData* pInput = taosArrayGet(pSrcBlock->pDataBlock, slotId);
664
        for (int32_t f = 0; f < pSrcBlock->info.rows; ++f) {
665 666 667 668 669 670 671 672 673
          bool isNull = colDataIsNull_s(pInput, f);
          if (isNull) {
            colDataAppendNULL(pOutput, pResult->info.rows + f);
          } else {
            char* data = colDataGetData(pInput, f);
            colDataAppend(pOutput, pResult->info.rows + f, data, isNull);
          }
        }

H
Haojun Liao 已提交
674 675 676
      } else {
        SArray* pBlockList = taosArrayInit(4, POINTER_BYTES);
        taosArrayPush(pBlockList, &pSrcBlock);
G
Ganlin Zhao 已提交
677

678
        SColumnInfoData* pResColData = taosArrayGet(pResult->pDataBlock, outputSlotId);
679
        SColumnInfoData  idata = {.info = pResColData->info, .hasNull = true};
H
Haojun Liao 已提交
680

681
        SScalarParam dest = {.columnData = &idata};
X
Xiaoyu Wang 已提交
682
        int32_t      code = scalarCalculate((SNode*)pExpr[k].pExpr->_function.pFunctNode, pBlockList, &dest);
683 684 685 686
        if (code != TSDB_CODE_SUCCESS) {
          taosArrayDestroy(pBlockList);
          return code;
        }
687

dengyihao's avatar
dengyihao 已提交
688
        int32_t startOffset = createNewColModel ? 0 : pResult->info.rows;
689
        ASSERT(pResult->info.capacity > 0);
690
        colDataMergeCol(pResColData, startOffset, &pResult->info.capacity, &idata, dest.numOfRows);
D
dapan1121 已提交
691
        colDataDestroy(&idata);
692 693

        numOfRows = dest.numOfRows;
H
Haojun Liao 已提交
694 695
        taosArrayDestroy(pBlockList);
      }
696
    } else {
697
      ASSERT(0);
698 699
    }
  }
700

701 702 703
  if (!createNewColModel) {
    pResult->info.rows += numOfRows;
  }
704 705

  return TSDB_CODE_SUCCESS;
706 707
}

5
54liuyao 已提交
708
bool functionNeedToExecute(SqlFunctionCtx* pCtx) {
709
  struct SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
710

711 712 713 714 715
  // in case of timestamp column, always generated results.
  int32_t functionId = pCtx->functionId;
  if (functionId == -1) {
    return false;
  }
716

717 718
  if (pCtx->scanFlag == REPEAT_SCAN) {
    return fmIsRepeatScanFunc(pCtx->functionId);
719 720
  }

721 722
  if (isRowEntryCompleted(pResInfo)) {
    return false;
723 724
  }

725 726 727
  return true;
}

728 729 730 731 732 733 734
static int32_t doCreateConstantValColumnAggInfo(SInputColumnInfoData* pInput, SFunctParam* pFuncParam, int32_t type,
                                                int32_t paramIndex, int32_t numOfRows) {
  if (pInput->pData[paramIndex] == NULL) {
    pInput->pData[paramIndex] = taosMemoryCalloc(1, sizeof(SColumnInfoData));
    if (pInput->pData[paramIndex] == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
735

736 737 738
    // Set the correct column info (data type and bytes)
    pInput->pData[paramIndex]->info.type = type;
    pInput->pData[paramIndex]->info.bytes = tDataTypes[type].bytes;
739
  }
H
Haojun Liao 已提交
740

741 742 743 744 745 746
  SColumnDataAgg* da = NULL;
  if (pInput->pColumnDataAgg[paramIndex] == NULL) {
    da = taosMemoryCalloc(1, sizeof(SColumnDataAgg));
    pInput->pColumnDataAgg[paramIndex] = da;
    if (da == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
747 748
    }
  } else {
749
    da = pInput->pColumnDataAgg[paramIndex];
750 751
  }

752
  ASSERT(!IS_VAR_DATA_TYPE(type));
753

754 755
  if (type == TSDB_DATA_TYPE_BIGINT) {
    int64_t v = pFuncParam->param.i;
756
    *da = (SColumnDataAgg){.numOfNull = 0, .min = v, .max = v, .sum = v * numOfRows};
757 758
  } else if (type == TSDB_DATA_TYPE_DOUBLE) {
    double v = pFuncParam->param.d;
759
    *da = (SColumnDataAgg){.numOfNull = 0};
760

761 762 763 764 765 766
    *(double*)&da->min = v;
    *(double*)&da->max = v;
    *(double*)&da->sum = v * numOfRows;
  } else if (type == TSDB_DATA_TYPE_BOOL) {  // todo validate this data type
    bool v = pFuncParam->param.i;

767
    *da = (SColumnDataAgg){.numOfNull = 0};
768 769 770 771 772
    *(bool*)&da->min = 0;
    *(bool*)&da->max = v;
    *(bool*)&da->sum = v * numOfRows;
  } else if (type == TSDB_DATA_TYPE_TIMESTAMP) {
    // do nothing
773
  } else {
774
    ASSERT(0);
775 776
  }

777 778
  return TSDB_CODE_SUCCESS;
}
779 780 781 782 783 784 785 786 787 788 789

void setBlockStatisInfo(SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, SSDataBlock* pBlock) {
  int32_t numOfRows = pBlock->info.rows;

  SInputColumnInfoData* pInput = &pCtx->input;
  pInput->numOfRows = numOfRows;
  pInput->totalRows = numOfRows;

  if (pBlock->pBlockAgg != NULL) {
    pInput->colDataAggIsSet = true;

790 791
    for (int32_t j = 0; j < pExprInfo->base.numOfParams; ++j) {
      SFunctParam* pFuncParam = &pExprInfo->base.pParam[j];
792

793 794
      if (pFuncParam->type == FUNC_PARAM_TYPE_COLUMN) {
        int32_t slotId = pFuncParam->pCol->slotId;
795 796 797 798
        pInput->pColumnDataAgg[j] = pBlock->pBlockAgg[slotId];
        if (pInput->pColumnDataAgg[j] == NULL) {
          pInput->colDataAggIsSet = false;
        }
799 800 801 802

        // Here we set the column info data since the data type for each column data is required, but
        // the data in the corresponding SColumnInfoData will not be used.
        pInput->pData[j] = taosArrayGet(pBlock->pDataBlock, slotId);
803 804
      } else if (pFuncParam->type == FUNC_PARAM_TYPE_VALUE) {
        doCreateConstantValColumnAggInfo(pInput, pFuncParam, pFuncParam->param.nType, j, pBlock->info.rows);
805 806
      }
    }
807
  } else {
808
    pInput->colDataAggIsSet = false;
809 810 811
  }

  // set the statistics data for primary time stamp column
812 813 814 815 816
  //  if (pCtx->functionId == FUNCTION_SPREAD && pColumn->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
  //    pCtx->isAggSet = true;
  //    pCtx->agg.min = pBlock->info.window.skey;
  //    pCtx->agg.max = pBlock->info.window.ekey;
  //  }
817 818
}

L
Liu Jicong 已提交
819
bool isTaskKilled(SExecTaskInfo* pTaskInfo) {
820 821
  // query has been executed more than tsShellActivityTimer, and the retrieve has not arrived
  // abort current query execution.
L
Liu Jicong 已提交
822 823
  if (pTaskInfo->owner != 0 &&
      ((taosGetTimestampSec() - pTaskInfo->cost.start / 1000) > 10 * getMaximumIdleDurationSec())
824 825
      /*(!needBuildResAfterQueryComplete(pTaskInfo))*/) {
    assert(pTaskInfo->cost.start != 0);
L
Liu Jicong 已提交
826 827 828
    //    qDebug("QInfo:%" PRIu64 " retrieve not arrive beyond %d ms, abort current query execution, start:%" PRId64
    //           ", current:%d", pQInfo->qId, 1, pQInfo->startExecTs, taosGetTimestampSec());
    //    return true;
829 830 831 832 833
  }

  return false;
}

L
Liu Jicong 已提交
834
void setTaskKilled(SExecTaskInfo* pTaskInfo) { pTaskInfo->code = TSDB_CODE_TSC_QUERY_CANCELLED; }
835 836

/////////////////////////////////////////////////////////////////////////////////////////////
837 838 839
STimeWindow getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int64_t key) {
  STimeWindow win  = {0};
  win.skey = taosTimeTruncate(key, pInterval, precision);
840 841

  /*
H
Haojun Liao 已提交
842
   * if the realSkey > INT64_MAX - pInterval->interval, the query duration between
843 844
   * realSkey and realEkey must be less than one interval.Therefore, no need to adjust the query ranges.
   */
845 846 847
  win.ekey = taosTimeAdd(win.skey, pInterval->interval, pInterval->intervalUnit, precision) - 1;
  if (win.ekey < win.skey) {
    win.ekey = INT64_MAX;
848
  }
849 850

  return win;
851 852
}

853
#if 0
L
Liu Jicong 已提交
854
static int32_t updateBlockLoadStatus(STaskAttr* pQuery, int32_t status) {
855

856 857 858
  bool hasFirstLastFunc = false;
  bool hasOtherFunc = false;

859
  if (status == BLK_DATA_DATA_LOAD || status == BLK_DATA_FILTEROUT) {
860 861 862 863 864
    return status;
  }

  for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
    int32_t functionId = getExprFunctionId(&pQuery->pExpr1[i]);
865

866 867 868 869 870 871 872 873 874 875
    if (functionId == FUNCTION_TS || functionId == FUNCTION_TS_DUMMY || functionId == FUNCTION_TAG ||
        functionId == FUNCTION_TAG_DUMMY) {
      continue;
    }

    if (functionId == FUNCTION_FIRST_DST || functionId == FUNCTION_LAST_DST) {
      hasFirstLastFunc = true;
    } else {
      hasOtherFunc = true;
    }
876

877 878
  }

879
  if (hasFirstLastFunc && status == BLK_DATA_NOT_LOAD) {
L
Liu Jicong 已提交
880
    if (!hasOtherFunc) {
881
      return BLK_DATA_FILTEROUT;
882
    } else {
883
      return BLK_DATA_DATA_LOAD;
884 885 886 887 888 889
    }
  }

  return status;
}

890 891
#endif

L
Liu Jicong 已提交
892 893
// static void updateDataCheckOrder(SQInfo *pQInfo, SQueryTableReq* pQueryMsg, bool stableQuery) {
//   STaskAttr* pQueryAttr = pQInfo->runtimeEnv.pQueryAttr;
H
Haojun Liao 已提交
894
//
L
Liu Jicong 已提交
895 896 897 898
//   // in case of point-interpolation query, use asc order scan
//   char msg[] = "QInfo:0x%"PRIx64" scan order changed for %s query, old:%d, new:%d, qrange exchanged, old qrange:%"
//   PRId64
//                "-%" PRId64 ", new qrange:%" PRId64 "-%" PRId64;
H
Haojun Liao 已提交
899
//
L
Liu Jicong 已提交
900 901 902 903 904
//   // todo handle the case the the order irrelevant query type mixed up with order critical query type
//   // descending order query for last_row query
//   if (isFirstLastRowQuery(pQueryAttr)) {
//     //qDebug("QInfo:0x%"PRIx64" scan order changed for last_row query, old:%d, new:%d", pQInfo->qId,
//     pQueryAttr->order.order, TSDB_ORDER_ASC);
H
Haojun Liao 已提交
905
//
L
Liu Jicong 已提交
906 907
//     pQueryAttr->order.order = TSDB_ORDER_ASC;
//     if (pQueryAttr->window.skey > pQueryAttr->window.ekey) {
wafwerar's avatar
wafwerar 已提交
908
//       TSWAP(pQueryAttr->window.skey, pQueryAttr->window.ekey);
L
Liu Jicong 已提交
909
//     }
H
Haojun Liao 已提交
910
//
L
Liu Jicong 已提交
911 912 913
//     pQueryAttr->needReverseScan = false;
//     return;
//   }
H
Haojun Liao 已提交
914
//
L
Liu Jicong 已提交
915 916 917
//   if (pQueryAttr->groupbyColumn && pQueryAttr->order.order == TSDB_ORDER_DESC) {
//     pQueryAttr->order.order = TSDB_ORDER_ASC;
//     if (pQueryAttr->window.skey > pQueryAttr->window.ekey) {
wafwerar's avatar
wafwerar 已提交
918
//       TSWAP(pQueryAttr->window.skey, pQueryAttr->window.ekey);
L
Liu Jicong 已提交
919
//     }
H
Haojun Liao 已提交
920
//
L
Liu Jicong 已提交
921 922 923 924
//     pQueryAttr->needReverseScan = false;
//     doUpdateLastKey(pQueryAttr);
//     return;
//   }
H
Haojun Liao 已提交
925
//
L
Liu Jicong 已提交
926 927 928 929 930 931
//   if (pQueryAttr->pointInterpQuery && pQueryAttr->interval.interval == 0) {
//     if (!QUERY_IS_ASC_QUERY(pQueryAttr)) {
//       //qDebug(msg, pQInfo->qId, "interp", pQueryAttr->order.order, TSDB_ORDER_ASC, pQueryAttr->window.skey,
//       pQueryAttr->window.ekey, pQueryAttr->window.ekey, pQueryAttr->window.skey); TSWAP(pQueryAttr->window.skey,
//       pQueryAttr->window.ekey, TSKEY);
//     }
H
Haojun Liao 已提交
932
//
L
Liu Jicong 已提交
933 934 935
//     pQueryAttr->order.order = TSDB_ORDER_ASC;
//     return;
//   }
H
Haojun Liao 已提交
936
//
L
Liu Jicong 已提交
937 938 939 940
//   if (pQueryAttr->interval.interval == 0) {
//     if (onlyFirstQuery(pQueryAttr)) {
//       if (!QUERY_IS_ASC_QUERY(pQueryAttr)) {
//         //qDebug(msg, pQInfo->qId, "only-first", pQueryAttr->order.order, TSDB_ORDER_ASC, pQueryAttr->window.skey,
H
Haojun Liao 已提交
941 942
////               pQueryAttr->window.ekey, pQueryAttr->window.ekey, pQueryAttr->window.skey);
//
wafwerar's avatar
wafwerar 已提交
943
//        TSWAP(pQueryAttr->window.skey, pQueryAttr->window.ekey);
H
Haojun Liao 已提交
944 945 946 947 948 949 950 951 952 953
//        doUpdateLastKey(pQueryAttr);
//      }
//
//      pQueryAttr->order.order = TSDB_ORDER_ASC;
//      pQueryAttr->needReverseScan = false;
//    } else if (onlyLastQuery(pQueryAttr) && notContainSessionOrStateWindow(pQueryAttr)) {
//      if (QUERY_IS_ASC_QUERY(pQueryAttr)) {
//        //qDebug(msg, pQInfo->qId, "only-last", pQueryAttr->order.order, TSDB_ORDER_DESC, pQueryAttr->window.skey,
////               pQueryAttr->window.ekey, pQueryAttr->window.ekey, pQueryAttr->window.skey);
//
wafwerar's avatar
wafwerar 已提交
954
//        TSWAP(pQueryAttr->window.skey, pQueryAttr->window.ekey);
H
Haojun Liao 已提交
955 956 957 958 959 960 961 962 963 964 965 966
//        doUpdateLastKey(pQueryAttr);
//      }
//
//      pQueryAttr->order.order = TSDB_ORDER_DESC;
//      pQueryAttr->needReverseScan = false;
//    }
//
//  } else {  // interval query
//    if (stableQuery) {
//      if (onlyFirstQuery(pQueryAttr)) {
//        if (!QUERY_IS_ASC_QUERY(pQueryAttr)) {
//          //qDebug(msg, pQInfo->qId, "only-first stable", pQueryAttr->order.order, TSDB_ORDER_ASC,
L
Liu Jicong 已提交
967 968
////                 pQueryAttr->window.skey, pQueryAttr->window.ekey, pQueryAttr->window.ekey,
/// pQueryAttr->window.skey);
H
Haojun Liao 已提交
969
//
wafwerar's avatar
wafwerar 已提交
970
//          TSWAP(pQueryAttr->window.skey, pQueryAttr->window.ekey);
H
Haojun Liao 已提交
971 972 973 974 975 976 977 978
//          doUpdateLastKey(pQueryAttr);
//        }
//
//        pQueryAttr->order.order = TSDB_ORDER_ASC;
//        pQueryAttr->needReverseScan = false;
//      } else if (onlyLastQuery(pQueryAttr)) {
//        if (QUERY_IS_ASC_QUERY(pQueryAttr)) {
//          //qDebug(msg, pQInfo->qId, "only-last stable", pQueryAttr->order.order, TSDB_ORDER_DESC,
L
Liu Jicong 已提交
979 980
////                 pQueryAttr->window.skey, pQueryAttr->window.ekey, pQueryAttr->window.ekey,
/// pQueryAttr->window.skey);
H
Haojun Liao 已提交
981
//
wafwerar's avatar
wafwerar 已提交
982
//          TSWAP(pQueryAttr->window.skey, pQueryAttr->window.ekey);
H
Haojun Liao 已提交
983 984 985 986 987 988 989 990 991
//          doUpdateLastKey(pQueryAttr);
//        }
//
//        pQueryAttr->order.order = TSDB_ORDER_DESC;
//        pQueryAttr->needReverseScan = false;
//      }
//    }
//  }
//}
992

L
Liu Jicong 已提交
993 994 995
// static FORCE_INLINE bool doFilterByBlockStatistics(STaskRuntimeEnv* pRuntimeEnv, SDataStatis *pDataStatis,
// SqlFunctionCtx *pCtx, int32_t numOfRows) {
//   STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
996
//
L
Liu Jicong 已提交
997 998 999
//   if (pDataStatis == NULL || pQueryAttr->pFilters == NULL) {
//     return true;
//   }
1000
//
L
Liu Jicong 已提交
1001 1002
//   return filterRangeExecute(pQueryAttr->pFilters, pDataStatis, pQueryAttr->numOfCols, numOfRows);
// }
1003
#if 0
H
Haojun Liao 已提交
1004
static bool overlapWithTimeWindow(STaskAttr* pQueryAttr, SDataBlockInfo* pBlockInfo) {
1005 1006
  STimeWindow w = {0};

dengyihao's avatar
dengyihao 已提交
1007 1008
  TSKEY sk = TMIN(pQueryAttr->window.skey, pQueryAttr->window.ekey);
  TSKEY ek = TMAX(pQueryAttr->window.skey, pQueryAttr->window.ekey);
1009

1010
  if (true) {
L
Liu Jicong 已提交
1011
    //    getAlignQueryTimeWindow(pQueryAttr, pBlockInfo->window.skey, sk, ek, &w);
1012 1013 1014 1015 1016 1017
    assert(w.ekey >= pBlockInfo->window.skey);

    if (w.ekey < pBlockInfo->window.ekey) {
      return true;
    }

L
Liu Jicong 已提交
1018 1019
    while (1) {
      //      getNextTimeWindow(pQueryAttr, &w);
1020 1021 1022 1023 1024 1025 1026 1027 1028 1029
      if (w.skey > pBlockInfo->window.ekey) {
        break;
      }

      assert(w.ekey > pBlockInfo->window.ekey);
      if (w.skey <= pBlockInfo->window.ekey && w.skey > pBlockInfo->window.skey) {
        return true;
      }
    }
  } else {
L
Liu Jicong 已提交
1030
    //    getAlignQueryTimeWindow(pQueryAttr, pBlockInfo->window.ekey, sk, ek, &w);
1031 1032 1033 1034 1035 1036
    assert(w.skey <= pBlockInfo->window.ekey);

    if (w.skey > pBlockInfo->window.skey) {
      return true;
    }

L
Liu Jicong 已提交
1037 1038
    while (1) {
      //      getNextTimeWindow(pQueryAttr, &w);
1039 1040 1041 1042 1043 1044 1045 1046 1047 1048 1049 1050 1051
      if (w.ekey < pBlockInfo->window.skey) {
        break;
      }

      assert(w.skey < pBlockInfo->window.skey);
      if (w.ekey < pBlockInfo->window.ekey && w.ekey >= pBlockInfo->window.skey) {
        return true;
      }
    }
  }

  return false;
}
1052
#endif
1053 1054

static uint32_t doFilterByBlockTimeWindow(STableScanInfo* pTableScanInfo, SSDataBlock* pBlock) {
H
Haojun Liao 已提交
1055
#if 0
H
Haojun Liao 已提交
1056
  SqlFunctionCtx* pCtx = pTableScanInfo->pCtx;
1057
  uint32_t        status = BLK_DATA_NOT_LOAD;
1058

L
Liu Jicong 已提交
1059
  int32_t numOfOutput = 0;  // pTableScanInfo->numOfOutput;
1060 1061
  for (int32_t i = 0; i < numOfOutput; ++i) {
    int32_t functionId = pCtx[i].functionId;
H
Haojun Liao 已提交
1062
    int32_t colId = pTableScanInfo->pExpr[i].base.pParam[0].pCol->colId;
1063 1064 1065

    // group by + first/last should not apply the first/last block filter
    if (functionId < 0) {
1066
      status |= BLK_DATA_DATA_LOAD;
1067 1068
      return status;
    } else {
L
Liu Jicong 已提交
1069
      //      status |= aAggs[functionId].dataReqFunc(&pTableScanInfo->pCtx[i], &pBlock->info.window, colId);
1070
      //      if ((status & BLK_DATA_DATA_LOAD) == BLK_DATA_DATA_LOAD) {
L
Liu Jicong 已提交
1071 1072
      //        return status;
      //      }
1073 1074 1075 1076
    }
  }

  return status;
H
Haojun Liao 已提交
1077 1078
#endif
  return 0;
1079 1080
}

L
Liu Jicong 已提交
1081 1082
int32_t loadDataBlockOnDemand(SExecTaskInfo* pTaskInfo, STableScanInfo* pTableScanInfo, SSDataBlock* pBlock,
                              uint32_t* status) {
1083
  *status = BLK_DATA_NOT_LOAD;
1084

H
Haojun Liao 已提交
1085
  pBlock->pDataBlock = NULL;
L
Liu Jicong 已提交
1086
  pBlock->pBlockAgg = NULL;
H
Haojun Liao 已提交
1087

L
Liu Jicong 已提交
1088 1089
  //  int64_t groupId = pRuntimeEnv->current->groupIndex;
  //  bool    ascQuery = QUERY_IS_ASC_QUERY(pQueryAttr);
1090

H
Haojun Liao 已提交
1091
  STaskCostInfo* pCost = &pTaskInfo->cost;
1092

1093 1094
//  pCost->totalBlocks += 1;
//  pCost->totalRows += pBlock->info.rows;
H
Haojun Liao 已提交
1095
#if 0
1096 1097 1098
  // Calculate all time windows that are overlapping or contain current data block.
  // If current data block is contained by all possible time window, do not load current data block.
  if (/*pQueryAttr->pFilters || */pQueryAttr->groupbyColumn || pQueryAttr->sw.gap > 0 ||
H
Haojun Liao 已提交
1099
      (QUERY_IS_INTERVAL_QUERY(pQueryAttr) && overlapWithTimeWindow(pTaskInfo, &pBlock->info))) {
1100
    (*status) = BLK_DATA_DATA_LOAD;
1101 1102 1103
  }

  // check if this data block is required to load
1104
  if ((*status) != BLK_DATA_DATA_LOAD) {
1105 1106 1107 1108 1109 1110 1111
    bool needFilter = true;

    // the pCtx[i] result is belonged to previous time window since the outputBuf has not been set yet,
    // the filter result may be incorrect. So in case of interval query, we need to set the correct time output buffer
    if (QUERY_IS_INTERVAL_QUERY(pQueryAttr)) {
      SResultRow* pResult = NULL;

H
Haojun Liao 已提交
1112
      bool  masterScan = IS_MAIN_SCAN(pRuntimeEnv);
1113 1114 1115 1116 1117 1118
      TSKEY k = ascQuery? pBlock->info.window.skey : pBlock->info.window.ekey;

      STimeWindow win = getActiveTimeWindow(pTableScanInfo->pResultRowInfo, k, pQueryAttr);
      if (pQueryAttr->pointInterpQuery) {
        needFilter = chkWindowOutputBufByKey(pRuntimeEnv, pTableScanInfo->pResultRowInfo, &win, masterScan, &pResult, groupId,
                                    pTableScanInfo->pCtx, pTableScanInfo->numOfOutput,
1119
                                    pTableScanInfo->rowEntryInfoOffset);
1120 1121 1122
      } else {
        if (setResultOutputBufByKey(pRuntimeEnv, pTableScanInfo->pResultRowInfo, pBlock->info.uid, &win, masterScan, &pResult, groupId,
                                    pTableScanInfo->pCtx, pTableScanInfo->numOfOutput,
1123
                                    pTableScanInfo->rowEntryInfoOffset) != TSDB_CODE_SUCCESS) {
1124 1125 1126 1127 1128
          longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
        }
      }
    } else if (pQueryAttr->stableQuery && (!pQueryAttr->tsCompQuery) && (!pQueryAttr->diffQuery)) { // stable aggregate, not interval aggregate or normal column aggregate
      doSetTableGroupOutputBuf(pRuntimeEnv, pTableScanInfo->pResultRowInfo, pTableScanInfo->pCtx,
1129
                               pTableScanInfo->rowEntryInfoOffset, pTableScanInfo->numOfOutput,
1130 1131 1132 1133 1134 1135
                               pRuntimeEnv->current->groupIndex);
    }

    if (needFilter) {
      (*status) = doFilterByBlockTimeWindow(pTableScanInfo, pBlock);
    } else {
1136
      (*status) = BLK_DATA_DATA_LOAD;
1137 1138 1139 1140
    }
  }

  SDataBlockInfo* pBlockInfo = &pBlock->info;
H
Haojun Liao 已提交
1141
//  *status = updateBlockLoadStatus(pRuntimeEnv->pQueryAttr, *status);
1142

1143
  if ((*status) == BLK_DATA_NOT_LOAD || (*status) == BLK_DATA_FILTEROUT) {
1144 1145
    //qDebug("QInfo:0x%"PRIx64" data block discard, brange:%" PRId64 "-%" PRId64 ", rows:%d", pQInfo->qId, pBlockInfo->window.skey,
//           pBlockInfo->window.ekey, pBlockInfo->rows);
1146
    pCost->skipBlocks += 1;
1147
  } else if ((*status) == BLK_DATA_SMA_LOAD) {
1148 1149
    // this function never returns error?
    pCost->loadBlockStatis += 1;
1150
//    tsdbRetrieveDatablockSMA(pTableScanInfo->pTsdbReadHandle, &pBlock->pBlockAgg);
1151 1152

    if (pBlock->pBlockAgg == NULL) {  // data block statistics does not exist, load data block
1153
//      pBlock->pDataBlock = tsdbRetrieveDataBlock(pTableScanInfo->pTsdbReadHandle, NULL);
1154 1155 1156
      pCost->totalCheckedRows += pBlock->info.rows;
    }
  } else {
1157
    assert((*status) == BLK_DATA_DATA_LOAD);
1158 1159 1160

    // load the data block statistics to perform further filter
    pCost->loadBlockStatis += 1;
1161
//    tsdbRetrieveDatablockSMA(pTableScanInfo->pTsdbReadHandle, &pBlock->pBlockAgg);
1162 1163 1164 1165 1166 1167

    if (pQueryAttr->topBotQuery && pBlock->pBlockAgg != NULL) {
      { // set previous window
        if (QUERY_IS_INTERVAL_QUERY(pQueryAttr)) {
          SResultRow* pResult = NULL;

H
Haojun Liao 已提交
1168
          bool  masterScan = IS_MAIN_SCAN(pRuntimeEnv);
1169 1170 1171 1172 1173
          TSKEY k = ascQuery? pBlock->info.window.skey : pBlock->info.window.ekey;

          STimeWindow win = getActiveTimeWindow(pTableScanInfo->pResultRowInfo, k, pQueryAttr);
          if (setResultOutputBufByKey(pRuntimeEnv, pTableScanInfo->pResultRowInfo, pBlock->info.uid, &win, masterScan, &pResult, groupId,
                                      pTableScanInfo->pCtx, pTableScanInfo->numOfOutput,
1174
                                      pTableScanInfo->rowEntryInfoOffset) != TSDB_CODE_SUCCESS) {
1175 1176 1177 1178 1179 1180 1181 1182 1183 1184 1185
            longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
          }
        }
      }
      bool load = false;
      for (int32_t i = 0; i < pQueryAttr->numOfOutput; ++i) {
        int32_t functionId = pTableScanInfo->pCtx[i].functionId;
        if (functionId == FUNCTION_TOP || functionId == FUNCTION_BOTTOM) {
//          load = topbot_datablock_filter(&pTableScanInfo->pCtx[i], (char*)&(pBlock->pBlockAgg[i].min),
//                                         (char*)&(pBlock->pBlockAgg[i].max));
          if (!load) { // current block has been discard due to filter applied
1186
            pCost->skipBlocks += 1;
1187 1188
            //qDebug("QInfo:0x%"PRIx64" data block discard, brange:%" PRId64 "-%" PRId64 ", rows:%d", pQInfo->qId,
//                   pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
1189
            (*status) = BLK_DATA_FILTEROUT;
1190 1191 1192 1193 1194 1195 1196 1197
            return TSDB_CODE_SUCCESS;
          }
        }
      }
    }

    // current block has been discard due to filter applied
//    if (!doFilterByBlockStatistics(pRuntimeEnv, pBlock->pBlockAgg, pTableScanInfo->pCtx, pBlockInfo->rows)) {
1198
//      pCost->skipBlocks += 1;
1199 1200
//      qDebug("QInfo:0x%"PRIx64" data block discard, brange:%" PRId64 "-%" PRId64 ", rows:%d", pQInfo->qId, pBlockInfo->window.skey,
//             pBlockInfo->window.ekey, pBlockInfo->rows);
1201
//      (*status) = BLK_DATA_FILTEROUT;
1202 1203 1204 1205 1206
//      return TSDB_CODE_SUCCESS;
//    }

    pCost->totalCheckedRows += pBlockInfo->rows;
    pCost->loadBlocks += 1;
1207
//    pBlock->pDataBlock = tsdbRetrieveDataBlock(pTableScanInfo->pTsdbReadHandle, NULL);
1208 1209 1210 1211 1212
//    if (pBlock->pDataBlock == NULL) {
//      return terrno;
//    }

//    if (pQueryAttr->pFilters != NULL) {
1213
//      filterSetColFieldData(pQueryAttr->pFilters, taosArrayGetSize(pBlock->pDataBlock), pBlock->pDataBlock);
1214
//    }
1215

1216 1217 1218 1219
//    if (pQueryAttr->pFilters != NULL || pRuntimeEnv->pTsBuf != NULL) {
//      filterColRowsInDataBlock(pRuntimeEnv, pBlock, ascQuery);
//    }
  }
H
Haojun Liao 已提交
1220
#endif
1221 1222 1223
  return TSDB_CODE_SUCCESS;
}

L
Liu Jicong 已提交
1224
static void updateTableQueryInfoForReverseScan(STableQueryInfo* pTableQueryInfo) {
1225 1226 1227 1228
  if (pTableQueryInfo == NULL) {
    return;
  }

wafwerar's avatar
wafwerar 已提交
1229
  //  TSWAP(pTableQueryInfo->win.skey, pTableQueryInfo->win.ekey);
L
Liu Jicong 已提交
1230
  //  pTableQueryInfo->lastKey = pTableQueryInfo->win.skey;
1231

L
Liu Jicong 已提交
1232 1233
  //  SWITCH_ORDER(pTableQueryInfo->cur.order);
  //  pTableQueryInfo->cur.vgroupIndex = -1;
1234 1235

  // set the index to be the end slot of result rows array
dengyihao's avatar
dengyihao 已提交
1236 1237 1238 1239 1240 1241
  //  SResultRowInfo* pResultRowInfo = &pTableQueryInfo->resInfo;
  //  if (pResultRowInfo->size > 0) {
  //    pResultRowInfo->curPos = pResultRowInfo->size - 1;
  //  } else {
  //    pResultRowInfo->curPos = -1;
  //  }
1242 1243
}

H
Haojun Liao 已提交
1244
void initResultRow(SResultRow* pResultRow) {
X
Xiaoyu Wang 已提交
1245
  //  pResultRow->pEntryInfo = (struct SResultRowEntryInfo*)((char*)pResultRow + sizeof(SResultRow));
1246 1247 1248 1249 1250
}

/*
 * The start of each column SResultRowEntryInfo is denote by RowCellInfoOffset.
 * Note that in case of top/bottom query, the whole multiple rows of result is treated as only one row of results.
H
Haojun Liao 已提交
1251 1252 1253
 * +------------+-----------------result column 1------------+------------------result column 2-----------+
 * | SResultRow | SResultRowEntryInfo | intermediate buffer1 | SResultRowEntryInfo | intermediate buffer 2|
 * +------------+--------------------------------------------+--------------------------------------------+
1254 1255
 *           offset[0]                                  offset[1]                                   offset[2]
 */
1256
// TODO refactor: some function move away
L
Liu Jicong 已提交
1257 1258 1259
void setFunctionResultOutput(SOperatorInfo* pOperator, SOptrBasicInfo* pInfo, SAggSupporter* pSup, int32_t stage,
                             int32_t numOfExprs) {
  SExecTaskInfo*  pTaskInfo = pOperator->pTaskInfo;
1260 1261
  SqlFunctionCtx* pCtx = pOperator->exprSupp.pCtx;
  int32_t*        rowEntryInfoOffset = pOperator->exprSupp.rowEntryInfoOffset;
H
Haojun Liao 已提交
1262

H
Haojun Liao 已提交
1263
  SResultRowInfo* pResultRowInfo = &pInfo->resultRowInfo;
1264
  initResultRowInfo(pResultRowInfo);
H
Haojun Liao 已提交
1265

L
Liu Jicong 已提交
1266 1267
  int64_t     tid = 0;
  int64_t     groupId = 0;
1268 1269
  SResultRow* pRow = doSetResultOutBufByKey(pSup->pResultBuf, pResultRowInfo, (char*)&tid, sizeof(tid), true, groupId,
                                            pTaskInfo, false, pSup);
H
Haojun Liao 已提交
1270

1271
  for (int32_t i = 0; i < numOfExprs; ++i) {
1272
    struct SResultRowEntryInfo* pEntry = getResultEntryInfo(pRow, i, rowEntryInfoOffset);
H
Haojun Liao 已提交
1273 1274
    cleanupResultRowEntry(pEntry);

L
Liu Jicong 已提交
1275
    pCtx[i].resultInfo = pEntry;
1276
    pCtx[i].scanFlag = stage;
H
Haojun Liao 已提交
1277 1278
  }

1279
  initCtxOutputBuffer(pCtx, numOfExprs);
H
Haojun Liao 已提交
1280 1281
}

H
Haojun Liao 已提交
1282
void initCtxOutputBuffer(SqlFunctionCtx* pCtx, int32_t size) {
1283 1284
  for (int32_t j = 0; j < size; ++j) {
    struct SResultRowEntryInfo* pResInfo = GET_RES_INFO(&pCtx[j]);
dengyihao's avatar
dengyihao 已提交
1285 1286
    if (isRowEntryInitialized(pResInfo) || fmIsPseudoColumnFunc(pCtx[j].functionId) || pCtx[j].functionId == -1 ||
        fmIsScalarFunc(pCtx[j].functionId)) {
1287 1288 1289
      continue;
    }

H
Haojun Liao 已提交
1290
    pCtx[j].fpSet.init(&pCtx[j], pCtx[j].resultInfo);
1291 1292 1293
  }
}

L
Liu Jicong 已提交
1294
void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status) {
1295
  if (status == TASK_NOT_COMPLETED) {
H
Haojun Liao 已提交
1296
    pTaskInfo->status = status;
1297 1298
  } else {
    // QUERY_NOT_COMPLETED is not compatible with any other status, so clear its position first
1299
    CLEAR_QUERY_STATUS(pTaskInfo, TASK_NOT_COMPLETED);
H
Haojun Liao 已提交
1300
    pTaskInfo->status |= status;
1301 1302 1303
  }
}

L
Liu Jicong 已提交
1304
void destroyTableQueryInfoImpl(STableQueryInfo* pTableQueryInfo) {
1305 1306 1307 1308
  if (pTableQueryInfo == NULL) {
    return;
  }

L
Liu Jicong 已提交
1309
  //  taosVariantDestroy(&pTableQueryInfo->tag);
dengyihao's avatar
dengyihao 已提交
1310
  //  cleanupResultRowInfo(&pTableQueryInfo->resInfo);
1311 1312
}

1313
void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowEntryInfoOffset) {
1314
  for (int32_t i = 0; i < numOfOutput; ++i) {
1315
    pCtx[i].resultInfo = getResultEntryInfo(pResult, i, rowEntryInfoOffset);
1316 1317 1318 1319 1320

    struct SResultRowEntryInfo* pResInfo = pCtx[i].resultInfo;
    if (isRowEntryCompleted(pResInfo) && isRowEntryInitialized(pResInfo)) {
      continue;
    }
1321 1322 1323 1324 1325

    if (fmIsWindowPseudoColumnFunc(pCtx[i].functionId)) {
      continue;
    }

1326 1327 1328 1329 1330 1331
    if (!pResInfo->initialized) {
      if (pCtx[i].functionId != -1) {
        pCtx[i].fpSet.init(&pCtx[i], pResInfo);
      } else {
        pResInfo->initialized = true;
      }
1332 1333 1334 1335
    }
  }
}

H
Haojun Liao 已提交
1336
static void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const int8_t* rowRes, bool keep);
1337

1338
void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock) {
1339 1340 1341
  if (pFilterNode == NULL) {
    return;
  }
S
shenglian zhou 已提交
1342 1343 1344
  if (pBlock->info.rows == 0) {
    return;
  }
1345
  SFilterInfo* filter = NULL;
H
Haojun Liao 已提交
1346

H
Haojun Liao 已提交
1347
  // todo move to the initialization function
H
Haojun Liao 已提交
1348
  int32_t code = filterInitFromNode((SNode*)pFilterNode, &filter, 0);
1349

1350
  size_t             numOfCols = taosArrayGetSize(pBlock->pDataBlock);
1351
  SFilterColumnParam param1 = {.numOfCols = numOfCols, .pDataBlock = pBlock->pDataBlock};
1352 1353 1354
  code = filterSetDataFromSlotId(filter, &param1);

  int8_t* rowRes = NULL;
1355

1356
  // todo the keep seems never to be True??
1357
  bool keep = filterExecute(filter, pBlock, &rowRes, NULL, param1.numOfCols);
D
dapan1121 已提交
1358
  filterFreeInfo(filter);
1359

H
Haojun Liao 已提交
1360
  extractQualifiedTupleByFilterResult(pBlock, rowRes, keep);
1361
  blockDataUpdateTsWindow(pBlock, 0);
H
Haojun Liao 已提交
1362 1363

  taosMemoryFree(rowRes);
1364 1365
}

H
Haojun Liao 已提交
1366
void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const int8_t* rowRes, bool keep) {
1367 1368 1369 1370 1371
  if (keep) {
    return;
  }

  if (rowRes != NULL) {
L
Liu Jicong 已提交
1372
    int32_t      totalRows = pBlock->info.rows;
1373
    SSDataBlock* px = createOneDataBlock(pBlock, true);
1374

1375 1376
    size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
    for (int32_t i = 0; i < numOfCols; ++i) {
1377 1378
      SColumnInfoData* pSrc = taosArrayGet(px->pDataBlock, i);
      SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, i);
1379
      // it is a reserved column for scalar function, and no data in this column yet.
1380
      if (pDst->pData == NULL || pSrc->pData == NULL) {
1381 1382 1383
        continue;
      }

1384 1385
      colInfoDataCleanup(pDst, pBlock->info.rows);

1386
      int32_t numOfRows = 0;
1387
      for (int32_t j = 0; j < totalRows; ++j) {
D
dapan1121 已提交
1388 1389 1390
        if (rowRes[j] == 0) {
          continue;
        }
1391

D
dapan1121 已提交
1392
        if (colDataIsNull_s(pSrc, j)) {
1393
          colDataAppendNULL(pDst, numOfRows);
D
dapan1121 已提交
1394
        } else {
1395
          colDataAppend(pDst, numOfRows, colDataGetData(pSrc, j), false);
D
dapan1121 已提交
1396
        }
1397
        numOfRows += 1;
H
Haojun Liao 已提交
1398
      }
1399

1400 1401 1402 1403 1404
      if (pBlock->info.rows == totalRows) {
        pBlock->info.rows = numOfRows;
      } else {
        ASSERT(pBlock->info.rows == numOfRows);
      }
1405
    }
1406

dengyihao's avatar
dengyihao 已提交
1407
    blockDataDestroy(px);  // fix memory leak
1408 1409 1410
  } else {
    // do nothing
    pBlock->info.rows = 0;
1411 1412 1413
  }
}

L
Liu Jicong 已提交
1414 1415
void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, SAggOperatorInfo* pAggInfo, int32_t numOfOutput,
                              uint64_t groupId) {
1416
  // for simple group by query without interval, all the tables belong to one group result.
L
Liu Jicong 已提交
1417
  SExecTaskInfo*  pTaskInfo = pOperator->pTaskInfo;
1418
  SResultRowInfo* pResultRowInfo = &pAggInfo->binfo.resultRowInfo;
1419 1420
  SqlFunctionCtx* pCtx = pOperator->exprSupp.pCtx;
  int32_t*        rowEntryInfoOffset = pOperator->exprSupp.rowEntryInfoOffset;
1421

1422
  SResultRow* pResultRow = doSetResultOutBufByKey(pAggInfo->aggSup.pResultBuf, pResultRowInfo, (char*)&groupId,
L
Liu Jicong 已提交
1423
                                                  sizeof(groupId), true, groupId, pTaskInfo, false, &pAggInfo->aggSup);
L
Liu Jicong 已提交
1424
  assert(pResultRow != NULL);
1425 1426 1427 1428 1429 1430

  /*
   * not assign result buffer yet, add new result buffer
   * all group belong to one result set, and each group result has different group id so set the id to be one
   */
  if (pResultRow->pageId == -1) {
dengyihao's avatar
dengyihao 已提交
1431 1432
    int32_t ret =
        addNewWindowResultBuf(pResultRow, pAggInfo->aggSup.pResultBuf, groupId, pAggInfo->binfo.pRes->info.rowSize);
1433 1434 1435 1436 1437
    if (ret != TSDB_CODE_SUCCESS) {
      return;
    }
  }

1438
  setResultRowInitCtx(pResultRow, pCtx, numOfOutput, rowEntryInfoOffset);
1439 1440
}

1441
void setExecutionContext(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId, SAggOperatorInfo* pAggInfo) {
H
Haojun Liao 已提交
1442
  if (pAggInfo->groupId != INT32_MIN && pAggInfo->groupId == groupId) {
1443 1444
    return;
  }
1445
#ifdef BUF_PAGE_DEBUG
L
Liu Jicong 已提交
1446
  qDebug("page_setbuf, groupId:%" PRIu64, groupId);
1447
#endif
1448
  doSetTableGroupOutputBuf(pOperator, pAggInfo, numOfOutput, groupId);
1449 1450

  // record the current active group id
H
Haojun Liao 已提交
1451
  pAggInfo->groupId = groupId;
1452 1453
}

1454 1455
static void doUpdateNumOfRows(SResultRow* pRow, int32_t numOfExprs, const int32_t* rowCellOffset) {
  for (int32_t j = 0; j < numOfExprs; ++j) {
1456
    struct SResultRowEntryInfo* pResInfo = getResultEntryInfo(pRow, j, rowCellOffset);
1457 1458 1459 1460 1461 1462 1463 1464 1465 1466
    if (!isRowEntryInitialized(pResInfo)) {
      continue;
    }

    if (pRow->numOfRows < pResInfo->numOfRes) {
      pRow->numOfRows = pResInfo->numOfRes;
    }
  }
}

1467
int32_t finalizeResultRowIntoResultDataBlock(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPosition,
S
shenglian zhou 已提交
1468 1469 1470
                                             SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, int32_t numOfExprs,
                                             const int32_t* rowCellOffset, SSDataBlock* pBlock,
                                             SExecTaskInfo* pTaskInfo) {
1471 1472 1473 1474 1475 1476 1477 1478 1479
  SFilePage*  page = getBufPage(pBuf, resultRowPosition->pageId);
  SResultRow* pRow = (SResultRow*)((char*)page + resultRowPosition->offset);

  doUpdateNumOfRows(pRow, numOfExprs, rowCellOffset);
  if (pRow->numOfRows == 0) {
    releaseBufPage(pBuf, page);
    return 0;
  }

1480 1481 1482 1483 1484 1485 1486
  while (pBlock->info.rows + pRow->numOfRows > pBlock->info.capacity) {
    int32_t code = blockDataEnsureCapacity(pBlock, pBlock->info.capacity * 1.25);
    if (TAOS_FAILED(code)) {
      releaseBufPage(pBuf, page);
      qError("%s ensure result data capacity failed, code %s", GET_TASKID(pTaskInfo), tstrerror(code));
      longjmp(pTaskInfo->env, code);
    }
1487 1488 1489 1490 1491
  }

  for (int32_t j = 0; j < numOfExprs; ++j) {
    int32_t slotId = pExprInfo[j].base.resSchema.slotId;

1492
    pCtx[j].resultInfo = getResultEntryInfo(pRow, j, rowCellOffset);
1493 1494 1495 1496 1497 1498 1499 1500 1501
    if (pCtx[j].fpSet.finalize) {
      int32_t code = pCtx[j].fpSet.finalize(&pCtx[j], pBlock);
      if (TAOS_FAILED(code)) {
        qError("%s build result data block error, code %s", GET_TASKID(pTaskInfo), tstrerror(code));
        longjmp(pTaskInfo->env, code);
      }
    } else if (strcmp(pCtx[j].pExpr->pExpr->_function.functionName, "_select_value") == 0) {
      // do nothing, todo refactor
    } else {
1502 1503
      // expand the result into multiple rows. E.g., _wstart, top(k, 20)
      // the _wstart needs to copy to 20 following rows, since the results of top-k expands to 20 different rows.
1504 1505 1506 1507 1508 1509 1510 1511 1512
      SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, slotId);
      char*            in = GET_ROWCELL_INTERBUF(pCtx[j].resultInfo);
      for (int32_t k = 0; k < pRow->numOfRows; ++k) {
        colDataAppend(pColInfoData, pBlock->info.rows + k, in, pCtx[j].resultInfo->isNullRes);
      }
    }
  }

  releaseBufPage(pBuf, page);
1513
  pBlock->info.rows += pRow->numOfRows;
1514 1515 1516 1517

  return 0;
}

X
Xiaoyu Wang 已提交
1518 1519 1520
int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprInfo* pExprInfo, SDiskbasedBuf* pBuf,
                           SGroupResInfo* pGroupResInfo, const int32_t* rowCellOffset, SqlFunctionCtx* pCtx,
                           int32_t numOfExprs) {
1521
  int32_t numOfRows = getNumOfTotalRes(pGroupResInfo);
1522
  int32_t start = pGroupResInfo->index;
1523

1524
  for (int32_t i = start; i < numOfRows; i += 1) {
L
Liu Jicong 已提交
1525 1526
    SResKeyPos* pPos = taosArrayGetP(pGroupResInfo->pRows, i);
    SFilePage*  page = getBufPage(pBuf, pPos->pos.pageId);
1527

1528
    SResultRow* pRow = (SResultRow*)((char*)page + pPos->pos.offset);
1529 1530

    doUpdateNumOfRows(pRow, numOfExprs, rowCellOffset);
1531 1532
    if (pRow->numOfRows == 0) {
      pGroupResInfo->index += 1;
1533
      releaseBufPage(pBuf, page);
1534 1535 1536
      continue;
    }

1537 1538 1539 1540 1541
    if (pBlock->info.groupId == 0) {
      pBlock->info.groupId = pPos->groupId;
    } else {
      // current value belongs to different group, it can't be packed into one datablock
      if (pBlock->info.groupId != pPos->groupId) {
1542
        releaseBufPage(pBuf, page);
1543 1544 1545 1546
        break;
      }
    }

1547
    if (pBlock->info.rows + pRow->numOfRows > pBlock->info.capacity) {
1548
      releaseBufPage(pBuf, page);
1549 1550 1551 1552 1553
      break;
    }

    pGroupResInfo->index += 1;

1554
    for (int32_t j = 0; j < numOfExprs; ++j) {
1555 1556
      int32_t slotId = pExprInfo[j].base.resSchema.slotId;

1557
      pCtx[j].resultInfo = getResultEntryInfo(pRow, j, rowCellOffset);
1558
      if (pCtx[j].fpSet.finalize) {
1559
#ifdef BUF_PAGE_DEBUG
wmmhello's avatar
wmmhello 已提交
1560
        qDebug("\npage_finalize %d", numOfExprs);
1561
#endif
1562
        int32_t code = pCtx[j].fpSet.finalize(&pCtx[j], pBlock);
1563
        if (TAOS_FAILED(code)) {
1564 1565
          qError("%s build result data block error, code %s", GET_TASKID(pTaskInfo), tstrerror(code));
          longjmp(pTaskInfo->env, code);
1566
        }
1567 1568
      } else if (strcmp(pCtx[j].pExpr->pExpr->_function.functionName, "_select_value") == 0) {
        // do nothing, todo refactor
1569
      } else {
1570 1571
        // expand the result into multiple rows. E.g., _wstart, top(k, 20)
        // the _wstart needs to copy to 20 following rows, since the results of top-k expands to 20 different rows.
X
Xiaoyu Wang 已提交
1572 1573
        SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, slotId);
        char*            in = GET_ROWCELL_INTERBUF(pCtx[j].resultInfo);
1574
        if (pCtx[j].increase) {
L
Liu Jicong 已提交
1575
          int64_t ts = *(int64_t*)in;
1576
          for (int32_t k = 0; k < pRow->numOfRows; ++k) {
L
Liu Jicong 已提交
1577
            colDataAppend(pColInfoData, pBlock->info.rows + k, (const char*)&ts, pCtx[j].resultInfo->isNullRes);
1578 1579 1580 1581 1582 1583
            ts++;
          }
        } else {
          for (int32_t k = 0; k < pRow->numOfRows; ++k) {
            colDataAppend(pColInfoData, pBlock->info.rows + k, in, pCtx[j].resultInfo->isNullRes);
          }
X
Xiaoyu Wang 已提交
1584
        }
1585
      }
1586 1587
    }

1588
    releaseBufPage(pBuf, page);
1589
    pBlock->info.rows += pRow->numOfRows;
L
Liu Jicong 已提交
1590 1591 1592
    //    if (pBlock->info.rows >= pBlock->info.capacity) {  // output buffer is full
    //      break;
    //    }
1593 1594
  }

X
Xiaoyu Wang 已提交
1595 1596
  qDebug("%s result generated, rows:%d, groupId:%" PRIu64, GET_TASKID(pTaskInfo), pBlock->info.rows,
         pBlock->info.groupId);
1597
  blockDataUpdateTsWindow(pBlock, 0);
1598 1599 1600
  return 0;
}

X
Xiaoyu Wang 已提交
1601 1602
void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo,
                            SDiskbasedBuf* pBuf) {
1603 1604
  SExprInfo*     pExprInfo = pOperator->exprSupp.pExprInfo;
  int32_t        numOfExprs = pOperator->exprSupp.numOfExprs;
1605 1606
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;

1607
  int32_t*        rowCellOffset = pOperator->exprSupp.rowEntryInfoOffset;
X
Xiaoyu Wang 已提交
1608
  SSDataBlock*    pBlock = pbInfo->pRes;
1609
  SqlFunctionCtx* pCtx = pOperator->exprSupp.pCtx;
1610

1611
  blockDataCleanup(pBlock);
1612
  if (!hasDataInGroupInfo(pGroupResInfo)) {
1613 1614 1615
    return;
  }

1616 1617
  // clear the existed group id
  pBlock->info.groupId = 0;
1618
  doCopyToSDataBlock(pTaskInfo, pBlock, pExprInfo, pBuf, pGroupResInfo, rowCellOffset, pCtx, numOfExprs);
1619 1620
}

L
Liu Jicong 已提交
1621
static void updateNumOfRowsInResultRows(SqlFunctionCtx* pCtx, int32_t numOfOutput, SResultRowInfo* pResultRowInfo,
1622
                                        int32_t* rowEntryInfoOffset) {
1623
  // update the number of result for each, only update the number of rows for the corresponding window result.
L
Liu Jicong 已提交
1624 1625 1626
  //  if (QUERY_IS_INTERVAL_QUERY(pQueryAttr)) {
  //    return;
  //  }
H
Haojun Liao 已提交
1627
#if 0
1628
  for (int32_t i = 0; i < pResultRowInfo->size; ++i) {
L
Liu Jicong 已提交
1629
    SResultRow* pResult = pResultRowInfo->pResult[i];
1630 1631 1632 1633 1634 1635 1636

    for (int32_t j = 0; j < numOfOutput; ++j) {
      int32_t functionId = pCtx[j].functionId;
      if (functionId == FUNCTION_TS || functionId == FUNCTION_TAG || functionId == FUNCTION_TAGPRJ) {
        continue;
      }

1637
      SResultRowEntryInfo* pCell = getResultEntryInfo(pResult, j, rowEntryInfoOffset);
1638
      pResult->numOfRows = (uint16_t)(TMAX(pResult->numOfRows, pCell->numOfRes));
1639 1640
    }
  }
H
Haojun Liao 已提交
1641
#endif
1642 1643
}

L
Liu Jicong 已提交
1644
static int32_t compressQueryColData(SColumnInfoData* pColRes, int32_t numOfRows, char* data, int8_t compressed) {
1645 1646
  int32_t colSize = pColRes->info.bytes * numOfRows;
  return (*(tDataTypes[pColRes->info.type].compFunc))(pColRes->pData, colSize, numOfRows, data,
L
Liu Jicong 已提交
1647
                                                      colSize + COMP_OVERFLOW_BYTES, compressed, NULL, 0);
1648 1649
}

1650 1651 1652
int32_t doFillTimeIntervalGapsInResults(struct SFillInfo* pFillInfo, SSDataBlock* pBlock, int32_t capacity) {
  int32_t numOfRows = (int32_t)taosFillResultDataBlock(pFillInfo, pBlock, capacity - pBlock->info.rows);
  return pBlock->info.rows;
1653 1654
}

L
Liu Jicong 已提交
1655 1656
void queryCostStatis(SExecTaskInfo* pTaskInfo) {
  STaskCostInfo* pSummary = &pTaskInfo->cost;
1657

L
Liu Jicong 已提交
1658 1659 1660
  //  uint64_t hashSize = taosHashGetMemSize(pQInfo->runtimeEnv.pResultRowHashTable);
  //  hashSize += taosHashGetMemSize(pRuntimeEnv->tableqinfoGroupInfo.map);
  //  pSummary->hashSize = hashSize;
1661 1662 1663 1664

  // add the merge time
  pSummary->elapsedTime += pSummary->firstStageMergeTime;

L
Liu Jicong 已提交
1665 1666 1667 1668 1669 1670 1671 1672 1673 1674 1675
  //  SResultRowPool* p = pTaskInfo->pool;
  //  if (p != NULL) {
  //    pSummary->winInfoSize = getResultRowPoolMemSize(p);
  //    pSummary->numOfTimeWindows = getNumOfAllocatedResultRows(p);
  //  } else {
  //    pSummary->winInfoSize = 0;
  //    pSummary->numOfTimeWindows = 0;
  //  }
  //
  //  calculateOperatorProfResults(pQInfo);

1676 1677
  SFileBlockLoadRecorder* pRecorder = pSummary->pRecoder;
  if (pSummary->pRecoder != NULL) {
X
Xiaoyu Wang 已提交
1678 1679
    qDebug("%s :cost summary: elapsed time:%" PRId64 " us, first merge:%" PRId64
           " us, total blocks:%d, "
1680 1681 1682 1683
           "load block statis:%d, load data block:%d, total rows:%" PRId64 ", check rows:%" PRId64,
           GET_TASKID(pTaskInfo), pSummary->elapsedTime, pSummary->firstStageMergeTime, pRecorder->totalBlocks,
           pRecorder->loadBlockStatis, pRecorder->loadBlocks, pRecorder->totalRows, pRecorder->totalCheckedRows);
  }
L
Liu Jicong 已提交
1684 1685 1686
  // qDebug("QInfo:0x%"PRIx64" :cost summary: winResPool size:%.2f Kb, numOfWin:%"PRId64", tableInfoSize:%.2f Kb,
  // hashTable:%.2f Kb", pQInfo->qId, pSummary->winInfoSize/1024.0,
  //      pSummary->numOfTimeWindows, pSummary->tableInfoSize/1024.0, pSummary->hashSize/1024.0);
1687 1688
}

L
Liu Jicong 已提交
1689 1690 1691
// static void updateOffsetVal(STaskRuntimeEnv *pRuntimeEnv, SDataBlockInfo *pBlockInfo) {
//   STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
//   STableQueryInfo* pTableQueryInfo = pRuntimeEnv->current;
1692
//
L
Liu Jicong 已提交
1693
//   int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQueryAttr->order.order);
1694
//
L
Liu Jicong 已提交
1695 1696 1697 1698
//   if (pQueryAttr->limit.offset == pBlockInfo->rows) {  // current block will ignore completed
//     pTableQueryInfo->lastKey = QUERY_IS_ASC_QUERY(pQueryAttr) ? pBlockInfo->window.ekey + step :
//     pBlockInfo->window.skey + step; pQueryAttr->limit.offset = 0; return;
//   }
1699
//
L
Liu Jicong 已提交
1700 1701 1702 1703 1704
//   if (QUERY_IS_ASC_QUERY(pQueryAttr)) {
//     pQueryAttr->pos = (int32_t)pQueryAttr->limit.offset;
//   } else {
//     pQueryAttr->pos = pBlockInfo->rows - (int32_t)pQueryAttr->limit.offset - 1;
//   }
1705
//
L
Liu Jicong 已提交
1706
//   assert(pQueryAttr->pos >= 0 && pQueryAttr->pos <= pBlockInfo->rows - 1);
1707
//
L
Liu Jicong 已提交
1708 1709
//   SArray *         pDataBlock = tsdbRetrieveDataBlock(pRuntimeEnv->pTsdbReadHandle, NULL);
//   SColumnInfoData *pColInfoData = taosArrayGet(pDataBlock, 0);
1710
//
L
Liu Jicong 已提交
1711 1712
//   // update the pQueryAttr->limit.offset value, and pQueryAttr->pos value
//   TSKEY *keys = (TSKEY *) pColInfoData->pData;
1713
//
L
Liu Jicong 已提交
1714 1715 1716
//   // update the offset value
//   pTableQueryInfo->lastKey = keys[pQueryAttr->pos];
//   pQueryAttr->limit.offset = 0;
1717
//
L
Liu Jicong 已提交
1718
//   int32_t numOfRes = tableApplyFunctionsOnBlock(pRuntimeEnv, pBlockInfo, NULL, binarySearchForKey, pDataBlock);
1719
//
L
Liu Jicong 已提交
1720 1721 1722 1723
//   //qDebug("QInfo:0x%"PRIx64" check data block, brange:%" PRId64 "-%" PRId64 ", numBlocksOfStep:%d, numOfRes:%d,
//   lastKey:%"PRId64, GET_TASKID(pRuntimeEnv),
//          pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows, numOfRes, pQuery->current->lastKey);
// }
1724

L
Liu Jicong 已提交
1725 1726
// void skipBlocks(STaskRuntimeEnv *pRuntimeEnv) {
//   STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
1727
//
L
Liu Jicong 已提交
1728 1729 1730
//   if (pQueryAttr->limit.offset <= 0 || pQueryAttr->numOfFilterCols > 0) {
//     return;
//   }
1731
//
L
Liu Jicong 已提交
1732 1733
//   pQueryAttr->pos = 0;
//   int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQueryAttr->order.order);
1734
//
L
Liu Jicong 已提交
1735 1736
//   STableQueryInfo* pTableQueryInfo = pRuntimeEnv->current;
//   TsdbQueryHandleT pTsdbReadHandle = pRuntimeEnv->pTsdbReadHandle;
1737
//
L
Liu Jicong 已提交
1738 1739 1740 1741 1742
//   SDataBlockInfo blockInfo = SDATA_BLOCK_INITIALIZER;
//   while (tsdbNextDataBlock(pTsdbReadHandle)) {
//     if (isTaskKilled(pRuntimeEnv->qinfo)) {
//       longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED);
//     }
1743
//
L
Liu Jicong 已提交
1744
//     tsdbRetrieveDataBlockInfo(pTsdbReadHandle, &blockInfo);
1745
//
L
Liu Jicong 已提交
1746 1747 1748 1749
//     if (pQueryAttr->limit.offset > blockInfo.rows) {
//       pQueryAttr->limit.offset -= blockInfo.rows;
//       pTableQueryInfo->lastKey = (QUERY_IS_ASC_QUERY(pQueryAttr)) ? blockInfo.window.ekey : blockInfo.window.skey;
//       pTableQueryInfo->lastKey += step;
1750
//
L
Liu Jicong 已提交
1751 1752 1753 1754 1755 1756 1757
//       //qDebug("QInfo:0x%"PRIx64" skip rows:%d, offset:%" PRId64, GET_TASKID(pRuntimeEnv), blockInfo.rows,
//              pQuery->limit.offset);
//     } else {  // find the appropriated start position in current block
//       updateOffsetVal(pRuntimeEnv, &blockInfo);
//       break;
//     }
//   }
1758
//
L
Liu Jicong 已提交
1759 1760 1761 1762 1763 1764 1765 1766 1767
//   if (terrno != TSDB_CODE_SUCCESS) {
//     longjmp(pRuntimeEnv->env, terrno);
//   }
// }

// static TSKEY doSkipIntervalProcess(STaskRuntimeEnv* pRuntimeEnv, STimeWindow* win, SDataBlockInfo* pBlockInfo,
// STableQueryInfo* pTableQueryInfo) {
//   STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
//   SResultRowInfo *pWindowResInfo = &pRuntimeEnv->resultRowInfo;
1768
//
L
Liu Jicong 已提交
1769 1770 1771
//   assert(pQueryAttr->limit.offset == 0);
//   STimeWindow tw = *win;
//   getNextTimeWindow(pQueryAttr, &tw);
1772
//
L
Liu Jicong 已提交
1773 1774
//   if ((tw.skey <= pBlockInfo->window.ekey && QUERY_IS_ASC_QUERY(pQueryAttr)) ||
//       (tw.ekey >= pBlockInfo->window.skey && !QUERY_IS_ASC_QUERY(pQueryAttr))) {
1775
//
L
Liu Jicong 已提交
1776 1777 1778 1779
//     // load the data block and check data remaining in current data block
//     // TODO optimize performance
//     SArray *         pDataBlock = tsdbRetrieveDataBlock(pRuntimeEnv->pTsdbReadHandle, NULL);
//     SColumnInfoData *pColInfoData = taosArrayGet(pDataBlock, 0);
1780
//
L
Liu Jicong 已提交
1781 1782 1783 1784
//     tw = *win;
//     int32_t startPos =
//         getNextQualifiedWindow(pQueryAttr, &tw, pBlockInfo, pColInfoData->pData, binarySearchForKey, -1);
//     assert(startPos >= 0);
1785
//
L
Liu Jicong 已提交
1786 1787
//     // set the abort info
//     pQueryAttr->pos = startPos;
1788
//
L
Liu Jicong 已提交
1789 1790 1791 1792
//     // reset the query start timestamp
//     pTableQueryInfo->win.skey = ((TSKEY *)pColInfoData->pData)[startPos];
//     pQueryAttr->window.skey = pTableQueryInfo->win.skey;
//     TSKEY key = pTableQueryInfo->win.skey;
1793
//
L
Liu Jicong 已提交
1794 1795
//     pWindowResInfo->prevSKey = tw.skey;
//     int32_t index = pRuntimeEnv->resultRowInfo.curIndex;
1796
//
L
Liu Jicong 已提交
1797 1798
//     int32_t numOfRes = tableApplyFunctionsOnBlock(pRuntimeEnv, pBlockInfo, NULL, binarySearchForKey, pDataBlock);
//     pRuntimeEnv->resultRowInfo.curIndex = index;  // restore the window index
1799
//
L
Liu Jicong 已提交
1800 1801 1802 1803
//     //qDebug("QInfo:0x%"PRIx64" check data block, brange:%" PRId64 "-%" PRId64 ", numOfRows:%d, numOfRes:%d,
//     lastKey:%" PRId64,
//            GET_TASKID(pRuntimeEnv), pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows, numOfRes,
//            pQueryAttr->current->lastKey);
1804
//
L
Liu Jicong 已提交
1805 1806 1807 1808 1809
//     return key;
//   } else {  // do nothing
//     pQueryAttr->window.skey      = tw.skey;
//     pWindowResInfo->prevSKey = tw.skey;
//     pTableQueryInfo->lastKey = tw.skey;
1810
//
L
Liu Jicong 已提交
1811 1812
//     return tw.skey;
//   }
1813
//
L
Liu Jicong 已提交
1814 1815 1816 1817 1818 1819 1820 1821 1822 1823
//   return true;
// }

// static bool skipTimeInterval(STaskRuntimeEnv *pRuntimeEnv, TSKEY* start) {
//   STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
//   if (QUERY_IS_ASC_QUERY(pQueryAttr)) {
//     assert(*start <= pRuntimeEnv->current->lastKey);
//   } else {
//     assert(*start >= pRuntimeEnv->current->lastKey);
//   }
1824
//
L
Liu Jicong 已提交
1825 1826 1827 1828 1829
//   // if queried with value filter, do NOT forward query start position
//   if (pQueryAttr->limit.offset <= 0 || pQueryAttr->numOfFilterCols > 0 || pRuntimeEnv->pTsBuf != NULL ||
//   pRuntimeEnv->pFillInfo != NULL) {
//     return true;
//   }
1830
//
L
Liu Jicong 已提交
1831 1832 1833 1834 1835 1836 1837
//   /*
//    * 1. for interval without interpolation query we forward pQueryAttr->interval.interval at a time for
//    *    pQueryAttr->limit.offset times. Since hole exists, pQueryAttr->interval.interval*pQueryAttr->limit.offset
//    value is
//    *    not valid. otherwise, we only forward pQueryAttr->limit.offset number of points
//    */
//   assert(pRuntimeEnv->resultRowInfo.prevSKey == TSKEY_INITIAL_VAL);
1838
//
L
Liu Jicong 已提交
1839 1840
//   STimeWindow w = TSWINDOW_INITIALIZER;
//   bool ascQuery = QUERY_IS_ASC_QUERY(pQueryAttr);
1841
//
L
Liu Jicong 已提交
1842 1843
//   SResultRowInfo *pWindowResInfo = &pRuntimeEnv->resultRowInfo;
//   STableQueryInfo *pTableQueryInfo = pRuntimeEnv->current;
1844
//
L
Liu Jicong 已提交
1845 1846 1847
//   SDataBlockInfo blockInfo = SDATA_BLOCK_INITIALIZER;
//   while (tsdbNextDataBlock(pRuntimeEnv->pTsdbReadHandle)) {
//     tsdbRetrieveDataBlockInfo(pRuntimeEnv->pTsdbReadHandle, &blockInfo);
1848
//
L
Liu Jicong 已提交
1849 1850 1851 1852 1853 1854 1855 1856 1857
//     if (QUERY_IS_ASC_QUERY(pQueryAttr)) {
//       if (pWindowResInfo->prevSKey == TSKEY_INITIAL_VAL) {
//         getAlignQueryTimeWindow(pQueryAttr, blockInfo.window.skey, blockInfo.window.skey, pQueryAttr->window.ekey,
//         &w); pWindowResInfo->prevSKey = w.skey;
//       }
//     } else {
//       getAlignQueryTimeWindow(pQueryAttr, blockInfo.window.ekey, pQueryAttr->window.ekey, blockInfo.window.ekey, &w);
//       pWindowResInfo->prevSKey = w.skey;
//     }
1858
//
L
Liu Jicong 已提交
1859 1860
//     // the first time window
//     STimeWindow win = getActiveTimeWindow(pWindowResInfo, pWindowResInfo->prevSKey, pQueryAttr);
1861
//
L
Liu Jicong 已提交
1862 1863
//     while (pQueryAttr->limit.offset > 0) {
//       STimeWindow tw = win;
1864
//
L
Liu Jicong 已提交
1865 1866 1867
//       if ((win.ekey <= blockInfo.window.ekey && ascQuery) || (win.ekey >= blockInfo.window.skey && !ascQuery)) {
//         pQueryAttr->limit.offset -= 1;
//         pWindowResInfo->prevSKey = win.skey;
1868
//
L
Liu Jicong 已提交
1869 1870 1871 1872 1873 1874
//         // current time window is aligned with blockInfo.window.ekey
//         // restart it from next data block by set prevSKey to be TSKEY_INITIAL_VAL;
//         if ((win.ekey == blockInfo.window.ekey && ascQuery) || (win.ekey == blockInfo.window.skey && !ascQuery)) {
//           pWindowResInfo->prevSKey = TSKEY_INITIAL_VAL;
//         }
//       }
1875
//
L
Liu Jicong 已提交
1876 1877 1878 1879
//       if (pQueryAttr->limit.offset == 0) {
//         *start = doSkipIntervalProcess(pRuntimeEnv, &win, &blockInfo, pTableQueryInfo);
//         return true;
//       }
1880
//
L
Liu Jicong 已提交
1881 1882
//       // current window does not ended in current data block, try next data block
//       getNextTimeWindow(pQueryAttr, &tw);
1883
//
L
Liu Jicong 已提交
1884 1885 1886 1887 1888 1889 1890 1891 1892
//       /*
//        * If the next time window still starts from current data block,
//        * load the primary timestamp column first, and then find the start position for the next queried time window.
//        * Note that only the primary timestamp column is required.
//        * TODO: Optimize for this cases. All data blocks are not needed to be loaded, only if the first actually
//        required
//        * time window resides in current data block.
//        */
//       if ((tw.skey <= blockInfo.window.ekey && ascQuery) || (tw.ekey >= blockInfo.window.skey && !ascQuery)) {
1893
//
L
Liu Jicong 已提交
1894 1895
//         SArray *pDataBlock = tsdbRetrieveDataBlock(pRuntimeEnv->pTsdbReadHandle, NULL);
//         SColumnInfoData *pColInfoData = taosArrayGet(pDataBlock, 0);
1896
//
L
Liu Jicong 已提交
1897 1898 1899
//         if ((win.ekey > blockInfo.window.ekey && ascQuery) || (win.ekey < blockInfo.window.skey && !ascQuery)) {
//           pQueryAttr->limit.offset -= 1;
//         }
1900
//
L
Liu Jicong 已提交
1901 1902 1903 1904 1905 1906 1907 1908
//         if (pQueryAttr->limit.offset == 0) {
//           *start = doSkipIntervalProcess(pRuntimeEnv, &win, &blockInfo, pTableQueryInfo);
//           return true;
//         } else {
//           tw = win;
//           int32_t startPos =
//               getNextQualifiedWindow(pQueryAttr, &tw, &blockInfo, pColInfoData->pData, binarySearchForKey, -1);
//           assert(startPos >= 0);
1909
//
L
Liu Jicong 已提交
1910 1911 1912 1913 1914 1915 1916 1917 1918 1919 1920
//           // set the abort info
//           pQueryAttr->pos = startPos;
//           pTableQueryInfo->lastKey = ((TSKEY *)pColInfoData->pData)[startPos];
//           pWindowResInfo->prevSKey = tw.skey;
//           win = tw;
//         }
//       } else {
//         break;  // offset is not 0, and next time window begins or ends in the next block.
//       }
//     }
//   }
1921
//
L
Liu Jicong 已提交
1922 1923 1924 1925
//   // check for error
//   if (terrno != TSDB_CODE_SUCCESS) {
//     longjmp(pRuntimeEnv->env, terrno);
//   }
1926
//
L
Liu Jicong 已提交
1927 1928
//   return true;
// }
1929

1930
int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t num) {
H
Haojun Liao 已提交
1931
  if (p->pDownstream == NULL) {
H
Haojun Liao 已提交
1932
    assert(p->numOfDownstream == 0);
1933 1934
  }

wafwerar's avatar
wafwerar 已提交
1935
  p->pDownstream = taosMemoryCalloc(1, num * POINTER_BYTES);
1936 1937 1938 1939 1940 1941 1942
  if (p->pDownstream == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  memcpy(p->pDownstream, pDownstream, num * POINTER_BYTES);
  p->numOfDownstream = num;
  return TSDB_CODE_SUCCESS;
1943 1944
}

wmmhello's avatar
wmmhello 已提交
1945
static void doDestroyTableList(STableListInfo* pTableqinfoList);
1946

1947
static void doTableQueryInfoTimeWindowCheck(SExecTaskInfo* pTaskInfo, STableQueryInfo* pTableQueryInfo, int32_t order) {
H
Haojun Liao 已提交
1948 1949
#if 0
    if (order == TSDB_ORDER_ASC) {
1950 1951
    assert(
        (pTableQueryInfo->win.skey <= pTableQueryInfo->win.ekey) &&
H
Haojun Liao 已提交
1952 1953
        (pTableQueryInfo->lastKey >= pTaskInfo->window.skey) &&
        (pTableQueryInfo->win.skey >= pTaskInfo->window.skey && pTableQueryInfo->win.ekey <= pTaskInfo->window.ekey));
1954 1955 1956
  } else {
    assert(
        (pTableQueryInfo->win.skey >= pTableQueryInfo->win.ekey) &&
H
Haojun Liao 已提交
1957 1958
        (pTableQueryInfo->lastKey <= pTaskInfo->window.skey) &&
        (pTableQueryInfo->win.skey <= pTaskInfo->window.skey && pTableQueryInfo->win.ekey >= pTaskInfo->window.ekey));
1959
  }
H
Haojun Liao 已提交
1960
#endif
1961 1962
}

1963 1964 1965 1966
typedef struct SFetchRspHandleWrapper {
  uint32_t exchangeId;
  int32_t  sourceIndex;
} SFetchRspHandleWrapper;
1967

D
dapan1121 已提交
1968
int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code) {
X
Xiaoyu Wang 已提交
1969
  SFetchRspHandleWrapper* pWrapper = (SFetchRspHandleWrapper*)param;
1970 1971 1972 1973 1974 1975 1976

  SExchangeInfo* pExchangeInfo = taosAcquireRef(exchangeObjRefPool, pWrapper->exchangeId);
  if (pExchangeInfo == NULL) {
    qWarn("failed to acquire exchange operator, since it may have been released");
    return TSDB_CODE_SUCCESS;
  }

X
Xiaoyu Wang 已提交
1977
  int32_t          index = pWrapper->sourceIndex;
1978
  SSourceDataInfo* pSourceDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, index);
1979

H
Haojun Liao 已提交
1980 1981
  if (code == TSDB_CODE_SUCCESS) {
    pSourceDataInfo->pRsp = pMsg->pData;
1982

H
Haojun Liao 已提交
1983 1984
    SRetrieveTableRsp* pRsp = pSourceDataInfo->pRsp;
    pRsp->numOfRows = htonl(pRsp->numOfRows);
dengyihao's avatar
dengyihao 已提交
1985
    pRsp->compLen = htonl(pRsp->compLen);
1986
    pRsp->numOfCols = htonl(pRsp->numOfCols);
dengyihao's avatar
dengyihao 已提交
1987
    pRsp->useconds = htobe64(pRsp->useconds);
1988

1989
    ASSERT(pRsp != NULL);
1990
    qDebug("%s fetch rsp received, index:%d, rows:%d", pSourceDataInfo->taskId, index, pRsp->numOfRows);
H
Haojun Liao 已提交
1991 1992
  } else {
    pSourceDataInfo->code = code;
D
dapan1121 已提交
1993
    qDebug("%s fetch rsp received, index:%d, error:%d", pSourceDataInfo->taskId, index, tstrerror(code));
H
Haojun Liao 已提交
1994
  }
H
Haojun Liao 已提交
1995

H
Haojun Liao 已提交
1996
  pSourceDataInfo->status = EX_SOURCE_DATA_READY;
1997 1998 1999 2000

  tsem_post(&pExchangeInfo->ready);
  taosReleaseRef(exchangeObjRefPool, pWrapper->exchangeId);

wmmhello's avatar
wmmhello 已提交
2001
  return TSDB_CODE_SUCCESS;
H
Haojun Liao 已提交
2002 2003
}

D
dapan1121 已提交
2004
void qProcessRspMsg(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
S
Shengliang Guan 已提交
2005 2006
  SMsgSendInfo* pSendInfo = (SMsgSendInfo*)pMsg->info.ahandle;
  assert(pMsg->info.ahandle != NULL);
H
Haojun Liao 已提交
2007 2008 2009 2010

  SDataBuf buf = {.len = pMsg->contLen, .pData = NULL};

  if (pMsg->contLen > 0) {
wafwerar's avatar
wafwerar 已提交
2011
    buf.pData = taosMemoryCalloc(1, pMsg->contLen);
H
Haojun Liao 已提交
2012 2013 2014 2015 2016 2017 2018 2019 2020 2021 2022
    if (buf.pData == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      pMsg->code = TSDB_CODE_OUT_OF_MEMORY;
    } else {
      memcpy(buf.pData, pMsg->pCont, pMsg->contLen);
    }
  }

  pSendInfo->fp(pSendInfo->param, &buf, pMsg->code);
  rpcFreeCont(pMsg->pCont);
  destroySendMsgInfo(pSendInfo);
2023 2024
}

L
Liu Jicong 已提交
2025
static int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTaskInfo, int32_t sourceIndex) {
2026
  size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources);
2027

wafwerar's avatar
wafwerar 已提交
2028
  SResFetchReq* pMsg = taosMemoryCalloc(1, sizeof(SResFetchReq));
2029 2030 2031 2032
  if (NULL == pMsg) {
    pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
    return pTaskInfo->code;
  }
2033

L
Liu Jicong 已提交
2034 2035
  SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, sourceIndex);
  SSourceDataInfo*       pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, sourceIndex);
2036

2037 2038
  ASSERT(pDataInfo->status == EX_SOURCE_DATA_NOT_READY);

2039 2040 2041
  qDebug("%s build fetch msg and send to vgId:%d, ep:%s, taskId:0x%" PRIx64 ", execId:%d, %d/%" PRIzu,
         GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->addr.epSet.eps[0].fqdn, pSource->taskId, pSource->execId,
         sourceIndex, totalSources);
2042 2043 2044 2045 2046

  pMsg->header.vgId = htonl(pSource->addr.nodeId);
  pMsg->sId = htobe64(pSource->schedId);
  pMsg->taskId = htobe64(pSource->taskId);
  pMsg->queryId = htobe64(pTaskInfo->id.queryId);
D
dapan1121 已提交
2047
  pMsg->execId = htonl(pSource->execId);
2048 2049

  // send the fetch remote task result reques
wafwerar's avatar
wafwerar 已提交
2050
  SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
2051
  if (NULL == pMsgSendInfo) {
wafwerar's avatar
wafwerar 已提交
2052
    taosMemoryFreeClear(pMsg);
2053 2054 2055
    qError("%s prepare message %d failed", GET_TASKID(pTaskInfo), (int32_t)sizeof(SMsgSendInfo));
    pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
    return pTaskInfo->code;
H
Haojun Liao 已提交
2056 2057
  }

2058
  SFetchRspHandleWrapper* pWrapper = taosMemoryCalloc(1, sizeof(SFetchRspHandleWrapper));
X
Xiaoyu Wang 已提交
2059
  pWrapper->exchangeId = pExchangeInfo->self;
2060 2061 2062
  pWrapper->sourceIndex = sourceIndex;

  pMsgSendInfo->param = pWrapper;
D
dapan1121 已提交
2063
  pMsgSendInfo->paramFreeFp = taosMemoryFree;
2064 2065
  pMsgSendInfo->msgInfo.pData = pMsg;
  pMsgSendInfo->msgInfo.len = sizeof(SResFetchReq);
L
Liu Jicong 已提交
2066
  pMsgSendInfo->msgType = pSource->fetchMsgType;
2067
  pMsgSendInfo->fp = loadRemoteDataCallback;
2068

2069
  int64_t transporterId = 0;
L
Liu Jicong 已提交
2070
  int32_t code = asyncSendMsgToServer(pExchangeInfo->pTransporter, &pSource->addr.epSet, &transporterId, pMsgSendInfo);
2071 2072 2073
  return TSDB_CODE_SUCCESS;
}

2074
int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadInfo, int32_t numOfRows, char* pData,
L
Liu Jicong 已提交
2075 2076
                                     int32_t compLen, int32_t numOfOutput, int64_t startTs, uint64_t* total,
                                     SArray* pColList) {
H
Haojun Liao 已提交
2077
  if (pColList == NULL) {  // data from other sources
2078
    blockDataCleanup(pRes);
2079
    //    blockDataEnsureCapacity(pRes, numOfRows);
2080
    blockDecode(pRes, numOfOutput, numOfRows, pData);
H
Haojun Liao 已提交
2081
  } else {  // extract data according to pColList
2082
    ASSERT(numOfOutput == taosArrayGetSize(pColList));
2083 2084 2085 2086 2087
    char* pStart = pData;

    int32_t numOfCols = htonl(*(int32_t*)pStart);
    pStart += sizeof(int32_t);

2088
    // todo refactor:extract method
2089
    SSysTableSchema* pSchema = (SSysTableSchema*)pStart;
dengyihao's avatar
dengyihao 已提交
2090
    for (int32_t i = 0; i < numOfCols; ++i) {
2091 2092 2093 2094 2095 2096 2097
      SSysTableSchema* p = (SSysTableSchema*)pStart;

      p->colId = htons(p->colId);
      p->bytes = htonl(p->bytes);
      pStart += sizeof(SSysTableSchema);
    }

2098
    SSDataBlock* pBlock = createDataBlock();
dengyihao's avatar
dengyihao 已提交
2099
    for (int32_t i = 0; i < numOfCols; ++i) {
2100 2101
      SColumnInfoData idata = createColumnInfoData(pSchema[i].type, pSchema[i].bytes, pSchema[i].colId);
      blockDataAppendColInfo(pBlock, &idata);
2102 2103
    }

2104
    blockDecode(pBlock, numOfCols, numOfRows, pStart);
2105 2106
    blockDataEnsureCapacity(pRes, numOfRows);

H
Haojun Liao 已提交
2107
    // data from mnode
2108
    pRes->info.rows = numOfRows;
2109 2110
    relocateColumnData(pRes, pColList, pBlock->pDataBlock, false);
    blockDataDestroy(pBlock);
2111
  }
2112

2113 2114
  // todo move this to time window aggregator, since the primary timestamp may not be known by exchange operator.
  blockDataUpdateTsWindow(pRes, 0);
2115

2116
  int64_t el = taosGetTimestampUs() - startTs;
2117

H
Haojun Liao 已提交
2118 2119
  pLoadInfo->totalRows += numOfRows;
  pLoadInfo->totalSize += compLen;
2120

H
Haojun Liao 已提交
2121 2122 2123
  if (total != NULL) {
    *total += numOfRows;
  }
2124

H
Haojun Liao 已提交
2125
  pLoadInfo->totalElapsed += el;
2126 2127
  return TSDB_CODE_SUCCESS;
}
2128

L
Liu Jicong 已提交
2129 2130
static void* setAllSourcesCompleted(SOperatorInfo* pOperator, int64_t startTs) {
  SExchangeInfo* pExchangeInfo = pOperator->info;
2131
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
H
Haojun Liao 已提交
2132

2133
  int64_t              el = taosGetTimestampUs() - startTs;
H
Haojun Liao 已提交
2134
  SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
2135

H
Haojun Liao 已提交
2136
  pLoadInfo->totalElapsed += el;
H
Haojun Liao 已提交
2137

2138
  size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources);
L
Liu Jicong 已提交
2139 2140 2141
  qDebug("%s all %" PRIzu " sources are exhausted, total rows: %" PRIu64 " bytes:%" PRIu64 ", elapsed:%.2f ms",
         GET_TASKID(pTaskInfo), totalSources, pLoadInfo->totalRows, pLoadInfo->totalSize,
         pLoadInfo->totalElapsed / 1000.0);
2142 2143 2144 2145 2146

  doSetOperatorCompleted(pOperator);
  return NULL;
}

L
Liu Jicong 已提交
2147 2148
static SSDataBlock* concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeInfo* pExchangeInfo,
                                                   SExecTaskInfo* pTaskInfo) {
2149 2150 2151 2152 2153 2154 2155 2156
  int32_t code = 0;
  int64_t startTs = taosGetTimestampUs();
  size_t  totalSources = taosArrayGetSize(pExchangeInfo->pSources);

  while (1) {
    int32_t completed = 0;
    for (int32_t i = 0; i < totalSources; ++i) {
      SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, i);
2157
      if (pDataInfo->status == EX_SOURCE_DATA_EXHAUSTED) {
2158
        completed += 1;
H
Haojun Liao 已提交
2159 2160
        continue;
      }
2161

2162
      if (pDataInfo->status != EX_SOURCE_DATA_READY) {
2163 2164 2165
        continue;
      }

2166 2167 2168 2169 2170
      if (pDataInfo->code != TSDB_CODE_SUCCESS) {
        code = pDataInfo->code;
        goto _error;
      }

L
Liu Jicong 已提交
2171
      SRetrieveTableRsp*     pRsp = pDataInfo->pRsp;
X
Xiaoyu Wang 已提交
2172
      SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, i);
2173

L
Liu Jicong 已提交
2174
      SSDataBlock*         pRes = pExchangeInfo->pResult;
H
Haojun Liao 已提交
2175
      SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
2176
      if (pRsp->numOfRows == 0) {
2177 2178
        qDebug("%s vgId:%d, taskId:0x%" PRIx64 " execId:%d index:%d completed, rowsOfSource:%" PRIu64
               ", totalRows:%" PRIu64 ", completed:%d try next %d/%" PRIzu,
D
dapan1121 已提交
2179
               GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, i, pDataInfo->totalRows,
2180
               pExchangeInfo->loadInfo.totalRows, completed + 1, i + 1, totalSources);
2181
        pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
2182
        completed += 1;
D
dapan1121 已提交
2183
        taosMemoryFreeClear(pDataInfo->pRsp);
2184 2185
        continue;
      }
H
Haojun Liao 已提交
2186

H
Haojun Liao 已提交
2187
      SRetrieveTableRsp* pTableRsp = pDataInfo->pRsp;
L
Liu Jicong 已提交
2188 2189 2190
      code =
          extractDataBlockFromFetchRsp(pExchangeInfo->pResult, pLoadInfo, pTableRsp->numOfRows, pTableRsp->data,
                                       pTableRsp->compLen, pTableRsp->numOfCols, startTs, &pDataInfo->totalRows, NULL);
2191
      if (code != 0) {
2192
        taosMemoryFreeClear(pDataInfo->pRsp);
2193 2194 2195
        goto _error;
      }

2196
      if (pRsp->completed == 1) {
2197 2198
        qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64
               " execId:%d"
X
Xiaoyu Wang 已提交
2199 2200
               " index:%d completed, numOfRows:%d, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 ", totalBytes:%" PRIu64
               ", completed:%d try next %d/%" PRIzu,
2201 2202
               GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, i, pRes->info.rows,
               pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize, completed + 1, i + 1, totalSources);
2203
        completed += 1;
2204
        pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
2205
      } else {
D
dapan1121 已提交
2206
        qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " execId:%d numOfRows:%d, totalRows:%" PRIu64
dengyihao's avatar
dengyihao 已提交
2207
               ", totalBytes:%" PRIu64,
2208 2209
               GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pRes->info.rows,
               pLoadInfo->totalRows, pLoadInfo->totalSize);
2210 2211
      }

2212 2213
      taosMemoryFreeClear(pDataInfo->pRsp);

2214 2215
      if (pDataInfo->status != EX_SOURCE_DATA_EXHAUSTED) {
        pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
2216 2217
        code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, i);
        if (code != TSDB_CODE_SUCCESS) {
2218
          taosMemoryFreeClear(pDataInfo->pRsp);
2219 2220 2221 2222 2223 2224 2225
          goto _error;
        }
      }

      return pExchangeInfo->pResult;
    }

2226
    if (completed == totalSources) {
2227 2228
      return setAllSourcesCompleted(pOperator, startTs);
    }
H
Haojun Liao 已提交
2229 2230

    sched_yield();
2231 2232 2233 2234 2235 2236 2237
  }

_error:
  pTaskInfo->code = code;
  return NULL;
}

L
Liu Jicong 已提交
2238 2239 2240
static int32_t prepareConcurrentlyLoad(SOperatorInfo* pOperator) {
  SExchangeInfo* pExchangeInfo = pOperator->info;
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
2241

L
Liu Jicong 已提交
2242
  size_t  totalSources = taosArrayGetSize(pExchangeInfo->pSources);
2243 2244 2245
  int64_t startTs = taosGetTimestampUs();

  // Asynchronously send all fetch requests to all sources.
L
Liu Jicong 已提交
2246
  for (int32_t i = 0; i < totalSources; ++i) {
2247 2248
    int32_t code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, i);
    if (code != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
2249 2250
      pTaskInfo->code = code;
      return code;
2251 2252 2253 2254
    }
  }

  int64_t endTs = taosGetTimestampUs();
2255
  qDebug("%s send all fetch requests to %" PRIzu " sources completed, elapsed:%.2fms", GET_TASKID(pTaskInfo),
X
Xiaoyu Wang 已提交
2256
         totalSources, (endTs - startTs) / 1000.0);
2257

2258
  pOperator->status = OP_RES_TO_RETURN;
H
Haojun Liao 已提交
2259
  pOperator->cost.openCost = taosGetTimestampUs() - startTs;
2260

2261
  tsem_wait(&pExchangeInfo->ready);
H
Haojun Liao 已提交
2262
  return TSDB_CODE_SUCCESS;
2263 2264
}

L
Liu Jicong 已提交
2265 2266 2267
static SSDataBlock* seqLoadRemoteData(SOperatorInfo* pOperator) {
  SExchangeInfo* pExchangeInfo = pOperator->info;
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
2268

L
Liu Jicong 已提交
2269
  size_t  totalSources = taosArrayGetSize(pExchangeInfo->pSources);
2270
  int64_t startTs = taosGetTimestampUs();
2271

L
Liu Jicong 已提交
2272
  while (1) {
2273 2274
    if (pExchangeInfo->current >= totalSources) {
      return setAllSourcesCompleted(pOperator, startTs);
2275
    }
2276

2277 2278 2279
    doSendFetchDataRequest(pExchangeInfo, pTaskInfo, pExchangeInfo->current);
    tsem_wait(&pExchangeInfo->ready);

dengyihao's avatar
dengyihao 已提交
2280
    SSourceDataInfo*       pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, pExchangeInfo->current);
X
Xiaoyu Wang 已提交
2281
    SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, pExchangeInfo->current);
2282

H
Haojun Liao 已提交
2283
    if (pDataInfo->code != TSDB_CODE_SUCCESS) {
2284 2285
      qError("%s vgId:%d, taskID:0x%" PRIx64 " execId:%d error happens, code:%s", GET_TASKID(pTaskInfo),
             pSource->addr.nodeId, pSource->taskId, pSource->execId, tstrerror(pDataInfo->code));
H
Haojun Liao 已提交
2286 2287 2288 2289
      pOperator->pTaskInfo->code = pDataInfo->code;
      return NULL;
    }

L
Liu Jicong 已提交
2290
    SRetrieveTableRsp*   pRsp = pDataInfo->pRsp;
H
Haojun Liao 已提交
2291
    SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
2292
    if (pRsp->numOfRows == 0) {
2293 2294
      qDebug("%s vgId:%d, taskID:0x%" PRIx64 " execId:%d %d of total completed, rowsOfSource:%" PRIu64
             ", totalRows:%" PRIu64 " try next",
D
dapan1121 已提交
2295
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pExchangeInfo->current + 1,
H
Haojun Liao 已提交
2296
             pDataInfo->totalRows, pLoadInfo->totalRows);
H
Haojun Liao 已提交
2297

2298
      pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
2299
      pExchangeInfo->current += 1;
D
dapan1121 已提交
2300
      taosMemoryFreeClear(pDataInfo->pRsp);
2301 2302
      continue;
    }
H
Haojun Liao 已提交
2303

L
Liu Jicong 已提交
2304
    SSDataBlock*       pRes = pExchangeInfo->pResult;
H
Haojun Liao 已提交
2305
    SRetrieveTableRsp* pTableRsp = pDataInfo->pRsp;
L
Liu Jicong 已提交
2306
    int32_t            code =
2307
        extractDataBlockFromFetchRsp(pExchangeInfo->pResult, pLoadInfo, pTableRsp->numOfRows, pTableRsp->data,
L
Liu Jicong 已提交
2308
                                     pTableRsp->compLen, pTableRsp->numOfCols, startTs, &pDataInfo->totalRows, NULL);
2309 2310

    if (pRsp->completed == 1) {
D
dapan1121 已提交
2311
      qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " execId:%d numOfRows:%d, rowsOfSource:%" PRIu64
L
Liu Jicong 已提交
2312
             ", totalRows:%" PRIu64 ", totalBytes:%" PRIu64 " try next %d/%" PRIzu,
2313 2314 2315
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pRes->info.rows,
             pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize, pExchangeInfo->current + 1,
             totalSources);
2316

2317
      pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
2318 2319
      pExchangeInfo->current += 1;
    } else {
D
dapan1121 已提交
2320
      qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " execId:%d numOfRows:%d, totalRows:%" PRIu64
L
Liu Jicong 已提交
2321
             ", totalBytes:%" PRIu64,
2322 2323
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pRes->info.rows,
             pLoadInfo->totalRows, pLoadInfo->totalSize);
2324 2325
    }

2326
    pOperator->resultInfo.totalRows += pRes->info.rows;
2327
    taosMemoryFreeClear(pDataInfo->pRsp);
2328 2329
    return pExchangeInfo->pResult;
  }
2330 2331
}

L
Liu Jicong 已提交
2332
static int32_t prepareLoadRemoteData(SOperatorInfo* pOperator) {
2333
  if (OPTR_IS_OPENED(pOperator)) {
H
Haojun Liao 已提交
2334 2335 2336
    return TSDB_CODE_SUCCESS;
  }

2337 2338
  int64_t st = taosGetTimestampUs();

L
Liu Jicong 已提交
2339
  SExchangeInfo* pExchangeInfo = pOperator->info;
2340
  if (!pExchangeInfo->seqLoadData) {
H
Haojun Liao 已提交
2341 2342 2343 2344 2345 2346
    int32_t code = prepareConcurrentlyLoad(pOperator);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }
  }

2347
  OPTR_SET_OPENED(pOperator);
2348
  pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
H
Haojun Liao 已提交
2349 2350 2351
  return TSDB_CODE_SUCCESS;
}

2352
static SSDataBlock* doLoadRemoteDataImpl(SOperatorInfo* pOperator) {
L
Liu Jicong 已提交
2353 2354
  SExchangeInfo* pExchangeInfo = pOperator->info;
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
2355

2356
  pTaskInfo->code = pOperator->fpSet._openFn(pOperator);
2357
  if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
2358 2359
    return NULL;
  }
2360

L
Liu Jicong 已提交
2361
  size_t               totalSources = taosArrayGetSize(pExchangeInfo->pSources);
H
Haojun Liao 已提交
2362
  SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
H
Haojun Liao 已提交
2363

2364
  if (pOperator->status == OP_EXEC_DONE) {
L
Liu Jicong 已提交
2365 2366 2367
    qDebug("%s all %" PRIzu " source(s) are exhausted, total rows:%" PRIu64 " bytes:%" PRIu64 ", elapsed:%.2f ms",
           GET_TASKID(pTaskInfo), totalSources, pLoadInfo->totalRows, pLoadInfo->totalSize,
           pLoadInfo->totalElapsed / 1000.0);
2368 2369 2370 2371 2372 2373
    return NULL;
  }

  if (pExchangeInfo->seqLoadData) {
    return seqLoadRemoteData(pOperator);
  } else {
2374
    return concurrentlyLoadRemoteDataImpl(pOperator, pExchangeInfo, pTaskInfo);
2375
  }
H
Haojun Liao 已提交
2376
}
2377

2378 2379 2380 2381 2382 2383 2384 2385 2386 2387 2388 2389 2390 2391 2392 2393 2394 2395 2396 2397 2398 2399 2400 2401 2402 2403 2404 2405 2406 2407 2408 2409 2410 2411 2412 2413 2414 2415
static SSDataBlock* doLoadRemoteData(SOperatorInfo* pOperator) {
  SExchangeInfo* pExchangeInfo = pOperator->info;
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;

  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  while(1) {
    SSDataBlock* pBlock = doLoadRemoteDataImpl(pOperator);
    if (pBlock == NULL) {
      return NULL;
    }

    ASSERT(pBlock == pExchangeInfo->pResult);

    SLimitInfo* pLimitInfo = &pExchangeInfo->limitInfo;
    if (hasLimitOffsetInfo(pLimitInfo)) {
      int32_t status = handleLimitOffset(pOperator, pLimitInfo, pExchangeInfo->pResult, false);
      if (status == PROJECT_RETRIEVE_CONTINUE) {
        continue;
      } else if (status == PROJECT_RETRIEVE_DONE) {
        size_t rows = pExchangeInfo->pResult->info.rows;
        pExchangeInfo->limitInfo.numOfOutputRows += rows;

        if (rows == 0) {
          doSetOperatorCompleted(pOperator);
          return NULL;
        } else {
          return pExchangeInfo->pResult;
        }
      }
    } else {
      return pExchangeInfo->pResult;
    }
  }
}

2416
static int32_t initDataSource(int32_t numOfSources, SExchangeInfo* pInfo, const char* id) {
2417
  pInfo->pSourceDataInfo = taosArrayInit(numOfSources, sizeof(SSourceDataInfo));
H
Haojun Liao 已提交
2418 2419
  if (pInfo->pSourceDataInfo == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
2420 2421
  }

L
Liu Jicong 已提交
2422
  for (int32_t i = 0; i < numOfSources; ++i) {
2423
    SSourceDataInfo dataInfo = {0};
H
Haojun Liao 已提交
2424
    dataInfo.status = EX_SOURCE_DATA_NOT_READY;
2425
    dataInfo.taskId = id;
L
Liu Jicong 已提交
2426
    dataInfo.index = i;
X
Xiaoyu Wang 已提交
2427
    SSourceDataInfo* pDs = taosArrayPush(pInfo->pSourceDataInfo, &dataInfo);
2428
    if (pDs == NULL) {
H
Haojun Liao 已提交
2429 2430 2431 2432 2433 2434 2435 2436
      taosArrayDestroy(pInfo->pSourceDataInfo);
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  return TSDB_CODE_SUCCESS;
}

2437
static int32_t initExchangeOperator(SExchangePhysiNode* pExNode, SExchangeInfo* pInfo, const char* id) {
2438
  size_t numOfSources = LIST_LENGTH(pExNode->pSrcEndPoints);
H
Haojun Liao 已提交
2439

2440
  if (numOfSources == 0) {
X
Xiaoyu Wang 已提交
2441
    qError("%s invalid number: %d of sources in exchange operator", id, (int32_t)numOfSources);
2442 2443 2444
    return TSDB_CODE_INVALID_PARA;
  }

H
Haojun Liao 已提交
2445
  pInfo->pSources = taosArrayInit(numOfSources, sizeof(SDownstreamSourceNode));
wmmhello's avatar
wmmhello 已提交
2446
  if (pInfo->pSources == NULL) {
2447
    return TSDB_CODE_OUT_OF_MEMORY;
H
Haojun Liao 已提交
2448 2449
  }

L
Liu Jicong 已提交
2450
  for (int32_t i = 0; i < numOfSources; ++i) {
D
dapan1121 已提交
2451
    SDownstreamSourceNode* pNode = (SDownstreamSourceNode*)nodesListGetNode((SNodeList*)pExNode->pSrcEndPoints, i);
H
Haojun Liao 已提交
2452 2453
    taosArrayPush(pInfo->pSources, pNode);
  }
2454

2455
  initLimitInfo(pExNode->node.pLimit, pExNode->node.pSlimit, &pInfo->limitInfo);
2456 2457
  pInfo->self = taosAddRef(exchangeObjRefPool, pInfo);

2458
  return initDataSource(numOfSources, pInfo, id);
2459 2460 2461 2462 2463 2464
}

SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode* pExNode, SExecTaskInfo* pTaskInfo) {
  SExchangeInfo* pInfo = taosMemoryCalloc(1, sizeof(SExchangeInfo));
  SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
H
Haojun Liao 已提交
2465
    goto _error;
2466
  }
H
Haojun Liao 已提交
2467

2468
  int32_t code = initExchangeOperator(pExNode, pInfo, GET_TASKID(pTaskInfo));
2469 2470 2471
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }
2472 2473

  tsem_init(&pInfo->ready, 0, 0);
2474

2475
  pInfo->seqLoadData = false;
2476
  pInfo->pTransporter = pTransporter;
2477 2478
  pInfo->pResult = createResDataBlock(pExNode->node.pOutputDataBlockDesc);
  pOperator->name = "ExchangeOperator";
X
Xiaoyu Wang 已提交
2479
  pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE;
X
Xiaoyu Wang 已提交
2480
  pOperator->blocking = false;
2481 2482
  pOperator->status = OP_NOT_OPENED;
  pOperator->info = pInfo;
2483
  pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pResult->pDataBlock);
X
Xiaoyu Wang 已提交
2484
  pOperator->pTaskInfo = pTaskInfo;
2485

L
Liu Jicong 已提交
2486 2487
  pOperator->fpSet = createOperatorFpSet(prepareLoadRemoteData, doLoadRemoteData, NULL, NULL,
                                         destroyExchangeOperatorInfo, NULL, NULL, NULL);
2488
  return pOperator;
H
Haojun Liao 已提交
2489

L
Liu Jicong 已提交
2490
_error:
H
Haojun Liao 已提交
2491
  if (pInfo != NULL) {
2492
    doDestroyExchangeOperatorInfo(pInfo);
H
Haojun Liao 已提交
2493 2494
  }

wafwerar's avatar
wafwerar 已提交
2495
  taosMemoryFreeClear(pOperator);
2496
  pTaskInfo->code = code;
H
Haojun Liao 已提交
2497
  return NULL;
2498 2499
}

dengyihao's avatar
dengyihao 已提交
2500 2501
static int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t numOfOutput, size_t keyBufSize,
                                const char* pKey);
2502

2503
static void destroySortedMergeOperatorInfo(void* param, int32_t numOfOutput) {
L
Liu Jicong 已提交
2504
  SSortedMergeOperatorInfo* pInfo = (SSortedMergeOperatorInfo*)param;
H
Haojun Liao 已提交
2505
  taosArrayDestroy(pInfo->pSortInfo);
2506 2507 2508
  taosArrayDestroy(pInfo->groupInfo);

  if (pInfo->pSortHandle != NULL) {
H
Haojun Liao 已提交
2509
    tsortDestroySortHandle(pInfo->pSortHandle);
2510 2511
  }

H
Haojun Liao 已提交
2512
  blockDataDestroy(pInfo->binfo.pRes);
H
Haojun Liao 已提交
2513
  cleanupAggSup(&pInfo->aggSup);
L
Liu Jicong 已提交
2514

D
dapan1121 已提交
2515
  taosMemoryFreeClear(param);
2516
}
H
Haojun Liao 已提交
2517

L
Liu Jicong 已提交
2518
static bool needToMerge(SSDataBlock* pBlock, SArray* groupInfo, char** buf, int32_t rowIndex) {
2519 2520 2521 2522
  size_t size = taosArrayGetSize(groupInfo);
  if (size == 0) {
    return true;
  }
2523

2524 2525
  for (int32_t i = 0; i < size; ++i) {
    int32_t* index = taosArrayGet(groupInfo, i);
2526

2527
    SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, *index);
L
Liu Jicong 已提交
2528
    bool             isNull = colDataIsNull(pColInfo, rowIndex, pBlock->info.rows, NULL);
2529

2530 2531 2532
    if ((isNull && buf[i] != NULL) || (!isNull && buf[i] == NULL)) {
      return false;
    }
2533

2534 2535 2536 2537 2538 2539 2540 2541 2542 2543 2544 2545 2546
    char* pCell = colDataGetData(pColInfo, rowIndex);
    if (IS_VAR_DATA_TYPE(pColInfo->info.type)) {
      if (varDataLen(pCell) != varDataLen(buf[i])) {
        return false;
      } else {
        if (memcmp(varDataVal(pCell), varDataVal(buf[i]), varDataLen(pCell)) != 0) {
          return false;
        }
      }
    } else {
      if (memcmp(pCell, buf[i], pColInfo->info.bytes) != 0) {
        return false;
      }
2547 2548 2549
    }
  }

2550
  return 0;
2551 2552
}

L
Liu Jicong 已提交
2553 2554 2555
static void doMergeResultImpl(SSortedMergeOperatorInfo* pInfo, SqlFunctionCtx* pCtx, int32_t numOfExpr,
                              int32_t rowIndex) {
  for (int32_t j = 0; j < numOfExpr; ++j) {  // TODO set row index
X
Xiaoyu Wang 已提交
2556
                                             //    pCtx[j].startRow = rowIndex;
2557 2558
  }

2559 2560
  for (int32_t j = 0; j < numOfExpr; ++j) {
    int32_t functionId = pCtx[j].functionId;
L
Liu Jicong 已提交
2561 2562 2563 2564 2565 2566 2567 2568 2569
    //    pCtx[j].fpSet->addInput(&pCtx[j]);

    //    if (functionId < 0) {
    //      SUdfInfo* pUdfInfo = taosArrayGet(pInfo->udfInfo, -1 * functionId - 1);
    //      doInvokeUdf(pUdfInfo, &pCtx[j], 0, TSDB_UDF_FUNC_MERGE);
    //    } else {
    //      assert(!TSDB_FUNC_IS_SCALAR(functionId));
    //      aAggs[functionId].mergeFunc(&pCtx[j]);
    //    }
2570
  }
2571
}
2572

L
Liu Jicong 已提交
2573 2574
static void doFinalizeResultImpl(SqlFunctionCtx* pCtx, int32_t numOfExpr) {
  for (int32_t j = 0; j < numOfExpr; ++j) {
2575 2576 2577 2578
    int32_t functionId = pCtx[j].functionId;
    //    if (functionId == FUNC_TAG_DUMMY || functionId == FUNC_TS_DUMMY) {
    //      continue;
    //    }
2579

2580 2581 2582 2583
    //    if (functionId < 0) {
    //      SUdfInfo* pUdfInfo = taosArrayGet(pInfo->udfInfo, -1 * functionId - 1);
    //      doInvokeUdf(pUdfInfo, &pCtx[j], 0, TSDB_UDF_FUNC_FINALIZE);
    //    } else {
dengyihao's avatar
dengyihao 已提交
2584
    //    pCtx[j].fpSet.finalize(&pCtx[j]);
2585 2586
  }
}
2587

2588
static bool saveCurrentTuple(char** rowColData, SArray* pColumnList, SSDataBlock* pBlock, int32_t rowIndex) {
L
Liu Jicong 已提交
2589
  int32_t size = (int32_t)taosArrayGetSize(pColumnList);
2590

L
Liu Jicong 已提交
2591 2592
  for (int32_t i = 0; i < size; ++i) {
    int32_t*         index = taosArrayGet(pColumnList, i);
2593
    SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, *index);
H
Haojun Liao 已提交
2594

2595 2596 2597
    char* data = colDataGetData(pColInfo, rowIndex);
    memcpy(rowColData[i], data, colDataGetLength(pColInfo, rowIndex));
  }
2598

2599 2600
  return true;
}
2601

2602 2603
static void doMergeImpl(SOperatorInfo* pOperator, int32_t numOfExpr, SSDataBlock* pBlock) {
  SSortedMergeOperatorInfo* pInfo = pOperator->info;
2604

2605
  SqlFunctionCtx* pCtx = pOperator->exprSupp.pCtx;
2606

L
Liu Jicong 已提交
2607
  for (int32_t i = 0; i < pBlock->info.rows; ++i) {
2608 2609 2610 2611 2612 2613 2614 2615 2616
    if (!pInfo->hasGroupVal) {
      ASSERT(i == 0);
      doMergeResultImpl(pInfo, pCtx, numOfExpr, i);
      pInfo->hasGroupVal = saveCurrentTuple(pInfo->groupVal, pInfo->groupInfo, pBlock, i);
    } else {
      if (needToMerge(pBlock, pInfo->groupInfo, pInfo->groupVal, i)) {
        doMergeResultImpl(pInfo, pCtx, numOfExpr, i);
      } else {
        doFinalizeResultImpl(pCtx, numOfExpr);
2617 2618
        int32_t numOfRows = getNumOfResult(pOperator->exprSupp.pCtx, pOperator->exprSupp.numOfExprs, NULL);
        //        setTagValueForMultipleRows(pCtx, pOperator->exprSupp.numOfExprs, numOfRows);
2619

2620
        // TODO check for available buffer;
H
Haojun Liao 已提交
2621

2622 2623 2624 2625 2626
        // next group info data
        pInfo->binfo.pRes->info.rows += numOfRows;
        for (int32_t j = 0; j < numOfExpr; ++j) {
          if (pCtx[j].functionId < 0) {
            continue;
2627
          }
2628

H
Haojun Liao 已提交
2629
          pCtx[j].fpSet.process(&pCtx[j]);
2630
        }
2631 2632 2633

        doMergeResultImpl(pInfo, pCtx, numOfExpr, i);
        pInfo->hasGroupVal = saveCurrentTuple(pInfo->groupVal, pInfo->groupInfo, pBlock, i);
H
Haojun Liao 已提交
2634
      }
2635 2636 2637 2638
    }
  }
}

2639 2640
static SSDataBlock* doMerge(SOperatorInfo* pOperator) {
  SSortedMergeOperatorInfo* pInfo = pOperator->info;
L
Liu Jicong 已提交
2641
  SSortHandle*              pHandle = pInfo->pSortHandle;
2642

2643
  SSDataBlock* pDataBlock = createOneDataBlock(pInfo->binfo.pRes, false);
2644
  blockDataEnsureCapacity(pDataBlock, pOperator->resultInfo.capacity);
2645

L
Liu Jicong 已提交
2646
  while (1) {
2647
    blockDataCleanup(pDataBlock);
2648
    while (1) {
H
Haojun Liao 已提交
2649
      STupleHandle* pTupleHandle = tsortNextTuple(pHandle);
2650 2651
      if (pTupleHandle == NULL) {
        break;
2652
      }
2653

2654 2655
      // build datablock for merge for one group
      appendOneRowToDataBlock(pDataBlock, pTupleHandle);
2656
      if (pDataBlock->info.rows >= pOperator->resultInfo.capacity) {
2657 2658
        break;
      }
2659
    }
2660

2661 2662 2663
    if (pDataBlock->info.rows == 0) {
      break;
    }
2664

2665
    setInputDataBlock(pOperator, pOperator->exprSupp.pCtx, pDataBlock, TSDB_ORDER_ASC, MAIN_SCAN, true);
L
Liu Jicong 已提交
2666 2667
    //  updateOutputBuf(&pInfo->binfo, &pAggInfo->bufCapacity, pBlock->info.rows * pAggInfo->resultRowFactor,
    //  pOperator->pRuntimeEnv, true);
2668
    doMergeImpl(pOperator, pOperator->exprSupp.numOfExprs, pDataBlock);
2669 2670
    // flush to tuple store, and after all data have been handled, return to upstream node or sink node
  }
2671

2672 2673 2674
  doFinalizeResultImpl(pOperator->exprSupp.pCtx, pOperator->exprSupp.numOfExprs);
  int32_t numOfRows = getNumOfResult(pOperator->exprSupp.pCtx, pOperator->exprSupp.numOfExprs, NULL);
  //        setTagValueForMultipleRows(pCtx, pOperator->exprSupp.numOfExprs, numOfRows);
2675

2676
  // TODO check for available buffer;
2677

2678 2679
  // next group info data
  pInfo->binfo.pRes->info.rows += numOfRows;
L
Liu Jicong 已提交
2680
  return (pInfo->binfo.pRes->info.rows > 0) ? pInfo->binfo.pRes : NULL;
2681
}
2682

L
Liu Jicong 已提交
2683 2684
SSDataBlock* getSortedMergeBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity,
                                     SArray* pColMatchInfo, SSortedMergeOperatorInfo* pInfo) {
2685 2686 2687 2688 2689 2690 2691 2692 2693 2694
  blockDataCleanup(pDataBlock);

  SSDataBlock* p = tsortGetSortedDataBlock(pHandle);
  if (p == NULL) {
    return NULL;
  }

  blockDataEnsureCapacity(p, capacity);

  while (1) {
2695
    STupleHandle* pTupleHandle = tsortNextTuple(pHandle);
2696 2697 2698 2699
    if (pTupleHandle == NULL) {
      break;
    }

2700
    appendOneRowToDataBlock(p, pTupleHandle);
2701 2702 2703 2704 2705 2706 2707 2708 2709 2710 2711 2712 2713
    if (p->info.rows >= capacity) {
      break;
    }
  }

  if (p->info.rows > 0) {
    int32_t numOfCols = taosArrayGetSize(pColMatchInfo);
    for (int32_t i = 0; i < numOfCols; ++i) {
      SColMatchInfo* pmInfo = taosArrayGet(pColMatchInfo, i);
      ASSERT(pmInfo->matchType == COL_MATCH_FROM_SLOT_ID);

      SColumnInfoData* pSrc = taosArrayGet(p->pDataBlock, pmInfo->srcSlotId);
      SColumnInfoData* pDst = taosArrayGet(pDataBlock->pDataBlock, pmInfo->targetSlotId);
2714
      colDataAssign(pDst, pSrc, p->info.rows, &pDataBlock->info);
2715 2716 2717 2718 2719 2720 2721 2722 2723 2724
    }

    pDataBlock->info.rows = p->info.rows;
    pDataBlock->info.capacity = p->info.rows;
  }

  blockDataDestroy(p);
  return (pDataBlock->info.rows > 0) ? pDataBlock : NULL;
}

2725
static SSDataBlock* doSortedMerge(SOperatorInfo* pOperator) {
2726 2727
  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
2728 2729
  }

L
Liu Jicong 已提交
2730
  SExecTaskInfo*            pTaskInfo = pOperator->pTaskInfo;
2731
  SSortedMergeOperatorInfo* pInfo = pOperator->info;
H
Haojun Liao 已提交
2732
  if (pOperator->status == OP_RES_TO_RETURN) {
2733
    return getSortedMergeBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pOperator->resultInfo.capacity, NULL, pInfo);
2734 2735
  }

2736
  int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;
L
Liu Jicong 已提交
2737 2738
  pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_MULTISOURCE_MERGE, pInfo->bufPageSize, numOfBufPage,
                                             pInfo->binfo.pRes, "GET_TASKID(pTaskInfo)");
H
Haojun Liao 已提交
2739

2740
  tsortSetFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock, NULL, NULL);
2741

L
Liu Jicong 已提交
2742
  for (int32_t i = 0; i < pOperator->numOfDownstream; ++i) {
wmmhello's avatar
wmmhello 已提交
2743
    SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource));
H
Haojun Liao 已提交
2744
    ps->param = pOperator->pDownstream[i];
H
Haojun Liao 已提交
2745
    tsortAddSource(pInfo->pSortHandle, ps);
2746 2747
  }

H
Haojun Liao 已提交
2748
  int32_t code = tsortOpen(pInfo->pSortHandle);
2749
  if (code != TSDB_CODE_SUCCESS) {
2750
    longjmp(pTaskInfo->env, terrno);
2751 2752
  }

H
Haojun Liao 已提交
2753
  pOperator->status = OP_RES_TO_RETURN;
2754
  return doMerge(pOperator);
2755
}
2756

L
Liu Jicong 已提交
2757 2758
static int32_t initGroupCol(SExprInfo* pExprInfo, int32_t numOfCols, SArray* pGroupInfo,
                            SSortedMergeOperatorInfo* pInfo) {
2759 2760
  if (pGroupInfo == NULL || taosArrayGetSize(pGroupInfo) == 0) {
    return 0;
H
Haojun Liao 已提交
2761 2762
  }

2763 2764 2765 2766 2767 2768 2769 2770
  int32_t len = 0;
  SArray* plist = taosArrayInit(3, sizeof(SColumn));
  pInfo->groupInfo = taosArrayInit(3, sizeof(int32_t));

  if (plist == NULL || pInfo->groupInfo == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

L
Liu Jicong 已提交
2771 2772
  size_t numOfGroupCol = taosArrayGetSize(pInfo->groupInfo);
  for (int32_t i = 0; i < numOfGroupCol; ++i) {
2773
    SColumn* pCol = taosArrayGet(pGroupInfo, i);
L
Liu Jicong 已提交
2774
    for (int32_t j = 0; j < numOfCols; ++j) {
H
Haojun Liao 已提交
2775
      SExprInfo* pe = &pExprInfo[j];
2776
      if (pe->base.resSchema.slotId == pCol->colId) {
2777 2778
        taosArrayPush(plist, pCol);
        taosArrayPush(pInfo->groupInfo, &j);
H
Haojun Liao 已提交
2779
        len += pCol->bytes;
2780 2781
        break;
      }
H
Haojun Liao 已提交
2782 2783 2784
    }
  }

2785
  ASSERT(taosArrayGetSize(pGroupInfo) == taosArrayGetSize(plist));
H
Haojun Liao 已提交
2786

wafwerar's avatar
wafwerar 已提交
2787
  pInfo->groupVal = taosMemoryCalloc(1, (POINTER_BYTES * numOfGroupCol + len));
2788 2789 2790 2791
  if (pInfo->groupVal == NULL) {
    taosArrayDestroy(plist);
    return TSDB_CODE_OUT_OF_MEMORY;
  }
H
Haojun Liao 已提交
2792

2793
  int32_t offset = 0;
L
Liu Jicong 已提交
2794 2795
  char*   start = (char*)(pInfo->groupVal + (POINTER_BYTES * numOfGroupCol));
  for (int32_t i = 0; i < numOfGroupCol; ++i) {
2796 2797
    pInfo->groupVal[i] = start + offset;
    SColumn* pCol = taosArrayGet(plist, i);
H
Haojun Liao 已提交
2798
    offset += pCol->bytes;
2799
  }
H
Haojun Liao 已提交
2800

2801
  taosArrayDestroy(plist);
H
Haojun Liao 已提交
2802

2803 2804
  return TSDB_CODE_SUCCESS;
}
H
Haojun Liao 已提交
2805

L
Liu Jicong 已提交
2806 2807 2808
SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t numOfDownstream, SExprInfo* pExprInfo,
                                             int32_t num, SArray* pSortInfo, SArray* pGroupInfo,
                                             SExecTaskInfo* pTaskInfo) {
wafwerar's avatar
wafwerar 已提交
2809
  SSortedMergeOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SSortedMergeOperatorInfo));
L
Liu Jicong 已提交
2810
  SOperatorInfo*            pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
2811
  if (pInfo == NULL || pOperator == NULL) {
2812
    goto _error;
2813
  }
H
Haojun Liao 已提交
2814

2815 2816 2817 2818 2819
  int32_t code = initExprSupp(&pOperator->exprSupp, pExprInfo, num);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

2820
  initResultRowInfo(&pInfo->binfo.resultRowInfo);
H
Haojun Liao 已提交
2821

2822
  if (pOperator->exprSupp.pCtx == NULL || pInfo->binfo.pRes == NULL) {
2823 2824
    goto _error;
  }
H
Haojun Liao 已提交
2825

2826
  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
2827
  code = doInitAggInfoSup(&pInfo->aggSup, pOperator->exprSupp.pCtx, num, keyBufSize, pTaskInfo->id.str);
2828 2829 2830
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }
H
Haojun Liao 已提交
2831

2832
  setFunctionResultOutput(pOperator, &pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, num);
H
Haojun Liao 已提交
2833
  code = initGroupCol(pExprInfo, num, pGroupInfo, pInfo);
2834 2835 2836
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }
H
Haojun Liao 已提交
2837

L
Liu Jicong 已提交
2838 2839 2840 2841 2842
  //  pInfo->resultRowFactor = (int32_t)(getRowNumForMultioutput(pRuntimeEnv->pQueryAttr,
  //      pRuntimeEnv->pQueryAttr->topBotQuery, false));
  pInfo->sortBufSize = 1024 * 16;  // 1MB
  pInfo->bufPageSize = 1024;
  pInfo->pSortInfo = pSortInfo;
H
Haojun Liao 已提交
2843

2844
  pOperator->resultInfo.capacity = blockDataGetCapacityInRow(pInfo->binfo.pRes, pInfo->bufPageSize);
H
Haojun Liao 已提交
2845

L
Liu Jicong 已提交
2846
  pOperator->name = "SortedMerge";
X
Xiaoyu Wang 已提交
2847
  // pOperator->operatorType = OP_SortedMerge;
2848 2849 2850
  pOperator->blocking = true;
  pOperator->status = OP_NOT_OPENED;
  pOperator->info = pInfo;
L
Liu Jicong 已提交
2851
  pOperator->pTaskInfo = pTaskInfo;
H
Haojun Liao 已提交
2852

2853 2854
  pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doSortedMerge, NULL, NULL, destroySortedMergeOperatorInfo,
                                         NULL, NULL, NULL);
2855 2856 2857
  code = appendDownstream(pOperator, downstream, numOfDownstream);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
2858
  }
H
Haojun Liao 已提交
2859

2860
  return pOperator;
H
Haojun Liao 已提交
2861

L
Liu Jicong 已提交
2862
_error:
2863
  if (pInfo != NULL) {
H
Haojun Liao 已提交
2864
    destroySortedMergeOperatorInfo(pInfo, num);
H
Haojun Liao 已提交
2865 2866
  }

wafwerar's avatar
wafwerar 已提交
2867 2868
  taosMemoryFreeClear(pInfo);
  taosMemoryFreeClear(pOperator);
2869 2870
  terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
  return NULL;
H
Haojun Liao 已提交
2871 2872
}

X
Xiaoyu Wang 已提交
2873
int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scanFlag) {
2874
  // todo add more information about exchange operation
2875
  int32_t type = pOperator->operatorType;
X
Xiaoyu Wang 已提交
2876
  if (type == QUERY_NODE_PHYSICAL_PLAN_EXCHANGE || type == QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN ||
2877
      type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN || type == QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN ||
2878
      type == QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN || type == QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN) {
2879 2880 2881
    *order = TSDB_ORDER_ASC;
    *scanFlag = MAIN_SCAN;
    return TSDB_CODE_SUCCESS;
2882
  } else if (type == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
2883 2884 2885 2886 2887
    STableScanInfo* pTableScanInfo = pOperator->info;
    *order = pTableScanInfo->cond.order;
    *scanFlag = pTableScanInfo->scanFlag;
    return TSDB_CODE_SUCCESS;
  } else {
H
Haojun Liao 已提交
2888
    if (pOperator->pDownstream == NULL || pOperator->pDownstream[0] == NULL) {
2889
      return TSDB_CODE_INVALID_PARA;
H
Haojun Liao 已提交
2890
    } else {
2891
      return getTableScanInfo(pOperator->pDownstream[0], order, scanFlag);
2892 2893 2894
    }
  }
}
L
Liu Jicong 已提交
2895
#if 0
L
Liu Jicong 已提交
2896
int32_t doPrepareScan(SOperatorInfo* pOperator, uint64_t uid, int64_t ts) {
L
Liu Jicong 已提交
2897
  uint8_t type = pOperator->operatorType;
2898 2899 2900

  pOperator->status = OP_OPENED;

L
Liu Jicong 已提交
2901
  if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
2902
    SStreamScanInfo* pScanInfo = pOperator->info;
L
Liu Jicong 已提交
2903
    pScanInfo->blockType = STREAM_INPUT__TABLE_SCAN;
2904

2905
    pScanInfo->pTableScanOp->status = OP_OPENED;
2906

2907
    STableScanInfo* pInfo = pScanInfo->pTableScanOp->info;
2908 2909
    ASSERT(pInfo->scanMode == TABLE_SCAN__TABLE_ORDER);

L
Liu Jicong 已提交
2910 2911 2912 2913
    if (uid == 0) {
      pInfo->noTable = 1;
      return TSDB_CODE_SUCCESS;
    }
2914 2915 2916 2917 2918 2919

    /*if (pSnapShotScanInfo->dataReader == NULL) {*/
    /*pSnapShotScanInfo->dataReader = tsdbReaderOpen(pHandle->vnode, &pSTInfo->cond, tableList, 0, 0);*/
    /*pSnapShotScanInfo->scanMode = TABLE_SCAN__TABLE_ORDER;*/
    /*}*/

L
Liu Jicong 已提交
2920 2921
    pInfo->noTable = 0;

2922
    if (pInfo->lastStatus.uid != uid || pInfo->lastStatus.ts != ts) {
L
Liu Jicong 已提交
2923 2924 2925 2926 2927 2928 2929 2930 2931 2932 2933
      SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;

      int32_t tableSz = taosArrayGetSize(pTaskInfo->tableqinfoList.pTableList);
      bool    found = false;
      for (int32_t i = 0; i < tableSz; i++) {
        STableKeyInfo* pTableInfo = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, i);
        if (pTableInfo->uid == uid) {
          found = true;
          pInfo->currentTable = i;
        }
      }
2934
      // TODO after processing drop, found can be false
L
Liu Jicong 已提交
2935
      ASSERT(found);
2936 2937

      tsdbSetTableId(pInfo->dataReader, uid);
H
Haojun Liao 已提交
2938 2939 2940 2941
      int64_t oldSkey = pInfo->cond.twindows.skey;
      pInfo->cond.twindows.skey = ts + 1;
      tsdbReaderReset(pInfo->dataReader, &pInfo->cond);
      pInfo->cond.twindows.skey = oldSkey;
2942 2943
      pInfo->scanTimes = 0;

S
Shengliang Guan 已提交
2944
      qDebug("tsdb reader offset seek to uid %" PRId64 " ts %" PRId64 ", table cur set to %d , all table num %d", uid, ts,
L
Liu Jicong 已提交
2945
             pInfo->currentTable, tableSz);
L
Liu Jicong 已提交
2946
    }
L
Liu Jicong 已提交
2947

L
Liu Jicong 已提交
2948
    return TSDB_CODE_SUCCESS;
2949

L
Liu Jicong 已提交
2950
  } else {
2951 2952 2953 2954 2955
    if (pOperator->numOfDownstream == 1) {
      return doPrepareScan(pOperator->pDownstream[0], uid, ts);
    } else if (pOperator->numOfDownstream == 0) {
      qError("failed to find stream scan operator to set the input data block");
      return TSDB_CODE_QRY_APP_ERROR;
L
Liu Jicong 已提交
2956
    } else {
2957 2958
      qError("join not supported for stream block scan");
      return TSDB_CODE_QRY_APP_ERROR;
L
Liu Jicong 已提交
2959 2960 2961 2962
    }
  }
}

2963 2964 2965
int32_t doGetScanStatus(SOperatorInfo* pOperator, uint64_t* uid, int64_t* ts) {
  int32_t type = pOperator->operatorType;
  if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
2966 2967
    SStreamScanInfo* pScanInfo = pOperator->info;
    STableScanInfo*  pSnapShotScanInfo = pScanInfo->pTableScanOp->info;
L
Liu Jicong 已提交
2968 2969
    *uid = pSnapShotScanInfo->lastStatus.uid;
    *ts = pSnapShotScanInfo->lastStatus.ts;
2970 2971 2972 2973 2974 2975 2976 2977 2978 2979
  } else {
    if (pOperator->pDownstream[0] == NULL) {
      return TSDB_CODE_INVALID_PARA;
    } else {
      doGetScanStatus(pOperator->pDownstream[0], uid, ts);
    }
  }

  return TSDB_CODE_SUCCESS;
}
L
Liu Jicong 已提交
2980
#endif
2981

2982
// this is a blocking operator
L
Liu Jicong 已提交
2983
static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
H
Haojun Liao 已提交
2984 2985
  if (OPTR_IS_OPENED(pOperator)) {
    return TSDB_CODE_SUCCESS;
2986 2987
  }

H
Haojun Liao 已提交
2988
  SExecTaskInfo*    pTaskInfo = pOperator->pTaskInfo;
2989
  SAggOperatorInfo* pAggInfo = pOperator->info;
H
Haojun Liao 已提交
2990

2991 2992
  SExprSupp*     pSup = &pOperator->exprSupp;
  SOperatorInfo* downstream = pOperator->pDownstream[0];
2993

2994 2995
  int64_t st = taosGetTimestampUs();

2996 2997 2998
  int32_t order = TSDB_ORDER_ASC;
  int32_t scanFlag = MAIN_SCAN;

H
Haojun Liao 已提交
2999
  while (1) {
3000
    SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
3001 3002 3003 3004
    if (pBlock == NULL) {
      break;
    }

3005 3006 3007 3008
    int32_t code = getTableScanInfo(pOperator, &order, &scanFlag);
    if (code != TSDB_CODE_SUCCESS) {
      longjmp(pTaskInfo->env, code);
    }
3009

3010
    // there is an scalar expression that needs to be calculated before apply the group aggregation.
3011 3012 3013
    if (pAggInfo->scalarExprSup.pExprInfo != NULL) {
      SExprSupp* pSup1 = &pAggInfo->scalarExprSup;
      code = projectApplyFunctions(pSup1->pExprInfo, pBlock, pBlock, pSup1->pCtx, pSup1->numOfExprs, NULL);
3014
      if (code != TSDB_CODE_SUCCESS) {
3015
        longjmp(pTaskInfo->env, code);
3016
      }
3017 3018
    }

3019
    // the pDataBlock are always the same one, no need to call this again
3020 3021
    setExecutionContext(pOperator, pOperator->exprSupp.numOfExprs, pBlock->info.groupId, pAggInfo);
    setInputDataBlock(pOperator, pSup->pCtx, pBlock, order, scanFlag, true);
3022
    code = doAggregateImpl(pOperator, pSup->pCtx);
3023 3024 3025
    if (code != 0) {
      longjmp(pTaskInfo->env, code);
    }
3026 3027
  }

H
Haojun Liao 已提交
3028
  closeAllResultRows(&pAggInfo->binfo.resultRowInfo);
3029
  initGroupedResultInfo(&pAggInfo->groupResInfo, pAggInfo->aggSup.pResultRowHashTable, 0);
H
Haojun Liao 已提交
3030
  OPTR_SET_OPENED(pOperator);
3031

3032
  pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
H
Haojun Liao 已提交
3033 3034 3035
  return TSDB_CODE_SUCCESS;
}

3036
static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator) {
L
Liu Jicong 已提交
3037
  SAggOperatorInfo* pAggInfo = pOperator->info;
H
Haojun Liao 已提交
3038 3039 3040 3041 3042 3043
  SOptrBasicInfo*   pInfo = &pAggInfo->binfo;

  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

L
Liu Jicong 已提交
3044
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
3045
  pTaskInfo->code = pOperator->fpSet._openFn(pOperator);
H
Haojun Liao 已提交
3046
  if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
3047
    doSetOperatorCompleted(pOperator);
H
Haojun Liao 已提交
3048 3049 3050
    return NULL;
  }

H
Haojun Liao 已提交
3051
  blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
S
slzhou 已提交
3052 3053 3054 3055 3056 3057 3058 3059
  while (1) {
    doBuildResultDatablock(pOperator, pInfo, &pAggInfo->groupResInfo, pAggInfo->aggSup.pResultBuf);
    doFilter(pAggInfo->pCondition, pInfo->pRes);

    if (!hasDataInGroupInfo(&pAggInfo->groupResInfo)) {
      doSetOperatorCompleted(pOperator);
      break;
    }
3060

S
slzhou 已提交
3061 3062 3063 3064
    if (pInfo->pRes->info.rows > 0) {
      break;
    }
  }
3065
  size_t rows = blockDataGetNumOfRows(pInfo->pRes);
3066 3067
  pOperator->resultInfo.totalRows += rows;

3068
  return (rows == 0) ? NULL : pInfo->pRes;
3069 3070
}

wmmhello's avatar
wmmhello 已提交
3071
int32_t aggEncodeResultRow(SOperatorInfo* pOperator, char** result, int32_t* length) {
3072
  if (result == NULL || length == NULL) {
wmmhello's avatar
wmmhello 已提交
3073 3074 3075
    return TSDB_CODE_TSC_INVALID_INPUT;
  }
  SOptrBasicInfo* pInfo = (SOptrBasicInfo*)(pOperator->info);
3076 3077 3078 3079 3080
  SAggSupporter*  pSup = (SAggSupporter*)POINTER_SHIFT(pOperator->info, sizeof(SOptrBasicInfo));
  int32_t         size = taosHashGetSize(pSup->pResultRowHashTable);
  size_t          keyLen = sizeof(uint64_t) * 2;  // estimate the key length
  int32_t         totalSize =
      sizeof(int32_t) + sizeof(int32_t) + size * (sizeof(int32_t) + keyLen + sizeof(int32_t) + pSup->resultRowSize);
wmmhello's avatar
wmmhello 已提交
3081

C
Cary Xu 已提交
3082 3083 3084 3085 3086 3087
  // no result
  if (getTotalBufSize(pSup->pResultBuf) == 0) {
    *result = NULL;
    *length = 0;
    return TSDB_CODE_SUCCESS;
  }
3088

wmmhello's avatar
wmmhello 已提交
3089
  *result = (char*)taosMemoryCalloc(1, totalSize);
L
Liu Jicong 已提交
3090
  if (*result == NULL) {
wmmhello's avatar
wmmhello 已提交
3091
    return TSDB_CODE_OUT_OF_MEMORY;
wmmhello's avatar
wmmhello 已提交
3092
  }
wmmhello's avatar
wmmhello 已提交
3093

wmmhello's avatar
wmmhello 已提交
3094
  int32_t offset = sizeof(int32_t);
wmmhello's avatar
wmmhello 已提交
3095 3096
  *(int32_t*)(*result + offset) = size;
  offset += sizeof(int32_t);
3097 3098

  // prepare memory
3099
  SResultRowPosition* pos = &pInfo->resultRowInfo.cur;
dengyihao's avatar
dengyihao 已提交
3100 3101
  void*               pPage = getBufPage(pSup->pResultBuf, pos->pageId);
  SResultRow*         pRow = (SResultRow*)((char*)pPage + pos->offset);
3102 3103 3104
  setBufPageDirty(pPage, true);
  releaseBufPage(pSup->pResultBuf, pPage);

dengyihao's avatar
dengyihao 已提交
3105
  void* pIter = taosHashIterate(pSup->pResultRowHashTable, NULL);
wmmhello's avatar
wmmhello 已提交
3106
  while (pIter) {
dengyihao's avatar
dengyihao 已提交
3107
    void*               key = taosHashGetKey(pIter, &keyLen);
3108
    SResultRowPosition* p1 = (SResultRowPosition*)pIter;
3109

dengyihao's avatar
dengyihao 已提交
3110
    pPage = (SFilePage*)getBufPage(pSup->pResultBuf, p1->pageId);
3111
    pRow = (SResultRow*)((char*)pPage + p1->offset);
3112 3113
    setBufPageDirty(pPage, true);
    releaseBufPage(pSup->pResultBuf, pPage);
wmmhello's avatar
wmmhello 已提交
3114 3115 3116

    // recalculate the result size
    int32_t realTotalSize = offset + sizeof(int32_t) + keyLen + sizeof(int32_t) + pSup->resultRowSize;
L
Liu Jicong 已提交
3117
    if (realTotalSize > totalSize) {
wmmhello's avatar
wmmhello 已提交
3118
      char* tmp = (char*)taosMemoryRealloc(*result, realTotalSize);
L
Liu Jicong 已提交
3119
      if (tmp == NULL) {
wafwerar's avatar
wafwerar 已提交
3120
        taosMemoryFree(*result);
wmmhello's avatar
wmmhello 已提交
3121
        *result = NULL;
wmmhello's avatar
wmmhello 已提交
3122
        return TSDB_CODE_OUT_OF_MEMORY;
L
Liu Jicong 已提交
3123
      } else {
wmmhello's avatar
wmmhello 已提交
3124 3125 3126 3127 3128 3129 3130 3131 3132 3133 3134 3135
        *result = tmp;
      }
    }
    // save key
    *(int32_t*)(*result + offset) = keyLen;
    offset += sizeof(int32_t);
    memcpy(*result + offset, key, keyLen);
    offset += keyLen;

    // save value
    *(int32_t*)(*result + offset) = pSup->resultRowSize;
    offset += sizeof(int32_t);
3136
    memcpy(*result + offset, pRow, pSup->resultRowSize);
wmmhello's avatar
wmmhello 已提交
3137 3138 3139 3140 3141
    offset += pSup->resultRowSize;

    pIter = taosHashIterate(pSup->pResultRowHashTable, pIter);
  }

wmmhello's avatar
wmmhello 已提交
3142 3143 3144 3145
  *(int32_t*)(*result) = offset;
  *length = offset;

  return TDB_CODE_SUCCESS;
wmmhello's avatar
wmmhello 已提交
3146 3147
}

3148
int32_t aggDecodeResultRow(SOperatorInfo* pOperator, char* result) {
3149
  if (result == NULL) {
wmmhello's avatar
wmmhello 已提交
3150
    return TSDB_CODE_TSC_INVALID_INPUT;
wmmhello's avatar
wmmhello 已提交
3151
  }
wmmhello's avatar
wmmhello 已提交
3152
  SOptrBasicInfo* pInfo = (SOptrBasicInfo*)(pOperator->info);
3153
  SAggSupporter*  pSup = (SAggSupporter*)POINTER_SHIFT(pOperator->info, sizeof(SOptrBasicInfo));
wmmhello's avatar
wmmhello 已提交
3154 3155

  //  int32_t size = taosHashGetSize(pSup->pResultRowHashTable);
3156
  int32_t length = *(int32_t*)(result);
wmmhello's avatar
wmmhello 已提交
3157
  int32_t offset = sizeof(int32_t);
3158 3159 3160 3161

  int32_t count = *(int32_t*)(result + offset);
  offset += sizeof(int32_t);

L
Liu Jicong 已提交
3162
  while (count-- > 0 && length > offset) {
wmmhello's avatar
wmmhello 已提交
3163 3164 3165
    int32_t keyLen = *(int32_t*)(result + offset);
    offset += sizeof(int32_t);

L
Liu Jicong 已提交
3166
    uint64_t    tableGroupId = *(uint64_t*)(result + offset);
3167
    SResultRow* resultRow = getNewResultRow(pSup->pResultBuf, tableGroupId, pSup->resultRowSize);
L
Liu Jicong 已提交
3168
    if (!resultRow) {
wmmhello's avatar
wmmhello 已提交
3169
      return TSDB_CODE_TSC_INVALID_INPUT;
wmmhello's avatar
wmmhello 已提交
3170
    }
3171

wmmhello's avatar
wmmhello 已提交
3172
    // add a new result set for a new group
3173 3174
    SResultRowPosition pos = {.pageId = resultRow->pageId, .offset = resultRow->offset};
    taosHashPut(pSup->pResultRowHashTable, result + offset, keyLen, &pos, sizeof(SResultRowPosition));
wmmhello's avatar
wmmhello 已提交
3175 3176 3177

    offset += keyLen;
    int32_t valueLen = *(int32_t*)(result + offset);
L
Liu Jicong 已提交
3178
    if (valueLen != pSup->resultRowSize) {
wmmhello's avatar
wmmhello 已提交
3179
      return TSDB_CODE_TSC_INVALID_INPUT;
wmmhello's avatar
wmmhello 已提交
3180 3181 3182 3183 3184 3185 3186 3187 3188 3189
    }
    offset += sizeof(int32_t);
    int32_t pageId = resultRow->pageId;
    int32_t pOffset = resultRow->offset;
    memcpy(resultRow, result + offset, valueLen);
    resultRow->pageId = pageId;
    resultRow->offset = pOffset;
    offset += valueLen;

    initResultRow(resultRow);
dengyihao's avatar
dengyihao 已提交
3190
    pInfo->resultRowInfo.cur = (SResultRowPosition){.pageId = resultRow->pageId, .offset = resultRow->offset};
wmmhello's avatar
wmmhello 已提交
3191 3192
  }

L
Liu Jicong 已提交
3193
  if (offset != length) {
wmmhello's avatar
wmmhello 已提交
3194
    return TSDB_CODE_TSC_INVALID_INPUT;
wmmhello's avatar
wmmhello 已提交
3195
  }
wmmhello's avatar
wmmhello 已提交
3196
  return TDB_CODE_SUCCESS;
wmmhello's avatar
wmmhello 已提交
3197 3198
}

3199 3200 3201 3202 3203
int32_t handleLimitOffset(SOperatorInfo* pOperator, SLimitInfo* pLimitInfo, SSDataBlock* pBlock, bool holdDataInBuf) {
  if (pLimitInfo->remainGroupOffset > 0) {
    if (pLimitInfo->currentGroupId == 0) {  // it is the first group
      pLimitInfo->currentGroupId = pBlock->info.groupId;
      blockDataCleanup(pBlock);
3204
      return PROJECT_RETRIEVE_CONTINUE;
3205 3206 3207
    } else if (pLimitInfo->currentGroupId != pBlock->info.groupId) {
      // now it is the data from a new group
      pLimitInfo->remainGroupOffset -= 1;
3208 3209

      // ignore data block in current group
3210 3211
      if (pLimitInfo->remainGroupOffset > 0) {
        blockDataCleanup(pBlock);
3212 3213 3214 3215 3216
        return PROJECT_RETRIEVE_CONTINUE;
      }
    }

    // set current group id of the project operator
3217
    pLimitInfo->currentGroupId = pBlock->info.groupId;
3218 3219
  }

3220 3221 3222
  if (pLimitInfo->currentGroupId != 0 && pLimitInfo->currentGroupId != pBlock->info.groupId) {
    pLimitInfo->numOfOutputGroups += 1;
    if ((pLimitInfo->slimit.limit > 0) && (pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups)) {
3223
      pOperator->status = OP_EXEC_DONE;
3224
      blockDataCleanup(pBlock);
3225 3226 3227 3228 3229

      return PROJECT_RETRIEVE_DONE;
    }

    // reset the value for a new group data
3230 3231
    pLimitInfo->numOfOutputRows = 0;
    pLimitInfo->remainOffset = pLimitInfo->limit.offset;
3232 3233 3234 3235 3236
  }

  // here we reach the start position, according to the limit/offset requirements.

  // set current group id
3237
  pLimitInfo->currentGroupId = pBlock->info.groupId;
3238

3239 3240 3241
  if (pLimitInfo->remainOffset >= pBlock->info.rows) {
    pLimitInfo->remainOffset -= pBlock->info.rows;
    blockDataCleanup(pBlock);
3242
    return PROJECT_RETRIEVE_CONTINUE;
3243 3244 3245
  } else if (pLimitInfo->remainOffset < pBlock->info.rows && pLimitInfo->remainOffset > 0) {
    blockDataTrimFirstNRows(pBlock, pLimitInfo->remainOffset);
    pLimitInfo->remainOffset = 0;
3246 3247
  }

3248
  // check for the limitation in each group
3249 3250 3251 3252
  if (pLimitInfo->limit.limit >= 0 && pLimitInfo->numOfOutputRows + pBlock->info.rows >= pLimitInfo->limit.limit) {
    int32_t keepRows = (int32_t)(pLimitInfo->limit.limit - pLimitInfo->numOfOutputRows);
    blockDataKeepFirstNRows(pBlock, keepRows);
    if (pLimitInfo->slimit.limit > 0 && pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups) {
3253 3254 3255
      pOperator->status = OP_EXEC_DONE;
    }

3256
    return PROJECT_RETRIEVE_DONE;
3257
  }
3258

3259
  // todo optimize performance
3260 3261
  // If there are slimit/soffset value exists, multi-round result can not be packed into one group, since the
  // they may not belong to the same group the limit/offset value is not valid in this case.
3262 3263
  if ((!holdDataInBuf) || (pBlock->info.rows >= pOperator->resultInfo.threshold) || pLimitInfo->slimit.offset != -1 ||
      pLimitInfo->slimit.limit != -1) {
3264
    return PROJECT_RETRIEVE_DONE;
L
Liu Jicong 已提交
3265
  } else {  // not full enough, continue to accumulate the output data in the buffer.
3266 3267 3268 3269
    return PROJECT_RETRIEVE_CONTINUE;
  }
}

3270
static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
3271
  SProjectOperatorInfo* pProjectInfo = pOperator->info;
L
Liu Jicong 已提交
3272
  SOptrBasicInfo*       pInfo = &pProjectInfo->binfo;
3273

L
Liu Jicong 已提交
3274
  SExprSupp*   pSup = &pOperator->exprSupp;
3275
  SSDataBlock* pRes = pInfo->pRes;
3276
  blockDataCleanup(pRes);
3277

3278
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
3279
  if (pOperator->status == OP_EXEC_DONE) {
L
Liu Jicong 已提交
3280 3281 3282 3283
    if (pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE) {
      pOperator->status = OP_OPENED;
      return NULL;
    }
3284 3285
    return NULL;
  }
dengyihao's avatar
dengyihao 已提交
3286

H
Haojun Liao 已提交
3287
#if 0
3288 3289 3290 3291 3292
  if (pProjectInfo->existDataBlock) {  // TODO refactor
    SSDataBlock* pBlock = pProjectInfo->existDataBlock;
    pProjectInfo->existDataBlock = NULL;

    // the pDataBlock are always the same one, no need to call this again
H
Haojun Liao 已提交
3293
    setInputDataBlock(pOperator, pInfo->pCtx, pBlock, TSDB_ORDER_ASC);
3294

H
Haojun Liao 已提交
3295
    blockDataEnsureCapacity(pInfo->pRes, pBlock->info.rows);
3296
    projectApplyFunctions(pOperator->exprSupp.pExprInfo, pInfo->pRes, pBlock, pInfo->pCtx, pOperator->exprSupp.numOfExprs);
L
Liu Jicong 已提交
3297
    if (pRes->info.rows >= pProjectInfo->binfo.capacity * 0.8) {
3298 3299
      copyTsColoum(pRes, pInfo->pCtx, pOperator->exprSupp.numOfExprs);
      resetResultRowEntryResult(pInfo->pCtx, pOperator->exprSupp.numOfExprs);
3300 3301 3302
      return pRes;
    }
  }
H
Haojun Liao 已提交
3303
#endif
3304

3305
  int64_t st = 0;
3306 3307 3308
  int32_t order = 0;
  int32_t scanFlag = 0;

3309 3310 3311 3312
  if (pOperator->cost.openCost == 0) {
    st = taosGetTimestampUs();
  }

H
Haojun Liao 已提交
3313 3314
  SOperatorInfo* downstream = pOperator->pDownstream[0];

L
Liu Jicong 已提交
3315
  while (1) {
H
Haojun Liao 已提交
3316
    // The downstream exec may change the value of the newgroup, so use a local variable instead.
L
Liu Jicong 已提交
3317
    qDebug("projection call next");
3318
    SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
3319
    if (pBlock == NULL) {
L
Liu Jicong 已提交
3320 3321 3322
      qDebug("projection get null");

      /*if (pTaskInfo->execModel == OPTR_EXEC_MODEL_BATCH) {*/
3323
      doSetOperatorCompleted(pOperator);
L
Liu Jicong 已提交
3324 3325
      /*} else if (pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE) {*/
      /*pOperator->status = OP_RES_TO_RETURN;*/
L
Liu Jicong 已提交
3326
      /*}*/
3327 3328
      break;
    }
3329 3330 3331 3332
    if (pBlock->info.type == STREAM_RETRIEVE) {
      // for stream interval
      return pBlock;
    }
3333 3334

    // the pDataBlock are always the same one, no need to call this again
3335
    int32_t code = getTableScanInfo(pOperator->pDownstream[0], &order, &scanFlag);
3336 3337 3338
    if (code != TSDB_CODE_SUCCESS) {
      longjmp(pTaskInfo->env, code);
    }
3339

3340
    setInputDataBlock(pOperator, pSup->pCtx, pBlock, order, scanFlag, false);
3341 3342
    blockDataEnsureCapacity(pInfo->pRes, pInfo->pRes->info.rows + pBlock->info.rows);

3343
    code = projectApplyFunctions(pSup->pExprInfo, pInfo->pRes, pBlock, pSup->pCtx, pSup->numOfExprs,
X
Xiaoyu Wang 已提交
3344
                                 pProjectInfo->pPseudoColInfo);
3345 3346
    if (code != TSDB_CODE_SUCCESS) {
      longjmp(pTaskInfo->env, code);
3347 3348
    }

3349
    int32_t status = handleLimitOffset(pOperator, &pProjectInfo->limitInfo, pInfo->pRes, true);
3350 3351 3352 3353

    // filter shall be applied after apply functions and limit/offset on the result
    doFilter(pProjectInfo->pFilterNode, pInfo->pRes);

5
54liuyao 已提交
3354 3355 3356 3357
    if (pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM) {
      break;
    }

3358
    if (status == PROJECT_RETRIEVE_CONTINUE || pInfo->pRes->info.rows == 0) {
H
Haojun Liao 已提交
3359
      continue;
L
Liu Jicong 已提交
3360
    } else if (status == PROJECT_RETRIEVE_DONE) {
3361 3362 3363
      break;
    }
  }
dengyihao's avatar
dengyihao 已提交
3364

3365
  size_t rows = pInfo->pRes->info.rows;
3366 3367
  pProjectInfo->limitInfo.numOfOutputRows += rows;

3368 3369 3370
  pOperator->resultInfo.totalRows += rows;

  if (pOperator->cost.openCost == 0) {
3371
    pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
3372 3373
  }

3374
  return (rows > 0) ? pInfo->pRes : NULL;
3375 3376
}

H
Haojun Liao 已提交
3377
static void doHandleRemainBlockForNewGroupImpl(SFillOperatorInfo* pInfo, SResultInfo* pResultInfo,
L
Liu Jicong 已提交
3378
                                               SExecTaskInfo* pTaskInfo) {
3379
  pInfo->totalInputRows = pInfo->existNewGroupBlock->info.rows;
H
Haojun Liao 已提交
3380

L
Liu Jicong 已提交
3381 3382
  int64_t ekey =
      Q_STATUS_EQUAL(pTaskInfo->status, TASK_COMPLETED) ? pInfo->win.ekey : pInfo->existNewGroupBlock->info.window.ekey;
3383 3384
  taosResetFillInfo(pInfo->pFillInfo, getFillInfoStart(pInfo->pFillInfo));

3385
  taosFillSetStartInfo(pInfo->pFillInfo, pInfo->existNewGroupBlock->info.rows, ekey);
3386 3387
  taosFillSetInputDataBlock(pInfo->pFillInfo, pInfo->existNewGroupBlock);

H
Haojun Liao 已提交
3388 3389 3390
  int32_t numOfResultRows = pResultInfo->capacity - pInfo->pRes->info.rows;
  taosFillResultDataBlock(pInfo->pFillInfo, pInfo->pRes, numOfResultRows);

3391
  pInfo->curGroupId = pInfo->existNewGroupBlock->info.groupId;
3392 3393 3394
  pInfo->existNewGroupBlock = NULL;
}

H
Haojun Liao 已提交
3395
static void doHandleRemainBlockFromNewGroup(SFillOperatorInfo* pInfo, SResultInfo* pResultInfo,
L
Liu Jicong 已提交
3396
                                            SExecTaskInfo* pTaskInfo) {
3397
  if (taosFillHasMoreResults(pInfo->pFillInfo)) {
H
Haojun Liao 已提交
3398 3399 3400
    int32_t numOfResultRows = pResultInfo->capacity - pInfo->pRes->info.rows;
    taosFillResultDataBlock(pInfo->pFillInfo, pInfo->pRes, numOfResultRows);
    if (pInfo->pRes->info.rows > pResultInfo->threshold) {
3401 3402 3403 3404 3405 3406
      return;
    }
  }

  // handle the cached new group data block
  if (pInfo->existNewGroupBlock) {
H
Haojun Liao 已提交
3407
    doHandleRemainBlockForNewGroupImpl(pInfo, pResultInfo, pTaskInfo);
3408 3409 3410
  }
}

S
slzhou 已提交
3411
static SSDataBlock* doFillImpl(SOperatorInfo* pOperator) {
L
Liu Jicong 已提交
3412 3413
  SFillOperatorInfo* pInfo = pOperator->info;
  SExecTaskInfo*     pTaskInfo = pOperator->pTaskInfo;
3414

H
Haojun Liao 已提交
3415
  SResultInfo* pResultInfo = &pOperator->resultInfo;
3416 3417 3418
  SSDataBlock* pResBlock = pInfo->pRes;

  blockDataCleanup(pResBlock);
3419

H
Haojun Liao 已提交
3420 3421
  doHandleRemainBlockFromNewGroup(pInfo, pResultInfo, pTaskInfo);
  if (pResBlock->info.rows > pResultInfo->threshold || pResBlock->info.rows > 0) {
3422
    return pResBlock;
H
Haojun Liao 已提交
3423
  }
3424

H
Haojun Liao 已提交
3425
  SOperatorInfo* pDownstream = pOperator->pDownstream[0];
L
Liu Jicong 已提交
3426
  while (1) {
3427
    SSDataBlock* pBlock = pDownstream->fpSet.getNextFn(pDownstream);
3428 3429 3430 3431 3432
    if (pBlock == NULL) {
      if (pInfo->totalInputRows == 0) {
        pOperator->status = OP_EXEC_DONE;
        return NULL;
      }
3433

3434
      taosFillSetStartInfo(pInfo->pFillInfo, 0, pInfo->win.ekey);
3435
    } else {
3436 3437 3438 3439
      blockDataUpdateTsWindow(pBlock, pInfo->primaryTsCol);

      if (pInfo->curGroupId == 0 || pInfo->curGroupId == pBlock->info.groupId) {
        pInfo->curGroupId = pBlock->info.groupId;   // the first data block
3440 3441

        pInfo->totalInputRows += pBlock->info.rows;
3442

3443 3444
        taosFillSetStartInfo(pInfo->pFillInfo, pBlock->info.rows, pBlock->info.window.ekey);
        taosFillSetInputDataBlock(pInfo->pFillInfo, pBlock);
3445 3446 3447 3448 3449 3450
      } else if (pInfo->curGroupId != pBlock->info.groupId) { // the new group data block
        pInfo->existNewGroupBlock = pBlock;

        // Fill the previous group data block, before handle the data block of new group.
        // Close the fill operation for previous group data block
        taosFillSetStartInfo(pInfo->pFillInfo, 0, pInfo->win.ekey);
3451 3452 3453
      }
    }

3454
    blockDataEnsureCapacity(pResBlock, pOperator->resultInfo.capacity);
H
Haojun Liao 已提交
3455

3456 3457
    int32_t numOfResultRows = pOperator->resultInfo.capacity - pResBlock->info.rows;
    taosFillResultDataBlock(pInfo->pFillInfo, pResBlock, numOfResultRows);
3458 3459

    // current group has no more result to return
3460
    if (pResBlock->info.rows > 0) {
3461 3462
      // 1. The result in current group not reach the threshold of output result, continue
      // 2. If multiple group results existing in one SSDataBlock is not allowed, return immediately
3463
      if (pResBlock->info.rows > pResultInfo->threshold || pBlock == NULL || pInfo->existNewGroupBlock != NULL) {
3464
        return pResBlock;
3465 3466
      }

H
Haojun Liao 已提交
3467
      doHandleRemainBlockFromNewGroup(pInfo, pResultInfo, pTaskInfo);
3468
      if (pResBlock->info.rows >= pOperator->resultInfo.threshold || pBlock == NULL) {
3469
        return pResBlock;
3470 3471 3472
      }
    } else if (pInfo->existNewGroupBlock) {  // try next group
      assert(pBlock != NULL);
H
Haojun Liao 已提交
3473
      doHandleRemainBlockForNewGroupImpl(pInfo, pResultInfo, pTaskInfo);
3474 3475
      if (pResBlock->info.rows > pResultInfo->threshold) {
        return pResBlock;
3476 3477 3478 3479 3480 3481 3482
      }
    } else {
      return NULL;
    }
  }
}

S
slzhou 已提交
3483 3484 3485 3486 3487 3488 3489 3490
static SSDataBlock* doFill(SOperatorInfo* pOperator) {
  SFillOperatorInfo* pInfo = pOperator->info;
  SExecTaskInfo*     pTaskInfo = pOperator->pTaskInfo;

  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

S
slzhou 已提交
3491
  SSDataBlock* fillResult = NULL;
S
slzhou 已提交
3492
  while (true) {
S
slzhou 已提交
3493
    fillResult = doFillImpl(pOperator);
S
slzhou 已提交
3494 3495 3496 3497 3498 3499 3500 3501 3502 3503 3504 3505 3506 3507
    if (fillResult != NULL) {
      doFilter(pInfo->pCondition, fillResult);
    }

    if (fillResult == NULL) {
      doSetOperatorCompleted(pOperator);
      break;
    }

    if (fillResult->info.rows > 0) {
      break;
    }
  }

S
slzhou 已提交
3508 3509 3510 3511
  if (fillResult != NULL) {
    size_t rows = fillResult->info.rows;
    pOperator->resultInfo.totalRows += rows;
  }
S
slzhou 已提交
3512

S
slzhou 已提交
3513
  return fillResult;
S
slzhou 已提交
3514 3515
}

H
Haojun Liao 已提交
3516 3517 3518
static void destroyExprInfo(SExprInfo* pExpr, int32_t numOfExprs) {
  for (int32_t i = 0; i < numOfExprs; ++i) {
    SExprInfo* pExprInfo = &pExpr[i];
H
Haojun Liao 已提交
3519 3520 3521 3522
    for(int32_t j = 0; j < pExprInfo->base.numOfParams; ++j) {
      if (pExprInfo->base.pParam[j].type == FUNC_PARAM_TYPE_COLUMN) {
        taosMemoryFreeClear(pExprInfo->base.pParam[j].pCol);
      }
H
Haojun Liao 已提交
3523
    }
H
Haojun Liao 已提交
3524

H
Haojun Liao 已提交
3525
    taosMemoryFree(pExprInfo->base.pParam);
H
Haojun Liao 已提交
3526 3527 3528 3529
    taosMemoryFree(pExprInfo->pExpr);
  }
}

3530 3531 3532 3533 3534
static void destroyOperatorInfo(SOperatorInfo* pOperator) {
  if (pOperator == NULL) {
    return;
  }

3535
  if (pOperator->fpSet.closeFn != NULL) {
3536
    pOperator->fpSet.closeFn(pOperator->info, pOperator->exprSupp.numOfExprs);
3537 3538
  }

H
Haojun Liao 已提交
3539
  if (pOperator->pDownstream != NULL) {
L
Liu Jicong 已提交
3540
    for (int32_t i = 0; i < pOperator->numOfDownstream; ++i) {
H
Haojun Liao 已提交
3541
      destroyOperatorInfo(pOperator->pDownstream[i]);
3542 3543
    }

wafwerar's avatar
wafwerar 已提交
3544
    taosMemoryFreeClear(pOperator->pDownstream);
H
Haojun Liao 已提交
3545
    pOperator->numOfDownstream = 0;
3546 3547
  }

3548
  cleanupExprSupp(&pOperator->exprSupp);
wafwerar's avatar
wafwerar 已提交
3549
  taosMemoryFreeClear(pOperator);
3550 3551
}

3552 3553 3554 3555 3556 3557 3558 3559 3560 3561 3562 3563 3564 3565 3566
int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* defaultBufsz) {
  *defaultPgsz = 4096;
  while (*defaultPgsz < rowSize * 4) {
    *defaultPgsz <<= 1u;
  }

  // at least four pages need to be in buffer
  *defaultBufsz = 4096 * 256;
  if ((*defaultBufsz) <= (*defaultPgsz)) {
    (*defaultBufsz) = (*defaultPgsz) * 4;
  }

  return 0;
}

dengyihao's avatar
dengyihao 已提交
3567 3568
int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t numOfOutput, size_t keyBufSize,
                         const char* pKey) {
3569 3570
  _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);

dengyihao's avatar
dengyihao 已提交
3571 3572
  pAggSup->resultRowSize = getResultRowSize(pCtx, numOfOutput);
  pAggSup->keyBuf = taosMemoryCalloc(1, keyBufSize + POINTER_BYTES + sizeof(int64_t));
3573 3574
  pAggSup->pResultRowHashTable = taosHashInit(10, hashFn, true, HASH_NO_LOCK);

H
Haojun Liao 已提交
3575
  if (pAggSup->keyBuf == NULL || pAggSup->pResultRowHashTable == NULL) {
3576 3577 3578
    return TSDB_CODE_OUT_OF_MEMORY;
  }

dengyihao's avatar
dengyihao 已提交
3579
  uint32_t defaultPgsz = 0;
3580 3581
  uint32_t defaultBufsz = 0;
  getBufferPgSize(pAggSup->resultRowSize, &defaultPgsz, &defaultBufsz);
H
Haojun Liao 已提交
3582

3583
  int32_t code = createDiskbasedBuf(&pAggSup->pResultBuf, defaultPgsz, defaultBufsz, pKey, TD_TMP_DIR_PATH);
H
Haojun Liao 已提交
3584 3585 3586 3587
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

3588 3589 3590
  return TSDB_CODE_SUCCESS;
}

3591
void cleanupAggSup(SAggSupporter* pAggSup) {
wafwerar's avatar
wafwerar 已提交
3592
  taosMemoryFreeClear(pAggSup->keyBuf);
3593
  taosHashCleanup(pAggSup->pResultRowHashTable);
H
Haojun Liao 已提交
3594
  destroyDiskbasedBuf(pAggSup->pResultBuf);
3595 3596
}

L
Liu Jicong 已提交
3597 3598
int32_t initAggInfo(SExprSupp* pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, size_t keyBufSize,
                    const char* pkey) {
3599 3600 3601 3602 3603
  int32_t code = initExprSupp(pSup, pExprInfo, numOfCols);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

3604
  doInitAggInfoSup(pAggSup, pSup->pCtx, numOfCols, keyBufSize, pkey);
L
Liu Jicong 已提交
3605
  for (int32_t i = 0; i < numOfCols; ++i) {
3606
    pSup->pCtx[i].pBuf = pAggSup->pResultBuf;
3607 3608
  }

3609
  return TSDB_CODE_SUCCESS;
H
Haojun Liao 已提交
3610 3611
}

3612
void initResultSizeInfo(SResultInfo * pResultInfo, int32_t numOfRows) {
wmmhello's avatar
wmmhello 已提交
3613
  ASSERT(numOfRows != 0);
3614 3615
  pResultInfo->capacity = numOfRows;
  pResultInfo->threshold = numOfRows * 0.75;
3616

3617 3618
  if (pResultInfo->threshold == 0) {
    pResultInfo->threshold = numOfRows;
3619 3620 3621
  }
}

3622 3623 3624 3625 3626
void initBasicInfo(SOptrBasicInfo* pInfo, SSDataBlock* pBlock) {
  pInfo->pRes = pBlock;
  initResultRowInfo(&pInfo->resultRowInfo);
}

3627 3628 3629 3630 3631 3632 3633 3634 3635 3636 3637 3638 3639 3640 3641 3642 3643 3644 3645
static void* destroySqlFunctionCtx(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
  if (pCtx == NULL) {
    return NULL;
  }

  for (int32_t i = 0; i < numOfOutput; ++i) {
    for (int32_t j = 0; j < pCtx[i].numOfParams; ++j) {
      taosVariantDestroy(&pCtx[i].param[j].param);
    }

    taosMemoryFreeClear(pCtx[i].subsidiaries.pCtx);
    taosMemoryFree(pCtx[i].input.pData);
    taosMemoryFree(pCtx[i].input.pColumnDataAgg);
  }

  taosMemoryFreeClear(pCtx);
  return NULL;
}

3646
int32_t initExprSupp(SExprSupp* pSup, SExprInfo* pExprInfo, int32_t numOfExpr) {
3647 3648 3649 3650
  pSup->pExprInfo = pExprInfo;
  pSup->numOfExprs = numOfExpr;
  if (pSup->pExprInfo != NULL) {
    pSup->pCtx = createSqlFunctionCtx(pExprInfo, numOfExpr, &pSup->rowEntryInfoOffset);
3651 3652 3653
    if (pSup->pCtx == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
3654
  }
3655 3656

  return TSDB_CODE_SUCCESS;
3657 3658
}

3659 3660 3661 3662 3663 3664 3665 3666 3667 3668
void cleanupExprSupp(SExprSupp* pSupp) {
  destroySqlFunctionCtx(pSupp->pCtx, pSupp->numOfExprs);
  if (pSupp->pExprInfo != NULL) {
    destroyExprInfo(pSupp->pExprInfo, pSupp->numOfExprs);
  }

  taosMemoryFreeClear(pSupp->pExprInfo);
  taosMemoryFree(pSupp->rowEntryInfoOffset);
}

L
Liu Jicong 已提交
3669
SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
S
slzhou 已提交
3670
                                           SSDataBlock* pResultBlock, SNode* pCondition, SExprInfo* pScalarExprInfo,
wmmhello's avatar
wmmhello 已提交
3671
                                           int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo) {
wafwerar's avatar
wafwerar 已提交
3672
  SAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SAggOperatorInfo));
L
Liu Jicong 已提交
3673
  SOperatorInfo*    pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
H
Haojun Liao 已提交
3674 3675 3676
  if (pInfo == NULL || pOperator == NULL) {
    goto _error;
  }
H
Haojun Liao 已提交
3677

3678
  int32_t numOfRows = 1024;
dengyihao's avatar
dengyihao 已提交
3679
  size_t  keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
3680

3681
  initResultSizeInfo(&pOperator->resultInfo, numOfRows);
3682
  int32_t code = initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str);
L
Liu Jicong 已提交
3683
  if (code != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
3684 3685
    goto _error;
  }
H
Haojun Liao 已提交
3686

3687
  initBasicInfo(&pInfo->binfo, pResultBlock);
3688 3689 3690 3691
  code = initExprSupp(&pInfo->scalarExprSup, pScalarExprInfo, numOfScalarExpr);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }
3692

L
Liu Jicong 已提交
3693
  pInfo->groupId = INT32_MIN;
S
slzhou 已提交
3694
  pInfo->pCondition = pCondition;
dengyihao's avatar
dengyihao 已提交
3695
  pOperator->name = "TableAggregate";
3696
  pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_HASH_AGG;
3697
  pOperator->blocking = true;
dengyihao's avatar
dengyihao 已提交
3698 3699 3700
  pOperator->status = OP_NOT_OPENED;
  pOperator->info = pInfo;
  pOperator->pTaskInfo = pTaskInfo;
H
Haojun Liao 已提交
3701

3702 3703
  pOperator->fpSet = createOperatorFpSet(doOpenAggregateOptr, getAggregateResult, NULL, NULL, destroyAggOperatorInfo,
                                         aggEncodeResultRow, aggDecodeResultRow, NULL);
H
Haojun Liao 已提交
3704 3705 3706 3707 3708

  code = appendDownstream(pOperator, &downstream, 1);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }
3709 3710

  return pOperator;
L
Liu Jicong 已提交
3711
_error:
H
Haojun Liao 已提交
3712
  destroyAggOperatorInfo(pInfo, numOfCols);
wafwerar's avatar
wafwerar 已提交
3713 3714
  taosMemoryFreeClear(pInfo);
  taosMemoryFreeClear(pOperator);
H
Haojun Liao 已提交
3715 3716
  pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
  return NULL;
3717 3718
}

3719
void cleanupBasicInfo(SOptrBasicInfo* pInfo) {
3720 3721
  assert(pInfo != NULL);
  cleanupResultRowInfo(&pInfo->resultRowInfo);
H
Haojun Liao 已提交
3722
  pInfo->pRes = blockDataDestroy(pInfo->pRes);
3723 3724
}

H
Haojun Liao 已提交
3725
void destroyBasicOperatorInfo(void* param, int32_t numOfOutput) {
L
Liu Jicong 已提交
3726
  SOptrBasicInfo* pInfo = (SOptrBasicInfo*)param;
3727
  cleanupBasicInfo(pInfo);
L
Liu Jicong 已提交
3728

D
dapan1121 已提交
3729
  taosMemoryFreeClear(param);
3730
}
H
Haojun Liao 已提交
3731

H
Haojun Liao 已提交
3732 3733 3734 3735 3736 3737 3738 3739

static void freeItem(void* pItem) {
  void** p = pItem;
  if (*p != NULL) {
    taosMemoryFreeClear(*p);
  }
}

H
Haojun Liao 已提交
3740
void destroyAggOperatorInfo(void* param, int32_t numOfOutput) {
L
Liu Jicong 已提交
3741
  SAggOperatorInfo* pInfo = (SAggOperatorInfo*)param;
L
Liu Jicong 已提交
3742 3743
  cleanupBasicInfo(&pInfo->binfo);

H
Haojun Liao 已提交
3744
  cleanupAggSup(&pInfo->aggSup);
H
Haojun Liao 已提交
3745
  cleanupGroupResInfo(&pInfo->groupResInfo);
D
dapan1121 已提交
3746
  taosMemoryFreeClear(param);
3747
}
3748

H
Haojun Liao 已提交
3749
void destroySFillOperatorInfo(void* param, int32_t numOfOutput) {
L
Liu Jicong 已提交
3750
  SFillOperatorInfo* pInfo = (SFillOperatorInfo*)param;
3751
  pInfo->pFillInfo = taosDestroyFillInfo(pInfo->pFillInfo);
H
Haojun Liao 已提交
3752
  pInfo->pRes = blockDataDestroy(pInfo->pRes);
wafwerar's avatar
wafwerar 已提交
3753
  taosMemoryFreeClear(pInfo->p);
L
Liu Jicong 已提交
3754

D
dapan1121 已提交
3755
  taosMemoryFreeClear(param);
3756 3757
}

H
Haojun Liao 已提交
3758
static void destroyProjectOperatorInfo(void* param, int32_t numOfOutput) {
D
fix bug  
dapan 已提交
3759 3760 3761
  if (NULL == param) {
    return;
  }
L
Liu Jicong 已提交
3762
  SProjectOperatorInfo* pInfo = (SProjectOperatorInfo*)param;
3763
  cleanupBasicInfo(&pInfo->binfo);
H
Haojun Liao 已提交
3764
  cleanupAggSup(&pInfo->aggSup);
H
Haojun Liao 已提交
3765
  taosArrayDestroy(pInfo->pPseudoColInfo);
L
Liu Jicong 已提交
3766

D
dapan1121 已提交
3767
  taosMemoryFreeClear(param);
3768 3769
}

H
Haojun Liao 已提交
3770
static void destroyIndefinitOperatorInfo(void* param, int32_t numOfOutput) {
3771
  SIndefOperatorInfo* pInfo = (SIndefOperatorInfo*)param;
3772
  cleanupBasicInfo(&pInfo->binfo);
H
Haojun Liao 已提交
3773 3774 3775

  taosArrayDestroy(pInfo->pPseudoColInfo);
  cleanupAggSup(&pInfo->aggSup);
3776
  cleanupExprSupp(&pInfo->scalarSup);
L
Liu Jicong 已提交
3777

D
dapan1121 已提交
3778
  taosMemoryFreeClear(param);
H
Haojun Liao 已提交
3779 3780
}

H
Haojun Liao 已提交
3781
void destroyExchangeOperatorInfo(void* param, int32_t numOfOutput) {
L
Liu Jicong 已提交
3782
  SExchangeInfo* pExInfo = (SExchangeInfo*)param;
3783 3784 3785 3786
  taosRemoveRef(exchangeObjRefPool, pExInfo->self);
}

void doDestroyExchangeOperatorInfo(void* param) {
X
Xiaoyu Wang 已提交
3787
  SExchangeInfo* pExInfo = (SExchangeInfo*)param;
3788

H
Haojun Liao 已提交
3789 3790 3791
  taosArrayDestroy(pExInfo->pSources);
  taosArrayDestroy(pExInfo->pSourceDataInfo);
  if (pExInfo->pResult != NULL) {
H
Haojun Liao 已提交
3792
    pExInfo->pResult = blockDataDestroy(pExInfo->pResult);
H
Haojun Liao 已提交
3793 3794 3795
  }

  tsem_destroy(&pExInfo->ready);
L
Liu Jicong 已提交
3796

D
dapan1121 已提交
3797
  taosMemoryFreeClear(param);
H
Haojun Liao 已提交
3798 3799
}

H
Haojun Liao 已提交
3800 3801
static SArray* setRowTsColumnOutputInfo(SqlFunctionCtx* pCtx, int32_t numOfCols) {
  SArray* pList = taosArrayInit(4, sizeof(int32_t));
dengyihao's avatar
dengyihao 已提交
3802
  for (int32_t i = 0; i < numOfCols; ++i) {
H
Haojun Liao 已提交
3803 3804 3805 3806 3807 3808 3809 3810
    if (fmIsPseudoColumnFunc(pCtx[i].functionId)) {
      taosArrayPush(pList, &i);
    }
  }

  return pList;
}

L
Liu Jicong 已提交
3811
SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhysiNode* pProjPhyNode,
dengyihao's avatar
dengyihao 已提交
3812
                                         SExecTaskInfo* pTaskInfo) {
wafwerar's avatar
wafwerar 已提交
3813
  SProjectOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SProjectOperatorInfo));
L
Liu Jicong 已提交
3814
  SOperatorInfo*        pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
H
Haojun Liao 已提交
3815 3816 3817
  if (pInfo == NULL || pOperator == NULL) {
    goto _error;
  }
3818

L
Liu Jicong 已提交
3819
  int32_t    numOfCols = 0;
3820 3821 3822
  SExprInfo* pExprInfo = createExprInfo(pProjPhyNode->pProjections, NULL, &numOfCols);

  SSDataBlock* pResBlock = createResDataBlock(pProjPhyNode->node.pOutputDataBlockDesc);
3823
  initLimitInfo(pProjPhyNode->node.pLimit, pProjPhyNode->node.pSlimit, &pInfo->limitInfo);
3824

H
Haojun Liao 已提交
3825
  pInfo->binfo.pRes = pResBlock;
3826
  pInfo->pFilterNode = pProjPhyNode->node.pConditions;
H
Haojun Liao 已提交
3827 3828

  int32_t numOfRows = 4096;
dengyihao's avatar
dengyihao 已提交
3829
  size_t  keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
3830

3831 3832 3833 3834 3835
  // Make sure the size of SSDataBlock will never exceed the size of 2MB.
  int32_t TWOMB = 2 * 1024 * 1024;
  if (numOfRows * pResBlock->info.rowSize > TWOMB) {
    numOfRows = TWOMB / pResBlock->info.rowSize;
  }
3836
  initResultSizeInfo(&pOperator->resultInfo, numOfRows);
3837

3838 3839
  initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str);
  initBasicInfo(&pInfo->binfo, pResBlock);
3840
  setFunctionResultOutput(pOperator, &pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, numOfCols);
3841

3842
  pInfo->pPseudoColInfo = setRowTsColumnOutputInfo(pOperator->exprSupp.pCtx, numOfCols);
X
Xiaoyu Wang 已提交
3843
  pOperator->name = "ProjectOperator";
H
Haojun Liao 已提交
3844
  pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_PROJECT;
X
Xiaoyu Wang 已提交
3845 3846 3847 3848
  pOperator->blocking = false;
  pOperator->status = OP_NOT_OPENED;
  pOperator->info = pInfo;
  pOperator->pTaskInfo = pTaskInfo;
3849

L
Liu Jicong 已提交
3850 3851
  pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doProjectOperation, NULL, NULL,
                                         destroyProjectOperatorInfo, NULL, NULL, NULL);
L
Liu Jicong 已提交
3852

3853
  int32_t code = appendDownstream(pOperator, &downstream, 1);
H
Haojun Liao 已提交
3854
  if (code != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
3855 3856
    goto _error;
  }
3857 3858

  return pOperator;
H
Haojun Liao 已提交
3859

L
Liu Jicong 已提交
3860
_error:
H
Haojun Liao 已提交
3861 3862
  pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
  return NULL;
3863 3864
}

3865 3866
static void doHandleDataBlock(SOperatorInfo* pOperator, SSDataBlock* pBlock, SOperatorInfo* downstream,
                              SExecTaskInfo* pTaskInfo) {
3867 3868 3869 3870 3871 3872 3873 3874 3875 3876 3877 3878 3879 3880 3881 3882 3883 3884 3885 3886 3887 3888 3889 3890 3891 3892 3893 3894 3895 3896 3897 3898 3899
  int32_t order = 0;
  int32_t scanFlag = 0;

  SIndefOperatorInfo* pIndefInfo = pOperator->info;
  SOptrBasicInfo*     pInfo = &pIndefInfo->binfo;
  SExprSupp*          pSup = &pOperator->exprSupp;

  // the pDataBlock are always the same one, no need to call this again
  int32_t code = getTableScanInfo(downstream, &order, &scanFlag);
  if (code != TSDB_CODE_SUCCESS) {
    longjmp(pTaskInfo->env, code);
  }

  // there is an scalar expression that needs to be calculated before apply the group aggregation.
  SExprSupp* pScalarSup = &pIndefInfo->scalarSup;
  if (pScalarSup->pExprInfo != NULL) {
    code = projectApplyFunctions(pScalarSup->pExprInfo, pBlock, pBlock, pScalarSup->pCtx, pScalarSup->numOfExprs,
                                 pIndefInfo->pPseudoColInfo);
    if (code != TSDB_CODE_SUCCESS) {
      longjmp(pTaskInfo->env, code);
    }
  }

  setInputDataBlock(pOperator, pSup->pCtx, pBlock, order, scanFlag, false);
  blockDataEnsureCapacity(pInfo->pRes, pInfo->pRes->info.rows + pBlock->info.rows);

  code = projectApplyFunctions(pSup->pExprInfo, pInfo->pRes, pBlock, pSup->pCtx, pSup->numOfExprs,
                               pIndefInfo->pPseudoColInfo);
  if (code != TSDB_CODE_SUCCESS) {
    longjmp(pTaskInfo->env, code);
  }
}

H
Haojun Liao 已提交
3900 3901
static SSDataBlock* doApplyIndefinitFunction(SOperatorInfo* pOperator) {
  SIndefOperatorInfo* pIndefInfo = pOperator->info;
3902
  SOptrBasicInfo*     pInfo = &pIndefInfo->binfo;
L
Liu Jicong 已提交
3903
  SExprSupp*          pSup = &pOperator->exprSupp;
H
Haojun Liao 已提交
3904 3905 3906 3907 3908 3909 3910 3911 3912 3913 3914 3915 3916 3917 3918 3919 3920

  SSDataBlock* pRes = pInfo->pRes;
  blockDataCleanup(pRes);

  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  int64_t st = 0;

  if (pOperator->cost.openCost == 0) {
    st = taosGetTimestampUs();
  }

  SOperatorInfo* downstream = pOperator->pDownstream[0];

3921
  while (1) {
3922
    // here we need to handle the existsed group results
3923
    if (pIndefInfo->pNextGroupRes != NULL) {  // todo extract method
3924 3925
      for (int32_t k = 0; k < pSup->numOfExprs; ++k) {
        SqlFunctionCtx* pCtx = &pSup->pCtx[k];
H
Haojun Liao 已提交
3926

3927 3928 3929 3930 3931 3932 3933
        SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
        pResInfo->initialized = false;
        pCtx->pOutput = NULL;
      }

      doHandleDataBlock(pOperator, pIndefInfo->pNextGroupRes, downstream, pTaskInfo);
      pIndefInfo->pNextGroupRes = NULL;
H
Haojun Liao 已提交
3934 3935
    }

3936 3937 3938 3939 3940 3941 3942 3943 3944 3945 3946 3947 3948 3949 3950 3951 3952 3953 3954 3955 3956 3957 3958
    if (pInfo->pRes->info.rows < pOperator->resultInfo.threshold) {
      while (1) {
        // The downstream exec may change the value of the newgroup, so use a local variable instead.
        SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
        if (pBlock == NULL) {
          doSetOperatorCompleted(pOperator);
          break;
        }

        if (pIndefInfo->groupId == 0 && pBlock->info.groupId != 0) {
          pIndefInfo->groupId = pBlock->info.groupId;  // this is the initial group result
        } else {
          if (pIndefInfo->groupId != pBlock->info.groupId) {  // reset output buffer and computing status
            pIndefInfo->groupId = pBlock->info.groupId;
            pIndefInfo->pNextGroupRes = pBlock;
            break;
          }
        }

        doHandleDataBlock(pOperator, pBlock, downstream, pTaskInfo);
        if (pInfo->pRes->info.rows >= pOperator->resultInfo.threshold) {
          break;
        }
H
Haojun Liao 已提交
3959 3960 3961
      }
    }

3962 3963
    doFilter(pIndefInfo->pCondition, pInfo->pRes);
    size_t rows = pInfo->pRes->info.rows;
3964
    if (rows > 0 || pOperator->status == OP_EXEC_DONE) {
3965
      break;
3966 3967
    } else {
      blockDataCleanup(pInfo->pRes);
H
Haojun Liao 已提交
3968 3969 3970 3971 3972 3973 3974 3975 3976 3977 3978 3979 3980
    }
  }

  size_t rows = pInfo->pRes->info.rows;
  pOperator->resultInfo.totalRows += rows;

  if (pOperator->cost.openCost == 0) {
    pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
  }

  return (rows > 0) ? pInfo->pRes : NULL;
}

3981 3982
SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pNode,
                                                 SExecTaskInfo* pTaskInfo) {
H
Haojun Liao 已提交
3983
  SIndefOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SIndefOperatorInfo));
3984
  SOperatorInfo*      pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
H
Haojun Liao 已提交
3985 3986 3987 3988
  if (pInfo == NULL || pOperator == NULL) {
    goto _error;
  }

3989 3990
  SExprSupp* pSup = &pOperator->exprSupp;

H
Haojun Liao 已提交
3991 3992 3993
  SIndefRowsFuncPhysiNode* pPhyNode = (SIndefRowsFuncPhysiNode*)pNode;

  int32_t    numOfExpr = 0;
X
Xiaoyu Wang 已提交
3994
  SExprInfo* pExprInfo = createExprInfo(pPhyNode->pFuncs, NULL, &numOfExpr);
H
Haojun Liao 已提交
3995 3996

  if (pPhyNode->pExprs != NULL) {
3997
    int32_t    num = 0;
3998
    SExprInfo* pSExpr = createExprInfo(pPhyNode->pExprs, NULL, &num);
3999
    int32_t    code = initExprSupp(&pInfo->scalarSup, pSExpr, num);
4000 4001 4002
    if (code != TSDB_CODE_SUCCESS) {
      goto _error;
    }
H
Haojun Liao 已提交
4003 4004
  }

4005
  SSDataBlock* pResBlock = createResDataBlock(pPhyNode->node.pOutputDataBlockDesc);
H
Haojun Liao 已提交
4006 4007 4008 4009 4010 4011 4012 4013 4014

  int32_t numOfRows = 4096;
  size_t  keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;

  // Make sure the size of SSDataBlock will never exceed the size of 2MB.
  int32_t TWOMB = 2 * 1024 * 1024;
  if (numOfRows * pResBlock->info.rowSize > TWOMB) {
    numOfRows = TWOMB / pResBlock->info.rowSize;
  }
4015

4016
  initResultSizeInfo(&pOperator->resultInfo, numOfRows);
H
Haojun Liao 已提交
4017

4018
  initAggInfo(pSup, &pInfo->aggSup, pExprInfo, numOfExpr, keyBufSize, pTaskInfo->id.str);
4019 4020
  initBasicInfo(&pInfo->binfo, pResBlock);

4021
  setFunctionResultOutput(pOperator, &pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, numOfExpr);
H
Haojun Liao 已提交
4022

4023 4024 4025
  pInfo->binfo.pRes = pResBlock;
  pInfo->pCondition = pPhyNode->node.pConditions;
  pInfo->pPseudoColInfo = setRowTsColumnOutputInfo(pSup->pCtx, numOfExpr);
H
Haojun Liao 已提交
4026

4027
  pOperator->name = "IndefinitOperator";
4028
  pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC;
4029 4030 4031
  pOperator->blocking = false;
  pOperator->status = OP_NOT_OPENED;
  pOperator->info = pInfo;
4032
  pOperator->pTaskInfo = pTaskInfo;
H
Haojun Liao 已提交
4033 4034 4035 4036 4037 4038 4039 4040 4041 4042 4043

  pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doApplyIndefinitFunction, NULL, NULL,
                                         destroyIndefinitOperatorInfo, NULL, NULL, NULL);

  int32_t code = appendDownstream(pOperator, &downstream, 1);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  return pOperator;

4044
_error:
H
Haojun Liao 已提交
4045 4046 4047 4048 4049 4050
  taosMemoryFree(pInfo);
  taosMemoryFree(pOperator);
  pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
  return NULL;
}

4051
static int32_t initFillInfo(SFillOperatorInfo* pInfo, SExprInfo* pExpr, int32_t numOfCols, SNodeListNode* pValNode,
L
Liu Jicong 已提交
4052
                            STimeWindow win, int32_t capacity, const char* id, SInterval* pInterval, int32_t fillType) {
4053
  SFillColInfo* pColInfo = createFillColInfo(pExpr, numOfCols, pValNode);
H
Haojun Liao 已提交
4054

4055
  STimeWindow w = getAlignQueryTimeWindow(pInterval, pInterval->precision, win.skey);
4056
  w = getFirstQualifiedTimeWindow(win.skey, &w, pInterval, TSDB_ORDER_ASC);
H
Haojun Liao 已提交
4057 4058

  int32_t order = TSDB_ORDER_ASC;
4059 4060
  pInfo->pFillInfo = taosCreateFillInfo(order, w.skey, 0, capacity, numOfCols, pInterval,
      fillType, pColInfo, pInfo->primaryTsCol, id);
H
Haojun Liao 已提交
4061

4062
  pInfo->win = win;
L
Liu Jicong 已提交
4063
  pInfo->p = taosMemoryCalloc(numOfCols, POINTER_BYTES);
4064

H
Haojun Liao 已提交
4065
  if (pInfo->pFillInfo == NULL || pInfo->p == NULL) {
H
Haojun Liao 已提交
4066 4067
    taosMemoryFree(pInfo->pFillInfo);
    taosMemoryFree(pInfo->p);
H
Haojun Liao 已提交
4068 4069 4070 4071 4072 4073
    return TSDB_CODE_OUT_OF_MEMORY;
  } else {
    return TSDB_CODE_SUCCESS;
  }
}

H
Haojun Liao 已提交
4074
SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* pPhyFillNode, SExecTaskInfo* pTaskInfo) {
4075 4076 4077 4078 4079 4080
  SFillOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SFillOperatorInfo));
  SOperatorInfo*     pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    goto _error;
  }

L
Liu Jicong 已提交
4081 4082 4083
  int32_t      num = 0;
  SSDataBlock* pResBlock = createResDataBlock(pPhyFillNode->node.pOutputDataBlockDesc);
  SExprInfo*   pExprInfo = createExprInfo(pPhyFillNode->pTargets, NULL, &num);
4084 4085 4086 4087
  SInterval*   pInterval =
      QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL == downstream->operatorType
            ? &((SMergeAlignedIntervalAggOperatorInfo*)downstream->info)->intervalAggOperatorInfo->interval
            : &((SIntervalAggOperatorInfo*)downstream->info)->interval;
4088

4089
  int32_t type = convertFillType(pPhyFillNode->mode);
4090

H
Haojun Liao 已提交
4091
  SResultInfo* pResultInfo = &pOperator->resultInfo;
4092
  initResultSizeInfo(&pOperator->resultInfo, 4096);
H
Haojun Liao 已提交
4093
  pInfo->primaryTsCol = ((SColumnNode*)pPhyFillNode->pWStartTs)->slotId;
4094

4095
  int32_t numOfOutputCols = 0;
4096 4097
  SArray* pColMatchColInfo = extractColMatchInfo(pPhyFillNode->pTargets, pPhyFillNode->node.pOutputDataBlockDesc,
                                                 &numOfOutputCols, COL_MATCH_FROM_SLOT_ID);
4098

4099 4100
  int32_t code = initFillInfo(pInfo, pExprInfo, num, (SNodeListNode*)pPhyFillNode->pValues, pPhyFillNode->timeRange,
                              pResultInfo->capacity, pTaskInfo->id.str, pInterval, type);
4101 4102 4103
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }
4104

4105 4106 4107 4108 4109 4110 4111 4112
  pInfo->pRes = pResBlock;
  pInfo->pCondition = pPhyFillNode->node.pConditions;
  pInfo->pColMatchColInfo = pColMatchColInfo;
  pOperator->name = "FillOperator";
  pOperator->blocking = false;
  pOperator->status = OP_NOT_OPENED;
  pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_FILL;
  pOperator->exprSupp.pExprInfo = pExprInfo;
4113
  pOperator->exprSupp.numOfExprs = num;
4114 4115
  pOperator->info = pInfo;
  pOperator->pTaskInfo = pTaskInfo;
H
Haojun Liao 已提交
4116

L
Liu Jicong 已提交
4117 4118
  pOperator->fpSet =
      createOperatorFpSet(operatorDummyOpenFn, doFill, NULL, NULL, destroySFillOperatorInfo, NULL, NULL, NULL);
4119

4120
  code = appendDownstream(pOperator, &downstream, 1);
4121
  return pOperator;
H
Haojun Liao 已提交
4122

L
Liu Jicong 已提交
4123
_error:
wafwerar's avatar
wafwerar 已提交
4124 4125
  taosMemoryFreeClear(pOperator);
  taosMemoryFreeClear(pInfo);
H
Haojun Liao 已提交
4126
  return NULL;
4127 4128
}

D
dapan1121 已提交
4129
static SExecTaskInfo* createExecTaskInfo(uint64_t queryId, uint64_t taskId, EOPTR_EXEC_MODEL model, char* dbFName) {
wafwerar's avatar
wafwerar 已提交
4130
  SExecTaskInfo* pTaskInfo = taosMemoryCalloc(1, sizeof(SExecTaskInfo));
4131
  setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED);
H
Haojun Liao 已提交
4132

4133
  pTaskInfo->schemaInfo.dbname = strdup(dbFName);
4134
  pTaskInfo->cost.created = taosGetTimestampMs();
H
Haojun Liao 已提交
4135
  pTaskInfo->id.queryId = queryId;
dengyihao's avatar
dengyihao 已提交
4136
  pTaskInfo->execModel = model;
H
Haojun Liao 已提交
4137

wafwerar's avatar
wafwerar 已提交
4138
  char* p = taosMemoryCalloc(1, 128);
L
Liu Jicong 已提交
4139
  snprintf(p, 128, "TID:0x%" PRIx64 " QID:0x%" PRIx64, taskId, queryId);
H
Haojun Liao 已提交
4140
  pTaskInfo->id.str = p;
H
Haojun Liao 已提交
4141

4142 4143
  return pTaskInfo;
}
H
Haojun Liao 已提交
4144

H
Hongze Cheng 已提交
4145
static STsdbReader* doCreateDataReader(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle,
H
Haojun Liao 已提交
4146
                                       STableListInfo* pTableListInfo, const char* idstr);
H
Haojun Liao 已提交
4147

H
Haojun Liao 已提交
4148
static SArray* extractColumnInfo(SNodeList* pNodeList);
4149

4150
int32_t extractTableSchemaInfo(SReadHandle* pHandle, uint64_t uid, SExecTaskInfo* pTaskInfo) {
4151 4152
  SMetaReader mr = {0};
  metaReaderInit(&mr, pHandle->meta, 0);
D
dapan1121 已提交
4153
  int32_t code = metaGetTableEntryByUid(&mr, uid);
4154
  if (code != TSDB_CODE_SUCCESS) {
D
dapan1121 已提交
4155
    metaReaderClear(&mr);
4156
    return terrno;
D
dapan1121 已提交
4157
  }
4158

4159
  pTaskInfo->schemaInfo.tablename = strdup(mr.me.name);
4160 4161

  if (mr.me.type == TSDB_SUPER_TABLE) {
4162 4163
    pTaskInfo->schemaInfo.sw = tCloneSSchemaWrapper(&mr.me.stbEntry.schemaRow);
    pTaskInfo->schemaInfo.tversion = mr.me.stbEntry.schemaTag.version;
4164
  } else if (mr.me.type == TSDB_CHILD_TABLE) {
4165 4166
    tDecoderClear(&mr.coder);

4167 4168
    tb_uid_t suid = mr.me.ctbEntry.suid;
    metaGetTableEntryByUid(&mr, suid);
4169 4170
    pTaskInfo->schemaInfo.sw = tCloneSSchemaWrapper(&mr.me.stbEntry.schemaRow);
    pTaskInfo->schemaInfo.tversion = mr.me.stbEntry.schemaTag.version;
4171
  } else {
4172
    pTaskInfo->schemaInfo.sw = tCloneSSchemaWrapper(&mr.me.ntbEntry.schemaRow);
4173
  }
4174 4175

  metaReaderClear(&mr);
D
dapan1121 已提交
4176
  return TSDB_CODE_SUCCESS;
4177 4178
}

4179 4180 4181
static void cleanupTableSchemaInfo(SSchemaInfo* pSchemaInfo) {
  taosMemoryFreeClear(pSchemaInfo->dbname);
  if (pSchemaInfo->sw == NULL) {
4182 4183 4184
    return;
  }

4185 4186 4187
  taosMemoryFree(pSchemaInfo->tablename);
  taosMemoryFree(pSchemaInfo->sw->pSchema);
  taosMemoryFree(pSchemaInfo->sw);
4188 4189
}

4190
static int32_t sortTableGroup(STableListInfo* pTableListInfo, int32_t groupNum) {
wmmhello's avatar
wmmhello 已提交
4191
  taosArrayClear(pTableListInfo->pGroupList);
4192 4193
  SArray* sortSupport = taosArrayInit(groupNum, sizeof(uint64_t));
  if (sortSupport == NULL) return TSDB_CODE_OUT_OF_MEMORY;
wmmhello's avatar
wmmhello 已提交
4194 4195
  for (int32_t i = 0; i < taosArrayGetSize(pTableListInfo->pTableList); i++) {
    STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i);
4196
    uint64_t*      groupId = taosHashGet(pTableListInfo->map, &info->uid, sizeof(uint64_t));
wmmhello's avatar
wmmhello 已提交
4197 4198

    int32_t index = taosArraySearchIdx(sortSupport, groupId, compareUint64Val, TD_EQ);
4199 4200 4201 4202
    if (index == -1) {
      void*   p = taosArraySearch(sortSupport, groupId, compareUint64Val, TD_GT);
      SArray* tGroup = taosArrayInit(8, sizeof(STableKeyInfo));
      if (tGroup == NULL) {
wmmhello's avatar
wmmhello 已提交
4203 4204 4205
        taosArrayDestroy(sortSupport);
        return TSDB_CODE_OUT_OF_MEMORY;
      }
4206
      if (taosArrayPush(tGroup, info) == NULL) {
wmmhello's avatar
wmmhello 已提交
4207 4208 4209 4210
        qError("taos push info array error");
        taosArrayDestroy(sortSupport);
        return TSDB_CODE_QRY_APP_ERROR;
      }
4211
      if (p == NULL) {
wmmhello's avatar
wmmhello 已提交
4212
        if (taosArrayPush(sortSupport, groupId) == NULL) {
wmmhello's avatar
wmmhello 已提交
4213 4214 4215 4216
          qError("taos push support array error");
          taosArrayDestroy(sortSupport);
          return TSDB_CODE_QRY_APP_ERROR;
        }
wmmhello's avatar
wmmhello 已提交
4217
        if (taosArrayPush(pTableListInfo->pGroupList, &tGroup) == NULL) {
wmmhello's avatar
wmmhello 已提交
4218 4219 4220 4221
          qError("taos push group array error");
          taosArrayDestroy(sortSupport);
          return TSDB_CODE_QRY_APP_ERROR;
        }
4222
      } else {
wmmhello's avatar
wmmhello 已提交
4223
        int32_t pos = TARRAY_ELEM_IDX(sortSupport, p);
4224
        if (taosArrayInsert(sortSupport, pos, groupId) == NULL) {
wmmhello's avatar
wmmhello 已提交
4225 4226 4227 4228
          qError("taos insert support array error");
          taosArrayDestroy(sortSupport);
          return TSDB_CODE_QRY_APP_ERROR;
        }
4229
        if (taosArrayInsert(pTableListInfo->pGroupList, pos, &tGroup) == NULL) {
wmmhello's avatar
wmmhello 已提交
4230 4231 4232 4233 4234
          qError("taos insert group array error");
          taosArrayDestroy(sortSupport);
          return TSDB_CODE_QRY_APP_ERROR;
        }
      }
4235
    } else {
wmmhello's avatar
wmmhello 已提交
4236
      SArray* tGroup = (SArray*)taosArrayGetP(pTableListInfo->pGroupList, index);
4237
      if (taosArrayPush(tGroup, info) == NULL) {
wmmhello's avatar
wmmhello 已提交
4238 4239 4240 4241 4242 4243 4244 4245 4246 4247
        qError("taos push uid array error");
        taosArrayDestroy(sortSupport);
        return TSDB_CODE_QRY_APP_ERROR;
      }
    }
  }
  taosArrayDestroy(sortSupport);
  return TDB_CODE_SUCCESS;
}

wmmhello's avatar
wmmhello 已提交
4248 4249
int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle, SNodeList* group) {
  if (group == NULL) {
wmmhello's avatar
wmmhello 已提交
4250 4251 4252 4253 4254 4255 4256 4257
    return TDB_CODE_SUCCESS;
  }

  pTableListInfo->map = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
  if (pTableListInfo->map == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  int32_t keyLen = 0;
X
Xiaoyu Wang 已提交
4258
  void*   keyBuf = NULL;
wmmhello's avatar
wmmhello 已提交
4259

4260
  SNode* node;
wmmhello's avatar
wmmhello 已提交
4261
  FOREACH(node, group) {
4262
    SExprNode* pExpr = (SExprNode*)node;
wmmhello's avatar
wmmhello 已提交
4263
    keyLen += pExpr->resType.bytes;
wmmhello's avatar
wmmhello 已提交
4264 4265
  }

wmmhello's avatar
wmmhello 已提交
4266
  int32_t nullFlagSize = sizeof(int8_t) * LIST_LENGTH(group);
wmmhello's avatar
wmmhello 已提交
4267 4268 4269 4270 4271 4272 4273
  keyLen += nullFlagSize;

  keyBuf = taosMemoryCalloc(1, keyLen);
  if (keyBuf == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

4274
  int32_t groupNum = 0;
X
Xiaoyu Wang 已提交
4275 4276 4277
  for (int32_t i = 0; i < taosArrayGetSize(pTableListInfo->pTableList); i++) {
    STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i);
    SMetaReader    mr = {0};
wmmhello's avatar
wmmhello 已提交
4278 4279 4280
    metaReaderInit(&mr, pHandle->meta, 0);
    metaGetTableEntryByUid(&mr, info->uid);

4281
    SNodeList* groupNew = nodesCloneList(group);
wmmhello's avatar
wmmhello 已提交
4282

wmmhello's avatar
wmmhello 已提交
4283
    nodesRewriteExprsPostOrder(groupNew, doTranslateTagExpr, &mr);
wmmhello's avatar
wmmhello 已提交
4284
    char* isNull = (char*)keyBuf;
wmmhello's avatar
wmmhello 已提交
4285 4286
    char* pStart = (char*)keyBuf + nullFlagSize;

4287
    SNode*  pNode;
wmmhello's avatar
wmmhello 已提交
4288
    int32_t index = 0;
4289
    FOREACH(pNode, groupNew) {
wmmhello's avatar
wmmhello 已提交
4290 4291 4292 4293
      SNode*  pNew = NULL;
      int32_t code = scalarCalculateConstants(pNode, &pNew);
      if (TSDB_CODE_SUCCESS == code) {
        REPLACE_NODE(pNew);
X
Xiaoyu Wang 已提交
4294
      } else {
4295
        taosMemoryFree(keyBuf);
D
dapan1121 已提交
4296
        nodesDestroyList(groupNew);
4297
        metaReaderClear(&mr);
wmmhello's avatar
wmmhello 已提交
4298
        return code;
wmmhello's avatar
wmmhello 已提交
4299
      }
4300

wmmhello's avatar
wmmhello 已提交
4301
      ASSERT(nodeType(pNew) == QUERY_NODE_VALUE);
4302
      SValueNode* pValue = (SValueNode*)pNew;
4303

wmmhello's avatar
wmmhello 已提交
4304
      if (pValue->node.resType.type == TSDB_DATA_TYPE_NULL || pValue->isNull) {
wmmhello's avatar
wmmhello 已提交
4305 4306 4307 4308
        isNull[index++] = 1;
        continue;
      } else {
        isNull[index++] = 0;
4309
        char* data = nodesGetValueFromNode(pValue);
L
Liu Jicong 已提交
4310 4311
        if (pValue->node.resType.type == TSDB_DATA_TYPE_JSON) {
          if (tTagIsJson(data)) {
wmmhello's avatar
wmmhello 已提交
4312 4313
            terrno = TSDB_CODE_QRY_JSON_IN_GROUP_ERROR;
            taosMemoryFree(keyBuf);
D
dapan1121 已提交
4314
            nodesDestroyList(groupNew);
4315
            metaReaderClear(&mr);
wmmhello's avatar
wmmhello 已提交
4316 4317
            return terrno;
          }
wmmhello's avatar
wmmhello 已提交
4318
          int32_t len = getJsonValueLen(data);
wmmhello's avatar
wmmhello 已提交
4319 4320 4321
          memcpy(pStart, data, len);
          pStart += len;
        } else if (IS_VAR_DATA_TYPE(pValue->node.resType.type)) {
wmmhello's avatar
wmmhello 已提交
4322 4323
          memcpy(pStart, data, varDataTLen(data));
          pStart += varDataTLen(data);
wmmhello's avatar
wmmhello 已提交
4324
        } else {
wmmhello's avatar
wmmhello 已提交
4325 4326
          memcpy(pStart, data, pValue->node.resType.bytes);
          pStart += pValue->node.resType.bytes;
wmmhello's avatar
wmmhello 已提交
4327 4328 4329
        }
      }
    }
4330

4331
    int32_t  len = (int32_t)(pStart - (char*)keyBuf);
4332 4333
    uint64_t groupId = calcGroupId(keyBuf, len);
    taosHashPut(pTableListInfo->map, &(info->uid), sizeof(uint64_t), &groupId, sizeof(uint64_t));
S
slzhou 已提交
4334
    info->groupId = groupId;
4335
    groupNum++;
wmmhello's avatar
wmmhello 已提交
4336

D
dapan1121 已提交
4337
    nodesDestroyList(groupNew);
wmmhello's avatar
wmmhello 已提交
4338 4339 4340
    metaReaderClear(&mr);
  }
  taosMemoryFree(keyBuf);
4341

4342
  if (pTableListInfo->needSortTableByGroupId) {
wmmhello's avatar
wmmhello 已提交
4343
    return sortTableGroup(pTableListInfo, groupNum);
4344 4345
  }

wmmhello's avatar
wmmhello 已提交
4346 4347 4348
  return TDB_CODE_SUCCESS;
}

4349 4350 4351 4352 4353 4354 4355 4356 4357 4358 4359 4360 4361 4362 4363 4364 4365 4366 4367 4368 4369 4370 4371 4372
static int32_t initTableblockDistQueryCond(uint64_t uid, SQueryTableDataCond* pCond) {
  memset(pCond, 0, sizeof(SQueryTableDataCond));

  pCond->order = TSDB_ORDER_ASC;
  pCond->numOfCols = 1;
  pCond->colList = taosMemoryCalloc(1, sizeof(SColumnInfo));
  if (pCond->colList == NULL) {
    terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
    return terrno;
  }

  pCond->colList->colId = 1;
  pCond->colList->type = TSDB_DATA_TYPE_TIMESTAMP;
  pCond->colList->bytes = sizeof(TSKEY);

  pCond->twindows = (STimeWindow){.skey = INT64_MIN, .ekey = INT64_MAX};
  pCond->suid = uid;
  pCond->type = BLOCK_LOAD_OFFSET_ORDER;
  pCond->startVersion = -1;
  pCond->endVersion  =  -1;

  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
4373
SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle,
4374
                                  STableListInfo* pTableListInfo, SNode* pTagCond, SNode* pTagIndexCond, const char* pUser) {
4375 4376
  int32_t type = nodeType(pPhyNode);

X
Xiaoyu Wang 已提交
4377
  if (pPhyNode->pChildren == NULL || LIST_LENGTH(pPhyNode->pChildren) == 0) {
H
Haojun Liao 已提交
4378
    if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == type) {
dengyihao's avatar
dengyihao 已提交
4379
      STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode;
H
Haojun Liao 已提交
4380

4381
      int32_t code = createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags,
4382
                                             pTableScanNode->groupSort, pHandle, pTableListInfo, pTagCond, pTagIndexCond, GET_TASKID(pTaskInfo));
4383
      if (code) {
wmmhello's avatar
wmmhello 已提交
4384
        pTaskInfo->code = code;
D
dapan1121 已提交
4385 4386
        return NULL;
      }
wmmhello's avatar
wmmhello 已提交
4387

4388
      code = extractTableSchemaInfo(pHandle, pTableScanNode->scan.uid, pTaskInfo);
S
slzhou 已提交
4389
      if (code) {
4390
        pTaskInfo->code = terrno;
wmmhello's avatar
wmmhello 已提交
4391 4392 4393
        return NULL;
      }

H
Haojun Liao 已提交
4394
      SOperatorInfo*  pOperator = createTableScanOperatorInfo(pTableScanNode, pHandle, pTaskInfo);
4395 4396
      STableScanInfo* pScanInfo = pOperator->info;
      pTaskInfo->cost.pRecoder = &pScanInfo->readRecorder;
S
slzhou 已提交
4397
      return pOperator;
L
Liu Jicong 已提交
4398

S
slzhou 已提交
4399 4400
    } else if (QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN == type) {
      STableMergeScanPhysiNode* pTableScanNode = (STableMergeScanPhysiNode*)pPhyNode;
4401
      int32_t code = createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags,
4402
                                             pTableScanNode->groupSort, pHandle, pTableListInfo, pTagCond, pTagIndexCond, GET_TASKID(pTaskInfo));
L
Liu Jicong 已提交
4403
      if (code) {
wmmhello's avatar
wmmhello 已提交
4404
        pTaskInfo->code = code;
wmmhello's avatar
wmmhello 已提交
4405 4406
        return NULL;
      }
4407

4408
      code = extractTableSchemaInfo(pHandle, pTableScanNode->scan.uid, pTaskInfo);
wmmhello's avatar
wmmhello 已提交
4409 4410 4411 4412
      if (code) {
        pTaskInfo->code = terrno;
        return NULL;
      }
wmmhello's avatar
wmmhello 已提交
4413

4414
      SOperatorInfo* pOperator =
4415
          createTableMergeScanOperatorInfo(pTableScanNode, pTableListInfo, pHandle, pTaskInfo);
wmmhello's avatar
wmmhello 已提交
4416

4417 4418 4419
      STableScanInfo* pScanInfo = pOperator->info;
      pTaskInfo->cost.pRecoder = &pScanInfo->readRecorder;
      return pOperator;
L
Liu Jicong 已提交
4420

H
Haojun Liao 已提交
4421
    } else if (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == type) {
4422
      return createExchangeOperatorInfo(pHandle->pMsgCb->clientRpc, (SExchangePhysiNode*)pPhyNode, pTaskInfo);
H
Haojun Liao 已提交
4423
    } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN == type) {
5
54liuyao 已提交
4424
      STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode;
4425
      STimeWindowAggSupp   twSup = {
L
Liu Jicong 已提交
4426 4427 4428 4429
            .waterMark = pTableScanNode->watermark,
            .calTrigger = pTableScanNode->triggerType,
            .maxTs = INT64_MIN,
      };
5
54liuyao 已提交
4430
      if (pHandle->vnode) {
4431
        int32_t code = createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags,
4432
                                               pTableScanNode->groupSort, pHandle, pTableListInfo, pTagCond, pTagIndexCond, GET_TASKID(pTaskInfo));
L
Liu Jicong 已提交
4433
        if (code) {
wmmhello's avatar
wmmhello 已提交
4434 4435 4436
          pTaskInfo->code = code;
          return NULL;
        }
5
54liuyao 已提交
4437
      }
4438

4439
      SOperatorInfo* pOperator = createStreamScanOperatorInfo(pHandle, pTableScanNode, pTagCond, pTaskInfo, &twSup);
H
Haojun Liao 已提交
4440
      return pOperator;
L
Liu Jicong 已提交
4441

H
Haojun Liao 已提交
4442
    } else if (QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN == type) {
L
Liu Jicong 已提交
4443
      SSystemTableScanPhysiNode* pSysScanPhyNode = (SSystemTableScanPhysiNode*)pPhyNode;
4444
      return createSysTableScanOperatorInfo(pHandle, pSysScanPhyNode, pUser, pTaskInfo);
4445
    } else if (QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN == type) {
X
Xiaoyu Wang 已提交
4446
      STagScanPhysiNode* pScanPhyNode = (STagScanPhysiNode*)pPhyNode;
4447

4448
      int32_t code = getTableList(pHandle->meta, pHandle->vnode, pScanPhyNode, pTagCond, pTagIndexCond, pTableListInfo);
4449
      if (code != TSDB_CODE_SUCCESS) {
4450
        pTaskInfo->code = terrno;
4451 4452 4453
        return NULL;
      }

4454
      return createTagScanOperatorInfo(pHandle, pScanPhyNode, pTableListInfo, pTaskInfo);
4455
    } else if (QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN == type) {
4456
      SBlockDistScanPhysiNode* pBlockNode = (SBlockDistScanPhysiNode*)pPhyNode;
4457 4458 4459
      pTableListInfo->pTableList = taosArrayInit(4, sizeof(STableKeyInfo));

      if (pBlockNode->tableType == TSDB_SUPER_TABLE) {
4460
        int32_t code = vnodeGetAllTableList(pHandle->vnode, pBlockNode->uid, pTableListInfo->pTableList);
4461 4462 4463 4464 4465
        if (code != TSDB_CODE_SUCCESS) {
          pTaskInfo->code = terrno;
          return NULL;
        }
      } else {  // Create one table group.
4466
        STableKeyInfo info = {.uid = pBlockNode->uid, .groupId = 0};
4467 4468 4469 4470
        taosArrayPush(pTableListInfo->pTableList, &info);
      }

      SQueryTableDataCond cond = {0};
4471 4472 4473
      int32_t code = initTableblockDistQueryCond(pBlockNode->suid, &cond);
      if (code != TSDB_CODE_SUCCESS) {
        return NULL;
4474
      }
H
Haojun Liao 已提交
4475 4476 4477

      STsdbReader* pReader = NULL;
      tsdbReaderOpen(pHandle->vnode, &cond, pTableListInfo->pTableList, &pReader, "");
4478 4479
      cleanupQueryTableDataCond(&cond);

4480
      return createDataBlockInfoScanOperator(pReader, pHandle, cond.suid, pBlockNode, pTaskInfo);
H
Haojun Liao 已提交
4481 4482 4483
    } else if (QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN == type) {
      SLastRowScanPhysiNode* pScanNode = (SLastRowScanPhysiNode*)pPhyNode;

4484
      int32_t code = createScanTableListInfo(&pScanNode->scan, pScanNode->pGroupTags, true, pHandle, pTableListInfo, pTagCond, pTagIndexCond, GET_TASKID(pTaskInfo));
4485 4486 4487 4488
      if (code != TSDB_CODE_SUCCESS) {
        pTaskInfo->code = code;
        return NULL;
      }
4489

4490 4491 4492 4493
      code = extractTableSchemaInfo(pHandle, pScanNode->scan.uid, pTaskInfo);
      if (code != TSDB_CODE_SUCCESS) {
        pTaskInfo->code = code;
        return NULL;
H
Haojun Liao 已提交
4494 4495
      }

4496
      return createLastrowScanOperator(pScanNode, pHandle, pTaskInfo);
H
Haojun Liao 已提交
4497 4498
    } else {
      ASSERT(0);
H
Haojun Liao 已提交
4499 4500 4501
    }
  }

4502 4503
  int32_t num = 0;
  size_t  size = LIST_LENGTH(pPhyNode->pChildren);
H
Haojun Liao 已提交
4504

4505
  SOperatorInfo** ops = taosMemoryCalloc(size, POINTER_BYTES);
dengyihao's avatar
dengyihao 已提交
4506
  for (int32_t i = 0; i < size; ++i) {
4507
    SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, i);
4508
    ops[i] = createOperatorTree(pChildNode, pTaskInfo, pHandle, pTableListInfo, pTagCond, pTagIndexCond, pUser);
4509 4510 4511
    if (ops[i] == NULL) {
      return NULL;
    }
4512
  }
H
Haojun Liao 已提交
4513

4514
  SOperatorInfo* pOptr = NULL;
H
Haojun Liao 已提交
4515
  if (QUERY_NODE_PHYSICAL_PLAN_PROJECT == type) {
4516
    pOptr = createProjectOperatorInfo(ops[0], (SProjectPhysiNode*)pPhyNode, pTaskInfo);
4517
  } else if (QUERY_NODE_PHYSICAL_PLAN_HASH_AGG == type) {
H
Haojun Liao 已提交
4518 4519
    SAggPhysiNode* pAggNode = (SAggPhysiNode*)pPhyNode;
    SExprInfo*     pExprInfo = createExprInfo(pAggNode->pAggFuncs, pAggNode->pGroupKeys, &num);
4520
    SSDataBlock*   pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
H
Haojun Liao 已提交
4521

dengyihao's avatar
dengyihao 已提交
4522
    int32_t    numOfScalarExpr = 0;
4523 4524 4525 4526 4527
    SExprInfo* pScalarExprInfo = NULL;
    if (pAggNode->pExprs != NULL) {
      pScalarExprInfo = createExprInfo(pAggNode->pExprs, NULL, &numOfScalarExpr);
    }

H
Haojun Liao 已提交
4528 4529
    if (pAggNode->pGroupKeys != NULL) {
      SArray* pColList = extractColumnInfo(pAggNode->pGroupKeys);
dengyihao's avatar
dengyihao 已提交
4530
      pOptr = createGroupOperatorInfo(ops[0], pExprInfo, num, pResBlock, pColList, pAggNode->node.pConditions,
wmmhello's avatar
wmmhello 已提交
4531
                                      pScalarExprInfo, numOfScalarExpr, pTaskInfo);
H
Haojun Liao 已提交
4532
    } else {
L
Liu Jicong 已提交
4533 4534
      pOptr = createAggregateOperatorInfo(ops[0], pExprInfo, num, pResBlock, pAggNode->node.pConditions,
                                          pScalarExprInfo, numOfScalarExpr, pTaskInfo);
H
Haojun Liao 已提交
4535
    }
X
Xiaoyu Wang 已提交
4536
  } else if (QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL == type || QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL == type) {
H
Haojun Liao 已提交
4537
    SIntervalPhysiNode* pIntervalPhyNode = (SIntervalPhysiNode*)pPhyNode;
H
Haojun Liao 已提交
4538

H
Haojun Liao 已提交
4539
    SExprInfo*   pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &num);
4540
    SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
H
Haojun Liao 已提交
4541

dengyihao's avatar
dengyihao 已提交
4542 4543 4544 4545 4546 4547
    SInterval interval = {.interval = pIntervalPhyNode->interval,
                          .sliding = pIntervalPhyNode->sliding,
                          .intervalUnit = pIntervalPhyNode->intervalUnit,
                          .slidingUnit = pIntervalPhyNode->slidingUnit,
                          .offset = pIntervalPhyNode->offset,
                          .precision = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->node.resType.precision};
H
Haojun Liao 已提交
4548

X
Xiaoyu Wang 已提交
4549 4550 4551 4552 4553
    STimeWindowAggSupp as = {
        .waterMark = pIntervalPhyNode->window.watermark,
        .calTrigger = pIntervalPhyNode->window.triggerType,
        .maxTs = INT64_MIN,
    };
4554
    ASSERT(as.calTrigger != STREAM_TRIGGER_MAX_DELAY);
4555

4556
    int32_t tsSlotId = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId;
L
Liu Jicong 已提交
4557
    bool    isStream = (QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL == type);
4558 4559
    pOptr = createIntervalOperatorInfo(ops[0], pExprInfo, num, pResBlock, &interval, tsSlotId, &as, pIntervalPhyNode,
                                       pTaskInfo, isStream);
4560

4561 4562
  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL == type) {
    SMergeAlignedIntervalPhysiNode* pIntervalPhyNode = (SMergeAlignedIntervalPhysiNode*)pPhyNode;
S
shenglian zhou 已提交
4563 4564 4565 4566 4567 4568 4569 4570 4571 4572

    SExprInfo*   pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &num);
    SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);

    SInterval interval = {.interval = pIntervalPhyNode->interval,
                          .sliding = pIntervalPhyNode->sliding,
                          .intervalUnit = pIntervalPhyNode->intervalUnit,
                          .slidingUnit = pIntervalPhyNode->slidingUnit,
                          .offset = pIntervalPhyNode->offset,
                          .precision = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->node.resType.precision};
4573

S
shenglian zhou 已提交
4574
    int32_t tsSlotId = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId;
L
Liu Jicong 已提交
4575 4576
    pOptr = createMergeAlignedIntervalOperatorInfo(ops[0], pExprInfo, num, pResBlock, &interval, tsSlotId,
                                                   pPhyNode->pConditions, pTaskInfo);
S
shenglian zhou 已提交
4577
  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL == type) {
X
Xiaoyu Wang 已提交
4578
    SMergeIntervalPhysiNode* pIntervalPhyNode = (SMergeIntervalPhysiNode*)pPhyNode;
S
shenglian zhou 已提交
4579 4580 4581 4582 4583 4584 4585 4586 4587 4588

    SExprInfo*   pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &num);
    SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);

    SInterval interval = {.interval = pIntervalPhyNode->interval,
                          .sliding = pIntervalPhyNode->sliding,
                          .intervalUnit = pIntervalPhyNode->intervalUnit,
                          .slidingUnit = pIntervalPhyNode->slidingUnit,
                          .offset = pIntervalPhyNode->offset,
                          .precision = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->node.resType.precision};
4589

S
shenglian zhou 已提交
4590 4591
    int32_t tsSlotId = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId;
    pOptr = createMergeIntervalOperatorInfo(ops[0], pExprInfo, num, pResBlock, &interval, tsSlotId, pTaskInfo);
5
54liuyao 已提交
4592
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL == type) {
4593
    int32_t children = 0;
5
54liuyao 已提交
4594 4595
    pOptr = createStreamFinalIntervalOperatorInfo(ops[0], pPhyNode, pTaskInfo, children);
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL == type) {
5
54liuyao 已提交
4596
    int32_t children = pHandle->numOfVgroups;
5
54liuyao 已提交
4597
    pOptr = createStreamFinalIntervalOperatorInfo(ops[0], pPhyNode, pTaskInfo, children);
H
Haojun Liao 已提交
4598
  } else if (QUERY_NODE_PHYSICAL_PLAN_SORT == type) {
4599
    pOptr = createSortOperatorInfo(ops[0], (SSortPhysiNode*)pPhyNode, pTaskInfo);
S
shenglian zhou 已提交
4600 4601
  } else if (QUERY_NODE_PHYSICAL_PLAN_GROUP_SORT == type) {
    pOptr = createGroupSortOperatorInfo(ops[0], (SGroupSortPhysiNode*)pPhyNode, pTaskInfo);
X
Xiaoyu Wang 已提交
4602
  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE == type) {
4603
    SMergePhysiNode* pMergePhyNode = (SMergePhysiNode*)pPhyNode;
4604
    pOptr = createMultiwayMergeOperatorInfo(ops, size, pMergePhyNode, pTaskInfo);
4605
  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION == type) {
H
Haojun Liao 已提交
4606 4607
    SSessionWinodwPhysiNode* pSessionNode = (SSessionWinodwPhysiNode*)pPhyNode;

X
Xiaoyu Wang 已提交
4608 4609
    STimeWindowAggSupp as = {.waterMark = pSessionNode->window.watermark,
                             .calTrigger = pSessionNode->window.triggerType};
4610

H
Haojun Liao 已提交
4611
    SExprInfo*   pExprInfo = createExprInfo(pSessionNode->window.pFuncs, NULL, &num);
4612
    SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
4613 4614
    int32_t      tsSlotId = ((SColumnNode*)pSessionNode->window.pTspk)->slotId;

L
Liu Jicong 已提交
4615 4616
    pOptr = createSessionAggOperatorInfo(ops[0], pExprInfo, num, pResBlock, pSessionNode->gap, tsSlotId, &as,
                                         pPhyNode->pConditions, pTaskInfo);
4617
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION == type) {
4618 4619 4620 4621 4622 4623 4624
    pOptr = createStreamSessionAggOperatorInfo(ops[0], pPhyNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION == type) {
    int32_t children = 0;
    pOptr = createStreamFinalSessionAggOperatorInfo(ops[0], pPhyNode, pTaskInfo, children);
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION == type) {
    int32_t children = 1;
    pOptr = createStreamFinalSessionAggOperatorInfo(ops[0], pPhyNode, pTaskInfo, children);
H
Haojun Liao 已提交
4625
  } else if (QUERY_NODE_PHYSICAL_PLAN_PARTITION == type) {
4626
    pOptr = createPartitionOperatorInfo(ops[0], (SPartitionPhysiNode*)pPhyNode, pTaskInfo);
4627
  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE == type) {
dengyihao's avatar
dengyihao 已提交
4628
    SStateWinodwPhysiNode* pStateNode = (SStateWinodwPhysiNode*)pPhyNode;
4629

4630 4631
    STimeWindowAggSupp as = {.waterMark = pStateNode->window.watermark, .calTrigger = pStateNode->window.triggerType};

dengyihao's avatar
dengyihao 已提交
4632
    SExprInfo*   pExprInfo = createExprInfo(pStateNode->window.pFuncs, NULL, &num);
4633
    SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
4634 4635
    int32_t      tsSlotId = ((SColumnNode*)pStateNode->window.pTspk)->slotId;

4636
    SColumnNode* pColNode = (SColumnNode*)((STargetNode*)pStateNode->pStateKey)->pExpr;
X
Xiaoyu Wang 已提交
4637
    SColumn      col = extractColumnFromColumnNode(pColNode);
L
Liu Jicong 已提交
4638 4639
    pOptr = createStatewindowOperatorInfo(ops[0], pExprInfo, num, pResBlock, &as, tsSlotId, &col, pPhyNode->pConditions,
                                          pTaskInfo);
4640
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE == type) {
5
54liuyao 已提交
4641
    pOptr = createStreamStateAggOperatorInfo(ops[0], pPhyNode, pTaskInfo);
4642
  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN == type) {
4643
    pOptr = createMergeJoinOperatorInfo(ops, size, (SJoinPhysiNode*)pPhyNode, pTaskInfo);
4644
  } else if (QUERY_NODE_PHYSICAL_PLAN_FILL == type) {
H
Haojun Liao 已提交
4645
    pOptr = createFillOperatorInfo(ops[0], (SFillPhysiNode*)pPhyNode, pTaskInfo);
H
Haojun Liao 已提交
4646 4647
  } else if (QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC == type) {
    pOptr = createIndefinitOutputOperatorInfo(ops[0], pPhyNode, pTaskInfo);
4648 4649
  } else if (QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC == type) {
    pOptr = createTimeSliceOperatorInfo(ops[0], pPhyNode, pTaskInfo);
H
Haojun Liao 已提交
4650 4651
  } else {
    ASSERT(0);
H
Haojun Liao 已提交
4652
  }
4653 4654 4655

  taosMemoryFree(ops);
  return pOptr;
4656
}
H
Haojun Liao 已提交
4657

H
Haojun Liao 已提交
4658
SArray* extractColumnInfo(SNodeList* pNodeList) {
L
Liu Jicong 已提交
4659
  size_t  numOfCols = LIST_LENGTH(pNodeList);
H
Haojun Liao 已提交
4660 4661 4662 4663 4664 4665
  SArray* pList = taosArrayInit(numOfCols, sizeof(SColumn));
  if (pList == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

L
Liu Jicong 已提交
4666 4667
  for (int32_t i = 0; i < numOfCols; ++i) {
    STargetNode* pNode = (STargetNode*)nodesListGetNode(pNodeList, i);
H
Haojun Liao 已提交
4668

4669 4670 4671
    if (nodeType(pNode->pExpr) == QUERY_NODE_COLUMN) {
      SColumnNode* pColNode = (SColumnNode*)pNode->pExpr;

4672
      SColumn c = extractColumnFromColumnNode(pColNode);
4673 4674
      taosArrayPush(pList, &c);
    } else if (nodeType(pNode->pExpr) == QUERY_NODE_VALUE) {
L
Liu Jicong 已提交
4675 4676
      SValueNode* pValNode = (SValueNode*)pNode->pExpr;
      SColumn     c = {0};
4677
      c.slotId = pNode->slotId;
L
Liu Jicong 已提交
4678 4679 4680 4681
      c.colId = pNode->slotId;
      c.type = pValNode->node.type;
      c.bytes = pValNode->node.resType.bytes;
      c.scale = pValNode->node.resType.scale;
4682 4683 4684 4685
      c.precision = pValNode->node.resType.precision;

      taosArrayPush(pList, &c);
    }
H
Haojun Liao 已提交
4686 4687 4688 4689 4690
  }

  return pList;
}

4691
#if 0
L
Liu Jicong 已提交
4692 4693
STsdbReader* doCreateDataReader(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle,
                                STableListInfo* pTableListInfo, const char* idstr) {
4694
  int32_t code = getTableList(pHandle->meta, pHandle->vnode, &pTableScanNode->scan, pTableListInfo);
wmmhello's avatar
wmmhello 已提交
4695 4696 4697 4698 4699 4700
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  if (taosArrayGetSize(pTableListInfo->pTableList) == 0) {
    code = 0;
H
Haojun Liao 已提交
4701
    qDebug("no table qualified for query, %s", idstr);
wmmhello's avatar
wmmhello 已提交
4702 4703 4704
    goto _error;
  }

4705
  SQueryTableDataCond cond = {0};
wmmhello's avatar
wmmhello 已提交
4706
  code = initQueryTableDataCond(&cond, pTableScanNode);
4707
  if (code != TSDB_CODE_SUCCESS) {
wmmhello's avatar
wmmhello 已提交
4708
    goto _error;
X
Xiaoyu Wang 已提交
4709
  }
4710

H
Hongze Cheng 已提交
4711
  STsdbReader* pReader;
H
Haojun Liao 已提交
4712
  code = tsdbReaderOpen(pHandle->vnode, &cond, pTableListInfo->pTableList, &pReader, idstr);
H
Haojun Liao 已提交
4713 4714 4715 4716
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

4717
  cleanupQueryTableDataCond(&cond);
H
Haojun Liao 已提交
4718 4719

  return pReader;
wmmhello's avatar
wmmhello 已提交
4720 4721 4722 4723

_error:
  terrno = code;
  return NULL;
H
Haojun Liao 已提交
4724
}
4725
#endif
H
Haojun Liao 已提交
4726

L
Liu Jicong 已提交
4727 4728 4729 4730 4731 4732 4733 4734 4735 4736 4737 4738 4739
static int32_t extractTbscanInStreamOpTree(SOperatorInfo* pOperator, STableScanInfo** ppInfo) {
  if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
    if (pOperator->numOfDownstream == 0) {
      qError("failed to find stream scan operator");
      return TSDB_CODE_QRY_APP_ERROR;
    }

    if (pOperator->numOfDownstream > 1) {
      qError("join not supported for stream block scan");
      return TSDB_CODE_QRY_APP_ERROR;
    }
    return extractTbscanInStreamOpTree(pOperator->pDownstream[0], ppInfo);
  } else {
4740 4741 4742
    SStreamScanInfo* pInfo = pOperator->info;
    ASSERT(pInfo->pTableScanOp->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN);
    *ppInfo = pInfo->pTableScanOp->info;
L
Liu Jicong 已提交
4743 4744 4745 4746
    return 0;
  }
}

4747 4748 4749 4750 4751 4752 4753 4754 4755 4756 4757 4758 4759 4760 4761 4762 4763 4764 4765 4766 4767 4768
int32_t extractTableScanNode(SPhysiNode* pNode, STableScanPhysiNode** ppNode) {
  if (pNode->pChildren == NULL || LIST_LENGTH(pNode->pChildren) == 0) {
    if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == pNode->type) {
      *ppNode = (STableScanPhysiNode*)pNode;
      return 0;
    } else {
      ASSERT(0);
      terrno = TSDB_CODE_QRY_APP_ERROR;
      return -1;
    }
  } else {
    if (LIST_LENGTH(pNode->pChildren) != 1) {
      ASSERT(0);
      terrno = TSDB_CODE_QRY_APP_ERROR;
      return -1;
    }
    SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pNode->pChildren, 0);
    return extractTableScanNode(pChildNode, ppNode);
  }
  return -1;
}

4769
#if 0
L
Liu Jicong 已提交
4770 4771 4772 4773 4774
int32_t rebuildReader(SOperatorInfo* pOperator, SSubplan* plan, SReadHandle* pHandle, int64_t uid, int64_t ts) {
  STableScanInfo* pTableScanInfo = NULL;
  if (extractTbscanInStreamOpTree(pOperator, &pTableScanInfo) < 0) {
    return -1;
  }
4775

L
Liu Jicong 已提交
4776 4777 4778 4779
  STableScanPhysiNode* pNode = NULL;
  if (extractTableScanNode(plan->pNode, &pNode) < 0) {
    ASSERT(0);
  }
4780

H
Haojun Liao 已提交
4781
  tsdbReaderClose(pTableScanInfo->dataReader);
4782

L
Liu Jicong 已提交
4783
  STableListInfo info = {0};
H
Haojun Liao 已提交
4784
  pTableScanInfo->dataReader = doCreateDataReader(pNode, pHandle, &info, NULL);
L
Liu Jicong 已提交
4785 4786 4787 4788
  if (pTableScanInfo->dataReader == NULL) {
    ASSERT(0);
    qError("failed to create data reader");
    return TSDB_CODE_QRY_APP_ERROR;
4789
  }
L
Liu Jicong 已提交
4790
  // TODO: set uid and ts to data reader
4791 4792
  return 0;
}
4793
#endif
4794

C
Cary Xu 已提交
4795
int32_t encodeOperator(SOperatorInfo* ops, char** result, int32_t* length, int32_t* nOptrWithVal) {
wmmhello's avatar
wmmhello 已提交
4796
  int32_t code = TDB_CODE_SUCCESS;
4797
  char*   pCurrent = NULL;
wmmhello's avatar
wmmhello 已提交
4798
  int32_t currLength = 0;
4799
  if (ops->fpSet.encodeResultRow) {
C
Cary Xu 已提交
4800
    if (result == NULL || length == NULL || nOptrWithVal == NULL) {
wmmhello's avatar
wmmhello 已提交
4801 4802 4803
      return TSDB_CODE_TSC_INVALID_INPUT;
    }
    code = ops->fpSet.encodeResultRow(ops, &pCurrent, &currLength);
wmmhello's avatar
wmmhello 已提交
4804

4805 4806
    if (code != TDB_CODE_SUCCESS) {
      if (*result != NULL) {
wmmhello's avatar
wmmhello 已提交
4807 4808 4809 4810
        taosMemoryFree(*result);
        *result = NULL;
      }
      return code;
C
Cary Xu 已提交
4811 4812 4813
    } else if (currLength == 0) {
      ASSERT(!pCurrent);
      goto _downstream;
wmmhello's avatar
wmmhello 已提交
4814
    }
wmmhello's avatar
wmmhello 已提交
4815

C
Cary Xu 已提交
4816 4817
    ++(*nOptrWithVal);

C
Cary Xu 已提交
4818
    ASSERT(currLength >= 0);
wmmhello's avatar
wmmhello 已提交
4819

4820
    if (*result == NULL) {
wmmhello's avatar
wmmhello 已提交
4821
      *result = (char*)taosMemoryCalloc(1, currLength + sizeof(int32_t));
wmmhello's avatar
wmmhello 已提交
4822 4823 4824 4825 4826 4827
      if (*result == NULL) {
        taosMemoryFree(pCurrent);
        return TSDB_CODE_OUT_OF_MEMORY;
      }
      memcpy(*result + sizeof(int32_t), pCurrent, currLength);
      *(int32_t*)(*result) = currLength + sizeof(int32_t);
4828
    } else {
wmmhello's avatar
wmmhello 已提交
4829
      int32_t sizePre = *(int32_t*)(*result);
4830
      char*   tmp = (char*)taosMemoryRealloc(*result, sizePre + currLength);
wmmhello's avatar
wmmhello 已提交
4831 4832 4833 4834 4835 4836 4837 4838 4839 4840 4841 4842
      if (tmp == NULL) {
        taosMemoryFree(pCurrent);
        taosMemoryFree(*result);
        *result = NULL;
        return TSDB_CODE_OUT_OF_MEMORY;
      }
      *result = tmp;
      memcpy(*result + sizePre, pCurrent, currLength);
      *(int32_t*)(*result) += currLength;
    }
    taosMemoryFree(pCurrent);
    *length = *(int32_t*)(*result);
wmmhello's avatar
wmmhello 已提交
4843 4844
  }

C
Cary Xu 已提交
4845
_downstream:
wmmhello's avatar
wmmhello 已提交
4846
  for (int32_t i = 0; i < ops->numOfDownstream; ++i) {
C
Cary Xu 已提交
4847
    code = encodeOperator(ops->pDownstream[i], result, length, nOptrWithVal);
4848
    if (code != TDB_CODE_SUCCESS) {
wmmhello's avatar
wmmhello 已提交
4849
      return code;
wmmhello's avatar
wmmhello 已提交
4850 4851
    }
  }
wmmhello's avatar
wmmhello 已提交
4852
  return TDB_CODE_SUCCESS;
wmmhello's avatar
wmmhello 已提交
4853 4854
}

H
Haojun Liao 已提交
4855
int32_t decodeOperator(SOperatorInfo* ops, const char* result, int32_t length) {
wmmhello's avatar
wmmhello 已提交
4856
  int32_t code = TDB_CODE_SUCCESS;
4857 4858
  if (ops->fpSet.decodeResultRow) {
    if (result == NULL) {
wmmhello's avatar
wmmhello 已提交
4859 4860
      return TSDB_CODE_TSC_INVALID_INPUT;
    }
H
Haojun Liao 已提交
4861

4862
    ASSERT(length == *(int32_t*)result);
H
Haojun Liao 已提交
4863 4864

    const char* data = result + sizeof(int32_t);
L
Liu Jicong 已提交
4865
    code = ops->fpSet.decodeResultRow(ops, (char*)data);
4866
    if (code != TDB_CODE_SUCCESS) {
wmmhello's avatar
wmmhello 已提交
4867 4868
      return code;
    }
wmmhello's avatar
wmmhello 已提交
4869

wmmhello's avatar
wmmhello 已提交
4870
    int32_t totalLength = *(int32_t*)result;
4871 4872
    int32_t dataLength = *(int32_t*)data;

4873
    if (totalLength == dataLength + sizeof(int32_t)) {  // the last data
wmmhello's avatar
wmmhello 已提交
4874 4875
      result = NULL;
      length = 0;
4876
    } else {
wmmhello's avatar
wmmhello 已提交
4877 4878 4879 4880
      result += dataLength;
      *(int32_t*)(result) = totalLength - dataLength;
      length = totalLength - dataLength;
    }
wmmhello's avatar
wmmhello 已提交
4881 4882
  }

wmmhello's avatar
wmmhello 已提交
4883 4884
  for (int32_t i = 0; i < ops->numOfDownstream; ++i) {
    code = decodeOperator(ops->pDownstream[i], result, length);
4885
    if (code != TDB_CODE_SUCCESS) {
wmmhello's avatar
wmmhello 已提交
4886
      return code;
wmmhello's avatar
wmmhello 已提交
4887 4888
    }
  }
wmmhello's avatar
wmmhello 已提交
4889
  return TDB_CODE_SUCCESS;
wmmhello's avatar
wmmhello 已提交
4890 4891
}

D
dapan1121 已提交
4892
int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, qTaskInfo_t* pTaskInfo, SReadHandle* readHandle) {
D
dapan1121 已提交
4893
  SExecTaskInfo* pTask = *(SExecTaskInfo**)pTaskInfo;
4894

D
dapan1121 已提交
4895
  switch (pNode->type) {
D
dapan1121 已提交
4896 4897 4898 4899 4900 4901
    case QUERY_NODE_PHYSICAL_PLAN_QUERY_INSERT: {
      SInserterParam* pInserterParam = taosMemoryCalloc(1, sizeof(SInserterParam));
      if (NULL == pInserterParam) {
        return TSDB_CODE_OUT_OF_MEMORY;
      }
      pInserterParam->readHandle = readHandle;
L
Liu Jicong 已提交
4902

D
dapan1121 已提交
4903 4904 4905
      *pParam = pInserterParam;
      break;
    }
D
dapan1121 已提交
4906
    case QUERY_NODE_PHYSICAL_PLAN_DELETE: {
4907
      SDeleterParam* pDeleterParam = taosMemoryCalloc(1, sizeof(SDeleterParam));
D
dapan1121 已提交
4908 4909 4910 4911
      if (NULL == pDeleterParam) {
        return TSDB_CODE_OUT_OF_MEMORY;
      }
      int32_t tbNum = taosArrayGetSize(pTask->tableqinfoList.pTableList);
D
dapan1121 已提交
4912
      pDeleterParam->suid = pTask->tableqinfoList.suid;
D
dapan1121 已提交
4913 4914 4915 4916 4917 4918
      pDeleterParam->pUidList = taosArrayInit(tbNum, sizeof(uint64_t));
      if (NULL == pDeleterParam->pUidList) {
        taosMemoryFree(pDeleterParam);
        return TSDB_CODE_OUT_OF_MEMORY;
      }
      for (int32_t i = 0; i < tbNum; ++i) {
4919
        STableKeyInfo* pTable = taosArrayGet(pTask->tableqinfoList.pTableList, i);
D
dapan1121 已提交
4920 4921 4922 4923 4924 4925 4926 4927 4928 4929 4930 4931 4932
        taosArrayPush(pDeleterParam->pUidList, &pTable->uid);
      }

      *pParam = pDeleterParam;
      break;
    }
    default:
      break;
  }

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
4933
int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId,
4934
                               const char* sql, EOPTR_EXEC_MODEL model) {
H
Haojun Liao 已提交
4935 4936
  uint64_t queryId = pPlan->id.queryId;

H
Haojun Liao 已提交
4937
  int32_t code = TSDB_CODE_SUCCESS;
D
dapan1121 已提交
4938
  *pTaskInfo = createExecTaskInfo(queryId, taskId, model, pPlan->dbFName);
H
Haojun Liao 已提交
4939 4940 4941 4942
  if (*pTaskInfo == NULL) {
    code = TSDB_CODE_QRY_OUT_OF_MEMORY;
    goto _complete;
  }
H
Haojun Liao 已提交
4943

4944
  (*pTaskInfo)->sql = sql;
4945
  (*pTaskInfo)->pSubplan = pPlan;
4946
  (*pTaskInfo)->pRoot = createOperatorTree(pPlan->pNode, *pTaskInfo, pHandle, &(*pTaskInfo)->tableqinfoList, pPlan->pTagCond, pPlan->pTagIndexCond, pPlan->user);
L
Liu Jicong 已提交
4947

D
dapan1121 已提交
4948
  if (NULL == (*pTaskInfo)->pRoot) {
4949
    code = (*pTaskInfo)->code;
D
dapan1121 已提交
4950
    goto _complete;
4951 4952
  }

H
Haojun Liao 已提交
4953 4954
  return code;

H
Haojun Liao 已提交
4955
_complete:
wafwerar's avatar
wafwerar 已提交
4956
  taosMemoryFreeClear(*pTaskInfo);
H
Haojun Liao 已提交
4957 4958
  terrno = code;
  return code;
H
Haojun Liao 已提交
4959 4960
}

wmmhello's avatar
wmmhello 已提交
4961 4962 4963
static void doDestroyTableList(STableListInfo* pTableqinfoList) {
  taosArrayDestroy(pTableqinfoList->pTableList);
  taosHashCleanup(pTableqinfoList->map);
4964 4965
  if (pTableqinfoList->needSortTableByGroupId) {
    for (int32_t i = 0; i < taosArrayGetSize(pTableqinfoList->pGroupList); i++) {
wmmhello's avatar
wmmhello 已提交
4966
      SArray* tmp = taosArrayGetP(pTableqinfoList->pGroupList, i);
4967 4968 4969
      if (tmp == pTableqinfoList->pTableList) {
        continue;
      }
wmmhello's avatar
wmmhello 已提交
4970 4971 4972 4973
      taosArrayDestroy(tmp);
    }
  }
  taosArrayDestroy(pTableqinfoList->pGroupList);
4974

wmmhello's avatar
wmmhello 已提交
4975 4976
  pTableqinfoList->pTableList = NULL;
  pTableqinfoList->map = NULL;
4977 4978
}

L
Liu Jicong 已提交
4979
void doDestroyTask(SExecTaskInfo* pTaskInfo) {
H
Haojun Liao 已提交
4980 4981
  qDebug("%s execTask is freed", GET_TASKID(pTaskInfo));

wmmhello's avatar
wmmhello 已提交
4982
  doDestroyTableList(&pTaskInfo->tableqinfoList);
H
Haojun Liao 已提交
4983
  destroyOperatorInfo(pTaskInfo->pRoot);
4984 4985 4986
  cleanupTableSchemaInfo(&pTaskInfo->schemaInfo);

  nodesDestroyNode((SNode*)pTaskInfo->pSubplan);
4987

wafwerar's avatar
wafwerar 已提交
4988 4989 4990
  taosMemoryFreeClear(pTaskInfo->sql);
  taosMemoryFreeClear(pTaskInfo->id.str);
  taosMemoryFreeClear(pTaskInfo);
4991 4992 4993 4994 4995 4996 4997 4998 4999 5000 5001 5002
}

static void doSetTagValueToResultBuf(char* output, const char* val, int16_t type, int16_t bytes) {
  if (val == NULL) {
    setNull(output, type, bytes);
    return;
  }

  if (IS_VAR_DATA_TYPE(type)) {
    // Binary data overflows for sort of unknown reasons. Let trim the overflow data
    if (varDataTLen(val) > bytes) {
      int32_t maxLen = bytes - VARSTR_HEADER_SIZE;
L
Liu Jicong 已提交
5003
      int32_t len = (varDataLen(val) > maxLen) ? maxLen : varDataLen(val);
5004 5005 5006 5007 5008 5009 5010 5011 5012 5013 5014 5015
      memcpy(varDataVal(output), varDataVal(val), len);
      varDataSetLen(output, len);
    } else {
      varDataCopy(output, val);
    }
  } else {
    memcpy(output, val, bytes);
  }
}

static int64_t getQuerySupportBufSize(size_t numOfTables) {
  size_t s1 = sizeof(STableQueryInfo);
L
Liu Jicong 已提交
5016 5017
  //  size_t s3 = sizeof(STableCheckInfo);  buffer consumption in tsdb
  return (int64_t)(s1 * 1.5 * numOfTables);
5018 5019 5020 5021 5022 5023 5024
}

int32_t checkForQueryBuf(size_t numOfTables) {
  int64_t t = getQuerySupportBufSize(numOfTables);
  if (tsQueryBufferSizeBytes < 0) {
    return TSDB_CODE_SUCCESS;
  } else if (tsQueryBufferSizeBytes > 0) {
L
Liu Jicong 已提交
5025
    while (1) {
5026 5027 5028 5029 5030 5031 5032 5033 5034 5035 5036 5037 5038 5039 5040 5041 5042 5043 5044 5045 5046 5047 5048 5049 5050 5051
      int64_t s = tsQueryBufferSizeBytes;
      int64_t remain = s - t;
      if (remain >= 0) {
        if (atomic_val_compare_exchange_64(&tsQueryBufferSizeBytes, s, remain) == s) {
          return TSDB_CODE_SUCCESS;
        }
      } else {
        return TSDB_CODE_QRY_NOT_ENOUGH_BUFFER;
      }
    }
  }

  // disable query processing if the value of tsQueryBufferSize is zero.
  return TSDB_CODE_QRY_NOT_ENOUGH_BUFFER;
}

void releaseQueryBuf(size_t numOfTables) {
  if (tsQueryBufferSizeBytes < 0) {
    return;
  }

  int64_t t = getQuerySupportBufSize(numOfTables);

  // restore value is not enough buffer available
  atomic_add_fetch_64(&tsQueryBufferSizeBytes, t);
}
D
dapan1121 已提交
5052

dengyihao's avatar
dengyihao 已提交
5053 5054
int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SExplainExecInfo** pRes, int32_t* capacity,
                                   int32_t* resNum) {
D
dapan1121 已提交
5055 5056
  if (*resNum >= *capacity) {
    *capacity += 10;
dengyihao's avatar
dengyihao 已提交
5057

D
dapan1121 已提交
5058 5059
    *pRes = taosMemoryRealloc(*pRes, (*capacity) * sizeof(SExplainExecInfo));
    if (NULL == *pRes) {
D
dapan1121 已提交
5060
      qError("malloc %d failed", (*capacity) * (int32_t)sizeof(SExplainExecInfo));
D
dapan1121 已提交
5061 5062 5063 5064
      return TSDB_CODE_QRY_OUT_OF_MEMORY;
    }
  }

5065 5066 5067 5068 5069
  SExplainExecInfo* pInfo = &(*pRes)[*resNum];

  pInfo->numOfRows = operatorInfo->resultInfo.totalRows;
  pInfo->startupCost = operatorInfo->cost.openCost;
  pInfo->totalCost = operatorInfo->cost.totalCost;
D
dapan1121 已提交
5070

5071
  if (operatorInfo->fpSet.getExplainFn) {
5072
    int32_t code = operatorInfo->fpSet.getExplainFn(operatorInfo, &pInfo->verboseInfo, &pInfo->verboseLen);
D
dapan1121 已提交
5073
    if (code) {
5074
      qError("%s operator getExplainFn failed, code:%s", GET_TASKID(operatorInfo->pTaskInfo), tstrerror(code));
D
dapan1121 已提交
5075 5076
      return code;
    }
5077 5078 5079
  } else {
    pInfo->verboseLen = 0;
    pInfo->verboseInfo = NULL;
D
dapan1121 已提交
5080
  }
dengyihao's avatar
dengyihao 已提交
5081

D
dapan1121 已提交
5082
  ++(*resNum);
dengyihao's avatar
dengyihao 已提交
5083

D
dapan1121 已提交
5084
  int32_t code = 0;
D
dapan1121 已提交
5085 5086
  for (int32_t i = 0; i < operatorInfo->numOfDownstream; ++i) {
    code = getOperatorExplainExecInfo(operatorInfo->pDownstream[i], pRes, capacity, resNum);
D
dapan1121 已提交
5087 5088 5089 5090 5091 5092 5093
    if (code) {
      taosMemoryFreeClear(*pRes);
      return TSDB_CODE_QRY_OUT_OF_MEMORY;
    }
  }

  return TSDB_CODE_SUCCESS;
D
dapan1121 已提交
5094
}
5
54liuyao 已提交
5095

L
Liu Jicong 已提交
5096
int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, const char* pKey, SqlFunctionCtx* pCtx, int32_t numOfOutput,
5097
                               int32_t size) {
5098
  pSup->resultRowSize = getResultRowSize(pCtx, numOfOutput);
5
54liuyao 已提交
5099 5100
  pSup->keySize = sizeof(int64_t) + sizeof(TSKEY);
  pSup->pKeyBuf = taosMemoryCalloc(1, pSup->keySize);
5101 5102
  _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
  pSup->pResultRows = taosHashInit(1024, hashFn, false, HASH_NO_LOCK);
5
54liuyao 已提交
5103 5104 5105
  if (pSup->pKeyBuf == NULL || pSup->pResultRows == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
5106
  pSup->valueSize = size;
5
54liuyao 已提交
5107

5
54liuyao 已提交
5108 5109
  pSup->pScanWindow = taosArrayInit(4, sizeof(STimeWindow));

5
54liuyao 已提交
5110 5111 5112 5113 5114 5115 5116 5117 5118
  int32_t pageSize = 4096;
  while (pageSize < pSup->resultRowSize * 4) {
    pageSize <<= 1u;
  }
  // at least four pages need to be in buffer
  int32_t bufSize = 4096 * 256;
  if (bufSize <= pageSize) {
    bufSize = pageSize * 4;
  }
5119
  int32_t code = createDiskbasedBuf(&pSup->pResultBuf, pageSize, bufSize, pKey, TD_TMP_DIR_PATH);
L
Liu Jicong 已提交
5120
  for (int32_t i = 0; i < numOfOutput; ++i) {
5121 5122 5123
    pCtx[i].pBuf = pSup->pResultBuf;
  }
  return code;
5
54liuyao 已提交
5124
}