executorimpl.c 172.5 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
15

16 17
#include <executorimpl.h>
#include <vnode.h>
H
Haojun Liao 已提交
18 19
#include "filter.h"
#include "function.h"
20 21
#include "functionMgt.h"
#include "os.h"
H
Haojun Liao 已提交
22
#include "querynodes.h"
23
#include "tfill.h"
dengyihao's avatar
dengyihao 已提交
24
#include "tname.h"
X
Xiaoyu Wang 已提交
25
#include "tref.h"
26

H
Haojun Liao 已提交
27
#include "tdatablock.h"
28
#include "tglobal.h"
H
Haojun Liao 已提交
29
#include "tmsg.h"
H
Haojun Liao 已提交
30
#include "tsort.h"
31
#include "ttime.h"
H
Haojun Liao 已提交
32

33
#include "executorimpl.h"
dengyihao's avatar
dengyihao 已提交
34
#include "index.h"
35
#include "query.h"
36 37
#include "tcompare.h"
#include "tcompression.h"
H
Haojun Liao 已提交
38
#include "thash.h"
39
#include "ttypes.h"
dengyihao's avatar
dengyihao 已提交
40
#include "vnode.h"
41

H
Haojun Liao 已提交
42
// Evaluates to true when the runtime's scanFlag equals MAIN_SCAN.
#define IS_MAIN_SCAN(runtime)          ((runtime)->scanFlag == MAIN_SCAN)
43 44 45 46 47 48
// Assigns REVERSE_SCAN to the runtime's scanFlag (the macro has a side effect on its argument).
#define SET_REVERSE_SCAN_FLAG(runtime) ((runtime)->scanFlag = REVERSE_SCAN)

// Maps a scan order to a step constant: QUERY_ASC_FORWARD_STEP for TSDB_ORDER_ASC,
// QUERY_DESC_FORWARD_STEP for any other order value.
#define GET_FORWARD_DIRECTION_FACTOR(ord) (((ord) == TSDB_ORDER_ASC) ? QUERY_ASC_FORWARD_STEP : QUERY_DESC_FORWARD_STEP)

#if 0
// Disabled fault-injection allocators. When enabled they shadow malloc/calloc/realloc
// and fail at random so that out-of-memory paths can be exercised.

// Fails when the random draw is a multiple of 1000, otherwise forwards to taosMemoryMalloc.
static UNUSED_FUNC void *u_malloc (size_t __size) {
  const uint32_t draw = taosRand();
  return (draw % 1000 <= 0) ? NULL : taosMemoryMalloc(__size);
}

// Fails when the random draw is a multiple of 1000, otherwise forwards to taosMemoryCalloc.
static UNUSED_FUNC void* u_calloc(size_t num, size_t __size) {
  const uint32_t draw = taosRand();
  return (draw % 1000 <= 0) ? NULL : taosMemoryCalloc(num, __size);
}

// Fails when (draw % 5) is 0 or 1, otherwise forwards to taosMemoryRealloc.
static UNUSED_FUNC void* u_realloc(void* p, size_t __size) {
  const uint32_t draw = taosRand();
  return (draw % 5 <= 1) ? NULL : taosMemoryRealloc(p, __size);
}

#define calloc  u_calloc
#define malloc  u_malloc
#define realloc u_realloc
#endif

X
Xiaoyu Wang 已提交
81
// Clears the bits given by `st` in the `status` field of `q`.
#define CLEAR_QUERY_STATUS(q, st)   ((q)->status &= (~(st)))
82 83
// Evaluates to true when the query's interval.interval field is greater than zero.
#define QUERY_IS_INTERVAL_QUERY(_q) ((_q)->interval.interval > 0)

L
Liu Jicong 已提交
84 85 86
// Returns twice the value of tsShellActivityTimer.
int32_t getMaximumIdleDurationSec() {
  int32_t maxIdle = tsShellActivityTimer * 2;
  return maxIdle;
}

// Sanity-checks that the expression info wraps a unary expression node and returns 0.
// The function id itself is not derived here; the return value is a constant.
static int32_t getExprFunctionId(SExprInfo* pExprInfo) {
  // Checked in the same order as before so a NULL pointer is never dereferenced.
  assert(pExprInfo != NULL);
  assert(pExprInfo->pExpr != NULL);
  assert(pExprInfo->pExpr->nodeType == TEXPR_UNARYEXPR_NODE);

  return 0;
}

static void doSetTagValueToResultBuf(char* output, const char* val, int16_t type, int16_t bytes);

93
static void setBlockStatisInfo(SqlFunctionCtx* pCtx, SExprInfo* pExpr, SSDataBlock* pSDataBlock);
94

X
Xiaoyu Wang 已提交
95
static void releaseQueryBuf(size_t numOfTables);
96 97 98 99 100

static void destroySFillOperatorInfo(void* param, int32_t numOfOutput);
static void destroyProjectOperatorInfo(void* param, int32_t numOfOutput);
static void destroyOrderOperatorInfo(void* param, int32_t numOfOutput);
static void destroyAggOperatorInfo(void* param, int32_t numOfOutput);
X
Xiaoyu Wang 已提交
101

H
Haojun Liao 已提交
102
static void destroyIntervalOperatorInfo(void* param, int32_t numOfOutput);
H
Haojun Liao 已提交
103 104
static void destroyExchangeOperatorInfo(void* param, int32_t numOfOutput);

105 106
static void destroyOperatorInfo(SOperatorInfo* pOperator);

107
// Marks the operator as finished (OP_EXEC_DONE). When the operator is attached to a
// task, it also records the operator's total cost and flags the task as TASK_COMPLETED.
//
// The cost is derived from the task's start time, so it can only be computed when
// pTaskInfo is present. The original code read pTaskInfo->cost.start before testing
// pTaskInfo for NULL, which made the NULL test useless.
void doSetOperatorCompleted(SOperatorInfo* pOperator) {
  pOperator->status = OP_EXEC_DONE;

  if (pOperator->pTaskInfo == NULL) {
    return;
  }

  // NOTE(review): cost.start is scaled by 1000 before being subtracted from a
  // microsecond timestamp, so it is presumably in milliseconds -- confirm.
  pOperator->cost.totalCost = (taosGetTimestampUs() - pOperator->pTaskInfo->cost.start * 1000) / 1000.0;
  setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED);
}
115

H
Haojun Liao 已提交
116
// No-op open callback: resets the recorded open cost, marks the operator as opened
// and always reports TSDB_CODE_SUCCESS.
int32_t operatorDummyOpenFn(SOperatorInfo* pOperator) {
  pOperator->cost.openCost = 0;
  OPTR_SET_OPENED(pOperator);

  return TSDB_CODE_SUCCESS;
}

122
SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn, __optr_fn_t streamFn,
L
Liu Jicong 已提交
123
                                   __optr_fn_t cleanup, __optr_close_fn_t closeFn, __optr_encode_fn_t encode,
124
                                   __optr_decode_fn_t decode, __optr_explain_fn_t explain) {
125 126 127 128 129 130 131 132 133 134 135 136 137 138
  SOperatorFpSet fpSet = {
      ._openFn = openFn,
      .getNextFn = nextFn,
      .getStreamResFn = streamFn,
      .cleanupFn = cleanup,
      .closeFn = closeFn,
      .encodeResultRow = encode,
      .decodeResultRow = decode,
      .getExplainFn = explain,
  };

  return fpSet;
}

H
Haojun Liao 已提交
139
// No-op close callback for operators that own nothing to release.
void operatorDummyCloseFn(void* param, int32_t numOfCols) {
  (void)param;
  (void)numOfCols;
}
H
Haojun Liao 已提交
140

X
Xiaoyu Wang 已提交
141 142 143
static int32_t doCopyToSDataBlock(SExecTaskInfo* taskInfo, SSDataBlock* pBlock, SExprInfo* pExprInfo,
                                  SDiskbasedBuf* pBuf, SGroupResInfo* pGroupResInfo, const int32_t* rowCellOffset,
                                  SqlFunctionCtx* pCtx, int32_t numOfExprs);
H
Haojun Liao 已提交
144

145
static void initCtxOutputBuffer(SqlFunctionCtx* pCtx, int32_t size);
146 147
static void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, SAggOperatorInfo* pAggInfo, int32_t numOfOutput,
                                     uint64_t groupId);
148

L
Liu Jicong 已提交
149 150
// setup the output buffer for each operator
// Tells whether a column may contain NULL values.
// Returns false for tag columns, user-defined columns and the primary timestamp
// column, and also when block statistics are available and report zero NULLs.
// Returns true in every other case, including when no statistics are supplied.
static bool hasNull(SColumn* pColumn, SColumnDataAgg* pStatis) {
  const bool exemptColumn = TSDB_COL_IS_TAG(pColumn->flag) || TSDB_COL_IS_UD_COL(pColumn->flag) ||
                            pColumn->colId == PRIMARYKEY_TIMESTAMP_COL_ID;
  if (exemptColumn) {
    return false;
  }

  const bool statsReportNoNull = (pStatis != NULL) && (pStatis->numOfNull == 0);
  return !statsReportNoNull;
}

163
#if 0
// Disabled: checks whether a result row for (uid, key data) already exists.
// For non-interval queries, and for interval queries outside the master scan, the
// answer is simply whether the key is present in the result-row hash table.
// For interval queries in the master scan the row must additionally be registered
// in the per-result-info list set.
static bool chkResultRowFromKey(STaskRuntimeEnv* pRuntimeEnv, SResultRowInfo* pResultRowInfo, char* pData,
                                int16_t bytes, bool masterscan, uint64_t uid) {
  SET_RES_WINDOW_KEY(pRuntimeEnv->keyBuf, pData, bytes, uid);

  SResultRow** ppRow =
      (SResultRow**)taosHashGet(pRuntimeEnv->pResultRowHashTable, pRuntimeEnv->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes));

  if (!QUERY_IS_INTERVAL_QUERY(pRuntimeEnv->pQueryAttr)) {
    return ppRow != NULL;
  }

  // in case of repeat scan/reverse scan, no new time window added.
  if (!masterscan) {  // the *ppRow may be NULL in case of sliding+offset exists.
    return ppRow != NULL;
  }

  if (ppRow == NULL) {
    return false;
  }

  // size == 1 used to compare against pResult[0]; that comparison is commented out
  // upstream, so both the empty and the single-entry case report "not existed".
  //        existed = (pResultRowInfo->pResult[0] == (*p1));
  if (pResultRowInfo->size == 0 || pResultRowInfo->size == 1) {
    return false;
  }

  // check if current pResultRowInfo contains the existed pResultRow
  SET_RES_EXT_WINDOW_KEY(pRuntimeEnv->keyBuf, pData, bytes, uid, pResultRowInfo);
  int64_t* pIndex =
      taosHashGet(pRuntimeEnv->pResultRowListSet, pRuntimeEnv->keyBuf, GET_RES_EXT_WINDOW_KEY_LEN(bytes));

  return pIndex != NULL;
}
#endif
201

202
// Reserves `interBufSize` bytes for a new result row inside the disk-based buffer
// pages of the given table group and returns a pointer to the reserved area.
//
// The row is placed in the last page of the group when it still fits; otherwise
// (or when the group has no page yet) a new page is requested. The returned row has
// its pageId/offset filled in and the hosting page is marked dirty.
//
// Returns NULL when a new page could not be obtained. The original code only
// checked that in the "page is full" path and dereferenced the page unconditionally
// in the "first page of the group" path.
SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int64_t tableGroupId, int32_t interBufSize) {
  SFilePage* pData = NULL;

  // in the first scan, new space needed for results
  int32_t pageId = -1;
  SIDList list = getDataBufPagesIdList(pResultBuf, tableGroupId);

  if (taosArrayGetSize(list) == 0) {
    pData = getNewBufPage(pResultBuf, tableGroupId, &pageId);
    if (pData != NULL) {
      pData->num = sizeof(SFilePage);
    }
  } else {
    SPageInfo* pi = getLastPageInfo(list);
    pData = getBufPage(pResultBuf, getPageId(pi));
    pageId = getPageId(pi);

    if (pData->num + interBufSize > getBufPageSize(pResultBuf)) {
      // release current page first, and prepare the next one
      releaseBufPageInfo(pResultBuf, pi);

      pData = getNewBufPage(pResultBuf, tableGroupId, &pageId);
      if (pData != NULL) {
        pData->num = sizeof(SFilePage);
      }
    }
  }

  if (pData == NULL) {
    return NULL;
  }

  setBufPageDirty(pData, true);

  // the row starts at the current end of the used area of the page
  SResultRow* pResultRow = (SResultRow*)((char*)pData + pData->num);
  pResultRow->pageId = pageId;
  pResultRow->offset = (int32_t)pData->num;

  pData->num += interBufSize;

  return pResultRow;
}

244 245 246 247 248 249 250
/**
 * the struct of key in hash table
 * +----------+---------------+
 * | group id |   key data    |
 * | 8 bytes  | actual length |
 * +----------+---------------+
 */
251 252 253
// Locates, or creates, the result row for the key (groupId, pData[0..bytes)) and makes
// it the current row of pResultRowInfo.
//
// - The key is looked up in pSup->pResultRowHashTable. An existing row is reused for
//   non-interval queries, and for interval queries only during the master scan.
// - If the previously active row lives on a different page than the selected one, that
//   page is fetched and released.
// - When no row is reused, a new row of pSup->resultRowSize bytes is allocated,
//   initialised and registered in the hash table.
//
// Does not return normally in two cases; both longjmp to pTaskInfo->env:
//   TSDB_CODE_OUT_OF_MEMORY          - no buffer page could be obtained for a new row
//                                      (previously the NULL row was dereferenced);
//   TSDB_CODE_QRY_TOO_MANY_TIMEWINDOW - the hash table exceeds MAX_INTERVAL_TIME_WINDOW.
SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pResultRowInfo, char* pData,
                                   int16_t bytes, bool masterscan, uint64_t groupId, SExecTaskInfo* pTaskInfo,
                                   bool isIntervalQuery, SAggSupporter* pSup) {
  SET_RES_WINDOW_KEY(pSup->keyBuf, pData, bytes, groupId);

  SResultRowPosition* p1 =
      (SResultRowPosition*)taosHashGet(pSup->pResultRowHashTable, pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes));

  SResultRow* pResult = NULL;

  // Interval queries reuse an existing row only in the master scan; group-by style
  // queries reuse it whenever the key is already known.
  if (p1 != NULL && (!isIntervalQuery || masterscan)) {
    pResult = getResultRowByPos(pResultBuf, p1);
    ASSERT(pResult->pageId == p1->pageId && pResult->offset == p1->offset);
  }

  // 1. close current opened time window
  if (pResultRowInfo->cur.pageId != -1 && ((pResult == NULL) || (pResult->pageId != pResultRowInfo->cur.pageId))) {
    SResultRowPosition pos = pResultRowInfo->cur;
    SFilePage*         pPage = getBufPage(pResultBuf, pos.pageId);
    releaseBufPage(pResultBuf, pPage);
  }

  // allocate a new buffer page
  if (pResult == NULL) {
    ASSERT(pSup->resultRowSize > 0);
    pResult = getNewResultRow(pResultBuf, groupId, pSup->resultRowSize);
    if (pResult == NULL) {
      // getNewResultRow reports page allocation failure with NULL
      longjmp(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
    }

    initResultRow(pResult);

    // add a new result set for a new group
    SResultRowPosition pos = {.pageId = pResult->pageId, .offset = pResult->offset};
    taosHashPut(pSup->pResultRowHashTable, pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes), &pos,
                sizeof(SResultRowPosition));
  }

  // 2. set the new time window to be the new active time window
  pResultRowInfo->cur = (SResultRowPosition){.pageId = pResult->pageId, .offset = pResult->offset};

  // too many time window in query
  if (taosHashGetSize(pSup->pResultRowHashTable) > MAX_INTERVAL_TIME_WINDOW) {
    longjmp(pTaskInfo->env, TSDB_CODE_QRY_TOO_MANY_TIMEWINDOW);
  }

  return pResult;
}

308
// a new buffer page for each table. Needs to opt this design
L
Liu Jicong 已提交
309
// Assigns buffer space of `size` bytes to a window result row that has none yet.
//
// A row whose pageId is already set (!= -1) is left untouched. Otherwise the row is
// placed at the end of the last page of group `tid`, or on a freshly requested page
// when the group has no page or the last page cannot hold `size` more bytes.
//
// Returns 0 on success (including the "already allocated" case) and -1 when no page
// could be obtained. The original code dereferenced the page returned for the first
// page of a group without checking it for NULL.
static int32_t addNewWindowResultBuf(SResultRow* pWindowRes, SDiskbasedBuf* pResultBuf, int32_t tid, uint32_t size) {
  if (pWindowRes->pageId != -1) {
    return 0;
  }

  SFilePage* pData = NULL;

  // in the first scan, new space needed for results
  int32_t pageId = -1;
  SIDList list = getDataBufPagesIdList(pResultBuf, tid);

  if (taosArrayGetSize(list) == 0) {
    pData = getNewBufPage(pResultBuf, tid, &pageId);
    if (pData != NULL) {
      pData->num = sizeof(SFilePage);
    }
  } else {
    SPageInfo* pi = getLastPageInfo(list);
    pData = getBufPage(pResultBuf, getPageId(pi));
    pageId = getPageId(pi);

    if (pData->num + size > getBufPageSize(pResultBuf)) {
      // release current page first, and prepare the next one
      releaseBufPageInfo(pResultBuf, pi);

      pData = getNewBufPage(pResultBuf, tid, &pageId);
      if (pData != NULL) {
        pData->num = sizeof(SFilePage);
      }
    }
  }

  if (pData == NULL) {
    return -1;
  }

  // The row is known to be unallocated here (checked on entry): place it at the
  // current end of the used area of the page.
  pWindowRes->pageId = pageId;
  pWindowRes->offset = (int32_t)pData->num;

  pData->num += size;
  assert(pWindowRes->pageId >= 0);

  return 0;
}

355
//  query_range_start, query_range_end, window_duration, window_start, window_end
356
// Fills a five-row timestamp column describing the query time window:
//   row 0: pQueryWindow->skey    row 1: pQueryWindow->ekey
//   row 2: 0 (window duration placeholder)
//   row 3: pQueryWindow->skey    row 4: pQueryWindow->ekey
void initExecTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pQueryWindow) {
  pColData->info.type = TSDB_DATA_TYPE_TIMESTAMP;
  pColData->info.bytes = sizeof(int64_t);
  colInfoDataEnsureCapacity(pColData, 5);

  // this value may be variable in case of 'n' and 'y'.
  int64_t duration = 0;

  colDataAppendInt64(pColData, 0, &pQueryWindow->skey);
  colDataAppendInt64(pColData, 1, &pQueryWindow->ekey);
  colDataAppendInt64(pColData, 2, &duration);
  colDataAppendInt64(pColData, 3, &pQueryWindow->skey);
  colDataAppendInt64(pColData, 4, &pQueryWindow->ekey);
}

X
Xiaoyu Wang 已提交
370 371 372
// Applies every function context in pCtx[0..numOfOutput) to the row range
// [offset, offset + forwardStep) of the current input.
//
// For each context the input range is narrowed to that slice. Pre-computed column
// aggregates are disabled when the slice is smaller than the whole block
// (forwardStep < numOfTotal).
// - Window pseudo-column functions are evaluated through the scalar callback, using
//   pTimeWindowData as a 5-row input, and write into the row's intermediate buffer.
//   The narrowed input settings are NOT restored on this path.
// - All other functions run their `process` callback when functionNeedToExecute()
//   allows it; afterwards the original input settings are restored.
//
// On a failing `process` callback the error is logged, stored in taskInfo->code and
// control leaves through longjmp(taskInfo->env, code).
void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, STimeWindow* pWin,
                      SColumnInfoData* pTimeWindowData, int32_t offset, int32_t forwardStep, TSKEY* tsCol,
                      int32_t numOfTotal, int32_t numOfOutput, int32_t order) {
  for (int32_t k = 0; k < numOfOutput; ++k) {
    SqlFunctionCtx*       pOneCtx = &pCtx[k];
    SInputColumnInfoData* pIn = &pOneCtx->input;

    // keep it temporarily
    // todo no need this??
    const bool    savedAggIsSet = pIn->colDataAggIsSet;
    const int32_t savedNumOfRows = pIn->numOfRows;
    const int32_t savedStartRow = pIn->startRowIndex;

    pIn->startRowIndex = offset;
    pIn->numOfRows = forwardStep;

    // not a whole block involved in query processing, statistics data can not be used
    // NOTE: the original value of isSet have been changed here
    if (pIn->colDataAggIsSet && forwardStep < numOfTotal) {
      pIn->colDataAggIsSet = false;
    }

    if (fmIsWindowPseudoColumnFunc(pOneCtx->functionId)) {
      SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pOneCtx);

      // wrap the row's intermediate buffer as a BIGINT output column
      SColumnInfoData outCol = {0};
      outCol.info.type = TSDB_DATA_TYPE_BIGINT;
      outCol.info.bytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes;
      outCol.pData = GET_ROWCELL_INTERBUF(pEntryInfo);

      SScalarParam out = {.columnData = &outCol};
      SScalarParam tw = {.numOfRows = 5, .columnData = pTimeWindowData};
      pOneCtx->sfp.process(&tw, 1, &out);

      pEntryInfo->numOfRes = 1;
      continue;
    }

    if (functionNeedToExecute(pOneCtx) && pOneCtx->fpSet.process != NULL) {
      int32_t code = pOneCtx->fpSet.process(pOneCtx);
      if (code != TSDB_CODE_SUCCESS) {
        qError("%s apply functions error, code: %s", GET_TASKID(taskInfo), tstrerror(code));
        taskInfo->code = code;
        longjmp(taskInfo->env, code);
      }
    }

    // restore it
    pIn->colDataAggIsSet = savedAggIsSet;
    pIn->startRowIndex = savedStartRow;
    pIn->numOfRows = savedNumOfRows;
  }
}

dengyihao's avatar
dengyihao 已提交
423
static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order,
424
                                   int32_t scanFlag, bool createDummyCol);
425

dengyihao's avatar
dengyihao 已提交
426 427
// Prepares every expression context of the operator for a block that carries
// pre-computed statistics: records the scan order and row count, then binds the
// block statistics through setBlockStatisInfo().
static void doSetInputDataBlockInfo(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock,
                                    int32_t order) {
  for (int32_t idx = 0; idx < pOperator->exprSupp.numOfExprs; ++idx) {
    SqlFunctionCtx* pOneCtx = &pCtx[idx];

    pOneCtx->order = order;
    pOneCtx->input.numOfRows = pBlock->info.rows;

    setBlockStatisInfo(pOneCtx, &pOperator->exprSupp.pExprInfo[idx], pBlock);
  }
}

X
Xiaoyu Wang 已提交
435 436
// Binds a data block as the input of the operator's function contexts.
// Blocks that carry pre-computed aggregates (pBlockAgg != NULL) take the statistics
// path; all other blocks take the column-data path. The status code returned by the
// column-data path is not propagated.
void setInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order,
                       int32_t scanFlag, bool createDummyCol) {
  if (pBlock->pBlockAgg == NULL) {
    doSetInputDataBlock(pOperator, pCtx, pBlock, order, scanFlag, createDummyCol);
    return;
  }

  doSetInputDataBlockInfo(pOperator, pCtx, pBlock, order);
}

L
Liu Jicong 已提交
444 445
// Materialises a constant function parameter as a column of `numOfRows` identical
// values in pInput->pData[paramIndex]. The column descriptor is allocated on first
// use and reused afterwards.
//
// BIGINT/UBIGINT, DOUBLE and VARCHAR constants are filled in; any other parameter
// type leaves the column data untouched.
//
// Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when the column descriptor
// or the temporary VARCHAR buffer cannot be allocated.
//
// Fixes over the original: the temporary VARCHAR buffer was never released (one leak
// per call) and its allocation result was used unchecked.
static int32_t doCreateConstantValColumnInfo(SInputColumnInfoData* pInput, SFunctParam* pFuncParam, int32_t paramIndex,
                                             int32_t numOfRows) {
  SColumnInfoData* pColInfo = NULL;
  if (pInput->pData[paramIndex] == NULL) {
    pColInfo = taosMemoryCalloc(1, sizeof(SColumnInfoData));
    if (pColInfo == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    // Set the correct column info (data type and bytes)
    pColInfo->info.type = pFuncParam->param.nType;
    pColInfo->info.bytes = pFuncParam->param.nLen;

    pInput->pData[paramIndex] = pColInfo;
  } else {
    pColInfo = pInput->pData[paramIndex];
  }

  colInfoDataEnsureCapacity(pColInfo, numOfRows);

  int8_t type = pFuncParam->param.nType;
  if (type == TSDB_DATA_TYPE_BIGINT || type == TSDB_DATA_TYPE_UBIGINT) {
    int64_t v = pFuncParam->param.i;
    for (int32_t i = 0; i < numOfRows; ++i) {
      colDataAppendInt64(pColInfo, i, &v);
    }
  } else if (type == TSDB_DATA_TYPE_DOUBLE) {
    double v = pFuncParam->param.d;
    for (int32_t i = 0; i < numOfRows; ++i) {
      colDataAppendDouble(pColInfo, i, &v);
    }
  } else if (type == TSDB_DATA_TYPE_VARCHAR) {
    // build the length-prefixed representation once and append it to every row
    char* tmp = taosMemoryMalloc(pFuncParam->param.nLen + VARSTR_HEADER_SIZE);
    if (tmp == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    STR_WITH_SIZE_TO_VARSTR(tmp, pFuncParam->param.pz, pFuncParam->param.nLen);
    for (int32_t i = 0; i < numOfRows; ++i) {
      colDataAppend(pColInfo, i, tmp, false);
    }

    // NOTE(review): relies on colDataAppend copying the value into the column
    // buffer rather than retaining `tmp` -- confirm against tdatablock.
    taosMemoryFree(tmp);
  }

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
486
// Binds the columns of pBlock as inputs of every expression context of the operator.
//
// For each context: records order, scan flag, source block, row count and block uid,
// and clears the "aggregates available" flag. Then, per function parameter:
//  - column parameter: points pData[j] at the block column with the parameter's slot
//    id and resets the row range to the whole block; for timeline functions the last
//    parameter is additionally recorded as the primary timestamp column (pPTS);
//  - value parameter: only when createDummyCol is set and the function has exactly one
//    parameter, a constant column is materialised for it.
//
// Returns TSDB_CODE_SUCCESS, or the error code from creating a constant column.
static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order,
                                   int32_t scanFlag, bool createDummyCol) {
  for (int32_t i = 0; i < pOperator->exprSupp.numOfExprs; ++i) {
    SqlFunctionCtx*       pOneCtx = &pCtx[i];
    SInputColumnInfoData* pInput = &pOneCtx->input;
    SExprInfo*            pOneExpr = &pOperator->exprSupp.pExprInfo[i];

    pOneCtx->order = order;
    pInput->numOfRows = pBlock->info.rows;
    pOneCtx->pSrcBlock = pBlock;
    pOneCtx->scanFlag = scanFlag;

    pInput->uid = pBlock->info.uid;
    pInput->colDataAggIsSet = false;

    for (int32_t j = 0; j < pOneExpr->base.numOfParams; ++j) {
      SFunctParam* pFuncParam = &pOneExpr->base.pParam[j];

      if (pFuncParam->type == FUNC_PARAM_TYPE_COLUMN) {
        int32_t slotId = pFuncParam->pCol->slotId;

        pInput->pData[j] = taosArrayGet(pBlock->pDataBlock, slotId);
        pInput->totalRows = pBlock->info.rows;
        pInput->numOfRows = pBlock->info.rows;
        pInput->startRowIndex = 0;

        // NOTE: the last parameter is the primary timestamp column
        const bool isLastParam = (j == pOneExpr->base.numOfParams - 1);
        if (fmIsTimelineFunc(pOneCtx->functionId) && isLastParam) {
          pInput->pPTS = pInput->pData[j];
        }

        ASSERT(pInput->pData[j] != NULL);
        continue;
      }

      if (pFuncParam->type != FUNC_PARAM_TYPE_VALUE) {
        continue;
      }

      // todo avoid case: top(k, 12), 12 is the value parameter.
      // sum(11), 11 is also the value parameter.
      if (!createDummyCol || pOneExpr->base.numOfParams != 1) {
        continue;
      }

      pInput->totalRows = pBlock->info.rows;
      pInput->numOfRows = pBlock->info.rows;
      pInput->startRowIndex = 0;

      int32_t code = doCreateConstantValColumnInfo(pInput, pFuncParam, j, pBlock->info.rows);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }
    }
  }

  return TSDB_CODE_SUCCESS;
}

536
// Runs the `process` callback of every expression context of the operator that
// functionNeedToExecute() allows and that actually has a callback.
// Stops at the first failing callback, logs it and returns its error code;
// returns TSDB_CODE_SUCCESS when all callbacks succeed. `startTs` is not used.
static int32_t doAggregateImpl(SOperatorInfo* pOperator, TSKEY startTs, SqlFunctionCtx* pCtx) {
  for (int32_t k = 0; k < pOperator->exprSupp.numOfExprs; ++k) {
    SqlFunctionCtx* pOneCtx = &pCtx[k];

    // todo add a dummy funtion to avoid process check
    if (!functionNeedToExecute(pOneCtx) || pOneCtx->fpSet.process == NULL) {
      continue;
    }

    int32_t code = pOneCtx->fpSet.process(pOneCtx);
    if (code == TSDB_CODE_SUCCESS) {
      continue;
    }

    qError("%s aggregate function error happens, code: %s", GET_TASKID(pOperator->pTaskInfo), tstrerror(code));
    return code;
  }

  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
555
// Points the output of the first N contexts at the first N columns of pResult,
// where N is the number of entries in pPseudoList (0 when the list is NULL).
static void setPseudoOutputColInfo(SSDataBlock* pResult, SqlFunctionCtx* pCtx, SArray* pPseudoList) {
  if (pPseudoList == NULL) {
    return;
  }

  const size_t numOfPseudo = taosArrayGetSize(pPseudoList);
  for (int32_t col = 0; col < numOfPseudo; ++col) {
    pCtx[col].pOutput = taosArrayGet(pResult->pDataBlock, col);
  }
}

562
int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pSrcBlock, SqlFunctionCtx* pCtx,
X
Xiaoyu Wang 已提交
563
                              int32_t numOfOutput, SArray* pPseudoList) {
H
Haojun Liao 已提交
564
  setPseudoOutputColInfo(pResult, pCtx, pPseudoList);
H
Haojun Liao 已提交
565
  pResult->info.groupId = pSrcBlock->info.groupId;
H
Haojun Liao 已提交
566

dengyihao's avatar
dengyihao 已提交
567 568
  // if the source equals to the destination, it is to create a new column as the result of scalar function or some
  // operators.
569 570
  bool createNewColModel = (pResult == pSrcBlock);

571 572
  int32_t numOfRows = 0;

573
  for (int32_t k = 0; k < numOfOutput; ++k) {
dengyihao's avatar
dengyihao 已提交
574
    int32_t         outputSlotId = pExpr[k].base.resSchema.slotId;
575 576
    SqlFunctionCtx* pfCtx = &pCtx[k];

L
Liu Jicong 已提交
577
    if (pExpr[k].pExpr->nodeType == QUERY_NODE_COLUMN) {  // it is a project query
578
      SColumnInfoData* pColInfoData = taosArrayGet(pResult->pDataBlock, outputSlotId);
579
      if (pResult->info.rows > 0 && !createNewColModel) {
X
Xiaoyu Wang 已提交
580 581
        colDataMergeCol(pColInfoData, pResult->info.rows, &pResult->info.capacity, pfCtx->input.pData[0],
                        pfCtx->input.numOfRows);
582
      } else {
583
        colDataAssign(pColInfoData, pfCtx->input.pData[0], pfCtx->input.numOfRows, &pResult->info);
584
      }
585

586
      numOfRows = pfCtx->input.numOfRows;
587
    } else if (pExpr[k].pExpr->nodeType == QUERY_NODE_VALUE) {
588
      SColumnInfoData* pColInfoData = taosArrayGet(pResult->pDataBlock, outputSlotId);
589

dengyihao's avatar
dengyihao 已提交
590
      int32_t offset = createNewColModel ? 0 : pResult->info.rows;
591
      for (int32_t i = 0; i < pSrcBlock->info.rows; ++i) {
dengyihao's avatar
dengyihao 已提交
592 593 594
        colDataAppend(pColInfoData, i + offset,
                      taosVariantGet(&pExpr[k].base.pParam[0].param, pExpr[k].base.pParam[0].param.nType),
                      TSDB_DATA_TYPE_NULL == pExpr[k].base.pParam[0].param.nType);
595
      }
596 597

      numOfRows = pSrcBlock->info.rows;
H
Haojun Liao 已提交
598
    } else if (pExpr[k].pExpr->nodeType == QUERY_NODE_OPERATOR) {
599 600 601
      SArray* pBlockList = taosArrayInit(4, POINTER_BYTES);
      taosArrayPush(pBlockList, &pSrcBlock);

602
      SColumnInfoData* pResColData = taosArrayGet(pResult->pDataBlock, outputSlotId);
603
      SColumnInfoData  idata = {.info = pResColData->info, .hasNull = true};
604

605
      SScalarParam dest = {.columnData = &idata};
X
Xiaoyu Wang 已提交
606
      int32_t      code = scalarCalculate(pExpr[k].pExpr->_optrRoot.pRootNode, pBlockList, &dest);
607 608 609 610
      if (code != TSDB_CODE_SUCCESS) {
        taosArrayDestroy(pBlockList);
        return code;
      }
611

dengyihao's avatar
dengyihao 已提交
612
      int32_t startOffset = createNewColModel ? 0 : pResult->info.rows;
613
      ASSERT(pResult->info.capacity > 0);
614
      colDataMergeCol(pResColData, startOffset, &pResult->info.capacity, &idata, dest.numOfRows);
615 616

      numOfRows = dest.numOfRows;
617 618
      taosArrayDestroy(pBlockList);
    } else if (pExpr[k].pExpr->nodeType == QUERY_NODE_FUNCTION) {
619
      ASSERT(!fmIsAggFunc(pfCtx->functionId));
620

621 622
      // _rowts/_c0, not tbname column
      if (fmIsPseudoColumnFunc(pfCtx->functionId) && (!fmIsScanPseudoColumnFunc(pfCtx->functionId))) {
H
Haojun Liao 已提交
623
        // do nothing
X
Xiaoyu Wang 已提交
624
      } else if (fmIsIndefiniteRowsFunc(pfCtx->functionId)) {
dengyihao's avatar
dengyihao 已提交
625
        SResultRowEntryInfo* pResInfo = GET_RES_INFO(&pCtx[k]);
626
        pfCtx->fpSet.init(&pCtx[k], pResInfo);
627

628
        pfCtx->pOutput = taosArrayGet(pResult->pDataBlock, outputSlotId);
dengyihao's avatar
dengyihao 已提交
629
        pfCtx->offset = createNewColModel ? 0 : pResult->info.rows;  // set the start offset
H
Haojun Liao 已提交
630

631
        // set the timestamp(_rowts) output buffer
632 633
        if (taosArrayGetSize(pPseudoList) > 0) {
          int32_t* outputColIndex = taosArrayGet(pPseudoList, 0);
634
          pfCtx->pTsOutput = (SColumnInfoData*)pCtx[*outputColIndex].pOutput;
635
        }
H
Haojun Liao 已提交
636

637
        numOfRows = pfCtx->fpSet.process(pfCtx);
H
Haojun Liao 已提交
638 639 640
      } else {
        SArray* pBlockList = taosArrayInit(4, POINTER_BYTES);
        taosArrayPush(pBlockList, &pSrcBlock);
G
Ganlin Zhao 已提交
641

642
        SColumnInfoData* pResColData = taosArrayGet(pResult->pDataBlock, outputSlotId);
643
        SColumnInfoData  idata = {.info = pResColData->info, .hasNull = true};
H
Haojun Liao 已提交
644

645
        SScalarParam dest = {.columnData = &idata};
X
Xiaoyu Wang 已提交
646
        int32_t      code = scalarCalculate((SNode*)pExpr[k].pExpr->_function.pFunctNode, pBlockList, &dest);
647 648 649 650
        if (code != TSDB_CODE_SUCCESS) {
          taosArrayDestroy(pBlockList);
          return code;
        }
651

dengyihao's avatar
dengyihao 已提交
652
        int32_t startOffset = createNewColModel ? 0 : pResult->info.rows;
653
        ASSERT(pResult->info.capacity > 0);
654
        colDataMergeCol(pResColData, startOffset, &pResult->info.capacity, &idata, dest.numOfRows);
655 656

        numOfRows = dest.numOfRows;
H
Haojun Liao 已提交
657 658
        taosArrayDestroy(pBlockList);
      }
659
    } else {
660
      ASSERT(0);
661 662
    }
  }
663

664 665 666
  if (!createNewColModel) {
    pResult->info.rows += numOfRows;
  }
667 668

  return TSDB_CODE_SUCCESS;
669 670
}

671 672 673
// Stores a fixed-length key as the window of the result row: the value read from
// pData (interpreted according to `type`) becomes both win.skey and win.ekey.
// Variable-length keys are currently ignored.
static void setResultRowKey(SResultRow* pResultRow, char* pData, int16_t type) {
  if (IS_VAR_DATA_TYPE(type)) {
    // todo disable this
    //    if (pResultRow->key == NULL) {
    //      pResultRow->key = taosMemoryMalloc(varDataTLen(pData));
    //      varDataCopy(pResultRow->key, pData);
    //    } else {
    //      ASSERT(memcmp(pResultRow->key, pData, varDataTLen(pData)) == 0);
    //    }
    return;
  }

  int64_t keyVal = -1;
  GET_TYPED_DATA(keyVal, int64_t, type, pData);

  pResultRow->win.skey = keyVal;
  pResultRow->win.ekey = keyVal;
}

5
54liuyao 已提交
690
// Decides whether the function behind pCtx has to be run for the current input.
//  - functionId == -1           : never
//  - scanFlag == REPEAT_SCAN    : only if the function is a repeat-scan function
//  - otherwise                  : unless its result entry is already completed
bool functionNeedToExecute(SqlFunctionCtx* pCtx) {
  struct SResultRowEntryInfo* pEntry = GET_RES_INFO(pCtx);

  // in case of timestamp column, always generated results.
  if (pCtx->functionId == -1) {
    return false;
  }

  if (pCtx->scanFlag == REPEAT_SCAN) {
    return fmIsRepeatScanFunc(pCtx->functionId);
  }

  return !isRowEntryCompleted(pEntry);
}

710 711 712 713 714 715 716
// Provides pre-computed statistics for a constant function parameter, as if it were
// a column of `numOfRows` identical values.
//
// Allocates (once) a column descriptor in pInput->pData[paramIndex] and a statistics
// record in pInput->pColumnDataAgg[paramIndex], then fills the record according to
// `type`. Variable-length types are not supported (asserted), TIMESTAMP leaves the
// record untouched, and any other unsupported type trips ASSERT(0).
//
// Returns TSDB_CODE_SUCCESS or TSDB_CODE_OUT_OF_MEMORY.
static int32_t doCreateConstantValColumnAggInfo(SInputColumnInfoData* pInput, SFunctParam* pFuncParam, int32_t type,
                                                int32_t paramIndex, int32_t numOfRows) {
  if (pInput->pData[paramIndex] == NULL) {
    SColumnInfoData* pCol = taosMemoryCalloc(1, sizeof(SColumnInfoData));
    pInput->pData[paramIndex] = pCol;
    if (pCol == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    // Set the correct column info (data type and bytes)
    pCol->info.type = type;
    pCol->info.bytes = tDataTypes[type].bytes;
  }

  SColumnDataAgg* da = pInput->pColumnDataAgg[paramIndex];
  if (da == NULL) {
    da = taosMemoryCalloc(1, sizeof(SColumnDataAgg));
    pInput->pColumnDataAgg[paramIndex] = da;
    if (da == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  ASSERT(!IS_VAR_DATA_TYPE(type));

  switch (type) {
    case TSDB_DATA_TYPE_BIGINT: {
      int64_t v = pFuncParam->param.i;
      *da = (SColumnDataAgg){.numOfNull = 0, .min = v, .max = v, .maxIndex = 0, .minIndex = 0, .sum = v * numOfRows};
      break;
    }

    case TSDB_DATA_TYPE_DOUBLE: {
      double v = pFuncParam->param.d;
      *da = (SColumnDataAgg){.numOfNull = 0, .maxIndex = 0, .minIndex = 0};

      // min/max/sum are reinterpreted in place as doubles
      *(double*)&da->min = v;
      *(double*)&da->max = v;
      *(double*)&da->sum = v * numOfRows;
      break;
    }

    case TSDB_DATA_TYPE_BOOL: {  // todo validate this data type
      bool v = pFuncParam->param.i;

      *da = (SColumnDataAgg){.numOfNull = 0, .maxIndex = 0, .minIndex = 0};
      // NOTE(review): min is forced to 0 even when v is true, and sum is stored as a
      // single bool -- both look suspicious for a constant column; confirm intent.
      *(bool*)&da->min = 0;
      *(bool*)&da->max = v;
      *(bool*)&da->sum = v * numOfRows;
      break;
    }

    case TSDB_DATA_TYPE_TIMESTAMP:
      // do nothing
      break;

    default:
      ASSERT(0);
      break;
  }

  return TSDB_CODE_SUCCESS;
}
761 762 763 764 765 766 767 768 769 770 771

/*
 * Bind the pre-computed block statistics of pBlock to the input of one function context.
 *
 * The row counters of pCtx->input are always refreshed. When the block carries no statistics
 * (pBlock->pBlockAgg == NULL) colDataAggIsSet is cleared. Otherwise every parameter of the expression
 * is visited:
 *   - column parameter: the aggregate of its slot is referenced, and colDataAggIsSet is cleared if
 *     that slot has no aggregate;
 *   - constant parameter: an aggregate is synthesized by doCreateConstantValColumnAggInfo, and
 *     colDataAggIsSet is cleared if that fails, so that a missing aggregate is never advertised as set.
 *
 * @param pCtx      function context whose input is updated
 * @param pExprInfo expression description providing the parameter list
 * @param pBlock    current data block
 */
void setBlockStatisInfo(SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, SSDataBlock* pBlock) {
  int32_t numOfRows = pBlock->info.rows;

  SInputColumnInfoData* pInput = &pCtx->input;
  pInput->numOfRows = numOfRows;
  pInput->totalRows = numOfRows;

  if (pBlock->pBlockAgg == NULL) {
    pInput->colDataAggIsSet = false;
    return;
  }

  pInput->colDataAggIsSet = true;

  for (int32_t j = 0; j < pExprInfo->base.numOfParams; ++j) {
    SFunctParam* pFuncParam = &pExprInfo->base.pParam[j];

    if (pFuncParam->type == FUNC_PARAM_TYPE_COLUMN) {
      int32_t slotId = pFuncParam->pCol->slotId;
      pInput->pColumnDataAgg[j] = pBlock->pBlockAgg[slotId];
      if (pInput->pColumnDataAgg[j] == NULL) {
        pInput->colDataAggIsSet = false;
      }

      // Here we set the column info data since the data type for each column data is required, but
      // the data in the corresponding SColumnInfoData will not be used.
      pInput->pData[j] = taosArrayGet(pBlock->pDataBlock, slotId);
    } else if (pFuncParam->type == FUNC_PARAM_TYPE_VALUE) {
      int32_t code = doCreateConstantValColumnAggInfo(pInput, pFuncParam, pFuncParam->param.nType, j, numOfRows);
      if (code != TSDB_CODE_SUCCESS) {
        // the aggregate for this parameter could not be built; do not claim the statistics are usable
        pInput->colDataAggIsSet = false;
      }
    }
  }

  // set the statistics data for primary time stamp column
  //  if (pCtx->functionId == FUNCTION_SPREAD && pColumn->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
  //    pCtx->isAggSet = true;
  //    pCtx->agg.min = pBlock->info.window.skey;
  //    pCtx->agg.max = pBlock->info.window.ekey;
  //  }
}

L
Liu Jicong 已提交
801
/*
 * Report whether the task should be aborted.
 *
 * The idle-timeout abort is currently disabled: the check below only asserts that the start
 * timestamp has been recorded, and the function returns false on every path.
 */
bool isTaskKilled(SExecTaskInfo* pTaskInfo) {
  // a task without an owner is never subject to the idle check
  if (pTaskInfo->owner == 0) {
    return false;
  }

  // the query has been running far longer than the maximum idle duration and the retrieve has not arrived
  if ((taosGetTimestampSec() - pTaskInfo->cost.start / 1000) > 10 * getMaximumIdleDurationSec()) {
    /*(!needBuildResAfterQueryComplete(pTaskInfo))*/
    assert(pTaskInfo->cost.start != 0);
    //    qDebug("QInfo:%" PRIu64 " retrieve not arrive beyond %d ms, abort current query execution, start:%" PRId64
    //           ", current:%d", pQInfo->qId, 1, pQInfo->startExecTs, taosGetTimestampSec());
    //    return true;
  }

  return false;
}

L
Liu Jicong 已提交
816
// Mark the task as cancelled by storing TSDB_CODE_TSC_QUERY_CANCELLED in its result code.
void setTaskKilled(SExecTaskInfo* pTaskInfo) {
  pTaskInfo->code = TSDB_CODE_TSC_QUERY_CANCELLED;
}
817 818

/////////////////////////////////////////////////////////////////////////////////////////////
L
Liu Jicong 已提交
819
// todo refactor : return window
820
/*
 * Compute the interval-aligned time window that starts at the truncation of `key`.
 *
 * win->skey receives taosTimeTruncate(key, ...); win->ekey receives the last timestamp of the
 * interval that begins at skey. If that end falls before the start, ekey is set to INT64_MAX.
 */
void getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int64_t key, STimeWindow* win) {
  int64_t alignedStart = taosTimeTruncate(key, pInterval, precision);
  win->skey = alignedStart;

  /*
   * if the realSkey > INT64_MAX - pInterval->interval, the query duration between
   * realSkey and realEkey must be less than one interval. Therefore, no need to adjust the query ranges.
   */
  int64_t alignedEnd = taosTimeAdd(alignedStart, pInterval->interval, pInterval->intervalUnit, precision) - 1;
  win->ekey = (alignedEnd < alignedStart) ? INT64_MAX : alignedEnd;
}

833
#if 0
/*
 * (disabled) Refine the block load status according to the functions of the query:
 * a NOT_LOAD block is filtered out when only first/last style functions are present, and
 * must be loaded when other functions are mixed in. Other statuses are returned unchanged.
 */
static int32_t updateBlockLoadStatus(STaskAttr* pQuery, int32_t status) {
  if (status == BLK_DATA_DATA_LOAD || status == BLK_DATA_FILTEROUT) {
    return status;
  }

  bool sawFirstLast = false;
  bool sawOther = false;

  for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
    int32_t functionId = getExprFunctionId(&pQuery->pExpr1[i]);

    // ts/tag (and their dummy variants) do not influence the decision
    if (functionId == FUNCTION_TS || functionId == FUNCTION_TS_DUMMY || functionId == FUNCTION_TAG ||
        functionId == FUNCTION_TAG_DUMMY) {
      continue;
    }

    if (functionId == FUNCTION_FIRST_DST || functionId == FUNCTION_LAST_DST) {
      sawFirstLast = true;
    } else {
      sawOther = true;
    }
  }

  if (!(sawFirstLast && status == BLK_DATA_NOT_LOAD)) {
    return status;
  }

  return sawOther ? BLK_DATA_DATA_LOAD : BLK_DATA_FILTEROUT;
}

#endif

L
Liu Jicong 已提交
872 873
// static void updateDataCheckOrder(SQInfo *pQInfo, SQueryTableReq* pQueryMsg, bool stableQuery) {
//   STaskAttr* pQueryAttr = pQInfo->runtimeEnv.pQueryAttr;
H
Haojun Liao 已提交
874
//
L
Liu Jicong 已提交
875 876 877 878
//   // in case of point-interpolation query, use asc order scan
//   char msg[] = "QInfo:0x%"PRIx64" scan order changed for %s query, old:%d, new:%d, qrange exchanged, old qrange:%"
//   PRId64
//                "-%" PRId64 ", new qrange:%" PRId64 "-%" PRId64;
H
Haojun Liao 已提交
879
//
L
Liu Jicong 已提交
880 881 882 883 884
//   // todo handle the case where order-irrelevant query types are mixed up with order-critical query types
//   // descending order query for last_row query
//   if (isFirstLastRowQuery(pQueryAttr)) {
//     //qDebug("QInfo:0x%"PRIx64" scan order changed for last_row query, old:%d, new:%d", pQInfo->qId,
//     pQueryAttr->order.order, TSDB_ORDER_ASC);
H
Haojun Liao 已提交
885
//
L
Liu Jicong 已提交
886 887
//     pQueryAttr->order.order = TSDB_ORDER_ASC;
//     if (pQueryAttr->window.skey > pQueryAttr->window.ekey) {
wafwerar's avatar
wafwerar 已提交
888
//       TSWAP(pQueryAttr->window.skey, pQueryAttr->window.ekey);
L
Liu Jicong 已提交
889
//     }
H
Haojun Liao 已提交
890
//
L
Liu Jicong 已提交
891 892 893
//     pQueryAttr->needReverseScan = false;
//     return;
//   }
H
Haojun Liao 已提交
894
//
L
Liu Jicong 已提交
895 896 897
//   if (pQueryAttr->groupbyColumn && pQueryAttr->order.order == TSDB_ORDER_DESC) {
//     pQueryAttr->order.order = TSDB_ORDER_ASC;
//     if (pQueryAttr->window.skey > pQueryAttr->window.ekey) {
wafwerar's avatar
wafwerar 已提交
898
//       TSWAP(pQueryAttr->window.skey, pQueryAttr->window.ekey);
L
Liu Jicong 已提交
899
//     }
H
Haojun Liao 已提交
900
//
L
Liu Jicong 已提交
901 902 903 904
//     pQueryAttr->needReverseScan = false;
//     doUpdateLastKey(pQueryAttr);
//     return;
//   }
H
Haojun Liao 已提交
905
//
L
Liu Jicong 已提交
906 907 908 909 910 911
//   if (pQueryAttr->pointInterpQuery && pQueryAttr->interval.interval == 0) {
//     if (!QUERY_IS_ASC_QUERY(pQueryAttr)) {
//       //qDebug(msg, pQInfo->qId, "interp", pQueryAttr->order.order, TSDB_ORDER_ASC, pQueryAttr->window.skey,
//       pQueryAttr->window.ekey, pQueryAttr->window.ekey, pQueryAttr->window.skey); TSWAP(pQueryAttr->window.skey,
//       pQueryAttr->window.ekey, TSKEY);
//     }
H
Haojun Liao 已提交
912
//
L
Liu Jicong 已提交
913 914 915
//     pQueryAttr->order.order = TSDB_ORDER_ASC;
//     return;
//   }
H
Haojun Liao 已提交
916
//
L
Liu Jicong 已提交
917 918 919 920
//   if (pQueryAttr->interval.interval == 0) {
//     if (onlyFirstQuery(pQueryAttr)) {
//       if (!QUERY_IS_ASC_QUERY(pQueryAttr)) {
//         //qDebug(msg, pQInfo->qId, "only-first", pQueryAttr->order.order, TSDB_ORDER_ASC, pQueryAttr->window.skey,
H
Haojun Liao 已提交
921 922
////               pQueryAttr->window.ekey, pQueryAttr->window.ekey, pQueryAttr->window.skey);
//
wafwerar's avatar
wafwerar 已提交
923
//        TSWAP(pQueryAttr->window.skey, pQueryAttr->window.ekey);
H
Haojun Liao 已提交
924 925 926 927 928 929 930 931 932 933
//        doUpdateLastKey(pQueryAttr);
//      }
//
//      pQueryAttr->order.order = TSDB_ORDER_ASC;
//      pQueryAttr->needReverseScan = false;
//    } else if (onlyLastQuery(pQueryAttr) && notContainSessionOrStateWindow(pQueryAttr)) {
//      if (QUERY_IS_ASC_QUERY(pQueryAttr)) {
//        //qDebug(msg, pQInfo->qId, "only-last", pQueryAttr->order.order, TSDB_ORDER_DESC, pQueryAttr->window.skey,
////               pQueryAttr->window.ekey, pQueryAttr->window.ekey, pQueryAttr->window.skey);
//
wafwerar's avatar
wafwerar 已提交
934
//        TSWAP(pQueryAttr->window.skey, pQueryAttr->window.ekey);
H
Haojun Liao 已提交
935 936 937 938 939 940 941 942 943 944 945 946
//        doUpdateLastKey(pQueryAttr);
//      }
//
//      pQueryAttr->order.order = TSDB_ORDER_DESC;
//      pQueryAttr->needReverseScan = false;
//    }
//
//  } else {  // interval query
//    if (stableQuery) {
//      if (onlyFirstQuery(pQueryAttr)) {
//        if (!QUERY_IS_ASC_QUERY(pQueryAttr)) {
//          //qDebug(msg, pQInfo->qId, "only-first stable", pQueryAttr->order.order, TSDB_ORDER_ASC,
L
Liu Jicong 已提交
947 948
////                 pQueryAttr->window.skey, pQueryAttr->window.ekey, pQueryAttr->window.ekey,
/// pQueryAttr->window.skey);
H
Haojun Liao 已提交
949
//
wafwerar's avatar
wafwerar 已提交
950
//          TSWAP(pQueryAttr->window.skey, pQueryAttr->window.ekey);
H
Haojun Liao 已提交
951 952 953 954 955 956 957 958
//          doUpdateLastKey(pQueryAttr);
//        }
//
//        pQueryAttr->order.order = TSDB_ORDER_ASC;
//        pQueryAttr->needReverseScan = false;
//      } else if (onlyLastQuery(pQueryAttr)) {
//        if (QUERY_IS_ASC_QUERY(pQueryAttr)) {
//          //qDebug(msg, pQInfo->qId, "only-last stable", pQueryAttr->order.order, TSDB_ORDER_DESC,
L
Liu Jicong 已提交
959 960
////                 pQueryAttr->window.skey, pQueryAttr->window.ekey, pQueryAttr->window.ekey,
/// pQueryAttr->window.skey);
H
Haojun Liao 已提交
961
//
wafwerar's avatar
wafwerar 已提交
962
//          TSWAP(pQueryAttr->window.skey, pQueryAttr->window.ekey);
H
Haojun Liao 已提交
963 964 965 966 967 968 969 970 971
//          doUpdateLastKey(pQueryAttr);
//        }
//
//        pQueryAttr->order.order = TSDB_ORDER_DESC;
//        pQueryAttr->needReverseScan = false;
//      }
//    }
//  }
//}
972

L
Liu Jicong 已提交
973 974 975
// static FORCE_INLINE bool doFilterByBlockStatistics(STaskRuntimeEnv* pRuntimeEnv, SDataStatis *pDataStatis,
// SqlFunctionCtx *pCtx, int32_t numOfRows) {
//   STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
976
//
L
Liu Jicong 已提交
977 978 979
//   if (pDataStatis == NULL || pQueryAttr->pFilters == NULL) {
//     return true;
//   }
980
//
L
Liu Jicong 已提交
981 982
//   return filterRangeExecute(pQueryAttr->pFilters, pDataStatis, pQueryAttr->numOfCols, numOfRows);
// }
983
#if 0
/*
 * (disabled) Decide whether the data block spans more than one aligned time window.
 * The calls that compute and advance the window `w` are commented out, so `w` stays zero-initialized.
 */
static bool overlapWithTimeWindow(STaskAttr* pQueryAttr, SDataBlockInfo* pBlockInfo) {
  STimeWindow w = {0};

  TSKEY sk = TMIN(pQueryAttr->window.skey, pQueryAttr->window.ekey);
  TSKEY ek = TMAX(pQueryAttr->window.skey, pQueryAttr->window.ekey);

  if (true) {
    //    getAlignQueryTimeWindow(pQueryAttr, pBlockInfo->window.skey, sk, ek, &w);
    assert(w.ekey >= pBlockInfo->window.skey);

    if (w.ekey < pBlockInfo->window.ekey) {
      return true;
    }

    for (;;) {
      //      getNextTimeWindow(pQueryAttr, &w);
      if (w.skey > pBlockInfo->window.ekey) {
        break;
      }

      assert(w.ekey > pBlockInfo->window.ekey);
      if (w.skey <= pBlockInfo->window.ekey && w.skey > pBlockInfo->window.skey) {
        return true;
      }
    }
  } else {
    //    getAlignQueryTimeWindow(pQueryAttr, pBlockInfo->window.ekey, sk, ek, &w);
    assert(w.skey <= pBlockInfo->window.ekey);

    if (w.skey > pBlockInfo->window.skey) {
      return true;
    }

    for (;;) {
      //      getNextTimeWindow(pQueryAttr, &w);
      if (w.ekey < pBlockInfo->window.skey) {
        break;
      }

      assert(w.skey < pBlockInfo->window.skey);
      if (w.ekey < pBlockInfo->window.ekey && w.ekey >= pBlockInfo->window.skey) {
        return true;
      }
    }
  }

  return false;
}
#endif
1033 1034

/*
 * Derive the load requirement of a data block from the functions of the table scan.
 *
 * Starts from BLK_DATA_NOT_LOAD and returns with BLK_DATA_DATA_LOAD set as soon as a context with a
 * negative function id is met. The per-function data requirement callback is currently commented
 * out, so every other context leaves the status untouched.
 *
 * @param pTableScanInfo scan operator info providing the function contexts (pCtx, numOfOutput)
 * @param pBlock         the candidate block (only referenced by the disabled callback)
 * @return bit set of BLK_DATA_* flags
 */
static uint32_t doFilterByBlockTimeWindow(STableScanInfo* pTableScanInfo, SSDataBlock* pBlock) {
  SqlFunctionCtx* pCtx = pTableScanInfo->pCtx;
  uint32_t        status = BLK_DATA_NOT_LOAD;

  int32_t numOfOutput = pTableScanInfo->numOfOutput;
  for (int32_t i = 0; i < numOfOutput; ++i) {
    int32_t functionId = pCtx[i].functionId;

    // The column id was only consumed by the disabled dataReqFunc call below. It is no longer read
    // here, so pParam[0].pCol is not dereferenced for a value nobody uses.

    // group by + first/last should not apply the first/last block filter
    if (functionId < 0) {
      status |= BLK_DATA_DATA_LOAD;
      return status;
    } else {
      //      int32_t colId = pTableScanInfo->pExpr[i].base.pParam[0].pCol->colId;
      //      status |= aAggs[functionId].dataReqFunc(&pTableScanInfo->pCtx[i], &pBlock->info.window, colId);
      //      if ((status & BLK_DATA_DATA_LOAD) == BLK_DATA_DATA_LOAD) {
      //        return status;
      //      }
    }
  }

  return status;
}

L
Liu Jicong 已提交
1058 1059
/*
 * Decide whether (and how) the current data block has to be loaded.
 *
 * With the on-demand logic compiled out (#if 0 below), the function only resets its outputs:
 * *status becomes BLK_DATA_NOT_LOAD, the block's column data and block statistics pointers are
 * cleared, and TSDB_CODE_SUCCESS is returned.
 */
int32_t loadDataBlockOnDemand(SExecTaskInfo* pTaskInfo, STableScanInfo* pTableScanInfo, SSDataBlock* pBlock,
                              uint32_t* status) {
  // reset the block payload first, then the reported status
  pBlock->pBlockAgg = NULL;
  pBlock->pDataBlock = NULL;
  *status = BLK_DATA_NOT_LOAD;

  //  int64_t groupId = pRuntimeEnv->current->groupIndex;
  //  bool    ascQuery = QUERY_IS_ASC_QUERY(pQueryAttr);

  // only referenced by the disabled section below
  STaskCostInfo* pCost = &pTaskInfo->cost;

//  pCost->totalBlocks += 1;
//  pCost->totalRows += pBlock->info.rows;
#if 0
  // Calculate all time windows that are overlapping or contain current data block.
  // If current data block is contained by all possible time window, do not load current data block.
  if (/*pQueryAttr->pFilters || */pQueryAttr->groupbyColumn || pQueryAttr->sw.gap > 0 ||
      (QUERY_IS_INTERVAL_QUERY(pQueryAttr) && overlapWithTimeWindow(pTaskInfo, &pBlock->info))) {
    (*status) = BLK_DATA_DATA_LOAD;
  }

  // check if this data block is required to load
  if ((*status) != BLK_DATA_DATA_LOAD) {
    bool needFilter = true;

    // the pCtx[i] result is belonged to previous time window since the outputBuf has not been set yet,
    // the filter result may be incorrect. So in case of interval query, we need to set the correct time output buffer
    if (QUERY_IS_INTERVAL_QUERY(pQueryAttr)) {
      SResultRow* pResult = NULL;

      bool  masterScan = IS_MAIN_SCAN(pRuntimeEnv);
      TSKEY k = ascQuery? pBlock->info.window.skey : pBlock->info.window.ekey;

      STimeWindow win = getActiveTimeWindow(pTableScanInfo->pResultRowInfo, k, pQueryAttr);
      if (pQueryAttr->pointInterpQuery) {
        needFilter = chkWindowOutputBufByKey(pRuntimeEnv, pTableScanInfo->pResultRowInfo, &win, masterScan, &pResult, groupId,
                                    pTableScanInfo->pCtx, pTableScanInfo->numOfOutput,
                                    pTableScanInfo->rowEntryInfoOffset);
      } else {
        if (setResultOutputBufByKey(pRuntimeEnv, pTableScanInfo->pResultRowInfo, pBlock->info.uid, &win, masterScan, &pResult, groupId,
                                    pTableScanInfo->pCtx, pTableScanInfo->numOfOutput,
                                    pTableScanInfo->rowEntryInfoOffset) != TSDB_CODE_SUCCESS) {
          longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
        }
      }
    } else if (pQueryAttr->stableQuery && (!pQueryAttr->tsCompQuery) && (!pQueryAttr->diffQuery)) { // stable aggregate, not interval aggregate or normal column aggregate
      doSetTableGroupOutputBuf(pRuntimeEnv, pTableScanInfo->pResultRowInfo, pTableScanInfo->pCtx,
                               pTableScanInfo->rowEntryInfoOffset, pTableScanInfo->numOfOutput,
                               pRuntimeEnv->current->groupIndex);
    }

    if (needFilter) {
      (*status) = doFilterByBlockTimeWindow(pTableScanInfo, pBlock);
    } else {
      (*status) = BLK_DATA_DATA_LOAD;
    }
  }

  SDataBlockInfo* pBlockInfo = &pBlock->info;
//  *status = updateBlockLoadStatus(pRuntimeEnv->pQueryAttr, *status);

  if ((*status) == BLK_DATA_NOT_LOAD || (*status) == BLK_DATA_FILTEROUT) {
    //qDebug("QInfo:0x%"PRIx64" data block discard, brange:%" PRId64 "-%" PRId64 ", rows:%d", pQInfo->qId, pBlockInfo->window.skey,
//           pBlockInfo->window.ekey, pBlockInfo->rows);
    pCost->skipBlocks += 1;
  } else if ((*status) == BLK_DATA_SMA_LOAD) {
    // this function never returns error?
    pCost->loadBlockStatis += 1;
//    tsdbRetrieveDataBlockStatisInfo(pTableScanInfo->pTsdbReadHandle, &pBlock->pBlockAgg);

    if (pBlock->pBlockAgg == NULL) {  // data block statistics does not exist, load data block
//      pBlock->pDataBlock = tsdbRetrieveDataBlock(pTableScanInfo->pTsdbReadHandle, NULL);
      pCost->totalCheckedRows += pBlock->info.rows;
    }
  } else {
    assert((*status) == BLK_DATA_DATA_LOAD);

    // load the data block statistics to perform further filter
    pCost->loadBlockStatis += 1;
//    tsdbRetrieveDataBlockStatisInfo(pTableScanInfo->pTsdbReadHandle, &pBlock->pBlockAgg);

    if (pQueryAttr->topBotQuery && pBlock->pBlockAgg != NULL) {
      { // set previous window
        if (QUERY_IS_INTERVAL_QUERY(pQueryAttr)) {
          SResultRow* pResult = NULL;

          bool  masterScan = IS_MAIN_SCAN(pRuntimeEnv);
          TSKEY k = ascQuery? pBlock->info.window.skey : pBlock->info.window.ekey;

          STimeWindow win = getActiveTimeWindow(pTableScanInfo->pResultRowInfo, k, pQueryAttr);
          if (setResultOutputBufByKey(pRuntimeEnv, pTableScanInfo->pResultRowInfo, pBlock->info.uid, &win, masterScan, &pResult, groupId,
                                      pTableScanInfo->pCtx, pTableScanInfo->numOfOutput,
                                      pTableScanInfo->rowEntryInfoOffset) != TSDB_CODE_SUCCESS) {
            longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
          }
        }
      }
      bool load = false;
      for (int32_t i = 0; i < pQueryAttr->numOfOutput; ++i) {
        int32_t functionId = pTableScanInfo->pCtx[i].functionId;
        if (functionId == FUNCTION_TOP || functionId == FUNCTION_BOTTOM) {
//          load = topbot_datablock_filter(&pTableScanInfo->pCtx[i], (char*)&(pBlock->pBlockAgg[i].min),
//                                         (char*)&(pBlock->pBlockAgg[i].max));
          if (!load) { // current block has been discard due to filter applied
            pCost->skipBlocks += 1;
            //qDebug("QInfo:0x%"PRIx64" data block discard, brange:%" PRId64 "-%" PRId64 ", rows:%d", pQInfo->qId,
//                   pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
            (*status) = BLK_DATA_FILTEROUT;
            return TSDB_CODE_SUCCESS;
          }
        }
      }
    }

    // current block has been discard due to filter applied
//    if (!doFilterByBlockStatistics(pRuntimeEnv, pBlock->pBlockAgg, pTableScanInfo->pCtx, pBlockInfo->rows)) {
//      pCost->skipBlocks += 1;
//      qDebug("QInfo:0x%"PRIx64" data block discard, brange:%" PRId64 "-%" PRId64 ", rows:%d", pQInfo->qId, pBlockInfo->window.skey,
//             pBlockInfo->window.ekey, pBlockInfo->rows);
//      (*status) = BLK_DATA_FILTEROUT;
//      return TSDB_CODE_SUCCESS;
//    }

    pCost->totalCheckedRows += pBlockInfo->rows;
    pCost->loadBlocks += 1;
//    pBlock->pDataBlock = tsdbRetrieveDataBlock(pTableScanInfo->pTsdbReadHandle, NULL);
//    if (pBlock->pDataBlock == NULL) {
//      return terrno;
//    }

//    if (pQueryAttr->pFilters != NULL) {
//      filterSetColFieldData(pQueryAttr->pFilters, taosArrayGetSize(pBlock->pDataBlock), pBlock->pDataBlock);
//    }

//    if (pQueryAttr->pFilters != NULL || pRuntimeEnv->pTsBuf != NULL) {
//      filterColRowsInDataBlock(pRuntimeEnv, pBlock, ascQuery);
//    }
  }
#endif
  return TSDB_CODE_SUCCESS;
}

L
Liu Jicong 已提交
1201
/*
 * Prepare the per-table query info for a reverse scan.
 * All of the actual updates are commented out, so the function currently has no effect.
 */
static void updateTableQueryInfoForReverseScan(STableQueryInfo* pTableQueryInfo) {
  if (pTableQueryInfo != NULL) {
    //  TSWAP(pTableQueryInfo->win.skey, pTableQueryInfo->win.ekey);
    //  pTableQueryInfo->lastKey = pTableQueryInfo->win.skey;

    //  SWITCH_ORDER(pTableQueryInfo->cur.order);
    //  pTableQueryInfo->cur.vgroupIndex = -1;

    // set the index to be the end slot of result rows array
    //  SResultRowInfo* pResultRowInfo = &pTableQueryInfo->resInfo;
    //  if (pResultRowInfo->size > 0) {
    //    pResultRowInfo->curPos = pResultRowInfo->size - 1;
    //  } else {
    //    pResultRowInfo->curPos = -1;
    //  }
  }
}

H
Haojun Liao 已提交
1221
// Initialize a result row. The only statement is commented out, so this is currently a no-op.
void initResultRow(SResultRow* pResultRow) {
  //  pResultRow->pEntryInfo = (struct SResultRowEntryInfo*)((char*)pResultRow + sizeof(SResultRow));
}

/*
 * The start of each column's SResultRowEntryInfo is denoted by RowCellInfoOffset.
 * Note that in the case of a top/bottom query, the multiple result rows are treated as a single result row.
H
Haojun Liao 已提交
1228 1229 1230
 * +------------+-----------------result column 1------------+------------------result column 2-----------+
 * | SResultRow | SResultRowEntryInfo | intermediate buffer1 | SResultRowEntryInfo | intermediate buffer 2|
 * +------------+--------------------------------------------+--------------------------------------------+
1231 1232
 *           offset[0]                                  offset[1]                                   offset[2]
 */
1233
// TODO refactor: some function move away
1234 1235 1236
/*
 * Bind the function contexts of an operator to a single result row.
 *
 * The result row info is re-initialized, one result row is obtained for the fixed key 0 / group 0,
 * each context is pointed at its (cleaned) entry inside that row and tagged with the scan stage,
 * and finally the output buffers of the contexts are initialized.
 */
void setFunctionResultOutput(SOperatorInfo* pOperator, SOptrBasicInfo* pInfo, SAggSupporter* pSup, int32_t stage,
                             int32_t numOfExprs) {
  SExecTaskInfo*  pTaskInfo = pOperator->pTaskInfo;
  SqlFunctionCtx* pFuncCtx = pOperator->exprSupp.pCtx;
  int32_t*        pOffsets = pOperator->exprSupp.rowEntryInfoOffset;

  SResultRowInfo* pRowInfo = &pInfo->resultRowInfo;
  initResultRowInfo(pRowInfo);

  // a single result row keyed by a constant id is shared by all expressions
  int64_t key = 0;
  int64_t gid = 0;

  SResultRow* pResultRow =
      doSetResultOutBufByKey(pSup->pResultBuf, pRowInfo, (char*)&key, sizeof(key), true, gid, pTaskInfo, false, pSup);

  for (int32_t idx = 0; idx < numOfExprs; ++idx) {
    struct SResultRowEntryInfo* pEntryInfo = getResultEntryInfo(pResultRow, idx, pOffsets);
    cleanupResultRowEntry(pEntryInfo);

    pFuncCtx[idx].resultInfo = pEntryInfo;
    pFuncCtx[idx].scanFlag = stage;
  }

  initCtxOutputBuffer(pFuncCtx, numOfExprs);
}

H
Haojun Liao 已提交
1259
/*
 * Run the init callback of every function context that still needs it.
 * Contexts are skipped when their result entry is already initialized, when the function is a
 * pseudo column or scalar function, or when the function id is -1.
 */
void initCtxOutputBuffer(SqlFunctionCtx* pCtx, int32_t size) {
  for (int32_t idx = 0; idx < size; ++idx) {
    SqlFunctionCtx*             pCur = &pCtx[idx];
    struct SResultRowEntryInfo* pEntry = GET_RES_INFO(pCur);

    bool needInit = !(isRowEntryInitialized(pEntry) || fmIsPseudoColumnFunc(pCur->functionId) ||
                      pCur->functionId == -1 || fmIsScalarFunc(pCur->functionId));
    if (needInit) {
      pCur->fpSet.init(pCur, pCur->resultInfo);
    }
  }
}

L
Liu Jicong 已提交
1271
/*
 * Update the status of a task.
 * TASK_NOT_COMPLETED replaces the whole status; any other value is OR-ed in after the
 * TASK_NOT_COMPLETED flag has been cleared.
 */
void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status) {
  if (status != TASK_NOT_COMPLETED) {
    // TASK_NOT_COMPLETED is not compatible with any other status, so clear its position first
    CLEAR_QUERY_STATUS(pTaskInfo, TASK_NOT_COMPLETED);
    pTaskInfo->status |= status;
    return;
  }

  pTaskInfo->status = status;
}

L
Liu Jicong 已提交
1281
// Release the resources held by a table query info. The cleanup calls are commented out, so this is a no-op.
void destroyTableQueryInfoImpl(STableQueryInfo* pTableQueryInfo) {
  if (pTableQueryInfo != NULL) {
    //  taosVariantDestroy(&pTableQueryInfo->tag);
    //  cleanupResultRowInfo(&pTableQueryInfo->resInfo);
  }
}

1290
/*
 * Point every function context at its entry inside pResult and initialize entries that need it.
 *
 * An entry is left alone when it is both completed and initialized, when the function is a window
 * pseudo column, or when it is already initialized. Otherwise the function's init callback is run,
 * except for function id -1, where the entry is simply flagged as initialized.
 */
void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowEntryInfoOffset) {
  for (int32_t idx = 0; idx < numOfOutput; ++idx) {
    struct SResultRowEntryInfo* pEntry = getResultEntryInfo(pResult, idx, rowEntryInfoOffset);
    pCtx[idx].resultInfo = pEntry;

    if (isRowEntryCompleted(pEntry) && isRowEntryInitialized(pEntry)) {
      continue;
    }

    if (fmIsWindowPseudoColumnFunc(pCtx[idx].functionId)) {
      continue;
    }

    if (pEntry->initialized) {
      continue;
    }

    if (pCtx[idx].functionId == -1) {
      pEntry->initialized = true;
    } else {
      pCtx[idx].fpSet.init(&pCtx[idx], pEntry);
    }
  }
}

1313
static void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const int8_t* rowRes, bool keep);
1314

1315
/*
 * Apply the filter condition pFilterNode to pBlock in place.
 *
 * A filter is built from the node, bound to the columns of the block and executed; the rows that do
 * not qualify are removed by extractQualifiedTupleByFilterResult and the timestamp window of the
 * block is refreshed. A NULL filter node leaves the block untouched.
 *
 * If the filter cannot be built or bound, the failure is logged and the block is returned unchanged
 * (the function has no way to report an error to its caller).
 */
void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock) {
  if (pFilterNode == NULL) {
    return;
  }

  SFilterInfo* filter = NULL;

  // todo move to the initialization function
  int32_t code = filterInitFromNode((SNode*)pFilterNode, &filter, 0);
  if (code != TSDB_CODE_SUCCESS || filter == NULL) {
    qError("failed to init filter from node, code:%s", tstrerror(code));
    if (filter != NULL) {
      filterFreeInfo(filter);
    }
    return;
  }

  size_t             numOfCols = taosArrayGetSize(pBlock->pDataBlock);
  SFilterColumnParam param1 = {.numOfCols = numOfCols, .pDataBlock = pBlock->pDataBlock};
  code = filterSetDataFromSlotId(filter, &param1);
  if (code != TSDB_CODE_SUCCESS) {
    qError("failed to bind block columns to filter, code:%s", tstrerror(code));
    filterFreeInfo(filter);
    return;
  }

  int8_t* rowRes = NULL;

  // todo the keep seems never to be True??
  bool keep = filterExecute(filter, pBlock, &rowRes, NULL, param1.numOfCols);
  filterFreeInfo(filter);

  extractQualifiedTupleByFilterResult(pBlock, rowRes, keep);
  blockDataUpdateTsWindow(pBlock, 0);

  // NOTE(review): assumes filterExecute hands ownership of rowRes to the caller -- confirm against
  // the filter module; nothing else in this function releases it.
  if (rowRes != NULL) {
    taosMemoryFree(rowRes);
  }
}

1339
/*
 * Compact pBlock so that only the rows flagged in rowRes remain.
 *
 * @param pBlock block to rewrite in place
 * @param rowRes per-row flags; rows with a zero flag are dropped. NULL means no row qualified.
 * @param keep   when true the block is left untouched
 */
void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const int8_t* rowRes, bool keep) {
  if (keep) {
    return;
  }

  if (rowRes == NULL) {
    // no per-row result is available: the block ends up empty
    pBlock->info.rows = 0;
    return;
  }

  int32_t      totalRows = pBlock->info.rows;
  SSDataBlock* pCopy = createOneDataBlock(pBlock, true);  // rows are re-appended from this copy

  size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
  for (int32_t colIdx = 0; colIdx < numOfCols; ++colIdx) {
    SColumnInfoData* pFrom = taosArrayGet(pCopy->pDataBlock, colIdx);
    SColumnInfoData* pTo = taosArrayGet(pBlock->pDataBlock, colIdx);

    // it is a reserved column for scalar function, and no data in this column yet.
    if (pTo->pData == NULL) {
      continue;
    }

    colInfoDataCleanup(pTo, pBlock->info.rows);

    int32_t kept = 0;
    for (int32_t rowIdx = 0; rowIdx < totalRows; ++rowIdx) {
      if (rowRes[rowIdx] != 0) {
        if (colDataIsNull_s(pFrom, rowIdx)) {
          colDataAppendNULL(pTo, kept);
        } else {
          colDataAppend(pTo, kept, colDataGetData(pFrom, rowIdx), false);
        }
        kept += 1;
      }
    }

    // the first rewritten column fixes the new row count; later columns must agree with it
    if (pBlock->info.rows != totalRows) {
      ASSERT(pBlock->info.rows == kept);
    } else {
      pBlock->info.rows = kept;
    }
  }

  blockDataDestroy(pCopy);  // fix memory leak
}

1387 1388
/*
 * Select the result row of group `groupId` as the output target of the aggregate operator.
 *
 * The row is looked up (or created) with the group id as key, given a result buffer page if it has
 * none yet, and then bound to the operator's function contexts. If the page cannot be added the
 * function returns without binding the contexts.
 */
void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, SAggOperatorInfo* pAggInfo, int32_t numOfOutput,
                              uint64_t groupId) {
  // for simple group by query without interval, all the tables belong to one group result.
  SExecTaskInfo*  pTaskInfo = pOperator->pTaskInfo;
  SResultRowInfo* pRowInfo = &pAggInfo->binfo.resultRowInfo;
  SqlFunctionCtx* pFuncCtx = pOperator->exprSupp.pCtx;
  int32_t*        pOffsets = pOperator->exprSupp.rowEntryInfoOffset;

  SResultRow* pRow = doSetResultOutBufByKey(pAggInfo->aggSup.pResultBuf, pRowInfo, (char*)&groupId, sizeof(groupId),
                                            true, groupId, pTaskInfo, false, &pAggInfo->aggSup);
  assert(pRow != NULL);

  /*
   * not assign result buffer yet, add new result buffer
   * all group belong to one result set, and each group result has different group id so set the id to be one
   */
  if (pRow->pageId == -1) {
    int32_t rowSize = pAggInfo->binfo.pRes->info.rowSize;
    if (addNewWindowResultBuf(pRow, pAggInfo->aggSup.pResultBuf, groupId, rowSize) != TSDB_CODE_SUCCESS) {
      return;
    }
  }

  setResultRowInitCtx(pRow, pFuncCtx, numOfOutput, pOffsets);
}

1414
/*
 * Switch the aggregate operator to the output buffer of `groupId`.
 * Nothing is done when the operator is already working on that group (and its recorded group id
 * is not the INT32_MIN sentinel).
 */
void setExecutionContext(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId, SAggOperatorInfo* pAggInfo) {
  if (pAggInfo->groupId == INT32_MIN || pAggInfo->groupId != groupId) {
    doSetTableGroupOutputBuf(pOperator, pAggInfo, numOfOutput, groupId);

    // record the current active group id
    pAggInfo->groupId = groupId;
  }
}

1425 1426
/*
 * Raise pRow->numOfRows to the largest numOfRes found among the initialized result entries
 * of the row. Entries that are not initialized are ignored.
 */
static void doUpdateNumOfRows(SResultRow* pRow, int32_t numOfExprs, const int32_t* rowCellOffset) {
  for (int32_t idx = 0; idx < numOfExprs; ++idx) {
    struct SResultRowEntryInfo* pEntry = getResultEntryInfo(pRow, idx, rowCellOffset);

    if (isRowEntryInitialized(pEntry) && pRow->numOfRows < pEntry->numOfRes) {
      pRow->numOfRows = pEntry->numOfRes;
    }
  }
}

1438
int32_t finalizeResultRowIntoResultDataBlock(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPosition,
S
shenglian zhou 已提交
1439 1440 1441
                                             SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, int32_t numOfExprs,
                                             const int32_t* rowCellOffset, SSDataBlock* pBlock,
                                             SExecTaskInfo* pTaskInfo) {
1442 1443 1444 1445 1446 1447 1448 1449 1450
  SFilePage*  page = getBufPage(pBuf, resultRowPosition->pageId);
  SResultRow* pRow = (SResultRow*)((char*)page + resultRowPosition->offset);

  doUpdateNumOfRows(pRow, numOfExprs, rowCellOffset);
  if (pRow->numOfRows == 0) {
    releaseBufPage(pBuf, page);
    return 0;
  }

1451 1452 1453 1454 1455 1456 1457
  while (pBlock->info.rows + pRow->numOfRows > pBlock->info.capacity) {
    int32_t code = blockDataEnsureCapacity(pBlock, pBlock->info.capacity * 1.25);
    if (TAOS_FAILED(code)) {
      releaseBufPage(pBuf, page);
      qError("%s ensure result data capacity failed, code %s", GET_TASKID(pTaskInfo), tstrerror(code));
      longjmp(pTaskInfo->env, code);
    }
1458 1459 1460 1461 1462
  }

  for (int32_t j = 0; j < numOfExprs; ++j) {
    int32_t slotId = pExprInfo[j].base.resSchema.slotId;

1463
    pCtx[j].resultInfo = getResultEntryInfo(pRow, j, rowCellOffset);
1464 1465 1466 1467 1468 1469 1470 1471 1472 1473 1474 1475 1476 1477 1478 1479 1480 1481 1482 1483
    if (pCtx[j].fpSet.finalize) {
      int32_t code = pCtx[j].fpSet.finalize(&pCtx[j], pBlock);
      if (TAOS_FAILED(code)) {
        qError("%s build result data block error, code %s", GET_TASKID(pTaskInfo), tstrerror(code));
        longjmp(pTaskInfo->env, code);
      }
    } else if (strcmp(pCtx[j].pExpr->pExpr->_function.functionName, "_select_value") == 0) {
      // do nothing, todo refactor
    } else {
      // expand the result into multiple rows. E.g., _wstartts, top(k, 20)
      // the _wstartts needs to copy to 20 following rows, since the results of top-k expands to 20 different rows.
      SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, slotId);
      char*            in = GET_ROWCELL_INTERBUF(pCtx[j].resultInfo);
      for (int32_t k = 0; k < pRow->numOfRows; ++k) {
        colDataAppend(pColInfoData, pBlock->info.rows + k, in, pCtx[j].resultInfo->isNullRes);
      }
    }
  }

  releaseBufPage(pBuf, page);
1484
  pBlock->info.rows += pRow->numOfRows;
1485 1486 1487 1488

  return 0;
}

X
Xiaoyu Wang 已提交
1489 1490 1491
/*
 * Copy finalized group results into pBlock, starting at pGroupResInfo->index.
 *
 * Rows are consumed in order until one of the following happens:
 *   - all result rows have been visited,
 *   - the next row belongs to a different group than the one already in pBlock,
 *   - the next row does not fit into the remaining capacity of pBlock,
 *   - pBlock is full after appending a row.
 * pGroupResInfo->index is advanced for every row that is consumed or skipped (rows without
 * results are skipped). A row that stops the loop is left for the next call.
 *
 * Returns 0. A failing finalize callback does not return: it longjmps to pTaskInfo->env with
 * the error code, after the buffer page has been released.
 */
int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprInfo* pExprInfo, SDiskbasedBuf* pBuf,
                           SGroupResInfo* pGroupResInfo, const int32_t* rowCellOffset, SqlFunctionCtx* pCtx,
                           int32_t numOfExprs) {
  int32_t numOfRows = getNumOfTotalRes(pGroupResInfo);
  int32_t start = pGroupResInfo->index;

  for (int32_t i = start; i < numOfRows; i += 1) {
    SResKeyPos* pPos = taosArrayGetP(pGroupResInfo->pRows, i);
    SFilePage*  page = getBufPage(pBuf, pPos->pos.pageId);

    SResultRow* pRow = (SResultRow*)((char*)page + pPos->pos.offset);

    doUpdateNumOfRows(pRow, numOfExprs, rowCellOffset);
    if (pRow->numOfRows == 0) {
      pGroupResInfo->index += 1;
      releaseBufPage(pBuf, page);
      continue;
    }

    if (pBlock->info.groupId == 0) {
      pBlock->info.groupId = pPos->groupId;
    } else if (pBlock->info.groupId != pPos->groupId) {
      // current value belongs to different group, it can't be packed into one datablock
      releaseBufPage(pBuf, page);
      break;
    }

    if (pBlock->info.rows + pRow->numOfRows > pBlock->info.capacity) {
      releaseBufPage(pBuf, page);
      break;
    }

    pGroupResInfo->index += 1;

    for (int32_t j = 0; j < numOfExprs; ++j) {
      int32_t slotId = pExprInfo[j].base.resSchema.slotId;

      pCtx[j].resultInfo = getResultEntryInfo(pRow, j, rowCellOffset);
      if (pCtx[j].fpSet.finalize) {
        int32_t code = pCtx[j].fpSet.finalize(&pCtx[j], pBlock);
        if (TAOS_FAILED(code)) {
          // do not leave through longjmp while still holding the page
          releaseBufPage(pBuf, page);
          qError("%s build result data block error, code %s", GET_TASKID(pTaskInfo), tstrerror(code));
          longjmp(pTaskInfo->env, code);
        }
      } else if (strcmp(pCtx[j].pExpr->pExpr->_function.functionName, "_select_value") == 0) {
        // do nothing, todo refactor
      } else {
        // expand the result into multiple rows. E.g., _wstartts, top(k, 20)
        // the _wstartts needs to copy to 20 following rows, since the results of top-k expands to 20 different rows.
        SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, slotId);
        char*            in = GET_ROWCELL_INTERBUF(pCtx[j].resultInfo);
        if (pCtx[j].increase) {
          // each replicated row gets a value one larger than the previous one
          int64_t ts = *(int64_t*)in;
          for (int32_t k = 0; k < pRow->numOfRows; ++k) {
            colDataAppend(pColInfoData, pBlock->info.rows + k, (const char*)&ts, pCtx[j].resultInfo->isNullRes);
            ts++;
          }
        } else {
          for (int32_t k = 0; k < pRow->numOfRows; ++k) {
            colDataAppend(pColInfoData, pBlock->info.rows + k, in, pCtx[j].resultInfo->isNullRes);
          }
        }
      }
    }

    // pRow points into the page, so take the row count before the page is handed back
    int32_t rowsAppended = pRow->numOfRows;
    releaseBufPage(pBuf, page);

    pBlock->info.rows += rowsAppended;
    if (pBlock->info.rows >= pBlock->info.capacity) {  // output buffer is full
      break;
    }
  }

  qDebug("%s result generated, rows:%d, groupId:%" PRIu64, GET_TASKID(pTaskInfo), pBlock->info.rows,
         pBlock->info.groupId);
  blockDataUpdateTsWindow(pBlock, 0);
  return 0;
}

X
Xiaoyu Wang 已提交
1569 1570
/*
 * Rebuild the operator's result block (pbInfo->pRes) from the pending group results.
 * The block is always cleaned up first; when pGroupResInfo holds no more data it is left
 * empty, otherwise its group id is reset and results are copied in via doCopyToSDataBlock.
 */
void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo,
                            SDiskbasedBuf* pBuf) {
  SSDataBlock* pResBlock = pbInfo->pRes;

  blockDataCleanup(pResBlock);
  if (hasDataInGroupInfo(pGroupResInfo)) {
    // clear the existed group id
    pResBlock->info.groupId = 0;

    doCopyToSDataBlock(pOperator->pTaskInfo, pResBlock, pOperator->exprSupp.pExprInfo, pBuf, pGroupResInfo,
                       pOperator->exprSupp.rowEntryInfoOffset, pOperator->exprSupp.pCtx,
                       pOperator->exprSupp.numOfExprs);
  }
}

L
Liu Jicong 已提交
1589
/*
 * Currently a no-op: the whole implementation below is compiled out with #if 0.
 * The disabled code raised numOfRows of every result row to the largest numOfRes of its
 * cells, skipping the ts/tag/tagprj functions.
 */
static void updateNumOfRowsInResultRows(SqlFunctionCtx* pCtx, int32_t numOfOutput, SResultRowInfo* pResultRowInfo,
                                        int32_t* rowEntryInfoOffset) {
  // update the number of result for each, only update the number of rows for the corresponding window result.
  //  if (QUERY_IS_INTERVAL_QUERY(pQueryAttr)) {
  //    return;
  //  }
#if 0
  for (int32_t i = 0; i < pResultRowInfo->size; ++i) {
    SResultRow* pResult = pResultRowInfo->pResult[i];

    for (int32_t j = 0; j < numOfOutput; ++j) {
      int32_t functionId = pCtx[j].functionId;
      if (functionId == FUNCTION_TS || functionId == FUNCTION_TAG || functionId == FUNCTION_TAGPRJ) {
        continue;
      }

      SResultRowEntryInfo* pCell = getResultEntryInfo(pResult, j, rowEntryInfoOffset);
      pResult->numOfRows = (uint16_t)(TMAX(pResult->numOfRows, pCell->numOfRes));
    }
  }
#endif
}

L
Liu Jicong 已提交
1612
/*
 * Compress the data of one result column into `data` with the compression function registered
 * for the column's type in tDataTypes. The output size handed to the compressor is the raw
 * column size plus COMP_OVERFLOW_BYTES. Returns whatever the compression function returns.
 */
static int32_t compressQueryColData(SColumnInfoData* pColRes, int32_t numOfRows, char* data, int8_t compressed) {
  int32_t rawBytes = pColRes->info.bytes * numOfRows;
  int32_t outBytes = rawBytes + COMP_OVERFLOW_BYTES;

  return tDataTypes[pColRes->info.type].compFunc(pColRes->pData, rawBytes, numOfRows, data, outBytes, compressed,
                                                 NULL, 0);
}

1618 1619 1620
/*
 * Let the fill module generate rows into pBlock, limited to the room left up to `capacity`,
 * and add the generated rows to pBlock->info.rows.
 * Returns the resulting total number of rows in pBlock.
 */
int32_t doFillTimeIntervalGapsInResults(struct SFillInfo* pFillInfo, SSDataBlock* pBlock, int32_t capacity) {
  int32_t filled = (int32_t)taosFillResultDataBlock(pFillInfo, pBlock, capacity - pBlock->info.rows);

  pBlock->info.rows += filled;
  return pBlock->info.rows;
}

L
Liu Jicong 已提交
1625 1626
/*
 * Finish the cost summary of a task: the first-stage merge time is added to the elapsed time,
 * and, when a file block load recorder is attached, the summary is written to the debug log.
 */
void queryCostStatis(SExecTaskInfo* pTaskInfo) {
  STaskCostInfo* pCost = &pTaskInfo->cost;

  //  uint64_t hashSize = taosHashGetMemSize(pQInfo->runtimeEnv.pResultRowHashTable);
  //  hashSize += taosHashGetMemSize(pRuntimeEnv->tableqinfoGroupInfo.map);
  //  pSummary->hashSize = hashSize;

  // add the merge time
  pCost->elapsedTime += pCost->firstStageMergeTime;

  //  SResultRowPool* p = pTaskInfo->pool;
  //  if (p != NULL) {
  //    pSummary->winInfoSize = getResultRowPoolMemSize(p);
  //    pSummary->numOfTimeWindows = getNumOfAllocatedResultRows(p);
  //  } else {
  //    pSummary->winInfoSize = 0;
  //    pSummary->numOfTimeWindows = 0;
  //  }
  //
  //  calculateOperatorProfResults(pQInfo);

  SFileBlockLoadRecorder* pLoadRec = pCost->pRecoder;
  if (pLoadRec == NULL) {
    return;
  }

  qDebug("%s :cost summary: elapsed time:%" PRId64 " us, first merge:%" PRId64
         " us, total blocks:%d, "
         "load block statis:%d, load data block:%d, total rows:%" PRId64 ", check rows:%" PRId64,
         GET_TASKID(pTaskInfo), pCost->elapsedTime, pCost->firstStageMergeTime, pLoadRec->totalBlocks,
         pLoadRec->loadBlockStatis, pLoadRec->loadBlocks, pLoadRec->totalRows, pLoadRec->totalCheckedRows);

  // qDebug("QInfo:0x%"PRIx64" :cost summary: winResPool size:%.2f Kb, numOfWin:%"PRId64", tableInfoSize:%.2f Kb,
  // hashTable:%.2f Kb", pQInfo->qId, pSummary->winInfoSize/1024.0,
  //      pSummary->numOfTimeWindows, pSummary->tableInfoSize/1024.0, pSummary->hashSize/1024.0);
}

L
Liu Jicong 已提交
1659 1660 1661
// static void updateOffsetVal(STaskRuntimeEnv *pRuntimeEnv, SDataBlockInfo *pBlockInfo) {
//   STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
//   STableQueryInfo* pTableQueryInfo = pRuntimeEnv->current;
1662
//
L
Liu Jicong 已提交
1663
//   int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQueryAttr->order.order);
1664
//
L
Liu Jicong 已提交
1665 1666 1667 1668
//   if (pQueryAttr->limit.offset == pBlockInfo->rows) {  // current block will ignore completed
//     pTableQueryInfo->lastKey = QUERY_IS_ASC_QUERY(pQueryAttr) ? pBlockInfo->window.ekey + step :
//     pBlockInfo->window.skey + step; pQueryAttr->limit.offset = 0; return;
//   }
1669
//
L
Liu Jicong 已提交
1670 1671 1672 1673 1674
//   if (QUERY_IS_ASC_QUERY(pQueryAttr)) {
//     pQueryAttr->pos = (int32_t)pQueryAttr->limit.offset;
//   } else {
//     pQueryAttr->pos = pBlockInfo->rows - (int32_t)pQueryAttr->limit.offset - 1;
//   }
1675
//
L
Liu Jicong 已提交
1676
//   assert(pQueryAttr->pos >= 0 && pQueryAttr->pos <= pBlockInfo->rows - 1);
1677
//
L
Liu Jicong 已提交
1678 1679
//   SArray *         pDataBlock = tsdbRetrieveDataBlock(pRuntimeEnv->pTsdbReadHandle, NULL);
//   SColumnInfoData *pColInfoData = taosArrayGet(pDataBlock, 0);
1680
//
L
Liu Jicong 已提交
1681 1682
//   // update the pQueryAttr->limit.offset value, and pQueryAttr->pos value
//   TSKEY *keys = (TSKEY *) pColInfoData->pData;
1683
//
L
Liu Jicong 已提交
1684 1685 1686
//   // update the offset value
//   pTableQueryInfo->lastKey = keys[pQueryAttr->pos];
//   pQueryAttr->limit.offset = 0;
1687
//
L
Liu Jicong 已提交
1688
//   int32_t numOfRes = tableApplyFunctionsOnBlock(pRuntimeEnv, pBlockInfo, NULL, binarySearchForKey, pDataBlock);
1689
//
L
Liu Jicong 已提交
1690 1691 1692 1693
//   //qDebug("QInfo:0x%"PRIx64" check data block, brange:%" PRId64 "-%" PRId64 ", numBlocksOfStep:%d, numOfRes:%d,
//   lastKey:%"PRId64, GET_TASKID(pRuntimeEnv),
//          pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows, numOfRes, pQuery->current->lastKey);
// }
1694

L
Liu Jicong 已提交
1695 1696
// void skipBlocks(STaskRuntimeEnv *pRuntimeEnv) {
//   STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
1697
//
L
Liu Jicong 已提交
1698 1699 1700
//   if (pQueryAttr->limit.offset <= 0 || pQueryAttr->numOfFilterCols > 0) {
//     return;
//   }
1701
//
L
Liu Jicong 已提交
1702 1703
//   pQueryAttr->pos = 0;
//   int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQueryAttr->order.order);
1704
//
L
Liu Jicong 已提交
1705 1706
//   STableQueryInfo* pTableQueryInfo = pRuntimeEnv->current;
//   TsdbQueryHandleT pTsdbReadHandle = pRuntimeEnv->pTsdbReadHandle;
1707
//
L
Liu Jicong 已提交
1708 1709 1710 1711 1712
//   SDataBlockInfo blockInfo = SDATA_BLOCK_INITIALIZER;
//   while (tsdbNextDataBlock(pTsdbReadHandle)) {
//     if (isTaskKilled(pRuntimeEnv->qinfo)) {
//       longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED);
//     }
1713
//
L
Liu Jicong 已提交
1714
//     tsdbRetrieveDataBlockInfo(pTsdbReadHandle, &blockInfo);
1715
//
L
Liu Jicong 已提交
1716 1717 1718 1719
//     if (pQueryAttr->limit.offset > blockInfo.rows) {
//       pQueryAttr->limit.offset -= blockInfo.rows;
//       pTableQueryInfo->lastKey = (QUERY_IS_ASC_QUERY(pQueryAttr)) ? blockInfo.window.ekey : blockInfo.window.skey;
//       pTableQueryInfo->lastKey += step;
1720
//
L
Liu Jicong 已提交
1721 1722 1723 1724 1725 1726 1727
//       //qDebug("QInfo:0x%"PRIx64" skip rows:%d, offset:%" PRId64, GET_TASKID(pRuntimeEnv), blockInfo.rows,
//              pQuery->limit.offset);
//     } else {  // find the appropriated start position in current block
//       updateOffsetVal(pRuntimeEnv, &blockInfo);
//       break;
//     }
//   }
1728
//
L
Liu Jicong 已提交
1729 1730 1731 1732 1733 1734 1735 1736 1737
//   if (terrno != TSDB_CODE_SUCCESS) {
//     longjmp(pRuntimeEnv->env, terrno);
//   }
// }

// static TSKEY doSkipIntervalProcess(STaskRuntimeEnv* pRuntimeEnv, STimeWindow* win, SDataBlockInfo* pBlockInfo,
// STableQueryInfo* pTableQueryInfo) {
//   STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
//   SResultRowInfo *pWindowResInfo = &pRuntimeEnv->resultRowInfo;
1738
//
L
Liu Jicong 已提交
1739 1740 1741
//   assert(pQueryAttr->limit.offset == 0);
//   STimeWindow tw = *win;
//   getNextTimeWindow(pQueryAttr, &tw);
1742
//
L
Liu Jicong 已提交
1743 1744
//   if ((tw.skey <= pBlockInfo->window.ekey && QUERY_IS_ASC_QUERY(pQueryAttr)) ||
//       (tw.ekey >= pBlockInfo->window.skey && !QUERY_IS_ASC_QUERY(pQueryAttr))) {
1745
//
L
Liu Jicong 已提交
1746 1747 1748 1749
//     // load the data block and check data remaining in current data block
//     // TODO optimize performance
//     SArray *         pDataBlock = tsdbRetrieveDataBlock(pRuntimeEnv->pTsdbReadHandle, NULL);
//     SColumnInfoData *pColInfoData = taosArrayGet(pDataBlock, 0);
1750
//
L
Liu Jicong 已提交
1751 1752 1753 1754
//     tw = *win;
//     int32_t startPos =
//         getNextQualifiedWindow(pQueryAttr, &tw, pBlockInfo, pColInfoData->pData, binarySearchForKey, -1);
//     assert(startPos >= 0);
1755
//
L
Liu Jicong 已提交
1756 1757
//     // set the abort info
//     pQueryAttr->pos = startPos;
1758
//
L
Liu Jicong 已提交
1759 1760 1761 1762
//     // reset the query start timestamp
//     pTableQueryInfo->win.skey = ((TSKEY *)pColInfoData->pData)[startPos];
//     pQueryAttr->window.skey = pTableQueryInfo->win.skey;
//     TSKEY key = pTableQueryInfo->win.skey;
1763
//
L
Liu Jicong 已提交
1764 1765
//     pWindowResInfo->prevSKey = tw.skey;
//     int32_t index = pRuntimeEnv->resultRowInfo.curIndex;
1766
//
L
Liu Jicong 已提交
1767 1768
//     int32_t numOfRes = tableApplyFunctionsOnBlock(pRuntimeEnv, pBlockInfo, NULL, binarySearchForKey, pDataBlock);
//     pRuntimeEnv->resultRowInfo.curIndex = index;  // restore the window index
1769
//
L
Liu Jicong 已提交
1770 1771 1772 1773
//     //qDebug("QInfo:0x%"PRIx64" check data block, brange:%" PRId64 "-%" PRId64 ", numOfRows:%d, numOfRes:%d,
//     lastKey:%" PRId64,
//            GET_TASKID(pRuntimeEnv), pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows, numOfRes,
//            pQueryAttr->current->lastKey);
1774
//
L
Liu Jicong 已提交
1775 1776 1777 1778 1779
//     return key;
//   } else {  // do nothing
//     pQueryAttr->window.skey      = tw.skey;
//     pWindowResInfo->prevSKey = tw.skey;
//     pTableQueryInfo->lastKey = tw.skey;
1780
//
L
Liu Jicong 已提交
1781 1782
//     return tw.skey;
//   }
1783
//
L
Liu Jicong 已提交
1784 1785 1786 1787 1788 1789 1790 1791 1792 1793
//   return true;
// }

// static bool skipTimeInterval(STaskRuntimeEnv *pRuntimeEnv, TSKEY* start) {
//   STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
//   if (QUERY_IS_ASC_QUERY(pQueryAttr)) {
//     assert(*start <= pRuntimeEnv->current->lastKey);
//   } else {
//     assert(*start >= pRuntimeEnv->current->lastKey);
//   }
1794
//
L
Liu Jicong 已提交
1795 1796 1797 1798 1799
//   // if queried with value filter, do NOT forward query start position
//   if (pQueryAttr->limit.offset <= 0 || pQueryAttr->numOfFilterCols > 0 || pRuntimeEnv->pTsBuf != NULL ||
//   pRuntimeEnv->pFillInfo != NULL) {
//     return true;
//   }
1800
//
L
Liu Jicong 已提交
1801 1802 1803 1804 1805 1806 1807
//   /*
//    * 1. for interval without interpolation query we forward pQueryAttr->interval.interval at a time for
//    *    pQueryAttr->limit.offset times. Since hole exists, pQueryAttr->interval.interval*pQueryAttr->limit.offset
//    value is
//    *    not valid. otherwise, we only forward pQueryAttr->limit.offset number of points
//    */
//   assert(pRuntimeEnv->resultRowInfo.prevSKey == TSKEY_INITIAL_VAL);
1808
//
L
Liu Jicong 已提交
1809 1810
//   STimeWindow w = TSWINDOW_INITIALIZER;
//   bool ascQuery = QUERY_IS_ASC_QUERY(pQueryAttr);
1811
//
L
Liu Jicong 已提交
1812 1813
//   SResultRowInfo *pWindowResInfo = &pRuntimeEnv->resultRowInfo;
//   STableQueryInfo *pTableQueryInfo = pRuntimeEnv->current;
1814
//
L
Liu Jicong 已提交
1815 1816 1817
//   SDataBlockInfo blockInfo = SDATA_BLOCK_INITIALIZER;
//   while (tsdbNextDataBlock(pRuntimeEnv->pTsdbReadHandle)) {
//     tsdbRetrieveDataBlockInfo(pRuntimeEnv->pTsdbReadHandle, &blockInfo);
1818
//
L
Liu Jicong 已提交
1819 1820 1821 1822 1823 1824 1825 1826 1827
//     if (QUERY_IS_ASC_QUERY(pQueryAttr)) {
//       if (pWindowResInfo->prevSKey == TSKEY_INITIAL_VAL) {
//         getAlignQueryTimeWindow(pQueryAttr, blockInfo.window.skey, blockInfo.window.skey, pQueryAttr->window.ekey,
//         &w); pWindowResInfo->prevSKey = w.skey;
//       }
//     } else {
//       getAlignQueryTimeWindow(pQueryAttr, blockInfo.window.ekey, pQueryAttr->window.ekey, blockInfo.window.ekey, &w);
//       pWindowResInfo->prevSKey = w.skey;
//     }
1828
//
L
Liu Jicong 已提交
1829 1830
//     // the first time window
//     STimeWindow win = getActiveTimeWindow(pWindowResInfo, pWindowResInfo->prevSKey, pQueryAttr);
1831
//
L
Liu Jicong 已提交
1832 1833
//     while (pQueryAttr->limit.offset > 0) {
//       STimeWindow tw = win;
1834
//
L
Liu Jicong 已提交
1835 1836 1837
//       if ((win.ekey <= blockInfo.window.ekey && ascQuery) || (win.ekey >= blockInfo.window.skey && !ascQuery)) {
//         pQueryAttr->limit.offset -= 1;
//         pWindowResInfo->prevSKey = win.skey;
1838
//
L
Liu Jicong 已提交
1839 1840 1841 1842 1843 1844
//         // current time window is aligned with blockInfo.window.ekey
//         // restart it from next data block by set prevSKey to be TSKEY_INITIAL_VAL;
//         if ((win.ekey == blockInfo.window.ekey && ascQuery) || (win.ekey == blockInfo.window.skey && !ascQuery)) {
//           pWindowResInfo->prevSKey = TSKEY_INITIAL_VAL;
//         }
//       }
1845
//
L
Liu Jicong 已提交
1846 1847 1848 1849
//       if (pQueryAttr->limit.offset == 0) {
//         *start = doSkipIntervalProcess(pRuntimeEnv, &win, &blockInfo, pTableQueryInfo);
//         return true;
//       }
1850
//
L
Liu Jicong 已提交
1851 1852
//       // current window does not ended in current data block, try next data block
//       getNextTimeWindow(pQueryAttr, &tw);
1853
//
L
Liu Jicong 已提交
1854 1855 1856 1857 1858 1859 1860 1861 1862
//       /*
//        * If the next time window still starts from current data block,
//        * load the primary timestamp column first, and then find the start position for the next queried time window.
//        * Note that only the primary timestamp column is required.
//        * TODO: Optimize for this cases. All data blocks are not needed to be loaded, only if the first actually
//        required
//        * time window resides in current data block.
//        */
//       if ((tw.skey <= blockInfo.window.ekey && ascQuery) || (tw.ekey >= blockInfo.window.skey && !ascQuery)) {
1863
//
L
Liu Jicong 已提交
1864 1865
//         SArray *pDataBlock = tsdbRetrieveDataBlock(pRuntimeEnv->pTsdbReadHandle, NULL);
//         SColumnInfoData *pColInfoData = taosArrayGet(pDataBlock, 0);
1866
//
L
Liu Jicong 已提交
1867 1868 1869
//         if ((win.ekey > blockInfo.window.ekey && ascQuery) || (win.ekey < blockInfo.window.skey && !ascQuery)) {
//           pQueryAttr->limit.offset -= 1;
//         }
1870
//
L
Liu Jicong 已提交
1871 1872 1873 1874 1875 1876 1877 1878
//         if (pQueryAttr->limit.offset == 0) {
//           *start = doSkipIntervalProcess(pRuntimeEnv, &win, &blockInfo, pTableQueryInfo);
//           return true;
//         } else {
//           tw = win;
//           int32_t startPos =
//               getNextQualifiedWindow(pQueryAttr, &tw, &blockInfo, pColInfoData->pData, binarySearchForKey, -1);
//           assert(startPos >= 0);
1879
//
L
Liu Jicong 已提交
1880 1881 1882 1883 1884 1885 1886 1887 1888 1889 1890
//           // set the abort info
//           pQueryAttr->pos = startPos;
//           pTableQueryInfo->lastKey = ((TSKEY *)pColInfoData->pData)[startPos];
//           pWindowResInfo->prevSKey = tw.skey;
//           win = tw;
//         }
//       } else {
//         break;  // offset is not 0, and next time window begins or ends in the next block.
//       }
//     }
//   }
1891
//
L
Liu Jicong 已提交
1892 1893 1894 1895
//   // check for error
//   if (terrno != TSDB_CODE_SUCCESS) {
//     longjmp(pRuntimeEnv->env, terrno);
//   }
1896
//
L
Liu Jicong 已提交
1897 1898
//   return true;
// }
1899

1900
/*
 * Install `num` downstream operators on operator `p`.
 *
 * The pointers in pDownstream are copied into a freshly allocated array that becomes
 * p->pDownstream, and p->numOfDownstream is set to num. Despite the name, an already installed
 * array is replaced, not extended; the previous array is released so it does not leak.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when the array cannot be allocated,
 * in which case `p` is left unchanged.
 */
int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t num) {
  if (p->pDownstream == NULL) {
    assert(p->numOfDownstream == 0);
  }

  // Build the new array completely before touching p, so that a failed allocation leaves the
  // operator consistent and pDownstream may even alias the currently installed array.
  void* pNewList = taosMemoryCalloc(1, num * POINTER_BYTES);
  if (pNewList == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  memcpy(pNewList, pDownstream, num * POINTER_BYTES);

  if (p->pDownstream != NULL) {
    taosMemoryFree(p->pDownstream);
  }

  p->pDownstream = pNewList;
  p->numOfDownstream = num;
  return TSDB_CODE_SUCCESS;
}

wmmhello's avatar
wmmhello 已提交
1915
static void doDestroyTableList(STableListInfo* pTableqinfoList);
1916

1917
/*
 * Currently a no-op: the sanity checks below are compiled out with #if 0.
 * The disabled code asserted that the table's query window and lastKey are consistent with
 * the task window for the given scan order.
 */
static void doTableQueryInfoTimeWindowCheck(SExecTaskInfo* pTaskInfo, STableQueryInfo* pTableQueryInfo, int32_t order) {
#if 0
  if (order == TSDB_ORDER_ASC) {
    assert(
        (pTableQueryInfo->win.skey <= pTableQueryInfo->win.ekey) &&
        (pTableQueryInfo->lastKey >= pTaskInfo->window.skey) &&
        (pTableQueryInfo->win.skey >= pTaskInfo->window.skey && pTableQueryInfo->win.ekey <= pTaskInfo->window.ekey));
  } else {
    assert(
        (pTableQueryInfo->win.skey >= pTableQueryInfo->win.ekey) &&
        (pTableQueryInfo->lastKey <= pTaskInfo->window.skey) &&
        (pTableQueryInfo->win.skey <= pTaskInfo->window.skey && pTableQueryInfo->win.ekey >= pTaskInfo->window.ekey));
  }
#endif
}

1933 1934 1935 1936
// Callback parameter attached to one fetch request; loadRemoteDataCallback receives it as `param`.
typedef struct SFetchRspHandleWrapper {
  uint32_t exchangeId;   // reference id used with exchangeObjRefPool to acquire the SExchangeInfo
  int32_t  sourceIndex;  // index into the exchange operator's pSourceDataInfo array
} SFetchRspHandleWrapper;
1937

1938
int32_t loadRemoteDataCallback(void* param, const SDataBuf* pMsg, int32_t code) {
X
Xiaoyu Wang 已提交
1939
  SFetchRspHandleWrapper* pWrapper = (SFetchRspHandleWrapper*)param;
1940 1941 1942 1943 1944 1945 1946

  SExchangeInfo* pExchangeInfo = taosAcquireRef(exchangeObjRefPool, pWrapper->exchangeId);
  if (pExchangeInfo == NULL) {
    qWarn("failed to acquire exchange operator, since it may have been released");
    return TSDB_CODE_SUCCESS;
  }

X
Xiaoyu Wang 已提交
1947
  int32_t          index = pWrapper->sourceIndex;
1948
  SSourceDataInfo* pSourceDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, index);
1949

H
Haojun Liao 已提交
1950 1951
  if (code == TSDB_CODE_SUCCESS) {
    pSourceDataInfo->pRsp = pMsg->pData;
1952

H
Haojun Liao 已提交
1953 1954
    SRetrieveTableRsp* pRsp = pSourceDataInfo->pRsp;
    pRsp->numOfRows = htonl(pRsp->numOfRows);
dengyihao's avatar
dengyihao 已提交
1955
    pRsp->compLen = htonl(pRsp->compLen);
1956
    pRsp->numOfCols = htonl(pRsp->numOfCols);
dengyihao's avatar
dengyihao 已提交
1957
    pRsp->useconds = htobe64(pRsp->useconds);
1958

1959
    ASSERT(pRsp != NULL);
1960
    qDebug("%s fetch rsp received, index:%d, rows:%d", pSourceDataInfo->taskId, index, pRsp->numOfRows);
H
Haojun Liao 已提交
1961 1962 1963
  } else {
    pSourceDataInfo->code = code;
  }
H
Haojun Liao 已提交
1964

H
Haojun Liao 已提交
1965
  pSourceDataInfo->status = EX_SOURCE_DATA_READY;
1966 1967 1968 1969 1970

  tsem_post(&pExchangeInfo->ready);
  taosReleaseRef(exchangeObjRefPool, pWrapper->exchangeId);

  taosMemoryFree(pWrapper);
wmmhello's avatar
wmmhello 已提交
1971
  return TSDB_CODE_SUCCESS;
H
Haojun Liao 已提交
1972 1973 1974 1975
}

/*
 * Release a send-message descriptor together with the message payload it carries.
 * pMsgBody must not be NULL.
 */
static void destroySendMsgInfo(SMsgSendInfo* pMsgBody) {
  assert(pMsgBody != NULL);

  // payload first, then the descriptor that holds it
  taosMemoryFreeClear(pMsgBody->msgInfo.pData);
  taosMemoryFreeClear(pMsgBody);
}

S
Shengliang Guan 已提交
1980
/*
 * Handle an rpc response for a fetch request.
 *
 * The response content is copied into a private buffer and passed, together with the response
 * code, to the callback registered in the SMsgSendInfo found in pMsg->info.ahandle. If the
 * copy cannot be allocated, terrno and pMsg->code are set to TSDB_CODE_OUT_OF_MEMORY and the
 * callback is invoked with a NULL data pointer. Afterwards the rpc content and the send-info
 * descriptor are released.
 */
void qProcessFetchRsp(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
  assert(pMsg->info.ahandle != NULL);
  SMsgSendInfo* pSendInfo = (SMsgSendInfo*)pMsg->info.ahandle;

  SDataBuf buf = {.len = pMsg->contLen, .pData = NULL};

  if (pMsg->contLen > 0) {
    buf.pData = taosMemoryCalloc(1, pMsg->contLen);
    if (buf.pData != NULL) {
      memcpy(buf.pData, pMsg->pCont, pMsg->contLen);
    } else {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      pMsg->code = TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  pSendInfo->fp(pSendInfo->param, &buf, pMsg->code);
  rpcFreeCont(pMsg->pCont);
  destroySendMsgInfo(pSendInfo);
}

L
Liu Jicong 已提交
2001
/*
 * Build a fetch request for the source at sourceIndex and send it asynchronously.
 *
 * The response is delivered to loadRemoteDataCallback, which receives a
 * SFetchRspHandleWrapper carrying the exchange operator's reference id and the source index.
 * The source must be in state EX_SOURCE_DATA_NOT_READY.
 *
 * Returns TSDB_CODE_SUCCESS once the request has been handed to the transport, or
 * TSDB_CODE_QRY_OUT_OF_MEMORY (also stored in pTaskInfo->code) when one of the request
 * objects cannot be allocated; in that case everything allocated so far is released.
 */
static int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTaskInfo, int32_t sourceIndex) {
  size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources);

  SResFetchReq* pMsg = taosMemoryCalloc(1, sizeof(SResFetchReq));
  if (NULL == pMsg) {
    pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
    return pTaskInfo->code;
  }

  SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, sourceIndex);
  SSourceDataInfo*       pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, sourceIndex);

  ASSERT(pDataInfo->status == EX_SOURCE_DATA_NOT_READY);

  qDebug("%s build fetch msg and send to vgId:%d, ep:%s, taskId:0x%" PRIx64 ", %d/%" PRIzu, GET_TASKID(pTaskInfo),
         pSource->addr.nodeId, pSource->addr.epSet.eps[0].fqdn, pSource->taskId, sourceIndex, totalSources);

  pMsg->header.vgId = htonl(pSource->addr.nodeId);
  pMsg->sId = htobe64(pSource->schedId);
  pMsg->taskId = htobe64(pSource->taskId);
  pMsg->queryId = htobe64(pTaskInfo->id.queryId);

  // send the fetch remote task result request
  SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (NULL == pMsgSendInfo) {
    taosMemoryFreeClear(pMsg);
    qError("%s prepare message %d failed", GET_TASKID(pTaskInfo), (int32_t)sizeof(SMsgSendInfo));
    pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
    return pTaskInfo->code;
  }

  SFetchRspHandleWrapper* pWrapper = taosMemoryCalloc(1, sizeof(SFetchRspHandleWrapper));
  if (NULL == pWrapper) {
    // the wrapper is dereferenced right below and again in the callback, so fail cleanly here
    taosMemoryFreeClear(pMsg);
    taosMemoryFreeClear(pMsgSendInfo);
    qError("%s prepare message %d failed", GET_TASKID(pTaskInfo), (int32_t)sizeof(SFetchRspHandleWrapper));
    pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
    return pTaskInfo->code;
  }

  pWrapper->exchangeId = pExchangeInfo->self;
  pWrapper->sourceIndex = sourceIndex;

  pMsgSendInfo->param = pWrapper;
  pMsgSendInfo->msgInfo.pData = pMsg;
  pMsgSendInfo->msgInfo.len = sizeof(SResFetchReq);
  pMsgSendInfo->msgType = TDMT_VND_FETCH;
  pMsgSendInfo->fp = loadRemoteDataCallback;

  int64_t transporterId = 0;
  int32_t code = asyncSendMsgToServer(pExchangeInfo->pTransporter, &pSource->addr.epSet, &transporterId, pMsgSendInfo);
  if (code != TSDB_CODE_SUCCESS) {
    // NOTE(review): who owns pMsgSendInfo after a failed send, and whether the callback is still
    // invoked, is not visible from here. The failure is therefore only reported, and the return
    // value is kept as before so callers do not handle the same error twice - confirm.
    qError("%s failed to send fetch request, code %s", GET_TASKID(pTaskInfo), tstrerror(code));
  }

  return TSDB_CODE_SUCCESS;
}

2047
/*
 * Decode the payload of a fetch response into pRes and update the load statistics.
 *
 * pColList == NULL: pData is decoded directly into pRes with numOfOutput columns.
 * pColList != NULL: pData starts with a column count followed by that many SSysTableSchema
 *                   entries in network byte order (converted in place). The data is decoded
 *                   into a temporary block built from that schema and then relocated into pRes
 *                   according to pColList.
 *
 * numOfRows and compLen are added to pLoadInfo (and numOfRows to *total when total is given),
 * and the time elapsed since startTs is added to pLoadInfo->totalElapsed.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY when the temporary block cannot be
 * created, or the error reported while growing pRes. On error the statistics are not updated.
 */
int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadInfo, int32_t numOfRows, char* pData,
                                     int32_t compLen, int32_t numOfOutput, int64_t startTs, uint64_t* total,
                                     SArray* pColList) {
  if (pColList == NULL) {  // data from other sources
    blockDataCleanup(pRes);
    //    blockDataEnsureCapacity(pRes, numOfRows);
    blockCompressDecode(pRes, numOfOutput, numOfRows, pData);
  } else {  // extract data according to pColList
    ASSERT(numOfOutput == taosArrayGetSize(pColList));
    char* pStart = pData;

    int32_t numOfCols = htonl(*(int32_t*)pStart);
    pStart += sizeof(int32_t);

    // todo refactor:extract method
    // convert the schema entries to host byte order in place; pStart ends up behind the schema
    SSysTableSchema* pSchema = (SSysTableSchema*)pStart;
    for (int32_t i = 0; i < numOfCols; ++i) {
      SSysTableSchema* p = (SSysTableSchema*)pStart;

      p->colId = htons(p->colId);
      p->bytes = htonl(p->bytes);
      pStart += sizeof(SSysTableSchema);
    }

    SSDataBlock* pBlock = createDataBlock();
    if (pBlock == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    for (int32_t i = 0; i < numOfCols; ++i) {
      SColumnInfoData idata = createColumnInfoData(pSchema[i].type, pSchema[i].bytes, pSchema[i].colId);
      blockDataAppendColInfo(pBlock, &idata);
    }

    blockCompressDecode(pBlock, numOfCols, numOfRows, pStart);

    // pRes must be able to hold numOfRows rows before any column is relocated into it
    int32_t code = blockDataEnsureCapacity(pRes, numOfRows);
    if (code != TSDB_CODE_SUCCESS) {
      blockDataDestroy(pBlock);
      return code;
    }

    // data from mnode
    pRes->info.rows = numOfRows;
    relocateColumnData(pRes, pColList, pBlock->pDataBlock, false);
    blockDataDestroy(pBlock);
  }

  // todo move this to time window aggregator, since the primary timestamp may not be known by exchange operator.
  blockDataUpdateTsWindow(pRes, 0);

  int64_t el = taosGetTimestampUs() - startTs;

  pLoadInfo->totalRows += numOfRows;
  pLoadInfo->totalSize += compLen;

  if (total != NULL) {
    *total += numOfRows;
  }

  pLoadInfo->totalElapsed += el;
  return TSDB_CODE_SUCCESS;
}
2101

L
Liu Jicong 已提交
2102 2103
/*
 * Finish an exchange operator whose sources have all been drained.
 *
 * Adds the time elapsed since startTs (as measured by taosGetTimestampUs) to the
 * accumulated load statistics, logs the totals and calls doSetOperatorCompleted().
 *
 * Always returns NULL, so callers can write `return setAllSourcesCompleted(...)`.
 */
static void* setAllSourcesCompleted(SOperatorInfo* pOperator, int64_t startTs) {
  SExchangeInfo*       pExchangeInfo = pOperator->info;
  SExecTaskInfo*       pTaskInfo = pOperator->pTaskInfo;
  SLoadRemoteDataInfo* pStat = &pExchangeInfo->loadInfo;

  int64_t elapsed = taosGetTimestampUs() - startTs;
  pStat->totalElapsed += elapsed;

  size_t numOfSources = taosArrayGetSize(pExchangeInfo->pSources);
  qDebug("%s all %" PRIzu " sources are exhausted, total rows: %" PRIu64 " bytes:%" PRIu64 ", elapsed:%.2f ms",
         GET_TASKID(pTaskInfo), numOfSources, pStat->totalRows, pStat->totalSize, pStat->totalElapsed / 1000.0);

  doSetOperatorCompleted(pOperator);
  return NULL;
}

L
Liu Jicong 已提交
2120 2121
/*
 * Return the next result block when fetch requests are outstanding for all sources at once.
 *
 * The source list is polled repeatedly. A source that is already exhausted is only counted;
 * a source whose response is not ready is skipped. The first ready response is decoded into
 * pExchangeInfo->pResult, a new fetch request is issued for that source unless it reported
 * completion, and the result block is returned.
 *
 * Returns NULL either when every source is exhausted (via setAllSourcesCompleted) or on
 * failure, in which case pTaskInfo->code holds the error code.
 */
static SSDataBlock* concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeInfo* pExchangeInfo,
                                                   SExecTaskInfo* pTaskInfo) {
  int32_t              code = 0;
  int64_t              startTs = taosGetTimestampUs();
  size_t               totalSources = taosArrayGetSize(pExchangeInfo->pSources);
  SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;

  for (;;) {
    int32_t numOfExhausted = 0;

    for (int32_t i = 0; i < totalSources; ++i) {
      SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, i);

      if (pDataInfo->status == EX_SOURCE_DATA_EXHAUSTED) {
        numOfExhausted += 1;
        continue;
      }

      if (pDataInfo->status != EX_SOURCE_DATA_READY) {
        continue;
      }

      // a ready response that carries an error aborts the whole load
      if (pDataInfo->code != TSDB_CODE_SUCCESS) {
        code = pDataInfo->code;
        goto _error;
      }

      SRetrieveTableRsp*     pRsp = pDataInfo->pRsp;
      SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, i);
      SSDataBlock*           pRes = pExchangeInfo->pResult;

      // an empty response marks this source as drained; keep looking at the remaining ones
      if (pRsp->numOfRows == 0) {
        qDebug("%s vgId:%d, taskId:0x%" PRIx64 " index:%d completed, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64
               ", completed:%d try next %d/%" PRIzu,
               GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, i, pDataInfo->totalRows,
               pLoadInfo->totalRows, numOfExhausted + 1, i + 1, totalSources);
        pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
        numOfExhausted += 1;
        taosMemoryFreeClear(pDataInfo->pRsp);
        continue;
      }

      code = extractDataBlockFromFetchRsp(pExchangeInfo->pResult, pLoadInfo, pRsp->numOfRows, pRsp->data,
                                          pRsp->compLen, pRsp->numOfCols, startTs, &pDataInfo->totalRows, NULL);
      if (code != 0) {
        taosMemoryFreeClear(pDataInfo->pRsp);
        goto _error;
      }

      if (pRsp->completed != 1) {
        qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " numOfRows:%d, totalRows:%" PRIu64
               ", totalBytes:%" PRIu64,
               GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pRes->info.rows, pLoadInfo->totalRows,
               pLoadInfo->totalSize);
      } else {
        qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64
               " index:%d completed, numOfRows:%d, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 ", totalBytes:%" PRIu64
               ", completed:%d try next %d/%" PRIzu,
               GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, i, pRes->info.rows, pDataInfo->totalRows,
               pLoadInfo->totalRows, pLoadInfo->totalSize, numOfExhausted + 1, i + 1, totalSources);
        numOfExhausted += 1;
        pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
      }

      // the response buffer is no longer needed once it has been decoded
      taosMemoryFreeClear(pDataInfo->pRsp);

      // request the next batch from a source that still has data
      if (pDataInfo->status != EX_SOURCE_DATA_EXHAUSTED) {
        pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
        code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, i);
        if (code != TSDB_CODE_SUCCESS) {
          taosMemoryFreeClear(pDataInfo->pRsp);
          goto _error;
        }
      }

      return pExchangeInfo->pResult;
    }

    if (numOfExhausted == totalSources) {
      return setAllSourcesCompleted(pOperator, startTs);
    }
  }

_error:
  pTaskInfo->code = code;
  return NULL;
}

L
Liu Jicong 已提交
2208 2209 2210
/*
 * Issue one fetch request per source, then block on pExchangeInfo->ready.
 *
 * On the first request that fails, the error code is stored in pTaskInfo->code and returned
 * without waiting. Otherwise the operator status becomes OP_RES_TO_RETURN, the time spent is
 * recorded in cost.openCost and TSDB_CODE_SUCCESS is returned after the semaphore wait.
 */
static int32_t prepareConcurrentlyLoad(SOperatorInfo* pOperator) {
  SExchangeInfo* pExchangeInfo = pOperator->info;
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;

  size_t  numOfSources = taosArrayGetSize(pExchangeInfo->pSources);
  int64_t beginUs = taosGetTimestampUs();

  // Asynchronously send all fetch requests to all sources.
  int32_t idx = 0;
  while (idx < numOfSources) {
    int32_t code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, idx);
    if (code != TSDB_CODE_SUCCESS) {
      pTaskInfo->code = code;
      return code;
    }

    idx += 1;
  }

  int64_t sentUs = taosGetTimestampUs();
  qDebug("%s send all fetch requests to %" PRIzu " sources completed, elapsed:%.2fms", GET_TASKID(pTaskInfo),
         numOfSources, (sentUs - beginUs) / 1000.0);

  pOperator->status = OP_RES_TO_RETURN;
  pOperator->cost.openCost = taosGetTimestampUs() - beginUs;

  tsem_wait(&pExchangeInfo->ready);
  return TSDB_CODE_SUCCESS;
}

L
Liu Jicong 已提交
2235 2236 2237
/*
 * Return the next result block when sources are read one after another.
 *
 * For the source at pExchangeInfo->current a fetch request is sent and the function blocks on
 * pExchangeInfo->ready until the response is available. An empty response advances to the
 * next source; a non-empty response is decoded into pExchangeInfo->pResult and returned.
 *
 * Returns NULL when all sources are exhausted (via setAllSourcesCompleted) or on failure, in
 * which case pTaskInfo->code holds the error code.
 */
static SSDataBlock* seqLoadRemoteData(SOperatorInfo* pOperator) {
  SExchangeInfo* pExchangeInfo = pOperator->info;
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;

  size_t  totalSources = taosArrayGetSize(pExchangeInfo->pSources);
  int64_t startTs = taosGetTimestampUs();

  while (1) {
    if (pExchangeInfo->current >= totalSources) {
      return setAllSourcesCompleted(pOperator, startTs);
    }

    // Do not wait on the semaphore when the request could not be sent: nothing would ever post it.
    int32_t code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, pExchangeInfo->current);
    if (code != TSDB_CODE_SUCCESS) {
      pTaskInfo->code = code;
      return NULL;
    }

    tsem_wait(&pExchangeInfo->ready);

    SSourceDataInfo*       pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, pExchangeInfo->current);
    SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, pExchangeInfo->current);

    if (pDataInfo->code != TSDB_CODE_SUCCESS) {
      qError("%s vgId:%d, taskID:0x%" PRIx64 " error happens, code:%s", GET_TASKID(pTaskInfo), pSource->addr.nodeId,
             pSource->taskId, tstrerror(pDataInfo->code));
      pOperator->pTaskInfo->code = pDataInfo->code;
      return NULL;
    }

    SRetrieveTableRsp*   pRsp = pDataInfo->pRsp;
    SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
    if (pRsp->numOfRows == 0) {
      qDebug("%s vgId:%d, taskID:0x%" PRIx64 " %d of total completed, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64
             " try next",
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pExchangeInfo->current + 1,
             pDataInfo->totalRows, pLoadInfo->totalRows);

      pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
      pExchangeInfo->current += 1;
      taosMemoryFreeClear(pDataInfo->pRsp);
      continue;
    }

    SSDataBlock* pRes = pExchangeInfo->pResult;
    code = extractDataBlockFromFetchRsp(pExchangeInfo->pResult, pLoadInfo, pRsp->numOfRows, pRsp->data, pRsp->compLen,
                                        pRsp->numOfCols, startTs, &pDataInfo->totalRows, NULL);
    if (code != TSDB_CODE_SUCCESS) {
      // same handling as the concurrent loader: release the response and report the error
      taosMemoryFreeClear(pDataInfo->pRsp);
      pTaskInfo->code = code;
      return NULL;
    }

    if (pRsp->completed == 1) {
      qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " numOfRows:%d, rowsOfSource:%" PRIu64
             ", totalRows:%" PRIu64 ", totalBytes:%" PRIu64 " try next %d/%" PRIzu,
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pRes->info.rows, pDataInfo->totalRows,
             pLoadInfo->totalRows, pLoadInfo->totalSize, pExchangeInfo->current + 1, totalSources);

      pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
      pExchangeInfo->current += 1;
    } else {
      qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " numOfRows:%d, totalRows:%" PRIu64
             ", totalBytes:%" PRIu64,
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pRes->info.rows, pLoadInfo->totalRows,
             pLoadInfo->totalSize);
    }

    pOperator->resultInfo.totalRows += pRes->info.rows;
    taosMemoryFreeClear(pDataInfo->pRsp);
    return pExchangeInfo->pResult;
  }
}

L
Liu Jicong 已提交
2301
/*
 * Open function of the exchange operator.
 *
 * Does nothing when the operator is already opened. When the operator is not in sequential
 * mode (seqLoadData is false) it runs prepareConcurrentlyLoad() and propagates its error
 * code. On success the operator is marked opened and the elapsed time divided by 1000.0 is
 * stored in cost.openCost.
 */
static int32_t prepareLoadRemoteData(SOperatorInfo* pOperator) {
  if (OPTR_IS_OPENED(pOperator)) {
    return TSDB_CODE_SUCCESS;
  }

  int64_t        openStartUs = taosGetTimestampUs();
  SExchangeInfo* pExchangeInfo = pOperator->info;

  int32_t code = pExchangeInfo->seqLoadData ? TSDB_CODE_SUCCESS : prepareConcurrentlyLoad(pOperator);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  OPTR_SET_OPENED(pOperator);
  pOperator->cost.openCost = (taosGetTimestampUs() - openStartUs) / 1000.0;
  return TSDB_CODE_SUCCESS;
}

2321
/*
 * Get-next function of the exchange operator.
 *
 * Runs the operator's open function first and stores its result in pTaskInfo->code; NULL is
 * returned when opening fails or when the operator status is OP_EXEC_DONE. Otherwise the call
 * is dispatched to the sequential or the concurrent loader depending on seqLoadData.
 */
static SSDataBlock* doLoadRemoteData(SOperatorInfo* pOperator) {
  SExchangeInfo* pExchangeInfo = pOperator->info;
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;

  pTaskInfo->code = pOperator->fpSet._openFn(pOperator);
  if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
    return NULL;
  }

  if (pOperator->status == OP_EXEC_DONE) {
    SLoadRemoteDataInfo* pStat = &pExchangeInfo->loadInfo;
    size_t               numOfSources = taosArrayGetSize(pExchangeInfo->pSources);

    qDebug("%s all %" PRIzu " source(s) are exhausted, total rows:%" PRIu64 " bytes:%" PRIu64 ", elapsed:%.2f ms",
           GET_TASKID(pTaskInfo), numOfSources, pStat->totalRows, pStat->totalSize, pStat->totalElapsed / 1000.0);
    return NULL;
  }

  if (!pExchangeInfo->seqLoadData) {
    return concurrentlyLoadRemoteDataImpl(pOperator, pExchangeInfo, pTaskInfo);
  }

  return seqLoadRemoteData(pOperator);
}
2346

2347
/*
 * Create pInfo->pSourceDataInfo with one SSourceDataInfo entry per source.
 *
 * Every entry starts in state EX_SOURCE_DATA_NOT_READY, refers to the task id string `id`
 * (the pointer is stored, not copied) and records its own position in `index`.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when the array cannot be created or
 * extended; in the latter case the array is released and pInfo->pSourceDataInfo is NULL.
 */
static int32_t initDataSource(int32_t numOfSources, SExchangeInfo* pInfo, const char* id) {
  pInfo->pSourceDataInfo = taosArrayInit(numOfSources, sizeof(SSourceDataInfo));
  if (pInfo->pSourceDataInfo == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  for (int32_t i = 0; i < numOfSources; ++i) {
    SSourceDataInfo dataInfo = {0};
    dataInfo.status = EX_SOURCE_DATA_NOT_READY;
    dataInfo.taskId = id;
    dataInfo.index = i;

    if (taosArrayPush(pInfo->pSourceDataInfo, &dataInfo) == NULL) {
      taosArrayDestroy(pInfo->pSourceDataInfo);
      // do not leave a dangling pointer behind: the owner of pInfo may destroy it again
      pInfo->pSourceDataInfo = NULL;
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  return TSDB_CODE_SUCCESS;
}

2368
/*
 * Populate the source related members of an exchange operator from its physical plan node.
 *
 * Copies every entry of pExNode->pSrcEndPoints into pInfo->pSources, registers pInfo in
 * exchangeObjRefPool (the returned id is kept in pInfo->self) and lets initDataSource()
 * build pInfo->pSourceDataInfo.
 *
 * Returns TSDB_CODE_INVALID_PARA when the plan node has no source, TSDB_CODE_OUT_OF_MEMORY
 * on allocation failure, otherwise the result of initDataSource().
 */
static int32_t initExchangeOperator(SExchangePhysiNode* pExNode, SExchangeInfo* pInfo, const char* id) {
  size_t numOfSources = LIST_LENGTH(pExNode->pSrcEndPoints);

  if (numOfSources == 0) {
    qError("%s invalid number: %d of sources in exchange operator", id, (int32_t)numOfSources);
    return TSDB_CODE_INVALID_PARA;
  }

  // pSourceDataInfo is allocated by initDataSource() below; allocating it here as well would
  // leak the first array when initDataSource() overwrites the pointer.
  pInfo->pSources = taosArrayInit(numOfSources, sizeof(SDownstreamSourceNode));
  if (pInfo->pSources == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  for (int32_t i = 0; i < numOfSources; ++i) {
    SNodeListNode* pNode = (SNodeListNode*)nodesListGetNode((SNodeList*)pExNode->pSrcEndPoints, i);
    if (taosArrayPush(pInfo->pSources, pNode) == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  pInfo->self = taosAddRef(exchangeObjRefPool, pInfo);

  return initDataSource(numOfSources, pInfo, id);
}

/*
 * Create an exchange operator for the given physical plan node.
 *
 * pTransporter is stored in the operator info as is. The result block is created from the
 * output data block descriptor of the plan node, and the operator is wired to
 * prepareLoadRemoteData / doLoadRemoteData / destroyExchangeOperatorInfo.
 *
 * Returns the new operator, or NULL on failure with the error code stored in pTaskInfo->code.
 */
SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode* pExNode, SExecTaskInfo* pTaskInfo) {
  // Initialized up front: the _error path reads it even when reached from the allocation check.
  int32_t code = TSDB_CODE_OUT_OF_MEMORY;

  SExchangeInfo* pInfo = taosMemoryCalloc(1, sizeof(SExchangeInfo));
  SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    goto _error;
  }

  code = initExchangeOperator(pExNode, pInfo, GET_TASKID(pTaskInfo));
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  tsem_init(&pInfo->ready, 0, 0);

  pInfo->seqLoadData = false;
  pInfo->pTransporter = pTransporter;
  pInfo->pResult = createResDataBlock(pExNode->node.pOutputDataBlockDesc);
  if (pInfo->pResult == NULL) {
    // pResult->pDataBlock is dereferenced below
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _error;
  }

  pOperator->name = "ExchangeOperator";
  pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE;
  pOperator->blocking = false;
  pOperator->status = OP_NOT_OPENED;
  pOperator->info = pInfo;
  pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pResult->pDataBlock);
  pOperator->pTaskInfo = pTaskInfo;

  pOperator->fpSet = createOperatorFpSet(prepareLoadRemoteData, doLoadRemoteData, NULL, NULL,
                                         destroyExchangeOperatorInfo, NULL, NULL, NULL);
  return pOperator;

_error:
  if (pInfo != NULL) {
    doDestroyExchangeOperatorInfo(pInfo);
  }

  taosMemoryFreeClear(pInfo);
  taosMemoryFreeClear(pOperator);
  pTaskInfo->code = code;
  return NULL;
}

dengyihao's avatar
dengyihao 已提交
2432 2433
static int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t numOfOutput, size_t keyBufSize,
                                const char* pKey);
2434

2435
static void destroySortedMergeOperatorInfo(void* param, int32_t numOfOutput) {
L
Liu Jicong 已提交
2436
  SSortedMergeOperatorInfo* pInfo = (SSortedMergeOperatorInfo*)param;
H
Haojun Liao 已提交
2437
  taosArrayDestroy(pInfo->pSortInfo);
2438 2439 2440
  taosArrayDestroy(pInfo->groupInfo);

  if (pInfo->pSortHandle != NULL) {
H
Haojun Liao 已提交
2441
    tsortDestroySortHandle(pInfo->pSortHandle);
2442 2443
  }

H
Haojun Liao 已提交
2444
  blockDataDestroy(pInfo->binfo.pRes);
H
Haojun Liao 已提交
2445
  cleanupAggSup(&pInfo->aggSup);
2446
}
H
Haojun Liao 已提交
2447

L
Liu Jicong 已提交
2448
/*
 * Tell whether row rowIndex of pBlock belongs to the group whose column values are saved in buf.
 *
 * groupInfo holds the indexes (int32_t) of the group columns inside pBlock->pDataBlock and
 * buf[i] the saved value of the i-th group column; a NULL buf[i] stands for a NULL value.
 *
 * Returns true when there is no group column or when every group column of the row equals
 * the saved value, false otherwise.
 */
static bool needToMerge(SSDataBlock* pBlock, SArray* groupInfo, char** buf, int32_t rowIndex) {
  size_t size = taosArrayGetSize(groupInfo);
  if (size == 0) {
    return true;
  }

  for (int32_t i = 0; i < size; ++i) {
    int32_t* index = taosArrayGet(groupInfo, i);

    SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, *index);
    bool             isNull = colDataIsNull(pColInfo, rowIndex, pBlock->info.rows, NULL);

    if ((isNull && buf[i] != NULL) || (!isNull && buf[i] == NULL)) {
      return false;
    }

    // both sides are NULL: the column matches and there is no payload to compare
    if (isNull) {
      continue;
    }

    char* pCell = colDataGetData(pColInfo, rowIndex);
    if (IS_VAR_DATA_TYPE(pColInfo->info.type)) {
      if (varDataLen(pCell) != varDataLen(buf[i])) {
        return false;
      }

      if (memcmp(varDataVal(pCell), varDataVal(buf[i]), varDataLen(pCell)) != 0) {
        return false;
      }
    } else if (memcmp(pCell, buf[i], pColInfo->info.bytes) != 0) {
      return false;
    }
  }

  // every group column matched, so the row is part of the current group
  return true;
}

L
Liu Jicong 已提交
2483 2484 2485
/*
 * Merge one input row into the intermediate results of all expressions.
 *
 * Currently a no-op: the previous implementation only contained disabled code.
 * TODO: set the row index on every context and invoke the merge function of each expression
 * (the disabled code dispatched negative function ids to the UDF merge entry point).
 */
static void doMergeResultImpl(SSortedMergeOperatorInfo* pInfo, SqlFunctionCtx* pCtx, int32_t numOfExpr,
                              int32_t rowIndex) {
  (void)pInfo;
  (void)pCtx;
  (void)numOfExpr;
  (void)rowIndex;
}
2502

L
Liu Jicong 已提交
2503 2504
/*
 * Finalize the results of all expressions for the current group.
 *
 * Currently a no-op: the previous implementation only contained disabled code.
 * TODO: invoke the finalize function of each expression (the disabled code skipped the dummy
 * tag/timestamp functions and dispatched negative function ids to the UDF finalize entry point).
 */
static void doFinalizeResultImpl(SqlFunctionCtx* pCtx, int32_t numOfExpr) {
  (void)pCtx;
  (void)numOfExpr;
}
2517

2518
/*
 * Copy the values of the listed columns of row rowIndex into the caller provided buffers.
 *
 * pColumnList holds the indexes (int32_t) of the columns inside pBlock->pDataBlock; the value
 * of the k-th listed column is copied to rowColData[k], using the length reported by
 * colDataGetLength(). Always returns true.
 */
static bool saveCurrentTuple(char** rowColData, SArray* pColumnList, SSDataBlock* pBlock, int32_t rowIndex) {
  int32_t numOfSaved = (int32_t)taosArrayGetSize(pColumnList);

  int32_t k = 0;
  while (k < numOfSaved) {
    int32_t*         pSlot = taosArrayGet(pColumnList, k);
    SColumnInfoData* pColumn = taosArrayGet(pBlock->pDataBlock, *pSlot);

    char* pVal = colDataGetData(pColumn, rowIndex);
    memcpy(rowColData[k], pVal, colDataGetLength(pColumn, rowIndex));

    k += 1;
  }

  return true;
}
2531

2532 2533
/*
 * Feed all rows of pBlock into the sorted-merge aggregation.
 *
 * The first row ever seen starts a group. A row that needToMerge() accepts is merged into the
 * current group. Any other row closes the current group: the results are finalized, the number
 * of produced rows is added to the result block, the process function of every expression with
 * a non-negative function id is invoked, and the row starts a new group.
 */
static void doMergeImpl(SOperatorInfo* pOperator, int32_t numOfExpr, SSDataBlock* pBlock) {
  SSortedMergeOperatorInfo* pInfo = pOperator->info;
  SqlFunctionCtx*           pCtx = pOperator->exprSupp.pCtx;

  for (int32_t i = 0; i < pBlock->info.rows; ++i) {
    // very first row: nothing to compare against yet
    if (!pInfo->hasGroupVal) {
      ASSERT(i == 0);
      doMergeResultImpl(pInfo, pCtx, numOfExpr, i);
      pInfo->hasGroupVal = saveCurrentTuple(pInfo->groupVal, pInfo->groupInfo, pBlock, i);
      continue;
    }

    // same group as the saved tuple
    if (needToMerge(pBlock, pInfo->groupInfo, pInfo->groupVal, i)) {
      doMergeResultImpl(pInfo, pCtx, numOfExpr, i);
      continue;
    }

    // group boundary: emit the finished group first
    doFinalizeResultImpl(pCtx, numOfExpr);
    int32_t producedRows = getNumOfResult(pOperator->exprSupp.pCtx, pOperator->exprSupp.numOfExprs, NULL);

    // TODO check for available buffer;

    // next group info data
    pInfo->binfo.pRes->info.rows += producedRows;
    for (int32_t j = 0; j < numOfExpr; ++j) {
      if (pCtx[j].functionId >= 0) {
        pCtx[j].fpSet.process(&pCtx[j]);
      }
    }

    doMergeResultImpl(pInfo, pCtx, numOfExpr, i);
    pInfo->hasGroupVal = saveCurrentTuple(pInfo->groupVal, pInfo->groupInfo, pBlock, i);
  }
}

2569 2570
/*
 * Drain the sort handle and aggregate the sorted rows group by group.
 *
 * Rows are pulled from the sort handle into a temporary block of at most
 * pOperator->resultInfo.capacity rows, which is then passed to doMergeImpl(). This repeats
 * until the sort handle yields no more rows; the last group is finalized afterwards.
 *
 * Returns the operator's result block when it holds at least one row, NULL otherwise.
 */
static SSDataBlock* doMerge(SOperatorInfo* pOperator) {
  SSortedMergeOperatorInfo* pInfo = pOperator->info;
  SSortHandle*              pHandle = pInfo->pSortHandle;

  SSDataBlock* pDataBlock = createOneDataBlock(pInfo->binfo.pRes, false);
  blockDataEnsureCapacity(pDataBlock, pOperator->resultInfo.capacity);

  while (1) {
    blockDataCleanup(pDataBlock);
    while (1) {
      STupleHandle* pTupleHandle = tsortNextTuple(pHandle);
      if (pTupleHandle == NULL) {
        break;
      }

      // build datablock for merge for one group
      appendOneRowToDataBlock(pDataBlock, pTupleHandle);
      if (pDataBlock->info.rows >= pOperator->resultInfo.capacity) {
        break;
      }
    }

    if (pDataBlock->info.rows == 0) {
      break;
    }

    setInputDataBlock(pOperator, pOperator->exprSupp.pCtx, pDataBlock, TSDB_ORDER_ASC, MAIN_SCAN, true);
    doMergeImpl(pOperator, pOperator->exprSupp.numOfExprs, pDataBlock);
    // flush to tuple store, and after all data have been handled, return to upstream node or sink node
  }

  doFinalizeResultImpl(pOperator->exprSupp.pCtx, pOperator->exprSupp.numOfExprs);
  int32_t numOfRows = getNumOfResult(pOperator->exprSupp.pCtx, pOperator->exprSupp.numOfExprs, NULL);

  // TODO check for available buffer;

  // next group info data
  pInfo->binfo.pRes->info.rows += numOfRows;

  // The temporary input block is local to this function and was never released before.
  // NOTE(review): the contexts still reference it through setInputDataBlock(); confirm nothing
  // reads that input after this point.
  blockDataDestroy(pDataBlock);

  return (pInfo->binfo.pRes->info.rows > 0) ? pInfo->binfo.pRes : NULL;
}
2612

L
Liu Jicong 已提交
2613 2614
/*
 * Fetch up to `capacity` sorted rows from pHandle into pDataBlock.
 *
 * The rows are first collected in the block obtained from tsortGetSortedDataBlock(); the
 * columns listed in pColMatchInfo are then assigned from their source slot to their target
 * slot in pDataBlock, and the temporary block is destroyed. pInfo is not used.
 *
 * Returns pDataBlock when it holds at least one row, NULL otherwise (including when no
 * temporary block could be obtained).
 */
SSDataBlock* getSortedMergeBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity,
                                     SArray* pColMatchInfo, SSortedMergeOperatorInfo* pInfo) {
  blockDataCleanup(pDataBlock);

  SSDataBlock* p = tsortGetSortedDataBlock(pHandle);
  if (NULL == p) {
    return NULL;
  }

  blockDataEnsureCapacity(p, capacity);

  // stop at the end of the sorted stream or as soon as the block is full
  for (STupleHandle* pTuple = tsortNextTuple(pHandle); pTuple != NULL; pTuple = tsortNextTuple(pHandle)) {
    appendOneRowToDataBlock(p, pTuple);
    if (p->info.rows >= capacity) {
      break;
    }
  }

  if (p->info.rows > 0) {
    int32_t numOfCols = taosArrayGetSize(pColMatchInfo);

    int32_t i = 0;
    while (i < numOfCols) {
      SColMatchInfo* pmInfo = taosArrayGet(pColMatchInfo, i);
      ASSERT(pmInfo->matchType == COL_MATCH_FROM_SLOT_ID);

      SColumnInfoData* pFrom = taosArrayGet(p->pDataBlock, pmInfo->srcSlotId);
      SColumnInfoData* pTo = taosArrayGet(pDataBlock->pDataBlock, pmInfo->targetSlotId);
      colDataAssign(pTo, pFrom, p->info.rows, &pDataBlock->info);

      i += 1;
    }

    pDataBlock->info.rows = p->info.rows;
    pDataBlock->info.capacity = p->info.rows;
  }

  blockDataDestroy(p);

  if (pDataBlock->info.rows > 0) {
    return pDataBlock;
  }

  return NULL;
}

2655
/*
 * Get-next function of the sorted-merge operator.
 *
 * On the first call a multi-source merge sort handle is created over all downstream
 * operators, opened, and the merged data is aggregated by doMerge(). Later calls (status
 * OP_RES_TO_RETURN) read from the existing sort handle through getSortedMergeBlockData().
 *
 * Returns NULL when the operator is done. Failures are reported with longjmp() to
 * pTaskInfo->env.
 */
static SSDataBlock* doSortedMerge(SOperatorInfo* pOperator) {
  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  SExecTaskInfo*            pTaskInfo = pOperator->pTaskInfo;
  SSortedMergeOperatorInfo* pInfo = pOperator->info;
  if (pOperator->status == OP_RES_TO_RETURN) {
    return getSortedMergeBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pOperator->resultInfo.capacity, NULL, pInfo);
  }

  int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;

  // pass the real task id; the macro invocation used to be quoted and ended up as a literal string
  pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_MULTISOURCE_MERGE, pInfo->bufPageSize, numOfBufPage,
                                             pInfo->binfo.pRes, GET_TASKID(pTaskInfo));

  tsortSetFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock, NULL, NULL);

  for (int32_t i = 0; i < pOperator->numOfDownstream; ++i) {
    SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource));
    if (ps == NULL) {
      longjmp(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
    }

    ps->param = pOperator->pDownstream[i];
    tsortAddSource(pInfo->pSortHandle, ps);
  }

  int32_t code = tsortOpen(pInfo->pSortHandle);
  if (code != TSDB_CODE_SUCCESS) {
    longjmp(pTaskInfo->env, terrno);
  }

  pOperator->status = OP_RES_TO_RETURN;
  return doMerge(pOperator);
}
2686

L
Liu Jicong 已提交
2687 2688
/*
 * Prepare the group-by bookkeeping of a sorted-merge operator.
 *
 * For every column in pGroupInfo the expression whose result slot id equals the column id is
 * looked up; its position in pExprInfo is appended to pInfo->groupInfo. pInfo->groupVal is
 * allocated as one buffer: an array of numOfGroupCol pointers followed by the value area, with
 * groupVal[i] pointing at the slot (pCol->bytes wide) reserved for the i-th group column.
 *
 * Returns 0 when there is no group column, TSDB_CODE_SUCCESS on success and
 * TSDB_CODE_OUT_OF_MEMORY on allocation failure.
 */
static int32_t initGroupCol(SExprInfo* pExprInfo, int32_t numOfCols, SArray* pGroupInfo,
                            SSortedMergeOperatorInfo* pInfo) {
  if (pGroupInfo == NULL || taosArrayGetSize(pGroupInfo) == 0) {
    return 0;
  }

  int32_t len = 0;
  SArray* plist = taosArrayInit(3, sizeof(SColumn));
  pInfo->groupInfo = taosArrayInit(3, sizeof(int32_t));

  if (plist == NULL || pInfo->groupInfo == NULL) {
    // plist is local and would leak; pInfo->groupInfo is released together with pInfo
    taosArrayDestroy(plist);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  // iterate over the requested group columns, not over the (still empty) output list
  size_t numOfGroupCol = taosArrayGetSize(pGroupInfo);
  for (int32_t i = 0; i < numOfGroupCol; ++i) {
    SColumn* pCol = taosArrayGet(pGroupInfo, i);
    for (int32_t j = 0; j < numOfCols; ++j) {
      SExprInfo* pe = &pExprInfo[j];
      if (pe->base.resSchema.slotId == pCol->colId) {
        taosArrayPush(plist, pCol);
        taosArrayPush(pInfo->groupInfo, &j);
        len += pCol->bytes;
        break;
      }
    }
  }

  ASSERT(taosArrayGetSize(pGroupInfo) == taosArrayGetSize(plist));

  pInfo->groupVal = taosMemoryCalloc(1, (POINTER_BYTES * numOfGroupCol + len));
  if (pInfo->groupVal == NULL) {
    taosArrayDestroy(plist);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  // The value area starts right behind the pointer array. The offset is in bytes, so the
  // base pointer must be converted to char* before adding it.
  int32_t offset = 0;
  char*   start = (char*)pInfo->groupVal + POINTER_BYTES * numOfGroupCol;
  for (int32_t i = 0; i < numOfGroupCol; ++i) {
    pInfo->groupVal[i] = start + offset;
    SColumn* pCol = taosArrayGet(plist, i);
    offset += pCol->bytes;
  }

  taosArrayDestroy(plist);

  return TSDB_CODE_SUCCESS;
}
H
Haojun Liao 已提交
2735

L
Liu Jicong 已提交
2736 2737 2738
/*
 * Create a sorted-merge operator on top of the given downstream operators.
 *
 * pExprInfo/num describe the output expressions, pSortInfo the sort order (the pointer is
 * stored in the operator info) and pGroupInfo the group-by columns.
 *
 * Returns the new operator, or NULL on any failure; terrno is then set to
 * TSDB_CODE_QRY_OUT_OF_MEMORY regardless of which step failed.
 */
SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t numOfDownstream, SExprInfo* pExprInfo,
                                             int32_t num, SArray* pSortInfo, SArray* pGroupInfo,
                                             SExecTaskInfo* pTaskInfo) {
  const size_t aggKeyLen = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;

  SSortedMergeOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SSortedMergeOperatorInfo));
  SOperatorInfo*            pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (NULL == pInfo || NULL == pOperator) {
    goto _error;
  }

  if (initExprSupp(&pOperator->exprSupp, pExprInfo, num) != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  initResultRowInfo(&pInfo->binfo.resultRowInfo);

  // NOTE(review): nothing visible in this function assigns binfo.pRes before this check; confirm
  // it is populated elsewhere, otherwise the creation always takes the error path.
  if (pOperator->exprSupp.pCtx == NULL || pInfo->binfo.pRes == NULL) {
    goto _error;
  }

  if (doInitAggInfoSup(&pInfo->aggSup, pOperator->exprSupp.pCtx, num, aggKeyLen, pTaskInfo->id.str) !=
      TSDB_CODE_SUCCESS) {
    goto _error;
  }

  setFunctionResultOutput(pOperator, &pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, num);

  if (initGroupCol(pExprInfo, num, pGroupInfo, pInfo) != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  pInfo->sortBufSize = 1024 * 16;  // 16 KB
  pInfo->bufPageSize = 1024;
  pInfo->pSortInfo = pSortInfo;

  pOperator->resultInfo.capacity = blockDataGetCapacityInRow(pInfo->binfo.pRes, pInfo->bufPageSize);

  // the operator type is intentionally left unset, as before
  pOperator->name = "SortedMerge";
  pOperator->blocking = true;
  pOperator->status = OP_NOT_OPENED;
  pOperator->info = pInfo;
  pOperator->pTaskInfo = pTaskInfo;

  pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doSortedMerge, NULL, NULL, destroySortedMergeOperatorInfo,
                                         NULL, NULL, NULL);

  if (appendDownstream(pOperator, downstream, numOfDownstream) != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  return pOperator;

_error:
  if (pInfo != NULL) {
    destroySortedMergeOperatorInfo(pInfo, num);
  }

  taosMemoryFreeClear(pInfo);
  taosMemoryFreeClear(pOperator);
  terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
  return NULL;
}

X
Xiaoyu Wang 已提交
2803
/*
 * Find the scan order and scan flag that apply to an operator.
 *
 * Starting at pOperator, the first downstream operator is followed until a scan-like operator
 * is found. A table scan reports its own order and scan flag; exchange, system table scan,
 * stream scan, tag scan, block distribution scan and last-row scan report
 * TSDB_ORDER_ASC / MAIN_SCAN.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_INVALID_PARA when the chain ends (no downstream
 * operator) before such an operator is found; *order and *scanFlag are untouched in that case.
 */
int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scanFlag) {
  // todo add more information about exchange operation
  SOperatorInfo* pCur = pOperator;

  while (1) {
    int32_t type = pCur->operatorType;

    if (type == QUERY_NODE_PHYSICAL_PLAN_EXCHANGE || type == QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN ||
        type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN || type == QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN ||
        type == QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN || type == QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN) {
      *order = TSDB_ORDER_ASC;
      *scanFlag = MAIN_SCAN;
      return TSDB_CODE_SUCCESS;
    }

    if (type == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
      STableScanInfo* pScan = pCur->info;
      *order = pScan->cond.order;
      *scanFlag = pScan->scanFlag;
      return TSDB_CODE_SUCCESS;
    }

    // any other operator: continue with its first downstream operator, if there is one
    if (pCur->pDownstream == NULL || pCur->pDownstream[0] == NULL) {
      return TSDB_CODE_INVALID_PARA;
    }

    pCur = pCur->pDownstream[0];
  }
}
2825 2826

// this is a blocking operator
L
Liu Jicong 已提交
2827
static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
H
Haojun Liao 已提交
2828 2829
  if (OPTR_IS_OPENED(pOperator)) {
    return TSDB_CODE_SUCCESS;
2830 2831
  }

H
Haojun Liao 已提交
2832
  SExecTaskInfo*    pTaskInfo = pOperator->pTaskInfo;
2833
  SAggOperatorInfo* pAggInfo = pOperator->info;
H
Haojun Liao 已提交
2834

2835 2836
  SExprSupp*     pSup = &pOperator->exprSupp;
  SOperatorInfo* downstream = pOperator->pDownstream[0];
2837

2838 2839
  int64_t st = taosGetTimestampUs();

2840 2841 2842
  int32_t order = TSDB_ORDER_ASC;
  int32_t scanFlag = MAIN_SCAN;

H
Haojun Liao 已提交
2843
  while (1) {
2844
    SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
2845 2846 2847 2848
    if (pBlock == NULL) {
      break;
    }

2849 2850 2851 2852
    int32_t code = getTableScanInfo(pOperator, &order, &scanFlag);
    if (code != TSDB_CODE_SUCCESS) {
      longjmp(pTaskInfo->env, code);
    }
2853

2854
    // there is an scalar expression that needs to be calculated before apply the group aggregation.
2855 2856 2857
    if (pAggInfo->scalarExprSup.pExprInfo != NULL) {
      SExprSupp* pSup1 = &pAggInfo->scalarExprSup;
      code = projectApplyFunctions(pSup1->pExprInfo, pBlock, pBlock, pSup1->pCtx, pSup1->numOfExprs, NULL);
2858
      if (code != TSDB_CODE_SUCCESS) {
2859
        longjmp(pTaskInfo->env, code);
2860
      }
2861 2862
    }

2863
    // the pDataBlock are always the same one, no need to call this again
2864 2865 2866
    setExecutionContext(pOperator, pOperator->exprSupp.numOfExprs, pBlock->info.groupId, pAggInfo);
    setInputDataBlock(pOperator, pSup->pCtx, pBlock, order, scanFlag, true);
    code = doAggregateImpl(pOperator, 0, pSup->pCtx);
2867 2868 2869
    if (code != 0) {
      longjmp(pTaskInfo->env, code);
    }
2870

dengyihao's avatar
dengyihao 已提交
2871
#if 0  // test for encode/decode result info
2872
    if(pOperator->fpSet.encodeResultRow){
2873 2874
      char *result = NULL;
      int32_t length = 0;
2875 2876
      pOperator->fpSet.encodeResultRow(pOperator, &result, &length);
      SAggSupporter* pSup = &pAggInfo->aggSup;
2877 2878
      taosHashClear(pSup->pResultRowHashTable);
      pInfo->resultRowInfo.size = 0;
2879
      pOperator->fpSet.decodeResultRow(pOperator, result);
2880 2881 2882
      if(result){
        taosMemoryFree(result);
      }
2883
    }
2884
#endif
2885 2886
  }

H
Haojun Liao 已提交
2887
  closeAllResultRows(&pAggInfo->binfo.resultRowInfo);
2888
  initGroupedResultInfo(&pAggInfo->groupResInfo, pAggInfo->aggSup.pResultRowHashTable, 0);
H
Haojun Liao 已提交
2889
  OPTR_SET_OPENED(pOperator);
2890

2891
  pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
H
Haojun Liao 已提交
2892 2893 2894
  return TSDB_CODE_SUCCESS;
}

2895
/**
 * getNext function of the aggregate operator.
 *
 * Invokes the operator's open function, then copies the next batch of grouped results into the
 * operator's result block. The operator is marked completed when the open function fails, when
 * the batch is empty, or when the grouped result has no more data.
 *
 * @return the result block when it holds at least one row, NULL otherwise
 */
static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator) {
  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  SAggOperatorInfo* pAggInfo = pOperator->info;
  SOptrBasicInfo*   pBasic = &pAggInfo->binfo;
  SExecTaskInfo*    pTaskInfo = pOperator->pTaskInfo;

  pTaskInfo->code = pOperator->fpSet._openFn(pOperator);
  if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
    doSetOperatorCompleted(pOperator);
    return NULL;
  }

  blockDataEnsureCapacity(pBasic->pRes, pOperator->resultInfo.capacity);
  doBuildResultDatablock(pOperator, pBasic, &pAggInfo->groupResInfo, pAggInfo->aggSup.pResultBuf);

  const bool noMoreResult = (pBasic->pRes->info.rows == 0) || !hasDataInGroupInfo(&pAggInfo->groupResInfo);
  if (noMoreResult) {
    doSetOperatorCompleted(pOperator);
  }

  size_t numOfRows = blockDataGetNumOfRows(pBasic->pRes);
  pOperator->resultInfo.totalRows += numOfRows;

  if (numOfRows == 0) {
    return NULL;
  }

  return pBasic->pRes;
}

wmmhello's avatar
wmmhello 已提交
2922
/**
 * Serialize every result row referenced by the aggregate hash table into one heap buffer.
 *
 * Buffer layout (all integers are int32_t in host byte order):
 *   [total length][number of rows] { [key length][key bytes][value length][result row bytes] }*
 *
 * @param pOperator operator whose info starts with an SOptrBasicInfo immediately followed by an
 *                  SAggSupporter (this layout is assumed by the POINTER_SHIFT below)
 * @param result    [out] newly allocated buffer owned by the caller, or NULL when the result
 *                  buffer is empty or an error occurred
 * @param length    [out] number of bytes written; set to 0 when there is no result
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_TSC_INVALID_INPUT for NULL output pointers, or
 *         TSDB_CODE_OUT_OF_MEMORY
 */
int32_t aggEncodeResultRow(SOperatorInfo* pOperator, char** result, int32_t* length) {
  if (result == NULL || length == NULL) {
    return TSDB_CODE_TSC_INVALID_INPUT;
  }

  SOptrBasicInfo* pInfo = (SOptrBasicInfo*)(pOperator->info);
  SAggSupporter*  pSup = (SAggSupporter*)POINTER_SHIFT(pOperator->info, sizeof(SOptrBasicInfo));
  int32_t         size = taosHashGetSize(pSup->pResultRowHashTable);
  size_t          keyLen = sizeof(uint64_t) * 2;  // estimate the key length
  int32_t         totalSize =
      sizeof(int32_t) + sizeof(int32_t) + size * (sizeof(int32_t) + keyLen + sizeof(int32_t) + pSup->resultRowSize);

  // no result
  if (getTotalBufSize(pSup->pResultBuf) == 0) {
    *result = NULL;
    *length = 0;
    return TSDB_CODE_SUCCESS;
  }

  *result = (char*)taosMemoryCalloc(1, totalSize);
  if (*result == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  // the first int32_t is reserved for the total length, which is only known at the end.
  // memcpy is used for every integer field because the fields are not naturally aligned.
  int32_t offset = sizeof(int32_t);
  memcpy(*result + offset, &size, sizeof(int32_t));
  offset += sizeof(int32_t);

  // prepare memory
  SResultRowPosition* pos = &pInfo->resultRowInfo.cur;
  void*               pPage = getBufPage(pSup->pResultBuf, pos->pageId);
  setBufPageDirty(pPage, true);
  releaseBufPage(pSup->pResultBuf, pPage);

  void* pIter = taosHashIterate(pSup->pResultRowHashTable, NULL);
  while (pIter) {
    void*               key = taosHashGetKey(pIter, &keyLen);
    SResultRowPosition* p1 = (SResultRowPosition*)pIter;

    pPage = getBufPage(pSup->pResultBuf, p1->pageId);
    SResultRow* pRow = (SResultRow*)((char*)pPage + p1->offset);
    setBufPageDirty(pPage, true);
    releaseBufPage(pSup->pResultBuf, pPage);

    // recalculate the result size, the real key may be longer than the estimated one
    int32_t realTotalSize = offset + sizeof(int32_t) + keyLen + sizeof(int32_t) + pSup->resultRowSize;
    if (realTotalSize > totalSize) {
      char* tmp = (char*)taosMemoryRealloc(*result, realTotalSize);
      if (tmp == NULL) {
        // the iteration is abandoned half way: release the iterator before leaving
        taosHashCancelIterate(pSup->pResultRowHashTable, pIter);
        taosMemoryFree(*result);
        *result = NULL;
        return TSDB_CODE_OUT_OF_MEMORY;
      }

      *result = tmp;
      totalSize = realTotalSize;  // remember the new capacity so later rows do not realloc needlessly
    }

    // save key
    const int32_t encodedKeyLen = (int32_t)keyLen;
    memcpy(*result + offset, &encodedKeyLen, sizeof(int32_t));
    offset += sizeof(int32_t);
    memcpy(*result + offset, key, keyLen);
    offset += keyLen;

    // save value
    const int32_t encodedValueLen = (int32_t)pSup->resultRowSize;
    memcpy(*result + offset, &encodedValueLen, sizeof(int32_t));
    offset += sizeof(int32_t);
    memcpy(*result + offset, pRow, pSup->resultRowSize);
    offset += pSup->resultRowSize;

    pIter = taosHashIterate(pSup->pResultRowHashTable, pIter);
  }

  // fill in the total length reserved at the head of the buffer
  memcpy(*result, &offset, sizeof(int32_t));
  *length = offset;

  return TSDB_CODE_SUCCESS;
}

2999
/**
 * Rebuild the aggregate result rows from a buffer with the following layout (all integers are
 * int32_t in host byte order):
 *   [total length][number of rows] { [key length][key bytes][value length][result row bytes] }*
 *
 * For every encoded row a new result row is allocated in the result buffer, registered in the
 * hash table under the encoded key, and filled with the encoded bytes (the pageId/offset of the
 * freshly allocated row are preserved). The first 8 bytes of each key are read as the group id.
 *
 * Each field is validated against the encoded total length before anything is allocated or
 * inserted for that row, so a truncated or corrupted buffer is rejected without reading past it.
 *
 * @param pOperator operator whose info starts with an SOptrBasicInfo immediately followed by an
 *                  SAggSupporter (this layout is assumed by the POINTER_SHIFT below)
 * @param result    encoded buffer
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_TSC_INVALID_INPUT for a NULL/malformed buffer or when a
 *         result row cannot be allocated, TSDB_CODE_OUT_OF_MEMORY when the hash insert fails
 */
int32_t aggDecodeResultRow(SOperatorInfo* pOperator, char* result) {
  if (result == NULL) {
    return TSDB_CODE_TSC_INVALID_INPUT;
  }

  SOptrBasicInfo* pInfo = (SOptrBasicInfo*)(pOperator->info);
  SAggSupporter*  pSup = (SAggSupporter*)POINTER_SHIFT(pOperator->info, sizeof(SOptrBasicInfo));

  // memcpy is used for every integer field because the fields are not naturally aligned
  int32_t length = 0;
  memcpy(&length, result, sizeof(int32_t));
  int32_t offset = sizeof(int32_t);

  // the header (total length + row count) must fit into the encoded length
  if (length < (int32_t)(2 * sizeof(int32_t))) {
    return TSDB_CODE_TSC_INVALID_INPUT;
  }

  int32_t count = 0;
  memcpy(&count, result + offset, sizeof(int32_t));
  offset += sizeof(int32_t);

  while (count-- > 0 && length > offset) {
    // key length
    if (length - offset < (int32_t)sizeof(int32_t)) {
      return TSDB_CODE_TSC_INVALID_INPUT;
    }

    int32_t keyLen = 0;
    memcpy(&keyLen, result + offset, sizeof(int32_t));
    offset += sizeof(int32_t);

    // the key starts with the 8-byte group id and must lie inside the buffer
    if (keyLen < (int32_t)sizeof(uint64_t) || keyLen > length - offset) {
      return TSDB_CODE_TSC_INVALID_INPUT;
    }

    const char* pKey = result + offset;
    uint64_t    tableGroupId = 0;
    memcpy(&tableGroupId, pKey, sizeof(uint64_t));
    offset += keyLen;

    // value length
    if (length - offset < (int32_t)sizeof(int32_t)) {
      return TSDB_CODE_TSC_INVALID_INPUT;
    }

    int32_t valueLen = 0;
    memcpy(&valueLen, result + offset, sizeof(int32_t));
    offset += sizeof(int32_t);

    if (valueLen != pSup->resultRowSize || valueLen > length - offset) {
      return TSDB_CODE_TSC_INVALID_INPUT;
    }

    // the encoded row is well-formed: only now touch the result buffer and the hash table
    SResultRow* resultRow = getNewResultRow(pSup->pResultBuf, tableGroupId, pSup->resultRowSize);
    if (!resultRow) {
      return TSDB_CODE_TSC_INVALID_INPUT;
    }

    // add a new result set for a new group
    SResultRowPosition pos = {.pageId = resultRow->pageId, .offset = resultRow->offset};
    if (taosHashPut(pSup->pResultRowHashTable, pKey, keyLen, &pos, sizeof(SResultRowPosition)) != 0) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    // the copy overwrites the whole row: keep the position of the freshly allocated row
    int32_t pageId = resultRow->pageId;
    int32_t pOffset = resultRow->offset;
    memcpy(resultRow, result + offset, valueLen);
    resultRow->pageId = pageId;
    resultRow->offset = pOffset;
    offset += valueLen;

    initResultRow(resultRow);
    pInfo->resultRowInfo.cur = (SResultRowPosition){.pageId = resultRow->pageId, .offset = resultRow->offset};
  }

  // every encoded byte must have been consumed
  if (offset != length) {
    return TSDB_CODE_TSC_INVALID_INPUT;
  }

  return TSDB_CODE_SUCCESS;
}

3050 3051
// Outcome of handleLimitOffset() for one projected block, consumed by doProjectOperation().
enum {
  PROJECT_RETRIEVE_CONTINUE = 1,  // keep pulling blocks from the downstream operator
  PROJECT_RETRIEVE_DONE = 2,      // stop pulling and hand the current result block to the caller
};

/**
 * Apply the soffset/slimit (per group) and offset/limit (per row) bookkeeping of the project
 * operator to the rows currently held in its result block.
 *
 * @param pOperator project operator; its info is an SProjectOperatorInfo
 * @param pBlock    input block that produced the current content of the result block; only its
 *                  group id is read here
 * @return PROJECT_RETRIEVE_CONTINUE when the caller should fetch the next input block,
 *         PROJECT_RETRIEVE_DONE when the caller should stop fetching
 */
static int32_t handleLimitOffset(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
  SProjectOperatorInfo* pProjectInfo = pOperator->info;
  SSDataBlock*          pRes = pProjectInfo->binfo.pRes;

  // 1. skip whole groups while the group offset has not been consumed
  if (pProjectInfo->curSOffset > 0) {
    if (pProjectInfo->groupId == 0) {  // it is the first group
      pProjectInfo->groupId = pBlock->info.groupId;
      blockDataCleanup(pRes);
      return PROJECT_RETRIEVE_CONTINUE;
    }

    if (pProjectInfo->groupId != pBlock->info.groupId) {
      pProjectInfo->curSOffset -= 1;

      // ignore data block in current group
      if (pProjectInfo->curSOffset > 0) {
        blockDataCleanup(pRes);
        return PROJECT_RETRIEVE_CONTINUE;
      }
    }

    // set current group id of the project operator
    pProjectInfo->groupId = pBlock->info.groupId;
  }

  // 2. a block of another group arrives: count the finished group and check the group limit
  if (pProjectInfo->groupId != 0 && pProjectInfo->groupId != pBlock->info.groupId) {
    pProjectInfo->curGroupOutput += 1;

    if ((pProjectInfo->slimit.limit > 0) && (pProjectInfo->slimit.limit <= pProjectInfo->curGroupOutput)) {
      pOperator->status = OP_EXEC_DONE;
      blockDataCleanup(pRes);
      return PROJECT_RETRIEVE_DONE;
    }

    // reset the value for a new group data
    pProjectInfo->curOffset = 0;
    pProjectInfo->curOutput = 0;
  }

  // here we reach the start position, according to the limit/offset requirements.

  // set current group id
  pProjectInfo->groupId = pBlock->info.groupId;

  // 3. consume the row offset
  if (pProjectInfo->curOffset >= pRes->info.rows) {
    pProjectInfo->curOffset -= pRes->info.rows;
    blockDataCleanup(pRes);
    return PROJECT_RETRIEVE_CONTINUE;
  }

  // from here on curOffset is smaller than the number of rows in the result block
  if (pProjectInfo->curOffset > 0) {
    blockDataTrimFirstNRows(pRes, pProjectInfo->curOffset);
    pProjectInfo->curOffset = 0;
  }

  // 4. check for the limitation in each group
  if (pProjectInfo->limit.limit > 0 && pProjectInfo->curOutput + pRes->info.rows >= pProjectInfo->limit.limit) {
    pRes->info.rows = (int32_t)(pProjectInfo->limit.limit - pProjectInfo->curOutput);

    if (pProjectInfo->slimit.limit > 0 && pProjectInfo->slimit.limit <= pProjectInfo->curGroupOutput) {
      pOperator->status = OP_EXEC_DONE;
    }

    return PROJECT_RETRIEVE_DONE;
  }

  // todo optimize performance
  // If there are slimit/soffset value exists, multi-round result can not be packed into one group, since the
  // they may not belong to the same group the limit/offset value is not valid in this case.
  const bool hasGroupLimit = (pProjectInfo->slimit.offset != -1) || (pProjectInfo->slimit.limit != -1);
  if (pRes->info.rows >= pOperator->resultInfo.threshold || hasGroupLimit) {
    return PROJECT_RETRIEVE_DONE;
  }

  // not full enough, continue to accumulate the output data in the buffer.
  return PROJECT_RETRIEVE_CONTINUE;
}

3128
/**
 * getNext function of the project operator.
 *
 * Pulls blocks from the first downstream, filters them, evaluates the projection expressions
 * into the operator's result block and applies the limit/offset handling, until
 * handleLimitOffset() reports PROJECT_RETRIEVE_DONE or the downstream is exhausted.
 * Failures are reported by longjmp() to pTaskInfo->env with the error code.
 *
 * @return the result block when it holds at least one row, NULL otherwise
 */
static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
  SProjectOperatorInfo* pProjectInfo = pOperator->info;
  SExprSupp*            pSup = &pOperator->exprSupp;
  SSDataBlock*          pRes = pProjectInfo->binfo.pRes;

  // the result block is always reset, even if the operator has already finished
  blockDataCleanup(pRes);

  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  int32_t order = 0;
  int32_t scanFlag = 0;

  // the open cost is only measured while it has not been recorded yet
  int64_t st = (pOperator->cost.openCost == 0) ? taosGetTimestampUs() : 0;

  SOperatorInfo* downstream = pOperator->pDownstream[0];

  for (;;) {
    // The downstream exec may change the value of the newgroup, so use a local variable instead.
    SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
    if (pBlock == NULL) {
      doSetOperatorCompleted(pOperator);
      break;
    }

    // the pDataBlock are always the same one, no need to call this again
    int32_t code = getTableScanInfo(downstream, &order, &scanFlag);
    if (code != TSDB_CODE_SUCCESS) {
      longjmp(pTaskInfo->env, code);
    }

    doFilter(pProjectInfo->pFilterNode, pBlock);

    setInputDataBlock(pOperator, pSup->pCtx, pBlock, order, scanFlag, false);
    blockDataEnsureCapacity(pRes, pRes->info.rows + pBlock->info.rows);

    code = projectApplyFunctions(pSup->pExprInfo, pRes, pBlock, pSup->pCtx, pSup->numOfExprs,
                                 pProjectInfo->pPseudoColInfo);
    if (code != TSDB_CODE_SUCCESS) {
      longjmp(pTaskInfo->env, code);
    }

    // anything but PROJECT_RETRIEVE_DONE means: go on with the next input block
    if (handleLimitOffset(pOperator, pBlock) == PROJECT_RETRIEVE_DONE) {
      break;
    }
  }

  pProjectInfo->curOutput += pRes->info.rows;

  size_t numOfRows = pRes->info.rows;
  pOperator->resultInfo.totalRows += numOfRows;

  if (pOperator->cost.openCost == 0) {
    // microseconds -> milliseconds
    pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
  }

  if (numOfRows > 0) {
    return pRes;
  }

  return NULL;
}

L
Liu Jicong 已提交
3214 3215
/**
 * Start filling the cached block of the next group (pInfo->existNewGroupBlock).
 *
 * The fill info is reset, the cached block becomes the fill input, one round of gap filling is
 * written into pInfo->pRes, the cached block pointer is cleared and *newgroup is set to true.
 * The end key handed to the fill info is the task window end key when the task is completed,
 * otherwise the end key of the cached block.
 */
static void doHandleRemainBlockForNewGroupImpl(SFillOperatorInfo* pInfo, SResultInfo* pResultInfo, bool* newgroup,
                                               SExecTaskInfo* pTaskInfo) {
  pInfo->totalInputRows = pInfo->existNewGroupBlock->info.rows;

  int64_t ekey;
  if (Q_STATUS_EQUAL(pTaskInfo->status, TASK_COMPLETED)) {
    ekey = pTaskInfo->window.ekey;
  } else {
    ekey = pInfo->existNewGroupBlock->info.window.ekey;
  }

  taosResetFillInfo(pInfo->pFillInfo, getFillInfoStart(pInfo->pFillInfo));

  taosFillSetStartInfo(pInfo->pFillInfo, pInfo->existNewGroupBlock->info.rows, ekey);
  taosFillSetInputDataBlock(pInfo->pFillInfo, pInfo->existNewGroupBlock);

  doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, pResultInfo->capacity);

  // the cached block has been handed over to the fill info
  pInfo->existNewGroupBlock = NULL;
  *newgroup = true;
}

L
Liu Jicong 已提交
3230 3231
/**
 * Continue the fill work left over from the previous call.
 *
 * If the fill info still has results, one more round is written into pInfo->pRes and *newgroup
 * is cleared; the function stops there when the result exceeds the threshold or when results of
 * several groups must not share one block. Otherwise the cached block of the next group, if
 * any, is started.
 */
static void doHandleRemainBlockFromNewGroup(SFillOperatorInfo* pInfo, SResultInfo* pResultInfo, bool* newgroup,
                                            SExecTaskInfo* pTaskInfo) {
  if (taosFillHasMoreResults(pInfo->pFillInfo)) {
    *newgroup = false;
    doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, (int32_t)pResultInfo->capacity);

    const bool enoughRows = pInfo->pRes->info.rows > pResultInfo->threshold;
    if (enoughRows || !pInfo->multigroupResult) {
      return;
    }
  }

  // handle the cached new group data block
  if (!pInfo->existNewGroupBlock) {
    return;
  }

  doHandleRemainBlockForNewGroupImpl(pInfo, pResultInfo, newgroup, pTaskInfo);
}

3246
/**
 * getNext function of the fill operator.
 *
 * First drains what is left over from the previous call (pending fill results and a cached
 * block of the next group). Then blocks are pulled from the first downstream, fed to the fill
 * info and the gaps are filled into the result block, until enough rows are available, the
 * downstream is exhausted, or nothing is produced.
 *
 * @return the result block, or NULL when the operator is done or no row was produced
 */
static SSDataBlock* doFill(SOperatorInfo* pOperator) {
  SFillOperatorInfo* pInfo = pOperator->info;
  SExecTaskInfo*     pTaskInfo = pOperator->pTaskInfo;
  SResultInfo*       pResultInfo = &pOperator->resultInfo;
  SSDataBlock*       pResBlock = pInfo->pRes;

  blockDataCleanup(pResBlock);
  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  // todo handle different group data interpolation
  bool newgroup = false;
  doHandleRemainBlockFromNewGroup(pInfo, pResultInfo, &newgroup, pTaskInfo);
  if (pResBlock->info.rows > pResultInfo->threshold || (!pInfo->multigroupResult && pResBlock->info.rows > 0)) {
    return pResBlock;
  }

  SOperatorInfo* pDownstream = pOperator->pDownstream[0];
  for (;;) {
    SSDataBlock* pBlock = pDownstream->fpSet.getNextFn(pDownstream);
    assert(!newgroup || pBlock != NULL);

    if (newgroup && pInfo->totalInputRows > 0) {  // there are already processed current group data block
      pInfo->existNewGroupBlock = pBlock;
      newgroup = false;

      // Fill the previous group data block, before handle the data block of new group.
      // Close the fill operation for previous group data block
      taosFillSetStartInfo(pInfo->pFillInfo, 0, pTaskInfo->window.ekey);
    } else if (pBlock == NULL) {
      if (pInfo->totalInputRows == 0) {
        pOperator->status = OP_EXEC_DONE;
        return NULL;
      }

      taosFillSetStartInfo(pInfo->pFillInfo, 0, pTaskInfo->window.ekey);
    } else {
      pInfo->totalInputRows += pBlock->info.rows;
      taosFillSetStartInfo(pInfo->pFillInfo, pBlock->info.rows, pBlock->info.window.ekey);
      taosFillSetInputDataBlock(pInfo->pFillInfo, pBlock);
    }

    blockDataEnsureCapacity(pResBlock, pResultInfo->capacity);
    doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pResBlock, pResultInfo->capacity);

    if (pResBlock->info.rows > 0) {
      // 1. The result in current group not reach the threshold of output result, continue
      // 2. If multiple group results existing in one SSDataBlock is not allowed, return immediately
      if (pResBlock->info.rows > pResultInfo->threshold || pBlock == NULL || (!pInfo->multigroupResult)) {
        return pResBlock;
      }

      doHandleRemainBlockFromNewGroup(pInfo, pResultInfo, &newgroup, pTaskInfo);
      if (pResBlock->info.rows > pResultInfo->threshold || pBlock == NULL) {
        return pResBlock;
      }
    } else if (pInfo->existNewGroupBlock) {  // current group has no more result to return, try next group
      assert(pBlock != NULL);
      doHandleRemainBlockForNewGroupImpl(pInfo, pResultInfo, &newgroup, pTaskInfo);
      if (pResBlock->info.rows > pResultInfo->threshold) {
        return pResBlock;
      }
    } else {
      return NULL;
    }
  }
}

H
Haojun Liao 已提交
3322 3323 3324 3325 3326 3327 3328 3329 3330 3331 3332
/**
 * Release the memory owned by each element of an expression array: the column descriptor of
 * the first parameter (column expressions only), the parameter array and the expression node.
 * The array itself is NOT freed; that is left to the caller.
 *
 * A NULL array, a NULL expression node and a NULL parameter array are tolerated so that
 * partially initialized arrays can be cleaned up.
 *
 * @param pExpr      expression array, may be NULL
 * @param numOfExprs number of elements in pExpr
 */
static void destroyExprInfo(SExprInfo* pExpr, int32_t numOfExprs) {
  if (pExpr == NULL) {
    return;
  }

  for (int32_t i = 0; i < numOfExprs; ++i) {
    SExprInfo* pExprInfo = &pExpr[i];

    // only column expressions own the column descriptor of their first parameter
    if (pExprInfo->pExpr != NULL && pExprInfo->pExpr->nodeType == QUERY_NODE_COLUMN &&
        pExprInfo->base.pParam != NULL) {
      taosMemoryFree(pExprInfo->base.pParam[0].pCol);
    }

    taosMemoryFreeClear(pExprInfo->base.pParam);
    taosMemoryFreeClear(pExprInfo->pExpr);
  }
}

3333 3334 3335 3336 3337
/**
 * Destroy an operator together with all of its downstream operators.
 *
 * Order of work: the operator's close function (if any) is invoked on its info, every
 * downstream is destroyed recursively, the expression array is released, and finally the info
 * and the operator itself are freed. A NULL operator is ignored.
 */
static void destroyOperatorInfo(SOperatorInfo* pOperator) {
  if (pOperator == NULL) {
    return;
  }

  if (pOperator->fpSet.closeFn != NULL) {
    pOperator->fpSet.closeFn(pOperator->info, pOperator->exprSupp.numOfExprs);
  }

  if (pOperator->pDownstream != NULL) {
    int32_t idx = 0;
    while (idx < pOperator->numOfDownstream) {
      destroyOperatorInfo(pOperator->pDownstream[idx]);
      idx += 1;
    }

    taosMemoryFreeClear(pOperator->pDownstream);
    pOperator->numOfDownstream = 0;
  }

  SExprSupp* pSupp = &pOperator->exprSupp;
  if (pSupp->pExprInfo != NULL) {
    // release the content of the elements first, then the array itself
    destroyExprInfo(pSupp->pExprInfo, pSupp->numOfExprs);
    taosMemoryFreeClear(pSupp->pExprInfo);
  }

  taosMemoryFreeClear(pOperator->info);
  taosMemoryFreeClear(pOperator);
}

3360 3361 3362 3363 3364 3365 3366 3367 3368 3369 3370 3371 3372 3373 3374
/**
 * Choose the page size and the in-memory buffer size for a paged result buffer.
 *
 * The page size is the smallest power of two that is at least 4096 bytes and can hold four
 * rows. The buffer size is 1 MiB, enlarged where necessary so that at least four pages fit.
 *
 * All arithmetic is done in 64 bits: a non-positive rowSize is treated as "no minimum" and a
 * rowSize too large for 32-bit sizes is rejected instead of wrapping around or looping forever.
 *
 * @param rowSize      size of one result row in bytes
 * @param defaultPgsz  [out] page size in bytes
 * @param defaultBufsz [out] in-memory buffer size in bytes
 * @return 0 on success; -1 when the required buffer size does not fit into 32 bits, in which
 *         case the outputs are left unmodified
 */
int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* defaultBufsz) {
  const uint64_t minPageBytes = (rowSize > 0) ? ((uint64_t)rowSize * 4) : 0;

  uint64_t pageSize = 4096;
  while (pageSize < minPageBytes) {
    pageSize <<= 1u;
  }

  // at least four pages need to be in buffer
  uint64_t bufSize = 4096 * 256;
  if (bufSize < pageSize * 4) {
    bufSize = pageSize * 4;
  }

  if (bufSize > UINT32_MAX) {
    return -1;
  }

  *defaultPgsz = (uint32_t)pageSize;
  *defaultBufsz = (uint32_t)bufSize;
  return 0;
}

dengyihao's avatar
dengyihao 已提交
3375 3376
/**
 * Initialize an aggregate supporter: compute the result row size, allocate the key buffer,
 * create the result-row hash table and create the paged result buffer.
 *
 * @param pAggSup     supporter to initialize
 * @param pCtx        function contexts used to compute the result row size
 * @param numOfOutput number of entries in pCtx
 * @param keyBufSize  payload size of the key buffer; POINTER_BYTES + sizeof(int64_t) bytes are
 *                    added on top of it
 * @param pKey        identifier passed to createDiskbasedBuf()
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY when the key buffer or the hash table
 *         cannot be allocated, or the error code of createDiskbasedBuf()
 */
int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t numOfOutput, size_t keyBufSize,
                         const char* pKey) {
  _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);

  pAggSup->resultRowSize = getResultRowSize(pCtx, numOfOutput);
  pAggSup->keyBuf = taosMemoryCalloc(1, keyBufSize + POINTER_BYTES + sizeof(int64_t));
  pAggSup->pResultRowHashTable = taosHashInit(10, hashFn, true, HASH_NO_LOCK);

  const bool allocated = (pAggSup->keyBuf != NULL) && (pAggSup->pResultRowHashTable != NULL);
  if (!allocated) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  uint32_t pageSize = 0;
  uint32_t bufSize = 0;
  getBufferPgSize(pAggSup->resultRowSize, &pageSize, &bufSize);

  // the status of the buffer creation is the status of the whole initialization
  return createDiskbasedBuf(&pAggSup->pResultBuf, pageSize, bufSize, pKey, TD_TMP_DIR_PATH);
}

3399
/**
 * Release everything owned by an aggregate supporter: the key buffer (the pointer is reset to
 * NULL), the result-row hash table and the paged result buffer.
 */
void cleanupAggSup(SAggSupporter* pAggSup) {
  // 1. key buffer
  taosMemoryFreeClear(pAggSup->keyBuf);

  // 2. hash table that maps keys to result row positions
  taosHashCleanup(pAggSup->pResultRowHashTable);

  // 3. paged buffer that stores the result rows
  destroyDiskbasedBuf(pAggSup->pResultBuf);
}

3405 3406
/**
 * Initialize the expression support and the aggregate supporter of an aggregating operator and
 * attach the supporter's result buffer to every function context.
 *
 * @param pSup       expression support to initialize
 * @param pAggSup    aggregate supporter to initialize
 * @param pExprInfo  expression array stored in pSup
 * @param numOfCols  number of expressions in pExprInfo
 * @param keyBufSize payload size of the supporter's key buffer
 * @param pkey       identifier used for the paged result buffer
 * @return TSDB_CODE_SUCCESS, or the error code of the step that failed. On failure the function
 *         contexts are not attached to a result buffer.
 */
int32_t initAggInfo(SExprSupp* pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, size_t keyBufSize,
                    const char* pkey) {
  int32_t code = initExprSupp(pSup, pExprInfo, numOfCols);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  // a failed supporter has no usable result buffer, so it must not be handed to the contexts
  code = doInitAggInfoSup(pAggSup, pSup->pCtx, numOfCols, keyBufSize, pkey);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  for (int32_t i = 0; i < numOfCols; ++i) {
    pSup->pCtx[i].pBuf = pAggSup->pResultBuf;
  }

  return TSDB_CODE_SUCCESS;
}

3420 3421 3422 3423 3424 3425 3426 3427 3428
/**
 * Set the capacity and the output threshold of an operator's result block.
 *
 * The threshold is 75% of the capacity. When that evaluates to 0, the threshold falls back to
 * the capacity itself so that it is never 0 for a non-zero capacity.
 *
 * @param pOperator operator whose resultInfo is initialized
 * @param numOfRows capacity of the result block in rows
 */
void initResultSizeInfo(SOperatorInfo* pOperator, int32_t numOfRows) {
  pOperator->resultInfo.capacity = numOfRows;
  pOperator->resultInfo.threshold = numOfRows * 0.75;

  if (pOperator->resultInfo.threshold == 0) {
    pOperator->resultInfo.threshold = numOfRows;
  }
}

3429 3430 3431 3432 3433
/**
 * Initialize the basic operator info: reset the result row info and store pBlock as the
 * result block of the operator.
 */
void initBasicInfo(SOptrBasicInfo* pInfo, SSDataBlock* pBlock) {
  initResultRowInfo(&pInfo->resultRowInfo);
  pInfo->pRes = pBlock;
}

3434
/**
 * Store an expression array in an expression support and create the matching function
 * contexts. Without an expression array no contexts are created.
 *
 * @param pSup      expression support to initialize
 * @param pExprInfo expression array, may be NULL
 * @param numOfExpr number of expressions in pExprInfo
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when the contexts cannot be created
 */
int32_t initExprSupp(SExprSupp* pSup, SExprInfo* pExprInfo, int32_t numOfExpr) {
  pSup->pExprInfo = pExprInfo;
  pSup->numOfExprs = numOfExpr;

  if (pSup->pExprInfo == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  pSup->pCtx = createSqlFunctionCtx(pExprInfo, numOfExpr, &pSup->rowEntryInfoOffset);
  return (pSup->pCtx != NULL) ? TSDB_CODE_SUCCESS : TSDB_CODE_OUT_OF_MEMORY;
}

L
Liu Jicong 已提交
3447
SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
3448
                                           SSDataBlock* pResultBlock, SExprInfo* pScalarExprInfo,
wmmhello's avatar
wmmhello 已提交
3449
                                           int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo) {
wafwerar's avatar
wafwerar 已提交
3450
  SAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SAggOperatorInfo));
L
Liu Jicong 已提交
3451
  SOperatorInfo*    pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
H
Haojun Liao 已提交
3452 3453 3454
  if (pInfo == NULL || pOperator == NULL) {
    goto _error;
  }
H
Haojun Liao 已提交
3455

3456
  int32_t numOfRows = 1024;
dengyihao's avatar
dengyihao 已提交
3457
  size_t  keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
3458 3459

  initResultSizeInfo(pOperator, numOfRows);
3460
  int32_t code = initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str);
L
Liu Jicong 已提交
3461
  if (code != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
3462 3463
    goto _error;
  }
H
Haojun Liao 已提交
3464

3465
  initBasicInfo(&pInfo->binfo, pResultBlock);
3466 3467 3468 3469
  code = initExprSupp(&pInfo->scalarExprSup, pScalarExprInfo, numOfScalarExpr);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }
3470

3471
  pInfo->groupId = INT32_MIN;
dengyihao's avatar
dengyihao 已提交
3472
  pOperator->name = "TableAggregate";
3473
  pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_HASH_AGG;
3474
  pOperator->blocking = true;
dengyihao's avatar
dengyihao 已提交
3475 3476 3477
  pOperator->status = OP_NOT_OPENED;
  pOperator->info = pInfo;
  pOperator->pTaskInfo = pTaskInfo;
H
Haojun Liao 已提交
3478

3479 3480
  pOperator->fpSet = createOperatorFpSet(doOpenAggregateOptr, getAggregateResult, NULL, NULL, destroyAggOperatorInfo,
                                         aggEncodeResultRow, aggDecodeResultRow, NULL);
H
Haojun Liao 已提交
3481 3482 3483 3484 3485

  code = appendDownstream(pOperator, &downstream, 1);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }
3486 3487

  return pOperator;
L
Liu Jicong 已提交
3488
_error:
H
Haojun Liao 已提交
3489
  destroyAggOperatorInfo(pInfo, numOfCols);
wafwerar's avatar
wafwerar 已提交
3490 3491
  taosMemoryFreeClear(pInfo);
  taosMemoryFreeClear(pOperator);
H
Haojun Liao 已提交
3492 3493
  pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
  return NULL;
3494 3495
}

3496 3497 3498 3499 3500 3501 3502 3503 3504 3505 3506 3507 3508 3509 3510 3511 3512 3513 3514
/**
 * Destroy an array of function contexts: the variant parameters, the subsidiary context array
 * and the input column arrays of every context, then the array itself.
 *
 * @param pCtx        context array, may be NULL
 * @param numOfOutput number of contexts in pCtx
 * @return always NULL, so that the result can be assigned to the pointer being destroyed
 */
static void* destroySqlFunctionCtx(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
  if (pCtx != NULL) {
    for (int32_t i = 0; i < numOfOutput; ++i) {
      SqlFunctionCtx* pOne = &pCtx[i];

      for (int32_t j = 0; j < pOne->numOfParams; ++j) {
        taosVariantDestroy(&pOne->param[j].param);
      }

      taosMemoryFreeClear(pOne->subsidiaries.pCtx);
      taosMemoryFree(pOne->input.pData);
      taosMemoryFree(pOne->input.pColumnDataAgg);
    }

    taosMemoryFreeClear(pCtx);
  }

  return NULL;
}

3515
/**
 * Release the resources held by a basic operator info: the result row info and the result
 * block. pInfo->pRes is overwritten with the value returned by blockDataDestroy().
 *
 * @param pInfo basic info to clean up; must not be NULL (asserted)
 */
void cleanupBasicInfo(SOptrBasicInfo* pInfo) {
  assert(pInfo != NULL);

  cleanupResultRowInfo(&pInfo->resultRowInfo);

  SSDataBlock* pOldRes = pInfo->pRes;
  pInfo->pRes = blockDataDestroy(pOldRes);
}

H
Haojun Liao 已提交
3521
void destroyBasicOperatorInfo(void* param, int32_t numOfOutput) {
L
Liu Jicong 已提交
3522
  SOptrBasicInfo* pInfo = (SOptrBasicInfo*)param;
3523
  cleanupBasicInfo(pInfo);
3524
}
H
Haojun Liao 已提交
3525 3526

void destroyAggOperatorInfo(void* param, int32_t numOfOutput) {
L
Liu Jicong 已提交
3527
  SAggOperatorInfo* pInfo = (SAggOperatorInfo*)param;
3528
  cleanupBasicInfo(&pInfo->binfo);
3529
}
3530

H
Haojun Liao 已提交
3531
void destroySFillOperatorInfo(void* param, int32_t numOfOutput) {
L
Liu Jicong 已提交
3532
  SFillOperatorInfo* pInfo = (SFillOperatorInfo*)param;
3533
  pInfo->pFillInfo = taosDestroyFillInfo(pInfo->pFillInfo);
H
Haojun Liao 已提交
3534
  pInfo->pRes = blockDataDestroy(pInfo->pRes);
wafwerar's avatar
wafwerar 已提交
3535
  taosMemoryFreeClear(pInfo->p);
3536 3537
}

H
Haojun Liao 已提交
3538
static void destroyProjectOperatorInfo(void* param, int32_t numOfOutput) {
D
fix bug  
dapan 已提交
3539 3540 3541
  if (NULL == param) {
    return;
  }
L
Liu Jicong 已提交
3542
  SProjectOperatorInfo* pInfo = (SProjectOperatorInfo*)param;
3543
  cleanupBasicInfo(&pInfo->binfo);
H
Haojun Liao 已提交
3544
  cleanupAggSup(&pInfo->aggSup);
H
Haojun Liao 已提交
3545
  taosArrayDestroy(pInfo->pPseudoColInfo);
3546 3547
}

3548
/**
 * Release the resources referenced by an expression support: the function contexts, the
 * content of the expression array elements and the row entry offset array. The expression
 * array itself is not freed here.
 *
 * @param pSupp expression support to clean up; its expression array may be NULL
 */
void cleanupExecSupp(SExprSupp* pSupp) {
  destroySqlFunctionCtx(pSupp->pCtx, pSupp->numOfExprs);

  // a support initialized without expressions has no array to walk through
  if (pSupp->pExprInfo != NULL) {
    destroyExprInfo(pSupp->pExprInfo, pSupp->numOfExprs);
  }

  taosMemoryFree(pSupp->rowEntryInfoOffset);
}

H
Haojun Liao 已提交
3555
static void destroyIndefinitOperatorInfo(void* param, int32_t numOfOutput) {
3556
  SIndefOperatorInfo* pInfo = (SIndefOperatorInfo*)param;
3557
  cleanupBasicInfo(&pInfo->binfo);
H
Haojun Liao 已提交
3558 3559 3560

  taosArrayDestroy(pInfo->pPseudoColInfo);
  cleanupAggSup(&pInfo->aggSup);
3561
  cleanupExecSupp(&pInfo->scalarSup);
H
Haojun Liao 已提交
3562 3563
}

H
Haojun Liao 已提交
3564
void destroyExchangeOperatorInfo(void* param, int32_t numOfOutput) {
L
Liu Jicong 已提交
3565
  SExchangeInfo* pExInfo = (SExchangeInfo*)param;
3566 3567 3568 3569
  taosRemoveRef(exchangeObjRefPool, pExInfo->self);
}

void doDestroyExchangeOperatorInfo(void* param) {
X
Xiaoyu Wang 已提交
3570
  SExchangeInfo* pExInfo = (SExchangeInfo*)param;
3571

H
Haojun Liao 已提交
3572 3573 3574 3575 3576 3577 3578 3579 3580
  taosArrayDestroy(pExInfo->pSources);
  taosArrayDestroy(pExInfo->pSourceDataInfo);
  if (pExInfo->pResult != NULL) {
    blockDataDestroy(pExInfo->pResult);
  }

  tsem_destroy(&pExInfo->ready);
}

H
Haojun Liao 已提交
3581 3582
// Collect the indexes (int32_t) of every context in pCtx[0..numOfCols) whose function is
// reported as a pseudo-column function by fmIsPseudoColumnFunc(). Returns the new array.
static SArray* setRowTsColumnOutputInfo(SqlFunctionCtx* pCtx, int32_t numOfCols) {
  SArray* pPseudoIdx = taosArrayInit(4, sizeof(int32_t));

  for (int32_t slot = 0; slot < numOfCols; ++slot) {
    if (!fmIsPseudoColumnFunc(pCtx[slot].functionId)) {
      continue;
    }

    taosArrayPush(pPseudoIdx, &slot);
  }

  return pPseudoIdx;
}

3592 3593 3594 3595
// Return the `limit` field of an SLimitNode, or -1 when no limit node is given.
static int64_t getLimit(SNode* pLimit) {
  if (pLimit == NULL) {
    return -1;
  }

  return ((SLimitNode*)pLimit)->limit;
}

// Return the `offset` field of an SLimitNode, or -1 when no limit node is given.
static int64_t getOffset(SNode* pLimit) {
  if (pLimit == NULL) {
    return -1;
  }

  return ((SLimitNode*)pLimit)->offset;
}

3596
SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhysiNode* pProjPhyNode,
dengyihao's avatar
dengyihao 已提交
3597
                                         SExecTaskInfo* pTaskInfo) {
wafwerar's avatar
wafwerar 已提交
3598
  SProjectOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SProjectOperatorInfo));
L
Liu Jicong 已提交
3599
  SOperatorInfo*        pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
H
Haojun Liao 已提交
3600 3601 3602
  if (pInfo == NULL || pOperator == NULL) {
    goto _error;
  }
3603

3604
  int32_t    numOfCols = 0;
3605 3606 3607
  SExprInfo* pExprInfo = createExprInfo(pProjPhyNode->pProjections, NULL, &numOfCols);

  SSDataBlock* pResBlock = createResDataBlock(pProjPhyNode->node.pOutputDataBlockDesc);
3608 3609
  SLimit       limit = {.limit = getLimit(pProjPhyNode->node.pLimit), .offset = getOffset(pProjPhyNode->node.pLimit)};
  SLimit slimit = {.limit = getLimit(pProjPhyNode->node.pSlimit), .offset = getOffset(pProjPhyNode->node.pSlimit)};
3610

3611 3612 3613 3614
  pInfo->limit = limit;
  pInfo->slimit = slimit;
  pInfo->curOffset = limit.offset;
  pInfo->curSOffset = slimit.offset;
H
Haojun Liao 已提交
3615
  pInfo->binfo.pRes = pResBlock;
3616
  pInfo->pFilterNode = pProjPhyNode->node.pConditions;
H
Haojun Liao 已提交
3617 3618

  int32_t numOfRows = 4096;
dengyihao's avatar
dengyihao 已提交
3619
  size_t  keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
3620

3621 3622 3623 3624 3625
  // Make sure the size of SSDataBlock will never exceed the size of 2MB.
  int32_t TWOMB = 2 * 1024 * 1024;
  if (numOfRows * pResBlock->info.rowSize > TWOMB) {
    numOfRows = TWOMB / pResBlock->info.rowSize;
  }
3626
  initResultSizeInfo(pOperator, numOfRows);
3627

3628 3629
  initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str);
  initBasicInfo(&pInfo->binfo, pResBlock);
3630
  setFunctionResultOutput(pOperator, &pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, numOfCols);
3631

3632
  pInfo->pPseudoColInfo = setRowTsColumnOutputInfo(pOperator->exprSupp.pCtx, numOfCols);
X
Xiaoyu Wang 已提交
3633
  pOperator->name = "ProjectOperator";
H
Haojun Liao 已提交
3634
  pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_PROJECT;
X
Xiaoyu Wang 已提交
3635 3636 3637 3638
  pOperator->blocking = false;
  pOperator->status = OP_NOT_OPENED;
  pOperator->info = pInfo;
  pOperator->pTaskInfo = pTaskInfo;
3639

L
Liu Jicong 已提交
3640 3641
  pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doProjectOperation, NULL, NULL,
                                         destroyProjectOperatorInfo, NULL, NULL, NULL);
L
Liu Jicong 已提交
3642

3643
  int32_t code = appendDownstream(pOperator, &downstream, 1);
H
Haojun Liao 已提交
3644
  if (code != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
3645 3646
    goto _error;
  }
3647 3648

  return pOperator;
H
Haojun Liao 已提交
3649

L
Liu Jicong 已提交
3650
_error:
H
Haojun Liao 已提交
3651 3652
  pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
  return NULL;
3653 3654
}

H
Haojun Liao 已提交
3655 3656
// getNext function of the indefinite-rows function operator.
//
// Drains the downstream operator: for every block it optionally evaluates the scalar expressions
// in place, then applies the operator's functions and appends the output to the operator's result
// block. Errors are raised with longjmp on the task's jump buffer.
// Returns the result block when it holds at least one row, otherwise NULL.
static SSDataBlock* doApplyIndefinitFunction(SOperatorInfo* pOperator) {
  SIndefOperatorInfo* pIndefInfo = pOperator->info;
  SOptrBasicInfo*     pBasic = &pIndefInfo->binfo;
  SExprSupp*          pExprSup = &pOperator->exprSupp;

  // The result block is reset before the completion check.
  blockDataCleanup(pBasic->pRes);

  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  int32_t order = 0;
  int32_t scanFlag = 0;

  // The start time is only taken while no open cost has been recorded yet.
  int64_t startUs = (pOperator->cost.openCost == 0) ? taosGetTimestampUs() : 0;

  SOperatorInfo* downstream = pOperator->pDownstream[0];

  for (;;) {
    // The downstream exec may change the value of the newgroup, so use a local variable instead.
    SSDataBlock* pInput = downstream->fpSet.getNextFn(downstream);
    if (pInput == NULL) {
      doSetOperatorCompleted(pOperator);
      break;
    }

    // the pDataBlock are always the same one, no need to call this again
    int32_t code = getTableScanInfo(pOperator->pDownstream[0], &order, &scanFlag);
    if (code != TSDB_CODE_SUCCESS) {
      longjmp(pTaskInfo->env, code);
    }

    // Scalar expressions, when present, are computed first with the input block as both
    // source and destination.
    SExprSupp* pScalarSup = &pIndefInfo->scalarSup;
    if (pScalarSup->pExprInfo != NULL) {
      code = projectApplyFunctions(pScalarSup->pExprInfo, pInput, pInput, pScalarSup->pCtx, pScalarSup->numOfExprs,
                                   pIndefInfo->pPseudoColInfo);
      if (code != TSDB_CODE_SUCCESS) {
        longjmp(pTaskInfo->env, code);
      }
    }

    setInputDataBlock(pOperator, pExprSup->pCtx, pInput, order, scanFlag, false);
    blockDataEnsureCapacity(pBasic->pRes, pBasic->pRes->info.rows + pInput->info.rows);

    code = projectApplyFunctions(pOperator->exprSupp.pExprInfo, pBasic->pRes, pInput, pExprSup->pCtx,
                                 pOperator->exprSupp.numOfExprs, pIndefInfo->pPseudoColInfo);
    if (code != TSDB_CODE_SUCCESS) {
      longjmp(pTaskInfo->env, code);
    }
  }

  size_t producedRows = pBasic->pRes->info.rows;
  pOperator->resultInfo.totalRows += producedRows;

  if (pOperator->cost.openCost == 0) {
    pOperator->cost.openCost = (taosGetTimestampUs() - startUs) / 1000.0;
  }

  if (producedRows > 0) {
    return pBasic->pRes;
  }
  return NULL;
}

3722 3723
// Create the indefinite-rows function operator on top of `downstream`.
//
// pNode is used as an SIndefRowsFuncPhysiNode: pFuncs supplies the function expressions, and the
// optional pExprs supplies scalar expressions evaluated before the functions. The result block
// capacity starts at 4096 rows and is reduced so that rows * rowSize does not exceed 2MB.
//
// Returns the new operator, or NULL on failure with pTaskInfo->code set to
// TSDB_CODE_OUT_OF_MEMORY.
SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pNode,
                                                 SExecTaskInfo* pTaskInfo) {
  SIndefOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SIndefOperatorInfo));
  SOperatorInfo*      pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    goto _error;
  }

  SExprSupp*               pExprSup = &pOperator->exprSupp;
  SIndefRowsFuncPhysiNode* pIndefNode = (SIndefRowsFuncPhysiNode*)pNode;

  int32_t    numOfFuncs = 0;
  SExprInfo* pFuncExprs = createExprInfo(pIndefNode->pFuncs, NULL, &numOfFuncs);

  if (pIndefNode->pExprs != NULL) {
    int32_t    numOfScalar = 0;
    SExprInfo* pScalarExprs = createExprInfo(pIndefNode->pExprs, NULL, &numOfScalar);
    if (initExprSupp(&pInfo->scalarSup, pScalarExprs, numOfScalar) != TSDB_CODE_SUCCESS) {
      goto _error;
    }
  }

  SSDataBlock* pResBlock = createResDataBlock(pIndefNode->node.pOutputDataBlockDesc);

  int32_t rowCapacity = 4096;
  size_t  keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;

  // Make sure the size of SSDataBlock will never exceed the size of 2MB.
  int32_t maxBlockBytes = 2 * 1024 * 1024;
  if (rowCapacity * pResBlock->info.rowSize > maxBlockBytes) {
    rowCapacity = maxBlockBytes / pResBlock->info.rowSize;
  }
  initResultSizeInfo(pOperator, rowCapacity);

  initAggInfo(pExprSup, &pInfo->aggSup, pFuncExprs, numOfFuncs, keyBufSize, pTaskInfo->id.str);
  initBasicInfo(&pInfo->binfo, pResBlock);

  setFunctionResultOutput(pOperator, &pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, numOfFuncs);

  pInfo->binfo.pRes = pResBlock;
  pInfo->pPseudoColInfo = setRowTsColumnOutputInfo(pExprSup->pCtx, numOfFuncs);

  pOperator->name = "IndefinitOperator";
  pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_PROJECT;
  pOperator->blocking = false;
  pOperator->status = OP_NOT_OPENED;
  pOperator->info = pInfo;
  pExprSup->pExprInfo = pFuncExprs;
  pExprSup->numOfExprs = numOfFuncs;
  pOperator->pTaskInfo = pTaskInfo;

  pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doApplyIndefinitFunction, NULL, NULL,
                                         destroyIndefinitOperatorInfo, NULL, NULL, NULL);

  if (appendDownstream(pOperator, &downstream, 1) != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  return pOperator;

_error:
  taosMemoryFree(pInfo);
  taosMemoryFree(pOperator);
  pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
  return NULL;
}

3792
static int32_t initFillInfo(SFillOperatorInfo* pInfo, SExprInfo* pExpr, int32_t numOfCols, SNodeListNode* pValNode,
L
Liu Jicong 已提交
3793
                            STimeWindow win, int32_t capacity, const char* id, SInterval* pInterval, int32_t fillType) {
3794
  SFillColInfo* pColInfo = createFillColInfo(pExpr, numOfCols, pValNode);
H
Haojun Liao 已提交
3795 3796

  STimeWindow w = TSWINDOW_INITIALIZER;
3797
  getAlignQueryTimeWindow(pInterval, pInterval->precision, win.skey, &w);
H
Haojun Liao 已提交
3798 3799

  int32_t order = TSDB_ORDER_ASC;
3800
  pInfo->pFillInfo = taosCreateFillInfo(order, w.skey, 0, capacity, numOfCols, pInterval, fillType, pColInfo, id);
H
Haojun Liao 已提交
3801

wafwerar's avatar
wafwerar 已提交
3802
  pInfo->p = taosMemoryCalloc(numOfCols, POINTER_BYTES);
H
Haojun Liao 已提交
3803
  if (pInfo->pFillInfo == NULL || pInfo->p == NULL) {
H
Haojun Liao 已提交
3804 3805
    taosMemoryFree(pInfo->pFillInfo);
    taosMemoryFree(pInfo->p);
H
Haojun Liao 已提交
3806 3807 3808 3809 3810 3811
    return TSDB_CODE_OUT_OF_MEMORY;
  } else {
    return TSDB_CODE_SUCCESS;
  }
}

3812 3813 3814 3815 3816 3817 3818 3819
// Create a fill operator on top of `downstream`.
//
// The output layout, target expressions, fill mode, fill values and time range come from
// pPhyFillNode. The interval is read from downstream->info, which is accessed as an
// SIntervalAggOperatorInfo (NOTE(review): this assumes the downstream is an interval operator;
// confirm against the planner).
//
// Returns the new operator, or NULL on failure with pTaskInfo->code set to the failure code.
SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* pPhyFillNode, bool multigroupResult,
                                      SExecTaskInfo* pTaskInfo) {
  int32_t code = TSDB_CODE_OUT_OF_MEMORY;

  SFillOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SFillOperatorInfo));
  SOperatorInfo*     pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    goto _error;
  }

  int32_t      num = 0;
  SSDataBlock* pResBlock = createResDataBlock(pPhyFillNode->node.pOutputDataBlockDesc);
  SExprInfo*   pExprInfo = createExprInfo(pPhyFillNode->pTargets, NULL, &num);
  SInterval*   pInterval = &((SIntervalAggOperatorInfo*)downstream->info)->interval;
  int32_t      type = convertFillType(pPhyFillNode->mode);

  SResultInfo* pResultInfo = &pOperator->resultInfo;
  initResultSizeInfo(pOperator, 4096);

  code = initFillInfo(pInfo, pExprInfo, num, (SNodeListNode*)pPhyFillNode->pValues, pPhyFillNode->timeRange,
                      pResultInfo->capacity, pTaskInfo->id.str, pInterval, type);
  if (code != TSDB_CODE_SUCCESS) {
    // The result block has not been handed to pInfo yet, so release it here.
    blockDataDestroy(pResBlock);
    goto _error;
  }

  pInfo->pRes = pResBlock;
  pInfo->multigroupResult = multigroupResult;
  pOperator->name = "FillOperator";
  pOperator->blocking = false;
  pOperator->status = OP_NOT_OPENED;
  pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_FILL;
  pOperator->exprSupp.pExprInfo = pExprInfo;
  pOperator->exprSupp.numOfExprs = num;
  pOperator->info = pInfo;
  pOperator->pTaskInfo = pTaskInfo;

  pOperator->fpSet =
      createOperatorFpSet(operatorDummyOpenFn, doFill, NULL, NULL, destroySFillOperatorInfo, NULL, NULL, NULL);

  code = appendDownstream(pOperator, &downstream, 1);
  if (code != TSDB_CODE_SUCCESS) {
    // Do not hand out an operator that is not linked to its downstream; release the fill state,
    // result block and column pointer array held by pInfo before freeing the shells.
    destroySFillOperatorInfo(pInfo, num);
    goto _error;
  }

  return pOperator;

_error:
  taosMemoryFreeClear(pOperator);
  taosMemoryFreeClear(pInfo);
  // Report the failure through the task, as the other operator constructors in this file do.
  pTaskInfo->code = code;
  return NULL;
}

D
dapan1121 已提交
3858
// Allocate and initialise an execution task.
//
// The task starts in TASK_NOT_COMPLETED, records its creation time, the query id and the
// execution model, keeps a copy of dbFName, and gets an id string of the form
// "TID:0x<taskId> QID:0x<queryId>".
//
// Returns the new task, or NULL with terrno set to TSDB_CODE_OUT_OF_MEMORY when the task or its
// id buffer cannot be allocated.
static SExecTaskInfo* createExecTaskInfo(uint64_t queryId, uint64_t taskId, EOPTR_EXEC_MODEL model, char* dbFName) {
  enum { TASK_ID_STR_LEN = 128 };

  SExecTaskInfo* pTaskInfo = taosMemoryCalloc(1, sizeof(SExecTaskInfo));
  if (pTaskInfo == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  // Allocate the id buffer up front so that a failure leaves nothing else to undo.
  char* p = taosMemoryCalloc(1, TASK_ID_STR_LEN);
  if (p == NULL) {
    taosMemoryFree(pTaskInfo);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED);

  pTaskInfo->schemaVer.dbname = strdup(dbFName);
  pTaskInfo->cost.created = taosGetTimestampMs();
  pTaskInfo->id.queryId = queryId;
  pTaskInfo->execModel = model;

  snprintf(p, TASK_ID_STR_LEN, "TID:0x%" PRIx64 " QID:0x%" PRIx64, taskId, queryId);
  pTaskInfo->id.str = p;

  return pTaskInfo;
}
H
Haojun Liao 已提交
3873

H
Hongze Cheng 已提交
3874
static STsdbReader* doCreateDataReader(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle,
H
Haojun Liao 已提交
3875
                                       STableListInfo* pTableListInfo, const char* idstr);
H
Haojun Liao 已提交
3876

H
Haojun Liao 已提交
3877
static SArray* extractColumnInfo(SNodeList* pNodeList);
3878

D
dapan1121 已提交
3879
// Load the schema information of table `uid` into pTaskInfo->schemaVer.
//
// The table name is copied from the table's meta entry. For a super table the row schema and the
// tag schema version come from the entry itself; for a child table they come from the entry of its
// super table; for any other table type only the row schema is cloned.
//
// Returns TSDB_CODE_SUCCESS, or the error code of the failing meta lookup.
int32_t extractTableSchemaVersion(SReadHandle* pHandle, uint64_t uid, SExecTaskInfo* pTaskInfo) {
  SMetaReader mr = {0};
  metaReaderInit(&mr, pHandle->meta, 0);
  int32_t code = metaGetTableEntryByUid(&mr, uid);
  if (code) {
    metaReaderClear(&mr);
    return code;
  }

  pTaskInfo->schemaVer.tablename = strdup(mr.me.name);

  if (mr.me.type == TSDB_SUPER_TABLE) {
    pTaskInfo->schemaVer.sw = tCloneSSchemaWrapper(&mr.me.stbEntry.schemaRow);
    pTaskInfo->schemaVer.tversion = mr.me.stbEntry.schemaTag.version;
  } else if (mr.me.type == TSDB_CHILD_TABLE) {
    tb_uid_t suid = mr.me.ctbEntry.suid;

    // The schema lives in the super table entry; do not read it unless that lookup succeeded.
    code = metaGetTableEntryByUid(&mr, suid);
    if (code) {
      metaReaderClear(&mr);
      return code;
    }

    pTaskInfo->schemaVer.sw = tCloneSSchemaWrapper(&mr.me.stbEntry.schemaRow);
    pTaskInfo->schemaVer.tversion = mr.me.stbEntry.schemaTag.version;
  } else {
    pTaskInfo->schemaVer.sw = tCloneSSchemaWrapper(&mr.me.ntbEntry.schemaRow);
  }

  metaReaderClear(&mr);

  return TSDB_CODE_SUCCESS;
}

wmmhello's avatar
wmmhello 已提交
3907 3908 3909 3910 3911 3912 3913 3914 3915 3916 3917 3918 3919 3920 3921 3922 3923 3924 3925 3926 3927 3928 3929 3930 3931 3932 3933 3934 3935 3936 3937 3938 3939 3940 3941 3942 3943 3944 3945 3946 3947 3948 3949 3950 3951 3952 3953 3954 3955 3956 3957 3958 3959 3960 3961 3962 3963 3964 3965
// Rebuild pTableListInfo->pGroupList as a list of table groups ordered by ascending group id.
//
// Every table in pTableList is looked up in pTableListInfo->map (uid -> group id) and appended to
// the group with that id; a new group is inserted at the position that keeps the list sorted.
// `sortSupport` mirrors pGroupList with the bare group ids so positions can be searched.
// groupNum is only used as the initial capacity of that helper array.
//
// Returns TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY when an array cannot be created, or
// TSDB_CODE_QRY_APP_ERROR when an array update or the group id lookup fails.
static int32_t sortTableGroup(STableListInfo* pTableListInfo, int32_t groupNum) {
  taosArrayClear(pTableListInfo->pGroupList);

  SArray* sortSupport = taosArrayInit(groupNum, sizeof(uint64_t));
  if (sortSupport == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  int32_t code = TSDB_CODE_SUCCESS;
  SArray* tGroup = NULL;  // group being built; non-NULL only while pGroupList does not own it yet

  for (int32_t i = 0; i < taosArrayGetSize(pTableListInfo->pTableList); i++) {
    STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i);
    uint64_t*      groupId = taosHashGet(pTableListInfo->map, &info->uid, sizeof(uint64_t));
    if (groupId == NULL) {
      // The comparator below dereferences the key, so a missing map entry must be rejected here.
      qError("taos get group id error");
      code = TSDB_CODE_QRY_APP_ERROR;
      goto _end;
    }

    int32_t index = taosArraySearchIdx(sortSupport, groupId, compareUint64Val, TD_EQ);
    if (index != -1) {
      // The group already exists: append the table to it.
      SArray* pExisting = (SArray*)taosArrayGetP(pTableListInfo->pGroupList, index);
      if (taosArrayPush(pExisting, info) == NULL) {
        qError("taos push uid array error");
        code = TSDB_CODE_QRY_APP_ERROR;
        goto _end;
      }
      continue;
    }

    // First table of a new group: find the first larger group id, if any.
    void* p = taosArraySearch(sortSupport, groupId, compareUint64Val, TD_GT);

    tGroup = taosArrayInit(8, sizeof(STableKeyInfo));
    if (tGroup == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _end;
    }

    if (taosArrayPush(tGroup, info) == NULL) {
      qError("taos push info array error");
      code = TSDB_CODE_QRY_APP_ERROR;
      goto _end;
    }

    if (p == NULL) {
      // No larger id: the new group goes to the end of both arrays.
      if (taosArrayPush(sortSupport, groupId) == NULL) {
        qError("taos push support array error");
        code = TSDB_CODE_QRY_APP_ERROR;
        goto _end;
      }
      if (taosArrayPush(pTableListInfo->pGroupList, &tGroup) == NULL) {
        qError("taos push group array error");
        code = TSDB_CODE_QRY_APP_ERROR;
        goto _end;
      }
    } else {
      int32_t pos = TARRAY_ELEM_IDX(sortSupport, p);
      if (taosArrayInsert(sortSupport, pos, groupId) == NULL) {
        qError("taos insert support array error");
        code = TSDB_CODE_QRY_APP_ERROR;
        goto _end;
      }
      if (taosArrayInsert(pTableListInfo->pGroupList, pos, &tGroup) == NULL) {
        qError("taos insert group array error");
        code = TSDB_CODE_QRY_APP_ERROR;
        goto _end;
      }
    }

    tGroup = NULL;  // ownership moved to pGroupList
  }

_end:
  if (tGroup != NULL) {
    taosArrayDestroy(tGroup);
  }
  taosArrayDestroy(sortSupport);
  return code;
}

wmmhello's avatar
wmmhello 已提交
3966 3967
// Compute the group id of every table in pTableListInfo->pTableList from the group-by expression
// list `group`.
//
// For each table the expressions are cloned, rewritten with doTranslateTagExpr against the table's
// meta entry and folded to constants. A key is then built from one null-flag byte per expression
// followed by the non-null values, and calcGroupId() of that key is stored both in the table's
// groupId field and in pTableListInfo->map (uid -> group id). When needSortTableByGroupId is set,
// the tables are additionally arranged into sorted groups by sortTableGroup().
//
// Returns TSDB_CODE_SUCCESS (also when `group` is NULL, in which case nothing is done),
// TSDB_CODE_OUT_OF_MEMORY on allocation failure, or the error code of the failing meta lookup,
// constant calculation or sort.
int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle, SNodeList* group) {
  if (group == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  pTableListInfo->map = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
  if (pTableListInfo->map == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  int32_t keyLen = 0;
  void*   keyBuf = NULL;

  // The key buffer holds the declared bytes of every expression plus one null flag each.
  SNode* node;
  FOREACH(node, group) {
    SExprNode* pExpr = (SExprNode*)node;
    keyLen += pExpr->resType.bytes;
  }

  int32_t nullFlagSize = sizeof(int8_t) * LIST_LENGTH(group);
  keyLen += nullFlagSize;

  keyBuf = taosMemoryCalloc(1, keyLen);
  if (keyBuf == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  int32_t groupNum = 0;
  for (int32_t i = 0; i < taosArrayGetSize(pTableListInfo->pTableList); i++) {
    STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i);
    SMetaReader    mr = {0};
    metaReaderInit(&mr, pHandle->meta, 0);

    // The expressions are rewritten against this entry, so it must have been loaded.
    int32_t metaCode = metaGetTableEntryByUid(&mr, info->uid);
    if (metaCode != TSDB_CODE_SUCCESS) {
      metaReaderClear(&mr);
      taosMemoryFree(keyBuf);
      return metaCode;
    }

    SNodeList* groupNew = nodesCloneList(group);

    nodesRewriteExprsPostOrder(groupNew, doTranslateTagExpr, &mr);
    char* isNull = (char*)keyBuf;
    char* pStart = (char*)keyBuf + nullFlagSize;

    SNode*  pNode;
    int32_t index = 0;
    FOREACH(pNode, groupNew) {
      SNode*  pNew = NULL;
      int32_t code = scalarCalculateConstants(pNode, &pNew);
      if (TSDB_CODE_SUCCESS == code) {
        REPLACE_NODE(pNew);
      } else {
        taosMemoryFree(keyBuf);
        nodesClearList(groupNew);
        // The reader was initialised for this table and must be released on this path as well.
        metaReaderClear(&mr);
        return code;
      }

      ASSERT(nodeType(pNew) == QUERY_NODE_VALUE);
      SValueNode* pValue = (SValueNode*)pNew;

      if (pValue->node.resType.type == TSDB_DATA_TYPE_NULL) {
        isNull[index++] = 1;
        continue;
      } else {
        isNull[index++] = 0;
        char* data = nodesGetValueFromNode(pValue);
        if (pValue->node.resType.type == TSDB_DATA_TYPE_JSON) {
          int32_t len = getJsonValueLen(data);
          memcpy(pStart, data, len);
          pStart += len;
        } else if (IS_VAR_DATA_TYPE(pValue->node.resType.type)) {
          memcpy(pStart, data, varDataTLen(data));
          pStart += varDataTLen(data);
        } else {
          memcpy(pStart, data, pValue->node.resType.bytes);
          pStart += pValue->node.resType.bytes;
        }
      }
    }
    int32_t  len = (int32_t)(pStart - (char*)keyBuf);
    uint64_t groupId = calcGroupId(keyBuf, len);
    taosHashPut(pTableListInfo->map, &(info->uid), sizeof(uint64_t), &groupId, sizeof(uint64_t));
    info->groupId = groupId;
    groupNum++;

    nodesClearList(groupNew);
    metaReaderClear(&mr);
  }
  taosMemoryFree(keyBuf);

  if (pTableListInfo->needSortTableByGroupId) {
    return sortTableGroup(pTableListInfo, groupNum);
  }

  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
4058
SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle,
4059
                                  uint64_t queryId, uint64_t taskId, STableListInfo* pTableListInfo, const char* pUser) {
4060 4061
  int32_t type = nodeType(pPhyNode);

X
Xiaoyu Wang 已提交
4062
  if (pPhyNode->pChildren == NULL || LIST_LENGTH(pPhyNode->pChildren) == 0) {
H
Haojun Liao 已提交
4063
    if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == type) {
dengyihao's avatar
dengyihao 已提交
4064
      STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode;
H
Haojun Liao 已提交
4065

wmmhello's avatar
wmmhello 已提交
4066
      int32_t code = createScanTableListInfo(pTableScanNode, pHandle, pTableListInfo, queryId, taskId);
D
dapan1121 已提交
4067
      if (code) {
wmmhello's avatar
wmmhello 已提交
4068
        pTaskInfo->code = code;
D
dapan1121 已提交
4069 4070
        return NULL;
      }
wmmhello's avatar
wmmhello 已提交
4071

wmmhello's avatar
wmmhello 已提交
4072
      code = extractTableSchemaVersion(pHandle, pTableScanNode->scan.uid, pTaskInfo);
X
Xiaoyu Wang 已提交
4073
      if (code) {
4074
        pTaskInfo->code = terrno;
wmmhello's avatar
wmmhello 已提交
4075 4076 4077
        return NULL;
      }

H
Haojun Liao 已提交
4078
      SOperatorInfo*  pOperator = createTableScanOperatorInfo(pTableScanNode, pHandle, pTaskInfo);
4079 4080
      STableScanInfo* pScanInfo = pOperator->info;
      pTaskInfo->cost.pRecoder = &pScanInfo->readRecorder;
S
slzhou 已提交
4081
      return pOperator;
L
Liu Jicong 已提交
4082

S
slzhou 已提交
4083 4084
    } else if (QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN == type) {
      STableMergeScanPhysiNode* pTableScanNode = (STableMergeScanPhysiNode*)pPhyNode;
wmmhello's avatar
wmmhello 已提交
4085 4086 4087 4088 4089 4090 4091 4092 4093
      int32_t code = createScanTableListInfo(pTableScanNode, pHandle, pTableListInfo, queryId, taskId);
      if(code){
        return NULL;
      }
      code = extractTableSchemaVersion(pHandle, pTableScanNode->scan.uid, pTaskInfo);
      if (code) {
        pTaskInfo->code = terrno;
        return NULL;
      }
wmmhello's avatar
wmmhello 已提交
4094

4095
      SOperatorInfo*  pOperator = createTableMergeScanOperatorInfo(pTableScanNode, pTableListInfo, pHandle, pTaskInfo, queryId, taskId);
4096

4097 4098 4099
      STableScanInfo* pScanInfo = pOperator->info;
      pTaskInfo->cost.pRecoder = &pScanInfo->readRecorder;
      return pOperator;
L
Liu Jicong 已提交
4100

H
Haojun Liao 已提交
4101
    } else if (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == type) {
4102
      return createExchangeOperatorInfo(pHandle->pMsgCb->clientRpc, (SExchangePhysiNode*)pPhyNode, pTaskInfo);
L
Liu Jicong 已提交
4103

H
Haojun Liao 已提交
4104
    } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN == type) {
5
54liuyao 已提交
4105
      STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode;
4106
      STimeWindowAggSupp   twSup = {
L
Liu Jicong 已提交
4107 4108 4109 4110
            .waterMark = pTableScanNode->watermark,
            .calTrigger = pTableScanNode->triggerType,
            .maxTs = INT64_MIN,
      };
L
Liu Jicong 已提交
4111
      if (pHandle) {
wmmhello's avatar
wmmhello 已提交
4112
        createScanTableListInfo(pTableScanNode, pHandle, pTableListInfo, queryId, taskId);
5
54liuyao 已提交
4113
      }
4114

wmmhello's avatar
wmmhello 已提交
4115
      SOperatorInfo* pOperator = createStreamScanOperatorInfo(pHandle, pTableScanNode, pTaskInfo, &twSup, queryId, taskId);
H
Haojun Liao 已提交
4116
      return pOperator;
L
Liu Jicong 已提交
4117

H
Haojun Liao 已提交
4118
    } else if (QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN == type) {
L
Liu Jicong 已提交
4119
      SSystemTableScanPhysiNode* pSysScanPhyNode = (SSystemTableScanPhysiNode*)pPhyNode;
4120
      return createSysTableScanOperatorInfo(pHandle, pSysScanPhyNode, pUser, pTaskInfo);
4121
    } else if (QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN == type) {
X
Xiaoyu Wang 已提交
4122
      STagScanPhysiNode* pScanPhyNode = (STagScanPhysiNode*)pPhyNode;
4123

4124
      int32_t code = getTableList(pHandle->meta, pHandle->vnode, pScanPhyNode, pTableListInfo);
4125
      if (code != TSDB_CODE_SUCCESS) {
4126
        pTaskInfo->code = terrno;
4127 4128 4129
        return NULL;
      }

4130
      return createTagScanOperatorInfo(pHandle, pScanPhyNode, pTableListInfo, pTaskInfo);
4131
    } else if (QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN == type) {
4132
      SBlockDistScanPhysiNode* pBlockNode = (SBlockDistScanPhysiNode*)pPhyNode;
4133 4134 4135
      pTableListInfo->pTableList = taosArrayInit(4, sizeof(STableKeyInfo));

      if (pBlockNode->tableType == TSDB_SUPER_TABLE) {
4136
        int32_t code = vnodeGetAllTableList(pHandle->vnode, pBlockNode->uid, pTableListInfo->pTableList);
4137 4138 4139 4140 4141
        if (code != TSDB_CODE_SUCCESS) {
          pTaskInfo->code = terrno;
          return NULL;
        }
      } else {  // Create one table group.
S
slzhou 已提交
4142
        STableKeyInfo info = {.lastKey = 0, .uid = pBlockNode->uid, .groupId = 0};
4143 4144 4145 4146 4147 4148 4149 4150 4151 4152 4153 4154 4155 4156 4157 4158 4159 4160 4161 4162 4163 4164
        taosArrayPush(pTableListInfo->pTableList, &info);
      }

      SQueryTableDataCond cond = {0};

      {
        cond.order = TSDB_ORDER_ASC;
        cond.numOfCols = 1;
        cond.colList = taosMemoryCalloc(1, sizeof(SColumnInfo));
        if (cond.colList == NULL) {
          terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
          return NULL;
        }

        cond.colList->colId = 1;
        cond.colList->type = TSDB_DATA_TYPE_TIMESTAMP;
        cond.colList->bytes = sizeof(TSKEY);

        cond.numOfTWindows = 1;
        cond.twindows = taosMemoryCalloc(1, sizeof(STimeWindow));
        cond.twindows[0] = (STimeWindow){.skey = INT64_MIN, .ekey = INT64_MAX};
        cond.suid = pBlockNode->suid;
H
Haojun Liao 已提交
4165
        cond.type = BLOCK_LOAD_OFFSET_ORDER;
4166
      }
H
Haojun Liao 已提交
4167 4168 4169

      STsdbReader* pReader = NULL;
      tsdbReaderOpen(pHandle->vnode, &cond, pTableListInfo->pTableList, &pReader, "");
4170 4171
      cleanupQueryTableDataCond(&cond);

4172
      return createDataBlockInfoScanOperator(pReader, pHandle, cond.suid, pBlockNode, pTaskInfo);
H
Haojun Liao 已提交
4173 4174 4175 4176 4177 4178 4179 4180 4181
    } else if (QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN == type) {
      SLastRowScanPhysiNode* pScanNode = (SLastRowScanPhysiNode*)pPhyNode;

//      int32_t code = createScanTableListInfo(pTableScanNode, pHandle, pTableListInfo, queryId, taskId);
//      if (code) {
//        pTaskInfo->code = code;
//        return NULL;
//      }

4182 4183
      int32_t code = extractTableSchemaVersion(pHandle, pScanNode->uid, pTaskInfo);

H
Haojun Liao 已提交
4184
      pTableListInfo->pTableList = taosArrayInit(4, sizeof(STableKeyInfo));
H
Haojun Liao 已提交
4185
      if (pScanNode->tableType == TSDB_SUPER_TABLE) {
4186
        code = vnodeGetAllTableList(pHandle->vnode, pScanNode->uid, pTableListInfo->pTableList);
H
Haojun Liao 已提交
4187 4188 4189 4190 4191 4192 4193 4194 4195
        if (code != TSDB_CODE_SUCCESS) {
          pTaskInfo->code = terrno;
          return NULL;
        }
      } else {  // Create one table group.
        STableKeyInfo info = {.lastKey = 0, .uid = pScanNode->uid, .groupId = 0};
        taosArrayPush(pTableListInfo->pTableList, &info);
      }

H
Haojun Liao 已提交
4196
      return createLastrowScanOperator(pScanNode, pHandle, pTableListInfo->pTableList, pTaskInfo);
H
Haojun Liao 已提交
4197 4198
    } else {
      ASSERT(0);
H
Haojun Liao 已提交
4199 4200 4201
    }
  }

4202 4203
  int32_t num = 0;
  size_t  size = LIST_LENGTH(pPhyNode->pChildren);
H
Haojun Liao 已提交
4204

4205
  SOperatorInfo** ops = taosMemoryCalloc(size, POINTER_BYTES);
dengyihao's avatar
dengyihao 已提交
4206
  for (int32_t i = 0; i < size; ++i) {
4207
    SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, i);
4208
    ops[i] = createOperatorTree(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableListInfo, pUser);
4209 4210 4211
    if (ops[i] == NULL) {
      return NULL;
    }
4212
  }
H
Haojun Liao 已提交
4213

4214
  SOperatorInfo* pOptr = NULL;
H
Haojun Liao 已提交
4215
  if (QUERY_NODE_PHYSICAL_PLAN_PROJECT == type) {
4216
    pOptr = createProjectOperatorInfo(ops[0], (SProjectPhysiNode*)pPhyNode, pTaskInfo);
4217
  } else if (QUERY_NODE_PHYSICAL_PLAN_HASH_AGG == type) {
H
Haojun Liao 已提交
4218 4219
    SAggPhysiNode* pAggNode = (SAggPhysiNode*)pPhyNode;
    SExprInfo*     pExprInfo = createExprInfo(pAggNode->pAggFuncs, pAggNode->pGroupKeys, &num);
4220
    SSDataBlock*   pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
H
Haojun Liao 已提交
4221

dengyihao's avatar
dengyihao 已提交
4222
    int32_t    numOfScalarExpr = 0;
4223 4224 4225 4226 4227
    SExprInfo* pScalarExprInfo = NULL;
    if (pAggNode->pExprs != NULL) {
      pScalarExprInfo = createExprInfo(pAggNode->pExprs, NULL, &numOfScalarExpr);
    }

H
Haojun Liao 已提交
4228 4229
    if (pAggNode->pGroupKeys != NULL) {
      SArray* pColList = extractColumnInfo(pAggNode->pGroupKeys);
dengyihao's avatar
dengyihao 已提交
4230
      pOptr = createGroupOperatorInfo(ops[0], pExprInfo, num, pResBlock, pColList, pAggNode->node.pConditions,
wmmhello's avatar
wmmhello 已提交
4231
                                      pScalarExprInfo, numOfScalarExpr, pTaskInfo);
H
Haojun Liao 已提交
4232
    } else {
dengyihao's avatar
dengyihao 已提交
4233 4234
      pOptr =
          createAggregateOperatorInfo(ops[0], pExprInfo, num, pResBlock, pScalarExprInfo, numOfScalarExpr, pTaskInfo);
H
Haojun Liao 已提交
4235
    }
X
Xiaoyu Wang 已提交
4236
  } else if (QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL == type || QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL == type) {
H
Haojun Liao 已提交
4237
    SIntervalPhysiNode* pIntervalPhyNode = (SIntervalPhysiNode*)pPhyNode;
H
Haojun Liao 已提交
4238

H
Haojun Liao 已提交
4239
    SExprInfo*   pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &num);
4240
    SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
H
Haojun Liao 已提交
4241

dengyihao's avatar
dengyihao 已提交
4242 4243 4244 4245 4246 4247
    SInterval interval = {.interval = pIntervalPhyNode->interval,
                          .sliding = pIntervalPhyNode->sliding,
                          .intervalUnit = pIntervalPhyNode->intervalUnit,
                          .slidingUnit = pIntervalPhyNode->slidingUnit,
                          .offset = pIntervalPhyNode->offset,
                          .precision = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->node.resType.precision};
H
Haojun Liao 已提交
4248

X
Xiaoyu Wang 已提交
4249 4250 4251 4252 4253
    STimeWindowAggSupp as = {
        .waterMark = pIntervalPhyNode->window.watermark,
        .calTrigger = pIntervalPhyNode->window.triggerType,
        .maxTs = INT64_MIN,
    };
4254
    ASSERT(as.calTrigger != STREAM_TRIGGER_MAX_DELAY);
4255

4256
    int32_t tsSlotId = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId;
L
Liu Jicong 已提交
4257
    bool    isStream = (QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL == type);
4258 4259
    pOptr = createIntervalOperatorInfo(ops[0], pExprInfo, num, pResBlock, &interval, tsSlotId, &as, pIntervalPhyNode,
                                       pTaskInfo, isStream);
4260

4261 4262
  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL == type) {
    SMergeAlignedIntervalPhysiNode* pIntervalPhyNode = (SMergeAlignedIntervalPhysiNode*)pPhyNode;
S
shenglian zhou 已提交
4263 4264 4265

    SExprInfo*   pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &num);
    SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
4266

S
shenglian zhou 已提交
4267 4268 4269 4270 4271 4272
    SInterval interval = {.interval = pIntervalPhyNode->interval,
                          .sliding = pIntervalPhyNode->sliding,
                          .intervalUnit = pIntervalPhyNode->intervalUnit,
                          .slidingUnit = pIntervalPhyNode->slidingUnit,
                          .offset = pIntervalPhyNode->offset,
                          .precision = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->node.resType.precision};
4273

S
shenglian zhou 已提交
4274
    int32_t tsSlotId = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId;
4275
    pOptr = createMergeAlignedIntervalOperatorInfo(ops[0], pExprInfo, num, pResBlock, &interval, tsSlotId, pTaskInfo);
S
shenglian zhou 已提交
4276
  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL == type) {
X
Xiaoyu Wang 已提交
4277
    SMergeIntervalPhysiNode* pIntervalPhyNode = (SMergeIntervalPhysiNode*)pPhyNode;
S
shenglian zhou 已提交
4278 4279 4280 4281 4282 4283 4284 4285 4286 4287 4288 4289 4290

    SExprInfo*   pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &num);
    SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);

    SInterval interval = {.interval = pIntervalPhyNode->interval,
                          .sliding = pIntervalPhyNode->sliding,
                          .intervalUnit = pIntervalPhyNode->intervalUnit,
                          .slidingUnit = pIntervalPhyNode->slidingUnit,
                          .offset = pIntervalPhyNode->offset,
                          .precision = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->node.resType.precision};

    int32_t tsSlotId = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId;
    pOptr = createMergeIntervalOperatorInfo(ops[0], pExprInfo, num, pResBlock, &interval, tsSlotId, pTaskInfo);
5
54liuyao 已提交
4291
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL == type) {
4292
    int32_t children = 0;
5
54liuyao 已提交
4293 4294
    pOptr = createStreamFinalIntervalOperatorInfo(ops[0], pPhyNode, pTaskInfo, children);
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL == type) {
4295
    int32_t children = 1;
5
54liuyao 已提交
4296
    pOptr = createStreamFinalIntervalOperatorInfo(ops[0], pPhyNode, pTaskInfo, children);
H
Haojun Liao 已提交
4297
  } else if (QUERY_NODE_PHYSICAL_PLAN_SORT == type) {
4298
    pOptr = createSortOperatorInfo(ops[0], (SSortPhysiNode*)pPhyNode, pTaskInfo);
S
shenglian zhou 已提交
4299 4300
  } else if (QUERY_NODE_PHYSICAL_PLAN_GROUP_SORT == type) {
    pOptr = createGroupSortOperatorInfo(ops[0], (SGroupSortPhysiNode*)pPhyNode, pTaskInfo);
X
Xiaoyu Wang 已提交
4301
  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE == type) {
4302
    SMergePhysiNode* pMergePhyNode = (SMergePhysiNode*)pPhyNode;
4303
    pOptr = createMultiwayMergeOperatorInfo(ops, size, pMergePhyNode, pTaskInfo);
4304
  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION == type) {
H
Haojun Liao 已提交
4305 4306
    SSessionWinodwPhysiNode* pSessionNode = (SSessionWinodwPhysiNode*)pPhyNode;

X
Xiaoyu Wang 已提交
4307 4308
    STimeWindowAggSupp as = {.waterMark = pSessionNode->window.watermark,
                             .calTrigger = pSessionNode->window.triggerType};
4309

H
Haojun Liao 已提交
4310
    SExprInfo*   pExprInfo = createExprInfo(pSessionNode->window.pFuncs, NULL, &num);
4311
    SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
4312 4313
    int32_t      tsSlotId = ((SColumnNode*)pSessionNode->window.pTspk)->slotId;

X
Xiaoyu Wang 已提交
4314 4315
    pOptr =
        createSessionAggOperatorInfo(ops[0], pExprInfo, num, pResBlock, pSessionNode->gap, tsSlotId, &as, pTaskInfo);
4316
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION == type) {
4317 4318 4319 4320 4321 4322 4323
    pOptr = createStreamSessionAggOperatorInfo(ops[0], pPhyNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION == type) {
    int32_t children = 0;
    pOptr = createStreamFinalSessionAggOperatorInfo(ops[0], pPhyNode, pTaskInfo, children);
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION == type) {
    int32_t children = 1;
    pOptr = createStreamFinalSessionAggOperatorInfo(ops[0], pPhyNode, pTaskInfo, children);
H
Haojun Liao 已提交
4324
  } else if (QUERY_NODE_PHYSICAL_PLAN_PARTITION == type) {
4325
    pOptr = createPartitionOperatorInfo(ops[0], (SPartitionPhysiNode*)pPhyNode, pTaskInfo);
4326
  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE == type) {
dengyihao's avatar
dengyihao 已提交
4327
    SStateWinodwPhysiNode* pStateNode = (SStateWinodwPhysiNode*)pPhyNode;
4328

4329 4330
    STimeWindowAggSupp as = {.waterMark = pStateNode->window.watermark, .calTrigger = pStateNode->window.triggerType};

dengyihao's avatar
dengyihao 已提交
4331
    SExprInfo*   pExprInfo = createExprInfo(pStateNode->window.pFuncs, NULL, &num);
4332
    SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
4333 4334
    int32_t      tsSlotId = ((SColumnNode*)pStateNode->window.pTspk)->slotId;

4335
    SColumnNode* pColNode = (SColumnNode*)((STargetNode*)pStateNode->pStateKey)->pExpr;
X
Xiaoyu Wang 已提交
4336
    SColumn      col = extractColumnFromColumnNode(pColNode);
4337
    pOptr = createStatewindowOperatorInfo(ops[0], pExprInfo, num, pResBlock, &as, tsSlotId, &col, pTaskInfo);
4338
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE == type) {
5
54liuyao 已提交
4339
    pOptr = createStreamStateAggOperatorInfo(ops[0], pPhyNode, pTaskInfo);
4340
  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN == type) {
4341
    pOptr = createMergeJoinOperatorInfo(ops, size, (SJoinPhysiNode*)pPhyNode, pTaskInfo);
4342
  } else if (QUERY_NODE_PHYSICAL_PLAN_FILL == type) {
4343
    pOptr = createFillOperatorInfo(ops[0], (SFillPhysiNode*)pPhyNode, false, pTaskInfo);
H
Haojun Liao 已提交
4344 4345
  } else if (QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC == type) {
    pOptr = createIndefinitOutputOperatorInfo(ops[0], pPhyNode, pTaskInfo);
4346 4347
  } else if (QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC == type) {
    pOptr = createTimeSliceOperatorInfo(ops[0], pPhyNode, pTaskInfo);
H
Haojun Liao 已提交
4348 4349
  } else {
    ASSERT(0);
H
Haojun Liao 已提交
4350
  }
4351 4352 4353

  taosMemoryFree(ops);
  return pOptr;
4354
}
H
Haojun Liao 已提交
4355

4356
int32_t compareTimeWindow(const void* p1, const void* p2, const void* param) {
dengyihao's avatar
dengyihao 已提交
4357 4358 4359
  const SQueryTableDataCond* pCond = param;
  const STimeWindow*         pWin1 = p1;
  const STimeWindow*         pWin2 = p2;
4360 4361 4362 4363 4364 4365 4366 4367
  if (pCond->order == TSDB_ORDER_ASC) {
    return pWin1->skey - pWin2->skey;
  } else if (pCond->order == TSDB_ORDER_DESC) {
    return pWin2->skey - pWin1->skey;
  }
  return 0;
}

H
Haojun Liao 已提交
4368
/*
 * Builds an array of SColumn descriptors from a list of target nodes.
 *
 * Targets whose expression is a column node are converted with extractColumnFromColumnNode(); targets whose
 * expression is a value node get a descriptor that uses the target slot id as both slot id and column id.
 * Targets with any other expression type are skipped.
 *
 * Returns the new array, or NULL with terrno set to TSDB_CODE_OUT_OF_MEMORY when the array cannot be created.
 */
SArray* extractColumnInfo(SNodeList* pNodeList) {
  size_t  numOfCols = LIST_LENGTH(pNodeList);
  SArray* pColumns = taosArrayInit(numOfCols, sizeof(SColumn));
  if (NULL == pColumns) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  for (int32_t idx = 0; idx < numOfCols; ++idx) {
    STargetNode* pTarget = (STargetNode*)nodesListGetNode(pNodeList, idx);

    switch (nodeType(pTarget->pExpr)) {
      case QUERY_NODE_COLUMN: {
        SColumn col = extractColumnFromColumnNode((SColumnNode*)pTarget->pExpr);
        taosArrayPush(pColumns, &col);
        break;
      }
      case QUERY_NODE_VALUE: {
        SValueNode* pValue = (SValueNode*)pTarget->pExpr;

        // NOTE(review): `type` is read from node.type while the other attributes come from node.resType;
        // confirm that this is the intended field.
        SColumn col = {0};
        col.slotId = pTarget->slotId;
        col.colId = pTarget->slotId;
        col.type = pValue->node.type;
        col.bytes = pValue->node.resType.bytes;
        col.scale = pValue->node.resType.scale;
        col.precision = pValue->node.resType.precision;

        taosArrayPush(pColumns, &col);
        break;
      }
      default:
        // other expression kinds contribute no column
        break;
    }
  }

  return pColumns;
}

H
Haojun Liao 已提交
4401
/*
 * Resolves the table list for a table scan node and opens a tsdb reader on it.
 *
 * pTableScanNode: scan node that supplies the scan description and the query condition.
 * pHandle:        read handle providing the meta and vnode handles.
 * pTableListInfo: filled in by getTableList(); its pTableList is handed to the reader.
 * idstr:          identifier used for logging and passed through to tsdbReaderOpen().
 *
 * Returns the reader on success. Returns NULL with terrno set to the failure code on error, and NULL with
 * terrno set to 0 when no table qualifies for the query.
 */
STsdbReader* doCreateDataReader(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle, STableListInfo* pTableListInfo, const char* idstr) {
  STsdbReader*        pReader = NULL;
  SQueryTableDataCond cond = {0};

  int32_t code = getTableList(pHandle->meta, pHandle->vnode, &pTableScanNode->scan, pTableListInfo);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  if (taosArrayGetSize(pTableListInfo->pTableList) == 0) {
    // not an error: report "nothing to read" through a NULL reader and a zero terrno
    code = 0;
    qDebug("no table qualified for query, %s", idstr);
    goto _error;
  }

  code = initQueryTableDataCond(&cond, pTableScanNode);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  code = tsdbReaderOpen(pHandle->vnode, &cond, pTableListInfo->pTableList, &pReader, idstr);

  // The condition is only needed while opening the reader; release it whether or not the open succeeded so
  // that a failed open does not leak it.
  cleanupQueryTableDataCond(&cond);

  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  return pReader;

_error:
  terrno = code;
  return NULL;
}

L
Liu Jicong 已提交
4434 4435 4436 4437 4438 4439 4440 4441 4442 4443 4444 4445 4446 4447 4448 4449 4450 4451 4452 4453
/*
 * Walks down a single-child operator chain until the stream scan operator is reached and hands back the
 * table scan info of its snapshot read operator through ppInfo.
 *
 * Returns 0 on success, or TSDB_CODE_QRY_APP_ERROR when the chain ends without a stream scan operator or
 * an operator with more than one downstream is met.
 */
static int32_t extractTbscanInStreamOpTree(SOperatorInfo* pOperator, STableScanInfo** ppInfo) {
  SOperatorInfo* pCur = pOperator;

  while (pCur->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
    if (pCur->numOfDownstream == 0) {
      qError("failed to find stream scan operator");
      return TSDB_CODE_QRY_APP_ERROR;
    }

    if (pCur->numOfDownstream > 1) {
      qError("join not supported for stream block scan");
      return TSDB_CODE_QRY_APP_ERROR;
    }

    pCur = pCur->pDownstream[0];
  }

  SStreamBlockScanInfo* pStreamScan = pCur->info;
  ASSERT(pStreamScan->pSnapshotReadOp->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN);
  *ppInfo = pStreamScan->pSnapshotReadOp->info;
  return 0;
}

4454 4455 4456 4457 4458 4459 4460 4461 4462 4463 4464 4465 4466 4467 4468 4469 4470 4471 4472 4473 4474 4475
/*
 * Follows a chain of single-child physical plan nodes down to its leaf and returns that leaf through ppNode
 * when it is a table scan node.
 *
 * Returns 0 on success. Asserts, sets terrno to TSDB_CODE_QRY_APP_ERROR and returns -1 when the leaf is not a
 * table scan node or when a node on the way has more than one child.
 */
int32_t extractTableScanNode(SPhysiNode* pNode, STableScanPhysiNode** ppNode) {
  SPhysiNode* pCur = pNode;

  for (;;) {
    bool isLeaf = (pCur->pChildren == NULL) || (LIST_LENGTH(pCur->pChildren) == 0);
    if (isLeaf) {
      if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN != pCur->type) {
        ASSERT(0);
        terrno = TSDB_CODE_QRY_APP_ERROR;
        return -1;
      }

      *ppNode = (STableScanPhysiNode*)pCur;
      return 0;
    }

    if (LIST_LENGTH(pCur->pChildren) != 1) {
      ASSERT(0);
      terrno = TSDB_CODE_QRY_APP_ERROR;
      return -1;
    }

    pCur = (SPhysiNode*)nodesListGetNode(pCur->pChildren, 0);
  }
}

L
Liu Jicong 已提交
4476 4477 4478 4479 4480
/*
 * Closes the data reader of the table scan that feeds a stream scan operator and opens a new one from the
 * table scan node found in the sub-plan.
 *
 * pOperator: root of the operator chain that contains the stream scan operator.
 * plan:      sub-plan whose node chain ends in the table scan node.
 * pHandle:   read handle used to open the new reader.
 * uid, ts:   currently unused (see the TODO below).
 *
 * Returns 0 on success, -1 when the table scan info or the table scan node cannot be located, and
 * TSDB_CODE_QRY_APP_ERROR when the new reader cannot be created.
 */
int32_t rebuildReader(SOperatorInfo* pOperator, SSubplan* plan, SReadHandle* pHandle, int64_t uid, int64_t ts) {
  STableScanInfo* pTableScanInfo = NULL;
  if (extractTbscanInStreamOpTree(pOperator, &pTableScanInfo) < 0) {
    return -1;
  }

  STableScanPhysiNode* pNode = NULL;
  if (extractTableScanNode(plan->pNode, &pNode) < 0) {
    ASSERT(0);
    // Do not fall through when assertions are compiled out: pNode is still NULL here.
    return -1;
  }

  tsdbReaderClose(pTableScanInfo->dataReader);

  // NOTE(review): `info` is populated by doCreateDataReader() and never released in this function; confirm
  // whether the reader takes over the table list or whether it has to be destroyed here.
  STableListInfo info = {0};
  pTableScanInfo->dataReader = doCreateDataReader(pNode, pHandle, &info, NULL);
  if (pTableScanInfo->dataReader == NULL) {
    ASSERT(0);
    qError("failed to create data reader");
    return TSDB_CODE_QRY_APP_ERROR;
  }
  // TODO: set uid and ts to data reader
  return 0;
}

C
Cary Xu 已提交
4500
/*
 * Serialises the result rows of an operator tree into one buffer, visiting the operator itself first and then
 * each of its downstream operators in order.
 *
 * Buffer layout: a leading int32_t holding the total size in bytes (header included), followed by the
 * payloads produced by each operator's encodeResultRow callback, appended back to back.
 *
 * ops:          operator to encode; operators without an encodeResultRow callback contribute nothing.
 * result:       in/out buffer pointer; allocated on first use and grown afterwards. Set to NULL when the
 *               callback or a reallocation fails.
 * length:       out, total size stored in the buffer header.
 * nOptrWithVal: in/out counter, incremented for every operator that produced a non-empty payload.
 *
 * Returns TDB_CODE_SUCCESS, TSDB_CODE_TSC_INVALID_INPUT when an output argument is NULL for an operator that
 * has a callback, TSDB_CODE_OUT_OF_MEMORY on allocation failure, or the callback's error code.
 */
int32_t encodeOperator(SOperatorInfo* ops, char** result, int32_t* length, int32_t* nOptrWithVal) {
  int32_t code = TDB_CODE_SUCCESS;

  if (ops->fpSet.encodeResultRow) {
    if (NULL == result || NULL == length || NULL == nOptrWithVal) {
      return TSDB_CODE_TSC_INVALID_INPUT;
    }

    char*   pPayload = NULL;
    int32_t payloadLen = 0;
    code = ops->fpSet.encodeResultRow(ops, &pPayload, &payloadLen);
    if (code != TDB_CODE_SUCCESS) {
      if (*result != NULL) {
        taosMemoryFree(*result);
        *result = NULL;
      }
      return code;
    }

    if (payloadLen == 0) {
      // nothing was produced by this operator; only its downstream operators are visited
      ASSERT(!pPayload);
    } else {
      ++(*nOptrWithVal);

      ASSERT(payloadLen >= 0);

      if (NULL == *result) {
        // first payload: allocate header + payload and initialise the header
        *result = (char*)taosMemoryCalloc(1, payloadLen + sizeof(int32_t));
        if (NULL == *result) {
          taosMemoryFree(pPayload);
          return TSDB_CODE_OUT_OF_MEMORY;
        }
        memcpy(*result + sizeof(int32_t), pPayload, payloadLen);
        *(int32_t*)(*result) = payloadLen + sizeof(int32_t);
      } else {
        // later payloads: grow the buffer and append after the bytes already stored
        int32_t usedBytes = *(int32_t*)(*result);
        char*   pGrown = (char*)taosMemoryRealloc(*result, usedBytes + payloadLen);
        if (NULL == pGrown) {
          taosMemoryFree(pPayload);
          taosMemoryFree(*result);
          *result = NULL;
          return TSDB_CODE_OUT_OF_MEMORY;
        }
        *result = pGrown;
        memcpy(*result + usedBytes, pPayload, payloadLen);
        *(int32_t*)(*result) += payloadLen;
      }

      taosMemoryFree(pPayload);
      *length = *(int32_t*)(*result);
    }
  }

  for (int32_t idx = 0; idx < ops->numOfDownstream; ++idx) {
    code = encodeOperator(ops->pDownstream[idx], result, length, nOptrWithVal);
    if (code != TDB_CODE_SUCCESS) {
      return code;
    }
  }

  return TDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
4560
/*
 * Restores the result rows of an operator tree from a buffer, visiting the operator itself first and then
 * each of its downstream operators in order.
 *
 * ops:    operator to decode into; operators without a decodeResultRow callback consume nothing.
 * result: buffer whose first int32_t is the total size; the operator payload follows that header.
 * length: expected total size, asserted to match the header.
 *
 * After an operator consumed its payload, the remaining size is written into the buffer just in front of the
 * next payload, so the caller's buffer is modified while decoding even though it is passed as const.
 *
 * Returns TDB_CODE_SUCCESS, TSDB_CODE_TSC_INVALID_INPUT when an operator with a callback has no data left, or
 * the callback's error code.
 */
int32_t decodeOperator(SOperatorInfo* ops, const char* result, int32_t length) {
  int32_t code = TDB_CODE_SUCCESS;

  if (ops->fpSet.decodeResultRow) {
    if (NULL == result) {
      return TSDB_CODE_TSC_INVALID_INPUT;
    }

    ASSERT(length == *(int32_t*)result);

    const char* pPayload = result + sizeof(int32_t);
    code = ops->fpSet.decodeResultRow(ops, (char*)pPayload);
    if (code != TDB_CODE_SUCCESS) {
      return code;
    }

    int32_t totalLength = *(int32_t*)result;
    int32_t dataLength = *(int32_t*)pPayload;

    if (totalLength != dataLength + sizeof(int32_t)) {
      // more payloads follow: move forward and store the remaining size as the new header
      result += dataLength;
      *(int32_t*)(result) = totalLength - dataLength;
      length = totalLength - dataLength;
    } else {
      // this payload was the last one
      result = NULL;
      length = 0;
    }
  }

  for (int32_t idx = 0; idx < ops->numOfDownstream; ++idx) {
    code = decodeOperator(ops->pDownstream[idx], result, length);
    if (code != TDB_CODE_SUCCESS) {
      return code;
    }
  }

  return TDB_CODE_SUCCESS;
}

4597
/*
 * Creates the extra parameter object a data sink needs, depending on the sink node type.
 *
 * For QUERY_NODE_PHYSICAL_PLAN_DELETE an SDeleterParam is allocated, filled with the super table uid and the
 * uid of every table in the task's table list, and stored in *pParam. For every other node type *pParam is
 * left untouched.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when an allocation fails.
 */
int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, qTaskInfo_t* pTaskInfo) {
  SExecTaskInfo* pTask = *(SExecTaskInfo**)pTaskInfo;

  if (pNode->type != QUERY_NODE_PHYSICAL_PLAN_DELETE) {
    return TSDB_CODE_SUCCESS;
  }

  SDeleterParam* pDeleter = taosMemoryCalloc(1, sizeof(SDeleterParam));
  if (pDeleter == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  int32_t numOfTables = taosArrayGetSize(pTask->tableqinfoList.pTableList);
  pDeleter->suid = pTask->tableqinfoList.suid;
  pDeleter->pUidList = taosArrayInit(numOfTables, sizeof(uint64_t));
  if (pDeleter->pUidList == NULL) {
    taosMemoryFree(pDeleter);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  for (int32_t idx = 0; idx < numOfTables; ++idx) {
    STableKeyInfo* pKeyInfo = taosArrayGet(pTask->tableqinfoList.pTableList, idx);
    taosArrayPush(pDeleter->pUidList, &pKeyInfo->uid);
  }

  *pParam = pDeleter;
  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
4628
/*
 * Creates an execution task for a sub-plan and builds its operator tree.
 *
 * pPlan:     sub-plan supplying the query id, db name, tag conditions, root physical node and user.
 * pTaskInfo: out, receives the new task; reset to NULL when the function fails.
 * pHandle:   read handle, stored in the task and used while building the operator tree.
 * taskId:    id of the task.
 * sql:       stored in the task as-is (not copied); doDestroyTask() later frees it together with the task.
 * model:     execution model forwarded to createExecTaskInfo().
 *
 * Returns TSDB_CODE_SUCCESS on success. On failure the task is released, terrno is set to the returned code
 * and that code is never TSDB_CODE_SUCCESS.
 */
int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId,
                               const char* sql, EOPTR_EXEC_MODEL model) {
  uint64_t queryId = pPlan->id.queryId;

  int32_t code = TSDB_CODE_SUCCESS;
  *pTaskInfo = createExecTaskInfo(queryId, taskId, model, pPlan->dbFName);
  if (*pTaskInfo == NULL) {
    code = TSDB_CODE_QRY_OUT_OF_MEMORY;
    goto _complete;
  }

  (*pTaskInfo)->sql = sql;
  (*pTaskInfo)->tableqinfoList.pTagCond = pPlan->pTagCond;
  (*pTaskInfo)->tableqinfoList.pTagIndexCond = pPlan->pTagIndexCond;
  (*pTaskInfo)->pRoot =
      createOperatorTree(pPlan->pNode, *pTaskInfo, pHandle, queryId, taskId, &(*pTaskInfo)->tableqinfoList, pPlan->user);

  // The handle is recorded after the tree has been built, as before.
  (*pTaskInfo)->pHandle = pHandle;

  if (NULL == (*pTaskInfo)->pRoot) {
    code = (*pTaskInfo)->code;
    if (code == TSDB_CODE_SUCCESS) {
      // Some failure paths of the tree builder only set terrno. Never report success together with a released
      // task: fall back to terrno, then to a generic error.
      code = (terrno != TSDB_CODE_SUCCESS) ? terrno : TSDB_CODE_QRY_APP_ERROR;
    }
    goto _complete;
  }

  return code;

_complete:
  // NOTE(review): only the task structure itself is released here; members allocated by createExecTaskInfo()
  // or by a partially built operator tree are not - confirm whether a full teardown is needed.
  taosMemoryFreeClear(*pTaskInfo);
  terrno = code;
  return code;
}

wmmhello's avatar
wmmhello 已提交
4662 4663 4664
/*
 * Releases the containers owned by a table list: the table array, the hash map and the group list. When
 * needSortTableByGroupId is set, every element of the group list is itself an array and is destroyed first.
 * All three members are reset to NULL afterwards so the structure holds no dangling pointers.
 */
static void doDestroyTableList(STableListInfo* pTableqinfoList) {
  taosArrayDestroy(pTableqinfoList->pTableList);
  taosHashCleanup(pTableqinfoList->map);
  if (pTableqinfoList->needSortTableByGroupId) {
    for (int32_t i = 0; i < taosArrayGetSize(pTableqinfoList->pGroupList); i++) {
      SArray* tmp = taosArrayGetP(pTableqinfoList->pGroupList, i);
      taosArrayDestroy(tmp);
    }
  }
  taosArrayDestroy(pTableqinfoList->pGroupList);

  pTableqinfoList->pTableList = NULL;
  pTableqinfoList->map = NULL;
  // previously left pointing at the destroyed array
  pTableqinfoList->pGroupList = NULL;
}

L
Liu Jicong 已提交
4677
/*
 * Tears down an execution task: its table list, its operator tree, the schema version names, the sql text,
 * the id string and finally the task structure itself. The pointer must not be used afterwards.
 */
void doDestroyTask(SExecTaskInfo* pTaskInfo) {
  qDebug("%s execTask is freed", GET_TASKID(pTaskInfo));

  // containers and operators first
  doDestroyTableList(&pTaskInfo->tableqinfoList);
  destroyOperatorInfo(pTaskInfo->pRoot);

  // then the strings owned by the task
  taosMemoryFree(pTaskInfo->schemaVer.tablename);
  taosMemoryFree(pTaskInfo->schemaVer.dbname);
  taosMemoryFreeClear(pTaskInfo->id.str);
  taosMemoryFreeClear(pTaskInfo->sql);

  taosMemoryFreeClear(pTaskInfo);
}

/*
 * Writes a tag value into an output buffer of `bytes` bytes.
 *
 * A NULL value is stored as the type's null representation. Fixed-length values are copied as `bytes` bytes.
 * Variable-length values are copied whole when they fit; otherwise the payload is cut so that header plus
 * payload occupy at most `bytes` bytes.
 */
static void doSetTagValueToResultBuf(char* output, const char* val, int16_t type, int16_t bytes) {
  if (val == NULL) {
    setNull(output, type, bytes);
    return;
  }

  if (!IS_VAR_DATA_TYPE(type)) {
    memcpy(output, val, bytes);
    return;
  }

  if (varDataTLen(val) > bytes) {
    // the value does not fit into the output slot: keep only as much payload as there is room for
    int32_t room = bytes - VARSTR_HEADER_SIZE;
    int32_t copyLen = varDataLen(val);
    if (copyLen > room) {
      copyLen = room;
    }
    memcpy(varDataVal(output), varDataVal(val), copyLen);
    varDataSetLen(output, copyLen);
  } else {
    varDataCopy(output, val);
  }
}

/*
 * Estimates the buffer, in bytes, reserved for a query over `numOfTables` tables:
 * 1.5 * sizeof(STableQueryInfo) per table, truncated to an integer.
 */
static int64_t getQuerySupportBufSize(size_t numOfTables) {
  const size_t perTableBytes = sizeof(STableQueryInfo);
  return (int64_t)(perTableBytes * 1.5 * numOfTables);
}

/*
 * Reserves query buffer for `numOfTables` tables out of the global budget tsQueryBufferSizeBytes.
 *
 * A negative budget means no limit is enforced. A zero budget disables query processing. Otherwise the
 * required amount is subtracted with a compare-and-swap that is retried until it succeeds or the budget is
 * found to be too small.
 *
 * Returns TSDB_CODE_SUCCESS when the reservation was made (or no limit applies), and
 * TSDB_CODE_QRY_NOT_ENOUGH_BUFFER otherwise.
 */
int32_t checkForQueryBuf(size_t numOfTables) {
  int64_t required = getQuerySupportBufSize(numOfTables);

  if (tsQueryBufferSizeBytes < 0) {
    return TSDB_CODE_SUCCESS;
  }

  if (!(tsQueryBufferSizeBytes > 0)) {
    // disable query processing if the value of tsQueryBufferSize is zero.
    return TSDB_CODE_QRY_NOT_ENOUGH_BUFFER;
  }

  for (;;) {
    int64_t available = tsQueryBufferSizeBytes;
    int64_t left = available - required;
    if (left < 0) {
      return TSDB_CODE_QRY_NOT_ENOUGH_BUFFER;
    }

    // retry when the budget was changed between the read and the swap
    if (atomic_val_compare_exchange_64(&tsQueryBufferSizeBytes, available, left) == available) {
      return TSDB_CODE_SUCCESS;
    }
  }
}

/*
 * Returns the buffer estimated for `numOfTables` tables to the global budget tsQueryBufferSizeBytes.
 * Nothing is done when the budget is negative, i.e. when no limit is enforced.
 */
void releaseQueryBuf(size_t numOfTables) {
  if (tsQueryBufferSizeBytes >= 0) {
    // give back the amount computed by the same estimate used for the reservation
    int64_t released = getQuerySupportBufSize(numOfTables);
    atomic_add_fetch_64(&tsQueryBufferSizeBytes, released);
  }
}
D
dapan1121 已提交
4751

dengyihao's avatar
dengyihao 已提交
4752 4753
/*
 * Collects explain statistics for an operator and, recursively, all of its downstream operators.
 *
 * operatorInfo: operator to report on.
 * pRes:         in/out array of SExplainExecInfo; grown in steps of 10 entries when full. Set to NULL (and
 *               released) when growing fails or a downstream operator reports an error.
 * capacity:     in/out number of entries *pRes has room for.
 * resNum:       in/out number of entries already filled; one entry is appended per visited operator.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_QRY_OUT_OF_MEMORY when the array cannot be grown, or the error code
 * reported by an operator's getExplainFn callback or by a downstream operator.
 */
int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SExplainExecInfo** pRes, int32_t* capacity,
                                   int32_t* resNum) {
  if (*resNum >= *capacity) {
    *capacity += 10;

    // Grow through a temporary so the old buffer is not lost when the reallocation fails.
    SExplainExecInfo* pGrown = taosMemoryRealloc(*pRes, (*capacity) * sizeof(SExplainExecInfo));
    if (NULL == pGrown) {
      qError("malloc %d failed", (*capacity) * (int32_t)sizeof(SExplainExecInfo));
      // callers still observe *pRes == NULL on failure, but the old buffer is no longer leaked
      taosMemoryFreeClear(*pRes);
      return TSDB_CODE_QRY_OUT_OF_MEMORY;
    }
    *pRes = pGrown;
  }

  SExplainExecInfo* pInfo = &(*pRes)[*resNum];

  pInfo->numOfRows = operatorInfo->resultInfo.totalRows;
  pInfo->startupCost = operatorInfo->cost.openCost;
  pInfo->totalCost = operatorInfo->cost.totalCost;

  if (operatorInfo->fpSet.getExplainFn) {
    int32_t code = operatorInfo->fpSet.getExplainFn(operatorInfo, &pInfo->verboseInfo, &pInfo->verboseLen);
    if (code) {
      qError("%s operator getExplainFn failed, code:%s", GET_TASKID(operatorInfo->pTaskInfo), tstrerror(code));
      return code;
    }
  } else {
    pInfo->verboseLen = 0;
    pInfo->verboseInfo = NULL;
  }

  ++(*resNum);

  int32_t code = 0;
  for (int32_t i = 0; i < operatorInfo->numOfDownstream; ++i) {
    code = getOperatorExplainExecInfo(operatorInfo->pDownstream[i], pRes, capacity, resNum);
    if (code) {
      taosMemoryFreeClear(*pRes);
      // report the real failure instead of always claiming out-of-memory
      return code;
    }
  }

  return TSDB_CODE_SUCCESS;
}
5
54liuyao 已提交
4794

L
Liu Jicong 已提交
4795
/*
 * Initialises the supporter structure used by stream aggregate operators.
 *
 * pSup:        structure to fill in.
 * pKey:        key passed to createDiskbasedBuf() for the result buffer.
 * pCtx:        function contexts; each of the numOfOutput entries gets its pBuf pointed at the result buffer.
 * numOfOutput: number of entries in pCtx.
 * size:        stored as pSup->valueSize.
 *
 * Returns TSDB_CODE_OUT_OF_MEMORY when the key buffer or the result hash cannot be allocated (nothing else is
 * set up in that case); otherwise the result of createDiskbasedBuf().
 */
int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, const char* pKey, SqlFunctionCtx* pCtx, int32_t numOfOutput,
                               int32_t size) {
  pSup->resultRowSize = getResultRowSize(pCtx, numOfOutput);
  pSup->keySize = sizeof(int64_t) + sizeof(TSKEY);
  pSup->pKeyBuf = taosMemoryCalloc(1, pSup->keySize);

  _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
  pSup->pResultRows = taosHashInit(1024, hashFn, false, HASH_NO_LOCK);

  if (NULL == pSup->pKeyBuf || NULL == pSup->pResultRows) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  pSup->valueSize = size;
  pSup->pScanWindow = taosArrayInit(4, sizeof(STimeWindow));

  // start from 4 KiB pages and double until one page holds at least four result rows
  int32_t pageSize;
  for (pageSize = 4096; pageSize < pSup->resultRowSize * 4; pageSize <<= 1u) {
  }

  // at least four pages need to be in buffer
  int32_t bufSize = 4096 * 256;
  if (pageSize >= bufSize) {
    bufSize = pageSize * 4;
  }

  int32_t code = createDiskbasedBuf(&pSup->pResultBuf, pageSize, bufSize, pKey, TD_TMP_DIR_PATH);

  // every function context shares the same result buffer
  for (int32_t idx = 0; idx < numOfOutput; ++idx) {
    pCtx[idx].pBuf = pSup->pResultBuf;
  }
  return code;
}