diff --git a/src/tsdb/src/tsdbMeta.c b/src/tsdb/src/tsdbMeta.c index 4d66f12178dd87b1dd870537e3fd5e64d73f19e3..f7d8f2438e6097f69843ca0387c2c299f8e2b3b9 100644 --- a/src/tsdb/src/tsdbMeta.c +++ b/src/tsdb/src/tsdbMeta.c @@ -255,17 +255,46 @@ _err: return NULL; } +static int32_t colIdCompar(const void* left, const void* right) { + int16_t colId = *(int16_t*) left; + STColumn* p2 = (STColumn*) right; + + if (colId == p2->colId) { + return 0; + } + + return (colId < p2->colId)? -1:1; +} + int tsdbUpdateTagValue(TSDB_REPO_T *repo, SUpdateTableTagValMsg *pMsg) { STsdbRepo *pRepo = (STsdbRepo *)repo; STsdbMeta *pMeta = pRepo->tsdbMeta; - int16_t tversion = htons(pMsg->tversion); - STable *pTable = tsdbGetTableByUid(pMeta, htobe64(pMsg->uid)); + pMsg->uid = htobe64(pMsg->uid); + pMsg->tid = htonl(pMsg->tid); + pMsg->tversion = htons(pMsg->tversion); + pMsg->colId = htons(pMsg->colId); + pMsg->tagValLen = htonl(pMsg->tagValLen); + pMsg->numOfTags = htons(pMsg->numOfTags); + pMsg->schemaLen = htonl(pMsg->schemaLen); + assert(pMsg->schemaLen == sizeof(STColumn) * pMsg->numOfTags); + + char* d = pMsg->data; + for(int32_t i = 0; i < pMsg->numOfTags; ++i) { + STColumn* pCol = (STColumn*) d; + pCol->colId = htons(pCol->colId); + pCol->bytes = htonl(pCol->bytes); + assert(pCol->offset == 0); + + d += sizeof(STColumn); + } + + STable *pTable = tsdbGetTableByUid(pMeta, pMsg->uid); if (pTable == NULL) { terrno = TSDB_CODE_TDB_INVALID_TABLE_ID; return -1; } - if (TABLE_TID(pTable) != htonl(pMsg->tid)) { + if (TABLE_TID(pTable) != pMsg->tid) { terrno = TSDB_CODE_TDB_INVALID_TABLE_ID; return -1; } @@ -277,10 +306,10 @@ int tsdbUpdateTagValue(TSDB_REPO_T *repo, SUpdateTableTagValMsg *pMsg) { return -1; } - if (schemaVersion(tsdbGetTableTagSchema(pTable)) < tversion) { + if (schemaVersion(tsdbGetTableTagSchema(pTable)) < pMsg->tversion) { tsdbDebug("vgId:%d server tag version %d is older than client tag version %d, try to config", REPO_ID(pRepo), - schemaVersion(tsdbGetTableTagSchema(pTable)), tversion); - void *msg = (*pRepo->appH.configFunc)(pRepo->config.tsdbId, htonl(pMsg->tid)); + schemaVersion(tsdbGetTableTagSchema(pTable)), pMsg->tversion); + void *msg = (*pRepo->appH.configFunc)(pRepo->config.tsdbId, pMsg->tid); if (msg == NULL) return -1; // Deal with error her @@ -299,21 +328,24 @@ int tsdbUpdateTagValue(TSDB_REPO_T *repo, SUpdateTableTagValMsg *pMsg) { STSchema *pTagSchema = tsdbGetTableTagSchema(pTable); - if (schemaVersion(pTagSchema) > tversion) { + if (schemaVersion(pTagSchema) > pMsg->tversion) { tsdbError( "vgId:%d failed to update tag value of table %s since version out of date, client tag version %d server tag " "version %d", - REPO_ID(pRepo), TABLE_CHAR_NAME(pTable), tversion, schemaVersion(pTable->tagSchema)); + REPO_ID(pRepo), TABLE_CHAR_NAME(pTable), pMsg->tversion, schemaVersion(pTable->tagSchema)); return TSDB_CODE_TDB_TAG_VER_OUT_OF_DATE; } - if (schemaColAt(pTagSchema, DEFAULT_TAG_INDEX_COLUMN)->colId == htons(pMsg->colId)) { + if (schemaColAt(pTagSchema, DEFAULT_TAG_INDEX_COLUMN)->colId == pMsg->colId) { tsdbRemoveTableFromIndex(pMeta, pTable); } // TODO: remove table from index if it is the first column of tag + // TODO: convert the tag schema from client, and then extract the type and bytes from schema according to colId + STColumn* res = bsearch(&pMsg->colId, pMsg->data, pMsg->numOfTags, sizeof(STColumn), colIdCompar); + assert(res != NULL); -// tdSetKVRowDataOfCol(&pTable->tagVal, htons(pMsg->colId), htons(pMsg->type), pMsg->data); - if (schemaColAt(pTagSchema, DEFAULT_TAG_INDEX_COLUMN)->colId == htons(pMsg->colId)) { + tdSetKVRowDataOfCol(&pTable->tagVal, pMsg->colId, res->type, pMsg->data + pMsg->schemaLen); + if (schemaColAt(pTagSchema, DEFAULT_TAG_INDEX_COLUMN)->colId == pMsg->colId) { tsdbAddTableIntoIndex(pMeta, pTable); } return TSDB_CODE_SUCCESS;