提交 a3580fd0 编写于 作者: H Hongze Cheng

implement TD-2088

上级 6272bd4a
...@@ -442,8 +442,6 @@ void tsdbCloseBufPool(STsdbRepo* pRepo); ...@@ -442,8 +442,6 @@ void tsdbCloseBufPool(STsdbRepo* pRepo);
SListNode* tsdbAllocBufBlockFromPool(STsdbRepo* pRepo); SListNode* tsdbAllocBufBlockFromPool(STsdbRepo* pRepo);
// ------------------ tsdbMemTable.c // ------------------ tsdbMemTable.c
int tsdbInsertDataToTable(STsdbRepo* pRepo, SSubmitBlk* pBlock, TSKEY now, int32_t* affectedrows);
int tsdbUpdateRowInMem(STsdbRepo* pRepo, SDataRow row, STable* pTable);
int tsdbRefMemTable(STsdbRepo* pRepo, SMemTable* pMemTable); int tsdbRefMemTable(STsdbRepo* pRepo, SMemTable* pMemTable);
int tsdbUnRefMemTable(STsdbRepo* pRepo, SMemTable* pMemTable); int tsdbUnRefMemTable(STsdbRepo* pRepo, SMemTable* pMemTable);
int tsdbTakeMemSnapshot(STsdbRepo* pRepo, SMemTable** pMem, SMemTable** pIMem); int tsdbTakeMemSnapshot(STsdbRepo* pRepo, SMemTable** pMem, SMemTable** pIMem);
......
...@@ -40,8 +40,6 @@ static int tsdbLoadConfig(char *rootDir, STsdbCfg *pCfg); ...@@ -40,8 +40,6 @@ static int tsdbLoadConfig(char *rootDir, STsdbCfg *pCfg);
static char * tsdbGetCfgFname(char *rootDir); static char * tsdbGetCfgFname(char *rootDir);
static STsdbRepo * tsdbNewRepo(char *rootDir, STsdbAppH *pAppH, STsdbCfg *pCfg); static STsdbRepo * tsdbNewRepo(char *rootDir, STsdbAppH *pAppH, STsdbCfg *pCfg);
static void tsdbFreeRepo(STsdbRepo *pRepo); static void tsdbFreeRepo(STsdbRepo *pRepo);
static int tsdbInitSubmitMsgIter(SSubmitMsg *pMsg, SSubmitMsgIter *pIter);
static int tsdbGetSubmitMsgNext(SSubmitMsgIter *pIter, SSubmitBlk **pPBlock);
static int tsdbRestoreInfo(STsdbRepo *pRepo); static int tsdbRestoreInfo(STsdbRepo *pRepo);
static void tsdbAlterCompression(STsdbRepo *pRepo, int8_t compression); static void tsdbAlterCompression(STsdbRepo *pRepo, int8_t compression);
static int tsdbAlterKeep(STsdbRepo *pRepo, int32_t keep); static int tsdbAlterKeep(STsdbRepo *pRepo, int32_t keep);
...@@ -49,8 +47,6 @@ static int tsdbAlterCacheTotalBlocks(STsdbRepo *pRepo, int totalBlocks); ...@@ -49,8 +47,6 @@ static int tsdbAlterCacheTotalBlocks(STsdbRepo *pRepo, int totalBlocks);
static int keyFGroupCompFunc(const void *key, const void *fgroup); static int keyFGroupCompFunc(const void *key, const void *fgroup);
static int tsdbEncodeCfg(void **buf, STsdbCfg *pCfg); static int tsdbEncodeCfg(void **buf, STsdbCfg *pCfg);
static void * tsdbDecodeCfg(void *buf, STsdbCfg *pCfg); static void * tsdbDecodeCfg(void *buf, STsdbCfg *pCfg);
static int tsdbCheckTableSchema(STsdbRepo *pRepo, SSubmitBlk *pBlock, STable *pTable);
static int tsdbScanAndConvertSubmitMsg(STsdbRepo *pRepo, SSubmitMsg *pMsg);
static void tsdbStartStream(STsdbRepo *pRepo); static void tsdbStartStream(STsdbRepo *pRepo);
static void tsdbStopStream(STsdbRepo *pRepo); static void tsdbStopStream(STsdbRepo *pRepo);
...@@ -162,40 +158,6 @@ void tsdbCloseRepo(TSDB_REPO_T *repo, int toCommit) { ...@@ -162,40 +158,6 @@ void tsdbCloseRepo(TSDB_REPO_T *repo, int toCommit) {
tsdbDebug("vgId:%d repository is closed", vgId); tsdbDebug("vgId:%d repository is closed", vgId);
} }
int32_t tsdbInsertData(TSDB_REPO_T *repo, SSubmitMsg *pMsg, SShellSubmitRspMsg *pRsp) {
STsdbRepo * pRepo = (STsdbRepo *)repo;
SSubmitMsgIter msgIter = {0};
if (tsdbScanAndConvertSubmitMsg(pRepo, pMsg) < 0) {
if (terrno != TSDB_CODE_TDB_TABLE_RECONFIGURE) {
tsdbError("vgId:%d failed to insert data since %s", REPO_ID(pRepo), tstrerror(terrno));
}
return -1;
}
if (tsdbInitSubmitMsgIter(pMsg, &msgIter) < 0) {
tsdbError("vgId:%d failed to insert data since %s", REPO_ID(pRepo), tstrerror(terrno));
return -1;
}
SSubmitBlk *pBlock = NULL;
int32_t affectedrows = 0;
TSKEY now = taosGetTimestamp(pRepo->config.precision);
while (true) {
tsdbGetSubmitMsgNext(&msgIter, &pBlock);
if (pBlock == NULL) break;
if (tsdbInsertDataToTable(pRepo, pBlock, now, &affectedrows) < 0) {
return -1;
}
}
if (pRsp != NULL) pRsp->affectedRows = htonl(affectedrows);
if (tsdbCheckCommit(pRepo) < 0) return -1;
return 0;
}
uint32_t tsdbGetFileInfo(TSDB_REPO_T *repo, char *name, uint32_t *index, uint32_t eindex, int64_t *size) { uint32_t tsdbGetFileInfo(TSDB_REPO_T *repo, char *name, uint32_t *index, uint32_t eindex, int64_t *size) {
STsdbRepo *pRepo = (STsdbRepo *)repo; STsdbRepo *pRepo = (STsdbRepo *)repo;
// STsdbMeta *pMeta = pRepo->tsdbMeta; // STsdbMeta *pMeta = pRepo->tsdbMeta;
...@@ -720,42 +682,6 @@ static void tsdbFreeRepo(STsdbRepo *pRepo) { ...@@ -720,42 +682,6 @@ static void tsdbFreeRepo(STsdbRepo *pRepo) {
} }
} }
static int tsdbInitSubmitMsgIter(SSubmitMsg *pMsg, SSubmitMsgIter *pIter) {
if (pMsg == NULL) {
terrno = TSDB_CODE_TDB_SUBMIT_MSG_MSSED_UP;
return -1;
}
pIter->totalLen = pMsg->length;
pIter->len = 0;
pIter->pMsg = pMsg;
if (pMsg->length <= TSDB_SUBMIT_MSG_HEAD_SIZE) {
terrno = TSDB_CODE_TDB_SUBMIT_MSG_MSSED_UP;
return -1;
}
return 0;
}
static int tsdbGetSubmitMsgNext(SSubmitMsgIter *pIter, SSubmitBlk **pPBlock) {
if (pIter->len == 0) {
pIter->len += TSDB_SUBMIT_MSG_HEAD_SIZE;
} else {
SSubmitBlk *pSubmitBlk = (SSubmitBlk *)POINTER_SHIFT(pIter->pMsg, pIter->len);
pIter->len += (sizeof(SSubmitBlk) + pSubmitBlk->dataLen + pSubmitBlk->schemaLen);
}
if (pIter->len > pIter->totalLen) {
terrno = TSDB_CODE_TDB_SUBMIT_MSG_MSSED_UP;
*pPBlock = NULL;
return -1;
}
*pPBlock = (pIter->len == pIter->totalLen) ? NULL : (SSubmitBlk *)POINTER_SHIFT(pIter->pMsg, pIter->len);
return 0;
}
static int tsdbRestoreInfo(STsdbRepo *pRepo) { static int tsdbRestoreInfo(STsdbRepo *pRepo) {
STsdbMeta * pMeta = pRepo->tsdbMeta; STsdbMeta * pMeta = pRepo->tsdbMeta;
STsdbFileH *pFileH = pRepo->tsdbFileH; STsdbFileH *pFileH = pRepo->tsdbFileH;
...@@ -885,134 +811,6 @@ static void *tsdbDecodeCfg(void *buf, STsdbCfg *pCfg) { ...@@ -885,134 +811,6 @@ static void *tsdbDecodeCfg(void *buf, STsdbCfg *pCfg) {
return buf; return buf;
} }
static int tsdbCheckTableSchema(STsdbRepo *pRepo, SSubmitBlk *pBlock, STable *pTable) {
ASSERT(pTable != NULL);
STSchema *pSchema = tsdbGetTableSchemaImpl(pTable, false, false, -1);
int sversion = schemaVersion(pSchema);
if (pBlock->sversion == sversion) {
return 0;
} else {
if (TABLE_TYPE(pTable) == TSDB_STREAM_TABLE) { // stream table is not allowed to change schema
terrno = TSDB_CODE_TDB_IVD_TB_SCHEMA_VERSION;
return -1;
}
}
if (pBlock->sversion > sversion) { // may need to update table schema
if (pBlock->schemaLen > 0) {
tsdbDebug(
"vgId:%d table %s tid %d uid %" PRIu64 " schema version %d is out of data, client version %d, update...",
REPO_ID(pRepo), TABLE_CHAR_NAME(pTable), TABLE_TID(pTable), TABLE_UID(pTable), sversion, pBlock->sversion);
ASSERT(pBlock->schemaLen % sizeof(STColumn) == 0);
int numOfCols = pBlock->schemaLen / sizeof(STColumn);
STColumn *pTCol = (STColumn *)pBlock->data;
STSchemaBuilder schemaBuilder = {0};
if (tdInitTSchemaBuilder(&schemaBuilder, pBlock->sversion) < 0) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
tsdbError("vgId:%d failed to update schema of table %s since %s", REPO_ID(pRepo), TABLE_CHAR_NAME(pTable),
tstrerror(terrno));
return -1;
}
for (int i = 0; i < numOfCols; i++) {
if (tdAddColToSchema(&schemaBuilder, pTCol[i].type, htons(pTCol[i].colId), htons(pTCol[i].bytes)) < 0) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
tsdbError("vgId:%d failed to update schema of table %s since %s", REPO_ID(pRepo), TABLE_CHAR_NAME(pTable),
tstrerror(terrno));
tdDestroyTSchemaBuilder(&schemaBuilder);
return -1;
}
}
STSchema *pNSchema = tdGetSchemaFromBuilder(&schemaBuilder);
if (pNSchema == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
tdDestroyTSchemaBuilder(&schemaBuilder);
return -1;
}
tdDestroyTSchemaBuilder(&schemaBuilder);
tsdbUpdateTableSchema(pRepo, pTable, pNSchema, true);
} else {
tsdbDebug(
"vgId:%d table %s tid %d uid %" PRIu64 " schema version %d is out of data, client version %d, reconfigure...",
REPO_ID(pRepo), TABLE_CHAR_NAME(pTable), TABLE_TID(pTable), TABLE_UID(pTable), sversion, pBlock->sversion);
terrno = TSDB_CODE_TDB_TABLE_RECONFIGURE;
return -1;
}
} else {
ASSERT(pBlock->sversion >= 0);
if (tsdbGetTableSchemaImpl(pTable, false, false, pBlock->sversion) == NULL) {
tsdbError("vgId:%d invalid submit schema version %d to table %s tid %d from client", REPO_ID(pRepo),
pBlock->sversion, TABLE_CHAR_NAME(pTable), TABLE_TID(pTable));
}
terrno = TSDB_CODE_TDB_IVD_TB_SCHEMA_VERSION;
return -1;
}
return 0;
}
static int tsdbScanAndConvertSubmitMsg(STsdbRepo *pRepo, SSubmitMsg *pMsg) {
ASSERT(pMsg != NULL);
STsdbMeta * pMeta = pRepo->tsdbMeta;
SSubmitMsgIter msgIter = {0};
SSubmitBlk * pBlock = NULL;
terrno = TSDB_CODE_SUCCESS;
pMsg->length = htonl(pMsg->length);
pMsg->numOfBlocks = htonl(pMsg->numOfBlocks);
if (tsdbInitSubmitMsgIter(pMsg, &msgIter) < 0) return -1;
while (true) {
if (tsdbGetSubmitMsgNext(&msgIter, &pBlock) < 0) return -1;
if (pBlock == NULL) break;
pBlock->uid = htobe64(pBlock->uid);
pBlock->tid = htonl(pBlock->tid);
pBlock->sversion = htonl(pBlock->sversion);
pBlock->dataLen = htonl(pBlock->dataLen);
pBlock->schemaLen = htonl(pBlock->schemaLen);
pBlock->numOfRows = htons(pBlock->numOfRows);
if (pBlock->tid <= 0 || pBlock->tid >= pMeta->maxTables) {
tsdbError("vgId:%d failed to get table to insert data, uid %" PRIu64 " tid %d", REPO_ID(pRepo), pBlock->uid,
pBlock->tid);
terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
return -1;
}
STable *pTable = pMeta->tables[pBlock->tid];
if (pTable == NULL || TABLE_UID(pTable) != pBlock->uid) {
tsdbError("vgId:%d failed to get table to insert data, uid %" PRIu64 " tid %d", REPO_ID(pRepo), pBlock->uid,
pBlock->tid);
terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
return -1;
}
if (TABLE_TYPE(pTable) == TSDB_SUPER_TABLE) {
tsdbError("vgId:%d invalid action trying to insert a super table %s", REPO_ID(pRepo), TABLE_CHAR_NAME(pTable));
terrno = TSDB_CODE_TDB_INVALID_ACTION;
return -1;
}
// Check schema version and update schema if needed
if (tsdbCheckTableSchema(pRepo, pBlock, pTable) < 0) {
if (terrno == TSDB_CODE_TDB_TABLE_RECONFIGURE) {
continue;
} else {
return -1;
}
}
}
if (terrno != TSDB_CODE_SUCCESS) return -1;
return 0;
}
static int tsdbAlterCacheTotalBlocks(STsdbRepo *pRepo, int totalBlocks) { static int tsdbAlterCacheTotalBlocks(STsdbRepo *pRepo, int totalBlocks) {
// TODO // TODO
// STsdbCache *pCache = pRepo->tsdbCache; // STsdbCache *pCache = pRepo->tsdbCache;
......
...@@ -34,148 +34,46 @@ static int tsdbAdjustMemMaxTables(SMemTable *pMemTable, int maxTables); ...@@ -34,148 +34,46 @@ static int tsdbAdjustMemMaxTables(SMemTable *pMemTable, int maxTables);
static int tsdbAppendTableRowToCols(STable *pTable, SDataCols *pCols, STSchema **ppSchema, SDataRow row); static int tsdbAppendTableRowToCols(STable *pTable, SDataCols *pCols, STSchema **ppSchema, SDataRow row);
static int tsdbInitSubmitBlkIter(SSubmitBlk *pBlock, SSubmitBlkIter *pIter); static int tsdbInitSubmitBlkIter(SSubmitBlk *pBlock, SSubmitBlkIter *pIter);
static SDataRow tsdbGetSubmitBlkNext(SSubmitBlkIter *pIter); static SDataRow tsdbGetSubmitBlkNext(SSubmitBlkIter *pIter);
static int tsdbScanAndConvertSubmitMsg(STsdbRepo *pRepo, SSubmitMsg *pMsg);
static int tsdbInsertDataToTable(STsdbRepo *pRepo, SSubmitBlk *pBlock, int32_t *affectedrows);
static int tsdbCopyRowToMem(STsdbRepo *pRepo, SDataRow row, STable *pTable, void **ppRow);
static int tsdbInitSubmitMsgIter(SSubmitMsg *pMsg, SSubmitMsgIter *pIter);
static int tsdbGetSubmitMsgNext(SSubmitMsgIter *pIter, SSubmitBlk **pPBlock);
static int tsdbCheckTableSchema(STsdbRepo *pRepo, SSubmitBlk *pBlock, STable *pTable);
static int tsdbInsertDataToTableImpl(STsdbRepo *pRepo, STable *pTable, void **rows, int rowCounter);
static FORCE_INLINE int tsdbCheckRowRange(STsdbRepo *pRepo, STable *pTable, SDataRow row, TSKEY minKey, TSKEY maxKey, static FORCE_INLINE int tsdbCheckRowRange(STsdbRepo *pRepo, STable *pTable, SDataRow row, TSKEY minKey, TSKEY maxKey,
TSKEY now); TSKEY now);
// ---------------- INTERNAL FUNCTIONS ---------------- int32_t tsdbInsertData(TSDB_REPO_T *repo, SSubmitMsg *pMsg, SShellSubmitRspMsg *pRsp) {
int tsdbInsertDataToTable(STsdbRepo *pRepo, SSubmitBlk *pBlock, TSKEY now, int32_t *affectedrows) { STsdbRepo * pRepo = (STsdbRepo *)repo;
STsdbMeta * pMeta = pRepo->tsdbMeta; SSubmitMsgIter msgIter = {0};
int64_t points = 0; SSubmitBlk * pBlock = NULL;
STable * pTable = NULL; int32_t affectedrows = 0;
SSubmitBlkIter blkIter = {0};
SDataRow row = NULL;
TSKEY minKey = 0;
TSKEY maxKey = 0;
void ** pData = NULL;
// int rowCounter = 0;
ASSERT(pBlock->tid < pMeta->maxTables);
pTable = pMeta->tables[pBlock->tid];
ASSERT(pTable != NULL && TABLE_UID(pTable) == pBlock->uid);
minKey = now - tsMsPerDay[pRepo->config.precision] * pRepo->config.keep;
maxKey = now + tsMsPerDay[pRepo->config.precision] * pRepo->config.daysPerFile;
pData = (void **)calloc(pBlock->numOfRows, sizeof(void *));
if (pData == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
tsdbInitSubmitBlkIter(pBlock, &blkIter);
while ((row = tsdbGetSubmitBlkNext(&blkIter)) != NULL) {
if (tsdbCheckRowRange(pRepo, pTable, row, minKey, maxKey, now) < 0) return -1;
if (tsdbUpdateRowInMem(pRepo, row, pTable) < 0) return -1;
(*affectedrows)++;
points++;
}
STSchema *pSchema = tsdbGetTableSchemaByVersion(pTable, pBlock->sversion);
pRepo->stat.pointsWritten += points * schemaNCols(pSchema);
pRepo->stat.totalStorage += points * schemaVLen(pSchema);
return 0;
}
int tsdbUpdateRowInMem(STsdbRepo *pRepo, SDataRow row, STable *pTable) {
STsdbCfg * pCfg = &pRepo->config;
STsdbMeta * pMeta = pRepo->tsdbMeta;
TKEY tkey = dataRowTKey(row);
TSKEY key = dataRowKey(row);
SMemTable * pMemTable = pRepo->mem;
STableData *pTableData = NULL;
bool isRowDelete = TKEY_IS_DELETED(tkey);
if (isRowDelete) {
if (!pCfg->update) {
tsdbWarn("vgId:%d vnode is not allowed to update but try to delete a data row", REPO_ID(pRepo));
terrno = TSDB_CODE_TDB_INVALID_ACTION;
return -1;
}
if (key > TABLE_LASTKEY(pTable)) { if (tsdbScanAndConvertSubmitMsg(pRepo, pMsg) < 0) {
tsdbTrace("vgId:%d skip to delete row key %" PRId64 " which is larger than table lastKey %" PRId64, if (terrno != TSDB_CODE_TDB_TABLE_RECONFIGURE) {
REPO_ID(pRepo), key, TABLE_LASTKEY(pTable)); tsdbError("vgId:%d failed to insert data since %s", REPO_ID(pRepo), tstrerror(terrno));
return 0;
} }
}
void *pRow = tsdbAllocBytes(pRepo, dataRowLen(row));
if (pRow == NULL) {
tsdbError("vgId:%d failed to insert row with key %" PRId64 " to table %s while allocate %d bytes since %s",
REPO_ID(pRepo), key, TABLE_CHAR_NAME(pTable), dataRowLen(row), tstrerror(terrno));
return -1; return -1;
} }
dataRowCpy(pRow, row); tsdbInitSubmitMsgIter(pMsg, &msgIter);
while (true) {
// Operations above may change pRepo->mem, retake those values tsdbGetSubmitMsgNext(&msgIter, &pBlock);
ASSERT(pRepo->mem != NULL); if (pBlock == NULL) break;
pMemTable = pRepo->mem; if (tsdbInsertDataToTable(pRepo, pBlock, &affectedrows) < 0) {
if (TABLE_TID(pTable) >= pMemTable->maxTables) {
if (tsdbAdjustMemMaxTables(pMemTable, pMeta->maxTables) < 0) {
tsdbFreeBytes(pRepo, pRow, dataRowLen(row));
return -1;
}
}
pTableData = pMemTable->tData[TABLE_TID(pTable)];
if (pTableData == NULL || pTableData->uid != TABLE_UID(pTable)) {
if (pTableData != NULL) {
taosWLockLatch(&(pMemTable->latch));
pMemTable->tData[TABLE_TID(pTable)] = NULL;
tsdbFreeTableData(pTableData);
taosWUnLockLatch(&(pMemTable->latch));
}
pTableData = tsdbNewTableData(pCfg, pTable);
if (pTableData == NULL) {
tsdbError("vgId:%d failed to insert row with key %" PRId64
" to table %s while create new table data object since %s",
REPO_ID(pRepo), key, TABLE_CHAR_NAME(pTable), tstrerror(terrno));
tsdbFreeBytes(pRepo, (void *)pRow, dataRowLen(row));
return -1; return -1;
} }
pRepo->mem->tData[TABLE_TID(pTable)] = pTableData;
}
ASSERT((pTableData != NULL) && pTableData->uid == TABLE_UID(pTable));
int64_t oldSize = SL_SIZE(pTableData->pData);
if (tSkipListPut(pTableData->pData, pRow) == NULL) {
tsdbFreeBytes(pRepo, (void *)pRow, dataRowLen(row));
} else {
int64_t deltaSize = SL_SIZE(pTableData->pData) - oldSize;
if (isRowDelete) {
if (TABLE_LASTKEY(pTable) == key) {
// TODO: need to update table last key here (may from file)
}
} else {
if (TABLE_LASTKEY(pTable) < key) TABLE_LASTKEY(pTable) = key;
}
if (pMemTable->keyFirst > key) pMemTable->keyFirst = key;
if (pMemTable->keyLast < key) pMemTable->keyLast = key;
pMemTable->numOfRows += deltaSize;
if (pTableData->keyFirst > key) pTableData->keyFirst = key;
if (pTableData->keyLast < key) pTableData->keyLast = key;
pTableData->numOfRows += deltaSize;
} }
tsdbTrace("vgId:%d a row is %s table %s tid %d uid %" PRIu64 " key %" PRIu64, REPO_ID(pRepo), if (pRsp != NULL) pRsp->affectedRows = htonl(affectedrows);
isRowDelete ? "deleted from" : "updated in", TABLE_CHAR_NAME(pTable), TABLE_TID(pTable), TABLE_UID(pTable),
key);
if (tsdbCheckCommit(pRepo) < 0) return -1;
return 0; return 0;
} }
// ---------------- INTERNAL FUNCTIONS ----------------
int tsdbRefMemTable(STsdbRepo *pRepo, SMemTable *pMemTable) { int tsdbRefMemTable(STsdbRepo *pRepo, SMemTable *pMemTable) {
if (pMemTable == NULL) return 0; if (pMemTable == NULL) return 0;
int ref = T_REF_INC(pMemTable); int ref = T_REF_INC(pMemTable);
...@@ -930,5 +828,327 @@ static FORCE_INLINE int tsdbCheckRowRange(STsdbRepo *pRepo, STable *pTable, SDat ...@@ -930,5 +828,327 @@ static FORCE_INLINE int tsdbCheckRowRange(STsdbRepo *pRepo, STable *pTable, SDat
return -1; return -1;
} }
return 0;
}
static int tsdbScanAndConvertSubmitMsg(STsdbRepo *pRepo, SSubmitMsg *pMsg) {
ASSERT(pMsg != NULL);
STsdbMeta * pMeta = pRepo->tsdbMeta;
SSubmitMsgIter msgIter = {0};
SSubmitBlk * pBlock = NULL;
SSubmitBlkIter blkIter = {0};
SDataRow row = NULL;
TSKEY now = taosGetTimestamp(pRepo->config.precision);
TSKEY minKey = now - tsMsPerDay[pRepo->config.precision] * pRepo->config.keep;
TSKEY maxKey = now + tsMsPerDay[pRepo->config.precision] * pRepo->config.daysPerFile;
terrno = TSDB_CODE_SUCCESS;
pMsg->length = htonl(pMsg->length);
pMsg->numOfBlocks = htonl(pMsg->numOfBlocks);
if (tsdbInitSubmitMsgIter(pMsg, &msgIter) < 0) return -1;
while (true) {
if (tsdbGetSubmitMsgNext(&msgIter, &pBlock) < 0) return -1;
if (pBlock == NULL) break;
pBlock->uid = htobe64(pBlock->uid);
pBlock->tid = htonl(pBlock->tid);
pBlock->sversion = htonl(pBlock->sversion);
pBlock->dataLen = htonl(pBlock->dataLen);
pBlock->schemaLen = htonl(pBlock->schemaLen);
pBlock->numOfRows = htons(pBlock->numOfRows);
if (pBlock->tid <= 0 || pBlock->tid >= pMeta->maxTables) {
tsdbError("vgId:%d failed to get table to insert data, uid %" PRIu64 " tid %d", REPO_ID(pRepo), pBlock->uid,
pBlock->tid);
terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
return -1;
}
STable *pTable = pMeta->tables[pBlock->tid];
if (pTable == NULL || TABLE_UID(pTable) != pBlock->uid) {
tsdbError("vgId:%d failed to get table to insert data, uid %" PRIu64 " tid %d", REPO_ID(pRepo), pBlock->uid,
pBlock->tid);
terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
return -1;
}
if (TABLE_TYPE(pTable) == TSDB_SUPER_TABLE) {
tsdbError("vgId:%d invalid action trying to insert a super table %s", REPO_ID(pRepo), TABLE_CHAR_NAME(pTable));
terrno = TSDB_CODE_TDB_INVALID_ACTION;
return -1;
}
// Check schema version and update schema if needed
if (tsdbCheckTableSchema(pRepo, pBlock, pTable) < 0) {
if (terrno == TSDB_CODE_TDB_TABLE_RECONFIGURE) {
continue;
} else {
return -1;
}
}
tsdbInitSubmitBlkIter(pBlock, &blkIter);
while ((row = tsdbGetSubmitBlkNext(&blkIter)) != NULL) {
if (tsdbCheckRowRange(pRepo, pTable, row, minKey, maxKey, now) < 0) {
return -1;
}
}
}
if (terrno != TSDB_CODE_SUCCESS) return -1;
return 0;
}
static int tsdbInsertDataToTable(STsdbRepo *pRepo, SSubmitBlk *pBlock, int32_t *affectedrows) {
STsdbMeta * pMeta = pRepo->tsdbMeta;
int64_t points = 0;
STable * pTable = NULL;
SSubmitBlkIter blkIter = {0};
SDataRow row = NULL;
void ** rows = NULL;
int rowCounter = 0;
ASSERT(pBlock->tid < pMeta->maxTables);
pTable = pMeta->tables[pBlock->tid];
ASSERT(pTable != NULL && TABLE_UID(pTable) == pBlock->uid);
rows = (void **)calloc(pBlock->numOfRows, sizeof(void *));
if (rows == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
tsdbInitSubmitBlkIter(pBlock, &blkIter);
while ((row = tsdbGetSubmitBlkNext(&blkIter)) != NULL) {
if (tsdbCopyRowToMem(pRepo, row, pTable, &(rows[rowCounter])) < 0) {
free(rows);
return -1;
}
(*affectedrows)++;
points++;
if (rows[rowCounter] != NULL) {
rowCounter++;
}
}
if (tsdbInsertDataToTableImpl(pRepo, pTable, rows, rowCounter) < 0) {
free(rows);
return -1;
}
STSchema *pSchema = tsdbGetTableSchemaByVersion(pTable, pBlock->sversion);
pRepo->stat.pointsWritten += points * schemaNCols(pSchema);
pRepo->stat.totalStorage += points * schemaVLen(pSchema);
free(rows);
return 0;
}
static int tsdbCopyRowToMem(STsdbRepo *pRepo, SDataRow row, STable *pTable, void **ppRow) {
STsdbCfg * pCfg = &pRepo->config;
TKEY tkey = dataRowTKey(row);
TSKEY key = dataRowKey(row);
bool isRowDelete = TKEY_IS_DELETED(tkey);
if (isRowDelete) {
if (!pCfg->update) {
tsdbWarn("vgId:%d vnode is not allowed to update but try to delete a data row", REPO_ID(pRepo));
terrno = TSDB_CODE_TDB_INVALID_ACTION;
return -1;
}
if (key > TABLE_LASTKEY(pTable)) {
tsdbTrace("vgId:%d skip to delete row key %" PRId64 " which is larger than table lastKey %" PRId64,
REPO_ID(pRepo), key, TABLE_LASTKEY(pTable));
return 0;
}
}
void *pRow = tsdbAllocBytes(pRepo, dataRowLen(row));
if (pRow == NULL) {
tsdbError("vgId:%d failed to insert row with key %" PRId64 " to table %s while allocate %d bytes since %s",
REPO_ID(pRepo), key, TABLE_CHAR_NAME(pTable), dataRowLen(row), tstrerror(terrno));
return -1;
}
dataRowCpy(pRow, row);
ppRow[0] = pRow;
tsdbTrace("vgId:%d a row is %s table %s tid %d uid %" PRIu64 " key %" PRIu64, REPO_ID(pRepo),
isRowDelete ? "deleted from" : "updated in", TABLE_CHAR_NAME(pTable), TABLE_TID(pTable), TABLE_UID(pTable),
key);
return 0;
}
static int tsdbInitSubmitMsgIter(SSubmitMsg *pMsg, SSubmitMsgIter *pIter) {
if (pMsg == NULL) {
terrno = TSDB_CODE_TDB_SUBMIT_MSG_MSSED_UP;
return -1;
}
pIter->totalLen = pMsg->length;
pIter->len = 0;
pIter->pMsg = pMsg;
if (pMsg->length <= TSDB_SUBMIT_MSG_HEAD_SIZE) {
terrno = TSDB_CODE_TDB_SUBMIT_MSG_MSSED_UP;
return -1;
}
return 0;
}
static int tsdbGetSubmitMsgNext(SSubmitMsgIter *pIter, SSubmitBlk **pPBlock) {
if (pIter->len == 0) {
pIter->len += TSDB_SUBMIT_MSG_HEAD_SIZE;
} else {
SSubmitBlk *pSubmitBlk = (SSubmitBlk *)POINTER_SHIFT(pIter->pMsg, pIter->len);
pIter->len += (sizeof(SSubmitBlk) + pSubmitBlk->dataLen + pSubmitBlk->schemaLen);
}
if (pIter->len > pIter->totalLen) {
terrno = TSDB_CODE_TDB_SUBMIT_MSG_MSSED_UP;
*pPBlock = NULL;
return -1;
}
*pPBlock = (pIter->len == pIter->totalLen) ? NULL : (SSubmitBlk *)POINTER_SHIFT(pIter->pMsg, pIter->len);
return 0;
}
static int tsdbCheckTableSchema(STsdbRepo *pRepo, SSubmitBlk *pBlock, STable *pTable) {
ASSERT(pTable != NULL);
STSchema *pSchema = tsdbGetTableSchemaImpl(pTable, false, false, -1);
int sversion = schemaVersion(pSchema);
if (pBlock->sversion == sversion) {
return 0;
} else {
if (TABLE_TYPE(pTable) == TSDB_STREAM_TABLE) { // stream table is not allowed to change schema
terrno = TSDB_CODE_TDB_IVD_TB_SCHEMA_VERSION;
return -1;
}
}
if (pBlock->sversion > sversion) { // may need to update table schema
if (pBlock->schemaLen > 0) {
tsdbDebug(
"vgId:%d table %s tid %d uid %" PRIu64 " schema version %d is out of data, client version %d, update...",
REPO_ID(pRepo), TABLE_CHAR_NAME(pTable), TABLE_TID(pTable), TABLE_UID(pTable), sversion, pBlock->sversion);
ASSERT(pBlock->schemaLen % sizeof(STColumn) == 0);
int numOfCols = pBlock->schemaLen / sizeof(STColumn);
STColumn *pTCol = (STColumn *)pBlock->data;
STSchemaBuilder schemaBuilder = {0};
if (tdInitTSchemaBuilder(&schemaBuilder, pBlock->sversion) < 0) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
tsdbError("vgId:%d failed to update schema of table %s since %s", REPO_ID(pRepo), TABLE_CHAR_NAME(pTable),
tstrerror(terrno));
return -1;
}
for (int i = 0; i < numOfCols; i++) {
if (tdAddColToSchema(&schemaBuilder, pTCol[i].type, htons(pTCol[i].colId), htons(pTCol[i].bytes)) < 0) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
tsdbError("vgId:%d failed to update schema of table %s since %s", REPO_ID(pRepo), TABLE_CHAR_NAME(pTable),
tstrerror(terrno));
tdDestroyTSchemaBuilder(&schemaBuilder);
return -1;
}
}
STSchema *pNSchema = tdGetSchemaFromBuilder(&schemaBuilder);
if (pNSchema == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
tdDestroyTSchemaBuilder(&schemaBuilder);
return -1;
}
tdDestroyTSchemaBuilder(&schemaBuilder);
tsdbUpdateTableSchema(pRepo, pTable, pNSchema, true);
} else {
tsdbDebug(
"vgId:%d table %s tid %d uid %" PRIu64 " schema version %d is out of data, client version %d, reconfigure...",
REPO_ID(pRepo), TABLE_CHAR_NAME(pTable), TABLE_TID(pTable), TABLE_UID(pTable), sversion, pBlock->sversion);
terrno = TSDB_CODE_TDB_TABLE_RECONFIGURE;
return -1;
}
} else {
ASSERT(pBlock->sversion >= 0);
if (tsdbGetTableSchemaImpl(pTable, false, false, pBlock->sversion) == NULL) {
tsdbError("vgId:%d invalid submit schema version %d to table %s tid %d from client", REPO_ID(pRepo),
pBlock->sversion, TABLE_CHAR_NAME(pTable), TABLE_TID(pTable));
}
terrno = TSDB_CODE_TDB_IVD_TB_SCHEMA_VERSION;
return -1;
}
return 0;
}
static int tsdbInsertDataToTableImpl(STsdbRepo *pRepo, STable *pTable, void **rows, int rowCounter) {
if (rowCounter < 1) return 0;
SMemTable * pMemTable = NULL;
STableData *pTableData = NULL;
STsdbMeta * pMeta = pRepo->tsdbMeta;
STsdbCfg * pCfg = &(pRepo->config);
ASSERT(pRepo->mem != NULL);
pMemTable = pRepo->mem;
if (TABLE_TID(pTable) >= pMemTable->maxTables) {
if (tsdbAdjustMemMaxTables(pMemTable, pMeta->maxTables) < 0) {
for (int i = rowCounter - 1; i >= 0; i--) {
tsdbFreeBytes(pRepo, rows[i], dataRowLen(rows[i]));
}
return -1;
}
}
pTableData = pMemTable->tData[TABLE_TID(pTable)];
if (pTableData == NULL || pTableData->uid != TABLE_UID(pTable)) {
if (pTableData != NULL) {
taosWLockLatch(&(pMemTable->latch));
pMemTable->tData[TABLE_TID(pTable)] = NULL;
tsdbFreeTableData(pTableData);
taosWUnLockLatch(&(pMemTable->latch));
}
pTableData = tsdbNewTableData(pCfg, pTable);
if (pTableData == NULL) {
tsdbError("vgId:%d failed to insert data to table %s uid %" PRId64 " tid %d since %s", REPO_ID(pRepo),
TABLE_CHAR_NAME(pTable), TABLE_UID(pTable), TABLE_TID(pTable), tstrerror(terrno));
for (int i = rowCounter - 1; i >= 0; i--) {
tsdbFreeBytes(pRepo, rows[i], dataRowLen(rows[i]));
}
return -1;
}
pRepo->mem->tData[TABLE_TID(pTable)] = pTableData;
}
ASSERT((pTableData != NULL) && pTableData->uid == TABLE_UID(pTable));
int64_t osize = SL_SIZE(pTableData->pData);
tSkipListPutBatch(pTableData->pData, rows, rowCounter);
int64_t dsize = SL_SIZE(pTableData->pData) - osize;
if (pMemTable->keyFirst > dataRowKey(rows[0])) pMemTable->keyFirst = dataRowKey(rows[0]);
if (pMemTable->keyLast < dataRowKey(rows[rowCounter - 1])) pMemTable->keyLast = dataRowKey(rows[rowCounter - 1]);
pMemTable->numOfRows += dsize;
if (pTableData->keyFirst > dataRowKey(rows[0])) pTableData->keyFirst = dataRowKey(rows[0]);
if (pTableData->keyLast < dataRowKey(rows[rowCounter - 1])) pTableData->keyLast = dataRowKey(rows[rowCounter - 1]);
pTableData->numOfRows += dsize;
// TODO: impl delete row thing
if (TABLE_LASTKEY(pTable) < dataRowKey(rows[rowCounter-1])) TABLE_LASTKEY(pTable) = dataRowKey(rows[rowCounter-1]);
return 0; return 0;
} }
\ No newline at end of file
...@@ -131,6 +131,7 @@ SSkipList *tSkipListCreate(uint8_t maxLevel, uint8_t keyType, uint16_t keyLen, _ ...@@ -131,6 +131,7 @@ SSkipList *tSkipListCreate(uint8_t maxLevel, uint8_t keyType, uint16_t keyLen, _
__sl_key_fn_t fn); __sl_key_fn_t fn);
void tSkipListDestroy(SSkipList *pSkipList); void tSkipListDestroy(SSkipList *pSkipList);
SSkipListNode * tSkipListPut(SSkipList *pSkipList, void *pData); SSkipListNode * tSkipListPut(SSkipList *pSkipList, void *pData);
void tSkipListPutBatch(SSkipList *pSkipList, void **ppData, int ndata);
SArray * tSkipListGet(SSkipList *pSkipList, SSkipListKey pKey); SArray * tSkipListGet(SSkipList *pSkipList, SSkipListKey pKey);
void tSkipListPrint(SSkipList *pSkipList, int16_t nlevel); void tSkipListPrint(SSkipList *pSkipList, int16_t nlevel);
SSkipListIterator *tSkipListCreateIter(SSkipList *pSkipList); SSkipListIterator *tSkipListCreateIter(SSkipList *pSkipList);
......
...@@ -24,10 +24,12 @@ static SSkipListNode * getPriorNode(SSkipList *pSkipList, const char *val, in ...@@ -24,10 +24,12 @@ static SSkipListNode * getPriorNode(SSkipList *pSkipList, const char *val, in
static void tSkipListRemoveNodeImpl(SSkipList *pSkipList, SSkipListNode *pNode); static void tSkipListRemoveNodeImpl(SSkipList *pSkipList, SSkipListNode *pNode);
static void tSkipListCorrectLevel(SSkipList *pSkipList); static void tSkipListCorrectLevel(SSkipList *pSkipList);
static SSkipListIterator *doCreateSkipListIterator(SSkipList *pSkipList, int32_t order); static SSkipListIterator *doCreateSkipListIterator(SSkipList *pSkipList, int32_t order);
static void tSkipListDoInsert(SSkipList *pSkipList, SSkipListNode **backward, SSkipListNode *pNode); static void tSkipListDoInsert(SSkipList *pSkipList, SSkipListNode **direction, SSkipListNode *pNode, bool isForward);
static bool tSkipListGetPosToPut(SSkipList *pSkipList, SSkipListNode **backward, void *pData); static bool tSkipListGetPosToPut(SSkipList *pSkipList, SSkipListNode **backward, void *pData);
static SSkipListNode * tSkipListNewNode(uint8_t level); static SSkipListNode *tSkipListNewNode(uint8_t level);
#define tSkipListFreeNode(n) tfree((n)) #define tSkipListFreeNode(n) tfree((n))
static SSkipListNode *tSkipListPutImpl(SSkipList *pSkipList, void *pData, SSkipListNode **direction, bool isForward,
bool hasDup);
static FORCE_INLINE int tSkipListWLock(SSkipList *pSkipList); static FORCE_INLINE int tSkipListWLock(SSkipList *pSkipList);
static FORCE_INLINE int tSkipListRLock(SSkipList *pSkipList); static FORCE_INLINE int tSkipListRLock(SSkipList *pSkipList);
...@@ -109,30 +111,85 @@ SSkipListNode *tSkipListPut(SSkipList *pSkipList, void *pData) { ...@@ -109,30 +111,85 @@ SSkipListNode *tSkipListPut(SSkipList *pSkipList, void *pData) {
if (pSkipList == NULL || pData == NULL) return NULL; if (pSkipList == NULL || pData == NULL) return NULL;
SSkipListNode *backward[MAX_SKIP_LIST_LEVEL] = {0}; SSkipListNode *backward[MAX_SKIP_LIST_LEVEL] = {0};
uint8_t dupMode = SL_DUP_MODE(pSkipList);
SSkipListNode *pNode = NULL; SSkipListNode *pNode = NULL;
tSkipListWLock(pSkipList); tSkipListWLock(pSkipList);
bool hasDup = tSkipListGetPosToPut(pSkipList, backward, pData); bool hasDup = tSkipListGetPosToPut(pSkipList, backward, pData);
pNode = tSkipListPutImpl(pSkipList, pData, backward, false, hasDup);
if (hasDup && (dupMode == SL_DISCARD_DUP_KEY || dupMode == SL_UPDATE_DUP_KEY)) { tSkipListUnlock(pSkipList);
if (dupMode == SL_UPDATE_DUP_KEY) {
pNode = SL_NODE_GET_BACKWARD_POINTER(backward[0], 0); return pNode;
atomic_store_ptr(&(pNode->pData), pData); }
}
} else { // Put a batch of data into skiplist. The batch of data must be in ascending order
pNode = tSkipListNewNode(getSkipListRandLevel(pSkipList)); void tSkipListPutBatch(SSkipList *pSkipList, void **ppData, int ndata) {
if (pNode != NULL) { SSkipListNode *backward[MAX_SKIP_LIST_LEVEL] = {0};
pNode->pData = pData; SSkipListNode *forward[MAX_SKIP_LIST_LEVEL] = {0};
bool hasDup = false;
char * pKey = NULL;
char * pDataKey = NULL;
int compare = 0;
tSkipListWLock(pSkipList);
tSkipListDoInsert(pSkipList, backward, pNode); // backward to put the first data
hasDup = tSkipListGetPosToPut(pSkipList, backward, ppData[0]);
tSkipListPutImpl(pSkipList, ppData[0], backward, false, hasDup);
for (int level = 0; level < pSkipList->maxLevel - 1; level++) {
forward[level] = SL_NODE_GET_BACKWARD_POINTER(backward[level], level);
}
// forward to put the rest of data
for (int idata = 1; idata < ndata; idata++) {
pDataKey = pSkipList->keyFn(ppData[idata]);
// Compare max key
pKey = SL_GET_MAX_KEY(pSkipList);
compare = pSkipList->comparFn(pDataKey, pKey);
if (compare > 0) {
for (int i = 0; i < pSkipList->maxLevel; i++) {
forward[i] = SL_NODE_GET_BACKWARD_POINTER(pSkipList->pTail, i);
}
hasDup = false;
} else {
SSkipListNode *px = pSkipList->pHead;
for (int i = pSkipList->maxLevel - 1; i >= 0; --i) {
if (i < pSkipList->level) {
// set new px
if (forward[i] != pSkipList->pHead) {
if (px == pSkipList->pHead ||
pSkipList->comparFn(SL_GET_NODE_KEY(pSkipList, forward[i]), SL_GET_NODE_KEY(pSkipList, px)) > 0) {
px = forward[i];
}
}
SSkipListNode *p = SL_NODE_GET_FORWARD_POINTER(px, i);
while (p != pSkipList->pTail) {
pKey = SL_GET_NODE_KEY(pSkipList, p);
compare = pSkipList->comparFn(pKey, pDataKey);
if (compare >= 0) {
if (compare == 0) hasDup = true;
break;
} else {
px = p;
p = SL_NODE_GET_FORWARD_POINTER(px, i);
}
}
}
forward[i] = px;
}
} }
tSkipListPutImpl(pSkipList, ppData[idata], forward, true, hasDup);
} }
tSkipListUnlock(pSkipList); tSkipListUnlock(pSkipList);
return pNode;
} }
uint32_t tSkipListRemove(SSkipList *pSkipList, SSkipListKey key) { uint32_t tSkipListRemove(SSkipList *pSkipList, SSkipListKey key) {
...@@ -310,22 +367,25 @@ void tSkipListPrint(SSkipList *pSkipList, int16_t nlevel) { ...@@ -310,22 +367,25 @@ void tSkipListPrint(SSkipList *pSkipList, int16_t nlevel) {
} }
} }
static void tSkipListDoInsert(SSkipList *pSkipList, SSkipListNode **backward, SSkipListNode *pNode) { static void tSkipListDoInsert(SSkipList *pSkipList, SSkipListNode **direction, SSkipListNode *pNode, bool isForward) {
for (int32_t i = 0; i < pNode->level; ++i) { for (int32_t i = 0; i < pNode->level; ++i) {
if (i >= pSkipList->level) { SSkipListNode *x = direction[i];
SL_NODE_GET_FORWARD_POINTER(pNode, i) = pSkipList->pTail; if (isForward) {
SL_NODE_GET_BACKWARD_POINTER(pNode, i) = pSkipList->pHead; SL_NODE_GET_BACKWARD_POINTER(pNode, i) = x;
SL_NODE_GET_FORWARD_POINTER(pSkipList->pHead, i) = pNode;
SL_NODE_GET_BACKWARD_POINTER(pSkipList->pTail, i) = pNode; SSkipListNode *next = SL_NODE_GET_FORWARD_POINTER(x, i);
SL_NODE_GET_BACKWARD_POINTER(next, i) = pNode;
SL_NODE_GET_FORWARD_POINTER(pNode, i) = next;
SL_NODE_GET_FORWARD_POINTER(x, i) = pNode;
} else { } else {
SSkipListNode *x = backward[i];
SL_NODE_GET_FORWARD_POINTER(pNode, i) = x; SL_NODE_GET_FORWARD_POINTER(pNode, i) = x;
SSkipListNode *prev = SL_NODE_GET_BACKWARD_POINTER(x, i); SSkipListNode *prev = SL_NODE_GET_BACKWARD_POINTER(x, i);
SL_NODE_GET_FORWARD_POINTER(prev, i) = pNode; SL_NODE_GET_FORWARD_POINTER(prev, i) = pNode;
SL_NODE_GET_BACKWARD_POINTER(x, i) = pNode;
SL_NODE_GET_BACKWARD_POINTER(pNode, i) = prev; SL_NODE_GET_BACKWARD_POINTER(pNode, i) = prev;
SL_NODE_GET_BACKWARD_POINTER(x, i) = pNode;
} }
} }
...@@ -377,7 +437,7 @@ static bool tSkipListGetPosToPut(SSkipList *pSkipList, SSkipListNode **backward, ...@@ -377,7 +437,7 @@ static bool tSkipListGetPosToPut(SSkipList *pSkipList, SSkipListNode **backward,
char * pDataKey = pSkipList->keyFn(pData); char * pDataKey = pSkipList->keyFn(pData);
if (pSkipList->size == 0) { if (pSkipList->size == 0) {
for (int i = 0; i < pSkipList->level; i++) { for (int i = 0; i < pSkipList->maxLevel; i++) {
backward[i] = pSkipList->pTail; backward[i] = pSkipList->pTail;
} }
} else { } else {
...@@ -387,7 +447,7 @@ static bool tSkipListGetPosToPut(SSkipList *pSkipList, SSkipListNode **backward, ...@@ -387,7 +447,7 @@ static bool tSkipListGetPosToPut(SSkipList *pSkipList, SSkipListNode **backward,
pKey = SL_GET_MAX_KEY(pSkipList); pKey = SL_GET_MAX_KEY(pSkipList);
compare = pSkipList->comparFn(pDataKey, pKey); compare = pSkipList->comparFn(pDataKey, pKey);
if (compare >= 0) { if (compare >= 0) {
for (int i = 0; i < pSkipList->level; i++) { for (int i = 0; i < pSkipList->maxLevel; i++) {
backward[i] = pSkipList->pTail; backward[i] = pSkipList->pTail;
} }
...@@ -398,7 +458,7 @@ static bool tSkipListGetPosToPut(SSkipList *pSkipList, SSkipListNode **backward, ...@@ -398,7 +458,7 @@ static bool tSkipListGetPosToPut(SSkipList *pSkipList, SSkipListNode **backward,
pKey = SL_GET_MIN_KEY(pSkipList); pKey = SL_GET_MIN_KEY(pSkipList);
compare = pSkipList->comparFn(pDataKey, pKey); compare = pSkipList->comparFn(pDataKey, pKey);
if (compare < 0) { if (compare < 0) {
for (int i = 0; i < pSkipList->level; i++) { for (int i = 0; i < pSkipList->maxLevel; i++) {
backward[i] = SL_NODE_GET_FORWARD_POINTER(pSkipList->pHead, i); backward[i] = SL_NODE_GET_FORWARD_POINTER(pSkipList->pHead, i);
} }
...@@ -406,18 +466,20 @@ static bool tSkipListGetPosToPut(SSkipList *pSkipList, SSkipListNode **backward, ...@@ -406,18 +466,20 @@ static bool tSkipListGetPosToPut(SSkipList *pSkipList, SSkipListNode **backward,
} }
SSkipListNode *px = pSkipList->pTail; SSkipListNode *px = pSkipList->pTail;
for (int i = pSkipList->level - 1; i >= 0; --i) { for (int i = pSkipList->maxLevel - 1; i >= 0; --i) {
SSkipListNode *p = SL_NODE_GET_BACKWARD_POINTER(px, i); if (i < pSkipList->level) {
while (p != pSkipList->pHead) { SSkipListNode *p = SL_NODE_GET_BACKWARD_POINTER(px, i);
pKey = SL_GET_NODE_KEY(pSkipList, p); while (p != pSkipList->pHead) {
pKey = SL_GET_NODE_KEY(pSkipList, p);
compare = pSkipList->comparFn(pKey, pDataKey);
if (compare <= 0) { compare = pSkipList->comparFn(pKey, pDataKey);
if (compare == 0 && !hasDupKey) hasDupKey = true; if (compare <= 0) {
break; if (compare == 0 && !hasDupKey) hasDupKey = true;
} else { break;
px = p; } else {
p = SL_NODE_GET_BACKWARD_POINTER(px, i); px = p;
p = SL_NODE_GET_BACKWARD_POINTER(px, i);
}
} }
} }
...@@ -579,6 +641,32 @@ static SSkipListNode *tSkipListNewNode(uint8_t level) { ...@@ -579,6 +641,32 @@ static SSkipListNode *tSkipListNewNode(uint8_t level) {
return pNode; return pNode;
} }
static SSkipListNode *tSkipListPutImpl(SSkipList *pSkipList, void *pData, SSkipListNode **direction, bool isForward,
bool hasDup) {
uint8_t dupMode = SL_DUP_MODE(pSkipList);
SSkipListNode *pNode = NULL;
if (hasDup && (dupMode == SL_DISCARD_DUP_KEY || dupMode == SL_UPDATE_DUP_KEY)) {
if (dupMode == SL_UPDATE_DUP_KEY) {
if (isForward) {
pNode = SL_NODE_GET_FORWARD_POINTER(direction[0], 0);
} else {
pNode = SL_NODE_GET_BACKWARD_POINTER(direction[0], 0);
}
atomic_store_ptr(&(pNode->pData), pData);
}
} else {
pNode = tSkipListNewNode(getSkipListRandLevel(pSkipList));
if (pNode != NULL) {
pNode->pData = pData;
tSkipListDoInsert(pSkipList, direction, pNode, isForward);
}
}
return pNode;
}
// static int32_t tSkipListEndParQuery(SSkipList *pSkipList, SSkipListNode *pStartNode, SSkipListKey *pEndKey, // static int32_t tSkipListEndParQuery(SSkipList *pSkipList, SSkipListNode *pStartNode, SSkipListKey *pEndKey,
// int32_t cond, SSkipListNode ***pRes) { // int32_t cond, SSkipListNode ***pRes) {
// pthread_rwlock_rdlock(&pSkipList->lock); // pthread_rwlock_rdlock(&pSkipList->lock);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册