tsdbCacheRead.c 14.8 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "taoserror.h"
#include "tarray.h"
#include "tcommon.h"
#include "tsdb.h"

21
// True when every bit of `flag` is also set in `typeVal`; used to test CACHESCAN_RETRIEVE_* flag combinations.
#define HASTYPE(typeVal, flag) (((typeVal) & (flag)) == (flag))
22

H
Haojun Liao 已提交
23 24
/*
 * Append one cached row to pBlock at row index pBlock->info.rows.
 *
 * LAST mode (CACHESCAN_RETRIEVE_LAST): each output column's value is packed into the SFirstLastRes that lives in
 * the var-data buffer pRes[i], and that whole buffer is written as the column value. The row counter only advances
 * when at least one column is non-NULL (the primary timestamp always counts as non-NULL).
 *
 * LAST_ROW mode (CACHESCAN_RETRIEVE_LAST_ROW): values are copied into the block directly; variable-length values
 * are staged in pReader->transferBuf[slot] first. The row counter always advances.
 *
 * In both modes a slot beyond the end of the cached row (a column added to the schema after the row was cached)
 * is written as NULL.
 *
 * @param pRow    cached row: SArray of SLastCol indexed by schema slot; slot 0 holds the primary timestamp
 * @param slotIds per output column, the schema slot to read, or -1 for the primary timestamp
 * @param pRes    LAST mode only: per output column, a var-data buffer wrapping an SFirstLastRes
 * @param idStr   identifier used in log messages
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_INVALID_PARA when pReader->type selects neither mode
 */
static int32_t saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* pReader, const int32_t* slotIds,
                          void** pRes, const char* idStr) {
  const int32_t dstRow = pBlock->info.rows;

  if (HASTYPE(pReader->type, CACHESCAN_RETRIEVE_LAST)) {
    bool rowIsAllNull = true;

    for (int32_t col = 0; col < pReader->numOfCols; ++col) {
      SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, col);
      SFirstLastRes*   pOut = (SFirstLastRes*)varDataVal(pRes[col]);
      const int32_t    slot = slotIds[col];

      if (slot == -1) {
        // primary timestamp: read from slot 0, never NULL
        SLastCol* pTs = (SLastCol*)taosArrayGet(pRow, 0);
        pOut->ts = pTs->ts;
        pOut->bytes = TSDB_KEYSIZE;
        *(int64_t*)pOut->buf = pTs->ts;
        rowIsAllNull = false;
      } else if (slot >= taosArrayGetSize(pRow)) {
        // column added to the schema after this row was cached: emit NULL and skip the result packing below
        pOut->ts = 0;
        pOut->isNull = true;
        colDataSetNULL(pDst, dstRow);
        continue;
      } else {
        SLastCol* pSrc = (SLastCol*)taosArrayGet(pRow, slot);
        pOut->ts = pSrc->ts;
        pOut->isNull = !COL_VAL_IS_VALUE(&pSrc->colVal);
        rowIsAllNull = rowIsAllNull && pOut->isNull;

        if (!pOut->isNull) {
          if (IS_VAR_DATA_TYPE(pSrc->colVal.type)) {
            varDataSetLen(pOut->buf, pSrc->colVal.value.nData);
            memcpy(varDataVal(pOut->buf), pSrc->colVal.value.pData, pSrc->colVal.value.nData);
            // var-data width includes the length header
            pOut->bytes = pSrc->colVal.value.nData + VARSTR_HEADER_SIZE;
          } else {
            const int32_t width = pReader->pSchema->columns[slot].bytes;
            memcpy(pOut->buf, &pSrc->colVal.value, width);
            pOut->bytes = width;
          }
        }
      }

      // The column value is the whole SFirstLastRes buffer. pDst->info.bytes includes VARSTR_HEADER_SIZE,
      // so it is subtracted to get the var-data payload length.
      pOut->hasResult = true;
      varDataSetLen(pRes[col], pDst->info.bytes - VARSTR_HEADER_SIZE);
      colDataSetVal(pDst, dstRow, (const char*)pRes[col], false);
    }

    if (!rowIsAllNull) {
      pBlock->info.rows += 1;
    }
    return TSDB_CODE_SUCCESS;
  }

  if (HASTYPE(pReader->type, CACHESCAN_RETRIEVE_LAST_ROW)) {
    for (int32_t col = 0; col < pReader->numOfCols; ++col) {
      SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, col);
      const int32_t    slot = slotIds[col];

      if (slot == -1) {
        SLastCol* pTs = (SLastCol*)taosArrayGet(pRow, 0);
        colDataSetVal(pDst, dstRow, (const char*)&pTs->ts, false);
        continue;
      }

      // column added to the schema after this row was cached
      if (slot >= taosArrayGetSize(pRow)) {
        colDataSetNULL(pDst, dstRow);
        continue;
      }

      SLastCol* pSrc = (SLastCol*)taosArrayGet(pRow, slot);
      SColVal*  pVal = &pSrc->colVal;

      if (!IS_VAR_DATA_TYPE(pVal->type)) {
        colDataSetVal(pDst, dstRow, (const char*)&pVal->value.val, !COL_VAL_IS_VALUE(pVal));
      } else if (!COL_VAL_IS_VALUE(pVal)) {
        colDataSetNULL(pDst, dstRow);
      } else {
        // build a length-prefixed var-data value in the per-slot scratch buffer
        char* pBuf = pReader->transferBuf[slot];
        varDataSetLen(pBuf, pVal->value.nData);
        memcpy(varDataVal(pBuf), pVal->value.pData, pVal->value.nData);
        colDataSetVal(pDst, dstRow, pBuf, false);
      }
    }

    pBlock->info.rows += 1;
    return TSDB_CODE_SUCCESS;
  }

  tsdbError("invalid retrieve type:%d, %s", pReader->type, idStr);
  return TSDB_CODE_INVALID_PARA;
}

115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145
/*
 * Load the schema the reader works with into p->pSchema.
 *
 * - suid != 0: the schema of super table suid.
 * - suid == 0: the schema of the first table in p->pTableList whose schema can still be loaded; tables whose
 *   schema lookup returns NULL are logged and skipped.
 *
 * This function never frees p. On failure the caller still owns the reader and must release it itself
 * (tsdbCacherowsReaderOpen() does so via tsdbCacherowsReaderClose()). Freeing it here as well caused a
 * use-after-free / double free in that caller.
 *
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_PAR_TABLE_NOT_EXIST when no schema could be loaded
 */
static int32_t setTableSchema(SCacheRowsReader* p, uint64_t suid, const char* idstr) {
  int32_t numOfTables = p->numOfTables;

  if (suid != 0) {
    p->pSchema = metaGetTbTSchema(p->pVnode->pMeta, suid, -1, 1);
    if (p->pSchema == NULL) {
      tsdbWarn("stable:%" PRIu64 " has been dropped, failed to retrieve cached rows, %s", suid, idstr);
      return TSDB_CODE_PAR_TABLE_NOT_EXIST;
    }
  } else {
    for (int32_t i = 0; i < numOfTables; ++i) {
      uint64_t uid = p->pTableList[i].uid;
      p->pSchema = metaGetTbTSchema(p->pVnode->pMeta, uid, -1, 1);
      if (p->pSchema != NULL) {
        break;
      }

      tsdbWarn("table:%" PRIu64 " has been dropped, failed to retrieve cached rows, %s", uid, idstr);
    }

    // all queried tables have been dropped already, return immediately; the caller releases p.
    if (p->pSchema == NULL) {
      tsdbWarn("all queried tables has been dropped, try next group, %s", idstr);
      return TSDB_CODE_PAR_TABLE_NOT_EXIST;
    }
  }

  return TSDB_CODE_SUCCESS;
}

146
int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, void* pTableIdList, int32_t numOfTables, int32_t numOfCols,
H
Haojun Liao 已提交
147
                                uint64_t suid, void** pReader, const char* idstr) {
148 149
  *pReader = NULL;
  SCacheRowsReader* p = taosMemoryCalloc(1, sizeof(SCacheRowsReader));
150 151 152 153
  if (p == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

154 155
  p->type = type;
  p->pVnode = pVnode;
156 157
  p->pTsdb = p->pVnode->pTsdb;
  p->verRange = (SVersionRange){.minVer = 0, .maxVer = UINT64_MAX};
158
  p->numOfCols = numOfCols;
159
  p->suid = suid;
160

161
  if (numOfTables == 0) {
162 163 164 165
    *pReader = p;
    return TSDB_CODE_SUCCESS;
  }

166
  p->pTableList = pTableIdList;
167
  p->numOfTables = numOfTables;
168

169 170 171 172 173 174
  int32_t code = setTableSchema(p, suid, idstr);
  if (code != TSDB_CODE_SUCCESS) {
    tsdbCacherowsReaderClose(p);
    return code;
  }

175
  p->transferBuf = taosMemoryCalloc(p->pSchema->numOfCols, POINTER_BYTES);
176
  if (p->transferBuf == NULL) {
H
Haojun Liao 已提交
177
    tsdbCacherowsReaderClose(p);
H
Haojun Liao 已提交
178
    return TSDB_CODE_OUT_OF_MEMORY;
179 180
  }

181
  for (int32_t i = 0; i < p->pSchema->numOfCols; ++i) {
M
Minglei Jin 已提交
182
    if (IS_VAR_DATA_TYPE(p->pSchema->columns[i].type)) {
183
      p->transferBuf[i] = taosMemoryMalloc(p->pSchema->columns[i].bytes);
184 185 186 187
      if (p->transferBuf[i] == NULL) {
        tsdbCacherowsReaderClose(p);
        return TSDB_CODE_OUT_OF_MEMORY;
      }
188 189
    }
  }
190

191 192
  int32_t numOfStt = ((SVnode*)pVnode)->config.sttTrigger;
  p->pLoadInfo = tCreateLastBlockLoadInfo(p->pSchema, NULL, 0, numOfStt);
193 194 195 196 197
  if (p->pLoadInfo == NULL) {
    tsdbCacherowsReaderClose(p);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

198
  p->idstr = taosStrdup(idstr);
199 200
  taosThreadMutexInit(&p->readerMutex, NULL);

201 202
  p->lastTs = INT64_MIN;

203 204 205 206
  *pReader = p;
  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
207
void* tsdbCacherowsReaderClose(void* pReader) {
208
  SCacheRowsReader* p = pReader;
209

210 211 212 213 214 215 216
  if (p->pSchema != NULL) {
    for (int32_t i = 0; i < p->pSchema->numOfCols; ++i) {
      taosMemoryFreeClear(p->transferBuf[i]);
    }

    taosMemoryFree(p->transferBuf);
    taosMemoryFree(p->pSchema);
217 218
  }

219 220
  taosMemoryFree(p->pCurrSchema);

221 222
  destroyLastBlockLoadInfo(p->pLoadInfo);

223
  taosMemoryFree((void*)p->idstr);
224 225
  taosThreadMutexDestroy(&p->readerMutex);

226
  taosMemoryFree(pReader);
H
Haojun Liao 已提交
227
  return NULL;
228 229
}

H
Haojun Liao 已提交
230
static int32_t doExtractCacheRow(SCacheRowsReader* pr, SLRUCache* lruCache, uint64_t uid, SArray** pRow,
H
Haojun Liao 已提交
231
                                 LRUHandle** h) {
232
  int32_t code = TSDB_CODE_SUCCESS;
233
  *pRow = NULL;
234

H
Haojun Liao 已提交
235
  if (HASTYPE(pr->type, CACHESCAN_RETRIEVE_LAST_ROW)) {
236
    code = tsdbCacheGetLastrowH(lruCache, uid, pr, h);
237
  } else {
238
    code = tsdbCacheGetLastH(lruCache, uid, pr, h);
239
  }
240

241 242 243 244 245 246 247
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  // no data in the table of Uid
  if (*h != NULL) {
    *pRow = (SArray*)taosLRUCacheValue(lruCache, *h);
248 249 250 251 252
  }

  return code;
}

253
static void freeItem(void* pItem) {
254
  SLastCol* pCol = (SLastCol*)pItem;
255 256 257 258 259
  if (IS_VAR_DATA_TYPE(pCol->colVal.type)) {
    taosMemoryFree(pCol->colVal.value.pData);
  }
}

260
static int32_t tsdbCacheQueryReseek(void* pQHandle) {
261 262 263
  int32_t           code = 0;
  SCacheRowsReader* pReader = pQHandle;

264 265 266 267
  code = taosThreadMutexTryLock(&pReader->readerMutex);
  if (code == 0) {
    // pause current reader's state if not paused, save ts & version for resuming
    // just wait for the big all tables' snapshot untaking for now
268

269
    code = TSDB_CODE_VND_QUERY_BUSY;
270

271 272 273 274 275 276 277 278
    taosThreadMutexUnlock(&pReader->readerMutex);

    return code;
  } else if (code == EBUSY) {
    return TSDB_CODE_VND_QUERY_BUSY;
  } else {
    return -1;
  }
279 280
}

281
/*
 * Read cached last / last-row values of the reader's tables into pResBlock.
 *
 * CACHESCAN_RETRIEVE_TYPE_SINGLE: merges the cached rows of every table in the list into at most one output row.
 *   Per column, the cached value with the greatest timestamp is kept; in LAST mode NULL values are skipped.
 *   In LAST_ROW mode, pTableUidList[0] receives the uid of the table that supplied the newest timestamp.
 * CACHESCAN_RETRIEVE_TYPE_ALL: appends one output row per table. It resumes at pr->tableIndex, appends each
 *   emitted table's uid to pTableUidList, and stops once pResBlock reaches its capacity.
 *
 * @param pReader       reader returned by tsdbCacherowsReaderOpen()
 * @param pResBlock     destination block; rows are appended at pResBlock->info.rows
 * @param slotIds       per output column, the schema slot to read, or -1 for the primary timestamp
 * @param pTableUidList receives the uid(s) of the table(s) that produced output rows
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_INVALID_PARA, TSDB_CODE_OUT_OF_MEMORY, or a cache / snapshot error
 */
int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32_t* slotIds, SArray* pTableUidList) {
  if (pReader == NULL || pResBlock == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  SCacheRowsReader* pr = pReader;

  // A reader opened over an empty table list has no schema, so there is nothing to retrieve.
  if (pr->pSchema == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t    code = TSDB_CODE_SUCCESS;
  SLRUCache* lruCache = pr->pVnode->pTsdb->lruCache;
  LRUHandle* h = NULL;
  SArray*    pRow = NULL;
  bool       hasRes = false;
  bool       locked = false;  // readerMutex held (and a read snapshot possibly taken); gates the cleanup at _end
  SArray*    pLastCols = NULL;

  // One SFirstLastRes (wrapped in var-data) per output column, sized for that column's schema width.
  void** pRes = taosMemoryCalloc(pr->numOfCols, POINTER_BYTES);
  if (pRes == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _end;
  }

  for (int32_t j = 0; j < pr->numOfCols; ++j) {
    int32_t slot = (-1 == slotIds[j]) ? 0 : slotIds[j];
    pRes[j] = taosMemoryCalloc(1, sizeof(SFirstLastRes) + pr->pSchema->columns[slot].bytes + VARSTR_HEADER_SIZE);
    if (pRes[j] == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _end;
    }

    SFirstLastRes* p = (SFirstLastRes*)varDataVal(pRes[j]);
    p->ts = INT64_MIN;
  }

  // Per schema column, the newest value seen so far across all tables (used by TYPE_SINGLE).
  pLastCols = taosArrayInit(pr->pSchema->numOfCols, sizeof(SLastCol));
  if (pLastCols == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _end;
  }

  for (int32_t i = 0; i < pr->pSchema->numOfCols; ++i) {
    struct STColumn* pCol = &pr->pSchema->columns[i];
    SLastCol         p = {.ts = INT64_MIN, .colVal.type = pCol->type, .colVal.flag = CV_FLAG_NULL};

    if (IS_VAR_DATA_TYPE(pCol->type)) {
      p.colVal.value.pData = taosMemoryCalloc(pCol->bytes, sizeof(char));
      if (p.colVal.value.pData == NULL) {
        code = TSDB_CODE_OUT_OF_MEMORY;
        goto _end;
      }
    }

    if (taosArrayPush(pLastCols, &p) == NULL) {
      // not owned by the array yet, so freeItem() would not see it
      if (IS_VAR_DATA_TYPE(pCol->type)) {
        taosMemoryFree(p.colVal.value.pData);
      }
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _end;
    }
  }

  taosThreadMutexLock(&pr->readerMutex);
  locked = true;

  code = tsdbTakeReadSnap((STsdbReader*)pr, tsdbCacheQueryReseek, &pr->pReadSnap);
  if (code != TSDB_CODE_SUCCESS) {
    goto _end;
  }

  pr->pDataFReader = NULL;
  pr->pDataFReaderLast = NULL;

  // retrieve the only one last row of all tables in the uid list.
  if (HASTYPE(pr->type, CACHESCAN_RETRIEVE_TYPE_SINGLE)) {
    int64_t st = taosGetTimestampUs();
    int64_t totalLastTs = INT64_MAX;

    for (int32_t i = 0; i < pr->numOfTables; ++i) {
      STableKeyInfo* pKeyInfo = &pr->pTableList[i];

      code = doExtractCacheRow(pr, lruCache, pKeyInfo->uid, &pRow, &h);
      if (code != TSDB_CODE_SUCCESS) {
        goto _end;
      }

      if (h == NULL) {
        continue;
      }

      if (taosArrayGetSize(pRow) <= 0) {
        tsdbCacheRelease(lruCache, h);
        continue;
      }

      {
        bool    hasNotNullRow = true;
        int64_t singleTableLastTs = INT64_MAX;

        for (int32_t k = 0; k < pr->numOfCols; ++k) {
          int32_t slotId = slotIds[k];

          if (slotId == -1) {  // the primary timestamp
            SLastCol* p = taosArrayGet(pLastCols, 0);
            SLastCol* pCol = (SLastCol*)taosArrayGet(pRow, 0);
            if (pCol->ts > p->ts) {
              hasRes = true;
              p->ts = pCol->ts;
              p->colVal = pCol->colVal;
              singleTableLastTs = pCol->ts;

              // only set value for last row query
              if (HASTYPE(pr->type, CACHESCAN_RETRIEVE_LAST_ROW)) {
                if (taosArrayGetSize(pTableUidList) == 0) {
                  taosArrayPush(pTableUidList, &pKeyInfo->uid);
                } else {
                  taosArraySet(pTableUidList, 0, &pKeyInfo->uid);
                }
              }
            }
          } else {
            // The cached row can be shorter than the current schema when columns were added after it was cached
            // (the same case saveOneRow() guards against). taosArrayGet() would return NULL here, so the column
            // is treated as having no cached value for this table.
            if (slotId >= taosArrayGetSize(pRow)) {
              continue;
            }

            SLastCol* p = taosArrayGet(pLastCols, slotId);
            SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, slotId);

            if (pColVal->ts > p->ts) {
              // LAST skips NULL values; remember whether this column is still NULL overall
              if (!COL_VAL_IS_VALUE(&pColVal->colVal) && HASTYPE(pr->type, CACHESCAN_RETRIEVE_LAST)) {
                if (!COL_VAL_IS_VALUE(&p->colVal)) {
                  hasNotNullRow = false;
                }
                continue;
              }

              hasRes = true;
              p->ts = pColVal->ts;
              if (pColVal->ts < singleTableLastTs && HASTYPE(pr->type, CACHESCAN_RETRIEVE_LAST)) {
                singleTableLastTs = pColVal->ts;
              }

              if (!IS_VAR_DATA_TYPE(pColVal->colVal.type)) {
                p->colVal = pColVal->colVal;
              } else {
                // copy into the buffer owned by pLastCols instead of aliasing the cache entry's memory
                if (COL_VAL_IS_VALUE(&pColVal->colVal)) {
                  memcpy(p->colVal.value.pData, pColVal->colVal.value.pData, pColVal->colVal.value.nData);
                }

                p->colVal.value.nData = pColVal->colVal.value.nData;
                p->colVal.type = pColVal->colVal.type;
                p->colVal.flag = pColVal->colVal.flag;
                p->colVal.cid = pColVal->colVal.cid;
              }
            }
          }
        }

        if (hasNotNullRow) {
          if (INT64_MAX == totalLastTs || totalLastTs < singleTableLastTs) {
            totalLastTs = singleTableLastTs;
          }

          // cost in milliseconds; record the merged timestamp when the scan was slow
          double cost = (taosGetTimestampUs() - st) / 1000.0;
          if (cost > tsCacheLazyLoadThreshold) {
            pr->lastTs = totalLastTs;
          }
        }
      }

      tsdbCacheRelease(lruCache, h);
    }

    if (hasRes) {
      code = saveOneRow(pLastCols, pResBlock, pr, slotIds, pRes, pr->idstr);
    }
  } else if (HASTYPE(pr->type, CACHESCAN_RETRIEVE_TYPE_ALL)) {
    for (int32_t i = pr->tableIndex; i < pr->numOfTables; ++i) {
      STableKeyInfo* pKeyInfo = &pr->pTableList[i];

      code = doExtractCacheRow(pr, lruCache, pKeyInfo->uid, &pRow, &h);
      if (code != TSDB_CODE_SUCCESS) {
        goto _end;
      }

      if (h == NULL) {
        continue;
      }

      if (taosArrayGetSize(pRow) <= 0) {
        tsdbCacheRelease(lruCache, h);
        continue;
      }

      code = saveOneRow(pRow, pResBlock, pr, slotIds, pRes, pr->idstr);
      if (code != TSDB_CODE_SUCCESS) {
        tsdbCacheRelease(lruCache, h);
        goto _end;
      }
      // TODO reset the pRes

      taosArrayPush(pTableUidList, &pKeyInfo->uid);
      tsdbCacheRelease(lruCache, h);

      pr->tableIndex += 1;
      if (pResBlock->info.rows >= pResBlock->info.capacity) {
        goto _end;
      }
    }
  } else {
    code = TSDB_CODE_INVALID_PARA;
  }

_end:
  // Only undo what was done under the lock. Unlocking a mutex this thread does not hold is undefined behavior,
  // and no snapshot or file reader exists before that point.
  if (locked) {
    tsdbDataFReaderClose(&pr->pDataFReaderLast);
    tsdbDataFReaderClose(&pr->pDataFReader);

    resetLastBlockLoadInfo(pr->pLoadInfo);
    tsdbUntakeReadSnap((STsdbReader*)pr, pr->pReadSnap, true);
    pr->pReadSnap = NULL;  // released above; do not keep a dangling pointer for the next call
    taosThreadMutexUnlock(&pr->readerMutex);
  }

  if (pRes != NULL) {
    for (int32_t j = 0; j < pr->numOfCols; ++j) {
      taosMemoryFree(pRes[j]);
    }
  }

  taosMemoryFree(pRes);
  taosArrayDestroyEx(pLastCols, freeItem);
  return code;
}