提交 c7eab2a1 编写于 作者: H Hongze Cheng

make insert ts to mem work

上级 e9db1ee7
...@@ -194,10 +194,10 @@ typedef struct { ...@@ -194,10 +194,10 @@ typedef struct {
void* pMsg; void* pMsg;
} SSubmitMsgIter; } SSubmitMsgIter;
int tsdbInitSubmitMsgIter(SSubmitMsg* pMsg, SSubmitMsgIter* pIter); int tInitSubmitMsgIter(SSubmitMsg* pMsg, SSubmitMsgIter* pIter);
int tsdbGetSubmitMsgNext(SSubmitMsgIter* pIter, SSubmitBlk** pPBlock); int tGetSubmitMsgNext(SSubmitMsgIter* pIter, SSubmitBlk** pPBlock);
int tsdbInitSubmitBlkIter(SSubmitBlk* pBlock, SSubmitBlkIter* pIter); int tInitSubmitBlkIter(SSubmitBlk* pBlock, SSubmitBlkIter* pIter);
SMemRow tsdbGetSubmitBlkNext(SSubmitBlkIter* pIter); SMemRow tGetSubmitBlkNext(SSubmitBlkIter* pIter);
typedef struct { typedef struct {
int32_t index; // index of failed block in submit blocks int32_t index; // index of failed block in submit blocks
......
...@@ -27,7 +27,7 @@ ...@@ -27,7 +27,7 @@
#undef TD_MSG_SEG_CODE_ #undef TD_MSG_SEG_CODE_
#include "tmsgdef.h" #include "tmsgdef.h"
int tsdbInitSubmitMsgIter(SSubmitMsg *pMsg, SSubmitMsgIter *pIter) { int tInitSubmitMsgIter(SSubmitMsg *pMsg, SSubmitMsgIter *pIter) {
if (pMsg == NULL) { if (pMsg == NULL) {
terrno = TSDB_CODE_TDB_SUBMIT_MSG_MSSED_UP; terrno = TSDB_CODE_TDB_SUBMIT_MSG_MSSED_UP;
return -1; return -1;
...@@ -44,7 +44,7 @@ int tsdbInitSubmitMsgIter(SSubmitMsg *pMsg, SSubmitMsgIter *pIter) { ...@@ -44,7 +44,7 @@ int tsdbInitSubmitMsgIter(SSubmitMsg *pMsg, SSubmitMsgIter *pIter) {
return 0; return 0;
} }
int tsdbGetSubmitMsgNext(SSubmitMsgIter *pIter, SSubmitBlk **pPBlock) { int tGetSubmitMsgNext(SSubmitMsgIter *pIter, SSubmitBlk **pPBlock) {
if (pIter->len == 0) { if (pIter->len == 0) {
pIter->len += sizeof(SSubmitMsg); pIter->len += sizeof(SSubmitMsg);
} else { } else {
...@@ -63,7 +63,7 @@ int tsdbGetSubmitMsgNext(SSubmitMsgIter *pIter, SSubmitBlk **pPBlock) { ...@@ -63,7 +63,7 @@ int tsdbGetSubmitMsgNext(SSubmitMsgIter *pIter, SSubmitBlk **pPBlock) {
return 0; return 0;
} }
int tsdbInitSubmitBlkIter(SSubmitBlk *pBlock, SSubmitBlkIter *pIter) { int tInitSubmitBlkIter(SSubmitBlk *pBlock, SSubmitBlkIter *pIter) {
if (pBlock->dataLen <= 0) return -1; if (pBlock->dataLen <= 0) return -1;
pIter->totalLen = pBlock->dataLen; pIter->totalLen = pBlock->dataLen;
pIter->len = 0; pIter->len = 0;
...@@ -71,18 +71,18 @@ int tsdbInitSubmitBlkIter(SSubmitBlk *pBlock, SSubmitBlkIter *pIter) { ...@@ -71,18 +71,18 @@ int tsdbInitSubmitBlkIter(SSubmitBlk *pBlock, SSubmitBlkIter *pIter) {
return 0; return 0;
} }
SMemRow tsdbGetSubmitBlkNext(SSubmitBlkIter *pIter) { SMemRow tGetSubmitBlkNext(SSubmitBlkIter *pIter) {
SMemRow row = pIter->row; // firstly, get current row SMemRow row = pIter->row;
if (row == NULL) return NULL;
pIter->len += memRowTLen(row); if (pIter->len >= pIter->totalLen) {
if (pIter->len >= pIter->totalLen) { // reach the end return NULL;
pIter->row = NULL;
} else { } else {
pIter->row = (char *)row + memRowTLen(row); // secondly, move to next row pIter->len += memRowTLen(row);
if (pIter->len < pIter->totalLen) {
pIter->row = POINTER_SHIFT(row, memRowTLen(row));
} }
return row; return row;
}
} }
int tSerializeSVCreateTbReq(void **buf, SVCreateTbReq *pReq) { int tSerializeSVCreateTbReq(void **buf, SVCreateTbReq *pReq) {
......
...@@ -108,9 +108,9 @@ int tsdbMemTableInsert(STsdb *pTsdb, STsdbMemTable *pMemTable, SSubmitMsg *pMsg, ...@@ -108,9 +108,9 @@ int tsdbMemTableInsert(STsdb *pTsdb, STsdbMemTable *pMemTable, SSubmitMsg *pMsg,
return -1; return -1;
} }
tsdbInitSubmitMsgIter(pMsg, &msgIter); tInitSubmitMsgIter(pMsg, &msgIter);
while (true) { while (true) {
tsdbGetSubmitMsgNext(&msgIter, &pBlock); tGetSubmitMsgNext(&msgIter, &pBlock);
if (pBlock == NULL) break; if (pBlock == NULL) break;
if (tsdbMemTableInsertTbData(pTsdb, pBlock, &affectedrows) < 0) { if (tsdbMemTableInsertTbData(pTsdb, pBlock, &affectedrows) < 0) {
return -1; return -1;
...@@ -142,9 +142,9 @@ static int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitMsg *pMsg) { ...@@ -142,9 +142,9 @@ static int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitMsg *pMsg) {
pMsg->length = htonl(pMsg->length); pMsg->length = htonl(pMsg->length);
pMsg->numOfBlocks = htonl(pMsg->numOfBlocks); pMsg->numOfBlocks = htonl(pMsg->numOfBlocks);
if (tsdbInitSubmitMsgIter(pMsg, &msgIter) < 0) return -1; if (tInitSubmitMsgIter(pMsg, &msgIter) < 0) return -1;
while (true) { while (true) {
if (tsdbGetSubmitMsgNext(&msgIter, &pBlock) < 0) return -1; if (tGetSubmitMsgNext(&msgIter, &pBlock) < 0) return -1;
if (pBlock == NULL) break; if (pBlock == NULL) break;
pBlock->uid = htobe64(pBlock->uid); pBlock->uid = htobe64(pBlock->uid);
...@@ -206,6 +206,10 @@ static int tsdbMemTableInsertTbData(STsdb *pTsdb, SSubmitBlk *pBlock, int32_t *p ...@@ -206,6 +206,10 @@ static int tsdbMemTableInsertTbData(STsdb *pTsdb, SSubmitBlk *pBlock, int32_t *p
STsdbMemTable *pMemTable = pTsdb->mem; STsdbMemTable *pMemTable = pTsdb->mem;
void * tptr; void * tptr;
STbData * pTbData; STbData * pTbData;
SMemRow row;
TSKEY keyMin;
TSKEY keyMax;
// SMemTable *pMemTable = NULL; // SMemTable *pMemTable = NULL;
// STableData *pTableData = NULL; // STableData *pTableData = NULL;
// STsdbCfg *pCfg = &(pRepo->config); // STsdbCfg *pCfg = &(pRepo->config);
...@@ -226,46 +230,22 @@ static int tsdbMemTableInsertTbData(STsdb *pTsdb, SSubmitBlk *pBlock, int32_t *p ...@@ -226,46 +230,22 @@ static int tsdbMemTableInsertTbData(STsdb *pTsdb, SSubmitBlk *pBlock, int32_t *p
pTbData = *(STbData **)tptr; pTbData = *(STbData **)tptr;
} }
tsdbInitSubmitBlkIter(pBlock, &blkIter); tInitSubmitBlkIter(pBlock, &blkIter);
if (blkIter.row == NULL) return 0; if (blkIter.row == NULL) return 0;
TSKEY firstRowKey = memRowKey(blkIter.row); keyMin = memRowKey(blkIter.row);
// tsdbAllocBytes(pRepo, 0);
// pMemTable = pRepo->mem;
// ASSERT(pMemTable != NULL);
// ASSERT(pBlock->tid < pMeta->maxTables);
// pTable = pMeta->tables[pBlock->tid]; tSkipListPutBatchByIter(pTbData->pData, &blkIter, (iter_next_fn_t)tGetSubmitBlkNext);
// ASSERT(pTable != NULL && TABLE_UID(pTable) == pBlock->uid); // Set statistics
keyMax = memRowKey(blkIter.row);
// if (TABLE_TID(pTable) >= pMemTable->maxTables) {
// if (tsdbAdjustMemMaxTables(pMemTable, pMeta->maxTables) < 0) {
// return -1;
// }
// }
// pTableData = pMemTable->tData[TABLE_TID(pTable)];
// if (pTableData == NULL || pTableData->uid != TABLE_UID(pTable)) {
// if (pTableData != NULL) {
// taosWLockLatch(&(pMemTable->latch));
// pMemTable->tData[TABLE_TID(pTable)] = NULL;
// tsdbFreeTableData(pTableData);
// taosWUnLockLatch(&(pMemTable->latch));
// }
// pTableData = tsdbNewTableData(pCfg, pTable);
// if (pTableData == NULL) {
// tsdbError("vgId:%d failed to insert data to table %s uid %" PRId64 " tid %d since %s", REPO_ID(pRepo),
// TABLE_CHAR_NAME(pTable), TABLE_UID(pTable), TABLE_TID(pTable), tstrerror(terrno));
// return -1;
// }
// pRepo->mem->tData[TABLE_TID(pTable)] = pTableData; pTbData->nrows += pBlock->numOfRows;
// } if (pTbData->keyMin > keyMin) pTbData->keyMin = keyMin;
if (pTbData->keyMax < keyMax) pTbData->keyMax = keyMax;
// ASSERT((pTableData != NULL) && pTableData->uid == TABLE_UID(pTable)); pMemTable->nRow += pBlock->numOfRows;
if (pMemTable->keyMin > keyMin) pMemTable->keyMin = keyMin;
if (pMemTable->keyMax < keyMax) pMemTable->keyMax = keyMax;
// SMemRow lastRow = NULL; // SMemRow lastRow = NULL;
// int64_t osize = SL_SIZE(pTableData->pData); // int64_t osize = SL_SIZE(pTableData->pData);
...@@ -632,45 +612,6 @@ int tsdbSyncCommitConfig(STsdbRepo* pRepo) { ...@@ -632,45 +612,6 @@ int tsdbSyncCommitConfig(STsdbRepo* pRepo) {
return 0; return 0;
} }
int tsdbAsyncCommit(STsdbRepo *pRepo) {
tsem_wait(&(pRepo->readyToCommit));
ASSERT(pRepo->imem == NULL);
if (pRepo->mem == NULL) {
tsem_post(&(pRepo->readyToCommit));
return 0;
}
if (pRepo->code != TSDB_CODE_SUCCESS) {
tsdbWarn("vgId:%d try to commit when TSDB not in good state: %s", REPO_ID(pRepo), tstrerror(terrno));
}
if (pRepo->appH.notifyStatus) pRepo->appH.notifyStatus(pRepo->appH.appH, TSDB_STATUS_COMMIT_START, TSDB_CODE_SUCCESS);
if (tsdbLockRepo(pRepo) < 0) return -1;
pRepo->imem = pRepo->mem;
pRepo->mem = NULL;
tsdbScheduleCommit(pRepo, COMMIT_REQ);
if (tsdbUnlockRepo(pRepo) < 0) return -1;
return 0;
}
int tsdbSyncCommit(STsdbRepo *repo) {
STsdbRepo *pRepo = repo;
tsdbAsyncCommit(pRepo);
tsem_wait(&(pRepo->readyToCommit));
tsem_post(&(pRepo->readyToCommit));
if (pRepo->code != TSDB_CODE_SUCCESS) {
terrno = pRepo->code;
return -1;
} else {
terrno = TSDB_CODE_SUCCESS;
return 0;
}
}
/** /**
* This is an important function to load data or try to load data from memory skiplist iterator. * This is an important function to load data or try to load data from memory skiplist iterator.
* *
...@@ -834,28 +775,6 @@ static int tsdbAppendTableRowToCols(STable *pTable, SDataCols *pCols, STSchema * ...@@ -834,28 +775,6 @@ static int tsdbAppendTableRowToCols(STable *pTable, SDataCols *pCols, STSchema *
return 0; return 0;
} }
static int tsdbInitSubmitBlkIter(SSubmitBlk *pBlock, SSubmitBlkIter *pIter) {
if (pBlock->dataLen <= 0) return -1;
pIter->totalLen = pBlock->dataLen;
pIter->len = 0;
pIter->row = (SMemRow)(pBlock->data + pBlock->schemaLen);
return 0;
}
static SMemRow tsdbGetSubmitBlkNext(SSubmitBlkIter *pIter) {
SMemRow row = pIter->row; // firstly, get current row
if (row == NULL) return NULL;
pIter->len += memRowTLen(row);
if (pIter->len >= pIter->totalLen) { // reach the end
pIter->row = NULL;
} else {
pIter->row = (char *)row + memRowTLen(row); // secondly, move to next row
}
return row;
}
static FORCE_INLINE int tsdbCheckRowRange(STsdbRepo *pRepo, STable *pTable, SMemRow row, TSKEY minKey, TSKEY maxKey, static FORCE_INLINE int tsdbCheckRowRange(STsdbRepo *pRepo, STable *pTable, SMemRow row, TSKEY minKey, TSKEY maxKey,
TSKEY now) { TSKEY now) {
TSKEY rowKey = memRowKey(row); TSKEY rowKey = memRowKey(row);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册