cachescanoperator.c 10.8 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "function.h"
H
Hongze Cheng 已提交
17
#include "os.h"
18 19 20 21 22 23 24 25 26 27
#include "tname.h"

#include "tdatablock.h"
#include "tmsg.h"

#include "executorimpl.h"
#include "tcompare.h"
#include "thash.h"
#include "ttypes.h"

28
static SSDataBlock* doScanCache(SOperatorInfo* pOperator);
H
Hongze Cheng 已提交
29
static void         destroyLastrowScanOperator(void* param);
30
static int32_t      extractCacheScanSlotId(const SArray* pColMatchInfo, SExecTaskInfo* pTaskInfo, int32_t** pSlotIds);
H
Haojun Liao 已提交
31
static int32_t      removeRedundantTsCol(SLastRowScanPhysiNode* pScanNode, SColMatchInfo* pColMatchInfo);
32

33 34 35
SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SReadHandle* readHandle,
                                           SExecTaskInfo* pTaskInfo) {
  int32_t           code = TSDB_CODE_SUCCESS;
36 37 38
  SLastrowScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SLastrowScanInfo));
  SOperatorInfo*    pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
39
    code = TSDB_CODE_OUT_OF_MEMORY;
40 41 42 43
    goto _error;
  }

  pInfo->readHandle = *readHandle;
44 45 46

  SDataBlockDescNode* pDescNode = pScanNode->scan.node.pOutputDataBlockDesc;
  pInfo->pRes = createResDataBlock(pDescNode);
47 48

  int32_t numOfCols = 0;
49 50
  code =
      extractColMatchInfo(pScanNode->scan.pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID, &pInfo->matchInfo);
51 52 53 54
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

H
Haojun Liao 已提交
55
  removeRedundantTsCol(pScanNode, &pInfo->matchInfo);
56

H
Haojun Liao 已提交
57
  code = extractCacheScanSlotId(pInfo->matchInfo.pList, pTaskInfo, &pInfo->pSlotIds);
58 59 60 61
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

62 63
  STableListInfo* pTableList = &pTaskInfo->tableqinfoList;

64
  initResultSizeInfo(&pOperator->resultInfo, 4096);
65 66 67
  blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
  pInfo->pUidList = taosArrayInit(4, sizeof(int64_t));

68
  // partition by tbname, todo opt perf
69
  if (oneTableForEachGroup(pTableList) || (getTotalTables(pTableList) == 1)) {
70 71
    pInfo->retrieveType =
        CACHESCAN_RETRIEVE_TYPE_ALL | (pScanNode->ignoreNull ? CACHESCAN_RETRIEVE_LAST : CACHESCAN_RETRIEVE_LAST_ROW);
72 73 74 75 76

    STableKeyInfo* pList = taosArrayGet(pTableList->pTableList, 0);
    size_t num = taosArrayGetSize(pTableList->pTableList);

    code = tsdbCacherowsReaderOpen(pInfo->readHandle.vnode, pInfo->retrieveType, pList, num,
77
                                   taosArrayGetSize(pInfo->matchInfo.pList), pTableList->suid, &pInfo->pLastrowReader);
78 79 80 81
    if (code != TSDB_CODE_SUCCESS) {
      goto _error;
    }

82 83
    pInfo->pBufferredRes = createOneDataBlock(pInfo->pRes, false);
    blockDataEnsureCapacity(pInfo->pBufferredRes, pOperator->resultInfo.capacity);
84 85 86
  } else {  // by tags
    pInfo->retrieveType = CACHESCAN_RETRIEVE_TYPE_SINGLE |
                          (pScanNode->ignoreNull ? CACHESCAN_RETRIEVE_LAST : CACHESCAN_RETRIEVE_LAST_ROW);
87
  }
88

X
Xiaoyu Wang 已提交
89
  if (pScanNode->scan.pScanPseudoCols != NULL) {
90 91 92
    SExprSupp* p = &pInfo->pseudoExprSup;
    p->pExprInfo = createExprInfo(pScanNode->scan.pScanPseudoCols, NULL, &p->numOfExprs);
    p->pCtx = createSqlFunctionCtx(p->pExprInfo, p->numOfExprs, &p->rowEntryInfoOffset);
93
  }
94

H
Hongze Cheng 已提交
95
  pOperator->name = "LastrowScanOperator";
96
  pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN;
H
Hongze Cheng 已提交
97 98 99 100
  pOperator->blocking = false;
  pOperator->status = OP_NOT_OPENED;
  pOperator->info = pInfo;
  pOperator->pTaskInfo = pTaskInfo;
101 102 103
  pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock);

  pOperator->fpSet =
104
      createOperatorFpSet(operatorDummyOpenFn, doScanCache, NULL, NULL, destroyLastrowScanOperator, NULL);
105

106 107 108
  pOperator->cost.openCost = 0;
  return pOperator;

H
Hongze Cheng 已提交
109
_error:
110 111
  pTaskInfo->code = code;
  destroyLastrowScanOperator(pInfo);
112 113 114 115
  taosMemoryFree(pOperator);
  return NULL;
}

116
SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
117 118 119 120 121 122
  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  SLastrowScanInfo* pInfo = pOperator->info;
  SExecTaskInfo*    pTaskInfo = pOperator->pTaskInfo;
123 124
  STableListInfo*   pTableList = &pTaskInfo->tableqinfoList;
  int32_t           size = taosArrayGetSize(pTableList->pTableList);
125
  if (size == 0) {
126
    doSetOperatorCompleted(pOperator);
127 128 129
    return NULL;
  }

130 131
  blockDataCleanup(pInfo->pRes);

132
  // check if it is a group by tbname
133
  if ((pInfo->retrieveType & CACHESCAN_RETRIEVE_TYPE_ALL) == CACHESCAN_RETRIEVE_TYPE_ALL) {
134 135 136 137
    if (pInfo->indexOfBufferedRes >= pInfo->pBufferredRes->info.rows) {
      blockDataCleanup(pInfo->pBufferredRes);
      taosArrayClear(pInfo->pUidList);

H
Hongze Cheng 已提交
138 139
      int32_t code =
          tsdbRetrieveCacheRows(pInfo->pLastrowReader, pInfo->pBufferredRes, pInfo->pSlotIds, pInfo->pUidList);
140
      if (code != TSDB_CODE_SUCCESS) {
141
        T_LONG_JMP(pTaskInfo->env, code);
142 143 144 145
      }

      // check for tag values
      int32_t resultRows = pInfo->pBufferredRes->info.rows;
146 147 148

      // the results may be null, if last values are all null
      ASSERT(resultRows == 0 || resultRows == taosArrayGetSize(pInfo->pUidList));
149 150 151 152
      pInfo->indexOfBufferedRes = 0;
    }

    if (pInfo->indexOfBufferedRes < pInfo->pBufferredRes->info.rows) {
H
Haojun Liao 已提交
153 154 155
      for (int32_t i = 0; i < taosArrayGetSize(pInfo->matchInfo.pList); ++i) {
        SColMatchItem* pMatchInfo = taosArrayGet(pInfo->matchInfo.pList, i);
        int32_t        slotId = pMatchInfo->dstSlotId;
156 157 158 159 160

        SColumnInfoData* pSrc = taosArrayGet(pInfo->pBufferredRes->pDataBlock, slotId);
        SColumnInfoData* pDst = taosArrayGet(pInfo->pRes->pDataBlock, slotId);

        char* p = colDataGetData(pSrc, pInfo->indexOfBufferedRes);
H
Hongze Cheng 已提交
161
        bool  isNull = colDataIsNull_s(pSrc, pInfo->indexOfBufferedRes);
162 163 164
        colDataAppend(pDst, 0, p, isNull);
      }

165 166 167
      pInfo->pRes->info.uid = *(tb_uid_t*)taosArrayGet(pInfo->pUidList, pInfo->indexOfBufferedRes);
      pInfo->pRes->info.rows = 1;

168 169
      if (pInfo->pseudoExprSup.numOfExprs > 0) {
        SExprSupp* pSup = &pInfo->pseudoExprSup;
H
Hongze Cheng 已提交
170 171
        int32_t    code = addTagPseudoColumnData(&pInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pInfo->pRes,
                                                 GET_TASKID(pTaskInfo));
172 173 174 175
        if (code != TSDB_CODE_SUCCESS) {
          pTaskInfo->code = code;
          return NULL;
        }
176 177
      }

178
      pInfo->pRes->info.groupId = getTableGroupId(pTableList, pInfo->pRes->info.uid);
179 180 181 182 183 184

      pInfo->indexOfBufferedRes += 1;
      return pInfo->pRes;
    } else {
      doSetOperatorCompleted(pOperator);
      return NULL;
185
    }
186
  } else {
H
Haojun Liao 已提交
187
    size_t totalGroups = getNumOfOutputGroups(pTableList);
188 189 190

    while (pInfo->currentGroupIndex < totalGroups) {

191 192 193 194 195 196 197 198 199
      STableKeyInfo* pList = NULL;
      int32_t num = 0;

      int32_t code = getTablesOfGroup(pTableList, pInfo->currentGroupIndex, &pList, &num);
      if (code != TSDB_CODE_SUCCESS) {
        T_LONG_JMP(pTaskInfo->env, code);
      }

      tsdbCacherowsReaderOpen(pInfo->readHandle.vnode, pInfo->retrieveType, pList, num,
200
                              taosArrayGetSize(pInfo->matchInfo.pList), pTableList->suid, &pInfo->pLastrowReader);
201 202
      taosArrayClear(pInfo->pUidList);

203
      code = tsdbRetrieveCacheRows(pInfo->pLastrowReader, pInfo->pRes, pInfo->pSlotIds, pInfo->pUidList);
204
      if (code != TSDB_CODE_SUCCESS) {
205
        T_LONG_JMP(pTaskInfo->env, code);
206
      }
207

208 209 210 211 212 213 214
      pInfo->currentGroupIndex += 1;

      // check for tag values
      if (pInfo->pRes->info.rows > 0) {
        if (pInfo->pseudoExprSup.numOfExprs > 0) {
          SExprSupp* pSup = &pInfo->pseudoExprSup;

215
          STableKeyInfo* pKeyInfo = &((STableKeyInfo*)pTableList)[0];
216 217
          pInfo->pRes->info.groupId = pKeyInfo->groupId;

H
Haojun Liao 已提交
218 219 220 221 222 223 224 225 226 227
          if (taosArrayGetSize(pInfo->pUidList) > 0) {
            ASSERT((pInfo->retrieveType & CACHESCAN_RETRIEVE_LAST_ROW) == CACHESCAN_RETRIEVE_LAST_ROW);

            pInfo->pRes->info.uid = *(tb_uid_t*)taosArrayGet(pInfo->pUidList, 0);
            code = addTagPseudoColumnData(&pInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pInfo->pRes,
                                          GET_TASKID(pTaskInfo));
            if (code != TSDB_CODE_SUCCESS) {
              pTaskInfo->code = code;
              return NULL;
            }
228
          }
229 230
        }

H
Haojun Liao 已提交
231
        pInfo->pLastrowReader = tsdbCacherowsReaderClose(pInfo->pLastrowReader);
232 233
        return pInfo->pRes;
      }
234 235 236
    }

    doSetOperatorCompleted(pOperator);
237
    return NULL;
238 239 240
  }
}

241
void destroyLastrowScanOperator(void* param) {
242 243
  SLastrowScanInfo* pInfo = (SLastrowScanInfo*)param;
  blockDataDestroy(pInfo->pRes);
H
Haojun Liao 已提交
244 245 246
  blockDataDestroy(pInfo->pBufferredRes);
  taosMemoryFree(pInfo->pSlotIds);
  taosArrayDestroy(pInfo->pUidList);
H
Haojun Liao 已提交
247
  taosArrayDestroy(pInfo->matchInfo.pList);
H
Haojun Liao 已提交
248 249 250 251 252

  if (pInfo->pLastrowReader != NULL) {
    pInfo->pLastrowReader = tsdbCacherowsReaderClose(pInfo->pLastrowReader);
  }

253 254 255
  taosMemoryFreeClear(param);
}

256
int32_t extractCacheScanSlotId(const SArray* pColMatchInfo, SExecTaskInfo* pTaskInfo, int32_t** pSlotIds) {
H
Hongze Cheng 已提交
257
  size_t numOfCols = taosArrayGetSize(pColMatchInfo);
258 259

  *pSlotIds = taosMemoryMalloc(numOfCols * sizeof(int32_t));
H
Hongze Cheng 已提交
260
  if (*pSlotIds == NULL) {
261 262 263
    return TSDB_CODE_OUT_OF_MEMORY;
  }

264 265
  SSchemaWrapper* pWrapper = pTaskInfo->schemaInfo.sw;

266
  for (int32_t i = 0; i < numOfCols; ++i) {
H
Haojun Liao 已提交
267
    SColMatchItem* pColMatch = taosArrayGet(pColMatchInfo, i);
268
    for (int32_t j = 0; j < pWrapper->nCols; ++j) {
269
      if (pColMatch->colId == pWrapper->pSchema[j].colId && pColMatch->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
H
Haojun Liao 已提交
270
        (*pSlotIds)[pColMatch->dstSlotId] = -1;
271 272 273
        break;
      }

274
      if (pColMatch->colId == pWrapper->pSchema[j].colId) {
H
Haojun Liao 已提交
275
        (*pSlotIds)[pColMatch->dstSlotId] = j;
276 277 278 279 280 281
        break;
      }
    }
  }

  return TSDB_CODE_SUCCESS;
282
}
283

H
Haojun Liao 已提交
284
int32_t removeRedundantTsCol(SLastRowScanPhysiNode* pScanNode, SColMatchInfo* pColMatchInfo) {
285
  if (!pScanNode->ignoreNull) {  // retrieve cached last value
H
Haojun Liao 已提交
286
    return TSDB_CODE_SUCCESS;
287 288
  }

H
Haojun Liao 已提交
289
  size_t size = taosArrayGetSize(pColMatchInfo->pList);
290
  SArray* pMatchInfo = taosArrayInit(size, sizeof(SColMatchItem));
291

H
Haojun Liao 已提交
292 293
  for (int32_t i = 0; i < size; ++i) {
    SColMatchItem* pColInfo = taosArrayGet(pColMatchInfo->pList, i);
294

H
Haojun Liao 已提交
295
    int32_t    slotId = pColInfo->dstSlotId;
296 297 298 299 300 301 302 303
    SNodeList* pList = pScanNode->scan.node.pOutputDataBlockDesc->pSlots;

    SSlotDescNode* pDesc = (SSlotDescNode*)nodesListGetNode(pList, slotId);
    if (pDesc->dataType.type != TSDB_DATA_TYPE_TIMESTAMP) {
      taosArrayPush(pMatchInfo, pColInfo);
    }
  }

H
Haojun Liao 已提交
304 305 306
  taosArrayDestroy(pColMatchInfo->pList);
  pColMatchInfo->pList = pMatchInfo;
  return TSDB_CODE_SUCCESS;
307
}