diff --git a/src/common/inc/tdataformat.h b/src/common/inc/tdataformat.h index bb1ce949743c0527df2b2068472f5afe08596aec..cb39b990e53fe54ffe20a1136a8a12d2f2d86875 100644 --- a/src/common/inc/tdataformat.h +++ b/src/common/inc/tdataformat.h @@ -248,7 +248,7 @@ void tdResetDataCols(SDataCols *pCols); void tdInitDataCols(SDataCols *pCols, STSchema *pSchema); SDataCols *tdDupDataCols(SDataCols *pCols, bool keepData); void tdFreeDataCols(SDataCols *pCols); -void tdAppendDataRowToDataCol(SDataRow row, SDataCols *pCols); +void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols *pCols); void tdPopDataColsPoints(SDataCols *pCols, int pointsToPop); //!!!! int tdMergeDataCols(SDataCols *target, SDataCols *src, int rowsToMerge); void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, SDataCols *src2, int *iter2, int tRows); diff --git a/src/common/src/tdataformat.c b/src/common/src/tdataformat.c index f3fd91fc8d7624bad137105d27749659a7188fb2..1a444d6b9021db7eadb946515e8f43847502cf52 100644 --- a/src/common/src/tdataformat.c +++ b/src/common/src/tdataformat.c @@ -265,25 +265,29 @@ bool isNEleNull(SDataCol *pCol, int nEle) { } } +void dataColSetNullAt(SDataCol *pCol, int index) { + if (IS_VAR_DATA_TYPE(pCol->type)) { + pCol->dataOff[index] = pCol->len; + char *ptr = POINTER_SHIFT(pCol->pData, pCol->len); + varDataLen(ptr) = (pCol->type == TSDB_DATA_TYPE_BINARY) ? sizeof(char) : TSDB_NCHAR_SIZE; + setNull(varDataVal(ptr), pCol->type, pCol->bytes); + pCol->len += varDataTLen(ptr); + } else { + setNull(POINTER_SHIFT(pCol->pData, TYPE_BYTES[pCol->type] * index), pCol->type, pCol->bytes); + pCol->len += TYPE_BYTES[pCol->type]; + } +} + void dataColSetNEleNull(SDataCol *pCol, int nEle, int maxPoints) { - char *ptr = NULL; - switch (pCol->type) { - case TSDB_DATA_TYPE_BINARY: - case TSDB_DATA_TYPE_NCHAR: - pCol->len = 0; - for (int i = 0; i < nEle; i++) { - pCol->dataOff[i] = pCol->len; - ptr = (char *)pCol->pData + pCol->len; - varDataLen(ptr) = (pCol->type == TSDB_DATA_TYPE_BINARY) ? sizeof(char) : TSDB_NCHAR_SIZE; - setNull(ptr + sizeof(VarDataLenT), pCol->type, pCol->bytes); - pCol->len += varDataTLen(ptr); - } - break; - default: - setNullN(pCol->pData, pCol->type, pCol->bytes, nEle); - pCol->len = TYPE_BYTES[pCol->type] * nEle; - break; + if (IS_VAR_DATA_TYPE(pCol->type)) { + pCol->len = 0; + for (int i = 0; i < nEle; i++) { + dataColSetNullAt(pCol, i); + } + } else { + setNullN(pCol->pData, pCol->type, pCol->bytes, nEle); + pCol->len = TYPE_BYTES[pCol->type] * nEle; } } @@ -380,14 +384,32 @@ void tdResetDataCols(SDataCols *pCols) { } } -void tdAppendDataRowToDataCol(SDataRow row, SDataCols *pCols) { +void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols *pCols) { ASSERT(dataColsKeyLast(pCols) < dataRowKey(row)); - for (int i = 0; i < pCols->numOfCols; i++) { - SDataCol *pCol = pCols->cols + i; - void * value = tdGetRowDataOfCol(row, pCol->type, pCol->offset); + int rcol = 0; + int dcol = 0; - dataColAppendVal(pCol, value, pCols->numOfRows, pCols->maxPoints); + while (dcol < pCols->numOfCols) { + SDataCol *pDataCol = &(pCols->cols[dcol]); + if (rcol >= schemaNCols(pSchema)) { + dataColSetNullAt(pDataCol, pCols->numOfRows); + dcol++; + continue; + } + + STColumn *pRowCol = schemaColAt(pSchema, rcol); + if (pRowCol->colId == pDataCol->colId) { + dataColAppendVal(pDataCol, tdGetRowDataOfCol(row, pRowCol->type, pRowCol->offset), pCols->numOfRows, + pCols->maxPoints); + dcol++; + rcol++; + } else if (pRowCol->colId < pDataCol->colId) { + rcol++; + } else { + dataColSetNullAt(pDataCol, pCols->numOfRows); + dcol++; + } } pCols->numOfRows++; } diff --git a/src/tsdb/inc/tsdbMain.h b/src/tsdb/inc/tsdbMain.h index 3d7a672ee6e5f902930f798261d3180862545e75..5f9191c6d494229ab6fd4ed9a2ada141cad7f25d 100644 --- a/src/tsdb/inc/tsdbMain.h +++ b/src/tsdb/inc/tsdbMain.h @@ -123,7 +123,6 @@ typedef struct STableIndexElem { STsdbMeta *tsdbInitMeta(char *rootDir, int32_t maxTables, void *pRepo); int32_t tsdbFreeMeta(STsdbMeta *pMeta); -STSchema * tsdbGetTableSchema(STsdbMeta *pMeta, STable *pTable); STSchema * tsdbGetTableTagSchema(STsdbMeta *pMeta, STable *pTable); // ---- Operation on STable @@ -503,14 +502,16 @@ int tsdbWriteCompInfo(SRWHelper *pHelper); int tsdbWriteCompIdx(SRWHelper *pHelper); // --------- Other functions need to further organize -void tsdbFitRetention(STsdbRepo *pRepo); -int tsdbAlterCacheTotalBlocks(STsdbRepo *pRepo, int totalBlocks); -void tsdbAdjustCacheBlocks(STsdbCache *pCache); -int32_t tsdbGetMetaFileName(char *rootDir, char *fname); -int tsdbUpdateFileHeader(SFile *pFile, uint32_t version); -int tsdbUpdateTable(STsdbMeta *pMeta, STable *pTable, STableCfg *pCfg); -int tsdbRemoveTableFromIndex(STsdbMeta *pMeta, STable *pTable); -int tsdbAddTableIntoIndex(STsdbMeta *pMeta, STable *pTable); +void tsdbFitRetention(STsdbRepo *pRepo); +int tsdbAlterCacheTotalBlocks(STsdbRepo *pRepo, int totalBlocks); +void tsdbAdjustCacheBlocks(STsdbCache *pCache); +int32_t tsdbGetMetaFileName(char *rootDir, char *fname); +int tsdbUpdateFileHeader(SFile *pFile, uint32_t version); +int tsdbUpdateTable(STsdbMeta *pMeta, STable *pTable, STableCfg *pCfg); +int tsdbRemoveTableFromIndex(STsdbMeta *pMeta, STable *pTable); +int tsdbAddTableIntoIndex(STsdbMeta *pMeta, STable *pTable); +STSchema *tsdbGetTableSchemaByVersion(STsdbMeta *pMeta, STable *pTable, int16_t version); +STSchema *tsdbGetTableSchema(STsdbMeta *pMeta, STable *pTable); #define DEFAULT_TAG_INDEX_COLUMN 0 // skip list built based on the first column of tags diff --git a/src/tsdb/src/tsdbMain.c b/src/tsdb/src/tsdbMain.c index 5c420d6f567dd8eb279cc12781554329a6345f04..e76cc5313f6b184163c849fa8c37fa06c2b578fd 100644 --- a/src/tsdb/src/tsdbMain.c +++ b/src/tsdb/src/tsdbMain.c @@ -974,9 +974,10 @@ static int32_t tsdbInsertDataToTable(TsdbRepoT *repo, SSubmitBlk *pBlock, TSKEY return TSDB_CODE_SUCCESS; } -static int tsdbReadRowsFromCache(SSkipListIterator *pIter, TSKEY maxKey, int maxRowsToRead, SDataCols *pCols) { +static int tsdbReadRowsFromCache(STsdbMeta *pMeta, STable *pTable, SSkipListIterator *pIter, TSKEY maxKey, int maxRowsToRead, SDataCols *pCols) { ASSERT(maxRowsToRead > 0); if (pIter == NULL) return 0; + STSchema *pSchema = NULL; int numOfRows = 0; @@ -989,7 +990,15 @@ static int tsdbReadRowsFromCache(SSkipListIterator *pIter, TSKEY maxKey, int max SDataRow row = SL_GET_NODE_DATA(node); if (dataRowKey(row) > maxKey) break; - tdAppendDataRowToDataCol(row, pCols); + if (pSchema == NULL || schemaVersion(pSchema) != dataRowVersion(row)) { + pSchema = tsdbGetTableSchemaByVersion(pMeta, pTable, dataRowVersion(row)); + if (pSchema == NULL) { + // TODO: deal with the error here + ASSERT(false); + } + } + + tdAppendDataRowToDataCol(row, pSchema, pCols); numOfRows++; } while (tSkipListIterNext(pIter)); @@ -1139,7 +1148,7 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters int maxRowsToRead = pCfg->maxRowsPerFileBlock * 4 / 5; int nLoop = 0; while (true) { - int rowsRead = tsdbReadRowsFromCache(pIter, maxKey, maxRowsToRead, pDataCols); + int rowsRead = tsdbReadRowsFromCache(pMeta, pTable, pIter, maxKey, maxRowsToRead, pDataCols); assert(rowsRead >= 0); if (pDataCols->numOfRows == 0) break; nLoop++; diff --git a/src/tsdb/src/tsdbMeta.c b/src/tsdb/src/tsdbMeta.c index 0a7f811511090bc708ee4e1feb0fce467cec75d3..cf22c033ef11965f1f82757d9826a0f4dc421a18 100644 --- a/src/tsdb/src/tsdbMeta.c +++ b/src/tsdb/src/tsdbMeta.c @@ -248,6 +248,32 @@ STSchema *tsdbGetTableSchema(STsdbMeta *pMeta, STable *pTable) { } } +static int tsdbCompareSchemaVersion(const void *key1, const void *key2) { + if (*(int16_t *)key1 < (*(STSchema **)key2)->version) { + return -1; + } else if (*(int16_t *)key1 > (*(STSchema **)key2)->version) { + return 1; + } else { + return 0; + } +} + +STSchema *tsdbGetTableSchemaByVersion(STsdbMeta *pMeta, STable *pTable, int16_t version) { + STable *pSearchTable = NULL; + if (pTable->type == TSDB_CHILD_TABLE) { + pSearchTable = tsdbGetTableByUid(pMeta, pTable->superUid); + } else { + pSearchTable = pTable; + } + ASSERT(pSearchTable != NULL); + + void *ptr = taosbsearch(&version, pSearchTable->schema, pSearchTable->numOfSchemas, sizeof(STSchema *), + tsdbCompareSchemaVersion, TD_EQ); + if (ptr == NULL) return NULL; + + return (STSchema *)ptr; +} + STSchema * tsdbGetTableTagSchema(STsdbMeta *pMeta, STable *pTable) { if (pTable->type == TSDB_SUPER_TABLE) { return pTable->tagSchema; @@ -392,9 +418,21 @@ int tsdbUpdateTable(STsdbMeta *pMeta, STable *pTable, STableCfg *pCfg) { isChanged = true; } - { - // TODO: try to update the data schema + STSchema *pTSchema = tsdbGetTableSchema(pMeta, pTable); + if (schemaVersion(pTSchema) < schemaVersion(pCfg->schema)) { + if (pTable->numOfSchemas < TSDB_MAX_TABLE_SCHEMAS) { + pTable->schema[pTable->numOfSchemas++] = tdDupSchema(pCfg->schema); + } else { + ASSERT(pTable->numOfSchemas == TSDB_MAX_TABLE_SCHEMAS); + STSchema *tSchema = tdDupSchema(pCfg->schema); + tdFreeSchema(pTable->schema[0]); + memmove(pTable->schema, pTable->schema+1, sizeof(STSchema *) * (TSDB_MAX_TABLE_SCHEMAS - 1)); + pTable->schema[pTable->numOfSchemas-1] = tSchema; + } + + isChanged = true; } + if (isChanged) { char *buf = malloc(1024 * 1024); int bufLen = 0;