tpagedbuf.c 19.5 KB
Newer Older
S
Shengliang Guan 已提交
1
#define _DEFAULT_SOURCE
H
Haojun Liao 已提交
2
#include "tpagedbuf.h"
3 4
#include "taoserror.h"
#include "tcompression.h"
H
Haojun Liao 已提交
5
#include "thash.h"
S
Shengliang Guan 已提交
6
#include "tlog.h"
7

S
Shengliang Guan 已提交
8
// Address of the payload of page info _p: the bytes located POINTER_BYTES past
// the start of _p->pData (the leading slot stores a pointer back to the page info).
#define GET_DATA_PAYLOAD(_p) ((char*)(_p)->pData + POINTER_BYTES)

// True when the LRU list of buffer _b already holds at least _b->inMemPages entries.
#define NO_IN_MEM_AVAILABLE_PAGES(_b) (listNEles((_b)->lruList) >= (_b)->inMemPages)

11
// A contiguous byte range inside the backing file.
typedef struct SPageDiskInfo {
  int64_t offset;  // start position of the range in the file
  int32_t length;  // size of the range in bytes
} SPageDiskInfo;

// Entries of the free-area array (SDiskbasedBuf::pFree) use the same layout.
typedef SPageDiskInfo SFreeListItem;
15

H
Haojun Liao 已提交
16
struct SPageInfo {
17
  SListNode* pn;  // point to list node struct
S
Shengliang Guan 已提交
18 19 20
  void*      pData;
  int64_t    offset;
  int32_t    pageId;
21
  int32_t    length : 29;
S
Shengliang Guan 已提交
22 23
  bool       used : 1;   // set current page is in used
  bool       dirty : 1;  // set current buffer page is dirty or not
H
Haojun Liao 已提交
24
};
25

H
Haojun Liao 已提交
26
struct SDiskbasedBuf {
27 28
  int32_t   numOfPages;
  int64_t   totalBufSize;
S
Shengliang Guan 已提交
29
  uint64_t  fileSize;  // disk file size
30
  TdFilePtr pFile;
S
Shengliang Guan 已提交
31 32
  int32_t   allocateId;  // allocated page id
  char*     path;        // file path
33
  char*     prefix;      // file name prefix
S
Shengliang Guan 已提交
34 35
  int32_t   pageSize;    // current used page size
  int32_t   inMemPages;  // numOfPages that are allocated in memory
36
  SList*    freePgList;  // free page list
37
  SArray*   pIdList;     // page id list
38 39
  SHashObj* all;
  SList*    lruList;
S
Shengliang Guan 已提交
40 41 42 43 44 45
  void*     emptyDummyIdList;  // dummy id list
  void*     assistBuf;         // assistant buffer for compress/decompress data
  SArray*   pFree;             // free area in file
  bool      comp;              // compressed before flushed to disk
  uint64_t  nextPos;           // next page flush position

H
Hongze Cheng 已提交
46 47
  char*               id;           // for debug purpose
  bool                printStatis;  // Print statistics info when closing this buffer.
H
Haojun Liao 已提交
48
  SDiskbasedBufStatis statis;
H
Haojun Liao 已提交
49
};
50

H
Haojun Liao 已提交
51
/**
 * Open (creating if necessary) the backing file of the buffer.
 *
 * The file name is generated from pBuf->prefix the first time it is needed and
 * cached in pBuf->path.
 *
 * @param pBuf buffer whose pFile (and possibly path) is initialised
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY when the path cannot be
 *         duplicated, or a system error code when the file cannot be opened
 */
static int32_t createDiskFile(SDiskbasedBuf* pBuf) {
  if (pBuf->path == NULL) {  // prepare the file name only when it is needed
    char path[PATH_MAX] = {0};
    taosGetTmpfilePath(pBuf->prefix, "paged-buf", path);

    pBuf->path = taosMemoryStrDup(path);
    if (pBuf->path == NULL) {
      // do not hand a NULL path to taosOpenFile
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  pBuf->pFile =
      taosOpenFile(pBuf->path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC | TD_FILE_AUTO_DEL);
  if (pBuf->pFile == NULL) {
    return TAOS_SYSTEM_ERROR(errno);
  }

  return TSDB_CODE_SUCCESS;
}

S
Shengliang Guan 已提交
67
/**
 * Compress a page payload in place when compression is enabled for the buffer.
 *
 * @param data    payload to compress; overwritten with the compressed bytes
 * @param srcSize number of payload bytes
 * @param dst     out: resulting size; equals srcSize when compression is disabled,
 *                otherwise the value returned by tsCompressString
 * @param pBuf    buffer providing the comp flag and the scratch area (assistBuf)
 * @return data
 */
static char* doCompressData(void* data, int32_t srcSize, int32_t* dst, SDiskbasedBuf* pBuf) {
  if (!pBuf->comp) {  // compression disabled: the payload is used as is
    *dst = srcSize;
    return data;
  }

  *dst = tsCompressString(data, srcSize, 1, pBuf->assistBuf, srcSize, ONE_STAGE_COMP, NULL, 0);

  // Same guard as doDecompressData: never pass a non-positive length to memcpy,
  // a negative value would be converted to a huge size_t.
  if (*dst > 0) {
    memcpy(data, pBuf->assistBuf, *dst);
  }
  return data;
}

S
Shengliang Guan 已提交
79
/**
 * Decompress a page payload in place when compression is enabled for the buffer.
 *
 * @param data    bytes read from disk; overwritten with the decompressed bytes
 * @param srcSize number of bytes in data
 * @param dst     out: srcSize when compression is disabled, otherwise the value
 *                returned by tsDecompressString
 * @param pBuf    buffer providing the comp flag, page size and scratch area
 * @return data
 */
static char* doDecompressData(void* data, int32_t srcSize, int32_t* dst, SDiskbasedBuf* pBuf) {
  if (pBuf->comp) {
    *dst = tsDecompressString(data, srcSize, 1, pBuf->assistBuf, pBuf->pageSize, ONE_STAGE_COMP, NULL, 0);

    // copy back only when something was produced
    if (*dst > 0) {
      memcpy(data, pBuf->assistBuf, *dst);
    }
  } else {
    *dst = srcSize;
  }

  return data;
}

H
Haojun Liao 已提交
92 93 94
/**
 * Pick the file position at which `size` bytes can be stored.
 *
 * The first free area that is large enough is used and shrunk accordingly
 * (first fit). When no free area fits, pBuf->nextPos is returned; this function
 * does not advance nextPos, the caller does.
 *
 * @param pBuf buffer owning the free-area array
 * @param size number of bytes that will be written
 * @return file offset to write at
 */
static uint64_t allocatePositionInFile(SDiskbasedBuf* pBuf, size_t size) {
  if (pBuf->pFree == NULL) {
    return pBuf->nextPos;
  }

  size_t num = taosArrayGetSize(pBuf->pFree);
  for (size_t i = 0; i < num; ++i) {
    SFreeListItem* pi = taosArrayGet(pBuf->pFree, i);

    // compare in the unsigned domain only after ruling out a negative length
    if (pi->length >= 0 && (size_t)pi->length >= size) {
      // keep the full 64-bit offset; an int32_t local would truncate positions >= 2 GiB
      int64_t offset = pi->offset;

      pi->offset += (int64_t)size;
      pi->length -= (int32_t)size;
      return (uint64_t)offset;
    }
  }

  // no reusable area, place the data at the current end position
  return pBuf->nextPos;
}

115
// Mark the page as not resident in memory by dropping its buffer pointer (the buffer is not freed here).
static void setPageNotInBuf(SPageInfo* pPageInfo) {
  pPageInfo->pData = NULL;
}
H
Haojun Liao 已提交
116

117
// Bytes to allocate for one in-memory page: the payload plus a POINTER_BYTES-sized
// slot and sizeof(SFilePage) additional bytes.
static FORCE_INLINE size_t getAllocPageSize(int32_t pageSize) {
  return pageSize + POINTER_BYTES + sizeof(SFilePage);
}
H
Haojun Liao 已提交
118

H
Haojun Liao 已提交
119 120 121 122 123 124 125 126
/**
 *   +--------------------------+-------------------+--------------+
 *   | PTR to SPageInfo (8bytes)| Payload (PageSize)| 2 Extra Bytes|
 *   +--------------------------+-------------------+--------------+
 * @param pBuf
 * @param pg
 * @return
 */
H
Haojun Liao 已提交
127
static char* doFlushPageToDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) {
128 129
  assert(!pg->used && pg->pData != NULL);

H
Haojun Liao 已提交
130
  int32_t size = pBuf->pageSize;
H
Haojun Liao 已提交
131 132
  char*   t = NULL;
  if (pg->offset == -1 || pg->dirty) {
H
Haojun Liao 已提交
133 134 135
    void* payload = GET_DATA_PAYLOAD(pg);
    t = doCompressData(payload, pBuf->pageSize, &size, pBuf);
    assert(size >= 0);
H
Haojun Liao 已提交
136
  }
137 138

  // this page is flushed to disk for the first time
H
Haojun Liao 已提交
139 140 141
  if (pg->dirty) {
    if (pg->offset == -1) {
      assert(pg->dirty == true);
H
Haojun Liao 已提交
142

H
Haojun Liao 已提交
143 144
      pg->offset = allocatePositionInFile(pBuf, size);
      pBuf->nextPos += size;
145

146
      int32_t ret = taosLSeekFile(pBuf->pFile, pg->offset, SEEK_SET);
147
      if (ret == -1) {
148 149 150
        terrno = TAOS_SYSTEM_ERROR(errno);
        return NULL;
      }
151

152 153 154 155 156
      ret = (int32_t)taosWriteFile(pBuf->pFile, t, size);
      if (ret != size) {
        terrno = TAOS_SYSTEM_ERROR(errno);
        return NULL;
      }
157

H
Haojun Liao 已提交
158 159 160
      if (pBuf->fileSize < pg->offset + size) {
        pBuf->fileSize = pg->offset + size;
      }
H
Haojun Liao 已提交
161

H
Haojun Liao 已提交
162 163 164 165 166 167 168 169 170 171 172 173 174
      pBuf->statis.flushBytes += size;
      pBuf->statis.flushPages += 1;
    } else {
      // length becomes greater, current space is not enough, allocate new place, otherwise, do nothing
      if (pg->length < size) {
        // 1. add current space to free list
        SPageDiskInfo dinfo = {.length = pg->length, .offset = pg->offset};
        taosArrayPush(pBuf->pFree, &dinfo);

        // 2. allocate new position, and update the info
        pg->offset = allocatePositionInFile(pBuf, size);
        pBuf->nextPos += size;
      }
175

176 177
      // 3. write to disk.
      int32_t ret = taosLSeekFile(pBuf->pFile, pg->offset, SEEK_SET);
178
      if (ret == -1) {
179 180 181
        terrno = TAOS_SYSTEM_ERROR(errno);
        return NULL;
      }
182

183 184 185 186 187
      ret = (int32_t)taosWriteFile(pBuf->pFile, t, size);
      if (ret != size) {
        terrno = TAOS_SYSTEM_ERROR(errno);
        return NULL;
      }
188

H
Haojun Liao 已提交
189 190 191
      if (pBuf->fileSize < pg->offset + size) {
        pBuf->fileSize = pg->offset + size;
      }
192

H
Haojun Liao 已提交
193 194
      pBuf->statis.flushBytes += size;
      pBuf->statis.flushPages += 1;
195
    }
196
  } else {  // NOTE: the size may be -1, the this recycle page has not been flushed to disk yet.
H
Haojun Liao 已提交
197
    size = pg->length;
198 199
  }

H
Haojun Liao 已提交
200
  ASSERT(size > 0 || (pg->offset == -1 && pg->length == -1));
201

H
Haojun Liao 已提交
202
  char* pDataBuf = pg->pData;
H
Haojun Liao 已提交
203
  memset(pDataBuf, 0, getAllocPageSize(pBuf->pageSize));
204
#ifdef BUF_PAGE_DEBUG
205
  uDebug("page_flush %p, pageId:%d, offset:%d", pDataBuf, pg->pageId, pg->offset);
206
#endif
H
Haojun Liao 已提交
207
  pg->length = size;  // on disk size
H
Haojun Liao 已提交
208
  return pDataBuf;
209 210
}

H
Haojun Liao 已提交
211
/**
 * Evict one page: create the backing file on first use, flush the page and
 * detach its buffer.
 *
 * The page is marked clean and not-in-memory regardless of what
 * doFlushPageToDisk returns.
 *
 * @param pBuf owning buffer
 * @param pg   page to evict
 * @return the reusable page buffer, or NULL (terrno set) when the file cannot be
 *         created or the flush fails
 */
static char* flushPageToDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) {
  assert(((int64_t)pBuf->numOfPages * pBuf->pageSize) == pBuf->totalBufSize && pBuf->numOfPages >= pBuf->inMemPages);

  // the backing file is created lazily, on the first eviction
  if (pBuf->pFile == NULL) {
    int32_t code = createDiskFile(pBuf);
    if (code != TSDB_CODE_SUCCESS) {
      terrno = code;
      return NULL;
    }
  }

  char* pReusable = doFlushPageToDisk(pBuf, pg);

  pg->dirty = false;
  setPageNotInBuf(pg);
  return pReusable;
}

// load file block data in disk
H
Haojun Liao 已提交
230
static int32_t loadPageFromDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) {
231
  int32_t ret = taosLSeekFile(pBuf->pFile, pg->offset, SEEK_SET);
232
  if (ret == -1) {
H
Haojun Liao 已提交
233 234 235 236
    ret = TAOS_SYSTEM_ERROR(errno);
    return ret;
  }

237
  void* pPage = (void*)GET_DATA_PAYLOAD(pg);
H
Haojun Liao 已提交
238
  ret = (int32_t)taosReadFile(pBuf->pFile, pPage, pg->length);
239
  if (ret != pg->length) {
H
Haojun Liao 已提交
240 241
    ret = TAOS_SYSTEM_ERROR(errno);
    return ret;
242 243
  }

H
Haojun Liao 已提交
244 245
  pBuf->statis.loadBytes += pg->length;
  pBuf->statis.loadPages += 1;
246 247

  int32_t fullSize = 0;
H
Haojun Liao 已提交
248
  doDecompressData(pPage, pg->length, &fullSize, pBuf);
H
Haojun Liao 已提交
249
  return 0;
250 251
}

252
/**
 * Create the bookkeeping record for a new page and append it to pBuf->pIdList.
 *
 * The new record starts out as used, clean, without buffer and without a
 * position in the file.
 *
 * @param pBuf   owning buffer
 * @param pageId id assigned to the page
 * @return the new page info, or NULL with terrno = TSDB_CODE_OUT_OF_MEMORY when
 *         the record cannot be allocated or stored; numOfPages is only
 *         incremented on success
 */
static SPageInfo* registerPage(SDiskbasedBuf* pBuf, int32_t pageId) {
  SPageInfo* ppi = taosMemoryMalloc(sizeof(SPageInfo));
  if (ppi == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  ppi->pageId = pageId;
  ppi->pData = NULL;
  ppi->offset = -1;
  ppi->length = -1;
  ppi->used = true;
  ppi->pn = NULL;
  ppi->dirty = false;

  SPageInfo** pSlot = taosArrayPush(pBuf->pIdList, &ppi);
  if (pSlot == NULL) {
    taosMemoryFreeClear(ppi);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pBuf->numOfPages += 1;
  return *pSlot;
}

H
Haojun Liao 已提交
268
/**
 * Walk the LRU list backwards (TD_LIST_BACKWARD) and return the first node whose
 * page is not in use.
 *
 * @param pBuf owning buffer
 * @return the LRU node of that page, or NULL when every page in the list is in use
 */
static SListNode* getEldestUnrefedPage(SDiskbasedBuf* pBuf) {
  SListIter it = {0};
  tdListInitIter(pBuf->lruList, &it, TD_LIST_BACKWARD);

  for (SListNode* pNode = tdListNext(&it); pNode != NULL; pNode = tdListNext(&it)) {
    SPageInfo* pInfo = *(SPageInfo**)pNode->data;
    assert(pInfo->pageId >= 0 && pInfo->pn == pNode);

    if (!pInfo->used) {
      return pNode;
    }
  }

  return NULL;
}

H
Haojun Liao 已提交
288
/**
 * Make room for one page in memory.
 *
 * When an unused page exists it is unlinked from the LRU list and flushed, and
 * its buffer is returned for reuse. When every page is in use, the in-memory
 * page limit is raised by 50% instead and NULL is returned.
 *
 * terrno is reset to 0 on entry; callers that need to tell a flush failure from
 * the "limit raised" case can check it after a NULL return.
 *
 * @param pBuf owning buffer
 * @return reusable page buffer, or NULL
 */
static char* evacOneDataPage(SDiskbasedBuf* pBuf) {
  SListNode* pVictim = getEldestUnrefedPage(pBuf);
  terrno = 0;

  if (pVictim == NULL) {
    // all pages are referenced by the user: grow the soft limit by 50%
    pBuf->inMemPages = (int32_t)(pBuf->inMemPages * 1.5f);
    return NULL;
  }

  tdListPopNode(pBuf->lruList, pVictim);

  SPageInfo* pInfo = *(SPageInfo**)pVictim->data;
  assert(pInfo->pn == pVictim);

  // the page no longer has an LRU node
  pInfo->pn = NULL;
  taosMemoryFreeClear(pVictim);

  return flushPageToDisk(pBuf, pInfo);
}

S
Shengliang Guan 已提交
317
// Insert the page info at the head of the LRU list and remember the list node in pi->pn.
static void lruListPushFront(SList* pList, SPageInfo* pi) {
  tdListPrepend(pList, &pi);
  pi->pn = tdListGetHead(pList);
}

S
Shengliang Guan 已提交
323
// Move the list node of an already linked page to the head of the LRU list.
static void lruListMoveToFront(SList* pList, SPageInfo* pi) {
  SListNode* pNode = pi->pn;

  tdListPopNode(pList, pNode);
  tdListPrependNode(pList, pNode);
}

H
Haojun Liao 已提交
328
static SPageInfo* getPageInfoFromPayload(void* page) {
H
Hongze Cheng 已提交
329
  char* p = (char*)page - POINTER_BYTES;
H
Haojun Liao 已提交
330

331
  SPageInfo* ppi = ((SPageInfo**)p)[0];
H
Haojun Liao 已提交
332
  return ppi;
333 334
}

H
Haojun Liao 已提交
335
/**
 * Create a disk-backed paged buffer.
 *
 * @param pBuf         out: the new buffer; set to NULL when creation fails
 * @param pagesize     payload size of one page in bytes; must be positive
 * @param inMemBufSize memory budget in bytes; must hold at least two pages
 * @param id           identifier used in log messages; copied
 * @param dir          prefix for the temporary file name; the pointer is stored,
 *                     not copied, so it must outlive the buffer
 * @return TSDB_CODE_SUCCESS or TSDB_CODE_OUT_OF_MEMORY
 */
int32_t createDiskbasedBuf(SDiskbasedBuf** pBuf, int32_t pagesize, int32_t inMemBufSize, const char* id,
                           const char* dir) {
  // validate before pagesize is used as a divisor; at least 2 pages must fit in memory
  assert(pagesize > 0 && inMemBufSize >= pagesize * 2);

  *pBuf = taosMemoryCalloc(1, sizeof(SDiskbasedBuf));

  SDiskbasedBuf* pPBuf = *pBuf;
  if (pPBuf == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  pPBuf->pageSize = pagesize;
  pPBuf->numOfPages = 0;  // no page has been registered yet
  pPBuf->totalBufSize = 0;
  pPBuf->inMemPages = inMemBufSize / pagesize;  // maximum allowed pages, it is a soft limit.
  pPBuf->allocateId = -1;
  pPBuf->pFile = NULL;
  pPBuf->fileSize = 0;
  pPBuf->prefix = (char*)dir;

  // id is released with taosMemoryFreeClear, so it has to come from the matching allocator
  pPBuf->id = taosMemoryStrDup((char*)id);

  pPBuf->pFree = taosArrayInit(4, sizeof(SFreeListItem));
  pPBuf->freePgList = tdListNew(POINTER_BYTES);
  pPBuf->lruList = tdListNew(POINTER_BYTES);
  pPBuf->pIdList = taosArrayInit(4, POINTER_BYTES);
  pPBuf->assistBuf = taosMemoryMalloc(pPBuf->pageSize + 2);  // EXTRA BYTES

  // init id hash table
  _hash_fn_t fn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT);
  pPBuf->all = taosHashInit(10, fn, true, false);

  pPBuf->emptyDummyIdList = taosArrayInit(1, sizeof(int32_t));

  if (pPBuf->id == NULL || pPBuf->pFree == NULL || pPBuf->freePgList == NULL || pPBuf->lruList == NULL ||
      pPBuf->pIdList == NULL || pPBuf->assistBuf == NULL || pPBuf->all == NULL || pPBuf->emptyDummyIdList == NULL) {
    // release whatever was created so that a half-built buffer never escapes
    if (pPBuf->pFree != NULL) taosArrayDestroy(pPBuf->pFree);
    if (pPBuf->freePgList != NULL) tdListFree(pPBuf->freePgList);
    if (pPBuf->lruList != NULL) tdListFree(pPBuf->lruList);
    if (pPBuf->pIdList != NULL) taosArrayDestroy(pPBuf->pIdList);
    if (pPBuf->all != NULL) taosHashCleanup(pPBuf->all);
    if (pPBuf->emptyDummyIdList != NULL) taosArrayDestroy(pPBuf->emptyDummyIdList);
    if (pPBuf->assistBuf != NULL) taosMemoryFreeClear(pPBuf->assistBuf);
    if (pPBuf->id != NULL) taosMemoryFreeClear(pPBuf->id);

    taosMemoryFreeClear(pPBuf);
    *pBuf = NULL;
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  return TSDB_CODE_SUCCESS;
}

377
/**
 * Hand out a new, empty page.
 *
 * A recycled page info is reused when one is available, otherwise a new page id
 * is allocated and registered. When the in-memory limit is reached, one unused
 * page is evicted first and its buffer is reused.
 *
 * @param pBuf   owning buffer
 * @param pageId out: id of the returned page
 * @return pointer to the page payload, or NULL with terrno set when memory
 *         cannot be allocated
 */
void* getNewBufPage(SDiskbasedBuf* pBuf, int32_t* pageId) {
  pBuf->statis.getPages += 1;

  char* availablePage = NULL;
  if (NO_IN_MEM_AVAILABLE_PAGES(pBuf)) {
    availablePage = evacOneDataPage(pBuf);
  }

  // Secure the page buffer before touching any bookkeeping, so that nothing has
  // to be undone when the allocation fails.
  if (availablePage == NULL) {
    // extra bytes are added in case the zipped buffer grows
    availablePage = taosMemoryCalloc(1, getAllocPageSize(pBuf->pageSize));
    if (availablePage == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return NULL;
    }
  }

  SPageInfo* pi = NULL;
  if (listNEles(pBuf->freePgList) != 0) {
    // reuse a recycled page info
    SListNode* pItem = tdListPopHead(pBuf->freePgList);
    pi = *(SPageInfo**)pItem->data;
    pi->used = true;
    *pageId = pi->pageId;
    taosMemoryFreeClear(pItem);
  } else {
    // create a new page info under the next id; the id is only consumed on success
    int32_t newId = pBuf->allocateId + 1;

    pi = registerPage(pBuf, newId);
    if (pi == NULL) {
      taosMemoryFreeClear(availablePage);
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return NULL;
    }

    pBuf->allocateId = newId;
    *pageId = newId;

    // add to hash map
    taosHashPut(pBuf->all, pageId, sizeof(int32_t), &pi, POINTER_BYTES);
    pBuf->totalBufSize += pBuf->pageSize;
  }

  // add to LRU list
  assert(listNEles(pBuf->lruList) < pBuf->inMemPages && pBuf->inMemPages > 0);
  lruListPushFront(pBuf->lruList, pi);

  // attach the buffer and store the back pointer in its first slot
  pi->pData = availablePage;
  ((void**)pi->pData)[0] = pi;
#ifdef BUF_PAGE_DEBUG
  uDebug("page_getNewBufPage , pi->pData:%p, pageId:%d, offset:%" PRId64, pi->pData, pi->pageId, pi->offset);
#endif
  return (void*)(GET_DATA_PAYLOAD(pi));
}

H
Haojun Liao 已提交
423
/**
 * Get access to an existing page, loading it from disk when it is not in memory.
 *
 * The returned page is marked as used and becomes the most recently used entry
 * of the LRU list.
 *
 * @param pBuf owning buffer
 * @param id   id of a page previously returned by getNewBufPage
 * @return pointer to the page payload, or NULL when no buffer could be obtained
 *         or the page could not be read back; on a read or allocation failure
 *         terrno is set and the page is left in its not-in-memory state
 *
 * NOTE(review): when every in-memory page is in use, evacOneDataPage raises the
 * limit and returns NULL, and this function then returns NULL as well, whereas
 * getNewBufPage allocates a fresh buffer in the same situation - confirm that
 * the difference is intended.
 */
void* getBufPage(SDiskbasedBuf* pBuf, int32_t id) {
  assert(pBuf != NULL && id >= 0);
  pBuf->statis.getPages += 1;

  SPageInfo** pi = taosHashGet(pBuf->all, &id, sizeof(int32_t));
  assert(pi != NULL && *pi != NULL);

  SPageInfo* pg = *pi;

  if (pg->pData != NULL) {  // it is in memory
    // no need to update the LRU list if only one page exists
    if (pBuf->numOfPages == 1) {
      pg->used = true;
      return (void*)(GET_DATA_PAYLOAD(pg));
    }

    SPageInfo** pInfo = (SPageInfo**)(pg->pn->data);
    assert(*pInfo == pg);

    lruListMoveToFront(pBuf->lruList, pg);
    pg->used = true;
#ifdef BUF_PAGE_DEBUG
    uDebug("page_getBufPage1 pageId:%d, offset:%" PRId64, pg->pageId, pg->offset);
#endif
    return (void*)(GET_DATA_PAYLOAD(pg));
  }

  // not in memory
  assert(pg->pData == NULL && pg->pn == NULL &&
         ((pg->length >= 0 && pg->offset >= 0) || (pg->length == -1 && pg->offset == -1)));

  char* availablePage = NULL;
  if (NO_IN_MEM_AVAILABLE_PAGES(pBuf)) {
    availablePage = evacOneDataPage(pBuf);
    if (availablePage == NULL) {
      return NULL;
    }
  }

  if (availablePage == NULL) {
    availablePage = taosMemoryCalloc(1, getAllocPageSize(pBuf->pageSize));
    if (availablePage == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return NULL;
    }
  }

  // attach the buffer and set the back pointer to the page info
  pg->pData = availablePage;
  ((void**)(pg->pData))[0] = pg;

  // Some data has been flushed to disk and needs to be loaded into the buffer again.
  // This is done before the page is linked into the LRU list, so that a failed load
  // leaves the page in its previous (not in memory) state instead of resident with
  // unloaded content.
  if (pg->length > 0 && pg->offset >= 0) {
    int32_t code = loadPageFromDisk(pBuf, pg);
    if (code != 0) {
      taosMemoryFreeClear(pg->pData);
      pg->pData = NULL;
      terrno = code;
      return NULL;
    }
  }

  lruListPushFront(pBuf->lruList, pg);
  pg->used = true;
#ifdef BUF_PAGE_DEBUG
  uDebug("page_getBufPage2 pageId:%d, offset:%" PRId64, pg->pageId, pg->offset);
#endif
  return (void*)(GET_DATA_PAYLOAD(pg));
}

H
Haojun Liao 已提交
484 485
/**
 * Release a page that was obtained through getNewBufPage/getBufPage.
 *
 * @param pBuf owning buffer
 * @param page payload pointer returned when the page was obtained
 */
void releaseBufPage(SDiskbasedBuf* pBuf, void* page) {
  assert(pBuf != NULL && page != NULL);

  releaseBufPageInfo(pBuf, getPageInfoFromPayload(page));
}

H
Haojun Liao 已提交
490
/**
 * Mark a page as no longer in use, which makes it a candidate for eviction,
 * and count the release in the statistics.
 *
 * @param pBuf owning buffer
 * @param pi   page info of a page that is currently in memory
 */
void releaseBufPageInfo(SDiskbasedBuf* pBuf, SPageInfo* pi) {
#ifdef BUF_PAGE_DEBUG
  uDebug("page_releaseBufPageInfo pageId:%d, used:%d, offset:%" PRId64, pi->pageId, pi->used, pi->offset);
#endif
  // only the presence of the buffer is checked; the used flag is not asserted
  assert(pi->pData != NULL);

  pBuf->statis.releasePages += 1;
  pi->used = false;
}

H
Haojun Liao 已提交
500
// Total size in bytes of all registered pages (pBuf->totalBufSize).
size_t getTotalBufSize(const SDiskbasedBuf* pBuf) {
  return (size_t)pBuf->totalBufSize;
}
501

502
// Return the array holding the page info pointer of every registered page.
// The array stays owned by the buffer.
SArray* getDataBufPagesIdList(SDiskbasedBuf* pBuf) {
  ASSERT(pBuf != NULL);

  SArray* pPageList = pBuf->pIdList;
  return pPageList;
}

H
Haojun Liao 已提交
507
/**
 * Destroy a paged buffer: log its statistics, close and remove the backing
 * file if one was created, and release every page and container it owns.
 *
 * @param pBuf buffer to destroy; NULL is accepted and ignored
 */
void destroyDiskbasedBuf(SDiskbasedBuf* pBuf) {
  if (pBuf == NULL) {
    return;
  }

  dBufPrintStatis(pBuf);

  // remember whether a file exists before the handle is closed
  const bool hadFile = (pBuf->pFile != NULL);
  if (hadFile) {
    uDebug(
        "Paged buffer closed, total:%.2f Kb (%d Pages), inmem size:%.2f Kb (%d Pages), file size:%.2f Kb, page "
        "size:%.2f Kb, %s\n",
        pBuf->totalBufSize / 1024.0, pBuf->numOfPages, listNEles(pBuf->lruList) * pBuf->pageSize / 1024.0,
        listNEles(pBuf->lruList), pBuf->fileSize / 1024.0, pBuf->pageSize / 1024.0f, pBuf->id);

    taosCloseFile(&pBuf->pFile);
  } else {
    uDebug("Paged buffer closed, total:%.2f Kb, no file created, %s", pBuf->totalBufSize / 1024.0, pBuf->id);
  }

  // log the statistics information; the average page size is only meaningful
  // when at least one page has been loaded
  SDiskbasedBufStatis* pStat = &pBuf->statis;
  if (pStat->loadPages != 0) {
    uDebug(
        "Get/Release pages:%d/%d, flushToDisk:%.2f Kb (%d Pages), loadFromDisk:%.2f Kb (%d Pages), avgPageSize:%.2f "
        "Kb",
        pStat->getPages, pStat->releasePages, pStat->flushBytes / 1024.0f, pStat->flushPages,
        pStat->loadBytes / 1024.0f, pStat->loadPages, pStat->loadBytes / (1024.0 * pStat->loadPages));
  } else {
    uDebug("Get/Release pages:%d/%d, flushToDisk:%.2f Kb (%d Pages), loadFromDisk:%.2f Kb (%d Pages)",
           pStat->getPages, pStat->releasePages, pStat->flushBytes / 1024.0f, pStat->flushPages,
           pStat->loadBytes / 1024.0f, pStat->loadPages);
  }

  if (hadFile) {
    // a failure to remove the file is only logged
    if (taosRemoveFile(pBuf->path) != 0) {
      uDebug("WARNING tPage remove file failed. path=%s, code:%s", pBuf->path, strerror(errno));
    }
  }

  taosMemoryFreeClear(pBuf->path);

  // release every page buffer together with its page info
  const size_t numInfos = taosArrayGetSize(pBuf->pIdList);
  for (size_t idx = 0; idx < numInfos; ++idx) {
    SPageInfo* pInfo = taosArrayGetP(pBuf->pIdList, idx);
    taosMemoryFreeClear(pInfo->pData);
    taosMemoryFreeClear(pInfo);
  }

  taosArrayDestroy(pBuf->pIdList);

  tdListFree(pBuf->lruList);
  tdListFree(pBuf->freePgList);

  taosArrayDestroy(pBuf->emptyDummyIdList);
  taosArrayDestroy(pBuf->pFree);

  taosHashCleanup(pBuf->all);

  taosMemoryFreeClear(pBuf->id);
  taosMemoryFreeClear(pBuf->assistBuf);
  taosMemoryFreeClear(pBuf);
}

574
/**
 * Return the last page info stored in a page list.
 *
 * @param pList array of SPageInfo pointers
 * @return the last element, or NULL when the list is empty
 */
SPageInfo* getLastPageInfo(SArray* pList) {
  size_t size = taosArrayGetSize(pList);
  if (size == 0) {
    // size - 1 would wrap around to the largest size_t value and index far out of range
    return NULL;
  }

  SPageInfo* pPgInfo = taosArrayGetP(pList, size - 1);
  return pPgInfo;
}

580
// Id of the page described by pPgInfo.
int32_t getPageId(const SPageInfo* pPgInfo) {
  ASSERT(pPgInfo != NULL);

  const int32_t id = pPgInfo->pageId;
  return id;
}

S
Shengliang Guan 已提交
585
// Page size of the buffer (pBuf->pageSize).
int32_t getBufPageSize(const SDiskbasedBuf* pBuf) {
  return pBuf->pageSize;
}
H
Haojun Liao 已提交
586

S
Shengliang Guan 已提交
587
// Current limit on the number of pages held in memory (pBuf->inMemPages).
int32_t getNumOfInMemBufPages(const SDiskbasedBuf* pBuf) {
  return pBuf->inMemPages;
}
H
Haojun Liao 已提交
588

S
Shengliang Guan 已提交
589
// True while the recorded file size is 0, i.e. nothing has been accounted as written to the backing file.
bool isAllDataInMemBuf(const SDiskbasedBuf* pBuf) {
  const bool nothingOnDisk = (pBuf->fileSize == 0);
  return nothingOnDisk;
}
H
Haojun Liao 已提交
590

H
Haojun Liao 已提交
591
void setBufPageDirty(void* pPage, bool dirty) {
H
Haojun Liao 已提交
592
  SPageInfo* ppi = getPageInfoFromPayload(pPage);
H
Haojun Liao 已提交
593
  ppi->dirty = dirty;
594 595
}

596
// Enable or disable compression of pages when they are written to disk.
void setBufPageCompressOnDisk(SDiskbasedBuf* pBuf, bool comp) {
  pBuf->comp = comp;
}
H
Haojun Liao 已提交
597

598
/**
 * Recycle a page: its in-memory buffer is freed, it is unlinked from the LRU
 * list and its page info is appended to the free page list, from where
 * getNewBufPage can hand it out again.
 *
 * @param pBuf  owning buffer
 * @param pPage payload pointer of a page that is currently in memory
 */
void dBufSetBufPageRecycled(SDiskbasedBuf* pBuf, void* pPage) {
  // the page info has to be fetched before the buffer holding the back pointer is freed
  SPageInfo* pInfo = getPageInfoFromPayload(pPage);

  // unlink from the LRU list and drop the list node
  SListNode* pLruNode = tdListPopNode(pBuf->lruList, pInfo->pn);
  pInfo->pn = NULL;
  taosMemoryFreeClear(pLruNode);

  taosMemoryFreeClear(pInfo->pData);

  pInfo->dirty = false;
  pInfo->used = false;

  // add this pageinfo into the free page info list
  tdListAppend(pBuf->freePgList, &pInfo);
}
H
Haojun Liao 已提交
612

613
// Request that dBufPrintStatis prints the statistics of this buffer.
void dBufSetPrintInfo(SDiskbasedBuf* pBuf) {
  pBuf->printStatis = true;
}
614

S
Shengliang Guan 已提交
615
// Return a copy of the statistics counters of the buffer.
SDiskbasedBufStatis getDBufStatis(const SDiskbasedBuf* pBuf) {
  return pBuf->statis;
}
H
Haojun Liao 已提交
616

H
Haojun Liao 已提交
617
void dBufPrintStatis(const SDiskbasedBuf* pBuf) {
H
Haojun Liao 已提交
618 619 620 621 622 623
  if (!pBuf->printStatis) {
    return;
  }

  const SDiskbasedBufStatis* ps = &pBuf->statis;

H
Haojun Liao 已提交
624
#if 0
H
Haojun Liao 已提交
625 626
  printf(
      "Paged buffer closed, total:%.2f Kb (%d Pages), inmem size:%.2f Kb (%d Pages), file size:%.2f Kb, page size:%.2f "
H
Haojun Liao 已提交
627
      "Kb, %s\n",
H
Haojun Liao 已提交
628
      pBuf->totalBufSize / 1024.0, pBuf->numOfPages, listNEles(pBuf->lruList) * pBuf->pageSize / 1024.0,
H
Haojun Liao 已提交
629
      listNEles(pBuf->lruList), pBuf->fileSize / 1024.0, pBuf->pageSize / 1024.0f, pBuf->id);
H
Haojun Liao 已提交
630
#endif
H
Haojun Liao 已提交
631

632 633
  if (ps->loadPages > 0) {
    printf(
H
Hongze Cheng 已提交
634 635
        "Get/Release pages:%d/%d, flushToDisk:%.2f Kb (%d Pages), loadFromDisk:%.2f Kb (%d Pages), avgPageSize:%.2f "
        "Kb\n",
636 637 638 639 640
        ps->getPages, ps->releasePages, ps->flushBytes / 1024.0f, ps->flushPages, ps->loadBytes / 1024.0f,
        ps->loadPages, ps->loadBytes / (1024.0 * ps->loadPages));
  } else {
    printf("no page loaded\n");
  }
641
}
5
54liuyao 已提交
642 643

/**
 * Drop every page of the buffer while keeping the buffer itself usable.
 *
 * All page buffers and page infos are freed, the containers are emptied and the
 * page counters, the id allocator and the recorded file size are reset. The
 * backing file handle and nextPos are left untouched.
 *
 * @param pBuf buffer to reset
 */
void clearDiskbasedBuf(SDiskbasedBuf* pBuf) {
  // release every page buffer together with its page info
  const size_t numInfos = taosArrayGetSize(pBuf->pIdList);
  for (size_t idx = 0; idx < numInfos; ++idx) {
    SPageInfo* pInfo = taosArrayGetP(pBuf->pIdList, idx);
    taosMemoryFreeClear(pInfo->pData);
    taosMemoryFreeClear(pInfo);
  }

  // empty the containers, they are reused afterwards
  taosArrayClear(pBuf->pIdList);

  tdListEmpty(pBuf->lruList);
  tdListEmpty(pBuf->freePgList);

  taosArrayClear(pBuf->emptyDummyIdList);
  taosArrayClear(pBuf->pFree);

  taosHashClear(pBuf->all);

  // back to the state of a buffer without any registered page
  pBuf->allocateId = -1;
  pBuf->numOfPages = 0;
  pBuf->totalBufSize = 0;
  pBuf->fileSize = 0;
}