tsdbCacheRead.c 5.7 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "taoserror.h"
#include "tarray.h"
#include "tcommon.h"
#include "tsdb.h"

21 22 23 24
typedef struct SLastrowReader {
  SVnode*   pVnode;
  STSchema* pSchema;
  uint64_t  uid;
25 26 27 28 29 30
  //  int32_t*  pSlotIds;
  char**  transferBuf;  // todo remove it soon
  int32_t numOfCols;
  int32_t type;
  int32_t tableIndex;  // currently returned result tables
  SArray* pTableList;  // table id list
31 32
} SLastrowReader;

33
/*
 * Return the staging buffer for the var-length value of output column
 * `colIdx`, allocating it the first time that column is used.
 *
 * tsdbLastRowReaderOpen() only zero-initializes the pointer array, so every
 * entry starts out NULL and must be created here before it is written to.
 * Buffers allocated here stay in pReader->transferBuf and are released by
 * tsdbLastrowReaderClose().
 *
 * Returns NULL and sets terrno when `colIdx` is outside the pointer array or
 * when the allocation fails.
 */
static char* lastrowGetTransferBuf(SLastrowReader* pReader, int32_t colIdx, int32_t slotId) {
  if (pReader->transferBuf == NULL || colIdx >= pReader->numOfCols) {
    terrno = TSDB_CODE_INVALID_PARA;
    return NULL;
  }

  if (pReader->transferBuf[colIdx] == NULL) {
    // NOTE(review): assumes the schema's `bytes` for a var-length column covers
    // the length header plus the largest payload, and that the same slot id is
    // used for this output column on every call - confirm.
    pReader->transferBuf[colIdx] = taosMemoryMalloc(pReader->pSchema->columns[slotId].bytes);
    if (pReader->transferBuf[colIdx] == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  return pReader->transferBuf[colIdx];
}

/*
 * Write the values of `pRow` into row `pBlock->info.rows` of every column of
 * `pBlock`, then bump `pBlock->info.rows` by one.
 *
 * pRow     source row
 * pBlock   destination block; one value is written per column in pBlock->pDataBlock
 * pReader  supplies the schema used to decode pRow and the var-length staging buffers
 * slotIds  one entry per block column: -1 selects the row timestamp, any other
 *          value is passed to tTSRowGetVal() as the column to read
 *
 * If a staging buffer for a var-length value cannot be obtained, the cell is
 * written as NULL and terrno is set; the row count is still advanced so the
 * caller's row bookkeeping stays consistent.
 */
static void saveOneRow(STSRow* pRow, SSDataBlock* pBlock, SLastrowReader* pReader, const int32_t* slotIds) {
  int32_t numOfRows = pBlock->info.rows;
  int32_t numOfCols = (int32_t)taosArrayGetSize(pBlock->pDataBlock);

  SColVal colVal = {0};
  for (int32_t i = 0; i < numOfCols; ++i) {
    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);

    if (slotIds[i] == -1) {
      colDataAppend(pColInfoData, numOfRows, (const char*)&pRow->ts, false);
      continue;
    }

    tTSRowGetVal(pRow, pReader->pSchema, slotIds[i], &colVal);

    if (!IS_VAR_DATA_TYPE(colVal.type)) {
      colDataAppend(pColInfoData, numOfRows, (const char*)&colVal.value, colVal.isNull || colVal.isNone);
      continue;
    }

    if (colVal.isNull || colVal.isNone) {
      colDataAppendNULL(pColInfoData, numOfRows);
      continue;
    }

    // Var-length values are staged as <length header><payload> before being appended.
    char* buf = lastrowGetTransferBuf(pReader, i, slotIds[i]);
    if (buf == NULL) {
      colDataAppendNULL(pColInfoData, numOfRows);
      continue;
    }

    varDataSetLen(buf, colVal.value.nData);
    memcpy(varDataVal(buf), colVal.value.pData, colVal.value.nData);
    colDataAppend(pColInfoData, numOfRows, buf, false);
  }

  pBlock->info.rows += 1;
}

63 64
/*
 * Create a last-row reader over the tables in `pTableIdList`.
 *
 * pVnode        vnode to read from (stored as SVnode*)
 * type          retrieval mode; tsdbRetrieveLastRow() compares it against
 *               LASTROW_RETRIEVE_TYPE_SINGLE / LASTROW_RETRIEVE_TYPE_ALL
 * pTableIdList  array of STableKeyInfo; the pointer is stored, not copied
 * colId         currently unused
 * numOfCols     number of var-length staging slots to reserve
 * pReader       receives the new reader on success
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_INVALID_PARA for NULL/negative
 * arguments, or TSDB_CODE_OUT_OF_MEMORY. Nothing is stored in *pReader on
 * failure. Release the reader with tsdbLastrowReaderClose().
 */
int32_t tsdbLastRowReaderOpen(void* pVnode, int32_t type, SArray* pTableIdList, int32_t* colId, int32_t numOfCols,
                              void** pReader) {
  (void)colId;

  if (pVnode == NULL || pTableIdList == NULL || pReader == NULL || numOfCols < 0) {
    return TSDB_CODE_INVALID_PARA;
  }

  SLastrowReader* p = taosMemoryCalloc(1, sizeof(SLastrowReader));
  if (p == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  p->type = type;
  p->pVnode = pVnode;
  p->numOfCols = numOfCols;
  p->pTableList = pTableIdList;

  // Only the pointer array is created here; every slot starts out NULL.
  // A zero-sized request is skipped because its result could not be told
  // apart from an allocation failure.
  if (numOfCols > 0) {
    p->transferBuf = taosMemoryCalloc(numOfCols, POINTER_BYTES);
    if (p->transferBuf == NULL) {
      taosMemoryFree(p);
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  // The schema is taken from the first table of the list. With an empty list
  // there is no table to ask, and retrieval will not visit any table either.
  // NOTE(review): the lookup result is not checked for NULL - confirm whether
  // metaGetTbTSchema() can fail for a uid taken from this list.
  if (taosArrayGetSize(pTableIdList) > 0) {
    STableKeyInfo* pKeyInfo = taosArrayGet(pTableIdList, 0);
    p->pSchema = metaGetTbTSchema(p->pVnode->pMeta, pKeyInfo->uid, -1);
  }

  *pReader = p;
  return TSDB_CODE_SUCCESS;
}

int32_t tsdbLastrowReaderClose(void* pReader) {
  SLastrowReader* p = pReader;

104
  for (int32_t i = 0; i < p->numOfCols; ++i) {
105 106 107 108 109 110 111 112 113 114
    taosMemoryFreeClear(p->transferBuf[i]);
  }

  taosMemoryFree(p->transferBuf);
  taosMemoryFree(pReader);
  return TSDB_CODE_SUCCESS;
}

/*
 * Fill `pResBlock` with cached last rows of the reader's tables.
 *
 * pReader    reader created by tsdbLastRowReaderOpen()
 * pResBlock  destination block; rows are appended at pResBlock->info.rows
 * slotIds    per-column source selector, forwarded to saveOneRow()
 *
 * LASTROW_RETRIEVE_TYPE_SINGLE: appends at most one row - the cached last row
 * with the greatest timestamp across all tables (the first table wins a tie).
 *
 * LASTROW_RETRIEVE_TYPE_ALL: appends one row for every table that has a
 * cached last row, starting at pReader's tableIndex. Returns as soon as the
 * block's row count reaches its capacity; the next call continues with the
 * table after the last one examined.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_INVALID_PARA for a NULL reader/block or
 * an unknown retrieval type, or the error reported by the cache lookup.
 */
int32_t tsdbRetrieveLastRow(void* pReader, SSDataBlock* pResBlock, const int32_t* slotIds) {
  if (pReader == NULL || pResBlock == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  SLastrowReader* pr = pReader;

  SLRUCache* lruCache = pr->pVnode->pTsdb->lruCache;
  int32_t    numOfTables = (int32_t)taosArrayGetSize(pr->pTableList);

  if (pr->type == LASTROW_RETRIEVE_TYPE_SINGLE) {
    // retrieve the only one last row of all tables in the uid list.
    int64_t lastKey = INT64_MIN;
    bool    internalResult = false;

    for (int32_t i = 0; i < numOfTables; ++i) {
      STableKeyInfo* pKeyInfo = taosArrayGet(pr->pTableList, i);

      // Start from a clean handle so a lookup that leaves it untouched is
      // never mistaken for a hit on the previously released entry.
      LRUHandle* h = NULL;
      int32_t    code = tsdbCacheGetLastrowH(lruCache, pKeyInfo->uid, pr->pVnode->pTsdb, &h);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }

      // no data in the table of Uid
      if (h == NULL) {
        continue;
      }

      STSRow* pRow = (STSRow*)taosLRUCacheValue(lruCache, h);
      if (pRow->ts > lastKey) {
        // The candidate row is written to the same row index repeatedly, so
        // step back over a previously appended candidate before replacing it.
        if (internalResult) {
          pResBlock->info.rows -= 1;
        }

        saveOneRow(pRow, pResBlock, pr, slotIds);
        internalResult = true;
        lastKey = pRow->ts;
      }

      tsdbCacheRelease(lruCache, h);
    }
  } else if (pr->type == LASTROW_RETRIEVE_TYPE_ALL) {
    for (int32_t i = pr->tableIndex; i < numOfTables; ++i) {
      STableKeyInfo* pKeyInfo = taosArrayGet(pr->pTableList, i);

      LRUHandle* h = NULL;
      int32_t    code = tsdbCacheGetLastrowH(lruCache, pKeyInfo->uid, pr->pVnode->pTsdb, &h);
      if (code != TSDB_CODE_SUCCESS) {
        // tableIndex still points at this table, so a later call retries it.
        return code;
      }

      // This table has been examined, whether or not it holds data. Advancing
      // the resume position here (rather than only after a row is saved)
      // keeps it in step with `i`; otherwise tables without data would leave
      // it behind and the next call would return earlier tables again.
      pr->tableIndex = i + 1;

      // no data in the table of Uid
      if (h == NULL) {
        continue;
      }

      STSRow* pRow = (STSRow*)taosLRUCacheValue(lruCache, h);
      saveOneRow(pRow, pResBlock, pr, slotIds);

      tsdbCacheRelease(lruCache, h);

      if (pResBlock->info.rows >= pResBlock->info.capacity) {
        return TSDB_CODE_SUCCESS;
      }
    }
  } else {
    return TSDB_CODE_INVALID_PARA;
  }

  return TSDB_CODE_SUCCESS;
}