cachescanoperator.c 11.8 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "function.h"
H
Hongze Cheng 已提交
17
#include "os.h"
18 19 20 21 22 23 24 25 26 27
#include "tname.h"

#include "tdatablock.h"
#include "tmsg.h"

#include "executorimpl.h"
#include "tcompare.h"
#include "thash.h"
#include "ttypes.h"

H
Haojun Liao 已提交
28 29 30 31 32 33 34 35 36 37 38 39
typedef struct SCacheRowsScanInfo {
  SSDataBlock*    pRes;
  SReadHandle     readHandle;
  void*           pLastrowReader;
  SColMatchInfo   matchInfo;
  int32_t*        pSlotIds;
  SExprSupp       pseudoExprSup;
  int32_t         retrieveType;
  int32_t         currentGroupIndex;
  SSDataBlock*    pBufferredRes;
  SArray*         pUidList;
  int32_t         indexOfBufferedRes;
40
  STableListInfo* pTableList;
H
Haojun Liao 已提交
41 42
} SCacheRowsScanInfo;

43
static SSDataBlock* doScanCache(SOperatorInfo* pOperator);
H
Haojun Liao 已提交
44
static void         destroyCacheScanOperator(void* param);
45
static int32_t      extractCacheScanSlotId(const SArray* pColMatchInfo, SExecTaskInfo* pTaskInfo, int32_t** pSlotIds);
H
Haojun Liao 已提交
46
static int32_t      removeRedundantTsCol(SLastRowScanPhysiNode* pScanNode, SColMatchInfo* pColMatchInfo);
47

H
Haojun Liao 已提交
48 49
// Row-kind flag for a scan: CACHESCAN_RETRIEVE_LAST when _t is non-zero, CACHESCAN_RETRIEVE_LAST_ROW otherwise.
#define SCAN_ROW_TYPE(_t) (!(_t) ? CACHESCAN_RETRIEVE_LAST_ROW : CACHESCAN_RETRIEVE_LAST)

50
SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SReadHandle* readHandle,
51
                                           STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo) {
52
  int32_t           code = TSDB_CODE_SUCCESS;
H
Haojun Liao 已提交
53
  SCacheRowsScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SCacheRowsScanInfo));
54 55
  SOperatorInfo*    pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
56
    code = TSDB_CODE_OUT_OF_MEMORY;
57 58 59 60
    goto _error;
  }

  pInfo->readHandle = *readHandle;
61 62

  SDataBlockDescNode* pDescNode = pScanNode->scan.node.pOutputDataBlockDesc;
H
Haojun Liao 已提交
63
  pInfo->pRes = createDataBlockFromDescNode(pDescNode);
64 65

  int32_t numOfCols = 0;
66 67
  code =
      extractColMatchInfo(pScanNode->scan.pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID, &pInfo->matchInfo);
68 69 70 71
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

H
Haojun Liao 已提交
72
  removeRedundantTsCol(pScanNode, &pInfo->matchInfo);
73

H
Haojun Liao 已提交
74
  code = extractCacheScanSlotId(pInfo->matchInfo.pList, pTaskInfo, &pInfo->pSlotIds);
75 76 77 78
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

79
  int32_t totalTables = tableListGetSize(pTableListInfo);
H
Haojun Liao 已提交
80 81
  int32_t capacity = 0;

82 83
  pInfo->pUidList = taosArrayInit(4, sizeof(int64_t));

H
Haojun Liao 已提交
84
  // partition by tbname
85
  if (oneTableForEachGroup(pTableListInfo) || (totalTables == 1)) {
H
Haojun Liao 已提交
86
    pInfo->retrieveType = CACHESCAN_RETRIEVE_TYPE_ALL | SCAN_ROW_TYPE(pScanNode->ignoreNull);
87

88
    STableKeyInfo* pList = tableListGetInfo(pTableListInfo, 0);
89

90
    uint64_t suid = tableListGetSuid(pTableListInfo);
H
Haojun Liao 已提交
91
    code = tsdbCacherowsReaderOpen(pInfo->readHandle.vnode, pInfo->retrieveType, pList, totalTables,
H
Haojun Liao 已提交
92
                                   taosArrayGetSize(pInfo->matchInfo.pList), suid, &pInfo->pLastrowReader, pTaskInfo->id.str);
93 94 95 96
    if (code != TSDB_CODE_SUCCESS) {
      goto _error;
    }

H
Haojun Liao 已提交
97 98
    capacity = TMIN(totalTables, 4096);

99
    pInfo->pBufferredRes = createOneDataBlock(pInfo->pRes, false);
H
Haojun Liao 已提交
100
    blockDataEnsureCapacity(pInfo->pBufferredRes, capacity);
101
  } else {  // by tags
H
Haojun Liao 已提交
102 103
    pInfo->retrieveType = CACHESCAN_RETRIEVE_TYPE_SINGLE | SCAN_ROW_TYPE(pScanNode->ignoreNull);
    capacity = 1;  // only one row output
104
  }
105

H
Haojun Liao 已提交
106 107 108
  initResultSizeInfo(&pOperator->resultInfo, capacity);
  blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);

X
Xiaoyu Wang 已提交
109
  if (pScanNode->scan.pScanPseudoCols != NULL) {
110 111 112
    SExprSupp* p = &pInfo->pseudoExprSup;
    p->pExprInfo = createExprInfo(pScanNode->scan.pScanPseudoCols, NULL, &p->numOfExprs);
    p->pCtx = createSqlFunctionCtx(p->pExprInfo, p->numOfExprs, &p->rowEntryInfoOffset);
113
  }
114

H
Haojun Liao 已提交
115
  setOperatorInfo(pOperator, "CachedRowScanOperator", QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN, false, OP_NOT_OPENED, pInfo, pTaskInfo);
116 117 118
  pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock);

  pOperator->fpSet =
119
      createOperatorFpSet(optrDummyOpenFn, doScanCache, NULL, destroyCacheScanOperator, optrDefaultBufFn, NULL);
120

121 122 123
  pOperator->cost.openCost = 0;
  return pOperator;

124
  _error:
125
  pTaskInfo->code = code;
H
Haojun Liao 已提交
126
  destroyCacheScanOperator(pInfo);
127 128 129 130
  taosMemoryFree(pOperator);
  return NULL;
}

131
/*
 * Per-table path (CACHESCAN_RETRIEVE_TYPE_ALL): rows are fetched in batches into
 * pBufferredRes and returned one row per call through pRes.
 * Returns NULL and marks the operator completed once a refill yields no rows.
 */
static SSDataBlock* cacheScanNextBufferedRow(SOperatorInfo* pOperator) {
  SCacheRowsScanInfo* pScan = pOperator->info;
  SExecTaskInfo*      pTask = pOperator->pTaskInfo;

  if (isTaskKilled(pTask)) {
    T_LONG_JMP(pTask->env, pTask->code);
  }

  SSDataBlock* pBuffer = pScan->pBufferredRes;

  // every buffered row has been handed out: fetch the next batch
  if (pScan->indexOfBufferedRes >= pBuffer->info.rows) {
    blockDataCleanup(pBuffer);
    taosArrayClear(pScan->pUidList);

    int32_t code = tsdbRetrieveCacheRows(pScan->pLastrowReader, pBuffer, pScan->pSlotIds, pScan->pUidList);
    if (code != TSDB_CODE_SUCCESS) {
      T_LONG_JMP(pTask->env, code);
    }

    // the results may be null, if last values are all null
    int32_t fetchedRows = pBuffer->info.rows;
    ASSERT(fetchedRows == 0 || fetchedRows == taosArrayGetSize(pScan->pUidList));
    pScan->indexOfBufferedRes = 0;
  }

  // still nothing to hand out after the refill attempt: the scan is finished
  if (pScan->indexOfBufferedRes >= pBuffer->info.rows) {
    setOperatorCompleted(pOperator);
    return NULL;
  }

  SSDataBlock* pOut = pScan->pRes;
  int32_t      rowIdx = pScan->indexOfBufferedRes;
  size_t       numOfCols = taosArrayGetSize(pScan->matchInfo.pList);

  // copy the current buffered row into row 0 of the output block, column by column
  for (int32_t i = 0; i < numOfCols; ++i) {
    SColMatchItem*   pItem = taosArrayGet(pScan->matchInfo.pList, i);
    int32_t          slot = pItem->dstSlotId;
    SColumnInfoData* pFrom = taosArrayGet(pBuffer->pDataBlock, slot);
    SColumnInfoData* pTo = taosArrayGet(pOut->pDataBlock, slot);

    if (colDataIsNull_s(pFrom, rowIdx)) {
      colDataSetNULL(pTo, 0);
      continue;
    }

    char* pVal = colDataGetData(pFrom, rowIdx);
    colDataSetVal(pTo, 0, pVal, false);
  }

  pOut->info.id.uid = *(tb_uid_t*)taosArrayGet(pScan->pUidList, rowIdx);
  pOut->info.rows = 1;

  SExprSupp* pPseudo = &pScan->pseudoExprSup;
  int32_t    code = addTagPseudoColumnData(&pScan->readHandle, pPseudo->pExprInfo, pPseudo->numOfExprs, pOut,
                                           pOut->info.rows, GET_TASKID(pTask), NULL);
  if (code != TSDB_CODE_SUCCESS) {
    pTask->code = code;
    return NULL;
  }

  pOut->info.id.groupId = getTableGroupId(pScan->pTableList, pOut->info.id.uid);
  pScan->indexOfBufferedRes += 1;
  return pOut;
}

/*
 * Per-group path: starting at currentGroupIndex, open a reader for each group in turn
 * and return the first non-empty result. Groups whose reader cannot be opened are skipped.
 * Returns NULL and marks the operator completed when no group is left.
 */
static SSDataBlock* cacheScanNextGroup(SOperatorInfo* pOperator, uint64_t suid) {
  SCacheRowsScanInfo* pScan = pOperator->info;
  SExecTaskInfo*      pTask = pOperator->pTaskInfo;
  STableListInfo*     pTables = pScan->pTableList;

  size_t numOfGroups = tableListGetOutputGroups(pTables);

  while (pScan->currentGroupIndex < numOfGroups) {
    if (isTaskKilled(pTask)) {
      T_LONG_JMP(pTask->env, pTask->code);
    }

    STableKeyInfo* pGroupTables = NULL;
    int32_t        numOfTables = 0;

    int32_t code = tableListGetGroupList(pTables, pScan->currentGroupIndex, &pGroupTables, &numOfTables);
    if (code != TSDB_CODE_SUCCESS) {
      T_LONG_JMP(pTask->env, code);
    }

    code = tsdbCacherowsReaderOpen(pScan->readHandle.vnode, pScan->retrieveType, pGroupTables, numOfTables,
                                   taosArrayGetSize(pScan->matchInfo.pList), suid, &pScan->pLastrowReader,
                                   pTask->id.str);
    if (code != TSDB_CODE_SUCCESS) {
      // the failure is not reported; move on to the next group
      pScan->currentGroupIndex += 1;
      taosArrayClear(pScan->pUidList);
      continue;
    }

    taosArrayClear(pScan->pUidList);

    code = tsdbRetrieveCacheRows(pScan->pLastrowReader, pScan->pRes, pScan->pSlotIds, pScan->pUidList);
    if (code != TSDB_CODE_SUCCESS) {
      T_LONG_JMP(pTask->env, code);
    }

    pScan->currentGroupIndex += 1;

    // empty group: release the reader and try the next one
    if (pScan->pRes->info.rows <= 0) {
      pScan->pLastrowReader = tsdbCacherowsReaderClose(pScan->pLastrowReader);
      continue;
    }

    // check for tag values
    if (pScan->pseudoExprSup.numOfExprs > 0) {
      SExprSupp* pPseudo = &pScan->pseudoExprSup;

      STableKeyInfo* pFirst = &((STableKeyInfo*)pGroupTables)[0];
      pScan->pRes->info.id.groupId = pFirst->groupId;

      if (taosArrayGetSize(pScan->pUidList) > 0) {
        ASSERT((pScan->retrieveType & CACHESCAN_RETRIEVE_LAST_ROW) == CACHESCAN_RETRIEVE_LAST_ROW);

        pScan->pRes->info.id.uid = *(tb_uid_t*)taosArrayGet(pScan->pUidList, 0);
        code = addTagPseudoColumnData(&pScan->readHandle, pPseudo->pExprInfo, pPseudo->numOfExprs, pScan->pRes,
                                      pScan->pRes->info.rows, GET_TASKID(pTask), NULL);
        if (code != TSDB_CODE_SUCCESS) {
          // the reader stays open here; destroyCacheScanOperator() closes it
          pTask->code = code;
          return NULL;
        }
      }
    }

    pScan->pLastrowReader = tsdbCacherowsReaderClose(pScan->pLastrowReader);
    return pScan->pRes;
  }

  setOperatorCompleted(pOperator);
  return NULL;
}

/*
 * Fetch function of the cached-rows scan operator.
 * Returns the next result block, or NULL when the operator is done, the table list is
 * empty, or filling the pseudo columns failed (the code is then left in pTaskInfo->code).
 * Other failures leave through T_LONG_JMP on the task's jump buffer.
 */
SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  SCacheRowsScanInfo* pInfo = pOperator->info;

  uint64_t suid = tableListGetSuid(pInfo->pTableList);
  int32_t  numOfTables = tableListGetSize(pInfo->pTableList);
  if (numOfTables == 0) {
    setOperatorCompleted(pOperator);
    return NULL;
  }

  blockDataCleanup(pInfo->pRes);

  // check if it is a group by tbname
  if ((pInfo->retrieveType & CACHESCAN_RETRIEVE_TYPE_ALL) == CACHESCAN_RETRIEVE_TYPE_ALL) {
    return cacheScanNextBufferedRow(pOperator);
  }

  return cacheScanNextGroup(pOperator, suid);
}

H
Haojun Liao 已提交
276
void destroyCacheScanOperator(void* param) {
H
Haojun Liao 已提交
277
  SCacheRowsScanInfo* pInfo = (SCacheRowsScanInfo*)param;
278
  blockDataDestroy(pInfo->pRes);
H
Haojun Liao 已提交
279 280 281
  blockDataDestroy(pInfo->pBufferredRes);
  taosMemoryFree(pInfo->pSlotIds);
  taosArrayDestroy(pInfo->pUidList);
H
Haojun Liao 已提交
282
  taosArrayDestroy(pInfo->matchInfo.pList);
H
Haojun Liao 已提交
283 284 285 286 287

  if (pInfo->pLastrowReader != NULL) {
    pInfo->pLastrowReader = tsdbCacherowsReaderClose(pInfo->pLastrowReader);
  }

H
Haojun Liao 已提交
288
  cleanupExprSupp(&pInfo->pseudoExprSup);
289 290 291
  taosMemoryFreeClear(param);
}

292
/*
 * Build the slot-id array handed to the cache reader.
 *
 * For each match item the entry at index dstSlotId receives the position of the item's
 * column in the task schema, or -1 when that column is the primary timestamp column.
 *
 * pColMatchInfo  array of SColMatchItem
 * pTaskInfo      supplies the schema wrapper (schemaInfo.sw)
 * pSlotIds       out: newly allocated array, released by the caller with taosMemoryFree()
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY if the allocation fails.
 */
int32_t extractCacheScanSlotId(const SArray* pColMatchInfo, SExecTaskInfo* pTaskInfo, int32_t** pSlotIds) {
  size_t numOfCols = taosArrayGetSize(pColMatchInfo);

  // Entries are addressed by dstSlotId, which can exceed the list length (for instance once
  // removeRedundantTsCol() has dropped items), so size the array for the largest slot id.
  size_t numOfSlots = numOfCols;
  for (size_t i = 0; i < numOfCols; ++i) {
    SColMatchItem* pColMatch = taosArrayGet(pColMatchInfo, i);
    if (pColMatch->dstSlotId >= 0 && (size_t)pColMatch->dstSlotId >= numOfSlots) {
      numOfSlots = (size_t)pColMatch->dstSlotId + 1;
    }
  }

  *pSlotIds = taosMemoryMalloc(numOfSlots * sizeof(int32_t));
  if (*pSlotIds == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  // Give every entry a defined value instead of leaving unmatched slots uninitialised.
  // NOTE(review): -1 is the marker this function uses for the primary timestamp; confirm
  // the reader tolerates it for slots that no match item refers to.
  for (size_t k = 0; k < numOfSlots; ++k) {
    (*pSlotIds)[k] = -1;
  }

  SSchemaWrapper* pWrapper = pTaskInfo->schemaInfo.sw;

  for (size_t i = 0; i < numOfCols; ++i) {
    SColMatchItem* pColMatch = taosArrayGet(pColMatchInfo, i);
    if (pColMatch->dstSlotId < 0) {
      continue;  // a negative slot id cannot index the array
    }

    for (int32_t j = 0; j < pWrapper->nCols; ++j) {
      if (pColMatch->colId != pWrapper->pSchema[j].colId) {
        continue;
      }

      // the primary timestamp is flagged with -1, any other column with its schema position
      (*pSlotIds)[pColMatch->dstSlotId] = (pColMatch->colId == PRIMARYKEY_TIMESTAMP_COL_ID) ? -1 : j;
      break;
    }
  }

  return TSDB_CODE_SUCCESS;
}
319

H
Haojun Liao 已提交
320
/*
 * When pScanNode->ignoreNull is set, replace pColMatchInfo->pList with a copy that omits
 * every item whose output slot is of type TSDB_DATA_TYPE_TIMESTAMP. When it is not set,
 * the list is left untouched.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY if the new list cannot be built;
 * in that case the original list is kept unchanged.
 */
int32_t removeRedundantTsCol(SLastRowScanPhysiNode* pScanNode, SColMatchInfo* pColMatchInfo) {
  // only the ignoreNull mode (CACHESCAN_RETRIEVE_LAST per SCAN_ROW_TYPE) prunes columns
  if (!pScanNode->ignoreNull) {
    return TSDB_CODE_SUCCESS;
  }

  size_t  size = taosArrayGetSize(pColMatchInfo->pList);
  SArray* pKept = taosArrayInit(size, sizeof(SColMatchItem));
  if (pKept == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  SNodeList* pSlots = pScanNode->scan.node.pOutputDataBlockDesc->pSlots;

  for (size_t i = 0; i < size; ++i) {
    SColMatchItem* pColInfo = taosArrayGet(pColMatchInfo->pList, i);
    SSlotDescNode* pDesc = (SSlotDescNode*)nodesListGetNode(pSlots, pColInfo->dstSlotId);

    // a slot without a descriptor cannot be classified, so its item is kept rather than dropped
    if (pDesc != NULL && pDesc->dataType.type == TSDB_DATA_TYPE_TIMESTAMP) {
      continue;
    }

    if (taosArrayPush(pKept, pColInfo) == NULL) {
      taosArrayDestroy(pKept);
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  taosArrayDestroy(pColMatchInfo->pList);
  pColMatchInfo->pList = pKept;
  return TSDB_CODE_SUCCESS;
}