/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "tdbInt.h"

#include <unistd.h>  // close()

/*
 * Pager state for one database file.
 *
 * The structure and both file-name strings live in a single allocation made
 * by tdbPagerOpen(): dbFileName and jFileName point just past the struct.
 */
struct SPager {
  char    *dbFileName;            // path of the database file
  char    *jFileName;             // path of the journal file: dbFileName + "-journal"
  int      pageSize;              // taken from the page cache at open time
  uint8_t  fid[TDB_FILE_ID_LEN];  // file id produced by tdbGnrtFileID(); used as part of the page id
  int      fd;                    // database file descriptor
  int      jfd;                   // journal file descriptor, -1 until tdbPagerBegin() opens it
  SPCache *pCache;                // page cache the pager fetches page frames from
  SPgno    dbFileSize;            // incremented by tdbPagerAllocNewPage() for every new page
  SPgno    dbOrigSize;
  int      nDirty;
  SPage   *pDirty;
  SPage   *pDirtyTail;
  u8       inTran;                // non-zero once tdbPagerBegin() has succeeded
};

/* File header layout; packed so that it is exactly 128 bytes (asserted below). */
typedef struct __attribute__((__packed__)) {
  u8    hdrString[16];
  u16   pageSize;
  SPgno freePage;
  u32   nFreePages;
  u8    reserved[102];
} SFileHdr;

TDB_STATIC_ASSERT(sizeof(SFileHdr) == 128, "Size of file header is not correct");

static int tdbPagerReadPage(SPager *pPager, SPage *pPage);
// The allocators are defined at the bottom of the file but used by
// tdbPagerNewPage(), so they must be declared before their first use.
static int tdbPagerAllocFreePage(SPager *pPager, SPgno *ppgno);
static int tdbPagerAllocNewPage(SPager *pPager, SPgno *ppgno);
static int tdbPagerAllocPage(SPager *pPager, SPgno *ppgno);

/*
 * Create a pager for the database file `fileName`.
 *
 * pCache   - page cache the pager will fetch page frames from
 * fileName - path of the database file; it is created if it does not exist
 * ppPager  - receives the new pager on success, NULL on failure
 *
 * Returns 0 on success and -1 on failure. On failure every resource acquired
 * so far (the allocation and the database file descriptor) is released.
 */
int tdbPagerOpen(SPCache *pCache, const char *fileName, SPager **ppPager) {
  uint8_t *pPtr;
  SPager  *pPager;
  int      fsize;
  int      zsize;
  int      ret;

  *ppPager = NULL;

  // One allocation holds the struct followed by both file-name strings.
  fsize = strlen(fileName);
  zsize = sizeof(*pPager)  /* SPager */
          + fsize + 1      /* dbFileName */
          + fsize + 8 + 1; /* jFileName */
  pPtr = (uint8_t *)calloc(1, zsize);
  if (pPtr == NULL) {
    return -1;
  }

  pPager = (SPager *)pPtr;
  pPtr += sizeof(*pPager);

  // Mark both descriptors as "not open" so the error path knows what to close.
  pPager->fd = -1;
  pPager->jfd = -1;

  // pPager->dbFileName
  pPager->dbFileName = (char *)pPtr;
  memcpy(pPager->dbFileName, fileName, fsize);
  pPager->dbFileName[fsize] = '\0';
  pPtr += fsize + 1;

  // pPager->jFileName
  pPager->jFileName = (char *)pPtr;
  memcpy(pPager->jFileName, fileName, fsize);
  memcpy(pPager->jFileName + fsize, "-journal", 8);
  pPager->jFileName[fsize + 8] = '\0';

  // pPager->pCache
  pPager->pCache = pCache;

  pPager->fd = open(pPager->dbFileName, O_RDWR | O_CREAT, 0755);
  if (pPager->fd < 0) {
    goto _err;
  }

  ret = tdbGnrtFileID(pPager->dbFileName, pPager->fid, false);
  if (ret < 0) {
    goto _err;
  }

  pPager->pageSize = tdbPCacheGetPageSize(pCache);

  *ppPager = pPager;
  return 0;

_err:
  if (pPager->fd >= 0) {
    close(pPager->fd);
  }
  // The file names live inside the same allocation, so one free() is enough.
  free(pPager);
  return -1;
}

/*
 * Close a pager. Not implemented yet: nothing is released and 0 is returned.
 */
int tdbPagerClose(SPager *pPager) {
  // TODO
  return 0;
}

/*
 * Look up the root page number of a DB inside this pager.
 *
 * The lookup is not implemented yet: *ppgno is always set to 0 and `toCreate`
 * is ignored. Always returns 0.
 */
int tdbPagerOpenDB(SPager *pPager, SPgno *ppgno, bool toCreate) {
  SPgno pgno;

  {
    // TODO: try to search the main DB to get the page number
    pgno = 0;
  }

  // Disabled creation path. It was written against an older three-argument
  // tdbPagerAllocPage() and needs local `SPage *pPage; int ret;` when revived.
  //
  // if (pgno == 0 && toCreate) {
  //   ret = tdbPagerAllocPage(pPager, &pPage, &pgno);
  //   if (ret < 0) {
  //     return -1;
  //   }
  //
  //   // TODO: Need to zero the page
  //
  //   ret = tdbPagerWrite(pPager, pPage);
  //   if (ret < 0) {
  //     return -1;
  //   }
  // }

  *ppgno = pgno;
  return 0;
}

/*
 * Fetch page `pgno` of this pager from the page cache.
 *
 * If the page is not yet bound to a pager, or `toLoad` is false, its content
 * is (re)initialised: zero-filled when `toLoad` is false or when
 * pgno >= dbFileSize, otherwise read from the database file.
 *
 * Returns the page, or NULL if the cache fetch or the file read fails.
 */
SPage *tdbPagerGet(SPager *pPager, SPgno pgno, bool toLoad) {
  SPgid  pgid;
  SPage *pPage;
  int    ret;

  memcpy(pgid.fileid, pPager->fid, TDB_FILE_ID_LEN);
  pgid.pgno = pgno;

  // Get page frame from the SPCache
  pPage = tdbPCacheFetch(pPager->pCache, &pgid, 1);
  if (pPage == NULL) {
    // TODO: handle error
    return NULL;
  }
  tdbPCacheFetchFinish(pPager->pCache, pPage);

  // Zero the page or load page content from backend
  // according to the options
  if (pPage->pPager == NULL || !toLoad) {
    // NOTE(review): tdbPagerReadPage() and tdbPagerAllocNewPage() treat page
    // numbers as 1-based (valid range 1..dbFileSize), which would make this
    // `>=` zero the last page instead of loading it -- confirm.
    if (!toLoad || pgno >= pPager->dbFileSize) {
      memset(pPage->pData, 0, pPager->pageSize);
    } else {
      ret = tdbPagerReadPage(pPager, pPage);
      if (ret < 0) {
        // TODO: Need to drop the page
        return NULL;
      }
    }

    if (pPage->pPager) {
      ASSERT(pPage->pPager == pPager);
    } else {
      pPage->pPager = pPager;
    }
  }

  return pPage;
}

/*
 * Mark a page as about to be modified.
 *
 * Starts a transaction via tdbPagerBegin() if none is active, then sets the
 * page's dirty flag. Dirty-list and journal bookkeeping are still TODO.
 *
 * Returns 0 on success, -1 if the transaction could not be started.
 */
int tdbPagerWrite(SPager *pPager, SPage *pPage) {
  int ret;

  if (pPager->inTran == 0) {
    ret = tdbPagerBegin(pPager);
    if (ret < 0) {
      return -1;
    }
  }

  if (pPage->isDirty == 0) {
    pPage->isDirty = 1;
    // TODO: add the page to the dirty list

    // TODO: write the page to the journal
    if (1 /*actually load from the file*/) {
    }
  }

  return 0;
}

// Legacy allocator, kept for reference only. It is superseded by the static
// tdbPagerAllocPage(SPager *, SPgno *) at the bottom of this file.
//
// int tdbPagerAllocPage(SPager *pPager, SPage **ppPage, SPgno *ppgno) {
//   SPage *pPage;
//   SPgno  pgno;
//
//   if (1 /*TODO: no free page*/) {
//     pgno = ++pPager->dbFileSize;
//     pPage = tdbPagerGet(pPager, pgno, false);
//     ASSERT(pPage != NULL);
//   } else {
//     /* TODO: allocate from the free list */
//     ASSERT(0);
//   }
//
//   *ppPage = pPage;
//   *ppgno = pgno;
//   return 0;
// }

/*
 * Begin a transaction: open (creating if needed) the journal file and set
 * the in-transaction flag. A no-op if a transaction is already active.
 *
 * Returns 0 on success, -1 if the journal cannot be opened.
 */
int tdbPagerBegin(SPager *pPager) {
  if (pPager->inTran) {
    return 0;
  }

  // Open the journal
  pPager->jfd = open(pPager->jFileName, O_RDWR | O_CREAT, 0755);
  if (pPager->jfd < 0) {
    return -1;
  }

  // TODO: write the size of the file

  pPager->inTran = 1;

  return 0;
}

/*
 * Commit the current transaction. Not implemented yet; always returns 0.
 */
int tdbPagerCommit(SPager *pPager) {
  // TODO
  return 0;
}

/*
 * Read one page worth of data from the database file into pPage->pData.
 *
 * The file offset is (pgno - 1) * pageSize, i.e. page numbers are 1-based.
 *
 * Returns 0 on success, -1 if tdbPRead() reports an error.
 */
static int tdbPagerReadPage(SPager *pPager, SPage *pPage) {
  i64 offset;
  int ret;

  ASSERT(memcmp(pPager->fid, pPage->pgid.fileid, TDB_FILE_ID_LEN) == 0);

  // The i64 cast keeps the multiplication from being done in a narrower type.
  offset = (pPage->pgid.pgno - 1) * (i64)(pPager->pageSize);
  ret = tdbPRead(pPager->fd, pPage->pData, pPager->pageSize, offset);
  if (ret < 0) {
    // TODO: handle error
    return -1;
  }
  return 0;
}

/* Return the page size this pager was opened with. */
int tdbPagerGetPageSize(SPager *pPager) { return pPager->pageSize; }

/*
 * Fetch an existing page, loading it from the database file the first time
 * the page frame is seen (i.e. while it is not yet bound to a pager).
 *
 * pgno   - page number to fetch
 * ppPage - receives the page on success
 *
 * Returns 0 on success, -1 if the cache fetch or the file read fails.
 */
int tdbPagerFetchPage(SPager *pPager, SPgno pgno, SPage **ppPage) {
  SPage *pPage;
  SPgid  pgid;
  int    ret;

  // Fetch a page container from the page cache
  memcpy(pgid.fileid, pPager->fid, TDB_FILE_ID_LEN);
  pgid.pgno = pgno;
  pPage = tdbPCacheFetch(pPager->pCache, &pgid, 1);
  if (pPage == NULL) {
    return -1;
  }

  if (pPage->pPager == NULL) {
    // NOTE(review): with 1-based page numbers this probably wants `<=`, and
    // nothing in this file ever assigns dbOrigSize -- confirm.
    ASSERT(pgno < pPager->dbOrigSize);

    // tdbWLockPage(pPage);

    // Re-checked because the (currently disabled) lock above is meant to
    // guard this initialisation.
    if (pPage->pPager == NULL) {
      ret = tdbPagerReadPage(pPager, pPage);
      if (ret < 0) {
        return -1;
      }

      // ret = (*initPage)(pPage);
      // if (ret < 0) {
      //   return -1;
      // }

      pPage->pPager = pPager;
    }

    // tdbWUnlockPage(pPage);
  } else {
    ASSERT(pPage->pPager == pPager);
  }

  *ppPage = pPage;
  return 0;
}

/*
 * Allocate a new page number and fetch a page frame for it.
 *
 * ppgno  - receives the newly allocated page number (never 0 on success)
 * ppPage - receives the page on success
 *
 * Returns 0 on success, -1 if allocation or the cache fetch fails.
 */
int tdbPagerNewPage(SPager *pPager, SPgno *ppgno, SPage **ppPage) {
  int    ret;
  SPage *pPage;
  SPgid  pgid;

  // Allocate a page number
  ret = tdbPagerAllocPage(pPager, ppgno);
  if (ret < 0) {
    return -1;
  }

  ASSERT(*ppgno != 0);

  // Fetch a page container from the page cache
  memcpy(pgid.fileid, pPager->fid, TDB_FILE_ID_LEN);
  pgid.pgno = *ppgno;
  pPage = tdbPCacheFetch(pPager->pCache, &pgid, 1);
  if (pPage == NULL) {
    return -1;
  }

  ASSERT(pPage->pPager == NULL);

  // TODO: a race condition problem may occur here
  // tdbWLockPage(pPage);

  // TODO: zero init the new page
  // (*initNewPage)(pPage, arg);

  // Bind the page to this pager. Leaving it NULL would make a later
  // tdbPagerFetchPage() treat the page as unloaded and read it from disk.
  pPage->pPager = pPager;

  // tdbWunlockPage(pPage);

  *ppPage = pPage;
  return 0;
}

/*
 * Allocate a page from the free list. Not implemented yet: *ppgno is left
 * untouched and 0 is returned, so callers fall through to extending the file.
 */
static int tdbPagerAllocFreePage(SPager *pPager, SPgno *ppgno) {
  // TODO: Allocate a page from the free list
  return 0;
}

/* Allocate a page by growing the pager: the new page number is ++dbFileSize. */
static int tdbPagerAllocNewPage(SPager *pPager, SPgno *ppgno) {
  *ppgno = ++pPager->dbFileSize;
  return 0;
}

/*
 * Allocate a page number, preferring the free list and falling back to
 * extending the pager.
 *
 * Returns 0 with *ppgno != 0 on success, -1 on failure.
 */
static int tdbPagerAllocPage(SPager *pPager, SPgno *ppgno) {
  int ret;

  *ppgno = 0;

  // Try to allocate from the free list of the pager
  ret = tdbPagerAllocFreePage(pPager, ppgno);
  if (ret < 0) {
    return -1;
  }

  if (*ppgno != 0) return 0;

  // Allocate the page by extending the pager
  ret = tdbPagerAllocNewPage(pPager, ppgno);
  if (ret < 0) {
    return -1;
  }

  ASSERT(*ppgno != 0);

  return 0;
}