未验证 提交 f0b2c37c 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #2096 from taosdata/feature/2.0tsdb

Feature/2.0tsdb
......@@ -19,6 +19,7 @@
#include <stdlib.h>
#include <string.h>
#include "talgo.h"
#include "taosdef.h"
#include "tutil.h"
......@@ -26,19 +27,24 @@
extern "C" {
#endif
#define STR_TO_VARSTR(x, str) do {VarDataLenT __len = strlen(str); \
*(VarDataLenT*)(x) = __len; \
strncpy(varDataVal(x), (str), __len);} while(0);
#define STR_WITH_MAXSIZE_TO_VARSTR(x, str, _maxs) do {\
char* _e = stpncpy(varDataVal(x), (str), (_maxs));\
varDataSetLen(x, (_e - (x) - VARSTR_HEADER_SIZE));\
} while(0)
#define STR_WITH_SIZE_TO_VARSTR(x, str, _size) do {\
*(VarDataLenT*)(x) = (_size); \
strncpy(varDataVal(x), (str), (_size));\
} while(0);
#define STR_TO_VARSTR(x, str) \
do { \
VarDataLenT __len = strlen(str); \
*(VarDataLenT *)(x) = __len; \
strncpy(varDataVal(x), (str), __len); \
} while (0);
#define STR_WITH_MAXSIZE_TO_VARSTR(x, str, _maxs) \
do { \
char *_e = stpncpy(varDataVal(x), (str), (_maxs)); \
varDataSetLen(x, (_e - (x)-VARSTR_HEADER_SIZE)); \
} while (0)
#define STR_WITH_SIZE_TO_VARSTR(x, str, _size) \
do { \
*(VarDataLenT *)(x) = (_size); \
strncpy(varDataVal(x), (str), (_size)); \
} while (0);
// ----------------- TSDB COLUMN DEFINITION
typedef struct {
......@@ -72,15 +78,31 @@ typedef struct {
#define schemaTLen(s) ((s)->tlen)
#define schemaFLen(s) ((s)->flen)
#define schemaColAt(s, i) ((s)->columns + i)
#define tdFreeSchema(s) tfree((s))
STSchema *tdNewSchema(int32_t nCols);
#define tdFreeSchema(s) tfree((s))
int tdSchemaAddCol(STSchema *pSchema, int8_t type, int16_t colId, int32_t bytes);
STSchema *tdDupSchema(STSchema *pSchema);
int tdGetSchemaEncodeSize(STSchema *pSchema);
void * tdEncodeSchema(void *dst, STSchema *pSchema);
STSchema *tdDecodeSchema(void **psrc);
static FORCE_INLINE int comparColId(const void *key1, const void *key2) {
if (*(int16_t *)key1 > ((STColumn *)key2)->colId) {
return 1;
} else if (*(int16_t *)key1 < ((STColumn *)key2)->colId) {
return -1;
} else {
return 0;
}
}
static FORCE_INLINE STColumn *tdGetColOfID(STSchema *pSchema, int16_t colId) {
void *ptr = bsearch(&colId, (void *)pSchema->columns, schemaNCols(pSchema), sizeof(STColumn), comparColId);
if (ptr == NULL) return NULL;
return (STColumn *)ptr;
}
// ----------------- Data row structure
/* A data row, the format is like below:
......@@ -188,12 +210,11 @@ static FORCE_INLINE int32_t dataColGetNEleLen(SDataCol *pDataCol, int rows) {
}
}
typedef struct {
int maxRowSize;
int maxCols; // max number of columns
int maxPoints; // max number of points
int bufSize;
int maxRowSize;
int maxCols; // max number of columns
int maxPoints; // max number of points
int bufSize;
int numOfRows;
int numOfCols; // Total number of cols
......@@ -213,62 +234,102 @@ void tdInitDataCols(SDataCols *pCols, STSchema *pSchema);
SDataCols *tdDupDataCols(SDataCols *pCols, bool keepData);
void tdFreeDataCols(SDataCols *pCols);
void tdAppendDataRowToDataCol(SDataRow row, SDataCols *pCols);
void tdPopDataColsPoints(SDataCols *pCols, int pointsToPop); //!!!!
void tdPopDataColsPoints(SDataCols *pCols, int pointsToPop); //!!!!
int tdMergeDataCols(SDataCols *target, SDataCols *src, int rowsToMerge);
void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, SDataCols *src2, int *iter2, int tRows);
// ----------------- K-V data row structure
/*
* +----------+----------+---------------------------------+---------------------------------+
* | int16_t | int16_t | | |
* +----------+----------+---------------------------------+---------------------------------+
* | len | ncols | cols index | data part |
* +----------+----------+---------------------------------+---------------------------------+
*/
typedef void *SKVRow;
typedef struct {
int16_t colId;
int16_t offset;
} SColIdx;
#define TD_KV_ROW_HEAD_SIZE 2 * sizeof(int16_t)
#define kvRowLen(r) (*(int16_t *)(r))
#define kvRowNCols(r) (*(int16_t *)POINTER_SHIFT(r, sizeof(int16_t)))
#define kvRowSetLen(r, len) kvRowLen(r) = (len)
#define kvRowSetNCols(r, n) kvRowNCols(r) = (n)
#define kvRowColIdx(r) (SColIdx *)POINTER_SHIFT(r, TD_KV_ROW_HEAD_SIZE)
#define kvRowValues(r) POINTER_SHIFT(r, TD_KV_ROW_HEAD_SIZE + sizeof(SColIdx) * kvRowNCols(r))
#define kvRowCpy(dst, r) memcpy((dst), (r), kvRowLen(r))
#define kvRowColVal(r, colIdx) POINTER_SHIFT(kvRowValues(r), (colIdx)->offset)
#define kvRowColIdxAt(r, i) (kvRowColIdx(r) + (i))
#define kvRowFree(r) tfree(r)
SKVRow tdKVRowDup(SKVRow row);
SKVRow tdSetKVRowDataOfCol(SKVRow row, int16_t colId, int8_t type, void *value);
void * tdEncodeKVRow(void *buf, SKVRow row);
void * tdDecodeKVRow(void *buf, SKVRow *row);
static FORCE_INLINE int comparTagId(const void *key1, const void *key2) {
if (*(int16_t *)key1 > ((SColIdx *)key2)->colId) {
return 1;
} else if (*(int16_t *)key1 < ((SColIdx *)key2)->colId) {
return -1;
} else {
return 0;
}
}
static FORCE_INLINE void *tdGetKVRowValOfCol(SKVRow row, int16_t colId) {
void *ret = taosbsearch(&colId, kvRowColIdx(row), kvRowNCols(row), sizeof(SColIdx), comparTagId, TD_EQ);
if (ret == NULL) return NULL;
return kvRowColVal(row, (SColIdx *)ret);
}
// ----------------- Tag row structure
// ----------------- K-V data row builder
typedef struct {
int16_t tCols;
int16_t nCols;
SColIdx *pColIdx;
int16_t alloc;
int16_t size;
void * buf;
} SKVRowBuilder;
/* A tag row, the format is like below:
+----------+----------------------------------------------------------------+
| STagRow | STagCol | STagCol | STagCol | STagCol | ...| STagCol | STagCol |
+----------+----------------------------------------------------------------+
int tdInitKVRowBuilder(SKVRowBuilder *pBuilder);
void tdDestroyKVRowBuilder(SKVRowBuilder *pBuilder);
void tdResetKVRowBuilder(SKVRowBuilder *pBuilder);
SKVRow tdGetKVRowFromBuilder(SKVRowBuilder *pBuilder);
pData
+----------+----------------------------------------------------------------+
| value 1 | value 2 | value 3 | value 4 | ....|value n |
+----------+----------------------------------------------------------------+
static FORCE_INLINE int tdAddColToKVRow(SKVRowBuilder *pBuilder, int16_t colId, int8_t type, void *value) {
ASSERT(pBuilder->nCols == 0 || colId > pBuilder->pColIdx[pBuilder->nCols - 1].colId);
*/
if (pBuilder->nCols >= pBuilder->tCols) {
pBuilder->tCols *= 2;
pBuilder->pColIdx = (SColIdx *)realloc((void *)(pBuilder->pColIdx), sizeof(SColIdx) * pBuilder->tCols);
if (pBuilder->pColIdx == NULL) return -1;
}
pBuilder->pColIdx[pBuilder->nCols].colId = colId;
pBuilder->pColIdx[pBuilder->nCols].offset = pBuilder->size;
#define TD_TAG_ROW_HEAD_SIZE sizeof(int16_t)
pBuilder->nCols++;
#define tagRowNum(r) (*(int16_t *)(r))
#define tagRowArray(r) POINTER_SHIFT(r, TD_TAG_ROW_HEAD_SIZE)
//#define dataRowKey(r) (*(TSKEY *)(dataRowTuple(r)))
//#define dataRowSetLen(r, l) (dataRowLen(r) = (l))
//#define dataRowCpy(dst, r) memcpy((dst), (r), dataRowLen(r))
//#define dataRowMaxBytesFromSchema(s) (schemaTLen(s) + TD_DATA_ROW_HEAD_SIZE)
int tlen = IS_VAR_DATA_TYPE(type) ? varDataTLen(value) : TYPE_BYTES[type];
if (tlen > pBuilder->alloc - pBuilder->size) {
while (tlen > pBuilder->alloc - pBuilder->size) {
pBuilder->alloc *= 2;
}
pBuilder->buf = realloc(pBuilder->buf, pBuilder->alloc);
if (pBuilder->buf == NULL) return -1;
}
typedef struct {
int16_t colId; // column ID
int16_t colType;
uint16_t offset; //to store value for numeric col or offset for binary/Nchar
} STagCol;
memcpy(POINTER_SHIFT(pBuilder->buf, pBuilder->size), value, tlen);
pBuilder->size += tlen;
typedef struct {
int32_t len;
void * pData; // Space to store the tag value
uint16_t dataLen;
int16_t ncols; // Total columns allocated
STagCol tagCols[];
} STagRow;
#define tagColSize(r) (sizeof(STagCol) + r.colLen)
int tdSetTagCol(SDataRow row, void *value, int16_t len, int8_t type, int16_t colId); //insert tag value and update all the information
int tdDeleteTagCol(SDataRow row, int16_t colId); // delete tag value and update all the information
void * tdQueryTagByID(SDataRow row, int16_t colId, int16_t *type); //if find tag, 0, else return -1;
int tdAppendTagColVal(SDataRow row, void *value, int8_t type, int32_t bytes, int16_t colId);
SDataRow tdTagRowDup(SDataRow row);
void tdFreeTagRow(SDataRow row);
SDataRow tdTagRowDecode(SDataRow row);
int tdTagRowCpy(SDataRow dst, SDataRow src);
void * tdNewTagRowFromSchema(STSchema *pSchema, int16_t numofTags);
STSchema *tdGetSchemaFromData(SDataRow *row);
return 0;
}
#ifdef __cplusplus
}
......
......@@ -13,8 +13,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tdataformat.h"
#include "wchar.h"
#include "talgo.h"
#include "wchar.h"
/**
* Create a SSchema object with nCols columns
......@@ -51,13 +51,13 @@ int tdSchemaAddCol(STSchema *pSchema, int8_t type, int16_t colId, int32_t bytes)
if (schemaNCols(pSchema) == 0) {
colSetOffset(pCol, 0);
} else {
STColumn *pTCol = schemaColAt(pSchema, schemaNCols(pSchema)-1);
STColumn *pTCol = schemaColAt(pSchema, schemaNCols(pSchema) - 1);
colSetOffset(pCol, pTCol->offset + TYPE_BYTES[pTCol->type]);
}
switch (type) {
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR:
colSetBytes(pCol, bytes); // Set as maximum bytes
colSetBytes(pCol, bytes); // Set as maximum bytes
pSchema->tlen += (TYPE_BYTES[type] + sizeof(VarDataLenT) + bytes);
break;
default:
......@@ -152,152 +152,6 @@ SDataRow tdNewDataRowFromSchema(STSchema *pSchema) {
return row;
}
int tdSetTagCol(SDataRow row, void *value, int16_t len, int8_t type, int16_t colId){ //insert/update tag value and update all the information
ASSERT(((STagRow *)row)->pData != NULL);
//STagCol * stCol = tdQueryTagColByID()
return 0;
};
int tdDeleteTagCol(SDataRow row, int16_t colId){ // delete tag value and update all the information
//todo
return 0;
};
static int compTagId(const void *key1, const void *key2) {
if (((STagCol *)key1)->colId > ((STagCol *)key2)->colId) {
return 1;
} else if (((STagCol *)key1)->colId == ((STagCol *)key2)->colId) {
return 0;
} else {
return -1;
}
}
/**
* Find tag structure by colId, if find, return tag structure, else return NULL;
*/
STagCol * tdQueryTagColByID(SDataRow row, int16_t colId, int flags) { //if find tag, 0, else return -1;
ASSERT(((STagRow *)row)->pData != NULL);
STagCol *pBase = ((STagRow *)row)->tagCols;
int16_t nCols = ((STagRow *)row)->ncols;
STagCol key = {colId,0,0};
STagCol * stCol = taosbsearch(&key, pBase, nCols, sizeof(STagCol), compTagId, flags);
return stCol;
};
/**
* Find tag value by colId, if find, return tag value, else return NULL;
*/
void * tdQueryTagByID(SDataRow row, int16_t colId, int16_t *type) {
ASSERT(((STagRow *)row)->pData != NULL);
STagCol *pBase = ((STagRow *)row)->tagCols;
int16_t nCols = ((STagRow *)row)->ncols;
STagCol key = {colId,0,0};
STagCol * stCol = taosbsearch(&key, pBase, nCols, sizeof(STagCol), compTagId, TD_EQ);
if (NULL == stCol) {
type = TSDB_DATA_TYPE_NULL;
return NULL;
}
void * pData = ((STagRow *)row)->pData;
*type = stCol->colType;
return pData + stCol->offset;
};
int tdAppendTagColVal(SDataRow row, void *value, int8_t type, int32_t bytes, int16_t colId){
ASSERT(value != NULL);
//ASSERT(bytes-2 == varDataTLen(value));
ASSERT(row != NULL);
STagRow *pTagrow = row;
pTagrow->tagCols[pTagrow->ncols].colId = colId;
pTagrow->tagCols[pTagrow->ncols].colType = type;
pTagrow->tagCols[pTagrow->ncols].offset = pTagrow->dataLen;
switch (type) {
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR:
memcpy((char *)pTagrow->pData + pTagrow->dataLen, value, varDataTLen(value));
pTagrow->dataLen += varDataTLen(value);
break;
default:
memcpy((char *)pTagrow->pData + pTagrow->dataLen, value, TYPE_BYTES[type]);
pTagrow->dataLen += TYPE_BYTES[type];
break;
}
pTagrow->ncols++;
return 0;
};
void * tdNewTagRowFromSchema(STSchema *pSchema, int16_t numofTags) {
int32_t size = sizeof(STagRow) + numofTags * sizeof(STagCol);
STagRow *row = malloc(size);
if (row == NULL) return NULL;
int32_t datasize = pSchema->tlen;
row->pData = malloc(datasize);
if (NULL == row->pData) {
free(row);
return NULL;
}
row->len = size;
row->dataLen = 0;
row->ncols = 0;
return row;
}
/**
* free tag row
*/
void tdFreeTagRow(SDataRow row) {
if (row) {
free(((STagRow *)row)->pData);
free(row);
}
}
SDataRow tdTagRowDup(SDataRow row) {
STagRow *trow = malloc(dataRowLen(row));
if (trow == NULL) return NULL;
dataRowCpy(trow, row);
trow->pData = malloc(trow->dataLen);
if (NULL == trow->pData) {
free(trow);
return NULL;
}
memcpy(trow->pData, ((STagRow *)row)->pData, trow->dataLen);
return trow;
}
SDataRow tdTagRowDecode(SDataRow row) {
STagRow *trow = malloc(dataRowLen(row));
if (trow == NULL) return NULL;
dataRowCpy(trow, row);
trow->pData = malloc(trow->dataLen);
if (NULL == trow->pData) {
free(trow);
return NULL;
}
char * pData = (char *)row + dataRowLen(row);
memcpy(trow->pData, pData, trow->dataLen);
return trow;
}
int tdTagRowCpy(SDataRow dst, SDataRow src) {
if (src == NULL) return -1;
dataRowCpy(dst, src);
void * pData = dst + dataRowLen(src);
memcpy(pData, ((STagRow *)src)->pData, ((STagRow *)src)->dataLen);
return 0;
}
/**
* Free the SDataRow object
*/
......@@ -331,7 +185,6 @@ void dataColInit(SDataCol *pDataCol, STColumn *pCol, void **pBuf, int maxPoints)
pDataCol->pData = *pBuf;
*pBuf = POINTER_SHIFT(*pBuf, pDataCol->spaceSize);
}
}
void dataColAppendVal(SDataCol *pCol, void *value, int numOfRows, int maxPoints) {
......@@ -415,7 +268,7 @@ void dataColSetNEleNull(SDataCol *pCol, int nEle, int maxPoints) {
void dataColSetOffset(SDataCol *pCol, int nEle) {
ASSERT(((pCol->type == TSDB_DATA_TYPE_BINARY) || (pCol->type == TSDB_DATA_TYPE_NCHAR)));
void * tptr = pCol->pData;
void *tptr = pCol->pData;
// char *tptr = (char *)(pCol->pData);
VarDataOffsetT offset = 0;
......@@ -595,4 +448,131 @@ void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, SDataCol
(*iter2)++;
}
}
}
SKVRow tdKVRowDup(SKVRow row) {
SKVRow trow = malloc(kvRowLen(row));
if (trow == NULL) return NULL;
kvRowCpy(trow, row);
return trow;
}
SKVRow tdSetKVRowDataOfCol(SKVRow row, int16_t colId, int8_t type, void *value) {
// TODO
return NULL;
// SColIdx *pColIdx = NULL;
// SKVRow rrow = row;
// SKVRow nrow = NULL;
// void *ptr = taosbsearch(&colId, kvDataRowColIdx(row), kvDataRowNCols(row), sizeof(SColIdx), comparTagId, TD_GE);
// if (ptr == NULL || ((SColIdx *)ptr)->colId < colId) { // need to add a column value to the row
// int tlen = kvDataRowLen(row) + sizeof(SColIdx) + (IS_VAR_DATA_TYPE(type) ? varDataTLen(value) :
// TYPE_BYTES[type]); nrow = malloc(tlen); if (nrow == NULL) return NULL;
// kvDataRowSetNCols(nrow, kvDataRowNCols(row)+1);
// kvDataRowSetLen(nrow, tlen);
// if (ptr == NULL) ptr = kvDataRowValues(row);
// // Copy the columns before the col
// if (POINTER_DISTANCE(ptr, kvDataRowColIdx(row)) > 0) {
// memcpy(kvDataRowColIdx(nrow), kvDataRowColIdx(row), POINTER_DISTANCE(ptr, kvDataRowColIdx(row)));
// memcpy(kvDataRowValues(nrow), kvDataRowValues(row), ((SColIdx *)ptr)->offset); // TODO: here is not correct
// }
// // Set the new col value
// pColIdx = (SColIdx *)POINTER_SHIFT(nrow, POINTER_DISTANCE(ptr, row));
// pColIdx->colId = colId;
// pColIdx->offset = ((SColIdx *)ptr)->offset; // TODO: here is not correct
// if (IS_VAR_DATA_TYPE(type)) {
// memcpy(POINTER_SHIFT(kvDataRowValues(nrow), pColIdx->offset), value, varDataLen(value));
// } else {
// memcpy(POINTER_SHIFT(kvDataRowValues(nrow), pColIdx->offset), value, TYPE_BYTES[type]);
// }
// // Copy the columns after the col
// if (POINTER_DISTANCE(kvDataRowValues(row), ptr) > 0) {
// // TODO: memcpy();
// }
// } else {
// // TODO
// ASSERT(((SColIdx *)ptr)->colId == colId);
// if (IS_VAR_DATA_TYPE(type)) {
// void *pOldVal = kvDataRowColVal(row, (SColIdx *)ptr);
// if (varDataTLen(value) == varDataTLen(pOldVal)) { // just update the column value in place
// memcpy(pOldVal, value, varDataTLen(value));
// } else { // enlarge the memory
// // rrow = realloc(rrow, kvDataRowLen(rrow) + varDataTLen(value) - varDataTLen(pOldVal));
// // if (rrow == NULL) return NULL;
// // memmove();
// // for () {
// // ((SColIdx *)ptr)->offset += balabala;
// // }
// // kvDataRowSetLen();
// }
// } else {
// memcpy(kvDataRowColVal(row, (SColIdx *)ptr), value, TYPE_BYTES[type]);
// }
// }
// return rrow;
}
void *tdEncodeKVRow(void *buf, SKVRow row) {
// May change the encode purpose
kvRowCpy(buf, row);
return POINTER_SHIFT(buf, kvRowLen(row));
}
void *tdDecodeKVRow(void *buf, SKVRow *row) {
*row = tdKVRowDup(buf);
return POINTER_SHIFT(buf, kvRowLen(*row));
}
int tdInitKVRowBuilder(SKVRowBuilder *pBuilder) {
pBuilder->tCols = 128;
pBuilder->nCols = 0;
pBuilder->pColIdx = (SColIdx *)malloc(sizeof(SColIdx) * pBuilder->tCols);
if (pBuilder->pColIdx == NULL) return -1;
pBuilder->alloc = 1024;
pBuilder->size = 0;
pBuilder->buf = malloc(pBuilder->alloc);
if (pBuilder->buf == NULL) {
free(pBuilder->pColIdx);
return -1;
}
return 0;
}
void tdDestroyKVRowBuilder(SKVRowBuilder *pBuilder) {
tfree(pBuilder->pColIdx);
tfree(pBuilder->buf);
}
void tdResetKVRowBuilder(SKVRowBuilder *pBuilder) {
pBuilder->nCols = 0;
pBuilder->size = 0;
}
SKVRow tdGetKVRowFromBuilder(SKVRowBuilder *pBuilder) {
int tlen = sizeof(SColIdx) * pBuilder->nCols + pBuilder->size;
if (tlen == 0) return NULL;
tlen += TD_KV_ROW_HEAD_SIZE;
SKVRow row = malloc(tlen);
if (row == NULL) return NULL;
kvRowSetNCols(row, pBuilder->nCols);
kvRowSetLen(row, tlen);
memcpy(kvRowColIdx(row), pBuilder->pColIdx, sizeof(SColIdx) * pBuilder->nCols);
memcpy(kvRowValues(row), pBuilder->buf, pBuilder->size);
return row;
}
\ No newline at end of file
......@@ -52,6 +52,7 @@ typedef struct tstr {
#define varDataCopy(dst, v) memcpy((dst), (void*) (v), varDataTLen(v))
#define varDataLenByData(v) (*(VarDataLenT *)(((char*)(v)) - VARSTR_HEADER_SIZE))
#define varDataSetLen(v, _len) (((VarDataLenT *)(v))[0] = (VarDataLenT) (_len))
#define IS_VAR_DATA_TYPE(t) (((t) == TSDB_DATA_TYPE_BINARY) || ((t) == TSDB_DATA_TYPE_NCHAR))
// this data type is internally used only in 'in' query to hold the values
#define TSDB_DATA_TYPE_ARRAY (TSDB_DATA_TYPE_NCHAR + 1)
......
......@@ -102,14 +102,15 @@ int tsdbInitTableCfg(STableCfg *config, ETableType type, uint64_t uid, int32_t
int tsdbTableSetSuperUid(STableCfg *config, uint64_t uid);
int tsdbTableSetSchema(STableCfg *config, STSchema *pSchema, bool dup);
int tsdbTableSetTagSchema(STableCfg *config, STSchema *pSchema, bool dup);
int tsdbTableSetTagValue(STableCfg *config, SDataRow row, bool dup);
int tsdbTableSetTagValue(STableCfg *config, SKVRow row, bool dup);
int tsdbTableSetName(STableCfg *config, char *name, bool dup);
int tsdbTableSetSName(STableCfg *config, char *sname, bool dup);
int tsdbTableSetStreamSql(STableCfg *config, char *sql, bool dup);
void tsdbClearTableCfg(STableCfg *config);
int32_t tsdbGetTableTagVal(TsdbRepoT *repo, STableId* id, int32_t colId, int16_t *type, int16_t *bytes, char **val);
char* tsdbGetTableName(TsdbRepoT *repo, const STableId* id, int16_t* bytes);
int32_t tsdbGetTableTagVal(TsdbRepoT *repo, STableId *id, int32_t colId, int16_t *type, int16_t *bytes, char **val);
char * tsdbGetTableName(TsdbRepoT *repo, const STableId *id, int16_t *bytes);
STableCfg *tsdbCreateTableCfgFromMsg(SMDCreateTableMsg *pMsg);
int tsdbCreateTable(TsdbRepoT *repo, STableCfg *pCfg);
int tsdbDropTable(TsdbRepoT *pRepo, STableId tableId);
......
......@@ -76,7 +76,7 @@ typedef struct STable {
int32_t sversion;
STSchema * schema;
STSchema * tagSchema;
SDataRow tagVal;
SKVRow tagVal;
SMemTable * mem;
SMemTable * imem;
void * pIndex; // For TSDB_SUPER_TABLE, it is the skiplist index
......
......@@ -94,7 +94,7 @@ static int tsdbInitFile(char *dataDir, int fid, const char *suffix, SFile *pFile
if (!taosCheckChecksumWhole((uint8_t *)buf, TSDB_FILE_HEAD_SIZE)) return -1;
void *pBuf = buf;
pBuf = taosDecodeFixed32(pBuf, &version);
pBuf = taosDecodeFixedU32(pBuf, &version);
pBuf = tsdbDecodeSFileInfo(pBuf, &(pFile->info));
tsdbCloseFile(pFile);
......
......@@ -510,11 +510,11 @@ int tsdbTableSetTagSchema(STableCfg *config, STSchema *pSchema, bool dup) {
return 0;
}
int tsdbTableSetTagValue(STableCfg *config, SDataRow row, bool dup) {
int tsdbTableSetTagValue(STableCfg *config, SKVRow row, bool dup) {
if (config->type != TSDB_CHILD_TABLE) return -1;
if (dup) {
config->tagValues = tdDataRowDup(row);
config->tagValues = tdKVRowDup(row);
} else {
config->tagValues = row;
}
......@@ -561,7 +561,7 @@ int tsdbTableSetStreamSql(STableCfg *config, char *sql, bool dup) {
void tsdbClearTableCfg(STableCfg *config) {
if (config->schema) tdFreeSchema(config->schema);
if (config->tagSchema) tdFreeSchema(config->tagSchema);
if (config->tagValues) tdFreeDataRow(config->tagValues);
if (config->tagValues) kvRowFree(config->tagValues);
tfree(config->name);
tfree(config->sname);
tfree(config->sql);
......
......@@ -47,8 +47,7 @@ void tsdbEncodeTable(STable *pTable, char *buf, int *contLen) {
ptr = tdEncodeSchema(ptr, pTable->schema);
ptr = tdEncodeSchema(ptr, pTable->tagSchema);
} else if (pTable->type == TSDB_CHILD_TABLE) {
tdTagRowCpy(ptr, pTable->tagVal);
ptr = POINTER_SHIFT(ptr, dataRowLen(pTable->tagVal) + ((STagRow *)pTable->tagVal)->dataLen);
ptr = tdEncodeKVRow(ptr, pTable->tagVal);
} else {
ptr = tdEncodeSchema(ptr, pTable->schema);
}
......@@ -94,8 +93,7 @@ STable *tsdbDecodeTable(void *cont, int contLen) {
pTable->schema = tdDecodeSchema(&ptr);
pTable->tagSchema = tdDecodeSchema(&ptr);
} else if (pTable->type == TSDB_CHILD_TABLE) {
pTable->tagVal = tdTagRowDecode(ptr);
ptr = POINTER_SHIFT(ptr, dataRowLen(pTable->tagVal) + ((STagRow *)pTable->tagVal)->dataLen);
ptr = tdDecodeKVRow(ptr, &pTable->tagVal);
} else {
pTable->schema = tdDecodeSchema(&ptr);
}
......@@ -115,12 +113,9 @@ void tsdbFreeEncode(void *cont) {
static char* getTagIndexKey(const void* pData) {
STableIndexElem* elem = (STableIndexElem*) pData;
SDataRow row = elem->pTable->tagVal;
STSchema* pSchema = tsdbGetTableTagSchema(elem->pMeta, elem->pTable);
STColumn* pCol = &pSchema->columns[DEFAULT_TAG_INDEX_COLUMN];
int16_t type = 0;
void * res = tdQueryTagByID(row, pCol->colId, &type);
ASSERT(type == pCol->type);
void * res = tdGetKVRowValOfCol(elem->pTable->tagVal, pCol->colId);
return res;
}
......@@ -255,19 +250,24 @@ STSchema * tsdbGetTableTagSchema(STsdbMeta *pMeta, STable *pTable) {
int32_t tsdbGetTableTagVal(TsdbRepoT* repo, STableId* id, int32_t colId, int16_t* type, int16_t* bytes, char** val) {
STsdbMeta* pMeta = tsdbGetMeta(repo);
STable* pTable = tsdbGetTableByUid(pMeta, id->uid);
STSchema *pSchema = tsdbGetTableTagSchema(pMeta, pTable);
STColumn *pCol = tdGetColOfID(pSchema, colId);
if (pCol == NULL) {
return -1; // No matched tag volumn
}
*val = tdQueryTagByID(pTable->tagVal, colId, type);
*val = tdGetKVRowValOfCol(pTable->tagVal, colId);
*type = pCol->type;
if (*val != NULL) {
switch(*type) {
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR: *bytes = varDataLen(*val); break;
case TSDB_DATA_TYPE_NULL: *bytes = 0; break;
default:
*bytes = tDataTypeDesc[*type].nSize;break;
if (IS_VAR_DATA_TYPE(*type)) {
*bytes = varDataLen(*val);
} else {
*bytes = TYPE_BYTES[*type];
}
}
return TSDB_CODE_SUCCESS;
}
......@@ -341,7 +341,7 @@ static STable *tsdbNewTable(STableCfg *pCfg, bool isSuper) {
if (pCfg->type == TSDB_CHILD_TABLE) {
pTable->superUid = pCfg->superUid;
pTable->tagVal = tdDataRowDup(pCfg->tagValues);
pTable->tagVal = tdKVRowDup(pCfg->tagValues);
} else if (pCfg->type == TSDB_NORMAL_TABLE) {
pTable->superUid = -1;
pTable->schema = tdDupSchema(pCfg->schema);
......@@ -438,6 +438,60 @@ STable *tsdbIsValidTableToInsert(STsdbMeta *pMeta, STableId tableId) {
return pTable;
}
STableCfg *tsdbCreateTableCfgFromMsg(SMDCreateTableMsg *pMsg) {
if (pMsg == NULL) return NULL;
SSchema *pSchema = (SSchema *)pMsg->data;
int16_t numOfCols = htons(pMsg->numOfColumns);
int16_t numOfTags = htons(pMsg->numOfTags);
STableCfg *pCfg = (STableCfg *)calloc(1, sizeof(STableCfg));
if (pCfg == NULL) return NULL;
if (tsdbInitTableCfg(pCfg, pMsg->tableType, htobe64(pMsg->uid), htonl(pMsg->sid)) < 0) goto _err;
STSchema *pDSchema = tdNewSchema(numOfCols);
if (pDSchema == NULL) goto _err;
for (int i = 0; i < numOfCols; i++) {
tdSchemaAddCol(pDSchema, pSchema[i].type, htons(pSchema[i].colId), htons(pSchema[i].bytes));
}
if (tsdbTableSetSchema(pCfg, pDSchema, false) < 0) goto _err;
if (tsdbTableSetName(pCfg, pMsg->tableId, true) < 0) goto _err;
if (numOfTags > 0) {
STSchema *pTSchema = tdNewSchema(numOfTags);
for (int i = numOfCols; i < numOfCols + numOfTags; i++) {
tdSchemaAddCol(pTSchema, pSchema[i].type, htons(pSchema[i].colId), htons(pSchema[i].bytes));
}
if (tsdbTableSetTagSchema(pCfg, pTSchema, false) < 0) goto _err;
if (tsdbTableSetSName(pCfg, pMsg->superTableId, true) < 0) goto _err;
if (tsdbTableSetSuperUid(pCfg, htobe64(pMsg->superTableUid)) < 0) goto _err;
char * pTagData = pMsg->data + (numOfCols + numOfTags) * sizeof(SSchema);
int accBytes = 0;
SKVRowBuilder kvRowBuilder;
if (tdInitKVRowBuilder(&kvRowBuilder) < 0) goto _err;
for (int i = 0; i < numOfTags; i++) {
STColumn *pCol = schemaColAt(pTSchema, i);
tdAddColToKVRow(&kvRowBuilder, pCol->colId, pCol->type, pTagData + accBytes);
accBytes += htons(pSchema[i+numOfCols].bytes);
}
tsdbTableSetTagValue(pCfg, tdGetKVRowFromBuilder(&kvRowBuilder), false);
tdDestroyKVRowBuilder(&kvRowBuilder);
}
if (pMsg->tableType == TSDB_STREAM_TABLE) {
char *sql = pMsg->data + (numOfCols + numOfTags) * sizeof(SSchema);
tsdbTableSetStreamSql(pCfg, sql, true);
}
return pCfg;
_err:
tsdbClearTableCfg(pCfg);
tfree(pCfg);
return NULL;
}
// int32_t tsdbDropTableImpl(STsdbMeta *pMeta, STableId tableId) {
int tsdbDropTable(TsdbRepoT *repo, STableId tableId) {
STsdbRepo *pRepo = (STsdbRepo *)repo;
......@@ -478,7 +532,7 @@ static int tsdbFreeTable(STable *pTable) {
if (pTable == NULL) return 0;
if (pTable->type == TSDB_CHILD_TABLE) {
tdFreeTagRow(pTable->tagVal);
kvRowFree(pTable->tagVal);
} else {
tdFreeSchema(pTable->schema);
}
......@@ -627,9 +681,7 @@ static int tsdbRemoveTableFromIndex(STsdbMeta *pMeta, STable *pTable) {
STSchema* pSchema = tsdbGetTableTagSchema(pMeta, pTable);
STColumn* pCol = &pSchema->columns[DEFAULT_TAG_INDEX_COLUMN];
int16_t tagtype = 0;
char* key = tdQueryTagByID(pTable->tagVal, pCol->colId, &tagtype);
ASSERT(pCol->type == tagtype);
char* key = tdGetKVRowValOfCol(pTable->tagVal, pCol->colId);
SArray* res = tSkipListGet(pSTable->pIndex, key);
size_t size = taosArrayGetSize(res);
......
......@@ -237,20 +237,24 @@ int tsdbSetAndOpenHelperFile(SRWHelper *pHelper, SFileGroup *pGroup) {
int tsdbCloseHelperFile(SRWHelper *pHelper, bool hasError) {
if (pHelper->files.headF.fd > 0) {
fsync(pHelper->files.headF.fd);
close(pHelper->files.headF.fd);
pHelper->files.headF.fd = -1;
}
if (pHelper->files.dataF.fd > 0) {
if (!hasError) tsdbUpdateFileHeader(&(pHelper->files.dataF), 0);
fsync(pHelper->files.dataF.fd);
close(pHelper->files.dataF.fd);
pHelper->files.dataF.fd = -1;
}
if (pHelper->files.lastF.fd > 0) {
fsync(pHelper->files.lastF.fd);
close(pHelper->files.lastF.fd);
pHelper->files.lastF.fd = -1;
}
if (pHelper->files.nHeadF.fd > 0) {
if (!hasError) tsdbUpdateFileHeader(&(pHelper->files.nHeadF), 0);
fsync(pHelper->files.nHeadF.fd);
close(pHelper->files.nHeadF.fd);
pHelper->files.nHeadF.fd = -1;
if (hasError) {
......@@ -263,6 +267,7 @@ int tsdbCloseHelperFile(SRWHelper *pHelper, bool hasError) {
if (pHelper->files.nLastF.fd > 0) {
if (!hasError) tsdbUpdateFileHeader(&(pHelper->files.nLastF), 0);
fsync(pHelper->files.nLastF.fd);
close(pHelper->files.nLastF.fd);
pHelper->files.nLastF.fd = -1;
if (hasError) {
......@@ -448,7 +453,7 @@ int tsdbWriteCompIdx(SRWHelper *pHelper) {
pHelper->pBuffer = trealloc(pHelper->pBuffer, tsizeof(pHelper->pBuffer)*2);
}
buf = POINTER_SHIFT(pHelper->pBuffer, drift);
buf = taosEncodeVariant32(buf, i);
buf = taosEncodeVariantU32(buf, i);
buf = tsdbEncodeSCompIdx(buf, pCompIdx);
}
}
......@@ -486,7 +491,7 @@ int tsdbLoadCompIdx(SRWHelper *pHelper, void *target) {
void *ptr = pHelper->pBuffer;
while (((char *)ptr - (char *)pHelper->pBuffer) < (pFile->info.len - sizeof(TSCKSUM))) {
uint32_t tid = 0;
if ((ptr = taosDecodeVariant32(ptr, &tid)) == NULL) return -1;
if ((ptr = taosDecodeVariantU32(ptr, &tid)) == NULL) return -1;
ASSERT(tid > 0 && tid < pHelper->config.maxTables);
if ((ptr = tsdbDecodeSCompIdx(ptr, pHelper->pCompIdx + tid)) == NULL) return -1;
......@@ -1248,12 +1253,12 @@ static int tsdbGetRowsInRange(SDataCols *pDataCols, TSKEY minKey, TSKEY maxKey)
}
void *tsdbEncodeSCompIdx(void *buf, SCompIdx *pIdx) {
buf = taosEncodeVariant32(buf, pIdx->len);
buf = taosEncodeVariant32(buf, pIdx->offset);
buf = taosEncodeFixed8(buf, pIdx->hasLast);
buf = taosEncodeVariant32(buf, pIdx->numOfBlocks);
buf = taosEncodeFixed64(buf, pIdx->uid);
buf = taosEncodeFixed64(buf, pIdx->maxKey);
buf = taosEncodeVariantU32(buf, pIdx->len);
buf = taosEncodeVariantU32(buf, pIdx->offset);
buf = taosEncodeFixedU8(buf, pIdx->hasLast);
buf = taosEncodeVariantU32(buf, pIdx->numOfBlocks);
buf = taosEncodeFixedU64(buf, pIdx->uid);
buf = taosEncodeFixedU64(buf, pIdx->maxKey);
return buf;
}
......@@ -1263,15 +1268,15 @@ void *tsdbDecodeSCompIdx(void *buf, SCompIdx *pIdx) {
uint32_t numOfBlocks = 0;
uint64_t value = 0;
if ((buf = taosDecodeVariant32(buf, &(pIdx->len))) == NULL) return NULL;
if ((buf = taosDecodeVariant32(buf, &(pIdx->offset))) == NULL) return NULL;
if ((buf = taosDecodeFixed8(buf, &(hasLast))) == NULL) return NULL;
if ((buf = taosDecodeVariantU32(buf, &(pIdx->len))) == NULL) return NULL;
if ((buf = taosDecodeVariantU32(buf, &(pIdx->offset))) == NULL) return NULL;
if ((buf = taosDecodeFixedU8(buf, &(hasLast))) == NULL) return NULL;
pIdx->hasLast = hasLast;
if ((buf = taosDecodeVariant32(buf, &(numOfBlocks))) == NULL) return NULL;
if ((buf = taosDecodeVariantU32(buf, &(numOfBlocks))) == NULL) return NULL;
pIdx->numOfBlocks = numOfBlocks;
if ((buf = taosDecodeFixed64(buf, &value)) == NULL) return NULL;
if ((buf = taosDecodeFixedU64(buf, &value)) == NULL) return NULL;
pIdx->uid = (int64_t)value;
if ((buf = taosDecodeFixed64(buf, &value)) == NULL) return NULL;
if ((buf = taosDecodeFixedU64(buf, &value)) == NULL) return NULL;
pIdx->maxKey = (TSKEY)value;
return buf;
......@@ -1281,7 +1286,7 @@ int tsdbUpdateFileHeader(SFile *pFile, uint32_t version) {
char buf[TSDB_FILE_HEAD_SIZE] = "\0";
void *pBuf = (void *)buf;
pBuf = taosEncodeFixed32(pBuf, version);
pBuf = taosEncodeFixedU32(pBuf, version);
pBuf = tsdbEncodeSFileInfo(pBuf, &(pFile->info));
taosCalcChecksumAppend(0, (uint8_t *)buf, TSDB_FILE_HEAD_SIZE);
......@@ -1295,23 +1300,23 @@ int tsdbUpdateFileHeader(SFile *pFile, uint32_t version) {
void *tsdbEncodeSFileInfo(void *buf, const STsdbFileInfo *pInfo) {
buf = taosEncodeFixed32(buf, pInfo->offset);
buf = taosEncodeFixed32(buf, pInfo->len);
buf = taosEncodeFixed64(buf, pInfo->size);
buf = taosEncodeFixed64(buf, pInfo->tombSize);
buf = taosEncodeFixed32(buf, pInfo->totalBlocks);
buf = taosEncodeFixed32(buf, pInfo->totalSubBlocks);
buf = taosEncodeFixedU32(buf, pInfo->offset);
buf = taosEncodeFixedU32(buf, pInfo->len);
buf = taosEncodeFixedU64(buf, pInfo->size);
buf = taosEncodeFixedU64(buf, pInfo->tombSize);
buf = taosEncodeFixedU32(buf, pInfo->totalBlocks);
buf = taosEncodeFixedU32(buf, pInfo->totalSubBlocks);
return buf;
}
void *tsdbDecodeSFileInfo(void *buf, STsdbFileInfo *pInfo) {
buf = taosDecodeFixed32(buf, &(pInfo->offset));
buf = taosDecodeFixed32(buf, &(pInfo->len));
buf = taosDecodeFixed64(buf, &(pInfo->size));
buf = taosDecodeFixed64(buf, &(pInfo->tombSize));
buf = taosDecodeFixed32(buf, &(pInfo->totalBlocks));
buf = taosDecodeFixed32(buf, &(pInfo->totalSubBlocks));
buf = taosDecodeFixedU32(buf, &(pInfo->offset));
buf = taosDecodeFixedU32(buf, &(pInfo->len));
buf = taosDecodeFixedU64(buf, &(pInfo->size));
buf = taosDecodeFixedU64(buf, &(pInfo->tombSize));
buf = taosDecodeFixedU32(buf, &(pInfo->totalBlocks));
buf = taosDecodeFixedU32(buf, &(pInfo->totalSubBlocks));
return buf;
}
\ No newline at end of file
......@@ -1909,9 +1909,8 @@ int32_t tableGroupComparFn(const void *p1, const void *p2, const void *param) {
STColumn* pCol = schemaColAt(pTableGroupSupp->pTagSchema, colIndex);
bytes = pCol->bytes;
type = pCol->type;
int16_t tgtype1, tgtype2 = 0;
f1 = tdQueryTagByID(pTable1->tagVal, pCol->colId, &tgtype1);
f2 = tdQueryTagByID(pTable2->tagVal, pCol->colId, &tgtype2);
f1 = tdGetKVRowValOfCol(pTable1->tagVal, pCol->colId);
f2 = tdGetKVRowValOfCol(pTable2->tagVal, pCol->colId);
}
int32_t ret = doCompare(f1, f2, type, bytes);
......@@ -1999,9 +1998,7 @@ bool indexedNodeFilterFp(const void* pNode, void* param) {
val = (char*) elem->pTable->name;
type = TSDB_DATA_TYPE_BINARY;
} else {
int16_t t1;
val = tdQueryTagByID(elem->pTable->tagVal, pInfo->sch.colId, &t1);
assert(pInfo->sch.type == t1);
val = tdGetKVRowValOfCol(elem->pTable->tagVal, pInfo->sch.colId);
}
//todo :the val is possible to be null, so check it out carefully
......
......@@ -29,12 +29,33 @@ extern "C" {
static const int32_t TNUMBER = 1;
#define IS_LITTLE_ENDIAN() (*(uint8_t *)(&TNUMBER) != 0)
static FORCE_INLINE void *taosEncodeFixed8(void *buf, uint8_t value) {
#define ZIGZAGE(T, v) ((u##T)((v) >> (sizeof(T) * 8 - 1))) ^ (((u##T)(v)) << 1) // zigzag encode
#define ZIGZAGD(T, v) ((v) >> 1) ^ -((T)((v)&1)) // zigzag decode
// ---- Fixed U8
static FORCE_INLINE void *taosEncodeFixedU8(void *buf, uint8_t value) {
((uint8_t *)buf)[0] = value;
return POINTER_SHIFT(buf, sizeof(value));
}
static FORCE_INLINE void *taosEncodeFixed16(void *buf, uint16_t value) {
static FORCE_INLINE void *taosDecodeFixedU8(void *buf, uint8_t *value) {
*value = ((uint8_t *)buf)[0];
return POINTER_SHIFT(buf, sizeof(*value));
}
// ---- Fixed I8
static FORCE_INLINE void *taosEncodeFixedI8(void *buf, int8_t value) {
((int8_t *)buf)[0] = value;
return POINTER_SHIFT(buf, sizeof(value));
}
static FORCE_INLINE void *taosDecodeFixedI8(void *buf, int8_t *value) {
*value = ((int8_t *)buf)[0];
return POINTER_SHIFT(buf, sizeof(*value));
}
// ---- Fixed U16
static FORCE_INLINE void *taosEncodeFixedU16(void *buf, uint16_t value) {
if (IS_LITTLE_ENDIAN()) {
memcpy(buf, &value, sizeof(value));
} else {
......@@ -45,20 +66,31 @@ static FORCE_INLINE void *taosEncodeFixed16(void *buf, uint16_t value) {
return POINTER_SHIFT(buf, sizeof(value));
}
static FORCE_INLINE void *taosEncodeFixed32(void *buf, uint32_t value) {
static FORCE_INLINE void *taosDecodeFixedU16(void *buf, uint16_t *value) {
if (IS_LITTLE_ENDIAN()) {
memcpy(buf, &value, sizeof(value));
memcpy(value, buf, sizeof(*value));
} else {
((uint8_t *)buf)[0] = value & 0xff;
((uint8_t *)buf)[1] = (value >> 8) & 0xff;
((uint8_t *)buf)[2] = (value >> 16) & 0xff;
((uint8_t *)buf)[3] = (value >> 24) & 0xff;
((uint8_t *)value)[1] = ((uint8_t *)buf)[0];
((uint8_t *)value)[0] = ((uint8_t *)buf)[1];
}
return POINTER_SHIFT(buf, sizeof(value));
return POINTER_SHIFT(buf, sizeof(*value));
}
// ---- Fixed I16
static FORCE_INLINE void *taosEncodeFixedI16(void *buf, int16_t value) {
return taosEncodeFixedU16(buf, ZIGZAGE(int16_t, value));
}
static FORCE_INLINE void *taosEncodeFixed64(void *buf, uint64_t value) {
static FORCE_INLINE void *taosDecodeFixedI16(void *buf, int16_t *value) {
uint16_t tvalue = 0;
void * ret = taosDecodeFixedU16(buf, &tvalue);
*value = ZIGZAGD(int16_t, tvalue);
return ret;
}
// ---- Fixed U32
static FORCE_INLINE void *taosEncodeFixedU32(void *buf, uint32_t value) {
if (IS_LITTLE_ENDIAN()) {
memcpy(buf, &value, sizeof(value));
} else {
......@@ -66,45 +98,55 @@ static FORCE_INLINE void *taosEncodeFixed64(void *buf, uint64_t value) {
((uint8_t *)buf)[1] = (value >> 8) & 0xff;
((uint8_t *)buf)[2] = (value >> 16) & 0xff;
((uint8_t *)buf)[3] = (value >> 24) & 0xff;
((uint8_t *)buf)[4] = (value >> 32) & 0xff;
((uint8_t *)buf)[5] = (value >> 40) & 0xff;
((uint8_t *)buf)[6] = (value >> 48) & 0xff;
((uint8_t *)buf)[7] = (value >> 56) & 0xff;
}
return POINTER_SHIFT(buf, sizeof(value));
}
static FORCE_INLINE void *taosDecodeFixed8(void *buf, uint8_t *value) {
*value = ((uint8_t *)buf)[0];
return POINTER_SHIFT(buf, sizeof(*value));
}
static FORCE_INLINE void *taosDecodeFixed16(void *buf, uint16_t *value) {
static FORCE_INLINE void *taosDecodeFixedU32(void *buf, uint32_t *value) {
if (IS_LITTLE_ENDIAN()) {
memcpy(value, buf, sizeof(*value));
} else {
((uint8_t *)value)[1] = ((uint8_t *)buf)[0];
((uint8_t *)value)[0] = ((uint8_t *)buf)[1];
((uint8_t *)value)[3] = ((uint8_t *)buf)[0];
((uint8_t *)value)[2] = ((uint8_t *)buf)[1];
((uint8_t *)value)[1] = ((uint8_t *)buf)[2];
((uint8_t *)value)[0] = ((uint8_t *)buf)[3];
}
return POINTER_SHIFT(buf, sizeof(*value));
}
static FORCE_INLINE void *taosDecodeFixed32(void *buf, uint32_t *value) {
// ---- Fixed I32
static FORCE_INLINE void *taosEncodeFixedI32(void *buf, int32_t value) {
return taosEncodeFixedU32(buf, ZIGZAGE(int32_t, value));
}
static FORCE_INLINE void *taosDecodeFixedI32(void *buf, int32_t *value) {
uint32_t tvalue = 0;
void * ret = taosDecodeFixedU32(buf, &tvalue);
*value = ZIGZAGD(int32_t, tvalue);
return ret;
}
// ---- Fixed U64
static FORCE_INLINE void *taosEncodeFixedU64(void *buf, uint64_t value) {
if (IS_LITTLE_ENDIAN()) {
memcpy(value, buf, sizeof(*value));
memcpy(buf, &value, sizeof(value));
} else {
((uint8_t *)value)[3] = ((uint8_t *)buf)[0];
((uint8_t *)value)[2] = ((uint8_t *)buf)[1];
((uint8_t *)value)[1] = ((uint8_t *)buf)[2];
((uint8_t *)value)[0] = ((uint8_t *)buf)[3];
((uint8_t *)buf)[0] = value & 0xff;
((uint8_t *)buf)[1] = (value >> 8) & 0xff;
((uint8_t *)buf)[2] = (value >> 16) & 0xff;
((uint8_t *)buf)[3] = (value >> 24) & 0xff;
((uint8_t *)buf)[4] = (value >> 32) & 0xff;
((uint8_t *)buf)[5] = (value >> 40) & 0xff;
((uint8_t *)buf)[6] = (value >> 48) & 0xff;
((uint8_t *)buf)[7] = (value >> 56) & 0xff;
}
return POINTER_SHIFT(buf, sizeof(*value));
return POINTER_SHIFT(buf, sizeof(value));
}
static FORCE_INLINE void *taosDecodeFixed64(void *buf, uint64_t *value) {
static FORCE_INLINE void *taosDecodeFixedU64(void *buf, uint64_t *value) {
if (IS_LITTLE_ENDIAN()) {
memcpy(value, buf, sizeof(*value));
} else {
......@@ -121,41 +163,26 @@ static FORCE_INLINE void *taosDecodeFixed64(void *buf, uint64_t *value) {
return POINTER_SHIFT(buf, sizeof(*value));
}
static FORCE_INLINE void *taosEncodeVariant16(void *buf, uint16_t value) {
int i = 0;
while (value >= ENCODE_LIMIT) {
((uint8_t *)buf)[i] = (value | ENCODE_LIMIT);
value >>= 7;
i++;
ASSERT(i < 3);
}
((uint8_t *)buf)[i] = value;
return POINTER_SHIFT(buf, i+1);
// ---- Fixed I64
static FORCE_INLINE void *taosEncodeFixedI64(void *buf, int64_t value) {
return taosEncodeFixedU64(buf, ZIGZAGE(int64_t, value));
}
static FORCE_INLINE void *taosEncodeVariant32(void *buf, uint32_t value) {
int i = 0;
while (value >= ENCODE_LIMIT) {
((uint8_t *)buf)[i] = (value | ENCODE_LIMIT);
value >>= 7;
i++;
ASSERT(i < 5);
}
((uint8_t *)buf)[i] = value;
return POINTER_SHIFT(buf, i + 1);
static FORCE_INLINE void *taosDecodeFixedI64(void *buf, int64_t *value) {
uint64_t tvalue = 0;
void * ret = taosDecodeFixedU64(buf, &tvalue);
*value = ZIGZAGD(int64_t, tvalue);
return ret;
}
static FORCE_INLINE void *taosEncodeVariant64(void *buf, uint64_t value) {
// ---- Variant U16
static FORCE_INLINE void *taosEncodeVariantU16(void *buf, uint16_t value) {
int i = 0;
while (value >= ENCODE_LIMIT) {
((uint8_t *)buf)[i] = (value | ENCODE_LIMIT);
value >>= 7;
i++;
ASSERT(i < 10);
ASSERT(i < 3);
}
((uint8_t *)buf)[i] = value;
......@@ -163,8 +190,8 @@ static FORCE_INLINE void *taosEncodeVariant64(void *buf, uint64_t value) {
return POINTER_SHIFT(buf, i + 1);
}
static FORCE_INLINE void *taosDecodeVariant16(void *buf, uint16_t *value) {
int i = 0;
static FORCE_INLINE void *taosDecodeVariantU16(void *buf, uint16_t *value) {
int i = 0;
uint16_t tval = 0;
*value = 0;
while (i < 3) {
......@@ -181,8 +208,35 @@ static FORCE_INLINE void *taosDecodeVariant16(void *buf, uint16_t *value) {
return NULL; // error happened
}
static FORCE_INLINE void *taosDecodeVariant32(void *buf, uint32_t *value) {
// ---- Variant I16
static FORCE_INLINE void *taosEncodeVariantI16(void *buf, int16_t value) {
return taosEncodeVariantU16(buf, ZIGZAGE(int16_t, value));
}
static FORCE_INLINE void *taosDecodeVariantI16(void *buf, int16_t *value) {
uint16_t tvalue = 0;
void * ret = taosDecodeVariantU16(buf, &tvalue);
*value = ZIGZAGD(int16_t, tvalue);
return ret;
}
// ---- Variant U32
static FORCE_INLINE void *taosEncodeVariantU32(void *buf, uint32_t value) {
int i = 0;
while (value >= ENCODE_LIMIT) {
((uint8_t *)buf)[i] = (value | ENCODE_LIMIT);
value >>= 7;
i++;
ASSERT(i < 5);
}
((uint8_t *)buf)[i] = value;
return POINTER_SHIFT(buf, i + 1);
}
static FORCE_INLINE void *taosDecodeVariantU32(void *buf, uint32_t *value) {
int i = 0;
uint32_t tval = 0;
*value = 0;
while (i < 5) {
......@@ -199,8 +253,35 @@ static FORCE_INLINE void *taosDecodeVariant32(void *buf, uint32_t *value) {
return NULL; // error happened
}
static FORCE_INLINE void *taosDecodeVariant64(void *buf, uint64_t *value) {
// ---- Variant I32
static FORCE_INLINE void *taosEncodeVariantI32(void *buf, int32_t value) {
return taosEncodeVariantU32(buf, ZIGZAGE(int32_t, value));
}
static FORCE_INLINE void *taosDecodeVariantI32(void *buf, int32_t *value) {
uint32_t tvalue = 0;
void * ret = taosDecodeVariantU32(buf, &tvalue);
*value = ZIGZAGD(int32_t, tvalue);
return ret;
}
// ---- Variant U64
static FORCE_INLINE void *taosEncodeVariantU64(void *buf, uint64_t value) {
int i = 0;
while (value >= ENCODE_LIMIT) {
((uint8_t *)buf)[i] = (value | ENCODE_LIMIT);
value >>= 7;
i++;
ASSERT(i < 10);
}
((uint8_t *)buf)[i] = value;
return POINTER_SHIFT(buf, i + 1);
}
static FORCE_INLINE void *taosDecodeVariantU64(void *buf, uint64_t *value) {
int i = 0;
uint64_t tval = 0;
*value = 0;
while (i < 10) {
......@@ -217,10 +298,23 @@ static FORCE_INLINE void *taosDecodeVariant64(void *buf, uint64_t *value) {
return NULL; // error happened
}
// ---- Variant I64
static FORCE_INLINE void *taosEncodeVariantI64(void *buf, int64_t value) {
return taosEncodeVariantU64(buf, ZIGZAGE(int64_t, value));
}
static FORCE_INLINE void *taosDecodeVariantI64(void *buf, int64_t *value) {
uint64_t tvalue = 0;
void * ret = taosDecodeVariantU64(buf, &tvalue);
*value = ZIGZAGD(int64_t, tvalue);
return ret;
}
// ---- string
static FORCE_INLINE void *taosEncodeString(void *buf, char *value) {
size_t size = strlen(value);
buf = taosEncodeVariant64(buf, size);
buf = taosEncodeVariantU64(buf, size);
memcpy(buf, value, size);
return POINTER_SHIFT(buf, size);
......@@ -229,7 +323,7 @@ static FORCE_INLINE void *taosEncodeString(void *buf, char *value) {
static FORCE_INLINE void *taosDecodeString(void *buf, char **value) {
uint64_t size = 0;
buf = taosDecodeVariant64(buf, &size);
buf = taosDecodeVariantU64(buf, &size);
*value = (char *)malloc(size + 1);
if (*value == NULL) return NULL;
memcpy(*value, buf, size);
......
......@@ -9,8 +9,18 @@ static bool test_fixed_uint16(uint16_t value) {
char buf[20] = "\0";
uint16_t value_check = 0;
void *ptr1 = taosEncodeFixed16(static_cast<void *>(buf), value);
void *ptr2 = taosDecodeFixed16(static_cast<void *>(buf), &value_check);
void *ptr1 = taosEncodeFixedU16(static_cast<void *>(buf), value);
void *ptr2 = taosDecodeFixedU16(static_cast<void *>(buf), &value_check);
return ((ptr2 != NULL) && (value == value_check) && (ptr1 == ptr2));
}
static bool test_fixed_int16(int16_t value) {
char buf[20] = "\0";
int16_t value_check = 0;
void *ptr1 = taosEncodeFixedI16(static_cast<void *>(buf), value);
void *ptr2 = taosDecodeFixedI16(static_cast<void *>(buf), &value_check);
return ((ptr2 != NULL) && (value == value_check) && (ptr1 == ptr2));
}
......@@ -19,8 +29,18 @@ static bool test_fixed_uint32(uint32_t value) {
char buf[20] = "\0";
uint32_t value_check = 0;
void *ptr1 = taosEncodeFixed32(static_cast<void *>(buf), value);
void *ptr2 = taosDecodeFixed32(static_cast<void *>(buf), &value_check);
void *ptr1 = taosEncodeFixedU32(static_cast<void *>(buf), value);
void *ptr2 = taosDecodeFixedU32(static_cast<void *>(buf), &value_check);
return ((ptr2 != NULL) && (value == value_check) && (ptr1 == ptr2));
}
static bool test_fixed_int32(int32_t value) {
char buf[20] = "\0";
int32_t value_check = 0;
void *ptr1 = taosEncodeFixedI32(static_cast<void *>(buf), value);
void *ptr2 = taosDecodeFixedI32(static_cast<void *>(buf), &value_check);
return ((ptr2 != NULL) && (value == value_check) && (ptr1 == ptr2));
}
......@@ -29,8 +49,18 @@ static bool test_fixed_uint64(uint64_t value) {
char buf[20] = "\0";
uint64_t value_check = 0;
void *ptr1 = taosEncodeFixed64(static_cast<void *>(buf), value);
void *ptr2 = taosDecodeFixed64(static_cast<void *>(buf), &value_check);
void *ptr1 = taosEncodeFixedU64(static_cast<void *>(buf), value);
void *ptr2 = taosDecodeFixedU64(static_cast<void *>(buf), &value_check);
return ((ptr2 != NULL) && (value == value_check) && (ptr1 == ptr2));
}
static bool test_fixed_int64(int64_t value) {
char buf[20] = "\0";
int64_t value_check = 0;
void *ptr1 = taosEncodeFixedI64(static_cast<void *>(buf), value);
void *ptr2 = taosDecodeFixedI64(static_cast<void *>(buf), &value_check);
return ((ptr2 != NULL) && (value == value_check) && (ptr1 == ptr2));
}
......@@ -39,8 +69,18 @@ static bool test_variant_uint16(uint16_t value) {
char buf[20] = "\0";
uint16_t value_check = 0;
void *ptr1 = taosEncodeVariant16(static_cast<void *>(buf), value);
void *ptr2 = taosDecodeVariant16(static_cast<void *>(buf), &value_check);
void *ptr1 = taosEncodeVariantU16(static_cast<void *>(buf), value);
void *ptr2 = taosDecodeVariantU16(static_cast<void *>(buf), &value_check);
return ((ptr2 != NULL) && (value == value_check) && (ptr1 == ptr2));
}
static bool test_variant_int16(int16_t value) {
char buf[20] = "\0";
int16_t value_check = 0;
void *ptr1 = taosEncodeVariantI16(static_cast<void *>(buf), value);
void *ptr2 = taosDecodeVariantI16(static_cast<void *>(buf), &value_check);
return ((ptr2 != NULL) && (value == value_check) && (ptr1 == ptr2));
}
......@@ -49,8 +89,18 @@ static bool test_variant_uint32(uint32_t value) {
char buf[20] = "\0";
uint32_t value_check = 0;
void *ptr1 = taosEncodeVariant32(static_cast<void *>(buf), value);
void *ptr2 = taosDecodeVariant32(static_cast<void *>(buf), &value_check);
void *ptr1 = taosEncodeVariantU32(static_cast<void *>(buf), value);
void *ptr2 = taosDecodeVariantU32(static_cast<void *>(buf), &value_check);
return ((ptr2 != NULL) && (value == value_check) && (ptr1 == ptr2));
}
static bool test_variant_int32(int32_t value) {
char buf[20] = "\0";
int32_t value_check = 0;
void *ptr1 = taosEncodeVariantI32(static_cast<void *>(buf), value);
void *ptr2 = taosDecodeVariantI32(static_cast<void *>(buf), &value_check);
return ((ptr2 != NULL) && (value == value_check) && (ptr1 == ptr2));
}
......@@ -59,8 +109,18 @@ static bool test_variant_uint64(uint64_t value) {
char buf[20] = "\0";
uint64_t value_check = 0;
void *ptr1 = taosEncodeVariant64(static_cast<void *>(buf), value);
void *ptr2 = taosDecodeVariant64(static_cast<void *>(buf), &value_check);
void *ptr1 = taosEncodeVariantU64(static_cast<void *>(buf), value);
void *ptr2 = taosDecodeVariantU64(static_cast<void *>(buf), &value_check);
return ((ptr2 != NULL) && (value == value_check) && (ptr1 == ptr2));
}
static bool test_variant_int64(int64_t value) {
char buf[20] = "\0";
int64_t value_check = 0;
void *ptr1 = taosEncodeVariantI64(static_cast<void *>(buf), value);
void *ptr2 = taosDecodeVariantI64(static_cast<void *>(buf), &value_check);
return ((ptr2 != NULL) && (value == value_check) && (ptr1 == ptr2));
}
......@@ -68,49 +128,111 @@ static bool test_variant_uint64(uint64_t value) {
TEST(codingTest, fixed_encode_decode) {
srand(time(0));
// uint16_t
for (uint16_t value = 0; value <= UINT16_MAX; value++) {
ASSERT_TRUE(test_fixed_uint16(value));
if (value == UINT16_MAX) break;
}
// int16_t
for (int16_t value = INT16_MIN; value <= INT16_MAX; value++) {
ASSERT_TRUE(test_fixed_int16(value));
if (value == INT16_MAX) break;
}
std::mt19937 gen32(std::random_device{}());
// uint32_t
ASSERT_TRUE(test_fixed_uint32(0));
ASSERT_TRUE(test_fixed_uint32(UINT32_MAX));
std::uniform_int_distribution<uint32_t> distr1(0, UINT32_MAX);
for (int i = 0; i < 1000000; i++) {
ASSERT_TRUE(test_fixed_uint32(rand()));
ASSERT_TRUE(test_fixed_uint32(distr1(gen32)));
}
std::mt19937_64 gen (std::random_device{}());
// int32_t
ASSERT_TRUE(test_fixed_int32(INT32_MIN));
ASSERT_TRUE(test_fixed_int32(INT32_MAX));
std::uniform_int_distribution<int32_t> distr2(INT32_MIN, INT32_MAX);
for (int i = 0; i < 1000000; i++) {
ASSERT_TRUE(test_fixed_int32(distr2(gen32)));
}
std::mt19937_64 gen64(std::random_device{}());
// uint64_t
std::uniform_int_distribution<uint64_t> distr3(0, UINT64_MAX);
ASSERT_TRUE(test_fixed_uint64(0));
ASSERT_TRUE(test_fixed_uint64(UINT64_MAX));
for (int i = 0; i < 1000000; i++) {
ASSERT_TRUE(test_fixed_uint64(gen()));
ASSERT_TRUE(test_fixed_uint64(distr3(gen64)));
}
// int64_t
std::uniform_int_distribution<int64_t> distr4(INT64_MIN, INT64_MAX);
ASSERT_TRUE(test_fixed_int64(INT64_MIN));
ASSERT_TRUE(test_fixed_int64(INT64_MAX));
for (int i = 0; i < 1000000; i++) {
ASSERT_TRUE(test_fixed_int64(distr4(gen64)));
}
}
TEST(codingTest, variant_encode_decode) {
srand(time(0));
// uint16_t
for (uint16_t value = 0; value <= UINT16_MAX; value++) {
ASSERT_TRUE(test_variant_uint16(value));
if (value == UINT16_MAX) break;
}
// int16_t
for (int16_t value = INT16_MIN; value <= INT16_MAX; value++) {
ASSERT_TRUE(test_variant_int16(value));
if (value == INT16_MAX) break;
}
std::mt19937 gen32(std::random_device{}());
// uint32_t
std::uniform_int_distribution<uint32_t> distr1(0, UINT32_MAX);
ASSERT_TRUE(test_variant_uint32(0));
ASSERT_TRUE(test_variant_uint32(UINT32_MAX));
for (int i = 0; i < 5000000; i++) {
ASSERT_TRUE(test_variant_uint32(rand()));
ASSERT_TRUE(test_variant_uint32(distr1(gen32)));
}
// int32_t
std::uniform_int_distribution<int32_t> distr2(INT32_MIN, INT32_MAX);
ASSERT_TRUE(test_variant_int32(INT32_MIN));
ASSERT_TRUE(test_variant_int32(INT32_MAX));
for (int i = 0; i < 5000000; i++) {
ASSERT_TRUE(test_variant_int32(distr2(gen32)));
}
std::mt19937_64 gen (std::random_device{}());
std::mt19937_64 gen64(std::random_device{}());
// uint64_t
std::uniform_int_distribution<uint64_t> distr3(0, UINT64_MAX);
ASSERT_TRUE(test_variant_uint64(0));
ASSERT_TRUE(test_variant_uint64(UINT64_MAX));
for (int i = 0; i < 5000000; i++) {
uint64_t value = gen();
// uint64_t value = gen();
// printf("%ull\n", value);
ASSERT_TRUE(test_variant_uint64(distr3(gen64)));
}
// int64_t
std::uniform_int_distribution<int64_t> distr4(INT64_MIN, INT64_MAX);
ASSERT_TRUE(test_variant_int64(INT64_MIN));
ASSERT_TRUE(test_variant_int64(INT64_MAX));
for (int i = 0; i < 5000000; i++) {
// uint64_t value = gen();
// printf("%ull\n", value);
ASSERT_TRUE(test_variant_uint64(value));
ASSERT_TRUE(test_variant_int64(distr4(gen64)));
}
}
\ No newline at end of file
......@@ -104,65 +104,14 @@ static int32_t vnodeProcessSubmitMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pR
}
static int32_t vnodeProcessCreateTableMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pRet) {
SMDCreateTableMsg *pTable = pCont;
int32_t code = 0;
vTrace("vgId:%d, table:%s, start to create", pVnode->vgId, pTable->tableId);
int16_t numOfColumns = htons(pTable->numOfColumns);
int16_t numOfTags = htons(pTable->numOfTags);
int32_t sid = htonl(pTable->sid);
uint64_t uid = htobe64(pTable->uid);
SSchema * pSchema = (SSchema *)pTable->data;
STSchema *pDestTagSchema = NULL;
SDataRow dataRow = NULL;
int32_t totalCols = numOfColumns + numOfTags;
STableCfg tCfg;
tsdbInitTableCfg(&tCfg, pTable->tableType, uid, sid);
STableCfg *pCfg = tsdbCreateTableCfgFromMsg((SMDCreateTableMsg *)pCont);
if (pCfg == NULL) return terrno;
int32_t code = tsdbCreateTable(pVnode->tsdb, pCfg);
STSchema *pDestSchema = tdNewSchema(numOfColumns);
for (int i = 0; i < numOfColumns; i++) {
tdSchemaAddCol(pDestSchema, pSchema[i].type, htons(pSchema[i].colId), htons(pSchema[i].bytes));
}
tsdbTableSetSchema(&tCfg, pDestSchema, false);
tsdbTableSetName(&tCfg, pTable->tableId, false);
if (numOfTags != 0) {
pDestTagSchema = tdNewSchema(numOfTags);
for (int i = numOfColumns; i < totalCols; i++) {
tdSchemaAddCol(pDestTagSchema, pSchema[i].type, htons(pSchema[i].colId), htons(pSchema[i].bytes));
}
tsdbTableSetTagSchema(&tCfg, pDestTagSchema, false);
tsdbTableSetSName(&tCfg, pTable->superTableId, false);
tsdbTableSetSuperUid(&tCfg, htobe64(pTable->superTableUid));
char *pTagData = pTable->data + totalCols * sizeof(SSchema);
int accumBytes = 0;
dataRow = tdNewTagRowFromSchema(pDestTagSchema, numOfTags);
for (int i = 0; i < numOfTags; i++) {
STColumn *pTCol = schemaColAt(pDestTagSchema, i);
tdAppendTagColVal(dataRow, pTagData + accumBytes, pTCol->type, pTCol->bytes, pTCol->colId);
accumBytes += htons(pSchema[i + numOfColumns].bytes);
}
tsdbTableSetTagValue(&tCfg, dataRow, false);
}
// only normal has sql string
if (pTable->tableType == TSDB_STREAM_TABLE) {
char *sql = pTable->data + totalCols * sizeof(SSchema);
vTrace("vgId:%d, table:%s is creating, sql:%s", pVnode->vgId, pTable->tableId, sql);
tsdbTableSetStreamSql(&tCfg, sql, false);
}
code = tsdbCreateTable(pVnode->tsdb, &tCfg);
tdFreeDataRow(dataRow);
tfree(pDestTagSchema);
tfree(pDestSchema);
vTrace("vgId:%d, table:%s is created, result:%x", pVnode->vgId, pTable->tableId, code);
return code;
tsdbClearTableCfg(pCfg);
free(pCfg);
return code;
}
static int32_t vnodeProcessDropTableMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pRet) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册