tpagedbuf.c 20.6 KB
Newer Older
S
Shengliang Guan 已提交
1
#define _DEFAULT_SOURCE
H
Haojun Liao 已提交
2
#include "tpagedbuf.h"
3 4
#include "taoserror.h"
#include "tcompression.h"
H
Haojun Liao 已提交
5
#include "thash.h"
S
Shengliang Guan 已提交
6
#include "tlog.h"
7

S
Shengliang Guan 已提交
8
// Payload address of a page buffer: every buffer starts with a POINTER_BYTES back-pointer to its
// SPageInfo, followed by the data handed out to callers.
#define GET_DATA_PAYLOAD(_p) ((char*)((_p)->pData) + POINTER_BYTES)

// True once the number of resident pages (tracked by the LRU list) reached the soft in-memory limit.
#define NO_IN_MEM_AVAILABLE_PAGES(_b) (listNEles((_b)->lruList) >= ((_b)->inMemPages))

11
// Location of a byte range in the backing file. Used both for a flushed page and (as SFreeListItem)
// for a region of the file that may be reused.
typedef struct SPageDiskInfo {
  int64_t offset;  // byte offset in the backing file
  int32_t length;  // number of bytes covered starting at offset
} SPageDiskInfo, SFreeListItem;
15

H
Haojun Liao 已提交
16
struct SPageInfo {
17
  SListNode* pn;  // point to list node struct
S
Shengliang Guan 已提交
18 19 20
  void*      pData;
  int64_t    offset;
  int32_t    pageId;
21
  int32_t    length : 29;
S
Shengliang Guan 已提交
22 23
  bool       used : 1;   // set current page is in used
  bool       dirty : 1;  // set current buffer page is dirty or not
H
Haojun Liao 已提交
24
};
25

H
Haojun Liao 已提交
26
struct SDiskbasedBuf {
27 28
  int32_t   numOfPages;
  int64_t   totalBufSize;
S
Shengliang Guan 已提交
29
  uint64_t  fileSize;  // disk file size
30
  TdFilePtr pFile;
S
Shengliang Guan 已提交
31 32 33 34
  int32_t   allocateId;  // allocated page id
  char*     path;        // file path
  int32_t   pageSize;    // current used page size
  int32_t   inMemPages;  // numOfPages that are allocated in memory
35 36
  SList*    freePgList;  // free page list
  SHashObj* groupSet;    // id hash table, todo remove it
37 38
  SHashObj* all;
  SList*    lruList;
S
Shengliang Guan 已提交
39 40 41 42 43 44
  void*     emptyDummyIdList;  // dummy id list
  void*     assistBuf;         // assistant buffer for compress/decompress data
  SArray*   pFree;             // free area in file
  bool      comp;              // compressed before flushed to disk
  uint64_t  nextPos;           // next page flush position

H
Haojun Liao 已提交
45 46
  char*     id;          // for debug purpose
  bool      printStatis;  // Print statistics info when closing this buffer.
H
Haojun Liao 已提交
47
  SDiskbasedBufStatis statis;
H
Haojun Liao 已提交
48
};
49

H
Haojun Liao 已提交
50
/**
 * Open the temporary backing file (created and truncated, readable and writable, TD_FILE_AUTO_DEL).
 *
 * @return TSDB_CODE_SUCCESS, or TAOS_SYSTEM_ERROR(errno) when the file cannot be opened
 */
static int32_t createDiskFile(SDiskbasedBuf* pBuf) {
  pBuf->pFile =
      taosOpenFile(pBuf->path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC | TD_FILE_AUTO_DEL);
  return (pBuf->pFile == NULL) ? TAOS_SYSTEM_ERROR(errno) : TSDB_CODE_SUCCESS;
}

S
Shengliang Guan 已提交
59
/**
 * Compress a page payload in place when compression is enabled for this buffer.
 *
 * @param data    payload to compress; overwritten with the compressed image when compression succeeds
 * @param srcSize number of payload bytes
 * @param dst     [out] compressed size, or srcSize when compression is disabled. A non-positive value
 *                reports a failure and leaves data untouched.
 * @param pBuf    owning buffer; its assistBuf is used as scratch space
 * @return data
 */
static char* doCompressData(void* data, int32_t srcSize, int32_t* dst, SDiskbasedBuf* pBuf) {
  if (!pBuf->comp) {
    *dst = srcSize;
    return data;
  }

  *dst = tsCompressString(data, srcSize, 1, pBuf->assistBuf, srcSize, ONE_STAGE_COMP, NULL, 0);

  // A non-positive size means compression failed. Passing it to memcpy would convert it to a huge size_t,
  // so only copy a real result back; the caller checks *dst.
  if (*dst > 0) {
    memcpy(data, pBuf->assistBuf, *dst);
  }
  return data;
}

S
Shengliang Guan 已提交
71
/**
 * Inverse of doCompressData: expand a page image read from disk back into data, in place.
 *
 * @param dst [out] decompressed size, or srcSize when compression is disabled; data is only overwritten
 *            when decompression yields a positive size
 * @return data
 */
static char* doDecompressData(void* data, int32_t srcSize, int32_t* dst, SDiskbasedBuf* pBuf) {
  if (pBuf->comp) {
    int32_t rawSize =
        tsDecompressString(data, srcSize, 1, pBuf->assistBuf, pBuf->pageSize, ONE_STAGE_COMP, NULL, 0);
    *dst = rawSize;
    if (rawSize > 0) {
      memcpy(data, pBuf->assistBuf, rawSize);
    }
  } else {
    *dst = srcSize;
  }

  return data;
}

H
Haojun Liao 已提交
84 85 86
/**
 * Choose a file position for a page image of `size` bytes.
 *
 * The first region in pBuf->pFree that is large enough is reused and shrunk from its front. When no region
 * fits, the current end of the used file area (pBuf->nextPos) is returned. This function never advances
 * pBuf->nextPos; that is left to the caller.
 */
static uint64_t allocatePositionInFile(SDiskbasedBuf* pBuf, size_t size) {
  if (pBuf->pFree == NULL) {
    return pBuf->nextPos;
  }

  size_t num = taosArrayGetSize(pBuf->pFree);
  for (size_t i = 0; i < num; ++i) {
    SFreeListItem* pi = taosArrayGet(pBuf->pFree, i);

    // Check the sign first: comparing a negative int32_t length against size_t would wrap it to a huge value.
    if (pi->length >= 0 && (size_t)pi->length >= size) {
      // Offsets are 64-bit; keeping the position in int32_t would truncate it in files larger than 2 GB.
      int64_t offset = pi->offset;
      pi->offset += (int64_t)size;
      pi->length -= (int32_t)size;
      return (uint64_t)offset;
    }
  }

  // no available recycle space, allocate new area in file
  return pBuf->nextPos;
}

107
// Mark the page as not resident by dropping its buffer pointer; the buffer itself is not freed here.
static void setPageNotInBuf(SPageInfo* pPageInfo) {
  pPageInfo->pData = NULL;
}
H
Haojun Liao 已提交
108

109
// Bytes allocated for one in-memory page: the SPageInfo back-pointer, the payload, and 2 spare bytes in case
// a compressed image is larger than the payload.
static FORCE_INLINE size_t getAllocPageSize(int32_t pageSize) {
  return POINTER_BYTES + pageSize + 2;
}
H
Haojun Liao 已提交
110

H
Haojun Liao 已提交
111 112 113 114 115 116 117 118
/**
 *   +--------------------------+-------------------+--------------+
 *   | PTR to SPageInfo (8bytes)| Payload (PageSize)| 2 Extra Bytes|
 *   +--------------------------+-------------------+--------------+
 * @param pBuf
 * @param pg
 * @return
 */
H
Haojun Liao 已提交
119
static char* doFlushPageToDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) {
120 121
  assert(!pg->used && pg->pData != NULL);

H
Haojun Liao 已提交
122
  int32_t size = pBuf->pageSize;
H
Haojun Liao 已提交
123 124
  char*   t = NULL;
  if (pg->offset == -1 || pg->dirty) {
H
Haojun Liao 已提交
125 126 127
    void* payload = GET_DATA_PAYLOAD(pg);
    t = doCompressData(payload, pBuf->pageSize, &size, pBuf);
    assert(size >= 0);
H
Haojun Liao 已提交
128
  }
129 130

  // this page is flushed to disk for the first time
H
Haojun Liao 已提交
131 132 133
  if (pg->dirty) {
    if (pg->offset == -1) {
      assert(pg->dirty == true);
H
Haojun Liao 已提交
134

H
Haojun Liao 已提交
135 136
      pg->offset = allocatePositionInFile(pBuf, size);
      pBuf->nextPos += size;
137

138
      int32_t ret = taosLSeekFile(pBuf->pFile, pg->offset, SEEK_SET);
139
      if (ret == -1) {
140 141 142
        terrno = TAOS_SYSTEM_ERROR(errno);
        return NULL;
      }
143

144 145 146 147 148
      ret = (int32_t)taosWriteFile(pBuf->pFile, t, size);
      if (ret != size) {
        terrno = TAOS_SYSTEM_ERROR(errno);
        return NULL;
      }
149

H
Haojun Liao 已提交
150 151 152
      if (pBuf->fileSize < pg->offset + size) {
        pBuf->fileSize = pg->offset + size;
      }
H
Haojun Liao 已提交
153

H
Haojun Liao 已提交
154 155 156 157 158 159 160 161 162 163 164 165 166
      pBuf->statis.flushBytes += size;
      pBuf->statis.flushPages += 1;
    } else {
      // length becomes greater, current space is not enough, allocate new place, otherwise, do nothing
      if (pg->length < size) {
        // 1. add current space to free list
        SPageDiskInfo dinfo = {.length = pg->length, .offset = pg->offset};
        taosArrayPush(pBuf->pFree, &dinfo);

        // 2. allocate new position, and update the info
        pg->offset = allocatePositionInFile(pBuf, size);
        pBuf->nextPos += size;
      }
167

168 169
      // 3. write to disk.
      int32_t ret = taosLSeekFile(pBuf->pFile, pg->offset, SEEK_SET);
170
      if (ret == -1) {
171 172 173
        terrno = TAOS_SYSTEM_ERROR(errno);
        return NULL;
      }
174

175 176 177 178 179
      ret = (int32_t)taosWriteFile(pBuf->pFile, t, size);
      if (ret != size) {
        terrno = TAOS_SYSTEM_ERROR(errno);
        return NULL;
      }
180

H
Haojun Liao 已提交
181 182 183
      if (pBuf->fileSize < pg->offset + size) {
        pBuf->fileSize = pg->offset + size;
      }
184

H
Haojun Liao 已提交
185 186
      pBuf->statis.flushBytes += size;
      pBuf->statis.flushPages += 1;
187
    }
188
  } else {  // NOTE: the size may be -1, the this recycle page has not been flushed to disk yet.
H
Haojun Liao 已提交
189
    size = pg->length;
190 191
  }

H
Haojun Liao 已提交
192
  ASSERT(size > 0 || (pg->offset == -1 && pg->length == -1));
193

H
Haojun Liao 已提交
194
  char* pDataBuf = pg->pData;
H
Haojun Liao 已提交
195
  memset(pDataBuf, 0, getAllocPageSize(pBuf->pageSize));
196
#ifdef BUF_PAGE_DEBUG
197
  uDebug("page_flush %p, pageId:%d, offset:%d", pDataBuf, pg->pageId, pg->offset);
198
#endif
H
Haojun Liao 已提交
199
  pg->length = size;  // on disk size
H
Haojun Liao 已提交
200
  return pDataBuf;
201 202
}

H
Haojun Liao 已提交
203
/**
 * Evict a page from memory. The backing file is created on first use, the page is flushed if needed, and
 * its buffer is detached from the page (the page becomes non-resident and clean).
 *
 * @return the detached, zeroed buffer ready for reuse; NULL with terrno set on failure. When the flush
 *         itself fails, the detached buffer is released instead of being leaked.
 */
static char* flushPageToDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) {
  assert(((int64_t)pBuf->numOfPages * pBuf->pageSize) == pBuf->totalBufSize && pBuf->numOfPages >= pBuf->inMemPages);

  if (pBuf->pFile == NULL) {
    int32_t code = createDiskFile(pBuf);
    if (code != TSDB_CODE_SUCCESS) {
      terrno = code;
      return NULL;
    }
  }

  char* p = doFlushPageToDisk(pBuf, pg);
  if (p == NULL) {
    // The buffer is about to be detached and nothing else references it; free it so it is not leaked.
    taosMemoryFreeClear(pg->pData);
  }

  setPageNotInBuf(pg);
  pg->dirty = false;

  return p;
}

// load file block data in disk
H
Haojun Liao 已提交
222
static int32_t loadPageFromDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) {
223
  int32_t ret = taosLSeekFile(pBuf->pFile, pg->offset, SEEK_SET);
224
  if (ret == -1) {
H
Haojun Liao 已提交
225 226 227 228
    ret = TAOS_SYSTEM_ERROR(errno);
    return ret;
  }

229
  void* pPage = (void*)GET_DATA_PAYLOAD(pg);
H
Haojun Liao 已提交
230
  ret = (int32_t)taosReadFile(pBuf->pFile, pPage, pg->length);
231
  if (ret != pg->length) {
H
Haojun Liao 已提交
232 233
    ret = TAOS_SYSTEM_ERROR(errno);
    return ret;
234 235
  }

H
Haojun Liao 已提交
236 237
  pBuf->statis.loadBytes += pg->length;
  pBuf->statis.loadPages += 1;
238 239

  int32_t fullSize = 0;
H
Haojun Liao 已提交
240
  doDecompressData(pPage, pg->length, &fullSize, pBuf);
H
Haojun Liao 已提交
241
  return 0;
242 243
}

H
Haojun Liao 已提交
244
// Create the page list of a group that is not registered yet, and index it in groupSet under groupId.
static SIDList addNewGroup(SDiskbasedBuf* pBuf, int32_t groupId) {
  const char* key = (const char*)&groupId;
  assert(taosHashGet(pBuf->groupSet, key, sizeof(int32_t)) == NULL);

  SArray* pPageList = taosArrayInit(1, POINTER_BYTES);
  int32_t code = taosHashPut(pBuf->groupSet, key, sizeof(int32_t), &pPageList, POINTER_BYTES);
  assert(code == 0);

  return pPageList;
}

H
Haojun Liao 已提交
254
/**
 * Create the bookkeeping record of a new page and append it to the page list of its group.
 * The new page is in use, not resident (pData NULL), never written (offset/length -1) and clean.
 *
 * @return the new page info; NULL with terrno = TSDB_CODE_OUT_OF_MEMORY when memory cannot be obtained.
 *         pBuf->numOfPages is only incremented on success.
 */
static SPageInfo* registerPage(SDiskbasedBuf* pBuf, int32_t groupId, int32_t pageId) {
  SIDList list = NULL;

  char** p = taosHashGet(pBuf->groupSet, (const char*)&groupId, sizeof(int32_t));
  if (p == NULL) {  // it is a new group id
    list = addNewGroup(pBuf, groupId);
  } else {
    list = (SIDList)(*p);
  }

  if (list == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  SPageInfo* ppi = taosMemoryMalloc(sizeof(SPageInfo));
  if (ppi == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  ppi->pageId = pageId;
  ppi->pData  = NULL;
  ppi->offset = -1;
  ppi->length = -1;
  ppi->used   = true;
  ppi->pn     = NULL;
  ppi->dirty  = false;

  if (taosArrayPush(list, &ppi) == NULL) {
    taosMemoryFreeClear(ppi);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  // count the page only once it is actually registered
  pBuf->numOfPages += 1;
  return ppi;
}

H
Haojun Liao 已提交
279
// Walk the LRU list from its tail (least recently used end) and return the first node whose page is not in
// use, or NULL when every resident page is in use.
static SListNode* getEldestUnrefedPage(SDiskbasedBuf* pBuf) {
  SListIter iter = {0};
  tdListInitIter(pBuf->lruList, &iter, TD_LIST_BACKWARD);

  for (SListNode* pNode = tdListNext(&iter); pNode != NULL; pNode = tdListNext(&iter)) {
    SPageInfo* pPage = *(SPageInfo**)pNode->data;
    assert(pPage->pageId >= 0 && pPage->pn == pNode);

    if (!pPage->used) {
      return pNode;
    }
  }

  return NULL;
}

H
Haojun Liao 已提交
309
/**
 * Free up one page buffer by evicting the least recently used page that is not in use.
 *
 * @return the evicted page's zeroed buffer, ready for reuse. Returns NULL when every resident page is in use
 *         (the soft in-memory limit is raised instead, so a later request can allocate), or when the flush
 *         failed.
 */
static char* evacOneDataPage(SDiskbasedBuf* pBuf) {
  SListNode* pn = getEldestUnrefedPage(pBuf);

  // all pages are referenced by user, try to allocate new space
  if (pn == NULL) {
    assert(0);

    // Increase by 50% of the previous limit, but by at least one page; (int32_t)(1 * 1.5f) == 1 would
    // otherwise never grow.
    int32_t prev = pBuf->inMemPages;
    pBuf->inMemPages = (int32_t)(prev * 1.5f);
    if (pBuf->inMemPages <= prev) {
      pBuf->inMemPages = prev + 1;
    }
    return NULL;
  }

  tdListPopNode(pBuf->lruList, pn);

  SPageInfo* d = *(SPageInfo**)pn->data;
  assert(d->pn == pn);

  d->pn = NULL;
  taosMemoryFreeClear(pn);

  return flushPageToDisk(pBuf, d);
}

S
Shengliang Guan 已提交
338
// Insert the page at the head (most recently used end) of the LRU list and remember its node in pi->pn.
static void lruListPushFront(SList* pList, SPageInfo* pi) {
  tdListPrepend(pList, &pi);
  pi->pn = tdListGetHead(pList);
}

S
Shengliang Guan 已提交
344
// Mark the page as most recently used by relinking its existing node at the head of the LRU list.
static void lruListMoveToFront(SList* pList, SPageInfo* pi) {
  SListNode* pNode = pi->pn;
  tdListPopNode(pList, pNode);
  tdListPrependNode(pList, pNode);
}

H
Haojun Liao 已提交
349
static SPageInfo* getPageInfoFromPayload(void* page) {
350
  char*   p = (char *)page - POINTER_BYTES;
H
Haojun Liao 已提交
351

352
  SPageInfo* ppi = ((SPageInfo**)p)[0];
H
Haojun Liao 已提交
353
  return ppi;
354 355
}

H
Haojun Liao 已提交
356
/**
 * Create a paged buffer that keeps pages in memory up to a soft limit and spills them to a temporary file.
 *
 * @param pBuf         [out] the created buffer; set to NULL on failure
 * @param pagesize     payload size of one page in bytes
 * @param inMemBufSize memory budget; inMemBufSize / pagesize pages may stay resident (soft limit, >= 2 pages)
 * @param id           identifier copied into the buffer for log messages
 * @param dir          directory of the temporary backing file (the file itself is created on the first flush)
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when an allocation fails (nothing is leaked)
 */
int32_t createDiskbasedBuf(SDiskbasedBuf** pBuf, int32_t pagesize, int32_t inMemBufSize, const char* id,
                           const char* dir) {
  *pBuf = taosMemoryCalloc(1, sizeof(SDiskbasedBuf));

  SDiskbasedBuf* pPBuf = *pBuf;
  if (pPBuf == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  // at least more than 2 pages must be in memory
  assert(inMemBufSize >= pagesize * 2);

  pPBuf->pageSize     = pagesize;
  pPBuf->numOfPages   = 0;  // all pages are in buffer in the first place
  pPBuf->totalBufSize = 0;
  pPBuf->inMemPages   = inMemBufSize / pagesize;  // maximum allowed pages, it is a soft limit.
  pPBuf->allocateId   = -1;
  pPBuf->pFile        = NULL;
  pPBuf->id           = strdup(id);
  pPBuf->fileSize     = 0;
  pPBuf->pFree        = taosArrayInit(4, sizeof(SFreeListItem));
  pPBuf->freePgList   = tdListNew(POINTER_BYTES);
  pPBuf->lruList      = tdListNew(POINTER_BYTES);

  // init id hash table
  _hash_fn_t fn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT);
  pPBuf->groupSet  = taosHashInit(10, fn, true, false);
  pPBuf->assistBuf = taosMemoryMalloc(pPBuf->pageSize + 2);  // EXTRA BYTES
  pPBuf->all       = taosHashInit(10, fn, true, false);

  char path[PATH_MAX] = {0};
  taosGetTmpfilePath(dir, "paged-buf", path);
  pPBuf->path = strdup(path);

  pPBuf->emptyDummyIdList = taosArrayInit(1, sizeof(int32_t));

  if (pPBuf->id == NULL || pPBuf->pFree == NULL || pPBuf->freePgList == NULL || pPBuf->lruList == NULL ||
      pPBuf->groupSet == NULL || pPBuf->assistBuf == NULL || pPBuf->all == NULL || pPBuf->path == NULL ||
      pPBuf->emptyDummyIdList == NULL) {
    // release whatever was created before reporting the failure
    taosArrayDestroy(pPBuf->emptyDummyIdList);
    taosMemoryFreeClear(pPBuf->path);
    taosHashCleanup(pPBuf->all);
    taosMemoryFreeClear(pPBuf->assistBuf);
    taosHashCleanup(pPBuf->groupSet);
    tdListFree(pPBuf->lruList);
    tdListFree(pPBuf->freePgList);
    taosArrayDestroy(pPBuf->pFree);
    taosMemoryFreeClear(pPBuf->id);
    taosMemoryFreeClear(*pBuf);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
400
/**
 * Allocate a new page for groupId and return its payload, marked in use and most recently used.
 *
 * A recycled page info (from freePgList) is reused before a new page id is registered. When the in-memory
 * limit is reached, the least recently used unused page is flushed and its buffer is reused.
 *
 * @param pageId [out] id of the returned page
 * @return payload pointer, or NULL on failure (terrno is set when memory cannot be allocated)
 */
void* getNewBufPage(SDiskbasedBuf* pBuf, int32_t groupId, int32_t* pageId) {
  pBuf->statis.getPages += 1;

  char* availablePage = NULL;
  if (NO_IN_MEM_AVAILABLE_PAGES(pBuf)) {
    availablePage = evacOneDataPage(pBuf);

    // Failed to allocate a new buffer page, and there is an error occurs.
    if (availablePage == NULL) {
      assert(0);
      return NULL;
    }
  } else {
    // add extra bytes in case of zipped buffer increased.
    availablePage = taosMemoryCalloc(1, getAllocPageSize(pBuf->pageSize));
    if (availablePage == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return NULL;
    }
  }

  SPageInfo* pi = NULL;
  if (listNEles(pBuf->freePgList) != 0) {
    SListNode* pItem = tdListPopHead(pBuf->freePgList);
    pi = *(SPageInfo**)pItem->data;
    pi->used = true;
    *pageId = pi->pageId;
    taosMemoryFreeClear(pItem);
  } else {  // create a new pageinfo
    // register new id in this group
    *pageId = (++pBuf->allocateId);

    // register page id info
    pi = registerPage(pBuf, groupId, *pageId);
    if (pi == NULL) {
      // nothing references the buffer yet, so release it before failing
      taosMemoryFreeClear(availablePage);
      return NULL;
    }

    // add to hash map
    taosHashPut(pBuf->all, pageId, sizeof(int32_t), &pi, POINTER_BYTES);
    pBuf->totalBufSize += pBuf->pageSize;
  }

  // add to LRU list
  assert(listNEles(pBuf->lruList) < pBuf->inMemPages && pBuf->inMemPages > 0);
  lruListPushFront(pBuf->lruList, pi);

  pi->pData = availablePage;
  ((void**)pi->pData)[0] = pi;
#ifdef BUF_PAGE_DEBUG
  uDebug("page_getNewBufPage , pi->pData:%p, pageId:%d, offset:%"PRId64, pi->pData, pi->pageId, pi->offset);
#endif
  return (void*)(GET_DATA_PAYLOAD(pi));
}

H
Haojun Liao 已提交
451
/**
 * Return the payload of page `id` and mark the page in use (release it with releaseBufPage).
 *
 * A page that is not resident is brought back into memory: if the in-memory limit is reached, the least
 * recently used unused page is evicted and its buffer reused. The page is then reloaded from disk if it was
 * ever flushed.
 *
 * @return payload pointer, or NULL on failure (terrno is set for allocation and load failures)
 */
void* getBufPage(SDiskbasedBuf* pBuf, int32_t id) {
  assert(pBuf != NULL && id >= 0);
  pBuf->statis.getPages += 1;

  SPageInfo** pi = taosHashGet(pBuf->all, &id, sizeof(int32_t));
  assert(pi != NULL && *pi != NULL);

  if ((*pi)->pData != NULL) {  // it is in memory
    // no need to update the LRU list if only one page exists
    if (pBuf->numOfPages == 1) {
      (*pi)->used = true;
      return (void*)(GET_DATA_PAYLOAD(*pi));
    }

    SPageInfo** pInfo = (SPageInfo**)((*pi)->pn->data);
    assert(*pInfo == *pi);

    lruListMoveToFront(pBuf->lruList, (*pi));
    (*pi)->used = true;
#ifdef BUF_PAGE_DEBUG
    uDebug("page_getBufPage1 pageId:%d, offset:%"PRId64, (*pi)->pageId, (*pi)->offset);
#endif
    return (void*)(GET_DATA_PAYLOAD(*pi));
  }

  // not in memory
  assert((*pi)->pData == NULL && (*pi)->pn == NULL && (((*pi)->length >= 0 && (*pi)->offset >= 0) || ((*pi)->length == -1 && (*pi)->offset == -1)));

  char* availablePage = NULL;
  if (NO_IN_MEM_AVAILABLE_PAGES(pBuf)) {
    availablePage = evacOneDataPage(pBuf);
    if (availablePage == NULL) {
      return NULL;
    }
  } else {
    availablePage = taosMemoryCalloc(1, getAllocPageSize(pBuf->pageSize));
    if (availablePage == NULL) {
      // the page is still untouched (not resident, not in the LRU list), so failing here is safe
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return NULL;
    }
  }

  (*pi)->pData = availablePage;

  // set the ptr to the new SPageInfo
  ((void**)((*pi)->pData))[0] = (*pi);

  lruListPushFront(pBuf->lruList, *pi);
  (*pi)->used = true;

  // some data has been flushed to disk, and needs to be loaded into buffer again.
  if ((*pi)->length > 0 && (*pi)->offset >= 0) {
    int32_t code = loadPageFromDisk(pBuf, *pi);
    if (code != 0) {
      terrno = code;
      return NULL;
    }
  }
#ifdef BUF_PAGE_DEBUG
  uDebug("page_getBufPage2 pageId:%d, offset:%"PRId64, (*pi)->pageId, (*pi)->offset);
#endif
  return (void*)(GET_DATA_PAYLOAD(*pi));
}

H
Haojun Liao 已提交
511 512
// Release a payload pointer obtained from getNewBufPage/getBufPage, making the page evictable again.
void releaseBufPage(SDiskbasedBuf* pBuf, void* page) {
  assert(pBuf != NULL && page != NULL);
  releaseBufPageInfo(pBuf, getPageInfoFromPayload(page));
}

H
Haojun Liao 已提交
517
// Clear the in-use flag of a resident page and count the release in the buffer statistics.
void releaseBufPageInfo(SDiskbasedBuf* pBuf, SPageInfo* pi) {
#ifdef BUF_PAGE_DEBUG
  uDebug("page_releaseBufPageInfo pageId:%d, used:%d, offset:%"PRId64, pi->pageId, pi->used, pi->offset);
#endif
  assert(pi->pData != NULL);

  pBuf->statis.releasePages += 1;
  pi->used = false;
}

H
Haojun Liao 已提交
527
size_t getNumOfBufGroupId(const SDiskbasedBuf* pBuf) { return taosHashGetSize(pBuf->groupSet); }
528

H
Haojun Liao 已提交
529
// Total payload bytes of all registered pages (numOfPages * pageSize), resident or not.
size_t getTotalBufSize(const SDiskbasedBuf* pBuf) {
  int64_t total = pBuf->totalBufSize;
  return (size_t)total;
}
530

H
Haojun Liao 已提交
531 532
// Return the page list of a group; an unknown group gets the shared, always-empty dummy list instead of NULL.
SIDList getDataBufPagesIdList(SDiskbasedBuf* pBuf, int32_t groupId) {
  assert(pBuf != NULL);

  char** pEntry = taosHashGet(pBuf->groupSet, (const char*)&groupId, sizeof(int32_t));
  if (pEntry != NULL) {
    return (SArray*)(*pEntry);
  }

  return pBuf->emptyDummyIdList;
}

H
Haojun Liao 已提交
542
/**
 * Destroy a paged buffer. Statistics are logged, the backing file is closed and removed, and every page
 * buffer, page info, list and hash table owned by the buffer is freed. A NULL buffer is ignored.
 */
void destroyDiskbasedBuf(SDiskbasedBuf* pBuf) {
  if (pBuf == NULL) {
    return;
  }

  dBufPrintStatis(pBuf);

  if (pBuf->pFile == NULL) {
    uDebug("Paged buffer closed, total:%.2f Kb, no file created, %s", pBuf->totalBufSize / 1024.0, pBuf->id);
  } else {
    uDebug(
        "Paged buffer closed, total:%.2f Kb (%d Pages), inmem size:%.2f Kb (%d Pages), file size:%.2f Kb, page "
        "size:%.2f Kb, %s\n",
        pBuf->totalBufSize / 1024.0, pBuf->numOfPages, listNEles(pBuf->lruList) * pBuf->pageSize / 1024.0,
        listNEles(pBuf->lruList), pBuf->fileSize / 1024.0, pBuf->pageSize / 1024.0f, pBuf->id);

    taosCloseFile(&pBuf->pFile);
  }

  // statistics; the average page size is only meaningful once something was loaded back
  const SDiskbasedBufStatis* ps = &pBuf->statis;
  if (ps->loadPages != 0) {
    uDebug(
        "Get/Release pages:%d/%d, flushToDisk:%.2f Kb (%d Pages), loadFromDisk:%.2f Kb (%d Pages), avgPageSize:%.2f Kb",
        ps->getPages, ps->releasePages, ps->flushBytes / 1024.0f, ps->flushPages, ps->loadBytes / 1024.0f,
        ps->loadPages, ps->loadBytes / (1024.0 * ps->loadPages));
  } else {
    uDebug(
        "Get/Release pages:%d/%d, flushToDisk:%.2f Kb (%d Pages), loadFromDisk:%.2f Kb (%d Pages)",
        ps->getPages, ps->releasePages, ps->flushBytes / 1024.0f, ps->flushPages, ps->loadBytes / 1024.0f, ps->loadPages);
  }

  taosRemoveFile(pBuf->path);
  taosMemoryFreeClear(pBuf->path);

  // every page info is owned by exactly one group list
  for (SArray** pGroup = taosHashIterate(pBuf->groupSet, NULL); pGroup != NULL;
       pGroup = taosHashIterate(pBuf->groupSet, pGroup)) {
    size_t numOfPages = taosArrayGetSize(*pGroup);
    for (int32_t k = 0; k < numOfPages; ++k) {
      SPageInfo* pPage = taosArrayGetP(*pGroup, k);
      taosMemoryFreeClear(pPage->pData);
      taosMemoryFreeClear(pPage);
    }

    taosArrayDestroy(*pGroup);
  }

  tdListFree(pBuf->lruList);
  tdListFree(pBuf->freePgList);

  taosArrayDestroy(pBuf->emptyDummyIdList);
  taosArrayDestroy(pBuf->pFree);

  taosHashCleanup(pBuf->groupSet);
  taosHashCleanup(pBuf->all);

  taosMemoryFreeClear(pBuf->id);
  taosMemoryFreeClear(pBuf->assistBuf);
  taosMemoryFreeClear(pBuf);
}

606
/**
 * Return the most recently registered page of a group's page list.
 *
 * @return the last page info, or NULL when the list is NULL or empty (for instance the dummy list that
 *         getDataBufPagesIdList returns for an unknown group); indexing size - 1 there would wrap to SIZE_MAX.
 */
SPageInfo* getLastPageInfo(SIDList pList) {
  if (pList == NULL) {
    return NULL;
  }

  size_t size = taosArrayGetSize(pList);
  if (size == 0) {
    return NULL;
  }

  SPageInfo* pPgInfo = taosArrayGetP(pList, size - 1);
  return pPgInfo;
}

612
int32_t getPageId(const SPageInfo* pPgInfo) {
613 614 615 616
  ASSERT(pPgInfo != NULL);
  return pPgInfo->pageId;
}

S
Shengliang Guan 已提交
617
// Payload size, in bytes, of every page of the buffer.
int32_t getBufPageSize(const SDiskbasedBuf* pBuf) {
  int32_t size = pBuf->pageSize;
  return size;
}
H
Haojun Liao 已提交
618

S
Shengliang Guan 已提交
619
// Current soft limit of resident pages (it may have been raised when all resident pages were in use).
int32_t getNumOfInMemBufPages(const SDiskbasedBuf* pBuf) {
  int32_t limit = pBuf->inMemPages;
  return limit;
}
H
Haojun Liao 已提交
620

S
Shengliang Guan 已提交
621
// True while no page has been written to the backing file (fileSize is still 0).
bool isAllDataInMemBuf(const SDiskbasedBuf* pBuf) {
  bool nothingOnDisk = !(pBuf->fileSize > 0);
  return nothingOnDisk;
}
H
Haojun Liao 已提交
622

H
Haojun Liao 已提交
623
void setBufPageDirty(void* pPage, bool dirty) {
H
Haojun Liao 已提交
624
  SPageInfo* ppi = getPageInfoFromPayload(pPage);
H
Haojun Liao 已提交
625
  ppi->dirty = dirty;
626 627
}

628
// Enable or disable compression of page images written to (and read back from) the backing file.
void setBufPageCompressOnDisk(SDiskbasedBuf* pBuf, bool comp) {
  pBuf->comp = comp;
}
H
Haojun Liao 已提交
629

630
/**
 * Recycle the page owning this payload. The page is removed from the LRU list, its buffer is freed (pPage
 * is invalid afterwards), and its page info is queued on freePgList for reuse by getNewBufPage.
 */
void dBufSetBufPageRecycled(SDiskbasedBuf* pBuf, void* pPage) {
  SPageInfo* ppi = getPageInfoFromPayload(pPage);

  ppi->used  = false;
  ppi->dirty = false;

  // detach the page from the LRU list, then drop its buffer and list node
  SListNode* pNode = tdListPopNode(pBuf->lruList, ppi->pn);
  ppi->pn = NULL;
  taosMemoryFreeClear(ppi->pData);
  taosMemoryFreeClear(pNode);

  // add this pageinfo into the free page info list
  tdListAppend(pBuf->freePgList, &ppi);
}
H
Haojun Liao 已提交
644

645
// Ask dBufPrintStatis to print statistics to stdout (it is called when the buffer is destroyed).
void dBufSetPrintInfo(SDiskbasedBuf* pBuf) {
  pBuf->printStatis = true;
}
646

S
Shengliang Guan 已提交
647
// Snapshot (by value) of the buffer's get/release/flush/load counters.
SDiskbasedBufStatis getDBufStatis(const SDiskbasedBuf* pBuf) {
  SDiskbasedBufStatis snapshot = pBuf->statis;
  return snapshot;
}
H
Haojun Liao 已提交
648

H
Haojun Liao 已提交
649
void dBufPrintStatis(const SDiskbasedBuf* pBuf) {
H
Haojun Liao 已提交
650 651 652 653 654 655 656 657
  if (!pBuf->printStatis) {
    return;
  }

  const SDiskbasedBufStatis* ps = &pBuf->statis;

  printf(
      "Paged buffer closed, total:%.2f Kb (%d Pages), inmem size:%.2f Kb (%d Pages), file size:%.2f Kb, page size:%.2f "
H
Haojun Liao 已提交
658
      "Kb, %s\n",
H
Haojun Liao 已提交
659
      pBuf->totalBufSize / 1024.0, pBuf->numOfPages, listNEles(pBuf->lruList) * pBuf->pageSize / 1024.0,
H
Haojun Liao 已提交
660
      listNEles(pBuf->lruList), pBuf->fileSize / 1024.0, pBuf->pageSize / 1024.0f, pBuf->id);
H
Haojun Liao 已提交
661 662 663 664 665

  printf(
      "Get/Release pages:%d/%d, flushToDisk:%.2f Kb (%d Pages), loadFromDisk:%.2f Kb (%d Pages), avgPageSize:%.2f Kb\n",
      ps->getPages, ps->releasePages, ps->flushBytes / 1024.0f, ps->flushPages, ps->loadBytes / 1024.0f, ps->loadPages,
      ps->loadBytes / (1024.0 * ps->loadPages));
666
}
5
54liuyao 已提交
667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694

/**
 * Drop every page of the buffer and reset the page and file bookkeeping, so the buffer can be filled again.
 * The backing file (if any) stays open; its previous content is no longer referenced, so new pages are
 * written again from the start of the file.
 */
void clearDiskbasedBuf(SDiskbasedBuf* pBuf) {
  SArray** p = taosHashIterate(pBuf->groupSet, NULL);
  while (p) {
    size_t n = taosArrayGetSize(*p);
    for (int32_t i = 0; i < n; ++i) {
      SPageInfo* pi = taosArrayGetP(*p, i);
      taosMemoryFreeClear(pi->pData);
      taosMemoryFreeClear(pi);
    }
    taosArrayDestroy(*p);
    p = taosHashIterate(pBuf->groupSet, p);
  }

  tdListEmpty(pBuf->lruList);
  tdListEmpty(pBuf->freePgList);

  taosArrayClear(pBuf->emptyDummyIdList);
  taosArrayClear(pBuf->pFree);

  taosHashClear(pBuf->groupSet);
  taosHashClear(pBuf->all);

  pBuf->numOfPages   = 0;  // all pages are in buffer in the first place
  pBuf->totalBufSize = 0;
  pBuf->allocateId   = -1;
  pBuf->fileSize     = 0;
  // Reset the flush position together with fileSize. Otherwise new pages would be appended after data
  // nothing refers to anymore, and the file would keep growing across clears.
  pBuf->nextPos      = 0;
}