executorimpl.c 171.3 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
15

H
Haojun Liao 已提交
16 17
#include "filter.h"
#include "function.h"
18 19
#include "functionMgt.h"
#include "os.h"
H
Haojun Liao 已提交
20
#include "querynodes.h"
21
#include "tfill.h"
dengyihao's avatar
dengyihao 已提交
22
#include "tname.h"
X
Xiaoyu Wang 已提交
23
#include "tref.h"
24

H
Haojun Liao 已提交
25
#include "tdatablock.h"
26
#include "tglobal.h"
H
Haojun Liao 已提交
27
#include "tmsg.h"
H
Haojun Liao 已提交
28
#include "tsort.h"
29
#include "ttime.h"
H
Haojun Liao 已提交
30

31
#include "executorimpl.h"
dengyihao's avatar
dengyihao 已提交
32
#include "index.h"
33
#include "query.h"
34 35
#include "tcompare.h"
#include "tcompression.h"
H
Haojun Liao 已提交
36
#include "thash.h"
37
#include "ttypes.h"
dengyihao's avatar
dengyihao 已提交
38
#include "vnode.h"
39

H
Haojun Liao 已提交
40
// True when the runtime's scanFlag equals MAIN_SCAN.
#define IS_MAIN_SCAN(runtime)          ((runtime)->scanFlag == MAIN_SCAN)
41 42 43 44 45 46
// Assigns REVERSE_SCAN to the runtime's scanFlag (the expression yields the assigned value).
#define SET_REVERSE_SCAN_FLAG(runtime) ((runtime)->scanFlag = REVERSE_SCAN)

// Maps a scan order to its step constant: QUERY_ASC_FORWARD_STEP for TSDB_ORDER_ASC,
// QUERY_DESC_FORWARD_STEP for any other order value.
#define GET_FORWARD_DIRECTION_FACTOR(ord) (((ord) == TSDB_ORDER_ASC) ? QUERY_ASC_FORWARD_STEP : QUERY_DESC_FORWARD_STEP)

#if 0
// Disabled allocation wrappers that fail on purpose depending on a random draw
// (presumably used to exercise allocation-failure handling -- confirm before re-enabling).
// When enabled, the #defines below route malloc/calloc/realloc through them.

// Returns NULL when the draw is a multiple of 1000, otherwise forwards to taosMemoryMalloc().
static UNUSED_FUNC void *u_malloc (size_t __size) {
  uint32_t draw = taosRand();
  return (draw % 1000 <= 0) ? NULL : taosMemoryMalloc(__size);
}

// Returns NULL when the draw is a multiple of 1000, otherwise forwards to taosMemoryCalloc().
static UNUSED_FUNC void* u_calloc(size_t num, size_t __size) {
  uint32_t draw = taosRand();
  return (draw % 1000 <= 0) ? NULL : taosMemoryCalloc(num, __size);
}

// Returns NULL when the draw modulo 5 is 0 or 1, otherwise forwards to taosMemoryRealloc().
static UNUSED_FUNC void* u_realloc(void* p, size_t __size) {
  uint32_t draw = taosRand();
  return (draw % 5 <= 1) ? NULL : taosMemoryRealloc(p, __size);
}

#define calloc  u_calloc
#define malloc  u_malloc
#define realloc u_realloc
#endif

H
Haojun Liao 已提交
79 80 81 82 83 84
// Jumps back to the setjmp() context stored in _obj, delivering _c as the jump value.
// The value must not be -1 (checked with assert). Note that _c is evaluated twice.
//
// The macro deliberately does NOT end with a semicolon: the caller writes it, so the
// expansion is a single statement and stays valid inside an unbraced if/else.
#define T_LONG_JMP(_obj, _c) \
  do {                       \
    assert((_c) != -1);      \
    longjmp((_obj), (_c));   \
  } while (0)

X
Xiaoyu Wang 已提交
85
// Clears the bits given in st from q->status.
#define CLEAR_QUERY_STATUS(q, st)   ((q)->status &= (~(st)))
86 87
// True when the query's interval.interval field is positive.
#define QUERY_IS_INTERVAL_QUERY(_q) ((_q)->interval.interval > 0)

L
Liu Jicong 已提交
88 89 90
// Returns twice the value of tsShellActivityTimer (the function name indicates seconds).
// Declared with (void) so the definition carries a real prototype instead of an
// unprototyped empty parameter list.
int32_t getMaximumIdleDurationSec(void) { return tsShellActivityTimer * 2; }

// Asserts that pExprInfo and its expression are non-NULL and that the expression is a
// unary expression node, then returns 0.
// NOTE(review): the return value is a constant; no function id is actually looked up here.
static int32_t getExprFunctionId(SExprInfo* pExprInfo) {
  assert(pExprInfo != NULL &&
         pExprInfo->pExpr != NULL &&
         pExprInfo->pExpr->nodeType == TEXPR_UNARYEXPR_NODE);

  return 0;
}

static void doSetTagValueToResultBuf(char* output, const char* val, int16_t type, int16_t bytes);

97
static void setBlockSMAInfo(SqlFunctionCtx* pCtx, SExprInfo* pExpr, SSDataBlock* pSDataBlock);
98

X
Xiaoyu Wang 已提交
99
static void releaseQueryBuf(size_t numOfTables);
100

101
static void destroyFillOperatorInfo(void* param, int32_t numOfOutput);
102 103 104
static void destroyProjectOperatorInfo(void* param, int32_t numOfOutput);
static void destroyOrderOperatorInfo(void* param, int32_t numOfOutput);
static void destroyAggOperatorInfo(void* param, int32_t numOfOutput);
X
Xiaoyu Wang 已提交
105

H
Haojun Liao 已提交
106
static void destroyIntervalOperatorInfo(void* param, int32_t numOfOutput);
H
Haojun Liao 已提交
107 108
static void destroyExchangeOperatorInfo(void* param, int32_t numOfOutput);

109 110
static void destroyOperatorInfo(SOperatorInfo* pOperator);

111
// Marks an operator as finished.
//
// Always sets the operator status to OP_EXEC_DONE. When the operator is attached to a
// task, it also records the operator's total cost (derived from the current microsecond
// timestamp and the task's cost.start) and moves the task to TASK_COMPLETED.
//
// The cost computation reads pTaskInfo, so it lives inside the NULL check; previously it
// dereferenced pTaskInfo before that check was made.
void doSetOperatorCompleted(SOperatorInfo* pOperator) {
  pOperator->status = OP_EXEC_DONE;

  if (pOperator->pTaskInfo != NULL) {
    pOperator->cost.totalCost = (taosGetTimestampUs() - pOperator->pTaskInfo->cost.start * 1000) / 1000.0;
    setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED);
  }
}
119

H
Haojun Liao 已提交
120
// Default open callback: records an open cost of zero, applies OPTR_SET_OPENED to the
// operator and reports TSDB_CODE_SUCCESS.
int32_t operatorDummyOpenFn(SOperatorInfo* pOperator) {
  pOperator->cost.openCost = 0;
  OPTR_SET_OPENED(pOperator);

  return TSDB_CODE_SUCCESS;
}

126
SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn, __optr_fn_t streamFn,
L
Liu Jicong 已提交
127
                                   __optr_fn_t cleanup, __optr_close_fn_t closeFn, __optr_encode_fn_t encode,
128
                                   __optr_decode_fn_t decode, __optr_explain_fn_t explain) {
129 130 131 132 133 134 135 136 137 138 139 140 141 142
  SOperatorFpSet fpSet = {
      ._openFn = openFn,
      .getNextFn = nextFn,
      .getStreamResFn = streamFn,
      .cleanupFn = cleanup,
      .closeFn = closeFn,
      .encodeResultRow = encode,
      .decodeResultRow = decode,
      .getExplainFn = explain,
  };

  return fpSet;
}

H
Haojun Liao 已提交
143
// Close callback that intentionally does nothing; both arguments are ignored.
void operatorDummyCloseFn(void* param, int32_t numOfCols) {
  (void)param;
  (void)numOfCols;
}
H
Haojun Liao 已提交
144

145 146
static int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprSupp* pSup, SDiskbasedBuf* pBuf,
                                  SGroupResInfo* pGroupResInfo);
H
Haojun Liao 已提交
147

148
static void initCtxOutputBuffer(SqlFunctionCtx* pCtx, int32_t size);
149
static void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId);
150

L
Liu Jicong 已提交
151 152
// Reports whether a column has to be treated as possibly containing NULL values.
// Returns false for tag columns, user-defined columns and the primary timestamp column,
// and also when block statistics are supplied and report zero NULLs. Returns true otherwise.
static bool hasNull(SColumn* pColumn, SColumnDataAgg* pStatis) {
  const bool exemptColumn = TSDB_COL_IS_TAG(pColumn->flag) || TSDB_COL_IS_UD_COL(pColumn->flag) ||
                            pColumn->colId == PRIMARYKEY_TIMESTAMP_COL_ID;
  const bool statsShowNoNull = (pStatis != NULL) && (pStatis->numOfNull == 0);

  return !(exemptColumn || statsShowNoNull);
}

165
#if 0
// Disabled legacy helper: checks whether a result row already exists for (uid, key).
//  - non-interval queries, and interval queries outside the master scan: the answer is
//    simply whether the key is present in the result-row hash table;
//  - interval queries in the master scan: the key must be present AND the current result
//    row info must hold more than one row and contain the extended key in pResultRowListSet.
//    (With exactly one row the original comparison is commented out, so the answer is false.)
static bool chkResultRowFromKey(STaskRuntimeEnv* pRuntimeEnv, SResultRowInfo* pResultRowInfo, char* pData,
                                int16_t bytes, bool masterscan, uint64_t uid) {
  SET_RES_WINDOW_KEY(pRuntimeEnv->keyBuf, pData, bytes, uid);

  SResultRow** p1 =
      (SResultRow**)taosHashGet(pRuntimeEnv->pResultRowHashTable, pRuntimeEnv->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes));

  // in case of repeat scan/reverse scan, no new time window added.
  if (!QUERY_IS_INTERVAL_QUERY(pRuntimeEnv->pQueryAttr) || !masterscan) {
    return p1 != NULL;
  }

  if (p1 == NULL || pResultRowInfo->size == 0 || pResultRowInfo->size == 1) {
    return false;
  }

  // check if current pResultRowInfo contains the existed pResultRow
  SET_RES_EXT_WINDOW_KEY(pRuntimeEnv->keyBuf, pData, bytes, uid, pResultRowInfo);
  int64_t* index =
      taosHashGet(pRuntimeEnv->pResultRowListSet, pRuntimeEnv->keyBuf, GET_RES_EXT_WINDOW_KEY_LEN(bytes));

  return index != NULL;
}
#endif
203

204
/**
 * Reserve interBufSize bytes for a new result row in the disk-based result buffer.
 *
 * The row is placed in the last page registered for tableGroupId. A new page is requested
 * when the group has no page yet or when the last page cannot hold interBufSize more bytes.
 * The chosen page is marked dirty and its used size (num) is advanced by interBufSize.
 *
 * @param pResultBuf    disk-based buffer that owns the pages
 * @param tableGroupId  group whose page list is used
 * @param interBufSize  number of bytes to reserve for the row
 * @return pointer to the reserved row with pageId/offset filled in, or NULL when no buffer
 *         page could be obtained
 */
SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int64_t tableGroupId, int32_t interBufSize) {
  SFilePage* pData = NULL;

  // in the first scan, new space needed for results
  int32_t pageId = -1;
  SIDList list = getDataBufPagesIdList(pResultBuf, tableGroupId);

  if (taosArrayGetSize(list) == 0) {
    pData = getNewBufPage(pResultBuf, tableGroupId, &pageId);
    if (pData != NULL) {  // the page request may fail; never touch a NULL page
      pData->num = sizeof(SFilePage);
    }
  } else {
    SPageInfo* pi = getLastPageInfo(list);
    pData = getBufPage(pResultBuf, getPageId(pi));
    pageId = getPageId(pi);

    if (pData != NULL && pData->num + interBufSize > getBufPageSize(pResultBuf)) {
      // release current page first, and prepare the next one
      releaseBufPageInfo(pResultBuf, pi);

      pData = getNewBufPage(pResultBuf, tableGroupId, &pageId);
      if (pData != NULL) {
        pData->num = sizeof(SFilePage);
      }
    }
  }

  if (pData == NULL) {
    return NULL;
  }

  setBufPageDirty(pData, true);

  // the new row starts at the current end of the used area of the page
  SResultRow* pResultRow = (SResultRow*)((char*)pData + pData->num);
  pResultRow->pageId = pageId;
  pResultRow->offset = (int32_t)pData->num;

  pData->num += interBufSize;

  return pResultRow;
}

246 247 248 249 250 251 252
/**
 * the struct of key in hash table
 * +----------+---------------+
 * | group id |   key data    |
 * | 8 bytes  | actual length |
 * +----------+---------------+
 */
253 254 255
SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pResultRowInfo, char* pData,
                                   int16_t bytes, bool masterscan, uint64_t groupId, SExecTaskInfo* pTaskInfo,
                                   bool isIntervalQuery, SAggSupporter* pSup) {
256
  SET_RES_WINDOW_KEY(pSup->keyBuf, pData, bytes, groupId);
H
Haojun Liao 已提交
257

dengyihao's avatar
dengyihao 已提交
258 259
  SResultRowPosition* p1 =
      (SResultRowPosition*)taosHashGet(pSup->pResultRowHashTable, pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes));
H
Haojun Liao 已提交
260

261 262
  SResultRow* pResult = NULL;

H
Haojun Liao 已提交
263 264
  // in case of repeat scan/reverse scan, no new time window added.
  if (isIntervalQuery) {
265
    if (masterscan && p1 != NULL) {  // the *p1 may be NULL in case of sliding+offset exists.
266
      pResult = getResultRowByPos(pResultBuf, p1, true);
267
      ASSERT(pResult->pageId == p1->pageId && pResult->offset == p1->offset);
H
Haojun Liao 已提交
268 269
    }
  } else {
dengyihao's avatar
dengyihao 已提交
270 271
    // In case of group by column query, the required SResultRow object must be existInCurrentResusltRowInfo in the
    // pResultRowInfo object.
H
Haojun Liao 已提交
272
    if (p1 != NULL) {
273
      // todo
274
      pResult = getResultRowByPos(pResultBuf, p1, true);
275
      ASSERT(pResult->pageId == p1->pageId && pResult->offset == p1->offset);
H
Haojun Liao 已提交
276 277 278
    }
  }

L
Liu Jicong 已提交
279
  // 1. close current opened time window
280
  if (pResultRowInfo->cur.pageId != -1 && ((pResult == NULL) || (pResult->pageId != pResultRowInfo->cur.pageId))) {
281
#ifdef BUF_PAGE_DEBUG
wmmhello's avatar
wmmhello 已提交
282
    qDebug("page_1");
283
#endif
284
    SResultRowPosition pos = pResultRowInfo->cur;
X
Xiaoyu Wang 已提交
285
    SFilePage*         pPage = getBufPage(pResultBuf, pos.pageId);
286 287 288 289 290
    releaseBufPage(pResultBuf, pPage);
  }

  // allocate a new buffer page
  if (pResult == NULL) {
291
#ifdef BUF_PAGE_DEBUG
wmmhello's avatar
wmmhello 已提交
292
    qDebug("page_2");
293
#endif
H
Haojun Liao 已提交
294
    ASSERT(pSup->resultRowSize > 0);
295 296
    pResult = getNewResultRow(pResultBuf, groupId, pSup->resultRowSize);

297
    initResultRow(pResult);
H
Haojun Liao 已提交
298

299 300
    // add a new result set for a new group
    SResultRowPosition pos = {.pageId = pResult->pageId, .offset = pResult->offset};
X
Xiaoyu Wang 已提交
301 302
    taosHashPut(pSup->pResultRowHashTable, pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes), &pos,
                sizeof(SResultRowPosition));
H
Haojun Liao 已提交
303 304
  }

305 306 307
  // 2. set the new time window to be the new active time window
  pResultRowInfo->cur = (SResultRowPosition){.pageId = pResult->pageId, .offset = pResult->offset};

H
Haojun Liao 已提交
308
  // too many time window in query
309 310
  if (pTaskInfo->execModel == OPTR_EXEC_MODEL_BATCH &&
      taosHashGetSize(pSup->pResultRowHashTable) > MAX_INTERVAL_TIME_WINDOW) {
H
Haojun Liao 已提交
311 312 313
    longjmp(pTaskInfo->env, TSDB_CODE_QRY_TOO_MANY_TIMEWINDOW);
  }

H
Haojun Liao 已提交
314
  return pResult;
H
Haojun Liao 已提交
315 316
}

317
// a new buffer page for each table. Needs to opt this design
L
Liu Jicong 已提交
318
static int32_t addNewWindowResultBuf(SResultRow* pWindowRes, SDiskbasedBuf* pResultBuf, int32_t tid, uint32_t size) {
319 320 321 322
  if (pWindowRes->pageId != -1) {
    return 0;
  }

L
Liu Jicong 已提交
323
  SFilePage* pData = NULL;
324 325 326 327 328 329

  // in the first scan, new space needed for results
  int32_t pageId = -1;
  SIDList list = getDataBufPagesIdList(pResultBuf, tid);

  if (taosArrayGetSize(list) == 0) {
H
Haojun Liao 已提交
330
    pData = getNewBufPage(pResultBuf, tid, &pageId);
331
    pData->num = sizeof(SFilePage);
332 333
  } else {
    SPageInfo* pi = getLastPageInfo(list);
334
    pData = getBufPage(pResultBuf, getPageId(pi));
335
    pageId = getPageId(pi);
336

337
    if (pData->num + size > getBufPageSize(pResultBuf)) {
338
      // release current page first, and prepare the next one
339
      releaseBufPageInfo(pResultBuf, pi);
340

H
Haojun Liao 已提交
341
      pData = getNewBufPage(pResultBuf, tid, &pageId);
342
      if (pData != NULL) {
343
        pData->num = sizeof(SFilePage);
344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363
      }
    }
  }

  if (pData == NULL) {
    return -1;
  }

  // set the number of rows in current disk page
  if (pWindowRes->pageId == -1) {  // not allocated yet, allocate new buffer
    pWindowRes->pageId = pageId;
    pWindowRes->offset = (int32_t)pData->num;

    pData->num += size;
    assert(pWindowRes->pageId >= 0);
  }

  return 0;
}

364
//  query_range_start, query_range_end, window_duration, window_start, window_end
365
void initExecTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pQueryWindow) {
366 367 368
  pColData->info.type = TSDB_DATA_TYPE_TIMESTAMP;
  pColData->info.bytes = sizeof(int64_t);

369
  colInfoDataEnsureCapacity(pColData, 5);
370 371 372 373 374 375 376 377 378
  colDataAppendInt64(pColData, 0, &pQueryWindow->skey);
  colDataAppendInt64(pColData, 1, &pQueryWindow->ekey);

  int64_t interval = 0;
  colDataAppendInt64(pColData, 2, &interval);  // this value may be variable in case of 'n' and 'y'.
  colDataAppendInt64(pColData, 3, &pQueryWindow->skey);
  colDataAppendInt64(pColData, 4, &pQueryWindow->ekey);
}

L
Liu Jicong 已提交
379
// Releases the time-window column by handing it to colDataDestroy().
void cleanupExecTimeWindowInfo(SColumnInfoData* pColData) {
  colDataDestroy(pColData);
}
H
Haojun Liao 已提交
380

381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400
// Snapshot of the input-related fields of a SqlFunctionCtx, taken by functionCtxSave()
// and written back by functionCtxRestore().
typedef struct {
  bool    hasAgg;       // copy of input.colDataAggIsSet
  int32_t numOfRows;    // copy of input.numOfRows
  int32_t startOffset;  // copy of input.startRowIndex
} SFunctionCtxStatus;

// Captures the aggregate flag, row count and start row index of pCtx->input into *pStatus.
static void functionCtxSave(SqlFunctionCtx* pCtx, SFunctionCtxStatus* pStatus) {
  *pStatus = (SFunctionCtxStatus){
      .hasAgg = pCtx->input.colDataAggIsSet,
      .numOfRows = pCtx->input.numOfRows,
      .startOffset = pCtx->input.startRowIndex,
  };
}

// Writes the values captured in *pStatus back into pCtx->input.
static void functionCtxRestore(SqlFunctionCtx* pCtx, SFunctionCtxStatus* pStatus) {
  SInputColumnInfoData* pInput = &pCtx->input;

  pInput->startRowIndex = pStatus->startOffset;
  pInput->numOfRows = pStatus->numOfRows;
  pInput->colDataAggIsSet = pStatus->hasAgg;
}

// Runs every function context over the row range [offset, offset + forwardStep) of the
// current input.
//
// For each context the input range is narrowed to the requested rows, and pre-computed
// block statistics are switched off when only part of the block (forwardStep < numOfTotal)
// is involved.
//  - Window pseudo-column functions are evaluated through their scalar callback against
//    pTimeWindowData, writing into the row's intermediate buffer; the narrowed input state
//    is NOT restored afterwards for these.
//  - All other functions run their process callback when functionNeedToExecute() allows it,
//    after which the saved input state is restored. A failing callback stores the code in
//    taskInfo->code and longjmps to taskInfo->env.
void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, SColumnInfoData* pTimeWindowData, int32_t offset,
                      int32_t forwardStep, int32_t numOfTotal, int32_t numOfOutput) {
  for (int32_t k = 0; k < numOfOutput; ++k) {
    SqlFunctionCtx* pFunc = &pCtx[k];

    // remember the input state so it can be put back after processing
    SFunctionCtxStatus saved = {0};
    functionCtxSave(pFunc, &saved);

    pFunc->input.startRowIndex = offset;
    pFunc->input.numOfRows = forwardStep;

    // not a whole block involved in query processing, statistics data can not be used
    if (forwardStep < numOfTotal && pFunc->input.colDataAggIsSet) {
      pFunc->input.colDataAggIsSet = false;
    }

    if (fmIsWindowPseudoColumnFunc(pFunc->functionId)) {
      SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pFunc);

      // wrap the row's intermediate buffer as a one-value BIGINT column
      SColumnInfoData idata = {0};
      idata.info.type = TSDB_DATA_TYPE_BIGINT;
      idata.info.bytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes;
      idata.pData = GET_ROWCELL_INTERBUF(pEntryInfo);

      SScalarParam out = {.columnData = &idata};
      SScalarParam tw = {.numOfRows = 5, .columnData = pTimeWindowData};
      pFunc->sfp.process(&tw, 1, &out);
      pEntryInfo->numOfRes = 1;
      continue;
    }

    if (functionNeedToExecute(pFunc) && pFunc->fpSet.process != NULL) {
      int32_t code = pFunc->fpSet.process(pFunc);
      if (code != TSDB_CODE_SUCCESS) {
        qError("%s apply functions error, code: %s", GET_TASKID(taskInfo), tstrerror(code));
        taskInfo->code = code;
        longjmp(taskInfo->env, code);
      }
    }

    // restore it
    functionCtxRestore(pFunc, &saved);
  }
}

dengyihao's avatar
dengyihao 已提交
447
static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order,
448
                                   int32_t scanFlag, bool createDummyCol);
449

dengyihao's avatar
dengyihao 已提交
450 451
// Binds a data block that carries pre-computed block statistics to every function context
// of the operator: sets the scan order, the input row count, the block SMA info (via
// setBlockSMAInfo) and the source block pointer.
static void doSetInputDataBlockInfo(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock,
                                    int32_t order) {
  SExprSupp* pSup = &pOperator->exprSupp;

  for (int32_t i = 0; i < pSup->numOfExprs; ++i) {
    SqlFunctionCtx* pFunc = &pCtx[i];

    pFunc->order = order;
    pFunc->input.numOfRows = pBlock->info.rows;
    setBlockSMAInfo(pFunc, &pSup->pExprInfo[i], pBlock);
    pFunc->pSrcBlock = pBlock;
  }
}

X
Xiaoyu Wang 已提交
460 461
// Attaches pBlock as the input of all function contexts of the operator.
// Blocks without pre-computed statistics (pBlockAgg == NULL) go through
// doSetInputDataBlock(); blocks with statistics go through doSetInputDataBlockInfo().
// Note: the status code returned by doSetInputDataBlock() is not propagated.
void setInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order,
                       int32_t scanFlag, bool createDummyCol) {
  if (pBlock->pBlockAgg == NULL) {
    doSetInputDataBlock(pOperator, pCtx, pBlock, order, scanFlag, createDummyCol);
    return;
  }

  doSetInputDataBlockInfo(pOperator, pCtx, pBlock, order);
}

L
Liu Jicong 已提交
469 470
// Materializes a constant function parameter as an input column holding numOfRows copies
// of the value.
//
// The column object for slot paramIndex is created on first use (type and byte width taken
// from the parameter) and reused afterwards. BIGINT/UBIGINT, DOUBLE and VARCHAR parameters
// are written; any other type leaves the column rows untouched.
//
// Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when the column object or the
// temporary VARCHAR buffer cannot be allocated.
static int32_t doCreateConstantValColumnInfo(SInputColumnInfoData* pInput, SFunctParam* pFuncParam, int32_t paramIndex,
                                             int32_t numOfRows) {
  SColumnInfoData* pColInfo = NULL;
  if (pInput->pData[paramIndex] == NULL) {
    pColInfo = taosMemoryCalloc(1, sizeof(SColumnInfoData));
    if (pColInfo == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    // Set the correct column info (data type and bytes)
    pColInfo->info.type = pFuncParam->param.nType;
    pColInfo->info.bytes = pFuncParam->param.nLen;

    pInput->pData[paramIndex] = pColInfo;
  } else {
    pColInfo = pInput->pData[paramIndex];
  }

  colInfoDataEnsureCapacity(pColInfo, numOfRows);

  int8_t type = pFuncParam->param.nType;
  if (type == TSDB_DATA_TYPE_BIGINT || type == TSDB_DATA_TYPE_UBIGINT) {
    int64_t v = pFuncParam->param.i;
    for (int32_t i = 0; i < numOfRows; ++i) {
      colDataAppendInt64(pColInfo, i, &v);
    }
  } else if (type == TSDB_DATA_TYPE_DOUBLE) {
    double v = pFuncParam->param.d;
    for (int32_t i = 0; i < numOfRows; ++i) {
      colDataAppendDouble(pColInfo, i, &v);
    }
  } else if (type == TSDB_DATA_TYPE_VARCHAR) {
    // build the length-prefixed form of the string once, then append it to every row
    char* tmp = taosMemoryMalloc(pFuncParam->param.nLen + VARSTR_HEADER_SIZE);
    if (tmp == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    STR_WITH_SIZE_TO_VARSTR(tmp, pFuncParam->param.pz, pFuncParam->param.nLen);
    for (int32_t i = 0; i < numOfRows; ++i) {
      colDataAppend(pColInfo, i, tmp, false);
    }

    // colDataAppend() copies the value into the column buffer, so the scratch buffer
    // is no longer needed; it used to be leaked on every call.
    taosMemoryFree(tmp);
  }

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
511
// Binds pBlock (a block without pre-computed statistics) as the input of every function
// context of the operator.
//
// For each expression the context receives the scan order, scan flag, source block and
// block uid, and its statistics flag is cleared. Each parameter is then wired up:
//  - column parameters point at the matching column of pBlock; for implicit-timestamp
//    functions the last parameter is also recorded as the primary timestamp column;
//  - value parameters are materialized as a constant column, but only when createDummyCol
//    is set and the expression has exactly one parameter.
//
// Returns TSDB_CODE_SUCCESS, or the error reported while creating a constant column.
static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order,
                                   int32_t scanFlag, bool createDummyCol) {
  for (int32_t i = 0; i < pOperator->exprSupp.numOfExprs; ++i) {
    SqlFunctionCtx*       pFunc = &pCtx[i];
    SInputColumnInfoData* pInput = &pFunc->input;

    pFunc->order = order;
    pInput->numOfRows = pBlock->info.rows;

    pFunc->pSrcBlock = pBlock;
    pFunc->scanFlag = scanFlag;

    pInput->uid = pBlock->info.uid;
    pInput->colDataAggIsSet = false;

    SExprInfo* pOneExpr = &pOperator->exprSupp.pExprInfo[i];
    for (int32_t j = 0; j < pOneExpr->base.numOfParams; ++j) {
      SFunctParam* pFuncParam = &pOneExpr->base.pParam[j];

      if (pFuncParam->type == FUNC_PARAM_TYPE_COLUMN) {
        int32_t slotId = pFuncParam->pCol->slotId;
        pInput->pData[j] = taosArrayGet(pBlock->pDataBlock, slotId);
        pInput->totalRows = pBlock->info.rows;
        pInput->numOfRows = pBlock->info.rows;
        pInput->startRowIndex = 0;

        // NOTE: the last parameter is the primary timestamp column
        // todo: refactor this
        if (fmIsImplicitTsFunc(pFunc->functionId) && (j == pOneExpr->base.numOfParams - 1)) {
          // in case of merge function, this is not always the ts column data.
          pInput->pPTS = pInput->pData[j];
        }

        ASSERT(pInput->pData[j] != NULL);
        continue;
      }

      if (pFuncParam->type != FUNC_PARAM_TYPE_VALUE) {
        continue;
      }

      // todo avoid case: top(k, 12), 12 is the value parameter.
      // sum(11), 11 is also the value parameter.
      if (!createDummyCol || pOneExpr->base.numOfParams != 1) {
        continue;
      }

      pInput->totalRows = pBlock->info.rows;
      pInput->numOfRows = pBlock->info.rows;
      pInput->startRowIndex = 0;

      int32_t code = doCreateConstantValColumnInfo(pInput, pFuncParam, j, pBlock->info.rows);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }
    }
  }

  return TSDB_CODE_SUCCESS;
}

563
// Invokes the process callback of every function context of the operator that
// functionNeedToExecute() selects and that actually has a callback.
// Stops at the first failure, logs it and returns its code; otherwise TSDB_CODE_SUCCESS.
static int32_t doAggregateImpl(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx) {
  const int32_t numOfExprs = pOperator->exprSupp.numOfExprs;

  for (int32_t k = 0; k < numOfExprs; ++k) {
    SqlFunctionCtx* pFunc = &pCtx[k];

    // todo add a dummy funtion to avoid process check
    if (!functionNeedToExecute(pFunc) || pFunc->fpSet.process == NULL) {
      continue;
    }

    int32_t code = pFunc->fpSet.process(pFunc);
    if (code != TSDB_CODE_SUCCESS) {
      qError("%s aggregate function error happens, code: %s", GET_TASKID(pOperator->pTaskInfo), tstrerror(code));
      return code;
    }
  }

  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
582
// Points the output of the first N function contexts at the first N columns of pResult,
// where N is the number of entries in pPseudoList (nothing is done for a NULL list).
static void setPseudoOutputColInfo(SSDataBlock* pResult, SqlFunctionCtx* pCtx, SArray* pPseudoList) {
  if (pPseudoList == NULL) {
    return;
  }

  size_t num = taosArrayGetSize(pPseudoList);
  for (int32_t i = 0; i < num; ++i) {
    pCtx[i].pOutput = taosArrayGet(pResult->pDataBlock, i);
  }
}

589
int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pSrcBlock, SqlFunctionCtx* pCtx,
X
Xiaoyu Wang 已提交
590
                              int32_t numOfOutput, SArray* pPseudoList) {
H
Haojun Liao 已提交
591
  setPseudoOutputColInfo(pResult, pCtx, pPseudoList);
592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611

  if (pSrcBlock == NULL) {
    for (int32_t k = 0; k < numOfOutput; ++k) {
      int32_t outputSlotId = pExpr[k].base.resSchema.slotId;

      ASSERT(pExpr[k].pExpr->nodeType == QUERY_NODE_VALUE);
      SColumnInfoData* pColInfoData = taosArrayGet(pResult->pDataBlock, outputSlotId);

      int32_t type = pExpr[k].base.pParam[0].param.nType;
      if (TSDB_DATA_TYPE_NULL == type) {
        colDataAppendNNULL(pColInfoData, 0, 1);
      } else {
        colDataAppend(pColInfoData, 0, taosVariantGet(&pExpr[k].base.pParam[0].param, type), false);
      }
    }

    pResult->info.rows = 1;
    return TSDB_CODE_SUCCESS;
  }

H
Haojun Liao 已提交
612
  pResult->info.groupId = pSrcBlock->info.groupId;
H
Haojun Liao 已提交
613

614 615
  // if the source equals to the destination, it is to create a new column as the result of scalar
  // function or some operators.
616 617
  bool createNewColModel = (pResult == pSrcBlock);

618 619
  int32_t numOfRows = 0;

620
  for (int32_t k = 0; k < numOfOutput; ++k) {
621 622
    int32_t               outputSlotId = pExpr[k].base.resSchema.slotId;
    SqlFunctionCtx*       pfCtx = &pCtx[k];
623
    SInputColumnInfoData* pInputData = &pfCtx->input;
624

L
Liu Jicong 已提交
625
    if (pExpr[k].pExpr->nodeType == QUERY_NODE_COLUMN) {  // it is a project query
626
      SColumnInfoData* pColInfoData = taosArrayGet(pResult->pDataBlock, outputSlotId);
627
      if (pResult->info.rows > 0 && !createNewColModel) {
628 629
        colDataMergeCol(pColInfoData, pResult->info.rows, &pResult->info.capacity, pInputData->pData[0],
                        pInputData->numOfRows);
630
      } else {
631
        colDataAssign(pColInfoData, pInputData->pData[0], pInputData->numOfRows, &pResult->info);
632
      }
633

634
      numOfRows = pInputData->numOfRows;
635
    } else if (pExpr[k].pExpr->nodeType == QUERY_NODE_VALUE) {
636
      SColumnInfoData* pColInfoData = taosArrayGet(pResult->pDataBlock, outputSlotId);
637

dengyihao's avatar
dengyihao 已提交
638
      int32_t offset = createNewColModel ? 0 : pResult->info.rows;
639 640 641 642 643 644 645 646

      int32_t type = pExpr[k].base.pParam[0].param.nType;
      if (TSDB_DATA_TYPE_NULL == type) {
        colDataAppendNNULL(pColInfoData, offset, pSrcBlock->info.rows);
      } else {
        for (int32_t i = 0; i < pSrcBlock->info.rows; ++i) {
          colDataAppend(pColInfoData, i + offset, taosVariantGet(&pExpr[k].base.pParam[0].param, type), false);
        }
647
      }
648 649

      numOfRows = pSrcBlock->info.rows;
H
Haojun Liao 已提交
650
    } else if (pExpr[k].pExpr->nodeType == QUERY_NODE_OPERATOR) {
651 652 653
      SArray* pBlockList = taosArrayInit(4, POINTER_BYTES);
      taosArrayPush(pBlockList, &pSrcBlock);

654
      SColumnInfoData* pResColData = taosArrayGet(pResult->pDataBlock, outputSlotId);
655
      SColumnInfoData  idata = {.info = pResColData->info, .hasNull = true};
656

657
      SScalarParam dest = {.columnData = &idata};
X
Xiaoyu Wang 已提交
658
      int32_t      code = scalarCalculate(pExpr[k].pExpr->_optrRoot.pRootNode, pBlockList, &dest);
659 660 661 662
      if (code != TSDB_CODE_SUCCESS) {
        taosArrayDestroy(pBlockList);
        return code;
      }
663

dengyihao's avatar
dengyihao 已提交
664
      int32_t startOffset = createNewColModel ? 0 : pResult->info.rows;
665
      ASSERT(pResult->info.capacity > 0);
666
      colDataMergeCol(pResColData, startOffset, &pResult->info.capacity, &idata, dest.numOfRows);
D
dapan1121 已提交
667
      colDataDestroy(&idata);
L
Liu Jicong 已提交
668

669
      numOfRows = dest.numOfRows;
670 671
      taosArrayDestroy(pBlockList);
    } else if (pExpr[k].pExpr->nodeType == QUERY_NODE_FUNCTION) {
672 673
      // _rowts/_c0, not tbname column
      if (fmIsPseudoColumnFunc(pfCtx->functionId) && (!fmIsScanPseudoColumnFunc(pfCtx->functionId))) {
H
Haojun Liao 已提交
674
        // do nothing
675
      } else if (fmIsIndefiniteRowsFunc(pfCtx->functionId)) {
676 677
        SResultRowEntryInfo* pResInfo = GET_RES_INFO(pfCtx);
        pfCtx->fpSet.init(pfCtx, pResInfo);
678 679 680 681 682 683 684 685 686 687

        pfCtx->pOutput = taosArrayGet(pResult->pDataBlock, outputSlotId);
        pfCtx->offset = createNewColModel ? 0 : pResult->info.rows;  // set the start offset

        // set the timestamp(_rowts) output buffer
        if (taosArrayGetSize(pPseudoList) > 0) {
          int32_t* outputColIndex = taosArrayGet(pPseudoList, 0);
          pfCtx->pTsOutput = (SColumnInfoData*)pCtx[*outputColIndex].pOutput;
        }

688 689 690 691 692
        // link pDstBlock to set selectivity value
        if (pfCtx->subsidiaries.num > 0) {
          pfCtx->pDstBlock = pResult;
        }

693
        numOfRows = pfCtx->fpSet.process(pfCtx);
H
Haojun Liao 已提交
694
      } else if (fmIsAggFunc(pfCtx->functionId)) {
G
Ganlin Zhao 已提交
695
        // selective value output should be set during corresponding function execution
696 697 698
        if (fmIsSelectValueFunc(pfCtx->functionId)) {
          continue;
        }
699 700
        // _group_key function for "partition by tbname" + csum(col_name) query
        SColumnInfoData* pOutput = taosArrayGet(pResult->pDataBlock, outputSlotId);
701
        int32_t          slotId = pfCtx->param[0].pCol->slotId;
702 703 704

        // todo handle the json tag
        SColumnInfoData* pInput = taosArrayGet(pSrcBlock->pDataBlock, slotId);
705
        for (int32_t f = 0; f < pSrcBlock->info.rows; ++f) {
706 707 708 709 710 711 712 713 714
          bool isNull = colDataIsNull_s(pInput, f);
          if (isNull) {
            colDataAppendNULL(pOutput, pResult->info.rows + f);
          } else {
            char* data = colDataGetData(pInput, f);
            colDataAppend(pOutput, pResult->info.rows + f, data, isNull);
          }
        }

H
Haojun Liao 已提交
715 716 717
      } else {
        SArray* pBlockList = taosArrayInit(4, POINTER_BYTES);
        taosArrayPush(pBlockList, &pSrcBlock);
G
Ganlin Zhao 已提交
718

719
        SColumnInfoData* pResColData = taosArrayGet(pResult->pDataBlock, outputSlotId);
720
        SColumnInfoData  idata = {.info = pResColData->info, .hasNull = true};
H
Haojun Liao 已提交
721

722
        SScalarParam dest = {.columnData = &idata};
X
Xiaoyu Wang 已提交
723
        int32_t      code = scalarCalculate((SNode*)pExpr[k].pExpr->_function.pFunctNode, pBlockList, &dest);
724 725 726 727
        if (code != TSDB_CODE_SUCCESS) {
          taosArrayDestroy(pBlockList);
          return code;
        }
728

dengyihao's avatar
dengyihao 已提交
729
        int32_t startOffset = createNewColModel ? 0 : pResult->info.rows;
730
        ASSERT(pResult->info.capacity > 0);
731
        colDataMergeCol(pResColData, startOffset, &pResult->info.capacity, &idata, dest.numOfRows);
D
dapan1121 已提交
732
        colDataDestroy(&idata);
733 734

        numOfRows = dest.numOfRows;
H
Haojun Liao 已提交
735 736
        taosArrayDestroy(pBlockList);
      }
737
    } else {
738
      return TSDB_CODE_OPS_NOT_SUPPORT;
739 740
    }
  }
741

742 743 744
  if (!createNewColModel) {
    pResult->info.rows += numOfRows;
  }
745 746

  return TSDB_CODE_SUCCESS;
747 748
}

5
54liuyao 已提交
749
/*
 * Decide whether the function bound to pCtx has to be evaluated for the current input.
 *
 *  - a functionId of -1 is never executed;
 *  - during a repeat scan the answer is whatever fmIsRepeatScanFunc() reports;
 *  - otherwise the function runs as long as its result entry is not yet completed.
 */
bool functionNeedToExecute(SqlFunctionCtx* pCtx) {
  if (pCtx->functionId == -1) {
    return false;
  }

  if (pCtx->scanFlag == REPEAT_SCAN) {
    return fmIsRepeatScanFunc(pCtx->functionId);
  }

  return !isRowEntryCompleted(GET_RES_INFO(pCtx));
}

769 770 771 772 773 774 775
/*
 * Build synthetic block statistics (SColumnDataAgg) for a constant-valued function parameter,
 * so the constant can be consumed like a column that carries pre-aggregated data.
 *
 * pInput      input descriptor; pData[paramIndex] and pColumnDataAgg[paramIndex] are allocated on
 *             first use and reused afterwards
 * pFuncParam  the constant parameter; param.i or param.d is read depending on `type`
 * type        data type of the constant; variable-length types are rejected by an ASSERT
 * paramIndex  slot of the parameter inside pInput
 * numOfRows   row count used to scale the sum (constant * numOfRows)
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY if one of the allocations fails.
 */
static int32_t doCreateConstantValColumnAggInfo(SInputColumnInfoData* pInput, SFunctParam* pFuncParam, int32_t type,
                                                int32_t paramIndex, int32_t numOfRows) {
  if (pInput->pData[paramIndex] == NULL) {
    pInput->pData[paramIndex] = taosMemoryCalloc(1, sizeof(SColumnInfoData));
    if (pInput->pData[paramIndex] == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    // Set the correct column info (data type and bytes)
    pInput->pData[paramIndex]->info.type = type;
    pInput->pData[paramIndex]->info.bytes = tDataTypes[type].bytes;
  }

  SColumnDataAgg* da = pInput->pColumnDataAgg[paramIndex];
  if (da == NULL) {
    da = taosMemoryCalloc(1, sizeof(SColumnDataAgg));
    if (da == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    pInput->pColumnDataAgg[paramIndex] = da;
  }

  ASSERT(!IS_VAR_DATA_TYPE(type));

  if (type == TSDB_DATA_TYPE_BIGINT) {
    int64_t v = pFuncParam->param.i;
    *da = (SColumnDataAgg){.numOfNull = 0, .min = v, .max = v, .sum = v * numOfRows};
  } else if (type == TSDB_DATA_TYPE_DOUBLE) {
    double v = pFuncParam->param.d;
    *da = (SColumnDataAgg){.numOfNull = 0};

    // the statistics fields are reused as raw storage for the double representation
    *(double*)&da->min = v;
    *(double*)&da->max = v;
    *(double*)&da->sum = v * numOfRows;
  } else if (type == TSDB_DATA_TYPE_BOOL) {  // todo validate this data type
    bool v = pFuncParam->param.i;

    *da = (SColumnDataAgg){.numOfNull = 0};
    // every row holds the same constant, hence min == max == v (min used to be hard-coded to 0)
    *(bool*)&da->min = v;
    *(bool*)&da->max = v;
    // NOTE(review): the product is stored through a bool lvalue and therefore collapses to 0/1;
    // confirm what consumers of a boolean "sum" expect before changing this.
    *(bool*)&da->sum = v * numOfRows;
  } else if (type == TSDB_DATA_TYPE_TIMESTAMP) {
    // do nothing
  } else {
    ASSERT(0);
  }

  return TSDB_CODE_SUCCESS;
}
820

821
/*
 * Wire the pre-aggregated statistics of pBlock (pBlock->pBlockAgg) into the input of pCtx.
 *
 * The row counters of the input are always refreshed. When the block carries no statistics,
 * colDataAggIsSet is cleared. Otherwise each parameter of pExprInfo is visited: column
 * parameters pick up the statistics entry and column descriptor of their slot, value parameters
 * get synthetic statistics from doCreateConstantValColumnAggInfo(). A column whose statistics
 * entry is NULL clears colDataAggIsSet.
 */
void setBlockSMAInfo(SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, SSDataBlock* pBlock) {
  SInputColumnInfoData* pInput = &pCtx->input;
  int32_t               rows = pBlock->info.rows;

  pInput->numOfRows = rows;
  pInput->totalRows = rows;

  if (pBlock->pBlockAgg == NULL) {
    pInput->colDataAggIsSet = false;
  } else {
    pInput->colDataAggIsSet = true;

    for (int32_t j = 0; j < pExprInfo->base.numOfParams; ++j) {
      SFunctParam* pParam = &pExprInfo->base.pParam[j];

      if (pParam->type == FUNC_PARAM_TYPE_COLUMN) {
        int32_t slotId = pParam->pCol->slotId;

        pInput->pColumnDataAgg[j] = pBlock->pBlockAgg[slotId];
        if (pInput->pColumnDataAgg[j] == NULL) {
          pInput->colDataAggIsSet = false;
        }

        // Only the type information of the column is needed here; the payload of the
        // corresponding SColumnInfoData will not be used.
        pInput->pData[j] = taosArrayGet(pBlock->pDataBlock, slotId);
        continue;
      }

      if (pParam->type == FUNC_PARAM_TYPE_VALUE) {
        // NOTE(review): the return code (possible out-of-memory) is ignored here.
        doCreateConstantValColumnAggInfo(pInput, pParam, pParam->param.nType, j, pBlock->info.rows);
      }
    }
  }

  // set the statistics data for primary time stamp column
  //  if (pCtx->functionId == FUNCTION_SPREAD && pColumn->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
  //    pCtx->isAggSet = true;
  //    pCtx->agg.min = pBlock->info.window.skey;
  //    pCtx->agg.max = pBlock->info.window.ekey;
  //  }
}

L
Liu Jicong 已提交
860
/*
 * Report whether the task should be treated as killed.
 *
 * The idle-timeout detection (task owned and running for more than ten times the maximum idle
 * duration) is still evaluated, but the abort itself is disabled: the branch only asserts that
 * the start time was recorded. The function therefore always returns false.
 */
bool isTaskKilled(SExecTaskInfo* pTaskInfo) {
  if (pTaskInfo->owner == 0) {
    return false;
  }

  if ((taosGetTimestampSec() - pTaskInfo->cost.start / 1000) > 10 * getMaximumIdleDurationSec()
      /*(!needBuildResAfterQueryComplete(pTaskInfo))*/) {
    assert(pTaskInfo->cost.start != 0);
    //    qDebug("QInfo:%" PRIu64 " retrieve not arrive beyond %d ms, abort current query execution, start:%" PRId64
    //           ", current:%d", pQInfo->qId, 1, pQInfo->startExecTs, taosGetTimestampSec());
    //    return true;
  }

  return false;
}

L
Liu Jicong 已提交
875
/* Mark the task as cancelled by storing TSDB_CODE_TSC_QUERY_CANCELLED in its code field. */
void setTaskKilled(SExecTaskInfo* pTaskInfo) {
  pTaskInfo->code = TSDB_CODE_TSC_QUERY_CANCELLED;
}
876 877

/////////////////////////////////////////////////////////////////////////////////////////////
878
/*
 * Compute the interval-aligned window that contains `key`.
 *
 * skey is `key` truncated by taosTimeTruncate(); ekey is skey plus one interval, minus one.
 * When the computed end key is smaller than the start key it is clamped to INT64_MAX
 * (presumably the addition wrapped because skey lies within one interval of INT64_MAX).
 */
STimeWindow getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int64_t key) {
  STimeWindow aligned = {0};

  aligned.skey = taosTimeTruncate(key, pInterval, precision);
  aligned.ekey = taosTimeAdd(aligned.skey, pInterval->interval, pInterval->intervalUnit, precision) - 1;

  if (aligned.skey > aligned.ekey) {
    aligned.ekey = INT64_MAX;
  }

  return aligned;
}

894
#if 0
/*
 * (disabled) Refine the load status of a block according to the functions of the query:
 * a block that would not be loaded is filtered out when only first/last style functions are
 * present, and fully loaded when other functions are mixed in.
 */
static int32_t updateBlockLoadStatus(STaskAttr* pQuery, int32_t status) {
  if (status == BLK_DATA_DATA_LOAD || status == BLK_DATA_FILTEROUT) {
    return status;
  }

  bool sawFirstLast = false;
  bool sawOther = false;

  for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
    int32_t functionId = getExprFunctionId(&pQuery->pExpr1[i]);

    // timestamp and tag pseudo functions do not influence the decision
    bool ignored = (functionId == FUNCTION_TS || functionId == FUNCTION_TS_DUMMY || functionId == FUNCTION_TAG ||
                    functionId == FUNCTION_TAG_DUMMY);
    if (ignored) {
      continue;
    }

    if (functionId == FUNCTION_FIRST_DST || functionId == FUNCTION_LAST_DST) {
      sawFirstLast = true;
    } else {
      sawOther = true;
    }
  }

  if (sawFirstLast && status == BLK_DATA_NOT_LOAD) {
    return sawOther ? BLK_DATA_DATA_LOAD : BLK_DATA_FILTEROUT;
  }

  return status;
}

#endif

L
Liu Jicong 已提交
933 934
// static void updateDataCheckOrder(SQInfo *pQInfo, SQueryTableReq* pQueryMsg, bool stableQuery) {
//   STaskAttr* pQueryAttr = pQInfo->runtimeEnv.pQueryAttr;
H
Haojun Liao 已提交
935
//
L
Liu Jicong 已提交
936 937 938 939
//   // in case of point-interpolation query, use asc order scan
//   char msg[] = "QInfo:0x%"PRIx64" scan order changed for %s query, old:%d, new:%d, qrange exchanged, old qrange:%"
//   PRId64
//                "-%" PRId64 ", new qrange:%" PRId64 "-%" PRId64;
H
Haojun Liao 已提交
940
//
L
Liu Jicong 已提交
941 942 943 944 945
//   // todo handle the case the the order irrelevant query type mixed up with order critical query type
//   // descending order query for last_row query
//   if (isFirstLastRowQuery(pQueryAttr)) {
//     //qDebug("QInfo:0x%"PRIx64" scan order changed for last_row query, old:%d, new:%d", pQInfo->qId,
//     pQueryAttr->order.order, TSDB_ORDER_ASC);
H
Haojun Liao 已提交
946
//
L
Liu Jicong 已提交
947 948
//     pQueryAttr->order.order = TSDB_ORDER_ASC;
//     if (pQueryAttr->window.skey > pQueryAttr->window.ekey) {
wafwerar's avatar
wafwerar 已提交
949
//       TSWAP(pQueryAttr->window.skey, pQueryAttr->window.ekey);
L
Liu Jicong 已提交
950
//     }
H
Haojun Liao 已提交
951
//
L
Liu Jicong 已提交
952 953 954
//     pQueryAttr->needReverseScan = false;
//     return;
//   }
H
Haojun Liao 已提交
955
//
L
Liu Jicong 已提交
956 957 958
//   if (pQueryAttr->groupbyColumn && pQueryAttr->order.order == TSDB_ORDER_DESC) {
//     pQueryAttr->order.order = TSDB_ORDER_ASC;
//     if (pQueryAttr->window.skey > pQueryAttr->window.ekey) {
wafwerar's avatar
wafwerar 已提交
959
//       TSWAP(pQueryAttr->window.skey, pQueryAttr->window.ekey);
L
Liu Jicong 已提交
960
//     }
H
Haojun Liao 已提交
961
//
L
Liu Jicong 已提交
962 963 964 965
//     pQueryAttr->needReverseScan = false;
//     doUpdateLastKey(pQueryAttr);
//     return;
//   }
H
Haojun Liao 已提交
966
//
L
Liu Jicong 已提交
967 968 969 970 971 972
//   if (pQueryAttr->pointInterpQuery && pQueryAttr->interval.interval == 0) {
//     if (!QUERY_IS_ASC_QUERY(pQueryAttr)) {
//       //qDebug(msg, pQInfo->qId, "interp", pQueryAttr->order.order, TSDB_ORDER_ASC, pQueryAttr->window.skey,
//       pQueryAttr->window.ekey, pQueryAttr->window.ekey, pQueryAttr->window.skey); TSWAP(pQueryAttr->window.skey,
//       pQueryAttr->window.ekey, TSKEY);
//     }
H
Haojun Liao 已提交
973
//
L
Liu Jicong 已提交
974 975 976
//     pQueryAttr->order.order = TSDB_ORDER_ASC;
//     return;
//   }
H
Haojun Liao 已提交
977
//
L
Liu Jicong 已提交
978 979 980 981
//   if (pQueryAttr->interval.interval == 0) {
//     if (onlyFirstQuery(pQueryAttr)) {
//       if (!QUERY_IS_ASC_QUERY(pQueryAttr)) {
//         //qDebug(msg, pQInfo->qId, "only-first", pQueryAttr->order.order, TSDB_ORDER_ASC, pQueryAttr->window.skey,
H
Haojun Liao 已提交
982 983
////               pQueryAttr->window.ekey, pQueryAttr->window.ekey, pQueryAttr->window.skey);
//
wafwerar's avatar
wafwerar 已提交
984
//        TSWAP(pQueryAttr->window.skey, pQueryAttr->window.ekey);
H
Haojun Liao 已提交
985 986 987 988 989 990 991 992 993 994
//        doUpdateLastKey(pQueryAttr);
//      }
//
//      pQueryAttr->order.order = TSDB_ORDER_ASC;
//      pQueryAttr->needReverseScan = false;
//    } else if (onlyLastQuery(pQueryAttr) && notContainSessionOrStateWindow(pQueryAttr)) {
//      if (QUERY_IS_ASC_QUERY(pQueryAttr)) {
//        //qDebug(msg, pQInfo->qId, "only-last", pQueryAttr->order.order, TSDB_ORDER_DESC, pQueryAttr->window.skey,
////               pQueryAttr->window.ekey, pQueryAttr->window.ekey, pQueryAttr->window.skey);
//
wafwerar's avatar
wafwerar 已提交
995
//        TSWAP(pQueryAttr->window.skey, pQueryAttr->window.ekey);
H
Haojun Liao 已提交
996 997 998 999 1000 1001 1002 1003 1004 1005 1006 1007
//        doUpdateLastKey(pQueryAttr);
//      }
//
//      pQueryAttr->order.order = TSDB_ORDER_DESC;
//      pQueryAttr->needReverseScan = false;
//    }
//
//  } else {  // interval query
//    if (stableQuery) {
//      if (onlyFirstQuery(pQueryAttr)) {
//        if (!QUERY_IS_ASC_QUERY(pQueryAttr)) {
//          //qDebug(msg, pQInfo->qId, "only-first stable", pQueryAttr->order.order, TSDB_ORDER_ASC,
L
Liu Jicong 已提交
1008 1009
////                 pQueryAttr->window.skey, pQueryAttr->window.ekey, pQueryAttr->window.ekey,
/// pQueryAttr->window.skey);
H
Haojun Liao 已提交
1010
//
wafwerar's avatar
wafwerar 已提交
1011
//          TSWAP(pQueryAttr->window.skey, pQueryAttr->window.ekey);
H
Haojun Liao 已提交
1012 1013 1014 1015 1016 1017 1018 1019
//          doUpdateLastKey(pQueryAttr);
//        }
//
//        pQueryAttr->order.order = TSDB_ORDER_ASC;
//        pQueryAttr->needReverseScan = false;
//      } else if (onlyLastQuery(pQueryAttr)) {
//        if (QUERY_IS_ASC_QUERY(pQueryAttr)) {
//          //qDebug(msg, pQInfo->qId, "only-last stable", pQueryAttr->order.order, TSDB_ORDER_DESC,
L
Liu Jicong 已提交
1020 1021
////                 pQueryAttr->window.skey, pQueryAttr->window.ekey, pQueryAttr->window.ekey,
/// pQueryAttr->window.skey);
H
Haojun Liao 已提交
1022
//
wafwerar's avatar
wafwerar 已提交
1023
//          TSWAP(pQueryAttr->window.skey, pQueryAttr->window.ekey);
H
Haojun Liao 已提交
1024 1025 1026 1027 1028 1029 1030 1031 1032
//          doUpdateLastKey(pQueryAttr);
//        }
//
//        pQueryAttr->order.order = TSDB_ORDER_DESC;
//        pQueryAttr->needReverseScan = false;
//      }
//    }
//  }
//}
1033

1034
#if 0
/*
 * (disabled) Check whether the time range of a data block straddles the boundary of an interval
 * window. The helpers that position the window `w` are commented out, so `w` stays zeroed.
 */
static bool overlapWithTimeWindow(STaskAttr* pQueryAttr, SDataBlockInfo* pBlockInfo) {
  STimeWindow w = {0};

  TSKEY sk = TMIN(pQueryAttr->window.skey, pQueryAttr->window.ekey);
  TSKEY ek = TMAX(pQueryAttr->window.skey, pQueryAttr->window.ekey);

  if (true) {
    //    getAlignQueryTimeWindow(pQueryAttr, pBlockInfo->window.skey, sk, ek, &w);
    assert(w.ekey >= pBlockInfo->window.skey);

    if (w.ekey < pBlockInfo->window.ekey) {
      return true;
    }

    for (;;) {
      //      getNextTimeWindow(pQueryAttr, &w);
      if (w.skey > pBlockInfo->window.ekey) {
        break;
      }

      assert(w.ekey > pBlockInfo->window.ekey);
      if (w.skey <= pBlockInfo->window.ekey && w.skey > pBlockInfo->window.skey) {
        return true;
      }
    }
  } else {
    //    getAlignQueryTimeWindow(pQueryAttr, pBlockInfo->window.ekey, sk, ek, &w);
    assert(w.skey <= pBlockInfo->window.ekey);

    if (w.skey > pBlockInfo->window.skey) {
      return true;
    }

    for (;;) {
      //      getNextTimeWindow(pQueryAttr, &w);
      if (w.ekey < pBlockInfo->window.skey) {
        break;
      }

      assert(w.skey < pBlockInfo->window.skey);
      if (w.ekey < pBlockInfo->window.ekey && w.ekey >= pBlockInfo->window.skey) {
        return true;
      }
    }
  }

  return false;
}
#endif
1084 1085

/*
 * Block-level pre-filter hook. The per-function data requirement check is compiled out, so the
 * function currently returns 0 for every block.
 */
static uint32_t doFilterByBlockTimeWindow(STableScanInfo* pTableScanInfo, SSDataBlock* pBlock) {
#if 0
  SqlFunctionCtx* pCtx = pTableScanInfo->pCtx;
  uint32_t        status = BLK_DATA_NOT_LOAD;

  int32_t numOfOutput = 0;  // pTableScanInfo->numOfOutput;
  for (int32_t i = 0; i < numOfOutput; ++i) {
    int32_t functionId = pCtx[i].functionId;
    int32_t colId = pTableScanInfo->pExpr[i].base.pParam[0].pCol->colId;

    // group by + first/last should not apply the first/last block filter
    if (functionId < 0) {
      status |= BLK_DATA_DATA_LOAD;
      return status;
    }

    //      status |= aAggs[functionId].dataReqFunc(&pTableScanInfo->pCtx[i], &pBlock->info.window, colId);
    //      if ((status & BLK_DATA_DATA_LOAD) == BLK_DATA_DATA_LOAD) {
    //        return status;
    //      }
  }

  return status;
#endif
  return 0;
}

L
Liu Jicong 已提交
1112 1113
/*
 * Decide how much of a data block has to be loaded for the current query.
 *
 * The decision logic is compiled out at the moment. What remains active: the block's column
 * array and statistics pointers are reset to NULL, *status is set to BLK_DATA_NOT_LOAD and
 * TSDB_CODE_SUCCESS is returned.
 */
int32_t loadDataBlockOnDemand(SExecTaskInfo* pTaskInfo, STableScanInfo* pTableScanInfo, SSDataBlock* pBlock,
                              uint32_t* status) {
  pBlock->pDataBlock = NULL;
  pBlock->pBlockAgg = NULL;

  *status = BLK_DATA_NOT_LOAD;

#if 0
  //  int64_t groupId = pRuntimeEnv->current->groupIndex;
  //  bool    ascQuery = QUERY_IS_ASC_QUERY(pQueryAttr);

  STaskCostInfo* pCost = &pTaskInfo->cost;

  //  pCost->totalBlocks += 1;
  //  pCost->totalRows += pBlock->info.rows;

  // Calculate all time windows that are overlapping or contain current data block.
  // If current data block is contained by all possible time window, do not load current data block.
  if (/*pQueryAttr->pFilters || */pQueryAttr->groupbyColumn || pQueryAttr->sw.gap > 0 ||
      (QUERY_IS_INTERVAL_QUERY(pQueryAttr) && overlapWithTimeWindow(pTaskInfo, &pBlock->info))) {
    (*status) = BLK_DATA_DATA_LOAD;
  }

  // check if this data block is required to load
  if ((*status) != BLK_DATA_DATA_LOAD) {
    bool needFilter = true;

    // the pCtx[i] result belongs to the previous time window since the outputBuf has not been set yet,
    // the filter result may be incorrect. So in case of interval query, we need to set the correct time output buffer
    if (QUERY_IS_INTERVAL_QUERY(pQueryAttr)) {
      SResultRow* pResult = NULL;

      bool  masterScan = IS_MAIN_SCAN(pRuntimeEnv);
      TSKEY k = ascQuery? pBlock->info.window.skey : pBlock->info.window.ekey;

      STimeWindow win = getActiveTimeWindow(pTableScanInfo->pResultRowInfo, k, pQueryAttr);
      if (pQueryAttr->pointInterpQuery) {
        needFilter = chkWindowOutputBufByKey(pRuntimeEnv, pTableScanInfo->pResultRowInfo, &win, masterScan, &pResult, groupId,
                                    pTableScanInfo->pCtx, pTableScanInfo->numOfOutput,
                                    pTableScanInfo->rowEntryInfoOffset);
      } else {
        if (setResultOutputBufByKey(pRuntimeEnv, pTableScanInfo->pResultRowInfo, pBlock->info.uid, &win, masterScan, &pResult, groupId,
                                    pTableScanInfo->pCtx, pTableScanInfo->numOfOutput,
                                    pTableScanInfo->rowEntryInfoOffset) != TSDB_CODE_SUCCESS) {
          longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
        }
      }
    } else if (pQueryAttr->stableQuery && (!pQueryAttr->tsCompQuery) && (!pQueryAttr->diffQuery)) { // stable aggregate, not interval aggregate or normal column aggregate
      doSetTableGroupOutputBuf(pRuntimeEnv, pTableScanInfo->pResultRowInfo, pTableScanInfo->pCtx,
                               pTableScanInfo->rowEntryInfoOffset, pTableScanInfo->numOfOutput,
                               pRuntimeEnv->current->groupIndex);
    }

    if (needFilter) {
      (*status) = doFilterByBlockTimeWindow(pTableScanInfo, pBlock);
    } else {
      (*status) = BLK_DATA_DATA_LOAD;
    }
  }

  SDataBlockInfo* pBlockInfo = &pBlock->info;
//  *status = updateBlockLoadStatus(pRuntimeEnv->pQueryAttr, *status);

  if ((*status) == BLK_DATA_NOT_LOAD || (*status) == BLK_DATA_FILTEROUT) {
    //qDebug("QInfo:0x%"PRIx64" data block discard, brange:%" PRId64 "-%" PRId64 ", rows:%d", pQInfo->qId, pBlockInfo->window.skey,
//           pBlockInfo->window.ekey, pBlockInfo->rows);
    pCost->skipBlocks += 1;
  } else if ((*status) == BLK_DATA_SMA_LOAD) {
    // this function never returns error?
    pCost->loadBlockStatis += 1;
//    tsdbRetrieveDatablockSMA(pTableScanInfo->pTsdbReadHandle, &pBlock->pBlockAgg);

    if (pBlock->pBlockAgg == NULL) {  // data block statistics does not exist, load data block
//      pBlock->pDataBlock = tsdbRetrieveDataBlock(pTableScanInfo->pTsdbReadHandle, NULL);
      pCost->totalCheckedRows += pBlock->info.rows;
    }
  } else {
    assert((*status) == BLK_DATA_DATA_LOAD);

    // load the data block statistics to perform further filter
    pCost->loadBlockStatis += 1;
//    tsdbRetrieveDatablockSMA(pTableScanInfo->pTsdbReadHandle, &pBlock->pBlockAgg);

    if (pQueryAttr->topBotQuery && pBlock->pBlockAgg != NULL) {
      { // set previous window
        if (QUERY_IS_INTERVAL_QUERY(pQueryAttr)) {
          SResultRow* pResult = NULL;

          bool  masterScan = IS_MAIN_SCAN(pRuntimeEnv);
          TSKEY k = ascQuery? pBlock->info.window.skey : pBlock->info.window.ekey;

          STimeWindow win = getActiveTimeWindow(pTableScanInfo->pResultRowInfo, k, pQueryAttr);
          if (setResultOutputBufByKey(pRuntimeEnv, pTableScanInfo->pResultRowInfo, pBlock->info.uid, &win, masterScan, &pResult, groupId,
                                      pTableScanInfo->pCtx, pTableScanInfo->numOfOutput,
                                      pTableScanInfo->rowEntryInfoOffset) != TSDB_CODE_SUCCESS) {
            longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
          }
        }
      }
      bool load = false;
      for (int32_t i = 0; i < pQueryAttr->numOfOutput; ++i) {
        int32_t functionId = pTableScanInfo->pCtx[i].functionId;
        if (functionId == FUNCTION_TOP || functionId == FUNCTION_BOTTOM) {
//          load = topbot_datablock_filter(&pTableScanInfo->pCtx[i], (char*)&(pBlock->pBlockAgg[i].min),
//                                         (char*)&(pBlock->pBlockAgg[i].max));
          if (!load) { // current block has been discard due to filter applied
            pCost->skipBlocks += 1;
            //qDebug("QInfo:0x%"PRIx64" data block discard, brange:%" PRId64 "-%" PRId64 ", rows:%d", pQInfo->qId,
//                   pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
            (*status) = BLK_DATA_FILTEROUT;
            return TSDB_CODE_SUCCESS;
          }
        }
      }
    }

    // current block has been discard due to filter applied
//    if (!doFilterByBlockSMA(pRuntimeEnv, pBlock->pBlockAgg, pTableScanInfo->pCtx, pBlockInfo->rows)) {
//      pCost->skipBlocks += 1;
//      qDebug("QInfo:0x%"PRIx64" data block discard, brange:%" PRId64 "-%" PRId64 ", rows:%d", pQInfo->qId, pBlockInfo->window.skey,
//             pBlockInfo->window.ekey, pBlockInfo->rows);
//      (*status) = BLK_DATA_FILTEROUT;
//      return TSDB_CODE_SUCCESS;
//    }

    pCost->totalCheckedRows += pBlockInfo->rows;
    pCost->loadBlocks += 1;
//    pBlock->pDataBlock = tsdbRetrieveDataBlock(pTableScanInfo->pTsdbReadHandle, NULL);
//    if (pBlock->pDataBlock == NULL) {
//      return terrno;
//    }

//    if (pQueryAttr->pFilters != NULL) {
//      filterSetColFieldData(pQueryAttr->pFilters, taosArrayGetSize(pBlock->pDataBlock), pBlock->pDataBlock);
//    }

//    if (pQueryAttr->pFilters != NULL || pRuntimeEnv->pTsBuf != NULL) {
//      filterColRowsInDataBlock(pRuntimeEnv, pBlock, ascQuery);
//    }
  }
#endif

  return TSDB_CODE_SUCCESS;
}

L
Liu Jicong 已提交
1255
/*
 * Placeholder for switching a table's query state to the reverse scan direction.
 * All state updates are commented out; the function only tolerates a NULL argument.
 */
static void updateTableQueryInfoForReverseScan(STableQueryInfo* pTableQueryInfo) {
  if (NULL == pTableQueryInfo) {
    return;
  }

  //  TSWAP(pTableQueryInfo->win.skey, pTableQueryInfo->win.ekey);
  //  pTableQueryInfo->lastKey = pTableQueryInfo->win.skey;

  //  SWITCH_ORDER(pTableQueryInfo->cur.order);
  //  pTableQueryInfo->cur.vgroupIndex = -1;

  // set the index to be the end slot of result rows array
  //  SResultRowInfo* pResultRowInfo = &pTableQueryInfo->resInfo;
  //  if (pResultRowInfo->size > 0) {
  //    pResultRowInfo->curPos = pResultRowInfo->size - 1;
  //  } else {
  //    pResultRowInfo->curPos = -1;
  //  }
}

H
Haojun Liao 已提交
1275
/* Intentionally a no-op: the only initialization step it used to perform is commented out. */
void initResultRow(SResultRow* pResultRow) {
  (void)pResultRow;
  //  pResultRow->pEntryInfo = (struct SResultRowEntryInfo*)((char*)pResultRow + sizeof(SResultRow));
}

L
Liu Jicong 已提交
1279
/*
 * Update the status bits of a task.
 *
 * TASK_NOT_COMPLETED replaces the whole status word. Any other status is OR-ed into the word
 * after the TASK_NOT_COMPLETED bit has been cleared, because that bit is not compatible with
 * any other status.
 */
void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status) {
  if (status != TASK_NOT_COMPLETED) {
    CLEAR_QUERY_STATUS(pTaskInfo, TASK_NOT_COMPLETED);
    pTaskInfo->status |= status;
    return;
  }

  pTaskInfo->status = status;
}

L
Liu Jicong 已提交
1289
/*
 * Release the resources held by a table query info object. The actual cleanup calls are
 * commented out, so this currently only tolerates a NULL argument and does nothing else.
 */
void destroyTableQueryInfoImpl(STableQueryInfo* pTableQueryInfo) {
  if (NULL == pTableQueryInfo) {
    return;
  }

  //  taosVariantDestroy(&pTableQueryInfo->tag);
  //  cleanupResultRowInfo(&pTableQueryInfo->resInfo);
}

1298
/*
 * Bind every function context to its result entry inside pResult and initialize the entries
 * that still need it.
 *
 * The resultInfo pointer is refreshed for all numOfOutput contexts. Initialization is skipped
 * for entries that are both completed and initialized and for window pseudo column functions.
 * As soon as an already-initialized entry that passed those checks is found, no later entry is
 * initialized any more. Contexts with functionId -1 are merely flagged as initialized; all
 * others go through their fpSet.init callback.
 */
void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowEntryInfoOffset) {
  bool skipRemaining = false;

  for (int32_t i = 0; i < numOfOutput; ++i) {
    struct SResultRowEntryInfo* pEntry = getResultEntryInfo(pResult, i, rowEntryInfoOffset);
    pCtx[i].resultInfo = pEntry;

    if (skipRemaining || (isRowEntryCompleted(pEntry) && isRowEntryInitialized(pEntry)) ||
        fmIsWindowPseudoColumnFunc(pCtx[i].functionId)) {
      continue;
    }

    if (pEntry->initialized) {
      skipRemaining = true;
      continue;
    }

    if (pCtx[i].functionId == -1) {
      pEntry->initialized = true;
    } else {
      pCtx[i].fpSet.init(&pCtx[i], pEntry);
    }
  }
}

H
Haojun Liao 已提交
1327
static void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const int8_t* rowRes, bool keep);
1328

1329
/*
 * Apply the filter expression pFilterNode to pBlock and keep only the qualifying rows.
 *
 * Nothing happens when there is no filter or the block is empty. Otherwise a filter is built
 * from the node, executed against the block's columns, and the per-row result is handed to
 * extractQualifiedTupleByFilterResult(). If pColMatchInfo is given, the time window of the
 * block is recomputed from the first matching primary timestamp column of type TIMESTAMP.
 *
 * NOTE(review): the return codes of filterInitFromNode() and filterSetDataFromSlotId() are
 * stored but never inspected.
 */
void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock, const SArray* pColMatchInfo) {
  if (pFilterNode == NULL || pBlock->info.rows == 0) {
    return;
  }

  // todo move to the initialization function
  SFilterInfo* pFilterInfo = NULL;
  int32_t      code = filterInitFromNode((SNode*)pFilterNode, &pFilterInfo, 0);

  size_t             numOfCols = taosArrayGetSize(pBlock->pDataBlock);
  SFilterColumnParam colParam = {.numOfCols = numOfCols, .pDataBlock = pBlock->pDataBlock};
  code = filterSetDataFromSlotId(pFilterInfo, &colParam);

  // todo the keep seems never to be True??
  int8_t* rowRes = NULL;
  bool    keep = filterExecute(pFilterInfo, pBlock, &rowRes, NULL, colParam.numOfCols);
  filterFreeInfo(pFilterInfo);

  extractQualifiedTupleByFilterResult(pBlock, rowRes, keep);

  size_t numOfMatch = (pColMatchInfo != NULL) ? taosArrayGetSize(pColMatchInfo) : 0;
  for (int32_t i = 0; i < numOfMatch; ++i) {
    SColMatchInfo* pMatch = taosArrayGet(pColMatchInfo, i);
    if (pMatch->colId != PRIMARYKEY_TIMESTAMP_COL_ID) {
      continue;
    }

    SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, pMatch->targetSlotId);
    if (pTsCol->info.type != TSDB_DATA_TYPE_TIMESTAMP) {
      continue;
    }

    blockDataUpdateTsWindow(pBlock, pMatch->targetSlotId);
    break;
  }

  taosMemoryFree(rowRes);
}

H
Haojun Liao 已提交
1367
/*
 * Compact pBlock in place according to a per-row filter result.
 *
 * keep == true      the block is left untouched
 * rowRes == NULL    (with keep == false) the block's row count is set to 0
 * otherwise         rows whose rowRes[] entry is 0 are dropped: each column that has data is
 *                   rebuilt from a copy of the block, and info.rows becomes the survivor count
 */
void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const int8_t* rowRes, bool keep) {
  if (keep) {
    return;
  }

  if (rowRes == NULL) {
    pBlock->info.rows = 0;
    return;
  }

  int32_t      totalRows = pBlock->info.rows;
  SSDataBlock* pSnapshot = createOneDataBlock(pBlock, true);

  size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
  for (int32_t c = 0; c < numOfCols; ++c) {
    SColumnInfoData* pSrc = taosArrayGet(pSnapshot->pDataBlock, c);
    SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, c);

    // it is a reserved column for scalar function, and no data in this column yet.
    if (pDst->pData == NULL || pSrc->pData == NULL) {
      continue;
    }

    colInfoDataCleanup(pDst, pBlock->info.rows);

    int32_t kept = 0;
    for (int32_t r = 0; r < totalRows; ++r) {
      if (rowRes[r] != 0) {
        if (colDataIsNull_s(pSrc, r)) {
          colDataAppendNULL(pDst, kept);
        } else {
          colDataAppend(pDst, kept, colDataGetData(pSrc, r), false);
        }

        kept += 1;
      }
    }

    // the first rebuilt column fixes the new row count, the others must agree with it
    if (pBlock->info.rows == totalRows) {
      pBlock->info.rows = kept;
    } else {
      ASSERT(pBlock->info.rows == kept);
    }
  }

  blockDataDestroy(pSnapshot);  // fix memory leak
}

1415
/*
 * Select the result row of group `groupId` for an aggregate operator and bind the operator's
 * function contexts to it.
 *
 * The row is looked up (or created) with the group id as key. A row that has no page assigned
 * yet (pageId == -1) gets a new result buffer first; if that fails the function returns
 * without touching the contexts.
 */
void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId) {
  // for simple group by query without interval, all the tables belong to one group result.
  SAggOperatorInfo* pAgg = pOperator->info;
  SExecTaskInfo*    pTask = pOperator->pTaskInfo;

  SqlFunctionCtx* pFuncCtx = pOperator->exprSupp.pCtx;
  int32_t*        pEntryOffset = pOperator->exprSupp.rowEntryInfoOffset;

  SResultRow* pRow = doSetResultOutBufByKey(pAgg->aggSup.pResultBuf, &pAgg->binfo.resultRowInfo, (char*)&groupId,
                                            sizeof(groupId), true, groupId, pTask, false, &pAgg->aggSup);
  assert(pRow != NULL);

  /*
   * not assign result buffer yet, add new result buffer
   * all group belong to one result set, and each group result has different group id so set the id to be one
   */
  if (pRow->pageId == -1) {
    int32_t code = addNewWindowResultBuf(pRow, pAgg->aggSup.pResultBuf, groupId, pAgg->binfo.pRes->info.rowSize);
    if (code != TSDB_CODE_SUCCESS) {
      return;
    }
  }

  setResultRowInitCtx(pRow, pFuncCtx, numOfOutput, pEntryOffset);
}

1443 1444 1445
/*
 * Make `groupId` the active group of the aggregate operator. The output buffer is only
 * switched when the recorded group id is UINT64_MAX or differs from `groupId`; afterwards the
 * new id is remembered.
 */
static void setExecutionContext(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId) {
  SAggOperatorInfo* pAggInfo = pOperator->info;

  bool alreadyActive = (pAggInfo->groupId != UINT64_MAX) && (pAggInfo->groupId == groupId);
  if (!alreadyActive) {
    doSetTableGroupOutputBuf(pOperator, numOfOutput, groupId);

    // record the current active group id
    pAggInfo->groupId = groupId;
  }
}

dengyihao's avatar
dengyihao 已提交
1455 1456
/*
 * Derive the number of output rows of a result row from its initialized entries.
 *
 * pRow->numOfRows is raised to the largest numOfRes among the initialized entries. If it is
 * still 0 afterwards, one row is emitted anyway (e.g. max() over all-null input), unless one
 * of the initialized functions is reported by fmIsNotNullOutputFunc(), in which case no row
 * is produced.
 */
static void doUpdateNumOfRows(SqlFunctionCtx* pCtx, SResultRow* pRow, int32_t numOfExprs,
                              const int32_t* rowCellOffset) {
  bool hasNotNullOutput = false;

  for (int32_t j = 0; j < numOfExprs; ++j) {
    struct SResultRowEntryInfo* pEntry = getResultEntryInfo(pRow, j, rowCellOffset);

    if (isRowEntryInitialized(pEntry)) {
      if (pEntry->numOfRes > pRow->numOfRows) {
        pRow->numOfRows = pEntry->numOfRes;
      }

      if (fmIsNotNullOutputFunc(pCtx[j].functionId)) {
        hasNotNullOutput = true;
      }
    }
  }

  if (!hasNotNullOutput && pRow->numOfRows == 0) {
    pRow->numOfRows = 1;
  }
}

1479
// todo extract method with copytoSSDataBlock
1480
int32_t finalizeResultRowIntoResultDataBlock(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPosition,
S
shenglian zhou 已提交
1481 1482 1483
                                             SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, int32_t numOfExprs,
                                             const int32_t* rowCellOffset, SSDataBlock* pBlock,
                                             SExecTaskInfo* pTaskInfo) {
1484 1485 1486
  SFilePage*  page = getBufPage(pBuf, resultRowPosition->pageId);
  SResultRow* pRow = (SResultRow*)((char*)page + resultRowPosition->offset);

1487
  doUpdateNumOfRows(pCtx, pRow, numOfExprs, rowCellOffset);
1488 1489 1490 1491 1492
  if (pRow->numOfRows == 0) {
    releaseBufPage(pBuf, page);
    return 0;
  }

1493 1494 1495 1496 1497 1498 1499
  while (pBlock->info.rows + pRow->numOfRows > pBlock->info.capacity) {
    int32_t code = blockDataEnsureCapacity(pBlock, pBlock->info.capacity * 1.25);
    if (TAOS_FAILED(code)) {
      releaseBufPage(pBuf, page);
      qError("%s ensure result data capacity failed, code %s", GET_TASKID(pTaskInfo), tstrerror(code));
      longjmp(pTaskInfo->env, code);
    }
1500 1501 1502 1503 1504
  }

  for (int32_t j = 0; j < numOfExprs; ++j) {
    int32_t slotId = pExprInfo[j].base.resSchema.slotId;

1505
    pCtx[j].resultInfo = getResultEntryInfo(pRow, j, rowCellOffset);
1506 1507 1508 1509 1510 1511 1512 1513 1514
    if (pCtx[j].fpSet.finalize) {
      int32_t code = pCtx[j].fpSet.finalize(&pCtx[j], pBlock);
      if (TAOS_FAILED(code)) {
        qError("%s build result data block error, code %s", GET_TASKID(pTaskInfo), tstrerror(code));
        longjmp(pTaskInfo->env, code);
      }
    } else if (strcmp(pCtx[j].pExpr->pExpr->_function.functionName, "_select_value") == 0) {
      // do nothing, todo refactor
    } else {
1515 1516
      // expand the result into multiple rows. E.g., _wstart, top(k, 20)
      // the _wstart needs to copy to 20 following rows, since the results of top-k expands to 20 different rows.
1517 1518 1519 1520 1521 1522 1523 1524 1525
      SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, slotId);
      char*            in = GET_ROWCELL_INTERBUF(pCtx[j].resultInfo);
      for (int32_t k = 0; k < pRow->numOfRows; ++k) {
        colDataAppend(pColInfoData, pBlock->info.rows + k, in, pCtx[j].resultInfo->isNullRes);
      }
    }
  }

  releaseBufPage(pBuf, page);
1526
  pBlock->info.rows += pRow->numOfRows;
1527 1528 1529 1530

  return 0;
}

1531 1532 1533 1534 1535 1536 1537
/*
 * Copy finalized result rows of pGroupResInfo into pBlock, starting at pGroupResInfo->index.
 *
 * Copying stops when all rows are consumed, when the next row belongs to a different group than
 * the one already stored in pBlock->info.groupId, or when the next row would exceed the block
 * capacity. pGroupResInfo->index is advanced past every row that was consumed or skipped.
 *
 * @param pTaskInfo      task used for logging and for the longjmp error exit
 * @param pBlock         destination block
 * @param pSup           expression support (expr info, function contexts, entry offsets)
 * @param pBuf           disk-based buffer that holds the result row pages
 * @param pGroupResInfo  list of result row positions plus the current read index
 * @return 0; a failing finalize callback leaves through longjmp(pTaskInfo->env, code)
 *
 * Each buffer page is released on every path out of an iteration, including the longjmp one.
 */
int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprSupp* pSup, SDiskbasedBuf* pBuf,
                           SGroupResInfo* pGroupResInfo) {
  SExprInfo*      pExprInfo = pSup->pExprInfo;
  int32_t         numOfExprs = pSup->numOfExprs;
  int32_t*        rowEntryOffset = pSup->rowEntryInfoOffset;
  SqlFunctionCtx* pCtx = pSup->pCtx;

  int32_t numOfRows = getNumOfTotalRes(pGroupResInfo);

  for (int32_t i = pGroupResInfo->index; i < numOfRows; i += 1) {
    SResKeyPos* pPos = taosArrayGetP(pGroupResInfo->pRows, i);
    SFilePage*  page = getBufPage(pBuf, pPos->pos.pageId);

    SResultRow* pRow = (SResultRow*)((char*)page + pPos->pos.offset);

    doUpdateNumOfRows(pCtx, pRow, numOfExprs, rowEntryOffset);

    // no results, continue to check the next one
    if (pRow->numOfRows == 0) {
      pGroupResInfo->index += 1;
      releaseBufPage(pBuf, page);
      continue;
    }

    if (pBlock->info.groupId == 0) {
      pBlock->info.groupId = pPos->groupId;
    } else if (pBlock->info.groupId != pPos->groupId) {
      // current value belongs to different group, it can't be packed into one datablock
      releaseBufPage(pBuf, page);
      break;
    }

    if (pBlock->info.rows + pRow->numOfRows > pBlock->info.capacity) {
      ASSERT(pBlock->info.rows > 0);
      releaseBufPage(pBuf, page);
      break;
    }

    pGroupResInfo->index += 1;

    for (int32_t j = 0; j < numOfExprs; ++j) {
      int32_t slotId = pExprInfo[j].base.resSchema.slotId;

      pCtx[j].resultInfo = getResultEntryInfo(pRow, j, rowEntryOffset);
      if (pCtx[j].fpSet.finalize) {
#ifdef BUF_PAGE_DEBUG
        qDebug("\npage_finalize %d", numOfExprs);
#endif
        int32_t code = pCtx[j].fpSet.finalize(&pCtx[j], pBlock);
        if (TAOS_FAILED(code)) {
          // longjmp skips the release at the end of the iteration, so unpin the page here
          releaseBufPage(pBuf, page);
          qError("%s build result data block error, code %s", GET_TASKID(pTaskInfo), tstrerror(code));
          longjmp(pTaskInfo->env, code);
        }
      } else if (strcmp(pCtx[j].pExpr->pExpr->_function.functionName, "_select_value") == 0) {
        // do nothing, todo refactor
      } else {
        // expand the result into multiple rows. E.g., _wstart, top(k, 20)
        // the _wstart needs to copy to 20 following rows, since the results of top-k expands to 20 different rows.
        SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, slotId);
        char*            in = GET_ROWCELL_INTERBUF(pCtx[j].resultInfo);
        if (pCtx[j].increase) {
          // emit consecutive values, starting from the stored one
          int64_t ts = *(int64_t*)in;
          for (int32_t k = 0; k < pRow->numOfRows; ++k) {
            colDataAppend(pColInfoData, pBlock->info.rows + k, (const char*)&ts, pCtx[j].resultInfo->isNullRes);
            ts++;
          }
        } else {
          for (int32_t k = 0; k < pRow->numOfRows; ++k) {
            colDataAppend(pColInfoData, pBlock->info.rows + k, in, pCtx[j].resultInfo->isNullRes);
          }
        }
      }
    }

    // pRow points into the page, so read it before the page is released
    pBlock->info.rows += pRow->numOfRows;
    releaseBufPage(pBuf, page);
  }

  qDebug("%s result generated, rows:%d, groupId:%" PRIu64, GET_TASKID(pTaskInfo), pBlock->info.rows,
         pBlock->info.groupId);

  blockDataUpdateTsWindow(pBlock, 0);
  return 0;
}

X
Xiaoyu Wang 已提交
1618 1619
/*
 * Fill pbInfo->pRes with the next batch of results from pGroupResInfo.
 *
 * The block is stamped with the task version and cleaned first. Without mergeResultBlock a
 * single copy pass is made. With mergeResultBlock, copy passes are repeated (resetting the
 * group id between them) until the results are exhausted or the row count reaches
 * pOperator->resultInfo.threshold, and the group id is cleared at the end.
 */
void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo,
                            SDiskbasedBuf* pBuf) {
  SExecTaskInfo* pTask = pOperator->pTaskInfo;
  SSDataBlock*   pRes = pbInfo->pRes;

  // set output datablock version
  pRes->info.version = pTask->version;
  blockDataCleanup(pRes);

  if (hasRemainResults(pGroupResInfo)) {
    // clear the existed group id
    pRes->info.groupId = 0;

    if (pbInfo->mergeResultBlock) {
      while (hasRemainResults(pGroupResInfo)) {
        doCopyToSDataBlock(pTask, pRes, &pOperator->exprSupp, pBuf, pGroupResInfo);
        if (pRes->info.rows >= pOperator->resultInfo.threshold) {
          break;
        }

        // clearing group id to continue to merge data that belong to different groups
        pRes->info.groupId = 0;
      }

      // clear the group id info in SSDataBlock, since the client does not need it
      pRes->info.groupId = 0;
    } else {
      doCopyToSDataBlock(pTask, pRes, &pOperator->exprSupp, pBuf, pGroupResInfo);
    }
  }
}

L
Liu Jicong 已提交
1651
/*
 * Compress one result column with the compFunc registered for its data type in tDataTypes.
 *
 * The input length is info.bytes * numOfRows; the output buffer `data` is declared to the
 * compressor as that length plus COMP_OVERFLOW_BYTES. The compressor's return value is passed
 * through unchanged.
 */
static int32_t compressQueryColData(SColumnInfoData* pColRes, int32_t numOfRows, char* data, int8_t compressed) {
  int32_t rawLen = pColRes->info.bytes * numOfRows;

  return tDataTypes[pColRes->info.type].compFunc(pColRes->pData, rawLen, numOfRows, data,
                                                 rawLen + COMP_OVERFLOW_BYTES, compressed, NULL, 0);
}

L
Liu Jicong 已提交
1657 1658
/*
 * Log the cost summary of a task at debug level.
 *
 * Only the file-block load recorder statistics are reported, and only when a recorder is
 * attached to the task's cost info. The remaining statistics are currently disabled (kept
 * below as commented-out code).
 */
void queryCostStatis(SExecTaskInfo* pTaskInfo) {
  STaskCostInfo* pCost = &pTaskInfo->cost;

  //  uint64_t hashSize = taosHashGetMemSize(pQInfo->runtimeEnv.pResultRowHashTable);
  //  hashSize += taosHashGetMemSize(pRuntimeEnv->tableqinfoGroupInfo.map);
  //  pSummary->hashSize = hashSize;

  //  SResultRowPool* p = pTaskInfo->pool;
  //  if (p != NULL) {
  //    pSummary->winInfoSize = getResultRowPoolMemSize(p);
  //    pSummary->numOfTimeWindows = getNumOfAllocatedResultRows(p);
  //  } else {
  //    pSummary->winInfoSize = 0;
  //    pSummary->numOfTimeWindows = 0;
  //  }

  SFileBlockLoadRecorder* pRec = pCost->pRecoder;
  if (pRec == NULL) {
    return;
  }

  qDebug(
      "%s :cost summary: elapsed time:%.2f ms, total blocks:%d, load block SMA:%d, load data block:%d, total "
      "rows:%" PRId64 ", check rows:%" PRId64,
      GET_TASKID(pTaskInfo), pCost->elapsedTime / 1000.0, pRec->totalBlocks, pRec->loadBlockStatis,
      pRec->loadBlocks, pRec->totalRows, pRec->totalCheckedRows);

  // qDebug("QInfo:0x%"PRIx64" :cost summary: winResPool size:%.2f Kb, numOfWin:%"PRId64", tableInfoSize:%.2f Kb,
  // hashTable:%.2f Kb", pQInfo->qId, pSummary->winInfoSize/1024.0,
  //      pSummary->numOfTimeWindows, pSummary->tableInfoSize/1024.0, pSummary->hashSize/1024.0);
}

L
Liu Jicong 已提交
1687 1688 1689
// static void updateOffsetVal(STaskRuntimeEnv *pRuntimeEnv, SDataBlockInfo *pBlockInfo) {
//   STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
//   STableQueryInfo* pTableQueryInfo = pRuntimeEnv->current;
1690
//
L
Liu Jicong 已提交
1691
//   int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQueryAttr->order.order);
1692
//
L
Liu Jicong 已提交
1693 1694 1695 1696
//   if (pQueryAttr->limit.offset == pBlockInfo->rows) {  // current block will ignore completed
//     pTableQueryInfo->lastKey = QUERY_IS_ASC_QUERY(pQueryAttr) ? pBlockInfo->window.ekey + step :
//     pBlockInfo->window.skey + step; pQueryAttr->limit.offset = 0; return;
//   }
1697
//
L
Liu Jicong 已提交
1698 1699 1700 1701 1702
//   if (QUERY_IS_ASC_QUERY(pQueryAttr)) {
//     pQueryAttr->pos = (int32_t)pQueryAttr->limit.offset;
//   } else {
//     pQueryAttr->pos = pBlockInfo->rows - (int32_t)pQueryAttr->limit.offset - 1;
//   }
1703
//
L
Liu Jicong 已提交
1704
//   assert(pQueryAttr->pos >= 0 && pQueryAttr->pos <= pBlockInfo->rows - 1);
1705
//
L
Liu Jicong 已提交
1706 1707
//   SArray *         pDataBlock = tsdbRetrieveDataBlock(pRuntimeEnv->pTsdbReadHandle, NULL);
//   SColumnInfoData *pColInfoData = taosArrayGet(pDataBlock, 0);
1708
//
L
Liu Jicong 已提交
1709 1710
//   // update the pQueryAttr->limit.offset value, and pQueryAttr->pos value
//   TSKEY *keys = (TSKEY *) pColInfoData->pData;
1711
//
L
Liu Jicong 已提交
1712 1713 1714
//   // update the offset value
//   pTableQueryInfo->lastKey = keys[pQueryAttr->pos];
//   pQueryAttr->limit.offset = 0;
1715
//
L
Liu Jicong 已提交
1716
//   int32_t numOfRes = tableApplyFunctionsOnBlock(pRuntimeEnv, pBlockInfo, NULL, binarySearchForKey, pDataBlock);
1717
//
L
Liu Jicong 已提交
1718 1719 1720 1721
//   //qDebug("QInfo:0x%"PRIx64" check data block, brange:%" PRId64 "-%" PRId64 ", numBlocksOfStep:%d, numOfRes:%d,
//   lastKey:%"PRId64, GET_TASKID(pRuntimeEnv),
//          pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows, numOfRes, pQuery->current->lastKey);
// }
1722

L
Liu Jicong 已提交
1723 1724
// void skipBlocks(STaskRuntimeEnv *pRuntimeEnv) {
//   STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
1725
//
L
Liu Jicong 已提交
1726 1727 1728
//   if (pQueryAttr->limit.offset <= 0 || pQueryAttr->numOfFilterCols > 0) {
//     return;
//   }
1729
//
L
Liu Jicong 已提交
1730 1731
//   pQueryAttr->pos = 0;
//   int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQueryAttr->order.order);
1732
//
L
Liu Jicong 已提交
1733 1734
//   STableQueryInfo* pTableQueryInfo = pRuntimeEnv->current;
//   TsdbQueryHandleT pTsdbReadHandle = pRuntimeEnv->pTsdbReadHandle;
1735
//
L
Liu Jicong 已提交
1736 1737 1738 1739 1740
//   SDataBlockInfo blockInfo = SDATA_BLOCK_INITIALIZER;
//   while (tsdbNextDataBlock(pTsdbReadHandle)) {
//     if (isTaskKilled(pRuntimeEnv->qinfo)) {
//       longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED);
//     }
1741
//
L
Liu Jicong 已提交
1742
//     tsdbRetrieveDataBlockInfo(pTsdbReadHandle, &blockInfo);
1743
//
L
Liu Jicong 已提交
1744 1745 1746 1747
//     if (pQueryAttr->limit.offset > blockInfo.rows) {
//       pQueryAttr->limit.offset -= blockInfo.rows;
//       pTableQueryInfo->lastKey = (QUERY_IS_ASC_QUERY(pQueryAttr)) ? blockInfo.window.ekey : blockInfo.window.skey;
//       pTableQueryInfo->lastKey += step;
1748
//
L
Liu Jicong 已提交
1749 1750 1751 1752 1753 1754 1755
//       //qDebug("QInfo:0x%"PRIx64" skip rows:%d, offset:%" PRId64, GET_TASKID(pRuntimeEnv), blockInfo.rows,
//              pQuery->limit.offset);
//     } else {  // find the appropriated start position in current block
//       updateOffsetVal(pRuntimeEnv, &blockInfo);
//       break;
//     }
//   }
1756
//
L
Liu Jicong 已提交
1757 1758 1759 1760 1761 1762 1763 1764 1765
//   if (terrno != TSDB_CODE_SUCCESS) {
//     longjmp(pRuntimeEnv->env, terrno);
//   }
// }

// static TSKEY doSkipIntervalProcess(STaskRuntimeEnv* pRuntimeEnv, STimeWindow* win, SDataBlockInfo* pBlockInfo,
// STableQueryInfo* pTableQueryInfo) {
//   STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
//   SResultRowInfo *pWindowResInfo = &pRuntimeEnv->resultRowInfo;
1766
//
L
Liu Jicong 已提交
1767 1768 1769
//   assert(pQueryAttr->limit.offset == 0);
//   STimeWindow tw = *win;
//   getNextTimeWindow(pQueryAttr, &tw);
1770
//
L
Liu Jicong 已提交
1771 1772
//   if ((tw.skey <= pBlockInfo->window.ekey && QUERY_IS_ASC_QUERY(pQueryAttr)) ||
//       (tw.ekey >= pBlockInfo->window.skey && !QUERY_IS_ASC_QUERY(pQueryAttr))) {
1773
//
L
Liu Jicong 已提交
1774 1775 1776 1777
//     // load the data block and check data remaining in current data block
//     // TODO optimize performance
//     SArray *         pDataBlock = tsdbRetrieveDataBlock(pRuntimeEnv->pTsdbReadHandle, NULL);
//     SColumnInfoData *pColInfoData = taosArrayGet(pDataBlock, 0);
1778
//
L
Liu Jicong 已提交
1779 1780 1781 1782
//     tw = *win;
//     int32_t startPos =
//         getNextQualifiedWindow(pQueryAttr, &tw, pBlockInfo, pColInfoData->pData, binarySearchForKey, -1);
//     assert(startPos >= 0);
1783
//
L
Liu Jicong 已提交
1784 1785
//     // set the abort info
//     pQueryAttr->pos = startPos;
1786
//
L
Liu Jicong 已提交
1787 1788 1789 1790
//     // reset the query start timestamp
//     pTableQueryInfo->win.skey = ((TSKEY *)pColInfoData->pData)[startPos];
//     pQueryAttr->window.skey = pTableQueryInfo->win.skey;
//     TSKEY key = pTableQueryInfo->win.skey;
1791
//
L
Liu Jicong 已提交
1792 1793
//     pWindowResInfo->prevSKey = tw.skey;
//     int32_t index = pRuntimeEnv->resultRowInfo.curIndex;
1794
//
L
Liu Jicong 已提交
1795 1796
//     int32_t numOfRes = tableApplyFunctionsOnBlock(pRuntimeEnv, pBlockInfo, NULL, binarySearchForKey, pDataBlock);
//     pRuntimeEnv->resultRowInfo.curIndex = index;  // restore the window index
1797
//
L
Liu Jicong 已提交
1798 1799 1800 1801
//     //qDebug("QInfo:0x%"PRIx64" check data block, brange:%" PRId64 "-%" PRId64 ", numOfRows:%d, numOfRes:%d,
//     lastKey:%" PRId64,
//            GET_TASKID(pRuntimeEnv), pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows, numOfRes,
//            pQueryAttr->current->lastKey);
1802
//
L
Liu Jicong 已提交
1803 1804 1805 1806 1807
//     return key;
//   } else {  // do nothing
//     pQueryAttr->window.skey      = tw.skey;
//     pWindowResInfo->prevSKey = tw.skey;
//     pTableQueryInfo->lastKey = tw.skey;
1808
//
L
Liu Jicong 已提交
1809 1810
//     return tw.skey;
//   }
1811
//
L
Liu Jicong 已提交
1812 1813 1814 1815 1816 1817 1818 1819 1820 1821
//   return true;
// }

// static bool skipTimeInterval(STaskRuntimeEnv *pRuntimeEnv, TSKEY* start) {
//   STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
//   if (QUERY_IS_ASC_QUERY(pQueryAttr)) {
//     assert(*start <= pRuntimeEnv->current->lastKey);
//   } else {
//     assert(*start >= pRuntimeEnv->current->lastKey);
//   }
1822
//
L
Liu Jicong 已提交
1823 1824 1825 1826 1827
//   // if queried with value filter, do NOT forward query start position
//   if (pQueryAttr->limit.offset <= 0 || pQueryAttr->numOfFilterCols > 0 || pRuntimeEnv->pTsBuf != NULL ||
//   pRuntimeEnv->pFillInfo != NULL) {
//     return true;
//   }
1828
//
L
Liu Jicong 已提交
1829 1830 1831 1832 1833 1834 1835
//   /*
//    * 1. for interval without interpolation query we forward pQueryAttr->interval.interval at a time for
//    *    pQueryAttr->limit.offset times. Since hole exists, pQueryAttr->interval.interval*pQueryAttr->limit.offset
//    value is
//    *    not valid. otherwise, we only forward pQueryAttr->limit.offset number of points
//    */
//   assert(pRuntimeEnv->resultRowInfo.prevSKey == TSKEY_INITIAL_VAL);
1836
//
L
Liu Jicong 已提交
1837 1838
//   STimeWindow w = TSWINDOW_INITIALIZER;
//   bool ascQuery = QUERY_IS_ASC_QUERY(pQueryAttr);
1839
//
L
Liu Jicong 已提交
1840 1841
//   SResultRowInfo *pWindowResInfo = &pRuntimeEnv->resultRowInfo;
//   STableQueryInfo *pTableQueryInfo = pRuntimeEnv->current;
1842
//
L
Liu Jicong 已提交
1843 1844 1845
//   SDataBlockInfo blockInfo = SDATA_BLOCK_INITIALIZER;
//   while (tsdbNextDataBlock(pRuntimeEnv->pTsdbReadHandle)) {
//     tsdbRetrieveDataBlockInfo(pRuntimeEnv->pTsdbReadHandle, &blockInfo);
1846
//
L
Liu Jicong 已提交
1847 1848 1849 1850 1851 1852 1853 1854 1855
//     if (QUERY_IS_ASC_QUERY(pQueryAttr)) {
//       if (pWindowResInfo->prevSKey == TSKEY_INITIAL_VAL) {
//         getAlignQueryTimeWindow(pQueryAttr, blockInfo.window.skey, blockInfo.window.skey, pQueryAttr->window.ekey,
//         &w); pWindowResInfo->prevSKey = w.skey;
//       }
//     } else {
//       getAlignQueryTimeWindow(pQueryAttr, blockInfo.window.ekey, pQueryAttr->window.ekey, blockInfo.window.ekey, &w);
//       pWindowResInfo->prevSKey = w.skey;
//     }
1856
//
L
Liu Jicong 已提交
1857 1858
//     // the first time window
//     STimeWindow win = getActiveTimeWindow(pWindowResInfo, pWindowResInfo->prevSKey, pQueryAttr);
1859
//
L
Liu Jicong 已提交
1860 1861
//     while (pQueryAttr->limit.offset > 0) {
//       STimeWindow tw = win;
1862
//
L
Liu Jicong 已提交
1863 1864 1865
//       if ((win.ekey <= blockInfo.window.ekey && ascQuery) || (win.ekey >= blockInfo.window.skey && !ascQuery)) {
//         pQueryAttr->limit.offset -= 1;
//         pWindowResInfo->prevSKey = win.skey;
1866
//
L
Liu Jicong 已提交
1867 1868 1869 1870 1871 1872
//         // current time window is aligned with blockInfo.window.ekey
//         // restart it from next data block by set prevSKey to be TSKEY_INITIAL_VAL;
//         if ((win.ekey == blockInfo.window.ekey && ascQuery) || (win.ekey == blockInfo.window.skey && !ascQuery)) {
//           pWindowResInfo->prevSKey = TSKEY_INITIAL_VAL;
//         }
//       }
1873
//
L
Liu Jicong 已提交
1874 1875 1876 1877
//       if (pQueryAttr->limit.offset == 0) {
//         *start = doSkipIntervalProcess(pRuntimeEnv, &win, &blockInfo, pTableQueryInfo);
//         return true;
//       }
1878
//
L
Liu Jicong 已提交
1879 1880
//       // current window does not ended in current data block, try next data block
//       getNextTimeWindow(pQueryAttr, &tw);
1881
//
L
Liu Jicong 已提交
1882 1883 1884 1885 1886 1887 1888 1889 1890
//       /*
//        * If the next time window still starts from current data block,
//        * load the primary timestamp column first, and then find the start position for the next queried time window.
//        * Note that only the primary timestamp column is required.
//        * TODO: Optimize for this cases. All data blocks are not needed to be loaded, only if the first actually
//        required
//        * time window resides in current data block.
//        */
//       if ((tw.skey <= blockInfo.window.ekey && ascQuery) || (tw.ekey >= blockInfo.window.skey && !ascQuery)) {
1891
//
L
Liu Jicong 已提交
1892 1893
//         SArray *pDataBlock = tsdbRetrieveDataBlock(pRuntimeEnv->pTsdbReadHandle, NULL);
//         SColumnInfoData *pColInfoData = taosArrayGet(pDataBlock, 0);
1894
//
L
Liu Jicong 已提交
1895 1896 1897
//         if ((win.ekey > blockInfo.window.ekey && ascQuery) || (win.ekey < blockInfo.window.skey && !ascQuery)) {
//           pQueryAttr->limit.offset -= 1;
//         }
1898
//
L
Liu Jicong 已提交
1899 1900 1901 1902 1903 1904 1905 1906
//         if (pQueryAttr->limit.offset == 0) {
//           *start = doSkipIntervalProcess(pRuntimeEnv, &win, &blockInfo, pTableQueryInfo);
//           return true;
//         } else {
//           tw = win;
//           int32_t startPos =
//               getNextQualifiedWindow(pQueryAttr, &tw, &blockInfo, pColInfoData->pData, binarySearchForKey, -1);
//           assert(startPos >= 0);
1907
//
L
Liu Jicong 已提交
1908 1909 1910 1911 1912 1913 1914 1915 1916 1917 1918
//           // set the abort info
//           pQueryAttr->pos = startPos;
//           pTableQueryInfo->lastKey = ((TSKEY *)pColInfoData->pData)[startPos];
//           pWindowResInfo->prevSKey = tw.skey;
//           win = tw;
//         }
//       } else {
//         break;  // offset is not 0, and next time window begins or ends in the next block.
//       }
//     }
//   }
1919
//
L
Liu Jicong 已提交
1920 1921 1922 1923
//   // check for error
//   if (terrno != TSDB_CODE_SUCCESS) {
//     longjmp(pRuntimeEnv->env, terrno);
//   }
1924
//
L
Liu Jicong 已提交
1925 1926
//   return true;
// }
1927

1928
/*
 * Install `num` downstream operators on `p` by copying the pointer array `pDownstream`.
 *
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when the array cannot be allocated
 *         (p->pDownstream is NULL in that case).
 *
 * NOTE(review): despite the name, an existing p->pDownstream array is overwritten, not extended
 * or freed here — confirm that callers only invoke this once per operator.
 */
int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t num) {
  // an operator without a downstream array must not claim to have downstream operators
  assert(p->pDownstream != NULL || p->numOfDownstream == 0);

  size_t nbytes = num * POINTER_BYTES;

  p->pDownstream = taosMemoryCalloc(1, nbytes);
  if (p->pDownstream != NULL) {
    memcpy(p->pDownstream, pDownstream, nbytes);
    p->numOfDownstream = num;
    return TSDB_CODE_SUCCESS;
  }

  return TSDB_CODE_OUT_OF_MEMORY;
}

wmmhello's avatar
wmmhello 已提交
1943
// Forward declaration; being static, the definition lives elsewhere in this file.
static void doDestroyTableList(STableListInfo* pTableqinfoList);
1944

1945
/*
 * Sanity-check hook for a table's query time window.
 *
 * Currently a no-op: the assertions below are compiled out with `#if 0`.
 */
static void doTableQueryInfoTimeWindowCheck(SExecTaskInfo* pTaskInfo, STableQueryInfo* pTableQueryInfo, int32_t order) {
#if 0
  if (order == TSDB_ORDER_ASC) {
    assert((pTableQueryInfo->win.skey <= pTableQueryInfo->win.ekey) &&
           (pTableQueryInfo->lastKey >= pTaskInfo->window.skey) &&
           (pTableQueryInfo->win.skey >= pTaskInfo->window.skey &&
            pTableQueryInfo->win.ekey <= pTaskInfo->window.ekey));
  } else {
    assert((pTableQueryInfo->win.skey >= pTableQueryInfo->win.ekey) &&
           (pTableQueryInfo->lastKey <= pTaskInfo->window.skey) &&
           (pTableQueryInfo->win.skey <= pTaskInfo->window.skey &&
            pTableQueryInfo->win.ekey >= pTaskInfo->window.ekey));
  }
#endif
}

1961 1962 1963 1964
// Callback parameter attached to a fetch request; identifies which exchange operator and which
// of its sources a response belongs to.
typedef struct SFetchRspHandleWrapper {
  uint32_t exchangeId;   // reference id of the exchange operator, resolved through exchangeObjRefPool
  int32_t  sourceIndex;  // index of the source in the exchange operator's pSourceDataInfo array
} SFetchRspHandleWrapper;
1965

D
dapan1121 已提交
1966
/*
 * Response callback of a fetch request sent by doSendFetchDataRequest().
 *
 * @param param  SFetchRspHandleWrapper identifying the exchange operator and the source index
 * @param pMsg   response buffer; pMsg->pData is either handed over to the source data info
 *               (success) or freed here (error, or exchange operator already released)
 * @param code   result code of the request
 * @return always TSDB_CODE_SUCCESS
 *
 * On success the response header fields are converted to host byte order in place. In every case
 * where the exchange operator is still alive, the source is marked EX_SOURCE_DATA_READY and the
 * operator's `ready` semaphore is posted.
 */
int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code) {
  SFetchRspHandleWrapper* pWrapper = (SFetchRspHandleWrapper*)param;

  SExchangeInfo* pExchangeInfo = taosAcquireRef(exchangeObjRefPool, pWrapper->exchangeId);
  if (pExchangeInfo == NULL) {
    qWarn("failed to acquire exchange operator, since it may have been released");
    taosMemoryFree(pMsg->pData);
    return TSDB_CODE_SUCCESS;
  }

  int32_t          index = pWrapper->sourceIndex;
  SSourceDataInfo* pSourceDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, index);

  if (code == TSDB_CODE_SUCCESS) {
    pSourceDataInfo->pRsp = pMsg->pData;

    SRetrieveTableRsp* pRsp = pSourceDataInfo->pRsp;
    // validate before the header is touched, not after
    ASSERT(pRsp != NULL);

    // network -> host byte order (the hton*/htobe* conversions are their own inverse)
    pRsp->numOfRows = htonl(pRsp->numOfRows);
    pRsp->compLen = htonl(pRsp->compLen);
    pRsp->numOfCols = htonl(pRsp->numOfCols);
    pRsp->useconds = htobe64(pRsp->useconds);
    pRsp->numOfBlocks = htonl(pRsp->numOfBlocks);

    qDebug("%s fetch rsp received, index:%d, blocks:%d, rows:%d", pSourceDataInfo->taskId, index, pRsp->numOfBlocks,
           pRsp->numOfRows);
  } else {
    taosMemoryFree(pMsg->pData);
    pSourceDataInfo->code = code;
    // tstrerror() yields a string, so it must be printed with %s
    qDebug("%s fetch rsp received, index:%d, error:%s", pSourceDataInfo->taskId, index, tstrerror(code));
  }

  pSourceDataInfo->status = EX_SOURCE_DATA_READY;

  tsem_post(&pExchangeInfo->ready);
  taosReleaseRef(exchangeObjRefPool, pWrapper->exchangeId);

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
2006
/*
 * RPC response entry point: copy the response payload and hand it to the callback registered
 * in the request's SMsgSendInfo (pMsg->info.ahandle).
 *
 * The payload is duplicated into a freshly allocated buffer when contLen > 0. If that allocation
 * fails, terrno and pMsg->code are set to TSDB_CODE_OUT_OF_MEMORY and the callback is invoked
 * with a NULL pData. Afterwards the RPC content and the send info are released.
 */
void qProcessRspMsg(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
  assert(pMsg->info.ahandle != NULL);
  SMsgSendInfo* pHandle = (SMsgSendInfo*)pMsg->info.ahandle;

  SDataBuf payload = {.len = pMsg->contLen, .pData = NULL};

  if (pMsg->contLen > 0) {
    payload.pData = taosMemoryCalloc(1, pMsg->contLen);
    if (payload.pData != NULL) {
      memcpy(payload.pData, pMsg->pCont, pMsg->contLen);
    } else {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      pMsg->code = TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  pHandle->fp(pHandle->param, &payload, pMsg->code);

  rpcFreeCont(pMsg->pCont);
  destroySendMsgInfo(pHandle);
}

L
Liu Jicong 已提交
2027
/*
 * Build a fetch request for the source at `sourceIndex` and send it asynchronously.
 *
 * @param pExchangeInfo  exchange operator state (sources, per-source data info, transporter)
 * @param pTaskInfo      owning task; its `code` is set on allocation failure
 * @param sourceIndex    index into pExchangeInfo->pSources / pSourceDataInfo
 * @return TSDB_CODE_SUCCESS once the request has been handed to asyncSendMsgToServer(), or
 *         TSDB_CODE_QRY_OUT_OF_MEMORY when any of the request objects cannot be allocated
 *         (everything allocated so far is freed in that case).
 *
 * The response is delivered to loadRemoteDataCallback() with an SFetchRspHandleWrapper as its
 * parameter.
 */
static int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTaskInfo, int32_t sourceIndex) {
  size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources);

  SResFetchReq* pMsg = taosMemoryCalloc(1, sizeof(SResFetchReq));
  if (NULL == pMsg) {
    pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
    return pTaskInfo->code;
  }

  SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, sourceIndex);
  SSourceDataInfo*       pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, sourceIndex);

  ASSERT(pDataInfo->status == EX_SOURCE_DATA_NOT_READY);

  qDebug("%s build fetch msg and send to vgId:%d, ep:%s, taskId:0x%" PRIx64 ", execId:%d, %d/%" PRIzu,
         GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->addr.epSet.eps[0].fqdn, pSource->taskId, pSource->execId,
         sourceIndex, totalSources);

  // request fields travel in network byte order
  pMsg->header.vgId = htonl(pSource->addr.nodeId);
  pMsg->sId = htobe64(pSource->schedId);
  pMsg->taskId = htobe64(pSource->taskId);
  pMsg->queryId = htobe64(pTaskInfo->id.queryId);
  pMsg->execId = htonl(pSource->execId);

  // send the fetch remote task result request
  SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (NULL == pMsgSendInfo) {
    taosMemoryFreeClear(pMsg);
    qError("%s prepare message %d failed", GET_TASKID(pTaskInfo), (int32_t)sizeof(SMsgSendInfo));
    pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
    return pTaskInfo->code;
  }

  SFetchRspHandleWrapper* pWrapper = taosMemoryCalloc(1, sizeof(SFetchRspHandleWrapper));
  if (NULL == pWrapper) {
    // the wrapper is dereferenced right below; bail out and free what was built so far
    taosMemoryFreeClear(pMsg);
    taosMemoryFreeClear(pMsgSendInfo);
    qError("%s prepare message %d failed", GET_TASKID(pTaskInfo), (int32_t)sizeof(SFetchRspHandleWrapper));
    pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
    return pTaskInfo->code;
  }

  pWrapper->exchangeId = pExchangeInfo->self;
  pWrapper->sourceIndex = sourceIndex;

  pMsgSendInfo->param = pWrapper;
  pMsgSendInfo->paramFreeFp = taosMemoryFree;
  pMsgSendInfo->msgInfo.pData = pMsg;
  pMsgSendInfo->msgInfo.len = sizeof(SResFetchReq);
  pMsgSendInfo->msgType = pSource->fetchMsgType;
  pMsgSendInfo->fp = loadRemoteDataCallback;

  int64_t transporterId = 0;
  // NOTE(review): the send result is intentionally not propagated here, matching the previous
  // behavior — confirm whether a failed send is reported through the callback instead.
  int32_t code = asyncSendMsgToServer(pExchangeInfo->pTransporter, &pSource->addr.epSet, &transporterId, pMsgSendInfo);
  (void)code;

  return TSDB_CODE_SUCCESS;
}

2076 2077 2078 2079 2080 2081 2082 2083 2084
/*
 * Account one fetched batch in the remote-load statistics: add the row count and byte size,
 * add the time elapsed since `startTs` (microsecond timestamps), and add the row count to the
 * operator's result totals.
 */
void updateLoadRemoteInfo(SLoadRemoteDataInfo* pInfo, int32_t numOfRows, int32_t dataLen, int64_t startTs,
                          SOperatorInfo* pOperator) {
  pInfo->totalRows += numOfRows;
  pInfo->totalSize += dataLen;

  int64_t elapsedUs = taosGetTimestampUs() - startTs;
  pInfo->totalElapsed += elapsedUs;

  pOperator->resultInfo.totalRows += numOfRows;
}

/*
 * Decode the payload of a fetch response into pRes.
 *
 * @param pRes         destination block
 * @param pData        start of the encoded payload
 * @param numOfOutput  expected number of output columns (asserted against pColList's size)
 * @param pColList     NULL to decode the payload directly into pRes; otherwise the payload is
 *                     expected to start with a column count and a schema array, is decoded into
 *                     a temporary block and relocated into pRes according to this list
 * @param pNextStart   receives the position returned by blockDecode() when pColList is NULL
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY when the temporary block cannot be created,
 *         or the error returned by blockDataEnsureCapacity()
 *
 * NOTE(review): *pNextStart is left untouched on the pColList != NULL path — confirm that no
 * caller reads it in that case.
 * NOTE: on the pColList != NULL path the schema entries inside pData are byte-swapped in place.
 */
int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, char* pData, int32_t numOfOutput, SArray* pColList,
                                     char** pNextStart) {
  if (pColList == NULL) {  // data from other sources
    blockDataCleanup(pRes);
    *pNextStart = (char*)blockDecode(pRes, pData);
  } else {  // extract data according to pColList
    ASSERT(numOfOutput == taosArrayGetSize(pColList));
    char* pStart = pData;

    int32_t numOfCols = htonl(*(int32_t*)pStart);
    pStart += sizeof(int32_t);

    // todo refactor:extract method
    SSysTableSchema* pSchema = (SSysTableSchema*)pStart;
    for (int32_t i = 0; i < numOfCols; ++i) {
      SSysTableSchema* p = (SSysTableSchema*)pStart;

      p->colId = htons(p->colId);
      p->bytes = htonl(p->bytes);
      pStart += sizeof(SSysTableSchema);
    }

    SSDataBlock* pBlock = createDataBlock();
    if (pBlock == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    for (int32_t i = 0; i < numOfCols; ++i) {
      SColumnInfoData idata = createColumnInfoData(pSchema[i].type, pSchema[i].bytes, pSchema[i].colId);
      blockDataAppendColInfo(pBlock, &idata);
    }

    blockDecode(pBlock, pStart);

    // do not relocate column data into a block that could not be grown
    int32_t code = blockDataEnsureCapacity(pRes, pBlock->info.rows);
    if (code != TSDB_CODE_SUCCESS) {
      blockDataDestroy(pBlock);
      return code;
    }

    // data from mnode
    pRes->info.rows = pBlock->info.rows;
    relocateColumnData(pRes, pColList, pBlock->pDataBlock, false);
    blockDataDestroy(pBlock);
  }

  // todo move this to time window aggregator, since the primary timestamp may not be known by exchange operator.
  blockDataUpdateTsWindow(pRes, 0);
  return TSDB_CODE_SUCCESS;
}
2125

L
Liu Jicong 已提交
2126 2127
/*
 * Finish an exchange operator whose sources are all exhausted: add the time elapsed since
 * `startTs` to the load statistics, log the totals and mark the operator as completed.
 *
 * @return always NULL
 */
static void* setAllSourcesCompleted(SOperatorInfo* pOperator, int64_t startTs) {
  SExchangeInfo*       pExchange = pOperator->info;
  SExecTaskInfo*       pTask = pOperator->pTaskInfo;
  SLoadRemoteDataInfo* pStat = &pExchange->loadInfo;

  int64_t elapsedUs = taosGetTimestampUs() - startTs;
  pStat->totalElapsed += elapsedUs;

  size_t numOfSources = taosArrayGetSize(pExchange->pSources);
  qDebug("%s all %" PRIzu " sources are exhausted, total rows: %" PRIu64 " bytes:%" PRIu64 ", elapsed:%.2f ms",
         GET_TASKID(pTask), numOfSources, pStat->totalRows, pStat->totalSize, pStat->totalElapsed / 1000.0);

  doSetOperatorCompleted(pOperator);
  return NULL;
}

2144 2145
// Polls all downstream sources until one of them has a response ready, decodes every data block carried by
// that response into pExchangeInfo->pResultBlockList, and, unless the source is exhausted, issues the next
// fetch request for it. Returns after one response has been consumed or when all sources are exhausted.
// On failure the error code is stored in pTaskInfo->code.
static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeInfo* pExchangeInfo,
                                           SExecTaskInfo* pTaskInfo) {
  int32_t code = 0;
  int64_t startTs = taosGetTimestampUs();
  size_t  totalSources = taosArrayGetSize(pExchangeInfo->pSources);

  while (1) {
    int32_t completed = 0;
    for (int32_t i = 0; i < totalSources; ++i) {
      SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, i);
      if (pDataInfo->status == EX_SOURCE_DATA_EXHAUSTED) {
        completed += 1;
        continue;
      }

      // the response of this source has not arrived yet, check the next one
      if (pDataInfo->status != EX_SOURCE_DATA_READY) {
        continue;
      }

      if (pDataInfo->code != TSDB_CODE_SUCCESS) {
        code = pDataInfo->code;
        goto _error;
      }

      SRetrieveTableRsp*     pRsp = pDataInfo->pRsp;
      SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, i);

      SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
      if (pRsp->numOfRows == 0) {
        qDebug("%s vgId:%d, taskId:0x%" PRIx64 " execId:%d index:%d completed, rowsOfSource:%" PRIu64
               ", totalRows:%" PRIu64 ", completed:%d try next %d/%" PRIzu,
               GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, i, pDataInfo->totalRows,
               pExchangeInfo->loadInfo.totalRows, completed + 1, i + 1, totalSources);
        pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
        completed += 1;
        taosMemoryFreeClear(pDataInfo->pRsp);
        continue;
      }

      // decode every block of the response and buffer it for the caller
      int32_t index = 0;
      char*   pStart = pRsp->data;
      while (index++ < pRsp->numOfBlocks) {
        SSDataBlock* pb = createOneDataBlock(pExchangeInfo->pDummyBlock, false);
        if (pb == NULL) {
          code = TSDB_CODE_OUT_OF_MEMORY;
          taosMemoryFreeClear(pDataInfo->pRsp);
          goto _error;
        }

        code = extractDataBlockFromFetchRsp(pb, pStart, pRsp->numOfCols, NULL, &pStart);
        if (code != 0) {
          // the block has not been handed over to the result list yet, release it here
          blockDataDestroy(pb);
          taosMemoryFreeClear(pDataInfo->pRsp);
          goto _error;
        }

        if (taosArrayPush(pExchangeInfo->pResultBlockList, &pb) == NULL) {
          code = TSDB_CODE_OUT_OF_MEMORY;
          blockDataDestroy(pb);
          taosMemoryFreeClear(pDataInfo->pRsp);
          goto _error;
        }
      }

      updateLoadRemoteInfo(pLoadInfo, pRsp->numOfRows, pRsp->compLen, startTs, pOperator);
      // keep the per-source row counter in step with the sequential load path
      pDataInfo->totalRows += pRsp->numOfRows;

      if (pRsp->completed == 1) {
        qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64
               " execId:%d"
               " index:%d completed, blocks:%d, numOfRows:%d, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64
               ", total:%.2f Kb,"
               " completed:%d try next %d/%" PRIzu,
               GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, i, pRsp->numOfBlocks,
               pRsp->numOfRows, pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize / 1024.0,
               completed + 1, i + 1, totalSources);
        completed += 1;
        pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
      } else {
        qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64
               " execId:%d blocks:%d, numOfRows:%d, totalRows:%" PRIu64 ", total:%.2f Kb",
               GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pRsp->numOfBlocks,
               pRsp->numOfRows, pLoadInfo->totalRows, pLoadInfo->totalSize / 1024.0);
      }

      // pRsp must not be used beyond this point
      taosMemoryFreeClear(pDataInfo->pRsp);

      if (pDataInfo->status != EX_SOURCE_DATA_EXHAUSTED) {
        pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
        code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, i);
        if (code != TSDB_CODE_SUCCESS) {
          taosMemoryFreeClear(pDataInfo->pRsp);
          goto _error;
        }
      }

      return;
    }

    if (completed == totalSources) {
      setAllSourcesCompleted(pOperator, startTs);
      return;
    }

    // nothing is ready yet, give other threads a chance to run before polling again
    sched_yield();
  }

_error:
  pTaskInfo->code = code;
}

L
Liu Jicong 已提交
2243 2244 2245
// Sends one fetch request to every downstream source, marks the operator as having results to return,
// records the open cost, and then waits on the exchange "ready" semaphore.
// Returns the first non-success code from doSendFetchDataRequest (also stored in pTaskInfo->code), otherwise
// TSDB_CODE_SUCCESS.
static int32_t prepareConcurrentlyLoad(SOperatorInfo* pOperator) {
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
  SExchangeInfo* pExchangeInfo = pOperator->info;

  size_t  numOfSources = taosArrayGetSize(pExchangeInfo->pSources);
  int64_t beginUs = taosGetTimestampUs();

  // the requests are issued back to back; the function only blocks on the semaphore at the end
  int32_t srcIndex = 0;
  while (srcIndex < numOfSources) {
    int32_t code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, srcIndex);
    if (code != TSDB_CODE_SUCCESS) {
      pTaskInfo->code = code;
      return code;
    }

    srcIndex += 1;
  }

  int64_t finishUs = taosGetTimestampUs();
  qDebug("%s send all fetch requests to %" PRIzu " sources completed, elapsed:%.2fms", GET_TASKID(pTaskInfo),
         numOfSources, (finishUs - beginUs) / 1000.0);

  pOperator->status = OP_RES_TO_RETURN;
  pOperator->cost.openCost = taosGetTimestampUs() - beginUs;

  tsem_wait(&pExchangeInfo->ready);
  return TSDB_CODE_SUCCESS;
}

2270
// Loads data from the downstream sources one after another: a fetch request is sent to the current source,
// the function waits for the response, and every block of a non-empty response is decoded into
// pExchangeInfo->pResultBlockList. Sources that answer with zero rows are skipped.
// Returns TSDB_CODE_SUCCESS after one non-empty response has been consumed or when all sources are
// exhausted; on failure the error code is returned and also stored in pTaskInfo->code.
static int32_t seqLoadRemoteData(SOperatorInfo* pOperator) {
  SExchangeInfo* pExchangeInfo = pOperator->info;
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;

  size_t  totalSources = taosArrayGetSize(pExchangeInfo->pSources);
  int64_t startTs = taosGetTimestampUs();

  while (1) {
    if (pExchangeInfo->current >= totalSources) {
      setAllSourcesCompleted(pOperator, startTs);
      return TSDB_CODE_SUCCESS;
    }

    // do not wait for a response if the request could not be sent
    int32_t code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, pExchangeInfo->current);
    if (code != TSDB_CODE_SUCCESS) {
      pTaskInfo->code = code;
      return code;
    }

    tsem_wait(&pExchangeInfo->ready);

    SSourceDataInfo*       pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, pExchangeInfo->current);
    SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, pExchangeInfo->current);

    if (pDataInfo->code != TSDB_CODE_SUCCESS) {
      qError("%s vgId:%d, taskID:0x%" PRIx64 " execId:%d error happens, code:%s", GET_TASKID(pTaskInfo),
             pSource->addr.nodeId, pSource->taskId, pSource->execId, tstrerror(pDataInfo->code));
      pOperator->pTaskInfo->code = pDataInfo->code;
      return pOperator->pTaskInfo->code;
    }

    SRetrieveTableRsp*   pRsp = pDataInfo->pRsp;
    SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
    if (pRsp->numOfRows == 0) {
      qDebug("%s vgId:%d, taskID:0x%" PRIx64 " execId:%d %d of total completed, rowsOfSource:%" PRIu64
             ", totalRows:%" PRIu64 " try next",
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pExchangeInfo->current + 1,
             pDataInfo->totalRows, pLoadInfo->totalRows);

      pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
      pExchangeInfo->current += 1;
      taosMemoryFreeClear(pDataInfo->pRsp);
      continue;
    }

    // decode every block of the response and buffer it for the caller, as the concurrent path does
    SRetrieveTableRsp* pRetrieveRsp = pDataInfo->pRsp;
    char*              pStart = pRetrieveRsp->data;
    for (int32_t index = 0; index < pRetrieveRsp->numOfBlocks; ++index) {
      SSDataBlock* pb = createOneDataBlock(pExchangeInfo->pDummyBlock, false);
      if (pb == NULL) {
        code = TSDB_CODE_OUT_OF_MEMORY;
      } else {
        code = extractDataBlockFromFetchRsp(pb, pStart, pRetrieveRsp->numOfCols, NULL, &pStart);
        if (code == TSDB_CODE_SUCCESS && taosArrayPush(pExchangeInfo->pResultBlockList, &pb) == NULL) {
          code = TSDB_CODE_OUT_OF_MEMORY;
        }
      }

      if (code != TSDB_CODE_SUCCESS) {
        // the block was not handed over to the result list, release it together with the response
        if (pb != NULL) {
          blockDataDestroy(pb);
        }

        taosMemoryFreeClear(pDataInfo->pRsp);
        pTaskInfo->code = code;
        return code;
      }
    }

    if (pRsp->completed == 1) {
      qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " execId:%d numOfRows:%d, rowsOfSource:%" PRIu64
             ", totalRows:%" PRIu64 ", totalBytes:%" PRIu64 " try next %d/%" PRIzu,
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pRetrieveRsp->numOfRows,
             pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize, pExchangeInfo->current + 1,
             totalSources);

      pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
      pExchangeInfo->current += 1;
    } else {
      qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " execId:%d numOfRows:%d, totalRows:%" PRIu64
             ", totalBytes:%" PRIu64,
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pRetrieveRsp->numOfRows,
             pLoadInfo->totalRows, pLoadInfo->totalSize);
    }

    updateLoadRemoteInfo(pLoadInfo, pRetrieveRsp->numOfRows, pRetrieveRsp->compLen, startTs, pOperator);
    pDataInfo->totalRows += pRetrieveRsp->numOfRows;

    taosMemoryFreeClear(pDataInfo->pRsp);
    return TSDB_CODE_SUCCESS;
  }
}

L
Liu Jicong 已提交
2339
// Open function of the exchange operator. Does nothing if the operator is already opened. In concurrent
// mode (seqLoadData == false) the fetch requests are sent to all sources first. On success the operator is
// flagged as opened and the open cost is recorded.
static int32_t prepareLoadRemoteData(SOperatorInfo* pOperator) {
  if (OPTR_IS_OPENED(pOperator)) {
    return TSDB_CODE_SUCCESS;
  }

  int64_t        beginUs = taosGetTimestampUs();
  SExchangeInfo* pExchangeInfo = pOperator->info;

  // sequential mode sends its requests lazily, so only the concurrent mode has work to do here
  bool loadConcurrently = !pExchangeInfo->seqLoadData;
  if (loadConcurrently) {
    int32_t code = prepareConcurrentlyLoad(pOperator);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }
  }

  OPTR_SET_OPENED(pOperator);
  pOperator->cost.openCost = (taosGetTimestampUs() - beginUs) / 1000.0;
  return TSDB_CODE_SUCCESS;
}

2359 2360 2361 2362 2363
// Element destructor for arrays holding SSDataBlock pointers: pParam points at the array slot,
// i.e. at a pointer to the block that is destroyed.
static void freeBlock(void* pParam) {
  SSDataBlock** ppBlock = (SSDataBlock**)pParam;
  blockDataDestroy(*ppBlock);
}

2364
// Returns the next buffered data block of the exchange operator, or NULL when no block is available.
// The operator is opened first; when the buffered block list is empty or fully consumed it is cleared and
// refilled by the sequential or the concurrent loader before a block is handed out.
static SSDataBlock* doLoadRemoteDataImpl(SOperatorInfo* pOperator) {
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
  SExchangeInfo* pExchangeInfo = pOperator->info;

  pTaskInfo->code = pOperator->fpSet._openFn(pOperator);
  if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
    return NULL;
  }

  if (pOperator->status == OP_EXEC_DONE) {
    SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
    size_t               numOfSources = taosArrayGetSize(pExchangeInfo->pSources);
    qDebug("%s all %" PRIzu " source(s) are exhausted, total rows:%" PRIu64 " bytes:%" PRIu64 ", elapsed:%.2f ms",
           GET_TASKID(pTaskInfo), numOfSources, pLoadInfo->totalRows, pLoadInfo->totalSize,
           pLoadInfo->totalElapsed / 1000.0);
    return NULL;
  }

  size_t numOfBuffered = taosArrayGetSize(pExchangeInfo->pResultBlockList);
  bool   hasUnreadBlock = (numOfBuffered != 0) && (pExchangeInfo->rspBlockIndex < numOfBuffered);
  if (!hasUnreadBlock) {
    // all buffered blocks have been returned already, drop them and load the next batch
    pExchangeInfo->rspBlockIndex = 0;
    taosArrayClearEx(pExchangeInfo->pResultBlockList, freeBlock);

    if (pExchangeInfo->seqLoadData) {
      seqLoadRemoteData(pOperator);
    } else {
      concurrentlyLoadRemoteDataImpl(pOperator, pExchangeInfo, pTaskInfo);
    }

    if (taosArrayGetSize(pExchangeInfo->pResultBlockList) == 0) {
      return NULL;
    }
  }

  // hand out the buffered block at the current position and advance the position
  SSDataBlock* pNext = taosArrayGetP(pExchangeInfo->pResultBlockList, pExchangeInfo->rspBlockIndex);
  pExchangeInfo->rspBlockIndex += 1;
  return pNext;
}
2401

2402 2403 2404 2405 2406 2407 2408 2409
// Fetch function of the exchange operator: returns the next block after limit/offset handling, or NULL when
// no block is available. Blocks for which handleLimitOffset reports PROJECT_RETRIEVE_CONTINUE are skipped
// and the next block is loaded.
static SSDataBlock* doLoadRemoteData(SOperatorInfo* pOperator) {
  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  SExchangeInfo* pExchangeInfo = pOperator->info;
  SLimitInfo*    pLimitInfo = &pExchangeInfo->limitInfo;

  for (;;) {
    SSDataBlock* pBlock = doLoadRemoteDataImpl(pOperator);
    if (pBlock == NULL) {
      return NULL;
    }

    // without limit/offset the block is passed through untouched
    if (!hasLimitOffsetInfo(pLimitInfo)) {
      return pBlock;
    }

    int32_t status = handleLimitOffset(pOperator, pLimitInfo, pBlock, false);
    if (status != PROJECT_RETRIEVE_DONE) {
      // PROJECT_RETRIEVE_CONTINUE (or any other status): load the next block
      continue;
    }

    size_t rows = pBlock->info.rows;
    pExchangeInfo->limitInfo.numOfOutputRows += rows;
    if (rows != 0) {
      return pBlock;
    }

    doSetOperatorCompleted(pOperator);
    return NULL;
  }
}

2438
// Creates pInfo->pSourceDataInfo with one SSourceDataInfo entry per downstream source. Every entry starts in
// the EX_SOURCE_DATA_NOT_READY state and records the task id string and its own index.
// Returns TSDB_CODE_OUT_OF_MEMORY when the array cannot be created or extended; in that case
// pInfo->pSourceDataInfo is left NULL.
static int32_t initDataSource(int32_t numOfSources, SExchangeInfo* pInfo, const char* id) {
  pInfo->pSourceDataInfo = taosArrayInit(numOfSources, sizeof(SSourceDataInfo));
  if (pInfo->pSourceDataInfo == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  for (int32_t i = 0; i < numOfSources; ++i) {
    SSourceDataInfo dataInfo = {0};
    dataInfo.status = EX_SOURCE_DATA_NOT_READY;
    dataInfo.taskId = id;
    dataInfo.index = i;

    SSourceDataInfo* pDs = taosArrayPush(pInfo->pSourceDataInfo, &dataInfo);
    if (pDs == NULL) {
      taosArrayDestroy(pInfo->pSourceDataInfo);
      // do not leave a dangling pointer behind: the owner of pInfo may run its own cleanup afterwards
      pInfo->pSourceDataInfo = NULL;
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  return TSDB_CODE_SUCCESS;
}

2459
// Fills the exchange info from the physical plan node: copies the downstream source nodes into
// pInfo->pSources, initializes the limit info, registers pInfo in exchangeObjRefPool and finally creates
// the per-source data info. Returns TSDB_CODE_INVALID_PARA when the plan node lists no source.
static int32_t initExchangeOperator(SExchangePhysiNode* pExNode, SExchangeInfo* pInfo, const char* id) {
  size_t numOfSources = LIST_LENGTH(pExNode->pSrcEndPoints);
  if (numOfSources == 0) {
    qError("%s invalid number: %d of sources in exchange operator", id, (int32_t)numOfSources);
    return TSDB_CODE_INVALID_PARA;
  }

  pInfo->pSources = taosArrayInit(numOfSources, sizeof(SDownstreamSourceNode));
  if (pInfo->pSources == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  // the source nodes are copied by value into the array
  int32_t srcIndex = 0;
  while (srcIndex < numOfSources) {
    SDownstreamSourceNode* pSrcNode =
        (SDownstreamSourceNode*)nodesListGetNode((SNodeList*)pExNode->pSrcEndPoints, srcIndex);
    taosArrayPush(pInfo->pSources, pSrcNode);
    srcIndex += 1;
  }

  initLimitInfo(pExNode->node.pLimit, pExNode->node.pSlimit, &pInfo->limitInfo);
  pInfo->self = taosAddRef(exchangeObjRefPool, pInfo);

  return initDataSource(numOfSources, pInfo, id);
}

// Creates the exchange operator for the given physical plan node.
// Returns the new operator, or NULL on failure; in the failure case everything allocated so far is released
// and the error code is stored in pTaskInfo->code.
SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode* pExNode, SExecTaskInfo* pTaskInfo) {
  // declared before the first goto so that the error path never reads an indeterminate value
  int32_t code = TSDB_CODE_SUCCESS;

  SExchangeInfo* pInfo = taosMemoryCalloc(1, sizeof(SExchangeInfo));
  SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _error;
  }

  code = initExchangeOperator(pExNode, pInfo, GET_TASKID(pTaskInfo));
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  tsem_init(&pInfo->ready, 0, 0);
  pInfo->pDummyBlock = createResDataBlock(pExNode->node.pOutputDataBlockDesc);
  pInfo->pResultBlockList = taosArrayInit(1, POINTER_BYTES);
  if (pInfo->pDummyBlock == NULL || pInfo->pResultBlockList == NULL) {
    // pDummyBlock is dereferenced below, so it must not be NULL
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _error;
  }

  pInfo->seqLoadData = false;
  pInfo->pTransporter = pTransporter;

  pOperator->name = "ExchangeOperator";
  pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE;
  pOperator->blocking = false;
  pOperator->status = OP_NOT_OPENED;
  pOperator->info = pInfo;
  pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pDummyBlock->pDataBlock);
  pOperator->pTaskInfo = pTaskInfo;

  pOperator->fpSet = createOperatorFpSet(prepareLoadRemoteData, doLoadRemoteData, NULL, NULL,
                                         destroyExchangeOperatorInfo, NULL, NULL, NULL);
  return pOperator;

_error:
  if (pInfo != NULL) {
    doDestroyExchangeOperatorInfo(pInfo);
  }

  taosMemoryFreeClear(pOperator);
  pTaskInfo->code = code;
  return NULL;
}

dengyihao's avatar
dengyihao 已提交
2524 2525
static int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t numOfOutput, size_t keyBufSize,
                                const char* pKey);
2526

2527
static void destroySortedMergeOperatorInfo(void* param, int32_t numOfOutput) {
L
Liu Jicong 已提交
2528
  SSortedMergeOperatorInfo* pInfo = (SSortedMergeOperatorInfo*)param;
H
Haojun Liao 已提交
2529
  taosArrayDestroy(pInfo->pSortInfo);
2530 2531 2532
  taosArrayDestroy(pInfo->groupInfo);

  if (pInfo->pSortHandle != NULL) {
H
Haojun Liao 已提交
2533
    tsortDestroySortHandle(pInfo->pSortHandle);
2534 2535
  }

H
Haojun Liao 已提交
2536
  blockDataDestroy(pInfo->binfo.pRes);
H
Haojun Liao 已提交
2537
  cleanupAggSup(&pInfo->aggSup);
L
Liu Jicong 已提交
2538

D
dapan1121 已提交
2539
  taosMemoryFreeClear(param);
2540
}
H
Haojun Liao 已提交
2541

L
Liu Jicong 已提交
2542
// Checks whether the row at rowIndex of pBlock belongs to the group whose key values are saved in buf.
// groupInfo holds the column slot of every group column and buf[i] the saved value of the i-th group column
// (NULL stands for a NULL value). Returns true when no group column is defined or when every group column
// of the row equals the saved value, false otherwise.
static bool needToMerge(SSDataBlock* pBlock, SArray* groupInfo, char** buf, int32_t rowIndex) {
  size_t size = taosArrayGetSize(groupInfo);
  if (size == 0) {
    return true;
  }

  for (int32_t i = 0; i < size; ++i) {
    int32_t* index = taosArrayGet(groupInfo, i);

    SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, *index);
    bool             isNull = colDataIsNull(pColInfo, rowIndex, pBlock->info.rows, NULL);

    // exactly one of the two values is NULL: the row belongs to another group
    if ((isNull && buf[i] != NULL) || (!isNull && buf[i] == NULL)) {
      return false;
    }

    // both values are NULL: equal, and there is no payload to compare
    if (isNull) {
      continue;
    }

    char* pCell = colDataGetData(pColInfo, rowIndex);
    if (IS_VAR_DATA_TYPE(pColInfo->info.type)) {
      if (varDataLen(pCell) != varDataLen(buf[i])) {
        return false;
      }

      if (memcmp(varDataVal(pCell), varDataVal(buf[i]), varDataLen(pCell)) != 0) {
        return false;
      }
    } else if (memcmp(pCell, buf[i], pColInfo->info.bytes) != 0) {
      return false;
    }
  }

  // every group column matched the saved key
  return true;
}

L
Liu Jicong 已提交
2577 2578 2579
// Placeholder for merging the row at rowIndex into the running result of every expression.
// TODO: set the row index in each context (pCtx[j].startRow = rowIndex) and invoke the merge function of
// every expression (UDF merge for negative function ids, built-in merge otherwise). None of this is
// implemented yet, so the function currently only visits the contexts.
static void doMergeResultImpl(SSortedMergeOperatorInfo* pInfo, SqlFunctionCtx* pCtx, int32_t numOfExpr,
                              int32_t rowIndex) {
  int32_t exprIdx = 0;
  while (exprIdx < numOfExpr) {
    int32_t functionId = pCtx[exprIdx].functionId;
    (void)functionId;  // not used until the merge call is implemented
    exprIdx += 1;
  }
}
2596

L
Liu Jicong 已提交
2597 2598
// Placeholder for finalizing the result of every expression.
// TODO: skip the dummy tag/timestamp functions and invoke the finalize function of each context
// (pCtx[j].fpSet.finalize(&pCtx[j]), or the UDF finalize for negative function ids). None of this is
// implemented yet, so the function currently only visits the contexts.
static void doFinalizeResultImpl(SqlFunctionCtx* pCtx, int32_t numOfExpr) {
  int32_t exprIdx = 0;
  while (exprIdx < numOfExpr) {
    int32_t functionId = pCtx[exprIdx].functionId;
    (void)functionId;  // not used until the finalize call is implemented
    exprIdx += 1;
  }
}
2611

2612
// Copies the values of the row at rowIndex into rowColData: for the i-th entry of pColumnList (a column
// slot of pBlock) the cell is copied into rowColData[i]. Always returns true.
static bool saveCurrentTuple(char** rowColData, SArray* pColumnList, SSDataBlock* pBlock, int32_t rowIndex) {
  int32_t numOfCols = (int32_t)taosArrayGetSize(pColumnList);

  int32_t i = 0;
  while (i < numOfCols) {
    int32_t*         pSlot = taosArrayGet(pColumnList, i);
    SColumnInfoData* pColumn = taosArrayGet(pBlock->pDataBlock, *pSlot);

    char* pCell = colDataGetData(pColumn, rowIndex);
    memcpy(rowColData[i], pCell, colDataGetLength(pColumn, rowIndex));

    i += 1;
  }

  return true;
}
2625

2626 2627
// Feeds every row of pBlock into the merge of the sorted-merge operator. A row that does not match the
// saved group key closes the current group (finalize, account for its result rows, run the process
// function of every expression with a non-negative function id) before it is merged and its key is saved.
static void doMergeImpl(SOperatorInfo* pOperator, int32_t numOfExpr, SSDataBlock* pBlock) {
  SSortedMergeOperatorInfo* pMergeInfo = pOperator->info;
  SqlFunctionCtx*           pCtx = pOperator->exprSupp.pCtx;

  for (int32_t row = 0; row < pBlock->info.rows; ++row) {
    bool startsNewGroup = false;

    if (!pMergeInfo->hasGroupVal) {
      // no key has been saved so far, which is only expected for the very first row
      ASSERT(row == 0);
      startsNewGroup = true;
    } else if (!needToMerge(pBlock, pMergeInfo->groupInfo, pMergeInfo->groupVal, row)) {
      startsNewGroup = true;

      doFinalizeResultImpl(pCtx, numOfExpr);
      int32_t numOfRows = getNumOfResult(pOperator->exprSupp.pCtx, pOperator->exprSupp.numOfExprs, NULL);

      // TODO check for available buffer;

      // next group info data
      pMergeInfo->binfo.pRes->info.rows += numOfRows;
      for (int32_t j = 0; j < numOfExpr; ++j) {
        if (pCtx[j].functionId >= 0) {
          pCtx[j].fpSet.process(&pCtx[j]);
        }
      }
    }

    doMergeResultImpl(pMergeInfo, pCtx, numOfExpr, row);

    if (startsNewGroup) {
      pMergeInfo->hasGroupVal = saveCurrentTuple(pMergeInfo->groupVal, pMergeInfo->groupInfo, pBlock, row);
    }
  }
}

2663 2664
// Drains the sort handle of the sorted-merge operator: tuples are collected into a temporary block of at
// most resultInfo.capacity rows, every filled block is passed to doMergeImpl, and after the last tuple the
// result is finalized. Returns the result block when it holds rows, NULL otherwise (also when the temporary
// block cannot be allocated, in which case pTaskInfo->code is set).
static SSDataBlock* doMerge(SOperatorInfo* pOperator) {
  SSortedMergeOperatorInfo* pInfo = pOperator->info;
  SSortHandle*              pHandle = pInfo->pSortHandle;

  SSDataBlock* pDataBlock = createOneDataBlock(pInfo->binfo.pRes, false);
  if (pDataBlock == NULL) {
    pOperator->pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  blockDataEnsureCapacity(pDataBlock, pOperator->resultInfo.capacity);

  while (1) {
    blockDataCleanup(pDataBlock);
    while (1) {
      STupleHandle* pTupleHandle = tsortNextTuple(pHandle);
      if (pTupleHandle == NULL) {
        break;
      }

      // build datablock for merge for one group
      appendOneRowToDataBlock(pDataBlock, pTupleHandle);
      if (pDataBlock->info.rows >= pOperator->resultInfo.capacity) {
        break;
      }
    }

    if (pDataBlock->info.rows == 0) {
      break;
    }

    setInputDataBlock(pOperator, pOperator->exprSupp.pCtx, pDataBlock, TSDB_ORDER_ASC, MAIN_SCAN, true);
    doMergeImpl(pOperator, pOperator->exprSupp.numOfExprs, pDataBlock);
    // flush to tuple store, and after all data have been handled, return to upstream node or sink node
  }

  doFinalizeResultImpl(pOperator->exprSupp.pCtx, pOperator->exprSupp.numOfExprs);
  int32_t numOfRows = getNumOfResult(pOperator->exprSupp.pCtx, pOperator->exprSupp.numOfExprs, NULL);

  // TODO check for available buffer;

  // next group info data
  pInfo->binfo.pRes->info.rows += numOfRows;

  // the temporary input block is owned by this function and is not referenced by the returned result
  blockDataDestroy(pDataBlock);

  return (pInfo->binfo.pRes->info.rows > 0) ? pInfo->binfo.pRes : NULL;
}
2706

L
Liu Jicong 已提交
2707 2708
// Pulls up to `capacity` sorted tuples from pHandle into a temporary block and copies the columns described
// by pColMatchInfo (source slot -> target slot) into pDataBlock, which is cleaned up first.
// Returns pDataBlock when it holds rows afterwards, NULL otherwise. pInfo is not used.
SSDataBlock* getSortedMergeBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity,
                                     SArray* pColMatchInfo, SSortedMergeOperatorInfo* pInfo) {
  blockDataCleanup(pDataBlock);

  SSDataBlock* pSorted = tsortGetSortedDataBlock(pHandle);
  if (pSorted == NULL) {
    return NULL;
  }

  blockDataEnsureCapacity(pSorted, capacity);

  // collect tuples until the sort handle is drained or the block is full
  STupleHandle* pTuple = NULL;
  while ((pTuple = tsortNextTuple(pHandle)) != NULL) {
    appendOneRowToDataBlock(pSorted, pTuple);
    if (pSorted->info.rows >= capacity) {
      break;
    }
  }

  if (pSorted->info.rows > 0) {
    int32_t numOfCols = taosArrayGetSize(pColMatchInfo);

    int32_t i = 0;
    while (i < numOfCols) {
      SColMatchInfo* pMatch = taosArrayGet(pColMatchInfo, i);
      ASSERT(pMatch->matchType == COL_MATCH_FROM_SLOT_ID);

      SColumnInfoData* pFrom = taosArrayGet(pSorted->pDataBlock, pMatch->srcSlotId);
      SColumnInfoData* pTo = taosArrayGet(pDataBlock->pDataBlock, pMatch->targetSlotId);
      colDataAssign(pTo, pFrom, pSorted->info.rows, &pDataBlock->info);

      i += 1;
    }

    pDataBlock->info.rows = pSorted->info.rows;
    pDataBlock->info.capacity = pSorted->info.rows;
  }

  blockDataDestroy(pSorted);

  if (pDataBlock->info.rows > 0) {
    return pDataBlock;
  }

  return NULL;
}

2749
// Fetch function of the sorted-merge operator. On the first call a multi-source merge sort handle is
// created over all downstream operators and opened, and the merged result is produced by doMerge; later
// calls return blocks through getSortedMergeBlockData. Failures leave the function through
// longjmp(pTaskInfo->env, ...).
static SSDataBlock* doSortedMerge(SOperatorInfo* pOperator) {
  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  SExecTaskInfo*            pTaskInfo = pOperator->pTaskInfo;
  SSortedMergeOperatorInfo* pInfo = pOperator->info;
  if (pOperator->status == OP_RES_TO_RETURN) {
    return getSortedMergeBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pOperator->resultInfo.capacity, NULL, pInfo);
  }

  int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;
  // pass the real task id, not the text of the macro invocation
  pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_MULTISOURCE_MERGE, pInfo->bufPageSize, numOfBufPage,
                                             pInfo->binfo.pRes, GET_TASKID(pTaskInfo));

  tsortSetFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock, NULL, NULL);

  for (int32_t i = 0; i < pOperator->numOfDownstream; ++i) {
    SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource));
    if (ps == NULL) {
      longjmp(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
    }

    ps->param = pOperator->pDownstream[i];
    tsortAddSource(pInfo->pSortHandle, ps);
  }

  int32_t code = tsortOpen(pInfo->pSortHandle);
  if (code != TSDB_CODE_SUCCESS) {
    longjmp(pTaskInfo->env, terrno);
  }

  pOperator->status = OP_RES_TO_RETURN;
  return doMerge(pOperator);
}
2780

L
Liu Jicong 已提交
2781 2782
// Prepares the group-by bookkeeping of the sorted-merge operator. For every column in pGroupInfo the
// matching output expression (resSchema.slotId == colId) is located and its index is stored in
// pInfo->groupInfo. pInfo->groupVal is allocated as one buffer: an array of numOfGroupCol pointers followed
// by the value area, with groupVal[i] pointing at the slot reserved for the i-th group column.
// Returns 0 when there are no group columns, TSDB_CODE_OUT_OF_MEMORY on allocation failure.
static int32_t initGroupCol(SExprInfo* pExprInfo, int32_t numOfCols, SArray* pGroupInfo,
                            SSortedMergeOperatorInfo* pInfo) {
  if (pGroupInfo == NULL || taosArrayGetSize(pGroupInfo) == 0) {
    return 0;
  }

  int32_t len = 0;
  SArray* plist = taosArrayInit(3, sizeof(SColumn));
  pInfo->groupInfo = taosArrayInit(3, sizeof(int32_t));

  if (plist == NULL || pInfo->groupInfo == NULL) {
    // pInfo->groupInfo is released by the destructor of pInfo; plist is local and must be released here
    if (plist != NULL) {
      taosArrayDestroy(plist);
    }
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  // iterate over the requested group columns, not over the (still empty) output array
  size_t numOfGroupCol = taosArrayGetSize(pGroupInfo);
  for (int32_t i = 0; i < numOfGroupCol; ++i) {
    SColumn* pCol = taosArrayGet(pGroupInfo, i);
    for (int32_t j = 0; j < numOfCols; ++j) {
      SExprInfo* pe = &pExprInfo[j];
      if (pe->base.resSchema.slotId == pCol->colId) {
        taosArrayPush(plist, pCol);
        taosArrayPush(pInfo->groupInfo, &j);
        len += pCol->bytes;
        break;
      }
    }
  }

  ASSERT(taosArrayGetSize(pGroupInfo) == taosArrayGetSize(plist));

  pInfo->groupVal = taosMemoryCalloc(1, (POINTER_BYTES * numOfGroupCol + len));
  if (pInfo->groupVal == NULL) {
    taosArrayDestroy(plist);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  // the value area starts right behind the pointer array; the offset is computed in bytes, so the base
  // pointer has to be a char* before it is added
  int32_t offset = 0;
  char*   start = (char*)pInfo->groupVal + (POINTER_BYTES * numOfGroupCol);
  for (int32_t i = 0; i < numOfGroupCol; ++i) {
    pInfo->groupVal[i] = start + offset;
    SColumn* pCol = taosArrayGet(plist, i);
    offset += pCol->bytes;
  }

  taosArrayDestroy(plist);

  return TSDB_CODE_SUCCESS;
}
H
Haojun Liao 已提交
2829

X
Xiaoyu Wang 已提交
2830
// Reports the scan order and scan flag of the scan that feeds pOperator. Starting at pOperator the first
// downstream operator is followed until a known source operator is found:
//  - exchange, system-table, stream, tag, block-distribution and last-row scans report ascending order
//    and MAIN_SCAN;
//  - table scans and table merge scans report the values stored in their own info.
// Returns TSDB_CODE_INVALID_PARA when the chain ends without reaching such an operator.
// todo add more information about exchange operation
int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scanFlag) {
  SOperatorInfo* pCurrent = pOperator;

  for (;;) {
    int32_t type = pCurrent->operatorType;

    switch (type) {
      case QUERY_NODE_PHYSICAL_PLAN_EXCHANGE:
      case QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN:
      case QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN:
      case QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN:
      case QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN:
      case QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN:
        *order = TSDB_ORDER_ASC;
        *scanFlag = MAIN_SCAN;
        return TSDB_CODE_SUCCESS;

      case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN: {
        STableScanInfo* pScanInfo = pCurrent->info;
        *order = pScanInfo->cond.order;
        *scanFlag = pScanInfo->scanFlag;
        return TSDB_CODE_SUCCESS;
      }

      case QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN: {
        STableMergeScanInfo* pMergeScanInfo = pCurrent->info;
        *order = pMergeScanInfo->cond.order;
        *scanFlag = pMergeScanInfo->scanFlag;
        return TSDB_CODE_SUCCESS;
      }

      default:
        break;
    }

    // not a source operator: descend into the first downstream operator, if there is one
    if (pCurrent->pDownstream == NULL || pCurrent->pDownstream[0] == NULL) {
      return TSDB_CODE_INVALID_PARA;
    }

    pCurrent = pCurrent->pDownstream[0];
  }
}
L
Liu Jicong 已提交
2857
#if 0
L
Liu Jicong 已提交
2858
// Position the table scan that feeds a stream scan operator at (uid, ts).
//
// Every operator visited on the way down is marked OP_OPENED. When the stream scan operator is
// reached, its table scan reader is re-targeted to table `uid` and reset so that reading starts
// at ts + 1, unless the last recorded status already equals (uid, ts). uid == 0 marks the scan
// as having no table.
//
// Returns TSDB_CODE_SUCCESS, or TSDB_CODE_QRY_APP_ERROR when no stream scan operator can be
// reached through a single-downstream chain.
int32_t doPrepareScan(SOperatorInfo* pOperator, uint64_t uid, int64_t ts) {
  // NOTE(review): operatorType is narrowed to uint8_t here while sibling functions use int32_t;
  // confirm the node type constants fit in 8 bits before enabling this code.
  uint8_t type = pOperator->operatorType;

  pOperator->status = OP_OPENED;

  if (type != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
    if (pOperator->numOfDownstream == 1) {
      return doPrepareScan(pOperator->pDownstream[0], uid, ts);
    }

    if (pOperator->numOfDownstream == 0) {
      qError("failed to find stream scan operator to set the input data block");
      return TSDB_CODE_QRY_APP_ERROR;
    }

    qError("join not supported for stream block scan");
    return TSDB_CODE_QRY_APP_ERROR;
  }

  SStreamScanInfo* pScanInfo = pOperator->info;
  pScanInfo->blockType = STREAM_INPUT__TABLE_SCAN;
  pScanInfo->pTableScanOp->status = OP_OPENED;

  STableScanInfo* pInfo = pScanInfo->pTableScanOp->info;
  ASSERT(pInfo->scanMode == TABLE_SCAN__TABLE_ORDER);

  if (uid == 0) {
    pInfo->noTable = 1;
    return TSDB_CODE_SUCCESS;
  }

  pInfo->noTable = 0;

  // the reader is already positioned at the requested status
  if (pInfo->lastStatus.uid == uid && pInfo->lastStatus.ts == ts) {
    return TSDB_CODE_SUCCESS;
  }

  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
  int32_t        numOfTables = taosArrayGetSize(pTaskInfo->tableqinfoList.pTableList);

  // the whole list is scanned without an early exit, so the last matching index is kept
  bool found = false;
  for (int32_t idx = 0; idx < numOfTables; idx++) {
    STableKeyInfo* pKeyInfo = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, idx);
    if (pKeyInfo->uid == uid) {
      found = true;
      pInfo->currentTable = idx;
    }
  }

  // TODO after processing drop, found can be false
  ASSERT(found);

  tsdbSetTableId(pInfo->dataReader, uid);

  // temporarily move the window start past ts for the reset, then restore the query condition
  int64_t savedSkey = pInfo->cond.twindows.skey;
  pInfo->cond.twindows.skey = ts + 1;
  tsdbReaderReset(pInfo->dataReader, &pInfo->cond);
  pInfo->cond.twindows.skey = savedSkey;
  pInfo->scanTimes = 0;

  qDebug("tsdb reader offset seek to uid %" PRId64 " ts %" PRId64 ", table cur set to %d , all table num %d", uid, ts,
         pInfo->currentTable, numOfTables);

  return TSDB_CODE_SUCCESS;
}

2925 2926 2927
// Report the last (uid, ts) status recorded by the table scan that feeds a stream scan operator.
//
// The operator chain is followed through the first downstream operator until a stream scan
// operator is found; *uid and *ts are then copied from its table scan's lastStatus.
//
// Returns TSDB_CODE_SUCCESS when the status was written, or TSDB_CODE_INVALID_PARA when the
// chain ends before a stream scan operator is found. The result of the downstream lookup is
// propagated to the caller so that a failure below is not reported as success.
int32_t doGetScanStatus(SOperatorInfo* pOperator, uint64_t* uid, int64_t* ts) {
  int32_t type = pOperator->operatorType;

  if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
    SStreamScanInfo* pScanInfo = pOperator->info;
    STableScanInfo*  pSnapShotScanInfo = pScanInfo->pTableScanOp->info;

    *uid = pSnapShotScanInfo->lastStatus.uid;
    *ts = pSnapShotScanInfo->lastStatus.ts;
    return TSDB_CODE_SUCCESS;
  }

  // same termination check as getTableScanInfo: the downstream array itself may be missing
  if (pOperator->pDownstream == NULL || pOperator->pDownstream[0] == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  return doGetScanStatus(pOperator->pDownstream[0], uid, ts);
}
L
Liu Jicong 已提交
2942
#endif
2943

2944
// this is a blocking operator
L
Liu Jicong 已提交
2945
// Open step of the aggregate operator.
//
// Pulls blocks from the first downstream operator until it returns NULL and feeds each one
// through doAggregateImpl, then initializes the grouped result info and marks the operator as
// opened. Failures while handling a block are reported by longjmp-ing to pTaskInfo->env with
// the error code; the normal return value is TSDB_CODE_SUCCESS.
static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
  if (OPTR_IS_OPENED(pOperator)) {
    return TSDB_CODE_SUCCESS;
  }

  SExecTaskInfo*    pTaskInfo = pOperator->pTaskInfo;
  SAggOperatorInfo* pAggInfo = pOperator->info;
  SExprSupp*        pAggExprSup = &pOperator->exprSupp;
  SExprSupp*        pScalarSup = &pAggInfo->scalarExprSup;
  SOperatorInfo*    downstream = pOperator->pDownstream[0];

  int64_t startUs = taosGetTimestampUs();

  int32_t order = TSDB_ORDER_ASC;
  int32_t scanFlag = MAIN_SCAN;

  SSDataBlock* pBlock = NULL;
  while ((pBlock = downstream->fpSet.getNextFn(downstream)) != NULL) {
    // refreshed for every block
    int32_t code = getTableScanInfo(pOperator, &order, &scanFlag);
    if (code != TSDB_CODE_SUCCESS) {
      longjmp(pTaskInfo->env, code);
    }

    // a scalar expression has to be evaluated in place before the group aggregation is applied
    if (pScalarSup->pExprInfo != NULL) {
      code = projectApplyFunctions(pScalarSup->pExprInfo, pBlock, pBlock, pScalarSup->pCtx, pScalarSup->numOfExprs,
                                   NULL);
      if (code != TSDB_CODE_SUCCESS) {
        longjmp(pTaskInfo->env, code);
      }
    }

    // the pDataBlock are always the same one, no need to call this again
    setExecutionContext(pOperator, pOperator->exprSupp.numOfExprs, pBlock->info.groupId);
    setInputDataBlock(pOperator, pAggExprSup->pCtx, pBlock, order, scanFlag, true);

    code = doAggregateImpl(pOperator, pAggExprSup->pCtx);
    if (code != 0) {
      longjmp(pTaskInfo->env, code);
    }
  }

  initGroupedResultInfo(&pAggInfo->groupResInfo, pAggInfo->aggSup.pResultRowHashTable, 0);
  OPTR_SET_OPENED(pOperator);

  // elapsed time is measured in microseconds and stored after dividing by 1000.0
  pOperator->cost.openCost = (taosGetTimestampUs() - startUs) / 1000.0;
  return TSDB_CODE_SUCCESS;
}

2997
// Produce the next result block of the aggregate operator.
//
// Runs the operator's open function, then repeatedly builds a result block and filters it with
// the operator's condition until either a non-empty block is available or no grouped results
// remain (in which case the operator is marked completed).
//
// Returns the result block, or NULL when the operator is done, the open step failed (the code
// is stored in pTaskInfo->code), or the final block is empty.
static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator) {
  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  SAggOperatorInfo* pAggInfo = pOperator->info;
  SOptrBasicInfo*   pInfo = &pAggInfo->binfo;
  SExecTaskInfo*    pTaskInfo = pOperator->pTaskInfo;

  pTaskInfo->code = pOperator->fpSet._openFn(pOperator);
  if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
    doSetOperatorCompleted(pOperator);
    return NULL;
  }

  blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);

  for (;;) {
    doBuildResultDatablock(pOperator, pInfo, &pAggInfo->groupResInfo, pAggInfo->aggSup.pResultBuf);
    doFilter(pAggInfo->pCondition, pInfo->pRes, NULL);

    // nothing left to emit after this block: finish regardless of how many rows survived
    if (!hasRemainResults(&pAggInfo->groupResInfo)) {
      doSetOperatorCompleted(pOperator);
      break;
    }

    // otherwise keep building until the filter lets at least one row through
    if (pInfo->pRes->info.rows > 0) {
      break;
    }
  }

  size_t numOfRows = blockDataGetNumOfRows(pInfo->pRes);
  pOperator->resultInfo.totalRows += numOfRows;

  if (numOfRows == 0) {
    return NULL;
  }

  return pInfo->pRes;
}

wmmhello's avatar
wmmhello 已提交
3033
// Serialize every result row referenced by the aggregate hash table into one heap buffer.
//
// Layout written to *result:
//   int32_t total length | int32_t number of entries |
//   { int32_t keyLen | key bytes | int32_t resultRowSize | result row bytes } per entry
//
// pOperator->info is read as an SOptrBasicInfo immediately followed by an SAggSupporter.
//
// On success *result holds a buffer obtained from taosMemoryCalloc/taosMemoryRealloc and
// *length holds its used size; when the result buffer is empty, *result is NULL and *length is 0.
//
// Returns TSDB_CODE_SUCCESS, TSDB_CODE_TSC_INVALID_INPUT when result or length is NULL, or
// TSDB_CODE_OUT_OF_MEMORY when the output buffer cannot be allocated (then *result is NULL).
int32_t aggEncodeResultRow(SOperatorInfo* pOperator, char** result, int32_t* length) {
  if (result == NULL || length == NULL) {
    return TSDB_CODE_TSC_INVALID_INPUT;
  }

  SOptrBasicInfo* pInfo = (SOptrBasicInfo*)(pOperator->info);
  SAggSupporter*  pSup = (SAggSupporter*)POINTER_SHIFT(pOperator->info, sizeof(SOptrBasicInfo));
  int32_t         size = taosHashGetSize(pSup->pResultRowHashTable);
  size_t          keyLen = sizeof(uint64_t) * 2;  // estimate the key length
  int32_t         totalSize =
      sizeof(int32_t) + sizeof(int32_t) + size * (sizeof(int32_t) + keyLen + sizeof(int32_t) + pSup->resultRowSize);

  // no result
  if (getTotalBufSize(pSup->pResultBuf) == 0) {
    *result = NULL;
    *length = 0;
    return TSDB_CODE_SUCCESS;
  }

  *result = (char*)taosMemoryCalloc(1, totalSize);
  if (*result == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  // the first int32_t is reserved for the total length, which is only known at the end
  int32_t offset = sizeof(int32_t);
  *(int32_t*)(*result + offset) = size;
  offset += sizeof(int32_t);

  // prepare memory: touch the page of the current result row and mark it dirty
  SResultRowPosition* pos = &pInfo->resultRowInfo.cur;
  void*               pPage = getBufPage(pSup->pResultBuf, pos->pageId);
  setBufPageDirty(pPage, true);
  releaseBufPage(pSup->pResultBuf, pPage);

  void* pIter = taosHashIterate(pSup->pResultRowHashTable, NULL);
  while (pIter) {
    void*               key = taosHashGetKey(pIter, &keyLen);
    SResultRowPosition* p1 = (SResultRowPosition*)pIter;

    // the initial size is only an estimate: grow the buffer when the real key is longer
    int32_t realTotalSize = offset + sizeof(int32_t) + keyLen + sizeof(int32_t) + pSup->resultRowSize;
    if (realTotalSize > totalSize) {
      char* tmp = (char*)taosMemoryRealloc(*result, realTotalSize);
      if (tmp == NULL) {
        // NOTE(review): this early return leaves the hash iteration unfinished; confirm whether
        // the hash API requires the iterator to be cancelled explicitly here.
        taosMemoryFree(*result);
        *result = NULL;
        return TSDB_CODE_OUT_OF_MEMORY;
      }

      *result = tmp;
      totalSize = realTotalSize;  // remember the new capacity so later entries do not realloc again
    }

    // save key
    *(int32_t*)(*result + offset) = (int32_t)keyLen;
    offset += sizeof(int32_t);
    memcpy(*result + offset, key, keyLen);
    offset += keyLen;

    // save value: copy the row while its page is still held, and release the page afterwards
    *(int32_t*)(*result + offset) = pSup->resultRowSize;
    offset += sizeof(int32_t);

    pPage = (SFilePage*)getBufPage(pSup->pResultBuf, p1->pageId);
    SResultRow* pRow = (SResultRow*)((char*)pPage + p1->offset);
    memcpy(*result + offset, pRow, pSup->resultRowSize);
    setBufPageDirty(pPage, true);
    releaseBufPage(pSup->pResultBuf, pPage);
    offset += pSup->resultRowSize;

    pIter = taosHashIterate(pSup->pResultRowHashTable, pIter);
  }

  *(int32_t*)(*result) = offset;
  *length = offset;

  return TSDB_CODE_SUCCESS;
}

3110
// Rebuild the aggregate result rows from a buffer produced by aggEncodeResultRow.
//
// Expected layout of `result`:
//   int32_t total length | int32_t number of entries |
//   { int32_t keyLen | key bytes | int32_t valueLen | result row bytes } per entry
//
// For each entry a new result row is allocated from the result buffer (using the uint64_t at the
// start of the key as the group id), registered in the hash table under the serialized key, and
// filled with the serialized row; the row's own pageId/offset are preserved. The current result
// row position is left pointing at the last decoded row.
//
// Each entry is validated against the declared total length BEFORE any state is modified, so a
// truncated or inconsistent payload does not cause reads past the payload or leave a
// half-registered row behind.
//
// Returns TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_INVALID_INPUT when result is NULL, an entry is
// malformed, a row cannot be allocated, or the consumed size differs from the declared length.
int32_t aggDecodeResultRow(SOperatorInfo* pOperator, char* result) {
  if (result == NULL) {
    return TSDB_CODE_TSC_INVALID_INPUT;
  }

  SOptrBasicInfo* pInfo = (SOptrBasicInfo*)(pOperator->info);
  SAggSupporter*  pSup = (SAggSupporter*)POINTER_SHIFT(pOperator->info, sizeof(SOptrBasicInfo));

  int32_t length = *(int32_t*)(result);
  int32_t offset = sizeof(int32_t);

  int32_t count = *(int32_t*)(result + offset);
  offset += sizeof(int32_t);

  while (count-- > 0 && length > offset) {
    // key length field
    if ((int64_t)offset + (int64_t)sizeof(int32_t) > length) {
      return TSDB_CODE_TSC_INVALID_INPUT;
    }
    int32_t keyLen = *(int32_t*)(result + offset);
    offset += sizeof(int32_t);

    // the key and the value length field that follows it must lie inside the payload
    char*   pKey = result + offset;
    int64_t valueOffset = (int64_t)offset + keyLen + (int64_t)sizeof(int32_t);
    if (keyLen < 0 || valueOffset > length) {
      return TSDB_CODE_TSC_INVALID_INPUT;
    }

    int32_t valueLen = *(int32_t*)(pKey + keyLen);
    if (valueLen != pSup->resultRowSize || valueOffset + valueLen > length) {
      return TSDB_CODE_TSC_INVALID_INPUT;
    }

    // the group id is read from the first 8 bytes at the key position
    if ((int64_t)offset + (int64_t)sizeof(uint64_t) > length) {
      return TSDB_CODE_TSC_INVALID_INPUT;
    }
    uint64_t tableGroupId = 0;
    memcpy(&tableGroupId, pKey, sizeof(tableGroupId));  // the position may not be 8-byte aligned

    SResultRow* resultRow = getNewResultRow(pSup->pResultBuf, tableGroupId, pSup->resultRowSize);
    if (!resultRow) {
      return TSDB_CODE_TSC_INVALID_INPUT;
    }

    // add a new result set for a new group
    SResultRowPosition pos = {.pageId = resultRow->pageId, .offset = resultRow->offset};
    taosHashPut(pSup->pResultRowHashTable, pKey, keyLen, &pos, sizeof(SResultRowPosition));

    offset = (int32_t)valueOffset;

    // the serialized row carries the position it had when encoded; keep the new row's position
    int32_t pageId = resultRow->pageId;
    int32_t pOffset = resultRow->offset;
    memcpy(resultRow, result + offset, valueLen);
    resultRow->pageId = pageId;
    resultRow->offset = pOffset;
    offset += valueLen;

    initResultRow(resultRow);
    pInfo->resultRowInfo.cur = (SResultRowPosition){.pageId = resultRow->pageId, .offset = resultRow->offset};
  }

  if (offset != length) {
    return TSDB_CODE_TSC_INVALID_INPUT;
  }

  return TSDB_CODE_SUCCESS;
}

3162 3163 3164 3165 3166
// Apply group offset (remainGroupOffset), group limit (slimit), row offset and row limit to one
// data block, updating the bookkeeping in pLimitInfo and trimming pBlock in place.
//
// Returns PROJECT_RETRIEVE_CONTINUE when the caller should fetch more input (the block was
// skipped, or it is being accumulated), and PROJECT_RETRIEVE_DONE when pBlock should be handed
// to the consumer. pOperator->status is set to OP_EXEC_DONE once the group limit is reached.
//
// pBlock->info.groupId is deliberately re-read at every use instead of being cached, because
// the block is cleaned up on several paths.
int32_t handleLimitOffset(SOperatorInfo* pOperator, SLimitInfo* pLimitInfo, SSDataBlock* pBlock, bool holdDataInBuf) {
  if (pLimitInfo->remainGroupOffset > 0) {
    // it is the first group
    if (pLimitInfo->currentGroupId == 0) {
      pLimitInfo->currentGroupId = pBlock->info.groupId;
      blockDataCleanup(pBlock);
      return PROJECT_RETRIEVE_CONTINUE;
    }

    if (pLimitInfo->currentGroupId != pBlock->info.groupId) {
      // now it is the data from a new group
      pLimitInfo->remainGroupOffset -= 1;

      // ignore data block in current group
      if (pLimitInfo->remainGroupOffset > 0) {
        blockDataCleanup(pBlock);
        return PROJECT_RETRIEVE_CONTINUE;
      }
    }

    // set current group id of the project operator
    pLimitInfo->currentGroupId = pBlock->info.groupId;
  }

  // here check for a new group data, we need to handle the data of the previous group.
  bool startsNewGroup = (pLimitInfo->currentGroupId != 0) && (pLimitInfo->currentGroupId != pBlock->info.groupId);
  if (startsNewGroup) {
    pLimitInfo->numOfOutputGroups += 1;

    bool groupLimitReached =
        (pLimitInfo->slimit.limit > 0) && (pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups);
    if (groupLimitReached) {
      pOperator->status = OP_EXEC_DONE;
      blockDataCleanup(pBlock);

      return PROJECT_RETRIEVE_DONE;
    }

    // reset the value for a new group data
    pLimitInfo->numOfOutputRows = 0;
    pLimitInfo->remainOffset = pLimitInfo->limit.offset;

    // existing rows that belongs to previous group.
    if (pBlock->info.rows > 0) {
      return PROJECT_RETRIEVE_DONE;
    }
  }

  // here we reach the start position, according to the limit/offset requirements.

  // set current group id
  pLimitInfo->currentGroupId = pBlock->info.groupId;

  // the whole block falls inside the remaining row offset: drop it
  if (pLimitInfo->remainOffset >= pBlock->info.rows) {
    pLimitInfo->remainOffset -= pBlock->info.rows;
    blockDataCleanup(pBlock);
    return PROJECT_RETRIEVE_CONTINUE;
  }

  // only the leading part of the block falls inside the remaining row offset
  if (pLimitInfo->remainOffset > 0) {
    blockDataTrimFirstNRows(pBlock, pLimitInfo->remainOffset);
    pLimitInfo->remainOffset = 0;
  }

  // check for the limitation in each group
  if (pLimitInfo->limit.limit >= 0 && pLimitInfo->numOfOutputRows + pBlock->info.rows >= pLimitInfo->limit.limit) {
    int32_t numOfKeptRows = (int32_t)(pLimitInfo->limit.limit - pLimitInfo->numOfOutputRows);
    blockDataKeepFirstNRows(pBlock, numOfKeptRows);

    if (pLimitInfo->slimit.limit > 0 && pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups) {
      pOperator->status = OP_EXEC_DONE;
    }

    return PROJECT_RETRIEVE_DONE;
  }

  // todo optimize performance
  // If slimit/soffset values exist, multi-round results can not be packed into one block, since
  // they may not belong to the same group and the limit/offset value is not valid in this case.
  bool emitNow = (!holdDataInBuf) || (pBlock->info.rows >= pOperator->resultInfo.threshold) ||
                 pLimitInfo->slimit.offset != -1 || pLimitInfo->slimit.limit != -1;
  if (emitNow) {
    return PROJECT_RETRIEVE_DONE;
  }

  // not full enough, continue to accumulate the output data in the buffer.
  return PROJECT_RETRIEVE_CONTINUE;
}

3239
static void doApplyScalarCalculation(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32_t order, int32_t scanFlag);
L
Liu Jicong 已提交
3240 3241
// Start filling for the cached block of a new group (pInfo->existNewGroupBlock).
//
// Resets the fill state, runs the scalar/projection step on the cached block, feeds the
// projected block (pInfo->pRes) into the fill info and produces fill output into
// pInfo->pFinalRes. Afterwards the cached block's group becomes the current group and the
// cache slot is cleared.
static void doHandleRemainBlockForNewGroupImpl(SOperatorInfo* pOperator, SFillOperatorInfo* pInfo,
                                               SResultInfo* pResultInfo, SExecTaskInfo* pTaskInfo) {
  SSDataBlock* pNewGroupBlock = pInfo->existNewGroupBlock;
  SSDataBlock* pOutput = pInfo->pFinalRes;

  pInfo->totalInputRows = pNewGroupBlock->info.rows;

  int32_t order = TSDB_ORDER_ASC;
  int32_t scanFlag = MAIN_SCAN;
  getTableScanInfo(pOperator, &order, &scanFlag);

  // once the task is completed the fill runs up to the end of the query window,
  // otherwise only up to the end of this block's window
  int64_t ekey;
  if (Q_STATUS_EQUAL(pTaskInfo->status, TASK_COMPLETED)) {
    ekey = pInfo->win.ekey;
  } else {
    ekey = pNewGroupBlock->info.window.ekey;
  }

  taosResetFillInfo(pInfo->pFillInfo, getFillInfoStart(pInfo->pFillInfo));

  doApplyScalarCalculation(pOperator, pNewGroupBlock, order, scanFlag);

  taosFillSetStartInfo(pInfo->pFillInfo, pInfo->pRes->info.rows, ekey);
  taosFillSetInputDataBlock(pInfo->pFillInfo, pInfo->pRes);

  // only the free room left in the output block may be filled
  int32_t numOfFreeRows = pResultInfo->capacity - pOutput->info.rows;
  taosFillResultDataBlock(pInfo->pFillInfo, pOutput, numOfFreeRows);

  pInfo->curGroupId = pNewGroupBlock->info.groupId;
  pInfo->existNewGroupBlock = NULL;
}

L
Liu Jicong 已提交
3265 3266
// Continue producing fill output left over from earlier calls.
//
// If the fill info still has pending results, they are written into pInfo->pFinalRes. Otherwise,
// if a block of a new group has been cached, filling is started for that block.
static void doHandleRemainBlockFromNewGroup(SOperatorInfo* pOperator, SFillOperatorInfo* pInfo,
                                            SResultInfo* pResultInfo, SExecTaskInfo* pTaskInfo) {
  if (!taosFillHasMoreResults(pInfo->pFillInfo)) {
    // handle the cached new group data block
    if (pInfo->existNewGroupBlock) {
      doHandleRemainBlockForNewGroupImpl(pOperator, pInfo, pResultInfo, pTaskInfo);
    }
    return;
  }

  // pending results of the current group: fill into the free room of the output block
  int32_t numOfFreeRows = pResultInfo->capacity - pInfo->pFinalRes->info.rows;
  taosFillResultDataBlock(pInfo->pFillInfo, pInfo->pFinalRes, numOfFreeRows);
  pInfo->pRes->info.groupId = pInfo->curGroupId;
}

// Project an input block into the fill operator's intermediate block (pInfo->pRes).
//
// Applies the operator's expressions to pBlock, copies the group id, then copies the primary
// timestamp column and every "not fill" column from pBlock into the matching slot of pInfo->pRes.
static void doApplyScalarCalculation(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32_t order, int32_t scanFlag) {
  SFillOperatorInfo* pInfo = pOperator->info;
  SExprSupp*         pSup = &pOperator->exprSupp;
  SSDataBlock*       pFinalRes = pInfo->pFinalRes;
  SSDataBlock*       pProjected = pInfo->pRes;

  setInputDataBlock(pOperator, pSup->pCtx, pBlock, order, scanFlag, false);
  projectApplyFunctions(pSup->pExprInfo, pProjected, pBlock, pSup->pCtx, pSup->numOfExprs, NULL);
  pProjected->info.groupId = pBlock->info.groupId;

  // primary timestamp column
  SColumnInfoData* pTsDst = taosArrayGet(pProjected->pDataBlock, pInfo->primaryTsCol);
  SColumnInfoData* pTsSrc = taosArrayGet(pBlock->pDataBlock, pInfo->primarySrcSlotId);
  colDataAssign(pTsDst, pTsSrc, pProjected->info.rows, &pFinalRes->info);

  // the "not fill" columns are stored after the numOfExpr fill columns in pFillCol
  for (int32_t k = 0; k < pInfo->numOfNotFillExpr; ++k) {
    SFillColInfo* pFillCol = &pInfo->pFillInfo->pFillCol[k + pInfo->numOfExpr];
    ASSERT(pFillCol->notFillCol);

    SExprInfo* pExpr = pFillCol->pExpr;
    int32_t    srcSlotId = pExpr->base.pParam[0].pCol->slotId;
    int32_t    dstSlotId = pExpr->base.resSchema.slotId;

    SColumnInfoData* pColDst = taosArrayGet(pProjected->pDataBlock, dstSlotId);
    SColumnInfoData* pColSrc = taosArrayGet(pBlock->pDataBlock, srcSlotId);
    colDataAssign(pColDst, pColSrc, pProjected->info.rows, &pFinalRes->info);
  }
}

S
slzhou 已提交
3307
// Produce the next block of fill output.
//
// Pending output (left-over results of the current group, or a cached block of a new group) is
// emitted first. After that, blocks are pulled from the first downstream operator, projected
// into pInfo->pRes and fed into the fill info until the output block has rows worth returning.
// A block that belongs to a different group than the current one is cached in
// pInfo->existNewGroupBlock and the current group is closed first.
//
// Returns pInfo->pFinalRes tagged with the current group id, or NULL when nothing is produced.
static SSDataBlock* doFillImpl(SOperatorInfo* pOperator) {
  SFillOperatorInfo* pInfo = pOperator->info;
  SExecTaskInfo*     pTaskInfo = pOperator->pTaskInfo;
  SResultInfo*       pResultInfo = &pOperator->resultInfo;
  SSDataBlock*       pResBlock = pInfo->pFinalRes;

  blockDataCleanup(pResBlock);
  blockDataCleanup(pInfo->pRes);

  int32_t order = TSDB_ORDER_ASC;
  int32_t scanFlag = MAIN_SCAN;
  getTableScanInfo(pOperator, &order, &scanFlag);

  doHandleRemainBlockFromNewGroup(pOperator, pInfo, pResultInfo, pTaskInfo);
  if (pResBlock->info.rows > 0) {
    pResBlock->info.groupId = pInfo->curGroupId;
    return pResBlock;
  }

  SOperatorInfo* pDownstream = pOperator->pDownstream[0];
  for (;;) {
    SSDataBlock* pBlock = pDownstream->fpSet.getNextFn(pDownstream);

    if (pBlock != NULL) {
      blockDataUpdateTsWindow(pBlock, pInfo->primarySrcSlotId);
      doApplyScalarCalculation(pOperator, pBlock, order, scanFlag);

      if (pInfo->curGroupId == 0 || pInfo->curGroupId == pInfo->pRes->info.groupId) {
        pInfo->curGroupId = pInfo->pRes->info.groupId;  // the first data block
        pInfo->totalInputRows += pInfo->pRes->info.rows;

        // the end key handed to the fill info depends on whether the scan order matches the fill order
        if (order == pInfo->pFillInfo->order) {
          taosFillSetStartInfo(pInfo->pFillInfo, pInfo->pRes->info.rows, pBlock->info.window.ekey);
        } else {
          taosFillSetStartInfo(pInfo->pFillInfo, pInfo->pRes->info.rows, pBlock->info.window.skey);
        }

        taosFillSetInputDataBlock(pInfo->pFillInfo, pInfo->pRes);
      } else if (pInfo->curGroupId != pBlock->info.groupId) {  // the new group data block
        pInfo->existNewGroupBlock = pBlock;

        // Fill the previous group data block, before handle the data block of new group.
        // Close the fill operation for previous group data block
        taosFillSetStartInfo(pInfo->pFillInfo, 0, pInfo->win.ekey);
      }
    } else {
      // downstream is exhausted
      if (pInfo->totalInputRows == 0) {
        doSetOperatorCompleted(pOperator);
        return NULL;
      }

      taosFillSetStartInfo(pInfo->pFillInfo, 0, pInfo->win.ekey);
    }

    int32_t numOfFreeRows = pResultInfo->capacity - pResBlock->info.rows;
    taosFillResultDataBlock(pInfo->pFillInfo, pResBlock, numOfFreeRows);

    if (pResBlock->info.rows > 0) {
      // 1. The result in current group not reach the threshold of output result, continue
      // 2. If multiple group results existing in one SSDataBlock is not allowed, return immediately
      if (pResBlock->info.rows > pResultInfo->threshold || pBlock == NULL || pInfo->existNewGroupBlock != NULL) {
        pResBlock->info.groupId = pInfo->curGroupId;
        return pResBlock;
      }

      doHandleRemainBlockFromNewGroup(pOperator, pInfo, pResultInfo, pTaskInfo);
      if (pResBlock->info.rows >= pResultInfo->threshold || pBlock == NULL) {
        pResBlock->info.groupId = pInfo->curGroupId;
        return pResBlock;
      }

      continue;
    }

    // current group has no more result to return
    if (!pInfo->existNewGroupBlock) {
      return NULL;
    }

    // try next group
    assert(pBlock != NULL);

    blockDataCleanup(pResBlock);
    blockDataCleanup(pInfo->pRes);

    doHandleRemainBlockForNewGroupImpl(pOperator, pInfo, pResultInfo, pTaskInfo);
    if (pResBlock->info.rows > pResultInfo->threshold) {
      pResBlock->info.groupId = pInfo->curGroupId;
      return pResBlock;
    }
  }
}

S
slzhou 已提交
3394 3395 3396 3397 3398 3399 3400 3401
// Entry point of the fill operator: returns the next filled block that still has rows after the
// operator's condition is applied.
//
// Returns NULL once the operator is done; the operator is marked completed as soon as
// doFillImpl stops producing blocks. Rows of the returned block are added to the operator's
// total row counter.
static SSDataBlock* doFill(SOperatorInfo* pOperator) {
  SFillOperatorInfo* pInfo = pOperator->info;

  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  for (;;) {
    SSDataBlock* pFilled = doFillImpl(pOperator);
    if (pFilled == NULL) {
      doSetOperatorCompleted(pOperator);
      return NULL;
    }

    doFilter(pInfo->pCondition, pFilled, pInfo->pColMatchColInfo);

    // blocks that are emptied by the filter are skipped
    if (pFilled->info.rows > 0) {
      pOperator->resultInfo.totalRows += pFilled->info.rows;
      return pFilled;
    }
  }
}

3423
// Release the memory owned by each of the numOfExprs entries of pExpr: the column descriptor of
// every column-type parameter, the parameter array and the expression node.
// The pExpr array itself is not freed here.
void destroyExprInfo(SExprInfo* pExpr, int32_t numOfExprs) {
  for (int32_t exprIdx = 0; exprIdx < numOfExprs; ++exprIdx) {
    SExprInfo* pOne = pExpr + exprIdx;

    for (int32_t paramIdx = 0; paramIdx < pOne->base.numOfParams; ++paramIdx) {
      // only column parameters own a separately allocated descriptor
      if (pOne->base.pParam[paramIdx].type != FUNC_PARAM_TYPE_COLUMN) {
        continue;
      }

      taosMemoryFreeClear(pOne->base.pParam[paramIdx].pCol);
    }

    taosMemoryFree(pOne->base.pParam);
    taosMemoryFree(pOne->pExpr);
  }
}

3437 3438 3439 3440 3441
// Destroy an operator and, recursively, all of its downstream operators.
//
// The operator's close function (if any) is invoked first with its info and expression count,
// then the downstream operators are destroyed, and finally the expression support data and the
// operator itself are released. A NULL operator is ignored.
static void destroyOperatorInfo(SOperatorInfo* pOperator) {
  if (pOperator == NULL) {
    return;
  }

  if (pOperator->fpSet.closeFn != NULL) {
    pOperator->fpSet.closeFn(pOperator->info, pOperator->exprSupp.numOfExprs);
  }

  if (pOperator->pDownstream != NULL) {
    int32_t numOfChildren = pOperator->numOfDownstream;
    for (int32_t child = 0; child < numOfChildren; ++child) {
      destroyOperatorInfo(pOperator->pDownstream[child]);
    }

    taosMemoryFreeClear(pOperator->pDownstream);
    pOperator->numOfDownstream = 0;
  }

  cleanupExprSupp(&pOperator->exprSupp);
  taosMemoryFreeClear(pOperator);
}

3459 3460 3461 3462 3463 3464 3465 3466 3467 3468 3469 3470 3471 3472 3473
// Choose the page size and the in-memory buffer size for a result buffer holding rows of
// rowSize bytes.
//
// The page size starts at 4096 bytes and is doubled until one page can hold at least four rows.
// The buffer size is 4096 * 256 bytes, enlarged when necessary so that at least four pages fit
// in the buffer. A non-positive rowSize yields the minimum sizes.
//
// All arithmetic is done in 64 bits, so a negative or very large rowSize can neither overflow
// nor keep the doubling loop from terminating.
//
// Returns 0 on success. Returns -1 when the required sizes do not fit in uint32_t; in that case
// *defaultPgsz and *defaultBufsz are left at the minimum sizes.
int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* defaultBufsz) {
  const uint32_t minPageSize = 4096;
  const uint32_t minBufSize = 4096 * 256;
  const uint64_t numOfRowsPerPage = 4;
  const uint64_t numOfPagesInBuf = 4;

  *defaultPgsz = minPageSize;
  *defaultBufsz = minBufSize;

  uint64_t requiredPageSize = (rowSize > 0) ? ((uint64_t)rowSize * numOfRowsPerPage) : 0;

  uint64_t pageSize = minPageSize;
  while (pageSize < requiredPageSize) {
    pageSize <<= 1u;
  }

  // at least four pages need to be in buffer
  uint64_t bufSize = minBufSize;
  if (bufSize < pageSize * numOfPagesInBuf) {
    bufSize = pageSize * numOfPagesInBuf;
  }

  if (bufSize > UINT32_MAX) {
    return -1;
  }

  *defaultPgsz = (uint32_t)pageSize;
  *defaultBufsz = (uint32_t)bufSize;
  return 0;
}

dengyihao's avatar
dengyihao 已提交
3474 3475
// Initialize the aggregate supporter: result row size, key buffer, result row hash table and the
// disk-based result buffer (created under tsTempDir and identified by pKey).
//
// Returns TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY when the key buffer or the hash table
// cannot be allocated, TSDB_CODE_INVALID_PARA when no valid page/buffer size can be derived from
// the result row size, TSDB_CODE_NO_AVAIL_DISK (also stored in terrno) when the temp space is
// not available, or the error code of createDiskbasedBuf.
//
// Resources that were already allocated are not released on a failure path in this function.
// NOTE(review): presumably the caller runs cleanupAggSup on failure; verify against the callers.
int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t numOfOutput, size_t keyBufSize,
                         const char* pKey) {
  _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);

  pAggSup->resultRowSize = getResultRowSize(pCtx, numOfOutput);
  pAggSup->keyBuf = taosMemoryCalloc(1, keyBufSize + POINTER_BYTES + sizeof(int64_t));
  pAggSup->pResultRowHashTable = taosHashInit(10, hashFn, true, HASH_NO_LOCK);

  if (pAggSup->keyBuf == NULL || pAggSup->pResultRowHashTable == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  uint32_t defaultPgsz = 0;
  uint32_t defaultBufsz = 0;
  // do not continue with sizes that were not computed successfully
  if (getBufferPgSize(pAggSup->resultRowSize, &defaultPgsz, &defaultBufsz) != 0) {
    return TSDB_CODE_INVALID_PARA;
  }

  if (!osTempSpaceAvailable()) {
    terrno = TSDB_CODE_NO_AVAIL_DISK;
    qError("Init stream agg supporter failed since %s", terrstr(terrno));
    return terrno;
  }

  int32_t code = createDiskbasedBuf(&pAggSup->pResultBuf, defaultPgsz, defaultBufsz, pKey, tsTempDir);
  if (code != TSDB_CODE_SUCCESS) {
    qError("Create agg result buf failed since %s", tstrerror(code));
    return code;
  }

  return TSDB_CODE_SUCCESS;
}

3504
// Release everything held by the aggregate supporter: the key buffer, the result row hash table
// and the disk-based result buffer, in that order.
void cleanupAggSup(SAggSupporter* pAggSup) {
  // key buffer (the pointer is cleared as well)
  taosMemoryFreeClear(pAggSup->keyBuf);

  // result row lookup table
  taosHashCleanup(pAggSup->pResultRowHashTable);

  // paged result storage
  destroyDiskbasedBuf(pAggSup->pResultBuf);
}

L
Liu Jicong 已提交
3510 3511
// Set up the expression support and the aggregate supporter of an aggregate-style operator, and
// point every function context at the supporter's result buffer.
//
// Returns TSDB_CODE_SUCCESS, or the error code of the initialization step that failed.
int32_t initAggInfo(SExprSupp* pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, size_t keyBufSize,
                    const char* pkey) {
  int32_t code = initExprSupp(pSup, pExprInfo, numOfCols);
  if (code == TSDB_CODE_SUCCESS) {
    code = doInitAggInfoSup(pAggSup, pSup->pCtx, numOfCols, keyBufSize, pkey);
  }

  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  // every function context writes its intermediate results into the shared result buffer
  for (int32_t col = 0; col < numOfCols; ++col) {
    pSup->pCtx[col].pBuf = pAggSup->pResultBuf;
  }

  return TSDB_CODE_SUCCESS;
}

L
Liu Jicong 已提交
3529
// Set the capacity of a result info to numOfRows and its threshold to 75% of that value.
// When the computed threshold is 0, numOfRows is used as the threshold instead.
// numOfRows must not be 0.
void initResultSizeInfo(SResultInfo* pResultInfo, int32_t numOfRows) {
  ASSERT(numOfRows != 0);

  pResultInfo->capacity = numOfRows;

  // the product is computed in floating point and converted on assignment
  pResultInfo->threshold = numOfRows * 0.75;
  if (pResultInfo->threshold != 0) {
    return;
  }

  pResultInfo->threshold = numOfRows;
}

3539 3540 3541 3542 3543
// Attach the output block to the basic operator info and initialize its result row info.
void initBasicInfo(SOptrBasicInfo* pInfo, SSDataBlock* pBlock) {
  // the block is stored as-is; no copy is made
  pInfo->pRes = pBlock;

  initResultRowInfo(&pInfo->resultRowInfo);
}

5
54liuyao 已提交
3544
// Destroy an array of numOfOutput function contexts: the variant of every parameter, the
// subsidiary context array and the input data / column aggregate arrays of each context, and
// finally the array itself. A NULL array is accepted.
//
// Always returns NULL.
void* destroySqlFunctionCtx(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
  if (pCtx == NULL) {
    return NULL;
  }

  for (int32_t ctxIdx = 0; ctxIdx < numOfOutput; ++ctxIdx) {
    SqlFunctionCtx* pOne = &pCtx[ctxIdx];

    for (int32_t paramIdx = 0; paramIdx < pOne->numOfParams; ++paramIdx) {
      taosVariantDestroy(&pOne->param[paramIdx].param);
    }

    taosMemoryFreeClear(pOne->subsidiaries.pCtx);
    taosMemoryFree(pOne->input.pData);
    taosMemoryFree(pOne->input.pColumnDataAgg);
  }

  taosMemoryFreeClear(pCtx);
  return NULL;
}

3563
// Record the expression array in pSup and, when it is not NULL, create the matching function
// contexts together with the row entry info offsets.
//
// Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when the contexts cannot be created.
// When pExprInfo is NULL, pSup->pCtx and pSup->rowEntryInfoOffset are left untouched.
int32_t initExprSupp(SExprSupp* pSup, SExprInfo* pExprInfo, int32_t numOfExpr) {
  pSup->pExprInfo = pExprInfo;
  pSup->numOfExprs = numOfExpr;

  // nothing to build without expressions
  if (pSup->pExprInfo == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  pSup->pCtx = createSqlFunctionCtx(pExprInfo, numOfExpr, &pSup->rowEntryInfoOffset);
  return (pSup->pCtx == NULL) ? TSDB_CODE_OUT_OF_MEMORY : TSDB_CODE_SUCCESS;
}

3576 3577 3578 3579
// Release everything referenced by pSupp: the function contexts, the
// expression array (if any) and the row-entry offset table. pSupp itself is
// not freed.
void cleanupExprSupp(SExprSupp* pSupp) {
  destroySqlFunctionCtx(pSupp->pCtx, pSupp->numOfExprs);

  if (pSupp->pExprInfo != NULL) {
    // Tear down each expression first, then drop the array that held them.
    destroyExprInfo(pSupp->pExprInfo, pSupp->numOfExprs);
    taosMemoryFreeClear(pSupp->pExprInfo);
  }

  taosMemoryFree(pSupp->rowEntryInfoOffset);
}

L
Liu Jicong 已提交
3585
SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
S
slzhou 已提交
3586
                                           SSDataBlock* pResultBlock, SNode* pCondition, SExprInfo* pScalarExprInfo,
3587
                                           int32_t numOfScalarExpr, bool mergeResult, SExecTaskInfo* pTaskInfo) {
wafwerar's avatar
wafwerar 已提交
3588
  SAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SAggOperatorInfo));
L
Liu Jicong 已提交
3589
  SOperatorInfo*    pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
H
Haojun Liao 已提交
3590 3591 3592
  if (pInfo == NULL || pOperator == NULL) {
    goto _error;
  }
H
Haojun Liao 已提交
3593

dengyihao's avatar
dengyihao 已提交
3594
  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
3595

3596
  initResultSizeInfo(&pOperator->resultInfo, 4096);
3597
  int32_t code = initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str);
L
Liu Jicong 已提交
3598
  if (code != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
3599 3600
    goto _error;
  }
H
Haojun Liao 已提交
3601

3602
  initBasicInfo(&pInfo->binfo, pResultBlock);
3603 3604 3605 3606
  code = initExprSupp(&pInfo->scalarExprSup, pScalarExprInfo, numOfScalarExpr);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }
3607

3608
  pInfo->binfo.mergeResultBlock = mergeResult;
3609
  pInfo->groupId = UINT64_MAX;
S
slzhou 已提交
3610
  pInfo->pCondition = pCondition;
dengyihao's avatar
dengyihao 已提交
3611
  pOperator->name = "TableAggregate";
3612
  pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_HASH_AGG;
3613
  pOperator->blocking = true;
dengyihao's avatar
dengyihao 已提交
3614 3615 3616
  pOperator->status = OP_NOT_OPENED;
  pOperator->info = pInfo;
  pOperator->pTaskInfo = pTaskInfo;
H
Haojun Liao 已提交
3617

3618 3619
  pOperator->fpSet = createOperatorFpSet(doOpenAggregateOptr, getAggregateResult, NULL, NULL, destroyAggOperatorInfo,
                                         aggEncodeResultRow, aggDecodeResultRow, NULL);
H
Haojun Liao 已提交
3620

3621 3622 3623 3624 3625 3626
  if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
    STableScanInfo* pTableScanInfo = downstream->info;
    pTableScanInfo->pdInfo.pExprSup = &pOperator->exprSupp;
    pTableScanInfo->pdInfo.pAggSup = &pInfo->aggSup;
  }

H
Haojun Liao 已提交
3627 3628 3629 3630
  code = appendDownstream(pOperator, &downstream, 1);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }
3631 3632

  return pOperator;
L
Liu Jicong 已提交
3633
_error:
H
Haojun Liao 已提交
3634
  destroyAggOperatorInfo(pInfo, numOfCols);
wafwerar's avatar
wafwerar 已提交
3635
  taosMemoryFreeClear(pOperator);
H
Haojun Liao 已提交
3636 3637
  pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
  return NULL;
3638 3639
}

3640
// Release the result-row bookkeeping and the result block held by pInfo.
// pInfo must not be NULL; pInfo itself is not freed.
void cleanupBasicInfo(SOptrBasicInfo* pInfo) {
  assert(pInfo != NULL);

  cleanupResultRowInfo(&pInfo->resultRowInfo);

  // Store whatever blockDataDestroy hands back so pRes no longer refers to
  // the destroyed block.
  pInfo->pRes = blockDataDestroy(pInfo->pRes);
}

H
Haojun Liao 已提交
3646
void destroyBasicOperatorInfo(void* param, int32_t numOfOutput) {
L
Liu Jicong 已提交
3647
  SOptrBasicInfo* pInfo = (SOptrBasicInfo*)param;
3648
  cleanupBasicInfo(pInfo);
L
Liu Jicong 已提交
3649

D
dapan1121 已提交
3650
  taosMemoryFreeClear(param);
3651
}
H
Haojun Liao 已提交
3652

H
Haojun Liao 已提交
3653 3654 3655 3656 3657 3658 3659
// pItem points at a slot that stores a pointer. Free the pointed-to memory
// and clear the slot; an empty slot is left untouched.
static void freeItem(void* pItem) {
  void** ppSlot = pItem;
  if (*ppSlot == NULL) {
    return;
  }

  taosMemoryFreeClear(*ppSlot);
}

H
Haojun Liao 已提交
3660
void destroyAggOperatorInfo(void* param, int32_t numOfOutput) {
L
Liu Jicong 已提交
3661
  SAggOperatorInfo* pInfo = (SAggOperatorInfo*)param;
L
Liu Jicong 已提交
3662 3663
  cleanupBasicInfo(&pInfo->binfo);

H
Haojun Liao 已提交
3664
  cleanupAggSup(&pInfo->aggSup);
S
shenglian zhou 已提交
3665
  cleanupExprSupp(&pInfo->scalarExprSup);
H
Haojun Liao 已提交
3666
  cleanupGroupResInfo(&pInfo->groupResInfo);
D
dapan1121 已提交
3667
  taosMemoryFreeClear(param);
3668
}
3669

3670
void destroyFillOperatorInfo(void* param, int32_t numOfOutput) {
L
Liu Jicong 已提交
3671
  SFillOperatorInfo* pInfo = (SFillOperatorInfo*)param;
3672
  pInfo->pFillInfo = taosDestroyFillInfo(pInfo->pFillInfo);
H
Haojun Liao 已提交
3673
  pInfo->pRes = blockDataDestroy(pInfo->pRes);
H
Haojun Liao 已提交
3674 3675 3676 3677 3678 3679 3680
  pInfo->pFinalRes = blockDataDestroy(pInfo->pFinalRes);

  if (pInfo->pNotFillExprInfo != NULL) {
    destroyExprInfo(pInfo->pNotFillExprInfo, pInfo->numOfNotFillExpr);
    taosMemoryFree(pInfo->pNotFillExprInfo);
  }

wafwerar's avatar
wafwerar 已提交
3681
  taosMemoryFreeClear(pInfo->p);
3682
  taosArrayDestroy(pInfo->pColMatchColInfo);
D
dapan1121 已提交
3683
  taosMemoryFreeClear(param);
3684 3685
}

H
Haojun Liao 已提交
3686
void destroyExchangeOperatorInfo(void* param, int32_t numOfOutput) {
L
Liu Jicong 已提交
3687
  SExchangeInfo* pExInfo = (SExchangeInfo*)param;
3688 3689 3690
  taosRemoveRef(exchangeObjRefPool, pExInfo->self);
}

L
Liu Jicong 已提交
3691
void freeSourceDataInfo(void* p) {
3692 3693 3694 3695
  SSourceDataInfo* pInfo = (SSourceDataInfo*)p;
  taosMemoryFreeClear(pInfo->pRsp);
}

3696
void doDestroyExchangeOperatorInfo(void* param) {
X
Xiaoyu Wang 已提交
3697
  SExchangeInfo* pExInfo = (SExchangeInfo*)param;
3698

H
Haojun Liao 已提交
3699
  taosArrayDestroy(pExInfo->pSources);
3700
  taosArrayDestroyEx(pExInfo->pSourceDataInfo, freeSourceDataInfo);
3701 3702 3703 3704

  if (pExInfo->pResultBlockList != NULL) {
    taosArrayDestroyEx(pExInfo->pResultBlockList, freeBlock);
    pExInfo->pResultBlockList = NULL;
H
Haojun Liao 已提交
3705 3706
  }

3707
  blockDataDestroy(pExInfo->pDummyBlock);
L
Liu Jicong 已提交
3708

3709
  tsem_destroy(&pExInfo->ready);
D
dapan1121 已提交
3710
  taosMemoryFreeClear(param);
H
Haojun Liao 已提交
3711 3712
}

H
Haojun Liao 已提交
3713 3714 3715 3716
// Initialize the fill state of pInfo.
//
// Builds the fill column descriptors from pExpr / pNotFillExpr / pValNode,
// derives the first qualified time window from `win`, `pInterval` and `order`,
// creates the fill info object, records the (order-dependent) query window in
// pInfo->win and allocates the pointer buffer pInfo->p (numOfCols entries).
//
// Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when the fill info or
// the pointer buffer could not be created. On failure both pInfo->pFillInfo
// and pInfo->p are released and reset to NULL.
static int32_t initFillInfo(SFillOperatorInfo* pInfo, SExprInfo* pExpr, int32_t numOfCols, SExprInfo* pNotFillExpr,
                            int32_t numOfNotFillCols, SNodeListNode* pValNode, STimeWindow win, int32_t capacity,
                            const char* id, SInterval* pInterval, int32_t fillType, int32_t order) {
  SFillColInfo* pColInfo = createFillColInfo(pExpr, numOfCols, pNotFillExpr, numOfNotFillCols, pValNode);

  // For a descending scan the window is traversed from its end key.
  int64_t     startKey = (order == TSDB_ORDER_ASC) ? win.skey : win.ekey;
  STimeWindow w = getAlignQueryTimeWindow(pInterval, pInterval->precision, startKey);
  w = getFirstQualifiedTimeWindow(startKey, &w, pInterval, order);

  pInfo->pFillInfo = taosCreateFillInfo(w.skey, numOfCols, numOfNotFillCols, capacity, pInterval, fillType, pColInfo,
                                        pInfo->primaryTsCol, order, id);

  if (order == TSDB_ORDER_ASC) {
    pInfo->win.skey = win.skey;
    pInfo->win.ekey = win.ekey;
  } else {
    pInfo->win.skey = win.ekey;
    pInfo->win.ekey = win.skey;
  }

  pInfo->p = taosMemoryCalloc(numOfCols, POINTER_BYTES);

  if (pInfo->pFillInfo == NULL || pInfo->p == NULL) {
    // Release the fill info through its destructor (as destroyFillOperatorInfo
    // does) rather than a plain free, and leave no dangling pointers behind.
    if (pInfo->pFillInfo != NULL) {
      pInfo->pFillInfo = taosDestroyFillInfo(pInfo->pFillInfo);
    }
    taosMemoryFreeClear(pInfo->p);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  return TSDB_CODE_SUCCESS;
}

L
Liu Jicong 已提交
3743 3744
// Create the "FillOperator" operator (QUERY_NODE_PHYSICAL_PLAN_FILL) on top of
// `downstream`, configured from the physical fill node pPhyFillNode.
//
// `downstream` is read as a merge-aligned interval operator when its type is
// QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL, otherwise as an interval
// aggregate operator; the interval definition is taken from it.
//
// Returns the new operator, or NULL on failure. On failure the error code is
// stored in pTaskInfo->code.
// NOTE(review): on failure only pInfo and pOperator are released here; the
// result block, expression arrays and column-match array created along the
// way are not - confirm ownership before adding their cleanup.
SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* pPhyFillNode,
                                      SExecTaskInfo* pTaskInfo) {
  // Declared before the first goto so that _error always reads an initialized
  // value; allocation failure is the default cause.
  int32_t code = TSDB_CODE_OUT_OF_MEMORY;

  SFillOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SFillOperatorInfo));
  SOperatorInfo*     pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    goto _error;
  }

  SSDataBlock* pResBlock = createResDataBlock(pPhyFillNode->node.pOutputDataBlockDesc);
  SExprInfo*   pExprInfo = createExprInfo(pPhyFillNode->pFillExprs, NULL, &pInfo->numOfExpr);
  pInfo->pNotFillExprInfo = createExprInfo(pPhyFillNode->pNotFillExprs, NULL, &pInfo->numOfNotFillExpr);

  SInterval* pInterval =
      QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL == downstream->operatorType
          ? &((SMergeAlignedIntervalAggOperatorInfo*)downstream->info)->intervalAggOperatorInfo->interval
          : &((SIntervalAggOperatorInfo*)downstream->info)->interval;

  int32_t order = (pPhyFillNode->inputTsOrder == ORDER_ASC) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC;
  int32_t type = convertFillType(pPhyFillNode->mode);

  SResultInfo* pResultInfo = &pOperator->resultInfo;
  initResultSizeInfo(&pOperator->resultInfo, 4096);
  blockDataEnsureCapacity(pResBlock, pOperator->resultInfo.capacity);

  // Previously the result was dropped; a failure here leaves the operator
  // without function contexts.
  code = initExprSupp(&pOperator->exprSupp, pExprInfo, pInfo->numOfExpr);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  pInfo->primaryTsCol = ((STargetNode*)pPhyFillNode->pWStartTs)->slotId;
  pInfo->primarySrcSlotId = ((SColumnNode*)((STargetNode*)pPhyFillNode->pWStartTs)->pExpr)->slotId;

  int32_t numOfOutputCols = 0;
  SArray* pColMatchColInfo = extractColMatchInfo(pPhyFillNode->pFillExprs, pPhyFillNode->node.pOutputDataBlockDesc,
                                                 &numOfOutputCols, COL_MATCH_FROM_SLOT_ID);

  code = initFillInfo(pInfo, pExprInfo, pInfo->numOfExpr, pInfo->pNotFillExprInfo, pInfo->numOfNotFillExpr,
                      (SNodeListNode*)pPhyFillNode->pValues, pPhyFillNode->timeRange, pResultInfo->capacity,
                      pTaskInfo->id.str, pInterval, type, order);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  pInfo->pRes = pResBlock;
  pInfo->pFinalRes = createOneDataBlock(pResBlock, false);
  blockDataEnsureCapacity(pInfo->pFinalRes, pOperator->resultInfo.capacity);

  pInfo->pCondition = pPhyFillNode->node.pConditions;
  pInfo->pColMatchColInfo = pColMatchColInfo;
  pOperator->name = "FillOperator";
  pOperator->blocking = false;
  pOperator->status = OP_NOT_OPENED;
  pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_FILL;
  pOperator->exprSupp.pExprInfo = pExprInfo;
  pOperator->exprSupp.numOfExprs = pInfo->numOfExpr;
  pOperator->info = pInfo;
  pOperator->pTaskInfo = pTaskInfo;

  pOperator->fpSet =
      createOperatorFpSet(operatorDummyOpenFn, doFill, NULL, NULL, destroyFillOperatorInfo, NULL, NULL, NULL);

  // Same handling as createAggregateOperatorInfo: an operator that could not
  // be linked to its downstream must not be returned as a success.
  code = appendDownstream(pOperator, &downstream, 1);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  return pOperator;

_error:
  taosMemoryFreeClear(pOperator);
  taosMemoryFreeClear(pInfo);
  pTaskInfo->code = code;
  return NULL;
}

D
dapan1121 已提交
3809
// Allocate and initialize a task info object.
//
// The task starts in TASK_NOT_COMPLETED state, records its creation time,
// query id and execution model, keeps its own copy of dbFName, and gets an id
// string of the form "TID:0x<taskId> QID:0x<queryId>".
//
// Returns NULL (with terrno set to TSDB_CODE_OUT_OF_MEMORY) when the task info
// or its id string cannot be allocated.
static SExecTaskInfo* createExecTaskInfo(uint64_t queryId, uint64_t taskId, EOPTR_EXEC_MODEL model, char* dbFName) {
  const int32_t idStrLen = 128;

  SExecTaskInfo* pTaskInfo = taosMemoryCalloc(1, sizeof(SExecTaskInfo));
  if (pTaskInfo == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  // Allocate the id buffer before anything else is attached to the task so a
  // failure only has the task object itself to release.
  char* idStr = taosMemoryCalloc(1, idStrLen);
  if (idStr == NULL) {
    taosMemoryFree(pTaskInfo);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED);

  // strdup on a NULL pointer is undefined; keep dbname NULL in that case.
  pTaskInfo->schemaInfo.dbname = (dbFName != NULL) ? strdup(dbFName) : NULL;
  pTaskInfo->cost.created = taosGetTimestampMs();
  pTaskInfo->id.queryId = queryId;
  pTaskInfo->execModel = model;

  snprintf(idStr, idStrLen, "TID:0x%" PRIx64 " QID:0x%" PRIx64, taskId, queryId);
  pTaskInfo->id.str = idStr;

  return pTaskInfo;
}
H
Haojun Liao 已提交
3824

H
Haojun Liao 已提交
3825
static SArray* extractColumnInfo(SNodeList* pNodeList);
3826

H
Haojun Liao 已提交
3827 3828
SSchemaWrapper* extractQueriedColumnSchema(SScanPhysiNode* pScanNode);

3829
// Load the schema information of the scanned table into pTaskInfo->schemaInfo.
//
// The table entry is looked up by pScanNode->uid. For a super table its own
// row schema and tag-schema version are used; for a child table the entry of
// its super table is loaded and used instead; otherwise the normal-table row
// schema is cloned. The schema of the queried columns (qsw) is then derived
// from the scan node.
//
// Returns TSDB_CODE_SUCCESS, or terrno when a meta lookup fails.
int32_t extractTableSchemaInfo(SReadHandle* pHandle, SScanPhysiNode* pScanNode, SExecTaskInfo* pTaskInfo) {
  SMetaReader mr = {0};
  metaReaderInit(&mr, pHandle->meta, 0);
  int32_t code = metaGetTableEntryByUid(&mr, pScanNode->uid);
  if (code != TSDB_CODE_SUCCESS) {
    qError("failed to get the table meta, uid:0x%" PRIx64 ", suid:0x%" PRIx64 ", %s", pScanNode->uid, pScanNode->suid,
           GET_TASKID(pTaskInfo));

    metaReaderClear(&mr);
    return terrno;
  }

  SSchemaInfo* pSchemaInfo = &pTaskInfo->schemaInfo;
  pSchemaInfo->tablename = strdup(mr.me.name);

  if (mr.me.type == TSDB_SUPER_TABLE) {
    pSchemaInfo->sw = tCloneSSchemaWrapper(&mr.me.stbEntry.schemaRow);
    pSchemaInfo->tversion = mr.me.stbEntry.schemaTag.version;
  } else if (mr.me.type == TSDB_CHILD_TABLE) {
    tDecoderClear(&mr.coder);

    // The schema of a child table lives in its super table entry; that lookup
    // can fail too and must not be followed by a clone of unloaded data.
    tb_uid_t suid = mr.me.ctbEntry.suid;
    code = metaGetTableEntryByUid(&mr, suid);
    if (code != TSDB_CODE_SUCCESS) {
      metaReaderClear(&mr);
      return terrno;
    }

    pSchemaInfo->sw = tCloneSSchemaWrapper(&mr.me.stbEntry.schemaRow);
    pSchemaInfo->tversion = mr.me.stbEntry.schemaTag.version;
  } else {
    pSchemaInfo->sw = tCloneSSchemaWrapper(&mr.me.ntbEntry.schemaRow);
  }

  metaReaderClear(&mr);

  pSchemaInfo->qsw = extractQueriedColumnSchema(pScanNode);
  return TSDB_CODE_SUCCESS;
}

SSchemaWrapper* extractQueriedColumnSchema(SScanPhysiNode* pScanNode) {
3865 3866 3867
  int32_t numOfCols = LIST_LENGTH(pScanNode->pScanCols);
  int32_t numOfTags = LIST_LENGTH(pScanNode->pScanPseudoCols);

3868
  SSchemaWrapper* pqSw = taosMemoryCalloc(1, sizeof(SSchemaWrapper));
3869
  pqSw->pSchema = taosMemoryCalloc(numOfCols + numOfTags, sizeof(SSchema));
3870

L
Liu Jicong 已提交
3871
  for (int32_t i = 0; i < numOfCols; ++i) {
H
Haojun Liao 已提交
3872
    STargetNode* pNode = (STargetNode*)nodesListGetNode(pScanNode->pScanCols, i);
3873 3874
    SColumnNode* pColNode = (SColumnNode*)pNode->pExpr;

H
Haojun Liao 已提交
3875 3876 3877 3878 3879
    SSchema* pSchema = &pqSw->pSchema[pqSw->nCols++];
    pSchema->colId = pColNode->colId;
    pSchema->type = pColNode->node.resType.type;
    pSchema->type = pColNode->node.resType.bytes;
    strncpy(pSchema->name, pColNode->colName, tListLen(pSchema->name));
3880 3881
  }

3882
  // this the tags and pseudo function columns, we only keep the tag columns
3883
  for (int32_t i = 0; i < numOfTags; ++i) {
3884 3885 3886 3887 3888 3889 3890 3891 3892 3893 3894 3895 3896 3897
    STargetNode* pNode = (STargetNode*)nodesListGetNode(pScanNode->pScanPseudoCols, i);

    int32_t type = nodeType(pNode->pExpr);
    if (type == QUERY_NODE_COLUMN) {
      SColumnNode* pColNode = (SColumnNode*)pNode->pExpr;

      SSchema* pSchema = &pqSw->pSchema[pqSw->nCols++];
      pSchema->colId = pColNode->colId;
      pSchema->type = pColNode->node.resType.type;
      pSchema->type = pColNode->node.resType.bytes;
      strncpy(pSchema->name, pColNode->colName, tListLen(pSchema->name));
    }
  }

H
Haojun Liao 已提交
3898
  return pqSw;
3899 3900
}

3901 3902
// Release the strings and schema wrappers held by pSchemaInfo. The structure
// itself is not freed.
static void cleanupTableSchemaInfo(SSchemaInfo* pSchemaInfo) {
  // Names first ...
  taosMemoryFreeClear(pSchemaInfo->dbname);
  taosMemoryFreeClear(pSchemaInfo->tablename);

  // ... then the table schema and the queried-column schema.
  tDeleteSSchemaWrapper(pSchemaInfo->sw);
  tDeleteSSchemaWrapper(pSchemaInfo->qsw);
}

wmmhello's avatar
wmmhello 已提交
3908
// Rebuild pTableListInfo->pGroupList so that tables are bucketed by group id,
// with the buckets ordered by ascending group id.
//
// Each table's group id is looked up in pTableListInfo->map by table uid.
// `sortSupport` mirrors pGroupList: entry k holds the group id of bucket k and
// is kept sorted, so a search in it yields the bucket position.
//
// Returns TDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY when an array cannot be
// created, or TSDB_CODE_QRY_APP_ERROR when an array insertion fails. A bucket
// that was created but could not be stored in pGroupList is destroyed before
// returning.
static int32_t sortTableGroup(STableListInfo* pTableListInfo) {
  taosArrayClear(pTableListInfo->pGroupList);
  SArray* sortSupport = taosArrayInit(16, sizeof(uint64_t));
  if (sortSupport == NULL) return TSDB_CODE_OUT_OF_MEMORY;

  int32_t code = TDB_CODE_SUCCESS;

  for (int32_t i = 0; i < taosArrayGetSize(pTableListInfo->pTableList); i++) {
    STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i);
    uint64_t*      groupId = taosHashGet(pTableListInfo->map, &info->uid, sizeof(uint64_t));

    int32_t index = taosArraySearchIdx(sortSupport, groupId, compareUint64Val, TD_EQ);
    if (index != -1) {
      // The bucket for this group already exists: just append the table.
      SArray* tGroup = (SArray*)taosArrayGetP(pTableListInfo->pGroupList, index);
      if (taosArrayPush(tGroup, info) == NULL) {
        qError("taos push uid array error");
        code = TSDB_CODE_QRY_APP_ERROR;
        break;
      }
      continue;
    }

    // First table of a new group: create its bucket and find where it belongs.
    void*   p = taosArraySearch(sortSupport, groupId, compareUint64Val, TD_GT);
    SArray* tGroup = taosArrayInit(8, sizeof(STableKeyInfo));
    if (tGroup == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      break;
    }

    // Until tGroup has been stored in pGroupList it is owned here and must be
    // destroyed on every failure path.
    if (taosArrayPush(tGroup, info) == NULL) {
      qError("taos push info array error");
      taosArrayDestroy(tGroup);
      code = TSDB_CODE_QRY_APP_ERROR;
      break;
    }

    if (p == NULL) {
      // No larger group id yet: append at the end.
      if (taosArrayPush(sortSupport, groupId) == NULL) {
        qError("taos push support array error");
        taosArrayDestroy(tGroup);
        code = TSDB_CODE_QRY_APP_ERROR;
        break;
      }
      if (taosArrayPush(pTableListInfo->pGroupList, &tGroup) == NULL) {
        qError("taos push group array error");
        taosArrayDestroy(tGroup);
        code = TSDB_CODE_QRY_APP_ERROR;
        break;
      }
    } else {
      // Insert in front of the first larger group id to keep both arrays sorted.
      int32_t pos = TARRAY_ELEM_IDX(sortSupport, p);
      if (taosArrayInsert(sortSupport, pos, groupId) == NULL) {
        qError("taos insert support array error");
        taosArrayDestroy(tGroup);
        code = TSDB_CODE_QRY_APP_ERROR;
        break;
      }
      if (taosArrayInsert(pTableListInfo->pGroupList, pos, &tGroup) == NULL) {
        qError("taos insert group array error");
        taosArrayDestroy(tGroup);
        code = TSDB_CODE_QRY_APP_ERROR;
        break;
      }
    }
  }

  taosArrayDestroy(sortSupport);
  return code;
}

3966
// Report whether the first entry of pGroupList is a function node named
// "tbname" (i.e. partition by tbname / group by tbname). An empty list yields
// false.
bool groupbyTbname(SNodeList* pGroupList) {
  if (LIST_LENGTH(pGroupList) <= 0) {
    return false;
  }

  SNode* pFirst = nodesListGetNode(pGroupList, 0);
  if (pFirst->type != QUERY_NODE_FUNCTION) {
    return false;
  }

  return strcmp(((struct SFunctionNode*)pFirst)->functionName, "tbname") == 0;
}

wmmhello's avatar
wmmhello 已提交
3979 3980
// Build pTableListInfo->map (table uid -> group id) for the grouping
// expressions in `group`, and optionally sort the tables into groups.
//
// Nothing is done when `group` is NULL. When grouping by tbname each table
// forms its own group and its uid doubles as the group id; otherwise the group
// ids are computed by getColInfoResultForGroupby. If needSortTableByGroupId is
// set, the result of sortTableGroup is returned.
int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle, SNodeList* group) {
  if (group == NULL) {
    return TDB_CODE_SUCCESS;
  }

  pTableListInfo->map = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
  if (pTableListInfo->map == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  bool   byTableName = groupbyTbname(group);
  size_t tableCount = taosArrayGetSize(pTableListInfo->pTableList);

  if (!byTableName) {
    int32_t code = getColInfoResultForGroupby(pHandle->meta, group, pTableListInfo);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }
  } else {
    for (int32_t idx = 0; idx < tableCount; idx++) {
      STableKeyInfo* pKey = taosArrayGet(pTableListInfo->pTableList, idx);
      pKey->groupId = pKey->uid;
      taosHashPut(pTableListInfo->map, &(pKey->uid), sizeof(uint64_t), &pKey->groupId, sizeof(uint64_t));
    }
  }

  return pTableListInfo->needSortTableByGroupId ? sortTableGroup(pTableListInfo) : TDB_CODE_SUCCESS;
}

4013 4014 4015 4016 4017 4018 4019 4020 4021 4022 4023 4024 4025 4026 4027 4028 4029
// Fill pCond with the query condition used for the block-distribution scan:
// ascending order, a single timestamp column (column id 1), the full time
// range, and no version bounds (-1 for both). `uid` is stored as the suid.
//
// Returns TSDB_CODE_SUCCESS, or TSDB_CODE_QRY_OUT_OF_MEMORY (also stored in
// terrno) when the column list cannot be allocated.
static int32_t initTableblockDistQueryCond(uint64_t uid, SQueryTableDataCond* pCond) {
  memset(pCond, 0, sizeof(SQueryTableDataCond));

  pCond->order = TSDB_ORDER_ASC;
  pCond->numOfCols = 1;
  pCond->colList = taosMemoryCalloc(1, sizeof(SColumnInfo));
  if (pCond->colList == NULL) {
    terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
    return terrno;
  }

  // The only column requested is the timestamp column.
  pCond->colList[0].colId = 1;
  pCond->colList[0].type = TSDB_DATA_TYPE_TIMESTAMP;
  pCond->colList[0].bytes = sizeof(TSKEY);

  pCond->twindows = (STimeWindow){.skey = INT64_MIN, .ekey = INT64_MAX};
  pCond->suid = uid;
  pCond->type = TIMEWINDOW_RANGE_CONTAINED;
  pCond->startVersion = -1;
  pCond->endVersion = -1;

  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
4037
SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle,
L
Liu Jicong 已提交
4038 4039
                                  STableListInfo* pTableListInfo, SNode* pTagCond, SNode* pTagIndexCond,
                                  const char* pUser) {
4040 4041
  int32_t type = nodeType(pPhyNode);

X
Xiaoyu Wang 已提交
4042
  if (pPhyNode->pChildren == NULL || LIST_LENGTH(pPhyNode->pChildren) == 0) {
4043
    SOperatorInfo* pOperator = NULL;
H
Haojun Liao 已提交
4044
    if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == type) {
dengyihao's avatar
dengyihao 已提交
4045
      STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode;
H
Haojun Liao 已提交
4046

L
Liu Jicong 已提交
4047 4048 4049
      int32_t code =
          createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, pTableScanNode->groupSort, pHandle,
                                  pTableListInfo, pTagCond, pTagIndexCond, GET_TASKID(pTaskInfo));
4050
      if (code) {
wmmhello's avatar
wmmhello 已提交
4051
        pTaskInfo->code = code;
D
dapan1121 已提交
4052 4053
        return NULL;
      }
wmmhello's avatar
wmmhello 已提交
4054

4055
      code = extractTableSchemaInfo(pHandle, &pTableScanNode->scan, pTaskInfo);
S
slzhou 已提交
4056
      if (code) {
4057
        pTaskInfo->code = terrno;
wmmhello's avatar
wmmhello 已提交
4058 4059 4060
        return NULL;
      }

4061
      pOperator = createTableScanOperatorInfo(pTableScanNode, pHandle, pTaskInfo);
4062 4063
      STableScanInfo* pScanInfo = pOperator->info;
      pTaskInfo->cost.pRecoder = &pScanInfo->readRecorder;
S
slzhou 已提交
4064 4065
    } else if (QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN == type) {
      STableMergeScanPhysiNode* pTableScanNode = (STableMergeScanPhysiNode*)pPhyNode;
L
Liu Jicong 已提交
4066 4067 4068
      int32_t                   code =
          createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, pTableScanNode->groupSort, pHandle,
                                  pTableListInfo, pTagCond, pTagIndexCond, GET_TASKID(pTaskInfo));
L
Liu Jicong 已提交
4069
      if (code) {
wmmhello's avatar
wmmhello 已提交
4070
        pTaskInfo->code = code;
wmmhello's avatar
wmmhello 已提交
4071 4072
        return NULL;
      }
4073

4074
      code = extractTableSchemaInfo(pHandle, &pTableScanNode->scan, pTaskInfo);
wmmhello's avatar
wmmhello 已提交
4075 4076 4077 4078
      if (code) {
        pTaskInfo->code = terrno;
        return NULL;
      }
wmmhello's avatar
wmmhello 已提交
4079

4080
      pOperator = createTableMergeScanOperatorInfo(pTableScanNode, pTableListInfo, pHandle, pTaskInfo);
wmmhello's avatar
wmmhello 已提交
4081

4082 4083
      STableScanInfo* pScanInfo = pOperator->info;
      pTaskInfo->cost.pRecoder = &pScanInfo->readRecorder;
H
Haojun Liao 已提交
4084
    } else if (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == type) {
4085
      pOperator = createExchangeOperatorInfo(pHandle->pMsgCb->clientRpc, (SExchangePhysiNode*)pPhyNode, pTaskInfo);
H
Haojun Liao 已提交
4086
    } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN == type) {
5
54liuyao 已提交
4087
      STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode;
5
54liuyao 已提交
4088
      if (pHandle->vnode) {
L
Liu Jicong 已提交
4089 4090 4091
        int32_t code =
            createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, pTableScanNode->groupSort,
                                    pHandle, pTableListInfo, pTagCond, pTagIndexCond, GET_TASKID(pTaskInfo));
L
Liu Jicong 已提交
4092
        if (code) {
wmmhello's avatar
wmmhello 已提交
4093 4094 4095
          pTaskInfo->code = code;
          return NULL;
        }
L
Liu Jicong 已提交
4096 4097 4098 4099 4100

#ifndef NDEBUG
        int32_t sz = taosArrayGetSize(pTableListInfo->pTableList);
        for (int32_t i = 0; i < sz; i++) {
          STableKeyInfo* pKeyInfo = taosArrayGet(pTableListInfo->pTableList, i);
S
Shengliang Guan 已提交
4101
          qDebug("creating stream task: add table %" PRId64, pKeyInfo->uid);
L
Liu Jicong 已提交
4102
        }
5
54liuyao 已提交
4103
      }
L
Liu Jicong 已提交
4104
#endif
4105

H
Haojun Liao 已提交
4106
      pTaskInfo->schemaInfo.qsw = extractQueriedColumnSchema(&pTableScanNode->scan);
4107
      pOperator = createStreamScanOperatorInfo(pHandle, pTableScanNode, pTagCond, pTaskInfo);
H
Haojun Liao 已提交
4108
    } else if (QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN == type) {
L
Liu Jicong 已提交
4109
      SSystemTableScanPhysiNode* pSysScanPhyNode = (SSystemTableScanPhysiNode*)pPhyNode;
4110
      pOperator = createSysTableScanOperatorInfo(pHandle, pSysScanPhyNode, pUser, pTaskInfo);
4111
    } else if (QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN == type) {
X
Xiaoyu Wang 已提交
4112
      STagScanPhysiNode* pScanPhyNode = (STagScanPhysiNode*)pPhyNode;
4113
      int32_t code = getTableList(pHandle->meta, pHandle->vnode, pScanPhyNode, pTagCond, pTagIndexCond, pTableListInfo);
4114
      if (code != TSDB_CODE_SUCCESS) {
4115
        pTaskInfo->code = terrno;
4116 4117 4118
        return NULL;
      }

4119
      pOperator = createTagScanOperatorInfo(pHandle, pScanPhyNode, pTableListInfo, pTaskInfo);
4120
    } else if (QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN == type) {
4121
      SBlockDistScanPhysiNode* pBlockNode = (SBlockDistScanPhysiNode*)pPhyNode;
4122 4123 4124
      pTableListInfo->pTableList = taosArrayInit(4, sizeof(STableKeyInfo));

      if (pBlockNode->tableType == TSDB_SUPER_TABLE) {
4125
        int32_t code = vnodeGetAllTableList(pHandle->vnode, pBlockNode->uid, pTableListInfo->pTableList);
4126 4127 4128 4129 4130
        if (code != TSDB_CODE_SUCCESS) {
          pTaskInfo->code = terrno;
          return NULL;
        }
      } else {  // Create one table group.
4131
        STableKeyInfo info = {.uid = pBlockNode->uid, .groupId = 0};
4132 4133 4134 4135
        taosArrayPush(pTableListInfo->pTableList, &info);
      }

      SQueryTableDataCond cond = {0};
L
Liu Jicong 已提交
4136
      int32_t             code = initTableblockDistQueryCond(pBlockNode->suid, &cond);
4137 4138
      if (code != TSDB_CODE_SUCCESS) {
        return NULL;
4139
      }
H
Haojun Liao 已提交
4140 4141 4142

      STsdbReader* pReader = NULL;
      tsdbReaderOpen(pHandle->vnode, &cond, pTableListInfo->pTableList, &pReader, "");
4143 4144
      cleanupQueryTableDataCond(&cond);

4145
      pOperator = createDataBlockInfoScanOperator(pReader, pHandle, cond.suid, pBlockNode, pTaskInfo);
H
Haojun Liao 已提交
4146 4147 4148
    } else if (QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN == type) {
      SLastRowScanPhysiNode* pScanNode = (SLastRowScanPhysiNode*)pPhyNode;

L
Liu Jicong 已提交
4149 4150
      int32_t code = createScanTableListInfo(&pScanNode->scan, pScanNode->pGroupTags, true, pHandle, pTableListInfo,
                                             pTagCond, pTagIndexCond, GET_TASKID(pTaskInfo));
4151 4152 4153 4154
      if (code != TSDB_CODE_SUCCESS) {
        pTaskInfo->code = code;
        return NULL;
      }
4155

4156
      code = extractTableSchemaInfo(pHandle, &pScanNode->scan, pTaskInfo);
4157 4158 4159
      if (code != TSDB_CODE_SUCCESS) {
        pTaskInfo->code = code;
        return NULL;
H
Haojun Liao 已提交
4160 4161
      }

4162
      pOperator = createLastrowScanOperator(pScanNode, pHandle, pTaskInfo);
4163
    } else if (QUERY_NODE_PHYSICAL_PLAN_PROJECT == type) {
4164
      pOperator = createProjectOperatorInfo(NULL, (SProjectPhysiNode*)pPhyNode, pTaskInfo);
H
Haojun Liao 已提交
4165 4166
    } else {
      ASSERT(0);
H
Haojun Liao 已提交
4167
    }
4168 4169 4170 4171 4172

    if (pOperator != NULL) {
      pOperator->resultDataBlockId = pPhyNode->pOutputDataBlockDesc->dataBlockId;
    }

4173
    return pOperator;
H
Haojun Liao 已提交
4174 4175
  }

4176 4177
  int32_t num = 0;
  size_t  size = LIST_LENGTH(pPhyNode->pChildren);
H
Haojun Liao 已提交
4178

4179
  SOperatorInfo** ops = taosMemoryCalloc(size, POINTER_BYTES);
dengyihao's avatar
dengyihao 已提交
4180
  for (int32_t i = 0; i < size; ++i) {
4181
    SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, i);
4182
    ops[i] = createOperatorTree(pChildNode, pTaskInfo, pHandle, pTableListInfo, pTagCond, pTagIndexCond, pUser);
4183
    if (ops[i] == NULL) {
H
Haojun Liao 已提交
4184
      taosMemoryFree(ops);
4185
      return NULL;
4186 4187
    } else {
      ops[i]->resultDataBlockId = pChildNode->pOutputDataBlockDesc->dataBlockId;
4188
    }
4189
  }
H
Haojun Liao 已提交
4190

4191
  SOperatorInfo* pOptr = NULL;
H
Haojun Liao 已提交
4192
  if (QUERY_NODE_PHYSICAL_PLAN_PROJECT == type) {
4193
    pOptr = createProjectOperatorInfo(ops[0], (SProjectPhysiNode*)pPhyNode, pTaskInfo);
4194
  } else if (QUERY_NODE_PHYSICAL_PLAN_HASH_AGG == type) {
H
Haojun Liao 已提交
4195 4196
    SAggPhysiNode* pAggNode = (SAggPhysiNode*)pPhyNode;
    SExprInfo*     pExprInfo = createExprInfo(pAggNode->pAggFuncs, pAggNode->pGroupKeys, &num);
4197
    SSDataBlock*   pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
H
Haojun Liao 已提交
4198

dengyihao's avatar
dengyihao 已提交
4199
    int32_t    numOfScalarExpr = 0;
4200 4201 4202 4203 4204
    SExprInfo* pScalarExprInfo = NULL;
    if (pAggNode->pExprs != NULL) {
      pScalarExprInfo = createExprInfo(pAggNode->pExprs, NULL, &numOfScalarExpr);
    }

H
Haojun Liao 已提交
4205 4206
    if (pAggNode->pGroupKeys != NULL) {
      SArray* pColList = extractColumnInfo(pAggNode->pGroupKeys);
dengyihao's avatar
dengyihao 已提交
4207
      pOptr = createGroupOperatorInfo(ops[0], pExprInfo, num, pResBlock, pColList, pAggNode->node.pConditions,
wmmhello's avatar
wmmhello 已提交
4208
                                      pScalarExprInfo, numOfScalarExpr, pTaskInfo);
H
Haojun Liao 已提交
4209
    } else {
L
Liu Jicong 已提交
4210
      pOptr = createAggregateOperatorInfo(ops[0], pExprInfo, num, pResBlock, pAggNode->node.pConditions,
4211
                                          pScalarExprInfo, numOfScalarExpr, pAggNode->mergeDataBlock, pTaskInfo);
H
Haojun Liao 已提交
4212
    }
X
Xiaoyu Wang 已提交
4213
  } else if (QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL == type || QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL == type) {
H
Haojun Liao 已提交
4214
    SIntervalPhysiNode* pIntervalPhyNode = (SIntervalPhysiNode*)pPhyNode;
H
Haojun Liao 已提交
4215

H
Haojun Liao 已提交
4216
    SExprInfo*   pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &num);
4217
    SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
H
Haojun Liao 已提交
4218

dengyihao's avatar
dengyihao 已提交
4219 4220 4221 4222 4223 4224
    SInterval interval = {.interval = pIntervalPhyNode->interval,
                          .sliding = pIntervalPhyNode->sliding,
                          .intervalUnit = pIntervalPhyNode->intervalUnit,
                          .slidingUnit = pIntervalPhyNode->slidingUnit,
                          .offset = pIntervalPhyNode->offset,
                          .precision = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->node.resType.precision};
H
Haojun Liao 已提交
4225

X
Xiaoyu Wang 已提交
4226 4227 4228 4229 4230
    STimeWindowAggSupp as = {
        .waterMark = pIntervalPhyNode->window.watermark,
        .calTrigger = pIntervalPhyNode->window.triggerType,
        .maxTs = INT64_MIN,
    };
4231
    ASSERT(as.calTrigger != STREAM_TRIGGER_MAX_DELAY);
4232

4233
    int32_t tsSlotId = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId;
L
Liu Jicong 已提交
4234
    bool    isStream = (QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL == type);
4235 4236
    pOptr = createIntervalOperatorInfo(ops[0], pExprInfo, num, pResBlock, &interval, tsSlotId, &as, pIntervalPhyNode,
                                       pTaskInfo, isStream);
4237

4238 4239
  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL == type) {
    SMergeAlignedIntervalPhysiNode* pIntervalPhyNode = (SMergeAlignedIntervalPhysiNode*)pPhyNode;
S
shenglian zhou 已提交
4240 4241 4242 4243 4244 4245 4246 4247 4248 4249

    SExprInfo*   pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &num);
    SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);

    SInterval interval = {.interval = pIntervalPhyNode->interval,
                          .sliding = pIntervalPhyNode->sliding,
                          .intervalUnit = pIntervalPhyNode->intervalUnit,
                          .slidingUnit = pIntervalPhyNode->slidingUnit,
                          .offset = pIntervalPhyNode->offset,
                          .precision = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->node.resType.precision};
4250

S
shenglian zhou 已提交
4251
    int32_t tsSlotId = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId;
L
Liu Jicong 已提交
4252
    pOptr = createMergeAlignedIntervalOperatorInfo(ops[0], pExprInfo, num, pResBlock, &interval, tsSlotId,
dengyihao's avatar
dengyihao 已提交
4253 4254
                                                   pPhyNode->pConditions, pIntervalPhyNode->window.mergeDataBlock,
                                                   pTaskInfo);
S
shenglian zhou 已提交
4255
  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL == type) {
X
Xiaoyu Wang 已提交
4256
    SMergeIntervalPhysiNode* pIntervalPhyNode = (SMergeIntervalPhysiNode*)pPhyNode;
S
shenglian zhou 已提交
4257 4258 4259 4260 4261 4262 4263 4264 4265 4266

    SExprInfo*   pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &num);
    SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);

    SInterval interval = {.interval = pIntervalPhyNode->interval,
                          .sliding = pIntervalPhyNode->sliding,
                          .intervalUnit = pIntervalPhyNode->intervalUnit,
                          .slidingUnit = pIntervalPhyNode->slidingUnit,
                          .offset = pIntervalPhyNode->offset,
                          .precision = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->node.resType.precision};
4267

S
shenglian zhou 已提交
4268
    int32_t tsSlotId = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId;
dengyihao's avatar
dengyihao 已提交
4269 4270
    pOptr = createMergeIntervalOperatorInfo(ops[0], pExprInfo, num, pResBlock, &interval, tsSlotId,
                                            pIntervalPhyNode->window.mergeDataBlock, pTaskInfo);
5
54liuyao 已提交
4271
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL == type) {
4272
    int32_t children = 0;
5
54liuyao 已提交
4273 4274
    pOptr = createStreamFinalIntervalOperatorInfo(ops[0], pPhyNode, pTaskInfo, children);
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL == type) {
5
54liuyao 已提交
4275
    int32_t children = pHandle->numOfVgroups;
5
54liuyao 已提交
4276
    pOptr = createStreamFinalIntervalOperatorInfo(ops[0], pPhyNode, pTaskInfo, children);
H
Haojun Liao 已提交
4277
  } else if (QUERY_NODE_PHYSICAL_PLAN_SORT == type) {
4278
    pOptr = createSortOperatorInfo(ops[0], (SSortPhysiNode*)pPhyNode, pTaskInfo);
S
shenglian zhou 已提交
4279 4280
  } else if (QUERY_NODE_PHYSICAL_PLAN_GROUP_SORT == type) {
    pOptr = createGroupSortOperatorInfo(ops[0], (SGroupSortPhysiNode*)pPhyNode, pTaskInfo);
X
Xiaoyu Wang 已提交
4281
  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE == type) {
4282
    SMergePhysiNode* pMergePhyNode = (SMergePhysiNode*)pPhyNode;
4283
    pOptr = createMultiwayMergeOperatorInfo(ops, size, pMergePhyNode, pTaskInfo);
4284
  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION == type) {
H
Haojun Liao 已提交
4285
    SSessionWinodwPhysiNode* pSessionNode = (SSessionWinodwPhysiNode*)pPhyNode;
H
Haojun Liao 已提交
4286
    pOptr = createSessionAggOperatorInfo(ops[0], pSessionNode, pTaskInfo);
4287
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION == type) {
4288 4289 4290 4291 4292
    pOptr = createStreamSessionAggOperatorInfo(ops[0], pPhyNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION == type) {
    int32_t children = 0;
    pOptr = createStreamFinalSessionAggOperatorInfo(ops[0], pPhyNode, pTaskInfo, children);
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION == type) {
4293
    int32_t children = pHandle->numOfVgroups;
4294
    pOptr = createStreamFinalSessionAggOperatorInfo(ops[0], pPhyNode, pTaskInfo, children);
H
Haojun Liao 已提交
4295
  } else if (QUERY_NODE_PHYSICAL_PLAN_PARTITION == type) {
4296
    pOptr = createPartitionOperatorInfo(ops[0], (SPartitionPhysiNode*)pPhyNode, pTaskInfo);
4297
  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE == type) {
dengyihao's avatar
dengyihao 已提交
4298
    SStateWinodwPhysiNode* pStateNode = (SStateWinodwPhysiNode*)pPhyNode;
4299

4300 4301
    STimeWindowAggSupp as = {.waterMark = pStateNode->window.watermark, .calTrigger = pStateNode->window.triggerType};

dengyihao's avatar
dengyihao 已提交
4302
    SExprInfo*   pExprInfo = createExprInfo(pStateNode->window.pFuncs, NULL, &num);
4303
    SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
4304 4305
    int32_t      tsSlotId = ((SColumnNode*)pStateNode->window.pTspk)->slotId;

4306
    SColumnNode* pColNode = (SColumnNode*)((STargetNode*)pStateNode->pStateKey)->pExpr;
X
Xiaoyu Wang 已提交
4307
    SColumn      col = extractColumnFromColumnNode(pColNode);
L
Liu Jicong 已提交
4308 4309
    pOptr = createStatewindowOperatorInfo(ops[0], pExprInfo, num, pResBlock, &as, tsSlotId, &col, pPhyNode->pConditions,
                                          pTaskInfo);
4310
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE == type) {
5
54liuyao 已提交
4311
    pOptr = createStreamStateAggOperatorInfo(ops[0], pPhyNode, pTaskInfo);
4312
  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN == type) {
4313
    pOptr = createMergeJoinOperatorInfo(ops, size, (SSortMergeJoinPhysiNode*)pPhyNode, pTaskInfo);
4314
  } else if (QUERY_NODE_PHYSICAL_PLAN_FILL == type) {
H
Haojun Liao 已提交
4315
    pOptr = createFillOperatorInfo(ops[0], (SFillPhysiNode*)pPhyNode, pTaskInfo);
H
Haojun Liao 已提交
4316 4317
  } else if (QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC == type) {
    pOptr = createIndefinitOutputOperatorInfo(ops[0], pPhyNode, pTaskInfo);
4318 4319
  } else if (QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC == type) {
    pOptr = createTimeSliceOperatorInfo(ops[0], pPhyNode, pTaskInfo);
H
Haojun Liao 已提交
4320 4321
  } else {
    ASSERT(0);
H
Haojun Liao 已提交
4322
  }
4323
  taosMemoryFree(ops);
dengyihao's avatar
dengyihao 已提交
4324
  if (pOptr) pOptr->resultDataBlockId = pPhyNode->pOutputDataBlockDesc->dataBlockId;
4325
  return pOptr;
4326
}
H
Haojun Liao 已提交
4327

H
Haojun Liao 已提交
4328
/*
 * Build an array of SColumn descriptors from a list of STargetNode entries.
 *
 * - Targets whose expression is a column node are converted with extractColumnFromColumnNode().
 * - Targets whose expression is a constant value get a descriptor built from the target slot id
 *   (used for both slotId and colId) and the value's result data type.
 * - Targets with any other expression kind are skipped.
 *
 * Returns a newly created array that the caller must destroy, or NULL with terrno set to
 * TSDB_CODE_OUT_OF_MEMORY when the array cannot be created or extended.
 */
SArray* extractColumnInfo(SNodeList* pNodeList) {
  size_t  numOfCols = LIST_LENGTH(pNodeList);
  SArray* pList = taosArrayInit(numOfCols, sizeof(SColumn));
  if (pList == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  for (int32_t i = 0; i < numOfCols; ++i) {
    STargetNode* pNode = (STargetNode*)nodesListGetNode(pNodeList, i);
    SColumn      c = {0};

    if (nodeType(pNode->pExpr) == QUERY_NODE_COLUMN) {
      c = extractColumnFromColumnNode((SColumnNode*)pNode->pExpr);
    } else if (nodeType(pNode->pExpr) == QUERY_NODE_VALUE) {
      SValueNode* pValNode = (SValueNode*)pNode->pExpr;

      c.slotId = pNode->slotId;
      c.colId = pNode->slotId;
      // The data type lives in resType, like bytes/scale/precision below; node.type is the node
      // kind (always QUERY_NODE_VALUE in this branch) and must not be used as a data type.
      c.type = pValNode->node.resType.type;
      c.bytes = pValNode->node.resType.bytes;
      c.scale = pValNode->node.resType.scale;
      c.precision = pValNode->node.resType.precision;
    } else {
      continue;  // neither a column nor a constant: nothing to describe
    }

    if (taosArrayPush(pList, &c) == NULL) {
      taosArrayDestroy(pList);
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return NULL;
    }
  }

  return pList;
}

4361
#if 0
// NOTE: this helper is compiled out by the surrounding "#if 0".
// It builds the table list for a table scan node and opens a tsdb reader on it; on any failure
// (including an empty table list, where the code is 0) terrno is set and NULL is returned.
STsdbReader* doCreateDataReader(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle,
                                STableListInfo* pTableListInfo, const char* idstr) {
  int32_t code = getTableList(pHandle->meta, pHandle->vnode, &pTableScanNode->scan, pTableListInfo);
  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
    return NULL;
  }

  if (taosArrayGetSize(pTableListInfo->pTableList) == 0) {
    code = 0;
    qDebug("no table qualified for query, %s", idstr);
    terrno = code;
    return NULL;
  }

  SQueryTableDataCond cond = {0};
  code = initQueryTableDataCond(&cond, pTableScanNode);
  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
    return NULL;
  }

  STsdbReader* pReader;
  code = tsdbReaderOpen(pHandle->vnode, &cond, pTableListInfo->pTableList, &pReader, idstr);
  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
    return NULL;
  }

  cleanupQueryTableDataCond(&cond);
  return pReader;
}
#endif
H
Haojun Liao 已提交
4396

L
Liu Jicong 已提交
4397 4398 4399 4400 4401 4402 4403 4404 4405 4406 4407 4408 4409
/*
 * Follow a chain of single-downstream operators until the stream scan operator is reached and
 * return, through ppInfo, the info of the table scan operator embedded in it.
 *
 * Returns 0 on success, or TSDB_CODE_QRY_APP_ERROR when the chain ends without a stream scan
 * operator or when an operator on the way has more than one downstream.
 */
static int32_t extractTbscanInStreamOpTree(SOperatorInfo* pOperator, STableScanInfo** ppInfo) {
  SOperatorInfo* pCur = pOperator;

  while (pCur->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
    if (pCur->numOfDownstream == 0) {
      qError("failed to find stream scan operator");
      return TSDB_CODE_QRY_APP_ERROR;
    }

    if (pCur->numOfDownstream > 1) {
      qError("join not supported for stream block scan");
      return TSDB_CODE_QRY_APP_ERROR;
    }

    pCur = pCur->pDownstream[0];
  }

  SStreamScanInfo* pStreamInfo = pCur->info;
  ASSERT(pStreamInfo->pTableScanOp->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN);
  *ppInfo = pStreamInfo->pTableScanOp->info;
  return 0;
}

4417 4418 4419 4420 4421 4422 4423 4424 4425 4426 4427 4428 4429 4430 4431 4432 4433 4434 4435 4436 4437 4438
/*
 * Descend a physical plan in which every node has at most one child and return, through ppNode,
 * the table scan node found at the leaf.
 *
 * Returns 0 on success. If a node has more than one child, or the leaf is not a table scan node,
 * the function asserts, sets terrno to TSDB_CODE_QRY_APP_ERROR and returns -1.
 */
int32_t extractTableScanNode(SPhysiNode* pNode, STableScanPhysiNode** ppNode) {
  SPhysiNode* pCur = pNode;

  // walk down until a node without children is reached
  while (!(pCur->pChildren == NULL || LIST_LENGTH(pCur->pChildren) == 0)) {
    if (LIST_LENGTH(pCur->pChildren) != 1) {
      ASSERT(0);
      terrno = TSDB_CODE_QRY_APP_ERROR;
      return -1;
    }

    pCur = (SPhysiNode*)nodesListGetNode(pCur->pChildren, 0);
  }

  if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN != pCur->type) {
    ASSERT(0);
    terrno = TSDB_CODE_QRY_APP_ERROR;
    return -1;
  }

  *ppNode = (STableScanPhysiNode*)pCur;
  return 0;
}

4439
#if 0
// NOTE: this function is compiled out by the surrounding "#if 0".
// It locates the table scan info below a stream operator tree, closes its current data reader
// and replaces it with a reader freshly created from the table scan node of the plan.
int32_t rebuildReader(SOperatorInfo* pOperator, SSubplan* plan, SReadHandle* pHandle, int64_t uid, int64_t ts) {
  STableScanInfo* pTableScanInfo = NULL;
  if (extractTbscanInStreamOpTree(pOperator, &pTableScanInfo) < 0) {
    return -1;
  }

  STableScanPhysiNode* pNode = NULL;
  if (extractTableScanNode(plan->pNode, &pNode) < 0) {
    ASSERT(0);
  }

  tsdbReaderClose(pTableScanInfo->dataReader);

  STableListInfo info = {0};
  pTableScanInfo->dataReader = doCreateDataReader(pNode, pHandle, &info, NULL);
  if (pTableScanInfo->dataReader == NULL) {
    ASSERT(0);
    qError("failed to create data reader");
    return TSDB_CODE_QRY_APP_ERROR;
  }

  // TODO: set uid and ts to data reader
  return 0;
}
#endif
4464

C
Cary Xu 已提交
4465
/*
 * Serialize the state of an operator and, recursively, of all its downstream operators into one
 * heap buffer.
 *
 * Buffer layout: the first int32_t of *result holds the total number of bytes in the buffer,
 * including that int32_t itself; the encoded chunks of the operators follow back to back.
 *
 * ops          operator to start from
 * result       in/out buffer; *result may be NULL on entry and is (re)allocated as needed. It is
 *              freed and reset to NULL when encoding or growing the buffer fails.
 * length       out; updated to the total buffer size after each appended chunk
 * nOptrWithVal in/out; incremented once for every operator that produced a non-empty chunk
 *
 * Returns TDB_CODE_SUCCESS, TSDB_CODE_TSC_INVALID_INPUT when an output pointer is NULL for an
 * operator that can encode, TSDB_CODE_OUT_OF_MEMORY, or the code reported by the operator.
 */
int32_t encodeOperator(SOperatorInfo* ops, char** result, int32_t* length, int32_t* nOptrWithVal) {
  if (ops->fpSet.encodeResultRow != NULL) {
    if (result == NULL || length == NULL || nOptrWithVal == NULL) {
      return TSDB_CODE_TSC_INVALID_INPUT;
    }

    char*   pChunk = NULL;
    int32_t chunkLen = 0;
    int32_t status = ops->fpSet.encodeResultRow(ops, &pChunk, &chunkLen);
    if (status != TDB_CODE_SUCCESS) {
      if (*result != NULL) {
        taosMemoryFree(*result);
        *result = NULL;
      }
      return status;
    }

    if (chunkLen == 0) {
      // nothing to store for this operator, only the downstream operators are visited
      ASSERT(!pChunk);
    } else {
      ++(*nOptrWithVal);

      ASSERT(chunkLen >= 0);

      if (*result != NULL) {
        // append to the existing buffer and bump the total size kept in its header
        int32_t usedBytes = *(int32_t*)(*result);
        char*   pGrown = (char*)taosMemoryRealloc(*result, usedBytes + chunkLen);
        if (pGrown == NULL) {
          taosMemoryFree(pChunk);
          taosMemoryFree(*result);
          *result = NULL;
          return TSDB_CODE_OUT_OF_MEMORY;
        }

        *result = pGrown;
        memcpy(*result + usedBytes, pChunk, chunkLen);
        *(int32_t*)(*result) += chunkLen;
      } else {
        // first chunk: allocate room for the size header plus the chunk
        *result = (char*)taosMemoryCalloc(1, chunkLen + sizeof(int32_t));
        if (*result == NULL) {
          taosMemoryFree(pChunk);
          return TSDB_CODE_OUT_OF_MEMORY;
        }

        memcpy(*result + sizeof(int32_t), pChunk, chunkLen);
        *(int32_t*)(*result) = chunkLen + sizeof(int32_t);
      }

      taosMemoryFree(pChunk);
      *length = *(int32_t*)(*result);
    }
  }

  for (int32_t i = 0; i < ops->numOfDownstream; ++i) {
    int32_t status = encodeOperator(ops->pDownstream[i], result, length, nOptrWithVal);
    if (status != TDB_CODE_SUCCESS) {
      return status;
    }
  }

  return TDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
4525
/*
 * Restore the state of an operator and, recursively, of its downstream operators from a buffer.
 *
 * The buffer is expected to start with an int32_t holding its total size (asserted to be equal to
 * length). The bytes after that header are handed to the operator's decodeResultRow callback, and
 * the first int32_t of those bytes is read as the size of the chunk that was consumed.
 *
 * NOTE: although result is declared const, the buffer is written to: after a chunk is consumed a
 * new size header is stored at the advanced position, so the remaining bytes again look like a
 * buffer with a leading size header.
 *
 * Returns TDB_CODE_SUCCESS, TSDB_CODE_TSC_INVALID_INPUT when result is NULL for an operator that
 * can decode, or the code reported by the operator.
 */
int32_t decodeOperator(SOperatorInfo* ops, const char* result, int32_t length) {
  if (ops->fpSet.decodeResultRow != NULL) {
    if (result == NULL) {
      return TSDB_CODE_TSC_INVALID_INPUT;
    }

    ASSERT(length == *(int32_t*)result);

    const char* pPayload = result + sizeof(int32_t);
    int32_t     status = ops->fpSet.decodeResultRow(ops, (char*)pPayload);
    if (status != TDB_CODE_SUCCESS) {
      return status;
    }

    int32_t bufBytes = *(int32_t*)result;
    int32_t chunkBytes = *(int32_t*)pPayload;

    if (bufBytes == chunkBytes + sizeof(int32_t)) {
      // this chunk was the last one in the buffer
      result = NULL;
      length = 0;
    } else {
      // skip the consumed chunk and rewrite the size header in front of the remaining data
      result += chunkBytes;
      *(int32_t*)(result) = bufBytes - chunkBytes;
      length = bufBytes - chunkBytes;
    }
  }

  for (int32_t i = 0; i < ops->numOfDownstream; ++i) {
    int32_t status = decodeOperator(ops->pDownstream[i], result, length);
    if (status != TDB_CODE_SUCCESS) {
      return status;
    }
  }

  return TDB_CODE_SUCCESS;
}

D
dapan1121 已提交
4562
/*
 * Create the parameter object required by a data sink node.
 *
 * - QUERY_NODE_PHYSICAL_PLAN_QUERY_INSERT: allocates an SInserterParam carrying readHandle.
 * - QUERY_NODE_PHYSICAL_PLAN_DELETE: allocates an SDeleterParam carrying the super table uid and
 *   a copy of the uid of every table in the task's table list.
 * - any other node type: *pParam is left untouched.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when an allocation fails.
 */
int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, qTaskInfo_t* pTaskInfo, SReadHandle* readHandle) {
  SExecTaskInfo* pTask = *(SExecTaskInfo**)pTaskInfo;

  if (pNode->type == QUERY_NODE_PHYSICAL_PLAN_QUERY_INSERT) {
    SInserterParam* pInserter = taosMemoryCalloc(1, sizeof(SInserterParam));
    if (pInserter == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    pInserter->readHandle = readHandle;
    *pParam = pInserter;
    return TSDB_CODE_SUCCESS;
  }

  if (pNode->type == QUERY_NODE_PHYSICAL_PLAN_DELETE) {
    SDeleterParam* pDeleter = taosMemoryCalloc(1, sizeof(SDeleterParam));
    if (pDeleter == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    int32_t numOfTables = taosArrayGetSize(pTask->tableqinfoList.pTableList);
    pDeleter->suid = pTask->tableqinfoList.suid;
    pDeleter->pUidList = taosArrayInit(numOfTables, sizeof(uint64_t));
    if (pDeleter->pUidList == NULL) {
      taosMemoryFree(pDeleter);
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    // collect the uid of every table the task works on
    for (int32_t idx = 0; idx < numOfTables; ++idx) {
      STableKeyInfo* pKey = taosArrayGet(pTask->tableqinfoList.pTableList, idx);
      taosArrayPush(pDeleter->pUidList, &pKey->uid);
    }

    *pParam = pDeleter;
    return TSDB_CODE_SUCCESS;
  }

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
4603
/*
 * Create an execution task for a subplan and build its operator tree.
 *
 * pPlan     subplan to execute; once the task object exists it is referenced by the task
 * pTaskInfo out; receives the new task on success and is NULL on every failure path
 * pHandle   read handle passed on to the operators
 * taskId    id of the task
 * sql       heap string; ownership is taken by this function (stored in the task, or freed when
 *           the task cannot be created)
 * model     execution model of the task
 *
 * Returns TSDB_CODE_SUCCESS on success. On failure the partially built task is destroyed with
 * doDestroyTask(), terrno is set to the returned error code and a non-success code is returned.
 */
int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId,
                               char* sql, EOPTR_EXEC_MODEL model) {
  uint64_t queryId = pPlan->id.queryId;

  int32_t code = TSDB_CODE_SUCCESS;
  *pTaskInfo = createExecTaskInfo(queryId, taskId, model, pPlan->dbFName);
  if (*pTaskInfo == NULL) {
    code = TSDB_CODE_QRY_OUT_OF_MEMORY;
    goto _complete;
  }

  (*pTaskInfo)->sql = sql;
  sql = NULL;  // owned by the task from now on, must not be freed separately below
  (*pTaskInfo)->pSubplan = pPlan;
  (*pTaskInfo)->pRoot = createOperatorTree(pPlan->pNode, *pTaskInfo, pHandle, &(*pTaskInfo)->tableqinfoList,
                                           pPlan->pTagCond, pPlan->pTagIndexCond, pPlan->user);

  if (NULL == (*pTaskInfo)->pRoot) {
    code = (*pTaskInfo)->code;
    if (code == TSDB_CODE_SUCCESS) {
      // Some failure paths of the operator tree construction return NULL without recording a code
      // in the task. Never report success for a task that is about to be destroyed.
      code = (terrno != TSDB_CODE_SUCCESS) ? terrno : TSDB_CODE_QRY_APP_ERROR;
    }
    goto _complete;
  }

  return code;

_complete:
  taosMemoryFree(sql);
  if (*pTaskInfo != NULL) {
    // doDestroyTask() dereferences its argument, so it is only called for an existing task
    doDestroyTask(*pTaskInfo);
    *pTaskInfo = NULL;  // do not hand a dangling pointer back to the caller
  }
  terrno = code;
  return code;
}

wmmhello's avatar
wmmhello 已提交
4634 4635 4636
/*
 * Release everything owned by a table list: the per-group arrays (only when the tables were
 * sorted by group id), the group list itself, the table array and the hash map. All released
 * members are reset to NULL afterwards.
 */
static void doDestroyTableList(STableListInfo* pTableqinfoList) {
  if (pTableqinfoList->needSortTableByGroupId) {
    int32_t numOfGroups = (int32_t)taosArrayGetSize(pTableqinfoList->pGroupList);
    for (int32_t i = 0; i < numOfGroups; i++) {
      SArray* pGroup = taosArrayGetP(pTableqinfoList->pGroupList, i);
      // A group entry may be the table array itself; that one is released once, below. The
      // comparison is done while pTableList is still a live pointer.
      if (pGroup == pTableqinfoList->pTableList) {
        continue;
      }
      taosArrayDestroy(pGroup);
    }
  }
  taosArrayDestroy(pTableqinfoList->pGroupList);

  taosArrayDestroy(pTableqinfoList->pTableList);
  taosHashCleanup(pTableqinfoList->map);

  pTableqinfoList->pGroupList = NULL;
  pTableqinfoList->pTableList = NULL;
  pTableqinfoList->map = NULL;
}

L
Liu Jicong 已提交
4652
/*
 * Destroy an execution task together with everything reachable from it: the table list, the
 * operator tree, the schema info, the subplan, the sql string, the id string and finally the
 * task object itself.
 *
 * Passing NULL is allowed and does nothing; this keeps error paths that failed before the task
 * object was allocated from dereferencing a NULL pointer.
 */
void doDestroyTask(SExecTaskInfo* pTaskInfo) {
  if (pTaskInfo == NULL) {
    return;
  }

  qDebug("%s execTask is freed", GET_TASKID(pTaskInfo));

  doDestroyTableList(&pTaskInfo->tableqinfoList);
  destroyOperatorInfo(pTaskInfo->pRoot);
  cleanupTableSchemaInfo(&pTaskInfo->schemaInfo);

  nodesDestroyNode((SNode*)pTaskInfo->pSubplan);

  taosMemoryFreeClear(pTaskInfo->sql);
  taosMemoryFreeClear(pTaskInfo->id.str);
  taosMemoryFreeClear(pTaskInfo);
}

/*
 * Copy one tag value into an output buffer.
 *
 * output  destination buffer
 * val     source value, or NULL to store the null representation of the type
 * type    data type of the value
 * bytes   size of the destination; for variable-length types this includes the length header
 *
 * A variable-length value that does not fit into bytes is truncated to the available room.
 */
static void doSetTagValueToResultBuf(char* output, const char* val, int16_t type, int16_t bytes) {
  if (val == NULL) {
    setNull(output, type, bytes);
    return;
  }

  if (!IS_VAR_DATA_TYPE(type)) {
    memcpy(output, val, bytes);
    return;
  }

  if (varDataTLen(val) <= bytes) {
    varDataCopy(output, val);
    return;
  }

  // The value is longer than the destination: keep only as many payload bytes as fit.
  int32_t maxLen = bytes - VARSTR_HEADER_SIZE;
  int32_t srcLen = varDataLen(val);
  int32_t len = (srcLen > maxLen) ? maxLen : srcLen;
  memcpy(varDataVal(output), varDataVal(val), len);
  varDataSetLen(output, len);
}

/*
 * Estimate the number of query buffer bytes needed for numOfTables tables:
 * 1.5 times sizeof(STableQueryInfo) per table.
 */
static int64_t getQuerySupportBufSize(size_t numOfTables) {
  const size_t perTableBytes = sizeof(STableQueryInfo);
  const double overheadFactor = 1.5;

  // the buffer consumed inside tsdb (STableCheckInfo) is not included in the estimate
  return (int64_t)(perTableBytes * overheadFactor * numOfTables);
}

/*
 * Reserve query buffer space for numOfTables tables from the global budget
 * tsQueryBufferSizeBytes.
 *
 * - budget < 0: no accounting is done and the call always succeeds.
 * - budget > 0: the estimated size is subtracted with a compare-and-swap loop; the call fails
 *   when the remaining budget would become negative.
 * - budget == 0: query processing is disabled and the call always fails.
 *
 * Returns TSDB_CODE_SUCCESS or TSDB_CODE_QRY_NOT_ENOUGH_BUFFER.
 */
int32_t checkForQueryBuf(size_t numOfTables) {
  int64_t required = getQuerySupportBufSize(numOfTables);

  if (tsQueryBufferSizeBytes < 0) {
    return TSDB_CODE_SUCCESS;
  }

  if (tsQueryBufferSizeBytes > 0) {
    for (;;) {
      int64_t available = tsQueryBufferSizeBytes;
      int64_t left = available - required;
      if (left < 0) {
        return TSDB_CODE_QRY_NOT_ENOUGH_BUFFER;
      }

      // retry when the budget was changed between the read above and the exchange
      if (atomic_val_compare_exchange_64(&tsQueryBufferSizeBytes, available, left) == available) {
        return TSDB_CODE_SUCCESS;
      }
    }
  }

  // disable query processing if the value of tsQueryBufferSize is zero.
  return TSDB_CODE_QRY_NOT_ENOUGH_BUFFER;
}

/*
 * Add the buffer estimate for `numOfTables` tables back to tsQueryBufferSizeBytes.
 * Nothing is done when tsQueryBufferSizeBytes is negative.
 */
void releaseQueryBuf(size_t numOfTables) {
  if (tsQueryBufferSizeBytes >= 0) {
    int64_t released = getQuerySupportBufSize(numOfTables);
    atomic_add_fetch_64(&tsQueryBufferSizeBytes, released);
  }
}
D
dapan1121 已提交
4725

H
Haojun Liao 已提交
4726
/*
 * Append the explain/execution info of `operatorInfo` and, recursively, of all its
 * downstream operators to `pExecInfoList`.
 *
 * One SExplainExecInfo is pushed for the operator itself first, then the downstream
 * operators are visited in index order, so the list ends up in pre-order.
 *
 * operatorInfo   operator to describe
 * pExecInfoList  array of SExplainExecInfo that receives the entries
 *
 * Returns TSDB_CODE_SUCCESS on success. On failure returns
 * TSDB_CODE_QRY_OUT_OF_MEMORY if the entry could not be appended, otherwise the
 * error code reported by the operator's getExplainFn or by a downstream operator.
 * Entries appended before the failure stay in the list.
 */
int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SArray* pExecInfoList) {
  SExplainExecInfo  execInfo = {0};
  SExplainExecInfo* pExplainInfo = taosArrayPush(pExecInfoList, &execInfo);
  if (pExplainInfo == NULL) {
    // the push failed; there is no slot to fill in
    qError("%s failed to append explain exec info", GET_TASKID(operatorInfo->pTaskInfo));
    return TSDB_CODE_QRY_OUT_OF_MEMORY;
  }

  pExplainInfo->numOfRows = operatorInfo->resultInfo.totalRows;
  pExplainInfo->startupCost = operatorInfo->cost.openCost;
  pExplainInfo->totalCost = operatorInfo->cost.totalCost;
  pExplainInfo->verboseLen = 0;
  pExplainInfo->verboseInfo = NULL;

  int32_t code = TSDB_CODE_SUCCESS;
  if (operatorInfo->fpSet.getExplainFn) {
    code = operatorInfo->fpSet.getExplainFn(operatorInfo, &pExplainInfo->verboseInfo, &pExplainInfo->verboseLen);
    if (code) {
      qError("%s operator getExplainFn failed, code:%s", GET_TASKID(operatorInfo->pTaskInfo), tstrerror(code));
      return code;
    }
  }

  // pExplainInfo is not touched past this point: the recursive calls push into the same array.
  for (int32_t i = 0; i < operatorInfo->numOfDownstream; ++i) {
    code = getOperatorExplainExecInfo(operatorInfo->pDownstream[i], pExecInfoList);
    if (code != TSDB_CODE_SUCCESS) {
      // hand the real failure reason to the caller instead of masking it
      return code;
    }
  }

  return TSDB_CODE_SUCCESS;
}
5
54liuyao 已提交
4756

L
Liu Jicong 已提交
4757
/*
 * Initialize a stream aggregate supporter.
 *
 * pSup         supporter to set up
 * pKey         key handed to createDiskbasedBuf for the result buffer
 * pCtx         function contexts; each one gets its pBuf pointed at the result buffer
 * numOfOutput  number of entries in pCtx
 * size         stored as pSup->valueSize
 *
 * Returns TSDB_CODE_OUT_OF_MEMORY when the key buffer or the result-row hash cannot be
 * created, TSDB_CODE_NO_AVAIL_DISK (also stored in terrno) when osTempSpaceAvailable()
 * reports no space, otherwise the code returned by createDiskbasedBuf.
 *
 * NOTE(review): on the early-return paths nothing created here is released by this
 * function - presumably the caller tears down pSup; confirm against the callers.
 */
int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, const char* pKey, SqlFunctionCtx* pCtx, int32_t numOfOutput,
                               int32_t size) {
  pSup->resultRowSize = getResultRowSize(pCtx, numOfOutput);

  pSup->keySize = sizeof(int64_t) + sizeof(TSKEY);
  pSup->pKeyBuf = taosMemoryCalloc(1, pSup->keySize);

  _hash_fn_t binaryHash = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
  pSup->pResultRows = taosHashInit(1024, binaryHash, false, HASH_NO_LOCK);

  if (pSup->pKeyBuf == NULL || pSup->pResultRows == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  pSup->valueSize = size;
  pSup->pScanBlock = createSpecialDataBlock(STREAM_CLEAR);

  // start at 4096 and double until a page is at least four result rows large
  int32_t pageBytes;
  for (pageBytes = 4096; pageBytes < pSup->resultRowSize * 4; pageBytes <<= 1u) {
  }

  // at least four pages need to be in buffer
  const int32_t defaultBufBytes = 4096 * 256;
  int32_t       bufBytes = (defaultBufBytes <= pageBytes) ? pageBytes * 4 : defaultBufBytes;

  if (!osTempSpaceAvailable()) {
    terrno = TSDB_CODE_NO_AVAIL_DISK;
    qError("Init stream agg supporter failed since %s", terrstr(terrno));
    return terrno;
  }

  int32_t code = createDiskbasedBuf(&pSup->pResultBuf, pageBytes, bufBytes, pKey, tsTempDir);

  // the contexts are wired to pSup->pResultBuf regardless of `code`
  for (int32_t idx = 0; idx < numOfOutput; ++idx) {
    pCtx[idx].pBuf = pSup->pResultBuf;
  }

  return code;
}