eventwindowoperator.c 11.2 KB
Newer Older
H
Haojun Liao 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "executorimpl.h"
#include "filter.h"
#include "function.h"
#include "functionMgt.h"
#include "tcommon.h"
#include "tcompare.h"
#include "tdatablock.h"
#include "ttime.h"

typedef struct SEventWindowOperatorInfo {
  SOptrBasicInfo     binfo;
  SAggSupporter      aggSup;
  SExprSupp          scalarSup;
  SGroupResInfo      groupResInfo;
  SWindowRowsSup     winSup;
  bool               hasKey;
  SStateKeys         stateKey;
  int32_t            tsSlotId;  // primary timestamp column slot id
  STimeWindowAggSupp twAggSup;
H
Haojun Liao 已提交
35 36 37 38 39

  SFilterInfo* pStartCondInfo;
  SFilterInfo* pEndCondInfo;
  bool         inWindow;
  SResultRow*  pRow;
H
Haojun Liao 已提交
40 41
} SEventWindowOperatorInfo;

H
Haojun Liao 已提交
42 43 44
// Forward declarations of the operator callbacks and helpers defined further down in this file.
static void         destroyEWindowOperatorInfo(void* param);
static SSDataBlock* eventWindowAggregate(SOperatorInfo* pOperator);
static void eventWindowAggImpl(SOperatorInfo* pOperator, SEventWindowOperatorInfo* pInfo, SSDataBlock* pBlock);
// NOTE(review): no definition of doEventWindowAgg is visible in this file -- confirm it is still needed.
static SSDataBlock* doEventWindowAgg(SOperatorInfo* pOperator);

H
Haojun Liao 已提交
47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72
// TODO: move to util
/*
 * Mark `rowIndex` as the first row of a freshly opened window: remember the row index, take the
 * timestamp at that row as the window start key, reset the row counter and store the group id.
 */
static void doKeepNewWindowStartInfo(SWindowRowsSup* pRowSup, const int64_t* tsList, int32_t rowIndex,
                                     uint64_t groupId) {
  const int64_t startTs = tsList[rowIndex];

  pRowSup->groupId = groupId;
  pRowSup->win.skey = startTs;
  pRowSup->startRowIndex = rowIndex;
  pRowSup->numOfRows = 0;
}

/*
 * Extend the current window up to timestamp `ts`: `ts` becomes both the window end key and the
 * last-seen timestamp, the row counter is bumped by one and the group id is refreshed.
 */
static void doKeepTuple(SWindowRowsSup* pRowSup, int64_t ts, uint64_t groupId) {
  pRowSup->groupId = groupId;
  pRowSup->numOfRows++;

  pRowSup->prevTs = ts;
  pRowSup->win.ekey = ts;
}

/*
 * Publish the window described by `pWin` into the int64 slots of `pColData`:
 *   slot 2 - window duration, slot 3 - window start key, slot 4 - window end key.
 * When `includeEndpoint` is true, both the duration and the end key are increased by one.
 */
static void updateTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pWin, bool includeEndpoint) {
  int32_t endpointAdjust = 0;
  if (includeEndpoint) {
    endpointAdjust = 1;
  }

  const int64_t winDuration = pWin->ekey - pWin->skey + endpointAdjust;
  const int64_t winStart = pWin->skey;
  const int64_t winEnd = pWin->ekey + endpointAdjust;

  int64_t* slots = (int64_t*)pColData->pData;
  slots[2] = winDuration;
  slots[3] = winStart;
  slots[4] = winEnd;
}

H
Haojun Liao 已提交
73
SOperatorInfo* createEventwindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* physiNode,
H
Haojun Liao 已提交
74 75 76 77 78 79 80
                                             SExecTaskInfo* pTaskInfo) {
  SEventWindowOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SEventWindowOperatorInfo));
  SOperatorInfo*            pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    goto _error;
  }

H
Haojun Liao 已提交
81 82 83 84 85 86 87 88 89 90 91 92
  SEventWinodwPhysiNode* pEventWindowNode = (SEventWinodwPhysiNode*)physiNode;

  int32_t tsSlotId = ((SColumnNode*)pEventWindowNode->window.pTspk)->slotId;
  int32_t code = filterInitFromNode((SNode*)pEventWindowNode->pStartCond, &pInfo->pStartCondInfo, 0);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  code = filterInitFromNode((SNode*)pEventWindowNode->pEndCond, &pInfo->pEndCondInfo, 0);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }
H
Haojun Liao 已提交
93

H
Haojun Liao 已提交
94
  if (pEventWindowNode->window.pExprs != NULL) {
H
Haojun Liao 已提交
95
    int32_t    numOfScalarExpr = 0;
H
Haojun Liao 已提交
96 97
    SExprInfo* pScalarExprInfo = createExprInfo(pEventWindowNode->window.pExprs, NULL, &numOfScalarExpr);
    code = initExprSupp(&pInfo->scalarSup, pScalarExprInfo, numOfScalarExpr);
H
Haojun Liao 已提交
98 99 100 101 102
    if (code != TSDB_CODE_SUCCESS) {
      goto _error;
    }
  }

H
Haojun Liao 已提交
103
  code = filterInitFromNode((SNode*)pEventWindowNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
H
Haojun Liao 已提交
104 105 106 107 108 109 110
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;

  int32_t    num = 0;
H
Haojun Liao 已提交
111
  SExprInfo* pExprInfo = createExprInfo(pEventWindowNode->window.pFuncs, NULL, &num);
H
Haojun Liao 已提交
112 113 114 115 116 117 118
  initResultSizeInfo(&pOperator->resultInfo, 4096);

  code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

H
Haojun Liao 已提交
119 120 121
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pEventWindowNode->window.node.pOutputDataBlockDesc);
  blockDataEnsureCapacity(pResBlock, pOperator->resultInfo.capacity);

H
Haojun Liao 已提交
122 123 124
  initBasicInfo(&pInfo->binfo, pResBlock);
  initResultRowInfo(&pInfo->binfo.resultRowInfo);

H
Haojun Liao 已提交
125 126
  pInfo->twAggSup = (STimeWindowAggSupp){.waterMark = pEventWindowNode->window.watermark,
                                         .calTrigger = pEventWindowNode->window.triggerType};
H
Haojun Liao 已提交
127 128 129 130 131

  initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);

  pInfo->tsSlotId = tsSlotId;

H
Haojun Liao 已提交
132
  setOperatorInfo(pOperator, "EventWindowOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE, true, OP_NOT_OPENED, pInfo,
H
Haojun Liao 已提交
133
                  pTaskInfo);
H
Haojun Liao 已提交
134
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, eventWindowAggregate, NULL, destroyEWindowOperatorInfo,
H
Haojun Liao 已提交
135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167
                                         optrDefaultBufFn, NULL);

  code = appendDownstream(pOperator, &downstream, 1);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  return pOperator;

_error:
  if (pInfo != NULL) {
    destroyEWindowOperatorInfo(pInfo);
  }

  taosMemoryFreeClear(pOperator);
  pTaskInfo->code = code;
  return NULL;
}

void destroyEWindowOperatorInfo(void* param) {
  SEventWindowOperatorInfo* pInfo = (SEventWindowOperatorInfo*)param;
  if (pInfo == NULL) {
    return;
  }

  cleanupBasicInfo(&pInfo->binfo);
  colDataDestroy(&pInfo->twAggSup.timeWindowData);

  cleanupAggSup(&pInfo->aggSup);
  cleanupGroupResInfo(&pInfo->groupResInfo);
  taosMemoryFreeClear(param);
}

H
Haojun Liao 已提交
168
/*
 * getNext callback of the event-window operator.
 *
 * Clears the result block, then pulls blocks from the first downstream operator. For each block
 * it applies the scalar expressions (if any) and feeds the block to eventWindowAggImpl(). Returns
 * the result block as soon as it holds at least `resultInfo.threshold` rows. Once the downstream
 * is exhausted, returns the result block if it has rows and NULL otherwise. A failing scalar
 * evaluation long-jumps out with the error stored in the task info.
 */
static SSDataBlock* eventWindowAggregate(SOperatorInfo* pOperator) {
  SEventWindowOperatorInfo* pEvtInfo = pOperator->info;
  SExecTaskInfo*            pTask = pOperator->pTaskInfo;
  SExprSupp*                pAggExprs = &pOperator->exprSupp;
  SExprSupp*                pScalarExprs = &pEvtInfo->scalarSup;
  SSDataBlock*              pOut = pEvtInfo->binfo.pRes;

  blockDataCleanup(pOut);

  SOperatorInfo* pChild = pOperator->pDownstream[0];
  for (SSDataBlock* pInput = pChild->fpSet.getNextFn(pChild); pInput != NULL;
       pInput = pChild->fpSet.getNextFn(pChild)) {
    setInputDataBlock(pAggExprs, pInput, TSDB_ORDER_ASC, MAIN_SCAN, true);
    blockDataUpdateTsWindow(pInput, pEvtInfo->tsSlotId);

    // scalar expressions have to be evaluated before the aggregation sees the block
    if (pScalarExprs->pExprInfo != NULL) {
      pTask->code = projectApplyFunctions(pScalarExprs->pExprInfo, pInput, pInput, pScalarExprs->pCtx,
                                          pScalarExprs->numOfExprs, NULL);
      if (pTask->code != TSDB_CODE_SUCCESS) {
        T_LONG_JMP(pTask->env, pTask->code);
      }
    }

    eventWindowAggImpl(pOperator, pEvtInfo, pInput);

    // enough rows buffered: hand them out now
    if (pOut->info.rows >= pOperator->resultInfo.threshold) {
      return pOut;
    }
  }

  if (pOut->info.rows == 0) {
    return NULL;
  }

  return pOut;
}
H
Haojun Liao 已提交
206

H
Haojun Liao 已提交
207 208 209 210 211 212 213 214 215 216
/*
 * Make sure `*pResult` points at a result row, then bind it to the window `win` and to the
 * function contexts of `pExprSup`.
 *
 * When `*pResult` is NULL a zeroed row of `pAggSup->resultRowSize` bytes is allocated, its
 * position is recorded in `pResultRowInfo->cur`, and the row is handed back through `pResult`
 * (the caller owns it afterwards).
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when the row cannot be allocated; in that
 * case `*pResult` and `pResultRowInfo` are left untouched.
 */
static int32_t setSingleOutputTupleBufv1(SResultRowInfo* pResultRowInfo, STimeWindow* win, SResultRow** pResult,
                                         SExprSupp* pExprSup, SAggSupporter* pAggSup) {
  if (*pResult == NULL) {
    SResultRow* p = taosMemoryCalloc(1, pAggSup->resultRowSize);
    if (p == NULL) {
      // report the failure instead of dereferencing a NULL row below
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    pResultRowInfo->cur = (SResultRowPosition){.pageId = p->pageId, .offset = p->offset};
    *pResult = p;
  }

  (*pResult)->win = *win;
  setResultRowInitCtx(*pResult, pExprSup->pCtx, pExprSup->numOfExprs, pExprSup->rowEntryInfoOffset);
  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240
/*
 * Apply the aggregate functions to rows [startIndex, endIndex] of `pBlock` for the current window.
 *
 * The timestamp at `endIndex` becomes the window end key, the shared result row is prepared (and
 * allocated if needed), the time-window data column is refreshed, and the aggregates are run over
 * the row range. Long-jumps with TSDB_CODE_APP_ERROR if the result row cannot be prepared.
 */
static void doEventWindowAggImpl(SEventWindowOperatorInfo* pInfo, SExprSupp* pSup, int32_t startIndex, int32_t endIndex,
                                 const SSDataBlock* pBlock, int64_t* tsList, SExecTaskInfo* pTaskInfo) {
  SWindowRowsSup*  pWinRows = &pInfo->winSup;
  SColumnInfoData* pWinData = &pInfo->twAggSup.timeWindowData;
  const int32_t    exprCount = pSup->numOfExprs;
  const int32_t    rowsInRange = endIndex - startIndex + 1;

  doKeepTuple(pWinRows, tsList[endIndex], pBlock->info.id.groupId);

  if (setSingleOutputTupleBufv1(&pInfo->binfo.resultRowInfo, &pWinRows->win, &pInfo->pRow, pSup, &pInfo->aggSup) !=
      TSDB_CODE_SUCCESS) {
    T_LONG_JMP(pTaskInfo->env, TSDB_CODE_APP_ERROR);
  }

  updateTimeWindowInfo(pWinData, &pWinRows->win, false);
  applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, pWinData, startIndex, rowsInRange, pBlock->info.rows,
                                  exprCount);
}

void eventWindowAggImpl(SOperatorInfo* pOperator, SEventWindowOperatorInfo* pInfo, SSDataBlock* pBlock) {
H
Haojun Liao 已提交
241 242 243
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
  SExprSupp*     pSup = &pOperator->exprSupp;

H
Haojun Liao 已提交
244
  SSDataBlock* pRes = pInfo->binfo.pRes;
H
Haojun Liao 已提交
245
  int64_t gid = pBlock->info.id.groupId;
H
Haojun Liao 已提交
246 247 248 249

  SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pInfo->tsSlotId);
  TSKEY*           tsList = (TSKEY*)pColInfoData->pData;

H
Haojun Liao 已提交
250 251
  SColumnInfoData *ps = NULL, *pe = NULL;

H
Haojun Liao 已提交
252 253 254
  SWindowRowsSup* pRowSup = &pInfo->winSup;
  pRowSup->numOfRows = 0;

H
Haojun Liao 已提交
255 256
  SFilterColumnParam param1 = {.numOfCols = taosArrayGetSize(pBlock->pDataBlock), .pDataBlock = pBlock->pDataBlock};
  int32_t            code = filterSetDataFromSlotId(pInfo->pStartCondInfo, &param1);
H
Haojun Liao 已提交
257

H
Haojun Liao 已提交
258 259
  int32_t status1 = 0;
  bool    keep1 = filterExecute(pInfo->pStartCondInfo, pBlock, &ps, NULL, param1.numOfCols, &status1);
H
Haojun Liao 已提交
260

H
Haojun Liao 已提交
261 262 263 264 265
  SFilterColumnParam param2 = {.numOfCols = taosArrayGetSize(pBlock->pDataBlock), .pDataBlock = pBlock->pDataBlock};
  code = filterSetDataFromSlotId(pInfo->pEndCondInfo, &param2);

  int32_t status2 = 0;
  bool    keep2 = filterExecute(pInfo->pEndCondInfo, pBlock, &pe, NULL, param2.numOfCols, &status2);
H
Haojun Liao 已提交
266

H
Haojun Liao 已提交
267 268
  int32_t rowIndex = 0;
  int32_t startIndex = pInfo->inWindow ? 0 : -1;
H
Haojun Liao 已提交
269

H
Haojun Liao 已提交
270 271 272 273 274 275
  while (rowIndex < pBlock->info.rows) {
    if (pInfo->inWindow) {  // let's find the first end value
      for (rowIndex = startIndex; rowIndex < pBlock->info.rows; ++rowIndex) {
        if (((bool*)pe->pData)[rowIndex]) {
          break;
        }
H
Haojun Liao 已提交
276 277
      }

H
Haojun Liao 已提交
278 279
      if (rowIndex < pBlock->info.rows) {
        doEventWindowAggImpl(pInfo, pSup, startIndex, rowIndex, pBlock, tsList, pTaskInfo);
H
Haojun Liao 已提交
280

H
Haojun Liao 已提交
281
        doUpdateNumOfRows(pSup->pCtx, pInfo->pRow, pSup->numOfExprs, pSup->rowEntryInfoOffset);
H
Haojun Liao 已提交
282

H
Haojun Liao 已提交
283 284 285 286 287 288 289
        // check buffer size
        if (pRes->info.rows + pInfo->pRow->numOfRows >= pRes->info.capacity) {
          int32_t newSize = pRes->info.rows + pInfo->pRow->numOfRows;
          blockDataEnsureCapacity(pRes, newSize);
        }

        copyResultrowToDataBlock(pSup->pExprInfo, pSup->numOfExprs, pInfo->pRow, pSup->pCtx, pRes,
H
Haojun Liao 已提交
290
                                 pSup->rowEntryInfoOffset, pTaskInfo);
H
Haojun Liao 已提交
291

H
Haojun Liao 已提交
292
        pRes->info.rows += pInfo->pRow->numOfRows;
H
Haojun Liao 已提交
293

H
Haojun Liao 已提交
294 295
        pInfo->inWindow = false;
        rowIndex += 1;
H
Haojun Liao 已提交
296
      } else {
H
Haojun Liao 已提交
297 298 299 300 301 302 303 304 305 306
        doEventWindowAggImpl(pInfo, pSup, startIndex, pBlock->info.rows - 1, pBlock, tsList, pTaskInfo);
      }
    } else {  // find the first start value that is fulfill for the start condition
      for (; rowIndex < pBlock->info.rows; ++rowIndex) {
        if (((bool*)ps->pData)[rowIndex]) {
          doKeepNewWindowStartInfo(pRowSup, tsList, rowIndex, gid);
          pInfo->inWindow = true;
          startIndex = rowIndex;
          break;
        }
H
Haojun Liao 已提交
307 308
      }

H
Haojun Liao 已提交
309 310 311 312 313
      if (pInfo->inWindow) {
        continue;
      } else {
        break;
      }
H
Haojun Liao 已提交
314 315 316
    }
  }
}