diff --git a/src/common/inc/tdataformat.h b/src/common/inc/tdataformat.h index 8990b69c5aee57dfcfad696e06dc982574aac6b4..528e9b2825befe74beb8381f6291e2b9dda66b1e 100644 --- a/src/common/inc/tdataformat.h +++ b/src/common/inc/tdataformat.h @@ -217,6 +217,59 @@ void tdPopDataColsPoints(SDataCols *pCols, int pointsToPop); //!!!! int tdMergeDataCols(SDataCols *target, SDataCols *src, int rowsToMerge); void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, SDataCols *src2, int *iter2, int tRows); + +// ----------------- Tag row structure + +/* A tag row, the format is like below: ++----------+----------------------------------------------------------------+ +| STagRow | STagCol | STagCol | STagCol | STagCol | ...| STagCol | STagCol | ++----------+----------------------------------------------------------------+ + +pData ++----------+----------------------------------------------------------------+ +| value 1 | value 2 | value 3 | value 4 | ....|value n | ++----------+----------------------------------------------------------------+ + + */ + + +#define TD_TAG_ROW_HEAD_SIZE sizeof(int16_t) + +#define tagRowNum(r) (*(int16_t *)(r)) +#define tagRowArray(r) POINTER_SHIFT(r, TD_TAG_ROW_HEAD_SIZE) +//#define dataRowKey(r) (*(TSKEY *)(dataRowTuple(r))) +//#define dataRowSetLen(r, l) (dataRowLen(r) = (l)) +//#define dataRowCpy(dst, r) memcpy((dst), (r), dataRowLen(r)) +//#define dataRowMaxBytesFromSchema(s) (schemaTLen(s) + TD_DATA_ROW_HEAD_SIZE) + +typedef struct { + int16_t colId; // column ID + int16_t colType; + uint16_t offset; //to store value for numeric col or offset for binary/Nchar +} STagCol; + +typedef struct { + int32_t len; + void * pData; // Space to store the tag value + uint16_t dataLen; + int16_t ncols; // Total columns allocated + STagCol tagCols[]; +} STagRow; + + +#define tagColSize(r) (sizeof(STagCol) + r.colLen) + +int tdSetTagCol(SDataRow row, void *value, int16_t len, int8_t type, int16_t colId); //insert tag value and update all the information +int tdDeleteTagCol(SDataRow row, int16_t colId); // delete tag value and update all the information +void * tdQueryTagByID(SDataRow row, int16_t colId, int16_t *type); //if find tag, 0, else return -1; +int tdAppendTagColVal(SDataRow row, void *value, int8_t type, int32_t bytes, int16_t colId); +SDataRow tdTagRowDup(SDataRow row); +void tdFreeTagRow(SDataRow row); +SDataRow tdTagRowDecode(SDataRow row); +int tdTagRowCpy(SDataRow dst, SDataRow src); +void * tdNewTagRowFromSchema(STSchema *pSchema, int16_t numofTags); +STSchema *tdGetSchemaFromData(SDataRow *row); + #ifdef __cplusplus } #endif diff --git a/src/common/src/tdataformat.c b/src/common/src/tdataformat.c index 81ea801c944011443b4d0f341427dadfd85f8153..922c8bdea0a98e624497838006f4dcf2b3bea087 100644 --- a/src/common/src/tdataformat.c +++ b/src/common/src/tdataformat.c @@ -14,6 +14,7 @@ */ #include "tdataformat.h" #include "wchar.h" +#include "talgo.h" /** * Create a SSchema object with nCols columns @@ -151,6 +152,151 @@ SDataRow tdNewDataRowFromSchema(STSchema *pSchema) { return row; } +int tdSetTagCol(SDataRow row, void *value, int16_t len, int8_t type, int16_t colId){ //insert/update tag value and update all the information + ASSERT(((STagRow *)row)->pData != NULL); + //STagCol * stCol = tdQueryTagColByID() + + return 0; +}; + +int tdDeleteTagCol(SDataRow row, int16_t colId){ // delete tag value and update all the information + //todo + return 0; +}; + +static int compTagId(const void *key1, const void *key2) { + if (((STagCol *)key1)->colId > ((STagCol *)key2)->colId) { + return 1; + } else if (((STagCol *)key1)->colId == ((STagCol *)key2)->colId) { + return 0; + } else { + return -1; + } +} + +/** + * Find tag structure by colId, if find, return tag structure, else return NULL; + */ +STagCol * tdQueryTagColByID(SDataRow row, int16_t colId, int flags) { //if find tag, 0, else return -1; + ASSERT(((STagRow *)row)->pData != NULL); + STagCol *pBase = ((STagRow *)row)->tagCols; + int16_t nCols = ((STagRow *)row)->ncols; + STagCol key = {colId,0,0}; + STagCol * stCol = taosbsearch(&key, pBase, nCols, sizeof(STagCol), compTagId, flags); + return stCol; +}; + +/** +* Find tag value by colId, if find, return tag value, else return NULL; +*/ +void * tdQueryTagByID(SDataRow row, int16_t colId, int16_t *type) { + ASSERT(((STagRow *)row)->pData != NULL); + STagCol *pBase = ((STagRow *)row)->tagCols; + int16_t nCols = ((STagRow *)row)->ncols; + STagCol key = {colId,0,0}; + STagCol * stCol = taosbsearch(&key, pBase, nCols, sizeof(STagCol), compTagId, TD_EQ); + if (NULL == stCol) { + return NULL; + } + + void * pData = ((STagRow *)row)->pData; + *type = stCol->colType; + + return pData + stCol->offset; +}; + +int tdAppendTagColVal(SDataRow row, void *value, int8_t type, int32_t bytes, int16_t colId){ + ASSERT(value != NULL); + //ASSERT(bytes-2 == varDataTLen(value)); + ASSERT(row != NULL); + STagRow *pTagrow = row; + pTagrow->tagCols[pTagrow->ncols].colId = colId; + pTagrow->tagCols[pTagrow->ncols].colType = type; + pTagrow->tagCols[pTagrow->ncols].offset = pTagrow->dataLen; + + switch (type) { + case TSDB_DATA_TYPE_BINARY: + case TSDB_DATA_TYPE_NCHAR: + memcpy((char *)pTagrow->pData + pTagrow->dataLen, value, varDataTLen(value)); + pTagrow->dataLen += varDataTLen(value); + break; + default: + memcpy((char *)pTagrow->pData + pTagrow->dataLen, value, TYPE_BYTES[type]); + pTagrow->dataLen += TYPE_BYTES[type]; + break; + } + + pTagrow->ncols++; + + return 0; +}; + +void * tdNewTagRowFromSchema(STSchema *pSchema, int16_t numofTags) { + int32_t size = sizeof(STagRow) + numofTags * sizeof(STagCol); + + STagRow *row = malloc(size); + if (row == NULL) return NULL; + + int32_t datasize = pSchema->tlen; + row->pData = malloc(datasize); + if (NULL == row->pData) { + free(row); + return NULL; + } + + row->len = size; + row->dataLen = 0; + row->ncols = 0; + return row; +} +/** + * free tag row + */ + +void tdFreeTagRow(SDataRow row) { + if (row) { + free(((STagRow *)row)->pData); + free(row); + } +} + +SDataRow tdTagRowDup(SDataRow row) { + STagRow *trow = malloc(dataRowLen(row)); + if (trow == NULL) return NULL; + + dataRowCpy(trow, row); + trow->pData = malloc(trow->dataLen); + if (NULL == trow->pData) { + free(trow); + return NULL; + } + memcpy(trow->pData, ((STagRow *)row)->pData, trow->dataLen); + return trow; +} + +SDataRow tdTagRowDecode(SDataRow row) { + STagRow *trow = malloc(dataRowLen(row)); + if (trow == NULL) return NULL; + + dataRowCpy(trow, row); + trow->pData = malloc(trow->dataLen); + if (NULL == trow->pData) { + free(trow); + return NULL; + } + char * pData = (char *)row + dataRowLen(row); + memcpy(trow->pData, pData, trow->dataLen); + return trow; +} + +int tdTagRowCpy(SDataRow dst, SDataRow src) { + if (src == NULL) return -1; + + dataRowCpy(dst, src); + void * pData = dst + dataRowLen(src); + memcpy(pData, ((STagRow *)src)->pData, ((STagRow *)src)->dataLen); + return 0; +} /** * Free the SDataRow object */ diff --git a/src/tsdb/src/tsdbMeta.c b/src/tsdb/src/tsdbMeta.c index 95680f95c4565a500a00b14e8dfec9de91d902fe..37a10bb06971d6bba01334ad3b79dfc509336d53 100644 --- a/src/tsdb/src/tsdbMeta.c +++ b/src/tsdb/src/tsdbMeta.c @@ -54,7 +54,7 @@ void *tsdbEncodeTable(STable *pTable, int *contLen) { ptr = tdEncodeSchema(ptr, pTable->schema); ptr = tdEncodeSchema(ptr, pTable->tagSchema); } else if (pTable->type == TSDB_CHILD_TABLE) { - dataRowCpy(ptr, pTable->tagVal); + tdTagRowCpy(ptr, pTable->tagVal); } else { ptr = tdEncodeSchema(ptr, pTable->schema); } @@ -96,7 +96,7 @@ STable *tsdbDecodeTable(void *cont, int contLen) { pTable->schema = tdDecodeSchema(&ptr); pTable->tagSchema = tdDecodeSchema(&ptr); } else if (pTable->type == TSDB_CHILD_TABLE) { - pTable->tagVal = tdDataRowDup(ptr); + pTable->tagVal = tdTagRowDecode(ptr); } else { pTable->schema = tdDecodeSchema(&ptr); } @@ -114,8 +114,10 @@ static char* getTagIndexKey(const void* pData) { SDataRow row = elem->pTable->tagVal; STSchema* pSchema = tsdbGetTableTagSchema(elem->pMeta, elem->pTable); STColumn* pCol = &pSchema->columns[DEFAULT_TAG_INDEX_COLUMN]; - - return tdGetRowDataOfCol(row, pCol->type, TD_DATA_ROW_HEAD_SIZE + pCol->offset); + int16_t type = 0; + void * res = tdQueryTagByID(row, pCol->colId,&type); + ASSERT(type == pCol->type); + return res; } int tsdbRestoreTable(void *pHandle, void *cont, int contLen) { @@ -255,8 +257,9 @@ int32_t tsdbGetTableTagVal(TsdbRepoT* repo, STableId* id, int32_t colId, int16_t } SDataRow row = (SDataRow)pTable->tagVal; - char* d = tdGetRowDataOfCol(row, pCol->type, TD_DATA_ROW_HEAD_SIZE + pCol->offset); - + int16_t tagtype = 0; + char* d = tdQueryTagByID(row, pCol->colId, &tagtype); + //ASSERT((int8_t)tagtype == pCol->type) *val = d; *type = pCol->type; *bytes = pCol->bytes; @@ -328,7 +331,7 @@ int tsdbCreateTable(TsdbRepoT *repo, STableCfg *pCfg) { if (super->pIndex == NULL) { tdFreeSchema(super->schema); tdFreeSchema(super->tagSchema); - tdFreeDataRow(super->tagVal); + tdFreeTagRow(super->tagVal); free(super); return -1; } @@ -353,7 +356,7 @@ int tsdbCreateTable(TsdbRepoT *repo, STableCfg *pCfg) { if (IS_CREATE_STABLE(pCfg)) { // TSDB_CHILD_TABLE table->type = TSDB_CHILD_TABLE; table->superUid = pCfg->superUid; - table->tagVal = tdDataRowDup(pCfg->tagValues); + table->tagVal = tdTagRowDup(pCfg->tagValues); } else { // TSDB_NORMAL_TABLE table->type = TSDB_NORMAL_TABLE; table->superUid = -1; @@ -440,7 +443,7 @@ static void tsdbFreeMemTable(SMemTable *pMemTable) { static int tsdbFreeTable(STable *pTable) { // TODO: finish this function if (pTable->type == TSDB_CHILD_TABLE) { - tdFreeDataRow(pTable->tagVal); + tdFreeTagRow(pTable->tagVal); } else { tdFreeSchema(pTable->schema); } @@ -579,7 +582,9 @@ static int tsdbRemoveTableFromIndex(STsdbMeta *pMeta, STable *pTable) { STSchema* pSchema = tsdbGetTableTagSchema(pMeta, pTable); STColumn* pCol = &pSchema->columns[DEFAULT_TAG_INDEX_COLUMN]; - char* key = tdGetRowDataOfCol(pTable->tagVal, pCol->type, TD_DATA_ROW_HEAD_SIZE + pCol->offset); + int16_t tagtype = 0; + char* key = tdQueryTagByID(pTable->tagVal, pCol->colId, &tagtype); + ASSERT(pCol->type == tagtype); SArray* res = tSkipListGet(pSTable->pIndex, key); size_t size = taosArrayGetSize(res); @@ -610,7 +615,8 @@ static int tsdbEstimateTableEncodeSize(STable *pTable) { size += tdGetSchemaEncodeSize(pTable->schema); size += tdGetSchemaEncodeSize(pTable->tagSchema); } else if (pTable->type == TSDB_CHILD_TABLE) { - size += dataRowLen(pTable->tagVal); + STagRow *pTagRow = (STagRow *)(pTable->tagVal); + size += dataRowLen(pTable->tagVal) + pTagRow->dataLen; } else { size += tdGetSchemaEncodeSize(pTable->schema); } diff --git a/src/tsdb/src/tsdbRead.c b/src/tsdb/src/tsdbRead.c index 595217debb75b32ff150bec6a4231da3a0f0f683..2220ebfd88abf6933ae4c88003b26b1c9be9b1b4 100644 --- a/src/tsdb/src/tsdbRead.c +++ b/src/tsdb/src/tsdbRead.c @@ -1753,9 +1753,9 @@ int32_t tableGroupComparFn(const void *p1, const void *p2, const void *param) { STColumn* pCol = schemaColAt(pTableGroupSupp->pTagSchema, colIndex); bytes = pCol->bytes; type = pCol->type; - - f1 = tdGetRowDataOfCol(pTable1->tagVal, pCol->type, TD_DATA_ROW_HEAD_SIZE + pCol->offset); - f2 = tdGetRowDataOfCol(pTable2->tagVal, pCol->type, TD_DATA_ROW_HEAD_SIZE + pCol->offset); + int16_t tgtype1, tgtype2 = 0; + f1 = tdQueryTagByID(pTable1->tagVal, pCol->colId, &tgtype1); + f2 = tdQueryTagByID(pTable2->tagVal, pCol->colId, &tgtype2); } int32_t ret = doCompare(f1, f2, type, bytes); @@ -1843,12 +1843,14 @@ bool indexedNodeFilterFp(const void* pNode, void* param) { val = (char*) elem->pTable->name; type = TSDB_DATA_TYPE_BINARY; } else { - STSchema* pTSchema = (STSchema*) pInfo->param; // todo table schema is identical to stable schema?? - - int32_t offset = pTSchema->columns[pInfo->colIndex].offset; - val = tdGetRowDataOfCol(elem->pTable->tagVal, pInfo->sch.type, TD_DATA_ROW_HEAD_SIZE + offset); - } - +// STSchema* pTSchema = (STSchema*) pInfo->param; // todo table schema is identical to stable schema?? + int16_t type; + // int32_t offset = pTSchema->columns[pInfo->colIndex].offset; + // val = tdGetRowDataOfCol(elem->pTable->tagVal, pInfo->sch.type, TD_DATA_ROW_HEAD_SIZE + offset); + val = tdQueryTagByID(elem->pTable->tagVal, pInfo->sch.colId, &type); + // ASSERT(pInfo->sch.type == type); + } + //todo :the val is possible to be null, so check it out carefully int32_t ret = 0; if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) { if (pInfo->optr == TSDB_RELATION_IN) { diff --git a/src/vnode/src/vnodeWrite.c b/src/vnode/src/vnodeWrite.c index 9c415d6af756d6c9f015ef50055a00b2c48a910b..96ca19e1294de876339ebac6d3ee055feceaacd8 100644 --- a/src/vnode/src/vnodeWrite.c +++ b/src/vnode/src/vnodeWrite.c @@ -139,11 +139,13 @@ static int32_t vnodeProcessCreateTableMsg(SVnodeObj *pVnode, void *pCont, SRspRe char *pTagData = pTable->data + totalCols * sizeof(SSchema); int accumBytes = 0; - dataRow = tdNewDataRowFromSchema(pDestTagSchema); + //dataRow = tdNewDataRowFromSchema(pDestTagSchema); + dataRow = tdNewTagRowFromSchema(pDestTagSchema, numOfTags); for (int i = 0; i < numOfTags; i++) { STColumn *pTCol = schemaColAt(pDestTagSchema, i); - tdAppendColVal(dataRow, pTagData + accumBytes, pTCol->type, pTCol->bytes, pTCol->offset); +// tdAppendColVal(dataRow, pTagData + accumBytes, pTCol->type, pTCol->bytes, pTCol->offset); + tdAppendTagColVal(dataRow, pTagData + accumBytes, pTCol->type, pTCol->bytes, pTCol->colId); accumBytes += htons(pSchema[i + numOfColumns].bytes); } tsdbTableSetTagValue(&tCfg, dataRow, false);