/*
 * Copyright (c) 2019 TAOS Data, Inc.
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "tqMetaStore.h"
// TODO:replace by an abstract file layer
#include <fcntl.h>
#include <string.h>
#include <unistd.h>
#include "osDir.h"

#define TQ_META_NAME "tq.meta"
#define TQ_IDX_NAME  "tq.idx"

static int32_t tqHandlePutCommitted(STqMetaStore*, int64_t key, void* value);
static void*   tqHandleGetUncommitted(STqMetaStore*, int64_t key);

// ---------------------------------------------------------------------------
// small private helpers
// ---------------------------------------------------------------------------

// Put pNode at the front of the circular "unpersist" (dirty) list.
// A node whose unpersistNext is non-NULL is already linked and is left alone.
static inline void tqLinkUnpersist(STqMetaStore* pMeta, STqMetaList* pNode) {
  if (pNode->unpersistNext == NULL) {
    pNode->unpersistNext = pMeta->unpersistHead->unpersistNext;
    pNode->unpersistPrev = pMeta->unpersistHead;
    pMeta->unpersistHead->unpersistNext->unpersistPrev = pNode;
    pMeta->unpersistHead->unpersistNext = pNode;
  }
}

// Return the node stored for `key`, or NULL when the bucket chain has none.
static inline STqMetaList* tqFindNode(STqMetaStore* pMeta, int64_t key) {
  STqMetaList* pNode = pMeta->bucket[key & TQ_BUCKET_MASK];
  while (pNode != NULL && pNode->handle.key != key) {
    pNode = pNode->next;
  }
  return pNode;
}

// Hand `value` to the user deleter unless it is NULL or the delete marker.
static inline void tqReleaseValue(STqMetaStore* pMeta, void* value) {
  if (value != NULL && value != TQ_DELETE_TOKEN) {
    pMeta->pDeleter(value);
  }
}

// Release both values owned by a node, then the node itself.
static void tqFreeNode(STqMetaStore* pMeta, STqMetaList* pNode) {
  tqReleaseValue(pMeta, pNode->handle.valueInTxn);
  tqReleaseValue(pMeta, pNode->handle.valueInUse);
  free(pNode);
}

// Free every node of every bucket chain and leave all buckets empty.
static void tqFreeBuckets(STqMetaStore* pMeta) {
  for (int i = 0; i < TQ_BUCKET_SIZE; i++) {
    STqMetaList* pNode = pMeta->bucket[i];
    pMeta->bucket[i] = NULL;
    while (pNode != NULL) {
      STqMetaList* pNext = pNode->next;
      tqFreeNode(pMeta, pNode);
      pNode = pNext;
    }
  }
}

// Detach pNode from its bucket chain (the node itself is not freed).
static void tqUnlinkFromBucket(STqMetaStore* pMeta, STqMetaList* pNode) {
  STqMetaList** ppLink = &pMeta->bucket[pNode->handle.key & TQ_BUCKET_MASK];
  while (*ppLink != NULL && *ppLink != pNode) {
    ppLink = &(*ppLink)->next;
  }
  // every node on the unpersist list is expected to live in a bucket
  ASSERT(*ppLink == pNode);
  if (*ppLink == pNode) {
    *ppLink = pNode->next;
  }
}

// Write exactly `size` bytes, retrying after partial writes and EINTR.
// Returns 0 on success; on failure sets terrno and returns -1.
static int32_t tqWriteAll(int fd, const void* buf, int64_t size) {
  const char* cursor = buf;
  while (size > 0) {
    ssize_t nWritten = write(fd, cursor, (size_t)size);
    if (nWritten < 0) {
      if (errno == EINTR) continue;
      terrno = TAOS_SYSTEM_ERROR(errno);
      return -1;
    }
    if (nWritten == 0) {
      // no progress and no errno: report a generic I/O error instead of spinning
      terrno = TAOS_SYSTEM_ERROR(EIO);
      return -1;
    }
    cursor += nWritten;
    size -= nWritten;
  }
  return 0;
}

// Position fd at the start of the page that contains the end of the file.
// Returns the resulting offset, or -1 when lseek fails (errno is left set).
static inline int64_t tqSeekLastPage(int fd) {
  int64_t fileSize = lseek(fd, 0, SEEK_END);
  if (fileSize < 0) {
    return -1;
  }
  int64_t pageStart = (fileSize / TQ_PAGE_SIZE) * TQ_PAGE_SIZE;
  return lseek(fd, pageStart, SEEK_SET);
}

// TODO: the struct is tightly coupled with index entry
typedef struct STqIdxPageHead {
  int16_t writeOffset;
  int8_t  unused[14];
} STqIdxPageHead;

typedef struct STqIdxPageBuf {
  STqIdxPageHead head;
  char           buffer[TQ_IDX_PAGE_BODY_SIZE];
} STqIdxPageBuf;

// Load the last (possibly partial) index page into pBuf and seek back to the
// start of that page so that the caller can rewrite it in place.
// When nothing can be read at that position the buffer becomes an empty page.
// Returns the page offset, or -1 with terrno set.
static inline int64_t tqReadLastPage(int fd, STqIdxPageBuf* pBuf) {
  int64_t offset = tqSeekLastPage(fd);
  if (offset < 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }

  // zero first so the bytes after a partial page are defined
  memset(pBuf, 0, TQ_PAGE_SIZE);

  int nBytes = read(fd, pBuf, TQ_PAGE_SIZE);
  if (nBytes == -1) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }
  if (nBytes == 0) {
    pBuf->head.writeOffset = TQ_IDX_PAGE_HEAD_SIZE;
  }
  ASSERT(nBytes == 0 || nBytes == pBuf->head.writeOffset);

  offset = lseek(fd, offset, SEEK_SET);
  if (offset < 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
  }
  return offset;
}

// ---------------------------------------------------------------------------
// open / load
// ---------------------------------------------------------------------------

// Release everything a (possibly half-constructed) store owns.
// Descriptors are closed only when they were actually opened (>= 0).
static void tqStoreDestroy(STqMetaStore* pMeta) {
  if (pMeta->fileFd >= 0) close(pMeta->fileFd);
  if (pMeta->idxFd >= 0) close(pMeta->idxFd);
  tqFreeBuckets(pMeta);
  free(pMeta->dirPath);
  free(pMeta->unpersistHead);
  free(pMeta);
}

// Fill *ppValue from one serialized record: a record consisting of the bare
// header stands for a deletion, anything longer goes through the deserializer.
static inline void tqDecodeValue(STqMetaStore* pMeta, STqSerializedHead* pRecord, void** ppValue) {
  if (pRecord->ssize != sizeof(STqSerializedHead)) {
    pMeta->pDeserializer(pRecord, ppValue);
  } else {
    *ppValue = TQ_DELETE_TOKEN;
  }
}

// Build the in-memory node for one index entry and install it in its bucket,
// replacing (and freeing) an older node with the same key.
//   pIdxEntry   TQ_IDX_SIZE raw bytes taken from an index page
//   ppBuf       scratch buffer for the serialized record; may be reallocated
//   pAllocated  current capacity of *ppBuf, updated on growth
// Returns 0, or -1 with terrno set; on failure the bucket is left untouched.
static int32_t tqLoadEntry(STqMetaStore* pMeta, const char* pIdxEntry, STqSerializedHead** ppBuf,
                           int64_t* pAllocated) {
  STqMetaList* pNode = malloc(sizeof(STqMetaList));
  if (pNode == NULL) {
    terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
    return -1;
  }
  memset(pNode, 0, sizeof(STqMetaList));
  memcpy(&pNode->handle, pIdxEntry, TQ_IDX_SIZE);

  int64_t size = pNode->handle.serializedSize;
  if (size < 0 || (size > 0 && size < (int64_t)sizeof(STqSerializedHead))) {
    // a record can never be shorter than its own header
    terrno = TAOS_SYSTEM_ERROR(EIO);
    goto failed;
  }

  if (lseek(pMeta->fileFd, pNode->handle.offset, SEEK_SET) < 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    goto failed;
  }

  // size == 0 is what tqStorePersist records for a node that holds no value
  // at all; there is nothing to read and both values stay NULL.
  if (size > 0) {
    if (*pAllocated < size) {
      void* grown = realloc(*ppBuf, (size_t)size);
      if (grown == NULL) {
        // *ppBuf is still valid and remains owned by the caller
        terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
        goto failed;
      }
      *ppBuf = grown;
      *pAllocated = size;
    }

    STqSerializedHead* pRecord = *ppBuf;
    int64_t            nRead = read(pMeta->fileFd, pRecord, (size_t)size);
    if (nRead != size) {
      terrno = TAOS_SYSTEM_ERROR(nRead < 0 ? errno : EIO);
      goto failed;
    }

    if (pRecord->action == TQ_ACTION_INUSE) {
      tqDecodeValue(pMeta, pRecord, &pNode->handle.valueInUse);
    } else if (pRecord->action == TQ_ACTION_INTXN) {
      tqDecodeValue(pMeta, pRecord, &pNode->handle.valueInTxn);
    } else if (pRecord->action == TQ_ACTION_INUSE_CONT) {
      // two records back to back: the in-use value, then the in-txn value
      int64_t firstLen = pRecord->ssize;
      if (firstLen < (int64_t)sizeof(STqSerializedHead) || firstLen + (int64_t)sizeof(STqSerializedHead) > size) {
        terrno = TAOS_SYSTEM_ERROR(EIO);
        goto failed;
      }
      tqDecodeValue(pMeta, pRecord, &pNode->handle.valueInUse);
      STqSerializedHead* pSecond = POINTER_SHIFT(pRecord, firstLen);
      tqDecodeValue(pMeta, pSecond, &pNode->handle.valueInTxn);
    } else {
      ASSERT(0);
      terrno = TAOS_SYSTEM_ERROR(EIO);
      goto failed;
    }
  }

  // put into list: a later index entry supersedes an earlier one for the key
  int64_t       bucketKey = pNode->handle.key & TQ_BUCKET_MASK;
  STqMetaList** ppLink = &pMeta->bucket[bucketKey];
  while (*ppLink != NULL && (*ppLink)->handle.key != pNode->handle.key) {
    ppLink = &(*ppLink)->next;
  }
  if (*ppLink != NULL) {
    STqMetaList* pStale = *ppLink;
    pNode->next = pStale->next;
    *ppLink = pNode;
    tqFreeNode(pMeta, pStale);
  } else {
    pNode->next = pMeta->bucket[bucketKey];
    pMeta->bucket[bucketKey] = pNode;
  }
  return 0;

failed:
  tqFreeNode(pMeta, pNode);
  return -1;
}

// Read the index file page by page from the current position of idxFd and
// load every entry. Returns 0, or -1 with terrno set.
static int32_t tqLoadIndex(STqMetaStore* pMeta) {
  STqIdxPageBuf      idxBuf;
  int64_t            allocated = TQ_PAGE_SIZE;
  STqSerializedHead* serializedObj = malloc(TQ_PAGE_SIZE);
  if (serializedObj == NULL) {
    terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
    return -1;
  }

  int32_t code = 0;
  while (code == 0) {
    int idxRead = read(pMeta->idxFd, &idxBuf, TQ_PAGE_SIZE);
    if (idxRead == 0) {
      break;  // end of index
    }
    if (idxRead < 0) {
      terrno = TAOS_SYSTEM_ERROR(errno);
      tqError("failed to read tq index file since %s", terrstr());
      code = -1;
      break;
    }
    if (idxRead < (int)sizeof(STqIdxPageHead) || idxBuf.head.writeOffset < TQ_IDX_PAGE_HEAD_SIZE ||
        idxBuf.head.writeOffset > idxRead) {
      // the page header claims more (or less) data than was actually read
      terrno = TAOS_SYSTEM_ERROR(EIO);
      tqError("failed to read tq index file since %s", terrstr());
      code = -1;
      break;
    }
    ASSERT(idxBuf.head.writeOffset == idxRead);

    // loop read every entry
    int bodyLen = idxBuf.head.writeOffset - TQ_IDX_PAGE_HEAD_SIZE;
    for (int i = 0; code == 0 && i + TQ_IDX_SIZE <= bodyLen; i += TQ_IDX_SIZE) {
      code = tqLoadEntry(pMeta, &idxBuf.buffer[i], &serializedObj, &allocated);
    }
  }

  free(serializedObj);
  return code;
}

// Open (creating it when needed) the meta store kept in directory `path` and
// load its persisted content into memory.
//   path          directory holding the "tq.idx" and "tq.meta" files
//   serializer    used by tqStorePersist to turn a value into a record
//   deserializer  used while loading to turn a record back into a value
//   deleter       called on every value the store stops owning
//   tqConfigFlag  stored as-is; consulted via tqDupIntxnReject() on put
// Returns the store, or NULL with terrno set. On failure nothing is leaked:
// memory, descriptors and already loaded values are released.
STqMetaStore* tqStoreOpen(const char* path, FTqSerialize serializer, FTqDeserialize deserializer, FTqDelete deleter,
                          int32_t tqConfigFlag) {
  STqMetaStore* pMeta = malloc(sizeof(STqMetaStore));
  if (pMeta == NULL) {
    terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
    return NULL;
  }
  memset(pMeta, 0, sizeof(STqMetaStore));
  // mark descriptors as "not opened" so that cleanup never closes fd 0
  pMeta->idxFd = -1;
  pMeta->fileFd = -1;
  // callbacks first: cleanup of a partially loaded store needs the deleter
  pMeta->pSerializer = serializer;
  pMeta->pDeserializer = deserializer;
  pMeta->pDeleter = deleter;
  pMeta->tqConfigFlag = tqConfigFlag;

  // concat data file name and index file name
  size_t pathLen = strlen(path);
  pMeta->dirPath = malloc(pathLen + 1);
  if (pMeta->dirPath == NULL) {
    terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
    tqStoreDestroy(pMeta);
    return NULL;
  }
  strcpy(pMeta->dirPath, path);

  // room for the longer of the two suffixes ("/tq.meta") plus the terminator
  char name[pathLen + 10];
  strcpy(name, path);
  // NOTE(review): the condition is kept exactly as found; a failure here is
  // only logged and the open() below is what actually reports the error.
  if (taosDirExist(name) != 0 && taosMkDir(name) != 0) {
    terrno = TSDB_CODE_TQ_FAILED_TO_CREATE_DIR;
    tqError("failed to create dir:%s since %s ", name, terrstr());
  }

  strcat(name, "/" TQ_IDX_NAME);
  int idxFd = open(name, O_RDWR | O_CREAT, 0755);
  if (idxFd < 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    tqError("failed to open file:%s since %s ", name, terrstr());
    tqStoreDestroy(pMeta);
    return NULL;
  }
  pMeta->idxFd = idxFd;

  // sentinel of the circular unpersist list; an empty list points at itself
  pMeta->unpersistHead = malloc(sizeof(STqMetaList));
  if (pMeta->unpersistHead == NULL) {
    terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
    tqStoreDestroy(pMeta);
    return NULL;
  }
  memset(pMeta->unpersistHead, 0, sizeof(STqMetaList));
  pMeta->unpersistHead->unpersistNext = pMeta->unpersistHead->unpersistPrev = pMeta->unpersistHead;

  strcpy(name, path);
  strcat(name, "/" TQ_META_NAME);
  int fileFd = open(name, O_RDWR | O_CREAT, 0755);
  if (fileFd < 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    tqError("failed to open file:%s since %s", name, terrstr());
    tqStoreDestroy(pMeta);
    return NULL;
  }
  pMeta->fileFd = fileFd;

  // read idx file and load into memory
  if (tqLoadIndex(pMeta) != 0) {
    tqStoreDestroy(pMeta);
    return NULL;
  }
  return pMeta;
}

// ---------------------------------------------------------------------------
// close / delete
// ---------------------------------------------------------------------------

// Persist pending changes, then close the files and free the store.
// The store is always released; the return value is the result of the final
// tqStorePersist() (0 on success, -1 with terrno set otherwise).
int32_t tqStoreClose(STqMetaStore* pMeta) {
  // commit data and idx
  int32_t code = tqStorePersist(pMeta);
  // after a successful persist the unpersist ring must be empty
  ASSERT(code != 0 || pMeta->unpersistHead->unpersistNext == pMeta->unpersistHead);

  tqStoreDestroy(pMeta);
  return code;
}

// Discard the store without persisting: close the files, free all memory and
// remove the store directory. Always returns 0.
int32_t tqStoreDelete(STqMetaStore* pMeta) {
  close(pMeta->fileFd);
  close(pMeta->idxFd);

  // free memory
  tqFreeBuckets(pMeta);
  free(pMeta->unpersistHead);

  taosRemoveDir(pMeta->dirPath);
  free(pMeta->dirPath);
  free(pMeta);
  return 0;
}

// ---------------------------------------------------------------------------
// persist
// ---------------------------------------------------------------------------

// Serialize one value into *ppHead (which the serializer may reallocate) and
// append it to the data file. The delete marker is written as a bare header.
// Returns the number of bytes written, or -1 with terrno set.
static int64_t tqWriteValue(STqMetaStore* pMeta, STqSerializedHead** ppHead, void* value, int action) {
  (*ppHead)->action = action;
  if (value == TQ_DELETE_TOKEN) {
    (*ppHead)->ssize = sizeof(STqSerializedHead);
  } else {
    pMeta->pSerializer(value, ppHead);
    // NOTE(review): defensive; whether a serializer can leave NULL behind is
    // not visible from this file.
    if (*ppHead == NULL) {
      terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
      return -1;
    }
  }

  int64_t size = (*ppHead)->ssize;
  if (tqWriteAll(pMeta->fileFd, *ppHead, size) != 0) {
    return -1;
  }
  return size;
}

// Write every node on the unpersist list to the data file, append one index
// entry (key, offset, size) per node to the index file, and fsync both.
// Nodes whose in-use value is the delete marker and that have no pending
// transaction value are dropped from memory once they have been written.
// Returns 0 on success. On failure returns -1 with terrno set and stops at
// the failing node; index entries still buffered at that point are not
// written, so earlier nodes of the same call may be missing from the index.
int32_t tqStorePersist(STqMetaStore* pMeta) {
  STqIdxPageBuf idxBuf;
  int64_t*      bufPtr = (int64_t*)idxBuf.buffer;
  STqMetaList*  pHead = pMeta->unpersistHead;

  STqSerializedHead* pSHead = malloc(sizeof(STqSerializedHead));
  if (pSHead == NULL) {
    terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
    return -1;
  }
  pSHead->ver = TQ_SVER;
  pSHead->checksum = 0;
  pSHead->ssize = sizeof(STqSerializedHead);

  // new records go wherever the data file descriptor currently points
  int64_t offset = lseek(pMeta->fileFd, 0, SEEK_CUR);
  if (offset < 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    free(pSHead);
    return -1;
  }

  if (tqReadLastPage(pMeta->idxFd, &idxBuf) < 0) {
    free(pSHead);
    return -1;
  }

  if (idxBuf.head.writeOffset == TQ_PAGE_SIZE) {
    // last page is full: start a fresh one at the end of the index
    if (lseek(pMeta->idxFd, 0, SEEK_END) < 0) {
      terrno = TAOS_SYSTEM_ERROR(errno);
      free(pSHead);
      return -1;
    }
    memset(&idxBuf, 0, TQ_PAGE_SIZE);
    idxBuf.head.writeOffset = TQ_IDX_PAGE_HEAD_SIZE;
  } else {
    // continue right after the entries the page already holds
    bufPtr = POINTER_SHIFT(&idxBuf, idxBuf.head.writeOffset);
  }

  STqMetaList* pNode = pHead->unpersistNext;
  while (pNode != pHead) {
    int64_t nBytes = 0;

    if (pNode->handle.valueInUse) {
      int     action = pNode->handle.valueInTxn ? TQ_ACTION_INUSE_CONT : TQ_ACTION_INUSE;
      int64_t written = tqWriteValue(pMeta, &pSHead, pNode->handle.valueInUse, action);
      if (written < 0) {
        free(pSHead);
        return -1;
      }
      nBytes += written;
    }

    if (pNode->handle.valueInTxn) {
      int64_t written = tqWriteValue(pMeta, &pSHead, pNode->handle.valueInTxn, TQ_ACTION_INTXN);
      if (written < 0) {
        free(pSHead);
        return -1;
      }
      nBytes += written;
    }

    pNode->handle.offset = offset;
    offset += nBytes;

    // write idx file
    // TODO: endian check and convert
    *(bufPtr++) = pNode->handle.key;
    *(bufPtr++) = pNode->handle.offset;
    *(bufPtr++) = nBytes;
    idxBuf.head.writeOffset += TQ_IDX_SIZE;

    if (idxBuf.head.writeOffset >= TQ_PAGE_SIZE) {
      // page complete: flush it and start an empty one
      if (tqWriteAll(pMeta->idxFd, &idxBuf, TQ_PAGE_SIZE) != 0) {
        free(pSHead);
        return -1;
      }
      memset(&idxBuf, 0, TQ_PAGE_SIZE);
      idxBuf.head.writeOffset = TQ_IDX_PAGE_HEAD_SIZE;
      bufPtr = (int64_t*)&idxBuf.buffer;
    }

    // remove from unpersist list (pNode is always the first element)
    STqMetaList* pNext = pNode->unpersistNext;
    pHead->unpersistNext = pNext;
    pNext->unpersistPrev = pHead;
    pNode->unpersistPrev = pNode->unpersistNext = NULL;

    // remove from bucket: this must look at the node that was just written,
    // not at its successor, which has not been persisted yet
    if (pNode->handle.valueInUse == TQ_DELETE_TOKEN && pNode->handle.valueInTxn == NULL) {
      tqUnlinkFromBucket(pMeta, pNode);
      free(pNode);
    }

    pNode = pNext;
  }

  free(pSHead);

  // write left bytes
  // TODO: write new version in tfile
  if ((char*)bufPtr != idxBuf.buffer) {
    if (tqWriteAll(pMeta->idxFd, &idxBuf, idxBuf.head.writeOffset) != 0) {
      return -1;
    }
  }

  // TODO: using fsync in tfile
  if (fsync(pMeta->idxFd) < 0 || fsync(pMeta->fileFd) < 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }
  return 0;
}

// ---------------------------------------------------------------------------
// handle operations
// ---------------------------------------------------------------------------

// Store `value` directly as the committed (in-use) value of `key`, taking
// ownership of it and releasing any previous in-use value.
// Returns 0, or -1 with terrno set when a new node cannot be allocated.
static int32_t tqHandlePutCommitted(STqMetaStore* pMeta, int64_t key, void* value) {
  STqMetaList* pNode = tqFindNode(pMeta, key);
  if (pNode != NULL) {
    tqReleaseValue(pMeta, pNode->handle.valueInUse);
    // change pointer ownership
    pNode->handle.valueInUse = value;
    // the node changed, so it has to be written by the next persist
    tqLinkUnpersist(pMeta, pNode);
    return 0;
  }

  STqMetaList* pNewNode = malloc(sizeof(STqMetaList));
  if (pNewNode == NULL) {
    terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
    return -1;
  }
  memset(pNewNode, 0, sizeof(STqMetaList));
  pNewNode->handle.key = key;
  pNewNode->handle.valueInUse = value;

  // the node must be reachable from its bucket, otherwise lookups miss it and
  // it is lost as soon as persist unlinks it from the unpersist list
  int64_t bucketKey = key & TQ_BUCKET_MASK;
  pNewNode->next = pMeta->bucket[bucketKey];
  pMeta->bucket[bucketKey] = pNewNode;

  // put into unpersist list
  tqLinkUnpersist(pMeta, pNewNode);
  return 0;
}

// Return the committed value of `key`, or NULL when the key is unknown, has
// no committed value, or is marked deleted. The store keeps ownership.
void* tqHandleGet(STqMetaStore* pMeta, int64_t key) {
  STqMetaList* pNode = tqFindNode(pMeta, key);
  if (pNode == NULL) {
    return NULL;
  }
  void* value = pNode->handle.valueInUse;
  return (value != NULL && value != TQ_DELETE_TOKEN) ? value : NULL;
}

// Same as tqHandleGet(), but a successful lookup also puts the node on the
// unpersist list so that the next persist writes it again.
void* tqHandleTouchGet(STqMetaStore* pMeta, int64_t key) {
  STqMetaList* pNode = tqFindNode(pMeta, key);
  if (pNode == NULL) {
    return NULL;
  }
  void* value = pNode->handle.valueInUse;
  if (value == NULL || value == TQ_DELETE_TOKEN) {
    return NULL;
  }
  tqLinkUnpersist(pMeta, pNode);
  return value;
}

// Install `value` as the pending (in-transaction) value of `key`; ownership
// moves to the store only when 0 is returned.
// An existing pending value is replaced and released, unless the config flag
// asks for duplicates to be rejected (TSDB_CODE_TQ_META_KEY_DUP_IN_TXN).
// Returns 0, or -1 with terrno set.
static inline int32_t tqHandlePutImpl(STqMetaStore* pMeta, int64_t key, void* value) {
  STqMetaList* pNode = tqFindNode(pMeta, key);
  if (pNode != NULL) {
    if (pNode->handle.valueInTxn) {
      if (tqDupIntxnReject(pMeta->tqConfigFlag)) {
        terrno = TSDB_CODE_TQ_META_KEY_DUP_IN_TXN;
        return -1;
      }
      tqReleaseValue(pMeta, pNode->handle.valueInTxn);
    }
    pNode->handle.valueInTxn = value;
    tqLinkUnpersist(pMeta, pNode);
    return 0;
  }

  STqMetaList* pNewNode = malloc(sizeof(STqMetaList));
  if (pNewNode == NULL) {
    terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
    return -1;
  }
  memset(pNewNode, 0, sizeof(STqMetaList));
  pNewNode->handle.key = key;
  pNewNode->handle.valueInTxn = value;

  int64_t bucketKey = key & TQ_BUCKET_MASK;
  pNewNode->next = pMeta->bucket[bucketKey];
  pMeta->bucket[bucketKey] = pNewNode;

  tqLinkUnpersist(pMeta, pNewNode);
  return 0;
}

// Put `value` as the pending value of `key`, transferring ownership of the
// pointer to the store on success. Returns 0, or -1 with terrno set.
int32_t tqHandleMovePut(STqMetaStore* pMeta, int64_t key, void* value) { return tqHandlePutImpl(pMeta, key, value); }

// Put a private copy of the `vsize` bytes at `value` as the pending value of
// `key`; the caller keeps its own buffer. Returns 0, or -1 with terrno set.
int32_t tqHandleCopyPut(STqMetaStore* pMeta, int64_t key, void* value, size_t vsize) {
  void* vmem = malloc(vsize);
  if (vmem == NULL) {
    terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
    return -1;
  }
  memcpy(vmem, value, vsize);

  int32_t code = tqHandlePutImpl(pMeta, key, vmem);
  if (code != 0) {
    // the store did not take the copy, so it is still ours to free
    free(vmem);
  }
  return code;
}

// Return the pending (in-transaction) value of `key`, or NULL when the key is
// unknown, has no pending value, or the pending change is a deletion.
static void* tqHandleGetUncommitted(STqMetaStore* pMeta, int64_t key) {
  STqMetaList* pNode = tqFindNode(pMeta, key);
  if (pNode == NULL) {
    return NULL;
  }
  void* value = pNode->handle.valueInTxn;
  return (value != NULL && value != TQ_DELETE_TOKEN) ? value : NULL;
}

// Promote the pending value of `key` to the committed value, releasing the
// previous committed value. Returns 0, or -1 with terrno set to
// TSDB_CODE_TQ_META_KEY_NOT_IN_TXN / TSDB_CODE_TQ_META_NO_SUCH_KEY.
int32_t tqHandleCommit(STqMetaStore* pMeta, int64_t key) {
  STqMetaList* pNode = tqFindNode(pMeta, key);
  if (pNode == NULL) {
    terrno = TSDB_CODE_TQ_META_NO_SUCH_KEY;
    return -1;
  }
  if (pNode->handle.valueInTxn == NULL) {
    terrno = TSDB_CODE_TQ_META_KEY_NOT_IN_TXN;
    return -1;
  }

  tqReleaseValue(pMeta, pNode->handle.valueInUse);
  pNode->handle.valueInUse = pNode->handle.valueInTxn;
  pNode->handle.valueInTxn = NULL;
  tqLinkUnpersist(pMeta, pNode);
  return 0;
}

// Drop the pending value of `key`, leaving the committed value untouched.
// Returns 0, or -1 with terrno set to TSDB_CODE_TQ_META_KEY_NOT_IN_TXN /
// TSDB_CODE_TQ_META_NO_SUCH_KEY.
int32_t tqHandleAbort(STqMetaStore* pMeta, int64_t key) {
  STqMetaList* pNode = tqFindNode(pMeta, key);
  if (pNode == NULL) {
    terrno = TSDB_CODE_TQ_META_NO_SUCH_KEY;
    return -1;
  }
  if (pNode->handle.valueInTxn == NULL) {
    terrno = TSDB_CODE_TQ_META_KEY_NOT_IN_TXN;
    return -1;
  }

  tqReleaseValue(pMeta, pNode->handle.valueInTxn);
  pNode->handle.valueInTxn = NULL;
  tqLinkUnpersist(pMeta, pNode);
  return 0;
}

// Record a pending deletion of `key` (takes effect on commit), releasing any
// other pending value. Asking again while a deletion is already pending is a
// no-op that succeeds. Returns 0, or -1 with terrno set to
// TSDB_CODE_TQ_META_NO_SUCH_KEY.
int32_t tqHandleDel(STqMetaStore* pMeta, int64_t key) {
  STqMetaList* pNode = tqFindNode(pMeta, key);
  if (pNode == NULL) {
    terrno = TSDB_CODE_TQ_META_NO_SUCH_KEY;
    return -1;
  }
  if (pNode->handle.valueInTxn == TQ_DELETE_TOKEN) {
    // deletion already pending; nothing left to do
    return 0;
  }

  if (pNode->handle.valueInTxn) {
    pMeta->pDeleter(pNode->handle.valueInTxn);
  }
  pNode->handle.valueInTxn = TQ_DELETE_TOKEN;
  tqLinkUnpersist(pMeta, pNode);
  return 0;
}

// Mark the committed value of `key` as deleted right away (no commit step),
// releasing the value that was stored. Returns 0, or -1 with terrno set to
// TSDB_CODE_TQ_META_NO_SUCH_KEY.
int32_t tqHandlePurge(STqMetaStore* pMeta, int64_t key) {
  STqMetaList* pNode = tqFindNode(pMeta, key);
  if (pNode == NULL) {
    terrno = TSDB_CODE_TQ_META_NO_SUCH_KEY;
    return -1;
  }

  // the old value would otherwise be unreachable once the marker replaces it
  tqReleaseValue(pMeta, pNode->handle.valueInUse);
  pNode->handle.valueInUse = TQ_DELETE_TOKEN;
  tqLinkUnpersist(pMeta, pNode);
  return 0;
}

// TODO: clean deleted idx and data from persistent file
// Currently a placeholder that does nothing and returns 0.
int32_t tqStoreCompact(STqMetaStore* pMeta) { return 0; }