From d93d1093b849a9363cb9c3e15b90d9cdf6a326f6 Mon Sep 17 00:00:00 2001 From: slguan Date: Sun, 22 Mar 2020 10:12:41 +0800 Subject: [PATCH] [TD-15] refact the interface of sdb --- src/mnode/inc/mgmtSdb.h | 43 +-- src/mnode/src/mgmtSdb.c | 794 +++++++++++++++++++++------------------- 2 files changed, 435 insertions(+), 402 deletions(-) diff --git a/src/mnode/inc/mgmtSdb.h b/src/mnode/inc/mgmtSdb.h index bf5ccb2634..e33bfce824 100644 --- a/src/mnode/inc/mgmtSdb.h +++ b/src/mnode/inc/mgmtSdb.h @@ -21,46 +21,47 @@ extern "C" { #endif typedef enum { - SDB_KEYTYPE_STRING, - SDB_KEYTYPE_AUTO, - SDB_KEYTYPE_MAX + SDB_KEY_TYPE_STRING, + SDB_KEY_TYPE_AUTO } ESdbKeyType; typedef enum { - SDB_OPER_GLOBAL, - SDB_OPER_LOCAL, - SDB_OPER_DISK + SDB_OPER_TYPE_GLOBAL, + SDB_OPER_TYPE_LOCAL } ESdbOperType; -enum _sdbaction { - SDB_TYPE_INSERT, - SDB_TYPE_DELETE, - SDB_TYPE_UPDATE, -} ESdbForwardType; +typedef struct { + ESdbOperType type; + int32_t maxRowSize; + int32_t rowSize; + void * rowData; + void * pObj; + void * table; + int64_t version; +} SSdbOperDesc; typedef struct { char *tableName; int32_t hashSessions; int32_t maxRowSize; ESdbKeyType keyType; - int32_t (*insertFp)(void *pObj); - int32_t (*deleteFp)(void *pObj); - int32_t (*updateFp)(void *pObj); - int32_t (*encodeFp)(void *pObj, void *pData, int32_t maxRowSize); - void * (*decodeFp)(void *pData); - int32_t (*destroyFp)(void *pObj); + int32_t (*insertFp)(SSdbOperDesc *pOper); + int32_t (*deleteFp)(SSdbOperDesc *pOper); + int32_t (*updateFp)(SSdbOperDesc *pOper); + int32_t (*encodeFp)(SSdbOperDesc *pOper); + int32_t (*decodeFp)(SSdbOperDesc *pDesc); + int32_t (*destroyFp)(SSdbOperDesc *pDesc); } SSdbTableDesc; void *sdbOpenTable(SSdbTableDesc *desc); void sdbCloseTable(void *handle); -int32_t sdbInsertRow(void *handle, void *row, ESdbOperType oper); -int32_t sdbDeleteRow(void *handle, void *key, ESdbOperType oper); -int32_t sdbUpdateRow(void *handle, void *row, int32_t rowSize, ESdbOperType oper); +int32_t sdbInsertRow(SSdbOperDesc *pOper); +int32_t sdbDeleteRow(SSdbOperDesc *pOper); +int32_t sdbUpdateRow(SSdbOperDesc *pOper); void *sdbGetRow(void *handle, void *key); void *sdbFetchRow(void *handle, void *pNode, void **ppRow); -int64_t sdbGetId(void *handle); int64_t sdbGetNumOfRows(void *handle); uint64_t sdbGetVersion(); diff --git a/src/mnode/src/mgmtSdb.c b/src/mnode/src/mgmtSdb.c index ebdbba5605..f957dc4a83 100644 --- a/src/mnode/src/mgmtSdb.c +++ b/src/mnode/src/mgmtSdb.c @@ -16,6 +16,7 @@ #define _DEFAULT_SOURCE #include "os.h" #include "taosdef.h" +#include "taoserror.h" #include "tchecksum.h" #include "tglobalcfg.h" #include "tlog.h" @@ -39,29 +40,29 @@ typedef struct { typedef struct _SSdbTable { SSdbHeader header; - char name[TSDB_DB_NAME_LEN]; - char fn[TSDB_FILENAME_LEN]; + char tableName[TSDB_DB_NAME_LEN]; + char fileName[TSDB_FILENAME_LEN]; ESdbKeyType keyType; - int32_t dbId; + int32_t tableId; int32_t hashSessions; int32_t maxRowSize; - uint32_t autoIndex; + int32_t autoIndex; + int32_t fd; int64_t numOfRows; - int64_t id; - int64_t size; + int64_t version; + int64_t fileSize; void * iHandle; - int32_t fd; - int32_t (*insertFp)(void *pObj); - int32_t (*deleteFp)(void *pObj); - int32_t (*updateFp)(void *pObj); - void * (*decodeFp)(void *pData); // return pObj - int32_t (*encodeFp)(void *pObj, void *pData, int32_t maxRowSize); // return size of pData - int32_t (*destroyFp)(void *pObj); + int32_t (*insertFp)(SSdbOperDesc *pDesc); + int32_t (*deleteFp)(SSdbOperDesc *pOper); + int32_t (*updateFp)(SSdbOperDesc *pOper); + int32_t (*decodeFp)(SSdbOperDesc *pOper); + int32_t (*encodeFp)(SSdbOperDesc *pOper); + int32_t (*destroyFp)(SSdbOperDesc *pOper); pthread_mutex_t mutex; } SSdbTable; typedef struct { - int64_t id; + int64_t version; int64_t offset; int32_t rowSize; void * row; @@ -70,21 +71,27 @@ typedef struct { typedef struct { int32_t delimiter; int32_t rowSize; - int64_t id; + int64_t version; char data[]; } SRowHead; +typedef enum { + SDB_FORWARD_TYPE_INSERT, + SDB_FORWARD_TYPE_DELETE, + SDB_FORWARD_TYPE_UPDATE +} ESdbForwardType; + typedef struct { - uint8_t dbId; - int8_t type; - int16_t dataLen; - uint64_t version; - char data[]; + ESdbForwardType type; + int32_t tableId; + int64_t version; + int32_t rowSize; + void * rowData; } SForwardMsg; extern char version[]; const int16_t sdbFileVersion = 2; -int32_t (*mpeerForwardRequestFp)(SSdbTable *pTable, char type, void *cont, int32_t contLen) = NULL; +int32_t (*mpeerForwardRequestFp)(SForwardMsg *forwardMsg) = NULL; static SSdbTable *sdbTableList[10] = {0}; static int32_t sdbNumOfTables = 0; @@ -101,25 +108,36 @@ void sdbResetTable(SSdbTable *pTable); void sdbSaveSnapShot(void *handle); uint64_t sdbGetVersion() { return sdbVersion; } -int64_t sdbGetId(void *handle) { return ((SSdbTable *)handle)->id; } +int64_t sdbGetId(void *handle) { return ((SSdbTable *)handle)->version; } int64_t sdbGetNumOfRows(void *handle) { return ((SSdbTable *)handle)->numOfRows; } -static int32_t sdbForwardDbReqToPeer(SSdbTable *pTable, char type, char *data, int32_t dataLen) { +static char *sdbGetkeyStr(SSdbTable *pTable, void *row) { + static char str[16]; + switch (pTable->keyType) { + case SDB_KEY_TYPE_STRING: + return (char *)row; + case SDB_KEY_TYPE_AUTO: + sprintf(str, "%d", *(int32_t *)row); + return str; + default: + return "unknown"; + } +} + +static int32_t sdbForwardDbReqToPeer(SForwardMsg *forwardMsg) { if (mpeerForwardRequestFp) { - return mpeerForwardRequestFp(pTable, type, data, dataLen); + return mpeerForwardRequestFp(forwardMsg); } else { return 0; } } -static void sdbFinishCommit(void *handle) { - SSdbTable *pTable = (SSdbTable *)handle; - uint32_t sdbEcommit = SDB_ENDCOMMIT; - +static void sdbFinishCommit(SSdbTable *pTable) { + uint32_t sdbEcommit = SDB_ENDCOMMIT; off_t offset = lseek(pTable->fd, 0, SEEK_END); - assert(offset == pTable->size); + assert(offset == pTable->fileSize); twrite(pTable->fd, &sdbEcommit, sizeof(sdbEcommit)); - pTable->size += sizeof(sdbEcommit); + pTable->fileSize += sizeof(sdbEcommit); } static int32_t sdbOpenSdbFile(SSdbTable *pTable) { @@ -136,40 +154,40 @@ static int32_t sdbOpenSdbFile(SSdbTable *pTable) { // check sdb.db and .sdb.db status char fn[TSDB_FILENAME_LEN] = "\0"; - dirc = strdup(pTable->fn); - basec = strdup(pTable->fn); + dirc = strdup(pTable->fileName); + basec = strdup(pTable->fileName); sprintf(fn, "%s/.%s", dirname(dirc), basename(basec)); tfree(dirc); tfree(basec); if (stat(fn, &ofstat) == 0) { // .sdb.db file exists - if (stat(pTable->fn, &fstat) == 0) { + if (stat(pTable->fileName, &fstat) == 0) { remove(fn); } else { - remove(pTable->fn); - rename(fn, pTable->fn); + remove(pTable->fileName); + rename(fn, pTable->fileName); } } - pTable->fd = open(pTable->fn, O_RDWR | O_CREAT, S_IRWXU | S_IRWXG | S_IRWXO); + pTable->fd = open(pTable->fileName, O_RDWR | O_CREAT, S_IRWXU | S_IRWXG | S_IRWXO); if (pTable->fd < 0) { - sdbError("failed to open file:%s", pTable->fn); + sdbError("table:%s, failed to open file:%s", pTable->tableName, pTable->fileName); return -1; } - pTable->size = 0; - stat(pTable->fn, &fstat); + pTable->fileSize = 0; + stat(pTable->fileName, &fstat); size = sizeof(pTable->header); if (fstat.st_size == 0) { pTable->header.swVersion = swVersion.iversion; pTable->header.sdbFileVersion = sdbFileVersion; if (taosCalcChecksumAppend(0, (uint8_t *)(&pTable->header), size) < 0) { - sdbError("failed to get file header checksum, file:%s", pTable->fn); + sdbError("table:%s, failed to get file header checksum, file:%s", pTable->tableName, pTable->fileName); tclose(pTable->fd); return -1; } twrite(pTable->fd, &(pTable->header), size); - pTable->size += size; + pTable->fileSize += size; sdbFinishCommit(pTable); } else { uint32_t sdbEcommit = 0; @@ -186,25 +204,25 @@ static int32_t sdbOpenSdbFile(SSdbTable *pTable) { ssize_t tsize = read(pTable->fd, &(pTable->header), size); if (tsize < size) { - sdbError("failed to read sdb file header, file:%s", pTable->fn); + sdbError("table:%s, failed to read sdb file header, file:%s", pTable->tableName, pTable->fileName); tclose(pTable->fd); return -1; } if (pTable->header.swVersion != swVersion.iversion) { - sdbWarn("sdb file:%s version not match software version", pTable->fn); + sdbWarn("table:%s, sdb file:%s version not match software version", pTable->tableName, pTable->fileName); } if (!taosCheckChecksumWhole((uint8_t *)(&pTable->header), size)) { - sdbError("sdb file header is broken since checksum mismatch, file:%s", pTable->fn); + sdbError("table:%s, sdb file header is broken since checksum mismatch, file:%s", pTable->tableName, pTable->fileName); tclose(pTable->fd); return -1; } - pTable->size += size; + pTable->fileSize += size; // skip end commit symbol lseek(pTable->fd, sizeof(sdbEcommit), SEEK_CUR); - pTable->size += sizeof(sdbEcommit); + pTable->fileSize += sizeof(sdbEcommit); } pTable->numOfRows = 0; @@ -213,106 +231,170 @@ static int32_t sdbOpenSdbFile(SSdbTable *pTable) { } static int32_t sdbInitTableByFile(SSdbTable *pTable) { - SRowMeta rowMeta; - int32_t numOfDels = 0; - int32_t bytes = 0; - int64_t oldId = 0; - void * pMetaRow = NULL; - int32_t total_size = 0; - int32_t real_size = 0; - int32_t maxAutoIndex = 0; - - if (sdbOpenSdbFile(pTable) < 0) return -1; - - total_size = sizeof(SRowHead) + pTable->maxRowSize + sizeof(TSCKSUM); + sdbTrace("table:%s, open sdb file:%s for read", pTable->tableName, pTable->fileName); + if (sdbOpenSdbFile(pTable) < 0) { + sdbError("table:%s, failed to open sdb file:%s for read", pTable->tableName, pTable->fileName); + return -1; + } + + int32_t total_size = sizeof(SRowHead) + pTable->maxRowSize + sizeof(TSCKSUM); SRowHead *rowHead = (SRowHead *)malloc(total_size); if (rowHead == NULL) { - sdbError("failed to allocate row head memory, sdb:%s", pTable->name); + sdbError("table:%s, failed to allocate row head memory, sdb:%s", pTable->tableName, pTable->tableName); return -1; } - sdbTrace("open sdb file:%s for read", pTable->fn); - + int32_t numOfChanged = 0; + int32_t maxAutoIndex = 0; while (1) { memset(rowHead, 0, total_size); - bytes = read(pTable->fd, rowHead, sizeof(SRowHead)); + int32_t bytes = read(pTable->fd, rowHead, sizeof(SRowHead)); if (bytes < 0) { - sdbError("failed to read sdb file:%s", pTable->fn); - goto sdb_exit1; + sdbError("table:%s, failed to read sdb file:%s", pTable->tableName, pTable->fileName); + tfree(rowHead); + return -1; } if (bytes == 0) break; if (bytes < sizeof(SRowHead) || rowHead->delimiter != SDB_DELIMITER) { - pTable->size++; + pTable->fileSize++; lseek(pTable->fd, -(bytes - 1), SEEK_CUR); continue; } if (rowHead->rowSize < 0 || rowHead->rowSize > pTable->maxRowSize) { - sdbError("error row size in sdb file:%s, id:%d rowSize:%d maxRowSize:%d", - pTable->fn, rowHead->id, rowHead->rowSize, pTable->maxRowSize); - pTable->size += sizeof(SRowHead); + sdbError("table:%s, error row size in sdb filesize:%d, version:%d rowSize:%d maxRowSize:%d", pTable->tableName, + pTable->fileSize, rowHead->version, rowHead->rowSize, pTable->maxRowSize); + pTable->fileSize += sizeof(SRowHead); continue; } bytes = read(pTable->fd, rowHead->data, rowHead->rowSize + sizeof(TSCKSUM)); if (bytes < rowHead->rowSize + sizeof(TSCKSUM)) { - // TODO: Here may cause pTable->size not end of the file - sdbError("failed to read sdb file:%s id:%d rowSize:%d", pTable->fn, rowHead->id, rowHead->rowSize); + // TODO: Here may cause pTable->fileSize not end of the file + sdbError("table:%s, failed to read sdb file, version:%d rowSize:%d", pTable->tableName, rowHead->version, + rowHead->rowSize); break; } - real_size = sizeof(SRowHead) + rowHead->rowSize + sizeof(TSCKSUM); + int32_t real_size = sizeof(SRowHead) + rowHead->rowSize + sizeof(TSCKSUM); if (!taosCheckChecksumWhole((uint8_t *)rowHead, real_size)) { - sdbError("error sdb checksum, sdb:%s id:%d, skip", pTable->name, rowHead->id); - pTable->size += real_size; + sdbError("table:%s, error sdb checksum, version:%d, skip", pTable->tableName, rowHead->version); + pTable->fileSize += real_size; continue; } - - if (pTable->keyType == SDB_KEYTYPE_AUTO) { + + if (pTable->keyType == SDB_KEY_TYPE_AUTO) { maxAutoIndex = MAX(maxAutoIndex, *(int32_t *) rowHead->data); } - pMetaRow = sdbGetRow(pTable, rowHead->data); + void *pMetaRow = sdbGetRow(pTable, rowHead->data); if (pMetaRow == NULL) { - if (rowHead->id < 0) { - sdbError("error sdb negative id:%d, sdb:%s, skip", rowHead->id, pTable->name); + if (rowHead->version < 0) { + sdbError("table:%s, error sdb negative version:%d, record:%s, skip", pTable->tableName, rowHead->version, + sdbGetkeyStr(pTable, rowHead->data)); } else { - sdbInsertRow(pTable, rowHead->data, SDB_OPER_DISK); + SRowMeta rowMeta; + rowMeta.version = rowHead->version; + rowMeta.offset = pTable->fileSize; + rowMeta.rowSize = rowHead->rowSize; + SSdbOperDesc oper = { + .table = pTable, + .rowData = rowHead->data, + .rowSize = rowHead->rowSize + }; + int32_t code = (*pTable->decodeFp)(&oper); + if (code == TSDB_CODE_SUCCESS) { + rowMeta.row = oper.pObj; + (*sdbAddIndexFp[pTable->keyType])(pTable->iHandle, rowMeta.row, &rowMeta); + pTable->numOfRows++; + sdbTrace("table:%s, read record:%s and insert, numOfRows:%d version:%" PRId64 " sdbversion:%" PRId64, + pTable->tableName, sdbGetkeyStr(pTable, rowHead->data), pTable->numOfRows, pTable->version, sdbVersion); + } else { + sdbError("table:%s, failed to decode record:%s, numOfRows:%d version:%" PRId64 " sdbversion:%" PRId64, + pTable->tableName, sdbGetkeyStr(pTable, rowHead->data), pTable->numOfRows, pTable->version, sdbVersion); + } } } else { - if (rowHead->id < 0) { - sdbDeleteRow(pTable, rowHead->data, SDB_OPER_DISK); + if (rowHead->version < 0) { + SSdbOperDesc oper = { + .table = pTable, + .pObj = pMetaRow + }; + (*pTable->destroyFp)(&oper); + (*sdbDeleteIndexFp[pTable->keyType])(pTable->iHandle, rowHead->data); + pTable->numOfRows--; + sdbTrace("table:%s, read record:%s and delete, numOfRows:%d version:%" PRId64 " sdbversion:%" PRId64, + pTable->tableName, sdbGetkeyStr(pTable, rowHead->data), pTable->numOfRows, pTable->version, sdbVersion); } else { - sdbUpdateRow(pTable, rowHead->data, rowHead->rowSize, SDB_OPER_DISK); + SRowMeta rowMeta; + rowMeta.version = rowHead->version; + rowMeta.offset = pTable->fileSize; + rowMeta.rowSize = rowHead->rowSize; + SSdbOperDesc oper = { + .table = pTable, + .rowData = rowHead->data, + .rowSize = rowHead->rowSize, + .pObj = pMetaRow + }; + + (*pTable->destroyFp)(&oper); + (*sdbDeleteIndexFp[pTable->keyType])(pTable->iHandle, rowHead->data); + + int32_t code = (*pTable->decodeFp)(&oper); + if (code == TSDB_CODE_SUCCESS) { + rowMeta.row = oper.pObj; + (*sdbAddIndexFp[pTable->keyType])(pTable->iHandle, rowMeta.row, &rowMeta); + sdbTrace("table:%s, read record:%s and update, numOfRows:%d version:%" PRId64 " sdbversion:%" PRId64, + pTable->tableName, sdbGetkeyStr(pTable, rowHead->data), pTable->numOfRows, pTable->version, sdbVersion); + } else { + sdbError("table:%s, failed to decode record:%s, numOfRows:%d version:%" PRId64 " sdbversion:%" PRId64, + pTable->tableName, sdbGetkeyStr(pTable, rowHead->data), pTable->numOfRows, pTable->version, sdbVersion); + } + } + numOfChanged++; + if (pTable->version < abs(rowHead->version)) { + pTable->version = abs(rowHead->version); } - numOfDels++; } - pTable->size += real_size; - if (pTable->id < abs(rowHead->id)) pTable->id = abs(rowHead->id); - - pTable->size += 4; + pTable->fileSize += real_size; + pTable->fileSize += 4; lseek(pTable->fd, 4, SEEK_CUR); } - if (pTable->keyType == SDB_KEYTYPE_AUTO) { + void *pNode = NULL; + while (1) { + SRowMeta * pMeta; + pNode = (*sdbFetchRowFp[pTable->keyType])(pTable->iHandle, pNode, (void **)&pMeta); + if (pMeta == NULL) break; + + SSdbOperDesc oper = { + .pObj = pMeta->row, + .table = pTable, + .version = pMeta->version, + }; + + int32_t code = (*pTable->insertFp)(&oper); + if (code != TSDB_CODE_SUCCESS) { + sdbError("table:%s, failed to insert record:%s", pTable->tableName, sdbGetkeyStr(pTable, rowHead->data)); + } + } + + sdbVersion += pTable->version; + + if (pTable->keyType == SDB_KEY_TYPE_AUTO) { pTable->autoIndex = maxAutoIndex; } - sdbVersion += pTable->id; - if (numOfDels > pTable->hashSessions / 4) { + if (numOfChanged > pTable->hashSessions / 4) { sdbSaveSnapShot(pTable); } tfree(rowHead); return 0; - -sdb_exit1: - tfree(rowHead); - return -1; } void *sdbOpenTable(SSdbTableDesc *pDesc) { @@ -328,8 +410,8 @@ void *sdbOpenTable(SSdbTableDesc *pDesc) { pTable->encodeFp = pDesc->encodeFp; pTable->decodeFp = pDesc->decodeFp; pTable->destroyFp = pDesc->destroyFp; - strcpy(pTable->name, pDesc->tableName); - sprintf(pTable->fn, "%s/%s.db", tsMnodeDir, pTable->name); + strcpy(pTable->tableName, pDesc->tableName); + sprintf(pTable->fileName, "%s/%s.db", tsMnodeDir, pTable->tableName); if (sdbInitIndexFp[pTable->keyType] != NULL) { pTable->iHandle = (*sdbInitIndexFp[pTable->keyType])(pTable->maxRowSize, sizeof(SRowMeta)); @@ -339,10 +421,11 @@ void *sdbOpenTable(SSdbTableDesc *pDesc) { if (sdbInitTableByFile(pTable) < 0) return NULL; - pTable->dbId = sdbNumOfTables++; - sdbTableList[pTable->dbId] = pTable; + pTable->tableId = sdbNumOfTables++; + sdbTableList[pTable->tableId] = pTable; - sdbTrace("table:%s is initialized, numOfRows:%d, numOfTables:%d", pTable->name, pTable->numOfRows, sdbNumOfTables); + sdbTrace("table:%s is initialized, numOfRows:%d, numOfTables:%d, version:%" PRId64 " sdbversion:%" PRId64, + pTable->tableName, pTable->numOfRows, sdbNumOfTables, pTable->version, sdbVersion); return pTable; } @@ -375,323 +458,267 @@ void *sdbGetRow(void *handle, void *key) { return pMeta->row; } -int32_t sdbInsertRow(void *handle, void *row, ESdbOperType oper) { - SSdbTable *pTable = (SSdbTable *)handle; - SRowMeta rowMeta; - void * pObj = NULL; - int32_t total_size = 0; - int32_t real_size = 0; - +int32_t sdbInsertRow(SSdbOperDesc *pOper) { + SSdbTable *pTable = (SSdbTable *)pOper->table; if (pTable == NULL) { sdbError("sdb tables is null"); - return -1; - } - - if (sdbGetRow(handle, row)) { - switch (pTable->keyType) { - case SDB_KEYTYPE_STRING: - sdbError("table:%s, failed to insert record:%s sdbVersion:%" PRId64 " id:%" PRId64 , pTable->name, (char *)row, sdbVersion, pTable->id); - break; - case SDB_KEYTYPE_AUTO: - sdbError("table:%s, failed to insert record:%d sdbVersion:%" PRId64 " id:%" PRId64, pTable->name, *(int32_t *)row, sdbVersion, pTable->id); - break; - default: - sdbError("table:%s, failed to insert record sdbVersion:%" PRId64 " id:%" PRId64, pTable->name, sdbVersion, pTable->id); - break; - } - return -1; + return TSDB_CODE_OTHERS; } - total_size = sizeof(SRowHead) + pTable->maxRowSize + sizeof(TSCKSUM); - SRowHead *rowHead = (SRowHead *)malloc(total_size); - if (rowHead == NULL) { - sdbError("table:%s, failed to allocate row head memory", pTable->name); - return -1; + if (sdbGetRow(pTable, pOper->pObj)) { + sdbError("table:%s, failed to insert record:%s, already exist", pTable->tableName, sdbGetkeyStr(pTable, pOper->pObj)); + return TSDB_CODE_ALREADY_THERE; } - memset(rowHead, 0, total_size); - if (oper == SDB_OPER_GLOBAL) { - pObj = row; - } else { - pObj = (*pTable->decodeFp)(row); - } - pthread_mutex_lock(&pTable->mutex); - if (oper == SDB_OPER_GLOBAL) { - if (sdbForwardDbReqToPeer(pTable, SDB_TYPE_INSERT, rowHead->data, rowHead->rowSize) != 0) { - sdbError("table:%s, failed to insert record", pTable->name); + if (pOper->type == SDB_OPER_TYPE_GLOBAL) { + SForwardMsg forward = { + .type = SDB_FORWARD_TYPE_INSERT, + .tableId = pTable->tableId, + .version = pTable->version + 1, + .rowSize = pOper->rowSize, + .rowData = pOper->rowData, + }; + + if (sdbForwardDbReqToPeer(&forward) != 0) { + sdbError("table:%s, failed to forward record:%s version:%" PRId64 " sdbversion:%" PRId64, + pTable->tableName, sdbGetkeyStr(pTable, pOper->pObj), pOper->version, sdbVersion); pthread_mutex_unlock(&pTable->mutex); - tfree(rowHead); - return -1; + return TSDB_CODE_OTHERS; } } - if (oper != SDB_OPER_DISK) { - rowHead->rowSize = (*pTable->encodeFp)(pObj, rowHead->data, pTable->maxRowSize); - assert(rowHead->rowSize > 0 && rowHead->rowSize <= pTable->maxRowSize); - - real_size = sizeof(SRowHead) + rowHead->rowSize + sizeof(TSCKSUM); - rowHead->delimiter = SDB_DELIMITER; - rowHead->id = pTable->id + 1; - if (taosCalcChecksumAppend(0, (uint8_t *)rowHead, real_size) < 0) { - sdbError("table:%s, failed to get checksum while inserting", pTable->name); - pthread_mutex_unlock(&pTable->mutex); - tfree(rowHead); - return -1; - } - - twrite(pTable->fd, rowHead, real_size); - pTable->size += real_size; - sdbFinishCommit(pTable); + int32_t total_size = sizeof(SRowHead) + pTable->maxRowSize + sizeof(TSCKSUM); + SRowHead *rowHead = (SRowHead *)calloc(1, total_size); + if (rowHead == NULL) { + pthread_mutex_unlock(&pTable->mutex); + sdbError("table:%s, failed to allocate row head memory for record:%s version:%" PRId64 " sdbversion:%" PRId64, + pTable->tableName, sdbGetkeyStr(pTable, pOper->pObj), pOper->version, sdbVersion); + return -1; + } + + if (pTable->keyType == SDB_KEY_TYPE_AUTO) { + *((uint32_t *)pOper->pObj) = ++pTable->autoIndex; + } + pTable->version++; + sdbVersion++; + + pOper->rowData = rowHead->data; + (*pTable->encodeFp)(pOper); + rowHead->rowSize = pOper->rowSize; + + assert(rowHead->rowSize > 0 && rowHead->rowSize <= pTable->maxRowSize); + + int32_t real_size = sizeof(SRowHead) + rowHead->rowSize + sizeof(TSCKSUM); + rowHead->delimiter = SDB_DELIMITER; + rowHead->version = pTable->version; + if (taosCalcChecksumAppend(0, (uint8_t *)rowHead, real_size) < 0) { + sdbError("table:%s, failed to get checksum while inserting", pTable->tableName); + pTable->version--; + sdbVersion--; + pthread_mutex_unlock(&pTable->mutex); + tfree(rowHead); + return -1; } + twrite(pTable->fd, rowHead, real_size); + pTable->fileSize += real_size; + sdbFinishCommit(pTable); + tfree(rowHead); + // update in SDB layer - rowMeta.id = pTable->id; - rowMeta.offset = pTable->size; - rowMeta.rowSize = rowHead->rowSize; - rowMeta.row = pObj; - (*sdbAddIndexFp[pTable->keyType])(pTable->iHandle, pObj, &rowMeta); - - if (pTable->keyType == SDB_KEYTYPE_AUTO) { - *((uint32_t *)pObj) = ++pTable->autoIndex; - } + SRowMeta rowMeta; + rowMeta.version = pTable->version; + rowMeta.offset = pTable->fileSize; + rowMeta.rowSize = pOper->rowSize; + rowMeta.row = pOper->pObj; + (*sdbAddIndexFp[pTable->keyType])(pTable->iHandle, pOper->pObj, &rowMeta); pTable->numOfRows++; - - if (oper != SDB_OPER_DISK) { - pTable->id++; - sdbVersion++; - } pthread_mutex_unlock(&pTable->mutex); - switch (pTable->keyType) { - case SDB_KEYTYPE_STRING: - sdbTrace("table:%s, a record is inserted:%s, sdbVersion:%" PRId64 " id:%" PRId64 " rowSize:%d numOfRows:%d fileSize:%" PRId64, - pTable->name, (char *)row, sdbVersion, rowHead->id, rowHead->rowSize, pTable->numOfRows, pTable->size); - break; - case SDB_KEYTYPE_AUTO: - sdbTrace("table:%s, a record is inserted:%d, sdbVersion:%" PRId64 " id:%" PRId64 " rowSize:%d numOfRows:%d fileSize:%" PRId64, - pTable->name, *(int32_t *)row, sdbVersion, rowHead->id, rowHead->rowSize, pTable->numOfRows, pTable->size); - break; - default: - sdbTrace("table:%s, a record is inserted, sdbVersion:%" PRId64 " id:%" PRId64 " rowSize:%d numOfRows:%d fileSize:%" PRId64, - pTable->name, sdbVersion, rowHead->id, rowHead->rowSize, pTable->numOfRows, pTable->size); - break; - } + sdbTrace("table:%s, a record is inserted:%s, sdbversion:%" PRId64 " version:%" PRId64 " rowSize:%d numOfRows:%d fileSize:%" PRId64, + pTable->tableName, sdbGetkeyStr(pTable, pOper->pObj), sdbVersion, pOper->version, pOper->rowSize, pTable->numOfRows, pTable->fileSize); - (*pTable->insertFp)(pObj); - - tfree(rowHead); + (*pTable->insertFp)(pOper); return 0; } // row here can be object or null-terminated string -int32_t sdbDeleteRow(void *handle, void *row, ESdbOperType oper) { - SSdbTable *pTable = (SSdbTable *)handle; - SRowMeta * pMeta = NULL; - void * pMetaRow = NULL; - SRowHead * rowHead = NULL; - int32_t rowSize = 0; - int32_t total_size = 0; - +int32_t sdbDeleteRow(SSdbOperDesc *pOper) { + SSdbTable *pTable = (SSdbTable *)pOper->table; if (pTable == NULL) return -1; - pMeta = sdbGetRowMeta(handle, row); + SRowMeta *pMeta = sdbGetRowMeta(pTable, pOper->pObj); if (pMeta == NULL) { - sdbTrace("table:%s, record is not there, delete failed", pTable->name); + sdbTrace("table:%s, record is not there, delete failed", pTable->tableName); return -1; } - pMetaRow = pMeta->row; + void * pMetaRow = pMeta->row; assert(pMetaRow != NULL); - switch (pTable->keyType) { - case SDB_KEYTYPE_STRING: - rowSize = strlen((char *)row) + 1; - break; - case SDB_KEYTYPE_AUTO: - rowSize = sizeof(uint64_t); - break; - default: - return -1; - } - - total_size = sizeof(SRowHead) + rowSize + sizeof(TSCKSUM); - rowHead = (SRowHead *)malloc(total_size); - if (rowHead == NULL) { - sdbError("failed to allocate row head memory, sdb:%s", pTable->name); - return -1; - } - memset(rowHead, 0, total_size); - pthread_mutex_lock(&pTable->mutex); - if (oper == SDB_OPER_GLOBAL) { - if (sdbForwardDbReqToPeer(pTable, SDB_TYPE_DELETE, (char *)row, rowSize) == 0) { - sdbError("table:%s, failed to delete record", pTable->name); + if (pOper->type == SDB_OPER_TYPE_GLOBAL) { + SForwardMsg forward = { + .type = SDB_FORWARD_TYPE_DELETE, + .tableId = pTable->tableId, + .version = pTable->version + 1, + .rowSize = pOper->rowSize, + .rowData = pOper->rowData, + }; + + if (sdbForwardDbReqToPeer(&forward) == 0) { + sdbError("table:%s, failed to delete record", pTable->tableName); pthread_mutex_unlock(&pTable->mutex); - tfree(rowHead); return -1; } } - if (oper != SDB_OPER_DISK) { - rowHead->delimiter = SDB_DELIMITER; - rowHead->rowSize = rowSize; - rowHead->id = -(pTable->id); - memcpy(rowHead->data, row, rowSize); - if (taosCalcChecksumAppend(0, (uint8_t *)rowHead, total_size) < 0) { - sdbError("failed to get checksum while inserting, sdb:%s", pTable->name); - pthread_mutex_unlock(&pTable->mutex); - tfree(rowHead); - return -1; - } - - twrite(pTable->fd, rowHead, total_size); - pTable->size += total_size; - sdbFinishCommit(pTable); + int32_t total_size = sizeof(SRowHead) + pOper->rowSize + sizeof(TSCKSUM); + SRowHead *rowHead = (SRowHead *)calloc(1, total_size); + if (rowHead == NULL) { + sdbError("failed to allocate row head memory, sdb:%s", pTable->tableName); + pthread_mutex_unlock(&pTable->mutex); + return -1; } + pTable->version++; + sdbVersion++; + + int32_t rowSize = 0; switch (pTable->keyType) { - case SDB_KEYTYPE_STRING: - sdbTrace("table:%s, a record is deleted:%s, sdbVersion:%" PRId64 " id:%" PRId64 " numOfRows:%d", - pTable->name, (char *)row, sdbVersion, pTable->id, pTable->numOfRows); + case SDB_KEY_TYPE_STRING: + rowSize = strlen((char *)pOper->rowData) + 1; break; - case SDB_KEYTYPE_AUTO: - sdbTrace("table:%s, a record is deleted:%d, sdbVersion:%" PRId64 " id:%" PRId64 " numOfRows:%d", - pTable->name, *(int32_t *)row, sdbVersion, pTable->id, pTable->numOfRows); + case SDB_KEY_TYPE_AUTO: + rowSize = sizeof(uint64_t); break; default: - sdbTrace("table:%s, a record is deleted, sdbVersion:%" PRId64 " id:%" PRId64 " numOfRows:%d", - pTable->name, sdbVersion, pTable->id, pTable->numOfRows); - break; + return -1; + } + + rowHead->delimiter = SDB_DELIMITER; + rowHead->rowSize = rowSize; + rowHead->version = -(pTable->version); + memcpy(rowHead->data, pOper->rowData, pOper->rowSize); + if (taosCalcChecksumAppend(0, (uint8_t *)rowHead, total_size) < 0) { + sdbError("failed to get checksum while inserting, sdb:%s", pTable->tableName); + pTable->version--; + sdbVersion--; + pthread_mutex_unlock(&pTable->mutex); + tfree(rowHead); + return -1; } + twrite(pTable->fd, rowHead, total_size); + pTable->fileSize += total_size; + sdbFinishCommit(pTable); + + tfree(rowHead); + + sdbTrace("table:%s, a record is deleted:%s, sdbversion:%" PRId64 " id:%" PRId64 " numOfRows:%d", + pTable->tableName, sdbGetkeyStr(pTable, pOper->rowData), sdbVersion, pTable->version, pTable->numOfRows); + // Delete from current layer - (*sdbDeleteIndexFp[pTable->keyType])(pTable->iHandle, row); + (*sdbDeleteIndexFp[pTable->keyType])(pTable->iHandle, pOper->pObj); pTable->numOfRows--; - if (oper != SDB_OPER_DISK) { - pTable->id++; - sdbVersion++; - } - pthread_mutex_unlock(&pTable->mutex); - tfree(rowHead); - - (*pTable->deleteFp)(pMetaRow); - (*pTable->destroyFp)(pMetaRow); + (*pTable->deleteFp)(pOper); + (*pTable->destroyFp)(pOper); return 0; } // row here can be the object or the string info (encoded string) -int32_t sdbUpdateRow(void *handle, void *row, int32_t updateSize, ESdbOperType oper) { - SSdbTable *pTable = (SSdbTable *)handle; - SRowMeta * pMeta = NULL; - int32_t total_size = 0; - int32_t real_size = 0; +int32_t sdbUpdateRow(SSdbOperDesc *pOper) { + SSdbTable *pTable = (SSdbTable *)pOper->table; + if (pTable == NULL) return -1; - if (pTable == NULL || row == NULL) return -1; - pMeta = sdbGetRowMeta(handle, row); + SRowMeta *pMeta = sdbGetRowMeta(pTable, pOper->rowData); if (pMeta == NULL) { - switch (pTable->keyType) { - case SDB_KEYTYPE_STRING: - sdbError("table:%s, failed to update record:%s, record is not there, sdbVersion:%" PRId64 " id:%" PRId64, - pTable->name, (char *) row, sdbVersion, pTable->id); - break; - case SDB_KEYTYPE_AUTO: - sdbError("table:%s, failed to update record:%d, record is not there, sdbVersion:%" PRId64 " id:%" PRId64, - pTable->name, *(int32_t *) row, sdbVersion, pTable->id); - break; - default: - sdbError("table:%s, failed to update record, record is not there, sdbVersion:%" PRId64 " id:%" PRId64, - pTable->name, sdbVersion, pTable->id); - break; - } + sdbError("table:%s, failed to update record:%s, record is not there, sdbversion:%" PRId64 " id:%" PRId64, + pTable->tableName, sdbGetkeyStr(pTable, pOper->rowData), sdbVersion, pTable->version); return -1; } void *pMetaRow = pMeta->row; assert(pMetaRow != NULL); - total_size = sizeof(SRowHead) + pTable->maxRowSize + sizeof(TSCKSUM); - SRowHead *rowHead = (SRowHead *)malloc(total_size); - if (rowHead == NULL) { - sdbError("failed to allocate row head memory, sdb:%s", pTable->name); - return -1; - } - memset(rowHead, 0, total_size); - pthread_mutex_lock(&pTable->mutex); - if (oper == SDB_OPER_GLOBAL) { - if (sdbForwardDbReqToPeer(pTable, SDB_TYPE_UPDATE, rowHead->data, rowHead->rowSize) == 0) { - sdbError("table:%s, failed to update record", pTable->name); + if (pOper->type == SDB_OPER_TYPE_GLOBAL) { + SForwardMsg forward = { + .type = SDB_FORWARD_TYPE_UPDATE, + .tableId = pTable->tableId, + .version = pOper->version + 1, + .rowSize = pOper->rowSize, + .rowData = pOper->rowData, + }; + if (sdbForwardDbReqToPeer(&forward) == 0) { + sdbError("table:%s, failed to update record", pTable->tableName); pthread_mutex_unlock(&pTable->mutex); - tfree(rowHead); return -1; } } - if (pMetaRow != row) { - memcpy(rowHead->data, row, updateSize); - rowHead->rowSize = updateSize; - } else { - rowHead->rowSize = (*pTable->encodeFp)(pMetaRow, rowHead->data, pTable->maxRowSize); - } - - real_size = sizeof(SRowHead) + rowHead->rowSize + sizeof(TSCKSUM); - - // write to the new position - if (oper != SDB_OPER_DISK) { - rowHead->delimiter = SDB_DELIMITER; - rowHead->id = pTable->id; - if (taosCalcChecksumAppend(0, (uint8_t *)rowHead, real_size) < 0) { - sdbError("failed to get checksum, sdb:%s id:%d", pTable->name, rowHead->id); - pthread_mutex_unlock(&pTable->mutex); - tfree(rowHead); - return -1; - } - - twrite(pTable->fd, rowHead, real_size); - - pMeta->id = pTable->id; - pMeta->offset = pTable->size; - pMeta->rowSize = rowHead->rowSize; - pTable->size += real_size; - - sdbFinishCommit(pTable); - } - - switch (pTable->keyType) { - case SDB_KEYTYPE_STRING: - sdbTrace("table:%s, a record is updated:%s, sdbVersion:%" PRId64 " id:%" PRId64 " numOfRows:%" PRId64, - pTable->name, (char *)row, sdbVersion, pTable->id, pTable->numOfRows); - break; - case SDB_KEYTYPE_AUTO: - sdbTrace("table:%s, a record is updated:%d, sdbVersion:%" PRId64 " id:%" PRId64 " numOfRows:%" PRId64, - pTable->name, *(int32_t *)row, sdbVersion, pTable->id, pTable->numOfRows); - break; - default: - sdbTrace("table:%s, a record is updated, sdbVersion:%" PRId64 " id:%" PRId64 " numOfRows:%" PRId64, pTable->name, sdbVersion, - pTable->id, pTable->numOfRows); - break; + int32_t total_size = sizeof(SRowHead) + pTable->maxRowSize + sizeof(TSCKSUM); + SRowHead *rowHead = (SRowHead *)calloc(1, total_size); + if (rowHead == NULL) { + sdbError("table:%s, failed to allocate row head memory", pTable->tableName); + return -1; } - - if (oper != SDB_OPER_DISK) { - pTable->id++; - sdbVersion++; + + if (pMetaRow != pOper->pObj) { + memcpy(rowHead->data, pOper->rowData, pOper->rowSize); + rowHead->rowSize = pOper->rowSize; + } else { + SSdbOperDesc oper = { + .table = pTable, + .rowData = rowHead->data, + .maxRowSize = pTable->maxRowSize, + .pObj = pOper->pObj + }; + (*pTable->encodeFp)(&oper); + rowHead->rowSize = oper.rowSize; + } + + pTable->version++; + sdbVersion++; + + int32_t real_size = sizeof(SRowHead) + rowHead->rowSize + sizeof(TSCKSUM); + rowHead->delimiter = SDB_DELIMITER; + rowHead->version = pTable->version; + if (taosCalcChecksumAppend(0, (uint8_t *)rowHead, real_size) < 0) { + sdbError("failed to get checksum, sdb:%s version:%d", pTable->tableName, rowHead->version); + pTable->version--; + sdbVersion--; + pthread_mutex_unlock(&pTable->mutex); + tfree(rowHead); + return -1; } + + twrite(pTable->fd, rowHead, real_size); + pTable->fileSize += real_size; + sdbFinishCommit(pTable); + + sdbTrace("table:%s, a record is updated:%s, sdbversion:%" PRId64 " id:%" PRId64 " numOfRows:%" PRId64, + pTable->tableName, sdbGetkeyStr(pTable, pOper->rowData), sdbVersion, pTable->version, pTable->numOfRows); + pMeta->version = pTable->version; + pMeta->offset = pTable->fileSize; + pMeta->rowSize = rowHead->rowSize; + pthread_mutex_unlock(&pTable->mutex); - (*pTable->updateFp)(pMetaRow); // update in upper layer + (*pTable->updateFp)(pOper); // update in upper layer tfree(rowHead); @@ -708,7 +735,12 @@ void sdbCloseTable(void *handle) { while (1) { pNode = sdbFetchRow(handle, pNode, &row); if (row == NULL) break; - (*pTable->destroyFp)(row); + + SSdbOperDesc oper = { + .table = pTable, + .rowData = row, + }; + (*pTable->destroyFp)(&oper); } if (sdbCleanUpIndexFp[pTable->keyType]) (*sdbCleanUpIndexFp[pTable->keyType])(pTable->iHandle); @@ -718,20 +750,20 @@ void sdbCloseTable(void *handle) { pthread_mutex_destroy(&pTable->mutex); sdbNumOfTables--; - sdbTrace("table:%s is closed, id:%" PRId64 " numOfTables:%d", pTable->name, pTable->id, sdbNumOfTables); + sdbTrace("table:%s is closed, id:%" PRId64 " numOfTables:%d", pTable->tableName, pTable->version, sdbNumOfTables); tfree(pTable); } void sdbResetTable(SSdbTable *pTable) { - /* SRowHead rowHead; */ + /* SRowMeta rowMeta; int32_t bytes; int32_t total_size = 0; int32_t real_size = 0; SRowHead *rowHead = NULL; void * pMetaRow = NULL; - int64_t oldId = pTable->id; + int64_t oldId = pTable->version; int32_t oldNumOfRows = pTable->numOfRows; if (sdbOpenSdbFile(pTable) < 0) return; @@ -740,18 +772,18 @@ void sdbResetTable(SSdbTable *pTable) { total_size = sizeof(SRowHead) + pTable->maxRowSize + sizeof(TSCKSUM); rowHead = (SRowHead *)malloc(total_size); if (rowHead == NULL) { - sdbError("failed to allocate row head memory for reset, sdb:%s", pTable->name); + sdbError("failed to allocate row head memory for reset, sdb:%s", pTable->tableName); return; } - sdbPrint("open sdb file:%s for reset table", pTable->fn); + sdbPrint("open sdb file:%s for reset table", pTable->fileName); while (1) { memset(rowHead, 0, total_size); bytes = read(pTable->fd, rowHead, sizeof(SRowHead)); if (bytes < 0) { - sdbError("failed to read sdb file:%s", pTable->fn); + sdbError("failed to read sdb file:%s", pTable->fileName); tfree(rowHead); return; } @@ -759,40 +791,40 @@ void sdbResetTable(SSdbTable *pTable) { if (bytes == 0) break; if (bytes < sizeof(SRowHead) || rowHead->delimiter != SDB_DELIMITER) { - pTable->size++; + pTable->fileSize++; lseek(pTable->fd, -(bytes - 1), SEEK_CUR); continue; } if (rowHead->rowSize < 0 || rowHead->rowSize > pTable->maxRowSize) { - sdbError("error row size in sdb file:%s for reset, id:%d rowSize:%d maxRowSize:%d", - pTable->fn, rowHead->id, rowHead->rowSize, pTable->maxRowSize); - pTable->size += sizeof(SRowHead); + sdbError("error row size in sdb file:%s for reset, version:%d rowSize:%d maxRowSize:%d", + pTable->fileName, rowHead->version, rowHead->rowSize, pTable->maxRowSize); + pTable->fileSize += sizeof(SRowHead); continue; } bytes = read(pTable->fd, rowHead->data, rowHead->rowSize + sizeof(TSCKSUM)); if (bytes < rowHead->rowSize + sizeof(TSCKSUM)) { - sdbError("failed to read sdb file:%s for reset, id:%d rowSize:%d", pTable->fn, rowHead->id, rowHead->rowSize); + sdbError("failed to read sdb file:%s for reset, version:%d rowSize:%d", pTable->fileName, rowHead->version, rowHead->rowSize); break; } real_size = sizeof(SRowHead) + rowHead->rowSize + sizeof(TSCKSUM); if (!taosCheckChecksumWhole((uint8_t *)rowHead, real_size)) { - sdbError("error sdb checksum, sdb:%s id:%d, skip", pTable->name, rowHead->id); - pTable->size += real_size; + sdbError("error sdb checksum, sdb:%s version:%d, skip", pTable->tableName, rowHead->version); + pTable->fileSize += real_size; continue; } - if (abs(rowHead->id) > oldId) { // not operated + if (abs(rowHead->version) > oldId) { // not operated pMetaRow = sdbGetRow(pTable, rowHead->data); if (pMetaRow == NULL) { // New object - if (rowHead->id < 0) { - sdbError("error sdb negative id:%d, sdb:%s, skip", rowHead->id, pTable->name); + if (rowHead->version < 0) { + sdbError("error sdb negative version:%d, sdb:%s, skip", rowHead->version, pTable->tableName); } else { - rowMeta.id = rowHead->id; + rowMeta.version = rowHead->version; // TODO:Get rid of the rowMeta.offset and rowSize - rowMeta.offset = pTable->size; + rowMeta.offset = pTable->fileSize; rowMeta.rowSize = rowHead->rowSize; rowMeta.row = (*pTable->decodeFp)(rowHead->data); (*sdbAddIndexFp[pTable->keyType])(pTable->iHandle, rowMeta.row, &rowMeta); @@ -801,7 +833,7 @@ void sdbResetTable(SSdbTable *pTable) { (*pTable->insertFp)(rowMeta.row); } } else { // already exists - if (rowHead->id < 0) { // Delete the object + if (rowHead->version < 0) { // Delete the object (*sdbDeleteIndexFp[pTable->keyType])(pTable->iHandle, rowHead->data); (*pTable->destroyFp)(pMetaRow); pTable->numOfRows--; @@ -811,38 +843,39 @@ void sdbResetTable(SSdbTable *pTable) { } } - pTable->size += real_size; - if (pTable->id < abs(rowHead->id)) pTable->id = abs(rowHead->id); + pTable->fileSize += real_size; + if (pTable->version < abs(rowHead->version)) pTable->version = abs(rowHead->version); } - sdbVersion += (pTable->id - oldId); + sdbVersion += (pTable->version - oldId); tfree(rowHead); - sdbPrint("table:%s is updated, sdbVerion:%" PRId64 " id:%" PRId64, pTable->name, sdbVersion, pTable->id); + sdbPrint("table:%s is updated, sdbVerion:%" PRId64 " id:%" PRId64, pTable->tableName, sdbVersion, pTable->version); + */ } // TODO:A problem here :use snapshot file to sync another node will cause problem void sdbSaveSnapShot(void *handle) { + /* SSdbTable *pTable = (SSdbTable *)handle; SRowMeta * pMeta; void * pNode = NULL; - int32_t total_size = 0; - int32_t real_size = 0; - int32_t size = 0; - int32_t numOfRows = 0; + int32_t total_size = 0; + int32_t real_size = 0; + int32_t size = 0; + int32_t numOfRows = 0; uint32_t sdbEcommit = SDB_ENDCOMMIT; char * dirc = NULL; char * basec = NULL; - /* char action = SDB_TYPE_INSERT; */ if (pTable == NULL) return; - sdbTrace("Table:%s, save the snapshop", pTable->name); + sdbTrace("Table:%s, save the snapshop", pTable->tableName); char fn[128] = "\0"; - dirc = strdup(pTable->fn); - basec = strdup(pTable->fn); + dirc = strdup(pTable->fileName); + basec = strdup(pTable->fileName); sprintf(fn, "%s/.%s", dirname(dirc), basename(basec)); int32_t fd = open(fn, O_RDWR | O_CREAT, S_IRWXU | S_IRWXG | S_IRWXO); tfree(dirc); @@ -851,7 +884,7 @@ void sdbSaveSnapShot(void *handle) { total_size = sizeof(SRowHead) + pTable->maxRowSize + sizeof(TSCKSUM); SRowHead *rowHead = (SRowHead *)malloc(total_size); if (rowHead == NULL) { - sdbError("failed to allocate memory while saving SDB snapshot, sdb:%s", pTable->name); + sdbError("failed to allocate memory while saving SDB snapshot, sdb:%s", pTable->tableName); return; } memset(rowHead, 0, size); @@ -867,36 +900,35 @@ void sdbSaveSnapShot(void *handle) { if (pMeta == NULL) break; rowHead->delimiter = SDB_DELIMITER; - rowHead->id = pMeta->id; + rowHead->version = pMeta->id; rowHead->rowSize = (*pTable->encodeFp)(pMeta->row, rowHead->data, pTable->maxRowSize); real_size = sizeof(SRowHead) + rowHead->rowSize + sizeof(TSCKSUM); if (taosCalcChecksumAppend(0, (uint8_t *)rowHead, real_size) < 0) { - sdbError("failed to get checksum while save sdb %s snapshot", pTable->name); + sdbError("failed to get checksum while save sdb %s snapshot", pTable->tableName); tfree(rowHead); return; } - /* write(fd, &action, sizeof(action)); */ - /* size += sizeof(action); */ - twrite(fd, rowHead, real_size); - size += real_size; - twrite(fd, &sdbEcommit, sizeof(sdbEcommit)); - size += sizeof(sdbEcommit); - numOfRows++; + twrite(fd, rowHead, real_size); + size += real_size; + twrite(fd, &sdbEcommit, sizeof(sdbEcommit)); + size += sizeof(sdbEcommit); + numOfRows++; } tfree(rowHead); // Remove the old file tclose(pTable->fd); - remove(pTable->fn); + remove(pTable->fileName); // Rename the .sdb.db file to sdb.db file - rename(fn, pTable->fn); + rename(fn, pTable->fileName); pTable->fd = fd; - pTable->size = size; + pTable->fileSize = size; pTable->numOfRows = numOfRows; fdatasync(pTable->fd); + */ } void *sdbFetchRow(void *handle, void *pNode, void **ppRow) { -- GitLab