tsdbCacheRead.c 9.0 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "taoserror.h"
#include "tarray.h"
#include "tcommon.h"
#include "tsdb.h"

21
typedef struct SCacheRowsReader {
22 23 24
  SVnode*   pVnode;
  STSchema* pSchema;
  uint64_t  uid;
25 26 27 28 29
  char**    transferBuf;  // todo remove it soon
  int32_t   numOfCols;
  int32_t   type;
  int32_t   tableIndex;  // currently returned result tables
  SArray*   pTableList;  // table id list
30
} SCacheRowsReader;
31

H
Haojun Liao 已提交
32
static void saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* pReader, const int32_t* slotIds) {
33
  ASSERT(pReader->numOfCols <= taosArrayGetSize(pBlock->pDataBlock));
34 35 36
  int32_t numOfRows = pBlock->info.rows;

  SColVal colVal = {0};
37
  for (int32_t i = 0; i < pReader->numOfCols; ++i) {
38 39
    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);

H
Haojun Liao 已提交
40 41 42
    SFirstLastRes *pRes = taosMemoryCalloc(1, sizeof(SFirstLastRes) + TSDB_KEYSIZE);
    SLastCol *pColVal = (SLastCol *)taosArrayGet(pRow, i);

43
    if (slotIds[i] == -1) {
H
Haojun Liao 已提交
44 45 46 47 48 49
      pRes->ts = pColVal->ts;
      pRes->bytes = TSDB_KEYSIZE;
      pRes->isNull = false;
      pRes->hasResult = true;

      colDataAppend(pColInfoData, numOfRows, (const char*)pRes, false);
50
    } else {
51 52
      int32_t slotId = slotIds[i];

H
Haojun Liao 已提交
53 54 55 56
      int32_t bytes = pReader->pSchema->columns[slotId].bytes;
      pRes = taosMemoryCalloc(1, sizeof(SFirstLastRes) + bytes);
      pRes->bytes = bytes;
      pRes->hasResult = true;
57 58

      if (IS_VAR_DATA_TYPE(colVal.type)) {
H
Haojun Liao 已提交
59
        if (!COL_VAL_IS_VALUE(&colVal)) {
H
Haojun Liao 已提交
60 61 62 63
          pRes->isNull = true;
          pRes->ts = pColVal->ts;

          colDataAppend(pColInfoData, numOfRows, (const char*)pRes, false);
64
        } else {
H
Haojun Liao 已提交
65 66 67 68
          varDataSetLen(pRes->buf, colVal.value.nData);
          memcpy(varDataVal(pRes->buf), colVal.value.pData, colVal.value.nData);
          pRes->bytes = colVal.value.nData;
          colDataAppend(pColInfoData, numOfRows, (const char*)pRes, false);
69 70
        }
      } else {
H
Haojun Liao 已提交
71
        colDataAppend(pColInfoData, numOfRows, (const char*)&colVal.value, !COL_VAL_IS_VALUE(&colVal));
72 73
      }
    }
74 75 76 77 78
  }

  pBlock->info.rows += 1;
}

79 80 81 82
int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, SArray* pTableIdList, int32_t numOfCols, void** pReader) {
  *pReader = NULL;

  SCacheRowsReader* p = taosMemoryCalloc(1, sizeof(SCacheRowsReader));
83 84 85 86
  if (p == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

87 88 89
  p->type = type;
  p->pVnode = pVnode;
  p->numOfCols = numOfCols;
90

91 92 93 94 95
  if (taosArrayGetSize(pTableIdList) == 0) {
    *pReader = p;
    return TSDB_CODE_SUCCESS;
  }

96
  STableKeyInfo* pKeyInfo = taosArrayGet(pTableIdList, 0);
97
  p->pSchema = metaGetTbTSchema(p->pVnode->pMeta, pKeyInfo->uid, -1, 1);
98
  p->pTableList = pTableIdList;
99

100
  p->transferBuf = taosMemoryCalloc(p->pSchema->numOfCols, POINTER_BYTES);
101
  if (p->transferBuf == NULL) {
H
Haojun Liao 已提交
102
    return TSDB_CODE_OUT_OF_MEMORY;
103 104
  }

105
  for (int32_t i = 0; i < p->pSchema->numOfCols; ++i) {
M
Minglei Jin 已提交
106
    if (IS_VAR_DATA_TYPE(p->pSchema->columns[i].type)) {
107
      p->transferBuf[i] = taosMemoryMalloc(p->pSchema->columns[i].bytes);
108 109 110 111
      if (p->transferBuf[i] == NULL) {
        tsdbCacherowsReaderClose(p);
        return TSDB_CODE_OUT_OF_MEMORY;
      }
112 113
    }
  }
114

115 116 117 118
  *pReader = p;
  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
119
void* tsdbCacherowsReaderClose(void* pReader) {
120
  SCacheRowsReader* p = pReader;
121

122 123 124 125 126 127 128
  if (p->pSchema != NULL) {
    for (int32_t i = 0; i < p->pSchema->numOfCols; ++i) {
      taosMemoryFreeClear(p->transferBuf[i]);
    }

    taosMemoryFree(p->transferBuf);
    taosMemoryFree(p->pSchema);
129 130 131
  }

  taosMemoryFree(pReader);
H
Haojun Liao 已提交
132
  return NULL;
133 134
}

H
Haojun Liao 已提交
135
static int32_t doExtractCacheRow(SCacheRowsReader* pr, SLRUCache* lruCache, uint64_t uid, SArray** pRow,
H
Haojun Liao 已提交
136
                                 LRUHandle** h) {
137 138 139 140 141 142 143 144
  int32_t code = TSDB_CODE_SUCCESS;
  if ((pr->type & CACHESCAN_RETRIEVE_LAST_ROW) == CACHESCAN_RETRIEVE_LAST_ROW) {
    code = tsdbCacheGetLastrowH(lruCache, uid, pr->pVnode->pTsdb, h);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    // no data in the table of Uid
H
Haojun Liao 已提交
145 146
    if (*h != NULL) {  // todo convert to SArray
      *pRow = (SArray*)taosLRUCacheValue(lruCache, *h);
147 148 149 150 151 152 153 154 155
    }
  } else {
    code = tsdbCacheGetLastH(lruCache, uid, pr->pVnode->pTsdb, h);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    // no data in the table of Uid
    if (*h != NULL) {
H
Haojun Liao 已提交
156
      *pRow = (SArray*)taosLRUCacheValue(lruCache, *h);
157 158 159 160 161 162 163
    }
  }

  return code;
}

int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32_t* slotIds, SArray* pTableUidList) {
164
  if (pReader == NULL || pResBlock == NULL) {
165 166 167
    return TSDB_CODE_INVALID_PARA;
  }

168
  SCacheRowsReader* pr = pReader;
169

170
  int32_t    code = TSDB_CODE_SUCCESS;
171
  SLRUCache* lruCache = pr->pVnode->pTsdb->lruCache;
172
  LRUHandle* h = NULL;
H
Haojun Liao 已提交
173
  SArray*    pRow = NULL;
174
  size_t     numOfTables = taosArrayGetSize(pr->pTableList);
175

H
Haojun Liao 已提交
176 177 178 179 180
  int64_t* lastTs = taosMemoryMalloc(TSDB_KEYSIZE * pr->pSchema->numOfCols);
  for(int32_t i = 0; i < pr->pSchema->numOfCols; ++i) {
    lastTs[i] = INT64_MIN;
  }

181
  // retrieve the only one last row of all tables in the uid list.
182
  if ((pr->type & CACHESCAN_RETRIEVE_TYPE_SINGLE) == CACHESCAN_RETRIEVE_TYPE_SINGLE) {
H
Haojun Liao 已提交
183
    bool internalResult = false;
184
    for (int32_t i = 0; i < numOfTables; ++i) {
185
      STableKeyInfo* pKeyInfo = taosArrayGet(pr->pTableList, i);
186

187
      code = doExtractCacheRow(pr, lruCache, pKeyInfo->uid, &pRow, &h);
H
Haojun Liao 已提交
188
      if (code != TSDB_CODE_SUCCESS) {
189 190 191
        return code;
      }

192
      if (h == NULL) {
193 194 195
        continue;
      }

H
Haojun Liao 已提交
196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239
      {
        SFirstLastRes** pRes = taosMemoryCalloc(pr->numOfCols, POINTER_BYTES);
        for(int32_t j = 0; j < pr->numOfCols; ++j) {
          pRes[j] = taosMemoryCalloc(1, sizeof(SFirstLastRes) + pr->pSchema->columns[slotIds[j]].bytes);
          pRes[j]->ts = INT64_MIN;
        }

        for (int32_t k = 0; k < pr->numOfCols; ++k) {
          SColumnInfoData* pColInfoData = taosArrayGet(pResBlock->pDataBlock, k);

          if (slotIds[k] == -1) { // the primary timestamp
            SLastCol *pColVal = (SLastCol *)taosArrayGet(pRow, k);
            if (pColVal->ts > pRes[k]->ts || !pRes[k]->hasResult) {
              pRes[k]->hasResult = true;
              pRes[k]->ts = pColVal->ts;
              memcpy(pRes[k]->buf, &pColVal->ts, TSDB_KEYSIZE);

              colDataAppend(pColInfoData, 1, (const char*)pRes[k], false);
            }
          } else {
            int32_t   slotId = slotIds[k];
            SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, slotId);

            if (pColVal->ts > pRes[k]->ts || !pRes[k]->hasResult) {
              pRes[k]->hasResult = true;
              pRes[k]->ts = pColVal->ts;

              pRes[k]->isNull = !COL_VAL_IS_VALUE(&pColVal->colVal);
              if (!pRes[k]->isNull) {
                if (IS_VAR_DATA_TYPE(pColVal->colVal.type)) {
                  varDataSetLen(pRes[k]->buf, pColVal->colVal.value.nData);
                  memcpy(varDataVal(pRes[k]->buf), pColVal->colVal.value.pData, pColVal->colVal.value.nData);
                } else {
                  memcpy(pRes[k]->buf, &pColVal->colVal.value, pr->pSchema->columns[slotId].bytes);
                }
              }

              colDataAppend(pColInfoData, 1, (const char*)pRes[k], false);
            }
          }
        }
      }

/*
240
      if (pRow->ts > lastKey) {
H
Haojun Liao 已提交
241 242
        printf("qualified:%ld, old Value:%ld\n", pRow->ts, lastKey);

243 244 245 246
        // Set result row into the same rowIndex repeatly, so we need to check if the internal result row has already
        // appended or not.
        if (internalResult) {
          pResBlock->info.rows -= 1;
247
          taosArrayClear(pTableUidList);
248 249
        }

250
        saveOneRow(pRow, pResBlock, pr, slotIds);
251
        taosArrayPush(pTableUidList, &pKeyInfo->uid);
252 253 254
        internalResult = true;
        lastKey = pRow->ts;
      }
H
Haojun Liao 已提交
255
*/
256
      tsdbCacheRelease(lruCache, h);
257
    }
258
  } else if ((pr->type & CACHESCAN_RETRIEVE_TYPE_ALL) == CACHESCAN_RETRIEVE_TYPE_ALL) {
259 260
    for (int32_t i = pr->tableIndex; i < numOfTables; ++i) {
      STableKeyInfo* pKeyInfo = taosArrayGet(pr->pTableList, i);
261
      code = doExtractCacheRow(pr, lruCache, pKeyInfo->uid, &pRow, &h);
H
Haojun Liao 已提交
262
      if (code != TSDB_CODE_SUCCESS) {
263 264 265
        return code;
      }

266
      if (h == NULL) {
267 268 269
        continue;
      }

270
      saveOneRow(pRow, pResBlock, pr, slotIds);
271
      taosArrayPush(pTableUidList, &pKeyInfo->uid);
272

273
      tsdbCacheRelease(lruCache, h);
274

275 276 277 278
      pr->tableIndex += 1;
      if (pResBlock->info.rows >= pResBlock->info.capacity) {
        return TSDB_CODE_SUCCESS;
      }
279 280 281 282 283 284 285
    }
  } else {
    return TSDB_CODE_INVALID_PARA;
  }

  return TSDB_CODE_SUCCESS;
}