提交 0be65a23 编写于 作者: H Hongze Cheng

Merge branch 'develop' into feature/2.0tsdb

...@@ -217,6 +217,59 @@ void tdPopDataColsPoints(SDataCols *pCols, int pointsToPop); //!!!! ...@@ -217,6 +217,59 @@ void tdPopDataColsPoints(SDataCols *pCols, int pointsToPop); //!!!!
int tdMergeDataCols(SDataCols *target, SDataCols *src, int rowsToMerge); int tdMergeDataCols(SDataCols *target, SDataCols *src, int rowsToMerge);
void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, SDataCols *src2, int *iter2, int tRows); void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, SDataCols *src2, int *iter2, int tRows);
// ----------------- Tag row structure
/* A tag row, the format is like below:
+----------+----------------------------------------------------------------+
| STagRow | STagCol | STagCol | STagCol | STagCol | ...| STagCol | STagCol |
+----------+----------------------------------------------------------------+
pData
+----------+----------------------------------------------------------------+
| value 1 | value 2 | value 3 | value 4 | ....|value n |
+----------+----------------------------------------------------------------+
*/
#define TD_TAG_ROW_HEAD_SIZE sizeof(int16_t)
#define tagRowNum(r) (*(int16_t *)(r))
#define tagRowArray(r) POINTER_SHIFT(r, TD_TAG_ROW_HEAD_SIZE)
//#define dataRowKey(r) (*(TSKEY *)(dataRowTuple(r)))
//#define dataRowSetLen(r, l) (dataRowLen(r) = (l))
//#define dataRowCpy(dst, r) memcpy((dst), (r), dataRowLen(r))
//#define dataRowMaxBytesFromSchema(s) (schemaTLen(s) + TD_DATA_ROW_HEAD_SIZE)
typedef struct {
int16_t colId; // column ID
int16_t colType;
uint16_t offset; //to store value for numeric col or offset for binary/Nchar
} STagCol;
typedef struct {
int32_t len;
void * pData; // Space to store the tag value
uint16_t dataLen;
int16_t ncols; // Total columns allocated
STagCol tagCols[];
} STagRow;
#define tagColSize(r) (sizeof(STagCol) + r.colLen)
int tdSetTagCol(SDataRow row, void *value, int16_t len, int8_t type, int16_t colId); //insert tag value and update all the information
int tdDeleteTagCol(SDataRow row, int16_t colId); // delete tag value and update all the information
void * tdQueryTagByID(SDataRow row, int16_t colId, int16_t *type); //if find tag, 0, else return -1;
int tdAppendTagColVal(SDataRow row, void *value, int8_t type, int32_t bytes, int16_t colId);
SDataRow tdTagRowDup(SDataRow row);
void tdFreeTagRow(SDataRow row);
SDataRow tdTagRowDecode(SDataRow row);
int tdTagRowCpy(SDataRow dst, SDataRow src);
void * tdNewTagRowFromSchema(STSchema *pSchema, int16_t numofTags);
STSchema *tdGetSchemaFromData(SDataRow *row);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -14,6 +14,7 @@ ...@@ -14,6 +14,7 @@
*/ */
#include "tdataformat.h" #include "tdataformat.h"
#include "wchar.h" #include "wchar.h"
#include "talgo.h"
/** /**
* Create a SSchema object with nCols columns * Create a SSchema object with nCols columns
...@@ -151,6 +152,151 @@ SDataRow tdNewDataRowFromSchema(STSchema *pSchema) { ...@@ -151,6 +152,151 @@ SDataRow tdNewDataRowFromSchema(STSchema *pSchema) {
return row; return row;
} }
int tdSetTagCol(SDataRow row, void *value, int16_t len, int8_t type, int16_t colId){ //insert/update tag value and update all the information
ASSERT(((STagRow *)row)->pData != NULL);
//STagCol * stCol = tdQueryTagColByID()
return 0;
};
int tdDeleteTagCol(SDataRow row, int16_t colId){ // delete tag value and update all the information
//todo
return 0;
};
static int compTagId(const void *key1, const void *key2) {
if (((STagCol *)key1)->colId > ((STagCol *)key2)->colId) {
return 1;
} else if (((STagCol *)key1)->colId == ((STagCol *)key2)->colId) {
return 0;
} else {
return -1;
}
}
/**
* Find tag structure by colId, if find, return tag structure, else return NULL;
*/
STagCol * tdQueryTagColByID(SDataRow row, int16_t colId, int flags) { //if find tag, 0, else return -1;
ASSERT(((STagRow *)row)->pData != NULL);
STagCol *pBase = ((STagRow *)row)->tagCols;
int16_t nCols = ((STagRow *)row)->ncols;
STagCol key = {colId,0,0};
STagCol * stCol = taosbsearch(&key, pBase, nCols, sizeof(STagCol), compTagId, flags);
return stCol;
};
/**
* Find tag value by colId, if find, return tag value, else return NULL;
*/
void * tdQueryTagByID(SDataRow row, int16_t colId, int16_t *type) {
ASSERT(((STagRow *)row)->pData != NULL);
STagCol *pBase = ((STagRow *)row)->tagCols;
int16_t nCols = ((STagRow *)row)->ncols;
STagCol key = {colId,0,0};
STagCol * stCol = taosbsearch(&key, pBase, nCols, sizeof(STagCol), compTagId, TD_EQ);
if (NULL == stCol) {
return NULL;
}
void * pData = ((STagRow *)row)->pData;
*type = stCol->colType;
return pData + stCol->offset;
};
int tdAppendTagColVal(SDataRow row, void *value, int8_t type, int32_t bytes, int16_t colId){
ASSERT(value != NULL);
//ASSERT(bytes-2 == varDataTLen(value));
ASSERT(row != NULL);
STagRow *pTagrow = row;
pTagrow->tagCols[pTagrow->ncols].colId = colId;
pTagrow->tagCols[pTagrow->ncols].colType = type;
pTagrow->tagCols[pTagrow->ncols].offset = pTagrow->dataLen;
switch (type) {
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR:
memcpy((char *)pTagrow->pData + pTagrow->dataLen, value, varDataTLen(value));
pTagrow->dataLen += varDataTLen(value);
break;
default:
memcpy((char *)pTagrow->pData + pTagrow->dataLen, value, TYPE_BYTES[type]);
pTagrow->dataLen += TYPE_BYTES[type];
break;
}
pTagrow->ncols++;
return 0;
};
void * tdNewTagRowFromSchema(STSchema *pSchema, int16_t numofTags) {
int32_t size = sizeof(STagRow) + numofTags * sizeof(STagCol);
STagRow *row = malloc(size);
if (row == NULL) return NULL;
int32_t datasize = pSchema->tlen;
row->pData = malloc(datasize);
if (NULL == row->pData) {
free(row);
return NULL;
}
row->len = size;
row->dataLen = 0;
row->ncols = 0;
return row;
}
/**
* free tag row
*/
void tdFreeTagRow(SDataRow row) {
if (row) {
free(((STagRow *)row)->pData);
free(row);
}
}
SDataRow tdTagRowDup(SDataRow row) {
STagRow *trow = malloc(dataRowLen(row));
if (trow == NULL) return NULL;
dataRowCpy(trow, row);
trow->pData = malloc(trow->dataLen);
if (NULL == trow->pData) {
free(trow);
return NULL;
}
memcpy(trow->pData, ((STagRow *)row)->pData, trow->dataLen);
return trow;
}
SDataRow tdTagRowDecode(SDataRow row) {
STagRow *trow = malloc(dataRowLen(row));
if (trow == NULL) return NULL;
dataRowCpy(trow, row);
trow->pData = malloc(trow->dataLen);
if (NULL == trow->pData) {
free(trow);
return NULL;
}
char * pData = (char *)row + dataRowLen(row);
memcpy(trow->pData, pData, trow->dataLen);
return trow;
}
int tdTagRowCpy(SDataRow dst, SDataRow src) {
if (src == NULL) return -1;
dataRowCpy(dst, src);
void * pData = dst + dataRowLen(src);
memcpy(pData, ((STagRow *)src)->pData, ((STagRow *)src)->dataLen);
return 0;
}
/** /**
* Free the SDataRow object * Free the SDataRow object
*/ */
......
...@@ -47,8 +47,8 @@ void tsdbEncodeTable(STable *pTable, char *buf, int *contLen) { ...@@ -47,8 +47,8 @@ void tsdbEncodeTable(STable *pTable, char *buf, int *contLen) {
ptr = tdEncodeSchema(ptr, pTable->schema); ptr = tdEncodeSchema(ptr, pTable->schema);
ptr = tdEncodeSchema(ptr, pTable->tagSchema); ptr = tdEncodeSchema(ptr, pTable->tagSchema);
} else if (pTable->type == TSDB_CHILD_TABLE) { } else if (pTable->type == TSDB_CHILD_TABLE) {
dataRowCpy(ptr, pTable->tagVal); tdTagRowCpy(ptr, pTable->tagVal);
ptr = POINTER_SHIFT(ptr, dataRowLen(pTable->tagVal)); ptr = POINTER_SHIFT(ptr, dataRowLen(pTable->tagVal) + ((STagRow *)pTable->tagVal)->dataLen);
} else { } else {
ptr = tdEncodeSchema(ptr, pTable->schema); ptr = tdEncodeSchema(ptr, pTable->schema);
} }
...@@ -94,8 +94,8 @@ STable *tsdbDecodeTable(void *cont, int contLen) { ...@@ -94,8 +94,8 @@ STable *tsdbDecodeTable(void *cont, int contLen) {
pTable->schema = tdDecodeSchema(&ptr); pTable->schema = tdDecodeSchema(&ptr);
pTable->tagSchema = tdDecodeSchema(&ptr); pTable->tagSchema = tdDecodeSchema(&ptr);
} else if (pTable->type == TSDB_CHILD_TABLE) { } else if (pTable->type == TSDB_CHILD_TABLE) {
pTable->tagVal = tdDataRowDup(ptr); pTable->tagVal = tdTagRowDecode(ptr);
ptr = POINTER_SHIFT(ptr, dataRowLen(pTable->tagVal)); ptr = POINTER_SHIFT(ptr, dataRowLen(pTable->tagVal) + ((STagRow *)pTable->tagVal)->dataLen);
} else { } else {
pTable->schema = tdDecodeSchema(&ptr); pTable->schema = tdDecodeSchema(&ptr);
} }
...@@ -117,8 +117,10 @@ static char* getTagIndexKey(const void* pData) { ...@@ -117,8 +117,10 @@ static char* getTagIndexKey(const void* pData) {
SDataRow row = elem->pTable->tagVal; SDataRow row = elem->pTable->tagVal;
STSchema* pSchema = tsdbGetTableTagSchema(elem->pMeta, elem->pTable); STSchema* pSchema = tsdbGetTableTagSchema(elem->pMeta, elem->pTable);
STColumn* pCol = &pSchema->columns[DEFAULT_TAG_INDEX_COLUMN]; STColumn* pCol = &pSchema->columns[DEFAULT_TAG_INDEX_COLUMN];
int16_t type = 0;
return tdGetRowDataOfCol(row, pCol->type, TD_DATA_ROW_HEAD_SIZE + pCol->offset); void * res = tdQueryTagByID(row, pCol->colId,&type);
ASSERT(type == pCol->type);
return res;
} }
int tsdbRestoreTable(void *pHandle, void *cont, int contLen) { int tsdbRestoreTable(void *pHandle, void *cont, int contLen) {
...@@ -258,8 +260,9 @@ int32_t tsdbGetTableTagVal(TsdbRepoT* repo, STableId* id, int32_t colId, int16_t ...@@ -258,8 +260,9 @@ int32_t tsdbGetTableTagVal(TsdbRepoT* repo, STableId* id, int32_t colId, int16_t
} }
SDataRow row = (SDataRow)pTable->tagVal; SDataRow row = (SDataRow)pTable->tagVal;
char* d = tdGetRowDataOfCol(row, pCol->type, TD_DATA_ROW_HEAD_SIZE + pCol->offset); int16_t tagtype = 0;
char* d = tdQueryTagByID(row, pCol->colId, &tagtype);
//ASSERT((int8_t)tagtype == pCol->type)
*val = d; *val = d;
*type = pCol->type; *type = pCol->type;
*bytes = pCol->bytes; *bytes = pCol->bytes;
...@@ -472,7 +475,7 @@ static int tsdbFreeTable(STable *pTable) { ...@@ -472,7 +475,7 @@ static int tsdbFreeTable(STable *pTable) {
if (pTable == NULL) return 0; if (pTable == NULL) return 0;
if (pTable->type == TSDB_CHILD_TABLE) { if (pTable->type == TSDB_CHILD_TABLE) {
tdFreeDataRow(pTable->tagVal); tdFreeTagRow(pTable->tagVal);
} else { } else {
tdFreeSchema(pTable->schema); tdFreeSchema(pTable->schema);
} }
...@@ -620,7 +623,9 @@ static int tsdbRemoveTableFromIndex(STsdbMeta *pMeta, STable *pTable) { ...@@ -620,7 +623,9 @@ static int tsdbRemoveTableFromIndex(STsdbMeta *pMeta, STable *pTable) {
STSchema* pSchema = tsdbGetTableTagSchema(pMeta, pTable); STSchema* pSchema = tsdbGetTableTagSchema(pMeta, pTable);
STColumn* pCol = &pSchema->columns[DEFAULT_TAG_INDEX_COLUMN]; STColumn* pCol = &pSchema->columns[DEFAULT_TAG_INDEX_COLUMN];
char* key = tdGetRowDataOfCol(pTable->tagVal, pCol->type, TD_DATA_ROW_HEAD_SIZE + pCol->offset); int16_t tagtype = 0;
char* key = tdQueryTagByID(pTable->tagVal, pCol->colId, &tagtype);
ASSERT(pCol->type == tagtype);
SArray* res = tSkipListGet(pSTable->pIndex, key); SArray* res = tSkipListGet(pSTable->pIndex, key);
size_t size = taosArrayGetSize(res); size_t size = taosArrayGetSize(res);
...@@ -639,6 +644,7 @@ static int tsdbRemoveTableFromIndex(STsdbMeta *pMeta, STable *pTable) { ...@@ -639,6 +644,7 @@ static int tsdbRemoveTableFromIndex(STsdbMeta *pMeta, STable *pTable) {
return 0; return 0;
} }
char *getTSTupleKey(const void * data) { char *getTSTupleKey(const void * data) {
SDataRow row = (SDataRow)data; SDataRow row = (SDataRow)data;
return POINTER_SHIFT(row, TD_DATA_ROW_HEAD_SIZE); return POINTER_SHIFT(row, TD_DATA_ROW_HEAD_SIZE);
......
...@@ -1753,9 +1753,9 @@ int32_t tableGroupComparFn(const void *p1, const void *p2, const void *param) { ...@@ -1753,9 +1753,9 @@ int32_t tableGroupComparFn(const void *p1, const void *p2, const void *param) {
STColumn* pCol = schemaColAt(pTableGroupSupp->pTagSchema, colIndex); STColumn* pCol = schemaColAt(pTableGroupSupp->pTagSchema, colIndex);
bytes = pCol->bytes; bytes = pCol->bytes;
type = pCol->type; type = pCol->type;
int16_t tgtype1, tgtype2 = 0;
f1 = tdGetRowDataOfCol(pTable1->tagVal, pCol->type, TD_DATA_ROW_HEAD_SIZE + pCol->offset); f1 = tdQueryTagByID(pTable1->tagVal, pCol->colId, &tgtype1);
f2 = tdGetRowDataOfCol(pTable2->tagVal, pCol->type, TD_DATA_ROW_HEAD_SIZE + pCol->offset); f2 = tdQueryTagByID(pTable2->tagVal, pCol->colId, &tgtype2);
} }
int32_t ret = doCompare(f1, f2, type, bytes); int32_t ret = doCompare(f1, f2, type, bytes);
...@@ -1843,12 +1843,14 @@ bool indexedNodeFilterFp(const void* pNode, void* param) { ...@@ -1843,12 +1843,14 @@ bool indexedNodeFilterFp(const void* pNode, void* param) {
val = (char*) elem->pTable->name; val = (char*) elem->pTable->name;
type = TSDB_DATA_TYPE_BINARY; type = TSDB_DATA_TYPE_BINARY;
} else { } else {
STSchema* pTSchema = (STSchema*) pInfo->param; // todo table schema is identical to stable schema?? // STSchema* pTSchema = (STSchema*) pInfo->param; // todo table schema is identical to stable schema??
int16_t type;
int32_t offset = pTSchema->columns[pInfo->colIndex].offset; // int32_t offset = pTSchema->columns[pInfo->colIndex].offset;
val = tdGetRowDataOfCol(elem->pTable->tagVal, pInfo->sch.type, TD_DATA_ROW_HEAD_SIZE + offset); // val = tdGetRowDataOfCol(elem->pTable->tagVal, pInfo->sch.type, TD_DATA_ROW_HEAD_SIZE + offset);
} val = tdQueryTagByID(elem->pTable->tagVal, pInfo->sch.colId, &type);
// ASSERT(pInfo->sch.type == type);
}
//todo :the val is possible to be null, so check it out carefully
int32_t ret = 0; int32_t ret = 0;
if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) { if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) {
if (pInfo->optr == TSDB_RELATION_IN) { if (pInfo->optr == TSDB_RELATION_IN) {
......
...@@ -140,11 +140,13 @@ static int32_t vnodeProcessCreateTableMsg(SVnodeObj *pVnode, void *pCont, SRspRe ...@@ -140,11 +140,13 @@ static int32_t vnodeProcessCreateTableMsg(SVnodeObj *pVnode, void *pCont, SRspRe
char *pTagData = pTable->data + totalCols * sizeof(SSchema); char *pTagData = pTable->data + totalCols * sizeof(SSchema);
int accumBytes = 0; int accumBytes = 0;
dataRow = tdNewDataRowFromSchema(pDestTagSchema); //dataRow = tdNewDataRowFromSchema(pDestTagSchema);
dataRow = tdNewTagRowFromSchema(pDestTagSchema, numOfTags);
for (int i = 0; i < numOfTags; i++) { for (int i = 0; i < numOfTags; i++) {
STColumn *pTCol = schemaColAt(pDestTagSchema, i); STColumn *pTCol = schemaColAt(pDestTagSchema, i);
tdAppendColVal(dataRow, pTagData + accumBytes, pTCol->type, pTCol->bytes, pTCol->offset); // tdAppendColVal(dataRow, pTagData + accumBytes, pTCol->type, pTCol->bytes, pTCol->offset);
tdAppendTagColVal(dataRow, pTagData + accumBytes, pTCol->type, pTCol->bytes, pTCol->colId);
accumBytes += htons(pSchema[i + numOfColumns].bytes); accumBytes += htons(pSchema[i + numOfColumns].bytes);
} }
tsdbTableSetTagValue(&tCfg, dataRow, false); tsdbTableSetTagValue(&tCfg, dataRow, false);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册