提交 6e0bffe4 编写于 作者: H Hongze Cheng

TD-453

上级 63794619
...@@ -48,6 +48,8 @@ static int tsdbEncodeTable(void **buf, STable *pTable); ...@@ -48,6 +48,8 @@ static int tsdbEncodeTable(void **buf, STable *pTable);
static void * tsdbDecodeTable(void *buf, STable **pRTable); static void * tsdbDecodeTable(void *buf, STable **pRTable);
static int tsdbGetTableEncodeSize(int8_t act, STable *pTable); static int tsdbGetTableEncodeSize(int8_t act, STable *pTable);
static void * tsdbInsertTableAct(STsdbRepo *pRepo, int8_t act, void *buf, STable *pTable); static void * tsdbInsertTableAct(STsdbRepo *pRepo, int8_t act, void *buf, STable *pTable);
static int tsdbRemoveTableFromStore(STsdbRepo *pRepo, STable *pTable);
static int tsdbRmTableFromMeta(STsdbRepo *pRepo, STable *pTable);
// ------------------ OUTER FUNCTIONS ------------------ // ------------------ OUTER FUNCTIONS ------------------
int tsdbCreateTable(TSDB_REPO_T *repo, STableCfg *pCfg) { int tsdbCreateTable(TSDB_REPO_T *repo, STableCfg *pCfg) {
...@@ -117,7 +119,7 @@ int tsdbDropTable(TSDB_REPO_T *repo, STableId tableId) { ...@@ -117,7 +119,7 @@ int tsdbDropTable(TSDB_REPO_T *repo, STableId tableId) {
STable *pTable = tsdbGetTableByUid(pMeta, uid); STable *pTable = tsdbGetTableByUid(pMeta, uid);
if (pTable == NULL) { if (pTable == NULL) {
tsdbError("vgId:%d failed to drop table since table not exists! tid:%d uid %" PRId64, REPO_ID(pRepo), tableId.tid, tsdbError("vgId:%d failed to drop table since table not exists! tid:%d uid %" PRIu64, REPO_ID(pRepo), tableId.tid,
uid); uid);
terrno = TSDB_CODE_TDB_INVALID_TABLE_ID; terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
return -1; return -1;
...@@ -132,30 +134,26 @@ int tsdbDropTable(TSDB_REPO_T *repo, STableId tableId) { ...@@ -132,30 +134,26 @@ int tsdbDropTable(TSDB_REPO_T *repo, STableId tableId) {
return -1; return -1;
} }
if (TABLE_TYPE(pTable) == TSDB_STREAM_TABLE) { // Write to KV store first
if (pTable->cqhandle) pRepo->appH.cqDropFunc(pTable->cqhandle); if (tsdbRemoveTableFromStore(pRepo, pTable) < 0) {
tsdbError("vgId:%d failed to drop table %s since %s", REPO_ID(pRepo), tbname, tstrerror(terrno));
goto _err;
} }
if (TABLE_TYPE(pTable) == TSDB_SUPER_TABLE) { // Remove table from Meta
SSkipListIterator *pIter = tSkipListCreateIter(pTable->pIndex); if (tsdbRmTableFromMeta(pRepo, pTable) < 0) {
while (tSkipListIterNext(pIter)) { tsdbError("vgId:%d failed to drop table %s since %s", REPO_ID(pRepo), tbname, tstrerror(terrno));
STable *tTable = *(STable **)SL_GET_NODE_DATA(tSkipListIterGet(pIter)); goto _err;
ASSERT(TABLE_TYPE(tTable) == TSDB_CHILD_TABLE);
int tlen = tsdbGetTableEncodeSize(TSDB_DROP_META, tTable);
void *buf = tsdbAllocBytes(pRepo, tlen);
ASSERT(buf != NULL);
tsdbInsertTableAct(pRepo, TSDB_DROP_META, buf, tTable);
tsdbRemoveTableFromMeta(pRepo, tTable, false, true);
}
tSkipListDestroyIter(pIter);
} }
tsdbRemoveTableFromMeta(pRepo, pTable, true, true);
tsdbTrace("vgId:%d, table %s is dropped! tid:%d, uid:%" PRId64, pRepo->config.tsdbId, tbname, tid, uid); tsdbTrace("vgId:%d, table %s is dropped! tid:%d, uid:%" PRId64, pRepo->config.tsdbId, tbname, tid, uid);
free(tbname); free(tbname);
return 0; return 0;
_err:
tfree(tbname);
return -1;
} }
void *tsdbGetTableTagVal(const void* pTable, int32_t colId, int16_t type, int16_t bytes) { void *tsdbGetTableTagVal(const void* pTable, int32_t colId, int16_t type, int16_t bytes) {
...@@ -555,6 +553,7 @@ int tsdbUnlockRepoMeta(STsdbRepo *pRepo) { ...@@ -555,6 +553,7 @@ int tsdbUnlockRepoMeta(STsdbRepo *pRepo) {
void tsdbRefTable(STable *pTable) { void tsdbRefTable(STable *pTable) {
int16_t ref = T_REF_INC(pTable); int16_t ref = T_REF_INC(pTable);
UNUSED(ref);
// tsdbTrace("ref table %"PRIu64", tid:%d, refCount:%d", TABLE_UID(pTable), TABLE_TID(pTable), ref); // tsdbTrace("ref table %"PRIu64", tid:%d, refCount:%d", TABLE_UID(pTable), TABLE_TID(pTable), ref);
} }
...@@ -1164,8 +1163,16 @@ static void *tsdbDecodeTable(void *buf, STable **pRTable) { ...@@ -1164,8 +1163,16 @@ static void *tsdbDecodeTable(void *buf, STable **pRTable) {
} }
static int tsdbGetTableEncodeSize(int8_t act, STable *pTable) { static int tsdbGetTableEncodeSize(int8_t act, STable *pTable) {
int tlen = sizeof(SListNode) + sizeof(SActObj); int tlen = 0;
if (act == TSDB_UPDATE_META) tlen += (sizeof(SActCont) + tsdbEncodeTable(NULL, pTable) + sizeof(TSCKSUM)); if (act == TSDB_UPDATE_META) {
tlen = sizeof(SListNode) + sizeof(SActObj) + sizeof(SActCont) + tsdbEncodeTable(NULL, pTable) + sizeof(TSCKSUM);
} else {
if (TABLE_TYPE(pTable) == TSDB_SUPER_TABLE) {
tlen = (sizeof(SListNode) + sizeof(SActObj)) * (tSkipListGetSize(pTable->pIndex) + 1);
} else {
tlen = sizeof(SListNode) + sizeof(SActObj);
}
}
return tlen; return tlen;
} }
...@@ -1190,4 +1197,61 @@ static void *tsdbInsertTableAct(STsdbRepo *pRepo, int8_t act, void *buf, STable ...@@ -1190,4 +1197,61 @@ static void *tsdbInsertTableAct(STsdbRepo *pRepo, int8_t act, void *buf, STable
tdListAppendNode(pRepo->mem->actList, pNode); tdListAppendNode(pRepo->mem->actList, pNode);
return pBuf; return pBuf;
}
static int tsdbRemoveTableFromStore(STsdbRepo *pRepo, STable *pTable) {
int tlen = tsdbGetTableEncodeSize(TSDB_DROP_META, pTable);
void *buf = tsdbAllocBytes(pRepo, tlen);
ASSERT(buf != NULL);
void *pBuf = buf;
if (TABLE_TYPE(pTable) == TSDB_SUPER_TABLE) {
SSkipListIterator *pIter = tSkipListCreateIter(pTable->pIndex);
if (pIter == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
while (tSkipListIterNext(pIter)) {
STable *tTable = *(STable **)SL_GET_NODE_DATA(tSkipListIterGet(pIter));
ASSERT(TABLE_TYPE(tTable) == TSDB_CHILD_TABLE);
pBuf = tsdbInsertTableAct(pRepo, TSDB_DROP_META, pBuf, pTable);
}
tSkipListDestroyIter(pIter);
}
pBuf = tsdbInsertTableAct(pRepo, TSDB_DROP_META, pBuf, pTable);
ASSERT(POINTER_DISTANCE(pBuf, buf) == tlen);
return 0;
}
static int tsdbRmTableFromMeta(STsdbRepo *pRepo, STable *pTable) {
if (TABLE_TYPE(pTable) == TSDB_SUPER_TABLE) {
SSkipListIterator *pIter = tSkipListCreateIter(pTable->pIndex);
if (pIter == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
tsdbWLockRepoMeta(pRepo);
while (tSkipListIterNext(pIter)) {
STable *tTable = *(STable **)SL_GET_NODE_DATA(tSkipListIterGet(pIter));
tsdbRemoveTableFromMeta(pRepo, tTable, false, false);
}
tsdbRemoveTableFromMeta(pRepo, pTable, false, false);
tsdbUnlockRepoMeta(pRepo);
tSkipListDestroyIter(pIter);
} else {
if ((TABLE_TYPE(pTable) == TSDB_STREAM_TABLE) && pTable->cqhandle) pRepo->appH.cqDropFunc(pTable->cqhandle);
tsdbRemoveTableFromMeta(pRepo, pTable, true, true);
}
return 0;
} }
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册