executorimpl.c 156.8 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
15

H
Haojun Liao 已提交
16 17
#include "filter.h"
#include "function.h"
18 19
#include "functionMgt.h"
#include "os.h"
H
Haojun Liao 已提交
20
#include "querynodes.h"
21
#include "tfill.h"
dengyihao's avatar
dengyihao 已提交
22
#include "tname.h"
X
Xiaoyu Wang 已提交
23
#include "tref.h"
24

H
Haojun Liao 已提交
25
#include "tdatablock.h"
26
#include "tglobal.h"
H
Haojun Liao 已提交
27
#include "tmsg.h"
H
Haojun Liao 已提交
28
#include "tsort.h"
29
#include "ttime.h"
H
Haojun Liao 已提交
30

31
#include "executorimpl.h"
dengyihao's avatar
dengyihao 已提交
32
#include "index.h"
33
#include "query.h"
34 35
#include "tcompare.h"
#include "tcompression.h"
H
Haojun Liao 已提交
36
#include "thash.h"
37
#include "ttypes.h"
dengyihao's avatar
dengyihao 已提交
38
#include "vnode.h"
39

H
Haojun Liao 已提交
40
/* Non-zero when the runtime object (anything with a `scanFlag` member) is in its main scan pass. */
#define IS_MAIN_SCAN(runtime) \
  ((runtime)->scanFlag == MAIN_SCAN)
41 42 43 44 45 46
/* Switch the runtime object (anything with a `scanFlag` member) to the reverse scan pass. */
#define SET_REVERSE_SCAN_FLAG(runtime) \
  ((runtime)->scanFlag = REVERSE_SCAN)

/* Row step for a scan order: QUERY_ASC_FORWARD_STEP for TSDB_ORDER_ASC, QUERY_DESC_FORWARD_STEP for anything else. */
#define GET_FORWARD_DIRECTION_FACTOR(ord) \
  (((ord) == TSDB_ORDER_ASC) ? QUERY_ASC_FORWARD_STEP : QUERY_DESC_FORWARD_STEP)

#if 0
/*
 * Fault-injecting allocator wrappers (compiled out). When enabled, the #defines below route malloc/calloc/realloc
 * in this file through them so that allocation-failure paths get exercised:
 *   - u_malloc / u_calloc fail when taosRand() % 1000 == 0
 *   - u_realloc fails when taosRand() % 5 is 0 or 1
 */
static UNUSED_FUNC void* u_malloc(size_t size) {
  uint32_t dice = taosRand();
  if (dice % 1000 == 0) {
    return NULL;
  }
  return taosMemoryMalloc(size);
}

static UNUSED_FUNC void* u_calloc(size_t num, size_t size) {
  uint32_t dice = taosRand();
  if (dice % 1000 == 0) {
    return NULL;
  }
  return taosMemoryCalloc(num, size);
}

static UNUSED_FUNC void* u_realloc(void* p, size_t size) {
  uint32_t dice = taosRand();
  if (dice % 5 < 2) {
    return NULL;
  }
  return taosMemoryRealloc(p, size);
}

#define calloc  u_calloc
#define malloc  u_malloc
#define realloc u_realloc
#endif

X
Xiaoyu Wang 已提交
79
/* Clear every bit of `st` in the `status` word of `q`. */
#define CLEAR_QUERY_STATUS(q, st) \
  ((q)->status &= (~(st)))
80 81
/* Non-zero when `_q` carries a positive interval length, i.e. it is an interval (time-window) query. */
#define QUERY_IS_INTERVAL_QUERY(_q) \
  ((_q)->interval.interval > 0)

L
Liu Jicong 已提交
82 83
/* Idle threshold in seconds: twice the configured tsShellActivityTimer. */
int32_t getMaximumIdleDurationSec() {
  const int32_t factor = 2;
  return factor * tsShellActivityTimer;
}

84
static void setBlockSMAInfo(SqlFunctionCtx* pCtx, SExprInfo* pExpr, SSDataBlock* pBlock);
85

X
Xiaoyu Wang 已提交
86
static void releaseQueryBuf(size_t numOfTables);
87

88 89 90 91
static void destroyFillOperatorInfo(void* param);
static void destroyProjectOperatorInfo(void* param);
static void destroyOrderOperatorInfo(void* param);
static void destroyAggOperatorInfo(void* param);
X
Xiaoyu Wang 已提交
92

93 94
static void destroyIntervalOperatorInfo(void* param);
static void destroyExchangeOperatorInfo(void* param);
H
Haojun Liao 已提交
95

96 97
static void destroyOperatorInfo(SOperatorInfo* pOperator);

98
/*
 * Mark pOperator as done, record its total cost, and set the owning task's status to TASK_COMPLETED.
 * The operator must belong to a task (pTaskInfo != NULL).
 */
void doSetOperatorCompleted(SOperatorInfo* pOperator) {
  SExecTaskInfo* pTask = pOperator->pTaskInfo;

  pOperator->status = OP_EXEC_DONE;
  ASSERT(pTask != NULL);

  // cost.start is multiplied by 1000 before being subtracted from a microsecond timestamp (so it is presumably
  // kept in milliseconds); dividing by 1000.0 turns the elapsed microseconds back into milliseconds.
  pOperator->cost.totalCost = (taosGetTimestampUs() - pTask->cost.start * 1000) / 1000.0;

  setTaskStatus(pTask, TASK_COMPLETED);
}
105

H
Haojun Liao 已提交
106
/* Open callback for operators without real open work: flags the operator as opened, with zero open cost. */
int32_t operatorDummyOpenFn(SOperatorInfo* pOperator) {
  pOperator->cost.openCost = 0;
  OPTR_SET_OPENED(pOperator);
  return TSDB_CODE_SUCCESS;
}

112
SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn, __optr_fn_t streamFn,
L
Liu Jicong 已提交
113
                                   __optr_fn_t cleanup, __optr_close_fn_t closeFn, __optr_encode_fn_t encode,
114
                                   __optr_decode_fn_t decode, __optr_explain_fn_t explain) {
115 116 117 118 119 120 121 122 123 124 125 126 127 128
  SOperatorFpSet fpSet = {
      ._openFn = openFn,
      .getNextFn = nextFn,
      .getStreamResFn = streamFn,
      .cleanupFn = cleanup,
      .closeFn = closeFn,
      .encodeResultRow = encode,
      .decodeResultRow = decode,
      .getExplainFn = explain,
  };

  return fpSet;
}

129 130
static int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprSupp* pSup, SDiskbasedBuf* pBuf,
                                  SGroupResInfo* pGroupResInfo);
H
Haojun Liao 已提交
131

132
static void initCtxOutputBuffer(SqlFunctionCtx* pCtx, int32_t size);
133
static void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId);
134

135
#if 0
/*
 * (Compiled out.) Report whether a result row for the key (pData, bytes, uid) already exists.
 * Outside interval queries, and in non-main scans of interval queries, the hash lookup alone decides.
 * In the main scan of an interval query, an existing row only counts when pResultRowInfo has more than one
 * row and the extended key is present in pResultRowListSet.
 */
static bool chkResultRowFromKey(STaskRuntimeEnv* pRuntimeEnv, SResultRowInfo* pResultRowInfo, char* pData,
                                int16_t bytes, bool masterscan, uint64_t uid) {
  SET_RES_WINDOW_KEY(pRuntimeEnv->keyBuf, pData, bytes, uid);

  SResultRow** p1 =
      (SResultRow**)taosHashGet(pRuntimeEnv->pResultRowHashTable, pRuntimeEnv->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes));
  bool found = (p1 != NULL);

  // in case of repeat scan/reverse scan, no new time window added.
  // the *p1 may be NULL in case of sliding+offset exists.
  if (!QUERY_IS_INTERVAL_QUERY(pRuntimeEnv->pQueryAttr) || !masterscan) {
    return found;
  }

  // size 0: nothing to match; size 1: the comparison against pResultRowInfo->pResult[0] is disabled.
  if (!found || pResultRowInfo->size == 0 || pResultRowInfo->size == 1) {
    return false;
  }

  // check if current pResultRowInfo contains the existed pResultRow
  SET_RES_EXT_WINDOW_KEY(pRuntimeEnv->keyBuf, pData, bytes, uid, pResultRowInfo);
  int64_t* index = taosHashGet(pRuntimeEnv->pResultRowListSet, pRuntimeEnv->keyBuf, GET_RES_EXT_WINDOW_KEY_LEN(bytes));
  return index != NULL;
}
#endif
173

174
/*
 * Reserve `interBufSize` bytes for a new SResultRow inside the disk-based result buffer.
 *
 * pResultBuf     the paged result buffer
 * currentPageId  in/out: page currently being filled, or -1 when none has been allocated yet; updated to the page
 *                that received the row
 * interBufSize   bytes to reserve for the row
 *
 * The row goes on the current page when it still fits; otherwise that page is released and a fresh one is taken.
 * Returns the row with pageId/offset filled in, or NULL when no buffer page could be obtained (the caller must check).
 */
SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int32_t* currentPageId, int32_t interBufSize) {
  SFilePage* pData = NULL;
  int32_t    pageId = -1;

  if (*currentPageId == -1) {
    // in the first scan, new space needed for results
    pData = getNewBufPage(pResultBuf, &pageId);
    if (pData != NULL) {  // previously dereferenced without a check
      pData->num = sizeof(SFilePage);
    }
  } else {
    pageId = *currentPageId;
    pData = getBufPage(pResultBuf, pageId);

    if (pData != NULL && pData->num + interBufSize > getBufPageSize(pResultBuf)) {
      // release current page first, and prepare the next one
      releaseBufPage(pResultBuf, pData);

      pData = getNewBufPage(pResultBuf, &pageId);
      if (pData != NULL) {
        pData->num = sizeof(SFilePage);
      }
    }
  }

  if (pData == NULL) {
    return NULL;
  }

  setBufPageDirty(pData, true);

  // the new row starts right after the bytes already used on this page
  SResultRow* pResultRow = (SResultRow*)((char*)pData + pData->num);
  pResultRow->pageId = pageId;
  pResultRow->offset = (int32_t)pData->num;
  *currentPageId = pageId;

  pData->num += interBufSize;
  return pResultRow;
}

213 214 215 216 217 218 219
/**
 * the struct of key in hash table
 * +----------+---------------+
 * | group id |   key data    |
 * | 8 bytes  | actual length |
 * +----------+---------------+
 */
220 221 222
SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pResultRowInfo, char* pData,
                                   int16_t bytes, bool masterscan, uint64_t groupId, SExecTaskInfo* pTaskInfo,
                                   bool isIntervalQuery, SAggSupporter* pSup) {
223
  SET_RES_WINDOW_KEY(pSup->keyBuf, pData, bytes, groupId);
H
Haojun Liao 已提交
224

dengyihao's avatar
dengyihao 已提交
225
  SResultRowPosition* p1 =
226
      (SResultRowPosition*)tSimpleHashGet(pSup->pResultRowHashTable, pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes));
H
Haojun Liao 已提交
227

228 229
  SResultRow* pResult = NULL;

H
Haojun Liao 已提交
230 231
  // in case of repeat scan/reverse scan, no new time window added.
  if (isIntervalQuery) {
232
    if (masterscan && p1 != NULL) {  // the *p1 may be NULL in case of sliding+offset exists.
233
      pResult = getResultRowByPos(pResultBuf, p1, true);
234
      ASSERT(pResult->pageId == p1->pageId && pResult->offset == p1->offset);
H
Haojun Liao 已提交
235 236
    }
  } else {
dengyihao's avatar
dengyihao 已提交
237 238
    // In case of group by column query, the required SResultRow object must be existInCurrentResusltRowInfo in the
    // pResultRowInfo object.
H
Haojun Liao 已提交
239
    if (p1 != NULL) {
240
      // todo
241
      pResult = getResultRowByPos(pResultBuf, p1, true);
242
      ASSERT(pResult->pageId == p1->pageId && pResult->offset == p1->offset);
H
Haojun Liao 已提交
243 244 245
    }
  }

L
Liu Jicong 已提交
246
  // 1. close current opened time window
247
  if (pResultRowInfo->cur.pageId != -1 && ((pResult == NULL) || (pResult->pageId != pResultRowInfo->cur.pageId))) {
248
    SResultRowPosition pos = pResultRowInfo->cur;
X
Xiaoyu Wang 已提交
249
    SFilePage*         pPage = getBufPage(pResultBuf, pos.pageId);
250 251 252 253 254
    releaseBufPage(pResultBuf, pPage);
  }

  // allocate a new buffer page
  if (pResult == NULL) {
H
Haojun Liao 已提交
255
    ASSERT(pSup->resultRowSize > 0);
256
    pResult = getNewResultRow(pResultBuf, &pSup->currentPageId, pSup->resultRowSize);
257

258 259
    // add a new result set for a new group
    SResultRowPosition pos = {.pageId = pResult->pageId, .offset = pResult->offset};
260
    tSimpleHashPut(pSup->pResultRowHashTable, pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes), &pos,
L
Liu Jicong 已提交
261
                   sizeof(SResultRowPosition));
H
Haojun Liao 已提交
262 263
  }

264 265 266
  // 2. set the new time window to be the new active time window
  pResultRowInfo->cur = (SResultRowPosition){.pageId = pResult->pageId, .offset = pResult->offset};

H
Haojun Liao 已提交
267
  // too many time window in query
268
  if (pTaskInfo->execModel == OPTR_EXEC_MODEL_BATCH &&
269
      tSimpleHashGetSize(pSup->pResultRowHashTable) > MAX_INTERVAL_TIME_WINDOW) {
270
    T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_TOO_MANY_TIMEWINDOW);
H
Haojun Liao 已提交
271 272
  }

H
Haojun Liao 已提交
273
  return pResult;
H
Haojun Liao 已提交
274 275
}

276
// a new buffer page for each table. Needs to opt this design
L
Liu Jicong 已提交
277
static int32_t addNewWindowResultBuf(SResultRow* pWindowRes, SDiskbasedBuf* pResultBuf, int32_t tid, uint32_t size) {
278 279 280 281
  if (pWindowRes->pageId != -1) {
    return 0;
  }

L
Liu Jicong 已提交
282
  SFilePage* pData = NULL;
283 284 285

  // in the first scan, new space needed for results
  int32_t pageId = -1;
286
  SIDList list = getDataBufPagesIdList(pResultBuf);
287 288

  if (taosArrayGetSize(list) == 0) {
289
    pData = getNewBufPage(pResultBuf, &pageId);
290
    pData->num = sizeof(SFilePage);
291 292
  } else {
    SPageInfo* pi = getLastPageInfo(list);
293
    pData = getBufPage(pResultBuf, getPageId(pi));
294
    pageId = getPageId(pi);
295

296
    if (pData->num + size > getBufPageSize(pResultBuf)) {
297
      // release current page first, and prepare the next one
298
      releaseBufPageInfo(pResultBuf, pi);
299

300
      pData = getNewBufPage(pResultBuf, &pageId);
301
      if (pData != NULL) {
302
        pData->num = sizeof(SFilePage);
303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322
      }
    }
  }

  if (pData == NULL) {
    return -1;
  }

  // set the number of rows in current disk page
  if (pWindowRes->pageId == -1) {  // not allocated yet, allocate new buffer
    pWindowRes->pageId = pageId;
    pWindowRes->offset = (int32_t)pData->num;

    pData->num += size;
    assert(pWindowRes->pageId >= 0);
  }

  return 0;
}

323
//  query_range_start, query_range_end, window_duration, window_start, window_end
324
void initExecTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pQueryWindow) {
325 326 327
  pColData->info.type = TSDB_DATA_TYPE_TIMESTAMP;
  pColData->info.bytes = sizeof(int64_t);

328
  colInfoDataEnsureCapacity(pColData, 5);
329 330 331 332 333 334 335 336 337
  colDataAppendInt64(pColData, 0, &pQueryWindow->skey);
  colDataAppendInt64(pColData, 1, &pQueryWindow->ekey);

  int64_t interval = 0;
  colDataAppendInt64(pColData, 2, &interval);  // this value may be variable in case of 'n' and 'y'.
  colDataAppendInt64(pColData, 3, &pQueryWindow->skey);
  colDataAppendInt64(pColData, 4, &pQueryWindow->ekey);
}

L
Liu Jicong 已提交
338
/* Free the data held by pColData (delegates to colDataDestroy). */
void cleanupExecTimeWindowInfo(SColumnInfoData* pColData) {
  colDataDestroy(pColData);
}
H
Haojun Liao 已提交
339

340 341 342 343 344 345 346 347 348 349 350 351 352 353
/* Snapshot of the SqlFunctionCtx input fields that doApplyFunctions temporarily overrides. */
typedef struct {
  bool    hasAgg;       // copy of input.colDataAggIsSet
  int32_t numOfRows;    // copy of input.numOfRows
  int32_t startOffset;  // copy of input.startRowIndex
} SFunctionCtxStatus;

/* Capture the input fields of pCtx that doApplyFunctions overrides, so functionCtxRestore can put them back. */
static void functionCtxSave(SqlFunctionCtx* pCtx, SFunctionCtxStatus* pStatus) {
  const SInputColumnInfoData* pInput = &pCtx->input;

  *pStatus = (SFunctionCtxStatus){
      .hasAgg = pInput->colDataAggIsSet,
      .numOfRows = pInput->numOfRows,
      .startOffset = pInput->startRowIndex,
  };
}

/* Write the fields captured by functionCtxSave back into pCtx->input. */
static void functionCtxRestore(SqlFunctionCtx* pCtx, SFunctionCtxStatus* pStatus) {
  SInputColumnInfoData* pInput = &pCtx->input;

  pInput->startRowIndex = pStatus->startOffset;
  pInput->numOfRows = pStatus->numOfRows;
  pInput->colDataAggIsSet = pStatus->hasAgg;
}

/*
 * Apply each function in pCtx[0..numOfOutput) to rows [offset, offset + forwardStep) of its current input.
 *
 * pTimeWindowData  column of time-window values passed to window pseudo-column functions
 * numOfTotal       total rows of the input block; the block SMA is disabled when forwardStep < numOfTotal
 *
 * Each context's startRowIndex/numOfRows/colDataAggIsSet are saved, overridden for the call, and restored afterwards
 * on every branch. A function failure is logged, stored in taskInfo->code and raised via
 * T_LONG_JMP(taskInfo->env, code).
 */
void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, SColumnInfoData* pTimeWindowData, int32_t offset,
                      int32_t forwardStep, int32_t numOfTotal, int32_t numOfOutput) {
  for (int32_t k = 0; k < numOfOutput; ++k) {
    SqlFunctionCtx* pFuncCtx = &pCtx[k];

    // keep it temporarily
    SFunctionCtxStatus status = {0};
    functionCtxSave(pFuncCtx, &status);

    pFuncCtx->input.startRowIndex = offset;
    pFuncCtx->input.numOfRows = forwardStep;

    // not a whole block involved in query processing, statistics data can not be used
    // NOTE: the original value of isSet have been changed here
    if (pFuncCtx->input.colDataAggIsSet && forwardStep < numOfTotal) {
      pFuncCtx->input.colDataAggIsSet = false;
    }

    if (fmIsWindowPseudoColumnFunc(pFuncCtx->functionId)) {
      SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pFuncCtx);

      char* p = GET_ROWCELL_INTERBUF(pEntryInfo);

      // the scalar function writes its BIGINT result straight into the row cell's intermediate buffer
      SColumnInfoData idata = {0};
      idata.info.type = TSDB_DATA_TYPE_BIGINT;
      idata.info.bytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes;
      idata.pData = p;

      SScalarParam out = {.columnData = &idata};
      SScalarParam tw = {.numOfRows = 5, .columnData = pTimeWindowData};
      pFuncCtx->sfp.process(&tw, 1, &out);
      pEntryInfo->numOfRes = 1;
    } else if (functionNeedToExecute(pFuncCtx) && pFuncCtx->fpSet.process != NULL) {
      int32_t code = pFuncCtx->fpSet.process(pFuncCtx);
      if (code != TSDB_CODE_SUCCESS) {
        qError("%s apply functions error, code: %s", GET_TASKID(taskInfo), tstrerror(code));
        taskInfo->code = code;
        T_LONG_JMP(taskInfo->env, code);
      }
    }

    // Restore for every branch: previously the pseudo-column branch skipped this and left the overridden
    // startRowIndex/numOfRows/colDataAggIsSet in the context.
    functionCtxRestore(pFuncCtx, &status);
  }
}

406 407
static int32_t doSetInputDataBlock(SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t order, int32_t scanFlag,
                                   bool createDummyCol);

/*
 * Bind pBlock to every function context of pExprSup through its pre-aggregated data (SMA): set the scan order and
 * row count, the SMA inputs (via setBlockSMAInfo), and the source block pointer.
 */
static void doSetInputDataBlockInfo(SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t order) {
  for (int32_t idx = 0; idx < pExprSup->numOfExprs; ++idx) {
    SqlFunctionCtx* pFuncCtx = pExprSup->pCtx + idx;
    SExprInfo*      pExprInfo = pExprSup->pExprInfo + idx;

    pFuncCtx->order = order;
    pFuncCtx->input.numOfRows = pBlock->info.rows;
    setBlockSMAInfo(pFuncCtx, pExprInfo, pBlock);
    pFuncCtx->pSrcBlock = pBlock;
  }
}

419
/*
 * Bind pBlock as the input of every function context in pExprSup.
 * Blocks carrying pre-aggregated data (pBlockAgg) feed their SMA; other blocks feed their column data.
 * The status code returned by the column-data path is discarded.
 */
void setInputDataBlock(SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t order, int32_t scanFlag, bool createDummyCol) {
  if (pBlock->pBlockAgg == NULL) {
    (void)doSetInputDataBlock(pExprSup, pBlock, order, scanFlag, createDummyCol);
    return;
  }

  doSetInputDataBlockInfo(pExprSup, pBlock, order);
}

L
Liu Jicong 已提交
427 428
/*
 * Materialize the constant parameter pFuncParam as input column pInput->pData[paramIndex] holding numOfRows copies.
 *
 * The column is allocated (and stored in pInput->pData[paramIndex]) on first use and reused afterwards.
 * BIGINT/UBIGINT, DOUBLE and VARCHAR constants are written; for any other type only capacity is reserved.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY, or the error reported by colInfoDataEnsureCapacity.
 */
static int32_t doCreateConstantValColumnInfo(SInputColumnInfoData* pInput, SFunctParam* pFuncParam, int32_t paramIndex,
                                             int32_t numOfRows) {
  SColumnInfoData* pColInfo = pInput->pData[paramIndex];
  if (pColInfo == NULL) {
    pColInfo = taosMemoryCalloc(1, sizeof(SColumnInfoData));
    if (pColInfo == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    // Set the correct column info (data type and bytes)
    pColInfo->info.type = pFuncParam->param.nType;
    pColInfo->info.bytes = pFuncParam->param.nLen;

    pInput->pData[paramIndex] = pColInfo;
  }

  // writing rows into a column whose buffer could not be grown would overflow it
  int32_t code = colInfoDataEnsureCapacity(pColInfo, numOfRows);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  int8_t type = pFuncParam->param.nType;
  if (type == TSDB_DATA_TYPE_BIGINT || type == TSDB_DATA_TYPE_UBIGINT) {
    int64_t v = pFuncParam->param.i;
    for (int32_t i = 0; i < numOfRows; ++i) {
      colDataAppendInt64(pColInfo, i, &v);
    }
  } else if (type == TSDB_DATA_TYPE_DOUBLE) {
    double v = pFuncParam->param.d;
    for (int32_t i = 0; i < numOfRows; ++i) {
      colDataAppendDouble(pColInfo, i, &v);
    }
  } else if (type == TSDB_DATA_TYPE_VARCHAR) {
    // build the var-string (header + payload) once and copy it into every row
    char* tmp = taosMemoryMalloc(pFuncParam->param.nLen + VARSTR_HEADER_SIZE);
    if (tmp == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    STR_WITH_SIZE_TO_VARSTR(tmp, pFuncParam->param.pz, pFuncParam->param.nLen);
    for (int32_t i = 0; i < numOfRows; ++i) {
      colDataAppend(pColInfo, i, tmp, false);
    }
    taosMemoryFree(tmp);
  }

  return TSDB_CODE_SUCCESS;
}

470 471
/*
 * Bind the column data of pBlock to every function context in pExprSup.
 *
 * Column parameters point at the block column of their slot id. For implicit-ts functions, the last parameter is
 * also exposed as pInput->pPTS. When createDummyCol is set and the expression has exactly one parameter that is a
 * constant, the constant is materialized as a column of pBlock->info.rows copies.
 *
 * Returns the first error from doCreateConstantValColumnInfo, or TSDB_CODE_SUCCESS.
 */
static int32_t doSetInputDataBlock(SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t order, int32_t scanFlag,
                                   bool createDummyCol) {
  for (int32_t i = 0; i < pExprSup->numOfExprs; ++i) {
    SqlFunctionCtx*       pFuncCtx = &pExprSup->pCtx[i];
    SInputColumnInfoData* pInput = &pFuncCtx->input;
    SExprInfo*            pOneExpr = &pExprSup->pExprInfo[i];

    pFuncCtx->order = order;
    pFuncCtx->pSrcBlock = pBlock;
    pFuncCtx->scanFlag = scanFlag;

    pInput->numOfRows = pBlock->info.rows;
    pInput->uid = pBlock->info.uid;
    pInput->colDataAggIsSet = false;

    int32_t numOfParams = pOneExpr->base.numOfParams;
    for (int32_t j = 0; j < numOfParams; ++j) {
      SFunctParam* pFuncParam = &pOneExpr->base.pParam[j];

      if (pFuncParam->type == FUNC_PARAM_TYPE_COLUMN) {
        pInput->pData[j] = taosArrayGet(pBlock->pDataBlock, pFuncParam->pCol->slotId);
        pInput->totalRows = pBlock->info.rows;
        pInput->numOfRows = pBlock->info.rows;
        pInput->startRowIndex = 0;

        // NOTE: the last parameter is the primary timestamp column; in case of merge function, this is not always
        // the ts column data.
        // todo: refactor this
        if (fmIsImplicitTsFunc(pFuncCtx->functionId) && (j == numOfParams - 1)) {
          pInput->pPTS = pInput->pData[j];
        }

        ASSERT(pInput->pData[j] != NULL);
        continue;
      }

      // todo avoid case: top(k, 12), 12 is the value parameter.
      // sum(11), 11 is also the value parameter.
      bool needDummyCol = (pFuncParam->type == FUNC_PARAM_TYPE_VALUE) && createDummyCol && (numOfParams == 1);
      if (!needDummyCol) {
        continue;
      }

      pInput->totalRows = pBlock->info.rows;
      pInput->numOfRows = pBlock->info.rows;
      pInput->startRowIndex = 0;

      int32_t code = doCreateConstantValColumnInfo(pInput, pFuncParam, j, pBlock->info.rows);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }
    }
  }

  return TSDB_CODE_SUCCESS;
}

523
/*
 * Run the process callback of every function context of pOperator that still needs to execute and has a callback.
 * Returns the first non-success callback code (after logging it), or TSDB_CODE_SUCCESS.
 */
static int32_t doAggregateImpl(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx) {
  for (int32_t k = 0; k < pOperator->exprSupp.numOfExprs; ++k) {
    SqlFunctionCtx* pFuncCtx = &pCtx[k];

    // todo add a dummy funtion to avoid process check
    if (!functionNeedToExecute(pFuncCtx) || pFuncCtx->fpSet.process == NULL) {
      continue;
    }

    int32_t code = pFuncCtx->fpSet.process(pFuncCtx);
    if (code != TSDB_CODE_SUCCESS) {
      qError("%s aggregate function error happens, code: %s", GET_TASKID(pOperator->pTaskInfo), tstrerror(code));
      return code;
    }
  }

  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
542
/*
 * For i in [0, size of pPseudoList), point pCtx[i].pOutput at result column i of pResult.
 * Does nothing when pPseudoList is NULL.
 */
static void setPseudoOutputColInfo(SSDataBlock* pResult, SqlFunctionCtx* pCtx, SArray* pPseudoList) {
  if (pPseudoList == NULL) {
    return;
  }

  size_t numOfPseudo = taosArrayGetSize(pPseudoList);
  for (size_t col = 0; col < numOfPseudo; ++col) {
    pCtx[col].pOutput = taosArrayGet(pResult->pDataBlock, col);
  }
}

549
int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pSrcBlock, SqlFunctionCtx* pCtx,
X
Xiaoyu Wang 已提交
550
                              int32_t numOfOutput, SArray* pPseudoList) {
H
Haojun Liao 已提交
551
  setPseudoOutputColInfo(pResult, pCtx, pPseudoList);
552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571

  if (pSrcBlock == NULL) {
    for (int32_t k = 0; k < numOfOutput; ++k) {
      int32_t outputSlotId = pExpr[k].base.resSchema.slotId;

      ASSERT(pExpr[k].pExpr->nodeType == QUERY_NODE_VALUE);
      SColumnInfoData* pColInfoData = taosArrayGet(pResult->pDataBlock, outputSlotId);

      int32_t type = pExpr[k].base.pParam[0].param.nType;
      if (TSDB_DATA_TYPE_NULL == type) {
        colDataAppendNNULL(pColInfoData, 0, 1);
      } else {
        colDataAppend(pColInfoData, 0, taosVariantGet(&pExpr[k].base.pParam[0].param, type), false);
      }
    }

    pResult->info.rows = 1;
    return TSDB_CODE_SUCCESS;
  }

H
Haojun Liao 已提交
572
  pResult->info.groupId = pSrcBlock->info.groupId;
573
  memcpy(pResult->info.parTbName, pSrcBlock->info.parTbName, TSDB_TABLE_NAME_LEN);
H
Haojun Liao 已提交
574

575 576
  // if the source equals to the destination, it is to create a new column as the result of scalar
  // function or some operators.
577 578
  bool createNewColModel = (pResult == pSrcBlock);

579 580
  int32_t numOfRows = 0;

581
  for (int32_t k = 0; k < numOfOutput; ++k) {
582 583
    int32_t               outputSlotId = pExpr[k].base.resSchema.slotId;
    SqlFunctionCtx*       pfCtx = &pCtx[k];
584
    SInputColumnInfoData* pInputData = &pfCtx->input;
585

L
Liu Jicong 已提交
586
    if (pExpr[k].pExpr->nodeType == QUERY_NODE_COLUMN) {  // it is a project query
587
      SColumnInfoData* pColInfoData = taosArrayGet(pResult->pDataBlock, outputSlotId);
588
      if (pResult->info.rows > 0 && !createNewColModel) {
589
        colDataMergeCol(pColInfoData, pResult->info.rows, (int32_t*)&pResult->info.capacity, pInputData->pData[0],
590
                        pInputData->numOfRows);
591
      } else {
592
        colDataAssign(pColInfoData, pInputData->pData[0], pInputData->numOfRows, &pResult->info);
593
      }
594

595
      numOfRows = pInputData->numOfRows;
596
    } else if (pExpr[k].pExpr->nodeType == QUERY_NODE_VALUE) {
597
      SColumnInfoData* pColInfoData = taosArrayGet(pResult->pDataBlock, outputSlotId);
598

dengyihao's avatar
dengyihao 已提交
599
      int32_t offset = createNewColModel ? 0 : pResult->info.rows;
600 601 602 603 604 605 606 607

      int32_t type = pExpr[k].base.pParam[0].param.nType;
      if (TSDB_DATA_TYPE_NULL == type) {
        colDataAppendNNULL(pColInfoData, offset, pSrcBlock->info.rows);
      } else {
        for (int32_t i = 0; i < pSrcBlock->info.rows; ++i) {
          colDataAppend(pColInfoData, i + offset, taosVariantGet(&pExpr[k].base.pParam[0].param, type), false);
        }
608
      }
609 610

      numOfRows = pSrcBlock->info.rows;
H
Haojun Liao 已提交
611
    } else if (pExpr[k].pExpr->nodeType == QUERY_NODE_OPERATOR) {
612 613 614
      SArray* pBlockList = taosArrayInit(4, POINTER_BYTES);
      taosArrayPush(pBlockList, &pSrcBlock);

615
      SColumnInfoData* pResColData = taosArrayGet(pResult->pDataBlock, outputSlotId);
616
      SColumnInfoData  idata = {.info = pResColData->info, .hasNull = true};
617

618
      SScalarParam dest = {.columnData = &idata};
X
Xiaoyu Wang 已提交
619
      int32_t      code = scalarCalculate(pExpr[k].pExpr->_optrRoot.pRootNode, pBlockList, &dest);
620 621 622 623
      if (code != TSDB_CODE_SUCCESS) {
        taosArrayDestroy(pBlockList);
        return code;
      }
624

dengyihao's avatar
dengyihao 已提交
625
      int32_t startOffset = createNewColModel ? 0 : pResult->info.rows;
626
      ASSERT(pResult->info.capacity > 0);
627
      colDataMergeCol(pResColData, startOffset, (int32_t*)&pResult->info.capacity, &idata, dest.numOfRows);
D
dapan1121 已提交
628
      colDataDestroy(&idata);
L
Liu Jicong 已提交
629

630
      numOfRows = dest.numOfRows;
631 632
      taosArrayDestroy(pBlockList);
    } else if (pExpr[k].pExpr->nodeType == QUERY_NODE_FUNCTION) {
633 634
      // _rowts/_c0, not tbname column
      if (fmIsPseudoColumnFunc(pfCtx->functionId) && (!fmIsScanPseudoColumnFunc(pfCtx->functionId))) {
H
Haojun Liao 已提交
635
        // do nothing
636
      } else if (fmIsIndefiniteRowsFunc(pfCtx->functionId)) {
637 638
        SResultRowEntryInfo* pResInfo = GET_RES_INFO(pfCtx);
        pfCtx->fpSet.init(pfCtx, pResInfo);
639 640 641 642 643 644 645 646 647 648

        pfCtx->pOutput = taosArrayGet(pResult->pDataBlock, outputSlotId);
        pfCtx->offset = createNewColModel ? 0 : pResult->info.rows;  // set the start offset

        // set the timestamp(_rowts) output buffer
        if (taosArrayGetSize(pPseudoList) > 0) {
          int32_t* outputColIndex = taosArrayGet(pPseudoList, 0);
          pfCtx->pTsOutput = (SColumnInfoData*)pCtx[*outputColIndex].pOutput;
        }

649 650 651 652 653
        // link pDstBlock to set selectivity value
        if (pfCtx->subsidiaries.num > 0) {
          pfCtx->pDstBlock = pResult;
        }

654
        numOfRows = pfCtx->fpSet.process(pfCtx);
H
Haojun Liao 已提交
655
      } else if (fmIsAggFunc(pfCtx->functionId)) {
G
Ganlin Zhao 已提交
656
        // selective value output should be set during corresponding function execution
657 658 659
        if (fmIsSelectValueFunc(pfCtx->functionId)) {
          continue;
        }
660 661
        // _group_key function for "partition by tbname" + csum(col_name) query
        SColumnInfoData* pOutput = taosArrayGet(pResult->pDataBlock, outputSlotId);
662
        int32_t          slotId = pfCtx->param[0].pCol->slotId;
663 664 665

        // todo handle the json tag
        SColumnInfoData* pInput = taosArrayGet(pSrcBlock->pDataBlock, slotId);
666
        for (int32_t f = 0; f < pSrcBlock->info.rows; ++f) {
667 668 669 670 671 672 673 674 675
          bool isNull = colDataIsNull_s(pInput, f);
          if (isNull) {
            colDataAppendNULL(pOutput, pResult->info.rows + f);
          } else {
            char* data = colDataGetData(pInput, f);
            colDataAppend(pOutput, pResult->info.rows + f, data, isNull);
          }
        }

H
Haojun Liao 已提交
676 677 678
      } else {
        SArray* pBlockList = taosArrayInit(4, POINTER_BYTES);
        taosArrayPush(pBlockList, &pSrcBlock);
G
Ganlin Zhao 已提交
679

680
        SColumnInfoData* pResColData = taosArrayGet(pResult->pDataBlock, outputSlotId);
681
        SColumnInfoData  idata = {.info = pResColData->info, .hasNull = true};
H
Haojun Liao 已提交
682

683
        SScalarParam dest = {.columnData = &idata};
X
Xiaoyu Wang 已提交
684
        int32_t      code = scalarCalculate((SNode*)pExpr[k].pExpr->_function.pFunctNode, pBlockList, &dest);
685 686 687 688
        if (code != TSDB_CODE_SUCCESS) {
          taosArrayDestroy(pBlockList);
          return code;
        }
689

dengyihao's avatar
dengyihao 已提交
690
        int32_t startOffset = createNewColModel ? 0 : pResult->info.rows;
691
        ASSERT(pResult->info.capacity > 0);
692
        colDataMergeCol(pResColData, startOffset, (int32_t*)&pResult->info.capacity, &idata, dest.numOfRows);
D
dapan1121 已提交
693
        colDataDestroy(&idata);
694 695

        numOfRows = dest.numOfRows;
H
Haojun Liao 已提交
696 697
        taosArrayDestroy(pBlockList);
      }
698
    } else {
699
      return TSDB_CODE_OPS_NOT_SUPPORT;
700 701
    }
  }
702

703 704 705
  if (!createNewColModel) {
    pResult->info.rows += numOfRows;
  }
706 707

  return TSDB_CODE_SUCCESS;
708 709
}

5
54liuyao 已提交
710
/*
 * Decide whether the function bound to pCtx should process the current input.
 * Returns false when functionId is -1. During a REPEAT_SCAN it returns whether the function is a repeat-scan
 * function. Otherwise it returns true unless the context's result entry is already completed.
 */
bool functionNeedToExecute(SqlFunctionCtx* pCtx) {
  // in case of timestamp column, always generated results.
  if (pCtx->functionId == -1) {
    return false;
  }

  if (pCtx->scanFlag == REPEAT_SCAN) {
    return fmIsRepeatScanFunc(pCtx->functionId);
  }

  struct SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
  return !isRowEntryCompleted(pResInfo);
}

730 731 732 733 734 735 736
/*
 * Build the block SMA (no NULLs, min, max, sum) that describes numOfRows copies of the constant pFuncParam, and
 * store it in pInput->pColumnDataAgg[paramIndex].
 *
 * pInput->pData[paramIndex] is created on demand (type and bytes only) so consumers can read the column type.
 * BIGINT, DOUBLE and BOOL constants are described. TIMESTAMP leaves the SMA untouched, and other types hit
 * ASSERT(0).
 *
 * Returns TSDB_CODE_SUCCESS or TSDB_CODE_OUT_OF_MEMORY.
 */
static int32_t doCreateConstantValColumnAggInfo(SInputColumnInfoData* pInput, SFunctParam* pFuncParam, int32_t type,
                                                int32_t paramIndex, int32_t numOfRows) {
  if (pInput->pData[paramIndex] == NULL) {
    pInput->pData[paramIndex] = taosMemoryCalloc(1, sizeof(SColumnInfoData));
    if (pInput->pData[paramIndex] == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    // Set the correct column info (data type and bytes)
    pInput->pData[paramIndex]->info.type = type;
    pInput->pData[paramIndex]->info.bytes = tDataTypes[type].bytes;
  }

  SColumnDataAgg* da = pInput->pColumnDataAgg[paramIndex];
  if (da == NULL) {
    da = taosMemoryCalloc(1, sizeof(SColumnDataAgg));
    if (da == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
    pInput->pColumnDataAgg[paramIndex] = da;
  }

  ASSERT(!IS_VAR_DATA_TYPE(type));

  if (type == TSDB_DATA_TYPE_BIGINT) {
    int64_t v = pFuncParam->param.i;
    *da = (SColumnDataAgg){.numOfNull = 0, .min = v, .max = v, .sum = v * numOfRows};
  } else if (type == TSDB_DATA_TYPE_DOUBLE) {
    double v = pFuncParam->param.d;
    double sum = v * numOfRows;
    *da = (SColumnDataAgg){.numOfNull = 0};

    // min/max/sum are not double fields: copy the double bit patterns in with memcpy. Storing through a
    // `double*` cast, as before, violates the strict-aliasing rule.
    memcpy(&da->min, &v, sizeof(v));
    memcpy(&da->max, &v, sizeof(v));
    memcpy(&da->sum, &sum, sizeof(sum));
  } else if (type == TSDB_DATA_TYPE_BOOL) {  // todo validate this data type
    // Every row holds the same value v (0 or 1), so min == max == v and sum == v * numOfRows. Previously min was
    // forced to 0, and sum was written through a bool*, which truncated it to a single 0/1 byte.
    int64_t v = (pFuncParam->param.i != 0) ? 1 : 0;
    *da = (SColumnDataAgg){.numOfNull = 0, .min = v, .max = v, .sum = v * numOfRows};
  } else if (type == TSDB_DATA_TYPE_TIMESTAMP) {
    // do nothing
  } else {
    ASSERT(0);
  }

  return TSDB_CODE_SUCCESS;
}
781

782
/*
 * Point the function input of pCtx at the block-level SMA (pre-aggregate) data of pBlock.
 *
 * Column parameters reuse the block's per-slot SMA records. Constant parameters get a synthesized SMA record.
 * colDataAggIsSet ends up true only when every parameter has usable SMA data. It is false when the block has no
 * SMA, when a column's SMA record is missing, or when the constant SMA record could not be created.
 */
void setBlockSMAInfo(SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, SSDataBlock* pBlock) {
  int32_t numOfRows = pBlock->info.rows;

  SInputColumnInfoData* pInput = &pCtx->input;
  pInput->numOfRows = numOfRows;
  pInput->totalRows = numOfRows;

  if (pBlock->pBlockAgg == NULL) {
    pInput->colDataAggIsSet = false;
    return;
  }

  pInput->colDataAggIsSet = true;

  for (int32_t j = 0; j < pExprInfo->base.numOfParams; ++j) {
    SFunctParam* pFuncParam = &pExprInfo->base.pParam[j];

    if (pFuncParam->type == FUNC_PARAM_TYPE_COLUMN) {
      int32_t slotId = pFuncParam->pCol->slotId;
      pInput->pColumnDataAgg[j] = pBlock->pBlockAgg[slotId];
      if (pInput->pColumnDataAgg[j] == NULL) {
        pInput->colDataAggIsSet = false;
      }

      // Here we set the column info data since the data type for each column data is required, but
      // the data in the corresponding SColumnInfoData will not be used.
      pInput->pData[j] = taosArrayGet(pBlock->pDataBlock, slotId);
    } else if (pFuncParam->type == FUNC_PARAM_TYPE_VALUE) {
      int32_t code = doCreateConstantValColumnAggInfo(pInput, pFuncParam, pFuncParam->param.nType, j, numOfRows);
      if (code != TSDB_CODE_SUCCESS) {
        // the SMA record of this parameter may be missing (NULL), so the SMA path must not be taken
        pInput->colDataAggIsSet = false;
      }
    }
  }
}

L
Liu Jicong 已提交
814
/*
 * Report whether the task should be aborted because its results were not retrieved in time.
 *
 * The abort itself is currently disabled. For an owned task that has run longer than 10x the maximum idle
 * duration, only a sanity assertion on the start time is made. The function always returns false.
 */
bool isTaskKilled(SExecTaskInfo* pTaskInfo) {
  if (pTaskInfo->owner == 0) {
    return false;
  }

  if ((taosGetTimestampSec() - pTaskInfo->cost.start / 1000) > 10 * getMaximumIdleDurationSec()) {
    assert(pTaskInfo->cost.start != 0);
    // the "retrieve not arrived" abort is intentionally not performed yet
  }

  return false;
}

L
Liu Jicong 已提交
829
/* Flag the task as cancelled by storing the cancellation code in the task's result code. */
void setTaskKilled(SExecTaskInfo* pTaskInfo) {
  pTaskInfo->code = TSDB_CODE_TSC_QUERY_CANCELLED;
}
830 831

/////////////////////////////////////////////////////////////////////////////////////////////
832
/*
 * Return the interval-aligned time window that contains key.
 *
 * skey is key truncated to the interval boundary. ekey is one tick before the next boundary. If that end
 * overflows (wraps below skey), it is clamped to INT64_MAX. In that case the query range cannot span more than
 * one interval, so no further adjustment is needed.
 */
STimeWindow getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int64_t key) {
  int64_t start = taosTimeTruncate(key, pInterval, precision);
  int64_t end = taosTimeAdd(start, pInterval->interval, pInterval->intervalUnit, precision) - 1;

  STimeWindow aligned = {.skey = start, .ekey = (end < start) ? INT64_MAX : end};
  return aligned;
}

848
// Disabled: overlapWithTimeWindow() reported whether a data block spans more than one time window of the query.
// NOTE(review): if re-enabled as-is, the window `w` stays all-zero because both getAlignQueryTimeWindow() and
// getNextTimeWindow() calls are commented out. The `while (1)` loops would then never advance `w` and could spin
// forever. sk/ek are computed but only used by the commented-out calls.
#if 0
H
Haojun Liao 已提交
849
static bool overlapWithTimeWindow(STaskAttr* pQueryAttr, SDataBlockInfo* pBlockInfo) {
850 851
  STimeWindow w = {0};

dengyihao's avatar
dengyihao 已提交
852 853
  TSKEY sk = TMIN(pQueryAttr->window.skey, pQueryAttr->window.ekey);
  TSKEY ek = TMAX(pQueryAttr->window.skey, pQueryAttr->window.ekey);
854

855
  if (true) {
L
Liu Jicong 已提交
856
    //    getAlignQueryTimeWindow(pQueryAttr, pBlockInfo->window.skey, sk, ek, &w);
857 858 859 860 861 862
    assert(w.ekey >= pBlockInfo->window.skey);

    if (w.ekey < pBlockInfo->window.ekey) {
      return true;
    }

L
Liu Jicong 已提交
863 864
    while (1) {
      //      getNextTimeWindow(pQueryAttr, &w);
865 866 867 868 869 870 871 872 873 874
      if (w.skey > pBlockInfo->window.ekey) {
        break;
      }

      assert(w.ekey > pBlockInfo->window.ekey);
      if (w.skey <= pBlockInfo->window.ekey && w.skey > pBlockInfo->window.skey) {
        return true;
      }
    }
  } else {
L
Liu Jicong 已提交
875
    //    getAlignQueryTimeWindow(pQueryAttr, pBlockInfo->window.ekey, sk, ek, &w);
876 877 878 879 880 881
    assert(w.skey <= pBlockInfo->window.ekey);

    if (w.skey > pBlockInfo->window.skey) {
      return true;
    }

L
Liu Jicong 已提交
882 883
    while (1) {
      //      getNextTimeWindow(pQueryAttr, &w);
884 885 886 887 888 889 890 891 892 893 894 895 896
      if (w.ekey < pBlockInfo->window.skey) {
        break;
      }

      assert(w.skey < pBlockInfo->window.skey);
      if (w.ekey < pBlockInfo->window.ekey && w.ekey >= pBlockInfo->window.skey) {
        return true;
      }
    }
  }

  return false;
}
897
#endif
898

L
Liu Jicong 已提交
899 900
/*
 * Decide whether the current data block has to be loaded.
 *
 * The former block-pruning logic lived in an `#if 0` region and was never compiled. It covered time-window
 * overlap checks, SMA loading and top/bottom pruning, and it referenced identifiers not declared in this
 * function (pQueryAttr, pRuntimeEnv, ascQuery, groupId). It has been removed; see version history if it is ever
 * revived.
 *
 * Current behavior: nothing is loaded. The block's data and SMA pointers are reset and BLK_DATA_NOT_LOAD is
 * reported.
 *
 * @param pTaskInfo       owning task (currently unused)
 * @param pTableScanInfo  table scan state (currently unused)
 * @param pBlock          block whose pDataBlock / pBlockAgg pointers are reset to NULL
 * @param status          [out] always set to BLK_DATA_NOT_LOAD
 * @return TSDB_CODE_SUCCESS
 */
int32_t loadDataBlockOnDemand(SExecTaskInfo* pTaskInfo, STableScanInfo* pTableScanInfo, SSDataBlock* pBlock,
                              uint32_t* status) {
  (void)pTaskInfo;
  (void)pTableScanInfo;

  *status = BLK_DATA_NOT_LOAD;

  // NOTE(review): the previous pointers are dropped without being freed; presumably the caller owns them — confirm.
  pBlock->pDataBlock = NULL;
  pBlock->pBlockAgg = NULL;

  return TSDB_CODE_SUCCESS;
}

L
Liu Jicong 已提交
1042
/* Placeholder for resetting per-table query state before a reverse scan; it performs no work at present. */
static void updateTableQueryInfoForReverseScan(STableQueryInfo* pTableQueryInfo) {
  (void)pTableQueryInfo;
}

L
Liu Jicong 已提交
1048
/*
 * Update the task status bits.
 *
 * TASK_NOT_COMPLETED replaces the whole status. Any other status is OR-ed into the existing bits, after first
 * clearing TASK_NOT_COMPLETED, because that state cannot coexist with any other one.
 */
void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status) {
  if (status != TASK_NOT_COMPLETED) {
    CLEAR_QUERY_STATUS(pTaskInfo, TASK_NOT_COMPLETED);
    pTaskInfo->status |= status;
    return;
  }

  pTaskInfo->status = status;
}

1058
/*
 * Bind every function context to its result entry inside pResult and initialize entries that are not yet
 * initialized.
 *
 * Entries that are both completed and initialized, and window pseudo-column functions, are left alone. Contexts
 * with functionId == -1 are only marked initialized. As soon as an already-initialized (but not completed) entry
 * is met, the remaining contexts are only bound to their entries and no further initialization is attempted.
 */
void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowEntryInfoOffset) {
  bool stopInit = false;

  for (int32_t k = 0; k < numOfOutput; ++k) {
    SqlFunctionCtx* pFnCtx = &pCtx[k];
    pFnCtx->resultInfo = getResultEntryInfo(pResult, k, rowEntryInfoOffset);
    if (stopInit) {
      continue;
    }

    struct SResultRowEntryInfo* pEntry = pFnCtx->resultInfo;
    bool alreadyDone = isRowEntryCompleted(pEntry) && isRowEntryInitialized(pEntry);
    if (alreadyDone || fmIsWindowPseudoColumnFunc(pFnCtx->functionId)) {
      continue;
    }

    if (pEntry->initialized) {
      stopInit = true;
    } else if (pFnCtx->functionId == -1) {
      pEntry->initialized = true;
    } else {
      pFnCtx->fpSet.init(pFnCtx, pEntry);
    }
  }
}

1087 1088
// Forward declaration: compacts pBlock to the rows selected by a filter result. It is defined later in this file
// and called by doFilter() below.
static void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const SColumnInfoData* p, bool keep,
                                                int32_t status);
1089

H
Haojun Liao 已提交
1090
/*
 * Filter pBlock in place, keeping only the rows that satisfy pFilterNode.
 *
 * @param pFilterNode   filter condition; NULL (or an empty block) leaves the block untouched
 * @param pBlock        block to filter; qualified rows are compacted to the front
 * @param pColMatchInfo optional. When given, the block's time window is refreshed from the first entry whose colId
 *                      is the primary-key timestamp and whose target column is of TIMESTAMP type.
 * @param pFilterInfo   pre-built filter; when NULL a temporary one is built from pFilterNode and freed here
 *
 * If the filter cannot be built or bound to the block's columns, an error is logged and the block is left
 * unfiltered.
 */
void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock, const SArray* pColMatchInfo, SFilterInfo* pFilterInfo) {
  if (pFilterNode == NULL || pBlock->info.rows == 0) {
    return;
  }

  SFilterInfo* filter = pFilterInfo;
  bool         needFree = false;
  int32_t      code = TSDB_CODE_SUCCESS;

  // todo move to the initialization function
  if (filter == NULL) {
    needFree = true;
    code = filterInitFromNode((SNode*)pFilterNode, &filter, 0);
    if (code != TSDB_CODE_SUCCESS) {
      qError("failed to init filter from node, code:%s", tstrerror(code));
      if (filter != NULL) {
        filterFreeInfo(filter);
      }
      return;
    }
  }

  SFilterColumnParam param1 = {.numOfCols = taosArrayGetSize(pBlock->pDataBlock), .pDataBlock = pBlock->pDataBlock};
  code = filterSetDataFromSlotId(filter, &param1);
  if (code != TSDB_CODE_SUCCESS) {
    qError("failed to bind filter to data block columns, code:%s", tstrerror(code));
    if (needFree) {
      filterFreeInfo(filter);
    }
    return;
  }

  SColumnInfoData* p = NULL;
  int32_t          status = 0;

  // todo the keep seems never to be True??
  bool keep = filterExecute(filter, pBlock, &p, NULL, param1.numOfCols, &status);

  if (needFree) {
    filterFreeInfo(filter);
  }

  extractQualifiedTupleByFilterResult(pBlock, p, keep, status);

  if (pColMatchInfo != NULL) {
    size_t numOfMatch = taosArrayGetSize(pColMatchInfo);
    for (size_t i = 0; i < numOfMatch; ++i) {
      SColMatchInfo* pInfo = taosArrayGet(pColMatchInfo, i);
      if (pInfo->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
        SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, pInfo->targetSlotId);
        if (pColData->info.type == TSDB_DATA_TYPE_TIMESTAMP) {
          blockDataUpdateTsWindow(pBlock, pInfo->targetSlotId);
          break;
        }
      }
    }
  }

  colDataDestroy(p);
  taosMemoryFree(p);
}

1140
/*
 * Compact pBlock so that it only contains the rows selected by a filter result.
 *
 * @param pBlock  block to compact in place
 * @param p       per-row indicator column (non-zero int8 = keep); only read for partially-qualified results
 * @param keep    when true the block is left untouched
 * @param status  FILTER_RESULT_ALL_QUALIFIED (no change), FILTER_RESULT_NONE_QUALIFIED (block emptied) or a
 *                partial result (rows copied back from a snapshot of the block)
 */
void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const SColumnInfoData* p, bool keep, int32_t status) {
  if (keep || status == FILTER_RESULT_ALL_QUALIFIED) {
    // here nothing needs to be done
    return;
  }

  if (status == FILTER_RESULT_NONE_QUALIFIED) {
    pBlock->info.rows = 0;
    return;
  }

  int32_t       totalRows = pBlock->info.rows;
  const int8_t* pIndicator = (const int8_t*)p->pData;

  // The number of surviving rows is the same for every column, so count it once. This also sets the row count
  // correctly when every column is skipped below.
  int32_t numOfQualified = 0;
  for (int32_t j = 0; j < totalRows; ++j) {
    if (pIndicator[j] != 0) {
      numOfQualified += 1;
    }
  }

  // snapshot of the unfiltered data; qualified rows are copied back into pBlock column by column
  SSDataBlock* px = createOneDataBlock(pBlock, true);

  size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
  for (size_t i = 0; i < numOfCols; ++i) {
    SColumnInfoData* pSrc = taosArrayGet(px->pDataBlock, i);
    SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, i);
    // it is a reserved column for scalar function, and no data in this column yet.
    if (pDst->pData == NULL || pSrc->pData == NULL) {
      continue;
    }

    colInfoDataCleanup(pDst, totalRows);

    int32_t numOfRows = 0;
    for (int32_t j = 0; j < totalRows; ++j) {
      if (pIndicator[j] == 0) {
        continue;
      }

      if (colDataIsNull_s(pSrc, j)) {
        colDataAppendNULL(pDst, numOfRows);
      } else {
        colDataAppend(pDst, numOfRows, colDataGetData(pSrc, j), false);
      }
      numOfRows += 1;
    }

    ASSERT(numOfRows == numOfQualified);
  }

  pBlock->info.rows = numOfQualified;
  blockDataDestroy(px);  // fix memory leak
}

1191
/*
 * Make the aggregate operator's function contexts point at the result row of groupId, creating that row (and its
 * result buffer page) on first use.
 *
 * For a simple group-by query without interval, all the tables belong to one group result.
 * If the result buffer page cannot be allocated, the error is logged and the task long-jumps with the error code.
 * Silently returning would leave the contexts bound to the previous group's row.
 */
void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId) {
  SExecTaskInfo*    pTaskInfo = pOperator->pTaskInfo;
  SAggOperatorInfo* pAggInfo = pOperator->info;

  SResultRowInfo* pResultRowInfo = &pAggInfo->binfo.resultRowInfo;
  SqlFunctionCtx* pCtx = pOperator->exprSupp.pCtx;
  int32_t*        rowEntryInfoOffset = pOperator->exprSupp.rowEntryInfoOffset;

  SResultRow* pResultRow = doSetResultOutBufByKey(pAggInfo->aggSup.pResultBuf, pResultRowInfo, (char*)&groupId,
                                                  sizeof(groupId), true, groupId, pTaskInfo, false, &pAggInfo->aggSup);
  assert(pResultRow != NULL);

  /*
   * not assign result buffer yet, add new result buffer
   * all group belong to one result set, and each group result has different group id so set the id to be one
   */
  if (pResultRow->pageId == -1) {
    int32_t ret =
        addNewWindowResultBuf(pResultRow, pAggInfo->aggSup.pResultBuf, groupId, pAggInfo->binfo.pRes->info.rowSize);
    if (ret != TSDB_CODE_SUCCESS) {
      qError("%s failed to allocate result buffer for group %" PRIu64 ", code %s", GET_TASKID(pTaskInfo), groupId,
             tstrerror(ret));
      T_LONG_JMP(pTaskInfo->env, ret);
    }
  }

  setResultRowInitCtx(pResultRow, pCtx, numOfOutput, rowEntryInfoOffset);
}

1219 1220 1221
/*
 * Switch the aggregate operator's output to groupId unless that group is already the active one.
 * UINT64_MAX in pAggInfo->groupId means "no active group yet".
 */
static void setExecutionContext(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId) {
  SAggOperatorInfo* pAggInfo = pOperator->info;

  bool isActiveGroup = (pAggInfo->groupId != UINT64_MAX) && (pAggInfo->groupId == groupId);
  if (!isActiveGroup) {
    doSetTableGroupOutputBuf(pOperator, numOfOutput, groupId);
    pAggInfo->groupId = groupId;  // remember which group the contexts now point at
  }
}

dengyihao's avatar
dengyihao 已提交
1231 1232
/*
 * Raise pRow->numOfRows to the largest numOfRes among its initialized result entries (the value is never lowered).
 *
 * If the row still has 0 rows afterwards (e.g. max() over all-null input), one output row is forced. The exception
 * is when an initialized entry belongs to a function flagged by fmIsNotNullOutputFunc (e.g. first/last); then no
 * row is produced.
 */
static void doUpdateNumOfRows(SqlFunctionCtx* pCtx, SResultRow* pRow, int32_t numOfExprs,
                              const int32_t* rowCellOffset) {
  bool hasNotNullOutputFunc = false;

  for (int32_t k = 0; k < numOfExprs; ++k) {
    struct SResultRowEntryInfo* pEntry = getResultEntryInfo(pRow, k, rowCellOffset);
    if (isRowEntryInitialized(pEntry)) {
      if (pEntry->numOfRes > pRow->numOfRows) {
        pRow->numOfRows = pEntry->numOfRes;
      }

      if (fmIsNotNullOutputFunc(pCtx[k].functionId)) {
        hasNotNullOutputFunc = true;
      }
    }
  }

  if (!hasNotNullOutputFunc && pRow->numOfRows == 0) {
    pRow->numOfRows = 1;
  }
}

1255 1256
/*
 * Write the results of one result row into pBlock, starting at row pBlock->info.rows.
 *
 * Functions with a finalizer write their own output. Each "_group_key" result that has output is first stretched
 * to pRow->numOfRows, so it matches functions producing several lines (e.g. histogram). A finalizer failure is
 * logged and long-jumps with its code.
 * "_select_value" without a finalizer is skipped. Any other function without a finalizer has its intermediate
 * value copied into all pRow->numOfRows output rows (e.g. _wstart next to top(k, 20)).
 * pBlock->info.rows itself is not advanced here.
 */
static void doCopyResultToDataBlock(SExprInfo* pExprInfo, int32_t numOfExprs, SResultRow* pRow, SqlFunctionCtx* pCtx,
                                    SSDataBlock* pBlock, const int32_t* rowEntryOffset, SExecTaskInfo* pTaskInfo) {
  for (int32_t idx = 0; idx < numOfExprs; ++idx) {
    SqlFunctionCtx* pFnCtx = &pCtx[idx];
    pFnCtx->resultInfo = getResultEntryInfo(pRow, idx, rowEntryOffset);

    const char* funcName = pFnCtx->pExpr->pExpr->_function.functionName;

    if (pFnCtx->fpSet.finalize == NULL) {
      if (strcmp(funcName, "_select_value") == 0) {
        continue;
      }

      SColumnInfoData* pDest = taosArrayGet(pBlock->pDataBlock, pExprInfo[idx].base.resSchema.slotId);
      char*            pValue = GET_ROWCELL_INTERBUF(pFnCtx->resultInfo);
      for (int32_t r = 0; r < pRow->numOfRows; ++r) {
        colDataAppend(pDest, pBlock->info.rows + r, pValue, pFnCtx->resultInfo->isNullRes);
      }
      continue;
    }

    if (strcmp(funcName, "_group_key") == 0 && pFnCtx->resultInfo->numOfRes != 0) {
      pFnCtx->resultInfo->numOfRes = pRow->numOfRows;
    }

    int32_t code = pFnCtx->fpSet.finalize(pFnCtx, pBlock);
    if (TAOS_FAILED(code)) {
      qError("%s build result data block error, code %s", GET_TASKID(pTaskInfo), tstrerror(code));
      T_LONG_JMP(pTaskInfo->env, code);
    }
  }
}

1289 1290 1291
// todo refactor. SResultRow has direct pointer in miainfo
/*
 * Finalize the result row stored at resultRowPosition in pBuf and append its output rows to pBlock.
 *
 * The block capacity is grown geometrically (x1.25, and by at least one row per step) until the new rows fit.
 * A capacity failure releases the page, logs, and long-jumps with the error code.
 *
 * @return 0
 */
int32_t finalizeResultRows(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPosition, SExprSupp* pSup,
                           SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo) {
  SFilePage*  page = getBufPage(pBuf, resultRowPosition->pageId);
  SResultRow* pRow = (SResultRow*)((char*)page + resultRowPosition->offset);

  SqlFunctionCtx* pCtx = pSup->pCtx;
  SExprInfo*      pExprInfo = pSup->pExprInfo;
  const int32_t*  rowEntryOffset = pSup->rowEntryInfoOffset;

  doUpdateNumOfRows(pCtx, pRow, pSup->numOfExprs, rowEntryOffset);
  if (pRow->numOfRows == 0) {
    releaseBufPage(pBuf, page);
    return 0;
  }

  int32_t size = pBlock->info.capacity;
  while (pBlock->info.rows + pRow->numOfRows > size) {
    // size * 1.25 truncates back to size when size < 4 (and stays 0 for 0), which would never terminate
    int32_t grown = (int32_t)(size * 1.25);
    size = (grown > size) ? grown : size + 1;
  }

  int32_t code = blockDataEnsureCapacity(pBlock, size);
  if (TAOS_FAILED(code)) {
    releaseBufPage(pBuf, page);
    qError("%s ensure result data capacity failed, code %s", GET_TASKID(pTaskInfo), tstrerror(code));
    T_LONG_JMP(pTaskInfo->env, code);
  }

  doCopyResultToDataBlock(pExprInfo, pSup->numOfExprs, pRow, pCtx, pBlock, rowEntryOffset, pTaskInfo);

  // pRow lives inside the page: consume it before the page is handed back to the buffer
  pBlock->info.rows += pRow->numOfRows;
  releaseBufPage(pBuf, page);
  return 0;
}

1324 1325 1326 1327 1328 1329 1330
/*
 * Copy finalized result rows of pGroupResInfo (starting at its current index) into pBlock.
 *
 * Copying stops at the first row of a different group than the one already in the block, or when the next row
 * would exceed the block capacity. Rows without output are skipped. pGroupResInfo->index is advanced past every
 * consumed row, and the block's timestamp window is refreshed from column 0 at the end.
 *
 * @return 0
 */
int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprSupp* pSup, SDiskbasedBuf* pBuf,
                           SGroupResInfo* pGroupResInfo) {
  SExprInfo*      pExprInfo = pSup->pExprInfo;
  int32_t         numOfExprs = pSup->numOfExprs;
  int32_t*        rowEntryOffset = pSup->rowEntryInfoOffset;
  SqlFunctionCtx* pCtx = pSup->pCtx;

  int32_t numOfRows = getNumOfTotalRes(pGroupResInfo);

  for (int32_t i = pGroupResInfo->index; i < numOfRows; i += 1) {
    SResKeyPos* pPos = taosArrayGetP(pGroupResInfo->pRows, i);
    SFilePage*  page = getBufPage(pBuf, pPos->pos.pageId);

    SResultRow* pRow = (SResultRow*)((char*)page + pPos->pos.offset);

    doUpdateNumOfRows(pCtx, pRow, numOfExprs, rowEntryOffset);

    // no results, continue to check the next one
    if (pRow->numOfRows == 0) {
      pGroupResInfo->index += 1;
      releaseBufPage(pBuf, page);
      continue;
    }

    if (pBlock->info.groupId == 0) {
      pBlock->info.groupId = pPos->groupId;
    } else if (pBlock->info.groupId != pPos->groupId) {
      // current value belongs to different group, it can't be packed into one datablock
      releaseBufPage(pBuf, page);
      break;
    }

    if (pBlock->info.rows + pRow->numOfRows > pBlock->info.capacity) {
      ASSERT(pBlock->info.rows > 0);
      releaseBufPage(pBuf, page);
      break;
    }

    pGroupResInfo->index += 1;
    doCopyResultToDataBlock(pExprInfo, numOfExprs, pRow, pCtx, pBlock, rowEntryOffset, pTaskInfo);

    // pRow lives inside the page: read it before the page is released
    pBlock->info.rows += pRow->numOfRows;
    releaseBufPage(pBuf, page);
  }

  qDebug("%s result generated, rows:%d, groupId:%" PRIu64, GET_TASKID(pTaskInfo), pBlock->info.rows,
         pBlock->info.groupId);

  blockDataUpdateTsWindow(pBlock, 0);
  return 0;
}

1378 1379 1380 1381 1382 1383 1384 1385 1386 1387 1388 1389 1390 1391 1392 1393 1394 1395 1396 1397 1398 1399 1400 1401 1402 1403 1404 1405 1406 1407 1408 1409 1410 1411 1412 1413 1414 1415 1416 1417
/*
 * Fill pbInfo->pRes with the next batch of stream-aggregation results from pGroupResInfo.
 *
 * The block is stamped with the task version and cleared first. For stream state and session (plain/semi/final)
 * operators, parTbName is then set from the group-id -> table-name map, or emptied when the group has no entry.
 */
void doBuildStreamResBlock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo,
                           SDiskbasedBuf* pBuf) {
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
  SSDataBlock*   pBlock = pbInfo->pRes;

  pBlock->info.version = pTaskInfo->version;

  blockDataCleanup(pBlock);
  if (!hasRemainResults(pGroupResInfo)) {
    return;
  }

  // start with no group; doCopyToSDataBlock adopts the group of the first row it copies
  pBlock->info.groupId = 0;
  ASSERT(!pbInfo->mergeResultBlock);
  doCopyToSDataBlock(pTaskInfo, pBlock, &pOperator->exprSupp, pBuf, pGroupResInfo);

  char* tbname = NULL;
  switch (pOperator->operatorType) {
    case QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE: {
      SStreamStateAggOperatorInfo* pStateInfo = pOperator->info;
      tbname = taosHashGet(pStateInfo->pGroupIdTbNameMap, &pBlock->info.groupId, sizeof(int64_t));
      break;
    }
    case QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION:
    case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION:
    case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION: {
      SStreamSessionAggOperatorInfo* pSessionInfo = pOperator->info;
      tbname = taosHashGet(pSessionInfo->pGroupIdTbNameMap, &pBlock->info.groupId, sizeof(int64_t));
      break;
    }
    default:
      // other operators leave parTbName untouched
      return;
  }

  if (tbname == NULL) {
    pBlock->info.parTbName[0] = 0;
  } else {
    memcpy(pBlock->info.parTbName, tbname, TSDB_TABLE_NAME_LEN);
  }
}

X
Xiaoyu Wang 已提交
1418 1419
/*
 * Fill pbInfo->pRes with the next batch of results from pGroupResInfo.
 *
 * Without mergeResultBlock, one call copies rows of a single group. With mergeResultBlock, rows of consecutive
 * groups are packed together until the block holds at least resultInfo.threshold rows or the results run out.
 * The block's groupId is then cleared, since the client does not need it.
 */
void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo,
                            SDiskbasedBuf* pBuf) {
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
  SSDataBlock*   pBlock = pbInfo->pRes;

  pBlock->info.version = pTaskInfo->version;

  blockDataCleanup(pBlock);
  if (!hasRemainResults(pGroupResInfo)) {
    return;
  }

  pBlock->info.groupId = 0;

  if (!pbInfo->mergeResultBlock) {
    doCopyToSDataBlock(pTaskInfo, pBlock, &pOperator->exprSupp, pBuf, pGroupResInfo);
    return;
  }

  while (hasRemainResults(pGroupResInfo)) {
    doCopyToSDataBlock(pTaskInfo, pBlock, &pOperator->exprSupp, pBuf, pGroupResInfo);
    if (pBlock->info.rows >= pOperator->resultInfo.threshold) {
      break;
    }
    // forget the group so the next call may append rows of another group
    pBlock->info.groupId = 0;
  }

  pBlock->info.groupId = 0;
}

L
Liu Jicong 已提交
1451
/*
 * Compress numOfRows fixed-size values of pColRes into data, using the compression routine registered for the
 * column's type. The output buffer is assumed to hold the raw size plus COMP_OVERFLOW_BYTES.
 * Returns whatever the type's compression function returns.
 */
static int32_t compressQueryColData(SColumnInfoData* pColRes, int32_t numOfRows, char* data, int8_t compressed) {
  int32_t rawSize = pColRes->info.bytes * numOfRows;
  return tDataTypes[pColRes->info.type].compFunc(pColRes->pData, rawSize, numOfRows, data,
                                                 rawSize + COMP_OVERFLOW_BYTES, compressed, NULL, 0);
}

L
Liu Jicong 已提交
1457 1458
/* Log the task's block-loading cost summary at debug level, if a load recorder is attached. */
void queryCostStatis(SExecTaskInfo* pTaskInfo) {
  STaskCostInfo*          pSummary = &pTaskInfo->cost;
  SFileBlockLoadRecorder* pRecorder = pSummary->pRecoder;

  if (pRecorder == NULL) {
    return;
  }

  qDebug(
      "%s :cost summary: elapsed time:%.2f ms, total blocks:%d, load block SMA:%d, load data block:%d, total "
      "rows:%" PRId64 ", check rows:%" PRId64,
      GET_TASKID(pTaskInfo), pSummary->elapsedTime / 1000.0, pRecorder->totalBlocks, pRecorder->loadBlockStatis,
      pRecorder->loadBlocks, pRecorder->totalRows, pRecorder->totalCheckedRows);
}

L
Liu Jicong 已提交
1487 1488 1489
// static void updateOffsetVal(STaskRuntimeEnv *pRuntimeEnv, SDataBlockInfo *pBlockInfo) {
//   STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
//   STableQueryInfo* pTableQueryInfo = pRuntimeEnv->current;
1490
//
L
Liu Jicong 已提交
1491
//   int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQueryAttr->order.order);
1492
//
L
Liu Jicong 已提交
1493 1494 1495 1496
//   if (pQueryAttr->limit.offset == pBlockInfo->rows) {  // current block will ignore completed
//     pTableQueryInfo->lastKey = QUERY_IS_ASC_QUERY(pQueryAttr) ? pBlockInfo->window.ekey + step :
//     pBlockInfo->window.skey + step; pQueryAttr->limit.offset = 0; return;
//   }
1497
//
L
Liu Jicong 已提交
1498 1499 1500 1501 1502
//   if (QUERY_IS_ASC_QUERY(pQueryAttr)) {
//     pQueryAttr->pos = (int32_t)pQueryAttr->limit.offset;
//   } else {
//     pQueryAttr->pos = pBlockInfo->rows - (int32_t)pQueryAttr->limit.offset - 1;
//   }
1503
//
L
Liu Jicong 已提交
1504
//   assert(pQueryAttr->pos >= 0 && pQueryAttr->pos <= pBlockInfo->rows - 1);
1505
//
L
Liu Jicong 已提交
1506 1507
//   SArray *         pDataBlock = tsdbRetrieveDataBlock(pRuntimeEnv->pTsdbReadHandle, NULL);
//   SColumnInfoData *pColInfoData = taosArrayGet(pDataBlock, 0);
1508
//
L
Liu Jicong 已提交
1509 1510
//   // update the pQueryAttr->limit.offset value, and pQueryAttr->pos value
//   TSKEY *keys = (TSKEY *) pColInfoData->pData;
1511
//
L
Liu Jicong 已提交
1512 1513 1514
//   // update the offset value
//   pTableQueryInfo->lastKey = keys[pQueryAttr->pos];
//   pQueryAttr->limit.offset = 0;
1515
//
L
Liu Jicong 已提交
1516
//   int32_t numOfRes = tableApplyFunctionsOnBlock(pRuntimeEnv, pBlockInfo, NULL, binarySearchForKey, pDataBlock);
1517
//
L
Liu Jicong 已提交
1518 1519 1520 1521
//   //qDebug("QInfo:0x%"PRIx64" check data block, brange:%" PRId64 "-%" PRId64 ", numBlocksOfStep:%d, numOfRes:%d,
//   lastKey:%"PRId64, GET_TASKID(pRuntimeEnv),
//          pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows, numOfRes, pQuery->current->lastKey);
// }
1522

L
Liu Jicong 已提交
1523 1524
// void skipBlocks(STaskRuntimeEnv *pRuntimeEnv) {
//   STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
1525
//
L
Liu Jicong 已提交
1526 1527 1528
//   if (pQueryAttr->limit.offset <= 0 || pQueryAttr->numOfFilterCols > 0) {
//     return;
//   }
1529
//
L
Liu Jicong 已提交
1530 1531
//   pQueryAttr->pos = 0;
//   int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQueryAttr->order.order);
1532
//
L
Liu Jicong 已提交
1533 1534
//   STableQueryInfo* pTableQueryInfo = pRuntimeEnv->current;
//   TsdbQueryHandleT pTsdbReadHandle = pRuntimeEnv->pTsdbReadHandle;
1535
//
L
Liu Jicong 已提交
1536 1537 1538
//   SDataBlockInfo blockInfo = SDATA_BLOCK_INITIALIZER;
//   while (tsdbNextDataBlock(pTsdbReadHandle)) {
//     if (isTaskKilled(pRuntimeEnv->qinfo)) {
1539
//       T_LONG_JMP(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED);
L
Liu Jicong 已提交
1540
//     }
1541
//
L
Liu Jicong 已提交
1542
//     tsdbRetrieveDataBlockInfo(pTsdbReadHandle, &blockInfo);
1543
//
L
Liu Jicong 已提交
1544 1545 1546 1547
//     if (pQueryAttr->limit.offset > blockInfo.rows) {
//       pQueryAttr->limit.offset -= blockInfo.rows;
//       pTableQueryInfo->lastKey = (QUERY_IS_ASC_QUERY(pQueryAttr)) ? blockInfo.window.ekey : blockInfo.window.skey;
//       pTableQueryInfo->lastKey += step;
1548
//
L
Liu Jicong 已提交
1549 1550 1551 1552 1553 1554 1555
//       //qDebug("QInfo:0x%"PRIx64" skip rows:%d, offset:%" PRId64, GET_TASKID(pRuntimeEnv), blockInfo.rows,
//              pQuery->limit.offset);
//     } else {  // find the appropriated start position in current block
//       updateOffsetVal(pRuntimeEnv, &blockInfo);
//       break;
//     }
//   }
1556
//
L
Liu Jicong 已提交
1557
//   if (terrno != TSDB_CODE_SUCCESS) {
1558
//     T_LONG_JMP(pRuntimeEnv->env, terrno);
L
Liu Jicong 已提交
1559 1560 1561 1562 1563 1564 1565
//   }
// }

// static TSKEY doSkipIntervalProcess(STaskRuntimeEnv* pRuntimeEnv, STimeWindow* win, SDataBlockInfo* pBlockInfo,
// STableQueryInfo* pTableQueryInfo) {
//   STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
//   SResultRowInfo *pWindowResInfo = &pRuntimeEnv->resultRowInfo;
1566
//
L
Liu Jicong 已提交
1567 1568 1569
//   assert(pQueryAttr->limit.offset == 0);
//   STimeWindow tw = *win;
//   getNextTimeWindow(pQueryAttr, &tw);
1570
//
L
Liu Jicong 已提交
1571 1572
//   if ((tw.skey <= pBlockInfo->window.ekey && QUERY_IS_ASC_QUERY(pQueryAttr)) ||
//       (tw.ekey >= pBlockInfo->window.skey && !QUERY_IS_ASC_QUERY(pQueryAttr))) {
1573
//
L
Liu Jicong 已提交
1574 1575 1576 1577
//     // load the data block and check data remaining in current data block
//     // TODO optimize performance
//     SArray *         pDataBlock = tsdbRetrieveDataBlock(pRuntimeEnv->pTsdbReadHandle, NULL);
//     SColumnInfoData *pColInfoData = taosArrayGet(pDataBlock, 0);
1578
//
L
Liu Jicong 已提交
1579 1580 1581 1582
//     tw = *win;
//     int32_t startPos =
//         getNextQualifiedWindow(pQueryAttr, &tw, pBlockInfo, pColInfoData->pData, binarySearchForKey, -1);
//     assert(startPos >= 0);
1583
//
L
Liu Jicong 已提交
1584 1585
//     // set the abort info
//     pQueryAttr->pos = startPos;
1586
//
L
Liu Jicong 已提交
1587 1588 1589 1590
//     // reset the query start timestamp
//     pTableQueryInfo->win.skey = ((TSKEY *)pColInfoData->pData)[startPos];
//     pQueryAttr->window.skey = pTableQueryInfo->win.skey;
//     TSKEY key = pTableQueryInfo->win.skey;
1591
//
L
Liu Jicong 已提交
1592 1593
//     pWindowResInfo->prevSKey = tw.skey;
//     int32_t index = pRuntimeEnv->resultRowInfo.curIndex;
1594
//
L
Liu Jicong 已提交
1595 1596
//     int32_t numOfRes = tableApplyFunctionsOnBlock(pRuntimeEnv, pBlockInfo, NULL, binarySearchForKey, pDataBlock);
//     pRuntimeEnv->resultRowInfo.curIndex = index;  // restore the window index
1597
//
L
Liu Jicong 已提交
1598 1599 1600 1601
//     //qDebug("QInfo:0x%"PRIx64" check data block, brange:%" PRId64 "-%" PRId64 ", numOfRows:%d, numOfRes:%d,
//     lastKey:%" PRId64,
//            GET_TASKID(pRuntimeEnv), pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows, numOfRes,
//            pQueryAttr->current->lastKey);
1602
//
L
Liu Jicong 已提交
1603 1604 1605 1606 1607
//     return key;
//   } else {  // do nothing
//     pQueryAttr->window.skey      = tw.skey;
//     pWindowResInfo->prevSKey = tw.skey;
//     pTableQueryInfo->lastKey = tw.skey;
1608
//
L
Liu Jicong 已提交
1609 1610
//     return tw.skey;
//   }
1611
//
L
Liu Jicong 已提交
1612 1613 1614 1615 1616 1617 1618 1619 1620 1621
//   return true;
// }

// static bool skipTimeInterval(STaskRuntimeEnv *pRuntimeEnv, TSKEY* start) {
//   STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
//   if (QUERY_IS_ASC_QUERY(pQueryAttr)) {
//     assert(*start <= pRuntimeEnv->current->lastKey);
//   } else {
//     assert(*start >= pRuntimeEnv->current->lastKey);
//   }
1622
//
L
Liu Jicong 已提交
1623 1624 1625 1626 1627
//   // if queried with value filter, do NOT forward query start position
//   if (pQueryAttr->limit.offset <= 0 || pQueryAttr->numOfFilterCols > 0 || pRuntimeEnv->pTsBuf != NULL ||
//   pRuntimeEnv->pFillInfo != NULL) {
//     return true;
//   }
1628
//
L
Liu Jicong 已提交
1629 1630 1631 1632 1633 1634 1635
//   /*
//    * 1. for interval without interpolation query we forward pQueryAttr->interval.interval at a time for
//    *    pQueryAttr->limit.offset times. Since hole exists, pQueryAttr->interval.interval*pQueryAttr->limit.offset
//    value is
//    *    not valid. otherwise, we only forward pQueryAttr->limit.offset number of points
//    */
//   assert(pRuntimeEnv->resultRowInfo.prevSKey == TSKEY_INITIAL_VAL);
1636
//
L
Liu Jicong 已提交
1637 1638
//   STimeWindow w = TSWINDOW_INITIALIZER;
//   bool ascQuery = QUERY_IS_ASC_QUERY(pQueryAttr);
1639
//
L
Liu Jicong 已提交
1640 1641
//   SResultRowInfo *pWindowResInfo = &pRuntimeEnv->resultRowInfo;
//   STableQueryInfo *pTableQueryInfo = pRuntimeEnv->current;
1642
//
L
Liu Jicong 已提交
1643 1644 1645
//   SDataBlockInfo blockInfo = SDATA_BLOCK_INITIALIZER;
//   while (tsdbNextDataBlock(pRuntimeEnv->pTsdbReadHandle)) {
//     tsdbRetrieveDataBlockInfo(pRuntimeEnv->pTsdbReadHandle, &blockInfo);
1646
//
L
Liu Jicong 已提交
1647 1648 1649 1650 1651 1652 1653 1654 1655
//     if (QUERY_IS_ASC_QUERY(pQueryAttr)) {
//       if (pWindowResInfo->prevSKey == TSKEY_INITIAL_VAL) {
//         getAlignQueryTimeWindow(pQueryAttr, blockInfo.window.skey, blockInfo.window.skey, pQueryAttr->window.ekey,
//         &w); pWindowResInfo->prevSKey = w.skey;
//       }
//     } else {
//       getAlignQueryTimeWindow(pQueryAttr, blockInfo.window.ekey, pQueryAttr->window.ekey, blockInfo.window.ekey, &w);
//       pWindowResInfo->prevSKey = w.skey;
//     }
1656
//
L
Liu Jicong 已提交
1657 1658
//     // the first time window
//     STimeWindow win = getActiveTimeWindow(pWindowResInfo, pWindowResInfo->prevSKey, pQueryAttr);
1659
//
L
Liu Jicong 已提交
1660 1661
//     while (pQueryAttr->limit.offset > 0) {
//       STimeWindow tw = win;
1662
//
L
Liu Jicong 已提交
1663 1664 1665
//       if ((win.ekey <= blockInfo.window.ekey && ascQuery) || (win.ekey >= blockInfo.window.skey && !ascQuery)) {
//         pQueryAttr->limit.offset -= 1;
//         pWindowResInfo->prevSKey = win.skey;
1666
//
L
Liu Jicong 已提交
1667 1668 1669 1670 1671 1672
//         // current time window is aligned with blockInfo.window.ekey
//         // restart it from next data block by set prevSKey to be TSKEY_INITIAL_VAL;
//         if ((win.ekey == blockInfo.window.ekey && ascQuery) || (win.ekey == blockInfo.window.skey && !ascQuery)) {
//           pWindowResInfo->prevSKey = TSKEY_INITIAL_VAL;
//         }
//       }
1673
//
L
Liu Jicong 已提交
1674 1675 1676 1677
//       if (pQueryAttr->limit.offset == 0) {
//         *start = doSkipIntervalProcess(pRuntimeEnv, &win, &blockInfo, pTableQueryInfo);
//         return true;
//       }
1678
//
L
Liu Jicong 已提交
1679 1680
//       // current window does not ended in current data block, try next data block
//       getNextTimeWindow(pQueryAttr, &tw);
1681
//
L
Liu Jicong 已提交
1682 1683 1684 1685 1686 1687 1688 1689 1690
//       /*
//        * If the next time window still starts from current data block,
//        * load the primary timestamp column first, and then find the start position for the next queried time window.
//        * Note that only the primary timestamp column is required.
//        * TODO: Optimize for this cases. All data blocks are not needed to be loaded, only if the first actually
//        required
//        * time window resides in current data block.
//        */
//       if ((tw.skey <= blockInfo.window.ekey && ascQuery) || (tw.ekey >= blockInfo.window.skey && !ascQuery)) {
1691
//
L
Liu Jicong 已提交
1692 1693
//         SArray *pDataBlock = tsdbRetrieveDataBlock(pRuntimeEnv->pTsdbReadHandle, NULL);
//         SColumnInfoData *pColInfoData = taosArrayGet(pDataBlock, 0);
1694
//
L
Liu Jicong 已提交
1695 1696 1697
//         if ((win.ekey > blockInfo.window.ekey && ascQuery) || (win.ekey < blockInfo.window.skey && !ascQuery)) {
//           pQueryAttr->limit.offset -= 1;
//         }
1698
//
L
Liu Jicong 已提交
1699 1700 1701 1702 1703 1704 1705 1706
//         if (pQueryAttr->limit.offset == 0) {
//           *start = doSkipIntervalProcess(pRuntimeEnv, &win, &blockInfo, pTableQueryInfo);
//           return true;
//         } else {
//           tw = win;
//           int32_t startPos =
//               getNextQualifiedWindow(pQueryAttr, &tw, &blockInfo, pColInfoData->pData, binarySearchForKey, -1);
//           assert(startPos >= 0);
1707
//
L
Liu Jicong 已提交
1708 1709 1710 1711 1712 1713 1714 1715 1716 1717 1718
//           // set the abort info
//           pQueryAttr->pos = startPos;
//           pTableQueryInfo->lastKey = ((TSKEY *)pColInfoData->pData)[startPos];
//           pWindowResInfo->prevSKey = tw.skey;
//           win = tw;
//         }
//       } else {
//         break;  // offset is not 0, and next time window begins or ends in the next block.
//       }
//     }
//   }
1719
//
L
Liu Jicong 已提交
1720 1721
//   // check for error
//   if (terrno != TSDB_CODE_SUCCESS) {
1722
//     T_LONG_JMP(pRuntimeEnv->env, terrno);
L
Liu Jicong 已提交
1723
//   }
1724
//
L
Liu Jicong 已提交
1725 1726
//   return true;
// }
1727

1728
/*
 * Attach `num` downstream operators to operator `p`.
 *
 * The pointer array `pDownstream` is copied into a new array owned by `p`; the operators themselves are not copied.
 * Any array previously attached to `p` is released first, so repeated calls no longer leak the old array, and a failed
 * allocation never leaves `p` with a NULL array paired with a stale, non-zero numOfDownstream.
 *
 * @param p            operator receiving the downstream list
 * @param pDownstream  array of `num` downstream operator pointers (copied, not retained)
 * @param num          number of entries in pDownstream
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY if the array cannot be allocated (p then has no downstream).
 */
int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t num) {
  if (p->pDownstream == NULL) {
    assert(p->numOfDownstream == 0);
  }

  // The call replaces (it does not append to) the current list: release the old array instead of leaking it, and keep
  // the count consistent in case the allocation below fails.
  taosMemoryFreeClear(p->pDownstream);
  p->numOfDownstream = 0;

  p->pDownstream = taosMemoryCalloc(num, POINTER_BYTES);
  if (p->pDownstream == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  memcpy(p->pDownstream, pDownstream, num * POINTER_BYTES);
  p->numOfDownstream = num;
  return TSDB_CODE_SUCCESS;
}

wmmhello's avatar
wmmhello 已提交
1743
// Forward declaration so code in this file can reference doDestroyTableList before its definition.
static void doDestroyTableList(STableListInfo* pTableqinfoList);
1744

1745 1746 1747 1748
/*
 * Per-request callback parameter for fetch requests: built by doSendFetchDataRequest and consumed by
 * loadRemoteDataCallback to locate the exchange operator and the source the response belongs to.
 */
typedef struct SFetchRspHandleWrapper {
  uint32_t exchangeId;   // ref id of the SExchangeInfo, resolved via taosAcquireRef(exchangeObjRefPool, ...)
  int32_t  sourceIndex;  // index of the source in pSources / pSourceDataInfo
} SFetchRspHandleWrapper;
1749

D
dapan1121 已提交
1750
/*
 * Completion callback for a fetch request sent to one exchange source (remote RPC or local fetch).
 *
 * @param param  SFetchRspHandleWrapper identifying the exchange operator (by ref id) and the source index.
 * @param pMsg   Response buffer. On success ownership of pMsg->pData moves to the source's pRsp; otherwise it is freed.
 * @param code   Result code of the fetch.
 * @return Always TSDB_CODE_SUCCESS; a failed fetch is reported through pSourceDataInfo->code.
 *
 * In every case where the exchange operator is still alive, the source is marked EX_SOURCE_DATA_READY and the
 * operator's `ready` semaphore is posted to wake the waiting loader.
 */
int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code) {
  SFetchRspHandleWrapper* pWrapper = (SFetchRspHandleWrapper*)param;

  // The exchange operator is looked up through the ref pool; it may already be gone when the response arrives.
  SExchangeInfo* pExchangeInfo = taosAcquireRef(exchangeObjRefPool, pWrapper->exchangeId);
  if (pExchangeInfo == NULL) {
    qWarn("failed to acquire exchange operator, since it may have been released");
    taosMemoryFree(pMsg->pData);
    return TSDB_CODE_SUCCESS;
  }

  int32_t          index = pWrapper->sourceIndex;
  SSourceDataInfo* pSourceDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, index);

  if (code == TSDB_CODE_SUCCESS) {
    pSourceDataInfo->pRsp = pMsg->pData;

    SRetrieveTableRsp* pRsp = pSourceDataInfo->pRsp;
    // Validate before touching the header; asserting after the byte-order conversion below is too late.
    ASSERT(pRsp != NULL);

    // Header fields arrive in network byte order.
    pRsp->numOfRows = htonl(pRsp->numOfRows);
    pRsp->compLen = htonl(pRsp->compLen);
    pRsp->numOfCols = htonl(pRsp->numOfCols);
    pRsp->useconds = htobe64(pRsp->useconds);
    pRsp->numOfBlocks = htonl(pRsp->numOfBlocks);

    qDebug("%s fetch rsp received, index:%d, blocks:%d, rows:%d", pSourceDataInfo->taskId, index, pRsp->numOfBlocks,
           pRsp->numOfRows);
  } else {
    taosMemoryFree(pMsg->pData);
    pSourceDataInfo->code = code;
    qDebug("%s fetch rsp received, index:%d, error:%s", pSourceDataInfo->taskId, index, tstrerror(code));
  }

  pSourceDataInfo->status = EX_SOURCE_DATA_READY;

  tsem_post(&pExchangeInfo->ready);
  taosReleaseRef(exchangeObjRefPool, pWrapper->exchangeId);

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
1790
/*
 * Dispatch an RPC response to the callback registered in its SMsgSendInfo (carried in pMsg->info.ahandle).
 *
 * The payload is copied into a private buffer handed to the callback. If that copy cannot be allocated, the callback
 * receives a NULL buffer and TSDB_CODE_OUT_OF_MEMORY as the result code. Afterwards the RPC payload and the send info
 * are released. `parent` and `pEpSet` are not used here.
 */
void qProcessRspMsg(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
  assert(pMsg->info.ahandle != NULL);
  SMsgSendInfo* pInfo = (SMsgSendInfo*)pMsg->info.ahandle;

  int32_t  contLen = pMsg->contLen;
  SDataBuf rspBuf = {.len = contLen, .pData = NULL};

  if (contLen > 0) {
    void* pCopy = taosMemoryCalloc(1, contLen);
    if (pCopy != NULL) {
      memcpy(pCopy, pMsg->pCont, contLen);
      rspBuf.pData = pCopy;
    } else {
      // Report the allocation failure to the callback through the response code.
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      pMsg->code = TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  pInfo->fp(pInfo->param, &rspBuf, pMsg->code);

  rpcFreeCont(pMsg->pCont);
  destroySendMsgInfo(pInfo);
}

L
Liu Jicong 已提交
1811
/*
 * Issue one fetch request to the source at `sourceIndex`.
 *
 * For a local source the fetch runs synchronously and loadRemoteDataCallback is invoked before returning. For a remote
 * source an SResFetchReq is sent asynchronously; loadRemoteDataCallback runs when the reply arrives.
 *
 * @return TSDB_CODE_SUCCESS when the request was dispatched. Otherwise an error code, also stored in
 *         pTaskInfo->code: TSDB_CODE_QRY_OUT_OF_MEMORY, or the code returned by asyncSendMsgToServer.
 */
static int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTaskInfo, int32_t sourceIndex) {
  size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources);

  SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, sourceIndex);
  SSourceDataInfo*       pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, sourceIndex);

  ASSERT(pDataInfo->status == EX_SOURCE_DATA_NOT_READY);

  SFetchRspHandleWrapper* pWrapper = taosMemoryCalloc(1, sizeof(SFetchRspHandleWrapper));
  if (NULL == pWrapper) {
    pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
    return pTaskInfo->code;
  }

  pWrapper->exchangeId = pExchangeInfo->self;
  pWrapper->sourceIndex = sourceIndex;

  if (pSource->localExec) {
    SDataBuf pBuf = {0};
    int32_t  code =
        (*pTaskInfo->localFetch.fp)(pTaskInfo->localFetch.handle, pSource->schedId, pTaskInfo->id.queryId,
                                    pSource->taskId, 0, pSource->execId, &pBuf.pData, pTaskInfo->localFetch.explainRes);
    // The local fetch result (including its error code) is delivered through the regular callback.
    loadRemoteDataCallback(pWrapper, &pBuf, code);
    taosMemoryFree(pWrapper);
    return TSDB_CODE_SUCCESS;
  }

  SResFetchReq* pMsg = taosMemoryCalloc(1, sizeof(SResFetchReq));
  if (NULL == pMsg) {
    pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
    taosMemoryFree(pWrapper);
    return pTaskInfo->code;
  }

  qDebug("%s build fetch msg and send to vgId:%d, ep:%s, taskId:0x%" PRIx64 ", execId:%d, %d/%" PRIzu,
         GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->addr.epSet.eps[0].fqdn, pSource->taskId,
         pSource->execId, sourceIndex, totalSources);

  pMsg->header.vgId = htonl(pSource->addr.nodeId);
  pMsg->sId = htobe64(pSource->schedId);
  pMsg->taskId = htobe64(pSource->taskId);
  pMsg->queryId = htobe64(pTaskInfo->id.queryId);
  pMsg->execId = htonl(pSource->execId);

  // send the fetch remote task result request
  SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (NULL == pMsgSendInfo) {
    taosMemoryFreeClear(pMsg);
    taosMemoryFree(pWrapper);
    qError("%s prepare message %d failed", GET_TASKID(pTaskInfo), (int32_t)sizeof(SMsgSendInfo));
    pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
    return pTaskInfo->code;
  }

  // From here on pWrapper and pMsg are owned by pMsgSendInfo (released through paramFreeFp / the send info).
  pMsgSendInfo->param = pWrapper;
  pMsgSendInfo->paramFreeFp = taosMemoryFree;
  pMsgSendInfo->msgInfo.pData = pMsg;
  pMsgSendInfo->msgInfo.len = sizeof(SResFetchReq);
  pMsgSendInfo->msgType = pSource->fetchMsgType;
  pMsgSendInfo->fp = loadRemoteDataCallback;

  int64_t transporterId = 0;
  int32_t code = asyncSendMsgToServer(pExchangeInfo->pTransporter, &pSource->addr.epSet, &transporterId, pMsgSendInfo);
  if (code != TSDB_CODE_SUCCESS) {
    // Surface the failure so callers do not block waiting for a reply that may never come.
    // NOTE(review): cleanup of pMsgSendInfo on a failed send is left to asyncSendMsgToServer, as before - confirm.
    qError("%s failed to send fetch msg to vgId:%d, code:%s", GET_TASKID(pTaskInfo), pSource->addr.nodeId,
           tstrerror(code));
    pTaskInfo->code = code;
    return code;
  }

  return TSDB_CODE_SUCCESS;
}

1873 1874 1875 1876 1877 1878 1879 1880
/*
 * Fold the statistics of one received response into the exchange load info and the operator's result counters.
 *
 * @param pInfo      load statistics to update (rows, bytes, elapsed microseconds since startTs)
 * @param numOfRows  rows carried by the response
 * @param dataLen    (compressed) payload length of the response
 * @param startTs    timestamp, in microseconds, at which the current load started
 * @param pOperator  operator whose resultInfo.totalRows is increased as well
 */
void updateLoadRemoteInfo(SLoadRemoteDataInfo* pInfo, int32_t numOfRows, int32_t dataLen, int64_t startTs,
                          SOperatorInfo* pOperator) {
  int64_t elapsedUs = taosGetTimestampUs() - startTs;

  pOperator->resultInfo.totalRows += numOfRows;

  pInfo->totalElapsed += elapsedUs;
  pInfo->totalSize += dataLen;
  pInfo->totalRows += numOfRows;
}

H
Haojun Liao 已提交
1881
/*
 * Decode one data block from a fetch-response payload into `pRes`.
 *
 * @param pRes        Destination block.
 * @param pData       Start of the encoded payload.
 * @param pColList    NULL when the payload comes from another task: `pRes` is cleaned and the block is decoded into it
 *                    directly. Otherwise the payload starts with a network-order column count followed by one
 *                    SSysTableSchema per column (data from mnode); the block is decoded into a temporary block and
 *                    its columns are relocated into `pRes` according to `pColList`.
 * @param pNextStart  Optional (may be NULL). Receives the position just past the decoded block, in both modes.
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY if the temporary block cannot be created, or the error returned
 *         by blockDataEnsureCapacity.
 */
int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, char* pData, SArray* pColList, char** pNextStart) {
  char* pNext = NULL;

  if (pColList == NULL) {  // data from other sources
    blockDataCleanup(pRes);
    pNext = (char*)blockDecode(pRes, pData);
  } else {  // extract data according to pColList
    char* pStart = pData;

    int32_t numOfCols = htonl(*(int32_t*)pStart);
    pStart += sizeof(int32_t);

    // todo refactor:extract method
    // The schema header is converted from network byte order in place.
    SSysTableSchema* pSchema = (SSysTableSchema*)pStart;
    for (int32_t i = 0; i < numOfCols; ++i) {
      SSysTableSchema* p = (SSysTableSchema*)pStart;

      p->colId = htons(p->colId);
      p->bytes = htonl(p->bytes);
      pStart += sizeof(SSysTableSchema);
    }

    SSDataBlock* pBlock = createDataBlock();
    if (pBlock == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    for (int32_t i = 0; i < numOfCols; ++i) {
      SColumnInfoData idata = createColumnInfoData(pSchema[i].type, pSchema[i].bytes, pSchema[i].colId);
      blockDataAppendColInfo(pBlock, &idata);
    }

    pNext = (char*)blockDecode(pBlock, pStart);

    int32_t code = blockDataEnsureCapacity(pRes, pBlock->info.rows);
    if (code != TSDB_CODE_SUCCESS) {
      blockDataDestroy(pBlock);
      return code;
    }

    // data from mnode
    pRes->info.rows = pBlock->info.rows;
    relocateColumnData(pRes, pColList, pBlock->pDataBlock, false);
    blockDataDestroy(pBlock);
  }

  // Report the end position in both modes; the column-list branch used to leave the out-parameter unset.
  if (pNextStart != NULL) {
    *pNextStart = pNext;
  }

  // todo move this to time window aggregator, since the primary timestamp may not be known by exchange operator.
  blockDataUpdateTsWindow(pRes, 0);
  return TSDB_CODE_SUCCESS;
}
1920

L
Liu Jicong 已提交
1921 1922
/*
 * Finish an exchange operator whose sources are all exhausted.
 *
 * Adds the time elapsed since `startTs` (microseconds) to the load statistics, logs a summary and marks the operator
 * as completed. Always returns NULL.
 */
static void* setAllSourcesCompleted(SOperatorInfo* pOperator, int64_t startTs) {
  SExchangeInfo*       pExchangeInfo = pOperator->info;
  SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
  SExecTaskInfo*       pTask = pOperator->pTaskInfo;

  int64_t elapsedUs = taosGetTimestampUs() - startTs;
  pLoadInfo->totalElapsed += elapsedUs;

  size_t numOfSources = taosArrayGetSize(pExchangeInfo->pSources);
  qDebug("%s all %" PRIzu " sources are exhausted, total rows: %" PRIu64 " bytes:%" PRIu64 ", elapsed:%.2f ms",
         GET_TASKID(pTask), numOfSources, pLoadInfo->totalRows, pLoadInfo->totalSize,
         pLoadInfo->totalElapsed / 1000.0);

  doSetOperatorCompleted(pOperator);
  return NULL;
}

1939 1940
/*
 * Concurrent-mode loader: consume the first ready response among all sources.
 *
 * The sources are scanned in index order:
 * - exhausted sources are counted;
 * - sources still waiting for a reply are skipped;
 * - for the first READY source, every block in its response is decoded and appended to
 *   pExchangeInfo->pResultBlockList and the load statistics are updated. Unless the source flagged completion, the next
 *   fetch request is then sent to it, and the function returns.
 *
 * When every source is exhausted the operator is marked completed. Otherwise the scan repeats, after sched_yield(),
 * until one of the two happens. Errors are reported through pTaskInfo->code.
 */
static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeInfo* pExchangeInfo,
                                           SExecTaskInfo* pTaskInfo) {
  int32_t code = 0;
  int64_t startTs = taosGetTimestampUs();
  size_t  totalSources = taosArrayGetSize(pExchangeInfo->pSources);

  while (1) {
    int32_t completed = 0;
    for (int32_t i = 0; i < totalSources; ++i) {
      SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, i);
      if (pDataInfo->status == EX_SOURCE_DATA_EXHAUSTED) {
        completed += 1;
        continue;
      }

      if (pDataInfo->status != EX_SOURCE_DATA_READY) {
        continue;
      }

      if (pDataInfo->code != TSDB_CODE_SUCCESS) {
        code = pDataInfo->code;
        goto _error;
      }

      SRetrieveTableRsp*     pRsp = pDataInfo->pRsp;
      SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, i);

      SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
      if (pRsp->numOfRows == 0) {
        qDebug("%s vgId:%d, taskId:0x%" PRIx64 " execId:%d index:%d completed, rowsOfSource:%" PRIu64
               ", totalRows:%" PRIu64 ", completed:%d try next %d/%" PRIzu,
               GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, i, pDataInfo->totalRows,
               pExchangeInfo->loadInfo.totalRows, completed + 1, i + 1, totalSources);
        pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
        completed += 1;
        taosMemoryFreeClear(pDataInfo->pRsp);
        continue;
      }

      char* pStart = pRsp->data;
      for (int32_t index = 0; index < pRsp->numOfBlocks; ++index) {
        SSDataBlock* pb = createOneDataBlock(pExchangeInfo->pDummyBlock, false);
        if (pb == NULL) {
          code = TSDB_CODE_OUT_OF_MEMORY;
          taosMemoryFreeClear(pDataInfo->pRsp);
          goto _error;
        }

        code = extractDataBlockFromFetchRsp(pb, pStart, NULL, &pStart);
        if (code != 0) {
          blockDataDestroy(pb);  // not yet owned by pResultBlockList
          taosMemoryFreeClear(pDataInfo->pRsp);
          goto _error;
        }

        taosArrayPush(pExchangeInfo->pResultBlockList, &pb);
      }

      updateLoadRemoteInfo(pLoadInfo, pRsp->numOfRows, pRsp->compLen, startTs, pOperator);
      // Keep the per-source row count (logged as rowsOfSource) up to date, as the sequential loader does.
      pDataInfo->totalRows += pRsp->numOfRows;

      if (pRsp->completed == 1) {
        qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64
               " execId:%d"
               " index:%d completed, blocks:%d, numOfRows:%d, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64
               ", total:%.2f Kb,"
               " completed:%d try next %d/%" PRIzu,
               GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, i, pRsp->numOfBlocks,
               pRsp->numOfRows, pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize / 1024.0,
               completed + 1, i + 1, totalSources);
        completed += 1;
        pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
      } else {
        qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64
               " execId:%d blocks:%d, numOfRows:%d, totalRows:%" PRIu64 ", total:%.2f Kb",
               GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pRsp->numOfBlocks,
               pRsp->numOfRows, pLoadInfo->totalRows, pLoadInfo->totalSize / 1024.0);
      }

      // pRsp must not be used past this point.
      taosMemoryFreeClear(pDataInfo->pRsp);

      // The source still has data: ask for its next batch right away.
      if (pDataInfo->status != EX_SOURCE_DATA_EXHAUSTED) {
        pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
        code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, i);
        if (code != TSDB_CODE_SUCCESS) {
          goto _error;
        }
      }

      return;
    }

    if (completed == totalSources) {
      setAllSourcesCompleted(pOperator, startTs);
      return;
    }

    sched_yield();
  }

_error:
  pTaskInfo->code = code;
}

L
Liu Jicong 已提交
2038 2039 2040
/*
 * Open step for concurrent mode: send one fetch request to every source, then block until the first response arrives.
 *
 * On success the operator status becomes OP_RES_TO_RETURN. If any request cannot be sent, the remaining sources are
 * skipped and the error is returned (and stored in pTaskInfo->code).
 */
static int32_t prepareConcurrentlyLoad(SOperatorInfo* pOperator) {
  SExchangeInfo* pExchangeInfo = pOperator->info;
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;

  size_t  numOfSources = taosArrayGetSize(pExchangeInfo->pSources);
  int64_t beginTs = taosGetTimestampUs();

  // Asynchronously send all fetch requests to all sources, stopping at the first failure.
  int32_t code = TSDB_CODE_SUCCESS;
  for (int32_t idx = 0; idx < numOfSources && code == TSDB_CODE_SUCCESS; ++idx) {
    code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, idx);
  }

  if (code != TSDB_CODE_SUCCESS) {
    pTaskInfo->code = code;
    return code;
  }

  int64_t sentTs = taosGetTimestampUs();
  qDebug("%s send all fetch requests to %" PRIzu " sources completed, elapsed:%.2fms", GET_TASKID(pTaskInfo),
         numOfSources, (sentTs - beginTs) / 1000.0);

  pOperator->status = OP_RES_TO_RETURN;
  pOperator->cost.openCost = taosGetTimestampUs() - beginTs;

  // Wait for the first response (posted by loadRemoteDataCallback).
  tsem_wait(&pExchangeInfo->ready);
  return TSDB_CODE_SUCCESS;
}

2065
/*
 * Sequential-mode loader: fetch from the sources one at a time, in order, starting at pExchangeInfo->current.
 *
 * A fetch request is sent to the current source, and the call blocks on pExchangeInfo->ready until the response
 * arrives. An empty response marks the source exhausted and moves on to the next one. A non-empty response is
 * processed and then the function returns:
 * - all of its blocks are decoded into pExchangeInfo->pResultBlockList (where doLoadRemoteDataImpl serves results);
 * - the load statistics are updated;
 * - the source index only advances when the response is flagged as completed.
 *
 * Once every source has been consumed, the operator is marked completed.
 *
 * @return TSDB_CODE_SUCCESS, or an error code (also stored in pTaskInfo->code).
 */
static int32_t seqLoadRemoteData(SOperatorInfo* pOperator) {
  SExchangeInfo* pExchangeInfo = pOperator->info;
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;

  size_t  totalSources = taosArrayGetSize(pExchangeInfo->pSources);
  int64_t startTs = taosGetTimestampUs();

  while (1) {
    if (pExchangeInfo->current >= totalSources) {
      setAllSourcesCompleted(pOperator, startTs);
      return TSDB_CODE_SUCCESS;
    }

    SSourceDataInfo*       pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, pExchangeInfo->current);
    SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, pExchangeInfo->current);

    // doSendFetchDataRequest requires EX_SOURCE_DATA_NOT_READY, but a previous (non-final) response for this source
    // left it EX_SOURCE_DATA_READY; reset it so a source can be fetched repeatedly until it reports completion.
    pDataInfo->status = EX_SOURCE_DATA_NOT_READY;

    int32_t code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, pExchangeInfo->current);
    if (code != TSDB_CODE_SUCCESS) {
      // Do not wait on the semaphore: a request that failed to go out may never produce a response.
      pTaskInfo->code = code;
      return code;
    }
    tsem_wait(&pExchangeInfo->ready);

    if (pDataInfo->code != TSDB_CODE_SUCCESS) {
      qError("%s vgId:%d, taskID:0x%" PRIx64 " execId:%d error happens, code:%s", GET_TASKID(pTaskInfo),
             pSource->addr.nodeId, pSource->taskId, pSource->execId, tstrerror(pDataInfo->code));
      pOperator->pTaskInfo->code = pDataInfo->code;
      return pOperator->pTaskInfo->code;
    }

    SRetrieveTableRsp*   pRsp = pDataInfo->pRsp;
    SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
    if (pRsp->numOfRows == 0) {
      qDebug("%s vgId:%d, taskID:0x%" PRIx64 " execId:%d %d of total completed, rowsOfSource:%" PRIu64
             ", totalRows:%" PRIu64 " try next",
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pExchangeInfo->current + 1,
             pDataInfo->totalRows, pLoadInfo->totalRows);

      pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
      pExchangeInfo->current += 1;
      taosMemoryFreeClear(pDataInfo->pRsp);
      continue;
    }

    // Decode every block of the response into its own result block and buffer it, as the concurrent loader does.
    char* pStart = pRsp->data;
    for (int32_t index = 0; index < pRsp->numOfBlocks; ++index) {
      SSDataBlock* pb = createOneDataBlock(pExchangeInfo->pDummyBlock, false);
      if (pb == NULL) {
        code = TSDB_CODE_OUT_OF_MEMORY;
      } else {
        code = extractDataBlockFromFetchRsp(pb, pStart, NULL, &pStart);
      }

      if (code != TSDB_CODE_SUCCESS) {
        if (pb != NULL) {
          blockDataDestroy(pb);  // not yet owned by pResultBlockList
        }
        taosMemoryFreeClear(pDataInfo->pRsp);
        pTaskInfo->code = code;
        return code;
      }

      taosArrayPush(pExchangeInfo->pResultBlockList, &pb);
    }

    if (pRsp->completed == 1) {
      qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " execId:%d numOfRows:%d, rowsOfSource:%" PRIu64
             ", totalRows:%" PRIu64 ", totalBytes:%" PRIu64 " try next %d/%" PRIzu,
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pRsp->numOfRows,
             pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize, pExchangeInfo->current + 1,
             totalSources);

      pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
      pExchangeInfo->current += 1;
    } else {
      qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " execId:%d numOfRows:%d, totalRows:%" PRIu64
             ", totalBytes:%" PRIu64,
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pRsp->numOfRows,
             pLoadInfo->totalRows, pLoadInfo->totalSize);
    }

    updateLoadRemoteInfo(pLoadInfo, pRsp->numOfRows, pRsp->compLen, startTs, pOperator);
    pDataInfo->totalRows += pRsp->numOfRows;

    taosMemoryFreeClear(pDataInfo->pRsp);
    return TSDB_CODE_SUCCESS;
  }
}

L
Liu Jicong 已提交
2134
/*
 * Open function of the exchange operator; it only runs once per operator.
 *
 * In concurrent mode it sends the initial fetch requests through prepareConcurrentlyLoad; in sequential mode there is
 * nothing to prepare. On success the operator is flagged opened and its open cost is recorded in milliseconds.
 */
static int32_t prepareLoadRemoteData(SOperatorInfo* pOperator) {
  if (OPTR_IS_OPENED(pOperator)) {
    return TSDB_CODE_SUCCESS;
  }

  int64_t        openStartTs = taosGetTimestampUs();
  SExchangeInfo* pExchangeInfo = pOperator->info;

  int32_t code = pExchangeInfo->seqLoadData ? TSDB_CODE_SUCCESS : prepareConcurrentlyLoad(pOperator);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  OPTR_SET_OPENED(pOperator);
  pOperator->cost.openCost = (taosGetTimestampUs() - openStartTs) / 1000.0;
  return TSDB_CODE_SUCCESS;
}

2154 2155 2156 2157 2158
/*
 * Element destructor for taosArrayClearEx on an array of SSDataBlock pointers: `pParam` points at the array slot,
 * and the block stored in that slot is destroyed.
 */
static void freeBlock(void* pParam) {
  SSDataBlock** ppBlock = (SSDataBlock**)pParam;
  blockDataDestroy(*ppBlock);
}

2159
/*
 * Returns the next buffered response block of the exchange operator.
 *
 * When the buffer (pResultBlockList) is empty or fully consumed, the consumed blocks are destroyed and a new batch
 * is fetched, either sequentially or concurrently depending on seqLoadData.
 *
 * Returns NULL when:
 *  - opening the operator fails (the error is stored in pTaskInfo->code);
 *  - the operator is already completed;
 *  - the fetch left the buffer empty.
 */
static SSDataBlock* doLoadRemoteDataImpl(SOperatorInfo* pOperator) {
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
  SExchangeInfo* pExchangeInfo = pOperator->info;

  pTaskInfo->code = pOperator->fpSet._openFn(pOperator);
  if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
    return NULL;
  }

  if (pOperator->status == OP_EXEC_DONE) {
    size_t               numOfSources = taosArrayGetSize(pExchangeInfo->pSources);
    SLoadRemoteDataInfo* pLoad = &pExchangeInfo->loadInfo;
    qDebug("%s all %" PRIzu " source(s) are exhausted, total rows:%" PRIu64 " bytes:%" PRIu64 ", elapsed:%.2f ms",
           GET_TASKID(pTaskInfo), numOfSources, pLoad->totalRows, pLoad->totalSize, pLoad->totalElapsed / 1000.0);
    return NULL;
  }

  size_t numOfBuffered = taosArrayGetSize(pExchangeInfo->pResultBlockList);
  bool   drained = (numOfBuffered == 0) || (pExchangeInfo->rspBlockIndex >= numOfBuffered);

  if (drained) {
    // drop the blocks handed out already, then pull a fresh batch from the remote source(s)
    pExchangeInfo->rspBlockIndex = 0;
    taosArrayClearEx(pExchangeInfo->pResultBlockList, freeBlock);

    if (pExchangeInfo->seqLoadData) {
      seqLoadRemoteData(pOperator);
    } else {
      concurrentlyLoadRemoteDataImpl(pOperator, pExchangeInfo, pTaskInfo);
    }

    if (taosArrayGetSize(pExchangeInfo->pResultBlockList) == 0) {
      return NULL;
    }
  }

  // serve the next buffered block
  return taosArrayGetP(pExchangeInfo->pResultBlockList, pExchangeInfo->rspBlockIndex++);
}
2196

2197 2198 2199 2200 2201 2202 2203 2204
/*
 * getNext function of the exchange operator.
 *
 * Pulls blocks through doLoadRemoteDataImpl(). If the exchange node carries limit/offset (or slimit/soffset)
 * information, each block is trimmed by handleLimitOffset():
 *  - a block fully consumed by the offset is skipped;
 *  - a block that is empty after trimming marks the operator as completed.
 *
 * Returns NULL when no more data is available.
 */
static SSDataBlock* doLoadRemoteData(SOperatorInfo* pOperator) {
  SExchangeInfo* pExchangeInfo = pOperator->info;
  SLimitInfo*    pLimitInfo = &pExchangeInfo->limitInfo;

  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  while (1) {
    SSDataBlock* pBlock = doLoadRemoteDataImpl(pOperator);
    if (pBlock == NULL) {
      return NULL;
    }

    if (!hasLimitOffsetInfo(pLimitInfo)) {
      return pBlock;
    }

    int32_t status = handleLimitOffset(pOperator, pLimitInfo, pBlock, false);
    if (status == PROJECT_RETRIEVE_CONTINUE) {
      // the block was entirely consumed by the offset: fetch the next one
      continue;
    }

    if (status == PROJECT_RETRIEVE_DONE) {
      size_t rows = pBlock->info.rows;
      pLimitInfo->numOfOutputRows += rows;

      if (rows == 0) {
        doSetOperatorCompleted(pOperator);
        return NULL;
      }

      return pBlock;
    }
  }
}

2233
/*
 * Allocates pInfo->pSourceDataInfo with one SSourceDataInfo entry per remote source.
 * Each entry starts in EX_SOURCE_DATA_NOT_READY state, tagged with the task id and its source index.
 *
 * Returns TSDB_CODE_OUT_OF_MEMORY on allocation failure. In that case the array is released and
 * pInfo->pSourceDataInfo is reset to NULL, so any later cleanup of pInfo never sees a dangling pointer.
 * createExchangeOperatorInfo() reaches such a cleanup via doDestroyExchangeOperatorInfo() on its error path.
 */
static int32_t initDataSource(int32_t numOfSources, SExchangeInfo* pInfo, const char* id) {
  pInfo->pSourceDataInfo = taosArrayInit(numOfSources, sizeof(SSourceDataInfo));
  if (pInfo->pSourceDataInfo == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  for (int32_t i = 0; i < numOfSources; ++i) {
    SSourceDataInfo dataInfo = {0};
    dataInfo.status = EX_SOURCE_DATA_NOT_READY;
    dataInfo.taskId = id;
    dataInfo.index = i;

    if (taosArrayPush(pInfo->pSourceDataInfo, &dataInfo) == NULL) {
      taosArrayDestroy(pInfo->pSourceDataInfo);
      pInfo->pSourceDataInfo = NULL;  // avoid a double free by the owner's cleanup path
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  return TSDB_CODE_SUCCESS;
}

2254
/*
 * Initializes the exchange operator state from the physical plan node. It:
 *  - copies the downstream source endpoints into pInfo->pSources;
 *  - sets up the limit info;
 *  - registers pInfo in exchangeObjRefPool;
 *  - allocates the per-source bookkeeping via initDataSource().
 *
 * Returns:
 *  - TSDB_CODE_INVALID_PARA when the node has no sources;
 *  - TSDB_CODE_OUT_OF_MEMORY when an allocation or array push fails.
 * Partially initialized members are left in pInfo for the caller's cleanup.
 */
static int32_t initExchangeOperator(SExchangePhysiNode* pExNode, SExchangeInfo* pInfo, const char* id) {
  size_t numOfSources = LIST_LENGTH(pExNode->pSrcEndPoints);

  if (numOfSources == 0) {
    qError("%s invalid number: %d of sources in exchange operator", id, (int32_t)numOfSources);
    return TSDB_CODE_INVALID_PARA;
  }

  pInfo->pSources = taosArrayInit(numOfSources, sizeof(SDownstreamSourceNode));
  if (pInfo->pSources == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  for (int32_t i = 0; i < (int32_t)numOfSources; ++i) {
    SDownstreamSourceNode* pNode = (SDownstreamSourceNode*)nodesListGetNode((SNodeList*)pExNode->pSrcEndPoints, i);
    // a failed push would otherwise silently drop a source
    if (taosArrayPush(pInfo->pSources, pNode) == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  initLimitInfo(pExNode->node.pLimit, pExNode->node.pSlimit, &pInfo->limitInfo);
  pInfo->self = taosAddRef(exchangeObjRefPool, pInfo);

  return initDataSource((int32_t)numOfSources, pInfo, id);
}

/*
 * Creates the exchange operator. It fetches result blocks from the downstream sources listed in
 * pExNode->pSrcEndPoints, using pTransporter. seqLoadData is set to false, so data is loaded concurrently.
 *
 * Returns the new operator, or NULL on failure; in that case the error code is stored in pTaskInfo->code.
 */
SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode* pExNode, SExecTaskInfo* pTaskInfo) {
  // Initialized before the first goto: _error stores this value into pTaskInfo->code, so an allocation failure
  // must not read an uninitialized variable.
  int32_t        code = TSDB_CODE_OUT_OF_MEMORY;
  SExchangeInfo* pInfo = taosMemoryCalloc(1, sizeof(SExchangeInfo));
  SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    goto _error;
  }

  code = initExchangeOperator(pExNode, pInfo, GET_TASKID(pTaskInfo));
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  tsem_init(&pInfo->ready, 0, 0);
  pInfo->pDummyBlock = createResDataBlock(pExNode->node.pOutputDataBlockDesc);
  pInfo->pResultBlockList = taosArrayInit(1, POINTER_BYTES);
  if (pInfo->pDummyBlock == NULL || pInfo->pResultBlockList == NULL) {
    // pDummyBlock is dereferenced below to size the expression support
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _error;
  }

  pInfo->seqLoadData = false;
  pInfo->pTransporter = pTransporter;

  pOperator->name = "ExchangeOperator";
  pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE;
  pOperator->blocking = false;
  pOperator->status = OP_NOT_OPENED;
  pOperator->info = pInfo;
  pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pDummyBlock->pDataBlock);
  pOperator->pTaskInfo = pTaskInfo;

  pOperator->fpSet = createOperatorFpSet(prepareLoadRemoteData, doLoadRemoteData, NULL, NULL,
                                         destroyExchangeOperatorInfo, NULL, NULL, NULL);
  return pOperator;

_error:
  if (pInfo != NULL) {
    doDestroyExchangeOperatorInfo(pInfo);
  }

  taosMemoryFreeClear(pOperator);
  pTaskInfo->code = code;
  return NULL;
}

dengyihao's avatar
dengyihao 已提交
2319 2320
static int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t numOfOutput, size_t keyBufSize,
                                const char* pKey);
2321

L
Liu Jicong 已提交
2322
/*
 * Checks whether row `rowIndex` of pBlock belongs to the same group as the previously saved key values.
 *
 * groupInfo lists the column indices of the group keys. buf[i] holds the saved value of the i-th key column;
 * a NULL entry stands for a NULL value.
 *
 * Returns true when every key column matches, or when there are no key columns at all; false otherwise.
 */
static bool needToMerge(SSDataBlock* pBlock, SArray* groupInfo, char** buf, int32_t rowIndex) {
  size_t size = taosArrayGetSize(groupInfo);
  if (size == 0) {
    return true;
  }

  for (size_t i = 0; i < size; ++i) {
    int32_t*         index = taosArrayGet(groupInfo, i);
    SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, *index);
    bool             isNull = colDataIsNull(pColInfo, rowIndex, pBlock->info.rows, NULL);

    if ((isNull && buf[i] != NULL) || (!isNull && buf[i] == NULL)) {
      return false;
    }

    // both sides are NULL: equal, and there is no payload to compare (buf[i] must not be dereferenced)
    if (isNull) {
      continue;
    }

    char* pCell = colDataGetData(pColInfo, rowIndex);
    if (IS_VAR_DATA_TYPE(pColInfo->info.type)) {
      if (varDataLen(pCell) != varDataLen(buf[i]) ||
          memcmp(varDataVal(pCell), varDataVal(buf[i]), varDataLen(pCell)) != 0) {
        return false;
      }
    } else if (memcmp(pCell, buf[i], pColInfo->info.bytes) != 0) {
      return false;
    }
  }

  // every key column matched: the row belongs to the saved group
  return true;
}

2357
/*
 * Copies the values of row `rowIndex` for the columns listed in pColumnList into caller-provided buffers.
 * pColumnList holds int32 column indices into pBlock. Values go to rowColData[0..n-1], in list order.
 * The number of bytes copied per column is colDataGetLength() of that cell. Always returns true.
 */
static bool saveCurrentTuple(char** rowColData, SArray* pColumnList, SSDataBlock* pBlock, int32_t rowIndex) {
  int32_t numOfCols = (int32_t)taosArrayGetSize(pColumnList);

  for (int32_t col = 0; col < numOfCols; ++col) {
    int32_t          slot = *(int32_t*)taosArrayGet(pColumnList, col);
    SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slot);

    char* pSrc = colDataGetData(pCol, rowIndex);
    memcpy(rowColData[col], pSrc, colDataGetLength(pCol, rowIndex));
  }

  return true;
}
2370

X
Xiaoyu Wang 已提交
2371
/*
 * Reports the scan order and scan flag that govern the data flowing into pOperator.
 *
 *  - Table scan and table-merge scan operators report their own settings.
 *  - Exchange, system-table, stream, tag, block-distribution and last-row scans always report TSDB_ORDER_ASC
 *    with MAIN_SCAN.
 *  - Any other operator defers to its first downstream operator.
 *
 * Returns TSDB_CODE_INVALID_PARA when the downstream chain ends before reaching one of the operators above.
 */
int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scanFlag) {
  // todo add more information about exchange operation
  SOperatorInfo* pCur = pOperator;

  for (;;) {
    switch (pCur->operatorType) {
      case QUERY_NODE_PHYSICAL_PLAN_EXCHANGE:
      case QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN:
      case QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN:
      case QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN:
      case QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN:
      case QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN:
        *order = TSDB_ORDER_ASC;
        *scanFlag = MAIN_SCAN;
        return TSDB_CODE_SUCCESS;

      case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN: {
        STableScanInfo* pScan = pCur->info;
        *order = pScan->cond.order;
        *scanFlag = pScan->scanFlag;
        return TSDB_CODE_SUCCESS;
      }

      case QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN: {
        STableMergeScanInfo* pMergeScan = pCur->info;
        *order = pMergeScan->cond.order;
        *scanFlag = pMergeScan->scanFlag;
        return TSDB_CODE_SUCCESS;
      }

      default:
        // not a scan operator: walk down to the first child
        if (pCur->pDownstream == NULL || pCur->pDownstream[0] == NULL) {
          return TSDB_CODE_INVALID_PARA;
        }
        pCur = pCur->pDownstream[0];
        break;
    }
  }
}
2398

2399
// this is a blocking operator
L
Liu Jicong 已提交
2400
static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
H
Haojun Liao 已提交
2401 2402
  if (OPTR_IS_OPENED(pOperator)) {
    return TSDB_CODE_SUCCESS;
2403 2404
  }

H
Haojun Liao 已提交
2405
  SExecTaskInfo*    pTaskInfo = pOperator->pTaskInfo;
2406
  SAggOperatorInfo* pAggInfo = pOperator->info;
H
Haojun Liao 已提交
2407

2408 2409
  SExprSupp*     pSup = &pOperator->exprSupp;
  SOperatorInfo* downstream = pOperator->pDownstream[0];
2410

2411 2412
  int64_t st = taosGetTimestampUs();

2413 2414 2415
  int32_t order = TSDB_ORDER_ASC;
  int32_t scanFlag = MAIN_SCAN;

H
Haojun Liao 已提交
2416
  while (1) {
2417
    SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
2418 2419 2420 2421
    if (pBlock == NULL) {
      break;
    }

2422 2423
    int32_t code = getTableScanInfo(pOperator, &order, &scanFlag);
    if (code != TSDB_CODE_SUCCESS) {
2424
      T_LONG_JMP(pTaskInfo->env, code);
2425
    }
2426

2427
    // there is an scalar expression that needs to be calculated before apply the group aggregation.
2428 2429 2430
    if (pAggInfo->scalarExprSup.pExprInfo != NULL) {
      SExprSupp* pSup1 = &pAggInfo->scalarExprSup;
      code = projectApplyFunctions(pSup1->pExprInfo, pBlock, pBlock, pSup1->pCtx, pSup1->numOfExprs, NULL);
2431
      if (code != TSDB_CODE_SUCCESS) {
2432
        T_LONG_JMP(pTaskInfo->env, code);
2433
      }
2434 2435
    }

2436
    // the pDataBlock are always the same one, no need to call this again
2437
    setExecutionContext(pOperator, pOperator->exprSupp.numOfExprs, pBlock->info.groupId);
2438
    setInputDataBlock(pSup, pBlock, order, scanFlag, true);
2439
    code = doAggregateImpl(pOperator, pSup->pCtx);
2440
    if (code != 0) {
2441
      T_LONG_JMP(pTaskInfo->env, code);
2442
    }
2443 2444
  }

2445
  initGroupedResultInfo(&pAggInfo->groupResInfo, pAggInfo->aggSup.pResultRowHashTable, 0);
H
Haojun Liao 已提交
2446
  OPTR_SET_OPENED(pOperator);
2447

2448
  pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
H
Haojun Liao 已提交
2449 2450 2451
  return TSDB_CODE_SUCCESS;
}

2452
/*
 * getNext function of the aggregate operator.
 *
 * Opens the operator on first use; on failure the error goes to pTaskInfo->code and the operator is completed.
 * It then emits result blocks built from the grouped results and filtered by the operator's condition. Blocks
 * left empty by the filter are skipped until the grouped results run out.
 *
 * Returns NULL once nothing is left.
 */
static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator) {
  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  SAggOperatorInfo* pAggInfo = pOperator->info;
  SOptrBasicInfo*   pInfo = &pAggInfo->binfo;
  SExecTaskInfo*    pTaskInfo = pOperator->pTaskInfo;

  pTaskInfo->code = pOperator->fpSet._openFn(pOperator);
  if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
    doSetOperatorCompleted(pOperator);
    return NULL;
  }

  blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);

  // build + filter until a block keeps rows, or the grouped results are exhausted
  bool exhausted = false;
  do {
    doBuildResultDatablock(pOperator, pInfo, &pAggInfo->groupResInfo, pAggInfo->aggSup.pResultBuf);
    doFilter(pAggInfo->pCondition, pInfo->pRes, NULL, NULL);

    exhausted = !hasRemainResults(&pAggInfo->groupResInfo);
    if (exhausted) {
      doSetOperatorCompleted(pOperator);
    }
  } while (!exhausted && pInfo->pRes->info.rows <= 0);

  size_t numOfRows = blockDataGetNumOfRows(pInfo->pRes);
  pOperator->resultInfo.totalRows += numOfRows;

  if (numOfRows == 0) {
    return NULL;
  }
  return pInfo->pRes;
}

wmmhello's avatar
wmmhello 已提交
2488
/*
 * Serializes the result rows of an aggregate operator into a newly allocated buffer.
 *
 * Layout (native byte order):
 *   int32 totalLength | int32 numOfEntries | { int32 keyLen | key | int32 rowSize | SResultRow bytes } ...
 *
 * On success *result owns the buffer (presumably freed by the caller) and *length is the number of bytes used.
 * When the result buffer is empty, *result is NULL and *length is 0.
 * pOperator->info is treated as an SOptrBasicInfo immediately followed by an SAggSupporter.
 *
 * Returns TSDB_CODE_TSC_INVALID_INPUT for NULL out-parameters and TSDB_CODE_OUT_OF_MEMORY on allocation failure.
 */
int32_t aggEncodeResultRow(SOperatorInfo* pOperator, char** result, int32_t* length) {
  if (result == NULL || length == NULL) {
    return TSDB_CODE_TSC_INVALID_INPUT;
  }
  SOptrBasicInfo* pInfo = (SOptrBasicInfo*)(pOperator->info);
  SAggSupporter*  pSup = (SAggSupporter*)POINTER_SHIFT(pOperator->info, sizeof(SOptrBasicInfo));
  int32_t         size = tSimpleHashGetSize(pSup->pResultRowHashTable);
  size_t          keyLen = sizeof(uint64_t) * 2;  // estimate the key length
  int32_t         totalSize =
      sizeof(int32_t) + sizeof(int32_t) + size * (sizeof(int32_t) + keyLen + sizeof(int32_t) + pSup->resultRowSize);

  // no result
  if (getTotalBufSize(pSup->pResultBuf) == 0) {
    *result = NULL;
    *length = 0;
    return TSDB_CODE_SUCCESS;
  }

  *result = (char*)taosMemoryCalloc(1, totalSize);
  if (*result == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  // the first int32 is reserved for the total length and is written once all entries are serialized
  int32_t offset = sizeof(int32_t);
  *(int32_t*)(*result + offset) = size;
  offset += sizeof(int32_t);

  // prepare memory
  SResultRowPosition* pos = &pInfo->resultRowInfo.cur;
  void*               pPage = getBufPage(pSup->pResultBuf, pos->pageId);
  setBufPageDirty(pPage, true);
  releaseBufPage(pSup->pResultBuf, pPage);

  int32_t iter = 0;
  void*   pIter = NULL;
  while ((pIter = tSimpleHashIterate(pSup->pResultRowHashTable, pIter, &iter))) {
    void*               key = tSimpleHashGetKey(pIter, &keyLen);
    SResultRowPosition* p1 = (SResultRowPosition*)pIter;

    // the initial allocation used an estimated key length: grow the buffer when this entry does not fit
    int32_t realTotalSize = offset + sizeof(int32_t) + keyLen + sizeof(int32_t) + pSup->resultRowSize;
    if (realTotalSize > totalSize) {
      char* tmp = (char*)taosMemoryRealloc(*result, realTotalSize);
      if (tmp == NULL) {
        taosMemoryFree(*result);
        *result = NULL;
        return TSDB_CODE_OUT_OF_MEMORY;
      }
      *result = tmp;
      totalSize = realTotalSize;  // track the new capacity so later entries do not realloc needlessly
    }

    // save key
    *(int32_t*)(*result + offset) = keyLen;
    offset += sizeof(int32_t);
    memcpy(*result + offset, key, keyLen);
    offset += keyLen;

    // save value: copy the row between getBufPage() and releaseBufPage(), never after the page was released
    pPage = getBufPage(pSup->pResultBuf, p1->pageId);
    SResultRow* pRow = (SResultRow*)((char*)pPage + p1->offset);
    setBufPageDirty(pPage, true);

    *(int32_t*)(*result + offset) = pSup->resultRowSize;
    offset += sizeof(int32_t);
    memcpy(*result + offset, pRow, pSup->resultRowSize);
    offset += pSup->resultRowSize;

    releaseBufPage(pSup->pResultBuf, pPage);
  }

  *(int32_t*)(*result) = offset;
  *length = offset;

  return TSDB_CODE_SUCCESS;
}

2564
/*
 * Inverse of aggEncodeResultRow(): rebuilds the aggregate result rows from a serialized buffer.
 *
 * For every serialized entry it:
 *  - allocates a new result row in pSup->pResultBuf;
 *  - registers the row's position in pSup->pResultRowHashTable under the serialized key;
 *  - overwrites the row with the serialized bytes, keeping the row's own pageId/offset.
 * pInfo->resultRowInfo.cur ends up at the last decoded row.
 *
 * Every length field is validated against the total length before it is used, so a malformed buffer fails
 * without reading out of bounds and without registering a partially decoded row.
 *
 * Returns TSDB_CODE_TSC_INVALID_INPUT on a NULL/malformed buffer or when a row cannot be allocated.
 */
int32_t aggDecodeResultRow(SOperatorInfo* pOperator, char* result) {
  if (result == NULL) {
    return TSDB_CODE_TSC_INVALID_INPUT;
  }
  SOptrBasicInfo* pInfo = (SOptrBasicInfo*)(pOperator->info);
  SAggSupporter*  pSup = (SAggSupporter*)POINTER_SHIFT(pOperator->info, sizeof(SOptrBasicInfo));

  int32_t length = *(int32_t*)(result);
  // the header consists of the total length and the number of entries
  if (length < (int32_t)(2 * sizeof(int32_t))) {
    return TSDB_CODE_TSC_INVALID_INPUT;
  }

  int32_t offset = sizeof(int32_t);
  int32_t count = *(int32_t*)(result + offset);
  offset += sizeof(int32_t);

  while (count-- > 0 && length > offset) {
    if (length - offset < (int32_t)sizeof(int32_t)) {
      return TSDB_CODE_TSC_INVALID_INPUT;
    }
    int32_t keyLen = *(int32_t*)(result + offset);
    offset += sizeof(int32_t);

    // the key must be followed by the int32 value length, all within the buffer
    if (keyLen < 0 || keyLen > length - offset - (int32_t)sizeof(int32_t)) {
      return TSDB_CODE_TSC_INVALID_INPUT;
    }
    char*   pKey = result + offset;
    int32_t valueLen = *(int32_t*)(pKey + keyLen);
    int32_t valueOffset = offset + keyLen + (int32_t)sizeof(int32_t);
    if (valueLen != pSup->resultRowSize || valueLen > length - valueOffset) {
      return TSDB_CODE_TSC_INVALID_INPUT;
    }

    SResultRow* resultRow = getNewResultRow(pSup->pResultBuf, &pSup->currentPageId, pSup->resultRowSize);
    if (!resultRow) {
      return TSDB_CODE_TSC_INVALID_INPUT;
    }

    // add a new result set for a new group
    SResultRowPosition pos = {.pageId = resultRow->pageId, .offset = resultRow->offset};
    tSimpleHashPut(pSup->pResultRowHashTable, pKey, keyLen, &pos, sizeof(SResultRowPosition));

    // copy the serialized row, but keep the location of the freshly allocated row
    int32_t pageId = resultRow->pageId;
    int32_t pOffset = resultRow->offset;
    memcpy(resultRow, result + valueOffset, valueLen);
    resultRow->pageId = pageId;
    resultRow->offset = pOffset;
    offset = valueOffset + valueLen;

    pInfo->resultRowInfo.cur = (SResultRowPosition){.pageId = resultRow->pageId, .offset = resultRow->offset};
  }

  if (offset != length) {
    return TSDB_CODE_TSC_INVALID_INPUT;
  }
  return TSDB_CODE_SUCCESS;
}

2615 2616 2617 2618 2619
/*
 * Applies the group-level (slimit/soffset) and row-level (limit/offset) restrictions to pBlock, in place.
 *
 * Returns PROJECT_RETRIEVE_CONTINUE when the caller should fetch the next block. This happens when:
 *  - the block was skipped entirely, or
 *  - holdDataInBuf is set and the block is not yet full enough.
 * Returns PROJECT_RETRIEVE_DONE when pBlock (possibly trimmed or emptied) should be handed on.
 * pOperator->status is set to OP_EXEC_DONE once the slimit has been reached.
 *
 * While soffset groups remain to be skipped, every block of a skipped group is discarded. The current group id
 * is advanced on each group change, so remainGroupOffset is decremented once per group, not once per block.
 */
int32_t handleLimitOffset(SOperatorInfo* pOperator, SLimitInfo* pLimitInfo, SSDataBlock* pBlock, bool holdDataInBuf) {
  if (pLimitInfo->remainGroupOffset > 0) {
    if (pLimitInfo->currentGroupId == 0) {  // it is the first group
      pLimitInfo->currentGroupId = pBlock->info.groupId;
      blockDataCleanup(pBlock);
      return PROJECT_RETRIEVE_CONTINUE;
    } else if (pLimitInfo->currentGroupId != pBlock->info.groupId) {
      // now it is the data from a new group
      pLimitInfo->remainGroupOffset -= 1;

      // this new group is skipped as well: remember it, so its later blocks are recognized as the same group
      if (pLimitInfo->remainGroupOffset > 0) {
        pLimitInfo->currentGroupId = pBlock->info.groupId;
        blockDataCleanup(pBlock);
        return PROJECT_RETRIEVE_CONTINUE;
      }
    } else {
      // another block of a group that is still being skipped by soffset
      blockDataCleanup(pBlock);
      return PROJECT_RETRIEVE_CONTINUE;
    }

    // set current group id of the project operator
    pLimitInfo->currentGroupId = pBlock->info.groupId;
  }

  // here check for a new group data, we need to handle the data of the previous group.
  if (pLimitInfo->currentGroupId != 0 && pLimitInfo->currentGroupId != pBlock->info.groupId) {
    pLimitInfo->numOfOutputGroups += 1;
    if ((pLimitInfo->slimit.limit > 0) && (pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups)) {
      pOperator->status = OP_EXEC_DONE;
      blockDataCleanup(pBlock);

      return PROJECT_RETRIEVE_DONE;
    }

    // reset the value for a new group data
    pLimitInfo->numOfOutputRows = 0;
    pLimitInfo->remainOffset = pLimitInfo->limit.offset;

    // existing rows that belongs to previous group.
    if (pBlock->info.rows > 0) {
      return PROJECT_RETRIEVE_DONE;
    }
  }

  // here we reach the start position, according to the limit/offset requirements.

  // set current group id
  pLimitInfo->currentGroupId = pBlock->info.groupId;

  if (pLimitInfo->remainOffset >= pBlock->info.rows) {
    pLimitInfo->remainOffset -= pBlock->info.rows;
    blockDataCleanup(pBlock);
    return PROJECT_RETRIEVE_CONTINUE;
  } else if (pLimitInfo->remainOffset < pBlock->info.rows && pLimitInfo->remainOffset > 0) {
    blockDataTrimFirstNRows(pBlock, pLimitInfo->remainOffset);
    pLimitInfo->remainOffset = 0;
  }

  // check for the limitation in each group
  if (pLimitInfo->limit.limit >= 0 && pLimitInfo->numOfOutputRows + pBlock->info.rows >= pLimitInfo->limit.limit) {
    int32_t keepRows = (int32_t)(pLimitInfo->limit.limit - pLimitInfo->numOfOutputRows);
    blockDataKeepFirstNRows(pBlock, keepRows);
    if (pLimitInfo->slimit.limit > 0 && pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups) {
      pOperator->status = OP_EXEC_DONE;
    }

    return PROJECT_RETRIEVE_DONE;
  }

  // todo optimize performance
  // If slimit/soffset values exist, multi-round results can not be packed into one block, since they may not
  // belong to the same group and the limit/offset value is not valid in this case.
  if ((!holdDataInBuf) || (pBlock->info.rows >= pOperator->resultInfo.threshold) || pLimitInfo->slimit.offset != -1 ||
      pLimitInfo->slimit.limit != -1) {
    return PROJECT_RETRIEVE_DONE;
  } else {  // not full enough, continue to accumulate the output data in the buffer.
    return PROJECT_RETRIEVE_CONTINUE;
  }
}

2692
static void doApplyScalarCalculation(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32_t order, int32_t scanFlag);
L
Liu Jicong 已提交
2693 2694
/*
 * Starts filling the cached block of the next group (pInfo->existNewGroupBlock) into pInfo->pFinalRes.
 *
 * The block is evaluated into pInfo->pRes, which becomes the fill input. The fill range ends at the query window
 * end once the task is completed, and at the block's own window end otherwise. Afterwards curGroupId switches
 * to the new group and the cached block pointer is cleared.
 */
static void doHandleRemainBlockForNewGroupImpl(SOperatorInfo* pOperator, SFillOperatorInfo* pInfo,
                                               SResultInfo* pResultInfo, SExecTaskInfo* pTaskInfo) {
  SSDataBlock* pPending = pInfo->existNewGroupBlock;
  SSDataBlock* pOutput = pInfo->pFinalRes;

  pInfo->totalInputRows = pPending->info.rows;

  int32_t order = TSDB_ORDER_ASC;
  int32_t scanFlag = MAIN_SCAN;
  getTableScanInfo(pOperator, &order, &scanFlag);

  int64_t ekey = pPending->info.window.ekey;
  if (Q_STATUS_EQUAL(pTaskInfo->status, TASK_COMPLETED)) {
    ekey = pInfo->win.ekey;
  }

  taosResetFillInfo(pInfo->pFillInfo, getFillInfoStart(pInfo->pFillInfo));

  blockDataCleanup(pInfo->pRes);
  doApplyScalarCalculation(pOperator, pPending, order, scanFlag);

  taosFillSetStartInfo(pInfo->pFillInfo, pInfo->pRes->info.rows, ekey);
  taosFillSetInputDataBlock(pInfo->pFillInfo, pInfo->pRes);

  int32_t capacityLeft = pResultInfo->capacity - pOutput->info.rows;
  taosFillResultDataBlock(pInfo->pFillInfo, pOutput, capacityLeft);

  pInfo->curGroupId = pPending->info.groupId;
  pInfo->existNewGroupBlock = NULL;
}

L
Liu Jicong 已提交
2719 2720
/*
 * Produces pending fill output before new input is read.
 *
 * If the fill info still has results for the current group, they are appended to pInfo->pFinalRes (up to the
 * remaining capacity) and pInfo->pRes is tagged with the current group id. Otherwise, if a block of the next group
 * is cached, filling of that group starts.
 */
static void doHandleRemainBlockFromNewGroup(SOperatorInfo* pOperator, SFillOperatorInfo* pInfo,
                                            SResultInfo* pResultInfo, SExecTaskInfo* pTaskInfo) {
  if (!taosFillHasMoreResults(pInfo->pFillInfo)) {
    // nothing left for the current group: move on to the cached block of the next group, if any
    if (pInfo->existNewGroupBlock) {
      doHandleRemainBlockForNewGroupImpl(pOperator, pInfo, pResultInfo, pTaskInfo);
    }
    return;
  }

  SSDataBlock* pOutput = pInfo->pFinalRes;
  int32_t      capacityLeft = pResultInfo->capacity - pOutput->info.rows;
  taosFillResultDataBlock(pInfo->pFillInfo, pOutput, capacityLeft);
  pInfo->pRes->info.groupId = pInfo->curGroupId;
}

/*
 * Evaluates the fill operator's expressions over pBlock into pInfo->pRes.
 *
 * The operator's own expressions run first. pInfo->pRes's row count is then reset, and the no-fill expressions are
 * applied to the same input. Finally pBlock's group id is copied to pInfo->pRes.
 */
static void doApplyScalarCalculation(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32_t order, int32_t scanFlag) {
  SFillOperatorInfo* pInfo = pOperator->info;

  SExprSupp* pFillSup = &pOperator->exprSupp;
  setInputDataBlock(pFillSup, pBlock, order, scanFlag, false);
  projectApplyFunctions(pFillSup->pExprInfo, pInfo->pRes, pBlock, pFillSup->pCtx, pFillSup->numOfExprs, NULL);

  // reset the row value before applying the no-fill functions to the input data block, which is "pBlock" here
  pInfo->pRes->info.rows = 0;

  SExprSupp* pNoFillSup = &pInfo->noFillExprSupp;
  setInputDataBlock(pNoFillSup, pBlock, order, scanFlag, false);
  projectApplyFunctions(pNoFillSup->pExprInfo, pInfo->pRes, pBlock, pNoFillSup->pCtx, pNoFillSup->numOfExprs, NULL);

  pInfo->pRes->info.groupId = pBlock->info.groupId;
}

S
slzhou 已提交
2749
/*
 * Produces the next block of filled rows in pInfo->pFinalRes, tagged with the current group id.
 *
 * Pending output of the current group (or of a cached next-group block) is returned first. After that, downstream
 * blocks are evaluated and fed to the fill info until:
 *  - the output exceeds the threshold, or
 *  - the input is exhausted, or
 *  - a block of a new group arrives; rows of different groups are not mixed in one output block.
 *
 * Returns NULL when there is nothing more to produce. If no input rows were seen at all, the operator is also
 * marked completed.
 */
static SSDataBlock* doFillImpl(SOperatorInfo* pOperator) {
  SFillOperatorInfo* pInfo = pOperator->info;
  SExecTaskInfo*     pTaskInfo = pOperator->pTaskInfo;
  SResultInfo*       pResultInfo = &pOperator->resultInfo;
  SSDataBlock*       pOutput = pInfo->pFinalRes;

  blockDataCleanup(pOutput);

  int32_t order = TSDB_ORDER_ASC;
  int32_t scanFlag = MAIN_SCAN;
  getTableScanInfo(pOperator, &order, &scanFlag);

  // first flush what is left of the current group, or start on the cached block of the next group
  doHandleRemainBlockFromNewGroup(pOperator, pInfo, pResultInfo, pTaskInfo);
  if (pOutput->info.rows > 0) {
    pOutput->info.groupId = pInfo->curGroupId;
    return pOutput;
  }

  SOperatorInfo* pChild = pOperator->pDownstream[0];
  for (;;) {
    SSDataBlock* pInput = pChild->fpSet.getNextFn(pChild);

    if (pInput != NULL) {
      blockDataUpdateTsWindow(pInput, pInfo->primarySrcSlotId);

      blockDataCleanup(pInfo->pRes);
      blockDataEnsureCapacity(pInfo->pRes, pInput->info.rows);
      blockDataEnsureCapacity(pInfo->pFinalRes, pInput->info.rows);
      doApplyScalarCalculation(pOperator, pInput, order, scanFlag);

      bool sameGroup = (pInfo->curGroupId == 0 || pInfo->curGroupId == pInfo->pRes->info.groupId);
      if (sameGroup) {
        // the first data block, or one more block of the group being filled
        pInfo->curGroupId = pInfo->pRes->info.groupId;
        pInfo->totalInputRows += pInfo->pRes->info.rows;

        int64_t endKey = (order == pInfo->pFillInfo->order) ? pInput->info.window.ekey : pInput->info.window.skey;
        taosFillSetStartInfo(pInfo->pFillInfo, pInfo->pRes->info.rows, endKey);
        taosFillSetInputDataBlock(pInfo->pFillInfo, pInfo->pRes);
      } else if (pInfo->curGroupId != pInput->info.groupId) {
        // a block of a new group: park it, and close the fill of the previous group before handling it
        pInfo->existNewGroupBlock = pInput;
        taosFillSetStartInfo(pInfo->pFillInfo, 0, pInfo->win.ekey);
      }
    } else {
      if (pInfo->totalInputRows == 0) {
        doSetOperatorCompleted(pOperator);
        return NULL;
      }

      taosFillSetStartInfo(pInfo->pFillInfo, 0, pInfo->win.ekey);
    }

    int32_t capacityLeft = pResultInfo->capacity - pOutput->info.rows;
    taosFillResultDataBlock(pInfo->pFillInfo, pOutput, capacityLeft);

    if (pOutput->info.rows > 0) {
      // hand the block out when it is full enough, the input is exhausted, or a new group is waiting
      if (pOutput->info.rows > pResultInfo->threshold || pInput == NULL || pInfo->existNewGroupBlock != NULL) {
        pOutput->info.groupId = pInfo->curGroupId;
        return pOutput;
      }

      doHandleRemainBlockFromNewGroup(pOperator, pInfo, pResultInfo, pTaskInfo);
      if (pOutput->info.rows >= pResultInfo->threshold || pInput == NULL) {
        pOutput->info.groupId = pInfo->curGroupId;
        return pOutput;
      }
    } else if (pInfo->existNewGroupBlock) {
      // the previous group produced nothing more: try the next group
      assert(pInput != NULL);

      blockDataCleanup(pOutput);

      doHandleRemainBlockForNewGroupImpl(pOperator, pInfo, pResultInfo, pTaskInfo);
      if (pOutput->info.rows > pResultInfo->threshold) {
        pOutput->info.groupId = pInfo->curGroupId;
        return pOutput;
      }
    } else {
      return NULL;
    }
  }
}

S
slzhou 已提交
2838 2839 2840 2841 2842 2843 2844 2845
/*
 * getNext function of the fill operator.
 *
 * Repeatedly takes a block from doFillImpl() and applies the operator's filter condition. It returns the first
 * block that still has rows, adding its row count to resultInfo.totalRows. Once doFillImpl() runs dry, the operator
 * is completed and NULL is returned.
 */
static SSDataBlock* doFill(SOperatorInfo* pOperator) {
  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  SFillOperatorInfo* pInfo = pOperator->info;

  for (;;) {
    SSDataBlock* pFilled = doFillImpl(pOperator);
    if (pFilled == NULL) {
      doSetOperatorCompleted(pOperator);
      return NULL;
    }

    doFilter(pInfo->pCondition, pFilled, pInfo->pColMatchColInfo, NULL);
    if (pFilled->info.rows > 0) {
      pOperator->resultInfo.totalRows += pFilled->info.rows;
      return pFilled;
    }
  }
}

2867
/*
 * Release the heap state owned by each of the `numOfExprs` entries of `pExpr`: the column
 * descriptors of column-typed parameters, the parameter array, and the expression node.
 * The `pExpr` array itself is not freed.
 */
void destroyExprInfo(SExprInfo* pExpr, int32_t numOfExprs) {
  for (int32_t k = 0; k < numOfExprs; ++k) {
    SExprInfo* pCur = pExpr + k;

    for (int32_t p = 0; p < pCur->base.numOfParams; ++p) {
      // only column parameters carry a separately allocated column descriptor
      if (pCur->base.pParam[p].type != FUNC_PARAM_TYPE_COLUMN) {
        continue;
      }
      taosMemoryFreeClear(pCur->base.pParam[p].pCol);
    }

    taosMemoryFree(pCur->base.pParam);
    taosMemoryFree(pCur->pExpr);
  }
}

2881 2882 2883 2884 2885
/*
 * Recursively destroy an operator tree rooted at `pOperator`.
 *
 * The operator's close callback releases its private info first. Then every downstream (child)
 * operator is destroyed, the expression support is cleaned up, and the operator itself is freed.
 * A NULL operator is ignored.
 */
static void destroyOperatorInfo(SOperatorInfo* pOperator) {
  if (pOperator == NULL) return;

  if (pOperator->fpSet.closeFn) {
    pOperator->fpSet.closeFn(pOperator->info);
  }

  if (pOperator->pDownstream) {
    for (int32_t idx = 0; idx < pOperator->numOfDownstream; ++idx) {
      destroyOperatorInfo(pOperator->pDownstream[idx]);
    }

    taosMemoryFreeClear(pOperator->pDownstream);
    pOperator->numOfDownstream = 0;
  }

  cleanupExprSupp(&pOperator->exprSupp);
  taosMemoryFreeClear(pOperator);
}

2903 2904 2905 2906 2907 2908 2909 2910 2911 2912 2913 2914 2915 2916 2917
/*
 * Choose the page size and total size of a disk-based result buffer for rows of `rowSize` bytes.
 *
 * The page size is the smallest power of two, starting at 4 KiB, that holds at least four rows.
 * The buffer size defaults to 1 MiB and is enlarged so that it always holds at least four pages.
 *
 * @param rowSize       size of one result row in bytes; a non-positive value yields the minimum page size
 * @param defaultPgsz   [out] chosen page size in bytes
 * @param defaultBufsz  [out] chosen buffer size in bytes, saturated at UINT32_MAX
 * @return always 0
 */
int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* defaultBufsz) {
  // Widen to 64 bits: rowSize * 4 overflows int32_t once a row reaches 512 MiB.
  const uint64_t minPageSize = (rowSize > 0) ? (uint64_t)rowSize * 4 : 0;

  uint32_t pgsz = 4096;
  // Stop doubling before the shift would wrap around to 0, which would otherwise loop forever.
  while (pgsz < minPageSize && pgsz <= UINT32_MAX / 2) {
    pgsz <<= 1u;
  }

  // At least four pages need to be in the buffer.
  uint64_t bufsz = 4096u * 256u;
  if (bufsz < (uint64_t)pgsz * 4) {
    bufsz = (uint64_t)pgsz * 4;
  }

  *defaultPgsz = pgsz;
  *defaultBufsz = (bufsz > UINT32_MAX) ? UINT32_MAX : (uint32_t)bufsz;
  return 0;
}

dengyihao's avatar
dengyihao 已提交
2918 2919
/*
 * Initialize the aggregate supporter: the key buffer, the result-row hash table and the
 * disk-based result buffer sized for rows produced by `pCtx`.
 *
 * @param pAggSup      supporter to initialize
 * @param pCtx         function contexts used to compute the result row size
 * @param numOfOutput  number of entries in pCtx
 * @param keyBufSize   size of the group key; the key buffer is enlarged by a pointer and an int64
 * @param pKey         identifier used for the result buffer and in log messages
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY, TSDB_CODE_NO_AVAIL_DISK, or the error returned
 *         by createDiskbasedBuf().
 *
 * On failure, members already created remain in pAggSup. They are presumably released by the caller
 * through cleanupAggSup(), as createAggregateOperatorInfo's error path does.
 */
int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t numOfOutput, size_t keyBufSize,
                         const char* pKey) {
  int32_t    code = TSDB_CODE_SUCCESS;
  _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);

  pAggSup->currentPageId = -1;
  pAggSup->resultRowSize = getResultRowSize(pCtx, numOfOutput);
  pAggSup->keyBuf = taosMemoryCalloc(1, keyBufSize + POINTER_BYTES + sizeof(int64_t));
  pAggSup->pResultRowHashTable = tSimpleHashInit(10, hashFn);

  if (pAggSup->keyBuf == NULL || pAggSup->pResultRowHashTable == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  uint32_t defaultPgsz = 0;
  uint32_t defaultBufsz = 0;
  getBufferPgSize(pAggSup->resultRowSize, &defaultPgsz, &defaultBufsz);

  if (!osTempSpaceAvailable()) {
    code = TSDB_CODE_NO_AVAIL_DISK;
    // Format the code actually being returned, as the log below does, rather than whatever terrno holds.
    qError("Init stream agg supporter failed since %s, %s", tstrerror(code), pKey);
    return code;
  }

  code = createDiskbasedBuf(&pAggSup->pResultBuf, defaultPgsz, defaultBufsz, pKey, tsTempDir);
  if (code != TSDB_CODE_SUCCESS) {
    qError("Create agg result buf failed since %s, %s", tstrerror(code), pKey);
  }

  return code;
}

2951
/*
 * Release everything doInitAggInfoSup() created: the key buffer (the pointer is also cleared),
 * the result-row hash table and the disk-based result buffer.
 */
void cleanupAggSup(SAggSupporter* pAggSup) {
  SAggSupporter* const pSup = pAggSup;

  taosMemoryFreeClear(pSup->keyBuf);
  tSimpleHashCleanup(pSup->pResultRowHashTable);
  destroyDiskbasedBuf(pSup->pResultBuf);
}

L
Liu Jicong 已提交
2957 2958
/*
 * Initialize the expression support (function contexts) and the aggregate supporter, then point
 * every function context's save handle at the supporter's disk-based result buffer.
 *
 * @return TSDB_CODE_SUCCESS, or the first error from initExprSupp() / doInitAggInfoSup().
 */
int32_t initAggInfo(SExprSupp* pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, size_t keyBufSize,
                    const char* pkey) {
  int32_t code = initExprSupp(pSup, pExprInfo, numOfCols);
  if (code == TSDB_CODE_SUCCESS) {
    code = doInitAggInfoSup(pAggSup, pSup->pCtx, numOfCols, keyBufSize, pkey);
  }
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  SqlFunctionCtx* pCtx = pSup->pCtx;
  for (int32_t col = 0; col < numOfCols; ++col) {
    pCtx[col].saveHandle.pBuf = pAggSup->pResultBuf;
  }

  return TSDB_CODE_SUCCESS;
}

L
Liu Jicong 已提交
2976
/*
 * Set the result block capacity to `numOfRows` and the output threshold to 75% of it.
 * When 75% rounds down to 0 (very small capacities), the threshold falls back to the full capacity.
 */
void initResultSizeInfo(SResultInfo* pResultInfo, int32_t numOfRows) {
  ASSERT(numOfRows != 0);

  SResultInfo* pInfo = pResultInfo;
  pInfo->capacity = numOfRows;
  pInfo->threshold = numOfRows * 0.75;

  if (!pInfo->threshold) {
    pInfo->threshold = numOfRows;
  }
}

2986 2987 2988 2989 2990
/* Reset the result-row bookkeeping and attach `pBlock` as the operator's result block. */
void initBasicInfo(SOptrBasicInfo* pInfo, SSDataBlock* pBlock) {
  initResultRowInfo(&pInfo->resultRowInfo);
  pInfo->pRes = pBlock;
}

5
54liuyao 已提交
2991
/*
 * Free an array of `numOfOutput` function contexts together with each context's parameter
 * variants, subsidiary context/buffer and input buffers.
 *
 * @return always NULL, so callers can write `pCtx = destroySqlFunctionCtx(pCtx, n);`
 */
void* destroySqlFunctionCtx(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
  if (pCtx != NULL) {
    for (int32_t k = 0; k < numOfOutput; ++k) {
      SqlFunctionCtx* pOne = &pCtx[k];

      for (int32_t j = 0; j < pOne->numOfParams; ++j) {
        taosVariantDestroy(&pOne->param[j].param);
      }

      taosMemoryFreeClear(pOne->subsidiaries.pCtx);
      taosMemoryFreeClear(pOne->subsidiaries.buf);
      taosMemoryFree(pOne->input.pData);
      taosMemoryFree(pOne->input.pColumnDataAgg);
    }

    taosMemoryFreeClear(pCtx);
  }

  return NULL;
}

3011
/*
 * Attach `pExprInfo` (ownership is taken immediately) to `pSup` and, when there are expressions,
 * build their function contexts.
 *
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when the contexts cannot be created.
 */
int32_t initExprSupp(SExprSupp* pSup, SExprInfo* pExprInfo, int32_t numOfExpr) {
  pSup->pExprInfo = pExprInfo;
  pSup->numOfExprs = numOfExpr;

  // nothing to build: pCtx is left untouched
  if (pExprInfo == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  pSup->pCtx = createSqlFunctionCtx(pExprInfo, numOfExpr, &pSup->rowEntryInfoOffset);
  return (pSup->pCtx == NULL) ? TSDB_CODE_OUT_OF_MEMORY : TSDB_CODE_SUCCESS;
}

3024 3025 3026 3027
/*
 * Release everything owned by an expression support: the function contexts, the expression
 * array (and each expression's internals), the filter info and the row-entry offsets.
 *
 * Every freed pointer is reset to NULL, so calling this twice on the same SExprSupp is harmless.
 */
void cleanupExprSupp(SExprSupp* pSupp) {
  // destroySqlFunctionCtx() always returns NULL; storing it clears the otherwise dangling pointer.
  pSupp->pCtx = destroySqlFunctionCtx(pSupp->pCtx, pSupp->numOfExprs);

  if (pSupp->pExprInfo != NULL) {
    destroyExprInfo(pSupp->pExprInfo, pSupp->numOfExprs);
    taosMemoryFreeClear(pSupp->pExprInfo);
  }

  if (pSupp->pFilterInfo != NULL) {
    filterFreeInfo(pSupp->pFilterInfo);
    pSupp->pFilterInfo = NULL;
  }

  taosMemoryFreeClear(pSupp->rowEntryInfoOffset);
}

H
Haojun Liao 已提交
3039
/*
 * Build the hash-aggregate operator ("TableAggregate") on top of `downstream`.
 *
 * Every resource is handed to pInfo / pOperator as soon as it is created, so the single _error
 * path can release whatever has been built so far.
 *
 * @return the new operator, or NULL on failure (pTaskInfo->code then holds the error code).
 */
SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pAggNode, SExecTaskInfo* pTaskInfo) {
  int32_t           code = TSDB_CODE_OUT_OF_MEMORY;
  SAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SAggOperatorInfo));
  SOperatorInfo*    pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    goto _error;
  }

  SSDataBlock* pResBlock = createResDataBlock(pAggNode->node.pOutputDataBlockDesc);
  initBasicInfo(&pInfo->binfo, pResBlock);

  // Scalar expressions, if any. initExprSupp() stores the array in scalarExprSup right away, so any
  // later failure releases it through destroyAggOperatorInfo() instead of leaking it.
  int32_t    numOfScalarExpr = 0;
  SExprInfo* pScalarExprInfo = NULL;
  if (pAggNode->pExprs != NULL) {
    pScalarExprInfo = createExprInfo(pAggNode->pExprs, NULL, &numOfScalarExpr);
  }

  code = initExprSupp(&pInfo->scalarExprSup, pScalarExprInfo, numOfScalarExpr);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
  initResultSizeInfo(&pOperator->resultInfo, 4096);

  int32_t    num = 0;
  SExprInfo* pExprInfo = createExprInfo(pAggNode->pAggFuncs, pAggNode->pGroupKeys, &num);
  code = initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  pInfo->binfo.mergeResultBlock = pAggNode->mergeDataBlock;
  pInfo->groupId = UINT64_MAX;
  pInfo->pCondition = pAggNode->node.pConditions;
  pOperator->name = "TableAggregate";
  pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_HASH_AGG;
  pOperator->blocking = true;
  pOperator->status = OP_NOT_OPENED;
  pOperator->info = pInfo;
  pOperator->pTaskInfo = pTaskInfo;

  pOperator->fpSet = createOperatorFpSet(doOpenAggregateOptr, getAggregateResult, NULL, NULL, destroyAggOperatorInfo,
                                         aggEncodeResultRow, aggDecodeResultRow, NULL);

  // Let a table scan directly below this operator see the aggregate's expressions and supporter.
  if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
    STableScanInfo* pTableScanInfo = downstream->info;
    pTableScanInfo->pdInfo.pExprSup = &pOperator->exprSupp;
    pTableScanInfo->pdInfo.pAggSup = &pInfo->aggSup;
  }

  code = appendDownstream(pOperator, &downstream, 1);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  return pOperator;

_error:
  if (pInfo != NULL) {
    destroyAggOperatorInfo(pInfo);
  }

  // pOperator is NULL when its own allocation failed; it must not be dereferenced then.
  if (pOperator != NULL) {
    cleanupExprSupp(&pOperator->exprSupp);
    taosMemoryFreeClear(pOperator);
  }

  pTaskInfo->code = code;
  return NULL;
}

3108
/* Destroy the operator's result block and store what blockDataDestroy() returns back into pRes. */
void cleanupBasicInfo(SOptrBasicInfo* pInfo) {
  assert(pInfo != NULL);

  SSDataBlock* pRes = pInfo->pRes;
  pInfo->pRes = blockDataDestroy(pRes);
}

H
Haojun Liao 已提交
3113 3114 3115 3116 3117 3118 3119
/*
 * Element destructor for containers that store heap pointers: `pItem` is the address of the
 * stored pointer. It is freed and the slot is cleared.
 */
static void freeItem(void* pItem) {
  void** pSlot = pItem;
  if (*pSlot == NULL) {
    return;
  }
  taosMemoryFreeClear(*pSlot);
}

3120
void destroyAggOperatorInfo(void* param) {
L
Liu Jicong 已提交
3121
  SAggOperatorInfo* pInfo = (SAggOperatorInfo*)param;
L
Liu Jicong 已提交
3122 3123
  cleanupBasicInfo(&pInfo->binfo);

H
Haojun Liao 已提交
3124
  cleanupAggSup(&pInfo->aggSup);
S
shenglian zhou 已提交
3125
  cleanupExprSupp(&pInfo->scalarExprSup);
H
Haojun Liao 已提交
3126
  cleanupGroupResInfo(&pInfo->groupResInfo);
D
dapan1121 已提交
3127
  taosMemoryFreeClear(param);
3128
}
3129

3130
void destroyFillOperatorInfo(void* param) {
L
Liu Jicong 已提交
3131
  SFillOperatorInfo* pInfo = (SFillOperatorInfo*)param;
3132
  pInfo->pFillInfo = taosDestroyFillInfo(pInfo->pFillInfo);
H
Haojun Liao 已提交
3133
  pInfo->pRes = blockDataDestroy(pInfo->pRes);
H
Haojun Liao 已提交
3134 3135
  pInfo->pFinalRes = blockDataDestroy(pInfo->pFinalRes);

3136
  cleanupExprSupp(&pInfo->noFillExprSupp);
H
Haojun Liao 已提交
3137

wafwerar's avatar
wafwerar 已提交
3138
  taosMemoryFreeClear(pInfo->p);
3139
  taosArrayDestroy(pInfo->pColMatchColInfo);
D
dapan1121 已提交
3140
  taosMemoryFreeClear(param);
3141 3142
}

3143
void destroyExchangeOperatorInfo(void* param) {
L
Liu Jicong 已提交
3144
  SExchangeInfo* pExInfo = (SExchangeInfo*)param;
3145 3146 3147
  taosRemoveRef(exchangeObjRefPool, pExInfo->self);
}

L
Liu Jicong 已提交
3148
void freeSourceDataInfo(void* p) {
3149 3150 3151 3152
  SSourceDataInfo* pInfo = (SSourceDataInfo*)p;
  taosMemoryFreeClear(pInfo->pRsp);
}

3153
void doDestroyExchangeOperatorInfo(void* param) {
X
Xiaoyu Wang 已提交
3154
  SExchangeInfo* pExInfo = (SExchangeInfo*)param;
3155

H
Haojun Liao 已提交
3156
  taosArrayDestroy(pExInfo->pSources);
3157
  taosArrayDestroyEx(pExInfo->pSourceDataInfo, freeSourceDataInfo);
3158 3159 3160 3161

  if (pExInfo->pResultBlockList != NULL) {
    taosArrayDestroyEx(pExInfo->pResultBlockList, freeBlock);
    pExInfo->pResultBlockList = NULL;
H
Haojun Liao 已提交
3162 3163
  }

3164
  blockDataDestroy(pExInfo->pDummyBlock);
L
Liu Jicong 已提交
3165

3166
  tsem_destroy(&pExInfo->ready);
D
dapan1121 已提交
3167
  taosMemoryFreeClear(param);
H
Haojun Liao 已提交
3168 3169
}

H
Haojun Liao 已提交
3170 3171 3172 3173
/*
 * Create the fill state for the fill operator.
 *
 * The first window is aligned to the interval, starting at win.skey for ascending order and
 * win.ekey for descending order. pInfo->win is stored in scan order (skey is where the scan
 * starts), and a scratch array of numOfCols pointers is allocated.
 *
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY. On failure pInfo->pFillInfo and pInfo->p are
 *         released and set to NULL, so the caller's error path (destroyFillOperatorInfo) can safely
 *         release them again.
 */
static int32_t initFillInfo(SFillOperatorInfo* pInfo, SExprInfo* pExpr, int32_t numOfCols, SExprInfo* pNotFillExpr,
                            int32_t numOfNotFillCols, SNodeListNode* pValNode, STimeWindow win, int32_t capacity,
                            const char* id, SInterval* pInterval, int32_t fillType, int32_t order) {
  SFillColInfo* pColInfo = createFillColInfo(pExpr, numOfCols, pNotFillExpr, numOfNotFillCols, pValNode);

  int64_t     startKey = (order == TSDB_ORDER_ASC) ? win.skey : win.ekey;
  STimeWindow w = getAlignQueryTimeWindow(pInterval, pInterval->precision, startKey);
  w = getFirstQualifiedTimeWindow(startKey, &w, pInterval, order);

  pInfo->pFillInfo = taosCreateFillInfo(w.skey, numOfCols, numOfNotFillCols, capacity, pInterval, fillType, pColInfo,
                                        pInfo->primaryTsCol, order, id);

  if (order == TSDB_ORDER_ASC) {
    pInfo->win.skey = win.skey;
    pInfo->win.ekey = win.ekey;
  } else {
    pInfo->win.skey = win.ekey;
    pInfo->win.ekey = win.skey;
  }

  pInfo->p = taosMemoryCalloc(numOfCols, POINTER_BYTES);

  if (pInfo->pFillInfo == NULL || pInfo->p == NULL) {
    // Use the matching destructor and clear both fields. destroyFillOperatorInfo() releases them again,
    // and it already calls taosDestroyFillInfo() on a possibly-NULL pFillInfo.
    pInfo->pFillInfo = taosDestroyFillInfo(pInfo->pFillInfo);
    taosMemoryFreeClear(pInfo->p);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  return TSDB_CODE_SUCCESS;
}

3200
/*
 * Return true if one of the non-fill expressions is a plain column reference to the
 * window-start pseudo column (COLUMN_TYPE_WINDOW_START).
 */
static bool isWstartColumnExist(SFillOperatorInfo* pInfo) {
  const SExprSupp* pSupp = &pInfo->noFillExprSupp;

  for (int32_t i = 0; i < pSupp->numOfExprs; ++i) {
    const SExprInfo* pCur = &pSupp->pExprInfo[i];

    bool isColumn = (pCur->pExpr->nodeType == QUERY_NODE_COLUMN);
    if (isColumn && pCur->base.numOfParams == 1 &&
        pCur->base.pParam[0].pCol->colType == COLUMN_TYPE_WINDOW_START) {
      return true;
    }
  }

  return false;
}

3215 3216
/*
 * Make sure the non-fill expressions include the window-start timestamp.
 *
 * If it is missing, pExprSupp->pExprInfo is grown by one entry, built from the fill node's pWStartTs
 * target node.
 *
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_QRY_SYS_ERROR if pWStartTs is not a target node, or
 *         TSDB_CODE_OUT_OF_MEMORY (pExprSupp is left unchanged in that case).
 */
static int32_t createPrimaryTsExprIfNeeded(SFillOperatorInfo* pInfo, SFillPhysiNode* pPhyFillNode, SExprSupp* pExprSupp,
                                           const char* idStr) {
  if (isWstartColumnExist(pInfo)) {
    return TSDB_CODE_SUCCESS;
  }

  if (pPhyFillNode->pWStartTs->type != QUERY_NODE_TARGET) {
    qError("pWStartTs of fill physical node is not a target node, %s", idStr);
    return TSDB_CODE_QRY_SYS_ERROR;
  }

  int32_t    oldNum = pExprSupp->numOfExprs;
  SExprInfo* pGrown = taosMemoryRealloc(pExprSupp->pExprInfo, (oldNum + 1) * sizeof(SExprInfo));
  if (pGrown == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  createExprFromTargetNode(&pGrown[oldNum], (STargetNode*)pPhyFillNode->pWStartTs);
  pExprSupp->numOfExprs = oldNum + 1;
  pExprSupp->pExprInfo = pGrown;
  return TSDB_CODE_SUCCESS;
}

L
Liu Jicong 已提交
3238 3239
/*
 * Build the fill operator on top of `downstream`.
 *
 * The downstream is expected to be an interval or merge-aligned-interval operator; its interval
 * definition is reused.
 *
 * @return the new operator, or NULL on failure (pTaskInfo->code then holds the error code).
 */
SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* pPhyFillNode,
                                      SExecTaskInfo* pTaskInfo) {
  int32_t            code = TSDB_CODE_OUT_OF_MEMORY;
  SFillOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SFillOperatorInfo));
  SOperatorInfo*     pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    goto _error;
  }

  pInfo->pRes = createResDataBlock(pPhyFillNode->node.pOutputDataBlockDesc);
  SExprInfo* pExprInfo = createExprInfo(pPhyFillNode->pFillExprs, NULL, &pInfo->numOfExpr);
  // Record the count together with the array so the error path can release the fill expressions
  // even if initExprSupp() below is never reached.
  pOperator->exprSupp.pExprInfo = pExprInfo;
  pOperator->exprSupp.numOfExprs = pInfo->numOfExpr;

  // Non-fill expressions, extended with the window-start column when it is not already among them.
  SExprSupp* pNoFillSupp = &pInfo->noFillExprSupp;
  pNoFillSupp->pExprInfo = createExprInfo(pPhyFillNode->pNotFillExprs, NULL, &pNoFillSupp->numOfExprs);
  code = createPrimaryTsExprIfNeeded(pInfo, pPhyFillNode, pNoFillSupp, pTaskInfo->id.str);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  code = initExprSupp(pNoFillSupp, pNoFillSupp->pExprInfo, pNoFillSupp->numOfExprs);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  // The interval definition lives in the downstream operator's info, whose layout depends on its type.
  SInterval* pInterval =
      QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL == downstream->operatorType
          ? &((SMergeAlignedIntervalAggOperatorInfo*)downstream->info)->intervalAggOperatorInfo->interval
          : &((SIntervalAggOperatorInfo*)downstream->info)->interval;

  int32_t order = (pPhyFillNode->inputTsOrder == ORDER_ASC) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC;
  int32_t type = convertFillType(pPhyFillNode->mode);

  SResultInfo* pResultInfo = &pOperator->resultInfo;

  initResultSizeInfo(&pOperator->resultInfo, 4096);
  blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
  code = initExprSupp(&pOperator->exprSupp, pExprInfo, pInfo->numOfExpr);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  pInfo->primaryTsCol = ((STargetNode*)pPhyFillNode->pWStartTs)->slotId;
  pInfo->primarySrcSlotId = ((SColumnNode*)((STargetNode*)pPhyFillNode->pWStartTs)->pExpr)->slotId;

  int32_t numOfOutputCols = 0;
  pInfo->pColMatchColInfo = extractColMatchInfo(pPhyFillNode->pFillExprs, pPhyFillNode->node.pOutputDataBlockDesc,
                                                &numOfOutputCols, COL_MATCH_FROM_SLOT_ID);

  code = initFillInfo(pInfo, pExprInfo, pInfo->numOfExpr, pNoFillSupp->pExprInfo, pNoFillSupp->numOfExprs,
                      (SNodeListNode*)pPhyFillNode->pValues, pPhyFillNode->timeRange, pResultInfo->capacity,
                      pTaskInfo->id.str, pInterval, type, order);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  pInfo->pFinalRes = createOneDataBlock(pInfo->pRes, false);
  blockDataEnsureCapacity(pInfo->pFinalRes, pOperator->resultInfo.capacity);

  pInfo->pCondition = pPhyFillNode->node.pConditions;
  pOperator->name = "FillOperator";
  pOperator->blocking = false;
  pOperator->status = OP_NOT_OPENED;
  pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_FILL;
  pOperator->exprSupp.numOfExprs = pInfo->numOfExpr;
  pOperator->info = pInfo;
  pOperator->pTaskInfo = pTaskInfo;

  pOperator->fpSet =
      createOperatorFpSet(operatorDummyOpenFn, doFill, NULL, NULL, destroyFillOperatorInfo, NULL, NULL, NULL);

  // A failure here used to be ignored, returning an operator that is not linked to its downstream.
  code = appendDownstream(pOperator, &downstream, 1);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  return pOperator;

_error:
  if (pInfo != NULL) {
    destroyFillOperatorInfo(pInfo);
  }

  // The operator owns the fill expressions and their contexts; release them with it.
  if (pOperator != NULL) {
    cleanupExprSupp(&pOperator->exprSupp);
    taosMemoryFreeClear(pOperator);
  }

  pTaskInfo->code = code;
  return NULL;
}

D
dapan1121 已提交
3320
/*
 * Allocate a task-info object for the given query/task and initialize its status, database name,
 * creation time, execution model and printable id string ("TID:0x... QID:0x...").
 *
 * @param dbFName  database name; it is duplicated, and must be non-NULL (strdup(NULL) is undefined)
 * @return the new task info, or NULL with terrno = TSDB_CODE_OUT_OF_MEMORY if an allocation fails.
 *         NOTE(review): confirm all callers handle a NULL return.
 */
static SExecTaskInfo* createExecTaskInfo(uint64_t queryId, uint64_t taskId, EOPTR_EXEC_MODEL model, char* dbFName) {
  SExecTaskInfo* pTaskInfo = taosMemoryCalloc(1, sizeof(SExecTaskInfo));
  if (pTaskInfo == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED);

  pTaskInfo->schemaInfo.dbname = strdup(dbFName);
  pTaskInfo->cost.created = taosGetTimestampMs();
  pTaskInfo->id.queryId = queryId;
  pTaskInfo->execModel = model;

  char* p = taosMemoryCalloc(1, 128);
  if (pTaskInfo->schemaInfo.dbname == NULL || p == NULL) {
    taosMemoryFree(p);
    taosMemoryFree(pTaskInfo->schemaInfo.dbname);
    taosMemoryFree(pTaskInfo);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  snprintf(p, 128, "TID:0x%" PRIx64 " QID:0x%" PRIx64, taskId, queryId);
  pTaskInfo->id.str = p;

  return pTaskInfo;
}
H
Haojun Liao 已提交
3335

H
Haojun Liao 已提交
3336 3337
SSchemaWrapper* extractQueriedColumnSchema(SScanPhysiNode* pScanNode);

3338
int32_t extractTableSchemaInfo(SReadHandle* pHandle, SScanPhysiNode* pScanNode, SExecTaskInfo* pTaskInfo) {
3339 3340
  SMetaReader mr = {0};
  metaReaderInit(&mr, pHandle->meta, 0);
3341
  int32_t code = metaGetTableEntryByUid(&mr, pScanNode->uid);
3342
  if (code != TSDB_CODE_SUCCESS) {
L
Liu Jicong 已提交
3343 3344
    qError("failed to get the table meta, uid:0x%" PRIx64 ", suid:0x%" PRIx64 ", %s", pScanNode->uid, pScanNode->suid,
           GET_TASKID(pTaskInfo));
H
Haojun Liao 已提交
3345

D
dapan1121 已提交
3346
    metaReaderClear(&mr);
3347
    return terrno;
D
dapan1121 已提交
3348
  }
3349

3350 3351
  SSchemaInfo* pSchemaInfo = &pTaskInfo->schemaInfo;
  pSchemaInfo->tablename = strdup(mr.me.name);
3352 3353

  if (mr.me.type == TSDB_SUPER_TABLE) {
3354 3355
    pSchemaInfo->sw = tCloneSSchemaWrapper(&mr.me.stbEntry.schemaRow);
    pSchemaInfo->tversion = mr.me.stbEntry.schemaTag.version;
3356
  } else if (mr.me.type == TSDB_CHILD_TABLE) {
3357 3358
    tDecoderClear(&mr.coder);

3359 3360
    tb_uid_t suid = mr.me.ctbEntry.suid;
    metaGetTableEntryByUid(&mr, suid);
3361 3362
    pSchemaInfo->sw = tCloneSSchemaWrapper(&mr.me.stbEntry.schemaRow);
    pSchemaInfo->tversion = mr.me.stbEntry.schemaTag.version;
3363
  } else {
3364
    pSchemaInfo->sw = tCloneSSchemaWrapper(&mr.me.ntbEntry.schemaRow);
3365
  }
3366 3367

  metaReaderClear(&mr);
3368

H
Haojun Liao 已提交
3369 3370 3371 3372 3373
  pSchemaInfo->qsw = extractQueriedColumnSchema(pScanNode);
  return TSDB_CODE_SUCCESS;
}

SSchemaWrapper* extractQueriedColumnSchema(SScanPhysiNode* pScanNode) {
3374 3375 3376
  int32_t numOfCols = LIST_LENGTH(pScanNode->pScanCols);
  int32_t numOfTags = LIST_LENGTH(pScanNode->pScanPseudoCols);

3377
  SSchemaWrapper* pqSw = taosMemoryCalloc(1, sizeof(SSchemaWrapper));
3378
  pqSw->pSchema = taosMemoryCalloc(numOfCols + numOfTags, sizeof(SSchema));
3379

L
Liu Jicong 已提交
3380
  for (int32_t i = 0; i < numOfCols; ++i) {
H
Haojun Liao 已提交
3381
    STargetNode* pNode = (STargetNode*)nodesListGetNode(pScanNode->pScanCols, i);
3382 3383
    SColumnNode* pColNode = (SColumnNode*)pNode->pExpr;

H
Haojun Liao 已提交
3384 3385 3386
    SSchema* pSchema = &pqSw->pSchema[pqSw->nCols++];
    pSchema->colId = pColNode->colId;
    pSchema->type = pColNode->node.resType.type;
H
Haojun Liao 已提交
3387 3388
    pSchema->bytes = pColNode->node.resType.bytes;
    tstrncpy(pSchema->name, pColNode->colName, tListLen(pSchema->name));
3389 3390
  }

3391
  // this the tags and pseudo function columns, we only keep the tag columns
3392
  for (int32_t i = 0; i < numOfTags; ++i) {
3393 3394 3395 3396 3397 3398 3399 3400 3401
    STargetNode* pNode = (STargetNode*)nodesListGetNode(pScanNode->pScanPseudoCols, i);

    int32_t type = nodeType(pNode->pExpr);
    if (type == QUERY_NODE_COLUMN) {
      SColumnNode* pColNode = (SColumnNode*)pNode->pExpr;

      SSchema* pSchema = &pqSw->pSchema[pqSw->nCols++];
      pSchema->colId = pColNode->colId;
      pSchema->type = pColNode->node.resType.type;
H
Haojun Liao 已提交
3402
      pSchema->bytes = pColNode->node.resType.bytes;
H
Haojun Liao 已提交
3403
      tstrncpy(pSchema->name, pColNode->colName, tListLen(pSchema->name));
3404 3405 3406
    }
  }

H
Haojun Liao 已提交
3407
  return pqSw;
3408 3409
}

3410 3411
/* Release the strings and schema wrappers held by a task's schema info. */
static void cleanupTableSchemaInfo(SSchemaInfo* pSchemaInfo) {
  SSchemaInfo* const pInfo = pSchemaInfo;

  taosMemoryFreeClear(pInfo->dbname);
  taosMemoryFreeClear(pInfo->tablename);

  tDeleteSSchemaWrapper(pInfo->sw);
  tDeleteSSchemaWrapper(pInfo->qsw);
}

3417
/* Release the schema wrapper held by a stream task's info. */
static void cleanupStreamInfo(SStreamTaskInfo* pStreamInfo) {
  SSchemaWrapper* pSchema = pStreamInfo->schema;
  tDeleteSSchemaWrapper(pSchema);
}
3418

H
Haojun Liao 已提交
3419
/*
 * Rebuild pTableListInfo->pGroupList so that it contains one SArray of STableKeyInfo per distinct
 * group id, ordered by ascending group id.
 *
 * `sortSupport` is a sorted array of the group ids seen so far, kept index-aligned with pGroupList.
 * For each table the group id is looked up in pTableListInfo->map.
 *
 * @return TSDB_CODE_SUCCESS; TSDB_CODE_OUT_OF_MEMORY; or TSDB_CODE_QRY_APP_ERROR when a push/insert
 *         fails or a table has no group id in the map.
 */
static int32_t sortTableGroup(STableListInfo* pTableListInfo) {
  taosArrayClear(pTableListInfo->pGroupList);

  SArray* sortSupport = taosArrayInit(16, sizeof(uint64_t));
  if (sortSupport == NULL) return TSDB_CODE_OUT_OF_MEMORY;

  int32_t code = TSDB_CODE_SUCCESS;
  SArray* tGroup = NULL;  // a new group not yet owned by pGroupList; released on failure

  size_t numOfTables = taosArrayGetSize(pTableListInfo->pTableList);
  for (size_t i = 0; i < numOfTables; i++) {
    STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i);
    uint64_t*      groupId = taosHashGet(pTableListInfo->map, &info->uid, sizeof(uint64_t));
    if (groupId == NULL) {
      qError("failed to find the group id of table uid:0x%" PRIx64, (uint64_t)info->uid);
      code = TSDB_CODE_QRY_APP_ERROR;
      goto _end;
    }

    int32_t index = taosArraySearchIdx(sortSupport, groupId, compareUint64Val, TD_EQ);
    if (index != -1) {
      // known group: append the table to it
      SArray* pExisting = (SArray*)taosArrayGetP(pTableListInfo->pGroupList, index);
      if (taosArrayPush(pExisting, info) == NULL) {
        qError("taos push uid array error");
        code = TSDB_CODE_QRY_APP_ERROR;
        goto _end;
      }
      continue;
    }

    // new group: place it before the first larger group id, or at the end
    void* p = taosArraySearch(sortSupport, groupId, compareUint64Val, TD_GT);
    tGroup = taosArrayInit(8, sizeof(STableKeyInfo));
    if (tGroup == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _end;
    }
    if (taosArrayPush(tGroup, info) == NULL) {
      qError("taos push info array error");
      code = TSDB_CODE_QRY_APP_ERROR;
      goto _end;
    }

    if (p == NULL) {
      if (taosArrayPush(sortSupport, groupId) == NULL) {
        qError("taos push support array error");
        code = TSDB_CODE_QRY_APP_ERROR;
        goto _end;
      }
      if (taosArrayPush(pTableListInfo->pGroupList, &tGroup) == NULL) {
        qError("taos push group array error");
        code = TSDB_CODE_QRY_APP_ERROR;
        goto _end;
      }
    } else {
      int32_t pos = TARRAY_ELEM_IDX(sortSupport, p);
      if (taosArrayInsert(sortSupport, pos, groupId) == NULL) {
        qError("taos insert support array error");
        code = TSDB_CODE_QRY_APP_ERROR;
        goto _end;
      }
      if (taosArrayInsert(pTableListInfo->pGroupList, pos, &tGroup) == NULL) {
        qError("taos insert group array error");
        code = TSDB_CODE_QRY_APP_ERROR;
        goto _end;
      }
    }

    tGroup = NULL;  // ownership moved to pGroupList
  }

_end:
  if (tGroup != NULL) {
    taosArrayDestroy(tGroup);
  }
  taosArrayDestroy(sortSupport);
  return code;
}

3477
/*
 * Return true when the grouping (partition by / group by) is by table name, i.e. the first group
 * key is the tbname() function.
 */
bool groupbyTbname(SNodeList* pGroupList) {
  if (LIST_LENGTH(pGroupList) <= 0) {
    return false;
  }

  SNode* pFirst = nodesListGetNode(pGroupList, 0);
  if (pFirst->type != QUERY_NODE_FUNCTION) {
    return false;
  }

  return strcmp(((struct SFunctionNode*)pFirst)->functionName, "tbname") == 0;
}

wmmhello's avatar
wmmhello 已提交
3490 3491
/*
 * Assign a group id to every table in pTableListInfo->pTableList and record uid -> group id in
 * pTableListInfo->map.
 *
 * When grouping by tbname, each table is its own group (group id = uid). Otherwise the group ids are
 * computed from the group-by expressions. If requested, the tables are then regrouped in group-id
 * order.
 *
 * @return TSDB_CODE_SUCCESS (also when `group` is NULL), TSDB_CODE_OUT_OF_MEMORY, or the error of
 *         getColInfoResultForGroupby() / sortTableGroup().
 */
int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle, SNodeList* group) {
  if (group == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  pTableListInfo->map = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
  if (pTableListInfo->map == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  if (groupbyTbname(group)) {
    size_t numOfTables = taosArrayGetSize(pTableListInfo->pTableList);
    for (size_t i = 0; i < numOfTables; i++) {
      STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i);
      info->groupId = info->uid;
      taosHashPut(pTableListInfo->map, &(info->uid), sizeof(uint64_t), &info->groupId, sizeof(uint64_t));
    }
  } else {
    int32_t code = getColInfoResultForGroupby(pHandle->meta, group, pTableListInfo);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }
  }

  if (pTableListInfo->needSortTableByGroupId) {
    return sortTableGroup(pTableListInfo);
  }

  return TSDB_CODE_SUCCESS;
}

3524 3525 3526 3527 3528 3529 3530 3531 3532 3533 3534 3535 3536 3537 3538 3539 3540
/*
 * Prepare a table-data query condition for the block-distribution scan of table `uid`.
 *
 * The condition scans ascending over the whole time range and reads a single timestamp column
 * (colId 1; presumably the primary timestamp column — confirm). All versions are requested
 * (start/end version -1).
 *
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_QRY_OUT_OF_MEMORY (also stored in terrno).
 */
static int32_t initTableblockDistQueryCond(uint64_t uid, SQueryTableDataCond* pCond) {
  memset(pCond, 0, sizeof(SQueryTableDataCond));

  pCond->order = TSDB_ORDER_ASC;
  pCond->numOfCols = 1;

  pCond->colList = taosMemoryCalloc(1, sizeof(SColumnInfo));
  if (pCond->colList == NULL) {
    terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
    return terrno;
  }

  SColumnInfo* pTsCol = pCond->colList;
  pTsCol->colId = 1;
  pTsCol->type = TSDB_DATA_TYPE_TIMESTAMP;
  pTsCol->bytes = sizeof(TSKEY);

  pCond->twindows = (STimeWindow){.skey = INT64_MIN, .ekey = INT64_MAX};
  pCond->suid = uid;
  pCond->type = TIMEWINDOW_RANGE_CONTAINED;
  pCond->startVersion = -1;
  pCond->endVersion = -1;

  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
3548
SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle,
L
Liu Jicong 已提交
3549 3550
                                  STableListInfo* pTableListInfo, SNode* pTagCond, SNode* pTagIndexCond,
                                  const char* pUser) {
3551 3552
  int32_t type = nodeType(pPhyNode);

X
Xiaoyu Wang 已提交
3553
  if (pPhyNode->pChildren == NULL || LIST_LENGTH(pPhyNode->pChildren) == 0) {
3554
    SOperatorInfo* pOperator = NULL;
H
Haojun Liao 已提交
3555
    if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == type) {
dengyihao's avatar
dengyihao 已提交
3556
      STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode;
H
Haojun Liao 已提交
3557

L
Liu Jicong 已提交
3558 3559 3560
      int32_t code =
          createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, pTableScanNode->groupSort, pHandle,
                                  pTableListInfo, pTagCond, pTagIndexCond, GET_TASKID(pTaskInfo));
3561
      if (code) {
wmmhello's avatar
wmmhello 已提交
3562
        pTaskInfo->code = code;
3563
        qError("failed to createScanTableListInfo, code:%s, %s", tstrerror(code), GET_TASKID(pTaskInfo));
D
dapan1121 已提交
3564 3565
        return NULL;
      }
wmmhello's avatar
wmmhello 已提交
3566

3567
      code = extractTableSchemaInfo(pHandle, &pTableScanNode->scan, pTaskInfo);
S
slzhou 已提交
3568
      if (code) {
3569
        pTaskInfo->code = terrno;
wmmhello's avatar
wmmhello 已提交
3570 3571 3572
        return NULL;
      }

3573
      pOperator = createTableScanOperatorInfo(pTableScanNode, pHandle, pTaskInfo);
3574 3575
      STableScanInfo* pScanInfo = pOperator->info;
      pTaskInfo->cost.pRecoder = &pScanInfo->readRecorder;
S
slzhou 已提交
3576 3577
    } else if (QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN == type) {
      STableMergeScanPhysiNode* pTableScanNode = (STableMergeScanPhysiNode*)pPhyNode;
L
Liu Jicong 已提交
3578 3579 3580
      int32_t                   code =
          createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, pTableScanNode->groupSort, pHandle,
                                  pTableListInfo, pTagCond, pTagIndexCond, GET_TASKID(pTaskInfo));
L
Liu Jicong 已提交
3581
      if (code) {
wmmhello's avatar
wmmhello 已提交
3582
        pTaskInfo->code = code;
H
Haojun Liao 已提交
3583
        qError("failed to createScanTableListInfo, code: %s", tstrerror(code));
wmmhello's avatar
wmmhello 已提交
3584 3585
        return NULL;
      }
3586

3587
      code = extractTableSchemaInfo(pHandle, &pTableScanNode->scan, pTaskInfo);
wmmhello's avatar
wmmhello 已提交
3588 3589 3590 3591
      if (code) {
        pTaskInfo->code = terrno;
        return NULL;
      }
wmmhello's avatar
wmmhello 已提交
3592

3593
      pOperator = createTableMergeScanOperatorInfo(pTableScanNode, pTableListInfo, pHandle, pTaskInfo);
wmmhello's avatar
wmmhello 已提交
3594

3595 3596
      STableScanInfo* pScanInfo = pOperator->info;
      pTaskInfo->cost.pRecoder = &pScanInfo->readRecorder;
H
Haojun Liao 已提交
3597
    } else if (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == type) {
3598 3599
      pOperator = createExchangeOperatorInfo(pHandle ? pHandle->pMsgCb->clientRpc : NULL, (SExchangePhysiNode*)pPhyNode,
                                             pTaskInfo);
H
Haojun Liao 已提交
3600
    } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN == type) {
5
54liuyao 已提交
3601
      STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode;
5
54liuyao 已提交
3602
      if (pHandle->vnode) {
L
Liu Jicong 已提交
3603 3604 3605
        int32_t code =
            createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, pTableScanNode->groupSort,
                                    pHandle, pTableListInfo, pTagCond, pTagIndexCond, GET_TASKID(pTaskInfo));
L
Liu Jicong 已提交
3606
        if (code) {
wmmhello's avatar
wmmhello 已提交
3607
          pTaskInfo->code = code;
H
Haojun Liao 已提交
3608
          qError("failed to createScanTableListInfo, code: %s", tstrerror(code));
wmmhello's avatar
wmmhello 已提交
3609 3610
          return NULL;
        }
L
Liu Jicong 已提交
3611 3612 3613 3614 3615

#ifndef NDEBUG
        int32_t sz = taosArrayGetSize(pTableListInfo->pTableList);
        for (int32_t i = 0; i < sz; i++) {
          STableKeyInfo* pKeyInfo = taosArrayGet(pTableListInfo->pTableList, i);
S
Shengliang Guan 已提交
3616
          qDebug("creating stream task: add table %" PRId64, pKeyInfo->uid);
L
Liu Jicong 已提交
3617 3618
        }
#endif
3619
      }
3620

H
Haojun Liao 已提交
3621
      pTaskInfo->schemaInfo.qsw = extractQueriedColumnSchema(&pTableScanNode->scan);
3622
      pOperator = createStreamScanOperatorInfo(pHandle, pTableScanNode, pTagCond, pTaskInfo);
H
Haojun Liao 已提交
3623
    } else if (QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN == type) {
L
Liu Jicong 已提交
3624
      SSystemTableScanPhysiNode* pSysScanPhyNode = (SSystemTableScanPhysiNode*)pPhyNode;
3625
      pOperator = createSysTableScanOperatorInfo(pHandle, pSysScanPhyNode, pUser, pTaskInfo);
3626
    } else if (QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN == type) {
X
Xiaoyu Wang 已提交
3627
      STagScanPhysiNode* pScanPhyNode = (STagScanPhysiNode*)pPhyNode;
3628
      int32_t code = getTableList(pHandle->meta, pHandle->vnode, pScanPhyNode, pTagCond, pTagIndexCond, pTableListInfo);
3629
      if (code != TSDB_CODE_SUCCESS) {
3630
        pTaskInfo->code = code;
H
Haojun Liao 已提交
3631
        qError("failed to getTableList, code: %s", tstrerror(code));
3632 3633 3634
        return NULL;
      }

3635
      pOperator = createTagScanOperatorInfo(pHandle, pScanPhyNode, pTableListInfo, pTaskInfo);
3636
    } else if (QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN == type) {
3637
      SBlockDistScanPhysiNode* pBlockNode = (SBlockDistScanPhysiNode*)pPhyNode;
3638 3639 3640
      pTableListInfo->pTableList = taosArrayInit(4, sizeof(STableKeyInfo));

      if (pBlockNode->tableType == TSDB_SUPER_TABLE) {
3641
        int32_t code = vnodeGetAllTableList(pHandle->vnode, pBlockNode->uid, pTableListInfo->pTableList);
3642 3643 3644 3645 3646
        if (code != TSDB_CODE_SUCCESS) {
          pTaskInfo->code = terrno;
          return NULL;
        }
      } else {  // Create one table group.
3647
        STableKeyInfo info = {.uid = pBlockNode->uid, .groupId = 0};
3648 3649 3650 3651
        taosArrayPush(pTableListInfo->pTableList, &info);
      }

      SQueryTableDataCond cond = {0};
L
Liu Jicong 已提交
3652
      int32_t             code = initTableblockDistQueryCond(pBlockNode->suid, &cond);
3653 3654
      if (code != TSDB_CODE_SUCCESS) {
        return NULL;
3655
      }
H
Haojun Liao 已提交
3656 3657 3658

      STsdbReader* pReader = NULL;
      tsdbReaderOpen(pHandle->vnode, &cond, pTableListInfo->pTableList, &pReader, "");
3659 3660
      cleanupQueryTableDataCond(&cond);

3661
      pOperator = createDataBlockInfoScanOperator(pReader, pHandle, cond.suid, pBlockNode, pTaskInfo);
H
Haojun Liao 已提交
3662 3663 3664
    } else if (QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN == type) {
      SLastRowScanPhysiNode* pScanNode = (SLastRowScanPhysiNode*)pPhyNode;

L
Liu Jicong 已提交
3665 3666
      int32_t code = createScanTableListInfo(&pScanNode->scan, pScanNode->pGroupTags, true, pHandle, pTableListInfo,
                                             pTagCond, pTagIndexCond, GET_TASKID(pTaskInfo));
3667 3668 3669 3670
      if (code != TSDB_CODE_SUCCESS) {
        pTaskInfo->code = code;
        return NULL;
      }
3671

3672
      code = extractTableSchemaInfo(pHandle, &pScanNode->scan, pTaskInfo);
3673 3674 3675
      if (code != TSDB_CODE_SUCCESS) {
        pTaskInfo->code = code;
        return NULL;
H
Haojun Liao 已提交
3676 3677
      }

3678
      pOperator = createCacherowsScanOperator(pScanNode, pHandle, pTaskInfo);
3679
    } else if (QUERY_NODE_PHYSICAL_PLAN_PROJECT == type) {
3680
      pOperator = createProjectOperatorInfo(NULL, (SProjectPhysiNode*)pPhyNode, pTaskInfo);
H
Haojun Liao 已提交
3681 3682
    } else {
      ASSERT(0);
H
Haojun Liao 已提交
3683
    }
3684 3685 3686 3687 3688

    if (pOperator != NULL) {
      pOperator->resultDataBlockId = pPhyNode->pOutputDataBlockDesc->dataBlockId;
    }

3689
    return pOperator;
H
Haojun Liao 已提交
3690 3691
  }

3692 3693
  int32_t num = 0;
  size_t  size = LIST_LENGTH(pPhyNode->pChildren);
H
Haojun Liao 已提交
3694

3695
  SOperatorInfo** ops = taosMemoryCalloc(size, POINTER_BYTES);
dengyihao's avatar
dengyihao 已提交
3696
  for (int32_t i = 0; i < size; ++i) {
3697
    SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, i);
3698
    ops[i] = createOperatorTree(pChildNode, pTaskInfo, pHandle, pTableListInfo, pTagCond, pTagIndexCond, pUser);
3699
    if (ops[i] == NULL) {
H
Haojun Liao 已提交
3700
      taosMemoryFree(ops);
3701 3702
      return NULL;
    }
3703 3704

    ops[i]->resultDataBlockId = pChildNode->pOutputDataBlockDesc->dataBlockId;
3705
  }
H
Haojun Liao 已提交
3706

3707
  SOperatorInfo* pOptr = NULL;
H
Haojun Liao 已提交
3708
  if (QUERY_NODE_PHYSICAL_PLAN_PROJECT == type) {
3709
    pOptr = createProjectOperatorInfo(ops[0], (SProjectPhysiNode*)pPhyNode, pTaskInfo);
3710
  } else if (QUERY_NODE_PHYSICAL_PLAN_HASH_AGG == type) {
H
Haojun Liao 已提交
3711 3712
    SAggPhysiNode* pAggNode = (SAggPhysiNode*)pPhyNode;
    if (pAggNode->pGroupKeys != NULL) {
H
Haojun Liao 已提交
3713
      pOptr = createGroupOperatorInfo(ops[0], pAggNode, pTaskInfo);
H
Haojun Liao 已提交
3714
    } else {
H
Haojun Liao 已提交
3715
      pOptr = createAggregateOperatorInfo(ops[0], pAggNode, pTaskInfo);
H
Haojun Liao 已提交
3716
    }
3717
  } else if (QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL == type) {
H
Haojun Liao 已提交
3718
    SIntervalPhysiNode* pIntervalPhyNode = (SIntervalPhysiNode*)pPhyNode;
H
Haojun Liao 已提交
3719

H
Haojun Liao 已提交
3720
    SExprInfo*   pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &num);
3721
    SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
H
Haojun Liao 已提交
3722

dengyihao's avatar
dengyihao 已提交
3723 3724 3725 3726 3727 3728
    SInterval interval = {.interval = pIntervalPhyNode->interval,
                          .sliding = pIntervalPhyNode->sliding,
                          .intervalUnit = pIntervalPhyNode->intervalUnit,
                          .slidingUnit = pIntervalPhyNode->slidingUnit,
                          .offset = pIntervalPhyNode->offset,
                          .precision = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->node.resType.precision};
H
Haojun Liao 已提交
3729

X
Xiaoyu Wang 已提交
3730 3731 3732 3733 3734
    STimeWindowAggSupp as = {
        .waterMark = pIntervalPhyNode->window.watermark,
        .calTrigger = pIntervalPhyNode->window.triggerType,
        .maxTs = INT64_MIN,
    };
3735
    ASSERT(as.calTrigger != STREAM_TRIGGER_MAX_DELAY);
3736

3737
    int32_t tsSlotId = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId;
L
Liu Jicong 已提交
3738
    bool    isStream = (QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL == type);
3739 3740
    pOptr = createIntervalOperatorInfo(ops[0], pExprInfo, num, pResBlock, &interval, tsSlotId, &as, pIntervalPhyNode,
                                       pTaskInfo, isStream);
3741

3742
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL == type) {
3743
    pOptr = createStreamIntervalOperatorInfo(ops[0], pPhyNode, pTaskInfo);
3744 3745
  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL == type) {
    SMergeAlignedIntervalPhysiNode* pIntervalPhyNode = (SMergeAlignedIntervalPhysiNode*)pPhyNode;
3746
    pOptr = createMergeAlignedIntervalOperatorInfo(ops[0], pIntervalPhyNode, pTaskInfo);
S
shenglian zhou 已提交
3747
  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL == type) {
X
Xiaoyu Wang 已提交
3748
    SMergeIntervalPhysiNode* pIntervalPhyNode = (SMergeIntervalPhysiNode*)pPhyNode;
3749
    pOptr = createMergeIntervalOperatorInfo(ops[0], pIntervalPhyNode, pTaskInfo);
5
54liuyao 已提交
3750
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL == type) {
3751
    int32_t children = 0;
5
54liuyao 已提交
3752 3753
    pOptr = createStreamFinalIntervalOperatorInfo(ops[0], pPhyNode, pTaskInfo, children);
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL == type) {
5
54liuyao 已提交
3754
    int32_t children = pHandle->numOfVgroups;
5
54liuyao 已提交
3755
    pOptr = createStreamFinalIntervalOperatorInfo(ops[0], pPhyNode, pTaskInfo, children);
H
Haojun Liao 已提交
3756
  } else if (QUERY_NODE_PHYSICAL_PLAN_SORT == type) {
3757
    pOptr = createSortOperatorInfo(ops[0], (SSortPhysiNode*)pPhyNode, pTaskInfo);
S
shenglian zhou 已提交
3758 3759
  } else if (QUERY_NODE_PHYSICAL_PLAN_GROUP_SORT == type) {
    pOptr = createGroupSortOperatorInfo(ops[0], (SGroupSortPhysiNode*)pPhyNode, pTaskInfo);
X
Xiaoyu Wang 已提交
3760
  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE == type) {
3761
    SMergePhysiNode* pMergePhyNode = (SMergePhysiNode*)pPhyNode;
3762
    pOptr = createMultiwayMergeOperatorInfo(ops, size, pMergePhyNode, pTaskInfo);
3763
  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION == type) {
H
Haojun Liao 已提交
3764
    SSessionWinodwPhysiNode* pSessionNode = (SSessionWinodwPhysiNode*)pPhyNode;
H
Haojun Liao 已提交
3765
    pOptr = createSessionAggOperatorInfo(ops[0], pSessionNode, pTaskInfo);
3766
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION == type) {
3767 3768 3769 3770 3771
    pOptr = createStreamSessionAggOperatorInfo(ops[0], pPhyNode, pTaskInfo);
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION == type) {
    int32_t children = 0;
    pOptr = createStreamFinalSessionAggOperatorInfo(ops[0], pPhyNode, pTaskInfo, children);
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION == type) {
3772
    int32_t children = pHandle->numOfVgroups;
3773
    pOptr = createStreamFinalSessionAggOperatorInfo(ops[0], pPhyNode, pTaskInfo, children);
H
Haojun Liao 已提交
3774
  } else if (QUERY_NODE_PHYSICAL_PLAN_PARTITION == type) {
3775
    pOptr = createPartitionOperatorInfo(ops[0], (SPartitionPhysiNode*)pPhyNode, pTaskInfo);
3776
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION == type) {
3777
    pOptr = createStreamPartitionOperatorInfo(ops[0], (SStreamPartitionPhysiNode*)pPhyNode, pTaskInfo);
3778
  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE == type) {
dengyihao's avatar
dengyihao 已提交
3779
    SStateWinodwPhysiNode* pStateNode = (SStateWinodwPhysiNode*)pPhyNode;
3780
    pOptr = createStatewindowOperatorInfo(ops[0], pStateNode, pTaskInfo);
3781
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE == type) {
5
54liuyao 已提交
3782
    pOptr = createStreamStateAggOperatorInfo(ops[0], pPhyNode, pTaskInfo);
3783
  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN == type) {
3784
    pOptr = createMergeJoinOperatorInfo(ops, size, (SSortMergeJoinPhysiNode*)pPhyNode, pTaskInfo);
3785
  } else if (QUERY_NODE_PHYSICAL_PLAN_FILL == type) {
H
Haojun Liao 已提交
3786
    pOptr = createFillOperatorInfo(ops[0], (SFillPhysiNode*)pPhyNode, pTaskInfo);
5
54liuyao 已提交
3787 3788
  } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_FILL == type) {
    pOptr = createStreamFillOperatorInfo(ops[0], (SStreamFillPhysiNode*)pPhyNode, pTaskInfo);
H
Haojun Liao 已提交
3789 3790
  } else if (QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC == type) {
    pOptr = createIndefinitOutputOperatorInfo(ops[0], pPhyNode, pTaskInfo);
3791 3792
  } else if (QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC == type) {
    pOptr = createTimeSliceOperatorInfo(ops[0], pPhyNode, pTaskInfo);
H
Haojun Liao 已提交
3793 3794
  } else {
    ASSERT(0);
H
Haojun Liao 已提交
3795
  }
3796

3797
  taosMemoryFree(ops);
3798 3799 3800 3801
  if (pOptr) {
    pOptr->resultDataBlockId = pPhyNode->pOutputDataBlockDesc->dataBlockId;
  }

3802
  return pOptr;
3803
}
H
Haojun Liao 已提交
3804

L
Liu Jicong 已提交
3805 3806 3807 3808 3809 3810 3811 3812 3813 3814 3815 3816 3817
/*
 * Follows the single-downstream chain below pOperator until the stream scan operator is reached and
 * returns (via ppInfo) the STableScanInfo of the table scan operator that stream scan wraps.
 *
 * @return 0 on success; TSDB_CODE_QRY_APP_ERROR when the chain ends without a stream scan or when an
 *         operator on the way has more than one downstream (e.g. a join).
 */
static int32_t extractTbscanInStreamOpTree(SOperatorInfo* pOperator, STableScanInfo** ppInfo) {
  SOperatorInfo* pCur = pOperator;

  while (pCur->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
    if (pCur->numOfDownstream == 0) {
      qError("failed to find stream scan operator");
      return TSDB_CODE_QRY_APP_ERROR;
    }
    if (pCur->numOfDownstream > 1) {
      qError("join not supported for stream block scan");
      return TSDB_CODE_QRY_APP_ERROR;
    }
    pCur = pCur->pDownstream[0];
  }

  SStreamScanInfo* pStreamScan = pCur->info;
  ASSERT(pStreamScan->pTableScanOp->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN);
  *ppInfo = pStreamScan->pTableScanOp->info;
  return 0;
}

3825 3826 3827 3828 3829 3830 3831 3832 3833 3834 3835 3836 3837 3838 3839 3840 3841 3842 3843 3844 3845 3846
/*
 * Descends a physical plan through nodes that have exactly one child and returns the leaf, which
 * must be a table scan node.
 *
 * @param pNode   plan (sub)tree to search
 * @param ppNode  out: the table scan leaf on success
 * @return 0 on success; -1 (with terrno = TSDB_CODE_QRY_APP_ERROR) if a node has several children
 *         or the leaf is not a table scan.
 */
int32_t extractTableScanNode(SPhysiNode* pNode, STableScanPhysiNode** ppNode) {
  SPhysiNode* pCur = pNode;

  // Walk down while the current node still has children.
  while (pCur->pChildren != NULL && LIST_LENGTH(pCur->pChildren) != 0) {
    if (LIST_LENGTH(pCur->pChildren) != 1) {
      ASSERT(0);
      terrno = TSDB_CODE_QRY_APP_ERROR;
      return -1;
    }
    pCur = (SPhysiNode*)nodesListGetNode(pCur->pChildren, 0);
  }

  if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN != pCur->type) {
    ASSERT(0);
    terrno = TSDB_CODE_QRY_APP_ERROR;
    return -1;
  }

  *ppNode = (STableScanPhysiNode*)pCur;
  return 0;
}

3847
#if 0
/*
 * Disabled. Closes the tsdb reader used by the table scan underneath a stream operator tree and
 * opens a fresh one from the subplan's table scan node. Repositioning by uid/ts was never
 * implemented (see TODO below).
 */
int32_t rebuildReader(SOperatorInfo* pOperator, SSubplan* plan, SReadHandle* pHandle, int64_t uid, int64_t ts) {
  STableScanInfo* pScanInfo = NULL;
  if (extractTbscanInStreamOpTree(pOperator, &pScanInfo) < 0) {
    return -1;
  }

  STableScanPhysiNode* pScanNode = NULL;
  if (extractTableScanNode(plan->pNode, &pScanNode) < 0) {
    ASSERT(0);
  }

  tsdbReaderClose(pScanInfo->dataReader);

  STableListInfo tableList = {0};
  pScanInfo->dataReader = doCreateDataReader(pScanNode, pHandle, &tableList, NULL);
  if (pScanInfo->dataReader == NULL) {
    ASSERT(0);
    qError("failed to create data reader");
    return TSDB_CODE_QRY_APP_ERROR;
  }

  // TODO: set uid and ts to data reader
  return 0;
}
#endif
3872

C
Cary Xu 已提交
3873
/*
 * Serializes the result-row state of every operator in the tree rooted at ops into one buffer.
 *
 * Layout built here: a leading int32_t holding the total byte size of the buffer (header included),
 * followed by the bytes produced by each operator's encodeResultRow hook, in pre-order (an operator
 * before its downstream operators). Operators without a hook, or whose hook yields zero bytes,
 * contribute nothing.
 *
 * @param ops           root of the operator (sub)tree to encode
 * @param result        in/out: accumulated buffer; a new one is allocated while *result is NULL,
 *                      otherwise it is grown with realloc. Freed and reset to NULL on failure.
 * @param length        out: total buffer size, refreshed whenever bytes are appended
 * @param nOptrWithVal  in/out: incremented once per operator that appended a non-empty chunk
 * @return TDB_CODE_SUCCESS, TSDB_CODE_TSC_INVALID_INPUT, TSDB_CODE_OUT_OF_MEMORY, or the hook's error
 */
int32_t encodeOperator(SOperatorInfo* ops, char** result, int32_t* length, int32_t* nOptrWithVal) {
  if (ops->fpSet.encodeResultRow != NULL) {
    if (result == NULL || length == NULL || nOptrWithVal == NULL) {
      return TSDB_CODE_TSC_INVALID_INPUT;
    }

    char*   pChunk = NULL;
    int32_t chunkLen = 0;
    int32_t ret = ops->fpSet.encodeResultRow(ops, &pChunk, &chunkLen);
    if (ret != TDB_CODE_SUCCESS) {
      // Abandon everything accumulated so far.
      if (*result != NULL) {
        taosMemoryFree(*result);
        *result = NULL;
      }
      return ret;
    }

    if (chunkLen == 0) {
      // Nothing to persist for this operator; an empty chunk must not come with a buffer.
      ASSERT(!pChunk);
    } else {
      ++(*nOptrWithVal);
      ASSERT(chunkLen >= 0);

      if (*result != NULL) {
        // Append after the existing content and grow the size header accordingly.
        int32_t usedLen = *(int32_t*)(*result);
        char*   pGrown = (char*)taosMemoryRealloc(*result, usedLen + chunkLen);
        if (pGrown == NULL) {
          taosMemoryFree(pChunk);
          taosMemoryFree(*result);
          *result = NULL;
          return TSDB_CODE_OUT_OF_MEMORY;
        }
        *result = pGrown;
        memcpy(*result + usedLen, pChunk, chunkLen);
        *(int32_t*)(*result) += chunkLen;
      } else {
        // First chunk: reserve room for the int32_t size header in front of it.
        *result = (char*)taosMemoryCalloc(1, chunkLen + sizeof(int32_t));
        if (*result == NULL) {
          taosMemoryFree(pChunk);
          return TSDB_CODE_OUT_OF_MEMORY;
        }
        memcpy(*result + sizeof(int32_t), pChunk, chunkLen);
        *(int32_t*)(*result) = chunkLen + sizeof(int32_t);
      }

      taosMemoryFree(pChunk);
      *length = *(int32_t*)(*result);
    }
  }

  for (int32_t i = 0; i < ops->numOfDownstream; ++i) {
    int32_t ret = encodeOperator(ops->pDownstream[i], result, length, nOptrWithVal);
    if (ret != TDB_CODE_SUCCESS) {
      return ret;
    }
  }

  return TDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
3933
/*
 * Recursive worker for decodeOperator.
 *
 * *pCursor points at the next unread chunk of the serialized buffer (or is NULL when there is no
 * input) and pEnd is one past the last byte. Each operator with a decodeResultRow hook consumes one
 * chunk; the first int32_t of a chunk is its byte length, which is used to advance the cursor. The
 * cursor is shared across the whole traversal so sibling subtrees continue where the previous
 * subtree stopped, matching the pre-order in which encodeOperator appended the chunks.
 */
static int32_t decodeOperatorFromCursor(SOperatorInfo* ops, const char** pCursor, const char* pEnd) {
  if (ops->fpSet.decodeResultRow) {
    const char* pChunk = *pCursor;

    // No input at all, or every chunk has already been consumed.
    if (pChunk == NULL || pChunk >= pEnd || (size_t)(pEnd - pChunk) < sizeof(int32_t)) {
      return TSDB_CODE_TSC_INVALID_INPUT;
    }

    // memcpy instead of *(int32_t*) because chunk boundaries are not guaranteed to be aligned.
    int32_t dataLength = 0;
    memcpy(&dataLength, pChunk, sizeof(int32_t));
    if (dataLength <= 0 || dataLength > pEnd - pChunk) {
      return TSDB_CODE_TSC_INVALID_INPUT;
    }

    int32_t code = ops->fpSet.decodeResultRow(ops, (char*)pChunk);
    if (code != TDB_CODE_SUCCESS) {
      return code;
    }

    *pCursor = pChunk + dataLength;
  }

  for (int32_t i = 0; i < ops->numOfDownstream; ++i) {
    int32_t code = decodeOperatorFromCursor(ops->pDownstream[i], pCursor, pEnd);
    if (code != TDB_CODE_SUCCESS) {
      return code;
    }
  }

  return TDB_CODE_SUCCESS;
}

/*
 * Restores operator state from a buffer produced by encodeOperator.
 *
 * The buffer starts with an int32_t total size (header included) followed by per-operator chunks
 * in pre-order. The input buffer is only read, never modified. Earlier versions rewrote a size
 * header inside the caller's const buffer and handed every sibling the parent's stale position.
 *
 * @param ops     root of the operator (sub)tree to restore
 * @param result  serialized buffer, or NULL when there is nothing to restore
 * @param length  size of result in bytes; expected to equal the buffer's leading size header
 * @return TDB_CODE_SUCCESS, TSDB_CODE_TSC_INVALID_INPUT (missing/truncated/malformed input for an
 *         operator that expects state), or the error returned by a decodeResultRow hook.
 */
int32_t decodeOperator(SOperatorInfo* ops, const char* result, int32_t length) {
  const char* pCursor = NULL;
  const char* pEnd = NULL;

  if (result != NULL) {
    int32_t totalLength = 0;
    memcpy(&totalLength, result, sizeof(int32_t));
    ASSERT(length == totalLength);

    pCursor = result + sizeof(int32_t);
    pEnd = result + totalLength;
  }

  return decodeOperatorFromCursor(ops, &pCursor, pEnd);
}

D
dapan1121 已提交
3970
/*
 * Builds the sink-specific parameter object for pNode and stores it in *pParam.
 *
 * - Insert sinks get an SInserterParam carrying readHandle.
 * - Delete sinks get an SDeleterParam with the task's suid and a copy of the uids in the task's
 *   table list.
 * - Any other sink type gets no parameter (*pParam is left untouched).
 *
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY if an allocation fails.
 */
int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, qTaskInfo_t* pTaskInfo, SReadHandle* readHandle) {
  SExecTaskInfo* pTask = *(SExecTaskInfo**)pTaskInfo;

  if (pNode->type == QUERY_NODE_PHYSICAL_PLAN_QUERY_INSERT) {
    SInserterParam* pInserter = taosMemoryCalloc(1, sizeof(SInserterParam));
    if (pInserter == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
    pInserter->readHandle = readHandle;
    *pParam = pInserter;
  } else if (pNode->type == QUERY_NODE_PHYSICAL_PLAN_DELETE) {
    SDeleterParam* pDeleter = taosMemoryCalloc(1, sizeof(SDeleterParam));
    if (pDeleter == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    SArray* pTables = pTask->tableqinfoList.pTableList;
    int32_t numOfTables = taosArrayGetSize(pTables);

    pDeleter->suid = pTask->tableqinfoList.suid;
    pDeleter->pUidList = taosArrayInit(numOfTables, sizeof(uint64_t));
    if (pDeleter->pUidList == NULL) {
      taosMemoryFree(pDeleter);
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    for (int32_t i = 0; i < numOfTables; ++i) {
      STableKeyInfo* pKey = taosArrayGet(pTables, i);
      taosArrayPush(pDeleter->pUidList, &pKey->uid);
    }

    *pParam = pDeleter;
  }

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
4011
/*
 * Creates an execution task for pPlan and builds its operator tree.
 *
 * Ownership: sql is always taken over. On success it belongs to the task; on failure it is freed
 * here (directly, or via doDestroyTask once it has been attached to the task).
 *
 * @param pPlan      subplan to execute; attached to the task as pSubplan
 * @param pTaskInfo  out: the new task on success; set to NULL on failure (never left dangling)
 * @param pHandle    read handle (may be NULL); its pStateBackend, if any, becomes the stream state
 * @param taskId     task identifier
 * @param sql        SQL text, ownership transferred to this function
 * @param model      executor model
 * @return TSDB_CODE_SUCCESS or an error code (also stored in terrno on failure).
 */
int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId,
                               char* sql, EOPTR_EXEC_MODEL model) {
  uint64_t queryId = pPlan->id.queryId;
  int32_t  code = TSDB_CODE_SUCCESS;

  *pTaskInfo = createExecTaskInfo(queryId, taskId, model, pPlan->dbFName);
  if (*pTaskInfo == NULL) {
    code = TSDB_CODE_QRY_OUT_OF_MEMORY;
    goto _complete;
  }

  if (pHandle && pHandle->pStateBackend) {
    (*pTaskInfo)->streamInfo.pState = pHandle->pStateBackend;
  }

  // The task now owns sql; clear the local so the error path below does not free it a second time.
  (*pTaskInfo)->sql = sql;
  sql = NULL;

  (*pTaskInfo)->pSubplan = pPlan;
  (*pTaskInfo)->pRoot = createOperatorTree(pPlan->pNode, *pTaskInfo, pHandle, &(*pTaskInfo)->tableqinfoList,
                                           pPlan->pTagCond, pPlan->pTagIndexCond, pPlan->user);

  if (NULL == (*pTaskInfo)->pRoot) {
    code = (*pTaskInfo)->code;
    // Some operator-creation failure paths return NULL without recording an error in the task;
    // never report success for a task that is about to be destroyed.
    if (code == TSDB_CODE_SUCCESS) {
      code = (terrno != TSDB_CODE_SUCCESS) ? terrno : TSDB_CODE_QRY_APP_ERROR;
    }
    goto _complete;
  }

  return code;

_complete:
  taosMemoryFree(sql);
  // *pTaskInfo is NULL when createExecTaskInfo itself failed; doDestroyTask must not see NULL.
  if (*pTaskInfo != NULL) {
    doDestroyTask(*pTaskInfo);
    *pTaskInfo = NULL;
  }
  terrno = code;
  return code;
}

4047
/*
 * Releases every array/hash owned by a table list and resets the released pointers to NULL.
 *
 * When needSortTableByGroupId is set, each entry of pGroupList is an SArray owned by the list,
 * except that an entry may be pTableList itself. That entry is skipped and pTableList is
 * destroyed exactly once afterwards. The group arrays are released BEFORE pTableList, so the
 * identity comparison never uses a pointer value that has already been freed.
 *
 * @param pTableqinfoList  table list to clean up; the struct itself is not freed
 */
void doDestroyTableList(STableListInfo* pTableqinfoList) {
  if (pTableqinfoList->needSortTableByGroupId) {
    size_t numOfGroups = taosArrayGetSize(pTableqinfoList->pGroupList);
    for (size_t i = 0; i < numOfGroups; ++i) {
      SArray* pGroup = taosArrayGetP(pTableqinfoList->pGroupList, i);
      if (pGroup != pTableqinfoList->pTableList) {
        taosArrayDestroy(pGroup);
      }
    }
  }

  taosArrayDestroy(pTableqinfoList->pGroupList);
  taosArrayDestroy(pTableqinfoList->pTableList);
  taosHashCleanup(pTableqinfoList->map);

  // Clear every released pointer so a repeated cleanup cannot double-free.
  pTableqinfoList->pGroupList = NULL;
  pTableqinfoList->pTableList = NULL;
  pTableqinfoList->map = NULL;
}

L
Liu Jicong 已提交
4065
/*
 * Frees an execution task and everything it owns: table list, operator tree, schema info, stream
 * info, the subplan (unless the task runs in local-exec mode), the SQL text, the id string, and
 * finally the task struct itself.
 *
 * @param pTaskInfo  task to destroy; NULL is accepted and ignored (error paths may pass NULL)
 */
void doDestroyTask(SExecTaskInfo* pTaskInfo) {
  if (pTaskInfo == NULL) {
    return;
  }

  qDebug("%s execTask is freed", GET_TASKID(pTaskInfo));

  doDestroyTableList(&pTaskInfo->tableqinfoList);
  destroyOperatorInfo(pTaskInfo->pRoot);
  cleanupTableSchemaInfo(&pTaskInfo->schemaInfo);
  cleanupStreamInfo(&pTaskInfo->streamInfo);

  // In local-exec mode the subplan is not owned by this task.
  if (!pTaskInfo->localFetch.localExec) {
    nodesDestroyNode((SNode*)pTaskInfo->pSubplan);
  }

  taosMemoryFreeClear(pTaskInfo->sql);
  taosMemoryFreeClear(pTaskInfo->id.str);
  taosMemoryFreeClear(pTaskInfo);
}

/*
 * Estimates the query-buffer bytes needed for numOfTables tables: 1.5 x sizeof(STableQueryInfo)
 * per table. tsdb-side consumption (e.g. STableCheckInfo) is not included in this estimate.
 */
static int64_t getQuerySupportBufSize(size_t numOfTables) {
  const double bytesPerTable = (double)sizeof(STableQueryInfo) * 1.5;
  return (int64_t)(bytesPerTable * numOfTables);
}

/*
 * Reserves query-buffer budget for numOfTables tables from the global tsQueryBufferSizeBytes.
 *
 * - A negative budget means unlimited: always succeeds and reserves nothing.
 * - A zero budget disables query processing.
 * - Otherwise the estimate is subtracted with a compare-and-swap retry loop.
 *
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_QRY_NOT_ENOUGH_BUFFER if the remaining budget is too small.
 */
int32_t checkForQueryBuf(size_t numOfTables) {
  const int64_t required = getQuerySupportBufSize(numOfTables);

  if (tsQueryBufferSizeBytes < 0) {
    return TSDB_CODE_SUCCESS;
  }
  if (tsQueryBufferSizeBytes <= 0) {
    return TSDB_CODE_QRY_NOT_ENOUGH_BUFFER;
  }

  for (;;) {
    const int64_t available = tsQueryBufferSizeBytes;
    const int64_t left = available - required;
    if (left < 0) {
      return TSDB_CODE_QRY_NOT_ENOUGH_BUFFER;
    }
    // Retry if another thread changed the budget between the read and the swap.
    if (atomic_val_compare_exchange_64(&tsQueryBufferSizeBytes, available, left) == available) {
      return TSDB_CODE_SUCCESS;
    }
  }
}

/*
 * Give back the budget that checkForQueryBuf() reserved for numOfTables tables
 * by atomically adding it to tsQueryBufferSizeBytes.
 * This is a no-op when the budget is negative, because checkForQueryBuf()
 * reserves nothing in that case.
 */
void releaseQueryBuf(size_t numOfTables) {
  if (tsQueryBufferSizeBytes >= 0) {
    atomic_add_fetch_64(&tsQueryBufferSizeBytes, getQuerySupportBufSize(numOfTables));
  }
}
D
dapan1121 已提交
4120

H
Haojun Liao 已提交
4121
/*
 * Collect EXPLAIN execution statistics for an operator tree in pre-order.
 *
 * One SExplainExecInfo is appended to pExecInfoList for operatorInfo. It holds
 * the row count, the open/total cost and, if the operator provides
 * fpSet.getExplainFn, its verbose info. Then every downstream operator is
 * visited recursively.
 *
 * Returns TSDB_CODE_SUCCESS on success. On failure it returns
 * TSDB_CODE_QRY_OUT_OF_MEMORY if the entry could not be appended, or the
 * original error code from getExplainFn or from a downstream operator.
 */
int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SArray* pExecInfoList) {
  SExplainExecInfo  execInfo = {0};
  SExplainExecInfo* pExplainInfo = taosArrayPush(pExecInfoList, &execInfo);
  if (pExplainInfo == NULL) {
    // the array could not grow; never dereference the failed push
    return TSDB_CODE_QRY_OUT_OF_MEMORY;
  }

  pExplainInfo->numOfRows = operatorInfo->resultInfo.totalRows;
  pExplainInfo->startupCost = operatorInfo->cost.openCost;
  pExplainInfo->totalCost = operatorInfo->cost.totalCost;
  pExplainInfo->verboseLen = 0;
  pExplainInfo->verboseInfo = NULL;

  if (operatorInfo->fpSet.getExplainFn) {
    int32_t code =
        operatorInfo->fpSet.getExplainFn(operatorInfo, &pExplainInfo->verboseInfo, &pExplainInfo->verboseLen);
    if (code) {
      qError("%s operator getExplainFn failed, code:%s", GET_TASKID(operatorInfo->pTaskInfo), tstrerror(code));
      return code;
    }
  }

  // pExplainInfo points into the array storage, which later pushes may
  // reallocate, so it must not be used after the recursive calls below.
  for (int32_t i = 0; i < operatorInfo->numOfDownstream; ++i) {
    int32_t code = getOperatorExplainExecInfo(operatorInfo->pDownstream[i], pExecInfoList);
    if (code != TSDB_CODE_SUCCESS) {
      // propagate the real failure instead of masking it as out-of-memory
      return code;
    }
  }

  return TSDB_CODE_SUCCESS;
}
5
54liuyao 已提交
4151

4152 4153
/*
 * Fetch (creating it if absent) the stream-state result row for the window
 * win in group tableGroupId, and prepare it for aggregation.
 *
 * The row buffer is requested with pAggSup->resultRowSize bytes. On success,
 * *pResult points at the row, its window is set to *win, and the function
 * contexts are initialised with setResultRowInitCtx().
 *
 * Returns TSDB_CODE_QRY_OUT_OF_MEMORY if streamStateAddIfNotExist() fails,
 * TSDB_CODE_SUCCESS otherwise.
 */
int32_t setOutputBuf(SStreamState* pState, STimeWindow* win, SResultRow** pResult, int64_t tableGroupId,
                     SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowEntryInfoOffset, SAggSupporter* pAggSup) {
  SWinKey winKey = {.ts = win->skey, .groupId = tableGroupId};
  char*   pRowBuf = NULL;
  int32_t rowBufLen = pAggSup->resultRowSize;

  int32_t rc = streamStateAddIfNotExist(pState, &winKey, (void**)&pRowBuf, &rowBufLen);
  if (rc < 0) {
    return TSDB_CODE_QRY_OUT_OF_MEMORY;
  }

  SResultRow* pRow = (SResultRow*)pRowBuf;
  *pResult = pRow;
  ASSERT(*pResult);

  // stamp the row with the window it accumulates
  pRow->win = *win;
  setResultRowInitCtx(pRow, pCtx, numOfOutput, rowEntryInfoOffset);
  return TSDB_CODE_SUCCESS;
}

4172 4173
/*
 * Hand a result-row buffer obtained from the stream state back via
 * streamStateReleaseBuf(). Any status from that call is discarded;
 * this function always reports TSDB_CODE_SUCCESS.
 */
int32_t releaseOutputBuf(SStreamState* pState, SWinKey* pKey, SResultRow* pResult) {
  (void)streamStateReleaseBuf(pState, pKey, pResult);
  return TSDB_CODE_SUCCESS;
}

4177 4178
/*
 * Store resSize bytes of the result row under pKey with streamStatePut().
 * Any status from streamStatePut() is discarded; this function always
 * reports TSDB_CODE_SUCCESS.
 */
int32_t saveOutputBuf(SStreamState* pState, SWinKey* pKey, SResultRow* pResult, int32_t resSize) {
  (void)streamStatePut(pState, pKey, pResult, resSize);
  return TSDB_CODE_SUCCESS;
}

4182
int32_t buildDataBlockFromGroupRes(SOperatorInfo* pOperator, SStreamState* pState, SSDataBlock* pBlock, SExprSupp* pSup,
4183
                                   SGroupResInfo* pGroupResInfo) {
4184
  SExecTaskInfo*  pTaskInfo = pOperator->pTaskInfo;
4185 4186 4187 4188 4189 4190 4191 4192 4193 4194 4195 4196 4197 4198 4199
  SExprInfo*      pExprInfo = pSup->pExprInfo;
  int32_t         numOfExprs = pSup->numOfExprs;
  int32_t*        rowEntryOffset = pSup->rowEntryInfoOffset;
  SqlFunctionCtx* pCtx = pSup->pCtx;

  int32_t numOfRows = getNumOfTotalRes(pGroupResInfo);

  for (int32_t i = pGroupResInfo->index; i < numOfRows; i += 1) {
    SResKeyPos* pPos = taosArrayGetP(pGroupResInfo->pRows, i);
    int32_t     size = 0;
    void*       pVal = NULL;
    SWinKey     key = {
            .ts = *(TSKEY*)pPos->key,
            .groupId = pPos->groupId,
    };
4200
    int32_t code = streamStateGet(pState, &key, &pVal, &size);
4201 4202 4203 4204 4205 4206
    ASSERT(code == 0);
    SResultRow* pRow = (SResultRow*)pVal;
    doUpdateNumOfRows(pCtx, pRow, numOfExprs, rowEntryOffset);
    // no results, continue to check the next one
    if (pRow->numOfRows == 0) {
      pGroupResInfo->index += 1;
4207
      releaseOutputBuf(pState, &key, pRow);
4208 4209 4210 4211 4212
      continue;
    }

    if (pBlock->info.groupId == 0) {
      pBlock->info.groupId = pPos->groupId;
4213 4214 4215 4216
      SStreamIntervalOperatorInfo* pInfo = pOperator->info;
      char* tbname = taosHashGet(pInfo->pGroupIdTbNameMap, &pBlock->info.groupId, sizeof(int64_t));
      if (tbname != NULL) {
        memcpy(pBlock->info.parTbName, tbname, TSDB_TABLE_NAME_LEN);
L
Liu Jicong 已提交
4217 4218
      } else {
        pBlock->info.parTbName[0] = 0;
4219
      }
4220 4221 4222
    } else {
      // current value belongs to different group, it can't be packed into one datablock
      if (pBlock->info.groupId != pPos->groupId) {
4223
        releaseOutputBuf(pState, &key, pRow);
4224 4225 4226 4227 4228 4229
        break;
      }
    }

    if (pBlock->info.rows + pRow->numOfRows > pBlock->info.capacity) {
      ASSERT(pBlock->info.rows > 0);
4230
      releaseOutputBuf(pState, &key, pRow);
4231 4232 4233 4234 4235 4236 4237 4238 4239 4240
      break;
    }

    pGroupResInfo->index += 1;

    for (int32_t j = 0; j < numOfExprs; ++j) {
      int32_t slotId = pExprInfo[j].base.resSchema.slotId;

      pCtx[j].resultInfo = getResultEntryInfo(pRow, j, rowEntryOffset);
      if (pCtx[j].fpSet.finalize) {
4241 4242 4243 4244
        int32_t code1 = pCtx[j].fpSet.finalize(&pCtx[j], pBlock);
        if (TAOS_FAILED(code1)) {
          qError("%s build result data block error, code %s", GET_TASKID(pTaskInfo), tstrerror(code1));
          T_LONG_JMP(pTaskInfo->env, code1);
4245 4246 4247 4248 4249 4250 4251 4252 4253 4254 4255 4256 4257
        }
      } else if (strcmp(pCtx[j].pExpr->pExpr->_function.functionName, "_select_value") == 0) {
        // do nothing, todo refactor
      } else {
        // expand the result into multiple rows. E.g., _wstart, top(k, 20)
        // the _wstart needs to copy to 20 following rows, since the results of top-k expands to 20 different rows.
        SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, slotId);
        char*            in = GET_ROWCELL_INTERBUF(pCtx[j].resultInfo);
        for (int32_t k = 0; k < pRow->numOfRows; ++k) {
          colDataAppend(pColInfoData, pBlock->info.rows + k, in, pCtx[j].resultInfo->isNullRes);
        }
      }
    }
5
54liuyao 已提交
4258

4259
    pBlock->info.rows += pRow->numOfRows;
4260
    releaseOutputBuf(pState, &key, pRow);
4261 4262 4263 4264
  }
  blockDataUpdateTsWindow(pBlock, 0);
  return TSDB_CODE_SUCCESS;
}
5
54liuyao 已提交
4265 4266 4267 4268 4269 4270 4271

/*
 * Store a session window's result buffer (size bytes) under key with
 * streamStateSessionPut(), then release the buffer with releaseOutputBuf().
 * Any status from streamStateSessionPut() is discarded; this function always
 * reports TSDB_CODE_SUCCESS.
 */
int32_t saveSessionDiscBuf(SStreamState* pState, SSessionKey* key, void* buf, int32_t size) {
  const void* pData = buf;
  (void)streamStateSessionPut(pState, key, pData, size);

  SResultRow* pRow = (SResultRow*)buf;
  releaseOutputBuf(pState, NULL, pRow);
  return TSDB_CODE_SUCCESS;
}

4272
int32_t buildSessionResultDataBlock(SOperatorInfo* pOperator, SStreamState* pState, SSDataBlock* pBlock,
5
54liuyao 已提交
4273
                                    SExprSupp* pSup, SGroupResInfo* pGroupResInfo) {
4274
  SExecTaskInfo*  pTaskInfo = pOperator->pTaskInfo;
5
54liuyao 已提交
4275 4276 4277 4278 4279 4280 4281 4282 4283 4284 4285 4286 4287 4288 4289 4290 4291 4292 4293 4294 4295 4296 4297 4298
  SExprInfo*      pExprInfo = pSup->pExprInfo;
  int32_t         numOfExprs = pSup->numOfExprs;
  int32_t*        rowEntryOffset = pSup->rowEntryInfoOffset;
  SqlFunctionCtx* pCtx = pSup->pCtx;

  int32_t numOfRows = getNumOfTotalRes(pGroupResInfo);

  for (int32_t i = pGroupResInfo->index; i < numOfRows; i += 1) {
    SSessionKey* pKey = taosArrayGet(pGroupResInfo->pRows, i);
    int32_t      size = 0;
    void*        pVal = NULL;
    int32_t      code = streamStateSessionGet(pState, pKey, &pVal, &size);
    ASSERT(code == 0);
    SResultRow* pRow = (SResultRow*)pVal;
    doUpdateNumOfRows(pCtx, pRow, numOfExprs, rowEntryOffset);
    // no results, continue to check the next one
    if (pRow->numOfRows == 0) {
      pGroupResInfo->index += 1;
      releaseOutputBuf(pState, NULL, pRow);
      continue;
    }

    if (pBlock->info.groupId == 0) {
      pBlock->info.groupId = pKey->groupId;
4299 4300 4301 4302 4303 4304 4305 4306 4307 4308 4309 4310 4311 4312 4313 4314 4315 4316 4317 4318 4319 4320 4321 4322 4323

      if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE) {
        SStreamStateAggOperatorInfo* pInfo = pOperator->info;

        char* tbname = taosHashGet(pInfo->pGroupIdTbNameMap, &pBlock->info.groupId, sizeof(int64_t));
        if (tbname != NULL) {
          memcpy(pBlock->info.parTbName, tbname, TSDB_TABLE_NAME_LEN);
        } else {
          pBlock->info.parTbName[0] = 0;
        }
      } else if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION ||
                 pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION ||
                 pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION) {
        SStreamSessionAggOperatorInfo* pInfo = pOperator->info;

        char* tbname = taosHashGet(pInfo->pGroupIdTbNameMap, &pBlock->info.groupId, sizeof(int64_t));
        if (tbname != NULL) {
          memcpy(pBlock->info.parTbName, tbname, TSDB_TABLE_NAME_LEN);
        } else {
          pBlock->info.parTbName[0] = 0;
        }
      } else {
        ASSERT(0);
      }

5
54liuyao 已提交
4324 4325 4326 4327 4328 4329 4330 4331 4332 4333 4334 4335 4336 4337 4338 4339 4340 4341 4342 4343 4344 4345 4346 4347 4348 4349 4350 4351 4352 4353 4354 4355 4356 4357 4358 4359 4360 4361 4362 4363 4364 4365 4366 4367 4368
    } else {
      // current value belongs to different group, it can't be packed into one datablock
      if (pBlock->info.groupId != pKey->groupId) {
        releaseOutputBuf(pState, NULL, pRow);
        break;
      }
    }

    if (pBlock->info.rows + pRow->numOfRows > pBlock->info.capacity) {
      ASSERT(pBlock->info.rows > 0);
      releaseOutputBuf(pState, NULL, pRow);
      break;
    }

    pGroupResInfo->index += 1;

    for (int32_t j = 0; j < numOfExprs; ++j) {
      int32_t slotId = pExprInfo[j].base.resSchema.slotId;

      pCtx[j].resultInfo = getResultEntryInfo(pRow, j, rowEntryOffset);
      if (pCtx[j].fpSet.finalize) {
        int32_t code1 = pCtx[j].fpSet.finalize(&pCtx[j], pBlock);
        if (TAOS_FAILED(code1)) {
          qError("%s build result data block error, code %s", GET_TASKID(pTaskInfo), tstrerror(code1));
          T_LONG_JMP(pTaskInfo->env, code1);
        }
      } else if (strcmp(pCtx[j].pExpr->pExpr->_function.functionName, "_select_value") == 0) {
        // do nothing, todo refactor
      } else {
        // expand the result into multiple rows. E.g., _wstart, top(k, 20)
        // the _wstart needs to copy to 20 following rows, since the results of top-k expands to 20 different rows.
        SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, slotId);
        char*            in = GET_ROWCELL_INTERBUF(pCtx[j].resultInfo);
        for (int32_t k = 0; k < pRow->numOfRows; ++k) {
          colDataAppend(pColInfoData, pBlock->info.rows + k, in, pCtx[j].resultInfo->isNullRes);
        }
      }
    }

    pBlock->info.rows += pRow->numOfRows;
    // saveSessionDiscBuf(pState, pKey, pVal, size);
    releaseOutputBuf(pState, NULL, pRow);
  }
  blockDataUpdateTsWindow(pBlock, 0);
  return TSDB_CODE_SUCCESS;
4369
}