executorInt.c 43.0 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
15

H
Haojun Liao 已提交
16 17
#include "filter.h"
#include "function.h"
18 19
#include "functionMgt.h"
#include "os.h"
H
Haojun Liao 已提交
20
#include "querynodes.h"
21
#include "tfill.h"
dengyihao's avatar
dengyihao 已提交
22
#include "tname.h"
23

H
Haojun Liao 已提交
24
#include "tdatablock.h"
H
Haojun Liao 已提交
25
#include "tmsg.h"
26
#include "ttime.h"
H
Haojun Liao 已提交
27

28
#include "executorInt.h"
dengyihao's avatar
dengyihao 已提交
29
#include "index.h"
30
#include "operator.h"
31
#include "query.h"
32
#include "querytask.h"
33
#include "tcompare.h"
H
Haojun Liao 已提交
34
#include "thash.h"
35
#include "ttypes.h"
dengyihao's avatar
dengyihao 已提交
36
#include "vnode.h"
37

X
Xiaoyu Wang 已提交
38
/* Switch the runtime into reverse scanning by setting its scanFlag to REVERSE_SCAN. */
#define SET_REVERSE_SCAN_FLAG(runtime) ((runtime)->scanFlag = REVERSE_SCAN)

/* Step factor for scan order `ord`: QUERY_ASC_FORWARD_STEP for TSDB_ORDER_ASC, QUERY_DESC_FORWARD_STEP otherwise. */
#define GET_FORWARD_DIRECTION_FACTOR(ord) (((ord) == TSDB_ORDER_ASC) ? QUERY_ASC_FORWARD_STEP : QUERY_DESC_FORWARD_STEP)

#if 0
/*
 * Fault-injection allocators (compiled out). When this section is enabled, the #defines at the end route
 * this file's malloc/calloc/realloc through wrappers that fail at random to exercise out-of-memory paths:
 * malloc/calloc fail when taosRand() % 1000 == 0, realloc fails when taosRand() % 5 is 0 or 1.
 */
static UNUSED_FUNC void* u_malloc(size_t size) {
  uint32_t dice = taosRand();
  if (dice % 1000 == 0) {
    return NULL;
  }

  return taosMemoryMalloc(size);
}

static UNUSED_FUNC void* u_calloc(size_t num, size_t size) {
  uint32_t dice = taosRand();
  if (dice % 1000 == 0) {
    return NULL;
  }

  return taosMemoryCalloc(num, size);
}

static UNUSED_FUNC void* u_realloc(void* p, size_t size) {
  uint32_t dice = taosRand();
  if (dice % 5 < 2) {
    return NULL;
  }

  return taosMemoryRealloc(p, size);
}

#define calloc  u_calloc
#define malloc  u_malloc
#define realloc u_realloc
#endif

75
static void setBlockSMAInfo(SqlFunctionCtx* pCtx, SExprInfo* pExpr, SSDataBlock* pBlock);
76

X
Xiaoyu Wang 已提交
77 78
static void initCtxOutputBuffer(SqlFunctionCtx* pCtx, int32_t size);
static void doApplyScalarCalculation(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32_t order, int32_t scanFlag);
79

H
Haojun Liao 已提交
80 81 82 83
static void    extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const SColumnInfoData* p, bool keep,
                                                   int32_t status);
static int32_t doSetInputDataBlock(SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t order, int32_t scanFlag,
                                   bool createDummyCol);
H
Haojun Liao 已提交
84
static int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprSupp* pSup, SDiskbasedBuf* pBuf,
85
                                  SGroupResInfo* pGroupResInfo, int32_t threshold);
86

87
/**
 * Reserve a zero-initialized SResultRow of `interBufSize` bytes in the disk-based result buffer.
 *
 * Rows are appended to the page identified by *currentPageId. When no page is open yet
 * (*currentPageId == -1), or the open page cannot hold another row, a new page is requested.
 *
 * @param pResultBuf    buffer that owns the pages
 * @param currentPageId in/out: page currently being filled; set to the page holding the returned row
 * @param interBufSize  size in bytes of one result row
 * @return the new row (its pageId/offset filled in), or NULL when a page could not be obtained
 */
SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int32_t* currentPageId, int32_t interBufSize) {
  SFilePage* pData = NULL;
  int32_t    pageId = -1;

  if (*currentPageId == -1) {
    // in the first scan no page is open yet: start a new one
    pData = getNewBufPage(pResultBuf, &pageId);
    if (pData != NULL) {
      pData->num = sizeof(SFilePage);
    }
  } else {
    pData = getBufPage(pResultBuf, *currentPageId);
    if (pData == NULL) {
      qError("failed to get buffer, code:%s", tstrerror(terrno));
      return NULL;
    }

    pageId = *currentPageId;

    if (pData->num + interBufSize > getBufPageSize(pResultBuf)) {
      // the current page is full: release it first, and prepare the next one
      releaseBufPage(pResultBuf, pData);

      pData = getNewBufPage(pResultBuf, &pageId);
      if (pData != NULL) {
        pData->num = sizeof(SFilePage);
      }
    }
  }

  if (pData == NULL) {
    qError("failed to get new buffer page, code:%s", tstrerror(terrno));
    return NULL;
  }

  setBufPageDirty(pData, true);

  // pData->num is the number of bytes already used in the page; the new row starts right after them
  SResultRow* pResultRow = (SResultRow*)((char*)pData + pData->num);
  memset((char*)pResultRow, 0, interBufSize);

  pResultRow->pageId = pageId;
  pResultRow->offset = (int32_t)pData->num;

  *currentPageId = pageId;
  pData->num += interBufSize;
  return pResultRow;
}

133 134 135 136 137 138 139
/**
 * the struct of key in hash table
 * +----------+---------------+
 * | group id |   key data    |
 * | 8 bytes  | actual length |
 * +----------+---------------+
 */
140 141
SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pResultRowInfo, char* pData,
                                   int16_t bytes, bool masterscan, uint64_t groupId, SExecTaskInfo* pTaskInfo,
D
dapan1121 已提交
142
                                   bool isIntervalQuery, SAggSupporter* pSup, bool keepGroup) {
143
  SET_RES_WINDOW_KEY(pSup->keyBuf, pData, bytes, groupId);
D
dapan1121 已提交
144 145 146
  if (!keepGroup) {
    *(uint64_t*)pSup->keyBuf = calcGroupId(pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes));
  }
H
Haojun Liao 已提交
147

dengyihao's avatar
dengyihao 已提交
148
  SResultRowPosition* p1 =
149
      (SResultRowPosition*)tSimpleHashGet(pSup->pResultRowHashTable, pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes));
H
Haojun Liao 已提交
150

151 152
  SResultRow* pResult = NULL;

H
Haojun Liao 已提交
153 154
  // in case of repeat scan/reverse scan, no new time window added.
  if (isIntervalQuery) {
155
    if (p1 != NULL) {  // the *p1 may be NULL in case of sliding+offset exists.
156
      pResult = getResultRowByPos(pResultBuf, p1, true);
157 158 159 160
      if (NULL == pResult) {
        T_LONG_JMP(pTaskInfo->env, terrno);
      }

161
      ASSERT(pResult->pageId == p1->pageId && pResult->offset == p1->offset);
H
Haojun Liao 已提交
162 163
    }
  } else {
dengyihao's avatar
dengyihao 已提交
164 165
    // In case of group by column query, the required SResultRow object must be existInCurrentResusltRowInfo in the
    // pResultRowInfo object.
H
Haojun Liao 已提交
166
    if (p1 != NULL) {
167
      // todo
168
      pResult = getResultRowByPos(pResultBuf, p1, true);
169 170 171 172
      if (NULL == pResult) {
        T_LONG_JMP(pTaskInfo->env, terrno);
      }

173
      ASSERT(pResult->pageId == p1->pageId && pResult->offset == p1->offset);
H
Haojun Liao 已提交
174 175 176
    }
  }

L
Liu Jicong 已提交
177
  // 1. close current opened time window
178
  if (pResultRowInfo->cur.pageId != -1 && ((pResult == NULL) || (pResult->pageId != pResultRowInfo->cur.pageId))) {
179
    SResultRowPosition pos = pResultRowInfo->cur;
X
Xiaoyu Wang 已提交
180
    SFilePage*         pPage = getBufPage(pResultBuf, pos.pageId);
181 182 183 184
    if (pPage == NULL) {
      qError("failed to get buffer, code:%s, %s", tstrerror(terrno), GET_TASKID(pTaskInfo));
      T_LONG_JMP(pTaskInfo->env, terrno);
    }
185 186 187 188 189
    releaseBufPage(pResultBuf, pPage);
  }

  // allocate a new buffer page
  if (pResult == NULL) {
190
    pResult = getNewResultRow(pResultBuf, &pSup->currentPageId, pSup->resultRowSize);
191 192 193
    if (pResult == NULL) {
      T_LONG_JMP(pTaskInfo->env, terrno);
    }
194

195 196
    // add a new result set for a new group
    SResultRowPosition pos = {.pageId = pResult->pageId, .offset = pResult->offset};
197
    tSimpleHashPut(pSup->pResultRowHashTable, pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes), &pos,
L
Liu Jicong 已提交
198
                   sizeof(SResultRowPosition));
H
Haojun Liao 已提交
199 200
  }

201 202 203
  // 2. set the new time window to be the new active time window
  pResultRowInfo->cur = (SResultRowPosition){.pageId = pResult->pageId, .offset = pResult->offset};

H
Haojun Liao 已提交
204
  // too many time window in query
205
  if (pTaskInfo->execModel == OPTR_EXEC_MODEL_BATCH &&
206
      tSimpleHashGetSize(pSup->pResultRowHashTable) > MAX_INTERVAL_TIME_WINDOW) {
207
    T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_TOO_MANY_TIMEWINDOW);
H
Haojun Liao 已提交
208 209
  }

H
Haojun Liao 已提交
210
  return pResult;
H
Haojun Liao 已提交
211 212
}

213
//  query_range_start, query_range_end, window_duration, window_start, window_end
214
void initExecTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pQueryWindow) {
215 216 217
  pColData->info.type = TSDB_DATA_TYPE_TIMESTAMP;
  pColData->info.bytes = sizeof(int64_t);

H
Haojun Liao 已提交
218
  colInfoDataEnsureCapacity(pColData, 5, false);
219 220
  colDataSetInt64(pColData, 0, &pQueryWindow->skey);
  colDataSetInt64(pColData, 1, &pQueryWindow->ekey);
221 222

  int64_t interval = 0;
223 224 225
  colDataSetInt64(pColData, 2, &interval);  // this value may be variable in case of 'n' and 'y'.
  colDataSetInt64(pColData, 3, &pQueryWindow->skey);
  colDataSetInt64(pColData, 4, &pQueryWindow->ekey);
226 227
}

G
Ganlin Zhao 已提交
228
/**
 * Bind pBlock to every function context of pExprSup using only block-level information: scan order,
 * row count, source block, scan flag and the SMA attached by setBlockSMAInfo. No column data is bound.
 * Called from setInputDataBlock when pBlock->pBlockAgg != NULL.
 */
static void doSetInputDataBlockInfo(SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t order, int32_t scanFlag) {
  SqlFunctionCtx* pCtxList = pExprSup->pCtx;

  for (int32_t k = 0; k < pExprSup->numOfExprs; ++k) {
    SqlFunctionCtx* pOneCtx = &pCtxList[k];

    pOneCtx->order = order;
    pOneCtx->input.numOfRows = pBlock->info.rows;
    setBlockSMAInfo(pOneCtx, &pExprSup->pExprInfo[k], pBlock);
    pOneCtx->pSrcBlock = pBlock;
    pOneCtx->scanFlag = scanFlag;
  }
}

239
/**
 * Bind an input block to every function context of pExprSup.
 * A block carrying pre-computed SMA (pBlockAgg) is bound through its SMA only. Any other block has the
 * column data of each function parameter bound (see doSetInputDataBlock).
 * NOTE(review): the status code returned by doSetInputDataBlock is discarded here.
 */
void setInputDataBlock(SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t order, int32_t scanFlag, bool createDummyCol) {
  if (pBlock->pBlockAgg == NULL) {
    doSetInputDataBlock(pExprSup, pBlock, order, scanFlag, createDummyCol);
    return;
  }

  doSetInputDataBlockInfo(pExprSup, pBlock, order, scanFlag);
}

L
Liu Jicong 已提交
247 248
/**
 * Materialize a constant (value) function parameter as a column of `numOfRows` identical values.
 *
 * The column descriptor is allocated on first use, typed after the constant, and cached in
 * pInput->pData[paramIndex]; later calls reuse it. Values are written for BIGINT/UBIGINT, DOUBLE and
 * VARCHAR constants. For other types the column only gets capacity.
 *
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY, or the error reported by colInfoDataEnsureCapacity /
 *         colDataSetVal.
 */
static int32_t doCreateConstantValColumnInfo(SInputColumnInfoData* pInput, SFunctParam* pFuncParam, int32_t paramIndex,
                                             int32_t numOfRows) {
  SColumnInfoData* pColInfo = pInput->pData[paramIndex];
  if (pColInfo == NULL) {
    pColInfo = taosMemoryCalloc(1, sizeof(SColumnInfoData));
    if (pColInfo == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    // Set the correct column info (data type and bytes)
    pColInfo->info.type = pFuncParam->param.nType;
    pColInfo->info.bytes = pFuncParam->param.nLen;

    pInput->pData[paramIndex] = pColInfo;
  }

  // the writes below go straight into the column buffer, so it must exist first
  int32_t code = colInfoDataEnsureCapacity(pColInfo, numOfRows, false);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  int8_t type = pFuncParam->param.nType;
  if (type == TSDB_DATA_TYPE_BIGINT || type == TSDB_DATA_TYPE_UBIGINT) {
    int64_t v = pFuncParam->param.i;
    for (int32_t i = 0; i < numOfRows; ++i) {
      colDataSetInt64(pColInfo, i, &v);
    }
  } else if (type == TSDB_DATA_TYPE_DOUBLE) {
    double v = pFuncParam->param.d;
    for (int32_t i = 0; i < numOfRows; ++i) {
      colDataSetDouble(pColInfo, i, &v);
    }
  } else if (type == TSDB_DATA_TYPE_VARCHAR) {
    // build the var-string (header + payload) once and copy it into every row
    char* tmp = taosMemoryMalloc(pFuncParam->param.nLen + VARSTR_HEADER_SIZE);
    if (tmp == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    STR_WITH_SIZE_TO_VARSTR(tmp, pFuncParam->param.pz, pFuncParam->param.nLen);
    for (int32_t i = 0; i < numOfRows; ++i) {
      code = colDataSetVal(pColInfo, i, tmp, false);
      if (code != TSDB_CODE_SUCCESS) {
        break;
      }
    }
    taosMemoryFree(tmp);
  }

  return code;
}

290 291
/**
 * Bind the columns of pBlock to the inputs of every function context in pExprSup.
 *
 * A column parameter points at the block column in its slot. The last column parameter of an
 * implicit-ts function also becomes pPTS. A constant parameter is materialized as a column only when
 * createDummyCol is set and it is the expression's sole parameter.
 *
 * @return TSDB_CODE_SUCCESS, or the first error from doCreateConstantValColumnInfo.
 */
static int32_t doSetInputDataBlock(SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t order, int32_t scanFlag,
                                   bool createDummyCol) {
  for (int32_t i = 0; i < pExprSup->numOfExprs; ++i) {
    SqlFunctionCtx*       pOneCtx = &pExprSup->pCtx[i];
    SExprInfo*            pOneExpr = &pExprSup->pExprInfo[i];
    SInputColumnInfoData* pInput = &pOneCtx->input;
    int32_t               numOfParams = pOneExpr->base.numOfParams;

    pOneCtx->order = order;
    pInput->numOfRows = pBlock->info.rows;
    pOneCtx->pSrcBlock = pBlock;
    pOneCtx->scanFlag = scanFlag;
    pInput->uid = pBlock->info.id.uid;
    pInput->colDataSMAIsSet = false;

    for (int32_t j = 0; j < numOfParams; ++j) {
      SFunctParam* pFuncParam = &pOneExpr->base.pParam[j];

      if (pFuncParam->type == FUNC_PARAM_TYPE_COLUMN) {
        pInput->pData[j] = taosArrayGet(pBlock->pDataBlock, pFuncParam->pCol->slotId);
        pInput->totalRows = pBlock->info.rows;
        pInput->numOfRows = pBlock->info.rows;
        pInput->startRowIndex = 0;

        // NOTE: the last parameter is the primary timestamp column
        // todo: refactor this
        bool isLastParam = (j == numOfParams - 1);
        if (fmIsImplicitTsFunc(pOneCtx->functionId) && isLastParam) {
          // in case of merge function, this is not always the ts column data.
          pInput->pPTS = pInput->pData[j];
        }

        ASSERT(pInput->pData[j] != NULL);
      } else if (pFuncParam->type == FUNC_PARAM_TYPE_VALUE && createDummyCol && numOfParams == 1) {
        // todo avoid case: top(k, 12), 12 is the value parameter.
        // sum(11), 11 is also the value parameter.
        pInput->totalRows = pBlock->info.rows;
        pInput->numOfRows = pBlock->info.rows;
        pInput->startRowIndex = 0;

        int32_t code = doCreateConstantValColumnInfo(pInput, pFuncParam, j, pBlock->info.rows);
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }
      }
    }
  }

  return TSDB_CODE_SUCCESS;
}

5
54liuyao 已提交
342
/**
 * Decide whether the function bound to pCtx has to process the current input:
 *  - functionId == -1 means no function is bound: never execute;
 *  - during PRE_SCAN only functions that need a repeated scan execute;
 *  - otherwise the function executes until its result entry is marked completed.
 */
bool functionNeedToExecute(SqlFunctionCtx* pCtx) {
  int32_t functionId = pCtx->functionId;
  if (functionId == -1) {
    return false;
  }

  if (pCtx->scanFlag == PRE_SCAN) {
    return fmIsRepeatScanFunc(functionId);
  }

  struct SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
  return !isRowEntryCompleted(pResInfo);
}

H
Haojun Liao 已提交
362
/**
 * Synthesize the SMA (numOfNull/min/max/sum) of a constant parameter repeated over `numOfRows` rows.
 *
 * pInput->pData[paramIndex] receives a column descriptor that carries only type and bytes, and no data.
 * pInput->pColumnDataAgg[paramIndex] receives the statistics. Both are allocated on first use and reused.
 *
 * BIGINT and BOOL statistics are stored as int64 values. DOUBLE statistics are stored as the bytes of a
 * double inside the int64 fields. TIMESTAMP leaves the aggregate untouched. Other types are logged.
 *
 * @return TSDB_CODE_SUCCESS or TSDB_CODE_OUT_OF_MEMORY.
 */
static int32_t doCreateConstantValColumnSMAInfo(SInputColumnInfoData* pInput, SFunctParam* pFuncParam, int32_t type,
                                                int32_t paramIndex, int32_t numOfRows) {
  if (pInput->pData[paramIndex] == NULL) {
    pInput->pData[paramIndex] = taosMemoryCalloc(1, sizeof(SColumnInfoData));
    if (pInput->pData[paramIndex] == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    // Set the correct column info (data type and bytes)
    pInput->pData[paramIndex]->info.type = type;
    pInput->pData[paramIndex]->info.bytes = tDataTypes[type].bytes;
  }

  SColumnDataAgg* da = pInput->pColumnDataAgg[paramIndex];
  if (da == NULL) {
    da = taosMemoryCalloc(1, sizeof(SColumnDataAgg));
    pInput->pColumnDataAgg[paramIndex] = da;
    if (da == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  if (type == TSDB_DATA_TYPE_BIGINT) {
    int64_t v = pFuncParam->param.i;
    *da = (SColumnDataAgg){.numOfNull = 0, .min = v, .max = v, .sum = v * numOfRows};
  } else if (type == TSDB_DATA_TYPE_DOUBLE) {
    double v = pFuncParam->param.d;
    double sum = v * numOfRows;
    *da = (SColumnDataAgg){.numOfNull = 0};

    // min/max/sum are integer fields: copy the double's bytes instead of storing through a double*,
    // which would violate the strict-aliasing rule.
    memcpy(&da->min, &v, sizeof(v));
    memcpy(&da->max, &v, sizeof(v));
    memcpy(&da->sum, &sum, sizeof(sum));
  } else if (type == TSDB_DATA_TYPE_BOOL) {  // todo validate this data type
    // a constant bool has min == max == its value, and sum == value * rows
    int64_t v = (pFuncParam->param.i != 0) ? 1 : 0;
    *da = (SColumnDataAgg){.numOfNull = 0, .min = v, .max = v, .sum = v * numOfRows};
  } else if (type == TSDB_DATA_TYPE_TIMESTAMP) {
    // do nothing
  } else {
    qError("invalid constant type for sma info");
  }

  return TSDB_CODE_SUCCESS;
}
411

412
/**
 * Attach the block-level SMA of pBlock (pBlock->pBlockAgg) to the input of pCtx.
 *
 * A column parameter takes the aggregate of its slot, plus the block column for its type information; the
 * column data itself is not used. A constant parameter gets a synthesized aggregate.
 * colDataSMAIsSet ends up false when the block has no SMA, or when any parameter is left without an
 * aggregate.
 */
void setBlockSMAInfo(SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, SSDataBlock* pBlock) {
  int32_t numOfRows = pBlock->info.rows;

  SInputColumnInfoData* pInput = &pCtx->input;
  pInput->numOfRows = numOfRows;
  pInput->totalRows = numOfRows;

  if (pBlock->pBlockAgg == NULL) {
    pInput->colDataSMAIsSet = false;
    return;
  }

  pInput->colDataSMAIsSet = true;

  for (int32_t j = 0; j < pExprInfo->base.numOfParams; ++j) {
    SFunctParam* pFuncParam = &pExprInfo->base.pParam[j];

    if (pFuncParam->type == FUNC_PARAM_TYPE_COLUMN) {
      int32_t slotId = pFuncParam->pCol->slotId;
      pInput->pColumnDataAgg[j] = pBlock->pBlockAgg[slotId];
      if (pInput->pColumnDataAgg[j] == NULL) {
        pInput->colDataSMAIsSet = false;
      }

      // Here we set the column info data since the data type for each column data is required, but
      // the data in the corresponding SColumnInfoData will not be used.
      pInput->pData[j] = taosArrayGet(pBlock->pDataBlock, slotId);
    } else if (pFuncParam->type == FUNC_PARAM_TYPE_VALUE) {
      int32_t code =
          doCreateConstantValColumnSMAInfo(pInput, pFuncParam, pFuncParam->param.nType, j, pBlock->info.rows);
      if (code != TSDB_CODE_SUCCESS) {
        // same treatment as a column without an aggregate above: the SMA cannot be used for this input
        qError("failed to create sma info for constant parameter, code:%s", tstrerror(code));
        pInput->colDataSMAIsSet = false;
      }
    }
  }
}

/////////////////////////////////////////////////////////////////////////////////////////////
445
/**
 * Return the interval-aligned window containing `key`.
 * The window starts at `key` truncated to the interval and ends one interval later, minus one (end inclusive).
 * If that end computes to a value below the start, the end is clamped to INT64_MAX.
 */
STimeWindow getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int64_t key) {
  int64_t start = taosTimeTruncate(key, pInterval, precision);

  /*
   * if the realSkey > INT64_MAX - pInterval->interval, the query duration between
   * realSkey and realEkey must be less than one interval.Therefore, no need to adjust the query ranges.
   */
  int64_t end = taosTimeAdd(start, pInterval->interval, pInterval->intervalUnit, precision) - 1;

  STimeWindow win = {.skey = start, .ekey = (end < start) ? INT64_MAX : end};
  return win;
}

461
/**
 * Point each context's resultInfo at the matching entry of pResult, and initialize entries that are not
 * yet initialized.
 *
 * Pseudo functions and entries that are already initialized and completed are skipped. Entries without a
 * function (functionId == -1) are only flagged as initialized. As soon as an entry is found that is already
 * initialized, the remaining contexts are re-bound but no longer initialized.
 */
void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowEntryInfoOffset) {
  bool stopInit = false;

  for (int32_t i = 0; i < numOfOutput; ++i) {
    SqlFunctionCtx*      pOneCtx = &pCtx[i];
    SResultRowEntryInfo* pEntry = getResultEntryInfo(pResult, i, rowEntryInfoOffset);
    pOneCtx->resultInfo = pEntry;

    if (stopInit || pOneCtx->isPseudoFunc) {
      continue;
    }

    if (isRowEntryCompleted(pEntry) && isRowEntryInitialized(pEntry)) {
      continue;
    }

    if (pEntry->initialized) {
      stopInit = true;
    } else if (pOneCtx->functionId == -1) {
      pEntry->initialized = true;
    } else {
      pOneCtx->fpSet.init(pOneCtx, pEntry);
    }
  }
}

490 491 492 493 494 495 496 497 498 499 500 501 502 503
/**
 * Reset the result-entry state (initialized/complete flags, result count, null flag) of the first
 * numOfOutput contexts. Contexts without a bound result entry are left alone.
 */
void clearResultRowInitFlag(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
  for (int32_t i = 0; i < numOfOutput; ++i) {
    SResultRowEntryInfo* pEntry = pCtx[i].resultInfo;
    if (pEntry != NULL) {
      pEntry->initialized = false;
      pEntry->complete = false;
      pEntry->numOfRes = 0;
      pEntry->isNullRes = 0;
    }
  }
}

H
Haojun Liao 已提交
504 505
/**
 * Apply pFilterInfo to pBlock in place, keeping only the qualified rows.
 *
 * When pColMatchInfo maps the primary timestamp column onto a TIMESTAMP slot, the block's time window
 * is recomputed from that slot afterwards. Does nothing for a NULL filter or an empty block.
 * NOTE(review): the status of filterSetDataFromSlotId is not checked.
 */
void doFilter(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SColMatchInfo* pColMatchInfo) {
  if (pFilterInfo == NULL || pBlock->info.rows == 0) {
    return;
  }

  SFilterColumnParam param1 = {.numOfCols = taosArrayGetSize(pBlock->pDataBlock), .pDataBlock = pBlock->pDataBlock};
  (void)filterSetDataFromSlotId(pFilterInfo, &param1);

  SColumnInfoData* pResult = NULL;
  int32_t          status = 0;

  // todo the keep seems never to be True??
  bool keep = filterExecute(pFilterInfo, pBlock, &pResult, NULL, param1.numOfCols, &status);
  extractQualifiedTupleByFilterResult(pBlock, pResult, keep, status);

  if (pColMatchInfo != NULL) {
    size_t numOfItems = taosArrayGetSize(pColMatchInfo->pList);
    for (size_t i = 0; i < numOfItems; ++i) {
      SColMatchItem* pItem = taosArrayGet(pColMatchInfo->pList, i);
      if (pItem->colId != PRIMARYKEY_TIMESTAMP_COL_ID) {
        continue;
      }

      SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, pItem->dstSlotId);
      if (pColData->info.type == TSDB_DATA_TYPE_TIMESTAMP) {
        blockDataUpdateTsWindow(pBlock, pItem->dstSlotId);
        break;
      }
    }
  }

  colDataDestroy(pResult);
  taosMemoryFree(pResult);
}

537
void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const SColumnInfoData* p, bool keep, int32_t status) {
538 539 540 541
  if (keep) {
    return;
  }

H
Haojun Liao 已提交
542
  int8_t* pIndicator = (int8_t*)p->pData;
H
Haojun Liao 已提交
543 544 545
  int32_t totalRows = pBlock->info.rows;

  if (status == FILTER_RESULT_ALL_QUALIFIED) {
546
    // here nothing needs to be done
H
Haojun Liao 已提交
547
  } else if (status == FILTER_RESULT_NONE_QUALIFIED) {
548
    pBlock->info.rows = 0;
H
Haojun Liao 已提交
549
  } else {
H
Haojun Liao 已提交
550
    int32_t bmLen = BitmapLen(totalRows);
H
Haojun Liao 已提交
551 552
    char*   pBitmap = NULL;
    int32_t maxRows = 0;
553

554 555
    size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
    for (int32_t i = 0; i < numOfCols; ++i) {
556
      SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, i);
557
      // it is a reserved column for scalar function, and no data in this column yet.
H
Haojun Liao 已提交
558
      if (pDst->pData == NULL) {
559 560 561
        continue;
      }

562
      int32_t numOfRows = 0;
H
Haojun Liao 已提交
563 564
      if (IS_VAR_DATA_TYPE(pDst->info.type)) {
        int32_t j = 0;
565 566
        pDst->varmeta.length = 0;

X
Xiaoyu Wang 已提交
567
        while (j < totalRows) {
H
Haojun Liao 已提交
568
          if (pIndicator[j] == 0) {
569
            j += 1;
H
Haojun Liao 已提交
570 571
            continue;
          }
572

H
Haojun Liao 已提交
573 574 575 576
          if (colDataIsNull_var(pDst, j)) {
            colDataSetNull_var(pDst, numOfRows);
          } else {
            char* p1 = colDataGetVarData(pDst, j);
577
            colDataSetVal(pDst, numOfRows, p1, false);
H
Haojun Liao 已提交
578
          }
H
Haojun Liao 已提交
579 580
          numOfRows += 1;
          j += 1;
D
dapan1121 已提交
581
        }
582

H
Haojun Liao 已提交
583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602
        if (maxRows < numOfRows) {
          maxRows = numOfRows;
        }
      } else {
        if (pBitmap == NULL) {
          pBitmap = taosMemoryCalloc(1, bmLen);
        }

        memcpy(pBitmap, pDst->nullbitmap, bmLen);
        memset(pDst->nullbitmap, 0, bmLen);

        int32_t j = 0;

        switch (pDst->info.type) {
          case TSDB_DATA_TYPE_BIGINT:
          case TSDB_DATA_TYPE_UBIGINT:
          case TSDB_DATA_TYPE_DOUBLE:
          case TSDB_DATA_TYPE_TIMESTAMP:
            while (j < totalRows) {
              if (pIndicator[j] == 0) {
H
Haojun Liao 已提交
603
                j += 1;
H
Haojun Liao 已提交
604 605 606 607 608 609 610
                continue;
              }

              if (colDataIsNull_f(pBitmap, j)) {
                colDataSetNull_f(pDst->nullbitmap, numOfRows);
              } else {
                ((int64_t*)pDst->pData)[numOfRows] = ((int64_t*)pDst->pData)[j];
H
Haojun Liao 已提交
611
              }
H
Haojun Liao 已提交
612 613 614 615 616 617 618 619 620
              numOfRows += 1;
              j += 1;
            }
            break;
          case TSDB_DATA_TYPE_FLOAT:
          case TSDB_DATA_TYPE_INT:
          case TSDB_DATA_TYPE_UINT:
            while (j < totalRows) {
              if (pIndicator[j] == 0) {
H
Haojun Liao 已提交
621
                j += 1;
H
Haojun Liao 已提交
622
                continue;
H
Haojun Liao 已提交
623
              }
H
Haojun Liao 已提交
624 625 626
              if (colDataIsNull_f(pBitmap, j)) {
                colDataSetNull_f(pDst->nullbitmap, numOfRows);
              } else {
H
Haojun Liao 已提交
627
                ((int32_t*)pDst->pData)[numOfRows] = ((int32_t*)pDst->pData)[j];
H
Haojun Liao 已提交
628
              }
H
Haojun Liao 已提交
629 630 631 632 633 634 635 636
              numOfRows += 1;
              j += 1;
            }
            break;
          case TSDB_DATA_TYPE_SMALLINT:
          case TSDB_DATA_TYPE_USMALLINT:
            while (j < totalRows) {
              if (pIndicator[j] == 0) {
H
Haojun Liao 已提交
637
                j += 1;
H
Haojun Liao 已提交
638
                continue;
H
Haojun Liao 已提交
639
              }
H
Haojun Liao 已提交
640 641 642
              if (colDataIsNull_f(pBitmap, j)) {
                colDataSetNull_f(pDst->nullbitmap, numOfRows);
              } else {
H
Haojun Liao 已提交
643
                ((int16_t*)pDst->pData)[numOfRows] = ((int16_t*)pDst->pData)[j];
H
Haojun Liao 已提交
644
              }
H
Haojun Liao 已提交
645 646 647 648 649 650 651 652 653
              numOfRows += 1;
              j += 1;
            }
            break;
          case TSDB_DATA_TYPE_BOOL:
          case TSDB_DATA_TYPE_TINYINT:
          case TSDB_DATA_TYPE_UTINYINT:
            while (j < totalRows) {
              if (pIndicator[j] == 0) {
H
Haojun Liao 已提交
654
                j += 1;
H
Haojun Liao 已提交
655
                continue;
H
Haojun Liao 已提交
656
              }
H
Haojun Liao 已提交
657 658 659 660 661 662 663 664 665
              if (colDataIsNull_f(pBitmap, j)) {
                colDataSetNull_f(pDst->nullbitmap, numOfRows);
              } else {
                ((int8_t*)pDst->pData)[numOfRows] = ((int8_t*)pDst->pData)[j];
              }
              numOfRows += 1;
              j += 1;
            }
            break;
D
dapan1121 已提交
666
        }
H
Haojun Liao 已提交
667
      }
668

H
Haojun Liao 已提交
669 670
      if (maxRows < numOfRows) {
        maxRows = numOfRows;
671
      }
672
    }
673

H
Haojun Liao 已提交
674
    pBlock->info.rows = maxRows;
H
Haojun Liao 已提交
675 676 677
    if (pBitmap != NULL) {
      taosMemoryFree(pBitmap);
    }
678 679 680
  }
}

H
Haojun Liao 已提交
681
/**
 * Raise pRow->numOfRows to the largest numOfRes among the row's initialized result entries.
 * If the row still has no output and none of the initialized functions is flagged isNotNullFunc,
 * one row is emitted anyway.
 */
void doUpdateNumOfRows(SqlFunctionCtx* pCtx, SResultRow* pRow, int32_t numOfExprs, const int32_t* rowEntryOffset) {
  bool hasNotNullFunc = false;

  for (int32_t j = 0; j < numOfExprs; ++j) {
    SResultRowEntryInfo* pEntry = getResultEntryInfo(pRow, j, rowEntryOffset);
    if (!isRowEntryInitialized(pEntry)) {
      continue;
    }

    if (pEntry->numOfRes > pRow->numOfRows) {
      pRow->numOfRows = pEntry->numOfRes;
    }

    hasNotNullFunc = hasNotNullFunc || pCtx[j].isNotNullFunc;
  }

  // if all expr skips all blocks, e.g. all null inputs for max function, output one row in final result.
  //  except for first/last, which require not null output, output no rows
  if (!hasNotNullFunc && pRow->numOfRows == 0) {
    pRow->numOfRows = 1;
  }
}

H
Haojun Liao 已提交
705 706
/**
 * Write the results held in pRow into pBlock, starting at row pBlock->info.rows.
 * This function does not advance pBlock->info.rows.
 *
 *  - expressions with a finalize callback write their own output. For `_group_key`, a non-zero numOfRes is
 *    widened to pRow->numOfRows first, so the key matches every row of multi-row functions.
 *  - `_select_value` writes nothing here.
 *  - any other expression has its intermediate value copied into pRow->numOfRows consecutive rows, so that
 *    e.g. _wstart lines up with the rows produced by top(k, 20).
 *
 * Errors are logged and raised with T_LONG_JMP on pTaskInfo->env.
 */
void copyResultrowToDataBlock(SExprInfo* pExprInfo, int32_t numOfExprs, SResultRow* pRow, SqlFunctionCtx* pCtx,
                              SSDataBlock* pBlock, const int32_t* rowEntryOffset, SExecTaskInfo* pTaskInfo) {
  for (int32_t j = 0; j < numOfExprs; ++j) {
    int32_t slotId = pExprInfo[j].base.resSchema.slotId;

    pCtx[j].resultInfo = getResultEntryInfo(pRow, j, rowEntryOffset);
    if (pCtx[j].fpSet.finalize) {
      if (strcmp(pCtx[j].pExpr->pExpr->_function.functionName, "_group_key") == 0) {
        // for groupkey along with functions that output multiple lines(e.g. Histogram)
        // need to match groupkey result for each output row of that function.
        if (pCtx[j].resultInfo->numOfRes != 0) {
          pCtx[j].resultInfo->numOfRes = pRow->numOfRows;
        }
      }

      int32_t code = pCtx[j].fpSet.finalize(&pCtx[j], pBlock);
      if (TAOS_FAILED(code)) {
        qError("%s build result data block error, code %s", GET_TASKID(pTaskInfo), tstrerror(code));
        T_LONG_JMP(pTaskInfo->env, code);
      }
    } else if (strcmp(pCtx[j].pExpr->pExpr->_function.functionName, "_select_value") == 0) {
      // do nothing
    } else {
      // expand the result into multiple rows. E.g., _wstart, top(k, 20)
      // the _wstart needs to copy to 20 following rows, since the results of top-k expands to 20 different rows.
      SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, slotId);
      char*            in = GET_ROWCELL_INTERBUF(pCtx[j].resultInfo);
      for (int32_t k = 0; k < pRow->numOfRows; ++k) {
        int32_t code = colDataSetVal(pColInfoData, pBlock->info.rows + k, in, pCtx[j].resultInfo->isNullRes);
        if (TAOS_FAILED(code)) {
          qError("%s build result data block error, code %s", GET_TASKID(pTaskInfo), tstrerror(code));
          T_LONG_JMP(pTaskInfo->env, code);
        }
      }
    }
  }
}

739 740 741
// todo refactor. SResultRow has direct pointer in miainfo
/*
 * Finalize the single result row stored at resultRowPosition inside pBuf and append it to pBlock.
 *
 * If the row produces no output (numOfRows == 0 after doUpdateNumOfRows) nothing is appended. Otherwise
 * pBlock is grown in 25% steps from its current capacity until the row fits, the row is copied with
 * copyResultrowToDataBlock and pBlock->info.rows is advanced by the row's numOfRows.
 * The buffer page is released on every path after it was acquired. Page-fetch and capacity failures are
 * logged and raised with T_LONG_JMP on pTaskInfo->env; otherwise the function returns 0.
 */
int32_t finalizeResultRows(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPosition, SExprSupp* pSup,
                           SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo) {
  SFilePage* page = getBufPage(pBuf, resultRowPosition->pageId);
  if (page == NULL) {
    qError("failed to get buffer, code:%s, %s", tstrerror(terrno), GET_TASKID(pTaskInfo));
    T_LONG_JMP(pTaskInfo->env, terrno);
  }

  SResultRow* pRow = (SResultRow*)((char*)page + resultRowPosition->offset);

  SqlFunctionCtx* pCtx = pSup->pCtx;
  SExprInfo*      pExprInfo = pSup->pExprInfo;
  const int32_t*  rowEntryOffset = pSup->rowEntryInfoOffset;

  doUpdateNumOfRows(pCtx, pRow, pSup->numOfExprs, rowEntryOffset);
  if (pRow->numOfRows == 0) {
    releaseBufPage(pBuf, page);
    return 0;
  }

  // Grow by 25% of the current capacity until the row fits (size + size / 4 equals the truncated
  // size * 1.25 used before). Below a capacity of 4 that step truncates to 0, so advance by at least
  // one row; otherwise the loop never terminates (e.g. for an empty block with capacity 0).
  int64_t required = pBlock->info.rows + pRow->numOfRows;
  int32_t size = pBlock->info.capacity;
  while (size < required) {
    int32_t step = size / 4;
    size += (step > 0) ? step : 1;
  }

  int32_t code = blockDataEnsureCapacity(pBlock, size);
  if (TAOS_FAILED(code)) {
    releaseBufPage(pBuf, page);
    qError("%s ensure result data capacity failed, code %s", GET_TASKID(pTaskInfo), tstrerror(code));
    T_LONG_JMP(pTaskInfo->env, code);
  }

  copyResultrowToDataBlock(pExprInfo, pSup->numOfExprs, pRow, pCtx, pBlock, rowEntryOffset, pTaskInfo);

  releaseBufPage(pBuf, page);
  pBlock->info.rows += pRow->numOfRows;
  return 0;
}

779
int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprSupp* pSup, SDiskbasedBuf* pBuf,
780
                           SGroupResInfo* pGroupResInfo, int32_t threshold) {
781 782 783 784 785
  SExprInfo*      pExprInfo = pSup->pExprInfo;
  int32_t         numOfExprs = pSup->numOfExprs;
  int32_t*        rowEntryOffset = pSup->rowEntryInfoOffset;
  SqlFunctionCtx* pCtx = pSup->pCtx;

786
  int32_t numOfRows = getNumOfTotalRes(pGroupResInfo);
787

788
  for (int32_t i = pGroupResInfo->index; i < numOfRows; i += 1) {
L
Liu Jicong 已提交
789 790
    SResKeyPos* pPos = taosArrayGetP(pGroupResInfo->pRows, i);
    SFilePage*  page = getBufPage(pBuf, pPos->pos.pageId);
791 792 793 794
    if (page == NULL) {
      qError("failed to get buffer, code:%s, %s", tstrerror(terrno), GET_TASKID(pTaskInfo));
      T_LONG_JMP(pTaskInfo->env, terrno);
    }
795

796
    SResultRow* pRow = (SResultRow*)((char*)page + pPos->pos.offset);
797

H
Haojun Liao 已提交
798
    doUpdateNumOfRows(pCtx, pRow, numOfExprs, rowEntryOffset);
799 800

    // no results, continue to check the next one
801 802
    if (pRow->numOfRows == 0) {
      pGroupResInfo->index += 1;
803
      releaseBufPage(pBuf, page);
804 805 806
      continue;
    }

H
Haojun Liao 已提交
807 808
    if (pBlock->info.id.groupId == 0) {
      pBlock->info.id.groupId = pPos->groupId;
809 810
    } else {
      // current value belongs to different group, it can't be packed into one datablock
H
Haojun Liao 已提交
811
      if (pBlock->info.id.groupId != pPos->groupId) {
812
        releaseBufPage(pBuf, page);
813 814 815 816
        break;
      }
    }

817
    if (pBlock->info.rows + pRow->numOfRows > pBlock->info.capacity) {
818
      blockDataEnsureCapacity(pBlock, pBlock->info.rows + pRow->numOfRows);
D
dapan1121 已提交
819
      qDebug("datablock capacity not sufficient, expand to required:%" PRId64 ", current capacity:%d, %s",
dengyihao's avatar
dengyihao 已提交
820 821
             (pRow->numOfRows + pBlock->info.rows), pBlock->info.capacity, GET_TASKID(pTaskInfo));
      // todo set the pOperator->resultInfo size
822 823 824
    }

    pGroupResInfo->index += 1;
H
Haojun Liao 已提交
825
    copyResultrowToDataBlock(pExprInfo, numOfExprs, pRow, pCtx, pBlock, rowEntryOffset, pTaskInfo);
826

827
    releaseBufPage(pBuf, page);
828
    pBlock->info.rows += pRow->numOfRows;
829 830 831
    if (pBlock->info.rows >= threshold) {
      break;
    }
832 833
  }

D
dapan1121 已提交
834
  qDebug("%s result generated, rows:%" PRId64 ", groupId:%" PRIu64, GET_TASKID(pTaskInfo), pBlock->info.rows,
H
Haojun Liao 已提交
835
         pBlock->info.id.groupId);
H
Haojun Liao 已提交
836
  pBlock->info.dataLoad = 1;
837
  blockDataUpdateTsWindow(pBlock, 0);
838 839 840
  return 0;
}

841 842 843 844 845 846 847 848 849 850 851 852 853 854
/*
 * Build the next output block of a stream operator from its pending grouped results.
 *
 * pbInfo->pRes is stamped with the task version and cleared with blockDataCleanup. If results remain,
 * rows are copied in by one doCopyToSDataBlock pass (which stops at a group boundary or at the operator's
 * threshold). parTbName is then filled from the partition name that the stream state holds for the block's
 * groupId, or set to an empty string when none is found. Merged result blocks are not supported here
 * (asserted).
 */
void doBuildStreamResBlock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo,
                           SDiskbasedBuf* pBuf) {
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
  SSDataBlock*   pRes = pbInfo->pRes;

  // set output datablock version
  pRes->info.version = pTaskInfo->version;
  blockDataCleanup(pRes);

  if (hasRemainResults(pGroupResInfo)) {
    // start without a group so the copy adopts the group of the first row it emits
    pRes->info.id.groupId = 0;
    ASSERT(!pbInfo->mergeResultBlock);
    doCopyToSDataBlock(pTaskInfo, pRes, &pOperator->exprSupp, pBuf, pGroupResInfo, pOperator->resultInfo.threshold);

    void* pParName = NULL;
    if (streamStateGetParName(pTaskInfo->streamInfo.pState, pRes->info.id.groupId, &pParName) >= 0) {
      memcpy(pRes->info.parTbName, pParName, TSDB_TABLE_NAME_LEN);
    } else {
      pRes->info.parTbName[0] = 0;
    }
    tdbFree(pParName);
  }
}

X
Xiaoyu Wang 已提交
868 869
/*
 * Build the next output block of a (non-stream) operator from its pending grouped results.
 *
 * pbInfo->pRes is stamped with the task version and cleared with blockDataCleanup. If results remain:
 *  - without mergeResultBlock, one doCopyToSDataBlock pass fills the block (stopping at a group boundary or
 *    at the operator's threshold);
 *  - with mergeResultBlock, passes are repeated, clearing groupId between them so rows of different groups
 *    share one block, until results run out or the threshold is reached. groupId is cleared at the end
 *    because the client does not need it.
 */
void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo,
                            SDiskbasedBuf* pBuf) {
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
  SSDataBlock*   pRes = pbInfo->pRes;

  // set output datablock version
  pRes->info.version = pTaskInfo->version;
  blockDataCleanup(pRes);

  if (!hasRemainResults(pGroupResInfo)) {
    return;
  }

  // clear the existed group id
  pRes->info.id.groupId = 0;

  if (!pbInfo->mergeResultBlock) {
    doCopyToSDataBlock(pTaskInfo, pRes, &pOperator->exprSupp, pBuf, pGroupResInfo, pOperator->resultInfo.threshold);
    return;
  }

  while (hasRemainResults(pGroupResInfo)) {
    doCopyToSDataBlock(pTaskInfo, pRes, &pOperator->exprSupp, pBuf, pGroupResInfo, pOperator->resultInfo.threshold);
    if (pRes->info.rows >= pOperator->resultInfo.threshold) {
      break;
    }
    // let the next pass adopt the next group so its rows can be merged into this block
    pRes->info.id.groupId = 0;
  }

  pRes->info.id.groupId = 0;
}

901
/*
 * Release what each of the numOfExprs expressions in pExpr owns:
 *  - column parameters have their pCol freed;
 *  - value parameters have their variant destroyed;
 *  - the pParam array and pExpr node of every expression are freed.
 * The pExpr array itself is not freed here.
 */
void destroyExprInfo(SExprInfo* pExpr, int32_t numOfExprs) {
  for (int32_t i = 0; i < numOfExprs; ++i) {
    SExprInfo* pCur = &pExpr[i];

    for (int32_t p = 0; p < pCur->base.numOfParams; ++p) {
      switch (pCur->base.pParam[p].type) {
        case FUNC_PARAM_TYPE_COLUMN:
          taosMemoryFreeClear(pCur->base.pParam[p].pCol);
          break;
        case FUNC_PARAM_TYPE_VALUE:
          taosVariantDestroy(&pCur->base.pParam[p].param);
          break;
        default:
          break;  // other parameter kinds own nothing to release here
      }
    }

    taosMemoryFree(pCur->base.pParam);
    taosMemoryFree(pCur->pExpr);
  }
}

917 918 919 920 921 922
/*
 * Choose the page size and total size of an operator's disk-based result buffer.
 *
 * The page size starts at 4096 bytes and doubles until it holds at least four rows of rowSize bytes.
 * The buffer size defaults to 10MB (4096 * 2560 bytes). If that is not larger than one page, it becomes
 * four pages, so at least four pages always fit in the buffer.
 *
 * Returns 0 and writes *defaultPgsz / *defaultBufsz on success.
 * Returns -1, leaving both outputs untouched, when:
 *  - rowSize is negative, or
 *  - four pages of the required size would not fit in a uint32_t.
 * In those cases 32-bit arithmetic either overflowed (rowSize * 4) or let the page size shift to 0 and loop
 * forever.
 */
int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* defaultBufsz) {
  if (rowSize < 0) {
    return -1;
  }

  // 64-bit arithmetic: rowSize * 4 can exceed INT32_MAX and the doubling can exceed UINT32_MAX
  const uint64_t minRequired = (uint64_t)rowSize * 4;
  uint64_t       pgsz = 4096;
  while (pgsz < minRequired) {
    pgsz <<= 1u;
  }

  // the buffer may be set to four pages, which must still be representable as uint32_t
  if (pgsz > UINT32_MAX / 4) {
    return -1;
  }

  // The default buffer for each operator in query is 10MB.
  // TODO: make this variable to be configurable.
  uint32_t bufsz = 4096 * 2560;
  if (bufsz <= pgsz) {
    bufsz = (uint32_t)pgsz * 4;
  }

  *defaultPgsz = (uint32_t)pgsz;
  *defaultBufsz = bufsz;
  return 0;
}

L
Liu Jicong 已提交
934
/*
 * Initialise an operator's output sizing.
 * The capacity is numOfRows, or 4096 when 0 is passed. The threshold is 75% of the capacity (truncated by
 * the assignment). When that truncates to 0 the threshold falls back to the full capacity.
 */
void initResultSizeInfo(SResultInfo* pResultInfo, int32_t numOfRows) {
  const int32_t capacity = (numOfRows == 0) ? 4096 : numOfRows;

  pResultInfo->capacity = capacity;
  pResultInfo->threshold = capacity * 0.75;

  if (pResultInfo->threshold == 0) {
    pResultInfo->threshold = capacity;  // tiny capacities: never use a zero threshold
  }
}

947 948 949 950 951
/*
 * Attach pBlock as the operator's output block and initialise its result-row bookkeeping.
 * The block is later destroyed by cleanupBasicInfo.
 */
void initBasicInfo(SOptrBasicInfo* pInfo, SSDataBlock* pBlock) {
  pInfo->pRes = pBlock;

  initResultRowInfo(&pInfo->resultRowInfo);
}

952
/*
 * Free an array of numOfOutput function contexts, including what each context owns:
 *  - its parameter variants;
 *  - the subsidiary ctx array and buffer;
 *  - the input data and column aggregate pointers;
 *  - the UDF name.
 * A NULL pCtx is a no-op. Always returns NULL, so callers can write `p = destroySqlFunctionCtx(p, n);`.
 */
static void* destroySqlFunctionCtx(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
  if (pCtx != NULL) {
    for (int32_t i = 0; i < numOfOutput; ++i) {
      SqlFunctionCtx* pOne = pCtx + i;

      for (int32_t p = 0; p < pOne->numOfParams; ++p) {
        taosVariantDestroy(&pOne->param[p].param);
      }

      taosMemoryFreeClear(pOne->subsidiaries.pCtx);
      taosMemoryFreeClear(pOne->subsidiaries.buf);
      taosMemoryFree(pOne->input.pData);
      taosMemoryFree(pOne->input.pColumnDataAgg);

      if (pOne->udfName != NULL) {
        taosMemoryFree(pOne->udfName);
      }
    }

    taosMemoryFreeClear(pCtx);
  }

  return NULL;
}

976
/*
 * Bind numOfExpr expressions to pSup.
 * When pExprInfo is non-NULL, the function contexts are also created; createSqlFunctionCtx also receives
 * &pSup->rowEntryInfoOffset as an out-parameter.
 * Returns TSDB_CODE_OUT_OF_MEMORY when context creation yields NULL, otherwise TSDB_CODE_SUCCESS.
 */
int32_t initExprSupp(SExprSupp* pSup, SExprInfo* pExprInfo, int32_t numOfExpr) {
  pSup->pExprInfo = pExprInfo;
  pSup->numOfExprs = numOfExpr;

  if (pSup->pExprInfo == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  pSup->pCtx = createSqlFunctionCtx(pExprInfo, numOfExpr, &pSup->rowEntryInfoOffset);
  return (pSup->pCtx != NULL) ? TSDB_CODE_SUCCESS : TSDB_CODE_OUT_OF_MEMORY;
}

989 990 991 992
/*
 * Release everything owned by pSupp:
 *  - the function contexts;
 *  - the expression array together with its parameters;
 *  - the filter info;
 *  - the row-entry offset table.
 * Every released pointer is reset to NULL. Calling cleanup twice on the same SExprSupp is then harmless
 * instead of a double free of pCtx / rowEntryInfoOffset.
 */
void cleanupExprSupp(SExprSupp* pSupp) {
  // destroySqlFunctionCtx always returns NULL; storing it clears the otherwise dangling pointer
  pSupp->pCtx = destroySqlFunctionCtx(pSupp->pCtx, pSupp->numOfExprs);

  if (pSupp->pExprInfo != NULL) {
    destroyExprInfo(pSupp->pExprInfo, pSupp->numOfExprs);
    taosMemoryFreeClear(pSupp->pExprInfo);
  }

  if (pSupp->pFilterInfo != NULL) {
    filterFreeInfo(pSupp->pFilterInfo);
    pSupp->pFilterInfo = NULL;
  }

  taosMemoryFreeClear(pSupp->rowEntryInfoOffset);
}

X
Xiaoyu Wang 已提交
1004
/* Destroy the operator's output block; pRes takes whatever blockDataDestroy returns (presumably NULL). */
void cleanupBasicInfo(SOptrBasicInfo* pInfo) {
  SSDataBlock* pBlock = pInfo->pRes;
  pInfo->pRes = blockDataDestroy(pBlock);
}
1005

1006
/*
 * Report whether a group/partition list is exactly "tbname".
 * The list must contain a single function node whose name is "tbname" (partition by tbname / group by
 * tbname).
 */
bool groupbyTbname(SNodeList* pGroupList) {
  if (LIST_LENGTH(pGroupList) != 1) {
    return false;
  }

  SNode* pNode = nodesListGetNode(pGroupList, 0);
  if (pNode->type != QUERY_NODE_FUNCTION) {
    return false;
  }

  return strcmp(((struct SFunctionNode*)pNode)->functionName, "tbname") == 0;
}

1019
/*
 * Allocate the sink-specific parameter object for pNode and hand it to the caller through *pParam.
 *  - insert sinks get an SInserterParam carrying readHandle;
 *  - delete sinks get an SDeleterParam holding the suid and the uid of every table in the task's first
 *    table list;
 *  - for other sink types *pParam is left untouched.
 * Returns TSDB_CODE_OUT_OF_MEMORY on allocation failure; nothing allocated here is leaked on that path.
 */
int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, SExecTaskInfo* pTask, SReadHandle* readHandle) {
  switch (pNode->type) {
    case QUERY_NODE_PHYSICAL_PLAN_QUERY_INSERT: {
      SInserterParam* pInserterParam = taosMemoryCalloc(1, sizeof(SInserterParam));
      if (NULL == pInserterParam) {
        return TSDB_CODE_OUT_OF_MEMORY;
      }
      pInserterParam->readHandle = readHandle;

      *pParam = pInserterParam;
      break;
    }
    case QUERY_NODE_PHYSICAL_PLAN_DELETE: {
      SDeleterParam* pDeleterParam = taosMemoryCalloc(1, sizeof(SDeleterParam));
      if (NULL == pDeleterParam) {
        return TSDB_CODE_OUT_OF_MEMORY;
      }

      SArray*         pInfoList = getTableListInfo(pTask);
      STableListInfo* pTableListInfo = taosArrayGetP(pInfoList, 0);
      taosArrayDestroy(pInfoList);

      pDeleterParam->suid = tableListGetSuid(pTableListInfo);

      // TODO extract uid list
      int32_t numOfTables = tableListGetSize(pTableListInfo);
      pDeleterParam->pUidList = taosArrayInit(numOfTables, sizeof(uint64_t));
      if (NULL == pDeleterParam->pUidList) {
        taosMemoryFree(pDeleterParam);
        return TSDB_CODE_OUT_OF_MEMORY;
      }

      for (int32_t i = 0; i < numOfTables; ++i) {
        STableKeyInfo* pTable = tableListGetInfo(pTableListInfo, i);
        // a failed push would leave an incomplete uid list, i.e. a delete that silently skips tables
        if (NULL == taosArrayPush(pDeleterParam->pUidList, &pTable->uid)) {
          taosArrayDestroy(pDeleterParam->pUidList);
          taosMemoryFree(pDeleterParam);
          return TSDB_CODE_OUT_OF_MEMORY;
        }
      }

      *pParam = pDeleterParam;
      break;
    }
    default:
      break;
  }

  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
1066 1067 1068 1069 1070 1071 1072 1073 1074 1075 1076 1077 1078 1079 1080 1081 1082 1083 1084 1085 1086 1087 1088 1089 1090
/*
 * Serialization of SResultRow is currently disabled.
 * This function writes nothing to buf, leaves *size unchanged and reports 0 encoded bytes.
 * The former field-by-field encoder, kept only as commented-out code, has been removed; retrieve it from
 * version control if encoding is reinstated. It covered pageId, offset, interpolation flags, closed,
 * numOfRows, window and per-entry flags.
 */
int32_t resultRowEncode(void* k, int32_t* size, char* buf) {
  (void)k;
  (void)size;
  (void)buf;
  return 0;
}

/*
 * Deserialization of SResultRow is currently disabled.
 * This function does not read buf, does not write *k and reports 0 decoded bytes.
 * The former decoder, kept only as commented-out code, has been removed; retrieve it from version control
 * if decoding is reinstated.
 */
int32_t resultRowDecode(void** k, size_t size, char* buf) {
  (void)k;
  (void)size;
  (void)buf;
  return 0;
}

/*
 * Store the result row pResult (resSize bytes) in the stream state under pKey.
 * Returns the status of streamStatePut, so callers can see a failed write instead of an unconditional
 * success.
 */
int32_t saveOutputBuf(SStreamState* pState, SWinKey* pKey, SResultRow* pResult, int32_t resSize) {
  int32_t code = streamStatePut(pState, pKey, (char*)pResult, resSize);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }
  return TSDB_CODE_SUCCESS;
}
1167

1168 1169
/* Hand the row buffer pResult obtained for pKey back to the stream state; always reports success. */
int32_t releaseOutputBuf(SStreamState* pState, SWinKey* pKey, SResultRow* pResult) {
  (void)streamStateReleaseBuf(pState, pKey, pResult);
  return TSDB_CODE_SUCCESS;
}

5
54liuyao 已提交
1173 1174 1175 1176 1177 1178
/*
 * Store the session-window row buf (size bytes) in the stream state under key, then release buf.
 * Returns the status of streamStatePut for sessions instead of an unconditional success. The buffer is
 * released whether or not the write succeeded.
 */
int32_t saveSessionDiscBuf(SStreamState* pState, SSessionKey* key, void* buf, int32_t size) {
  int32_t code = streamStateSessionPut(pState, key, (const void*)buf, size);
  releaseOutputBuf(pState, NULL, (SResultRow*)buf);
  return code;
}

1179
int32_t buildSessionResultDataBlock(SOperatorInfo* pOperator, SStreamState* pState, SSDataBlock* pBlock,
5
54liuyao 已提交
1180
                                    SExprSupp* pSup, SGroupResInfo* pGroupResInfo) {
1181
  SExecTaskInfo*  pTaskInfo = pOperator->pTaskInfo;
5
54liuyao 已提交
1182 1183 1184 1185 1186 1187 1188 1189 1190 1191 1192 1193 1194
  SExprInfo*      pExprInfo = pSup->pExprInfo;
  int32_t         numOfExprs = pSup->numOfExprs;
  int32_t*        rowEntryOffset = pSup->rowEntryInfoOffset;
  SqlFunctionCtx* pCtx = pSup->pCtx;

  int32_t numOfRows = getNumOfTotalRes(pGroupResInfo);

  for (int32_t i = pGroupResInfo->index; i < numOfRows; i += 1) {
    SSessionKey* pKey = taosArrayGet(pGroupResInfo->pRows, i);
    int32_t      size = 0;
    void*        pVal = NULL;
    int32_t      code = streamStateSessionGet(pState, pKey, &pVal, &size);
    ASSERT(code == 0);
1195 1196
    if (code == -1) {
      // coverity scan
5
54liuyao 已提交
1197
      pGroupResInfo->index += 1;
1198 1199
      continue;
    }
5
54liuyao 已提交
1200 1201 1202 1203 1204 1205 1206 1207 1208
    SResultRow* pRow = (SResultRow*)pVal;
    doUpdateNumOfRows(pCtx, pRow, numOfExprs, rowEntryOffset);
    // no results, continue to check the next one
    if (pRow->numOfRows == 0) {
      pGroupResInfo->index += 1;
      releaseOutputBuf(pState, NULL, pRow);
      continue;
    }

H
Haojun Liao 已提交
1209 1210
    if (pBlock->info.id.groupId == 0) {
      pBlock->info.id.groupId = pKey->groupId;
1211

1212
      void* tbname = NULL;
H
Haojun Liao 已提交
1213
      if (streamStateGetParName(pTaskInfo->streamInfo.pState, pBlock->info.id.groupId, &tbname) < 0) {
1214
        pBlock->info.parTbName[0] = 0;
1215
      } else {
1216
        memcpy(pBlock->info.parTbName, tbname, TSDB_TABLE_NAME_LEN);
1217
      }
dengyihao's avatar
dengyihao 已提交
1218
      streamFreeVal(tbname);
5
54liuyao 已提交
1219 1220
    } else {
      // current value belongs to different group, it can't be packed into one datablock
H
Haojun Liao 已提交
1221
      if (pBlock->info.id.groupId != pKey->groupId) {
5
54liuyao 已提交
1222 1223 1224 1225 1226 1227 1228 1229 1230 1231 1232 1233 1234 1235 1236 1237 1238 1239 1240 1241 1242 1243 1244 1245 1246 1247 1248 1249 1250 1251 1252
        releaseOutputBuf(pState, NULL, pRow);
        break;
      }
    }

    if (pBlock->info.rows + pRow->numOfRows > pBlock->info.capacity) {
      ASSERT(pBlock->info.rows > 0);
      releaseOutputBuf(pState, NULL, pRow);
      break;
    }

    pGroupResInfo->index += 1;

    for (int32_t j = 0; j < numOfExprs; ++j) {
      int32_t slotId = pExprInfo[j].base.resSchema.slotId;

      pCtx[j].resultInfo = getResultEntryInfo(pRow, j, rowEntryOffset);
      if (pCtx[j].fpSet.finalize) {
        int32_t code1 = pCtx[j].fpSet.finalize(&pCtx[j], pBlock);
        if (TAOS_FAILED(code1)) {
          qError("%s build result data block error, code %s", GET_TASKID(pTaskInfo), tstrerror(code1));
          T_LONG_JMP(pTaskInfo->env, code1);
        }
      } else if (strcmp(pCtx[j].pExpr->pExpr->_function.functionName, "_select_value") == 0) {
        // do nothing, todo refactor
      } else {
        // expand the result into multiple rows. E.g., _wstart, top(k, 20)
        // the _wstart needs to copy to 20 following rows, since the results of top-k expands to 20 different rows.
        SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, slotId);
        char*            in = GET_ROWCELL_INTERBUF(pCtx[j].resultInfo);
        for (int32_t k = 0; k < pRow->numOfRows; ++k) {
1253
          colDataSetVal(pColInfoData, pBlock->info.rows + k, in, pCtx[j].resultInfo->isNullRes);
5
54liuyao 已提交
1254 1255 1256 1257
        }
      }
    }

1258
    pBlock->info.dataLoad = 1;
5
54liuyao 已提交
1259 1260 1261 1262 1263
    pBlock->info.rows += pRow->numOfRows;
    releaseOutputBuf(pState, NULL, pRow);
  }
  blockDataUpdateTsWindow(pBlock, 0);
  return TSDB_CODE_SUCCESS;
1264
}
L
Liu Jicong 已提交
1265 1266

void qStreamCloseTsdbReader(void* task) {
D
dapan1121 已提交
1267 1268 1269 1270
  if (task == NULL) {
    return;
  }

L
Liu Jicong 已提交
1271 1272
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)task;
  SOperatorInfo* pOp = pTaskInfo->pRoot;
D
dapan1121 已提交
1273

1274 1275
  qDebug("stream close tsdb reader, reset status uid:%" PRId64 " ts:%" PRId64, pTaskInfo->streamInfo.currentOffset.uid,
         pTaskInfo->streamInfo.currentOffset.ts);
D
dapan1121 已提交
1276 1277

  // todo refactor, other thread may already use this read to extract data.
1278
  pTaskInfo->streamInfo.currentOffset = (STqOffsetVal){0};
L
Liu Jicong 已提交
1279 1280 1281 1282 1283 1284
  while (pOp->numOfDownstream == 1 && pOp->pDownstream[0]) {
    SOperatorInfo* pDownstreamOp = pOp->pDownstream[0];
    if (pDownstreamOp->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
      SStreamScanInfo* pInfo = pDownstreamOp->info;
      if (pInfo->pTableScanOp) {
        STableScanInfo* pTSInfo = pInfo->pTableScanOp->info;
D
dapan1121 已提交
1285 1286

        setOperatorCompleted(pInfo->pTableScanOp);
X
Xiaoyu Wang 已提交
1287
        while (pTaskInfo->owner != 0) {
D
dapan1121 已提交
1288 1289 1290 1291
          taosMsleep(100);
          qDebug("wait for the reader stopping");
        }

L
Liu Jicong 已提交
1292 1293
        tsdbReaderClose(pTSInfo->base.dataReader);
        pTSInfo->base.dataReader = NULL;
D
dapan1121 已提交
1294 1295 1296 1297

        // restore the status, todo refactor.
        pInfo->pTableScanOp->status = OP_OPENED;
        pTaskInfo->status = TASK_NOT_COMPLETED;
L
Liu Jicong 已提交
1298 1299 1300 1301 1302
        return;
      }
    }
  }
}