groupoperator.c 36.4 KB
Newer Older
H
Haojun Liao 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "function.h"
17
#include "os.h"
H
Haojun Liao 已提交
18 19 20 21 22
#include "tname.h"

#include "tdatablock.h"
#include "tmsg.h"

23
#include "executorInt.h"
H
Haojun Liao 已提交
24 25 26 27 28
#include "executorimpl.h"
#include "tcompare.h"
#include "thash.h"
#include "ttypes.h"

29
static void*    getCurrentDataGroupInfo(const SPartitionOperatorInfo* pInfo, SDataGroupInfo** pGroupInfo, int32_t len);
H
Haojun Liao 已提交
30
static int32_t* setupColumnOffset(const SSDataBlock* pBlock, int32_t rowCapacity);
31 32
static int32_t  setGroupResultOutputBuf(SOperatorInfo* pOperator, SOptrBasicInfo* binfo, int32_t numOfCols, char* pData,
                                        int16_t bytes, uint64_t groupId, SDiskbasedBuf* pBuf, SAggSupporter* pAggSup);
H
Haojun Liao 已提交
33

H
Haojun Liao 已提交
34
static void freeGroupKey(void* param) {
35
  SGroupKeys* pKey = (SGroupKeys*)param;
H
Haojun Liao 已提交
36 37 38
  taosMemoryFree(pKey->pData);
}

39
static void destroyGroupOperatorInfo(void* param) {
H
Haojun Liao 已提交
40
  SGroupbyOperatorInfo* pInfo = (SGroupbyOperatorInfo*)param;
41 42 43 44
  if (pInfo == NULL) {
    return;
  }

45
  cleanupBasicInfo(&pInfo->binfo);
H
Haojun Liao 已提交
46 47
  taosMemoryFreeClear(pInfo->keyBuf);
  taosArrayDestroy(pInfo->pGroupCols);
H
Haojun Liao 已提交
48
  taosArrayDestroyEx(pInfo->pGroupColVals, freeGroupKey);
49
  cleanupExprSupp(&pInfo->scalarSup);
H
Haojun Liao 已提交
50 51 52

  cleanupGroupResInfo(&pInfo->groupResInfo);
  cleanupAggSup(&pInfo->aggSup);
D
dapan1121 已提交
53
  taosMemoryFreeClear(param);
H
Haojun Liao 已提交
54 55
}

wmmhello's avatar
wmmhello 已提交
56
// Allocates the per-operator group-key state.
//
//   pGroupColVals  out: array with one SGroupKeys per grouping column, each owning a zeroed buffer of pCol->bytes
//   keyLen         in/out: incremented (not reset) by the sum of the column widths plus one null-flag byte per column
//   keyBuf         out: zeroed buffer of *keyLen bytes used to serialize a group key
//   pGroupColList  array of SColumn describing the grouping columns
//
// Returns TSDB_CODE_SUCCESS or TSDB_CODE_OUT_OF_MEMORY. On failure, whatever was already stored in *pGroupColVals is
// left for the caller to release.
static int32_t initGroupOptrInfo(SArray** pGroupColVals, int32_t* keyLen, char** keyBuf, const SArray* pGroupColList) {
  *pGroupColVals = taosArrayInit(4, sizeof(SGroupKeys));
  if ((*pGroupColVals) == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  int32_t numOfGroupCols = taosArrayGetSize(pGroupColList);
  for (int32_t i = 0; i < numOfGroupCols; ++i) {
    SColumn* pCol = taosArrayGet(pGroupColList, i);
    (*keyLen) += pCol->bytes;  // actual data + null_flag

    SGroupKeys key = {0};
    key.bytes = pCol->bytes;
    key.type = pCol->type;
    key.isNull = false;
    key.pData = taosMemoryCalloc(1, pCol->bytes);
    if (key.pData == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    // the array stores a copy of `key`; if the push fails nobody owns key.pData, so release it here
    if (taosArrayPush((*pGroupColVals), &key) == NULL) {
      taosMemoryFree(key.pData);
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  int32_t nullFlagSize = sizeof(int8_t) * numOfGroupCols;
  (*keyLen) += nullFlagSize;

  (*keyBuf) = taosMemoryCalloc(1, (*keyLen));
  if ((*keyBuf) == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  return TSDB_CODE_SUCCESS;
}

90 91
// Returns true when row `rowIndex` of pBlock has the same value as the recorded group key in every grouping column.
// Two NULLs in the same column count as equal; NULL against non-NULL is a mismatch.
static bool groupKeyCompare(SArray* pGroupCols, SArray* pGroupColVals, SSDataBlock* pBlock, int32_t rowIndex,
                            int32_t numOfGroupCols) {
  SColumnDataAgg* pAgg = NULL;

  for (int32_t k = 0; k < numOfGroupCols; ++k) {
    SColumn*         pColumn = taosArrayGet(pGroupCols, k);
    SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, pColumn->slotId);
    if (pBlock->pBlockAgg != NULL) {
      pAgg = pBlock->pBlockAgg[pColumn->slotId];  // TODO is agg data matched?
    }

    bool        rowIsNull = colDataIsNull(pColData, pBlock->info.rows, rowIndex, pAgg);
    SGroupKeys* pSaved = taosArrayGet(pGroupColVals, k);

    if (rowIsNull || pSaved->isNull) {
      if (rowIsNull && pSaved->isNull) {
        continue;
      }
      return false;
    }

    char* pRowVal = colDataGetData(pColData, rowIndex);

    bool same = false;
    if (pSaved->type == TSDB_DATA_TYPE_JSON) {
      // the compared length is taken from the row value
      int32_t jsonLen = getJsonValueLen(pRowVal);
      same = (memcmp(pSaved->pData, pRowVal, jsonLen) == 0);
    } else if (IS_VAR_DATA_TYPE(pSaved->type)) {
      // compare the length header first, then the payload
      int32_t rowLen = varDataLen(pRowVal);
      same = (rowLen == varDataLen(pSaved->pData)) &&
             (memcmp(varDataVal(pSaved->pData), varDataVal(pRowVal), rowLen) == 0);
    } else {
      same = (memcmp(pSaved->pData, pRowVal, pSaved->bytes) == 0);
    }

    if (!same) {
      return false;
    }
  }

  return true;
}

wmmhello's avatar
wmmhello 已提交
138
// Copies the grouping-column values of row `rowIndex` into pGroupColVals (one SGroupKeys per grouping column),
// making that row the current group key.
//
// If a JSON-typed key value satisfies tTagIsJson(), terrno is set to TSDB_CODE_QRY_JSON_IN_GROUP_ERROR and the
// function returns immediately; keys of the remaining columns are left unchanged. Callers detect this through terrno.
static void recordNewGroupKeys(SArray* pGroupCols, SArray* pGroupColVals, SSDataBlock* pBlock, int32_t rowIndex) {
  SColumnDataAgg* pColAgg = NULL;

  size_t numOfGroupCols = taosArrayGetSize(pGroupCols);

  for (int32_t i = 0; i < numOfGroupCols; ++i) {
    SColumn*         pCol = taosArrayGet(pGroupCols, i);
    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pCol->slotId);

    if (pBlock->pBlockAgg != NULL) {
      pColAgg = pBlock->pBlockAgg[pCol->slotId];  // TODO is agg data matched?
    }

    SGroupKeys* pkey = taosArrayGet(pGroupColVals, i);
    if (colDataIsNull(pColInfoData, pBlock->info.rows, rowIndex, pColAgg)) {
      pkey->isNull = true;
    } else {
      pkey->isNull = false;
      char* val = colDataGetData(pColInfoData, rowIndex);
      if (pkey->type == TSDB_DATA_TYPE_JSON) {
        if (tTagIsJson(val)) {
          terrno = TSDB_CODE_QRY_JSON_IN_GROUP_ERROR;
          return;
        }
        int32_t dataLen = getJsonValueLen(val);
        memcpy(pkey->pData, val, dataLen);
      } else if (IS_VAR_DATA_TYPE(pkey->type)) {
        // check the size against the key buffer before writing to it, not after
        ASSERT(varDataTLen(val) <= pkey->bytes);
        memcpy(pkey->pData, val, varDataTLen(val));
      } else {
        memcpy(pkey->pData, val, pkey->bytes);
      }
    }
  }
}

wmmhello's avatar
wmmhello 已提交
174
static int32_t buildGroupKeys(void* pKey, const SArray* pGroupColVals) {
H
Haojun Liao 已提交
175 176 177 178 179 180 181 182 183 184 185 186 187
  ASSERT(pKey != NULL);
  size_t numOfGroupCols = taosArrayGetSize(pGroupColVals);

  char* isNull = (char*)pKey;
  char* pStart = (char*)pKey + sizeof(int8_t) * numOfGroupCols;
  for (int32_t i = 0; i < numOfGroupCols; ++i) {
    SGroupKeys* pkey = taosArrayGet(pGroupColVals, i);
    if (pkey->isNull) {
      isNull[i] = 1;
      continue;
    }

    isNull[i] = 0;
wmmhello's avatar
wmmhello 已提交
188 189 190 191 192
    if (pkey->type == TSDB_DATA_TYPE_JSON) {
      int32_t dataLen = getJsonValueLen(pkey->pData);
      memcpy(pStart, (pkey->pData), dataLen);
      pStart += dataLen;
    } else if (IS_VAR_DATA_TYPE(pkey->type)) {
H
Haojun Liao 已提交
193 194 195 196 197 198 199 200 201
      varDataCopy(pStart, pkey->pData);
      pStart += varDataTLen(pkey->pData);
      ASSERT(varDataTLen(pkey->pData) <= pkey->bytes);
    } else {
      memcpy(pStart, pkey->pData, pkey->bytes);
      pStart += pkey->bytes;
    }
  }

202
  return (int32_t)(pStart - (char*)pKey);
H
Haojun Liao 已提交
203 204 205 206 207
}

// For every output expression whose functionId is -1 (a plain group-key column, e.g.
// "select count(*),key from t group by key"), copy the value found at `rowIndex` of its first input column into
// the result entry's buffer, or flag the entry as NULL. Other expressions are left untouched.
static void doAssignGroupKeys(SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t totalRows, int32_t rowIndex) {
  for (int32_t i = 0; i < numOfOutput; ++i) {
    SqlFunctionCtx* pFuncCtx = &pCtx[i];
    if (pFuncCtx->functionId != -1) {
      continue;
    }

    SResultRowEntryInfo* pEntry = GET_RES_INFO(pFuncCtx);
    SColumnInfoData*     pInputCol = pFuncCtx->input.pData[0];

    // todo OPT all/all not NULL
    if (colDataIsNull(pInputCol, totalRows, rowIndex, NULL)) {
      pEntry->isNullRes = 1;  // it is a NULL value
    } else {
      char* pDst = GET_ROWCELL_INTERBUF(pEntry);
      char* pSrc = colDataGetData(pInputCol, rowIndex);

      if (pInputCol->info.type == TSDB_DATA_TYPE_JSON) {
        int32_t jsonLen = getJsonValueLen(pSrc);
        memcpy(pDst, pSrc, jsonLen);
      } else if (IS_VAR_DATA_TYPE(pInputCol->info.type)) {
        varDataCopy(pDst, pSrc);
      } else {
        memcpy(pDst, pSrc, pInputCol->info.bytes);
      }
    }

    pEntry->numOfRes = 1;
  }
}

// Applies the aggregate functions to rows [startRow, startRow + numOfRows) of pBlock for the group whose key is
// currently recorded in pInfo->pGroupColVals, then fills the group-key output columns from row `startRow`.
// Long-jumps with TSDB_CODE_QRY_APP_ERROR if the result row for the group cannot be set up.
static void applyRowsToCurrentGroup(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32_t startRow,
                                    int32_t numOfRows) {
  SExecTaskInfo*        pTaskInfo = pOperator->pTaskInfo;
  SGroupbyOperatorInfo* pInfo = pOperator->info;
  SqlFunctionCtx*       pCtx = pOperator->exprSupp.pCtx;
  int32_t               numOfExprs = pOperator->exprSupp.numOfExprs;

  int32_t len = buildGroupKeys(pInfo->keyBuf, pInfo->pGroupColVals);
  int32_t ret = setGroupResultOutputBuf(pOperator, &(pInfo->binfo), numOfExprs, pInfo->keyBuf, len,
                                        pBlock->info.groupId, pInfo->aggSup.pResultBuf, &pInfo->aggSup);
  if (ret != TSDB_CODE_SUCCESS) {  // null data, too many state code
    T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR);
  }

  doApplyFunctions(pTaskInfo, pCtx, NULL, startRow, numOfRows, pBlock->info.rows, numOfExprs);

  // assign the group keys or user input constant values if required
  doAssignGroupKeys(pCtx, numOfExprs, pBlock->info.rows, startRow);
}

// Aggregates one input block. Consecutive rows with identical group keys are batched: the batch is applied to the
// group's result row when a row with a different key is met and once more at the end of the block.
// The key recorded in pInfo->pGroupColVals persists across calls, so the first row of a block is compared with
// the last key of the previous block.
// Long-jumps out through pTaskInfo->env on a JSON group-key error or when a result row cannot be obtained.
static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
  SExecTaskInfo*        pTaskInfo = pOperator->pTaskInfo;
  SGroupbyOperatorInfo* pInfo = pOperator->info;

  int32_t numOfGroupCols = taosArrayGetSize(pInfo->pGroupCols);

  terrno = TSDB_CODE_SUCCESS;
  int32_t num = 0;  // number of rows in the batch that ends just before row j
  for (int32_t j = 0; j < pBlock->info.rows; ++j) {
    // Compare with the previous row of this column, and do not set the output buffer again if they are identical.
    if (!pInfo->isInit) {
      recordNewGroupKeys(pInfo->pGroupCols, pInfo->pGroupColVals, pBlock, j);
      if (terrno != TSDB_CODE_SUCCESS) {  // group by json error
        T_LONG_JMP(pTaskInfo->env, terrno);
      }
      pInfo->isInit = true;
      num++;
      continue;
    }

    bool equal = groupKeyCompare(pInfo->pGroupCols, pInfo->pGroupColVals, pBlock, j, numOfGroupCols);
    if (equal) {
      num++;
      continue;
    }

    // The first row of a new block does not belong to the previously existing group; there is no pending batch
    // in this block yet, so only the key needs to be replaced.
    if (j == 0) {
      num++;
      recordNewGroupKeys(pInfo->pGroupCols, pInfo->pGroupColVals, pBlock, j);
      if (terrno != TSDB_CODE_SUCCESS) {  // group by json error
        T_LONG_JMP(pTaskInfo->env, terrno);
      }
      continue;
    }

    // key changed: flush the pending batch [j - num, j) under the old key, then start a new batch at row j
    applyRowsToCurrentGroup(pOperator, pBlock, j - num, num);

    recordNewGroupKeys(pInfo->pGroupCols, pInfo->pGroupColVals, pBlock, j);
    // recordNewGroupKeys reports a JSON key error only through terrno and leaves the key half-updated; the check is
    // restricted to that code because the calls made while flushing may have touched terrno as well
    if (terrno == TSDB_CODE_QRY_JSON_IN_GROUP_ERROR) {
      T_LONG_JMP(pTaskInfo->env, terrno);
    }
    num = 1;
  }

  // flush the trailing batch of this block
  if (num > 0) {
    applyRowsToCurrentGroup(pOperator, pBlock, pBlock->info.rows - num, num);
  }
}

308 309 310 311
// Produces the next output block of the group-by operator.
// Keeps building and filtering result blocks until one has rows or the grouped results are exhausted; in the latter
// case the operator is marked completed. Returns NULL when the final block is empty.
static SSDataBlock* buildGroupResultDataBlock(SOperatorInfo* pOperator) {
  SGroupbyOperatorInfo* pGroupby = pOperator->info;
  SSDataBlock*          pOut = pGroupby->binfo.pRes;

  bool finished = false;
  while (!finished) {
    doBuildResultDatablock(pOperator, &pGroupby->binfo, &pGroupby->groupResInfo, pGroupby->aggSup.pResultBuf);
    doFilter(pGroupby->pCondition, pOut, NULL);

    if (!hasRemainResults(&pGroupby->groupResInfo)) {
      doSetOperatorCompleted(pOperator);
      finished = true;
    } else {
      // the filter may have removed every row; try the next batch in that case
      finished = (pOut->info.rows > 0);
    }
  }

  pOperator->resultInfo.totalRows += pOut->info.rows;
  if (pOut->info.rows == 0) {
    return NULL;
  }

  return pOut;
}

330
// getNext entry point of the group-by operator.
// The first call drains the downstream operator, aggregating every block, then switches the operator to
// OP_RES_TO_RETURN and returns the first result block. Later calls only return further result blocks.
// Returns NULL once the operator is done. Errors are raised by long-jumping through pTaskInfo->env.
static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) {
  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  SExecTaskInfo*        pTask = pOperator->pTaskInfo;
  SGroupbyOperatorInfo* pGroupby = pOperator->info;
  SSDataBlock*          pOut = pGroupby->binfo.pRes;

  if (pOperator->status == OP_RES_TO_RETURN) {
    return buildGroupResultDataBlock(pOperator);
  }

  int32_t order = TSDB_ORDER_ASC;
  int32_t scanFlag = MAIN_SCAN;

  int64_t        startUs = taosGetTimestampUs();
  SOperatorInfo* pChild = pOperator->pDownstream[0];

  SSDataBlock* pInput = NULL;
  while ((pInput = pChild->fpSet.getNextFn(pChild)) != NULL) {
    int32_t code = getTableScanInfo(pOperator, &order, &scanFlag);
    if (code != TSDB_CODE_SUCCESS) {
      T_LONG_JMP(pTask->env, code);
    }

    // the pDataBlock are always the same one, no need to call this again
    setInputDataBlock(pOperator, pOperator->exprSupp.pCtx, pInput, order, scanFlag, true);

    // scalar expressions, if any, are evaluated in place on the input block before the group aggregation
    if (pGroupby->scalarSup.pExprInfo != NULL) {
      pTask->code = projectApplyFunctions(pGroupby->scalarSup.pExprInfo, pInput, pInput, pGroupby->scalarSup.pCtx,
                                          pGroupby->scalarSup.numOfExprs, NULL);
      if (pTask->code != TSDB_CODE_SUCCESS) {
        T_LONG_JMP(pTask->env, pTask->code);
      }
    }

    doHashGroupbyAgg(pOperator, pInput);
  }

  pOperator->status = OP_RES_TO_RETURN;

#if 0
  if(pOperator->fpSet.encodeResultRow){
    char *result = NULL;
    int32_t length = 0;
    pOperator->fpSet.encodeResultRow(pOperator, &result, &length);
    SAggSupporter* pSup = &pInfo->aggSup;
    taosHashClear(pSup->pResultRowHashTable);
    pInfo->binfo.resultRowInfo.size = 0;
    pOperator->fpSet.decodeResultRow(pOperator, result);
    if(result){
      taosMemoryFree(result);
    }
  }
#endif
  blockDataEnsureCapacity(pOut, pOperator->resultInfo.capacity);
  initGroupedResultInfo(&pGroupby->groupResInfo, pGroupby->aggSup.pResultRowHashTable, 0);

  // elapsed time is measured in microseconds and stored divided by 1000
  pOperator->cost.openCost = (taosGetTimestampUs() - startUs) / 1000.0;
  return buildGroupResultDataBlock(pOperator);
}

H
Haojun Liao 已提交
399 400 401
// Creates the group-by aggregate operator on top of `downstream`.
//
//   pExprInfo/numOfCols              output (aggregate) expressions
//   pResultBlock                     block used for the operator's output
//   pGroupColList                    array of SColumn describing the grouping columns; stored in the operator
//                                    info, which destroys it in destroyGroupOperatorInfo
//   pCondition                       filter applied to each result block
//   pScalarExprInfo/numOfScalarExpr  scalar expressions evaluated on each input block before grouping
//
// Returns the new operator, or NULL on failure. On failure pTaskInfo->code holds the code reported by the step that
// failed (TSDB_CODE_OUT_OF_MEMORY when one of the two struct allocations fails) and everything already attached to
// the operator info is released.
SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
                                       SSDataBlock* pResultBlock, SArray* pGroupColList, SNode* pCondition,
                                       SExprInfo* pScalarExprInfo, int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo) {
  // declared before the first goto so that _error always reads an initialized value
  int32_t code = TSDB_CODE_OUT_OF_MEMORY;

  SGroupbyOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SGroupbyOperatorInfo));
  SOperatorInfo*        pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    goto _error;
  }

  pInfo->pGroupCols = pGroupColList;
  pInfo->pCondition = pCondition;

  code = initExprSupp(&pInfo->scalarSup, pScalarExprInfo, numOfScalarExpr);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  code = initGroupOptrInfo(&pInfo->pGroupColVals, &pInfo->groupKeyLen, &pInfo->keyBuf, pGroupColList);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  initResultSizeInfo(&pOperator->resultInfo, 4096);
  code = initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, pInfo->groupKeyLen, pTaskInfo->id.str);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  initBasicInfo(&pInfo->binfo, pResultBlock);
  initResultRowInfo(&pInfo->binfo.resultRowInfo);

  pOperator->name = "GroupbyAggOperator";
  pOperator->blocking = true;
  pOperator->status = OP_NOT_OPENED;
  // pOperator->operatorType = OP_Groupby;
  pOperator->info = pInfo;
  pOperator->pTaskInfo = pTaskInfo;

  pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, hashGroupbyAggregate, NULL, NULL,
                                         destroyGroupOperatorInfo, aggEncodeResultRow, aggDecodeResultRow, NULL);
  code = appendDownstream(pOperator, &downstream, 1);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  return pOperator;

_error:
  // report the failure that actually happened instead of always claiming out-of-memory
  pTaskInfo->code = code;
  destroyGroupOperatorInfo(pInfo);  // accepts NULL
  taosMemoryFreeClear(pOperator);
  return NULL;
}

H
Haojun Liao 已提交
453
// Distributes every row of pBlock into the buffer pages of the group selected by its partition key.
//
// Page layout as written here: the page starts with an int32 row counter; the region of output column i starts at
// pInfo->columnOffset[i] and holds
//   var-length column:   int32 offset[rowCapacity] | int32 content length | payload
//   fixed-length column: null bitmap of BitmapLen(rowCapacity) bytes | int32 content length | payload
//
// Errors are reported through terrno, which the caller is expected to clear before the call and check afterwards:
// the JSON group-key error raised by recordNewGroupKeys, or TSDB_CODE_OUT_OF_MEMORY when no page could be obtained.
// Processing of the block stops at the first such error.
static void doHashPartition(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
  SPartitionOperatorInfo* pInfo = pOperator->info;

  for (int32_t j = 0; j < pBlock->info.rows; ++j) {
    recordNewGroupKeys(pInfo->pGroupCols, pInfo->pGroupColVals, pBlock, j);
    if (terrno == TSDB_CODE_QRY_JSON_IN_GROUP_ERROR) {
      // the key was only partially recorded; do not file this row under a stale key
      return;
    }

    int32_t len = buildGroupKeys(pInfo->keyBuf, pInfo->pGroupColVals);

    SDataGroupInfo* pGroupInfo = NULL;
    void*           pPage = getCurrentDataGroupInfo(pInfo, &pGroupInfo, len);
    if (pPage == NULL || pGroupInfo == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return;
    }

    pGroupInfo->numOfRows += 1;

    // group id
    if (pGroupInfo->groupId == 0) {
      pGroupInfo->groupId = calcGroupId(pInfo->keyBuf, len);
    }

    // number of rows
    int32_t* rows = (int32_t*)pPage;

    size_t numOfCols = pOperator->exprSupp.numOfExprs;
    for (int32_t i = 0; i < numOfCols; ++i) {
      SExprInfo* pExpr = &pOperator->exprSupp.pExprInfo[i];
      int32_t    slotId = pExpr->base.pParam[0].pCol->slotId;

      SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, slotId);

      int32_t bytes = pColInfoData->info.bytes;
      int32_t startOffset = pInfo->columnOffset[i];

      int32_t* columnLen = NULL;
      int32_t  contentLen = 0;

      if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
        int32_t* offset = (int32_t*)((char*)pPage + startOffset);
        columnLen = (int32_t*)((char*)pPage + startOffset + sizeof(int32_t) * pInfo->rowCapacity);
        char* data = (char*)((char*)columnLen + sizeof(int32_t));

        if (colDataIsNull_s(pColInfoData, j)) {
          offset[(*rows)] = -1;  // a NULL value has no payload
          contentLen = 0;
        } else {
          char*   src = colDataGetData(pColInfoData, j);
          int32_t dataLen = 0;
          if (pColInfoData->info.type == TSDB_DATA_TYPE_JSON) {
            dataLen = getJsonValueLen(src);
          } else {
            dataLen = varDataTLen(src);
          }

          // sanity-check the end position before writing
          int32_t v = (data + (*columnLen) + dataLen - (char*)pPage);
          ASSERT(v > 0);

          offset[*rows] = (*columnLen);
          memcpy(data + (*columnLen), src, dataLen);
          contentLen = dataLen;
        }
      } else {
        char* bitmap = (char*)pPage + startOffset;
        columnLen = (int32_t*)((char*)pPage + startOffset + BitmapLen(pInfo->rowCapacity));
        char* data = (char*)columnLen + sizeof(int32_t);

        bool isNull = colDataIsNull_f(pColInfoData->nullbitmap, j);
        if (isNull) {
          colDataSetNull_f(bitmap, (*rows));
        } else {
          // the value must fit inside the page; check before writing
          ASSERT((data + (*columnLen) + bytes - (char*)pPage) <= getBufPageSize(pInfo->pBuf));
          memcpy(data + (*columnLen), colDataGetData(pColInfoData, j), bytes);
        }
        // a fixed-length slot is consumed even for NULL so that row k stays at k * bytes
        contentLen = bytes;
      }

      (*columnLen) += contentLen;
      ASSERT(*columnLen >= 0);
    }

    (*rows) += 1;

    setBufPageDirty(pPage, true);
    releaseBufPage(pInfo->pBuf, pPage);
  }
}

// Looks up (or creates) the group identified by the first `len` bytes of pInfo->keyBuf and returns the buffer page
// that should receive the group's next row.
//
// A new group gets a fresh page. An existing group gets its last page, or a fresh one once that page already holds
// pInfo->rowCapacity rows. Fresh pages are zero-filled, so the row counter and every per-column length start at 0.
//
//   pGroupInfo  out: the group's entry inside pInfo->pGroupSet; NULL if the group could not be created
//
// Returns the page, which the caller must hand back with releaseBufPage(), or NULL if the group entry or a page
// could not be obtained.
void* getCurrentDataGroupInfo(const SPartitionOperatorInfo* pInfo, SDataGroupInfo** pGroupInfo, int32_t len) {
  void* pPage = NULL;

  *pGroupInfo = NULL;

  SDataGroupInfo* p = taosHashGet(pInfo->pGroupSet, pInfo->keyBuf, len);
  if (p == NULL) {  // it is a new group
    SDataGroupInfo gi = {0};
    gi.pPageList = taosArrayInit(100, sizeof(int32_t));
    if (gi.pPageList == NULL) {
      return NULL;
    }

    taosHashPut(pInfo->pGroupSet, pInfo->keyBuf, len, &gi, sizeof(SDataGroupInfo));

    p = taosHashGet(pInfo->pGroupSet, pInfo->keyBuf, len);
    if (p == NULL) {
      // the entry was not stored, so the page list is still owned here
      taosArrayDestroy(gi.pPageList);
      return NULL;
    }
    *pGroupInfo = p;

    int32_t pageId = 0;
    pPage = getNewBufPage(pInfo->pBuf, &pageId);
    if (pPage == NULL) {
      return NULL;
    }
    taosArrayPush(p->pPageList, &pageId);

    // zero the whole page, as the rollover path below does, so the per-column length fields start at 0
    // and not only the row counter
    memset(pPage, 0, getBufPageSize(pInfo->pBuf));
  } else {
    *pGroupInfo = p;

    int32_t* curId = taosArrayGetLast(p->pPageList);
    pPage = getBufPage(pInfo->pBuf, *curId);
    if (pPage == NULL) {
      return NULL;
    }

    int32_t* rows = (int32_t*)pPage;
    if (*rows >= pInfo->rowCapacity) {
      // release buffer
      releaseBufPage(pInfo->pBuf, pPage);

      // add a new page for current group
      int32_t pageId = 0;
      pPage = getNewBufPage(pInfo->pBuf, &pageId);
      if (pPage == NULL) {
        return NULL;
      }
      taosArrayPush(p->pPageList, &pageId);
      memset(pPage, 0, getBufPageSize(pInfo->pBuf));
    }
  }

  return pPage;
}

// Derives a 64-bit group id from a serialized group key: the MD5 digest of the `len` bytes at pData is computed
// and its first 8 bytes are returned as an integer.
uint64_t calcGroupId(char* pData, int32_t len) {
  T_MD5_CTX md5;

  tMD5Init(&md5);
  tMD5Update(&md5, (uint8_t*)pData, len);
  tMD5Final(&md5);

  // NOTE: only the leading 8 bytes of the digest are used
  uint64_t groupId = 0;
  memcpy(&groupId, md5.digest, sizeof(uint64_t));

  return groupId;
}

H
Haojun Liao 已提交
590
int32_t* setupColumnOffset(const SSDataBlock* pBlock, int32_t rowCapacity) {
591
  size_t   numOfCols = taosArrayGetSize(pBlock->pDataBlock);
592
  int32_t* offset = taosMemoryCalloc(numOfCols, sizeof(int32_t));
H
Haojun Liao 已提交
593

594 595
  offset[0] = sizeof(int32_t) +
              sizeof(uint64_t);  // the number of rows in current page, ref to SSDataBlock paged serialization format
H
Haojun Liao 已提交
596

597
  for (int32_t i = 0; i < numOfCols - 1; ++i) {
H
Haojun Liao 已提交
598 599 600 601
    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);

    int32_t bytes = pColInfoData->info.bytes;
    int32_t payloadLen = bytes * rowCapacity;
602

H
Haojun Liao 已提交
603 604 605 606 607 608 609 610 611 612 613 614
    if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
      // offset segment + content length + payload
      offset[i + 1] = rowCapacity * sizeof(int32_t) + sizeof(int32_t) + payloadLen + offset[i];
    } else {
      // bitmap + content length + payload
      offset[i + 1] = BitmapLen(rowCapacity) + sizeof(int32_t) + payloadLen + offset[i];
    }
  }

  return offset;
}

5
54liuyao 已提交
615
// Drops all buffered partition data once the operator has returned its last block: frees the page-id list of every
// group, empties the group containers and resets the paged buffer.
//
// hashPartition() copies the group entries (including their pPageList pointers) from the hash set into
// sortedGroupArray and then clears the set, so by the time this runs the lists are reachable through the array.
// Any entries still left in the set are released too and the set is emptied so nothing is freed twice later.
static void clearPartitionOperator(SPartitionOperatorInfo* pInfo) {
  void* ite = NULL;
  while ((ite = taosHashIterate(pInfo->pGroupSet, ite)) != NULL) {
    taosArrayDestroy(((SDataGroupInfo*)ite)->pPageList);
  }
  if (pInfo->pGroupSet != NULL) {
    taosHashClear(pInfo->pGroupSet);
  }

  if (pInfo->sortedGroupArray != NULL) {
    int32_t numOfGroups = taosArrayGetSize(pInfo->sortedGroupArray);
    for (int32_t i = 0; i < numOfGroups; ++i) {
      SDataGroupInfo* pGroup = taosArrayGet(pInfo->sortedGroupArray, i);
      taosArrayDestroy(pGroup->pPageList);
    }
  }

  taosArrayClear(pInfo->sortedGroupArray);
  clearDiskbasedBuf(pInfo->pBuf);
}

624 625 626
static int compareDataGroupInfo(const void* group1, const void* group2) {
  const SDataGroupInfo* pGroupInfo1 = group1;
  const SDataGroupInfo* pGroupInfo2 = group2;
627 628 629 630 631 632

  if (pGroupInfo1->groupId == pGroupInfo2->groupId) {
    ASSERT(0);
    return 0;
  }

633
  return (pGroupInfo1->groupId < pGroupInfo2->groupId) ? -1 : 1;
634 635
}

H
Haojun Liao 已提交
636 637 638
// Returns the next buffered page of partitioned data as a data block.
// Pages are emitted group by group in the order of pInfo->sortedGroupArray; groupIndex == -1 means that no group
// has been started yet. After the last page of the last group the operator is marked completed, the buffered data
// is dropped and NULL is returned.
static SSDataBlock* buildPartitionResult(SOperatorInfo* pOperator) {
  SPartitionOperatorInfo* pPart = pOperator->info;
  SSDataBlock*            pOut = pPart->binfo.pRes;

  SDataGroupInfo* pGroup = NULL;
  bool            advance = true;
  if (pPart->groupIndex != -1) {
    pGroup = taosArrayGet(pPart->sortedGroupArray, pPart->groupIndex);
    advance = (pPart->pageIndex >= taosArrayGetSize(pGroup->pPageList));
  }

  if (advance) {
    // try next group data
    ++pPart->groupIndex;
    if (pPart->groupIndex >= taosArrayGetSize(pPart->sortedGroupArray)) {
      doSetOperatorCompleted(pOperator);
      clearPartitionOperator(pPart);
      return NULL;
    }

    pGroup = taosArrayGet(pPart->sortedGroupArray, pPart->groupIndex);
    pPart->pageIndex = 0;
  }

  int32_t* pPageId = taosArrayGet(pGroup->pPageList, pPart->pageIndex);
  void*    pPage = getBufPage(pPart->pBuf, *pPageId);

  // load the page content into the output block, then hand the page back to the buffer
  blockDataEnsureCapacity(pOut, pPart->rowCapacity);
  blockDataFromBuf1(pOut, pPage, pPart->rowCapacity);

  pPart->pageIndex += 1;
  releaseBufPage(pPart->pBuf, pPage);

  blockDataUpdateTsWindow(pOut, 0);
  pOut->info.groupId = pGroup->groupId;

  pOperator->resultInfo.totalRows += pOut->info.rows;
  return pOut;
}

670
// getNext entry point of the partition operator.
// The first call drains the downstream operator and files every row into its group's buffer pages, then collects
// the groups into an array sorted by group id and returns the first page. Later calls return the remaining pages.
// Returns NULL once the operator is done. Errors are raised by long-jumping through pTaskInfo->env.
static SSDataBlock* hashPartition(SOperatorInfo* pOperator) {
  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  SExecTaskInfo*          pTask = pOperator->pTaskInfo;
  SPartitionOperatorInfo* pPart = pOperator->info;
  SSDataBlock*            pOut = pPart->binfo.pRes;

  if (pOperator->status == OP_RES_TO_RETURN) {
    blockDataCleanup(pOut);
    return buildPartitionResult(pOperator);
  }

  int64_t        startUs = taosGetTimestampUs();
  SOperatorInfo* pChild = pOperator->pDownstream[0];

  SSDataBlock* pInput = NULL;
  while ((pInput = pChild->fpSet.getNextFn(pChild)) != NULL) {
    // scalar expressions, if any, are evaluated in place on the input block before partitioning
    if (pPart->scalarSup.pExprInfo != NULL) {
      pTask->code = projectApplyFunctions(pPart->scalarSup.pExprInfo, pInput, pInput, pPart->scalarSup.pCtx,
                                          pPart->scalarSup.numOfExprs, NULL);
      if (pTask->code != TSDB_CODE_SUCCESS) {
        T_LONG_JMP(pTask->env, pTask->code);
      }
    }

    // doHashPartition reports failures through terrno only
    terrno = TSDB_CODE_SUCCESS;
    doHashPartition(pOperator, pInput);
    if (terrno != TSDB_CODE_SUCCESS) {  // group by json error
      T_LONG_JMP(pTask->env, terrno);
    }
  }

  // move the group entries out of the hash set into an array ordered by group id
  SArray* pSorted = taosArrayInit(taosHashGetSize(pPart->pGroupSet), sizeof(SDataGroupInfo));

  for (void* pIter = taosHashIterate(pPart->pGroupSet, NULL); pIter != NULL;
       pIter = taosHashIterate(pPart->pGroupSet, pIter)) {
    SDataGroupInfo* pGroup = pIter;
    taosArrayPush(pSorted, pGroup);
  }

  taosArraySort(pSorted, compareDataGroupInfo);
  pPart->sortedGroupArray = pSorted;
  pPart->groupIndex = -1;  // no group has been emitted yet
  taosHashClear(pPart->pGroupSet);

  // elapsed time is measured in microseconds and stored divided by 1000
  pOperator->cost.openCost = (taosGetTimestampUs() - startUs) / 1000.0;

  pOperator->status = OP_RES_TO_RETURN;
  blockDataEnsureCapacity(pOut, 4096);
  return buildPartitionResult(pOperator);
}

731
static void destroyPartitionOperatorInfo(void* param) {
H
Haojun Liao 已提交
732
  SPartitionOperatorInfo* pInfo = (SPartitionOperatorInfo*)param;
733
  cleanupBasicInfo(&pInfo->binfo);
H
Haojun Liao 已提交
734
  taosArrayDestroy(pInfo->pGroupCols);
735

736
  for (int i = 0; i < taosArrayGetSize(pInfo->pGroupColVals); i++) {
wmmhello's avatar
wmmhello 已提交
737 738 739
    SGroupKeys key = *(SGroupKeys*)taosArrayGet(pInfo->pGroupColVals, i);
    taosMemoryFree(key.pData);
  }
740

H
Haojun Liao 已提交
741
  taosArrayDestroy(pInfo->pGroupColVals);
H
Haojun Liao 已提交
742
  taosMemoryFree(pInfo->keyBuf);
743
  taosArrayDestroy(pInfo->sortedGroupArray);
wmmhello's avatar
wmmhello 已提交
744
  taosHashCleanup(pInfo->pGroupSet);
H
Haojun Liao 已提交
745
  taosMemoryFree(pInfo->columnOffset);
746

747
  cleanupExprSupp(&pInfo->scalarSup);
H
Haojun Liao 已提交
748
  destroyDiskbasedBuf(pInfo->pBuf);
D
dapan1121 已提交
749
  taosMemoryFreeClear(param);
H
Haojun Liao 已提交
750 751
}

752 753
// Creates the blocking "PartitionOperator" described by pPartNode and attaches `downstream` to it.
//
// downstream: the operator this one reads its input from.
// pPartNode:  physical plan node supplying the output block description, the targets, the partition keys and the
//             optional scalar expressions (pExprs).
// pTaskInfo:  owning task; receives the failure code when creation fails.
//
// Returns the new operator, or NULL on failure. On failure pTaskInfo->code holds the code of the step that failed
// (TSDB_CODE_OUT_OF_MEMORY when the failing step reports none), and pInfo is released together with everything
// already attached to it.
SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartitionPhysiNode* pPartNode,
                                           SExecTaskInfo* pTaskInfo) {
  int32_t      code = TSDB_CODE_SUCCESS;
  int32_t      numOfCols = 0;
  uint32_t     defaultPgsz = 0;
  uint32_t     defaultBufsz = 0;
  SSDataBlock* pResBlock = NULL;
  SExprInfo*   pExprInfo = NULL;

  SPartitionOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SPartitionOperatorInfo));
  SOperatorInfo*          pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _error;
  }

  pResBlock = createResDataBlock(pPartNode->node.pOutputDataBlockDesc);
  if (pResBlock == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _error;
  }
  // attach the block right away so that the error path releases it together with pInfo
  pInfo->binfo.pRes = pResBlock;

  pInfo->pGroupCols = extractPartitionColInfo(pPartNode->pPartitionKeys);

  if (pPartNode->pExprs != NULL) {
    int32_t    num = 0;
    SExprInfo* pExprInfo1 = createExprInfo(pPartNode->pExprs, NULL, &num);
    code = initExprSupp(&pInfo->scalarSup, pExprInfo1, num);
    if (code != TSDB_CODE_SUCCESS) {
      goto _error;
    }
  }

  _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
  pInfo->pGroupSet = taosHashInit(100, hashFn, false, HASH_NO_LOCK);
  if (pInfo->pGroupSet == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _error;
  }

  getBufferPgSize(pResBlock->info.rowSize, &defaultPgsz, &defaultBufsz);

  if (!osTempSpaceAvailable()) {
    // keep this specific code; it must not be replaced by a generic out-of-memory error at _error
    code = TSDB_CODE_NO_AVAIL_DISK;
    terrno = code;
    qError("Create partition operator info failed since %s", terrstr(terrno));
    goto _error;
  }

  code = createDiskbasedBuf(&pInfo->pBuf, defaultPgsz, defaultBufsz, pTaskInfo->id.str, tsTempDir);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  pInfo->rowCapacity = blockDataGetCapacityInRow(pResBlock, getBufPageSize(pInfo->pBuf));
  pInfo->columnOffset = setupColumnOffset(pResBlock, pInfo->rowCapacity);
  code = initGroupOptrInfo(&pInfo->pGroupColVals, &pInfo->groupKeyLen, &pInfo->keyBuf, pInfo->pGroupCols);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  // built only after every fallible pInfo step, so those failures cannot leave it behind
  pExprInfo = createExprInfo(pPartNode->pTargets, NULL, &numOfCols);

  pOperator->name = "PartitionOperator";
  pOperator->blocking = true;
  pOperator->status = OP_NOT_OPENED;
  pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_PARTITION;
  pOperator->exprSupp.numOfExprs = numOfCols;
  pOperator->exprSupp.pExprInfo = pExprInfo;
  pOperator->info = pInfo;
  pOperator->pTaskInfo = pTaskInfo;

  pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, hashPartition, NULL, NULL, destroyPartitionOperatorInfo,
                                         NULL, NULL, NULL);

  code = appendDownstream(pOperator, &downstream, 1);
  if (code != TSDB_CODE_SUCCESS) {
    // NOTE(review): pOperator->exprSupp.pExprInfo is not released on this path; no dedicated release helper for it
    // is visible from here -- confirm which one applies.
    goto _error;
  }

  return pOperator;

_error:
  pTaskInfo->code = (code != TSDB_CODE_SUCCESS) ? code : TSDB_CODE_OUT_OF_MEMORY;
  if (pInfo != NULL) {
    // frees pInfo and all resources attached to it so far
    destroyPartitionOperatorInfo(pInfo);
  }
  taosMemoryFreeClear(pOperator);
  return NULL;
}

827 828 829
// Obtains the result row for the key (pData, bytes) within groupId via doSetResultOutBufByKey and binds the
// operator's function contexts to it through setResultRowInitCtx.
//
// Asserts that a result row was obtained; always returns TSDB_CODE_SUCCESS.
int32_t setGroupResultOutputBuf(SOperatorInfo* pOperator, SOptrBasicInfo* binfo, int32_t numOfCols, char* pData,
                                int16_t bytes, uint64_t groupId, SDiskbasedBuf* pBuf, SAggSupporter* pAggSup) {
  SqlFunctionCtx* pFuncCtx = pOperator->exprSupp.pCtx;

  SResultRow* pRow = doSetResultOutBufByKey(pBuf, &binfo->resultRowInfo, (char*)pData, bytes, true, groupId,
                                            pOperator->pTaskInfo, false, pAggSup);
  assert(pRow != NULL);

  setResultRowInitCtx(pRow, pFuncCtx, numOfCols, pOperator->exprSupp.rowEntryInfoOffset);
  return TSDB_CODE_SUCCESS;
}
840 841 842

// Computes the group id of row `rowId` of pBlock.
//
// When pExprSup carries expressions they are first applied to pBlock in place; a failure there is only logged.
// The row's partition column values are then recorded, serialized into pParSup->keyBuf and passed to calcGroupId.
uint64_t calGroupIdByData(SPartitionBySupporter* pParSup, SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t rowId) {
  bool hasScalarExprs = (pExprSup->pExprInfo != NULL);
  if (hasScalarExprs) {
    int32_t rc = projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL);
    if (rc != TSDB_CODE_SUCCESS) {
      qError("calaculate group id error, code:%d", rc);
    }
  }

  recordNewGroupKeys(pParSup->pGroupCols, pParSup->pGroupColVals, pBlock, rowId);

  int32_t keyLen = buildGroupKeys(pParSup->keyBuf, pParSup->pGroupColVals);
  return calcGroupId(pParSup->keyBuf, keyLen);
}

855
// Tells whether the partition iterator (pInfo->parIte) currently references a partition.
static bool hasRemainPartion(SStreamPartitionOperatorInfo* pInfo) {
  if (pInfo->parIte == NULL) {
    return false;
  }
  return true;
}
856 857 858

// Fills the operator's result block with the rows of the partition currently referenced by pInfo->parIte.
//
// For every row id recorded for that partition, one value per target expression is copied from the cached input
// block into the result block. The result block then receives the partition's group id, and the iterator is advanced
// to the next entry of pInfo->pPartitions. A partition must be pending when this is called (asserted).
static SSDataBlock* buildStreamPartitionResult(SOperatorInfo* pOperator) {
  SStreamPartitionOperatorInfo* pInfo = pOperator->info;
  SSDataBlock*                  pOut = pInfo->binfo.pRes;
  ASSERT(hasRemainPartion(pInfo));

  SPartitionDataInfo* pPart = (SPartitionDataInfo*)pInfo->parIte;
  blockDataCleanup(pOut);

  int32_t      numOfRows = taosArrayGetSize(pPart->rowIds);
  SSDataBlock* pIn = pInfo->pInputDataBlock;

  for (int32_t r = 0; r < numOfRows; r++) {
    int32_t srcRow = *(int32_t*)taosArrayGet(pPart->rowIds, r);

    for (int32_t c = 0; c < pOperator->exprSupp.numOfExprs; c++) {
      // the c-th output column takes its data from the input slot named by the expression's first parameter
      int32_t          srcSlot = pOperator->exprSupp.pExprInfo[c].base.pParam[0].pCol->slotId;
      SColumnInfoData* pInCol = taosArrayGet(pIn->pDataBlock, srcSlot);
      SColumnInfoData* pOutCol = taosArrayGet(pOut->pDataBlock, c);

      bool  rowIsNull = colDataIsNull(pInCol, pIn->info.rows, srcRow, NULL);
      char* pValue = colDataGetData(pInCol, srcRow);
      colDataAppend(pOutCol, pOut->info.rows, pValue, rowIsNull);
    }

    pOut->info.rows += 1;
  }

  blockDataUpdateTsWindow(pOut, pInfo->tsColIndex);
  pOut->info.groupId = pPart->groupId;
  pOperator->resultInfo.totalRows += pOut->info.rows;

  // move on to the next partition; NULL here means all partitions of this input block were emitted
  pInfo->parIte = taosHashIterate(pInfo->pPartitions, pInfo->parIte);

  ASSERT(pOut->info.rows > 0);
  printDataBlock(pOut, "stream partitionby");
  return pOut;
}

// Distributes the rows of pBlock over pInfo->pPartitions.
//
// pBlock is remembered as the current input block. For each row the partition key is built in the shared key buffer;
// the row index is appended to the matching partition's rowIds, or a new partition entry (group id plus a fresh
// rowIds array) is inserted when the key has not been seen in this block.
static void doStreamHashPartitionImpl(SStreamPartitionOperatorInfo* pInfo, SSDataBlock* pBlock) {
  SPartitionBySupporter* pSup = &pInfo->partitionSup;

  pInfo->pInputDataBlock = pBlock;

  for (int32_t row = 0; row < pBlock->info.rows; ++row) {
    recordNewGroupKeys(pSup->pGroupCols, pSup->pGroupColVals, pBlock, row);
    int32_t keyLen = buildGroupKeys(pSup->keyBuf, pSup->pGroupColVals);

    SPartitionDataInfo* pExisting = (SPartitionDataInfo*)taosHashGet(pInfo->pPartitions, pSup->keyBuf, keyLen);
    if (pExisting != NULL) {
      taosArrayPush(pExisting->rowIds, &row);
      continue;
    }

    // first row with this key: create the partition entry and store a copy of it in the hash
    SPartitionDataInfo fresh = {0};
    fresh.groupId = calcGroupId(pSup->keyBuf, keyLen);
    fresh.rowIds = taosArrayInit(64, sizeof(int32_t));
    taosArrayPush(fresh.rowIds, &row);
    taosHashPut(pInfo->pPartitions, pSup->keyBuf, keyLen, &fresh, sizeof(SPartitionDataInfo));
  }
}

// Produces the next output block of the stream partition operator.
//
// While partitions of the previous input block are still pending, the next one is emitted. Otherwise one block is
// fetched from the downstream operator:
//   - NULL marks the operator as completed and NULL is returned;
//   - STREAM_DELETE_DATA blocks are copied into pInfo->pDelRes, retyped STREAM_DELETE_RESULT and returned;
//   - STREAM_NORMAL / STREAM_PULL_DATA / STREAM_INVALID blocks are partitioned and the first partition is returned;
//   - blocks of any other type are returned unchanged.
// A failing scalar expression aborts the task through T_LONG_JMP with the failure code.
static SSDataBlock* doStreamHashPartition(SOperatorInfo* pOperator) {
  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  SExecTaskInfo*                pTaskInfo = pOperator->pTaskInfo;
  SStreamPartitionOperatorInfo* pInfo = pOperator->info;
  if (hasRemainPartion(pInfo)) {
    return buildStreamPartitionResult(pOperator);
  }

  int64_t        st = taosGetTimestampUs();
  SOperatorInfo* downstream = pOperator->pDownstream[0];

  pInfo->pInputDataBlock = NULL;
  SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
  if (pBlock == NULL) {
    doSetOperatorCompleted(pOperator);
    return NULL;
  }
  printDataBlock(pBlock, "stream partitionby recv");

  switch (pBlock->info.type) {
    case STREAM_NORMAL:
    case STREAM_PULL_DATA:
    case STREAM_INVALID:
      pInfo->binfo.pRes->info.type = pBlock->info.type;
      break;
    case STREAM_DELETE_DATA:
      copyDataBlock(pInfo->pDelRes, pBlock);
      pInfo->pDelRes->info.type = STREAM_DELETE_RESULT;
      return pInfo->pDelRes;
    default:
      return pBlock;
  }

  // there is a scalar expression that needs to be calculated right before applying the partitioning.
  if (pInfo->scalarSup.pExprInfo != NULL) {
    pTaskInfo->code = projectApplyFunctions(pInfo->scalarSup.pExprInfo, pBlock, pBlock, pInfo->scalarSup.pCtx,
                                            pInfo->scalarSup.numOfExprs, NULL);
    if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
    }
  }

  // The partitions of the previous input block have all been emitted at this point. Each entry owns the rowIds
  // array created in doStreamHashPartitionImpl, so release those arrays before the entries are dropped.
  void* pStale = taosHashIterate(pInfo->pPartitions, NULL);
  while (pStale != NULL) {
    taosArrayDestroy(((SPartitionDataInfo*)pStale)->rowIds);
    pStale = taosHashIterate(pInfo->pPartitions, pStale);
  }
  taosHashClear(pInfo->pPartitions);

  doStreamHashPartitionImpl(pInfo, pBlock);
  pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;

  pInfo->parIte = taosHashIterate(pInfo->pPartitions, NULL);
  return buildStreamPartitionResult(pOperator);
}

static void destroyStreamPartitionOperatorInfo(void* param) {
  SStreamPartitionOperatorInfo* pInfo = (SStreamPartitionOperatorInfo*)param;
  cleanupBasicInfo(&pInfo->binfo);
  taosArrayDestroy(pInfo->partitionSup.pGroupCols);

963
  for (int i = 0; i < taosArrayGetSize(pInfo->partitionSup.pGroupColVals); i++) {
964 965 966 967 968 969 970
    SGroupKeys key = *(SGroupKeys*)taosArrayGet(pInfo->partitionSup.pGroupColVals, i);
    taosMemoryFree(key.pData);
  }
  taosArrayDestroy(pInfo->partitionSup.pGroupColVals);

  taosMemoryFree(pInfo->partitionSup.keyBuf);
  cleanupExprSupp(&pInfo->scalarSup);
971
  blockDataDestroy(pInfo->pDelRes);
972 973 974 975 976 977 978 979 980 981 982 983
  taosMemoryFreeClear(param);
}

// Shares the partition settings with the downstream operator when that operator is a stream scan.
//
// The scan info receives a by-value copy of *pParSup and keeps the pExpr pointer itself; any other operator type is
// left untouched.
void initParDownStream(SOperatorInfo* downstream, SPartitionBySupporter* pParSup, SExprSupp* pExpr) {
  if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
    SStreamScanInfo* pScan = downstream->info;
    pScan->partitionSup = *pParSup;
    pScan->pPartScalarSup = pExpr;
  }
}

984 985
// Creates the non-blocking "StreamPartitionOperator" described by pPartNode and attaches `downstream` to it.
//
// downstream: the operator this one reads its input from; when it is a stream scan it also receives the partition
//             settings (see initParDownStream).
// pPartNode:  physical plan node supplying the partition keys, the optional scalar expressions (pExprs), the output
//             block description and the targets.
// pTaskInfo:  owning task; receives the failure code when creation fails.
//
// Returns the new operator, or NULL on failure. On failure pTaskInfo->code holds the code of the step that failed
// (TSDB_CODE_OUT_OF_MEMORY when the failing step reports none), and pInfo is released together with everything
// already attached to it.
SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStreamPartitionPhysiNode* pPartNode,
                                                 SExecTaskInfo* pTaskInfo) {
  int32_t      code = TSDB_CODE_SUCCESS;
  int32_t      keyLen = 0;
  int32_t      numOfCols = 0;
  SSDataBlock* pResBlock = NULL;
  SExprInfo*   pExprInfo = NULL;

  SStreamPartitionOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamPartitionOperatorInfo));
  SOperatorInfo*                pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _error;
  }

  pInfo->partitionSup.pGroupCols = extractPartitionColInfo(pPartNode->pPartitionKeys);

  if (pPartNode->pExprs != NULL) {
    int32_t    num = 0;
    SExprInfo* pCalExprInfo = createExprInfo(pPartNode->pExprs, NULL, &num);
    code = initExprSupp(&pInfo->scalarSup, pCalExprInfo, num);
    if (code != TSDB_CODE_SUCCESS) {
      goto _error;
    }
  }

  code = initGroupOptrInfo(&pInfo->partitionSup.pGroupColVals, &keyLen, &pInfo->partitionSup.keyBuf,
                           pInfo->partitionSup.pGroupCols);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }
  pInfo->partitionSup.needCalc = true;

  pResBlock = createResDataBlock(pPartNode->node.pOutputDataBlockDesc);
  if (pResBlock == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _error;
  }
  blockDataEnsureCapacity(pResBlock, 4096);
  pInfo->binfo.pRes = pResBlock;
  pInfo->parIte = NULL;
  pInfo->pInputDataBlock = NULL;
  pInfo->tsColIndex = 0;

  _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
  pInfo->pPartitions = taosHashInit(1024, hashFn, false, HASH_NO_LOCK);
  if (pInfo->pPartitions == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _error;
  }

  // pDelRes is the copy target for STREAM_DELETE_DATA blocks at run time, so it must exist
  pInfo->pDelRes = createSpecialDataBlock(STREAM_DELETE_RESULT);
  if (pInfo->pDelRes == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _error;
  }

  pExprInfo = createExprInfo(pPartNode->pTargets, NULL, &numOfCols);

  pOperator->name = "StreamPartitionOperator";
  pOperator->blocking = false;
  pOperator->status = OP_NOT_OPENED;
  pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION;
  pOperator->exprSupp.numOfExprs = numOfCols;
  pOperator->exprSupp.pExprInfo = pExprInfo;
  pOperator->info = pInfo;
  pOperator->pTaskInfo = pTaskInfo;
  pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doStreamHashPartition, NULL, NULL,
                                         destroyStreamPartitionOperatorInfo, NULL, NULL, NULL);

  code = appendDownstream(pOperator, &downstream, 1);
  if (code != TSDB_CODE_SUCCESS) {
    // NOTE(review): pOperator->exprSupp.pExprInfo is not released on this path; no dedicated release helper for it
    // is visible from here -- confirm which one applies.
    goto _error;
  }

  // Done last: this publishes pointers into pInfo to the downstream scan, so no failure path that destroys pInfo
  // may follow it.
  initParDownStream(downstream, &pInfo->partitionSup, &pInfo->scalarSup);
  return pOperator;

_error:
  pTaskInfo->code = (code != TSDB_CODE_SUCCESS) ? code : TSDB_CODE_OUT_OF_MEMORY;
  if (pInfo != NULL) {
    // the destroy function dereferences its argument, so it must not be handed a failed allocation
    destroyStreamPartitionOperatorInfo(pInfo);
  }
  taosMemoryFreeClear(pOperator);
  return NULL;
}