groupoperator.c 22.9 KB
Newer Older
H
Haojun Liao 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "os.h"
#include "function.h"
#include "tname.h"

#include "tdatablock.h"
#include "tmsg.h"

#include "executorimpl.h"
#include "tcompare.h"
#include "thash.h"
#include "ttypes.h"
wmmhello's avatar
wmmhello 已提交
27
#include "executorInt.h"
H
Haojun Liao 已提交
28

H
Haojun Liao 已提交
29
static int32_t* setupColumnOffset(const SSDataBlock* pBlock, int32_t rowCapacity);
H
Haojun Liao 已提交
30
static void* getCurrentDataGroupInfo(const SPartitionOperatorInfo* pInfo, SDataGroupInfo** pGroupInfo, int32_t len);
H
Haojun Liao 已提交
31

H
Haojun Liao 已提交
32
static void destroyGroupOperatorInfo(void* param, int32_t numOfOutput) {
H
Haojun Liao 已提交
33 34 35 36 37 38 39
  SGroupbyOperatorInfo* pInfo = (SGroupbyOperatorInfo*)param;
  doDestroyBasicInfo(&pInfo->binfo, numOfOutput);
  taosMemoryFreeClear(pInfo->keyBuf);
  taosArrayDestroy(pInfo->pGroupCols);
  taosArrayDestroy(pInfo->pGroupColVals);
}

wmmhello's avatar
wmmhello 已提交
40
int32_t initGroupOptrInfo(SArray** pGroupColVals, int32_t* keyLen, char** keyBuf, const SArray* pGroupColList) {
H
Haojun Liao 已提交
41 42
  *pGroupColVals = taosArrayInit(4, sizeof(SGroupKeys));
  if ((*pGroupColVals) == NULL) {
H
Haojun Liao 已提交
43 44 45 46 47 48
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  int32_t numOfGroupCols = taosArrayGetSize(pGroupColList);
  for (int32_t i = 0; i < numOfGroupCols; ++i) {
    SColumn* pCol = taosArrayGet(pGroupColList, i);
49
    (*keyLen) += pCol->bytes; // actual data + null_flag
H
Haojun Liao 已提交
50

51
    SGroupKeys key = {0};
H
Haojun Liao 已提交
52 53
    key.bytes  = pCol->bytes;
    key.type   = pCol->type;
H
Haojun Liao 已提交
54
    key.isNull = false;
H
Haojun Liao 已提交
55
    key.pData  = taosMemoryCalloc(1, pCol->bytes);
H
Haojun Liao 已提交
56 57 58 59
    if (key.pData == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

H
Haojun Liao 已提交
60
    taosArrayPush((*pGroupColVals), &key);
H
Haojun Liao 已提交
61 62 63
  }

  int32_t nullFlagSize = sizeof(int8_t) * numOfGroupCols;
64
  (*keyLen) += nullFlagSize;
H
Haojun Liao 已提交
65

66
  (*keyBuf) = taosMemoryCalloc(1, (*keyLen));
H
Haojun Liao 已提交
67
  if ((*keyBuf) == NULL) {
H
Haojun Liao 已提交
68 69 70 71 72 73
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
74
static bool groupKeyCompare(SArray* pGroupCols, SArray* pGroupColVals, SSDataBlock* pBlock, int32_t rowIndex, int32_t numOfGroupCols) {
H
Haojun Liao 已提交
75 76
  SColumnDataAgg* pColAgg = NULL;
  for (int32_t i = 0; i < numOfGroupCols; ++i) {
H
Haojun Liao 已提交
77
    SColumn*         pCol = taosArrayGet(pGroupCols, i);
H
Haojun Liao 已提交
78 79
    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pCol->slotId);
    if (pBlock->pBlockAgg != NULL) {
80
      pColAgg = pBlock->pBlockAgg[pCol->slotId];  // TODO is agg data matched?
H
Haojun Liao 已提交
81 82 83 84
    }

    bool isNull = colDataIsNull(pColInfoData, pBlock->info.rows, rowIndex, pColAgg);

H
Haojun Liao 已提交
85
    SGroupKeys* pkey = taosArrayGet(pGroupColVals, i);
H
Haojun Liao 已提交
86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112
    if (pkey->isNull && isNull) {
      continue;
    }

    if (isNull || pkey->isNull) {
      return false;
    }

    char* val = colDataGetData(pColInfoData, rowIndex);

    if (IS_VAR_DATA_TYPE(pkey->type)) {
      int32_t len = varDataLen(val);
      if (len == varDataLen(pkey->pData) && memcmp(varDataVal(pkey->pData), varDataVal(val), len) == 0) {
        continue;
      } else {
        return false;
      }
    } else {
      if (memcmp(pkey->pData, val, pkey->bytes) != 0) {
        return false;
      }
    }
  }

  return true;
}

wmmhello's avatar
wmmhello 已提交
113
void recordNewGroupKeys(SArray* pGroupCols, SArray* pGroupColVals, SSDataBlock* pBlock, int32_t rowIndex) {
H
Haojun Liao 已提交
114 115
  SColumnDataAgg* pColAgg = NULL;

116 117
  size_t numOfGroupCols = taosArrayGetSize(pGroupCols);

H
Haojun Liao 已提交
118
  for (int32_t i = 0; i < numOfGroupCols; ++i) {
H
Haojun Liao 已提交
119
    SColumn*         pCol = taosArrayGet(pGroupCols, i);
H
Haojun Liao 已提交
120 121 122
    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pCol->slotId);

    if (pBlock->pBlockAgg != NULL) {
123
      pColAgg = pBlock->pBlockAgg[pCol->slotId];  // TODO is agg data matched?
H
Haojun Liao 已提交
124 125
    }

H
Haojun Liao 已提交
126
    SGroupKeys* pkey = taosArrayGet(pGroupColVals, i);
H
Haojun Liao 已提交
127 128 129
    if (colDataIsNull(pColInfoData, pBlock->info.rows, rowIndex, pColAgg)) {
      pkey->isNull = true;
    } else {
130
      pkey->isNull = false;
H
Haojun Liao 已提交
131 132 133
      char* val = colDataGetData(pColInfoData, rowIndex);
      if (IS_VAR_DATA_TYPE(pkey->type)) {
        memcpy(pkey->pData, val, varDataTLen(val));
134
        ASSERT(varDataTLen(val) <= pkey->bytes);
H
Haojun Liao 已提交
135 136 137 138 139 140 141
      } else {
        memcpy(pkey->pData, val, pkey->bytes);
      }
    }
  }
}

wmmhello's avatar
wmmhello 已提交
142
int32_t buildGroupKeys(void* pKey, const SArray* pGroupColVals) {
H
Haojun Liao 已提交
143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165
  ASSERT(pKey != NULL);
  size_t numOfGroupCols = taosArrayGetSize(pGroupColVals);

  char* isNull = (char*)pKey;
  char* pStart = (char*)pKey + sizeof(int8_t) * numOfGroupCols;
  for (int32_t i = 0; i < numOfGroupCols; ++i) {
    SGroupKeys* pkey = taosArrayGet(pGroupColVals, i);
    if (pkey->isNull) {
      isNull[i] = 1;
      continue;
    }

    isNull[i] = 0;
    if (IS_VAR_DATA_TYPE(pkey->type)) {
      varDataCopy(pStart, pkey->pData);
      pStart += varDataTLen(pkey->pData);
      ASSERT(varDataTLen(pkey->pData) <= pkey->bytes);
    } else {
      memcpy(pStart, pkey->pData, pkey->bytes);
      pStart += pkey->bytes;
    }
  }

H
Haojun Liao 已提交
166
  return (int32_t) (pStart - (char*)pKey);
H
Haojun Liao 已提交
167 168 169 170 171
}

// assign the group keys or user input constant values if required
static void doAssignGroupKeys(SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t totalRows, int32_t rowIndex) {
  for (int32_t i = 0; i < numOfOutput; ++i) {
172
    if (pCtx[i].functionId == -1) {       // select count(*),key from t group by key.
H
Haojun Liao 已提交
173 174 175
      SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(&pCtx[i]);

      SColumnInfoData* pColInfoData = pCtx[i].input.pData[0];
176
      // todo OPT all/all not NULL
H
Haojun Liao 已提交
177 178 179 180
      if (!colDataIsNull(pColInfoData, totalRows, rowIndex, NULL)) {
        char* dest = GET_ROWCELL_INTERBUF(pEntryInfo);
        char* data = colDataGetData(pColInfoData, rowIndex);

181 182 183 184 185
        if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
          varDataCopy(dest, data);
        } else {
          memcpy(dest, data, pColInfoData->info.bytes);
        }
H
Haojun Liao 已提交
186 187
      } else { // it is a NULL value
        pEntryInfo->isNullRes = 1;
H
Haojun Liao 已提交
188
      }
H
Haojun Liao 已提交
189 190

      pEntryInfo->numOfRes = 1;
H
Haojun Liao 已提交
191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212
    }
  }
}

static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
  SExecTaskInfo*        pTaskInfo = pOperator->pTaskInfo;
  SGroupbyOperatorInfo* pInfo = pOperator->info;

  SqlFunctionCtx* pCtx = pInfo->binfo.pCtx;
  int32_t         numOfGroupCols = taosArrayGetSize(pInfo->pGroupCols);
  //  if (type == TSDB_DATA_TYPE_FLOAT || type == TSDB_DATA_TYPE_DOUBLE) {
  // qError("QInfo:0x%"PRIx64" group by not supported on double/float columns, abort", GET_TASKID(pRuntimeEnv));
  //    return;
  //  }

  int32_t     len = 0;
  STimeWindow w = TSWINDOW_INITIALIZER;

  int32_t num = 0;
  for (int32_t j = 0; j < pBlock->info.rows; ++j) {
    // Compare with the previous row of this column, and do not set the output buffer again if they are identical.
    if (!pInfo->isInit) {
213
      recordNewGroupKeys(pInfo->pGroupCols, pInfo->pGroupColVals, pBlock, j);
H
Haojun Liao 已提交
214 215 216 217 218
      pInfo->isInit = true;
      num++;
      continue;
    }

H
Haojun Liao 已提交
219
    bool equal = groupKeyCompare(pInfo->pGroupCols, pInfo->pGroupColVals, pBlock, j, numOfGroupCols);
H
Haojun Liao 已提交
220 221 222 223 224
    if (equal) {
      num++;
      continue;
    }

H
Haojun Liao 已提交
225
    // The first row of a new block does not belongs to the previous existed group
226
    if (j == 0) {
H
Haojun Liao 已提交
227
      num++;
228
      recordNewGroupKeys(pInfo->pGroupCols, pInfo->pGroupColVals, pBlock, j);
H
Haojun Liao 已提交
229 230 231
      continue;
    }

H
Haojun Liao 已提交
232
    len = buildGroupKeys(pInfo->keyBuf, pInfo->pGroupColVals);
233
    int32_t ret = setGroupResultOutputBuf(&(pInfo->binfo), pOperator->numOfExprs, pInfo->keyBuf, TSDB_DATA_TYPE_VARCHAR, len, 0, pInfo->aggSup.pResultBuf, pTaskInfo, &pInfo->aggSup);
H
Haojun Liao 已提交
234 235 236 237 238
    if (ret != TSDB_CODE_SUCCESS) {  // null data, too many state code
      longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR);
    }

    int32_t rowIndex = j - num;
239
    doApplyFunctions(pTaskInfo, pCtx, &w, NULL, rowIndex, num, NULL, pBlock->info.rows, pOperator->numOfExprs, TSDB_ORDER_ASC);
H
Haojun Liao 已提交
240 241

    // assign the group keys or user input constant values if required
242
    doAssignGroupKeys(pCtx, pOperator->numOfExprs, pBlock->info.rows, rowIndex);
243
    recordNewGroupKeys(pInfo->pGroupCols, pInfo->pGroupColVals, pBlock, j);
H
Haojun Liao 已提交
244 245 246 247
    num = 1;
  }

  if (num > 0) {
H
Haojun Liao 已提交
248
    len = buildGroupKeys(pInfo->keyBuf, pInfo->pGroupColVals);
H
Haojun Liao 已提交
249
    int32_t ret =
250
        setGroupResultOutputBuf(&(pInfo->binfo), pOperator->numOfExprs, pInfo->keyBuf, TSDB_DATA_TYPE_VARCHAR, len,
H
Haojun Liao 已提交
251 252 253 254 255 256
                                   0, pInfo->aggSup.pResultBuf, pTaskInfo, &pInfo->aggSup);
    if (ret != TSDB_CODE_SUCCESS) {
      longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR);
    }

    int32_t rowIndex = pBlock->info.rows - num;
257
    doApplyFunctions(pTaskInfo, pCtx, &w, NULL, rowIndex, num, NULL, pBlock->info.rows, pOperator->numOfExprs, TSDB_ORDER_ASC);
258
    doAssignGroupKeys(pCtx, pOperator->numOfExprs, pBlock->info.rows, rowIndex);
H
Haojun Liao 已提交
259 260 261
  }
}

262
static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) {
H
Haojun Liao 已提交
263 264 265 266
  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

267 268
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;

H
Haojun Liao 已提交
269
  SGroupbyOperatorInfo* pInfo = pOperator->info;
270 271
  SSDataBlock* pRes = pInfo->binfo.pRes;

H
Haojun Liao 已提交
272
  if (pOperator->status == OP_RES_TO_RETURN) {
273
    doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
274 275 276

    size_t rows = pRes->info.rows;
    if (rows == 0 || !hashRemainDataInGroupInfo(&pInfo->groupResInfo)) {
277
      doSetOperatorCompleted(pOperator);
H
Haojun Liao 已提交
278
    }
279 280

    pOperator->resultInfo.totalRows += rows;
H
Haojun Liao 已提交
281
    return (pRes->info.rows == 0)? NULL:pRes;
H
Haojun Liao 已提交
282 283
  }

284 285 286
  int32_t order = TSDB_ORDER_ASC;
  int32_t scanFlag = MAIN_SCAN;

287
  int64_t st = taosGetTimestampUs();
H
Haojun Liao 已提交
288 289 290
  SOperatorInfo* downstream = pOperator->pDownstream[0];

  while (1) {
291
    SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
H
Haojun Liao 已提交
292 293 294 295
    if (pBlock == NULL) {
      break;
    }

296 297 298 299 300
    int32_t code = getTableScanInfo(pOperator, &order, &scanFlag);
    if (code != TSDB_CODE_SUCCESS) {
      longjmp(pTaskInfo->env, code);
    }

H
Haojun Liao 已提交
301
    // the pDataBlock are always the same one, no need to call this again
302
    setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, order, scanFlag, true);
303

304
    // there is an scalar expression that needs to be calculated right before apply the group aggregation.
305
    if (pInfo->pScalarExprInfo != NULL) {
306 307 308 309
      pTaskInfo->code = projectApplyFunctions(pInfo->pScalarExprInfo, pBlock, pBlock, pInfo->pScalarFuncCtx, pInfo->numOfScalarExpr, NULL);
      if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
        longjmp(pTaskInfo->env, pTaskInfo->code);
      }
310 311
    }

H
Haojun Liao 已提交
312 313 314 315 316 317
    doHashGroupbyAgg(pOperator, pBlock);
  }

  pOperator->status = OP_RES_TO_RETURN;
  closeAllResultRows(&pInfo->binfo.resultRowInfo);
  //  if (!stableQuery) { // finalize include the update of result rows
318
  //    finalizeQueryResult(pInfo->binfo.pCtx, pOperator->numOfExprs);
H
Haojun Liao 已提交
319
  //  } else {
320
  //    updateNumOfRowsInResultRows(pInfo->binfo.pCtx, pOperator->numOfExprs, &pInfo->binfo.resultRowInfo,
H
Haojun Liao 已提交
321 322
  //    pInfo->binfo.rowCellInfoOffset);
  //  }
323 324 325 326 327 328 329 330 331 332 333 334 335 336
#if 0
  if(pOperator->fpSet.encodeResultRow){
    char *result = NULL;
    int32_t length = 0;
    pOperator->fpSet.encodeResultRow(pOperator, &result, &length);
    SAggSupporter* pSup = &pInfo->aggSup;
    taosHashClear(pSup->pResultRowHashTable);
    pInfo->binfo.resultRowInfo.size = 0;
    pOperator->fpSet.decodeResultRow(pOperator, result);
    if(result){
      taosMemoryFree(result);
    }
  }
#endif
337
  blockDataEnsureCapacity(pRes, pOperator->resultInfo.capacity);
338
  initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, 0);
339

340 341
  pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;

342
  while(1) {
343
    doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
344
    doFilter(pInfo->pCondition, pRes, NULL);
345

346
    bool hasRemain = hashRemainDataInGroupInfo(&pInfo->groupResInfo);
347
    if (!hasRemain) {
348
      doSetOperatorCompleted(pOperator);
349 350 351 352 353 354
      break;
    }

    if (pRes->info.rows > 0) {
      break;
    }
H
Haojun Liao 已提交
355 356
  }

357 358 359 360
  size_t rows = pRes->info.rows;
  pOperator->resultInfo.totalRows += rows;

  return (rows == 0)? NULL:pRes;
H
Haojun Liao 已提交
361 362
}

363
SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SArray* pGroupColList,
wmmhello's avatar
wmmhello 已提交
364
    SNode* pCondition, SExprInfo* pScalarExprInfo, int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo) {
H
Haojun Liao 已提交
365 366 367 368 369 370
  SGroupbyOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SGroupbyOperatorInfo));
  SOperatorInfo*        pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    goto _error;
  }

371 372 373 374 375
  pInfo->pGroupCols      = pGroupColList;
  pInfo->pCondition      = pCondition;

  pInfo->pScalarExprInfo = pScalarExprInfo;
  pInfo->numOfScalarExpr = numOfScalarExpr;
376
  pInfo->pScalarFuncCtx  = createSqlFunctionCtx(pScalarExprInfo, numOfScalarExpr, &pInfo->rowCellInfoOffset);
377

H
Haojun Liao 已提交
378
  int32_t code = initGroupOptrInfo(&pInfo->pGroupColVals, &pInfo->groupKeyLen, &pInfo->keyBuf, pGroupColList);
H
Haojun Liao 已提交
379 380 381 382
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

383 384
  initResultSizeInfo(pOperator, 4096);
  initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, pResultBlock, pInfo->groupKeyLen, pTaskInfo->id.str);
385 386
  initResultRowInfo(&pInfo->binfo.resultRowInfo, 8);

H
Haojun Liao 已提交
387
  pOperator->name         = "GroupbyAggOperator";
388
  pOperator->blocking     = true;
H
Haojun Liao 已提交
389
  pOperator->status       = OP_NOT_OPENED;
390
  // pOperator->operatorType = OP_Groupby;
H
Haojun Liao 已提交
391
  pOperator->pExpr        = pExprInfo;
392
  pOperator->numOfExprs  = numOfCols;
H
Haojun Liao 已提交
393
  pOperator->info         = pInfo;
394
  pOperator->pTaskInfo    = pTaskInfo;
H
Haojun Liao 已提交
395

396
  pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, hashGroupbyAggregate, NULL, NULL, destroyGroupOperatorInfo, aggEncodeResultRow, aggDecodeResultRow, NULL);
H
Haojun Liao 已提交
397 398 399 400
  code = appendDownstream(pOperator, &downstream, 1);
  return pOperator;

  _error:
H
Haojun Liao 已提交
401
  pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
H
Haojun Liao 已提交
402 403 404
  taosMemoryFreeClear(pInfo);
  taosMemoryFreeClear(pOperator);
  return NULL;
405 406
}

H
Haojun Liao 已提交
407
static void doHashPartition(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
H
Haojun Liao 已提交
408
//  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
H
Haojun Liao 已提交
409 410 411 412

  SPartitionOperatorInfo* pInfo = pOperator->info;

  for (int32_t j = 0; j < pBlock->info.rows; ++j) {
413
    recordNewGroupKeys(pInfo->pGroupCols, pInfo->pGroupColVals, pBlock, j);
H
Haojun Liao 已提交
414 415
    int32_t len = buildGroupKeys(pInfo->keyBuf, pInfo->pGroupColVals);

H
Haojun Liao 已提交
416 417
    SDataGroupInfo* pGInfo = NULL;
    void *pPage = getCurrentDataGroupInfo(pInfo, &pGInfo, len);
H
Haojun Liao 已提交
418

H
Haojun Liao 已提交
419 420 421
    pGInfo->numOfRows += 1;
    if (pGInfo->groupId == 0) {
      pGInfo->groupId = calcGroupId(pInfo->keyBuf, len);
H
Haojun Liao 已提交
422 423
    }

424
    // number of rows
H
Haojun Liao 已提交
425
    int32_t* rows = (int32_t*) pPage;
H
Haojun Liao 已提交
426

427 428
    // group id

429
    size_t numOfCols = pOperator->numOfExprs;
H
Haojun Liao 已提交
430
    for(int32_t i = 0; i < numOfCols; ++i) {
431 432 433 434
      SExprInfo* pExpr = &pOperator->pExpr[i];
      int32_t slotId = pExpr->base.pParam[0].pCol->slotId;

      SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, slotId);
H
Haojun Liao 已提交
435

H
Haojun Liao 已提交
436 437
      int32_t bytes = pColInfoData->info.bytes;
      int32_t startOffset = pInfo->columnOffset[i];
H
Haojun Liao 已提交
438

439 440
      int32_t* columnLen  = NULL;
      int32_t  contentLen = 0;
H
Haojun Liao 已提交
441 442

      if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
wafwerar's avatar
wafwerar 已提交
443
        int32_t* offset = (int32_t*)((char*)pPage + startOffset);
444 445
        columnLen       = (int32_t*) ((char*)pPage + startOffset + sizeof(int32_t) * pInfo->rowCapacity);
        char*    data   = (char*)((char*) columnLen + sizeof(int32_t));
H
Haojun Liao 已提交
446 447 448 449 450 451 452 453

        if (colDataIsNull_s(pColInfoData, j)) {
          offset[(*rows)] = -1;
          contentLen = 0;
        } else {
          offset[*rows] = (*columnLen);
          char* src = colDataGetData(pColInfoData, j);
          memcpy(data + (*columnLen), src, varDataTLen(src));
454 455 456
          int32_t v = (data + (*columnLen) + varDataTLen(src) - (char*)pPage);
          ASSERT(v > 0);

H
Haojun Liao 已提交
457 458
          contentLen = varDataTLen(src);
        }
H
Haojun Liao 已提交
459
      } else {
wafwerar's avatar
wafwerar 已提交
460
        char* bitmap = (char*)pPage + startOffset;
461
        columnLen    = (int32_t*) ((char*)pPage + startOffset + BitmapLen(pInfo->rowCapacity));
H
Haojun Liao 已提交
462
        char* data   = (char*) columnLen + sizeof(int32_t);
H
Haojun Liao 已提交
463 464 465

        bool isNull = colDataIsNull_f(pColInfoData->nullbitmap, j);
        if (isNull) {
H
Haojun Liao 已提交
466
          colDataSetNull_f(bitmap, (*rows));
H
Haojun Liao 已提交
467
        } else {
H
Haojun Liao 已提交
468
          memcpy(data + (*columnLen), colDataGetData(pColInfoData, j), bytes);
H
Haojun Liao 已提交
469
        }
H
Haojun Liao 已提交
470
        contentLen = bytes;
H
Haojun Liao 已提交
471
      }
H
Haojun Liao 已提交
472 473

      (*columnLen) += contentLen;
474
      ASSERT(*columnLen >= 0);
H
Haojun Liao 已提交
475 476
    }

H
Haojun Liao 已提交
477 478
    (*rows) += 1;

H
Haojun Liao 已提交
479 480 481
    setBufPageDirty(pPage, true);
    releaseBufPage(pInfo->pBuf, pPage);
  }
H
Haojun Liao 已提交
482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505
}

void* getCurrentDataGroupInfo(const SPartitionOperatorInfo* pInfo, SDataGroupInfo** pGroupInfo, int32_t len) {
  SDataGroupInfo* p = taosHashGet(pInfo->pGroupSet, pInfo->keyBuf, len);

  void* pPage = NULL;
  if (p == NULL) { // it is a new group
    SDataGroupInfo gi = {0};
    gi.pPageList = taosArrayInit(100, sizeof(int32_t));
    taosHashPut(pInfo->pGroupSet, pInfo->keyBuf, len, &gi, sizeof(SDataGroupInfo));

    p = taosHashGet(pInfo->pGroupSet, pInfo->keyBuf, len);

    int32_t pageId = 0;
    pPage = getNewBufPage(pInfo->pBuf, 0, &pageId);
    taosArrayPush(p->pPageList, &pageId);

    *(int32_t *) pPage = 0;
  } else {
    int32_t* curId = taosArrayGetLast(p->pPageList);
    pPage = getBufPage(pInfo->pBuf, *curId);

    int32_t *rows = (int32_t*) pPage;
    if (*rows >= pInfo->rowCapacity) {
506 507 508
      // release buffer
      releaseBufPage(pInfo->pBuf, pPage);

H
Haojun Liao 已提交
509 510 511 512
      // add a new page for current group
      int32_t pageId = 0;
      pPage = getNewBufPage(pInfo->pBuf, 0, &pageId);
      taosArrayPush(p->pPageList, &pageId);
513
      memset(pPage, 0, getBufPageSize(pInfo->pBuf));
H
Haojun Liao 已提交
514 515
    }
  }
H
Haojun Liao 已提交
516

H
Haojun Liao 已提交
517 518 519 520 521 522 523 524 525 526 527 528 529 530
  *pGroupInfo = p;
  return pPage;
}

uint64_t calcGroupId(char* pData, int32_t len) {
  T_MD5_CTX context;
  tMD5Init(&context);
  tMD5Update(&context, (uint8_t*)pData, len);
  tMD5Final(&context);

  // NOTE: only extract the initial 8 bytes of the final MD5 digest
  uint64_t id = 0;
  memcpy(&id, context.digest, sizeof(uint64_t));
  return id;
H
Haojun Liao 已提交
531 532
}

H
Haojun Liao 已提交
533 534 535 536
int32_t* setupColumnOffset(const SSDataBlock* pBlock, int32_t rowCapacity) {
  size_t numOfCols = pBlock->info.numOfCols;
  int32_t* offset = taosMemoryCalloc(pBlock->info.numOfCols, sizeof(int32_t));

537
  offset[0] = sizeof(int32_t) + sizeof(uint64_t);  // the number of rows in current page, ref to SSDataBlock paged serialization format
H
Haojun Liao 已提交
538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556

  for(int32_t i = 0; i < numOfCols - 1; ++i) {
    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);

    int32_t bytes = pColInfoData->info.bytes;
    int32_t payloadLen = bytes * rowCapacity;
    
    if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
      // offset segment + content length + payload
      offset[i + 1] = rowCapacity * sizeof(int32_t) + sizeof(int32_t) + payloadLen + offset[i];
    } else {
      // bitmap + content length + payload
      offset[i + 1] = BitmapLen(rowCapacity) + sizeof(int32_t) + payloadLen + offset[i];
    }
  }

  return offset;
}

H
Haojun Liao 已提交
557 558 559 560 561 562 563 564
static SSDataBlock* buildPartitionResult(SOperatorInfo* pOperator) {
  SPartitionOperatorInfo* pInfo = pOperator->info;

  SDataGroupInfo* pGroupInfo = pInfo->pGroupIter;
  if (pInfo->pGroupIter == NULL || pInfo->pageIndex >= taosArrayGetSize(pGroupInfo->pPageList)) {
    // try next group data
    pInfo->pGroupIter = taosHashIterate(pInfo->pGroupSet, pInfo->pGroupIter);
    if (pInfo->pGroupIter == NULL) {
565
      doSetOperatorCompleted(pOperator);
H
Haojun Liao 已提交
566 567 568 569 570 571 572 573 574 575
      return NULL;
    }

    pGroupInfo = pInfo->pGroupIter;
    pInfo->pageIndex = 0;
  }

  int32_t* pageId = taosArrayGet(pGroupInfo->pPageList, pInfo->pageIndex);
  void* page = getBufPage(pInfo->pBuf, *pageId);

H
Haojun Liao 已提交
576
  blockDataFromBuf1(pInfo->binfo.pRes, page, pInfo->rowCapacity);
H
Haojun Liao 已提交
577 578

  pInfo->pageIndex += 1;
579
  releaseBufPage(pInfo->pBuf, page);
H
Haojun Liao 已提交
580

581
  blockDataUpdateTsWindow(pInfo->binfo.pRes, 0);
H
Haojun Liao 已提交
582
  pInfo->binfo.pRes->info.groupId = pGroupInfo->groupId;
583 584

  pOperator->resultInfo.totalRows += pInfo->binfo.pRes->info.rows;
H
Haojun Liao 已提交
585 586 587
  return pInfo->binfo.pRes;
}

588
static SSDataBlock* hashPartition(SOperatorInfo* pOperator) {
H
Haojun Liao 已提交
589 590
  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
591 592
  }

H
Haojun Liao 已提交
593 594
  SGroupbyOperatorInfo* pInfo = pOperator->info;
  SSDataBlock* pRes = pInfo->binfo.pRes;
595

H
Haojun Liao 已提交
596
  if (pOperator->status == OP_RES_TO_RETURN) {
H
Haojun Liao 已提交
597 598
    blockDataCleanup(pRes);
    return buildPartitionResult(pOperator);
H
Haojun Liao 已提交
599 600
  }

601
  int64_t st = taosGetTimestampUs();
H
Haojun Liao 已提交
602
  SOperatorInfo* downstream = pOperator->pDownstream[0];
H
Haojun Liao 已提交
603

H
Haojun Liao 已提交
604
  while (1) {
605
    SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
H
Haojun Liao 已提交
606 607 608
    if (pBlock == NULL) {
      break;
    }
H
Haojun Liao 已提交
609

H
Haojun Liao 已提交
610
    doHashPartition(pOperator, pBlock);
H
Haojun Liao 已提交
611 612
  }

613 614
  pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;

H
Haojun Liao 已提交
615
  pOperator->status = OP_RES_TO_RETURN;
H
Haojun Liao 已提交
616 617 618 619 620
  blockDataEnsureCapacity(pRes, 4096);
  return buildPartitionResult(pOperator);
}

static void destroyPartitionOperatorInfo(void* param, int32_t numOfOutput) {
H
Haojun Liao 已提交
621
  SPartitionOperatorInfo* pInfo = (SPartitionOperatorInfo*)param;
H
Haojun Liao 已提交
622 623
  doDestroyBasicInfo(&pInfo->binfo, numOfOutput);
  taosArrayDestroy(pInfo->pGroupCols);
wmmhello's avatar
wmmhello 已提交
624 625 626 627
  for(int i = 0; i < taosArrayGetSize(pInfo->pGroupColVals); i++){
    SGroupKeys key = *(SGroupKeys*)taosArrayGet(pInfo->pGroupColVals, i);
    taosMemoryFree(key.pData);
  }
H
Haojun Liao 已提交
628
  taosArrayDestroy(pInfo->pGroupColVals);
H
Haojun Liao 已提交
629
  taosMemoryFree(pInfo->keyBuf);
wmmhello's avatar
wmmhello 已提交
630
  taosHashCleanup(pInfo->pGroupSet);
H
Haojun Liao 已提交
631
  taosMemoryFree(pInfo->columnOffset);
H
Haojun Liao 已提交
632 633
}

634
SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SArray* pGroupColList,
wmmhello's avatar
wmmhello 已提交
635
                                           SExecTaskInfo* pTaskInfo) {
H
Haojun Liao 已提交
636 637
  SPartitionOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SPartitionOperatorInfo));
  SOperatorInfo*        pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
H
Haojun Liao 已提交
638
  if (pInfo == NULL || pOperator == NULL) {
H
Haojun Liao 已提交
639 640
    goto _error;
  }
641

H
Haojun Liao 已提交
642 643 644 645 646 647 648 649
  pInfo->pGroupCols = pGroupColList;

  _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
  pInfo->pGroupSet = taosHashInit(100, hashFn, false, HASH_NO_LOCK);
  if (pInfo->pGroupSet == NULL) {
    goto _error;
  }

650 651 652 653 654
  uint32_t defaultPgsz  = 0;
  uint32_t defaultBufsz = 0;
  getBufferPgSize(pResultBlock->info.rowSize, &defaultPgsz, &defaultBufsz);

  int32_t code = createDiskbasedBuf(&pInfo->pBuf, defaultPgsz, defaultBufsz, pTaskInfo->id.str, TD_TMP_DIR_PATH);
H
Haojun Liao 已提交
655 656 657 658
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

H
Haojun Liao 已提交
659 660
  pInfo->rowCapacity = blockDataGetCapacityInRow(pResultBlock, getBufPageSize(pInfo->pBuf));
  pInfo->columnOffset = setupColumnOffset(pResultBlock, pInfo->rowCapacity);
H
Haojun Liao 已提交
661 662 663 664
  code = initGroupOptrInfo(&pInfo->pGroupColVals, &pInfo->groupKeyLen, &pInfo->keyBuf, pGroupColList);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }
H
Haojun Liao 已提交
665 666

  pOperator->name         = "PartitionOperator";
667
  pOperator->blocking     = true;
668
  pOperator->status       = OP_NOT_OPENED;
H
Haojun Liao 已提交
669 670
  pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_PARTITION;
  pInfo->binfo.pRes       = pResultBlock;
671
  pOperator->numOfExprs   = numOfCols;
672
  pOperator->pExpr        = pExprInfo;
H
Haojun Liao 已提交
673
  pOperator->info         = pInfo;
674
  pOperator->pTaskInfo    = pTaskInfo;
675 676 677

  pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, hashPartition, NULL, NULL, destroyPartitionOperatorInfo,
                                         NULL, NULL, NULL);
H
Haojun Liao 已提交
678

H
Haojun Liao 已提交
679
  code = appendDownstream(pOperator, &downstream, 1);
680 681 682
  return pOperator;

  _error:
H
Haojun Liao 已提交
683
  pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
H
Haojun Liao 已提交
684 685
  taosMemoryFreeClear(pInfo);
  taosMemoryFreeClear(pOperator);
686
  return NULL;
H
Haojun Liao 已提交
687
}