groupoperator.c 41.6 KB
Newer Older
H
Haojun Liao 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

16
#include "filter.h"
H
Haojun Liao 已提交
17
#include "function.h"
18
#include "os.h"
H
Haojun Liao 已提交
19 20 21 22 23
#include "tname.h"

#include "tdatablock.h"
#include "tmsg.h"

24
#include "executorInt.h"
H
Haojun Liao 已提交
25 26 27 28 29
#include "executorimpl.h"
#include "tcompare.h"
#include "thash.h"
#include "ttypes.h"

H
Haojun Liao 已提交
30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59
/*
 * Runtime state of the hash group-by aggregation operator (created by createGroupOperatorInfo, driven by
 * hashGroupbyAggregate).
 */
typedef struct SGroupbyOperatorInfo {
  SOptrBasicInfo binfo;           // output block (binfo.pRes) and result row info
  SAggSupporter  aggSup;          // aggregation state: result row hash table and paged result buffer
  SArray*        pGroupCols;     // group by columns, SArray<SColumn>
  SArray*        pGroupColVals;  // key values of the group currently being accumulated, SArray<SGroupKeys>
  bool           isInit;         // false until the first input row's key has been recorded in pGroupColVals
  char*          keyBuf;         // serialized group key (null flags + values, see buildGroupKeys) used as hash key
  int32_t        groupKeyLen;    // size of keyBuf: sum of group column widths plus one null-flag byte per column
  SGroupResInfo  groupResInfo;   // iteration state over the aggregated result rows while results are returned
  SExprSupp      scalarSup;      // scalar expressions evaluated on each input block before aggregation
} SGroupbyOperatorInfo;

/*
 * Runtime state of the partition operator: input rows are routed into per-group chains of buffer pages
 * (doHashPartition) and the groups are then emitted page by page, ordered by group id (buildPartitionResult).
 *
 * The sort in partition may be needed later.
 */
typedef struct SPartitionOperatorInfo {
  SOptrBasicInfo binfo;           // output block (binfo.pRes)
  SArray*        pGroupCols;      // partition key columns, read by recordNewGroupKeys
  SArray*        pGroupColVals;  // current group column values, SArray<SGroupKeys>
  char*          keyBuf;         // serialized group key of the current row (see buildGroupKeys), used as hash key
  int32_t        groupKeyLen;    // total group key width; NOTE(review): assumed to be set up like the group-by operator
  SHashObj*      pGroupSet;      // serialized group key -> SDataGroupInfo; emptied once groups move to sortedGroupArray

  SDiskbasedBuf* pBuf;              // paged buffer holding the partitioned rows
  int32_t        rowCapacity;       // maximum number of rows for each buffer page
  int32_t*       columnOffset;      // byte offset of each column's segment inside a page (see setupColumnOffset)
  SArray*        sortedGroupArray;  // SDataGroupInfo sorted by group id
  int32_t        groupIndex;        // index in sortedGroupArray of the group being returned; -1 before the first one
  int32_t        pageIndex;         // index of the next page to return within the current group
  SExprSupp      scalarSup;         // scalar expressions evaluated on each input block before partitioning
} SPartitionOperatorInfo;

60
static void*    getCurrentDataGroupInfo(const SPartitionOperatorInfo* pInfo, SDataGroupInfo** pGroupInfo, int32_t len);
H
Haojun Liao 已提交
61
static int32_t* setupColumnOffset(const SSDataBlock* pBlock, int32_t rowCapacity);
62 63
static int32_t  setGroupResultOutputBuf(SOperatorInfo* pOperator, SOptrBasicInfo* binfo, int32_t numOfCols, char* pData,
                                        int16_t bytes, uint64_t groupId, SDiskbasedBuf* pBuf, SAggSupporter* pAggSup);
H
Haojun Liao 已提交
64
static SArray*  extractColumnInfo(SNodeList* pNodeList);
H
Haojun Liao 已提交
65

H
Haojun Liao 已提交
66
static void freeGroupKey(void* param) {
67
  SGroupKeys* pKey = (SGroupKeys*)param;
H
Haojun Liao 已提交
68 69 70
  taosMemoryFree(pKey->pData);
}

71
static void destroyGroupOperatorInfo(void* param) {
H
Haojun Liao 已提交
72
  SGroupbyOperatorInfo* pInfo = (SGroupbyOperatorInfo*)param;
73 74 75 76
  if (pInfo == NULL) {
    return;
  }

77
  cleanupBasicInfo(&pInfo->binfo);
H
Haojun Liao 已提交
78 79
  taosMemoryFreeClear(pInfo->keyBuf);
  taosArrayDestroy(pInfo->pGroupCols);
H
Haojun Liao 已提交
80
  taosArrayDestroyEx(pInfo->pGroupColVals, freeGroupKey);
81
  cleanupExprSupp(&pInfo->scalarSup);
H
Haojun Liao 已提交
82 83 84

  cleanupGroupResInfo(&pInfo->groupResInfo);
  cleanupAggSup(&pInfo->aggSup);
D
dapan1121 已提交
85
  taosMemoryFreeClear(param);
H
Haojun Liao 已提交
86 87
}

wmmhello's avatar
wmmhello 已提交
88
/*
 * Prepare the group-key state of an operator.
 *
 * pGroupColVals: receives a new SArray<SGroupKeys> with one entry per column of pGroupColList; each entry gets a
 *                zeroed value buffer as wide as the column.
 * keyLen:        INCREMENTED (not overwritten) by the serialized key size: the sum of all column widths plus one
 *                null-flag byte per column. createGroupOperatorInfo passes a field of a calloc'ed struct (zero).
 * keyBuf:        receives a zeroed buffer of *keyLen bytes, the target of buildGroupKeys.
 *
 * Returns TSDB_CODE_SUCCESS or TSDB_CODE_OUT_OF_MEMORY. On failure, whatever was already allocated is left in the
 * output parameters (and owned by them) so the operator's destroy routine can release it.
 */
static int32_t initGroupOptrInfo(SArray** pGroupColVals, int32_t* keyLen, char** keyBuf, const SArray* pGroupColList) {
  *pGroupColVals = taosArrayInit(4, sizeof(SGroupKeys));
  if ((*pGroupColVals) == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  int32_t numOfGroupCols = taosArrayGetSize(pGroupColList);
  for (int32_t i = 0; i < numOfGroupCols; ++i) {
    SColumn* pCol = (SColumn*)taosArrayGet(pGroupColList, i);
    (*keyLen) += pCol->bytes;  // value width only; the null flags are accounted for after the loop

    SGroupKeys key = {0};
    key.bytes = pCol->bytes;
    key.type = pCol->type;
    key.isNull = false;
    key.pData = taosMemoryCalloc(1, pCol->bytes);
    if (key.pData == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    // Until the push succeeds the array does not own key.pData, so release it here on failure.
    if (taosArrayPush((*pGroupColVals), &key) == NULL) {
      taosMemoryFree(key.pData);
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  // one int8 null flag per group column precedes the key values (see buildGroupKeys)
  int32_t nullFlagSize = sizeof(int8_t) * numOfGroupCols;
  (*keyLen) += nullFlagSize;

  (*keyBuf) = taosMemoryCalloc(1, (*keyLen));
  if ((*keyBuf) == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  return TSDB_CODE_SUCCESS;
}

122 123
/*
 * Return true when row `rowIndex` of pBlock carries exactly the key values currently held in pGroupColVals.
 *
 * Two NULLs are equal; NULL never equals a non-NULL value. JSON values are compared over the encoded length of the
 * row's value, var-length values over their length and payload, fixed-length values over the full column width.
 */
static bool groupKeyCompare(SArray* pGroupCols, SArray* pGroupColVals, SSDataBlock* pBlock, int32_t rowIndex,
                            int32_t numOfGroupCols) {
  SColumnDataAgg* pColAgg = NULL;

  for (int32_t col = 0; col < numOfGroupCols; ++col) {
    SColumn*         pGroupCol = taosArrayGet(pGroupCols, col);
    SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, pGroupCol->slotId);
    SGroupKeys*      pCurKey = taosArrayGet(pGroupColVals, col);

    if (pBlock->pBlockAgg != NULL) {
      pColAgg = pBlock->pBlockAgg[pGroupCol->slotId];  // TODO is agg data matched?
    }

    bool rowIsNull = colDataIsNull(pColData, pBlock->info.rows, rowIndex, pColAgg);
    if (rowIsNull || pCurKey->isNull) {
      if (rowIsNull && pCurKey->isNull) {
        continue;  // NULL matches NULL when grouping
      }
      return false;
    }

    char* pVal = colDataGetData(pColData, rowIndex);
    bool  same = false;
    if (pCurKey->type == TSDB_DATA_TYPE_JSON) {
      same = (memcmp(pCurKey->pData, pVal, getJsonValueLen(pVal)) == 0);
    } else if (IS_VAR_DATA_TYPE(pCurKey->type)) {
      int32_t valLen = varDataLen(pVal);
      same = (valLen == varDataLen(pCurKey->pData)) &&
             (memcmp(varDataVal(pCurKey->pData), varDataVal(pVal), valLen) == 0);
    } else {
      same = (memcmp(pCurKey->pData, pVal, pCurKey->bytes) == 0);
    }

    if (!same) {
      return false;
    }
  }

  return true;
}

wmmhello's avatar
wmmhello 已提交
170
/*
 * Make the values of row `rowIndex` of pBlock the current group key by copying them into pGroupColVals.
 *
 * Each SGroupKeys value buffer holds `bytes` bytes (allocated by initGroupOptrInfo), so every variable-length value
 * is checked against that bound BEFORE it is copied.
 *
 * Error reporting: if a JSON group column's value satisfies tTagIsJson (it cannot be used as a group key), terrno
 * is set to TSDB_CODE_QRY_JSON_IN_GROUP_ERROR and the function returns early. Keys of earlier columns have already
 * been overwritten at that point. terrno is left untouched on success, so callers must reset it before checking.
 */
static void recordNewGroupKeys(SArray* pGroupCols, SArray* pGroupColVals, SSDataBlock* pBlock, int32_t rowIndex) {
  SColumnDataAgg* pColAgg = NULL;

  size_t numOfGroupCols = taosArrayGetSize(pGroupCols);

  for (int32_t i = 0; i < numOfGroupCols; ++i) {
    SColumn*         pCol = taosArrayGet(pGroupCols, i);
    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pCol->slotId);

    if (pBlock->pBlockAgg != NULL) {
      pColAgg = pBlock->pBlockAgg[pCol->slotId];  // TODO is agg data matched?
    }

    SGroupKeys* pkey = taosArrayGet(pGroupColVals, i);
    if (colDataIsNull(pColInfoData, pBlock->info.rows, rowIndex, pColAgg)) {
      pkey->isNull = true;
      continue;
    }

    pkey->isNull = false;
    char* val = colDataGetData(pColInfoData, rowIndex);
    if (pkey->type == TSDB_DATA_TYPE_JSON) {
      if (tTagIsJson(val)) {
        terrno = TSDB_CODE_QRY_JSON_IN_GROUP_ERROR;
        return;
      }

      int32_t dataLen = getJsonValueLen(val);
      ASSERT(dataLen <= pkey->bytes);  // validate before writing into the fixed-size key buffer
      memcpy(pkey->pData, val, dataLen);
    } else if (IS_VAR_DATA_TYPE(pkey->type)) {
      ASSERT(varDataTLen(val) <= pkey->bytes);  // validate before writing into the fixed-size key buffer
      memcpy(pkey->pData, val, varDataTLen(val));
    } else {
      memcpy(pkey->pData, val, pkey->bytes);
    }
  }
}

wmmhello's avatar
wmmhello 已提交
206
static int32_t buildGroupKeys(void* pKey, const SArray* pGroupColVals) {
H
Haojun Liao 已提交
207 208 209 210 211 212 213 214 215 216 217 218 219
  ASSERT(pKey != NULL);
  size_t numOfGroupCols = taosArrayGetSize(pGroupColVals);

  char* isNull = (char*)pKey;
  char* pStart = (char*)pKey + sizeof(int8_t) * numOfGroupCols;
  for (int32_t i = 0; i < numOfGroupCols; ++i) {
    SGroupKeys* pkey = taosArrayGet(pGroupColVals, i);
    if (pkey->isNull) {
      isNull[i] = 1;
      continue;
    }

    isNull[i] = 0;
wmmhello's avatar
wmmhello 已提交
220 221 222 223 224
    if (pkey->type == TSDB_DATA_TYPE_JSON) {
      int32_t dataLen = getJsonValueLen(pkey->pData);
      memcpy(pStart, (pkey->pData), dataLen);
      pStart += dataLen;
    } else if (IS_VAR_DATA_TYPE(pkey->type)) {
H
Haojun Liao 已提交
225 226 227 228 229 230 231 232 233
      varDataCopy(pStart, pkey->pData);
      pStart += varDataTLen(pkey->pData);
      ASSERT(varDataTLen(pkey->pData) <= pkey->bytes);
    } else {
      memcpy(pStart, pkey->pData, pkey->bytes);
      pStart += pkey->bytes;
    }
  }

234
  return (int32_t)(pStart - (char*)pKey);
H
Haojun Liao 已提交
235 236 237 238 239
}

/*
 * Assign the group keys or user input constant values if required.
 *
 * Output expressions whose functionId is -1 are not aggregate functions (e.g. `key` in
 * `select count(*), key from t group by key`). For each of them:
 *   - copy the value at row `rowIndex` of its first input column (which has `totalRows` rows) into the current
 *     result row cell, or flag the cell as a NULL result;
 *   - mark the cell as holding one result.
 */
static void doAssignGroupKeys(SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t totalRows, int32_t rowIndex) {
  for (int32_t k = 0; k < numOfOutput; ++k) {
    SqlFunctionCtx* pFnCtx = &pCtx[k];
    if (pFnCtx->functionId != -1) {
      continue;
    }

    SResultRowEntryInfo* pResInfo = GET_RES_INFO(pFnCtx);
    SColumnInfoData*     pInput = pFnCtx->input.pData[0];

    // todo OPT all/all not NULL
    if (colDataIsNull(pInput, totalRows, rowIndex, NULL)) {
      pResInfo->isNullRes = 1;  // it is a NULL value
    } else {
      char* pDst = GET_ROWCELL_INTERBUF(pResInfo);
      char* pSrc = colDataGetData(pInput, rowIndex);

      if (pInput->info.type == TSDB_DATA_TYPE_JSON) {
        memcpy(pDst, pSrc, getJsonValueLen(pSrc));
      } else if (IS_VAR_DATA_TYPE(pInput->info.type)) {
        varDataCopy(pDst, pSrc);
      } else {
        memcpy(pDst, pSrc, pInput->info.bytes);
      }
    }

    pResInfo->numOfRes = 1;
  }
}

static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
  SExecTaskInfo*        pTaskInfo = pOperator->pTaskInfo;
  SGroupbyOperatorInfo* pInfo = pOperator->info;

270
  SqlFunctionCtx* pCtx = pOperator->exprSupp.pCtx;
H
Haojun Liao 已提交
271 272 273 274 275 276 277 278 279
  int32_t         numOfGroupCols = taosArrayGetSize(pInfo->pGroupCols);
  //  if (type == TSDB_DATA_TYPE_FLOAT || type == TSDB_DATA_TYPE_DOUBLE) {
  // qError("QInfo:0x%"PRIx64" group by not supported on double/float columns, abort", GET_TASKID(pRuntimeEnv));
  //    return;
  //  }

  int32_t     len = 0;
  STimeWindow w = TSWINDOW_INITIALIZER;

wmmhello's avatar
wmmhello 已提交
280
  terrno = TSDB_CODE_SUCCESS;
H
Haojun Liao 已提交
281 282 283 284
  int32_t num = 0;
  for (int32_t j = 0; j < pBlock->info.rows; ++j) {
    // Compare with the previous row of this column, and do not set the output buffer again if they are identical.
    if (!pInfo->isInit) {
285
      recordNewGroupKeys(pInfo->pGroupCols, pInfo->pGroupColVals, pBlock, j);
wmmhello's avatar
wmmhello 已提交
286
      if (terrno != TSDB_CODE_SUCCESS) {  // group by json error
287
        T_LONG_JMP(pTaskInfo->env, terrno);
wmmhello's avatar
wmmhello 已提交
288
      }
H
Haojun Liao 已提交
289 290 291 292 293
      pInfo->isInit = true;
      num++;
      continue;
    }

H
Haojun Liao 已提交
294
    bool equal = groupKeyCompare(pInfo->pGroupCols, pInfo->pGroupColVals, pBlock, j, numOfGroupCols);
H
Haojun Liao 已提交
295 296 297 298 299
    if (equal) {
      num++;
      continue;
    }

H
Haojun Liao 已提交
300
    // The first row of a new block does not belongs to the previous existed group
301
    if (j == 0) {
H
Haojun Liao 已提交
302
      num++;
303
      recordNewGroupKeys(pInfo->pGroupCols, pInfo->pGroupColVals, pBlock, j);
wmmhello's avatar
wmmhello 已提交
304
      if (terrno != TSDB_CODE_SUCCESS) {  // group by json error
305
        T_LONG_JMP(pTaskInfo->env, terrno);
wmmhello's avatar
wmmhello 已提交
306
      }
H
Haojun Liao 已提交
307 308 309
      continue;
    }

H
Haojun Liao 已提交
310
    len = buildGroupKeys(pInfo->keyBuf, pInfo->pGroupColVals);
311 312
    int32_t ret = setGroupResultOutputBuf(pOperator, &(pInfo->binfo), pOperator->exprSupp.numOfExprs, pInfo->keyBuf,
                                          len, pBlock->info.groupId, pInfo->aggSup.pResultBuf, &pInfo->aggSup);
H
Haojun Liao 已提交
313
    if (ret != TSDB_CODE_SUCCESS) {  // null data, too many state code
314
      T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR);
H
Haojun Liao 已提交
315 316 317
    }

    int32_t rowIndex = j - num;
318
    doApplyFunctions(pTaskInfo, pCtx, NULL, rowIndex, num, pBlock->info.rows, pOperator->exprSupp.numOfExprs);
H
Haojun Liao 已提交
319 320

    // assign the group keys or user input constant values if required
321
    doAssignGroupKeys(pCtx, pOperator->exprSupp.numOfExprs, pBlock->info.rows, rowIndex);
322
    recordNewGroupKeys(pInfo->pGroupCols, pInfo->pGroupColVals, pBlock, j);
H
Haojun Liao 已提交
323 324 325 326
    num = 1;
  }

  if (num > 0) {
H
Haojun Liao 已提交
327
    len = buildGroupKeys(pInfo->keyBuf, pInfo->pGroupColVals);
328 329
    int32_t ret = setGroupResultOutputBuf(pOperator, &(pInfo->binfo), pOperator->exprSupp.numOfExprs, pInfo->keyBuf,
                                          len, pBlock->info.groupId, pInfo->aggSup.pResultBuf, &pInfo->aggSup);
H
Haojun Liao 已提交
330
    if (ret != TSDB_CODE_SUCCESS) {
331
      T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR);
H
Haojun Liao 已提交
332 333 334
    }

    int32_t rowIndex = pBlock->info.rows - num;
335
    doApplyFunctions(pTaskInfo, pCtx, NULL, rowIndex, num, pBlock->info.rows, pOperator->exprSupp.numOfExprs);
336
    doAssignGroupKeys(pCtx, pOperator->exprSupp.numOfExprs, pBlock->info.rows, rowIndex);
H
Haojun Liao 已提交
337 338 339
  }
}

340 341 342 343
/*
 * Produce the next block of aggregated group results, applying the operator's filter.
 *
 * Result rows are pulled repeatedly until the filter leaves at least one row or all results have been consumed, in
 * which case the operator is marked completed. Returns NULL when the final block is empty.
 */
static SSDataBlock* buildGroupResultDataBlock(SOperatorInfo* pOperator) {
  SGroupbyOperatorInfo* pInfo = pOperator->info;
  SSDataBlock*          pRes = pInfo->binfo.pRes;

  for (;;) {
    doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
    doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL);

    bool exhausted = !hasRemainResults(&pInfo->groupResInfo);
    if (exhausted) {
      setOperatorCompleted(pOperator);
    }

    if (exhausted || pRes->info.rows > 0) {
      break;
    }
  }

  pOperator->resultInfo.totalRows += pRes->info.rows;
  if (pRes->info.rows == 0) {
    return NULL;
  }
  return pRes;
}

362
/*
 * `next` function of the hash group-by aggregation operator.
 *
 * On the first call, every block from the downstream operator is drained. Any pre-aggregation scalar expressions are
 * evaluated on each block, and its rows are aggregated per group. The operator then switches to OP_RES_TO_RETURN.
 * Each call, including the first, returns the next block of group results (NULL when none are left).
 */
static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) {
  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  SExecTaskInfo*        pTaskInfo = pOperator->pTaskInfo;
  SGroupbyOperatorInfo* pInfo = pOperator->info;

  // All input has already been aggregated; only hand out the remaining results.
  if (pOperator->status == OP_RES_TO_RETURN) {
    return buildGroupResultDataBlock(pOperator);
  }

  int32_t        order = TSDB_ORDER_ASC;
  int32_t        scanFlag = MAIN_SCAN;
  int64_t        startTs = taosGetTimestampUs();
  SOperatorInfo* pDownstream = pOperator->pDownstream[0];

  for (SSDataBlock* pBlock = pDownstream->fpSet.getNextFn(pDownstream); pBlock != NULL;
       pBlock = pDownstream->fpSet.getNextFn(pDownstream)) {
    int32_t code = getTableScanInfo(pOperator, &order, &scanFlag);
    if (code != TSDB_CODE_SUCCESS) {
      T_LONG_JMP(pTaskInfo->env, code);
    }

    // bind the current input block to the aggregate expression contexts
    setInputDataBlock(&pOperator->exprSupp, pBlock, order, scanFlag, true);

    // scalar expressions that must be computed right before the group aggregation is applied
    if (pInfo->scalarSup.pExprInfo != NULL) {
      pTaskInfo->code = projectApplyFunctions(pInfo->scalarSup.pExprInfo, pBlock, pBlock, pInfo->scalarSup.pCtx,
                                              pInfo->scalarSup.numOfExprs, NULL);
      if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
        T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
      }
    }

    doHashGroupbyAgg(pOperator, pBlock);
  }

  pOperator->status = OP_RES_TO_RETURN;
  initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, 0);
  pOperator->cost.openCost = (taosGetTimestampUs() - startTs) / 1000.0;

  return buildGroupResultDataBlock(pOperator);
}

5
54liuyao 已提交
428
/*
 * Create the hash group-by aggregation operator described by pAggNode, reading from `downstream`.
 *
 * Returns the new operator, or NULL on failure. On failure pTaskInfo->code is set to the error code of the step that
 * failed (it used to be always TSDB_CODE_OUT_OF_MEMORY), and pInfo and pOperator are released.
 */
SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pAggNode, SExecTaskInfo* pTaskInfo) {
  // Initialized up front: the first `goto _error` jumps past every later assignment of `code`.
  int32_t               code = TSDB_CODE_OUT_OF_MEMORY;
  SGroupbyOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SGroupbyOperatorInfo));
  SOperatorInfo*        pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    goto _error;
  }

  SSDataBlock* pResBlock = createResDataBlock(pAggNode->node.pOutputDataBlockDesc);
  if (pResBlock == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _error;
  }
  initBasicInfo(&pInfo->binfo, pResBlock);

  int32_t    numOfScalarExpr = 0;
  SExprInfo* pScalarExprInfo = NULL;
  if (pAggNode->pExprs != NULL) {
    pScalarExprInfo = createExprInfo(pAggNode->pExprs, NULL, &numOfScalarExpr);
  }

  pInfo->pGroupCols = extractColumnInfo(pAggNode->pGroupKeys);
  code = initExprSupp(&pInfo->scalarSup, pScalarExprInfo, numOfScalarExpr);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  initResultSizeInfo(&pOperator->resultInfo, 4096);
  code = blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  code = initGroupOptrInfo(&pInfo->pGroupColVals, &pInfo->groupKeyLen, &pInfo->keyBuf, pInfo->pGroupCols);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  int32_t    num = 0;
  SExprInfo* pExprInfo = createExprInfo(pAggNode->pAggFuncs, pAggNode->pGroupKeys, &num);
  code = initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, pInfo->groupKeyLen, pTaskInfo->id.str);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  code = filterInitFromNode((SNode*)pAggNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  initResultRowInfo(&pInfo->binfo.resultRowInfo);
  setOperatorInfo(pOperator, "GroupbyAggOperator", 0, true, OP_NOT_OPENED, pInfo, pTaskInfo);

  pOperator->fpSet =
      createOperatorFpSet(operatorDummyOpenFn, hashGroupbyAggregate, NULL, destroyGroupOperatorInfo, NULL);
  code = appendDownstream(pOperator, &downstream, 1);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  return pOperator;

_error:
  pTaskInfo->code = code;
  if (pInfo != NULL) {
    destroyGroupOperatorInfo(pInfo);
  }
  taosMemoryFreeClear(pOperator);
  return NULL;
}

H
Haojun Liao 已提交
491 492 493 494
static void doHashPartition(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
  SPartitionOperatorInfo* pInfo = pOperator->info;

  for (int32_t j = 0; j < pBlock->info.rows; ++j) {
495
    recordNewGroupKeys(pInfo->pGroupCols, pInfo->pGroupColVals, pBlock, j);
H
Haojun Liao 已提交
496 497
    int32_t len = buildGroupKeys(pInfo->keyBuf, pInfo->pGroupColVals);

498
    SDataGroupInfo* pGroupInfo = NULL;
499
    void*           pPage = getCurrentDataGroupInfo(pInfo, &pGroupInfo, len);
H
Haojun Liao 已提交
500

501 502 503 504 505
    pGroupInfo->numOfRows += 1;

    // group id
    if (pGroupInfo->groupId == 0) {
      pGroupInfo->groupId = calcGroupId(pInfo->keyBuf, len);
H
Haojun Liao 已提交
506 507
    }

508
    // number of rows
509
    int32_t* rows = (int32_t*)pPage;
H
Haojun Liao 已提交
510

511
    size_t numOfCols = pOperator->exprSupp.numOfExprs;
512
    for (int32_t i = 0; i < numOfCols; ++i) {
513
      SExprInfo* pExpr = &pOperator->exprSupp.pExprInfo[i];
514
      int32_t    slotId = pExpr->base.pParam[0].pCol->slotId;
515 516

      SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, slotId);
H
Haojun Liao 已提交
517

H
Haojun Liao 已提交
518 519
      int32_t bytes = pColInfoData->info.bytes;
      int32_t startOffset = pInfo->columnOffset[i];
H
Haojun Liao 已提交
520

521
      int32_t* columnLen = NULL;
522
      int32_t  contentLen = 0;
H
Haojun Liao 已提交
523 524

      if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
wafwerar's avatar
wafwerar 已提交
525
        int32_t* offset = (int32_t*)((char*)pPage + startOffset);
526 527
        columnLen = (int32_t*)((char*)pPage + startOffset + sizeof(int32_t) * pInfo->rowCapacity);
        char* data = (char*)((char*)columnLen + sizeof(int32_t));
H
Haojun Liao 已提交
528 529 530 531

        if (colDataIsNull_s(pColInfoData, j)) {
          offset[(*rows)] = -1;
          contentLen = 0;
532
        } else if (pColInfoData->info.type == TSDB_DATA_TYPE_JSON) {
wmmhello's avatar
wmmhello 已提交
533
          offset[*rows] = (*columnLen);
534
          char*   src = colDataGetData(pColInfoData, j);
wmmhello's avatar
wmmhello 已提交
535 536 537 538 539 540 541
          int32_t dataLen = getJsonValueLen(src);

          memcpy(data + (*columnLen), src, dataLen);
          int32_t v = (data + (*columnLen) + dataLen - (char*)pPage);
          ASSERT(v > 0);

          contentLen = dataLen;
H
Haojun Liao 已提交
542 543 544 545
        } else {
          offset[*rows] = (*columnLen);
          char* src = colDataGetData(pColInfoData, j);
          memcpy(data + (*columnLen), src, varDataTLen(src));
546 547 548
          int32_t v = (data + (*columnLen) + varDataTLen(src) - (char*)pPage);
          ASSERT(v > 0);

H
Haojun Liao 已提交
549 550
          contentLen = varDataTLen(src);
        }
H
Haojun Liao 已提交
551
      } else {
wafwerar's avatar
wafwerar 已提交
552
        char* bitmap = (char*)pPage + startOffset;
553 554
        columnLen = (int32_t*)((char*)pPage + startOffset + BitmapLen(pInfo->rowCapacity));
        char* data = (char*)columnLen + sizeof(int32_t);
H
Haojun Liao 已提交
555 556 557

        bool isNull = colDataIsNull_f(pColInfoData->nullbitmap, j);
        if (isNull) {
H
Haojun Liao 已提交
558
          colDataSetNull_f(bitmap, (*rows));
H
Haojun Liao 已提交
559
        } else {
H
Haojun Liao 已提交
560
          memcpy(data + (*columnLen), colDataGetData(pColInfoData, j), bytes);
H
Haojun Liao 已提交
561
          ASSERT((data + (*columnLen) + bytes - (char*)pPage) <= getBufPageSize(pInfo->pBuf));
H
Haojun Liao 已提交
562
        }
H
Haojun Liao 已提交
563
        contentLen = bytes;
H
Haojun Liao 已提交
564
      }
H
Haojun Liao 已提交
565 566

      (*columnLen) += contentLen;
567
      ASSERT(*columnLen >= 0);
H
Haojun Liao 已提交
568 569
    }

H
Haojun Liao 已提交
570 571
    (*rows) += 1;

H
Haojun Liao 已提交
572 573 574
    setBufPageDirty(pPage, true);
    releaseBufPage(pInfo->pBuf, pPage);
  }
H
Haojun Liao 已提交
575 576 577 578 579 580
}

/*
 * Return the buffer page that the next row of the group keyed by pInfo->keyBuf[0, len) must be written to, and
 * store that group's SDataGroupInfo in *pGroupInfo.
 *
 * A group seen for the first time is inserted into pInfo->pGroupSet with a fresh page. An existing group keeps
 * using its last page until that page holds rowCapacity rows; after that, a fresh page is appended to the group's
 * page list. Every fresh page is zero-filled entirely, not just its row counter. doHashPartition accumulates
 * per-column used-length counters and only ever sets NULL bits, so it depends on a zeroed page.
 *
 * The caller must release the returned page with releaseBufPage(). Returns NULL, leaving *pGroupInfo untouched,
 * when a page, the page list or the hash entry cannot be obtained.
 */
void* getCurrentDataGroupInfo(const SPartitionOperatorInfo* pInfo, SDataGroupInfo** pGroupInfo, int32_t len) {
  SDataGroupInfo* p = taosHashGet(pInfo->pGroupSet, pInfo->keyBuf, len);

  void*   pPage = NULL;
  int32_t pageId = 0;
  bool    isFreshPage = true;

  if (p == NULL) {  // it is a new group
    pPage = getNewBufPage(pInfo->pBuf, &pageId);
    if (pPage == NULL) {
      return NULL;
    }

    SDataGroupInfo gi = {0};
    gi.pPageList = taosArrayInit(100, sizeof(int32_t));
    if (gi.pPageList == NULL || taosArrayPush(gi.pPageList, &pageId) == NULL ||
        taosHashPut(pInfo->pGroupSet, pInfo->keyBuf, len, &gi, sizeof(SDataGroupInfo)) != 0) {
      taosArrayDestroy(gi.pPageList);
      releaseBufPage(pInfo->pBuf, pPage);
      return NULL;
    }

    p = taosHashGet(pInfo->pGroupSet, pInfo->keyBuf, len);
  } else {
    int32_t* curId = taosArrayGetLast(p->pPageList);
    pPage = getBufPage(pInfo->pBuf, *curId);
    if (pPage == NULL) {
      return NULL;
    }

    int32_t* rows = (int32_t*)pPage;
    if (*rows >= pInfo->rowCapacity) {
      // the current page is full: release it and add a new page for the current group
      releaseBufPage(pInfo->pBuf, pPage);

      pPage = getNewBufPage(pInfo->pBuf, &pageId);
      if (pPage == NULL) {
        return NULL;
      }
      if (taosArrayPush(p->pPageList, &pageId) == NULL) {
        releaseBufPage(pInfo->pBuf, pPage);
        return NULL;
      }
    } else {
      isFreshPage = false;  // keep appending to the partially filled page
    }
  }

  if (isFreshPage) {
    memset(pPage, 0, getBufPageSize(pInfo->pBuf));
  }

  *pGroupInfo = p;
  return pPage;
}

/*
 * Derive a 64-bit group id from the serialized group key pData[0, len).
 * The id is the first 8 bytes of the key's MD5 digest, reinterpreted as a uint64_t in host byte order.
 */
uint64_t calcGroupId(char* pData, int32_t len) {
  T_MD5_CTX md5;
  tMD5Init(&md5);
  tMD5Update(&md5, (uint8_t*)pData, len);
  tMD5Final(&md5);

  uint64_t groupId;
  memcpy(&groupId, md5.digest, sizeof(groupId));
  return groupId;
}

H
Haojun Liao 已提交
626
int32_t* setupColumnOffset(const SSDataBlock* pBlock, int32_t rowCapacity) {
627
  size_t   numOfCols = taosArrayGetSize(pBlock->pDataBlock);
628
  int32_t* offset = taosMemoryCalloc(numOfCols, sizeof(int32_t));
H
Haojun Liao 已提交
629

630 631
  offset[0] = sizeof(int32_t) +
              sizeof(uint64_t);  // the number of rows in current page, ref to SSDataBlock paged serialization format
H
Haojun Liao 已提交
632

633
  for (int32_t i = 0; i < numOfCols - 1; ++i) {
H
Haojun Liao 已提交
634 635 636 637
    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);

    int32_t bytes = pColInfoData->info.bytes;
    int32_t payloadLen = bytes * rowCapacity;
638

H
Haojun Liao 已提交
639 640 641 642 643 644 645 646 647 648 649 650
    if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
      // offset segment + content length + payload
      offset[i + 1] = rowCapacity * sizeof(int32_t) + sizeof(int32_t) + payloadLen + offset[i];
    } else {
      // bitmap + content length + payload
      offset[i + 1] = BitmapLen(rowCapacity) + sizeof(int32_t) + payloadLen + offset[i];
    }
  }

  return offset;
}

5
54liuyao 已提交
651
/*
 * Release the page lists of all sorted groups, empty sortedGroupArray (the array itself stays allocated) and clear
 * the paged buffer.
 */
static void clearPartitionOperator(SPartitionOperatorInfo* pInfo) {
  int32_t remaining = taosArrayGetSize(pInfo->sortedGroupArray);
  while (remaining-- > 0) {
    SDataGroupInfo* pGroup = taosArrayGet(pInfo->sortedGroupArray, remaining);
    taosArrayDestroy(pGroup->pPageList);
  }

  taosArrayClear(pInfo->sortedGroupArray);
  clearDiskbasedBuf(pInfo->pBuf);
}

661 662 663
static int compareDataGroupInfo(const void* group1, const void* group2) {
  const SDataGroupInfo* pGroupInfo1 = group1;
  const SDataGroupInfo* pGroupInfo2 = group2;
664 665 666 667 668 669

  if (pGroupInfo1->groupId == pGroupInfo2->groupId) {
    ASSERT(0);
    return 0;
  }

670
  return (pGroupInfo1->groupId < pGroupInfo2->groupId) ? -1 : 1;
671 672
}

H
Haojun Liao 已提交
673 674 675
/*
 * Load the next page of partitioned rows into the output block.
 *
 * This is either the next page of the current group, or the first page of the next group in sortedGroupArray once
 * the current one is exhausted. When no group is left, the operator is marked completed, its group pages are
 * released through clearPartitionOperator, and NULL is returned. The returned block carries the group's id.
 */
static SSDataBlock* buildPartitionResult(SOperatorInfo* pOperator) {
  SPartitionOperatorInfo* pInfo = pOperator->info;
  SSDataBlock*            pRes = pInfo->binfo.pRes;
  SDataGroupInfo*         pGroupInfo = NULL;

  bool needNextGroup = (pInfo->groupIndex == -1);
  if (!needNextGroup) {
    pGroupInfo = taosArrayGet(pInfo->sortedGroupArray, pInfo->groupIndex);
    needNextGroup = (pInfo->pageIndex >= taosArrayGetSize(pGroupInfo->pPageList));
  }

  if (needNextGroup) {
    // try next group data
    pInfo->groupIndex += 1;
    if (pInfo->groupIndex >= taosArrayGetSize(pInfo->sortedGroupArray)) {
      setOperatorCompleted(pOperator);
      clearPartitionOperator(pInfo);
      return NULL;
    }

    pGroupInfo = taosArrayGet(pInfo->sortedGroupArray, pInfo->groupIndex);
    pInfo->pageIndex = 0;
  }

  int32_t* pageId = taosArrayGet(pGroupInfo->pPageList, pInfo->pageIndex);
  void*    page = getBufPage(pInfo->pBuf, *pageId);

  blockDataEnsureCapacity(pRes, pInfo->rowCapacity);
  blockDataFromBuf1(pRes, page, pInfo->rowCapacity);

  pInfo->pageIndex += 1;
  releaseBufPage(pInfo->pBuf, page);

  blockDataUpdateTsWindow(pRes, 0);
  pRes->info.groupId = pGroupInfo->groupId;

  pOperator->resultInfo.totalRows += pRes->info.rows;
  return pRes;
}

707
/*
 * `next` function of the partition operator.
 *
 * On the first call, the downstream operator is drained. Scalar expressions are evaluated on each block, and every
 * row is routed to the pages of its partition. The partitions are then moved out of the hash into sortedGroupArray,
 * ordered by group id, and the first page is returned. Later calls return the next page (see buildPartitionResult),
 * or NULL once everything has been returned.
 */
static SSDataBlock* hashPartition(SOperatorInfo* pOperator) {
  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;

  SPartitionOperatorInfo* pInfo = pOperator->info;
  SSDataBlock*            pRes = pInfo->binfo.pRes;

  if (pOperator->status == OP_RES_TO_RETURN) {
    blockDataCleanup(pRes);
    return buildPartitionResult(pOperator);
  }

  int64_t        st = taosGetTimestampUs();
  SOperatorInfo* downstream = pOperator->pDownstream[0];

  while (1) {
    SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
    if (pBlock == NULL) {
      break;
    }

    // there is a scalar expression that needs to be calculated right before partitioning the block
    if (pInfo->scalarSup.pExprInfo != NULL) {
      pTaskInfo->code = projectApplyFunctions(pInfo->scalarSup.pExprInfo, pBlock, pBlock, pInfo->scalarSup.pCtx,
                                              pInfo->scalarSup.numOfExprs, NULL);
      if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
        T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
      }
    }

    terrno = TSDB_CODE_SUCCESS;
    doHashPartition(pOperator, pBlock);
    if (terrno != TSDB_CODE_SUCCESS) {  // group by json error
      T_LONG_JMP(pTaskInfo->env, terrno);
    }
  }

  SArray* groupArray = taosArrayInit(taosHashGetSize(pInfo->pGroupSet), sizeof(SDataGroupInfo));
  if (groupArray == NULL) {
    T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
  }

  // Move every group out of the hash (by value: the array now owns each pPageList) and order them by group id.
  void* pGroupIter = taosHashIterate(pInfo->pGroupSet, NULL);
  while (pGroupIter != NULL) {
    SDataGroupInfo* pGroupInfo = pGroupIter;
    taosArrayPush(groupArray, pGroupInfo);
    pGroupIter = taosHashIterate(pInfo->pGroupSet, pGroupIter);
  }

  taosArraySort(groupArray, compareDataGroupInfo);

  // Do not leak an array left from an earlier run. Its page lists are expected to have been released by
  // clearPartitionOperator, which empties the array.
  taosArrayDestroy(pInfo->sortedGroupArray);
  pInfo->sortedGroupArray = groupArray;
  pInfo->groupIndex = -1;
  taosHashClear(pInfo->pGroupSet);

  pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;

  pOperator->status = OP_RES_TO_RETURN;
  blockDataEnsureCapacity(pRes, 4096);
  return buildPartitionResult(pOperator);
}

768
static void destroyPartitionOperatorInfo(void* param) {
H
Haojun Liao 已提交
769
  SPartitionOperatorInfo* pInfo = (SPartitionOperatorInfo*)param;
770
  cleanupBasicInfo(&pInfo->binfo);
H
Haojun Liao 已提交
771
  taosArrayDestroy(pInfo->pGroupCols);
772

773
  for (int i = 0; i < taosArrayGetSize(pInfo->pGroupColVals); i++) {
wmmhello's avatar
wmmhello 已提交
774 775 776
    SGroupKeys key = *(SGroupKeys*)taosArrayGet(pInfo->pGroupColVals, i);
    taosMemoryFree(key.pData);
  }
777

H
Haojun Liao 已提交
778
  taosArrayDestroy(pInfo->pGroupColVals);
H
Haojun Liao 已提交
779
  taosMemoryFree(pInfo->keyBuf);
780
  taosArrayDestroy(pInfo->sortedGroupArray);
D
dapan1121 已提交
781 782 783 784 785 786 787 788

  void* pGroupIter = taosHashIterate(pInfo->pGroupSet, NULL);
  while (pGroupIter != NULL) {
    SDataGroupInfo* pGroupInfo = pGroupIter;
    taosArrayDestroy(pGroupInfo->pPageList);
    pGroupIter = taosHashIterate(pInfo->pGroupSet, pGroupIter);
  }

wmmhello's avatar
wmmhello 已提交
789
  taosHashCleanup(pInfo->pGroupSet);
H
Haojun Liao 已提交
790
  taosMemoryFree(pInfo->columnOffset);
791

792
  cleanupExprSupp(&pInfo->scalarSup);
H
Haojun Liao 已提交
793
  destroyDiskbasedBuf(pInfo->pBuf);
D
dapan1121 已提交
794
  taosMemoryFreeClear(param);
H
Haojun Liao 已提交
795 796
}

797 798
/*
 * Create the partition-by operator for a (non-stream) query.
 *
 * pPartNode->pTargets describes the output columns and pPartitionKeys the partition keys. The optional
 * pExprs are scalar expressions installed in scalarSup. The operator owns a hash table of groups (pGroupSet)
 * and a disk-based buffer (pBuf) whose page size is derived from the output row size.
 *
 * On failure, everything built so far is released and pTaskInfo->code is set to the error that caused the
 * failure (e.g. TSDB_CODE_NO_AVAIL_DISK when no temp space is available). The function then returns NULL.
 */
SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartitionPhysiNode* pPartNode,
                                           SExecTaskInfo* pTaskInfo) {
  int32_t                 code = TSDB_CODE_SUCCESS;
  SPartitionOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SPartitionOperatorInfo));
  SOperatorInfo*          pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _error;
  }

  pInfo->pGroupCols = extractPartitionColInfo(pPartNode->pPartitionKeys);

  if (pPartNode->pExprs != NULL) {
    int32_t    num = 0;
    SExprInfo* pExprInfo1 = createExprInfo(pPartNode->pExprs, NULL, &num);
    code = initExprSupp(&pInfo->scalarSup, pExprInfo1, num);
    if (code != TSDB_CODE_SUCCESS) {
      goto _error;
    }
  }

  _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
  pInfo->pGroupSet = taosHashInit(100, hashFn, false, HASH_NO_LOCK);
  if (pInfo->pGroupSet == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _error;
  }

  // pRes is dereferenced right below, so it must exist
  pInfo->binfo.pRes = createResDataBlock(pPartNode->node.pOutputDataBlockDesc);
  if (pInfo->binfo.pRes == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _error;
  }

  uint32_t defaultPgsz = 0;
  uint32_t defaultBufsz = 0;
  getBufferPgSize(pInfo->binfo.pRes->info.rowSize, &defaultPgsz, &defaultBufsz);

  if (!osTempSpaceAvailable()) {
    code = TSDB_CODE_NO_AVAIL_DISK;
    terrno = code;
    qError("Create partition operator info failed since %s", terrstr(terrno));
    goto _error;
  }

  code = createDiskbasedBuf(&pInfo->pBuf, defaultPgsz, defaultBufsz, pTaskInfo->id.str, tsTempDir);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  pInfo->rowCapacity = blockDataGetCapacityInRow(pInfo->binfo.pRes, getBufPageSize(pInfo->pBuf));
  pInfo->columnOffset = setupColumnOffset(pInfo->binfo.pRes, pInfo->rowCapacity);
  code = initGroupOptrInfo(&pInfo->pGroupColVals, &pInfo->groupKeyLen, &pInfo->keyBuf, pInfo->pGroupCols);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  // Built only after the fallible steps above: the error path has no owner that would free it earlier.
  int32_t    numOfCols = 0;
  SExprInfo* pExprInfo = createExprInfo(pPartNode->pTargets, NULL, &numOfCols);

  setOperatorInfo(pOperator, "PartitionOperator", QUERY_NODE_PHYSICAL_PLAN_PARTITION, false, OP_NOT_OPENED, pInfo,
                  pTaskInfo);
  pOperator->exprSupp.numOfExprs = numOfCols;
  pOperator->exprSupp.pExprInfo = pExprInfo;

  pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, hashPartition, NULL, destroyPartitionOperatorInfo, NULL);

  code = appendDownstream(pOperator, &downstream, 1);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }
  return pOperator;

_error:
  // report the real failure instead of unconditionally claiming out-of-memory
  pTaskInfo->code = code;
  if (pInfo != NULL) {
    destroyPartitionOperatorInfo(pInfo);
  }
  taosMemoryFreeClear(pOperator);
  return NULL;
}

868 869 870
/*
 * Obtain the result row identified by the key (pData, bytes) and groupId via doSetResultOutBufByKey(), then
 * initialise the operator's function contexts (numOfCols of them) on that row.
 *
 * @return TSDB_CODE_SUCCESS, or an error code when no result row could be obtained (pTaskInfo->code if it is
 *         set, otherwise TSDB_CODE_OUT_OF_MEMORY).
 */
int32_t setGroupResultOutputBuf(SOperatorInfo* pOperator, SOptrBasicInfo* binfo, int32_t numOfCols, char* pData,
                                int16_t bytes, uint64_t groupId, SDiskbasedBuf* pBuf, SAggSupporter* pAggSup) {
  SExecTaskInfo*  pTaskInfo = pOperator->pTaskInfo;
  SResultRowInfo* pResultRowInfo = &binfo->resultRowInfo;
  SqlFunctionCtx* pCtx = pOperator->exprSupp.pCtx;

  SResultRow* pResultRow =
      doSetResultOutBufByKey(pBuf, pResultRowInfo, pData, bytes, true, groupId, pTaskInfo, false, pAggSup);
  // assert() is compiled out under NDEBUG, which would let a NULL row reach setResultRowInitCtx(); report it.
  if (pResultRow == NULL) {
    return (pTaskInfo->code != TSDB_CODE_SUCCESS) ? pTaskInfo->code : TSDB_CODE_OUT_OF_MEMORY;
  }

  setResultRowInitCtx(pResultRow, pCtx, numOfCols, pOperator->exprSupp.rowEntryInfoOffset);
  return TSDB_CODE_SUCCESS;
}
881 882 883

/*
 * Compute the group id for row `rowId` of pBlock.
 *
 * When pExprSup carries expressions they are first applied to the whole block, writing results back into
 * pBlock; a failure there is only logged. The row's group-key values are then recorded into
 * pParSup->pGroupColVals, packed into pParSup->keyBuf and hashed with calcGroupId().
 */
uint64_t calGroupIdByData(SPartitionBySupporter* pParSup, SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t rowId) {
  SExprInfo* pScalarExprs = pExprSup->pExprInfo;
  if (pScalarExprs != NULL) {
    int32_t code = projectApplyFunctions(pScalarExprs, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL);
    if (code != TSDB_CODE_SUCCESS) {
      qError("calaculate group id error, code:%d", code);
    }
  }

  recordNewGroupKeys(pParSup->pGroupCols, pParSup->pGroupColVals, pBlock, rowId);
  int32_t keyLen = buildGroupKeys(pParSup->keyBuf, pParSup->pGroupColVals);
  return calcGroupId(pParSup->keyBuf, keyLen);
}

896
// True while the partition iterator (parIte) still points at a partition that has not been emitted yet.
static bool hasRemainPartion(SStreamPartitionOperatorInfo* pInfo) {
  return NULL != pInfo->parIte;
}
897 898 899

/*
 * Emit the partition currently pointed to by pInfo->parIte as one output block and advance the iterator.
 *
 * For every row id recorded for the partition, the target columns are copied from pInfo->pInputDataBlock
 * into pInfo->binfo.pRes. When a sub-table name expression is configured (tbnameCalSup), it is evaluated on
 * the partition's first row. The result is stored in pDest->info.parTbName and, for a non-zero group id with
 * a non-empty name, saved via streamStatePutParName(). The partition's row-id array is freed once consumed.
 *
 * Precondition: hasRemainPartion(pInfo).
 */
static SSDataBlock* buildStreamPartitionResult(SOperatorInfo* pOperator) {
  SStreamPartitionOperatorInfo* pInfo = pOperator->info;
  SSDataBlock*                  pDest = pInfo->binfo.pRes;
  ASSERT(hasRemainPartion(pInfo));
  SPartitionDataInfo* pParInfo = (SPartitionDataInfo*)pInfo->parIte;
  blockDataCleanup(pDest);
  int32_t      rows = taosArrayGetSize(pParInfo->rowIds);
  SSDataBlock* pSrc = pInfo->pInputDataBlock;
  for (int32_t i = 0; i < rows; i++) {
    int32_t rowIndex = *(int32_t*)taosArrayGet(pParInfo->rowIds, i);
    // output column j is read from the input slot referenced by target expression j
    for (int32_t j = 0; j < pOperator->exprSupp.numOfExprs; j++) {
      int32_t          slotId = pOperator->exprSupp.pExprInfo[j].base.pParam[0].pCol->slotId;
      SColumnInfoData* pSrcCol = taosArrayGet(pSrc->pDataBlock, slotId);
      SColumnInfoData* pDestCol = taosArrayGet(pDest->pDataBlock, j);
      bool             isNull = colDataIsNull(pSrcCol, pSrc->info.rows, rowIndex, NULL);
      char*            pSrcData = colDataGetData(pSrcCol, rowIndex);
      colDataAppend(pDestCol, pDest->info.rows, pSrcData, isNull);
    }
    pDest->info.rows++;

    // the sub-table name is computed once per partition, from its first row
    if (pInfo->tbnameCalSup.numOfExprs > 0 && i == 0) {
      SSDataBlock* pTmpBlock = blockCopyOneRow(pSrc, rowIndex);
      SSDataBlock* pResBlock = createDataBlock();
      pResBlock->info.rowSize = TSDB_TABLE_NAME_LEN;
      SColumnInfoData data = createColumnInfoData(TSDB_DATA_TYPE_VARCHAR, TSDB_TABLE_NAME_LEN, 0);
      taosArrayPush(pResBlock->pDataBlock, &data);
      blockDataEnsureCapacity(pResBlock, 1);
      projectApplyFunctions(pInfo->tbnameCalSup.pExprInfo, pResBlock, pTmpBlock, pInfo->tbnameCalSup.pCtx, 1, NULL);
      ASSERT(pResBlock->info.rows == 1);
      ASSERT(taosArrayGetSize(pResBlock->pDataBlock) == 1);
      SColumnInfoData* pCol = taosArrayGet(pResBlock->pDataBlock, 0);
      ASSERT(pCol->info.type == TSDB_DATA_TYPE_VARCHAR);

      // A NULL name must be detected through the column's null flag before touching the value: the old
      // `pData != (void*)-1` test could never be false, so varDataLen()/varDataVal() were applied to a NULL slot.
      if (!colDataIsNull(pCol, pResBlock->info.rows, 0, NULL)) {
        void* pData = colDataGetVarData(pCol, 0);
        // TODO check tbname validity
        memset(pDest->info.parTbName, 0, TSDB_TABLE_NAME_LEN);
        // keep at least one trailing zero byte so parTbName stays NUL-terminated
        int32_t len = TMIN(varDataLen(pData), TSDB_TABLE_NAME_LEN - 1);
        memcpy(pDest->info.parTbName, varDataVal(pData), len);
      } else {
        pDest->info.parTbName[0] = 0;
      }

      if (pParInfo->groupId && pDest->info.parTbName[0]) {
        streamStatePutParName(pOperator->pTaskInfo->streamInfo.pState, pParInfo->groupId, pDest->info.parTbName);
      }
      blockDataDestroy(pTmpBlock);
      blockDataDestroy(pResBlock);
    }
  }
  taosArrayDestroy(pParInfo->rowIds);
  pParInfo->rowIds = NULL;
  blockDataUpdateTsWindow(pDest, pInfo->tsColIndex);
  pDest->info.groupId = pParInfo->groupId;
  pOperator->resultInfo.totalRows += pDest->info.rows;
  pInfo->parIte = taosHashIterate(pInfo->pPartitions, pInfo->parIte);
  ASSERT(pDest->info.rows > 0);
  printDataBlock(pDest, "stream partitionby");
  return pDest;
}

/*
 * Bucket every row of pBlock by its partition key into pInfo->pPartitions.
 *
 * Each bucket is an SPartitionDataInfo holding the group id (computed once, from the packed key) and the
 * indices of the rows that fall into it. pBlock is remembered as pInfo->pInputDataBlock so that the rows can
 * be copied out later.
 */
static void doStreamHashPartitionImpl(SStreamPartitionOperatorInfo* pInfo, SSDataBlock* pBlock) {
  SPartitionBySupporter* pSup = &pInfo->partitionSup;
  pInfo->pInputDataBlock = pBlock;

  for (int32_t row = 0; row < pBlock->info.rows; ++row) {
    recordNewGroupKeys(pSup->pGroupCols, pSup->pGroupColVals, pBlock, row);
    int32_t keyLen = buildGroupKeys(pSup->keyBuf, pSup->pGroupColVals);

    SPartitionDataInfo* pExisting = taosHashGet(pInfo->pPartitions, pSup->keyBuf, keyLen);
    if (pExisting != NULL) {
      taosArrayPush(pExisting->rowIds, &row);
      continue;
    }

    // first row seen for this key: open a new bucket
    SPartitionDataInfo fresh = {0};
    fresh.groupId = calcGroupId(pSup->keyBuf, keyLen);
    fresh.rowIds = taosArrayInit(64, sizeof(int32_t));
    taosArrayPush(fresh.rowIds, &row);
    taosHashPut(pInfo->pPartitions, pSup->keyBuf, keyLen, &fresh, sizeof(SPartitionDataInfo));
  }
}

/*
 * getNextFn of the stream partition-by operator.
 *
 * Partitions left over from the previous input block are emitted first. Otherwise the next block is pulled
 * from downstream and handled by type:
 *  - STREAM_DELETE_DATA is copied into pDelRes and forwarded as STREAM_DELETE_RESULT;
 *  - types other than STREAM_NORMAL / STREAM_PULL_DATA / STREAM_INVALID are returned unchanged;
 *  - data blocks have the scalar expressions applied (if any) and are split into partitions, the first of
 *    which is returned.
 * A NULL block from downstream marks the operator completed.
 */
static SSDataBlock* doStreamHashPartition(SOperatorInfo* pOperator) {
  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  SExecTaskInfo*                pTaskInfo = pOperator->pTaskInfo;
  SStreamPartitionOperatorInfo* pInfo = pOperator->info;
  if (hasRemainPartion(pInfo)) {
    return buildStreamPartitionResult(pOperator);
  }

  int64_t        st = taosGetTimestampUs();
  SOperatorInfo* downstream = pOperator->pDownstream[0];

  // buildStreamPartitionResult() requires at least one partition; a data block that yields none (e.g. zero
  // rows) is therefore skipped and the next block is pulled instead.
  while (1) {
    pInfo->pInputDataBlock = NULL;
    SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
    if (pBlock == NULL) {
      setOperatorCompleted(pOperator);
      return NULL;
    }
    printDataBlock(pBlock, "stream partitionby recv");

    switch (pBlock->info.type) {
      case STREAM_NORMAL:
      case STREAM_PULL_DATA:
      case STREAM_INVALID:
        pInfo->binfo.pRes->info.type = pBlock->info.type;
        break;
      case STREAM_DELETE_DATA:
        copyDataBlock(pInfo->pDelRes, pBlock);
        pInfo->pDelRes->info.type = STREAM_DELETE_RESULT;
        printDataBlock(pInfo->pDelRes, "stream partitionby delete");
        return pInfo->pDelRes;
      default:
        return pBlock;
    }

    // there is an scalar expression that needs to be calculated right before apply the group aggregation.
    if (pInfo->scalarSup.pExprInfo != NULL) {
      pTaskInfo->code = projectApplyFunctions(pInfo->scalarSup.pExprInfo, pBlock, pBlock, pInfo->scalarSup.pCtx,
                                              pInfo->scalarSup.numOfExprs, NULL);
      if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
        longjmp(pTaskInfo->env, pTaskInfo->code);
      }
    }
    taosHashClear(pInfo->pPartitions);
    doStreamHashPartitionImpl(pInfo, pBlock);

    pInfo->parIte = taosHashIterate(pInfo->pPartitions, NULL);
    if (hasRemainPartion(pInfo)) {
      break;
    }
  }
  pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;

  return buildStreamPartitionResult(pOperator);
}

static void destroyStreamPartitionOperatorInfo(void* param) {
  SStreamPartitionOperatorInfo* pInfo = (SStreamPartitionOperatorInfo*)param;
  cleanupBasicInfo(&pInfo->binfo);
  taosArrayDestroy(pInfo->partitionSup.pGroupCols);

1036
  for (int i = 0; i < taosArrayGetSize(pInfo->partitionSup.pGroupColVals); i++) {
1037 1038 1039 1040 1041 1042 1043
    SGroupKeys key = *(SGroupKeys*)taosArrayGet(pInfo->partitionSup.pGroupColVals, i);
    taosMemoryFree(key.pData);
  }
  taosArrayDestroy(pInfo->partitionSup.pGroupColVals);

  taosMemoryFree(pInfo->partitionSup.keyBuf);
  cleanupExprSupp(&pInfo->scalarSup);
L
Liu Jicong 已提交
1044 1045
  cleanupExprSupp(&pInfo->tbnameCalSup);
  cleanupExprSupp(&pInfo->tagCalSup);
1046
  blockDataDestroy(pInfo->pDelRes);
1047
  taosHashCleanup(pInfo->pPartitions);
1048 1049 1050 1051 1052 1053 1054 1055 1056 1057
  taosMemoryFreeClear(param);
}

/*
 * Share the partition-by configuration with a stream-scan downstream; any other operator type is left alone.
 *
 * The scan receives a by-value copy of *pParSup (pointers inside it are shared, not duplicated) and the
 * pExpr pointer itself. It also gets an update-info object created if it has none yet.
 */
void initParDownStream(SOperatorInfo* downstream, SPartitionBySupporter* pParSup, SExprSupp* pExpr) {
  if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
    SStreamScanInfo* pScan = downstream->info;
    pScan->partitionSup = *pParSup;
    pScan->pPartScalarSup = pExpr;

    if (pScan->pUpdateInfo == NULL) {
      pScan->pUpdateInfo = updateInfoInit(60000, TSDB_TIME_PRECISION_MILLI, 0);
    }
  }
}

1063 1064
/*
 * Create the stream partition-by operator.
 *
 * The operator is configured as follows:
 *  - pPartNode->part.pPartitionKeys gives the partition keys and part.pTargets the output columns;
 *  - the optional part.pExprs become scalarSup, pSubtable becomes tbnameCalSup and pTags becomes tagCalSup;
 *  - a stream-scan downstream is handed the partition supporter through initParDownStream().
 *
 * On failure, everything built so far is released and pTaskInfo->code is set to the error that caused the
 * failure. The function then returns NULL.
 */
SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStreamPartitionPhysiNode* pPartNode,
                                                 SExecTaskInfo* pTaskInfo) {
  int32_t                       code = TSDB_CODE_SUCCESS;
  SStreamPartitionOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamPartitionOperatorInfo));
  SOperatorInfo*                pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _error;
  }

  pInfo->partitionSup.pGroupCols = extractPartitionColInfo(pPartNode->part.pPartitionKeys);

  if (pPartNode->part.pExprs != NULL) {
    int32_t    num = 0;
    SExprInfo* pCalExprInfo = createExprInfo(pPartNode->part.pExprs, NULL, &num);
    code = initExprSupp(&pInfo->scalarSup, pCalExprInfo, num);
    if (code != TSDB_CODE_SUCCESS) {
      goto _error;
    }
  }

  if (pPartNode->pSubtable != NULL) {
    SExprInfo* pSubTableExpr = taosMemoryCalloc(1, sizeof(SExprInfo));
    if (pSubTableExpr == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _error;
    }
    pInfo->tbnameCalSup.pExprInfo = pSubTableExpr;
    createExprFromOneNode(pSubTableExpr, pPartNode->pSubtable, 0);
    code = initExprSupp(&pInfo->tbnameCalSup, pSubTableExpr, 1);
    if (code != TSDB_CODE_SUCCESS) {
      goto _error;
    }
  }

  if (pPartNode->pTags != NULL) {
    int32_t    numOfTags = 0;
    SExprInfo* pTagExpr = createExprInfo(pPartNode->pTags, NULL, &numOfTags);
    if (pTagExpr == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      terrno = code;
      goto _error;
    }
    code = initExprSupp(&pInfo->tagCalSup, pTagExpr, numOfTags);
    if (code != TSDB_CODE_SUCCESS) {
      terrno = code;
      goto _error;
    }
  }

  int32_t keyLen = 0;
  code = initGroupOptrInfo(&pInfo->partitionSup.pGroupColVals, &keyLen, &pInfo->partitionSup.keyBuf,
                           pInfo->partitionSup.pGroupCols);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }
  pInfo->partitionSup.needCalc = true;

  pInfo->binfo.pRes = createResDataBlock(pPartNode->part.node.pOutputDataBlockDesc);
  if (pInfo->binfo.pRes == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _error;
  }
  blockDataEnsureCapacity(pInfo->binfo.pRes, 4096);

  pInfo->parIte = NULL;
  pInfo->pInputDataBlock = NULL;

  _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
  pInfo->pPartitions = taosHashInit(1024, hashFn, false, HASH_NO_LOCK);
  if (pInfo->pPartitions == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _error;
  }
  pInfo->tsColIndex = 0;
  pInfo->pDelRes = createSpecialDataBlock(STREAM_DELETE_RESULT);
  if (pInfo->pDelRes == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _error;
  }

  int32_t    numOfCols = 0;
  SExprInfo* pExprInfo = createExprInfo(pPartNode->part.pTargets, NULL, &numOfCols);

  setOperatorInfo(pOperator, "StreamPartitionOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION, false, OP_NOT_OPENED,
                  pInfo, pTaskInfo);
  pOperator->exprSupp.numOfExprs = numOfCols;
  pOperator->exprSupp.pExprInfo = pExprInfo;
  pOperator->fpSet =
      createOperatorFpSet(operatorDummyOpenFn, doStreamHashPartition, NULL, destroyStreamPartitionOperatorInfo, NULL);

  code = appendDownstream(pOperator, &downstream, 1);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  // Hand partitionSup/scalarSup to the stream scan only once nothing can fail any more; otherwise the scan
  // would keep pointers into state that the error path below frees.
  initParDownStream(downstream, &pInfo->partitionSup, &pInfo->scalarSup);
  return pOperator;

_error:
  pTaskInfo->code = code;
  if (pInfo != NULL) {
    destroyStreamPartitionOperatorInfo(pInfo);
  }
  taosMemoryFreeClear(pOperator);
  return NULL;
}
H
Haojun Liao 已提交
1152 1153 1154 1155 1156 1157 1158 1159 1160 1161 1162 1163 1164 1165 1166 1167 1168 1169 1170 1171 1172 1173 1174 1175 1176 1177 1178 1179 1180 1181 1182 1183 1184

/*
 * Build an SArray<SColumn> describing the target nodes in pNodeList.
 *
 * Column targets are converted with extractColumnFromColumnNode(). Constant (value) targets get a synthetic
 * SColumn whose slot id is also used as its column id, typed from the value's result type. Targets of any
 * other node kind are skipped.
 *
 * @return the new array (caller owns it), or NULL with terrno = TSDB_CODE_OUT_OF_MEMORY.
 */
SArray* extractColumnInfo(SNodeList* pNodeList) {
  size_t  numOfCols = LIST_LENGTH(pNodeList);
  SArray* pList = taosArrayInit(numOfCols, sizeof(SColumn));
  if (pList == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  for (int32_t i = 0; i < (int32_t)numOfCols; ++i) {
    STargetNode* pNode = (STargetNode*)nodesListGetNode(pNodeList, i);

    if (nodeType(pNode->pExpr) == QUERY_NODE_COLUMN) {
      SColumnNode* pColNode = (SColumnNode*)pNode->pExpr;

      SColumn c = extractColumnFromColumnNode(pColNode);
      taosArrayPush(pList, &c);
    } else if (nodeType(pNode->pExpr) == QUERY_NODE_VALUE) {
      SValueNode* pValNode = (SValueNode*)pNode->pExpr;
      SColumn     c = {0};
      c.slotId = pNode->slotId;
      c.colId = pNode->slotId;
      // node.type holds the node-kind tag (the value nodeType() inspects), not a data type; the column's data
      // type lives in resType alongside bytes/scale/precision.
      c.type = pValNode->node.resType.type;
      c.bytes = pValNode->node.resType.bytes;
      c.scale = pValNode->node.resType.scale;
      c.precision = pValNode->node.resType.precision;

      taosArrayPush(pList, &c);
    }
  }

  return pList;
}