提交 8559032b 编写于 作者: H Hongze Cheng

finish change table function

上级 17f16e50
......@@ -113,7 +113,7 @@ STableCfg *tsdbCreateTableCfgFromMsg(SMDCreateTableMsg *pMsg);
int tsdbCreateTable(TSDB_REPO_T *repo, STableCfg *pCfg);
int tsdbDropTable(TSDB_REPO_T *pRepo, STableId tableId);
int tsdbUpdateTagValue(TSDB_REPO_T *repo, SUpdateTableTagValMsg *pMsg);
int tsdbUpdateTableTagValue(TSDB_REPO_T *repo, SUpdateTableTagValMsg *pMsg);
TSKEY tsdbGetTableLastKey(TSDB_REPO_T *repo, uint64_t uid);
void tsdbStartStream(TSDB_REPO_T *repo);
......
......@@ -302,7 +302,7 @@ STSchema* tsdbGetTableSchema(STable* pTable);
STable* tsdbGetTableByUid(STsdbMeta* pMeta, uint64_t uid);
STSchema* tsdbGetTableSchemaByVersion(STable* pTable, int16_t version);
STSchema* tsdbGetTableTagSchema(STable* pTable);
int tsdbUpdateTable(STsdbRepo* pRepo, STable* pTable, STableCfg* pCfg);
// int tsdbUpdateTable(STsdbRepo* pRepo, STable* pTable, STableCfg* pCfg);
int tsdbWLockRepoMeta(STsdbRepo* pRepo);
int tsdbRLockRepoMeta(STsdbRepo* pRepo);
int tsdbUnlockRepoMeta(STsdbRepo* pRepo);
......@@ -321,7 +321,6 @@ static FORCE_INLINE int tsdbCompareSchemaVersion(const void *key1, const void *k
}
static FORCE_INLINE STSchema* tsdbGetTableSchemaImpl(STable* pTable, bool lock, bool copy, int16_t version) {
ASSERT(TABLE_TYPE(pTable) != TSDB_SUPER_TABLE);
STable* pDTable = (TABLE_TYPE(pTable) == TSDB_CHILD_TABLE) ? pTable->pSuper : pTable;
STSchema* pSchema = NULL;
STSchema* pTSchema = NULL;
......
......@@ -29,10 +29,9 @@ static void tsdbOrgMeta(void *pHandle);
static char * getTagIndexKey(const void *pData);
static STable *tsdbNewTable(STableCfg *pCfg, bool isSuper);
static void tsdbFreeTable(STable *pTable);
static int tsdbUpdateTableTagSchema(STable *pTable, STSchema *newSchema);
static int tsdbAddTableToMeta(STsdbRepo *pRepo, STable *pTable, bool addIdx);
static int tsdbAddTableToMeta(STsdbRepo *pRepo, STable *pTable, bool addIdx, bool lock);
static void tsdbRemoveTableFromMeta(STsdbRepo *pRepo, STable *pTable, bool rmFromIdx, bool lock);
static int tsdbAddTableIntoIndex(STsdbMeta *pMeta, STable *pTable);
static int tsdbAddTableIntoIndex(STsdbMeta *pMeta, STable *pTable, bool refSuper);
static int tsdbRemoveTableFromIndex(STsdbMeta *pMeta, STable *pTable);
static int tsdbInitTableCfg(STableCfg *config, ETableType type, uint64_t uid, int32_t tid);
static int tsdbTableSetSchema(STableCfg *config, STSchema *pSchema, bool dup);
......@@ -76,7 +75,7 @@ int tsdbCreateTable(TSDB_REPO_T *repo, STableCfg *pCfg) {
// TODO
if (super->type != TSDB_SUPER_TABLE) return -1;
if (super->tableId.uid != pCfg->superUid) return -1;
tsdbUpdateTable(pRepo, super, pCfg);
// tsdbUpdateTable(pRepo, super, pCfg);
}
}
......@@ -84,10 +83,18 @@ int tsdbCreateTable(TSDB_REPO_T *repo, STableCfg *pCfg) {
if (table == NULL) goto _err;
// Register to meta
tsdbWLockRepoMeta(pRepo);
if (newSuper) {
if (tsdbAddTableToMeta(pRepo, super, true) < 0) goto _err;
if (tsdbAddTableToMeta(pRepo, super, true, false) < 0) {
tsdbUnlockRepoMeta(pRepo);
goto _err;
}
if (tsdbAddTableToMeta(pRepo, table, true) < 0) goto _err;
}
if (tsdbAddTableToMeta(pRepo, table, true, false) < 0) {
tsdbUnlockRepoMeta(pRepo);
goto _err;
}
tsdbUnlockRepoMeta(pRepo);
// Write to memtable action
int tlen1 = (newSuper) ? tsdbGetTableEncodeSize(TSDB_UPDATE_META, super) : 0;
......@@ -255,7 +262,7 @@ _err:
return NULL;
}
static int32_t colIdCompar(const void* left, const void* right) {
static UNUSED_FUNC int32_t colIdCompar(const void* left, const void* right) {
int16_t colId = *(int16_t*) left;
STColumn* p2 = (STColumn*) right;
......@@ -266,9 +273,10 @@ static int32_t colIdCompar(const void* left, const void* right) {
return (colId < p2->colId)? -1:1;
}
int tsdbUpdateTagValue(TSDB_REPO_T *repo, SUpdateTableTagValMsg *pMsg) {
int tsdbUpdateTableTagValue(TSDB_REPO_T *repo, SUpdateTableTagValMsg *pMsg) {
STsdbRepo *pRepo = (STsdbRepo *)repo;
STsdbMeta *pMeta = pRepo->tsdbMeta;
STSchema * pNewSchema = NULL;
pMsg->uid = htobe64(pMsg->uid);
pMsg->tid = htonl(pMsg->tid);
......@@ -277,78 +285,105 @@ int tsdbUpdateTagValue(TSDB_REPO_T *repo, SUpdateTableTagValMsg *pMsg) {
pMsg->tagValLen = htonl(pMsg->tagValLen);
pMsg->numOfTags = htons(pMsg->numOfTags);
pMsg->schemaLen = htonl(pMsg->schemaLen);
assert(pMsg->schemaLen == sizeof(STColumn) * pMsg->numOfTags);
char* d = pMsg->data;
for(int32_t i = 0; i < pMsg->numOfTags; ++i) {
STColumn* pCol = (STColumn*) d;
pCol->colId = htons(pCol->colId);
pCol->bytes = htons(pCol->bytes);
pCol->offset = 0;
d += sizeof(STColumn);
for (int i = 0; i < pMsg->numOfTags; i++) {
STColumn *pTCol = (STColumn *)pMsg->data + i;
pTCol->bytes = htons(pTCol->bytes);
pTCol->colId = htons(pTCol->colId);
}
STable *pTable = tsdbGetTableByUid(pMeta, pMsg->uid);
if (pTable == NULL) {
terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
return -1;
}
if (TABLE_TID(pTable) != pMsg->tid) {
if (pTable == NULL || TABLE_TID(pTable) != pMsg->tid) {
tsdbError("vgId:%d failed to update table tag value since invalid table id %d uid %" PRIu64, REPO_ID(pRepo),
pMsg->tid, pMsg->uid);
terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
return -1;
}
if (TABLE_TYPE(pTable) != TSDB_CHILD_TABLE) {
tsdbError("vgId:%d failed to update tag value of table %s since its type is %d", REPO_ID(pRepo),
TABLE_CHAR_NAME(pTable), TABLE_TYPE(pTable));
tsdbError("vgId:%d try to update tag value of a non-child table, invalid action", REPO_ID(pRepo));
terrno = TSDB_CODE_TDB_INVALID_ACTION;
return -1;
}
if (schemaVersion(tsdbGetTableTagSchema(pTable)) < pMsg->tversion) {
tsdbDebug("vgId:%d server tag version %d is older than client tag version %d, try to config", REPO_ID(pRepo),
schemaVersion(tsdbGetTableTagSchema(pTable)), pMsg->tversion);
void *msg = (*pRepo->appH.configFunc)(pRepo->config.tsdbId, pMsg->tid);
if (msg == NULL) return -1;
if (schemaVersion(pTable->pSuper->tagSchema) > pMsg->tversion) {
tsdbError(
"vgId:%d failed to update tag value of table %s since version out of date, client tag version %d server tag "
"version %d",
REPO_ID(pRepo), TABLE_CHAR_NAME(pTable), pMsg->tversion, schemaVersion(pTable->tagSchema));
terrno = TSDB_CODE_TDB_TAG_VER_OUT_OF_DATE;
return -1;
}
if (schemaVersion(pTable->pSuper->tagSchema) < pMsg->tversion) { // tag schema out of data,
tsdbDebug("vgId:%d need to update tag schema of table %s tid %d uid %" PRIu64
" since out of date, current version %d new version %d",
REPO_ID(pRepo), TABLE_CHAR_NAME(pTable), TABLE_TID(pTable), TABLE_UID(pTable),
schemaVersion(pTable->pSuper->tagSchema), pMsg->tversion);
// Deal with error her
STableCfg *pTableCfg = tsdbCreateTableCfgFromMsg(msg);
STable * super = tsdbGetTableByUid(pMeta, pTableCfg->superUid);
ASSERT(super != NULL);
STSchemaBuilder schemaBuilder = {0};
int32_t code = tsdbUpdateTable(pRepo, super, pTableCfg);
if (code != TSDB_CODE_SUCCESS) {
tsdbClearTableCfg(pTableCfg);
return code;
STColumn *pTCol = (STColumn *)pMsg->data;
ASSERT(pMsg->schemaLen % sizeof(STColumn) == 0 && pTCol[0].colId == colColId(schemaColAt(pTable->pSuper->tagSchema, 0)));
if (tdInitTSchemaBuilder(&schemaBuilder, pMsg->tversion) < 0) {
tsdbDebug("vgId:%d failed to update tag schema of table %s tid %d uid %" PRIu64 " since out of memory",
REPO_ID(pRepo), TABLE_CHAR_NAME(pTable), TABLE_TID(pTable), TABLE_UID(pTable));
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
tsdbClearTableCfg(pTableCfg);
rpcFreeCont(msg);
for (int i = 0; i < (pMsg->schemaLen / sizeof(STColumn)); i++) {
if (tdAddColToSchema(&schemaBuilder, pTCol[i].type, pTCol[i].colId, pTCol[i].bytes) < 0) {
tdDestroyTSchemaBuilder(&schemaBuilder);
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
}
pNewSchema = tdGetSchemaFromBuilder(&schemaBuilder);
if (pNewSchema == NULL) {
tdDestroyTSchemaBuilder(&schemaBuilder);
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
tdDestroyTSchemaBuilder(&schemaBuilder);
}
STSchema *pTagSchema = tsdbGetTableTagSchema(pTable);
if (schemaVersion(pTagSchema) > pMsg->tversion) {
tsdbError(
"vgId:%d failed to update tag value of table %s since version out of date, client tag version %d server tag "
"version %d",
REPO_ID(pRepo), TABLE_CHAR_NAME(pTable), pMsg->tversion, schemaVersion(pTable->tagSchema));
return TSDB_CODE_TDB_TAG_VER_OUT_OF_DATE;
// Chage in memory
if (pNewSchema != NULL) { // change super table tag schema
taosWLockLatch(&(pTable->pSuper->latch));
STSchema *pOldSchema = pTable->pSuper->tagSchema;
pTable->pSuper->tagSchema = pNewSchema;
tdFreeSchema(pOldSchema);
taosWUnLockLatch(&(pTable->pSuper->latch));
}
if (schemaColAt(pTagSchema, DEFAULT_TAG_INDEX_COLUMN)->colId == pMsg->colId) {
bool isChangeIndexCol = (pMsg->colId == colColId(schemaColAt(pTable->pSuper->tagSchema, 0)));
STColumn *pCol = bsearch(&(pMsg->colId), pMsg->data, pMsg->numOfTags, sizeof(STColumn), colIdCompar);
ASSERT(pCol != NULL);
if (isChangeIndexCol) {
tsdbWLockRepoMeta(pRepo);
tsdbRemoveTableFromIndex(pMeta, pTable);
}
// TODO: remove table from index if it is the first column of tag
// TODO: convert the tag schema from client, and then extract the type and bytes from schema according to colId
STColumn* res = bsearch(&pMsg->colId, pMsg->data, pMsg->numOfTags, sizeof(STColumn), colIdCompar);
assert(res != NULL);
taosWLockLatch(&(pTable->latch));
tdSetKVRowDataOfCol(&(pTable->tagVal), pMsg->colId, pCol->type, POINTER_SHIFT(pMsg->data, pMsg->schemaLen));
taosWUnLockLatch(&(pTable->latch));
if (isChangeIndexCol) {
tsdbAddTableIntoIndex(pMeta, pTable, false);
tsdbUnlockRepoMeta(pRepo);
}
tdSetKVRowDataOfCol(&pTable->tagVal, pMsg->colId, res->type, pMsg->data + pMsg->schemaLen);
if (schemaColAt(pTagSchema, DEFAULT_TAG_INDEX_COLUMN)->colId == pMsg->colId) {
tsdbAddTableIntoIndex(pMeta, pTable);
// Update on file
int tlen1 = (pNewSchema) ? tsdbGetTableEncodeSize(TSDB_UPDATE_META, pTable->pSuper) : 0;
int tlen2 = tsdbGetTableEncodeSize(TSDB_UPDATE_META, pTable);
void *buf = tsdbAllocBytes(pRepo, tlen1+tlen2);
ASSERT(buf != NULL);
if (pNewSchema) {
void *pBuf = tsdbInsertTableAct(pRepo, TSDB_UPDATE_META, buf, pTable->pSuper);
ASSERT(POINTER_DISTANCE(pBuf, buf) == tlen1);
buf = pBuf;
}
return TSDB_CODE_SUCCESS;
tsdbInsertTableAct(pRepo, TSDB_UPDATE_META, buf, pTable);
return 0;
}
// ------------------ INTERNAL FUNCTIONS ------------------
......@@ -449,7 +484,7 @@ int tsdbCloseMeta(STsdbRepo *pRepo) {
return 0;
}
STSchema *tsdbGetTableSchema(STable *pTable) { return tsdbGetTableSchemaImpl(pTable, true, true, -1); }
STSchema *tsdbGetTableSchema(STable *pTable) { return tsdbGetTableSchemaImpl(pTable, true, false, -1); }
STable *tsdbGetTableByUid(STsdbMeta *pMeta, uint64_t uid) {
void *ptr = taosHashGet(pMeta->uidMap, (char *)(&uid), sizeof(uid));
......@@ -460,7 +495,7 @@ STable *tsdbGetTableByUid(STsdbMeta *pMeta, uint64_t uid) {
}
STSchema *tsdbGetTableSchemaByVersion(STable *pTable, int16_t version) {
return tsdbGetTableSchemaImpl(pTable, true, true, version);
return tsdbGetTableSchemaImpl(pTable, true, false, version);
}
STSchema *tsdbGetTableTagSchema(STable *pTable) {
......@@ -475,48 +510,6 @@ STSchema *tsdbGetTableTagSchema(STable *pTable) {
}
}
int tsdbUpdateTable(STsdbRepo *pRepo, STable *pTable, STableCfg *pCfg) {
// TODO: this function can only be called when there is no query and commit on this table
ASSERT(TABLE_TYPE(pTable) != TSDB_CHILD_TABLE);
bool changed = false;
STsdbMeta *pMeta = pRepo->tsdbMeta;
if ((pTable->type == TSDB_SUPER_TABLE) && (schemaVersion(pTable->tagSchema) < schemaVersion(pCfg->tagSchema))) {
if (tsdbUpdateTableTagSchema(pTable, pCfg->tagSchema) < 0) {
tsdbError("vgId:%d failed to update table %s tag schema since %s", REPO_ID(pRepo), TABLE_CHAR_NAME(pTable),
tstrerror(terrno));
return -1;
}
changed = true;
}
STSchema *pTSchema = tsdbGetTableSchema(pTable);
if (schemaVersion(pTSchema) < schemaVersion(pCfg->schema)) {
if (pTable->numOfSchemas < TSDB_MAX_TABLE_SCHEMAS) {
pTable->schema[pTable->numOfSchemas++] = tdDupSchema(pCfg->schema);
} else {
ASSERT(pTable->numOfSchemas == TSDB_MAX_TABLE_SCHEMAS);
STSchema *tSchema = tdDupSchema(pCfg->schema);
tdFreeSchema(pTable->schema[0]);
memmove(pTable->schema, pTable->schema + 1, sizeof(STSchema *) * (TSDB_MAX_TABLE_SCHEMAS - 1));
pTable->schema[pTable->numOfSchemas - 1] = tSchema;
}
pMeta->maxRowBytes = MAX(pMeta->maxRowBytes, dataRowMaxBytesFromSchema(pCfg->schema));
pMeta->maxCols = MAX(pMeta->maxCols, schemaNCols(pCfg->schema));
changed = true;
}
if (changed) {
int tlen = tsdbGetTableEncodeSize(TSDB_UPDATE_META, pTable);
void *buf = tsdbAllocBytes(pRepo, tlen);
tsdbInsertTableAct(pRepo, TSDB_UPDATE_META, buf, pTable);
}
return 0;
}
int tsdbWLockRepoMeta(STsdbRepo *pRepo) {
int code = pthread_rwlock_wrlock(&(pRepo->tsdbMeta->rwLock));
if (code != 0) {
......@@ -607,7 +600,7 @@ static int tsdbRestoreTable(void *pHandle, void *cont, int contLen) {
tsdbDecodeTable(cont, &pTable);
if (tsdbAddTableToMeta(pRepo, pTable, false) < 0) {
if (tsdbAddTableToMeta(pRepo, pTable, false, false) < 0) {
tsdbFreeTable(pTable);
return -1;
}
......@@ -625,7 +618,7 @@ static void tsdbOrgMeta(void *pHandle) {
for (int i = 1; i < pCfg->maxTables; i++) {
STable *pTable = pMeta->tables[i];
if (pTable != NULL && pTable->type == TSDB_CHILD_TABLE) {
tsdbAddTableIntoIndex(pMeta, pTable);
tsdbAddTableIntoIndex(pMeta, pTable, true);
}
}
}
......@@ -735,7 +728,7 @@ _err:
static void tsdbFreeTable(STable *pTable) {
if (pTable) {
tsdbDebug("table %s is destroyed", TABLE_CHAR_NAME(pTable));
if (pTable->name != NULL) tsdbDebug("table %s is destroyed", TABLE_CHAR_NAME(pTable));
tfree(TABLE_NAME(pTable));
if (TABLE_TYPE(pTable) != TSDB_CHILD_TABLE) {
for (int i = 0; i < TSDB_MAX_TABLE_SCHEMAS; i++) {
......@@ -755,25 +748,10 @@ static void tsdbFreeTable(STable *pTable) {
}
}
static int tsdbUpdateTableTagSchema(STable *pTable, STSchema *newSchema) {
ASSERT(pTable->type == TSDB_SUPER_TABLE);
ASSERT(schemaVersion(pTable->tagSchema) < schemaVersion(newSchema));
STSchema *pOldSchema = pTable->tagSchema;
STSchema *pNewSchema = tdDupSchema(newSchema);
if (pNewSchema == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
pTable->tagSchema = pNewSchema;
tdFreeSchema(pOldSchema);
return 0;
}
static int tsdbAddTableToMeta(STsdbRepo *pRepo, STable *pTable, bool addIdx) {
static int tsdbAddTableToMeta(STsdbRepo *pRepo, STable *pTable, bool addIdx, bool lock) {
STsdbMeta *pMeta = pRepo->tsdbMeta;
if (addIdx && tsdbWLockRepoMeta(pRepo) < 0) {
if (lock && tsdbWLockRepoMeta(pRepo) < 0) {
tsdbError("vgId:%d failed to add table %s to meta since %s", REPO_ID(pRepo), TABLE_CHAR_NAME(pTable),
tstrerror(terrno));
return -1;
......@@ -788,7 +766,7 @@ static int tsdbAddTableToMeta(STsdbRepo *pRepo, STable *pTable, bool addIdx) {
}
} else {
if (TABLE_TYPE(pTable) == TSDB_CHILD_TABLE && addIdx) { // add STABLE to the index
if (tsdbAddTableIntoIndex(pMeta, pTable) < 0) {
if (tsdbAddTableIntoIndex(pMeta, pTable, true) < 0) {
tsdbDebug("vgId:%d failed to add table %s to meta while add table to index since %s", REPO_ID(pRepo),
TABLE_CHAR_NAME(pTable), tstrerror(terrno));
goto _err;
......@@ -812,7 +790,7 @@ static int tsdbAddTableToMeta(STsdbRepo *pRepo, STable *pTable, bool addIdx) {
if (schemaTLen(pSchema) > pMeta->maxRowBytes) pMeta->maxRowBytes = schemaTLen(pSchema);
}
if (addIdx && tsdbUnlockRepoMeta(pRepo) < 0) return -1;
if (lock && tsdbUnlockRepoMeta(pRepo) < 0) return -1;
if (TABLE_TYPE(pTable) == TSDB_STREAM_TABLE && addIdx) {
pTable->cqhandle = (*pRepo->appH.cqCreateFunc)(pRepo->appH.cqH, TABLE_UID(pTable), TABLE_TID(pTable), pTable->sql,
tsdbGetTableSchemaImpl(pTable, false, false, -1));
......@@ -824,7 +802,7 @@ static int tsdbAddTableToMeta(STsdbRepo *pRepo, STable *pTable, bool addIdx) {
_err:
tsdbRemoveTableFromMeta(pRepo, pTable, false, false);
if (addIdx) tsdbUnlockRepoMeta(pRepo);
if (lock) tsdbUnlockRepoMeta(pRepo);
return -1;
}
......@@ -881,7 +859,7 @@ static void tsdbRemoveTableFromMeta(STsdbRepo *pRepo, STable *pTable, bool rmFro
tsdbUnRefTable(pTable);
}
static int tsdbAddTableIntoIndex(STsdbMeta *pMeta, STable *pTable) {
static int tsdbAddTableIntoIndex(STsdbMeta *pMeta, STable *pTable, bool refSuper) {
ASSERT(pTable->type == TSDB_CHILD_TABLE && pTable != NULL);
STable *pSTable = tsdbGetTableByUid(pMeta, TABLE_SUID(pTable));
ASSERT(pSTable != NULL);
......@@ -905,7 +883,7 @@ static int tsdbAddTableIntoIndex(STsdbMeta *pMeta, STable *pTable) {
memcpy(SL_GET_NODE_DATA(pNode), &pTable, sizeof(STable *));
tSkipListPut(pSTable->pIndex, pNode);
T_REF_INC(pSTable);
if (refSuper) T_REF_INC(pSTable);
return 0;
}
......
......@@ -158,7 +158,7 @@ static int32_t vnodeProcessDropStableMsg(SVnodeObj *pVnode, void *pCont, SRspRet
}
static int32_t vnodeProcessUpdateTagValMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pRet) {
if (tsdbUpdateTagValue(pVnode->tsdb, (SUpdateTableTagValMsg *)pCont) < 0) {
if (tsdbUpdateTableTagValue(pVnode->tsdb, (SUpdateTableTagValMsg *)pCont) < 0) {
return terrno;
}
return TSDB_CODE_SUCCESS;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册