提交 adf04528 编写于 作者: H Haojun Liao

Merge branch 'develop' into feature/query

......@@ -177,6 +177,7 @@ matrix:
cd ${TRAVIS_BUILD_DIR}
lcov -d . --capture --rc lcov_branch_coverage=1 -o coverage.info
lcov --remove coverage.info '*tests*' '*deps*' -o coverage.info
lcov -l --rc lcov_branch_coverage=1 coverage.info || travis_terminate $?
gem install coveralls-lcov
......
......@@ -44,7 +44,7 @@ sudo apt-get install maven
Build TDengine:
```
mkdir build && cd build
mkdir debug && cd debug
cmake .. && cmake --build .
```
......
......@@ -19,6 +19,7 @@
#include <stdlib.h>
#include <string.h>
#include "talgo.h"
#include "taosdef.h"
#include "tutil.h"
......@@ -26,19 +27,24 @@
extern "C" {
#endif
#define STR_TO_VARSTR(x, str) do {VarDataLenT __len = strlen(str); \
*(VarDataLenT*)(x) = __len; \
strncpy(varDataVal(x), (str), __len);} while(0);
#define STR_WITH_MAXSIZE_TO_VARSTR(x, str, _maxs) do {\
char* _e = stpncpy(varDataVal(x), (str), (_maxs));\
varDataSetLen(x, (_e - (x) - VARSTR_HEADER_SIZE));\
} while(0)
#define STR_WITH_SIZE_TO_VARSTR(x, str, _size) do {\
*(VarDataLenT*)(x) = (_size); \
strncpy(varDataVal(x), (str), (_size));\
} while(0);
#define STR_TO_VARSTR(x, str) \
do { \
VarDataLenT __len = strlen(str); \
*(VarDataLenT *)(x) = __len; \
strncpy(varDataVal(x), (str), __len); \
} while (0);
#define STR_WITH_MAXSIZE_TO_VARSTR(x, str, _maxs) \
do { \
char *_e = stpncpy(varDataVal(x), (str), (_maxs)); \
varDataSetLen(x, (_e - (x)-VARSTR_HEADER_SIZE)); \
} while (0)
#define STR_WITH_SIZE_TO_VARSTR(x, str, _size) \
do { \
*(VarDataLenT *)(x) = (_size); \
strncpy(varDataVal(x), (str), (_size)); \
} while (0);
// ----------------- TSDB COLUMN DEFINITION
typedef struct {
......@@ -60,7 +66,7 @@ typedef struct {
// ----------------- TSDB SCHEMA DEFINITION
typedef struct {
int totalCols; // Total columns allocated
int version; // version
int numOfCols; // Number of columns appended
int tlen; // maximum length of a SDataRow without the header part
int flen; // First part length in a SDataRow after the header part
......@@ -68,19 +74,48 @@ typedef struct {
} STSchema;
#define schemaNCols(s) ((s)->numOfCols)
#define schemaTotalCols(s) ((s)->totalCols)
#define schemaVersion(s) ((s)->version)
#define schemaTLen(s) ((s)->tlen)
#define schemaFLen(s) ((s)->flen)
#define schemaColAt(s, i) ((s)->columns + i)
#define tdFreeSchema(s) tfree((s))
STSchema *tdNewSchema(int32_t nCols);
#define tdFreeSchema(s) tfree((s))
int tdSchemaAddCol(STSchema *pSchema, int8_t type, int16_t colId, int32_t bytes);
STSchema *tdDupSchema(STSchema *pSchema);
int tdGetSchemaEncodeSize(STSchema *pSchema);
void * tdEncodeSchema(void *dst, STSchema *pSchema);
STSchema *tdDecodeSchema(void **psrc);
static FORCE_INLINE int comparColId(const void *key1, const void *key2) {
if (*(int16_t *)key1 > ((STColumn *)key2)->colId) {
return 1;
} else if (*(int16_t *)key1 < ((STColumn *)key2)->colId) {
return -1;
} else {
return 0;
}
}
static FORCE_INLINE STColumn *tdGetColOfID(STSchema *pSchema, int16_t colId) {
void *ptr = bsearch(&colId, (void *)pSchema->columns, schemaNCols(pSchema), sizeof(STColumn), comparColId);
if (ptr == NULL) return NULL;
return (STColumn *)ptr;
}
// ----------------- SCHEMA BUILDER DEFINITION
typedef struct {
int tCols;
int nCols;
int tlen;
int flen;
int version;
STColumn *columns;
} STSchemaBuilder;
int tdInitTSchemaBuilder(STSchemaBuilder *pBuilder, int32_t version);
void tdDestroyTSchemaBuilder(STSchemaBuilder *pBuilder);
void tdResetTSchemaBuilder(STSchemaBuilder *pBuilder, int32_t version);
int tdAddColToSchema(STSchemaBuilder *pBuilder, int8_t type, int16_t colId, int32_t bytes);
STSchema *tdGetSchemaFromBuilder(STSchemaBuilder *pBuilder);
// ----------------- Data row structure
/* A data row, the format is like below:
......@@ -188,12 +223,11 @@ static FORCE_INLINE int32_t dataColGetNEleLen(SDataCol *pDataCol, int rows) {
}
}
typedef struct {
int maxRowSize;
int maxCols; // max number of columns
int maxPoints; // max number of points
int bufSize;
int maxRowSize;
int maxCols; // max number of columns
int maxPoints; // max number of points
int bufSize;
int numOfRows;
int numOfCols; // Total number of cols
......@@ -213,62 +247,102 @@ void tdInitDataCols(SDataCols *pCols, STSchema *pSchema);
SDataCols *tdDupDataCols(SDataCols *pCols, bool keepData);
void tdFreeDataCols(SDataCols *pCols);
void tdAppendDataRowToDataCol(SDataRow row, SDataCols *pCols);
void tdPopDataColsPoints(SDataCols *pCols, int pointsToPop); //!!!!
void tdPopDataColsPoints(SDataCols *pCols, int pointsToPop); //!!!!
int tdMergeDataCols(SDataCols *target, SDataCols *src, int rowsToMerge);
void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, SDataCols *src2, int *iter2, int tRows);
// ----------------- K-V data row structure
/*
* +----------+----------+---------------------------------+---------------------------------+
* | int16_t | int16_t | | |
* +----------+----------+---------------------------------+---------------------------------+
* | len | ncols | cols index | data part |
* +----------+----------+---------------------------------+---------------------------------+
*/
typedef void *SKVRow;
typedef struct {
int16_t colId;
int16_t offset;
} SColIdx;
#define TD_KV_ROW_HEAD_SIZE 2 * sizeof(int16_t)
#define kvRowLen(r) (*(int16_t *)(r))
#define kvRowNCols(r) (*(int16_t *)POINTER_SHIFT(r, sizeof(int16_t)))
#define kvRowSetLen(r, len) kvRowLen(r) = (len)
#define kvRowSetNCols(r, n) kvRowNCols(r) = (n)
#define kvRowColIdx(r) (SColIdx *)POINTER_SHIFT(r, TD_KV_ROW_HEAD_SIZE)
#define kvRowValues(r) POINTER_SHIFT(r, TD_KV_ROW_HEAD_SIZE + sizeof(SColIdx) * kvRowNCols(r))
#define kvRowCpy(dst, r) memcpy((dst), (r), kvRowLen(r))
#define kvRowColVal(r, colIdx) POINTER_SHIFT(kvRowValues(r), (colIdx)->offset)
#define kvRowColIdxAt(r, i) (kvRowColIdx(r) + (i))
#define kvRowFree(r) tfree(r)
SKVRow tdKVRowDup(SKVRow row);
SKVRow tdSetKVRowDataOfCol(SKVRow row, int16_t colId, int8_t type, void *value);
void * tdEncodeKVRow(void *buf, SKVRow row);
void * tdDecodeKVRow(void *buf, SKVRow *row);
static FORCE_INLINE int comparTagId(const void *key1, const void *key2) {
if (*(int16_t *)key1 > ((SColIdx *)key2)->colId) {
return 1;
} else if (*(int16_t *)key1 < ((SColIdx *)key2)->colId) {
return -1;
} else {
return 0;
}
}
static FORCE_INLINE void *tdGetKVRowValOfCol(SKVRow row, int16_t colId) {
void *ret = taosbsearch(&colId, kvRowColIdx(row), kvRowNCols(row), sizeof(SColIdx), comparTagId, TD_EQ);
if (ret == NULL) return NULL;
return kvRowColVal(row, (SColIdx *)ret);
}
// ----------------- Tag row structure
// ----------------- K-V data row builder
typedef struct {
int16_t tCols;
int16_t nCols;
SColIdx *pColIdx;
int16_t alloc;
int16_t size;
void * buf;
} SKVRowBuilder;
/* A tag row, the format is like below:
+----------+----------------------------------------------------------------+
| STagRow | STagCol | STagCol | STagCol | STagCol | ...| STagCol | STagCol |
+----------+----------------------------------------------------------------+
int tdInitKVRowBuilder(SKVRowBuilder *pBuilder);
void tdDestroyKVRowBuilder(SKVRowBuilder *pBuilder);
void tdResetKVRowBuilder(SKVRowBuilder *pBuilder);
SKVRow tdGetKVRowFromBuilder(SKVRowBuilder *pBuilder);
pData
+----------+----------------------------------------------------------------+
| value 1 | value 2 | value 3 | value 4 | ....|value n |
+----------+----------------------------------------------------------------+
static FORCE_INLINE int tdAddColToKVRow(SKVRowBuilder *pBuilder, int16_t colId, int8_t type, void *value) {
ASSERT(pBuilder->nCols == 0 || colId > pBuilder->pColIdx[pBuilder->nCols - 1].colId);
*/
if (pBuilder->nCols >= pBuilder->tCols) {
pBuilder->tCols *= 2;
pBuilder->pColIdx = (SColIdx *)realloc((void *)(pBuilder->pColIdx), sizeof(SColIdx) * pBuilder->tCols);
if (pBuilder->pColIdx == NULL) return -1;
}
pBuilder->pColIdx[pBuilder->nCols].colId = colId;
pBuilder->pColIdx[pBuilder->nCols].offset = pBuilder->size;
#define TD_TAG_ROW_HEAD_SIZE sizeof(int16_t)
pBuilder->nCols++;
#define tagRowNum(r) (*(int16_t *)(r))
#define tagRowArray(r) POINTER_SHIFT(r, TD_TAG_ROW_HEAD_SIZE)
//#define dataRowKey(r) (*(TSKEY *)(dataRowTuple(r)))
//#define dataRowSetLen(r, l) (dataRowLen(r) = (l))
//#define dataRowCpy(dst, r) memcpy((dst), (r), dataRowLen(r))
//#define dataRowMaxBytesFromSchema(s) (schemaTLen(s) + TD_DATA_ROW_HEAD_SIZE)
int tlen = IS_VAR_DATA_TYPE(type) ? varDataTLen(value) : TYPE_BYTES[type];
if (tlen > pBuilder->alloc - pBuilder->size) {
while (tlen > pBuilder->alloc - pBuilder->size) {
pBuilder->alloc *= 2;
}
pBuilder->buf = realloc(pBuilder->buf, pBuilder->alloc);
if (pBuilder->buf == NULL) return -1;
}
typedef struct {
int16_t colId; // column ID
int16_t colType;
uint16_t offset; //to store value for numeric col or offset for binary/Nchar
} STagCol;
memcpy(POINTER_SHIFT(pBuilder->buf, pBuilder->size), value, tlen);
pBuilder->size += tlen;
typedef struct {
int32_t len;
void * pData; // Space to store the tag value
uint16_t dataLen;
int16_t ncols; // Total columns allocated
STagCol tagCols[];
} STagRow;
#define tagColSize(r) (sizeof(STagCol) + r.colLen)
int tdSetTagCol(SDataRow row, void *value, int16_t len, int8_t type, int16_t colId); //insert tag value and update all the information
int tdDeleteTagCol(SDataRow row, int16_t colId); // delete tag value and update all the information
void * tdQueryTagByID(SDataRow row, int16_t colId, int16_t *type); //if find tag, 0, else return -1;
int tdAppendTagColVal(SDataRow row, void *value, int8_t type, int32_t bytes, int16_t colId);
SDataRow tdTagRowDup(SDataRow row);
void tdFreeTagRow(SDataRow row);
SDataRow tdTagRowDecode(SDataRow row);
int tdTagRowCpy(SDataRow dst, SDataRow src);
void * tdNewTagRowFromSchema(STSchema *pSchema, int16_t numofTags);
STSchema *tdGetSchemaFromData(SDataRow *row);
return 0;
}
#ifdef __cplusplus
}
......
......@@ -13,96 +13,30 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tdataformat.h"
#include "wchar.h"
#include "talgo.h"
/**
* Create a SSchema object with nCols columns
* ASSUMPTIONS: VALID PARAMETERS
*
* @param nCols number of columns the schema has
*
* @return a STSchema object for success
* NULL for failure
*/
STSchema *tdNewSchema(int32_t nCols) {
int32_t size = sizeof(STSchema) + sizeof(STColumn) * nCols;
STSchema *pSchema = (STSchema *)calloc(1, size);
if (pSchema == NULL) return NULL;
pSchema->numOfCols = 0;
pSchema->totalCols = nCols;
pSchema->flen = 0;
pSchema->tlen = 0;
return pSchema;
}
/**
* Append a column to the schema
*/
int tdSchemaAddCol(STSchema *pSchema, int8_t type, int16_t colId, int32_t bytes) {
if (!isValidDataType(type, 0) || pSchema->numOfCols >= pSchema->totalCols) return -1;
STColumn *pCol = schemaColAt(pSchema, schemaNCols(pSchema));
colSetType(pCol, type);
colSetColId(pCol, colId);
if (schemaNCols(pSchema) == 0) {
colSetOffset(pCol, 0);
} else {
STColumn *pTCol = schemaColAt(pSchema, schemaNCols(pSchema)-1);
colSetOffset(pCol, pTCol->offset + TYPE_BYTES[pTCol->type]);
}
switch (type) {
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR:
colSetBytes(pCol, bytes); // Set as maximum bytes
pSchema->tlen += (TYPE_BYTES[type] + sizeof(VarDataLenT) + bytes);
break;
default:
colSetBytes(pCol, TYPE_BYTES[type]);
pSchema->tlen += TYPE_BYTES[type];
break;
}
pSchema->numOfCols++;
pSchema->flen += TYPE_BYTES[type];
ASSERT(pCol->offset < pSchema->flen);
return 0;
}
#include "wchar.h"
/**
* Duplicate the schema and return a new object
*/
STSchema *tdDupSchema(STSchema *pSchema) {
STSchema *tSchema = tdNewSchema(schemaNCols(pSchema));
int tlen = sizeof(STSchema) + sizeof(STColumn) * schemaNCols(pSchema);
STSchema *tSchema = (STSchema *)malloc(tlen);
if (tSchema == NULL) return NULL;
int32_t size = sizeof(STSchema) + sizeof(STColumn) * schemaNCols(pSchema);
memcpy((void *)tSchema, (void *)pSchema, size);
memcpy((void *)tSchema, (void *)pSchema, tlen);
return tSchema;
}
/**
* Return the size of encoded schema
*/
int tdGetSchemaEncodeSize(STSchema *pSchema) {
return T_MEMBER_SIZE(STSchema, totalCols) +
schemaNCols(pSchema) *
(T_MEMBER_SIZE(STColumn, type) + T_MEMBER_SIZE(STColumn, colId) + T_MEMBER_SIZE(STColumn, bytes));
}
/**
* Encode a schema to dst, and return the next pointer
*/
void *tdEncodeSchema(void *dst, STSchema *pSchema) {
ASSERT(pSchema->numOfCols == pSchema->totalCols);
T_APPEND_MEMBER(dst, pSchema, STSchema, totalCols);
T_APPEND_MEMBER(dst, pSchema, STSchema, version);
T_APPEND_MEMBER(dst, pSchema, STSchema, numOfCols);
for (int i = 0; i < schemaNCols(pSchema); i++) {
STColumn *pCol = schemaColAt(pSchema, i);
T_APPEND_MEMBER(dst, pCol, STColumn, type);
......@@ -118,11 +52,14 @@ void *tdEncodeSchema(void *dst, STSchema *pSchema) {
*/
STSchema *tdDecodeSchema(void **psrc) {
int totalCols = 0;
int version = 0;
STSchemaBuilder schemaBuilder = {0};
T_READ_MEMBER(*psrc, int, version);
T_READ_MEMBER(*psrc, int, totalCols);
STSchema *pSchema = tdNewSchema(totalCols);
if (pSchema == NULL) return NULL;
if (tdInitTSchemaBuilder(&schemaBuilder, version) < 0) return NULL;
for (int i = 0; i < totalCols; i++) {
int8_t type = 0;
int16_t colId = 0;
......@@ -131,173 +68,109 @@ STSchema *tdDecodeSchema(void **psrc) {
T_READ_MEMBER(*psrc, int16_t, colId);
T_READ_MEMBER(*psrc, int32_t, bytes);
tdSchemaAddCol(pSchema, type, colId, bytes);
if (tdAddColToSchema(&schemaBuilder, type, colId, bytes) < 0) {
tdDestroyTSchemaBuilder(&schemaBuilder);
return NULL;
}
}
STSchema *pSchema = tdGetSchemaFromBuilder(&schemaBuilder);
tdDestroyTSchemaBuilder(&schemaBuilder);
return pSchema;
}
/**
* Initialize a data row
*/
void tdInitDataRow(SDataRow row, STSchema *pSchema) { dataRowSetLen(row, TD_DATA_ROW_HEAD_SIZE + schemaFLen(pSchema)); }
int tdInitTSchemaBuilder(STSchemaBuilder *pBuilder, int32_t version) {
if (pBuilder == NULL) return -1;
SDataRow tdNewDataRowFromSchema(STSchema *pSchema) {
int32_t size = dataRowMaxBytesFromSchema(pSchema);
pBuilder->tCols = 256;
pBuilder->columns = (STColumn *)malloc(sizeof(STColumn) * pBuilder->tCols);
if (pBuilder->columns == NULL) return -1;
SDataRow row = malloc(size);
if (row == NULL) return NULL;
tdResetTSchemaBuilder(pBuilder, version);
return 0;
}
tdInitDataRow(row, pSchema);
return row;
void tdDestroyTSchemaBuilder(STSchemaBuilder *pBuilder) {
if (pBuilder) {
tfree(pBuilder->columns);
}
}
int tdSetTagCol(SDataRow row, void *value, int16_t len, int8_t type, int16_t colId){ //insert/update tag value and update all the information
ASSERT(((STagRow *)row)->pData != NULL);
//STagCol * stCol = tdQueryTagColByID()
void tdResetTSchemaBuilder(STSchemaBuilder *pBuilder, int32_t version) {
pBuilder->nCols = 0;
pBuilder->tlen = 0;
pBuilder->flen = 0;
pBuilder->version = version;
}
return 0;
};
int tdAddColToSchema(STSchemaBuilder *pBuilder, int8_t type, int16_t colId, int32_t bytes) {
if (!isValidDataType(type, 0)) return -1;
int tdDeleteTagCol(SDataRow row, int16_t colId){ // delete tag value and update all the information
//todo
return 0;
};
if (pBuilder->nCols >= pBuilder->tCols) {
pBuilder->tCols *= 2;
pBuilder->columns = (STColumn *)realloc(pBuilder->columns, sizeof(STColumn) * pBuilder->tCols);
if (pBuilder->columns == NULL) return -1;
}
static int compTagId(const void *key1, const void *key2) {
if (((STagCol *)key1)->colId > ((STagCol *)key2)->colId) {
return 1;
} else if (((STagCol *)key1)->colId == ((STagCol *)key2)->colId) {
return 0;
STColumn *pCol = &(pBuilder->columns[pBuilder->nCols]);
colSetType(pCol, type);
colSetColId(pCol, colId);
if (pBuilder->nCols == 0) {
colSetOffset(pCol, 0);
} else {
return -1;
STColumn *pTCol = &(pBuilder->columns[pBuilder->nCols-1]);
colSetOffset(pCol, pTCol->offset + TYPE_BYTES[pTCol->type]);
}
}
/**
* Find tag structure by colId, if find, return tag structure, else return NULL;
*/
STagCol * tdQueryTagColByID(SDataRow row, int16_t colId, int flags) { //if find tag, 0, else return -1;
ASSERT(((STagRow *)row)->pData != NULL);
STagCol *pBase = ((STagRow *)row)->tagCols;
int16_t nCols = ((STagRow *)row)->ncols;
STagCol key = {colId,0,0};
STagCol * stCol = taosbsearch(&key, pBase, nCols, sizeof(STagCol), compTagId, flags);
return stCol;
};
/**
* Find tag value by colId, if find, return tag value, else return NULL;
*/
void * tdQueryTagByID(SDataRow row, int16_t colId, int16_t *type) {
ASSERT(((STagRow *)row)->pData != NULL);
STagCol *pBase = ((STagRow *)row)->tagCols;
int16_t nCols = ((STagRow *)row)->ncols;
STagCol key = {colId,0,0};
STagCol * stCol = taosbsearch(&key, pBase, nCols, sizeof(STagCol), compTagId, TD_EQ);
if (NULL == stCol) {
type = TSDB_DATA_TYPE_NULL;
return NULL;
if (IS_VAR_DATA_TYPE(type)) {
colSetBytes(pCol, bytes);
pBuilder->tlen += (TYPE_BYTES[type] + sizeof(VarDataLenT) + bytes);
} else {
colSetBytes(pCol, TYPE_BYTES[type]);
pBuilder->tlen += TYPE_BYTES[type];
}
void * pData = ((STagRow *)row)->pData;
*type = stCol->colType;
return pData + stCol->offset;
};
int tdAppendTagColVal(SDataRow row, void *value, int8_t type, int32_t bytes, int16_t colId){
ASSERT(value != NULL);
//ASSERT(bytes-2 == varDataTLen(value));
ASSERT(row != NULL);
STagRow *pTagrow = row;
pTagrow->tagCols[pTagrow->ncols].colId = colId;
pTagrow->tagCols[pTagrow->ncols].colType = type;
pTagrow->tagCols[pTagrow->ncols].offset = pTagrow->dataLen;
switch (type) {
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR:
memcpy((char *)pTagrow->pData + pTagrow->dataLen, value, varDataTLen(value));
pTagrow->dataLen += varDataTLen(value);
break;
default:
memcpy((char *)pTagrow->pData + pTagrow->dataLen, value, TYPE_BYTES[type]);
pTagrow->dataLen += TYPE_BYTES[type];
break;
}
pTagrow->ncols++;
pBuilder->nCols++;
pBuilder->flen += TYPE_BYTES[type];
ASSERT(pCol->offset < pBuilder->flen);
return 0;
};
}
void * tdNewTagRowFromSchema(STSchema *pSchema, int16_t numofTags) {
int32_t size = sizeof(STagRow) + numofTags * sizeof(STagCol);
STSchema *tdGetSchemaFromBuilder(STSchemaBuilder *pBuilder) {
if (pBuilder->nCols <= 0) return NULL;
STagRow *row = malloc(size);
if (row == NULL) return NULL;
int tlen = sizeof(STSchema) + sizeof(STColumn) * pBuilder->nCols;
int32_t datasize = pSchema->tlen;
row->pData = malloc(datasize);
if (NULL == row->pData) {
free(row);
return NULL;
}
STSchema *pSchema = (STSchema *)malloc(tlen);
if (pSchema == NULL) return NULL;
schemaVersion(pSchema) = pBuilder->version;
schemaNCols(pSchema) = pBuilder->nCols;
schemaTLen(pSchema) = pBuilder->tlen;
schemaFLen(pSchema) = pBuilder->flen;
row->len = size;
row->dataLen = 0;
row->ncols = 0;
return row;
memcpy(schemaColAt(pSchema, 0), pBuilder->columns, sizeof(STColumn) * pBuilder->nCols);
return pSchema;
}
/**
* free tag row
* Initialize a data row
*/
void tdFreeTagRow(SDataRow row) {
if (row) {
free(((STagRow *)row)->pData);
free(row);
}
}
void tdInitDataRow(SDataRow row, STSchema *pSchema) { dataRowSetLen(row, TD_DATA_ROW_HEAD_SIZE + schemaFLen(pSchema)); }
SDataRow tdTagRowDup(SDataRow row) {
STagRow *trow = malloc(dataRowLen(row));
if (trow == NULL) return NULL;
dataRowCpy(trow, row);
trow->pData = malloc(trow->dataLen);
if (NULL == trow->pData) {
free(trow);
return NULL;
}
memcpy(trow->pData, ((STagRow *)row)->pData, trow->dataLen);
return trow;
}
SDataRow tdNewDataRowFromSchema(STSchema *pSchema) {
int32_t size = dataRowMaxBytesFromSchema(pSchema);
SDataRow tdTagRowDecode(SDataRow row) {
STagRow *trow = malloc(dataRowLen(row));
if (trow == NULL) return NULL;
dataRowCpy(trow, row);
trow->pData = malloc(trow->dataLen);
if (NULL == trow->pData) {
free(trow);
return NULL;
}
char * pData = (char *)row + dataRowLen(row);
memcpy(trow->pData, pData, trow->dataLen);
return trow;
}
SDataRow row = malloc(size);
if (row == NULL) return NULL;
int tdTagRowCpy(SDataRow dst, SDataRow src) {
if (src == NULL) return -1;
dataRowCpy(dst, src);
void * pData = dst + dataRowLen(src);
memcpy(pData, ((STagRow *)src)->pData, ((STagRow *)src)->dataLen);
return 0;
tdInitDataRow(row, pSchema);
return row;
}
/**
* Free the SDataRow object
*/
......@@ -331,7 +204,6 @@ void dataColInit(SDataCol *pDataCol, STColumn *pCol, void **pBuf, int maxPoints)
pDataCol->pData = *pBuf;
*pBuf = POINTER_SHIFT(*pBuf, pDataCol->spaceSize);
}
}
void dataColAppendVal(SDataCol *pCol, void *value, int numOfRows, int maxPoints) {
......@@ -415,7 +287,7 @@ void dataColSetNEleNull(SDataCol *pCol, int nEle, int maxPoints) {
void dataColSetOffset(SDataCol *pCol, int nEle) {
ASSERT(((pCol->type == TSDB_DATA_TYPE_BINARY) || (pCol->type == TSDB_DATA_TYPE_NCHAR)));
void * tptr = pCol->pData;
void *tptr = pCol->pData;
// char *tptr = (char *)(pCol->pData);
VarDataOffsetT offset = 0;
......@@ -595,4 +467,131 @@ void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, SDataCol
(*iter2)++;
}
}
}
SKVRow tdKVRowDup(SKVRow row) {
SKVRow trow = malloc(kvRowLen(row));
if (trow == NULL) return NULL;
kvRowCpy(trow, row);
return trow;
}
SKVRow tdSetKVRowDataOfCol(SKVRow row, int16_t colId, int8_t type, void *value) {
// TODO
return NULL;
// SColIdx *pColIdx = NULL;
// SKVRow rrow = row;
// SKVRow nrow = NULL;
// void *ptr = taosbsearch(&colId, kvDataRowColIdx(row), kvDataRowNCols(row), sizeof(SColIdx), comparTagId, TD_GE);
// if (ptr == NULL || ((SColIdx *)ptr)->colId < colId) { // need to add a column value to the row
// int tlen = kvDataRowLen(row) + sizeof(SColIdx) + (IS_VAR_DATA_TYPE(type) ? varDataTLen(value) :
// TYPE_BYTES[type]); nrow = malloc(tlen); if (nrow == NULL) return NULL;
// kvDataRowSetNCols(nrow, kvDataRowNCols(row)+1);
// kvDataRowSetLen(nrow, tlen);
// if (ptr == NULL) ptr = kvDataRowValues(row);
// // Copy the columns before the col
// if (POINTER_DISTANCE(ptr, kvDataRowColIdx(row)) > 0) {
// memcpy(kvDataRowColIdx(nrow), kvDataRowColIdx(row), POINTER_DISTANCE(ptr, kvDataRowColIdx(row)));
// memcpy(kvDataRowValues(nrow), kvDataRowValues(row), ((SColIdx *)ptr)->offset); // TODO: here is not correct
// }
// // Set the new col value
// pColIdx = (SColIdx *)POINTER_SHIFT(nrow, POINTER_DISTANCE(ptr, row));
// pColIdx->colId = colId;
// pColIdx->offset = ((SColIdx *)ptr)->offset; // TODO: here is not correct
// if (IS_VAR_DATA_TYPE(type)) {
// memcpy(POINTER_SHIFT(kvDataRowValues(nrow), pColIdx->offset), value, varDataLen(value));
// } else {
// memcpy(POINTER_SHIFT(kvDataRowValues(nrow), pColIdx->offset), value, TYPE_BYTES[type]);
// }
// // Copy the columns after the col
// if (POINTER_DISTANCE(kvDataRowValues(row), ptr) > 0) {
// // TODO: memcpy();
// }
// } else {
// // TODO
// ASSERT(((SColIdx *)ptr)->colId == colId);
// if (IS_VAR_DATA_TYPE(type)) {
// void *pOldVal = kvDataRowColVal(row, (SColIdx *)ptr);
// if (varDataTLen(value) == varDataTLen(pOldVal)) { // just update the column value in place
// memcpy(pOldVal, value, varDataTLen(value));
// } else { // enlarge the memory
// // rrow = realloc(rrow, kvDataRowLen(rrow) + varDataTLen(value) - varDataTLen(pOldVal));
// // if (rrow == NULL) return NULL;
// // memmove();
// // for () {
// // ((SColIdx *)ptr)->offset += balabala;
// // }
// // kvDataRowSetLen();
// }
// } else {
// memcpy(kvDataRowColVal(row, (SColIdx *)ptr), value, TYPE_BYTES[type]);
// }
// }
// return rrow;
}
void *tdEncodeKVRow(void *buf, SKVRow row) {
// May change the encode purpose
kvRowCpy(buf, row);
return POINTER_SHIFT(buf, kvRowLen(row));
}
void *tdDecodeKVRow(void *buf, SKVRow *row) {
*row = tdKVRowDup(buf);
return POINTER_SHIFT(buf, kvRowLen(*row));
}
int tdInitKVRowBuilder(SKVRowBuilder *pBuilder) {
pBuilder->tCols = 128;
pBuilder->nCols = 0;
pBuilder->pColIdx = (SColIdx *)malloc(sizeof(SColIdx) * pBuilder->tCols);
if (pBuilder->pColIdx == NULL) return -1;
pBuilder->alloc = 1024;
pBuilder->size = 0;
pBuilder->buf = malloc(pBuilder->alloc);
if (pBuilder->buf == NULL) {
free(pBuilder->pColIdx);
return -1;
}
return 0;
}
void tdDestroyKVRowBuilder(SKVRowBuilder *pBuilder) {
tfree(pBuilder->pColIdx);
tfree(pBuilder->buf);
}
void tdResetKVRowBuilder(SKVRowBuilder *pBuilder) {
pBuilder->nCols = 0;
pBuilder->size = 0;
}
SKVRow tdGetKVRowFromBuilder(SKVRowBuilder *pBuilder) {
int tlen = sizeof(SColIdx) * pBuilder->nCols + pBuilder->size;
if (tlen == 0) return NULL;
tlen += TD_KV_ROW_HEAD_SIZE;
SKVRow row = malloc(tlen);
if (row == NULL) return NULL;
kvRowSetNCols(row, pBuilder->nCols);
kvRowSetLen(row, tlen);
memcpy(kvRowColIdx(row), pBuilder->pColIdx, sizeof(SColIdx) * pBuilder->nCols);
memcpy(kvRowValues(row), pBuilder->buf, pBuilder->size);
return row;
}
\ No newline at end of file
......@@ -59,9 +59,15 @@ int main(int argc, char *argv[]) {
exit(-1);
}
STSchema *pSchema = tdNewSchema(2);
tdSchemaAddCol(pSchema, TSDB_DATA_TYPE_TIMESTAMP, 0, 8);
tdSchemaAddCol(pSchema, TSDB_DATA_TYPE_INT, 1, 4);
STSchemaBuilder schemaBuilder = {0};
tdInitTSchemaBuilder(&schemaBuilder, 0);
tdAddColToSchema(&schemaBuilder, TSDB_DATA_TYPE_TIMESTAMP, 0, 8);
tdAddColToSchema(&schemaBuilder, TSDB_DATA_TYPE_INT, 1, 4);
STSchema *pSchema = tdGetSchemaFromBuilder(&schemaBuilder);
tdDestroyTSchemaBuilder(&schemaBuilder);
for (int sid =1; sid<10; ++sid) {
cqCreate(pCq, sid, "select avg(speed) from demo.t1 sliding(1s) interval(5s)", pSchema);
......
......@@ -22,7 +22,7 @@ extern "C" {
int32_t dnodeInitModules();
void dnodeStartModules();
void dnodeCleanUpModules();
void dnodeCleanupModules();
void dnodeProcessModuleStatus(uint32_t moduleStatus);
#ifdef __cplusplus
......
......@@ -36,6 +36,46 @@ static void dnodeCleanupStorage();
static void dnodeSetRunStatus(SDnodeRunStatus status);
static void dnodeCheckDataDirOpenned(char *dir);
static SDnodeRunStatus tsDnodeRunStatus = TSDB_DNODE_RUN_STATUS_STOPPED;
static int32_t dnodeInitComponents();
static void dnodeCleanupComponents(int32_t stepId);
typedef struct {
const char *const name;
int (*init)();
void (*cleanup)();
} SDnodeComponent;
static const SDnodeComponent SDnodeComponents[] = {
{"storage", dnodeInitStorage, dnodeCleanupStorage},
{"vread", dnodeInitVnodeRead, dnodeCleanupVnodeRead},
{"vwrite", dnodeInitVnodeWrite, dnodeCleanupVnodeWrite},
{"mread", dnodeInitMnodeRead, dnodeCleanupMnodeRead},
{"mwrite", dnodeInitMnodeWrite, dnodeCleanupMnodeWrite},
{"mpeer", dnodeInitMnodePeer, dnodeCleanupMnodePeer},
{"client", dnodeInitClient, dnodeCleanupClient},
{"server", dnodeInitServer, dnodeCleanupServer},
{"mgmt", dnodeInitMgmt, dnodeCleanupMgmt},
{"modules", dnodeInitModules, dnodeCleanupModules},
{"shell", dnodeInitShell, dnodeCleanupShell}
};
static void dnodeCleanupComponents(int32_t stepId) {
for (int32_t i = stepId; i >= 0; i--) {
SDnodeComponents[i].cleanup();
}
}
static int32_t dnodeInitComponents() {
int32_t code = 0;
for (int32_t i = 0; i < sizeof(SDnodeComponents) / sizeof(SDnodeComponents[0]); i++) {
if (SDnodeComponents[i].init() != 0) {
dnodeCleanupComponents(i);
code = -1;
break;
}
}
return code;
}
int32_t dnodeInitSystem() {
dnodeSetRunStatus(TSDB_DNODE_RUN_STATUS_INITIALIZE);
......@@ -67,17 +107,9 @@ int32_t dnodeInitSystem() {
dPrint("start to initialize TDengine on %s", tsLocalEp);
if (dnodeInitStorage() != 0) return -1;
if (dnodeInitVnodeRead() != 0) return -1;
if (dnodeInitVnodeWrite() != 0) return -1;
if (dnodeInitMnodeRead() != 0) return -1;
if (dnodeInitMnodeWrite() != 0) return -1;
if (dnodeInitMnodePeer() != 0) return -1;
if (dnodeInitClient() != 0) return -1;
if (dnodeInitServer() != 0) return -1;
if (dnodeInitMgmt() != 0) return -1;
if (dnodeInitModules() != 0) return -1;
if (dnodeInitShell() != 0) return -1;
if (dnodeInitComponents() != 0) {
return -1;
}
dnodeStartModules();
dnodeSetRunStatus(TSDB_DNODE_RUN_STATUS_RUNING);
......@@ -90,17 +122,7 @@ int32_t dnodeInitSystem() {
void dnodeCleanUpSystem() {
if (dnodeGetRunStatus() != TSDB_DNODE_RUN_STATUS_STOPPED) {
dnodeSetRunStatus(TSDB_DNODE_RUN_STATUS_STOPPED);
dnodeCleanupShell();
dnodeCleanUpModules();
dnodeCleanupMgmt();
dnodeCleanupServer();
dnodeCleanupClient();
dnodeCleanupMnodePeer();
dnodeCleanupMnodeWrite();
dnodeCleanupMnodeRead();
dnodeCleanupVnodeWrite();
dnodeCleanupVnodeRead();
dnodeCleanupStorage();
dnodeCleanupComponents(sizeof(SDnodeComponents) / sizeof(SDnodeComponents[0]) - 1);
taos_cleanup();
taosCloseLog();
}
......
......@@ -83,8 +83,8 @@ static void dnodeAllocModules() {
}
}
void dnodeCleanUpModules() {
for (int32_t module = 1; module < TSDB_MOD_MAX; ++module) {
void dnodeCleanupModules() {
for (EModuleType module = 1; module < TSDB_MOD_MAX; ++module) {
if (tsModule[module].enable && tsModule[module].stopFp) {
(*tsModule[module].stopFp)();
}
......
......@@ -39,11 +39,11 @@ int32_t main(int32_t argc, char *argv[]) {
exit(EXIT_FAILURE);
}
} else if (strcmp(argv[i], "-V") == 0) {
#ifdef _SYNC
#ifdef _SYNC
char *versionStr = "enterprise";
#else
#else
char *versionStr = "community";
#endif
#endif
printf("%s version: %s compatible_version: %s\n", versionStr, version, compatible_version);
printf("gitinfo: %s\n", gitinfo);
printf("gitinfoI: %s\n", gitinfoOfInternal);
......@@ -93,8 +93,6 @@ int32_t main(int32_t argc, char *argv[]) {
if (dnodeInitSystem() < 0) {
syslog(LOG_ERR, "Error initialize TDengine system");
closelog();
dnodeCleanUpSystem();
exit(EXIT_FAILURE);
}
......
......@@ -34,7 +34,7 @@ typedef struct {
} SReadMsg;
typedef struct {
pthread_t thread; // thread
pthread_t thread; // thread
int32_t workerId; // worker ID
} SReadWorker;
......@@ -85,8 +85,8 @@ void dnodeCleanupVnodeRead() {
}
}
taosCloseQset(readQset);
free(readPool.readWorker);
taosCloseQset(readQset);
dPrint("dnode read is closed");
}
......@@ -95,7 +95,7 @@ void dnodeDispatchToVnodeReadQueue(SRpcMsg *pMsg) {
int32_t queuedMsgNum = 0;
int32_t leftLen = pMsg->contLen;
char *pCont = (char *) pMsg->pCont;
void *pVnode;
void *pVnode;
while (leftLen > 0) {
SMsgHead *pHead = (SMsgHead *) pCont;
......@@ -166,7 +166,7 @@ void *dnodeAllocateVnodeRqueue(void *pVnode) {
} while (readPool.num < readPool.min);
}
dTrace("pVnode:%p, read queue:%p is allocated", pVnode, queue);
dTrace("pVnode:%p, read queue:%p is allocated", pVnode, queue);
return queue;
}
......@@ -177,13 +177,13 @@ void dnodeFreeVnodeRqueue(void *rqueue) {
// dynamically adjust the number of threads
}
static void dnodeContinueExecuteQuery(void* pVnode, void* qhandle, SReadMsg *pMsg) {
static void dnodeContinueExecuteQuery(void* pVnode, void* qhandle, SReadMsg *pMsg) {
SReadMsg *pRead = (SReadMsg *)taosAllocateQitem(sizeof(SReadMsg));
pRead->rpcMsg = pMsg->rpcMsg;
pRead->pCont = qhandle;
pRead->contLen = 0;
pRead->rpcMsg.msgType = TSDB_MSG_TYPE_QUERY;
taos_queue queue = vnodeGetRqueue(pVnode);
taosWriteQitem(queue, TAOS_QTYPE_RPC, pRead);
}
......
......@@ -32,9 +32,9 @@
typedef struct {
taos_qall qall;
taos_qset qset; // queue set
pthread_t thread; // thread
pthread_t thread; // thread
int32_t workerId; // worker ID
} SWriteWorker;
} SWriteWorker;
typedef struct {
SRspRet rspRet;
......@@ -136,17 +136,24 @@ void *dnodeAllocateVnodeWqueue(void *pVnode) {
taosAddIntoQset(pWorker->qset, queue, pVnode);
pWorker->qall = taosAllocateQall();
wWorkerPool.nextId = (wWorkerPool.nextId + 1) % wWorkerPool.max;
if (pWorker->qall == NULL) {
taosCloseQset(pWorker->qset);
taosCloseQueue(queue);
return NULL;
}
pthread_attr_t thAttr;
pthread_attr_init(&thAttr);
pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE);
if (pthread_create(&pWorker->thread, &thAttr, dnodeProcessWriteQueue, pWorker) != 0) {
dError("failed to create thread to process read queue, reason:%s", strerror(errno));
taosFreeQall(pWorker->qall);
taosCloseQset(pWorker->qset);
taosCloseQueue(queue);
queue = NULL;
} else {
dTrace("write worker:%d is launched", pWorker->workerId);
wWorkerPool.nextId = (wWorkerPool.nextId + 1) % wWorkerPool.max;
}
pthread_attr_destroy(&thAttr);
......@@ -195,7 +202,7 @@ static void *dnodeProcessWriteQueue(void *param) {
while (1) {
numOfMsgs = taosReadAllQitemsFromQset(pWorker->qset, pWorker->qall, &pVnode);
if (numOfMsgs ==0) {
if (numOfMsgs == 0) {
dTrace("dnodeProcessWriteQueee: got no message from qset, exiting...");
break;
}
......@@ -243,7 +250,7 @@ static void dnodeHandleIdleWorker(SWriteWorker *pWorker) {
if (num > 0) {
usleep(30000);
sched_yield();
sched_yield();
} else {
taosFreeQall(pWorker->qall);
taosCloseQset(pWorker->qset);
......
......@@ -52,6 +52,7 @@ typedef struct tstr {
#define varDataCopy(dst, v) memcpy((dst), (void*) (v), varDataTLen(v))
#define varDataLenByData(v) (*(VarDataLenT *)(((char*)(v)) - VARSTR_HEADER_SIZE))
#define varDataSetLen(v, _len) (((VarDataLenT *)(v))[0] = (VarDataLenT) (_len))
#define IS_VAR_DATA_TYPE(t) (((t) == TSDB_DATA_TYPE_BINARY) || ((t) == TSDB_DATA_TYPE_NCHAR))
// this data type is internally used only in 'in' query to hold the values
#define TSDB_DATA_TYPE_ARRAY (TSDB_DATA_TYPE_NCHAR + 1)
......
......@@ -102,14 +102,15 @@ int tsdbInitTableCfg(STableCfg *config, ETableType type, uint64_t uid, int32_t
int tsdbTableSetSuperUid(STableCfg *config, uint64_t uid);
int tsdbTableSetSchema(STableCfg *config, STSchema *pSchema, bool dup);
int tsdbTableSetTagSchema(STableCfg *config, STSchema *pSchema, bool dup);
int tsdbTableSetTagValue(STableCfg *config, SDataRow row, bool dup);
int tsdbTableSetTagValue(STableCfg *config, SKVRow row, bool dup);
int tsdbTableSetName(STableCfg *config, char *name, bool dup);
int tsdbTableSetSName(STableCfg *config, char *sname, bool dup);
int tsdbTableSetStreamSql(STableCfg *config, char *sql, bool dup);
void tsdbClearTableCfg(STableCfg *config);
int32_t tsdbGetTableTagVal(TsdbRepoT *repo, STableId* id, int32_t colId, int16_t *type, int16_t *bytes, char **val);
char* tsdbGetTableName(TsdbRepoT *repo, const STableId* id, int16_t* bytes);
int32_t tsdbGetTableTagVal(TsdbRepoT *repo, STableId *id, int32_t colId, int16_t *type, int16_t *bytes, char **val);
char * tsdbGetTableName(TsdbRepoT *repo, const STableId *id, int16_t *bytes);
STableCfg *tsdbCreateTableCfgFromMsg(SMDCreateTableMsg *pMsg);
int tsdbCreateTable(TsdbRepoT *repo, STableCfg *pCfg);
int tsdbDropTable(TsdbRepoT *pRepo, STableId tableId);
......
......@@ -280,6 +280,12 @@ static int32_t mnodeGetUserMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pCo
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = TSDB_USER_LEN + VARSTR_HEADER_SIZE;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "account");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pMeta->numOfColumns = htons(cols);
strcpy(pMeta->tableId, "show users");
pShow->numOfColumns = cols;
......@@ -329,6 +335,10 @@ static int32_t mnodeRetrieveUsers(SShowObj *pShow, char *data, int32_t rows, voi
*(int64_t *)pWrite = pUser->createdTime;
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pUser->acct, TSDB_USER_LEN);
cols++;
numOfRows++;
mnodeDecUserRef(pUser);
}
......
......@@ -117,7 +117,7 @@ static int32_t flushFromResultBuf(SQInfo *pQInfo);
bool doFilterData(SQuery *pQuery, int32_t elemPos) {
for (int32_t k = 0; k < pQuery->numOfFilterCols; ++k) {
SSingleColumnFilterInfo *pFilterInfo = &pQuery->pFilterInfo[k];
char *pElem = pFilterInfo->pData + pFilterInfo->info.bytes * elemPos;
if (isNull(pElem, pFilterInfo->info.type)) {
return false;
......@@ -126,7 +126,7 @@ bool doFilterData(SQuery *pQuery, int32_t elemPos) {
bool qualified = false;
for (int32_t j = 0; j < pFilterInfo->numOfFilters; ++j) {
SColumnFilterElem *pFilterElem = &pFilterInfo->pFilters[j];
if (pFilterElem->fp(pFilterElem, pElem, pElem)) {
qualified = true;
break;
......@@ -765,7 +765,7 @@ static char *getDataBlock(SQueryRuntimeEnv *pRuntimeEnv, SArithmeticSupport *sas
SArray *pDataBlock) {
char *dataBlock = NULL;
SQuery *pQuery = pRuntimeEnv->pQuery;
SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx;
int32_t functionId = pQuery->pSelectExpr[col].base.functionId;
......@@ -778,18 +778,18 @@ static char *getDataBlock(SQueryRuntimeEnv *pRuntimeEnv, SArithmeticSupport *sas
} else {
pCtx->startOffset = pQuery->pos - (size - 1);
}
sas->offset = 0;
sas->colList = pQuery->colList;
sas->numOfCols = pQuery->numOfCols;
sas->data = calloc(pQuery->numOfCols, POINTER_BYTES);
// here the pQuery->colList and sas->colList are identical
for (int32_t i = 0; i < pQuery->numOfCols; ++i) {
SColumnInfo *pColMsg = &pQuery->colList[i];
int32_t numOfCols = taosArrayGetSize(pDataBlock);
dataBlock = NULL;
for (int32_t k = 0; k < numOfCols; ++k) { //todo refactor
SColumnInfoData *p = taosArrayGet(pDataBlock, k);
......@@ -798,7 +798,7 @@ static char *getDataBlock(SQueryRuntimeEnv *pRuntimeEnv, SArithmeticSupport *sas
break;
}
}
assert(dataBlock != NULL);
sas->data[i] = dataBlock/* + pQuery->colList[i].bytes*/; // start from the offset
}
......@@ -838,7 +838,7 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *
}
SArithmeticSupport *sasArray = calloc((size_t)pQuery->numOfOutput, sizeof(SArithmeticSupport));
for (int32_t k = 0; k < pQuery->numOfOutput; ++k) {
char *dataBlock = getDataBlock(pRuntimeEnv, &sasArray[k], k, pDataBlockInfo->rows, pDataBlock);
setExecParams(pQuery, &pCtx[k], dataBlock, tsCols, pDataBlockInfo, pStatis, &sasArray[k], k);
......@@ -901,10 +901,10 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *
if (pQuery->pSelectExpr[i].base.functionId != TSDB_FUNC_ARITHM) {
continue;
}
tfree(sasArray[i].data);
}
tfree(sasArray);
}
......@@ -964,7 +964,7 @@ static char *getGroupbyColumnData(SQuery *pQuery, int16_t *type, int16_t *bytes,
* column in cache with the corresponding meter schema is reinforced.
*/
int32_t numOfCols = taosArrayGetSize(pDataBlock);
for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData *p = taosArrayGet(pDataBlock, i);
if (pColIndex->colId == p->info.colId) {
......@@ -972,7 +972,7 @@ static char *getGroupbyColumnData(SQuery *pQuery, int16_t *type, int16_t *bytes,
}
}
}
return NULL;
}
......@@ -1037,7 +1037,7 @@ static bool functionNeedToExecute(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx
static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pStatis, SDataBlockInfo *pDataBlockInfo,
SWindowResInfo *pWindowResInfo, SArray *pDataBlock) {
SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx;
SQuery *pQuery = pRuntimeEnv->pQuery;
STableQueryInfo* item = pQuery->current;
......@@ -1170,10 +1170,10 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS
if (pQuery->pSelectExpr[i].base.functionId != TSDB_FUNC_ARITHM) {
continue;
}
tfree(sasArray[i].data);
}
free(sasArray);
}
......@@ -1362,7 +1362,7 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int16_t order
SQLFunctionCtx *pCtx = &pRuntimeEnv->pCtx[i];
SColIndex* pIndex = &pSqlFuncMsg->colInfo;
int32_t index = pSqlFuncMsg->colInfo.colIndex;
if (TSDB_COL_IS_TAG(pIndex->flag)) {
if (pIndex->colId == TSDB_TBNAME_COLUMN_INDEX) { // todo refactor
......@@ -1443,7 +1443,7 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) {
SQuery *pQuery = pRuntimeEnv->pQuery;
SQInfo* pQInfo = (SQInfo*) GET_QINFO_ADDR(pRuntimeEnv);
qTrace("QInfo:%p teardown runtime env", pQInfo);
cleanupTimeWindowInfo(&pRuntimeEnv->windowResInfo, pQuery->numOfOutput);
......@@ -1485,7 +1485,7 @@ static bool isQueryKilled(SQInfo *pQInfo) {
pQInfo->killed = 1;
return true;
}
return (pQInfo->killed == 1);
#endif
}
......@@ -1591,7 +1591,7 @@ static bool onlyQueryTags(SQuery* pQuery) {
return false;
}
}
return true;
}
......@@ -1893,14 +1893,14 @@ UNUSED_FUNC void vnodeDecMeterRefcnt(SQInfo *pQInfo) {
for (int32_t i = 0; i < pQInfo->groupInfo.numOfTables; ++i) {
SMeterObj *pMeter = getMeterObj(pQInfo->groupInfo, pQInfo->pSidSet->pTableIdList[i]->sid);
atomic_fetch_sub_32(&(pMeter->numOfQueries), 1);
if (pMeter->numOfQueries > 0) {
qTrace("QInfo:%p vid:%d sid:%d meterId:%s, query is over, numOfQueries:%d", pQInfo, pMeter->vnode, pMeter->sid,
pMeter->meterId, pMeter->numOfQueries);
num++;
}
}
/*
* in order to reduce log output, for all meters of which numOfQueries count are 0,
* we do not output corresponding information
......@@ -1922,26 +1922,26 @@ static bool needToLoadDataBlock(SQuery *pQuery, SDataStatis *pDataStatis, SQLFun
for (int32_t k = 0; k < pQuery->numOfFilterCols; ++k) {
SSingleColumnFilterInfo *pFilterInfo = &pQuery->pFilterInfo[k];
int32_t colIndex = pFilterInfo->info.colIndex;
// this column not valid in current data block
if (colIndex < 0 || pDataStatis[colIndex].colId != pFilterInfo->info.data.colId) {
continue;
}
// not support pre-filter operation on binary/nchar data type
if (!vnodeSupportPrefilter(pFilterInfo->info.data.type)) {
continue;
}
// all points in current column are NULL, no need to check its boundary value
if (pDataStatis[colIndex].numOfNull == numOfTotalPoints) {
continue;
}
if (pFilterInfo->info.info.type == TSDB_DATA_TYPE_FLOAT) {
float minval = *(double *)(&pDataStatis[colIndex].min);
float maxval = *(double *)(&pDataStatis[colIndex].max);
for (int32_t i = 0; i < pFilterInfo->numOfFilters; ++i) {
if (pFilterInfo->pFilters[i].fp(&pFilterInfo->pFilters[i], (char *)&minval, (char *)&maxval)) {
return true;
......@@ -1956,7 +1956,7 @@ static bool needToLoadDataBlock(SQuery *pQuery, SDataStatis *pDataStatis, SQLFun
}
}
}
// todo disable this opt code block temporarily
// for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
// int32_t functId = pQuery->pSelectExpr[i].base.functionId;
......@@ -2392,7 +2392,7 @@ void UNUSED_FUNC displayInterResult(tFilePage **pdata, SQueryRuntimeEnv* pRuntim
SQuery* pQuery = pRuntimeEnv->pQuery;
int32_t numOfCols = pQuery->numOfOutput;
printf("super table query intermediate result, total:%d\n", numOfRows);
for (int32_t j = 0; j < numOfRows; ++j) {
for (int32_t i = 0; i < numOfCols; ++i) {
......@@ -3534,7 +3534,7 @@ void stableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, STableQueryInfo
SQuery * pQuery = pRuntimeEnv->pQuery;
SWindowResInfo * pWindowResInfo = &pTableQueryInfo->windowResInfo;
pQuery->pos = QUERY_IS_ASC_QUERY(pQuery)? 0 : pDataBlockInfo->rows - 1;
if (pQuery->numOfFilterCols > 0 || pRuntimeEnv->pTSBuf != NULL) {
rowwiseApplyFunctions(pRuntimeEnv, pStatis, pDataBlockInfo, pWindowResInfo, pDataBlock);
} else {
......@@ -4110,11 +4110,11 @@ static int64_t queryOnDataBlocks(SQInfo *pQInfo) {
if (pInfo->id.tid == blockInfo.tid) {
assert(pInfo->id.uid == blockInfo.uid);
pTableQueryInfo = item->info;
break;
}
}
if (pTableQueryInfo != NULL) {
break;
}
......@@ -4182,11 +4182,11 @@ static bool multiTableMultioutputHelper(SQInfo *pQInfo, int32_t index) {
tsdbCleanupQueryHandle(pRuntimeEnv->pQueryHandle);
pRuntimeEnv->pQueryHandle = NULL;
}
pRuntimeEnv->pQueryHandle = tsdbQueryTables(pQInfo->tsdb, &cond, &gp);
taosArrayDestroy(tx);
taosArrayDestroy(g1);
if (pRuntimeEnv->pTSBuf != NULL) {
if (pRuntimeEnv->cur.vgroupIndex == -1) {
int64_t tag = pRuntimeEnv->pCtx[0].tag.i64Key;
......@@ -4859,22 +4859,22 @@ static void stableQueryImpl(SQInfo *pQInfo) {
static int32_t getColumnIndexInSource(SQueryTableMsg *pQueryMsg, SSqlFuncMsg *pExprMsg, SColumnInfo* pTagCols) {
int32_t j = 0;
if (TSDB_COL_IS_TAG(pExprMsg->colInfo.flag)) {
while(j < pQueryMsg->numOfTags) {
if (pExprMsg->colInfo.colId == pTagCols[j].colId) {
return j;
}
j += 1;
}
} else {
while (j < pQueryMsg->numOfCols) {
if (pExprMsg->colInfo.colId == pQueryMsg->colList[j].colId) {
return j;
}
j += 1;
}
}
......@@ -4923,7 +4923,7 @@ static bool validateQuerySourceCols(SQueryTableMsg *pQueryMsg, SSqlFuncMsg** pEx
}
}
}
return true;
}
......@@ -5065,10 +5065,10 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList,
pExprMsg = (SSqlFuncMsg *)pMsg;
}
if (!validateQuerySourceCols(pQueryMsg, *pExpr)) {
tfree(*pExpr);
return TSDB_CODE_INVALID_QUERY_MSG;
}
......@@ -5111,12 +5111,12 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList,
(*tagCols) = calloc(1, sizeof(SColumnInfo) * pQueryMsg->numOfTags);
for (int32_t i = 0; i < pQueryMsg->numOfTags; ++i) {
SColumnInfo* pTagCol = (SColumnInfo*) pMsg;
pTagCol->colId = htons(pTagCol->colId);
pTagCol->bytes = htons(pTagCol->bytes);
pTagCol->type = htons(pTagCol->type);
pTagCol->numOfFilters = 0;
(*tagCols)[i] = *pTagCol;
pMsg += sizeof(SColumnInfo);
}
......@@ -5128,14 +5128,14 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList,
memcpy(*tagCond, pMsg, pQueryMsg->tagCondLen);
pMsg += pQueryMsg->tagCondLen;
}
if (*pMsg != 0) {
size_t len = strlen(pMsg) + 1;
*tbnameCond = malloc(len);
strcpy(*tbnameCond, pMsg);
pMsg += len;
}
qTrace("qmsg:%p query %d tables, qrange:%" PRId64 "-%" PRId64 ", numOfGroupbyTagCols:%d, order:%d, "
"outputCols:%d, numOfCols:%d, interval:%" PRId64 ", fillType:%d, comptsLen:%d, limit:%" PRId64 ", offset:%" PRId64,
pQueryMsg, pQueryMsg->numOfTables, pQueryMsg->window.skey, pQueryMsg->window.ekey, pQueryMsg->numOfGroupCols,
......@@ -5160,7 +5160,7 @@ static int32_t buildAirthmeticExprFromMsg(SExprInfo *pArithExprInfo, SQueryTable
qError("qmsg:%p failed to create arithmetic expression string from:%s", pQueryMsg, pArithExprInfo->base.arg[0].argValue.pz);
return TSDB_CODE_APP_ERROR;
}
pArithExprInfo->pExpr = pExprNode;
return TSDB_CODE_SUCCESS;
}
......@@ -5225,7 +5225,7 @@ static int32_t createSqlFunctionExprFromMsg(SQueryTableMsg *pQueryMsg, SExprInfo
for (int32_t i = 0; i < pQueryMsg->numOfOutput; ++i) {
pExprs[i].base = *pExprMsg[i];
int16_t functId = pExprs[i].base.functionId;
if (functId == TSDB_FUNC_TOP || functId == TSDB_FUNC_BOTTOM) {
int32_t j = getColumnIndexInSource(pQueryMsg, &pExprs[i].base, pTagCols);
assert(j < pQueryMsg->numOfCols);
......@@ -5265,7 +5265,7 @@ static SSqlGroupbyExpr *createGroupbyExprFromMsg(SQueryTableMsg *pQueryMsg, SCol
for(int32_t i = 0; i < pQueryMsg->numOfGroupCols; ++i) {
taosArrayPush(pGroupbyExpr->columnInfo, &pColIndex[i]);
}
return pGroupbyExpr;
}
......@@ -5288,7 +5288,7 @@ static int32_t createFilterInfo(void *pQInfo, SQuery *pQuery) {
memcpy(&pFilterInfo->info, &pQuery->colList[i], sizeof(SColumnInfoData));
pFilterInfo->info = pQuery->colList[i];
pFilterInfo->numOfFilters = pQuery->colList[i].numOfFilters;
pFilterInfo->pFilters = calloc(pFilterInfo->numOfFilters, sizeof(SColumnFilterElem));
......@@ -5435,9 +5435,9 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SArray* pTableIdList,
pQuery->colList[i] = pQueryMsg->colList[i];
pQuery->colList[i].filters = tscFilterInfoClone(pQueryMsg->colList[i].filters, pQuery->colList[i].numOfFilters);
}
pQuery->tagColList = pTagCols;
// calculate the result row size
for (int16_t col = 0; col < numOfOutput; ++col) {
assert(pExprs[col].bytes > 0);
......@@ -5484,22 +5484,23 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SArray* pTableIdList,
// to make sure third party won't overwrite this structure
pQInfo->signature = pQInfo;
pQInfo->tableIdGroupInfo = *groupInfo;
size_t numOfGroups = taosArrayGetSize(groupInfo->pGroupList);
pQInfo->groupInfo.pGroupList = taosArrayInit(numOfGroups, POINTER_BYTES);
pQInfo->groupInfo.numOfTables = groupInfo->numOfTables;
int tableIndex = 0;
STimeWindow window = pQueryMsg->window;
taosArraySort(pTableIdList, compareTableIdInfo);
for(int32_t i = 0; i < numOfGroups; ++i) {
SArray* pa = taosArrayGetP(groupInfo->pGroupList, i);
size_t s = taosArrayGetSize(pa);
SArray* p1 = taosArrayInit(s, sizeof(SGroupItem));
for(int32_t j = 0; j < s; ++j) {
STableId id = *(STableId*) taosArrayGet(pa, j);
SGroupItem item = { .id = id };
......@@ -5515,6 +5516,7 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SArray* pTableIdList,
item.info->tableIndex = tableIndex++;
taosArrayPush(p1, &item);
}
taosArrayPush(pQInfo->groupInfo.pGroupList, &p1);
}
......@@ -5658,7 +5660,7 @@ static void freeQInfo(SQInfo *pQInfo) {
int32_t numOfGroups = taosArrayGetSize(pQInfo->groupInfo.pGroupList);
for (int32_t i = 0; i < numOfGroups; ++i) {
SArray *p = taosArrayGetP(pQInfo->groupInfo.pGroupList, i);
size_t num = taosArrayGetSize(p);
for(int32_t j = 0; j < num; ++j) {
SGroupItem* item = taosArrayGet(p, j);
......@@ -5666,17 +5668,17 @@ static void freeQInfo(SQInfo *pQInfo) {
destroyTableQueryInfo(item->info, pQuery->numOfOutput);
}
}
taosArrayDestroy(p);
}
taosArrayDestroy(pQInfo->groupInfo.pGroupList);
for(int32_t i = 0; i < numOfGroups; ++i) {
SArray* p = taosArrayGetP(pQInfo->tableIdGroupInfo.pGroupList, i);
taosArrayDestroy(p);
}
taosArrayDestroy(pQInfo->tableIdGroupInfo.pGroupList);
taosArrayDestroy(pQInfo->arrTableIdInfo);
......@@ -5684,14 +5686,14 @@ static void freeQInfo(SQInfo *pQInfo) {
taosArrayDestroy(pQuery->pGroupbyExpr->columnInfo);
tfree(pQuery->pGroupbyExpr);
}
tfree(pQuery->tagColList);
tfree(pQuery->pFilterInfo);
tfree(pQuery->colList);
tfree(pQuery->sdata);
tfree(pQuery);
qTrace("QInfo:%p QInfo is freed", pQInfo);
// destroy signature, in order to avoid the query process pass the object safety check
......@@ -5744,7 +5746,7 @@ static int32_t doDumpQueryResult(SQInfo *pQInfo, char *data) {
qError("QInfo:%p failed to open tmp file to send ts-comp data to client, path:%s, reason:%s", pQInfo,
pQuery->sdata[0]->data, strerror(errno));
}
// all data returned, set query over
if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) {
setQueryStatus(pQuery, QUERY_OVER);
......@@ -5861,7 +5863,12 @@ _over:
tfree(tagCond);
tfree(tbnameCond);
taosArrayDestroy(pTableIdList);
if (code != TSDB_CODE_SUCCESS) {
tfree(*pQInfo);
*pQInfo = NULL;
}
// if failed to add ref for all meters in this query, abort current query
return code;
}
......@@ -5885,7 +5892,7 @@ void qTableQuery(qinfo_t qinfo) {
}
qTrace("QInfo:%p query task is launched", pQInfo);
if (onlyQueryTags(pQInfo->runtimeEnv.pQuery)) {
buildTagQueryResult(pQInfo); // todo support the limit/offset
} else if (pQInfo->runtimeEnv.stableQuery) {
......@@ -5893,7 +5900,7 @@ void qTableQuery(qinfo_t qinfo) {
} else {
tableQueryImpl(pQInfo);
}
sem_post(&pQInfo->dataReady);
// vnodeDecRefCount(pQInfo);
}
......@@ -5987,7 +5994,7 @@ int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *co
static void buildTagQueryResult(SQInfo* pQInfo) {
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
SQuery * pQuery = pRuntimeEnv->pQuery;
size_t num = taosArrayGetSize(pQInfo->groupInfo.pGroupList);
assert(num == 0 || num == 1);
if (num == 0) {
......@@ -5996,44 +6003,44 @@ static void buildTagQueryResult(SQInfo* pQInfo) {
SArray* pa = taosArrayGetP(pQInfo->groupInfo.pGroupList, 0);
num = taosArrayGetSize(pa);
assert(num == pQInfo->groupInfo.numOfTables);
int16_t type, bytes;
int32_t functionId = pQuery->pSelectExpr[0].base.functionId;
if (functionId == TSDB_FUNC_TID_TAG) { // return the tags & table Id
assert(pQuery->numOfOutput == 1);
SExprInfo* pExprInfo = &pQuery->pSelectExpr[0];
int32_t rsize = pExprInfo->bytes;
char* data = NULL;
for(int32_t i = 0; i < num; ++i) {
SGroupItem* item = taosArrayGet(pa, i);
char* output = pQuery->sdata[0]->data + i * rsize;
varDataSetLen(output, rsize - VARSTR_HEADER_SIZE);
output = varDataVal(output);
*(int64_t*) output = item->id.uid; // memory align problem, todo serialize
output += sizeof(item->id.uid);
*(int32_t*) output = item->id.tid;
output += sizeof(item->id.tid);
*(int32_t*) output = pQInfo->vgId;
output += sizeof(pQInfo->vgId);
tsdbGetTableTagVal(pQInfo->tsdb, &item->id, pExprInfo->base.colInfo.colId, &type, &bytes, &data);
memcpy(output, data, bytes);
}
qTrace("QInfo:%p create (tableId, tag) info completed, rows:%d", pQInfo, num);
} else { // return only the tags|table name etc.
for(int32_t i = 0; i < num; ++i) {
SExprInfo* pExprInfo = pQuery->pSelectExpr;
SGroupItem* item = taosArrayGet(pa, i);
char* data = NULL;
for(int32_t j = 0; j < pQuery->numOfOutput; ++j) {
// todo check the return value, refactor codes
......@@ -6059,7 +6066,7 @@ static void buildTagQueryResult(SQInfo* pQInfo) {
pQInfo->tableIndex = pQInfo->groupInfo.numOfTables;
qTrace("QInfo:%p create tag values results completed, rows:%d", pQInfo, num);
}
pQuery->rec.rows = num;
setQueryStatus(pQuery, QUERY_COMPLETED);
}
......
......@@ -683,6 +683,7 @@ static SRpcConn *rpcSetupConnToServer(SRpcReqContext *pContext) {
pConn->tretry = 0;
pConn->ahandle = pContext->ahandle;
sprintf(pConn->info, "%s %p %p", pRpc->label, pConn, pConn->ahandle);
pConn->tretry = 0;
} else {
tError("%s %p, failed to set up connection(%s)", pRpc->label, pContext->ahandle, tstrerror(terrno));
}
......@@ -802,7 +803,7 @@ static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv) {
pConn = rpcGetConnObj(pRpc, sid, pRecv);
if (pConn == NULL) {
tError("%s %p, failed to get connection obj(%s)", pRpc->label, pHead->ahandle, tstrerror(terrno));
tTrace("%s %p, failed to get connection obj(%s)", pRpc->label, pHead->ahandle, tstrerror(terrno));
return NULL;
} else {
if (rpcIsReq(pHead->msgType)) {
......@@ -833,8 +834,8 @@ static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv) {
terrno = rpcProcessReqHead(pConn, pHead);
pConn->connType = pRecv->connType;
// client shall send the request within tsRpcTime again, put 20 mseconds tolerance
taosTmrReset(rpcProcessIdleTimer, tsRpcTimer+20, pConn, pRpc->tmrCtrl, &pConn->pIdleTimer);
// client shall send the request within tsRpcTime again, double it
taosTmrReset(rpcProcessIdleTimer, tsRpcTimer*2, pConn, pRpc->tmrCtrl, &pConn->pIdleTimer);
} else {
terrno = rpcProcessRspHead(pConn, pHead);
}
......@@ -1099,7 +1100,7 @@ static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) {
pHead->port = 0;
pHead->linkUid = pConn->linkUid;
pHead->ahandle = (uint64_t)pConn->ahandle;
if (!pConn->secured) memcpy(pHead->user, pConn->user, tListLen(pHead->user));
memcpy(pHead->user, pConn->user, tListLen(pHead->user));
// set the connection parameters
pConn->outType = msgType;
......@@ -1398,7 +1399,7 @@ static int rpcCheckAuthentication(SRpcConn *pConn, char *msg, int msgLen) {
code = TSDB_CODE_INVALID_TIME_STAMP;
} else {
if (rpcAuthenticateMsg(pHead, msgLen-TSDB_AUTH_LEN, pDigest->auth, pConn->secret) < 0) {
tError("%s, authentication failed, msg discarded", pConn->info);
tTrace("%s, authentication failed, msg discarded", pConn->info);
code = TSDB_CODE_AUTH_FAILURE;
} else {
pHead->msgLen = (int32_t)htonl((uint32_t)pHead->msgLen) - sizeof(SRpcDigest);
......@@ -1407,7 +1408,7 @@ static int rpcCheckAuthentication(SRpcConn *pConn, char *msg, int msgLen) {
}
}
} else {
tError("%s, auth spi:%d not matched with received:%d", pConn->info, pConn->spi, pHead->spi);
tTrace("%s, auth spi:%d not matched with received:%d", pConn->info, pConn->spi, pHead->spi);
code = pHead->spi ? TSDB_CODE_AUTH_FAILURE : TSDB_CODE_AUTH_REQUIRED;
}
......
......@@ -380,7 +380,7 @@ static void *taosProcessTcpData(void *param) {
int32_t headLen = taosReadMsg(pFdObj->fd, &rpcHead, sizeof(SRpcHead));
if (headLen != sizeof(SRpcHead)) {
tError("%s %p, read error, headLen:%d", pThreadObj->label, pFdObj->thandle, headLen);
tTrace("%s %p, read error, headLen:%d", pThreadObj->label, pFdObj->thandle, headLen);
taosReportBrokenLink(pFdObj);
continue;
}
......
......@@ -36,7 +36,7 @@ void processShellMsg() {
while (1) {
int numOfMsgs = taosReadAllQitems(qhandle, qall);
if (numOfMsgs <= 0) {
usleep(1000);
usleep(100);
continue;
}
......@@ -115,7 +115,7 @@ int retrieveAuthInfo(char *meterId, char *spi, char *encrypt, char *secret, char
void processRequestMsg(SRpcMsg *pMsg, SRpcIpSet *pIpSet) {
SRpcMsg *pTemp;
pTemp = taosAllocateQitem(sizeof(SRpcMsg));
memcpy(pTemp, pMsg, sizeof(SRpcMsg));
......@@ -171,7 +171,6 @@ int main(int argc, char *argv[]) {
tsAsyncLog = 0;
rpcInit.connType = TAOS_CONN_SERVER;
taosInitLog("server.log", 100000, 10);
void *pRpc = rpcOpen(&rpcInit);
......
......@@ -76,7 +76,7 @@ typedef struct STable {
int32_t sversion;
STSchema * schema;
STSchema * tagSchema;
SDataRow tagVal;
SKVRow tagVal;
SMemTable * mem;
SMemTable * imem;
void * pIndex; // For TSDB_SUPER_TABLE, it is the skiplist index
......
......@@ -94,7 +94,7 @@ static int tsdbInitFile(char *dataDir, int fid, const char *suffix, SFile *pFile
if (!taosCheckChecksumWhole((uint8_t *)buf, TSDB_FILE_HEAD_SIZE)) return -1;
void *pBuf = buf;
pBuf = taosDecodeFixed32(pBuf, &version);
pBuf = taosDecodeFixedU32(pBuf, &version);
pBuf = tsdbDecodeSFileInfo(pBuf, &(pFile->info));
tsdbCloseFile(pFile);
......
......@@ -510,11 +510,11 @@ int tsdbTableSetTagSchema(STableCfg *config, STSchema *pSchema, bool dup) {
return 0;
}
int tsdbTableSetTagValue(STableCfg *config, SDataRow row, bool dup) {
int tsdbTableSetTagValue(STableCfg *config, SKVRow row, bool dup) {
if (config->type != TSDB_CHILD_TABLE) return -1;
if (dup) {
config->tagValues = tdDataRowDup(row);
config->tagValues = tdKVRowDup(row);
} else {
config->tagValues = row;
}
......@@ -561,7 +561,7 @@ int tsdbTableSetStreamSql(STableCfg *config, char *sql, bool dup) {
void tsdbClearTableCfg(STableCfg *config) {
if (config->schema) tdFreeSchema(config->schema);
if (config->tagSchema) tdFreeSchema(config->tagSchema);
if (config->tagValues) tdFreeDataRow(config->tagValues);
if (config->tagValues) kvRowFree(config->tagValues);
tfree(config->name);
tfree(config->sname);
tfree(config->sql);
......
......@@ -47,8 +47,7 @@ void tsdbEncodeTable(STable *pTable, char *buf, int *contLen) {
ptr = tdEncodeSchema(ptr, pTable->schema);
ptr = tdEncodeSchema(ptr, pTable->tagSchema);
} else if (pTable->type == TSDB_CHILD_TABLE) {
tdTagRowCpy(ptr, pTable->tagVal);
ptr = POINTER_SHIFT(ptr, dataRowLen(pTable->tagVal) + ((STagRow *)pTable->tagVal)->dataLen);
ptr = tdEncodeKVRow(ptr, pTable->tagVal);
} else {
ptr = tdEncodeSchema(ptr, pTable->schema);
}
......@@ -94,8 +93,7 @@ STable *tsdbDecodeTable(void *cont, int contLen) {
pTable->schema = tdDecodeSchema(&ptr);
pTable->tagSchema = tdDecodeSchema(&ptr);
} else if (pTable->type == TSDB_CHILD_TABLE) {
pTable->tagVal = tdTagRowDecode(ptr);
ptr = POINTER_SHIFT(ptr, dataRowLen(pTable->tagVal) + ((STagRow *)pTable->tagVal)->dataLen);
ptr = tdDecodeKVRow(ptr, &pTable->tagVal);
} else {
pTable->schema = tdDecodeSchema(&ptr);
}
......@@ -115,12 +113,9 @@ void tsdbFreeEncode(void *cont) {
static char* getTagIndexKey(const void* pData) {
STableIndexElem* elem = (STableIndexElem*) pData;
SDataRow row = elem->pTable->tagVal;
STSchema* pSchema = tsdbGetTableTagSchema(elem->pMeta, elem->pTable);
STColumn* pCol = &pSchema->columns[DEFAULT_TAG_INDEX_COLUMN];
int16_t type = 0;
void * res = tdQueryTagByID(row, pCol->colId, &type);
ASSERT(type == pCol->type);
void * res = tdGetKVRowValOfCol(elem->pTable->tagVal, pCol->colId);
return res;
}
......@@ -255,19 +250,24 @@ STSchema * tsdbGetTableTagSchema(STsdbMeta *pMeta, STable *pTable) {
int32_t tsdbGetTableTagVal(TsdbRepoT* repo, STableId* id, int32_t colId, int16_t* type, int16_t* bytes, char** val) {
STsdbMeta* pMeta = tsdbGetMeta(repo);
STable* pTable = tsdbGetTableByUid(pMeta, id->uid);
STSchema *pSchema = tsdbGetTableTagSchema(pMeta, pTable);
STColumn *pCol = tdGetColOfID(pSchema, colId);
if (pCol == NULL) {
return -1; // No matched tag volumn
}
*val = tdQueryTagByID(pTable->tagVal, colId, type);
*val = tdGetKVRowValOfCol(pTable->tagVal, colId);
*type = pCol->type;
if (*val != NULL) {
switch(*type) {
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR: *bytes = varDataLen(*val); break;
case TSDB_DATA_TYPE_NULL: *bytes = 0; break;
default:
*bytes = tDataTypeDesc[*type].nSize;break;
if (IS_VAR_DATA_TYPE(*type)) {
*bytes = varDataLen(*val);
} else {
*bytes = TYPE_BYTES[*type];
}
}
return TSDB_CODE_SUCCESS;
}
......@@ -341,7 +341,7 @@ static STable *tsdbNewTable(STableCfg *pCfg, bool isSuper) {
if (pCfg->type == TSDB_CHILD_TABLE) {
pTable->superUid = pCfg->superUid;
pTable->tagVal = tdDataRowDup(pCfg->tagValues);
pTable->tagVal = tdKVRowDup(pCfg->tagValues);
} else if (pCfg->type == TSDB_NORMAL_TABLE) {
pTable->superUid = -1;
pTable->schema = tdDupSchema(pCfg->schema);
......@@ -438,6 +438,61 @@ STable *tsdbIsValidTableToInsert(STsdbMeta *pMeta, STableId tableId) {
return pTable;
}
STableCfg *tsdbCreateTableCfgFromMsg(SMDCreateTableMsg *pMsg) {
if (pMsg == NULL) return NULL;
SSchema * pSchema = (SSchema *)pMsg->data;
int16_t numOfCols = htons(pMsg->numOfColumns);
int16_t numOfTags = htons(pMsg->numOfTags);
STSchemaBuilder schemaBuilder = {0};
STableCfg *pCfg = (STableCfg *)calloc(1, sizeof(STableCfg));
if (pCfg == NULL) return NULL;
if (tsdbInitTableCfg(pCfg, pMsg->tableType, htobe64(pMsg->uid), htonl(pMsg->sid)) < 0) goto _err;
if (tdInitTSchemaBuilder(&schemaBuilder, htonl(pMsg->sversion)) < 0) goto _err;
for (int i = 0; i < numOfCols; i++) {
tdAddColToSchema(&schemaBuilder, pSchema[i].type, htons(pSchema[i].colId), htons(pSchema[i].bytes));
}
if (tsdbTableSetSchema(pCfg, tdGetSchemaFromBuilder(&schemaBuilder), false) < 0) goto _err;
if (tsdbTableSetName(pCfg, pMsg->tableId, true) < 0) goto _err;
if (numOfTags > 0) {
int accBytes = 0;
char *pTagData = pMsg->data + (numOfCols + numOfTags) * sizeof(SSchema);
SKVRowBuilder kvRowBuilder = {0};
tdResetTSchemaBuilder(&schemaBuilder, htonl(pMsg->tversion));
if (tdInitKVRowBuilder(&kvRowBuilder) < 0) goto _err;
for (int i = numOfCols; i < numOfCols + numOfTags; i++) {
tdAddColToSchema(&schemaBuilder, pSchema[i].type, htons(pSchema[i].colId), htons(pSchema[i].bytes));
tdAddColToKVRow(&kvRowBuilder, htons(pSchema[i].colId), pSchema[i].type, pTagData + accBytes);
accBytes += htons(pSchema[i].bytes);
}
if (tsdbTableSetTagSchema(pCfg, tdGetSchemaFromBuilder(&schemaBuilder), false) < 0) goto _err;
if (tsdbTableSetSName(pCfg, pMsg->superTableId, true) < 0) goto _err;
if (tsdbTableSetSuperUid(pCfg, htobe64(pMsg->superTableUid)) < 0) goto _err;
tsdbTableSetTagValue(pCfg, tdGetKVRowFromBuilder(&kvRowBuilder), false);
tdDestroyKVRowBuilder(&kvRowBuilder);
}
if (pMsg->tableType == TSDB_STREAM_TABLE) {
char *sql = pMsg->data + (numOfCols + numOfTags) * sizeof(SSchema);
tsdbTableSetStreamSql(pCfg, sql, true);
}
tdDestroyTSchemaBuilder(&schemaBuilder);
return pCfg;
_err:
tdDestroyTSchemaBuilder(&schemaBuilder);
tsdbClearTableCfg(pCfg);
tfree(pCfg);
return NULL;
}
// int32_t tsdbDropTableImpl(STsdbMeta *pMeta, STableId tableId) {
int tsdbDropTable(TsdbRepoT *repo, STableId tableId) {
STsdbRepo *pRepo = (STsdbRepo *)repo;
......@@ -478,7 +533,7 @@ static int tsdbFreeTable(STable *pTable) {
if (pTable == NULL) return 0;
if (pTable->type == TSDB_CHILD_TABLE) {
tdFreeTagRow(pTable->tagVal);
kvRowFree(pTable->tagVal);
} else {
tdFreeSchema(pTable->schema);
}
......@@ -627,9 +682,7 @@ static int tsdbRemoveTableFromIndex(STsdbMeta *pMeta, STable *pTable) {
STSchema* pSchema = tsdbGetTableTagSchema(pMeta, pTable);
STColumn* pCol = &pSchema->columns[DEFAULT_TAG_INDEX_COLUMN];
int16_t tagtype = 0;
char* key = tdQueryTagByID(pTable->tagVal, pCol->colId, &tagtype);
ASSERT(pCol->type == tagtype);
char* key = tdGetKVRowValOfCol(pTable->tagVal, pCol->colId);
SArray* res = tSkipListGet(pSTable->pIndex, key);
size_t size = taosArrayGetSize(res);
......
......@@ -237,20 +237,24 @@ int tsdbSetAndOpenHelperFile(SRWHelper *pHelper, SFileGroup *pGroup) {
int tsdbCloseHelperFile(SRWHelper *pHelper, bool hasError) {
if (pHelper->files.headF.fd > 0) {
fsync(pHelper->files.headF.fd);
close(pHelper->files.headF.fd);
pHelper->files.headF.fd = -1;
}
if (pHelper->files.dataF.fd > 0) {
if (!hasError) tsdbUpdateFileHeader(&(pHelper->files.dataF), 0);
fsync(pHelper->files.dataF.fd);
close(pHelper->files.dataF.fd);
pHelper->files.dataF.fd = -1;
}
if (pHelper->files.lastF.fd > 0) {
fsync(pHelper->files.lastF.fd);
close(pHelper->files.lastF.fd);
pHelper->files.lastF.fd = -1;
}
if (pHelper->files.nHeadF.fd > 0) {
if (!hasError) tsdbUpdateFileHeader(&(pHelper->files.nHeadF), 0);
fsync(pHelper->files.nHeadF.fd);
close(pHelper->files.nHeadF.fd);
pHelper->files.nHeadF.fd = -1;
if (hasError) {
......@@ -263,6 +267,7 @@ int tsdbCloseHelperFile(SRWHelper *pHelper, bool hasError) {
if (pHelper->files.nLastF.fd > 0) {
if (!hasError) tsdbUpdateFileHeader(&(pHelper->files.nLastF), 0);
fsync(pHelper->files.nLastF.fd);
close(pHelper->files.nLastF.fd);
pHelper->files.nLastF.fd = -1;
if (hasError) {
......@@ -448,7 +453,7 @@ int tsdbWriteCompIdx(SRWHelper *pHelper) {
pHelper->pBuffer = trealloc(pHelper->pBuffer, tsizeof(pHelper->pBuffer)*2);
}
buf = POINTER_SHIFT(pHelper->pBuffer, drift);
buf = taosEncodeVariant32(buf, i);
buf = taosEncodeVariantU32(buf, i);
buf = tsdbEncodeSCompIdx(buf, pCompIdx);
}
}
......@@ -486,7 +491,7 @@ int tsdbLoadCompIdx(SRWHelper *pHelper, void *target) {
void *ptr = pHelper->pBuffer;
while (((char *)ptr - (char *)pHelper->pBuffer) < (pFile->info.len - sizeof(TSCKSUM))) {
uint32_t tid = 0;
if ((ptr = taosDecodeVariant32(ptr, &tid)) == NULL) return -1;
if ((ptr = taosDecodeVariantU32(ptr, &tid)) == NULL) return -1;
ASSERT(tid > 0 && tid < pHelper->config.maxTables);
if ((ptr = tsdbDecodeSCompIdx(ptr, pHelper->pCompIdx + tid)) == NULL) return -1;
......@@ -1248,12 +1253,12 @@ static int tsdbGetRowsInRange(SDataCols *pDataCols, TSKEY minKey, TSKEY maxKey)
}
void *tsdbEncodeSCompIdx(void *buf, SCompIdx *pIdx) {
buf = taosEncodeVariant32(buf, pIdx->len);
buf = taosEncodeVariant32(buf, pIdx->offset);
buf = taosEncodeFixed8(buf, pIdx->hasLast);
buf = taosEncodeVariant32(buf, pIdx->numOfBlocks);
buf = taosEncodeFixed64(buf, pIdx->uid);
buf = taosEncodeFixed64(buf, pIdx->maxKey);
buf = taosEncodeVariantU32(buf, pIdx->len);
buf = taosEncodeVariantU32(buf, pIdx->offset);
buf = taosEncodeFixedU8(buf, pIdx->hasLast);
buf = taosEncodeVariantU32(buf, pIdx->numOfBlocks);
buf = taosEncodeFixedU64(buf, pIdx->uid);
buf = taosEncodeFixedU64(buf, pIdx->maxKey);
return buf;
}
......@@ -1263,15 +1268,15 @@ void *tsdbDecodeSCompIdx(void *buf, SCompIdx *pIdx) {
uint32_t numOfBlocks = 0;
uint64_t value = 0;
if ((buf = taosDecodeVariant32(buf, &(pIdx->len))) == NULL) return NULL;
if ((buf = taosDecodeVariant32(buf, &(pIdx->offset))) == NULL) return NULL;
if ((buf = taosDecodeFixed8(buf, &(hasLast))) == NULL) return NULL;
if ((buf = taosDecodeVariantU32(buf, &(pIdx->len))) == NULL) return NULL;
if ((buf = taosDecodeVariantU32(buf, &(pIdx->offset))) == NULL) return NULL;
if ((buf = taosDecodeFixedU8(buf, &(hasLast))) == NULL) return NULL;
pIdx->hasLast = hasLast;
if ((buf = taosDecodeVariant32(buf, &(numOfBlocks))) == NULL) return NULL;
if ((buf = taosDecodeVariantU32(buf, &(numOfBlocks))) == NULL) return NULL;
pIdx->numOfBlocks = numOfBlocks;
if ((buf = taosDecodeFixed64(buf, &value)) == NULL) return NULL;
if ((buf = taosDecodeFixedU64(buf, &value)) == NULL) return NULL;
pIdx->uid = (int64_t)value;
if ((buf = taosDecodeFixed64(buf, &value)) == NULL) return NULL;
if ((buf = taosDecodeFixedU64(buf, &value)) == NULL) return NULL;
pIdx->maxKey = (TSKEY)value;
return buf;
......@@ -1281,7 +1286,7 @@ int tsdbUpdateFileHeader(SFile *pFile, uint32_t version) {
char buf[TSDB_FILE_HEAD_SIZE] = "\0";
void *pBuf = (void *)buf;
pBuf = taosEncodeFixed32(pBuf, version);
pBuf = taosEncodeFixedU32(pBuf, version);
pBuf = tsdbEncodeSFileInfo(pBuf, &(pFile->info));
taosCalcChecksumAppend(0, (uint8_t *)buf, TSDB_FILE_HEAD_SIZE);
......@@ -1295,23 +1300,23 @@ int tsdbUpdateFileHeader(SFile *pFile, uint32_t version) {
void *tsdbEncodeSFileInfo(void *buf, const STsdbFileInfo *pInfo) {
buf = taosEncodeFixed32(buf, pInfo->offset);
buf = taosEncodeFixed32(buf, pInfo->len);
buf = taosEncodeFixed64(buf, pInfo->size);
buf = taosEncodeFixed64(buf, pInfo->tombSize);
buf = taosEncodeFixed32(buf, pInfo->totalBlocks);
buf = taosEncodeFixed32(buf, pInfo->totalSubBlocks);
buf = taosEncodeFixedU32(buf, pInfo->offset);
buf = taosEncodeFixedU32(buf, pInfo->len);
buf = taosEncodeFixedU64(buf, pInfo->size);
buf = taosEncodeFixedU64(buf, pInfo->tombSize);
buf = taosEncodeFixedU32(buf, pInfo->totalBlocks);
buf = taosEncodeFixedU32(buf, pInfo->totalSubBlocks);
return buf;
}
void *tsdbDecodeSFileInfo(void *buf, STsdbFileInfo *pInfo) {
buf = taosDecodeFixed32(buf, &(pInfo->offset));
buf = taosDecodeFixed32(buf, &(pInfo->len));
buf = taosDecodeFixed64(buf, &(pInfo->size));
buf = taosDecodeFixed64(buf, &(pInfo->tombSize));
buf = taosDecodeFixed32(buf, &(pInfo->totalBlocks));
buf = taosDecodeFixed32(buf, &(pInfo->totalSubBlocks));
buf = taosDecodeFixedU32(buf, &(pInfo->offset));
buf = taosDecodeFixedU32(buf, &(pInfo->len));
buf = taosDecodeFixedU64(buf, &(pInfo->size));
buf = taosDecodeFixedU64(buf, &(pInfo->tombSize));
buf = taosDecodeFixedU32(buf, &(pInfo->totalBlocks));
buf = taosDecodeFixedU32(buf, &(pInfo->totalSubBlocks));
return buf;
}
\ No newline at end of file
......@@ -1909,9 +1909,8 @@ int32_t tableGroupComparFn(const void *p1, const void *p2, const void *param) {
STColumn* pCol = schemaColAt(pTableGroupSupp->pTagSchema, colIndex);
bytes = pCol->bytes;
type = pCol->type;
int16_t tgtype1, tgtype2 = 0;
f1 = tdQueryTagByID(pTable1->tagVal, pCol->colId, &tgtype1);
f2 = tdQueryTagByID(pTable2->tagVal, pCol->colId, &tgtype2);
f1 = tdGetKVRowValOfCol(pTable1->tagVal, pCol->colId);
f2 = tdGetKVRowValOfCol(pTable2->tagVal, pCol->colId);
}
int32_t ret = doCompare(f1, f2, type, bytes);
......@@ -1999,9 +1998,7 @@ bool indexedNodeFilterFp(const void* pNode, void* param) {
val = (char*) elem->pTable->name;
type = TSDB_DATA_TYPE_BINARY;
} else {
int16_t t1;
val = tdQueryTagByID(elem->pTable->tagVal, pInfo->sch.colId, &t1);
assert(pInfo->sch.type == t1);
val = tdGetKVRowValOfCol(elem->pTable->tagVal, pInfo->sch.colId);
}
//todo :the val is possible to be null, so check it out carefully
......
......@@ -29,12 +29,33 @@ extern "C" {
static const int32_t TNUMBER = 1;
#define IS_LITTLE_ENDIAN() (*(uint8_t *)(&TNUMBER) != 0)
static FORCE_INLINE void *taosEncodeFixed8(void *buf, uint8_t value) {
#define ZIGZAGE(T, v) ((u##T)((v) >> (sizeof(T) * 8 - 1))) ^ (((u##T)(v)) << 1) // zigzag encode
#define ZIGZAGD(T, v) ((v) >> 1) ^ -((T)((v)&1)) // zigzag decode
// ---- Fixed U8
static FORCE_INLINE void *taosEncodeFixedU8(void *buf, uint8_t value) {
((uint8_t *)buf)[0] = value;
return POINTER_SHIFT(buf, sizeof(value));
}
static FORCE_INLINE void *taosEncodeFixed16(void *buf, uint16_t value) {
static FORCE_INLINE void *taosDecodeFixedU8(void *buf, uint8_t *value) {
*value = ((uint8_t *)buf)[0];
return POINTER_SHIFT(buf, sizeof(*value));
}
// ---- Fixed I8
static FORCE_INLINE void *taosEncodeFixedI8(void *buf, int8_t value) {
((int8_t *)buf)[0] = value;
return POINTER_SHIFT(buf, sizeof(value));
}
static FORCE_INLINE void *taosDecodeFixedI8(void *buf, int8_t *value) {
*value = ((int8_t *)buf)[0];
return POINTER_SHIFT(buf, sizeof(*value));
}
// ---- Fixed U16
static FORCE_INLINE void *taosEncodeFixedU16(void *buf, uint16_t value) {
if (IS_LITTLE_ENDIAN()) {
memcpy(buf, &value, sizeof(value));
} else {
......@@ -45,20 +66,31 @@ static FORCE_INLINE void *taosEncodeFixed16(void *buf, uint16_t value) {
return POINTER_SHIFT(buf, sizeof(value));
}
static FORCE_INLINE void *taosEncodeFixed32(void *buf, uint32_t value) {
static FORCE_INLINE void *taosDecodeFixedU16(void *buf, uint16_t *value) {
if (IS_LITTLE_ENDIAN()) {
memcpy(buf, &value, sizeof(value));
memcpy(value, buf, sizeof(*value));
} else {
((uint8_t *)buf)[0] = value & 0xff;
((uint8_t *)buf)[1] = (value >> 8) & 0xff;
((uint8_t *)buf)[2] = (value >> 16) & 0xff;
((uint8_t *)buf)[3] = (value >> 24) & 0xff;
((uint8_t *)value)[1] = ((uint8_t *)buf)[0];
((uint8_t *)value)[0] = ((uint8_t *)buf)[1];
}
return POINTER_SHIFT(buf, sizeof(value));
return POINTER_SHIFT(buf, sizeof(*value));
}
// ---- Fixed I16
static FORCE_INLINE void *taosEncodeFixedI16(void *buf, int16_t value) {
return taosEncodeFixedU16(buf, ZIGZAGE(int16_t, value));
}
static FORCE_INLINE void *taosEncodeFixed64(void *buf, uint64_t value) {
static FORCE_INLINE void *taosDecodeFixedI16(void *buf, int16_t *value) {
uint16_t tvalue = 0;
void * ret = taosDecodeFixedU16(buf, &tvalue);
*value = ZIGZAGD(int16_t, tvalue);
return ret;
}
// ---- Fixed U32
static FORCE_INLINE void *taosEncodeFixedU32(void *buf, uint32_t value) {
if (IS_LITTLE_ENDIAN()) {
memcpy(buf, &value, sizeof(value));
} else {
......@@ -66,45 +98,55 @@ static FORCE_INLINE void *taosEncodeFixed64(void *buf, uint64_t value) {
((uint8_t *)buf)[1] = (value >> 8) & 0xff;
((uint8_t *)buf)[2] = (value >> 16) & 0xff;
((uint8_t *)buf)[3] = (value >> 24) & 0xff;
((uint8_t *)buf)[4] = (value >> 32) & 0xff;
((uint8_t *)buf)[5] = (value >> 40) & 0xff;
((uint8_t *)buf)[6] = (value >> 48) & 0xff;
((uint8_t *)buf)[7] = (value >> 56) & 0xff;
}
return POINTER_SHIFT(buf, sizeof(value));
}
static FORCE_INLINE void *taosDecodeFixed8(void *buf, uint8_t *value) {
*value = ((uint8_t *)buf)[0];
return POINTER_SHIFT(buf, sizeof(*value));
}
static FORCE_INLINE void *taosDecodeFixed16(void *buf, uint16_t *value) {
static FORCE_INLINE void *taosDecodeFixedU32(void *buf, uint32_t *value) {
if (IS_LITTLE_ENDIAN()) {
memcpy(value, buf, sizeof(*value));
} else {
((uint8_t *)value)[1] = ((uint8_t *)buf)[0];
((uint8_t *)value)[0] = ((uint8_t *)buf)[1];
((uint8_t *)value)[3] = ((uint8_t *)buf)[0];
((uint8_t *)value)[2] = ((uint8_t *)buf)[1];
((uint8_t *)value)[1] = ((uint8_t *)buf)[2];
((uint8_t *)value)[0] = ((uint8_t *)buf)[3];
}
return POINTER_SHIFT(buf, sizeof(*value));
}
static FORCE_INLINE void *taosDecodeFixed32(void *buf, uint32_t *value) {
// ---- Fixed I32
static FORCE_INLINE void *taosEncodeFixedI32(void *buf, int32_t value) {
return taosEncodeFixedU32(buf, ZIGZAGE(int32_t, value));
}
static FORCE_INLINE void *taosDecodeFixedI32(void *buf, int32_t *value) {
uint32_t tvalue = 0;
void * ret = taosDecodeFixedU32(buf, &tvalue);
*value = ZIGZAGD(int32_t, tvalue);
return ret;
}
// ---- Fixed U64
static FORCE_INLINE void *taosEncodeFixedU64(void *buf, uint64_t value) {
if (IS_LITTLE_ENDIAN()) {
memcpy(value, buf, sizeof(*value));
memcpy(buf, &value, sizeof(value));
} else {
((uint8_t *)value)[3] = ((uint8_t *)buf)[0];
((uint8_t *)value)[2] = ((uint8_t *)buf)[1];
((uint8_t *)value)[1] = ((uint8_t *)buf)[2];
((uint8_t *)value)[0] = ((uint8_t *)buf)[3];
((uint8_t *)buf)[0] = value & 0xff;
((uint8_t *)buf)[1] = (value >> 8) & 0xff;
((uint8_t *)buf)[2] = (value >> 16) & 0xff;
((uint8_t *)buf)[3] = (value >> 24) & 0xff;
((uint8_t *)buf)[4] = (value >> 32) & 0xff;
((uint8_t *)buf)[5] = (value >> 40) & 0xff;
((uint8_t *)buf)[6] = (value >> 48) & 0xff;
((uint8_t *)buf)[7] = (value >> 56) & 0xff;
}
return POINTER_SHIFT(buf, sizeof(*value));
return POINTER_SHIFT(buf, sizeof(value));
}
static FORCE_INLINE void *taosDecodeFixed64(void *buf, uint64_t *value) {
static FORCE_INLINE void *taosDecodeFixedU64(void *buf, uint64_t *value) {
if (IS_LITTLE_ENDIAN()) {
memcpy(value, buf, sizeof(*value));
} else {
......@@ -121,41 +163,26 @@ static FORCE_INLINE void *taosDecodeFixed64(void *buf, uint64_t *value) {
return POINTER_SHIFT(buf, sizeof(*value));
}
static FORCE_INLINE void *taosEncodeVariant16(void *buf, uint16_t value) {
int i = 0;
while (value >= ENCODE_LIMIT) {
((uint8_t *)buf)[i] = (value | ENCODE_LIMIT);
value >>= 7;
i++;
ASSERT(i < 3);
}
((uint8_t *)buf)[i] = value;
return POINTER_SHIFT(buf, i+1);
// ---- Fixed I64
static FORCE_INLINE void *taosEncodeFixedI64(void *buf, int64_t value) {
return taosEncodeFixedU64(buf, ZIGZAGE(int64_t, value));
}
static FORCE_INLINE void *taosEncodeVariant32(void *buf, uint32_t value) {
int i = 0;
while (value >= ENCODE_LIMIT) {
((uint8_t *)buf)[i] = (value | ENCODE_LIMIT);
value >>= 7;
i++;
ASSERT(i < 5);
}
((uint8_t *)buf)[i] = value;
return POINTER_SHIFT(buf, i + 1);
static FORCE_INLINE void *taosDecodeFixedI64(void *buf, int64_t *value) {
uint64_t tvalue = 0;
void * ret = taosDecodeFixedU64(buf, &tvalue);
*value = ZIGZAGD(int64_t, tvalue);
return ret;
}
static FORCE_INLINE void *taosEncodeVariant64(void *buf, uint64_t value) {
// ---- Variant U16
static FORCE_INLINE void *taosEncodeVariantU16(void *buf, uint16_t value) {
int i = 0;
while (value >= ENCODE_LIMIT) {
((uint8_t *)buf)[i] = (value | ENCODE_LIMIT);
value >>= 7;
i++;
ASSERT(i < 10);
ASSERT(i < 3);
}
((uint8_t *)buf)[i] = value;
......@@ -163,8 +190,8 @@ static FORCE_INLINE void *taosEncodeVariant64(void *buf, uint64_t value) {
return POINTER_SHIFT(buf, i + 1);
}
static FORCE_INLINE void *taosDecodeVariant16(void *buf, uint16_t *value) {
int i = 0;
static FORCE_INLINE void *taosDecodeVariantU16(void *buf, uint16_t *value) {
int i = 0;
uint16_t tval = 0;
*value = 0;
while (i < 3) {
......@@ -181,8 +208,35 @@ static FORCE_INLINE void *taosDecodeVariant16(void *buf, uint16_t *value) {
return NULL; // error happened
}
static FORCE_INLINE void *taosDecodeVariant32(void *buf, uint32_t *value) {
// ---- Variant I16
static FORCE_INLINE void *taosEncodeVariantI16(void *buf, int16_t value) {
return taosEncodeVariantU16(buf, ZIGZAGE(int16_t, value));
}
static FORCE_INLINE void *taosDecodeVariantI16(void *buf, int16_t *value) {
uint16_t tvalue = 0;
void * ret = taosDecodeVariantU16(buf, &tvalue);
*value = ZIGZAGD(int16_t, tvalue);
return ret;
}
// ---- Variant U32
static FORCE_INLINE void *taosEncodeVariantU32(void *buf, uint32_t value) {
int i = 0;
while (value >= ENCODE_LIMIT) {
((uint8_t *)buf)[i] = (value | ENCODE_LIMIT);
value >>= 7;
i++;
ASSERT(i < 5);
}
((uint8_t *)buf)[i] = value;
return POINTER_SHIFT(buf, i + 1);
}
static FORCE_INLINE void *taosDecodeVariantU32(void *buf, uint32_t *value) {
int i = 0;
uint32_t tval = 0;
*value = 0;
while (i < 5) {
......@@ -199,8 +253,35 @@ static FORCE_INLINE void *taosDecodeVariant32(void *buf, uint32_t *value) {
return NULL; // error happened
}
static FORCE_INLINE void *taosDecodeVariant64(void *buf, uint64_t *value) {
// ---- Variant I32
static FORCE_INLINE void *taosEncodeVariantI32(void *buf, int32_t value) {
return taosEncodeVariantU32(buf, ZIGZAGE(int32_t, value));
}
static FORCE_INLINE void *taosDecodeVariantI32(void *buf, int32_t *value) {
uint32_t tvalue = 0;
void * ret = taosDecodeVariantU32(buf, &tvalue);
*value = ZIGZAGD(int32_t, tvalue);
return ret;
}
// ---- Variant U64
static FORCE_INLINE void *taosEncodeVariantU64(void *buf, uint64_t value) {
int i = 0;
while (value >= ENCODE_LIMIT) {
((uint8_t *)buf)[i] = (value | ENCODE_LIMIT);
value >>= 7;
i++;
ASSERT(i < 10);
}
((uint8_t *)buf)[i] = value;
return POINTER_SHIFT(buf, i + 1);
}
static FORCE_INLINE void *taosDecodeVariantU64(void *buf, uint64_t *value) {
int i = 0;
uint64_t tval = 0;
*value = 0;
while (i < 10) {
......@@ -217,10 +298,23 @@ static FORCE_INLINE void *taosDecodeVariant64(void *buf, uint64_t *value) {
return NULL; // error happened
}
// ---- Variant I64
static FORCE_INLINE void *taosEncodeVariantI64(void *buf, int64_t value) {
return taosEncodeVariantU64(buf, ZIGZAGE(int64_t, value));
}
static FORCE_INLINE void *taosDecodeVariantI64(void *buf, int64_t *value) {
uint64_t tvalue = 0;
void * ret = taosDecodeVariantU64(buf, &tvalue);
*value = ZIGZAGD(int64_t, tvalue);
return ret;
}
// ---- string
static FORCE_INLINE void *taosEncodeString(void *buf, char *value) {
size_t size = strlen(value);
buf = taosEncodeVariant64(buf, size);
buf = taosEncodeVariantU64(buf, size);
memcpy(buf, value, size);
return POINTER_SHIFT(buf, size);
......@@ -229,7 +323,7 @@ static FORCE_INLINE void *taosEncodeString(void *buf, char *value) {
static FORCE_INLINE void *taosDecodeString(void *buf, char **value) {
uint64_t size = 0;
buf = taosDecodeVariant64(buf, &size);
buf = taosDecodeVariantU64(buf, &size);
*value = (char *)malloc(size + 1);
if (*value == NULL) return NULL;
memcpy(*value, buf, size);
......
......@@ -9,8 +9,18 @@ static bool test_fixed_uint16(uint16_t value) {
char buf[20] = "\0";
uint16_t value_check = 0;
void *ptr1 = taosEncodeFixed16(static_cast<void *>(buf), value);
void *ptr2 = taosDecodeFixed16(static_cast<void *>(buf), &value_check);
void *ptr1 = taosEncodeFixedU16(static_cast<void *>(buf), value);
void *ptr2 = taosDecodeFixedU16(static_cast<void *>(buf), &value_check);
return ((ptr2 != NULL) && (value == value_check) && (ptr1 == ptr2));
}
static bool test_fixed_int16(int16_t value) {
char buf[20] = "\0";
int16_t value_check = 0;
void *ptr1 = taosEncodeFixedI16(static_cast<void *>(buf), value);
void *ptr2 = taosDecodeFixedI16(static_cast<void *>(buf), &value_check);
return ((ptr2 != NULL) && (value == value_check) && (ptr1 == ptr2));
}
......@@ -19,8 +29,18 @@ static bool test_fixed_uint32(uint32_t value) {
char buf[20] = "\0";
uint32_t value_check = 0;
void *ptr1 = taosEncodeFixed32(static_cast<void *>(buf), value);
void *ptr2 = taosDecodeFixed32(static_cast<void *>(buf), &value_check);
void *ptr1 = taosEncodeFixedU32(static_cast<void *>(buf), value);
void *ptr2 = taosDecodeFixedU32(static_cast<void *>(buf), &value_check);
return ((ptr2 != NULL) && (value == value_check) && (ptr1 == ptr2));
}
static bool test_fixed_int32(int32_t value) {
char buf[20] = "\0";
int32_t value_check = 0;
void *ptr1 = taosEncodeFixedI32(static_cast<void *>(buf), value);
void *ptr2 = taosDecodeFixedI32(static_cast<void *>(buf), &value_check);
return ((ptr2 != NULL) && (value == value_check) && (ptr1 == ptr2));
}
......@@ -29,8 +49,18 @@ static bool test_fixed_uint64(uint64_t value) {
char buf[20] = "\0";
uint64_t value_check = 0;
void *ptr1 = taosEncodeFixed64(static_cast<void *>(buf), value);
void *ptr2 = taosDecodeFixed64(static_cast<void *>(buf), &value_check);
void *ptr1 = taosEncodeFixedU64(static_cast<void *>(buf), value);
void *ptr2 = taosDecodeFixedU64(static_cast<void *>(buf), &value_check);
return ((ptr2 != NULL) && (value == value_check) && (ptr1 == ptr2));
}
static bool test_fixed_int64(int64_t value) {
char buf[20] = "\0";
int64_t value_check = 0;
void *ptr1 = taosEncodeFixedI64(static_cast<void *>(buf), value);
void *ptr2 = taosDecodeFixedI64(static_cast<void *>(buf), &value_check);
return ((ptr2 != NULL) && (value == value_check) && (ptr1 == ptr2));
}
......@@ -39,8 +69,18 @@ static bool test_variant_uint16(uint16_t value) {
char buf[20] = "\0";
uint16_t value_check = 0;
void *ptr1 = taosEncodeVariant16(static_cast<void *>(buf), value);
void *ptr2 = taosDecodeVariant16(static_cast<void *>(buf), &value_check);
void *ptr1 = taosEncodeVariantU16(static_cast<void *>(buf), value);
void *ptr2 = taosDecodeVariantU16(static_cast<void *>(buf), &value_check);
return ((ptr2 != NULL) && (value == value_check) && (ptr1 == ptr2));
}
static bool test_variant_int16(int16_t value) {
char buf[20] = "\0";
int16_t value_check = 0;
void *ptr1 = taosEncodeVariantI16(static_cast<void *>(buf), value);
void *ptr2 = taosDecodeVariantI16(static_cast<void *>(buf), &value_check);
return ((ptr2 != NULL) && (value == value_check) && (ptr1 == ptr2));
}
......@@ -49,8 +89,18 @@ static bool test_variant_uint32(uint32_t value) {
char buf[20] = "\0";
uint32_t value_check = 0;
void *ptr1 = taosEncodeVariant32(static_cast<void *>(buf), value);
void *ptr2 = taosDecodeVariant32(static_cast<void *>(buf), &value_check);
void *ptr1 = taosEncodeVariantU32(static_cast<void *>(buf), value);
void *ptr2 = taosDecodeVariantU32(static_cast<void *>(buf), &value_check);
return ((ptr2 != NULL) && (value == value_check) && (ptr1 == ptr2));
}
static bool test_variant_int32(int32_t value) {
char buf[20] = "\0";
int32_t value_check = 0;
void *ptr1 = taosEncodeVariantI32(static_cast<void *>(buf), value);
void *ptr2 = taosDecodeVariantI32(static_cast<void *>(buf), &value_check);
return ((ptr2 != NULL) && (value == value_check) && (ptr1 == ptr2));
}
......@@ -59,8 +109,18 @@ static bool test_variant_uint64(uint64_t value) {
char buf[20] = "\0";
uint64_t value_check = 0;
void *ptr1 = taosEncodeVariant64(static_cast<void *>(buf), value);
void *ptr2 = taosDecodeVariant64(static_cast<void *>(buf), &value_check);
void *ptr1 = taosEncodeVariantU64(static_cast<void *>(buf), value);
void *ptr2 = taosDecodeVariantU64(static_cast<void *>(buf), &value_check);
return ((ptr2 != NULL) && (value == value_check) && (ptr1 == ptr2));
}
static bool test_variant_int64(int64_t value) {
char buf[20] = "\0";
int64_t value_check = 0;
void *ptr1 = taosEncodeVariantI64(static_cast<void *>(buf), value);
void *ptr2 = taosDecodeVariantI64(static_cast<void *>(buf), &value_check);
return ((ptr2 != NULL) && (value == value_check) && (ptr1 == ptr2));
}
......@@ -68,49 +128,111 @@ static bool test_variant_uint64(uint64_t value) {
TEST(codingTest, fixed_encode_decode) {
srand(time(0));
// uint16_t
for (uint16_t value = 0; value <= UINT16_MAX; value++) {
ASSERT_TRUE(test_fixed_uint16(value));
if (value == UINT16_MAX) break;
}
// int16_t
for (int16_t value = INT16_MIN; value <= INT16_MAX; value++) {
ASSERT_TRUE(test_fixed_int16(value));
if (value == INT16_MAX) break;
}
std::mt19937 gen32(std::random_device{}());
// uint32_t
ASSERT_TRUE(test_fixed_uint32(0));
ASSERT_TRUE(test_fixed_uint32(UINT32_MAX));
std::uniform_int_distribution<uint32_t> distr1(0, UINT32_MAX);
for (int i = 0; i < 1000000; i++) {
ASSERT_TRUE(test_fixed_uint32(rand()));
ASSERT_TRUE(test_fixed_uint32(distr1(gen32)));
}
std::mt19937_64 gen (std::random_device{}());
// int32_t
ASSERT_TRUE(test_fixed_int32(INT32_MIN));
ASSERT_TRUE(test_fixed_int32(INT32_MAX));
std::uniform_int_distribution<int32_t> distr2(INT32_MIN, INT32_MAX);
for (int i = 0; i < 1000000; i++) {
ASSERT_TRUE(test_fixed_int32(distr2(gen32)));
}
std::mt19937_64 gen64(std::random_device{}());
// uint64_t
std::uniform_int_distribution<uint64_t> distr3(0, UINT64_MAX);
ASSERT_TRUE(test_fixed_uint64(0));
ASSERT_TRUE(test_fixed_uint64(UINT64_MAX));
for (int i = 0; i < 1000000; i++) {
ASSERT_TRUE(test_fixed_uint64(gen()));
ASSERT_TRUE(test_fixed_uint64(distr3(gen64)));
}
// int64_t
std::uniform_int_distribution<int64_t> distr4(INT64_MIN, INT64_MAX);
ASSERT_TRUE(test_fixed_int64(INT64_MIN));
ASSERT_TRUE(test_fixed_int64(INT64_MAX));
for (int i = 0; i < 1000000; i++) {
ASSERT_TRUE(test_fixed_int64(distr4(gen64)));
}
}
TEST(codingTest, variant_encode_decode) {
srand(time(0));
// uint16_t
for (uint16_t value = 0; value <= UINT16_MAX; value++) {
ASSERT_TRUE(test_variant_uint16(value));
if (value == UINT16_MAX) break;
}
// int16_t
for (int16_t value = INT16_MIN; value <= INT16_MAX; value++) {
ASSERT_TRUE(test_variant_int16(value));
if (value == INT16_MAX) break;
}
std::mt19937 gen32(std::random_device{}());
// uint32_t
std::uniform_int_distribution<uint32_t> distr1(0, UINT32_MAX);
ASSERT_TRUE(test_variant_uint32(0));
ASSERT_TRUE(test_variant_uint32(UINT32_MAX));
for (int i = 0; i < 5000000; i++) {
ASSERT_TRUE(test_variant_uint32(rand()));
ASSERT_TRUE(test_variant_uint32(distr1(gen32)));
}
// int32_t
std::uniform_int_distribution<int32_t> distr2(INT32_MIN, INT32_MAX);
ASSERT_TRUE(test_variant_int32(INT32_MIN));
ASSERT_TRUE(test_variant_int32(INT32_MAX));
for (int i = 0; i < 5000000; i++) {
ASSERT_TRUE(test_variant_int32(distr2(gen32)));
}
std::mt19937_64 gen (std::random_device{}());
std::mt19937_64 gen64(std::random_device{}());
// uint64_t
std::uniform_int_distribution<uint64_t> distr3(0, UINT64_MAX);
ASSERT_TRUE(test_variant_uint64(0));
ASSERT_TRUE(test_variant_uint64(UINT64_MAX));
for (int i = 0; i < 5000000; i++) {
uint64_t value = gen();
// uint64_t value = gen();
// printf("%ull\n", value);
ASSERT_TRUE(test_variant_uint64(distr3(gen64)));
}
// int64_t
std::uniform_int_distribution<int64_t> distr4(INT64_MIN, INT64_MAX);
ASSERT_TRUE(test_variant_int64(INT64_MIN));
ASSERT_TRUE(test_variant_int64(INT64_MAX));
for (int i = 0; i < 5000000; i++) {
// uint64_t value = gen();
// printf("%ull\n", value);
ASSERT_TRUE(test_variant_uint64(value));
ASSERT_TRUE(test_variant_int64(distr4(gen64)));
}
}
\ No newline at end of file
......@@ -29,6 +29,8 @@
#include "vnode.h"
#include "vnodeInt.h"
#define TSDB_VNODE_VERSION_CONTENT_LEN 31
static int32_t tsOpennedVnodes;
static void *tsDnodeVnodesHash;
static void vnodeCleanUp(SVnodeObj *pVnode);
......@@ -108,7 +110,7 @@ int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg) {
tsdbCfg.maxRowsPerFileBlock = pVnodeCfg->cfg.maxRowsPerFileBlock;
tsdbCfg.precision = pVnodeCfg->cfg.precision;
tsdbCfg.compression = pVnodeCfg->cfg.compression;;
char tsdbDir[TSDB_FILENAME_LEN] = {0};
sprintf(tsdbDir, "%s/vnode%d/tsdb", tsVnodeDir, pVnodeCfg->cfg.vgId);
code = tsdbCreateRepo(tsdbDir, &tsdbCfg, NULL);
......@@ -139,7 +141,7 @@ int32_t vnodeDrop(int32_t vgId) {
vTrace("vgId:%d, vnode will be dropped", pVnode->vgId);
pVnode->status = TAOS_VN_STATUS_DELETING;
vnodeCleanUp(pVnode);
return TSDB_CODE_SUCCESS;
}
......@@ -262,7 +264,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
#endif
// start continuous query
if (pVnode->role == TAOS_SYNC_ROLE_MASTER)
if (pVnode->role == TAOS_SYNC_ROLE_MASTER)
cqStart(pVnode->cq);
pVnode->events = NULL;
......@@ -342,7 +344,7 @@ void *vnodeAccquireVnode(int32_t vgId) {
}
void *vnodeGetRqueue(void *pVnode) {
return ((SVnodeObj *)pVnode)->rqueue;
return ((SVnodeObj *)pVnode)->rqueue;
}
void *vnodeGetWqueue(int32_t vgId) {
......@@ -352,7 +354,7 @@ void *vnodeGetWqueue(int32_t vgId) {
}
void *vnodeGetWal(void *pVnode) {
return ((SVnodeObj *)pVnode)->wal;
return ((SVnodeObj *)pVnode)->wal;
}
static void vnodeBuildVloadMsg(SVnodeObj *pVnode, SDMStatusMsg *pStatus) {
......@@ -447,9 +449,9 @@ static void vnodeNotifyRole(void *ahandle, int8_t role) {
vPrint("vgId:%d, sync role changed from %d to %d", pVnode->vgId, pVnode->role, role);
pVnode->role = role;
if (pVnode->role == TAOS_SYNC_ROLE_MASTER)
if (pVnode->role == TAOS_SYNC_ROLE_MASTER)
cqStart(pVnode->cq);
else
else
cqStop(pVnode->cq);
}
......@@ -488,6 +490,10 @@ static int32_t vnodeSaveCfg(SMDCreateVnodeMsg *pVnodeCfg) {
int32_t len = 0;
int32_t maxLen = 1000;
char * content = calloc(1, maxLen + 1);
if (content == NULL) {
fclose(fp);
return TSDB_CODE_NO_RESOURCE;
}
len += snprintf(content + len, maxLen - len, "{\n");
......@@ -501,14 +507,14 @@ static int32_t vnodeSaveCfg(SMDCreateVnodeMsg *pVnodeCfg) {
len += snprintf(content + len, maxLen - len, " \"daysToKeep2\": %d,\n", pVnodeCfg->cfg.daysToKeep2);
len += snprintf(content + len, maxLen - len, " \"minRowsPerFileBlock\": %d,\n", pVnodeCfg->cfg.minRowsPerFileBlock);
len += snprintf(content + len, maxLen - len, " \"maxRowsPerFileBlock\": %d,\n", pVnodeCfg->cfg.maxRowsPerFileBlock);
len += snprintf(content + len, maxLen - len, " \"commitTime\": %d,\n", pVnodeCfg->cfg.commitTime);
len += snprintf(content + len, maxLen - len, " \"commitTime\": %d,\n", pVnodeCfg->cfg.commitTime);
len += snprintf(content + len, maxLen - len, " \"precision\": %d,\n", pVnodeCfg->cfg.precision);
len += snprintf(content + len, maxLen - len, " \"compression\": %d,\n", pVnodeCfg->cfg.compression);
len += snprintf(content + len, maxLen - len, " \"walLevel\": %d,\n", pVnodeCfg->cfg.walLevel);
len += snprintf(content + len, maxLen - len, " \"replica\": %d,\n", pVnodeCfg->cfg.replications);
len += snprintf(content + len, maxLen - len, " \"wals\": %d,\n", pVnodeCfg->cfg.wals);
len += snprintf(content + len, maxLen - len, " \"quorum\": %d,\n", pVnodeCfg->cfg.quorum);
len += snprintf(content + len, maxLen - len, " \"nodeInfos\": [{\n");
for (int32_t i = 0; i < pVnodeCfg->cfg.replications; i++) {
len += snprintf(content + len, maxLen - len, " \"nodeId\": %d,\n", pVnodeCfg->nodes[i].nodeId);
......@@ -528,7 +534,7 @@ static int32_t vnodeSaveCfg(SMDCreateVnodeMsg *pVnodeCfg) {
vPrint("vgId:%d, save vnode cfg successed", pVnodeCfg->cfg.vgId);
return 0;
return TSDB_CODE_SUCCESS;
}
static int32_t vnodeReadCfg(SVnodeObj *pVnode) {
......@@ -742,7 +748,7 @@ static int32_t vnodeSaveVersion(SVnodeObj *pVnode) {
int32_t len = 0;
int32_t maxLen = 30;
char * content = calloc(1, maxLen + 1);
char content[TSDB_VNODE_VERSION_CONTENT_LEN] = {0};
len += snprintf(content + len, maxLen - len, "{\n");
len += snprintf(content + len, maxLen - len, " \"version\": %" PRId64 "\n", pVnode->fversion);
......@@ -750,11 +756,10 @@ static int32_t vnodeSaveVersion(SVnodeObj *pVnode) {
fwrite(content, 1, len, fp);
fclose(fp);
free(content);
vPrint("vgId:%d, save vnode version:%" PRId64 " succeed", pVnode->vgId, pVnode->fversion);
return 0;
return TSDB_CODE_SUCCESS;
}
static int32_t vnodeReadVersion(SVnodeObj *pVnode) {
......
......@@ -39,8 +39,8 @@ void vnodeInitReadFp(void) {
int32_t vnodeProcessRead(void *param, int msgType, void *pCont, int32_t contLen, SRspRet *ret) {
SVnodeObj *pVnode = (SVnodeObj *)param;
if (vnodeProcessReadMsgFp[msgType] == NULL)
return TSDB_CODE_MSG_NOT_PROCESSED;
if (vnodeProcessReadMsgFp[msgType] == NULL)
return TSDB_CODE_MSG_NOT_PROCESSED;
if (pVnode->status == TAOS_VN_STATUS_DELETING || pVnode->status == TAOS_VN_STATUS_CLOSING)
return TSDB_CODE_INVALID_VGROUP_ID;
......@@ -53,26 +53,29 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, void *pCont, int32_t cont
memset(pRet, 0, sizeof(SRspRet));
int32_t code = TSDB_CODE_SUCCESS;
qinfo_t pQInfo = NULL;
if (contLen != 0) {
pRet->code = qCreateQueryInfo(pVnode->tsdb, pVnode->vgId, pQueryTableMsg, &pQInfo);
SQueryTableRsp *pRsp = (SQueryTableRsp *) rpcMallocCont(sizeof(SQueryTableRsp));
pRsp->qhandle = htobe64((uint64_t) (pQInfo));
pRsp->code = pRet->code;
pRet->len = sizeof(SQueryTableRsp);
pRet->rsp = pRsp;
vTrace("vgId:%d, QInfo:%p, dnode query msg disposed", pVnode->vgId, pQInfo);
} else {
assert(pCont != NULL);
pQInfo = pCont;
code = TSDB_CODE_ACTION_IN_PROGRESS;
}
qTableQuery(pQInfo); // do execute query
if (pQInfo != NULL) {
qTableQuery(pQInfo); // do execute query
}
return code;
}
......
......@@ -104,65 +104,14 @@ static int32_t vnodeProcessSubmitMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pR
}
static int32_t vnodeProcessCreateTableMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pRet) {
SMDCreateTableMsg *pTable = pCont;
int32_t code = 0;
vTrace("vgId:%d, table:%s, start to create", pVnode->vgId, pTable->tableId);
int16_t numOfColumns = htons(pTable->numOfColumns);
int16_t numOfTags = htons(pTable->numOfTags);
int32_t sid = htonl(pTable->sid);
uint64_t uid = htobe64(pTable->uid);
SSchema * pSchema = (SSchema *)pTable->data;
STSchema *pDestTagSchema = NULL;
SDataRow dataRow = NULL;
int32_t totalCols = numOfColumns + numOfTags;
STableCfg tCfg;
tsdbInitTableCfg(&tCfg, pTable->tableType, uid, sid);
STSchema *pDestSchema = tdNewSchema(numOfColumns);
for (int i = 0; i < numOfColumns; i++) {
tdSchemaAddCol(pDestSchema, pSchema[i].type, htons(pSchema[i].colId), htons(pSchema[i].bytes));
}
tsdbTableSetSchema(&tCfg, pDestSchema, false);
tsdbTableSetName(&tCfg, pTable->tableId, false);
if (numOfTags != 0) {
pDestTagSchema = tdNewSchema(numOfTags);
for (int i = numOfColumns; i < totalCols; i++) {
tdSchemaAddCol(pDestTagSchema, pSchema[i].type, htons(pSchema[i].colId), htons(pSchema[i].bytes));
}
tsdbTableSetTagSchema(&tCfg, pDestTagSchema, false);
tsdbTableSetSName(&tCfg, pTable->superTableId, false);
tsdbTableSetSuperUid(&tCfg, htobe64(pTable->superTableUid));
char *pTagData = pTable->data + totalCols * sizeof(SSchema);
int accumBytes = 0;
dataRow = tdNewTagRowFromSchema(pDestTagSchema, numOfTags);
for (int i = 0; i < numOfTags; i++) {
STColumn *pTCol = schemaColAt(pDestTagSchema, i);
tdAppendTagColVal(dataRow, pTagData + accumBytes, pTCol->type, pTCol->bytes, pTCol->colId);
accumBytes += htons(pSchema[i + numOfColumns].bytes);
}
tsdbTableSetTagValue(&tCfg, dataRow, false);
}
// only normal has sql string
if (pTable->tableType == TSDB_STREAM_TABLE) {
char *sql = pTable->data + totalCols * sizeof(SSchema);
vTrace("vgId:%d, table:%s is creating, sql:%s", pVnode->vgId, pTable->tableId, sql);
tsdbTableSetStreamSql(&tCfg, sql, false);
}
code = tsdbCreateTable(pVnode->tsdb, &tCfg);
tdFreeDataRow(dataRow);
tfree(pDestTagSchema);
tfree(pDestSchema);
STableCfg *pCfg = tsdbCreateTableCfgFromMsg((SMDCreateTableMsg *)pCont);
if (pCfg == NULL) return terrno;
int32_t code = tsdbCreateTable(pVnode->tsdb, pCfg);
vTrace("vgId:%d, table:%s is created, result:%x", pVnode->vgId, pTable->tableId, code);
return code;
tsdbClearTableCfg(pCfg);
free(pCfg);
return code;
}
static int32_t vnodeProcessDropTableMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pRet) {
......@@ -181,52 +130,11 @@ static int32_t vnodeProcessDropTableMsg(SVnodeObj *pVnode, void *pCont, SRspRet
}
static int32_t vnodeProcessAlterTableMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pRet) {
SMDCreateTableMsg *pTable = pCont;
int32_t code = 0;
vTrace("vgId:%d, table:%s, start to alter", pVnode->vgId, pTable->tableId);
int16_t numOfColumns = htons(pTable->numOfColumns);
int16_t numOfTags = htons(pTable->numOfTags);
int32_t sid = htonl(pTable->sid);
uint64_t uid = htobe64(pTable->uid);
SSchema *pSchema = (SSchema *) pTable->data;
int32_t totalCols = numOfColumns + numOfTags;
STableCfg tCfg;
tsdbInitTableCfg(&tCfg, pTable->tableType, uid, sid);
STSchema *pDestSchema = tdNewSchema(numOfColumns);
for (int i = 0; i < numOfColumns; i++) {
tdSchemaAddCol(pDestSchema, pSchema[i].type, htons(pSchema[i].colId), htons(pSchema[i].bytes));
}
tsdbTableSetSchema(&tCfg, pDestSchema, false);
if (numOfTags != 0) {
STSchema *pDestTagSchema = tdNewSchema(numOfTags);
for (int i = numOfColumns; i < totalCols; i++) {
tdSchemaAddCol(pDestTagSchema, pSchema[i].type, htons(pSchema[i].colId), htons(pSchema[i].bytes));
}
tsdbTableSetTagSchema(&tCfg, pDestTagSchema, false);
char *pTagData = pTable->data + totalCols * sizeof(SSchema);
int accumBytes = 0;
SDataRow dataRow = tdNewDataRowFromSchema(pDestTagSchema);
for (int i = 0; i < numOfTags; i++) {
STColumn *pTCol = schemaColAt(pDestTagSchema, i);
tdAppendColVal(dataRow, pTagData + accumBytes, pTCol->type, pTCol->bytes, pTCol->offset);
accumBytes += htons(pSchema[i + numOfColumns].bytes);
}
tsdbTableSetTagValue(&tCfg, dataRow, false);
}
code = tsdbAlterTable(pVnode->tsdb, &tCfg);
tfree(pDestSchema);
vTrace("vgId:%d, table:%s, alter table result:%d", pVnode->vgId, pTable->tableId, code);
STableCfg *pCfg = tsdbCreateTableCfgFromMsg((SMDCreateTableMsg *)pCont);
if (pCfg == NULL) return terrno;
int32_t code = tsdbAlterTable(pVnode->tsdb, pCfg);
tsdbClearTableCfg(pCfg);
free(pCfg);
return code;
}
......
......@@ -44,7 +44,7 @@ function buildTDengine {
echo "repo need to pull"
git pull
LOCAL_COMMIT=`git rev-parse @`
LOCAL_COMMIT=`git rev-parse --short @`
cd debug
rm -rf *
cmake ..
......
......@@ -108,7 +108,7 @@ cd ../../../debug; make
./test.sh -f general/parser/col_arithmetic_operation.sim
./test.sh -f general/parser/columnValue.sim
./test.sh -f general/parser/commit.sim
#./test.sh -f general/parser/create_db.sim #there are bugs in this sim script
./test.sh -f general/parser/create_db.sim
./test.sh -f general/parser/create_mt.sim
./test.sh -f general/parser/create_tb.sim
./test.sh -f general/parser/dbtbnameValidate.sim
......@@ -117,7 +117,7 @@ cd ../../../debug; make
./test.sh -f general/parser/import_commit3.sim
./test.sh -f general/parser/insert_tb.sim
./test.sh -f general/parser/first_last.sim
# ./test.sh -f general/parser/import_file.sim
#unsupport ./test.sh -f general/parser/import_file.sim
./test.sh -f general/parser/lastrow.sim
./test.sh -f general/parser/nchar.sim
#unsupport ./test.sh -f general/parser/null_char.sim
......@@ -133,17 +133,17 @@ cd ../../../debug; make
./test.sh -f general/parser/tbnameIn.sim
./test.sh -f general/parser/binary_escapeCharacter.sim
./test.sh -f general/parser/projection_limit_offset.sim
# ./test.sh -f general/parser/limit2.sim
# ./test.sh -f general/parser/slimit.sim
# ./test.sh -f general/parser/fill.sim
# ./test.sh -f general/parser/fill_stb.sim
# ./test.sh -f general/parser/interp.sim
# ./test.sh -f general/parser/where.sim
# ./test.sh -f general/parser/join.sim
# ./test.sh -f general/parser/join_multivnode.sim
# ./test.sh -f general/parser/select_with_tags.sim
# ./test.sh -f general/parser/groupby.sim
# ./test.sh -f general/parser/bug.sim
./test.sh -f general/parser/limit2.sim
./test.sh -f general/parser/slimit.sim
./test.sh -f general/parser/fill.sim
./test.sh -f general/parser/fill_stb.sim
./test.sh -f general/parser/interp.sim
./test.sh -f general/parser/where.sim
#unsupport ./test.sh -f general/parser/join.sim
#unsupport ./test.sh -f general/parser/join_multivnode.sim
./test.sh -f general/parser/select_with_tags.sim
#unsupport ./test.sh -f general/parser/groupby.sim
#unsupport ./test.sh -f general/parser/bug.sim
#unsupport ./test.sh -f general/parser/tags_dynamically_specifiy.sim
#unsupport ./test.sh -f general/parser/set_tag_vals.sim
#unsupport ./test.sh -f general/parser/repeatAlter.sim
......@@ -173,7 +173,7 @@ cd ../../../debug; make
./test.sh -f general/table/db.table.sim
./test.sh -f general/table/delete_reuse1.sim
./test.sh -f general/table/delete_reuse2.sim
#liao ./test.sh -f general/table/delete_writing.sim
./test.sh -f general/table/delete_writing.sim
./test.sh -f general/table/describe.sim
./test.sh -f general/table/double.sim
./test.sh -f general/table/fill.sim
......@@ -306,3 +306,78 @@ cd ../../../debug; make
./test.sh -f unique/vnode/replica3_basic.sim
./test.sh -f unique/vnode/replica3_repeat.sim
./test.sh -f unique/vnode/replica3_vgroup.sim
./test.sh -f unique/account/account_create.sim
./test.sh -f unique/account/account_delete.sim
./test.sh -f unique/account/account_len.sim
./test.sh -f unique/account/authority.sim
./test.sh -f unique/account/basic.sim
./test.sh -f unique/account/paras.sim
./test.sh -f unique/account/pass_alter.sim
./test.sh -f unique/account/pass_len.sim
./test.sh -f unique/account/usage.sim
./test.sh -f unique/account/user_create.sim
./test.sh -f unique/account/user_len.sim
./test.sh -f unique/big/balance.sim
./test.sh -f unique/big/maxvnodes.sim
./test.sh -f unique/big/tcp.sim
./test.sh -f unique/cluster/balance1.sim
./test.sh -f unique/cluster/balance2.sim
./test.sh -f unique/cluster/balance3.sim
./test.sh -f unique/cluster/cache.sim
./test.sh -f unique/column/replica3.sim
./test.sh -f unique/db/commit.sim
./test.sh -f unique/db/delete.sim
./test.sh -f unique/db/delete_part.sim
./test.sh -f unique/db/replica_add12.sim
./test.sh -f unique/db/replica_add13.sim
./test.sh -f unique/db/replica_add23.sim
./test.sh -f unique/db/replica_reduce21.sim
./test.sh -f unique/db/replica_reduce32.sim
./test.sh -f unique/db/replica_reduce31.sim
./test.sh -f unique/db/replica_part.sim
./test.sh -f unique/dnode/balance1.sim
./test.sh -f unique/dnode/balance2.sim
./test.sh -f unique/dnode/balance3.sim
./test.sh -f unique/dnode/balancex.sim
./test.sh -f unique/dnode/offline1.sim
./test.sh -f unique/dnode/offline2.sim
./test.sh -f unique/dnode/remove1.sim
./test.sh -f unique/dnode/remove2.sim
./test.sh -f unique/dnode/vnode_clean.sim
./test.sh -f unique/http/admin.sim
./test.sh -f unique/http/opentsdb.sim
./test.sh -f unique/import/replica2.sim
./test.sh -f unique/import/replica3.sim
./test.sh -f unique/stable/balance_replica1.sim
./test.sh -f unique/stable/dnode2_stop.sim
./test.sh -f unique/stable/dnode2.sim
./test.sh -f unique/stable/dnode3.sim
./test.sh -f unique/stable/replica2_dnode4.sim
./test.sh -f unique/stable/replica2_vnode3.sim
./test.sh -f unique/stable/replica3_dnode6.sim
./test.sh -f unique/stable/replica3_vnode3.sim
./test.sh -f unique/mnode/mgmt22.sim
./test.sh -f unique/mnode/mgmt23.sim
./test.sh -f unique/mnode/mgmt24.sim
./test.sh -f unique/mnode/mgmt25.sim
./test.sh -f unique/mnode/mgmt26.sim
./test.sh -f unique/mnode/mgmt33.sim
./test.sh -f unique/mnode/mgmt34.sim
./test.sh -f unique/mnode/mgmtr2.sim
./test.sh -f unique/vnode/many.sim
./test.sh -f unique/vnode/replica2_basic2.sim
./test.sh -f unique/vnode/replica2_repeat.sim
./test.sh -f unique/vnode/replica3_basic.sim
./test.sh -f unique/vnode/replica3_repeat.sim
./test.sh -f unique/vnode/replica3_vgroup.sim
......@@ -42,7 +42,7 @@ fi
TAOS_DIR=`pwd`
BIN_DIR=`find . -name "taosd"|grep bin| cut -d '/' --fields=2,3`
BIN_DIR=`find . -name "taosd"|grep bin|head -n1|cut -d '/' --fields=2,3`
BUILD_DIR=$TAOS_DIR/$BIN_DIR
......
......@@ -45,7 +45,7 @@ fi
TAOS_DIR=`pwd`
BIN_DIR=`find . -name "taosd"|grep bin| cut -d '/' --fields=2,3`
BIN_DIR=`find . -name "taosd"|grep bin|head -n1|cut -d '/' --fields=2,3`
BUILD_DIR=$TAOS_DIR/$BIN_DIR
......
......@@ -40,7 +40,7 @@ fi
TAOS_DIR=`pwd`
BIN_DIR=`find . -name "taosd"|grep bin| cut -d '/' --fields=2,3`
BIN_DIR=`find . -name "taosd"|grep bin|head -n1|cut -d '/' --fields=2,3`
BUILD_DIR=$TAOS_DIR/$BIN_DIR
......
......@@ -51,7 +51,7 @@ fi
TAOS_DIR=`pwd`
BIN_DIR=`find . -name "taosd"|grep bin| cut -d '/' --fields=2,3`
BIN_DIR=`find . -name "taosd"|grep bin|head -n1|cut -d '/' --fields=2,3`
BUILD_DIR=$TAOS_DIR/$BIN_DIR
......
......@@ -48,7 +48,7 @@ fi
TAOS_DIR=`pwd`
BIN_DIR=`find . -name "taosd"|grep bin| cut -d '/' --fields=2,3`
BIN_DIR=`find . -name "taosd"|grep bin|head -n1|cut -d '/' --fields=2,3`
BUILD_DIR=$TAOS_DIR/$BIN_DIR
......
......@@ -50,7 +50,7 @@ fi
TOP_DIR=`pwd`
BIN_DIR=`find . -name "taosd"|grep bin| cut -d '/' --fields=2,3`
BIN_DIR=`find . -name "taosd"|grep bin|head -n1|cut -d '/' --fields=2,3`
BUILD_DIR=$TOP_DIR/$BIN_DIR
......
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/exec.sh -n dnode1 -s start
sql connect
\ No newline at end of file
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/deploy.sh -n dnode2 -i 2
system sh/deploy.sh -n dnode3 -i 3
system sh/deploy.sh -n dnode4 -i 4
system sh/cfg.sh -n dnode1 -c walLevel -v 2
system sh/cfg.sh -n dnode2 -c walLevel -v 2
system sh/cfg.sh -n dnode3 -c walLevel -v 2
system sh/cfg.sh -n dnode4 -c walLevel -v 2
system sh/cfg.sh -n dnode1 -c numOfMnodes -v 1
system sh/cfg.sh -n dnode2 -c numOfMnodes -v 1
system sh/cfg.sh -n dnode3 -c numOfMnodes -v 1
system sh/cfg.sh -n dnode4 -c numOfMnodes -v 1
system sh/cfg.sh -n dnode1 -c mnodeEqualVnodeNum -v 4
system sh/cfg.sh -n dnode2 -c mnodeEqualVnodeNum -v 4
system sh/cfg.sh -n dnode3 -c mnodeEqualVnodeNum -v 4
system sh/cfg.sh -n dnode4 -c mnodeEqualVnodeNum -v 4
system sh/cfg.sh -n dnode1 -c numOfTotalVnodes -v 4
system sh/cfg.sh -n dnode2 -c numOfTotalVnodes -v 4
system sh/cfg.sh -n dnode3 -c numOfTotalVnodes -v 4
system sh/cfg.sh -n dnode4 -c numOfTotalVnodes -v 4
system sh/cfg.sh -n dnode1 -c http -v 1
system sh/cfg.sh -n dnode2 -c http -v 1
system sh/cfg.sh -n dnode3 -c http -v 1
system sh/cfg.sh -n dnode1 -c monitor -v 1
system sh/cfg.sh -n dnode2 -c monitor -v 1
system sh/cfg.sh -n dnode3 -c monitor -v 1
system sh/cfg.sh -n dnode1 -c monitorInterval -v 1
system sh/cfg.sh -n dnode2 -c monitorInterval -v 1
system sh/cfg.sh -n dnode3 -c monitorInterval -v 1
system sh/exec.sh -n dnode1 -s start
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册