// tsdbCacheRead.c
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "taoserror.h"
#include "tarray.h"
#include "tcommon.h"
#include "tsdb.h"

// Evaluates to true when every bit of the flag set `_t` is also set in `_type`
// (e.g. HASTYPE(pReader->type, CACHESCAN_RETRIEVE_LAST)).
#define HASTYPE(_type, _t) (((_type) & (_t)) == (_t))

/*
 * Append one output row built from the cached column array `pRow` to `pBlock`.
 *
 * slotIds[i] maps output column i to an index into pRow; -1 selects the primary
 * timestamp, which is read from pRow[0].
 *
 * CACHESCAN_RETRIEVE_LAST: every output column is written as an SFirstLastRes record
 *   staged in pRes[i]. The block's row count only advances when at least one column
 *   carries a non-NULL value.
 * CACHESCAN_RETRIEVE_LAST_ROW: raw column values are written directly (var-length values
 *   go through pReader->transferBuf) and the row count always advances.
 *
 * Returns TSDB_CODE_INVALID_PARA when pReader->type carries neither flag.
 */
static int32_t saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* pReader, const int32_t* slotIds,
                          void** pRes, const char* idStr) {
  int32_t rowIndex = pBlock->info.rows;

  if (HASTYPE(pReader->type, CACHESCAN_RETRIEVE_LAST)) {
    bool rowIsAllNull = true;

    for (int32_t col = 0; col < pReader->numOfCols; ++col) {
      SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, col);
      SFirstLastRes*   pFL = (SFirstLastRes*)varDataVal(pRes[col]);
      int32_t          slot = slotIds[col];

      if (slot == -1) {
        // primary timestamp column
        SLastCol* pTsCol = (SLastCol*)taosArrayGet(pRow, 0);
        pFL->ts = pTsCol->ts;
        pFL->bytes = TSDB_KEYSIZE;
        *(int64_t*)pFL->buf = pTsCol->ts;
        rowIsAllNull = false;
      } else if (slot >= taosArrayGetSize(pRow)) {
        // the column was added to the schema after this row was cached: report NULL
        pFL->ts = 0;
        pFL->isNull = true;
        colDataSetNULL(pDst, rowIndex);
        continue;
      } else {
        SLastCol* pSrc = (SLastCol*)taosArrayGet(pRow, slot);
        SColVal*  pVal = &pSrc->colVal;

        pFL->ts = pSrc->ts;
        pFL->isNull = !COL_VAL_IS_VALUE(pVal);
        rowIsAllNull = rowIsAllNull && pFL->isNull;

        if (!pFL->isNull) {
          if (IS_VAR_DATA_TYPE(pVal->type)) {
            varDataSetLen(pFL->buf, pVal->value.nData);
            memcpy(varDataVal(pFL->buf), pVal->value.pData, pVal->value.nData);
            // the stored length covers the payload plus its VARSTR length header
            pFL->bytes = pVal->value.nData + VARSTR_HEADER_SIZE;
          } else {
            memcpy(pFL->buf, &pVal->value, pReader->pSchema->columns[slot].bytes);
            pFL->bytes = pReader->pSchema->columns[slot].bytes;
          }
        }
      }

      // pDst->info.bytes already includes VARSTR_HEADER_SIZE, so subtract it for the payload length
      pFL->hasResult = true;
      varDataSetLen(pRes[col], pDst->info.bytes - VARSTR_HEADER_SIZE);
      colDataSetVal(pDst, rowIndex, (const char*)pRes[col], false);
    }

    if (!rowIsAllNull) {
      pBlock->info.rows += 1;
    }
    return TSDB_CODE_SUCCESS;
  }

  if (HASTYPE(pReader->type, CACHESCAN_RETRIEVE_LAST_ROW)) {
    for (int32_t col = 0; col < pReader->numOfCols; ++col) {
      SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, col);
      int32_t          slot = slotIds[col];

      if (slot == -1) {
        SLastCol* pTsCol = (SLastCol*)taosArrayGet(pRow, 0);
        colDataSetVal(pDst, rowIndex, (const char*)&pTsCol->ts, false);
        continue;
      }

      // the column was added to the schema after this row was cached: report NULL
      if (slot >= taosArrayGetSize(pRow)) {
        colDataSetNULL(pDst, rowIndex);
        continue;
      }

      SLastCol* pSrc = (SLastCol*)taosArrayGet(pRow, slot);
      SColVal*  pVal = &pSrc->colVal;
      bool      isNull = !COL_VAL_IS_VALUE(pVal);

      if (!IS_VAR_DATA_TYPE(pVal->type)) {
        colDataSetVal(pDst, rowIndex, (const char*)&pVal->value.val, isNull);
      } else if (isNull) {
        colDataSetNULL(pDst, rowIndex);
      } else {
        // re-encode the var-length value with its length header in the per-slot scratch buffer
        char* pBuf = pReader->transferBuf[slot];
        varDataSetLen(pBuf, pVal->value.nData);
        memcpy(varDataVal(pBuf), pVal->value.pData, pVal->value.nData);
        colDataSetVal(pDst, rowIndex, pBuf, false);
      }
    }

    pBlock->info.rows += 1;
    return TSDB_CODE_SUCCESS;
  }

  tsdbError("invalid retrieve type:%d, %s", pReader->type, idStr);
  return TSDB_CODE_INVALID_PARA;
}

/*
 * Resolve the schema used to decode cached rows and store it in p->pSchema.
 *
 * With a non-zero suid, the schema of that super table is used. Otherwise the tables in
 * p->pTableList are probed in order, and the first one whose schema can still be loaded
 * is used. Tables that fail to load are reported as dropped and skipped.
 *
 * Returns TSDB_CODE_PAR_TABLE_NOT_EXIST when no schema can be found.
 *
 * The reader is never freed here; the caller keeps ownership.
 * tsdbCacherowsReaderOpen calls tsdbCacherowsReaderClose on failure, so freeing it
 * here as well would be a double free.
 */
static int32_t setTableSchema(SCacheRowsReader* p, uint64_t suid, const char* idstr) {
  if (suid != 0) {
    p->pSchema = metaGetTbTSchema(p->pVnode->pMeta, suid, -1, 1);
    if (p->pSchema == NULL) {
      tsdbWarn("stable:%" PRIu64 " has been dropped, failed to retrieve cached rows, %s", suid, idstr);
      return TSDB_CODE_PAR_TABLE_NOT_EXIST;
    }
    return TSDB_CODE_SUCCESS;
  }

  for (int32_t i = 0; i < p->numOfTables; ++i) {
    uint64_t uid = p->pTableList[i].uid;
    p->pSchema = metaGetTbTSchema(p->pVnode->pMeta, uid, -1, 1);
    if (p->pSchema != NULL) {
      return TSDB_CODE_SUCCESS;
    }

    tsdbWarn("table:%" PRIu64 " has been dropped, failed to retrieve cached rows, %s", uid, idstr);
  }

  // all queried tables have been dropped already, return immediately.
  tsdbWarn("all queried tables has been dropped, try next group, %s", idstr);
  return TSDB_CODE_PAR_TABLE_NOT_EXIST;
}

int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, void* pTableIdList, int32_t numOfTables, int32_t numOfCols,
H
Haojun Liao 已提交
147
                                uint64_t suid, void** pReader, const char* idstr) {
148 149
  *pReader = NULL;
  SCacheRowsReader* p = taosMemoryCalloc(1, sizeof(SCacheRowsReader));
150 151 152 153
  if (p == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

154 155
  p->type = type;
  p->pVnode = pVnode;
156 157
  p->pTsdb = p->pVnode->pTsdb;
  p->verRange = (SVersionRange){.minVer = 0, .maxVer = UINT64_MAX};
158
  p->numOfCols = numOfCols;
159
  p->suid = suid;
160

161
  if (numOfTables == 0) {
162 163 164 165
    *pReader = p;
    return TSDB_CODE_SUCCESS;
  }

166
  p->pTableList = pTableIdList;
167
  p->numOfTables = numOfTables;
168

169 170 171 172 173 174
  int32_t code = setTableSchema(p, suid, idstr);
  if (code != TSDB_CODE_SUCCESS) {
    tsdbCacherowsReaderClose(p);
    return code;
  }

175
  p->transferBuf = taosMemoryCalloc(p->pSchema->numOfCols, POINTER_BYTES);
176
  if (p->transferBuf == NULL) {
H
Haojun Liao 已提交
177
    tsdbCacherowsReaderClose(p);
H
Haojun Liao 已提交
178
    return TSDB_CODE_OUT_OF_MEMORY;
179 180
  }

181
  for (int32_t i = 0; i < p->pSchema->numOfCols; ++i) {
M
Minglei Jin 已提交
182
    if (IS_VAR_DATA_TYPE(p->pSchema->columns[i].type)) {
183
      p->transferBuf[i] = taosMemoryMalloc(p->pSchema->columns[i].bytes);
184 185 186 187
      if (p->transferBuf[i] == NULL) {
        tsdbCacherowsReaderClose(p);
        return TSDB_CODE_OUT_OF_MEMORY;
      }
188 189
    }
  }
190

191 192
  int32_t numOfStt = ((SVnode*)pVnode)->config.sttTrigger;
  p->pLoadInfo = tCreateLastBlockLoadInfo(p->pSchema, NULL, 0, numOfStt);
193 194 195 196 197
  if (p->pLoadInfo == NULL) {
    tsdbCacherowsReaderClose(p);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

198
  p->idstr = taosStrdup(idstr);
199 200
  taosThreadMutexInit(&p->readerMutex, NULL);

201 202
  p->lastTs = INT64_MIN;

203 204 205 206
  *pReader = p;
  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
207
void* tsdbCacherowsReaderClose(void* pReader) {
208
  SCacheRowsReader* p = pReader;
209

210 211 212 213 214 215 216
  if (p->pSchema != NULL) {
    for (int32_t i = 0; i < p->pSchema->numOfCols; ++i) {
      taosMemoryFreeClear(p->transferBuf[i]);
    }

    taosMemoryFree(p->transferBuf);
    taosMemoryFree(p->pSchema);
217 218
  }

219 220
  taosMemoryFree(p->pCurrSchema);

221 222
  destroyLastBlockLoadInfo(p->pLoadInfo);

223
  taosMemoryFree((void*)p->idstr);
224 225
  taosThreadMutexDestroy(&p->readerMutex);

226
  taosMemoryFree(pReader);
H
Haojun Liao 已提交
227
  return NULL;
228 229
}

H
Haojun Liao 已提交
230
static int32_t doExtractCacheRow(SCacheRowsReader* pr, SLRUCache* lruCache, uint64_t uid, SArray** pRow,
H
Haojun Liao 已提交
231
                                 LRUHandle** h) {
232
  int32_t code = TSDB_CODE_SUCCESS;
233
  *pRow = NULL;
234

H
Haojun Liao 已提交
235
  if (HASTYPE(pr->type, CACHESCAN_RETRIEVE_LAST_ROW)) {
236
    code = tsdbCacheGetLastrowH(lruCache, uid, pr, h);
237
  } else {
238
    code = tsdbCacheGetLastH(lruCache, uid, pr, h);
239
  }
240

241 242 243 244 245 246 247
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  // no data in the table of Uid
  if (*h != NULL) {
    *pRow = (SArray*)taosLRUCacheValue(lruCache, *h);
248 249 250 251 252
  }

  return code;
}

253
static void freeItem(void* pItem) {
254
  SLastCol* pCol = (SLastCol*)pItem;
255 256 257 258 259
  if (IS_VAR_DATA_TYPE(pCol->colVal.type)) {
    taosMemoryFree(pCol->colVal.value.pData);
  }
}

260
static int32_t tsdbCacheQueryReseek(void* pQHandle) {
261 262 263
  int32_t           code = 0;
  SCacheRowsReader* pReader = pQHandle;

264 265 266 267
  code = taosThreadMutexTryLock(&pReader->readerMutex);
  if (code == 0) {
    // pause current reader's state if not paused, save ts & version for resuming
    // just wait for the big all tables' snapshot untaking for now
268

269
    code = TSDB_CODE_VND_QUERY_BUSY;
270

271 272 273 274 275 276 277 278
    taosThreadMutexUnlock(&pReader->readerMutex);

    return code;
  } else if (code == EBUSY) {
    return TSDB_CODE_VND_QUERY_BUSY;
  } else {
    return -1;
  }
279 280
}

281
int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32_t* slotIds, SArray* pTableUidList) {
282
  if (pReader == NULL || pResBlock == NULL) {
283 284 285
    return TSDB_CODE_INVALID_PARA;
  }

286
  SCacheRowsReader* pr = pReader;
287

288
  int32_t    code = TSDB_CODE_SUCCESS;
289
  SLRUCache* lruCache = pr->pVnode->pTsdb->lruCache;
290
  LRUHandle* h = NULL;
H
Haojun Liao 已提交
291
  SArray*    pRow = NULL;
292
  bool       hasRes = false;
293
  SArray*    pLastCols = NULL;
294

295
  void** pRes = taosMemoryCalloc(pr->numOfCols, POINTER_BYTES);
296 297 298 299 300
  if (pRes == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _end;
  }

301
  for (int32_t j = 0; j < pr->numOfCols; ++j) {
X
Xiaoyu Wang 已提交
302 303
    pRes[j] = taosMemoryCalloc(
        1, sizeof(SFirstLastRes) + pr->pSchema->columns[-1 == slotIds[j] ? 0 : slotIds[j]].bytes + VARSTR_HEADER_SIZE);
304 305 306 307
    SFirstLastRes* p = (SFirstLastRes*)varDataVal(pRes[j]);
    p->ts = INT64_MIN;
  }

308 309 310 311 312 313
  pLastCols = taosArrayInit(pr->pSchema->numOfCols, sizeof(SLastCol));
  if (pLastCols == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _end;
  }

314
  for (int32_t i = 0; i < pr->pSchema->numOfCols; ++i) {
315
    struct STColumn* pCol = &pr->pSchema->columns[i];
316
    SLastCol         p = {.ts = INT64_MIN, .colVal.type = pCol->type, .colVal.flag = CV_FLAG_NULL};
317 318 319 320 321

    if (IS_VAR_DATA_TYPE(pCol->type)) {
      p.colVal.value.pData = taosMemoryCalloc(pCol->bytes, sizeof(char));
    }
    taosArrayPush(pLastCols, &p);
H
Haojun Liao 已提交
322
  }
323

324
  taosThreadMutexLock(&pr->readerMutex);
X
Xiaoyu Wang 已提交
325
  code = tsdbTakeReadSnap((STsdbReader*)pr, tsdbCacheQueryReseek, &pr->pReadSnap);
K
kailixu 已提交
326 327 328
  if (code != TSDB_CODE_SUCCESS) {
    goto _end;
  }
329
  pr->pDataFReader = NULL;
330
  pr->pDataFReaderLast = NULL;
331

332
  // retrieve the only one last row of all tables in the uid list.
H
Haojun Liao 已提交
333
  if (HASTYPE(pr->type, CACHESCAN_RETRIEVE_TYPE_SINGLE)) {
334
    int64_t st = taosGetTimestampUs();
335 336
    for (int32_t i = 0; i < pr->numOfTables; ++i) {
      STableKeyInfo* pKeyInfo = &pr->pTableList[i];
337

338
      code = doExtractCacheRow(pr, lruCache, pKeyInfo->uid, &pRow, &h);
H
Haojun Liao 已提交
339
      if (code != TSDB_CODE_SUCCESS) {
K
kailixu 已提交
340
        goto _end;
341 342
      }

343
      if (h == NULL) {
344 345
        continue;
      }
346 347
      if (taosArrayGetSize(pRow) <= 0) {
        tsdbCacheRelease(lruCache, h);
348 349 350
        continue;
      }

H
Haojun Liao 已提交
351
      {
352 353
        bool    hasNotNullRow = true;
        int64_t minTs = INT64_MAX;
H
Haojun Liao 已提交
354
        for (int32_t k = 0; k < pr->numOfCols; ++k) {
355 356 357 358 359
          int32_t slotId = slotIds[k];

          if (slotId == -1) {  // the primary timestamp
            SLastCol* p = taosArrayGet(pLastCols, 0);
            SLastCol* pCol = (SLastCol*)taosArrayGet(pRow, 0);
360
            if (pCol->ts > p->ts) {
361
              hasRes = true;
362 363
              p->ts = pCol->ts;
              p->colVal = pCol->colVal;
364
              minTs = pCol->ts;
H
Haojun Liao 已提交
365 366 367 368 369 370 371 372 373

              // only set value for last row query
              if (HASTYPE(pr->type, CACHESCAN_RETRIEVE_LAST_ROW)) {
                if (taosArrayGetSize(pTableUidList) == 0) {
                  taosArrayPush(pTableUidList, &pKeyInfo->uid);
                } else {
                  taosArraySet(pTableUidList, 0, &pKeyInfo->uid);
                }
              }
H
Haojun Liao 已提交
374 375
            }
          } else {
376
            SLastCol* p = taosArrayGet(pLastCols, slotId);
H
Haojun Liao 已提交
377 378
            SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, slotId);

379
            if (pColVal->ts > p->ts) {
380
              if (!COL_VAL_IS_VALUE(&pColVal->colVal) && HASTYPE(pr->type, CACHESCAN_RETRIEVE_LAST)) {
381 382 383
                if (!COL_VAL_IS_VALUE(&p->colVal)) {
                  hasNotNullRow = false;
                }
384 385 386
                continue;
              }

387
              hasRes = true;
388
              p->ts = pColVal->ts;
389 390 391
              if (pColVal->ts < minTs && HASTYPE(pr->type, CACHESCAN_RETRIEVE_LAST)) {
                minTs = pColVal->ts;
              }
H
Haojun Liao 已提交
392

H
Haojun Liao 已提交
393 394 395 396 397 398
              if (!IS_VAR_DATA_TYPE(pColVal->colVal.type)) {
                p->colVal = pColVal->colVal;
              } else {
                if (COL_VAL_IS_VALUE(&pColVal->colVal)) {
                  memcpy(p->colVal.value.pData, pColVal->colVal.value.pData, pColVal->colVal.value.nData);
                }
H
Haojun Liao 已提交
399

H
Haojun Liao 已提交
400 401 402 403
                p->colVal.value.nData = pColVal->colVal.value.nData;
                p->colVal.type = pColVal->colVal.type;
                p->colVal.flag = pColVal->colVal.flag;
                p->colVal.cid = pColVal->colVal.cid;
H
Haojun Liao 已提交
404 405 406
              }
            }
          }
407
        }
408 409

        if (hasNotNullRow) {
410 411 412 413
          double cost = (taosGetTimestampUs() - st) / 1000.0;
          if (cost > tsCacheLazyLoadThreshold) {
            pr->lastTs = minTs;
          }
414
        }
415
      }
416

417
      tsdbCacheRelease(lruCache, h);
418
    }
419 420

    if (hasRes) {
H
Haojun Liao 已提交
421
      saveOneRow(pLastCols, pResBlock, pr, slotIds, pRes, pr->idstr);
422
    }
H
Haojun Liao 已提交
423
  } else if (HASTYPE(pr->type, CACHESCAN_RETRIEVE_TYPE_ALL)) {
424 425
    for (int32_t i = pr->tableIndex; i < pr->numOfTables; ++i) {
      STableKeyInfo* pKeyInfo = &pr->pTableList[i];
426
      code = doExtractCacheRow(pr, lruCache, pKeyInfo->uid, &pRow, &h);
H
Haojun Liao 已提交
427
      if (code != TSDB_CODE_SUCCESS) {
K
kailixu 已提交
428
        goto _end;
429 430
      }

431
      if (h == NULL) {
432 433
        continue;
      }
434 435
      if (taosArrayGetSize(pRow) <= 0) {
        tsdbCacheRelease(lruCache, h);
436 437 438
        continue;
      }

H
Haojun Liao 已提交
439
      saveOneRow(pRow, pResBlock, pr, slotIds, pRes, pr->idstr);
440
      // TODO reset the pRes
441

442
      taosArrayPush(pTableUidList, &pKeyInfo->uid);
443
      tsdbCacheRelease(lruCache, h);
444

445 446
      pr->tableIndex += 1;
      if (pResBlock->info.rows >= pResBlock->info.capacity) {
447
        goto _end;
448
      }
449 450
    }
  } else {
451
    code = TSDB_CODE_INVALID_PARA;
452 453
  }

454
_end:
455
  tsdbDataFReaderClose(&pr->pDataFReaderLast);
456 457
  tsdbDataFReaderClose(&pr->pDataFReader);

458
  resetLastBlockLoadInfo(pr->pLoadInfo);
459
  tsdbUntakeReadSnap((STsdbReader*)pr, pr->pReadSnap, true);
460
  taosThreadMutexUnlock(&pr->readerMutex);
461

H
Haojun Liao 已提交
462 463 464 465
  if (pRes != NULL) {
    for (int32_t j = 0; j < pr->numOfCols; ++j) {
      taosMemoryFree(pRes[j]);
    }
466 467 468
  }

  taosMemoryFree(pRes);
469
  taosArrayDestroyEx(pLastCols, freeItem);
470
  return code;
471
}