tsdbCacheRead.c 13.7 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "taoserror.h"
#include "tarray.h"
#include "tcommon.h"
#include "tsdb.h"

21
/* Evaluates to non-zero when every bit set in _flag is also set in _mask. */
#define HASTYPE(_mask, _flag) ((_flag) == ((_mask) & (_flag)))
22

H
Haojun Liao 已提交
23 24
/*
 * Append one result row to pBlock, built from the per-column cached values in pRow.
 *
 * pRow     array of SLastCol; element 0 is read for the primary timestamp, element slotIds[i]
 *          for output column i
 * pBlock   destination block; the row is written at index pBlock->info.rows
 * pReader  supplies the retrieve type, the number of output columns, the schema and the
 *          per-column transfer buffers for var-length values
 * slotIds  for each output column the index into pRow, or -1 for the primary timestamp
 * pRes     per-column scratch buffers (var-data header followed by an SFirstLastRes); only the
 *          CACHESCAN_RETRIEVE_LAST path uses them
 * idStr    identifier included in the error log
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_INVALID_PARA when the reader type contains neither
 * CACHESCAN_RETRIEVE_LAST nor CACHESCAN_RETRIEVE_LAST_ROW.
 */
static int32_t saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* pReader, const int32_t* slotIds,
                          void** pRes, const char* idStr) {
  int32_t numOfRows = pBlock->info.rows;

  if (HASTYPE(pReader->type, CACHESCAN_RETRIEVE_LAST)) {
    bool allNullRow = true;

    for (int32_t i = 0; i < pReader->numOfCols; ++i) {
      SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
      SFirstLastRes*   p = (SFirstLastRes*)varDataVal(pRes[i]);

      if (slotIds[i] == -1) {  // the primary timestamp
        SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, 0);
        p->ts = pColVal->ts;
        p->bytes = TSDB_KEYSIZE;
        *(int64_t*)p->buf = pColVal->ts;
        allNullRow = false;
      } else {
        int32_t   slotId = slotIds[i];
        SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, slotId);

        if (pColVal == NULL) {
          // The cached row has no entry for this column (table schema was modified, new column
          // added). Mark the result as NULL but still fall through to the append below, so that
          // every column of the block receives a cell for this row.
          p->ts = 0;
          p->isNull = true;
        } else {
          p->ts = pColVal->ts;
          p->isNull = !COL_VAL_IS_VALUE(&pColVal->colVal);
          allNullRow = p->isNull && allNullRow;

          if (!p->isNull) {
            if (IS_VAR_DATA_TYPE(pColVal->colVal.type)) {
              varDataSetLen(p->buf, pColVal->colVal.value.nData);
              memcpy(varDataVal(p->buf), pColVal->colVal.value.pData, pColVal->colVal.value.nData);
              p->bytes = pColVal->colVal.value.nData + VARSTR_HEADER_SIZE;  // binary needs to plus the header size
            } else {
              memcpy(p->buf, &pColVal->colVal.value, pReader->pSchema->columns[slotId].bytes);
              p->bytes = pReader->pSchema->columns[slotId].bytes;
            }
          }
        }
      }

      // pColInfoData->info.bytes includes the VARSTR_HEADER_SIZE, need to subtract it
      p->hasResult = true;
      varDataSetLen(pRes[i], pColInfoData->info.bytes - VARSTR_HEADER_SIZE);
      colDataAppend(pColInfoData, numOfRows, (const char*)pRes[i], false);
    }

    // a row in which no requested column carries a value is not counted
    pBlock->info.rows += allNullRow ? 0 : 1;
  } else if (HASTYPE(pReader->type, CACHESCAN_RETRIEVE_LAST_ROW)) {
    for (int32_t i = 0; i < pReader->numOfCols; ++i) {
      SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);

      if (slotIds[i] == -1) {  // the primary timestamp
        SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, 0);
        colDataAppend(pColInfoData, numOfRows, (const char*)&pColVal->ts, false);
        continue;
      }

      int32_t   slotId = slotIds[i];
      SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, slotId);

      // same guard as in the "last" path: the cached row may lack a column that was added later
      if (pColVal == NULL) {
        colDataAppendNULL(pColInfoData, numOfRows);
        continue;
      }

      SColVal* pVal = &pColVal->colVal;

      if (IS_VAR_DATA_TYPE(pVal->type)) {
        if (!COL_VAL_IS_VALUE(pVal)) {
          colDataAppendNULL(pColInfoData, numOfRows);
        } else {
          // var-length values are re-packed with a length header in the reader's transfer buffer
          varDataSetLen(pReader->transferBuf[slotId], pVal->value.nData);
          memcpy(varDataVal(pReader->transferBuf[slotId]), pVal->value.pData, pVal->value.nData);
          colDataAppend(pColInfoData, numOfRows, pReader->transferBuf[slotId], false);
        }
      } else {
        colDataAppend(pColInfoData, numOfRows, (const char*)&pVal->value.val, !COL_VAL_IS_VALUE(pVal));
      }
    }

    pBlock->info.rows += 1;
  } else {
    tsdbError("invalid retrieve type:%d, %s", pReader->type, idStr);
    return TSDB_CODE_INVALID_PARA;
  }

  return TSDB_CODE_SUCCESS;
}

109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140
/*
 * Resolve the table schema used by the cache reader and store it in p->pSchema.
 *
 * When suid is non-zero the schema is looked up by suid. Otherwise the tables in p->pTableList
 * are tried in order and the first schema that can be loaded is used.
 *
 * The reader itself is NOT released here on failure: it is owned by the caller, which disposes
 * of it through tsdbCacherowsReaderClose(). Freeing it here as well would be a double free.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_PAR_TABLE_NOT_EXIST when no schema could be loaded.
 */
static int32_t setTableSchema(SCacheRowsReader* p, uint64_t suid, const char* idstr) {
  int32_t numOfTables = p->numOfTables;

  if (suid != 0) {
    p->pSchema = metaGetTbTSchema(p->pVnode->pMeta, suid, -1, 1);
    if (p->pSchema == NULL) {
      tsdbWarn("stable:%" PRIu64 " has been dropped, failed to retrieve cached rows, %s", suid, idstr);
      return TSDB_CODE_PAR_TABLE_NOT_EXIST;
    }

    return TSDB_CODE_SUCCESS;
  }

  for (int32_t i = 0; i < numOfTables; ++i) {
    uint64_t uid = p->pTableList[i].uid;
    p->pSchema = metaGetTbTSchema(p->pVnode->pMeta, uid, -1, 1);
    if (p->pSchema != NULL) {
      break;
    }

    tsdbWarn("table:%" PRIu64 " has been dropped, failed to retrieve cached rows, %s", uid, idstr);
  }

  // all queried tables have been dropped already, return immediately.
  if (p->pSchema == NULL) {
    tsdbWarn("all queried tables has been dropped, try next group, %s", idstr);
    return TSDB_CODE_PAR_TABLE_NOT_EXIST;
  }

  return TSDB_CODE_SUCCESS;
}

141
int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, void* pTableIdList, int32_t numOfTables, int32_t numOfCols,
H
Haojun Liao 已提交
142
                                uint64_t suid, void** pReader, const char* idstr) {
143 144
  *pReader = NULL;
  SCacheRowsReader* p = taosMemoryCalloc(1, sizeof(SCacheRowsReader));
145 146 147 148
  if (p == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

149 150
  p->type = type;
  p->pVnode = pVnode;
151 152
  p->pTsdb = p->pVnode->pTsdb;
  p->verRange = (SVersionRange){.minVer = 0, .maxVer = UINT64_MAX};
153
  p->numOfCols = numOfCols;
154
  p->suid = suid;
155

156
  if (numOfTables == 0) {
157 158 159 160
    *pReader = p;
    return TSDB_CODE_SUCCESS;
  }

161
  p->pTableList = pTableIdList;
162
  p->numOfTables = numOfTables;
163

164 165 166 167 168 169
  int32_t code = setTableSchema(p, suid, idstr);
  if (code != TSDB_CODE_SUCCESS) {
    tsdbCacherowsReaderClose(p);
    return code;
  }

170
  p->transferBuf = taosMemoryCalloc(p->pSchema->numOfCols, POINTER_BYTES);
171
  if (p->transferBuf == NULL) {
H
Haojun Liao 已提交
172
    tsdbCacherowsReaderClose(p);
H
Haojun Liao 已提交
173
    return TSDB_CODE_OUT_OF_MEMORY;
174 175
  }

176
  for (int32_t i = 0; i < p->pSchema->numOfCols; ++i) {
M
Minglei Jin 已提交
177
    if (IS_VAR_DATA_TYPE(p->pSchema->columns[i].type)) {
178
      p->transferBuf[i] = taosMemoryMalloc(p->pSchema->columns[i].bytes);
179 180 181 182
      if (p->transferBuf[i] == NULL) {
        tsdbCacherowsReaderClose(p);
        return TSDB_CODE_OUT_OF_MEMORY;
      }
183 184
    }
  }
185

186 187
  int32_t numOfStt = ((SVnode*)pVnode)->config.sttTrigger;
  p->pLoadInfo = tCreateLastBlockLoadInfo(p->pSchema, NULL, 0, numOfStt);
188 189 190 191 192
  if (p->pLoadInfo == NULL) {
    tsdbCacherowsReaderClose(p);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

H
Haojun Liao 已提交
193
  p->idstr = taosMemoryStrDup(idstr);
194 195
  taosThreadMutexInit(&p->readerMutex, NULL);

196 197 198 199
  *pReader = p;
  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
200
void* tsdbCacherowsReaderClose(void* pReader) {
201
  SCacheRowsReader* p = pReader;
202

203 204 205 206 207 208 209
  if (p->pSchema != NULL) {
    for (int32_t i = 0; i < p->pSchema->numOfCols; ++i) {
      taosMemoryFreeClear(p->transferBuf[i]);
    }

    taosMemoryFree(p->transferBuf);
    taosMemoryFree(p->pSchema);
210 211
  }

212 213
  destroyLastBlockLoadInfo(p->pLoadInfo);

214
  taosMemoryFree((void*)p->idstr);
215 216
  taosThreadMutexDestroy(&p->readerMutex);

217
  taosMemoryFree(pReader);
H
Haojun Liao 已提交
218
  return NULL;
219 220
}

H
Haojun Liao 已提交
221
static int32_t doExtractCacheRow(SCacheRowsReader* pr, SLRUCache* lruCache, uint64_t uid, SArray** pRow,
H
Haojun Liao 已提交
222
                                 LRUHandle** h) {
223
  int32_t code = TSDB_CODE_SUCCESS;
224
  *pRow = NULL;
225

H
Haojun Liao 已提交
226
  if (HASTYPE(pr->type, CACHESCAN_RETRIEVE_LAST_ROW)) {
227
    code = tsdbCacheGetLastrowH(lruCache, uid, pr, h);
228
  } else {
229
    code = tsdbCacheGetLastH(lruCache, uid, pr, h);
230
  }
231

232 233 234 235 236 237 238
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  // no data in the table of Uid
  if (*h != NULL) {
    *pRow = (SArray*)taosLRUCacheValue(lruCache, *h);
239 240 241 242 243
  }

  return code;
}

244
static void freeItem(void* pItem) {
245
  SLastCol* pCol = (SLastCol*)pItem;
246 247 248 249 250
  if (IS_VAR_DATA_TYPE(pCol->colVal.type)) {
    taosMemoryFree(pCol->colVal.value.pData);
  }
}

251
static int32_t tsdbCacheQueryReseek(void* pQHandle) {
252 253 254
  int32_t           code = 0;
  SCacheRowsReader* pReader = pQHandle;

255 256 257 258
  code = taosThreadMutexTryLock(&pReader->readerMutex);
  if (code == 0) {
    // pause current reader's state if not paused, save ts & version for resuming
    // just wait for the big all tables' snapshot untaking for now
259

260
    code = TSDB_CODE_VND_QUERY_BUSY;
261

262 263 264 265 266 267 268 269
    taosThreadMutexUnlock(&pReader->readerMutex);

    return code;
  } else if (code == EBUSY) {
    return TSDB_CODE_VND_QUERY_BUSY;
  } else {
    return -1;
  }
270 271
}

272
/*
 * Retrieve cached last / last_row values of the reader's tables into pResBlock.
 *
 * pReader        reader created by tsdbCacherowsReaderOpen()
 * pResBlock      destination block, rows are appended to it
 * slotIds        for each output column the schema column index, or -1 for the primary timestamp
 * pTableUidList  receives the uid(s) of the table(s) the appended rows come from
 *
 * With CACHESCAN_RETRIEVE_TYPE_SINGLE the per-column newest values over all tables are merged
 * into a single row. With CACHESCAN_RETRIEVE_TYPE_ALL one row per table is appended, starting at
 * pr->tableIndex and stopping once the block has reached its capacity, so the call can be
 * repeated to continue the scan.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_INVALID_PARA, TSDB_CODE_OUT_OF_MEMORY, or the error of a
 * failed snapshot / cache lookup.
 */
int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32_t* slotIds, SArray* pTableUidList) {
  if (pReader == NULL || pResBlock == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  SCacheRowsReader* pr = pReader;

  // a reader opened without tables carries no schema, there is nothing to retrieve
  if (pr->pSchema == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t    code = TSDB_CODE_SUCCESS;
  SLRUCache* lruCache = pr->pVnode->pTsdb->lruCache;
  LRUHandle* h = NULL;
  SArray*    pRow = NULL;
  bool       hasRes = false;
  bool       locked = false;     // reader mutex is held
  bool       snapTaken = false;  // read snapshot has been taken and must be released
  SArray*    pLastCols = NULL;

  void** pRes = taosMemoryCalloc(pr->numOfCols, POINTER_BYTES);
  if (pRes == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _end;
  }

  for (int32_t j = 0; j < pr->numOfCols; ++j) {
    // slot -1 denotes the primary timestamp and must not be used to index the schema columns
    int32_t bytes = (slotIds[j] == -1) ? TSDB_KEYSIZE : pr->pSchema->columns[slotIds[j]].bytes;

    pRes[j] = taosMemoryCalloc(1, sizeof(SFirstLastRes) + bytes + VARSTR_HEADER_SIZE);
    if (pRes[j] == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _end;
    }

    SFirstLastRes* p = (SFirstLastRes*)varDataVal(pRes[j]);
    p->ts = INT64_MIN;
  }

  // one accumulator per schema column, holding the newest value seen so far over all tables
  pLastCols = taosArrayInit(pr->pSchema->numOfCols, sizeof(SLastCol));
  if (pLastCols == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _end;
  }

  for (int32_t i = 0; i < pr->pSchema->numOfCols; ++i) {
    struct STColumn* pCol = &pr->pSchema->columns[i];
    SLastCol         p = {.ts = INT64_MIN, .colVal.type = pCol->type};

    if (IS_VAR_DATA_TYPE(pCol->type)) {
      p.colVal.value.pData = taosMemoryCalloc(pCol->bytes, sizeof(char));
      if (p.colVal.value.pData == NULL) {
        code = TSDB_CODE_OUT_OF_MEMORY;
        goto _end;
      }
    }
    taosArrayPush(pLastCols, &p);
  }

  taosThreadMutexLock(&pr->readerMutex);
  locked = true;

  code = tsdbTakeReadSnap((STsdbReader*)pr, tsdbCacheQueryReseek, &pr->pReadSnap);
  if (code != TSDB_CODE_SUCCESS) {
    goto _end;
  }
  snapTaken = true;

  pr->pDataFReader = NULL;
  pr->pDataFReaderLast = NULL;

  // retrieve the only one last row of all tables in the uid list.
  if (HASTYPE(pr->type, CACHESCAN_RETRIEVE_TYPE_SINGLE)) {
    for (int32_t i = 0; i < pr->numOfTables; ++i) {
      STableKeyInfo* pKeyInfo = &pr->pTableList[i];

      code = doExtractCacheRow(pr, lruCache, pKeyInfo->uid, &pRow, &h);
      if (code != TSDB_CODE_SUCCESS) {
        goto _end;
      }

      if (h == NULL) {
        continue;
      }
      if (taosArrayGetSize(pRow) <= 0) {
        tsdbCacheRelease(lruCache, h);
        continue;
      }

      for (int32_t k = 0; k < pr->numOfCols; ++k) {
        int32_t slotId = slotIds[k];

        if (slotId == -1) {  // the primary timestamp
          SLastCol* p = taosArrayGet(pLastCols, 0);
          SLastCol* pCol = (SLastCol*)taosArrayGet(pRow, 0);
          if (pCol->ts > p->ts) {
            hasRes = true;
            p->ts = pCol->ts;
            p->colVal = pCol->colVal;

            // only set value for last row query
            if (HASTYPE(pr->type, CACHESCAN_RETRIEVE_LAST_ROW)) {
              if (taosArrayGetSize(pTableUidList) == 0) {
                taosArrayPush(pTableUidList, &pKeyInfo->uid);
              } else {
                taosArraySet(pTableUidList, 0, &pKeyInfo->uid);
              }
            }
          }
        } else {
          SLastCol* p = taosArrayGet(pLastCols, slotId);
          SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, slotId);

          // the cached row may lack a column that was added to the schema later
          if (pColVal == NULL) {
            continue;
          }

          if (pColVal->ts > p->ts) {
            // for "last", a newer entry without a value never replaces an older value
            if (!COL_VAL_IS_VALUE(&pColVal->colVal) && HASTYPE(pr->type, CACHESCAN_RETRIEVE_LAST)) {
              continue;
            }

            hasRes = true;
            p->ts = pColVal->ts;

            if (!IS_VAR_DATA_TYPE(pColVal->colVal.type)) {
              p->colVal = pColVal->colVal;
            } else {
              // keep the accumulator's own buffer: copy the bytes instead of aliasing cache memory
              if (COL_VAL_IS_VALUE(&pColVal->colVal)) {
                memcpy(p->colVal.value.pData, pColVal->colVal.value.pData, pColVal->colVal.value.nData);
              }

              p->colVal.value.nData = pColVal->colVal.value.nData;
              p->colVal.type = pColVal->colVal.type;
              p->colVal.flag = pColVal->colVal.flag;
              p->colVal.cid = pColVal->colVal.cid;
            }
          }
        }
      }

      tsdbCacheRelease(lruCache, h);
    }

    if (hasRes) {
      code = saveOneRow(pLastCols, pResBlock, pr, slotIds, pRes, pr->idstr);
    }
  } else if (HASTYPE(pr->type, CACHESCAN_RETRIEVE_TYPE_ALL)) {
    for (int32_t i = pr->tableIndex; i < pr->numOfTables; ++i) {
      STableKeyInfo* pKeyInfo = &pr->pTableList[i];

      code = doExtractCacheRow(pr, lruCache, pKeyInfo->uid, &pRow, &h);
      if (code != TSDB_CODE_SUCCESS) {
        goto _end;
      }

      // Table i has been consumed whether or not it yields a row, so a later call must resume
      // behind it. Counting only the tables that produced a row would revisit tables.
      pr->tableIndex = i + 1;

      if (h == NULL) {
        continue;
      }
      if (taosArrayGetSize(pRow) <= 0) {
        tsdbCacheRelease(lruCache, h);
        continue;
      }

      code = saveOneRow(pRow, pResBlock, pr, slotIds, pRes, pr->idstr);
      // TODO reset the pRes
      tsdbCacheRelease(lruCache, h);
      if (code != TSDB_CODE_SUCCESS) {
        goto _end;
      }

      // NOTE(review): the uid is recorded even when saveOneRow() did not count the row (all
      // requested columns NULL in "last" mode) -- confirm this is what the caller expects.
      taosArrayPush(pTableUidList, &pKeyInfo->uid);

      if (pResBlock->info.rows >= pResBlock->info.capacity) {
        goto _end;
      }
    }
  } else {
    code = TSDB_CODE_INVALID_PARA;
  }

_end:
  // undo only what has actually been acquired: failures before the lock / snapshot must neither
  // unlock a mutex that is not held nor release a snapshot left over from an earlier call
  if (snapTaken) {
    tsdbDataFReaderClose(&pr->pDataFReaderLast);
    tsdbDataFReaderClose(&pr->pDataFReader);

    resetLastBlockLoadInfo(pr->pLoadInfo);
    tsdbUntakeReadSnap((STsdbReader*)pr, pr->pReadSnap, true);
  }

  if (locked) {
    taosThreadMutexUnlock(&pr->readerMutex);
  }

  if (pRes != NULL) {
    for (int32_t j = 0; j < pr->numOfCols; ++j) {
      taosMemoryFree(pRes[j]);
    }
  }

  taosMemoryFree(pRes);
  taosArrayDestroyEx(pLastCols, freeItem);
  return code;
}