executorInt.c 40.0 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
15

H
Haojun Liao 已提交
16 17
#include "filter.h"
#include "function.h"
18 19
#include "functionMgt.h"
#include "os.h"
H
Haojun Liao 已提交
20
#include "querynodes.h"
21
#include "tfill.h"
dengyihao's avatar
dengyihao 已提交
22
#include "tname.h"
23

H
Haojun Liao 已提交
24
#include "tdatablock.h"
H
Haojun Liao 已提交
25
#include "tmsg.h"
26
#include "ttime.h"
H
Haojun Liao 已提交
27

28
#include "executorInt.h"
dengyihao's avatar
dengyihao 已提交
29
#include "index.h"
30
#include "operator.h"
31
#include "query.h"
32
#include "querytask.h"
33
#include "tcompare.h"
H
Haojun Liao 已提交
34
#include "thash.h"
35
#include "ttypes.h"
36
#include "storageapi.h"
37

X
Xiaoyu Wang 已提交
38
#define SET_REVERSE_SCAN_FLAG(runtime)    ((runtime)->scanFlag = REVERSE_SCAN)
39 40 41 42
#define GET_FORWARD_DIRECTION_FACTOR(ord) (((ord) == TSDB_ORDER_ASC) ? QUERY_ASC_FORWARD_STEP : QUERY_DESC_FORWARD_STEP)

#if 0
static UNUSED_FUNC void *u_malloc (size_t __size) {
wafwerar's avatar
wafwerar 已提交
43
  uint32_t v = taosRand();
44 45 46 47

  if (v % 1000 <= 0) {
    return NULL;
  } else {
wafwerar's avatar
wafwerar 已提交
48
    return taosMemoryMalloc(__size);
49 50 51 52
  }
}

static UNUSED_FUNC void* u_calloc(size_t num, size_t __size) {
wafwerar's avatar
wafwerar 已提交
53
  uint32_t v = taosRand();
54 55 56
  if (v % 1000 <= 0) {
    return NULL;
  } else {
wafwerar's avatar
wafwerar 已提交
57
    return taosMemoryCalloc(num, __size);
58 59 60 61
  }
}

static UNUSED_FUNC void* u_realloc(void* p, size_t __size) {
wafwerar's avatar
wafwerar 已提交
62
  uint32_t v = taosRand();
63 64 65
  if (v % 5 <= 1) {
    return NULL;
  } else {
wafwerar's avatar
wafwerar 已提交
66
    return taosMemoryRealloc(p, __size);
67 68 69 70 71 72 73 74
  }
}

#define calloc  u_calloc
#define malloc  u_malloc
#define realloc u_realloc
#endif

75
static void setBlockSMAInfo(SqlFunctionCtx* pCtx, SExprInfo* pExpr, SSDataBlock* pBlock);
76

X
Xiaoyu Wang 已提交
77 78
static void initCtxOutputBuffer(SqlFunctionCtx* pCtx, int32_t size);
static void doApplyScalarCalculation(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32_t order, int32_t scanFlag);
79

H
Haojun Liao 已提交
80 81 82 83
static void    extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const SColumnInfoData* p, bool keep,
                                                   int32_t status);
static int32_t doSetInputDataBlock(SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t order, int32_t scanFlag,
                                   bool createDummyCol);
H
Haojun Liao 已提交
84
static int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprSupp* pSup, SDiskbasedBuf* pBuf,
D
dapan1121 已提交
85
                                  SGroupResInfo* pGroupResInfo, int32_t threshold, bool ignoreGroup);
86

87
SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int32_t* currentPageId, int32_t interBufSize) {
L
Liu Jicong 已提交
88
  SFilePage* pData = NULL;
89 90 91

  // in the first scan, new space needed for results
  int32_t pageId = -1;
92
  if (*currentPageId == -1) {
93
    pData = getNewBufPage(pResultBuf, &pageId);
94 95
    pData->num = sizeof(SFilePage);
  } else {
96
    pData = getBufPage(pResultBuf, *currentPageId);
97 98 99 100 101
    if (pData == NULL) {
      qError("failed to get buffer, code:%s", tstrerror(terrno));
      return NULL;
    }

102
    pageId = *currentPageId;
103

wmmhello's avatar
wmmhello 已提交
104
    if (pData->num + interBufSize > getBufPageSize(pResultBuf)) {
105
      // release current page first, and prepare the next one
106
      releaseBufPage(pResultBuf, pData);
107

108
      pData = getNewBufPage(pResultBuf, &pageId);
109 110 111 112 113 114 115 116 117 118
      if (pData != NULL) {
        pData->num = sizeof(SFilePage);
      }
    }
  }

  if (pData == NULL) {
    return NULL;
  }

119 120
  setBufPageDirty(pData, true);

121 122
  // set the number of rows in current disk page
  SResultRow* pResultRow = (SResultRow*)((char*)pData + pData->num);
123

X
Xiaoyu Wang 已提交
124
  memset((char*)pResultRow, 0, interBufSize);
125 126 127
  pResultRow->pageId = pageId;
  pResultRow->offset = (int32_t)pData->num;

128
  *currentPageId = pageId;
wmmhello's avatar
wmmhello 已提交
129
  pData->num += interBufSize;
130 131 132
  return pResultRow;
}

133 134 135 136 137 138 139
/**
 * the struct of key in hash table
 * +----------+---------------+
 * | group id |   key data    |
 * | 8 bytes  | actual length |
 * +----------+---------------+
 */
140 141
SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pResultRowInfo, char* pData,
                                   int16_t bytes, bool masterscan, uint64_t groupId, SExecTaskInfo* pTaskInfo,
D
dapan1121 已提交
142
                                   bool isIntervalQuery, SAggSupporter* pSup, bool keepGroup) {
143
  SET_RES_WINDOW_KEY(pSup->keyBuf, pData, bytes, groupId);
D
dapan1121 已提交
144 145 146
  if (!keepGroup) {
    *(uint64_t*)pSup->keyBuf = calcGroupId(pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes));
  }
H
Haojun Liao 已提交
147

dengyihao's avatar
dengyihao 已提交
148
  SResultRowPosition* p1 =
149
      (SResultRowPosition*)tSimpleHashGet(pSup->pResultRowHashTable, pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes));
H
Haojun Liao 已提交
150

151 152
  SResultRow* pResult = NULL;

H
Haojun Liao 已提交
153 154
  // in case of repeat scan/reverse scan, no new time window added.
  if (isIntervalQuery) {
155
    if (p1 != NULL) {  // the *p1 may be NULL in case of sliding+offset exists.
156
      pResult = getResultRowByPos(pResultBuf, p1, true);
157 158 159 160
      if (NULL == pResult) {
        T_LONG_JMP(pTaskInfo->env, terrno);
      }

161
      ASSERT(pResult->pageId == p1->pageId && pResult->offset == p1->offset);
H
Haojun Liao 已提交
162 163
    }
  } else {
dengyihao's avatar
dengyihao 已提交
164 165
    // In case of group by column query, the required SResultRow object must be existInCurrentResusltRowInfo in the
    // pResultRowInfo object.
H
Haojun Liao 已提交
166
    if (p1 != NULL) {
167
      // todo
168
      pResult = getResultRowByPos(pResultBuf, p1, true);
169 170 171 172
      if (NULL == pResult) {
        T_LONG_JMP(pTaskInfo->env, terrno);
      }

173
      ASSERT(pResult->pageId == p1->pageId && pResult->offset == p1->offset);
H
Haojun Liao 已提交
174 175 176
    }
  }

L
Liu Jicong 已提交
177
  // 1. close current opened time window
178
  if (pResultRowInfo->cur.pageId != -1 && ((pResult == NULL) || (pResult->pageId != pResultRowInfo->cur.pageId))) {
179
    SResultRowPosition pos = pResultRowInfo->cur;
X
Xiaoyu Wang 已提交
180
    SFilePage*         pPage = getBufPage(pResultBuf, pos.pageId);
181 182 183 184
    if (pPage == NULL) {
      qError("failed to get buffer, code:%s, %s", tstrerror(terrno), GET_TASKID(pTaskInfo));
      T_LONG_JMP(pTaskInfo->env, terrno);
    }
185 186 187 188 189
    releaseBufPage(pResultBuf, pPage);
  }

  // allocate a new buffer page
  if (pResult == NULL) {
190
    pResult = getNewResultRow(pResultBuf, &pSup->currentPageId, pSup->resultRowSize);
191 192 193
    if (pResult == NULL) {
      T_LONG_JMP(pTaskInfo->env, terrno);
    }
194

195 196
    // add a new result set for a new group
    SResultRowPosition pos = {.pageId = pResult->pageId, .offset = pResult->offset};
197
    tSimpleHashPut(pSup->pResultRowHashTable, pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes), &pos,
L
Liu Jicong 已提交
198
                   sizeof(SResultRowPosition));
H
Haojun Liao 已提交
199 200
  }

201 202 203
  // 2. set the new time window to be the new active time window
  pResultRowInfo->cur = (SResultRowPosition){.pageId = pResult->pageId, .offset = pResult->offset};

H
Haojun Liao 已提交
204
  // too many time window in query
205
  if (pTaskInfo->execModel == OPTR_EXEC_MODEL_BATCH &&
206
      tSimpleHashGetSize(pSup->pResultRowHashTable) > MAX_INTERVAL_TIME_WINDOW) {
207
    T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_TOO_MANY_TIMEWINDOW);
H
Haojun Liao 已提交
208 209
  }

H
Haojun Liao 已提交
210
  return pResult;
H
Haojun Liao 已提交
211 212
}

213
//  query_range_start, query_range_end, window_duration, window_start, window_end
214
void initExecTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pQueryWindow) {
215 216 217
  pColData->info.type = TSDB_DATA_TYPE_TIMESTAMP;
  pColData->info.bytes = sizeof(int64_t);

H
Haojun Liao 已提交
218
  colInfoDataEnsureCapacity(pColData, 5, false);
219 220
  colDataSetInt64(pColData, 0, &pQueryWindow->skey);
  colDataSetInt64(pColData, 1, &pQueryWindow->ekey);
221 222

  int64_t interval = 0;
223 224 225
  colDataSetInt64(pColData, 2, &interval);  // this value may be variable in case of 'n' and 'y'.
  colDataSetInt64(pColData, 3, &pQueryWindow->skey);
  colDataSetInt64(pColData, 4, &pQueryWindow->ekey);
226 227
}

G
Ganlin Zhao 已提交
228
static void doSetInputDataBlockInfo(SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t order, int32_t scanFlag) {
229 230
  SqlFunctionCtx* pCtx = pExprSup->pCtx;
  for (int32_t i = 0; i < pExprSup->numOfExprs; ++i) {
231
    pCtx[i].order = order;
232
    pCtx[i].input.numOfRows = pBlock->info.rows;
233
    setBlockSMAInfo(&pCtx[i], &pExprSup->pExprInfo[i], pBlock);
234
    pCtx[i].pSrcBlock = pBlock;
G
Ganlin Zhao 已提交
235
    pCtx[i].scanFlag = scanFlag;
236 237 238
  }
}

239
void setInputDataBlock(SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t order, int32_t scanFlag, bool createDummyCol) {
240
  if (pBlock->pBlockAgg != NULL) {
G
Ganlin Zhao 已提交
241
    doSetInputDataBlockInfo(pExprSup, pBlock, order, scanFlag);
242
  } else {
243
    doSetInputDataBlock(pExprSup, pBlock, order, scanFlag, createDummyCol);
H
Haojun Liao 已提交
244
  }
245 246
}

L
Liu Jicong 已提交
247 248
static int32_t doCreateConstantValColumnInfo(SInputColumnInfoData* pInput, SFunctParam* pFuncParam, int32_t paramIndex,
                                             int32_t numOfRows) {
249 250 251 252 253 254 255 256
  SColumnInfoData* pColInfo = NULL;
  if (pInput->pData[paramIndex] == NULL) {
    pColInfo = taosMemoryCalloc(1, sizeof(SColumnInfoData));
    if (pColInfo == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    // Set the correct column info (data type and bytes)
257 258
    pColInfo->info.type = pFuncParam->param.nType;
    pColInfo->info.bytes = pFuncParam->param.nLen;
259 260

    pInput->pData[paramIndex] = pColInfo;
261 262
  } else {
    pColInfo = pInput->pData[paramIndex];
263 264
  }

H
Haojun Liao 已提交
265
  colInfoDataEnsureCapacity(pColInfo, numOfRows, false);
266

267
  int8_t type = pFuncParam->param.nType;
268 269
  if (type == TSDB_DATA_TYPE_BIGINT || type == TSDB_DATA_TYPE_UBIGINT) {
    int64_t v = pFuncParam->param.i;
dengyihao's avatar
dengyihao 已提交
270
    for (int32_t i = 0; i < numOfRows; ++i) {
271
      colDataSetInt64(pColInfo, i, &v);
272 273 274
    }
  } else if (type == TSDB_DATA_TYPE_DOUBLE) {
    double v = pFuncParam->param.d;
dengyihao's avatar
dengyihao 已提交
275
    for (int32_t i = 0; i < numOfRows; ++i) {
276
      colDataSetDouble(pColInfo, i, &v);
277
    }
D
Dingle Zhang 已提交
278
  } else if (type == TSDB_DATA_TYPE_VARCHAR || type == TSDB_DATA_TYPE_GEOMETRY) {
L
Liu Jicong 已提交
279
    char* tmp = taosMemoryMalloc(pFuncParam->param.nLen + VARSTR_HEADER_SIZE);
280
    STR_WITH_SIZE_TO_VARSTR(tmp, pFuncParam->param.pz, pFuncParam->param.nLen);
L
Liu Jicong 已提交
281
    for (int32_t i = 0; i < numOfRows; ++i) {
282
      colDataSetVal(pColInfo, i, tmp, false);
283
    }
H
Haojun Liao 已提交
284
    taosMemoryFree(tmp);
285 286 287 288 289
  }

  return TSDB_CODE_SUCCESS;
}

290 291
static int32_t doSetInputDataBlock(SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t order, int32_t scanFlag,
                                   bool createDummyCol) {
292
  int32_t         code = TSDB_CODE_SUCCESS;
293
  SqlFunctionCtx* pCtx = pExprSup->pCtx;
294

295
  for (int32_t i = 0; i < pExprSup->numOfExprs; ++i) {
L
Liu Jicong 已提交
296
    pCtx[i].order = order;
297 298
    pCtx[i].input.numOfRows = pBlock->info.rows;

L
Liu Jicong 已提交
299
    pCtx[i].pSrcBlock = pBlock;
X
Xiaoyu Wang 已提交
300
    pCtx[i].scanFlag = scanFlag;
H
Haojun Liao 已提交
301

302
    SInputColumnInfoData* pInput = &pCtx[i].input;
H
Haojun Liao 已提交
303
    pInput->uid = pBlock->info.id.uid;
H
Haojun Liao 已提交
304
    pInput->colDataSMAIsSet = false;
305

306
    SExprInfo* pOneExpr = &pExprSup->pExprInfo[i];
307
    for (int32_t j = 0; j < pOneExpr->base.numOfParams; ++j) {
dengyihao's avatar
dengyihao 已提交
308
      SFunctParam* pFuncParam = &pOneExpr->base.pParam[j];
G
Ganlin Zhao 已提交
309 310
      if (pFuncParam->type == FUNC_PARAM_TYPE_COLUMN) {
        int32_t slotId = pFuncParam->pCol->slotId;
dengyihao's avatar
dengyihao 已提交
311
        pInput->pData[j] = taosArrayGet(pBlock->pDataBlock, slotId);
312 313 314
        pInput->totalRows = pBlock->info.rows;
        pInput->numOfRows = pBlock->info.rows;
        pInput->startRowIndex = 0;
315

316
        // NOTE: the last parameter is the primary timestamp column
H
Haojun Liao 已提交
317
        // todo: refactor this
318
        if (fmIsImplicitTsFunc(pCtx[i].functionId) && (j == pOneExpr->base.numOfParams - 1)) {
L
Liu Jicong 已提交
319
          pInput->pPTS = pInput->pData[j];  // in case of merge function, this is not always the ts column data.
320
        }
321 322
        ASSERT(pInput->pData[j] != NULL);
      } else if (pFuncParam->type == FUNC_PARAM_TYPE_VALUE) {
323 324 325
        // todo avoid case: top(k, 12), 12 is the value parameter.
        // sum(11), 11 is also the value parameter.
        if (createDummyCol && pOneExpr->base.numOfParams == 1) {
326 327 328 329
          pInput->totalRows = pBlock->info.rows;
          pInput->numOfRows = pBlock->info.rows;
          pInput->startRowIndex = 0;

330
          code = doCreateConstantValColumnInfo(pInput, pFuncParam, j, pBlock->info.rows);
331 332 333
          if (code != TSDB_CODE_SUCCESS) {
            return code;
          }
334
        }
G
Ganlin Zhao 已提交
335 336
      }
    }
H
Haojun Liao 已提交
337
  }
338 339

  return code;
H
Haojun Liao 已提交
340 341
}

5
54liuyao 已提交
342
bool functionNeedToExecute(SqlFunctionCtx* pCtx) {
343
  struct SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
344

345 346 347 348 349
  // in case of timestamp column, always generated results.
  int32_t functionId = pCtx->functionId;
  if (functionId == -1) {
    return false;
  }
350

G
Ganlin Zhao 已提交
351
  if (pCtx->scanFlag == PRE_SCAN) {
352
    return fmIsRepeatScanFunc(pCtx->functionId);
353 354
  }

355 356
  if (isRowEntryCompleted(pResInfo)) {
    return false;
357 358
  }

359 360 361
  return true;
}

H
Haojun Liao 已提交
362
static int32_t doCreateConstantValColumnSMAInfo(SInputColumnInfoData* pInput, SFunctParam* pFuncParam, int32_t type,
363 364 365 366 367 368
                                                int32_t paramIndex, int32_t numOfRows) {
  if (pInput->pData[paramIndex] == NULL) {
    pInput->pData[paramIndex] = taosMemoryCalloc(1, sizeof(SColumnInfoData));
    if (pInput->pData[paramIndex] == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
369

370 371 372
    // Set the correct column info (data type and bytes)
    pInput->pData[paramIndex]->info.type = type;
    pInput->pData[paramIndex]->info.bytes = tDataTypes[type].bytes;
373
  }
H
Haojun Liao 已提交
374

375 376 377 378 379 380
  SColumnDataAgg* da = NULL;
  if (pInput->pColumnDataAgg[paramIndex] == NULL) {
    da = taosMemoryCalloc(1, sizeof(SColumnDataAgg));
    pInput->pColumnDataAgg[paramIndex] = da;
    if (da == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
381 382
    }
  } else {
383
    da = pInput->pColumnDataAgg[paramIndex];
384 385
  }

386 387
  if (type == TSDB_DATA_TYPE_BIGINT) {
    int64_t v = pFuncParam->param.i;
388
    *da = (SColumnDataAgg){.numOfNull = 0, .min = v, .max = v, .sum = v * numOfRows};
389 390
  } else if (type == TSDB_DATA_TYPE_DOUBLE) {
    double v = pFuncParam->param.d;
391
    *da = (SColumnDataAgg){.numOfNull = 0};
392

393 394 395 396 397 398
    *(double*)&da->min = v;
    *(double*)&da->max = v;
    *(double*)&da->sum = v * numOfRows;
  } else if (type == TSDB_DATA_TYPE_BOOL) {  // todo validate this data type
    bool v = pFuncParam->param.i;

399
    *da = (SColumnDataAgg){.numOfNull = 0};
400 401 402 403 404
    *(bool*)&da->min = 0;
    *(bool*)&da->max = v;
    *(bool*)&da->sum = v * numOfRows;
  } else if (type == TSDB_DATA_TYPE_TIMESTAMP) {
    // do nothing
405
  } else {
H
Haojun Liao 已提交
406
    qError("invalid constant type for sma info");
407 408
  }

409 410
  return TSDB_CODE_SUCCESS;
}
411

412
void setBlockSMAInfo(SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, SSDataBlock* pBlock) {
413 414 415 416 417 418 419
  int32_t numOfRows = pBlock->info.rows;

  SInputColumnInfoData* pInput = &pCtx->input;
  pInput->numOfRows = numOfRows;
  pInput->totalRows = numOfRows;

  if (pBlock->pBlockAgg != NULL) {
H
Haojun Liao 已提交
420
    pInput->colDataSMAIsSet = true;
421

422 423
    for (int32_t j = 0; j < pExprInfo->base.numOfParams; ++j) {
      SFunctParam* pFuncParam = &pExprInfo->base.pParam[j];
424

425 426
      if (pFuncParam->type == FUNC_PARAM_TYPE_COLUMN) {
        int32_t slotId = pFuncParam->pCol->slotId;
427 428
        pInput->pColumnDataAgg[j] = pBlock->pBlockAgg[slotId];
        if (pInput->pColumnDataAgg[j] == NULL) {
H
Haojun Liao 已提交
429
          pInput->colDataSMAIsSet = false;
430
        }
431 432 433 434

        // Here we set the column info data since the data type for each column data is required, but
        // the data in the corresponding SColumnInfoData will not be used.
        pInput->pData[j] = taosArrayGet(pBlock->pDataBlock, slotId);
435
      } else if (pFuncParam->type == FUNC_PARAM_TYPE_VALUE) {
H
Haojun Liao 已提交
436
        doCreateConstantValColumnSMAInfo(pInput, pFuncParam, pFuncParam->param.nType, j, pBlock->info.rows);
437 438
      }
    }
439
  } else {
H
Haojun Liao 已提交
440
    pInput->colDataSMAIsSet = false;
441 442 443 444
  }
}

/////////////////////////////////////////////////////////////////////////////////////////////
H
Haojun Liao 已提交
445
STimeWindow getAlignQueryTimeWindow(const SInterval* pInterval, int64_t key) {
L
Liu Jicong 已提交
446
  STimeWindow win = {0};
H
Haojun Liao 已提交
447
  win.skey = taosTimeTruncate(key, pInterval);
448 449

  /*
H
Haojun Liao 已提交
450
   * if the realSkey > INT64_MAX - pInterval->interval, the query duration between
451 452
   * realSkey and realEkey must be less than one interval.Therefore, no need to adjust the query ranges.
   */
H
Haojun Liao 已提交
453
  win.ekey = taosTimeAdd(win.skey, pInterval->interval, pInterval->intervalUnit, pInterval->precision) - 1;
454 455
  if (win.ekey < win.skey) {
    win.ekey = INT64_MAX;
456
  }
457 458

  return win;
459 460
}

461
void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowEntryInfoOffset) {
5
54liuyao 已提交
462
  bool init = false;
463
  for (int32_t i = 0; i < numOfOutput; ++i) {
464
    pCtx[i].resultInfo = getResultEntryInfo(pResult, i, rowEntryInfoOffset);
5
54liuyao 已提交
465 466 467
    if (init) {
      continue;
    }
468 469 470 471 472

    struct SResultRowEntryInfo* pResInfo = pCtx[i].resultInfo;
    if (isRowEntryCompleted(pResInfo) && isRowEntryInitialized(pResInfo)) {
      continue;
    }
473

474
    if (pCtx[i].isPseudoFunc) {
475 476 477
      continue;
    }

478 479 480 481 482 483
    if (!pResInfo->initialized) {
      if (pCtx[i].functionId != -1) {
        pCtx[i].fpSet.init(&pCtx[i], pResInfo);
      } else {
        pResInfo->initialized = true;
      }
5
54liuyao 已提交
484 485
    } else {
      init = true;
486 487 488 489
    }
  }
}

490 491 492 493 494 495 496 497 498 499 500 501 502 503
void clearResultRowInitFlag(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
  for (int32_t i = 0; i < numOfOutput; ++i) {
    SResultRowEntryInfo* pResInfo = pCtx[i].resultInfo;
    if (pResInfo == NULL) {
      continue;
    }

    pResInfo->initialized = false;
    pResInfo->numOfRes = 0;
    pResInfo->isNullRes = 0;
    pResInfo->complete = false;
  }
}

H
Haojun Liao 已提交
504 505
void doFilter(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SColMatchInfo* pColMatchInfo) {
  if (pFilterInfo == NULL || pBlock->info.rows == 0) {
S
shenglian zhou 已提交
506 507
    return;
  }
508

509
  SFilterColumnParam param1 = {.numOfCols = taosArrayGetSize(pBlock->pDataBlock), .pDataBlock = pBlock->pDataBlock};
510
  int32_t            code = filterSetDataFromSlotId(pFilterInfo, &param1);
511

512
  SColumnInfoData* p = NULL;
513
  int32_t          status = 0;
H
Haojun Liao 已提交
514

515
  // todo the keep seems never to be True??
H
Haojun Liao 已提交
516
  bool keep = filterExecute(pFilterInfo, pBlock, &p, NULL, param1.numOfCols, &status);
517
  extractQualifiedTupleByFilterResult(pBlock, p, keep, status);
H
Haojun Liao 已提交
518

519
  if (pColMatchInfo != NULL) {
520
    size_t size = taosArrayGetSize(pColMatchInfo->pList);
H
Haojun Liao 已提交
521
    for (int32_t i = 0; i < size; ++i) {
H
Haojun Liao 已提交
522
      SColMatchItem* pInfo = taosArrayGet(pColMatchInfo->pList, i);
523
      if (pInfo->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
H
Haojun Liao 已提交
524
        SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, pInfo->dstSlotId);
525
        if (pColData->info.type == TSDB_DATA_TYPE_TIMESTAMP) {
H
Haojun Liao 已提交
526
          blockDataUpdateTsWindow(pBlock, pInfo->dstSlotId);
527 528 529 530 531 532
          break;
        }
      }
    }
  }

533 534
  colDataDestroy(p);
  taosMemoryFree(p);
535 536
}

537
void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const SColumnInfoData* p, bool keep, int32_t status) {
538 539 540 541
  if (keep) {
    return;
  }

H
Haojun Liao 已提交
542
  int8_t* pIndicator = (int8_t*)p->pData;
H
Haojun Liao 已提交
543 544 545
  int32_t totalRows = pBlock->info.rows;

  if (status == FILTER_RESULT_ALL_QUALIFIED) {
546
    // here nothing needs to be done
H
Haojun Liao 已提交
547
  } else if (status == FILTER_RESULT_NONE_QUALIFIED) {
548
    pBlock->info.rows = 0;
H
Haojun Liao 已提交
549
  } else {
H
Haojun Liao 已提交
550
    int32_t bmLen = BitmapLen(totalRows);
H
Haojun Liao 已提交
551 552
    char*   pBitmap = NULL;
    int32_t maxRows = 0;
553

554 555
    size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
    for (int32_t i = 0; i < numOfCols; ++i) {
556
      SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, i);
557
      // it is a reserved column for scalar function, and no data in this column yet.
H
Haojun Liao 已提交
558
      if (pDst->pData == NULL) {
559 560 561
        continue;
      }

562
      int32_t numOfRows = 0;
H
Haojun Liao 已提交
563 564
      if (IS_VAR_DATA_TYPE(pDst->info.type)) {
        int32_t j = 0;
565

X
Xiaoyu Wang 已提交
566
        while (j < totalRows) {
H
Haojun Liao 已提交
567
          if (pIndicator[j] == 0) {
568
            j += 1;
H
Haojun Liao 已提交
569 570
            continue;
          }
571

H
Haojun Liao 已提交
572 573 574 575
          if (colDataIsNull_var(pDst, j)) {
            colDataSetNull_var(pDst, numOfRows);
          } else {
            char* p1 = colDataGetVarData(pDst, j);
D
dapan1121 已提交
576
            colDataReassignVal(pDst, numOfRows, j, p1);
H
Haojun Liao 已提交
577
          }
H
Haojun Liao 已提交
578 579
          numOfRows += 1;
          j += 1;
D
dapan1121 已提交
580
        }
581

H
Haojun Liao 已提交
582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601
        if (maxRows < numOfRows) {
          maxRows = numOfRows;
        }
      } else {
        if (pBitmap == NULL) {
          pBitmap = taosMemoryCalloc(1, bmLen);
        }

        memcpy(pBitmap, pDst->nullbitmap, bmLen);
        memset(pDst->nullbitmap, 0, bmLen);

        int32_t j = 0;

        switch (pDst->info.type) {
          case TSDB_DATA_TYPE_BIGINT:
          case TSDB_DATA_TYPE_UBIGINT:
          case TSDB_DATA_TYPE_DOUBLE:
          case TSDB_DATA_TYPE_TIMESTAMP:
            while (j < totalRows) {
              if (pIndicator[j] == 0) {
H
Haojun Liao 已提交
602
                j += 1;
H
Haojun Liao 已提交
603 604 605 606 607 608 609
                continue;
              }

              if (colDataIsNull_f(pBitmap, j)) {
                colDataSetNull_f(pDst->nullbitmap, numOfRows);
              } else {
                ((int64_t*)pDst->pData)[numOfRows] = ((int64_t*)pDst->pData)[j];
H
Haojun Liao 已提交
610
              }
H
Haojun Liao 已提交
611 612 613 614 615 616 617 618 619
              numOfRows += 1;
              j += 1;
            }
            break;
          case TSDB_DATA_TYPE_FLOAT:
          case TSDB_DATA_TYPE_INT:
          case TSDB_DATA_TYPE_UINT:
            while (j < totalRows) {
              if (pIndicator[j] == 0) {
H
Haojun Liao 已提交
620
                j += 1;
H
Haojun Liao 已提交
621
                continue;
H
Haojun Liao 已提交
622
              }
H
Haojun Liao 已提交
623 624 625
              if (colDataIsNull_f(pBitmap, j)) {
                colDataSetNull_f(pDst->nullbitmap, numOfRows);
              } else {
H
Haojun Liao 已提交
626
                ((int32_t*)pDst->pData)[numOfRows] = ((int32_t*)pDst->pData)[j];
H
Haojun Liao 已提交
627
              }
H
Haojun Liao 已提交
628 629 630 631 632 633 634 635
              numOfRows += 1;
              j += 1;
            }
            break;
          case TSDB_DATA_TYPE_SMALLINT:
          case TSDB_DATA_TYPE_USMALLINT:
            while (j < totalRows) {
              if (pIndicator[j] == 0) {
H
Haojun Liao 已提交
636
                j += 1;
H
Haojun Liao 已提交
637
                continue;
H
Haojun Liao 已提交
638
              }
H
Haojun Liao 已提交
639 640 641
              if (colDataIsNull_f(pBitmap, j)) {
                colDataSetNull_f(pDst->nullbitmap, numOfRows);
              } else {
H
Haojun Liao 已提交
642
                ((int16_t*)pDst->pData)[numOfRows] = ((int16_t*)pDst->pData)[j];
H
Haojun Liao 已提交
643
              }
H
Haojun Liao 已提交
644 645 646 647 648 649 650 651 652
              numOfRows += 1;
              j += 1;
            }
            break;
          case TSDB_DATA_TYPE_BOOL:
          case TSDB_DATA_TYPE_TINYINT:
          case TSDB_DATA_TYPE_UTINYINT:
            while (j < totalRows) {
              if (pIndicator[j] == 0) {
H
Haojun Liao 已提交
653
                j += 1;
H
Haojun Liao 已提交
654
                continue;
H
Haojun Liao 已提交
655
              }
H
Haojun Liao 已提交
656 657 658 659 660 661 662 663 664
              if (colDataIsNull_f(pBitmap, j)) {
                colDataSetNull_f(pDst->nullbitmap, numOfRows);
              } else {
                ((int8_t*)pDst->pData)[numOfRows] = ((int8_t*)pDst->pData)[j];
              }
              numOfRows += 1;
              j += 1;
            }
            break;
D
dapan1121 已提交
665
        }
H
Haojun Liao 已提交
666
      }
667

H
Haojun Liao 已提交
668 669
      if (maxRows < numOfRows) {
        maxRows = numOfRows;
670
      }
671
    }
672

H
Haojun Liao 已提交
673
    pBlock->info.rows = maxRows;
H
Haojun Liao 已提交
674 675 676
    if (pBitmap != NULL) {
      taosMemoryFree(pBitmap);
    }
677 678 679
  }
}

H
Haojun Liao 已提交
680
void doUpdateNumOfRows(SqlFunctionCtx* pCtx, SResultRow* pRow, int32_t numOfExprs, const int32_t* rowEntryOffset) {
681
  bool returnNotNull = false;
682
  for (int32_t j = 0; j < numOfExprs; ++j) {
683
    SResultRowEntryInfo* pResInfo = getResultEntryInfo(pRow, j, rowEntryOffset);
684 685
    if (!isRowEntryInitialized(pResInfo)) {
      continue;
dengyihao's avatar
dengyihao 已提交
686
    } else {
687 688 689 690 691
    }

    if (pRow->numOfRows < pResInfo->numOfRes) {
      pRow->numOfRows = pResInfo->numOfRes;
    }
692

693
    if (pCtx[j].isNotNullFunc) {
694 695
      returnNotNull = true;
    }
696
  }
S
shenglian zhou 已提交
697 698
  // if all expr skips all blocks, e.g. all null inputs for max function, output one row in final result.
  //  except for first/last, which require not null output, output no rows
699
  if (pRow->numOfRows == 0 && !returnNotNull) {
700
    pRow->numOfRows = 1;
701 702 703
  }
}

H
Haojun Liao 已提交
704 705
void copyResultrowToDataBlock(SExprInfo* pExprInfo, int32_t numOfExprs, SResultRow* pRow, SqlFunctionCtx* pCtx,
                              SSDataBlock* pBlock, const int32_t* rowEntryOffset, SExecTaskInfo* pTaskInfo) {
706 707 708
  for (int32_t j = 0; j < numOfExprs; ++j) {
    int32_t slotId = pExprInfo[j].base.resSchema.slotId;

709
    pCtx[j].resultInfo = getResultEntryInfo(pRow, j, rowEntryOffset);
710
    if (pCtx[j].fpSet.finalize) {
711
      if (strcmp(pCtx[j].pExpr->pExpr->_function.functionName, "_group_key") == 0) {
712 713
        // for groupkey along with functions that output multiple lines(e.g. Histogram)
        // need to match groupkey result for each output row of that function.
714 715 716 717
        if (pCtx[j].resultInfo->numOfRes != 0) {
          pCtx[j].resultInfo->numOfRes = pRow->numOfRows;
        }
      }
718 719
      
      blockDataEnsureCapacity(pBlock, pBlock->info.rows + pCtx[j].resultInfo->numOfRes);
720 721 722
      int32_t code = pCtx[j].fpSet.finalize(&pCtx[j], pBlock);
      if (TAOS_FAILED(code)) {
        qError("%s build result data block error, code %s", GET_TASKID(pTaskInfo), tstrerror(code));
723
        T_LONG_JMP(pTaskInfo->env, code);
724 725
      }
    } else if (strcmp(pCtx[j].pExpr->pExpr->_function.functionName, "_select_value") == 0) {
726
      // do nothing
727
    } else {
728 729
      // expand the result into multiple rows. E.g., _wstart, top(k, 20)
      // the _wstart needs to copy to 20 following rows, since the results of top-k expands to 20 different rows.
730 731 732
      SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, slotId);
      char*            in = GET_ROWCELL_INTERBUF(pCtx[j].resultInfo);
      for (int32_t k = 0; k < pRow->numOfRows; ++k) {
733
        colDataSetVal(pColInfoData, pBlock->info.rows + k, in, pCtx[j].resultInfo->isNullRes);
734 735 736
      }
    }
  }
737 738
}

739 740 741
// todo refactor. SResultRow has direct pointer in miainfo
int32_t finalizeResultRows(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPosition, SExprSupp* pSup,
                           SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo) {
X
Xiaoyu Wang 已提交
742
  SFilePage* page = getBufPage(pBuf, resultRowPosition->pageId);
743 744 745 746 747
  if (page == NULL) {
    qError("failed to get buffer, code:%s, %s", tstrerror(terrno), GET_TASKID(pTaskInfo));
    T_LONG_JMP(pTaskInfo->env, terrno);
  }

748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771
  SResultRow* pRow = (SResultRow*)((char*)page + resultRowPosition->offset);

  SqlFunctionCtx* pCtx = pSup->pCtx;
  SExprInfo*      pExprInfo = pSup->pExprInfo;
  const int32_t*  rowEntryOffset = pSup->rowEntryInfoOffset;

  doUpdateNumOfRows(pCtx, pRow, pSup->numOfExprs, rowEntryOffset);
  if (pRow->numOfRows == 0) {
    releaseBufPage(pBuf, page);
    return 0;
  }

  int32_t size = pBlock->info.capacity;
  while (pBlock->info.rows + pRow->numOfRows > size) {
    size = size * 1.25;
  }

  int32_t code = blockDataEnsureCapacity(pBlock, size);
  if (TAOS_FAILED(code)) {
    releaseBufPage(pBuf, page);
    qError("%s ensure result data capacity failed, code %s", GET_TASKID(pTaskInfo), tstrerror(code));
    T_LONG_JMP(pTaskInfo->env, code);
  }

H
Haojun Liao 已提交
772
  copyResultrowToDataBlock(pExprInfo, pSup->numOfExprs, pRow, pCtx, pBlock, rowEntryOffset, pTaskInfo);
773 774

  releaseBufPage(pBuf, page);
775
  pBlock->info.rows += pRow->numOfRows;
776 777 778
  return 0;
}

779
int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprSupp* pSup, SDiskbasedBuf* pBuf,
D
dapan1121 已提交
780
                           SGroupResInfo* pGroupResInfo, int32_t threshold, bool ignoreGroup) {
781 782 783 784 785
  SExprInfo*      pExprInfo = pSup->pExprInfo;
  int32_t         numOfExprs = pSup->numOfExprs;
  int32_t*        rowEntryOffset = pSup->rowEntryInfoOffset;
  SqlFunctionCtx* pCtx = pSup->pCtx;

786
  int32_t numOfRows = getNumOfTotalRes(pGroupResInfo);
787

788
  for (int32_t i = pGroupResInfo->index; i < numOfRows; i += 1) {
L
Liu Jicong 已提交
789 790
    SResKeyPos* pPos = taosArrayGetP(pGroupResInfo->pRows, i);
    SFilePage*  page = getBufPage(pBuf, pPos->pos.pageId);
791 792 793 794
    if (page == NULL) {
      qError("failed to get buffer, code:%s, %s", tstrerror(terrno), GET_TASKID(pTaskInfo));
      T_LONG_JMP(pTaskInfo->env, terrno);
    }
795

796
    SResultRow* pRow = (SResultRow*)((char*)page + pPos->pos.offset);
797

H
Haojun Liao 已提交
798
    doUpdateNumOfRows(pCtx, pRow, numOfExprs, rowEntryOffset);
799 800

    // no results, continue to check the next one
801 802
    if (pRow->numOfRows == 0) {
      pGroupResInfo->index += 1;
803
      releaseBufPage(pBuf, page);
804 805 806
      continue;
    }

D
dapan1121 已提交
807 808 809 810 811 812 813 814 815
    if (!ignoreGroup) {
      if (pBlock->info.id.groupId == 0) {
        pBlock->info.id.groupId = pPos->groupId;
      } else {
        // current value belongs to different group, it can't be packed into one datablock
        if (pBlock->info.id.groupId != pPos->groupId) {
          releaseBufPage(pBuf, page);
          break;
        }
816 817 818
      }
    }

819
    if (pBlock->info.rows + pRow->numOfRows > pBlock->info.capacity) {
820
      uint32_t newSize = pBlock->info.rows + pRow->numOfRows + ((numOfRows - i) > 1 ? 1 : 0);
D
dapan1121 已提交
821
      blockDataEnsureCapacity(pBlock, newSize);
D
dapan1121 已提交
822
      qDebug("datablock capacity not sufficient, expand to required:%d, current capacity:%d, %s",
D
dapan1121 已提交
823
             newSize, pBlock->info.capacity, GET_TASKID(pTaskInfo));
dengyihao's avatar
dengyihao 已提交
824
      // todo set the pOperator->resultInfo size
825 826 827
    }

    pGroupResInfo->index += 1;
H
Haojun Liao 已提交
828
    copyResultrowToDataBlock(pExprInfo, numOfExprs, pRow, pCtx, pBlock, rowEntryOffset, pTaskInfo);
829

830
    releaseBufPage(pBuf, page);
831
    pBlock->info.rows += pRow->numOfRows;
832 833 834
    if (pBlock->info.rows >= threshold) {
      break;
    }
835 836
  }

D
dapan1121 已提交
837
  qDebug("%s result generated, rows:%" PRId64 ", groupId:%" PRIu64, GET_TASKID(pTaskInfo), pBlock->info.rows,
H
Haojun Liao 已提交
838
         pBlock->info.id.groupId);
H
Haojun Liao 已提交
839
  pBlock->info.dataLoad = 1;
840
  blockDataUpdateTsWindow(pBlock, 0);
841 842 843
  return 0;
}

844 845 846
void doBuildStreamResBlock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo,
                           SDiskbasedBuf* pBuf) {
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
847 848
  SStorageAPI* pAPI = &pTaskInfo->storageAPI;

849 850 851 852 853 854 855 856 857 858 859
  SSDataBlock*   pBlock = pbInfo->pRes;

  // set output datablock version
  pBlock->info.version = pTaskInfo->version;

  blockDataCleanup(pBlock);
  if (!hasRemainResults(pGroupResInfo)) {
    return;
  }

  // clear the existed group id
H
Haojun Liao 已提交
860
  pBlock->info.id.groupId = 0;
861
  ASSERT(!pbInfo->mergeResultBlock);
D
dapan1121 已提交
862
  doCopyToSDataBlock(pTaskInfo, pBlock, &pOperator->exprSupp, pBuf, pGroupResInfo, pOperator->resultInfo.threshold, false);
863

864
  void* tbname = NULL;
865
  if (pAPI->stateStore.streamStateGetParName((void*)pTaskInfo->streamInfo.pState, pBlock->info.id.groupId, &tbname) < 0) {
866 867 868
    pBlock->info.parTbName[0] = 0;
  } else {
    memcpy(pBlock->info.parTbName, tbname, TSDB_TABLE_NAME_LEN);
869
  }
870 871

  pAPI->stateStore.streamStateFreeVal(tbname);
872 873
}

X
Xiaoyu Wang 已提交
874 875
void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo,
                            SDiskbasedBuf* pBuf) {
876
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
877
  SSDataBlock*   pBlock = pbInfo->pRes;
878

879 880 881
  // set output datablock version
  pBlock->info.version = pTaskInfo->version;

882
  blockDataCleanup(pBlock);
883
  if (!hasRemainResults(pGroupResInfo)) {
884 885 886
    return;
  }

887
  // clear the existed group id
H
Haojun Liao 已提交
888
  pBlock->info.id.groupId = 0;
889
  if (!pbInfo->mergeResultBlock) {
D
dapan1121 已提交
890
    doCopyToSDataBlock(pTaskInfo, pBlock, &pOperator->exprSupp, pBuf, pGroupResInfo, pOperator->resultInfo.threshold, false);
891
  } else {
dengyihao's avatar
dengyihao 已提交
892
    while (hasRemainResults(pGroupResInfo)) {
D
dapan1121 已提交
893
      doCopyToSDataBlock(pTaskInfo, pBlock, &pOperator->exprSupp, pBuf, pGroupResInfo, pOperator->resultInfo.threshold, true);
894 895
      if (pBlock->info.rows >= pOperator->resultInfo.threshold) {
        break;
896 897
      }

898
      // clearing group id to continue to merge data that belong to different groups
H
Haojun Liao 已提交
899
      pBlock->info.id.groupId = 0;
900
    }
901 902

    // clear the group id info in SSDataBlock, since the client does not need it
H
Haojun Liao 已提交
903
    pBlock->info.id.groupId = 0;
904 905 906
  }
}

907
void destroyExprInfo(SExprInfo* pExpr, int32_t numOfExprs) {
C
Cary Xu 已提交
908 909 910 911 912
  for (int32_t i = 0; i < numOfExprs; ++i) {
    SExprInfo* pExprInfo = &pExpr[i];
    for (int32_t j = 0; j < pExprInfo->base.numOfParams; ++j) {
      if (pExprInfo->base.pParam[j].type == FUNC_PARAM_TYPE_COLUMN) {
        taosMemoryFreeClear(pExprInfo->base.pParam[j].pCol);
5
54liuyao 已提交
913 914
      } else if (pExprInfo->base.pParam[j].type == FUNC_PARAM_TYPE_VALUE) {
        taosVariantDestroy(&pExprInfo->base.pParam[j].param);
H
Haojun Liao 已提交
915
      }
916
    }
C
Cary Xu 已提交
917 918 919

    taosMemoryFree(pExprInfo->base.pParam);
    taosMemoryFree(pExprInfo->pExpr);
H
Haojun Liao 已提交
920 921 922
  }
}

923 924 925 926 927 928
int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* defaultBufsz) {
  *defaultPgsz = 4096;
  while (*defaultPgsz < rowSize * 4) {
    *defaultPgsz <<= 1u;
  }

929
  // The default buffer for each operator in query is 10MB.
930
  // at least four pages need to be in buffer
931 932
  // TODO: make this variable to be configurable.
  *defaultBufsz = 4096 * 2560;
933 934 935 936 937 938 939
  if ((*defaultBufsz) <= (*defaultPgsz)) {
    (*defaultBufsz) = (*defaultPgsz) * 4;
  }

  return 0;
}

L
Liu Jicong 已提交
940
void initResultSizeInfo(SResultInfo* pResultInfo, int32_t numOfRows) {
H
Haojun Liao 已提交
941 942 943 944
  if (numOfRows == 0) {
    numOfRows = 4096;
  }

945 946
  pResultInfo->capacity = numOfRows;
  pResultInfo->threshold = numOfRows * 0.75;
947

948 949
  if (pResultInfo->threshold == 0) {
    pResultInfo->threshold = numOfRows;
950 951 952
  }
}

953 954 955 956 957
void initBasicInfo(SOptrBasicInfo* pInfo, SSDataBlock* pBlock) {
  pInfo->pRes = pBlock;
  initResultRowInfo(&pInfo->resultRowInfo);
}

958
static void* destroySqlFunctionCtx(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
959 960 961 962 963 964 965 966 967 968
  if (pCtx == NULL) {
    return NULL;
  }

  for (int32_t i = 0; i < numOfOutput; ++i) {
    for (int32_t j = 0; j < pCtx[i].numOfParams; ++j) {
      taosVariantDestroy(&pCtx[i].param[j].param);
    }

    taosMemoryFreeClear(pCtx[i].subsidiaries.pCtx);
969
    taosMemoryFreeClear(pCtx[i].subsidiaries.buf);
970 971
    taosMemoryFree(pCtx[i].input.pData);
    taosMemoryFree(pCtx[i].input.pColumnDataAgg);
972 973 974 975

    if (pCtx[i].udfName != NULL) {
      taosMemoryFree(pCtx[i].udfName);
    }
976 977 978 979 980 981
  }

  taosMemoryFreeClear(pCtx);
  return NULL;
}

982
int32_t initExprSupp(SExprSupp* pSup, SExprInfo* pExprInfo, int32_t numOfExpr, SFunctionStateStore* pStore) {
983 984 985
  pSup->pExprInfo = pExprInfo;
  pSup->numOfExprs = numOfExpr;
  if (pSup->pExprInfo != NULL) {
986
    pSup->pCtx = createSqlFunctionCtx(pExprInfo, numOfExpr, &pSup->rowEntryInfoOffset, pStore);
987 988 989
    if (pSup->pCtx == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
990
  }
991 992

  return TSDB_CODE_SUCCESS;
993 994
}

995 996 997 998
void cleanupExprSupp(SExprSupp* pSupp) {
  destroySqlFunctionCtx(pSupp->pCtx, pSupp->numOfExprs);
  if (pSupp->pExprInfo != NULL) {
    destroyExprInfo(pSupp->pExprInfo, pSupp->numOfExprs);
C
Cary Xu 已提交
999
    taosMemoryFreeClear(pSupp->pExprInfo);
1000
  }
H
Haojun Liao 已提交
1001 1002 1003 1004 1005 1006

  if (pSupp->pFilterInfo != NULL) {
    filterFreeInfo(pSupp->pFilterInfo);
    pSupp->pFilterInfo = NULL;
  }

1007 1008 1009
  taosMemoryFree(pSupp->rowEntryInfoOffset);
}

X
Xiaoyu Wang 已提交
1010
void cleanupBasicInfo(SOptrBasicInfo* pInfo) { pInfo->pRes = blockDataDestroy(pInfo->pRes); }
1011

1012
bool groupbyTbname(SNodeList* pGroupList) {
1013
  bool bytbname = false;
1014
  if (LIST_LENGTH(pGroupList) == 1) {
1015 1016 1017 1018 1019 1020 1021 1022 1023 1024
    SNode* p = nodesListGetNode(pGroupList, 0);
    if (p->type == QUERY_NODE_FUNCTION) {
      // partition by tbname/group by tbname
      bytbname = (strcmp(((struct SFunctionNode*)p)->functionName, "tbname") == 0);
    }
  }

  return bytbname;
}

1025
int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, SExecTaskInfo* pTask, SReadHandle* readHandle) {
D
dapan1121 已提交
1026
  switch (pNode->type) {
D
dapan1121 已提交
1027 1028 1029 1030 1031 1032
    case QUERY_NODE_PHYSICAL_PLAN_QUERY_INSERT: {
      SInserterParam* pInserterParam = taosMemoryCalloc(1, sizeof(SInserterParam));
      if (NULL == pInserterParam) {
        return TSDB_CODE_OUT_OF_MEMORY;
      }
      pInserterParam->readHandle = readHandle;
L
Liu Jicong 已提交
1033

D
dapan1121 已提交
1034 1035 1036
      *pParam = pInserterParam;
      break;
    }
D
dapan1121 已提交
1037
    case QUERY_NODE_PHYSICAL_PLAN_DELETE: {
1038
      SDeleterParam* pDeleterParam = taosMemoryCalloc(1, sizeof(SDeleterParam));
D
dapan1121 已提交
1039 1040 1041
      if (NULL == pDeleterParam) {
        return TSDB_CODE_OUT_OF_MEMORY;
      }
1042

X
Xiaoyu Wang 已提交
1043
      SArray*         pInfoList = getTableListInfo(pTask);
1044 1045 1046
      STableListInfo* pTableListInfo = taosArrayGetP(pInfoList, 0);
      taosArrayDestroy(pInfoList);

1047
      pDeleterParam->suid = tableListGetSuid(pTableListInfo);
H
Haojun Liao 已提交
1048 1049

      // TODO extract uid list
1050 1051
      int32_t numOfTables = tableListGetSize(pTableListInfo);
      pDeleterParam->pUidList = taosArrayInit(numOfTables, sizeof(uint64_t));
D
dapan1121 已提交
1052 1053 1054 1055
      if (NULL == pDeleterParam->pUidList) {
        taosMemoryFree(pDeleterParam);
        return TSDB_CODE_OUT_OF_MEMORY;
      }
H
Haojun Liao 已提交
1056

1057
      for (int32_t i = 0; i < numOfTables; ++i) {
1058
        STableKeyInfo* pTable = tableListGetInfo(pTableListInfo, i);
D
dapan1121 已提交
1059 1060 1061 1062 1063 1064 1065 1066 1067 1068 1069 1070 1071
        taosArrayPush(pDeleterParam->pUidList, &pTable->uid);
      }

      *pParam = pDeleterParam;
      break;
    }
    default:
      break;
  }

  return TSDB_CODE_SUCCESS;
}

1072 1073
int32_t releaseOutputBuf(void* pState, SWinKey* pKey, SResultRow* pResult, SStateStore* pAPI) {
  pAPI->streamStateReleaseBuf(pState, pKey, pResult);
dengyihao's avatar
dengyihao 已提交
1074 1075
  return TSDB_CODE_SUCCESS;
}
1076

1077 1078 1079
int32_t saveSessionDiscBuf(void* pState, SSessionKey* key, void* buf, int32_t size, SStateStore* pAPI) {
  pAPI->streamStateSessionPut(pState, key, (const void*)buf, size);
  releaseOutputBuf(pState, NULL, (SResultRow*)buf, pAPI);
1080 1081 1082
  return TSDB_CODE_SUCCESS;
}

1083
int32_t buildSessionResultDataBlock(SOperatorInfo* pOperator, void* pState, SSDataBlock* pBlock,
5
54liuyao 已提交
1084
                                    SExprSupp* pSup, SGroupResInfo* pGroupResInfo) {
1085
  SExecTaskInfo*  pTaskInfo = pOperator->pTaskInfo;
1086 1087
  SStorageAPI* pAPI = &pTaskInfo->storageAPI;

5
54liuyao 已提交
1088 1089 1090 1091 1092 1093 1094 1095 1096 1097 1098
  SExprInfo*      pExprInfo = pSup->pExprInfo;
  int32_t         numOfExprs = pSup->numOfExprs;
  int32_t*        rowEntryOffset = pSup->rowEntryInfoOffset;
  SqlFunctionCtx* pCtx = pSup->pCtx;

  int32_t numOfRows = getNumOfTotalRes(pGroupResInfo);

  for (int32_t i = pGroupResInfo->index; i < numOfRows; i += 1) {
    SSessionKey* pKey = taosArrayGet(pGroupResInfo->pRows, i);
    int32_t      size = 0;
    void*        pVal = NULL;
1099
    int32_t      code = pAPI->stateStore.streamStateSessionGet(pState, pKey, &pVal, &size);
5
54liuyao 已提交
1100
    ASSERT(code == 0);
1101 1102
    if (code == -1) {
      // coverity scan
5
54liuyao 已提交
1103
      pGroupResInfo->index += 1;
1104 1105
      continue;
    }
5
54liuyao 已提交
1106 1107 1108 1109 1110
    SResultRow* pRow = (SResultRow*)pVal;
    doUpdateNumOfRows(pCtx, pRow, numOfExprs, rowEntryOffset);
    // no results, continue to check the next one
    if (pRow->numOfRows == 0) {
      pGroupResInfo->index += 1;
1111
      releaseOutputBuf(pState, NULL, pRow, &pAPI->stateStore);
5
54liuyao 已提交
1112 1113 1114
      continue;
    }

H
Haojun Liao 已提交
1115 1116
    if (pBlock->info.id.groupId == 0) {
      pBlock->info.id.groupId = pKey->groupId;
1117

1118
      void* tbname = NULL;
1119
      if (pAPI->stateStore.streamStateGetParName((void*)pTaskInfo->streamInfo.pState, pBlock->info.id.groupId, &tbname) < 0) {
1120
        pBlock->info.parTbName[0] = 0;
1121
      } else {
1122
        memcpy(pBlock->info.parTbName, tbname, TSDB_TABLE_NAME_LEN);
1123
      }
1124
      pAPI->stateStore.streamStateFreeVal(tbname);
5
54liuyao 已提交
1125 1126
    } else {
      // current value belongs to different group, it can't be packed into one datablock
H
Haojun Liao 已提交
1127
      if (pBlock->info.id.groupId != pKey->groupId) {
1128
        releaseOutputBuf(pState, NULL, pRow, &pAPI->stateStore);
5
54liuyao 已提交
1129 1130 1131 1132 1133 1134
        break;
      }
    }

    if (pBlock->info.rows + pRow->numOfRows > pBlock->info.capacity) {
      ASSERT(pBlock->info.rows > 0);
1135
      releaseOutputBuf(pState, NULL, pRow, &pAPI->stateStore);
5
54liuyao 已提交
1136 1137 1138 1139 1140 1141 1142 1143 1144 1145 1146 1147 1148 1149 1150 1151 1152 1153 1154 1155 1156 1157 1158
      break;
    }

    pGroupResInfo->index += 1;

    for (int32_t j = 0; j < numOfExprs; ++j) {
      int32_t slotId = pExprInfo[j].base.resSchema.slotId;

      pCtx[j].resultInfo = getResultEntryInfo(pRow, j, rowEntryOffset);
      if (pCtx[j].fpSet.finalize) {
        int32_t code1 = pCtx[j].fpSet.finalize(&pCtx[j], pBlock);
        if (TAOS_FAILED(code1)) {
          qError("%s build result data block error, code %s", GET_TASKID(pTaskInfo), tstrerror(code1));
          T_LONG_JMP(pTaskInfo->env, code1);
        }
      } else if (strcmp(pCtx[j].pExpr->pExpr->_function.functionName, "_select_value") == 0) {
        // do nothing, todo refactor
      } else {
        // expand the result into multiple rows. E.g., _wstart, top(k, 20)
        // the _wstart needs to copy to 20 following rows, since the results of top-k expands to 20 different rows.
        SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, slotId);
        char*            in = GET_ROWCELL_INTERBUF(pCtx[j].resultInfo);
        for (int32_t k = 0; k < pRow->numOfRows; ++k) {
1159
          colDataSetVal(pColInfoData, pBlock->info.rows + k, in, pCtx[j].resultInfo->isNullRes);
5
54liuyao 已提交
1160 1161 1162 1163
        }
      }
    }

1164
    pBlock->info.dataLoad = 1;
5
54liuyao 已提交
1165
    pBlock->info.rows += pRow->numOfRows;
1166
    releaseOutputBuf(pState, NULL, pRow, &pAPI->stateStore);
5
54liuyao 已提交
1167 1168 1169
  }
  blockDataUpdateTsWindow(pBlock, 0);
  return TSDB_CODE_SUCCESS;
1170
}
L
Liu Jicong 已提交
1171 1172

void qStreamCloseTsdbReader(void* task) {
D
dapan1121 已提交
1173 1174 1175 1176
  if (task == NULL) {
    return;
  }

L
Liu Jicong 已提交
1177 1178
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)task;
  SOperatorInfo* pOp = pTaskInfo->pRoot;
D
dapan1121 已提交
1179

1180 1181
  qDebug("stream close tsdb reader, reset status uid:%" PRId64 " ts:%" PRId64, pTaskInfo->streamInfo.currentOffset.uid,
         pTaskInfo->streamInfo.currentOffset.ts);
D
dapan1121 已提交
1182 1183

  // todo refactor, other thread may already use this read to extract data.
1184
  pTaskInfo->streamInfo.currentOffset = (STqOffsetVal){0};
L
Liu Jicong 已提交
1185 1186 1187 1188 1189 1190
  while (pOp->numOfDownstream == 1 && pOp->pDownstream[0]) {
    SOperatorInfo* pDownstreamOp = pOp->pDownstream[0];
    if (pDownstreamOp->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
      SStreamScanInfo* pInfo = pDownstreamOp->info;
      if (pInfo->pTableScanOp) {
        STableScanInfo* pTSInfo = pInfo->pTableScanOp->info;
D
dapan1121 已提交
1191 1192

        setOperatorCompleted(pInfo->pTableScanOp);
X
Xiaoyu Wang 已提交
1193
        while (pTaskInfo->owner != 0) {
D
dapan1121 已提交
1194 1195 1196 1197
          taosMsleep(100);
          qDebug("wait for the reader stopping");
        }

1198
        pTaskInfo->storageAPI.tsdReader.tsdReaderClose(pTSInfo->base.dataReader);
L
Liu Jicong 已提交
1199
        pTSInfo->base.dataReader = NULL;
D
dapan1121 已提交
1200 1201 1202 1203

        // restore the status, todo refactor.
        pInfo->pTableScanOp->status = OP_OPENED;
        pTaskInfo->status = TASK_NOT_COMPLETED;
L
Liu Jicong 已提交
1204 1205 1206 1207 1208
        return;
      }
    }
  }
}