tpagedbuf.c 20.5 KB
Newer Older
S
Shengliang Guan 已提交
1
#define _DEFAULT_SOURCE
H
Haojun Liao 已提交
2
#include "tpagedbuf.h"
3 4
#include "taoserror.h"
#include "tcompression.h"
5
#include "tsimplehash.h"
S
Shengliang Guan 已提交
6
#include "tlog.h"
7

dengyihao's avatar
dengyihao 已提交
8 9
#define GET_PAYLOAD_DATA(_p)           ((char*)(_p)->pData + POINTER_BYTES)
#define BUF_PAGE_IN_MEM(_p)            ((_p)->pData != NULL)
10
#define CLEAR_BUF_PAGE_IN_MEM_FLAG(_p) ((_p)->pData = NULL)
dengyihao's avatar
dengyihao 已提交
11 12
#define HAS_DATA_IN_DISK(_p)           ((_p)->offset >= 0)
#define NO_IN_MEM_AVAILABLE_PAGES(_b)  (listNEles((_b)->lruList) >= (_b)->inMemPages)
13

14
typedef struct SPageDiskInfo {
S
Shengliang Guan 已提交
15 16
  int64_t offset;
  int32_t length;
H
Haojun Liao 已提交
17
} SPageDiskInfo, SFreeListItem;
18

H
Haojun Liao 已提交
19
struct SPageInfo {
dengyihao's avatar
dengyihao 已提交
20
  SListNode* pn;  // point to list node struct. it is NULL when the page is evicted from the in-memory buffer
S
Shengliang Guan 已提交
21 22 23
  void*      pData;
  int64_t    offset;
  int32_t    pageId;
24
  int32_t    length : 29;
S
Shengliang Guan 已提交
25 26
  bool       used : 1;   // set current page is in used
  bool       dirty : 1;  // set current buffer page is dirty or not
H
Haojun Liao 已提交
27
};
28

H
Haojun Liao 已提交
29
struct SDiskbasedBuf {
30 31
  int32_t   numOfPages;
  int64_t   totalBufSize;
S
Shengliang Guan 已提交
32
  uint64_t  fileSize;  // disk file size
33
  TdFilePtr pFile;
S
Shengliang Guan 已提交
34 35
  int32_t   allocateId;  // allocated page id
  char*     path;        // file path
36
  char*     prefix;      // file name prefix
S
Shengliang Guan 已提交
37 38
  int32_t   pageSize;    // current used page size
  int32_t   inMemPages;  // numOfPages that are allocated in memory
39
  SList*    freePgList;  // free page list
40
  SArray*   pIdList;     // page id list
41
  SSHashObj*all;
42
  SList*    lruList;
S
Shengliang Guan 已提交
43 44 45 46 47 48
  void*     emptyDummyIdList;  // dummy id list
  void*     assistBuf;         // assistant buffer for compress/decompress data
  SArray*   pFree;             // free area in file
  bool      comp;              // compressed before flushed to disk
  uint64_t  nextPos;           // next page flush position

H
Hongze Cheng 已提交
49 50
  char*               id;           // for debug purpose
  bool                printStatis;  // Print statistics info when closing this buffer.
H
Haojun Liao 已提交
51
  SDiskbasedBufStatis statis;
H
Haojun Liao 已提交
52
};
53

H
Haojun Liao 已提交
54
static int32_t createDiskFile(SDiskbasedBuf* pBuf) {
dengyihao's avatar
dengyihao 已提交
55
  if (pBuf->path == NULL) {  // prepare the file name when needed it
56 57
    char path[PATH_MAX] = {0};
    taosGetTmpfilePath(pBuf->prefix, "paged-buf", path);
58
    pBuf->path = taosStrdup(path);
dengyihao's avatar
dengyihao 已提交
59 60 61
    if (pBuf->path == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
62 63
  }

H
Hongze Cheng 已提交
64 65
  pBuf->pFile =
      taosOpenFile(pBuf->path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC | TD_FILE_AUTO_DEL);
66
  if (pBuf->pFile == NULL) {
67 68 69 70 71 72
    return TAOS_SYSTEM_ERROR(errno);
  }

  return TSDB_CODE_SUCCESS;
}

S
Shengliang Guan 已提交
73
static char* doCompressData(void* data, int32_t srcSize, int32_t* dst, SDiskbasedBuf* pBuf) {  // do nothing
H
Haojun Liao 已提交
74
  if (!pBuf->comp) {
75 76 77 78
    *dst = srcSize;
    return data;
  }

H
Haojun Liao 已提交
79
  *dst = tsCompressString(data, srcSize, 1, pBuf->assistBuf, srcSize, ONE_STAGE_COMP, NULL, 0);
80

H
Haojun Liao 已提交
81
  memcpy(data, pBuf->assistBuf, *dst);
82 83 84
  return data;
}

S
Shengliang Guan 已提交
85
static char* doDecompressData(void* data, int32_t srcSize, int32_t* dst, SDiskbasedBuf* pBuf) {  // do nothing
H
Haojun Liao 已提交
86
  if (!pBuf->comp) {
87 88 89 90
    *dst = srcSize;
    return data;
  }

H
Haojun Liao 已提交
91
  *dst = tsDecompressString(data, srcSize, 1, pBuf->assistBuf, pBuf->pageSize, ONE_STAGE_COMP, NULL, 0);
92
  if (*dst > 0) {
H
Haojun Liao 已提交
93
    memcpy(data, pBuf->assistBuf, *dst);
94 95 96 97
  }
  return data;
}

98
static uint64_t allocateNewPositionInFile(SDiskbasedBuf* pBuf, size_t size) {
H
Haojun Liao 已提交
99 100
  if (pBuf->pFree == NULL) {
    return pBuf->nextPos;
101 102 103
  } else {
    int32_t offset = -1;

H
Haojun Liao 已提交
104
    size_t num = taosArrayGetSize(pBuf->pFree);
S
Shengliang Guan 已提交
105
    for (int32_t i = 0; i < num; ++i) {
H
Haojun Liao 已提交
106
      SFreeListItem* pi = taosArrayGet(pBuf->pFree, i);
H
Haojun Liao 已提交
107
      if (pi->length >= size) {
108 109
        offset = pi->offset;
        pi->offset += (int32_t)size;
H
Haojun Liao 已提交
110
        pi->length -= (int32_t)size;
111 112 113 114 115 116

        return offset;
      }
    }

    // no available recycle space, allocate new area in file
H
Haojun Liao 已提交
117
    return pBuf->nextPos;
118 119 120
  }
}

H
Haojun Liao 已提交
121 122 123 124 125 126 127 128
/**
 *   +--------------------------+-------------------+--------------+
 *   | PTR to SPageInfo (8bytes)| Payload (PageSize)| 2 Extra Bytes|
 *   +--------------------------+-------------------+--------------+
 * @param pBuf
 * @param pg
 * @return
 */
129 130 131

static FORCE_INLINE size_t getAllocPageSize(int32_t pageSize) { return pageSize + POINTER_BYTES + sizeof(SFilePage); }

132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155
static int32_t doFlushBufPageImpl(SDiskbasedBuf* pBuf, int64_t offset, const char* pData, int32_t size) {
  int32_t ret = taosLSeekFile(pBuf->pFile, offset, SEEK_SET);
  if (ret == -1) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    return terrno;
  }

  ret = (int32_t)taosWriteFile(pBuf->pFile, pData, size);
  if (ret != size) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    return terrno;
  }

  // extend the file
  if (pBuf->fileSize < offset + size) {
    pBuf->fileSize = offset + size;
  }

  pBuf->statis.flushBytes += size;
  pBuf->statis.flushPages += 1;

  return TSDB_CODE_SUCCESS;
}

156
static char* doFlushBufPage(SDiskbasedBuf* pBuf, SPageInfo* pg) {
H
Haojun Liao 已提交
157 158 159 160 161
  if (pg->pData == NULL || pg->used) {
    uError("invalid params in paged buffer process when flushing buf to disk, %s", pBuf->id);
    terrno = TSDB_CODE_INVALID_PARA;
    return NULL;
  }
162

H
Haojun Liao 已提交
163
  int32_t size = pBuf->pageSize;
164 165 166
  int64_t offset = pg->offset;

  char* t = NULL;
167
  if ((!HAS_DATA_IN_DISK(pg)) || pg->dirty) {
168
    void* payload = GET_PAYLOAD_DATA(pg);
H
Haojun Liao 已提交
169
    t = doCompressData(payload, pBuf->pageSize, &size, pBuf);
170 171
    if (size < 0) {
      uError("failed to compress data when flushing data to disk, %s", pBuf->id);
dengyihao's avatar
dengyihao 已提交
172
      terrno = TSDB_CODE_INVALID_PARA;
173 174
      return NULL;
    }
H
Haojun Liao 已提交
175
  }
176 177

  // this page is flushed to disk for the first time
H
Haojun Liao 已提交
178
  if (pg->dirty) {
179
    if (!HAS_DATA_IN_DISK(pg)) {
180
      offset = allocateNewPositionInFile(pBuf, size);
H
Haojun Liao 已提交
181
      pBuf->nextPos += size;
182

183 184
      int32_t code = doFlushBufPageImpl(pBuf, offset, t, size);
      if (code != TSDB_CODE_SUCCESS) {
185 186
        return NULL;
      }
H
Haojun Liao 已提交
187 188 189 190
    } else {
      // length becomes greater, current space is not enough, allocate new place, otherwise, do nothing
      if (pg->length < size) {
        // 1. add current space to free list
191
        SPageDiskInfo dinfo = {.length = pg->length, .offset = offset};
H
Haojun Liao 已提交
192 193 194
        taosArrayPush(pBuf->pFree, &dinfo);

        // 2. allocate new position, and update the info
195
        offset = allocateNewPositionInFile(pBuf, size);
H
Haojun Liao 已提交
196 197
        pBuf->nextPos += size;
      }
198

199 200
      int32_t code = doFlushBufPageImpl(pBuf, offset, t, size);
      if (code != TSDB_CODE_SUCCESS) {
201 202
        return NULL;
      }
203
    }
204
  } else {  // NOTE: the size may be -1, the this recycle page has not been flushed to disk yet.
H
Haojun Liao 已提交
205
    size = pg->length;
206 207
  }

H
Haojun Liao 已提交
208
  char* pDataBuf = pg->pData;
H
Haojun Liao 已提交
209
  memset(pDataBuf, 0, getAllocPageSize(pBuf->pageSize));
210

211
#ifdef BUF_PAGE_DEBUG
212
  uDebug("page_flush %p, pageId:%d, offset:%d", pDataBuf, pg->pageId, offset);
213
#endif
214

215
  pg->offset = offset;
H
Haojun Liao 已提交
216
  pg->length = size;  // on disk size
H
Haojun Liao 已提交
217
  return pDataBuf;
218 219
}

220
static char* flushBufPage(SDiskbasedBuf* pBuf, SPageInfo* pg) {
221 222
  int32_t ret = TSDB_CODE_SUCCESS;

223
  if (pBuf->pFile == NULL) {
H
Haojun Liao 已提交
224
    if ((ret = createDiskFile(pBuf)) != TSDB_CODE_SUCCESS) {
225 226 227 228 229
      terrno = ret;
      return NULL;
    }
  }

230
  char* p = doFlushBufPage(pBuf, pg);
231
  CLEAR_BUF_PAGE_IN_MEM_FLAG(pg);
H
Haojun Liao 已提交
232 233 234

  pg->dirty = false;
  return p;
235 236 237
}

// load file block data in disk
H
Haojun Liao 已提交
238
static int32_t loadPageFromDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) {
239
  if (pg->offset < 0 || pg->length <= 0) {
dengyihao's avatar
dengyihao 已提交
240
    uError("failed to load buf page from disk, offset:%" PRId64 ", length:%d, %s", pg->offset, pg->length, pBuf->id);
241 242 243
    return TSDB_CODE_INVALID_PARA;
  }

244
  int32_t ret = taosLSeekFile(pBuf->pFile, pg->offset, SEEK_SET);
245
  if (ret == -1) {
H
Haojun Liao 已提交
246 247 248 249
    ret = TAOS_SYSTEM_ERROR(errno);
    return ret;
  }

250
  void* pPage = (void*)GET_PAYLOAD_DATA(pg);
H
Haojun Liao 已提交
251
  ret = (int32_t)taosReadFile(pBuf->pFile, pPage, pg->length);
252
  if (ret != pg->length) {
H
Haojun Liao 已提交
253 254
    ret = TAOS_SYSTEM_ERROR(errno);
    return ret;
255 256
  }

H
Haojun Liao 已提交
257 258
  pBuf->statis.loadBytes += pg->length;
  pBuf->statis.loadPages += 1;
259 260

  int32_t fullSize = 0;
H
Haojun Liao 已提交
261
  doDecompressData(pPage, pg->length, &fullSize, pBuf);
H
Haojun Liao 已提交
262
  return 0;
263 264
}

265
static SPageInfo* registerNewPageInfo(SDiskbasedBuf* pBuf, int32_t pageId) {
H
Haojun Liao 已提交
266
  pBuf->numOfPages += 1;
267

wafwerar's avatar
wafwerar 已提交
268
  SPageInfo* ppi = taosMemoryMalloc(sizeof(SPageInfo));
269 270 271 272
  if (ppi == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }
273 274

  ppi->pageId = pageId;
H
Hongze Cheng 已提交
275
  ppi->pData = NULL;
276 277
  ppi->offset = -1;
  ppi->length = -1;
H
Hongze Cheng 已提交
278 279 280
  ppi->used = true;
  ppi->pn = NULL;
  ppi->dirty = false;
281

282
  return *(SPageInfo**)taosArrayPush(pBuf->pIdList, &ppi);
283 284
}

H
Haojun Liao 已提交
285
static SListNode* getEldestUnrefedPage(SDiskbasedBuf* pBuf) {
286
  SListIter iter = {0};
H
Haojun Liao 已提交
287
  tdListInitIter(pBuf->lruList, &iter, TD_LIST_BACKWARD);
288 289

  SListNode* pn = NULL;
S
Shengliang Guan 已提交
290 291
  while ((pn = tdListNext(&iter)) != NULL) {
    SPageInfo* pageInfo = *(SPageInfo**)pn->data;
292 293 294

    SPageInfo* p = *(SPageInfo**)(pageInfo->pData);
    ASSERT(pageInfo->pageId >= 0 && pageInfo->pn == pn && p == pageInfo);
295 296 297 298 299 300 301 302 303

    if (!pageInfo->used) {
      break;
    }
  }

  return pn;
}

304
static char* evictBufPage(SDiskbasedBuf* pBuf) {
H
Haojun Liao 已提交
305
  SListNode* pn = getEldestUnrefedPage(pBuf);
306
  if (pn == NULL) {  // no available buffer pages now, return.
dengyihao's avatar
dengyihao 已提交
307
    terrno = TSDB_CODE_OUT_OF_MEMORY;
308 309
    return NULL;
  }
310

311 312
  terrno = 0;
  tdListPopNode(pBuf->lruList, pn);
313

314
  SPageInfo* d = *(SPageInfo**)pn->data;
315

316 317
  d->pn = NULL;
  taosMemoryFreeClear(pn);
318

319
  return flushBufPage(pBuf, d);
320 321
}

S
Shengliang Guan 已提交
322
static void lruListPushFront(SList* pList, SPageInfo* pi) {
323 324 325 326 327
  tdListPrepend(pList, &pi);
  SListNode* front = tdListGetHead(pList);
  pi->pn = front;
}

S
Shengliang Guan 已提交
328
static void lruListMoveToFront(SList* pList, SPageInfo* pi) {
329 330 331 332
  tdListPopNode(pList, pi->pn);
  tdListPrependNode(pList, pi->pn);
}

H
Haojun Liao 已提交
333
static SPageInfo* getPageInfoFromPayload(void* page) {
H
Hongze Cheng 已提交
334
  char* p = (char*)page - POINTER_BYTES;
H
Haojun Liao 已提交
335

336
  SPageInfo* ppi = ((SPageInfo**)p)[0];
H
Haojun Liao 已提交
337
  return ppi;
338 339
}

H
Haojun Liao 已提交
340
int32_t createDiskbasedBuf(SDiskbasedBuf** pBuf, int32_t pagesize, int32_t inMemBufSize, const char* id,
341
                           const char* dir) {
wafwerar's avatar
wafwerar 已提交
342
  *pBuf = taosMemoryCalloc(1, sizeof(SDiskbasedBuf));
H
Haojun Liao 已提交
343

H
Haojun Liao 已提交
344 345
  SDiskbasedBuf* pPBuf = *pBuf;
  if (pPBuf == NULL) {
346
    goto _error;
H
Haojun Liao 已提交
347 348
  }

349
  pPBuf->pageSize = pagesize;
H
Hongze Cheng 已提交
350
  pPBuf->numOfPages = 0;  // all pages are in buffer in the first place
H
Haojun Liao 已提交
351
  pPBuf->totalBufSize = 0;
352
  pPBuf->allocateId = -1;
H
Hongze Cheng 已提交
353
  pPBuf->pFile = NULL;
354
  pPBuf->id = taosStrdup(id);
355 356 357
  pPBuf->fileSize = 0;
  pPBuf->pFree = taosArrayInit(4, sizeof(SFreeListItem));
  pPBuf->freePgList = tdListNew(POINTER_BYTES);
H
Haojun Liao 已提交
358 359

  // at least more than 2 pages must be in memory
H
Haojun Liao 已提交
360 361
  if (inMemBufSize < pagesize * 2) {
    inMemBufSize = pagesize * 2;
362
  }
H
Haojun Liao 已提交
363

H
Haojun Liao 已提交
364
  pPBuf->inMemPages = inMemBufSize / pagesize;  // maximum allowed pages, it is a soft limit.
H
Haojun Liao 已提交
365
  pPBuf->lruList = tdListNew(POINTER_BYTES);
H
Haojun Liao 已提交
366 367 368
  if (pPBuf->lruList == NULL) {
    goto _error;
  }
H
Haojun Liao 已提交
369 370 371

  // init id hash table
  _hash_fn_t fn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT);
372
  pPBuf->pIdList = taosArrayInit(4, POINTER_BYTES);
H
Haojun Liao 已提交
373 374 375
  if (pPBuf->pIdList == NULL) {
    goto _error;
  }
376

377
  pPBuf->all = tSimpleHashInit(64, fn);
378 379 380
  if (pPBuf->all == NULL) {
    goto _error;
  }
H
Haojun Liao 已提交
381

dengyihao's avatar
dengyihao 已提交
382
  pPBuf->prefix = (char*)dir;
H
Haojun Liao 已提交
383
  pPBuf->emptyDummyIdList = taosArrayInit(1, sizeof(int32_t));
H
Haojun Liao 已提交
384

385
  //  qDebug("QInfo:0x%"PRIx64" create resBuf for output, page size:%d, inmem buf pages:%d, file:%s", qId,
386
  //  pPBuf->pageSize, pPBuf->inMemPages, pPBuf->path);
H
Haojun Liao 已提交
387 388

  return TSDB_CODE_SUCCESS;
dengyihao's avatar
dengyihao 已提交
389
_error:
390 391
  destroyDiskbasedBuf(pPBuf);
  return TSDB_CODE_OUT_OF_MEMORY;
392 393
}

H
Haojun Liao 已提交
394
static char* doExtractPage(SDiskbasedBuf* pBuf, bool* newPage) {
395
  char* availablePage = NULL;
H
Haojun Liao 已提交
396
  if (NO_IN_MEM_AVAILABLE_PAGES(pBuf)) {
397
    availablePage = evictBufPage(pBuf);
398
    if (availablePage == NULL) {
dengyihao's avatar
dengyihao 已提交
399 400
      uWarn("no available buf pages, current:%d, max:%d, reason: %s, %s", listNEles(pBuf->lruList), pBuf->inMemPages,
            terrstr(), pBuf->id)
401 402
    }
  } else {
dengyihao's avatar
dengyihao 已提交
403 404
    availablePage =
        taosMemoryCalloc(1, getAllocPageSize(pBuf->pageSize));  // add extract bytes in case of zipped buffer increased.
405 406 407
    if (availablePage == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
    }
H
Haojun Liao 已提交
408
    *newPage = true;
409 410 411
  }

  return availablePage;
412 413
}

414
void* getNewBufPage(SDiskbasedBuf* pBuf, int32_t* pageId) {
H
Haojun Liao 已提交
415
  pBuf->statis.getPages += 1;
416

H
Haojun Liao 已提交
417 418
  bool newPage = false;
  char* availablePage = doExtractPage(pBuf, &newPage);
419 420
  if (availablePage == NULL) {
    return NULL;
H
Haojun Liao 已提交
421 422
  }

H
Haojun Liao 已提交
423 424 425
  SPageInfo* pi = NULL;
  if (listNEles(pBuf->freePgList) != 0) {
    SListNode* pItem = tdListPopHead(pBuf->freePgList);
426
    pi = *(SPageInfo**)pItem->data;
H
Haojun Liao 已提交
427
    pi->used = true;
428
    *pageId = pi->pageId;
wafwerar's avatar
wafwerar 已提交
429
    taosMemoryFreeClear(pItem);
430
  } else {  // create a new pageinfo
H
Haojun Liao 已提交
431 432 433 434
    // register new id in this group
    *pageId = (++pBuf->allocateId);

    // register page id info
435
    pi = registerNewPageInfo(pBuf, *pageId);
436
    if (pi == NULL) {
H
Haojun Liao 已提交
437 438 439
      if (newPage) {
        taosMemoryFree(availablePage);
      }
440 441
      return NULL;
    }
H
Haojun Liao 已提交
442 443

    // add to hash map
444
    tSimpleHashPut(pBuf->all, pageId, sizeof(int32_t), &pi, POINTER_BYTES);
H
Haojun Liao 已提交
445 446
    pBuf->totalBufSize += pBuf->pageSize;
  }
447 448

  // add to LRU list
H
Haojun Liao 已提交
449
  lruListPushFront(pBuf->lruList, pi);
450
  pi->pData = availablePage;
451 452

  ((void**)pi->pData)[0] = pi;
453
#ifdef BUF_PAGE_DEBUG
H
Hongze Cheng 已提交
454
  uDebug("page_getNewBufPage , pi->pData:%p, pageId:%d, offset:%" PRId64, pi->pData, pi->pageId, pi->offset);
455
#endif
456 457

  return (void*)(GET_PAYLOAD_DATA(pi));
458 459
}

H
Haojun Liao 已提交
460
void* getBufPage(SDiskbasedBuf* pBuf, int32_t id) {
461
  if (id < 0) {
462 463
    terrno = TSDB_CODE_INVALID_PARA;
    uError("invalid page id:%d, %s", id, pBuf->id);
464 465 466
    return NULL;
  }

H
Haojun Liao 已提交
467
  pBuf->statis.getPages += 1;
468

469
  SPageInfo** pi = tSimpleHashGet(pBuf->all, &id, sizeof(int32_t));
470
  if (pi == NULL || *pi == NULL) {
471 472
    uError("failed to locate the buffer page:%d, %s", id, pBuf->id);
    terrno = TSDB_CODE_INVALID_PARA;
473 474
    return NULL;
  }
475

476
  if (BUF_PAGE_IN_MEM(*pi)) {  // it is in memory
477
    // no need to update the LRU list if only one page exists
H
Haojun Liao 已提交
478
    if (pBuf->numOfPages == 1) {
479
      (*pi)->used = true;
480
      return (void*)(GET_PAYLOAD_DATA(*pi));
481 482
    }

S
Shengliang Guan 已提交
483
    SPageInfo** pInfo = (SPageInfo**)((*pi)->pn->data);
484
    if (*pInfo != *pi) {
485
      uError("inconsistently data in paged buffer, pInfo:%p, pi:%p, %s", *pInfo, *pi, pBuf->id);
486 487
      return NULL;
    }
488

H
Haojun Liao 已提交
489
    lruListMoveToFront(pBuf->lruList, (*pi));
490
    (*pi)->used = true;
491

492
#ifdef BUF_PAGE_DEBUG
H
Hongze Cheng 已提交
493
    uDebug("page_getBufPage1 pageId:%d, offset:%" PRId64, (*pi)->pageId, (*pi)->offset);
494
#endif
495
    return (void*)(GET_PAYLOAD_DATA(*pi));
S
Shengliang Guan 已提交
496
  } else {  // not in memory
497
    ASSERT((!BUF_PAGE_IN_MEM(*pi)) && (*pi)->pn == NULL &&
H
Hongze Cheng 已提交
498
           (((*pi)->length >= 0 && (*pi)->offset >= 0) || ((*pi)->length == -1 && (*pi)->offset == -1)));
499

H
Haojun Liao 已提交
500 501
    bool newPage = false;
    (*pi)->pData = doExtractPage(pBuf, &newPage);
502

503
    // failed to evict buffer page, return with error code.
504 505
    if ((*pi)->pData == NULL) {
      return NULL;
506 507
    }

H
Haojun Liao 已提交
508
    // set the ptr to the new SPageInfo
509 510
    ((void**)((*pi)->pData))[0] = (*pi);

H
Haojun Liao 已提交
511
    lruListPushFront(pBuf->lruList, *pi);
512 513
    (*pi)->used = true;

514
    // some data has been flushed to disk, and needs to be loaded into buffer again.
515
    if (HAS_DATA_IN_DISK(*pi)) {
516 517
      int32_t code = loadPageFromDisk(pBuf, *pi);
      if (code != 0) {
H
Haojun Liao 已提交
518 519 520 521
        if (newPage) {
          taosMemoryFree((*pi)->pData);
        }

522
        terrno = code;
523 524
        return NULL;
      }
H
Haojun Liao 已提交
525
    }
526
#ifdef BUF_PAGE_DEBUG
H
Hongze Cheng 已提交
527
    uDebug("page_getBufPage2 pageId:%d, offset:%" PRId64, (*pi)->pageId, (*pi)->offset);
528
#endif
529
    return (void*)(GET_PAYLOAD_DATA(*pi));
530 531 532
  }
}

H
Haojun Liao 已提交
533
void releaseBufPage(SDiskbasedBuf* pBuf, void* page) {
534
  if (page == NULL) {
535 536
    return;
  }
537

H
Haojun Liao 已提交
538
  SPageInfo* ppi = getPageInfoFromPayload(page);
H
Haojun Liao 已提交
539
  releaseBufPageInfo(pBuf, ppi);
540 541
}

H
Haojun Liao 已提交
542
void releaseBufPageInfo(SDiskbasedBuf* pBuf, SPageInfo* pi) {
543
#ifdef BUF_PAGE_DEBUG
H
Hongze Cheng 已提交
544
  uDebug("page_releaseBufPageInfo pageId:%d, used:%d, offset:%" PRId64, pi->pageId, pi->used, pi->offset);
545
#endif
546

H
Haojun Liao 已提交
547 548 549 550
  if (pi == NULL) {
    return;
  }

551
  if (pi->pData == NULL) {
H
Haojun Liao 已提交
552
    uError("pi->pData (page data) is null");
553 554 555
    return;
  }

556
  pi->used = false;
H
Haojun Liao 已提交
557
  pBuf->statis.releasePages += 1;
558 559
}

H
Haojun Liao 已提交
560
size_t getTotalBufSize(const SDiskbasedBuf* pBuf) { return (size_t)pBuf->totalBufSize; }
561

dengyihao's avatar
dengyihao 已提交
562
SArray* getDataBufPagesIdList(SDiskbasedBuf* pBuf) { return pBuf->pIdList; }
563

H
Haojun Liao 已提交
564
void destroyDiskbasedBuf(SDiskbasedBuf* pBuf) {
H
Haojun Liao 已提交
565
  if (pBuf == NULL) {
566 567 568
    return;
  }

H
Haojun Liao 已提交
569
  dBufPrintStatis(pBuf);
H
Haojun Liao 已提交
570

571
  bool needRemoveFile = false;
572
  if (pBuf->pFile != NULL) {
573
    needRemoveFile = true;
S
Shengliang Guan 已提交
574 575
    uDebug(
        "Paged buffer closed, total:%.2f Kb (%d Pages), inmem size:%.2f Kb (%d Pages), file size:%.2f Kb, page "
H
Haojun Liao 已提交
576
        "size:%.2f Kb, %s",
S
Shengliang Guan 已提交
577
        pBuf->totalBufSize / 1024.0, pBuf->numOfPages, listNEles(pBuf->lruList) * pBuf->pageSize / 1024.0,
H
Haojun Liao 已提交
578
        listNEles(pBuf->lruList), pBuf->fileSize / 1024.0, pBuf->pageSize / 1024.0f, pBuf->id);
579

S
Shengliang Guan 已提交
580
    taosCloseFile(&pBuf->pFile);
581
  } else {
H
Haojun Liao 已提交
582
    uDebug("Paged buffer closed, total:%.2f Kb, no file created, %s", pBuf->totalBufSize / 1024.0, pBuf->id);
583 584
  }

H
Haojun Liao 已提交
585 586
  // print the statistics information
  {
S
Shengliang Guan 已提交
587
    SDiskbasedBufStatis* ps = &pBuf->statis;
588
    if (ps->loadPages == 0) {
H
Hongze Cheng 已提交
589 590
      uDebug("Get/Release pages:%d/%d, flushToDisk:%.2f Kb (%d Pages), loadFromDisk:%.2f Kb (%d Pages)", ps->getPages,
             ps->releasePages, ps->flushBytes / 1024.0f, ps->flushPages, ps->loadBytes / 1024.0f, ps->loadPages);
591 592
    } else {
      uDebug(
H
Haojun Liao 已提交
593
          "Get/Release pages:%d/%d, flushToDisk:%.2f Kb (%d Pages), loadFromDisk:%.2f Kb (%d Pages), avgPgSize:%.2f Kb",
594 595 596
          ps->getPages, ps->releasePages, ps->flushBytes / 1024.0f, ps->flushPages, ps->loadBytes / 1024.0f,
          ps->loadPages, ps->loadBytes / (1024.0 * ps->loadPages));
    }
H
Haojun Liao 已提交
597
  }
598

599 600 601
  if (needRemoveFile) {
    int32_t ret = taosRemoveFile(pBuf->path);
    if (ret != 0) {  // print the error and discard this error info
H
Haojun Liao 已提交
602
      uDebug("WARNING tPage remove file failed. path=%s, code:%s", pBuf->path, strerror(errno));
603
    }
604
  }
605

wafwerar's avatar
wafwerar 已提交
606
  taosMemoryFreeClear(pBuf->path);
H
Haojun Liao 已提交
607

608 609 610 611 612
  size_t n = taosArrayGetSize(pBuf->pIdList);
  for (int32_t i = 0; i < n; ++i) {
    SPageInfo* pi = taosArrayGetP(pBuf->pIdList, i);
    taosMemoryFreeClear(pi->pData);
    taosMemoryFreeClear(pi);
613 614
  }

615 616
  taosArrayDestroy(pBuf->pIdList);

H
Haojun Liao 已提交
617
  tdListFree(pBuf->lruList);
H
Haojun Liao 已提交
618 619
  tdListFree(pBuf->freePgList);

H
Haojun Liao 已提交
620
  taosArrayDestroy(pBuf->emptyDummyIdList);
H
Haojun Liao 已提交
621 622
  taosArrayDestroy(pBuf->pFree);

623
  tSimpleHashCleanup(pBuf->all);
624

wafwerar's avatar
wafwerar 已提交
625 626 627
  taosMemoryFreeClear(pBuf->id);
  taosMemoryFreeClear(pBuf->assistBuf);
  taosMemoryFreeClear(pBuf);
628 629
}

630
SPageInfo* getLastPageInfo(SArray* pList) {
S
Shengliang Guan 已提交
631
  size_t     size = taosArrayGetSize(pList);
632 633 634 635
  SPageInfo* pPgInfo = taosArrayGetP(pList, size - 1);
  return pPgInfo;
}

dengyihao's avatar
dengyihao 已提交
636
int32_t getPageId(const SPageInfo* pPgInfo) { return pPgInfo->pageId; }
637

S
Shengliang Guan 已提交
638
int32_t getBufPageSize(const SDiskbasedBuf* pBuf) { return pBuf->pageSize; }
H
Haojun Liao 已提交
639

S
Shengliang Guan 已提交
640
int32_t getNumOfInMemBufPages(const SDiskbasedBuf* pBuf) { return pBuf->inMemPages; }
H
Haojun Liao 已提交
641

S
Shengliang Guan 已提交
642
bool isAllDataInMemBuf(const SDiskbasedBuf* pBuf) { return pBuf->fileSize == 0; }
H
Haojun Liao 已提交
643

H
Haojun Liao 已提交
644
void setBufPageDirty(void* pPage, bool dirty) {
H
Haojun Liao 已提交
645
  SPageInfo* ppi = getPageInfoFromPayload(pPage);
H
Haojun Liao 已提交
646
  ppi->dirty = dirty;
647 648
}

649 650 651 652 653 654
void setBufPageCompressOnDisk(SDiskbasedBuf* pBuf, bool comp) {
  pBuf->comp = comp;
  if (comp  && (pBuf->assistBuf == NULL)) {
    pBuf->assistBuf = taosMemoryMalloc(pBuf->pageSize + 2);  // EXTRA BYTES
  }
}
H
Haojun Liao 已提交
655

656
void dBufSetBufPageRecycled(SDiskbasedBuf* pBuf, void* pPage) {
H
Haojun Liao 已提交
657 658
  SPageInfo* ppi = getPageInfoFromPayload(pPage);

659
  ppi->used = false;
H
Haojun Liao 已提交
660 661
  ppi->dirty = false;

H
Haojun Liao 已提交
662 663
  // add this pageinfo into the free page info list
  SListNode* pNode = tdListPopNode(pBuf->lruList, ppi->pn);
wafwerar's avatar
wafwerar 已提交
664 665
  taosMemoryFreeClear(ppi->pData);
  taosMemoryFreeClear(pNode);
666
  ppi->pn = NULL;
H
Haojun Liao 已提交
667 668

  tdListAppend(pBuf->freePgList, &ppi);
H
Haojun Liao 已提交
669
}
H
Haojun Liao 已提交
670

671
void dBufSetPrintInfo(SDiskbasedBuf* pBuf) { pBuf->printStatis = true; }
672

S
Shengliang Guan 已提交
673
SDiskbasedBufStatis getDBufStatis(const SDiskbasedBuf* pBuf) { return pBuf->statis; }
H
Haojun Liao 已提交
674

H
Haojun Liao 已提交
675
void dBufPrintStatis(const SDiskbasedBuf* pBuf) {
H
Haojun Liao 已提交
676 677 678 679 680 681
  if (!pBuf->printStatis) {
    return;
  }

  const SDiskbasedBufStatis* ps = &pBuf->statis;

H
Haojun Liao 已提交
682
#if 0
H
Haojun Liao 已提交
683 684
  printf(
      "Paged buffer closed, total:%.2f Kb (%d Pages), inmem size:%.2f Kb (%d Pages), file size:%.2f Kb, page size:%.2f "
H
Haojun Liao 已提交
685
      "Kb, %s\n",
H
Haojun Liao 已提交
686
      pBuf->totalBufSize / 1024.0, pBuf->numOfPages, listNEles(pBuf->lruList) * pBuf->pageSize / 1024.0,
H
Haojun Liao 已提交
687
      listNEles(pBuf->lruList), pBuf->fileSize / 1024.0, pBuf->pageSize / 1024.0f, pBuf->id);
H
Haojun Liao 已提交
688
#endif
H
Haojun Liao 已提交
689

690 691
  if (ps->loadPages > 0) {
    printf(
H
Hongze Cheng 已提交
692 693
        "Get/Release pages:%d/%d, flushToDisk:%.2f Kb (%d Pages), loadFromDisk:%.2f Kb (%d Pages), avgPageSize:%.2f "
        "Kb\n",
694 695 696
        ps->getPages, ps->releasePages, ps->flushBytes / 1024.0f, ps->flushPages, ps->loadBytes / 1024.0f,
        ps->loadPages, ps->loadBytes / (1024.0 * ps->loadPages));
  } else {
dengyihao's avatar
dengyihao 已提交
697
    // printf("no page loaded\n");
698
  }
699
}
5
54liuyao 已提交
700 701

void clearDiskbasedBuf(SDiskbasedBuf* pBuf) {
702 703 704 705 706
  size_t n = taosArrayGetSize(pBuf->pIdList);
  for (int32_t i = 0; i < n; ++i) {
    SPageInfo* pi = taosArrayGetP(pBuf->pIdList, i);
    taosMemoryFreeClear(pi->pData);
    taosMemoryFreeClear(pi);
5
54liuyao 已提交
707 708
  }

709
  taosArrayClear(pBuf->pIdList);
710

5
54liuyao 已提交
711 712 713 714 715 716
  tdListEmpty(pBuf->lruList);
  tdListEmpty(pBuf->freePgList);

  taosArrayClear(pBuf->emptyDummyIdList);
  taosArrayClear(pBuf->pFree);

717
  tSimpleHashClear(pBuf->all);
5
54liuyao 已提交
718

H
Hongze Cheng 已提交
719
  pBuf->numOfPages = 0;  // all pages are in buffer in the first place
5
54liuyao 已提交
720 721 722
  pBuf->totalBufSize = 0;
  pBuf->allocateId = -1;
  pBuf->fileSize = 0;
723
}