diff --git a/src/tsdb/src/tsdbMeta.c b/src/tsdb/src/tsdbMeta.c index e8da25d5852da47ca95c402983317687d7aca19b..fb5ea87b7299cb21d4d5b1e033fd05aa938ff04c 100644 --- a/src/tsdb/src/tsdbMeta.c +++ b/src/tsdb/src/tsdbMeta.c @@ -48,6 +48,8 @@ static int tsdbEncodeTable(void **buf, STable *pTable); static void * tsdbDecodeTable(void *buf, STable **pRTable); static int tsdbGetTableEncodeSize(int8_t act, STable *pTable); static void * tsdbInsertTableAct(STsdbRepo *pRepo, int8_t act, void *buf, STable *pTable); +static int tsdbRemoveTableFromStore(STsdbRepo *pRepo, STable *pTable); +static int tsdbRmTableFromMeta(STsdbRepo *pRepo, STable *pTable); // ------------------ OUTER FUNCTIONS ------------------ int tsdbCreateTable(TSDB_REPO_T *repo, STableCfg *pCfg) { @@ -117,7 +119,7 @@ int tsdbDropTable(TSDB_REPO_T *repo, STableId tableId) { STable *pTable = tsdbGetTableByUid(pMeta, uid); if (pTable == NULL) { - tsdbError("vgId:%d failed to drop table since table not exists! tid:%d uid %" PRId64, REPO_ID(pRepo), tableId.tid, + tsdbError("vgId:%d failed to drop table since table not exists! tid:%d uid %" PRIu64, REPO_ID(pRepo), tableId.tid, uid); terrno = TSDB_CODE_TDB_INVALID_TABLE_ID; return -1; @@ -132,30 +134,26 @@ int tsdbDropTable(TSDB_REPO_T *repo, STableId tableId) { return -1; } - if (TABLE_TYPE(pTable) == TSDB_STREAM_TABLE) { - if (pTable->cqhandle) pRepo->appH.cqDropFunc(pTable->cqhandle); + // Write to KV store first + if (tsdbRemoveTableFromStore(pRepo, pTable) < 0) { + tsdbError("vgId:%d failed to drop table %s since %s", REPO_ID(pRepo), tbname, tstrerror(terrno)); + goto _err; } - if (TABLE_TYPE(pTable) == TSDB_SUPER_TABLE) { - SSkipListIterator *pIter = tSkipListCreateIter(pTable->pIndex); - while (tSkipListIterNext(pIter)) { - STable *tTable = *(STable **)SL_GET_NODE_DATA(tSkipListIterGet(pIter)); - ASSERT(TABLE_TYPE(tTable) == TSDB_CHILD_TABLE); - int tlen = tsdbGetTableEncodeSize(TSDB_DROP_META, tTable); - void *buf = tsdbAllocBytes(pRepo, tlen); - ASSERT(buf != NULL); - tsdbInsertTableAct(pRepo, TSDB_DROP_META, buf, tTable); - tsdbRemoveTableFromMeta(pRepo, tTable, false, true); - } - tSkipListDestroyIter(pIter); + // Remove table from Meta + if (tsdbRmTableFromMeta(pRepo, pTable) < 0) { + tsdbError("vgId:%d failed to drop table %s since %s", REPO_ID(pRepo), tbname, tstrerror(terrno)); + goto _err; } - tsdbRemoveTableFromMeta(pRepo, pTable, true, true); - tsdbTrace("vgId:%d, table %s is dropped! tid:%d, uid:%" PRId64, pRepo->config.tsdbId, tbname, tid, uid); free(tbname); return 0; + +_err: + tfree(tbname); + return -1; } void *tsdbGetTableTagVal(const void* pTable, int32_t colId, int16_t type, int16_t bytes) { @@ -555,15 +553,16 @@ int tsdbUnlockRepoMeta(STsdbRepo *pRepo) { void tsdbRefTable(STable *pTable) { int16_t ref = T_REF_INC(pTable); - tsdbTrace("ref table:%s, uid:%"PRIu64", tid:%d, ref:%d", TABLE_CHAR_NAME(pTable), pTable->tableId.uid, pTable->tableId.tid, ref); + UNUSED(ref); + // tsdbTrace("ref table %"PRIu64", tid:%d, refCount:%d", TABLE_UID(pTable), TABLE_TID(pTable), ref); } void tsdbUnRefTable(STable *pTable) { int16_t ref = T_REF_DEC(pTable); - tsdbTrace("unref table:%s, uid:%"PRIu64", tid:%d, ref:%d", TABLE_CHAR_NAME(pTable), pTable->tableId.uid, pTable->tableId.tid, ref); + tsdbTrace("unref table uid:%"PRIu64", tid:%d, refCount:%d", TABLE_UID(pTable), TABLE_TID(pTable), ref); if (ref == 0) { - tsdbTrace("destroy table:%s uid:%"PRIu64", tid:%d", TABLE_CHAR_NAME(pTable), pTable->tableId.uid, pTable->tableId.tid); + // tsdbTrace("destory table name:%s uid:%"PRIu64", tid:%d", TABLE_CHAR_NAME(pTable), TABLE_UID(pTable), TABLE_TID(pTable)); if (TABLE_TYPE(pTable) == TSDB_CHILD_TABLE) { tsdbUnRefTable(pTable->pSuper); @@ -1164,8 +1163,16 @@ static void *tsdbDecodeTable(void *buf, STable **pRTable) { } static int tsdbGetTableEncodeSize(int8_t act, STable *pTable) { - int tlen = sizeof(SListNode) + sizeof(SActObj); - if (act == TSDB_UPDATE_META) tlen += (sizeof(SActCont) + tsdbEncodeTable(NULL, pTable) + sizeof(TSCKSUM)); + int tlen = 0; + if (act == TSDB_UPDATE_META) { + tlen = sizeof(SListNode) + sizeof(SActObj) + sizeof(SActCont) + tsdbEncodeTable(NULL, pTable) + sizeof(TSCKSUM); + } else { + if (TABLE_TYPE(pTable) == TSDB_SUPER_TABLE) { + tlen = (sizeof(SListNode) + sizeof(SActObj)) * (tSkipListGetSize(pTable->pIndex) + 1); + } else { + tlen = sizeof(SListNode) + sizeof(SActObj); + } + } return tlen; } @@ -1190,4 +1197,61 @@ static void *tsdbInsertTableAct(STsdbRepo *pRepo, int8_t act, void *buf, STable tdListAppendNode(pRepo->mem->actList, pNode); return pBuf; +} + +static int tsdbRemoveTableFromStore(STsdbRepo *pRepo, STable *pTable) { + int tlen = tsdbGetTableEncodeSize(TSDB_DROP_META, pTable); + void *buf = tsdbAllocBytes(pRepo, tlen); + ASSERT(buf != NULL); + + void *pBuf = buf; + if (TABLE_TYPE(pTable) == TSDB_SUPER_TABLE) { + SSkipListIterator *pIter = tSkipListCreateIter(pTable->pIndex); + if (pIter == NULL) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + return -1; + } + + while (tSkipListIterNext(pIter)) { + STable *tTable = *(STable **)SL_GET_NODE_DATA(tSkipListIterGet(pIter)); + ASSERT(TABLE_TYPE(tTable) == TSDB_CHILD_TABLE); + pBuf = tsdbInsertTableAct(pRepo, TSDB_DROP_META, pBuf, pTable); + } + + tSkipListDestroyIter(pIter); + } + pBuf = tsdbInsertTableAct(pRepo, TSDB_DROP_META, pBuf, pTable); + + ASSERT(POINTER_DISTANCE(pBuf, buf) == tlen); + + return 0; +} + +static int tsdbRmTableFromMeta(STsdbRepo *pRepo, STable *pTable) { + if (TABLE_TYPE(pTable) == TSDB_SUPER_TABLE) { + SSkipListIterator *pIter = tSkipListCreateIter(pTable->pIndex); + if (pIter == NULL) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + return -1; + } + + tsdbWLockRepoMeta(pRepo); + + while (tSkipListIterNext(pIter)) { + STable *tTable = *(STable **)SL_GET_NODE_DATA(tSkipListIterGet(pIter)); + tsdbRemoveTableFromMeta(pRepo, tTable, false, false); + } + + tsdbRemoveTableFromMeta(pRepo, pTable, false, false); + + tsdbUnlockRepoMeta(pRepo); + + tSkipListDestroyIter(pIter); + + } else { + if ((TABLE_TYPE(pTable) == TSDB_STREAM_TABLE) && pTable->cqhandle) pRepo->appH.cqDropFunc(pTable->cqhandle); + tsdbRemoveTableFromMeta(pRepo, pTable, true, true); + } + + return 0; } \ No newline at end of file