提交 84a6e47f 编写于 作者: S slguan

Merge branch 'develop' into hotfix/crash

......@@ -1321,7 +1321,6 @@ int tsParseSql(SSqlObj *pSql, bool initialParse) {
int32_t ret = TSDB_CODE_SUCCESS;
SSqlCmd* pCmd = &pSql->cmd;
tscTrace("------------------%p, initial:%d, sqlstr:%s", pSql, initialParse, pSql->sqlstr);
if (!pCmd->parseFinished) {
tscTrace("%p resume to parse sql: %s", pSql, pCmd->curSql);
}
......
......@@ -1853,18 +1853,17 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows)
SSubqueryState* pState = pSupporter->pState;
// record the total inserted rows
if (numOfRows > 0) {
if (tres != pParentObj) {
if (numOfRows > 0 && tres != pParentObj) {
pParentObj->res.numOfRows += numOfRows;
}
} else {
}
if (taos_errno(tres) != 0) {
SSqlObj* pSql = (SSqlObj*) tres;
assert(pSql != NULL && pSql->res.code == numOfRows);
pParentObj->res.code = pSql->res.code;
}
// it is not the initial sqlObj, free it
if (tres != pParentObj) {
taos_free_result(tres);
......@@ -1902,7 +1901,6 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) {
tscTrace("%p submit data to %d vnode(s)", pSql, pDataBlocks->nSize);
SSubqueryState *pState = calloc(1, sizeof(SSubqueryState));
pState->numOfTotal = pSql->numOfSubs;
pState->numOfRemain = pSql->numOfSubs;
pRes->code = TSDB_CODE_SUCCESS;
......@@ -1916,6 +1914,7 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) {
pSql->pSubs[0] = pSql; // the first sub insert points back to itself
tscTrace("%p sub:%p create subObj success. orderOfSub:%d", pSql, pSql, 0);
int32_t numOfSub = 1;
int32_t code = tscCopyDataBlockToPayload(pSql, pDataBlocks->pData[0]);
if (code != TSDB_CODE_SUCCESS) {
tscTrace("%p prepare submit data block failed in async insertion, vnodeIdx:%d, total:%d, code:%d", pSql, 0,
......@@ -1923,15 +1922,14 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) {
goto _error;
}
int32_t i = 1;
for (; i < pSql->numOfSubs; ++i) {
for (; numOfSub < pSql->numOfSubs; ++numOfSub) {
SInsertSupporter* pSupporter1 = calloc(1, sizeof(SInsertSupporter));
pSupporter1->pSql = pSql;
pSupporter1->pState = pState;
SSqlObj *pNew = createSubqueryObj(pSql, 0, multiVnodeInsertFinalize, pSupporter, TSDB_SQL_INSERT, NULL);
SSqlObj *pNew = createSubqueryObj(pSql, 0, multiVnodeInsertFinalize, pSupporter1, TSDB_SQL_INSERT, NULL);
if (pNew == NULL) {
tscError("%p failed to malloc buffer for subObj, orderOfSub:%d, reason:%s", pSql, i, strerror(errno));
tscError("%p failed to malloc buffer for subObj, orderOfSub:%d, reason:%s", pSql, numOfSub, strerror(errno));
goto _error;
}
......@@ -1940,26 +1938,25 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) {
* the callback function (multiVnodeInsertFinalize) correctly.
*/
pNew->fetchFp = pNew->fp;
pSql->pSubs[i] = pNew;
pSql->pSubs[numOfSub] = pNew;
code = tscCopyDataBlockToPayload(pNew, pDataBlocks->pData[i]);
code = tscCopyDataBlockToPayload(pNew, pDataBlocks->pData[numOfSub]);
if (code != TSDB_CODE_SUCCESS) {
tscTrace("%p prepare submit data block failed in async insertion, vnodeIdx:%d, total:%d, code:%d", pSql, i,
tscTrace("%p prepare submit data block failed in async insertion, vnodeIdx:%d, total:%d, code:%d", pSql, numOfSub,
pDataBlocks->nSize, code);
goto _error;
} else {
tscTrace("%p sub:%p create subObj success. orderOfSub:%d", pSql, pNew, i);
tscTrace("%p sub:%p create subObj success. orderOfSub:%d", pSql, pNew, numOfSub);
}
}
if (i < pSql->numOfSubs) {
if (numOfSub < pSql->numOfSubs) {
tscError("%p failed to prepare subObj structure and launch sub-insertion", pSql);
pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
return pRes->code; // free all allocated resource
}
// use the local variable
int32_t numOfSub = pSql->numOfSubs;
for (int32_t j = 0; j < numOfSub; ++j) {
SSqlObj *pSub = pSql->pSubs[j];
tscTrace("%p sub:%p launch sub insert, orderOfSub:%d", pSql, pSub, j);
......@@ -1971,11 +1968,12 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) {
_error:
// restore the udf fp
pSql->fp = pSql->fetchFp;
pSql->pSubs[0] = NULL;
tfree(pState);
tfree(pSql->param);
for(int32_t j = 1; j < i; ++j) {
for(int32_t j = 1; j < numOfSub; ++j) {
tfree(pSql->pSubs[j]->param);
taos_free_result(pSql->pSubs[j]);
}
......
......@@ -69,8 +69,8 @@ typedef struct {
int version; // version
int numOfCols; // Number of columns appended
int tlen; // maximum length of a SDataRow without the header part
uint16_t flen; // First part length in a SDataRow after the header part
uint16_t vlen; // pure value part length, excluded the overhead
uint16_t flen; // First part length in a SDataRow after the header part
uint16_t vlen; // pure value part length, excluded the overhead
STColumn columns[];
} STSchema;
......@@ -83,8 +83,8 @@ typedef struct {
#define tdFreeSchema(s) tfree((s))
STSchema *tdDupSchema(STSchema *pSchema);
void * tdEncodeSchema(void *dst, STSchema *pSchema);
STSchema *tdDecodeSchema(void **psrc);
int tdEncodeSchema(void **buf, STSchema *pSchema);
void * tdDecodeSchema(void *buf, STSchema **pRSchema);
static FORCE_INLINE int comparColId(const void *key1, const void *key2) {
if (*(int16_t *)key1 > ((STColumn *)key2)->colId) {
......@@ -107,8 +107,8 @@ typedef struct {
int tCols;
int nCols;
int tlen;
uint16_t flen;
uint16_t vlen;
uint16_t flen;
uint16_t vlen;
int version;
STColumn *columns;
} STSchemaBuilder;
......@@ -288,7 +288,7 @@ typedef struct {
SKVRow tdKVRowDup(SKVRow row);
int tdSetKVRowDataOfCol(SKVRow *orow, int16_t colId, int8_t type, void *value);
void * tdEncodeKVRow(void *buf, SKVRow row);
int tdEncodeKVRow(void **buf, SKVRow row);
void * tdDecodeKVRow(void *buf, SKVRow *row);
static FORCE_INLINE int comparTagId(const void *key1, const void *key2) {
......
......@@ -14,6 +14,7 @@
*/
#include "tdataformat.h"
#include "talgo.h"
#include "tcoding.h"
#include "wchar.h"
/**
......@@ -33,50 +34,50 @@ STSchema *tdDupSchema(STSchema *pSchema) {
/**
* Encode a schema to dst, and return the next pointer
*/
void *tdEncodeSchema(void *dst, STSchema *pSchema) {
int tdEncodeSchema(void **buf, STSchema *pSchema) {
int tlen = 0;
tlen += taosEncodeFixedI32(buf, schemaVersion(pSchema));
tlen += taosEncodeFixedI32(buf, schemaNCols(pSchema));
T_APPEND_MEMBER(dst, pSchema, STSchema, version);
T_APPEND_MEMBER(dst, pSchema, STSchema, numOfCols);
for (int i = 0; i < schemaNCols(pSchema); i++) {
STColumn *pCol = schemaColAt(pSchema, i);
T_APPEND_MEMBER(dst, pCol, STColumn, type);
T_APPEND_MEMBER(dst, pCol, STColumn, colId);
T_APPEND_MEMBER(dst, pCol, STColumn, bytes);
tlen += taosEncodeFixedI8(buf, colType(pCol));
tlen += taosEncodeFixedI16(buf, colColId(pCol));
tlen += taosEncodeFixedI32(buf, colBytes(pCol));
}
return dst;
return tlen;
}
/**
* Decode a schema from a binary.
*/
STSchema *tdDecodeSchema(void **psrc) {
int totalCols = 0;
void *tdDecodeSchema(void *buf, STSchema **pRSchema) {
int version = 0;
STSchemaBuilder schemaBuilder = {0};
int numOfCols = 0;
STSchemaBuilder schemaBuilder;
T_READ_MEMBER(*psrc, int, version);
T_READ_MEMBER(*psrc, int, totalCols);
buf = taosDecodeFixedI32(buf, &version);
buf = taosDecodeFixedI32(buf, &numOfCols);
if (tdInitTSchemaBuilder(&schemaBuilder, version) < 0) return NULL;
for (int i = 0; i < totalCols; i++) {
for (int i = 0; i < numOfCols; i++) {
int8_t type = 0;
int16_t colId = 0;
int32_t bytes = 0;
T_READ_MEMBER(*psrc, int8_t, type);
T_READ_MEMBER(*psrc, int16_t, colId);
T_READ_MEMBER(*psrc, int32_t, bytes);
buf = taosDecodeFixedI8(buf, &type);
buf = taosDecodeFixedI16(buf, &colId);
buf = taosDecodeFixedI32(buf, &bytes);
if (tdAddColToSchema(&schemaBuilder, type, colId, bytes) < 0) {
tdDestroyTSchemaBuilder(&schemaBuilder);
return NULL;
}
}
STSchema *pSchema = tdGetSchemaFromBuilder(&schemaBuilder);
*pRSchema = tdGetSchemaFromBuilder(&schemaBuilder);
tdDestroyTSchemaBuilder(&schemaBuilder);
return pSchema;
return buf;
}
int tdInitTSchemaBuilder(STSchemaBuilder *pBuilder, int32_t version) {
......@@ -605,14 +606,19 @@ int tdSetKVRowDataOfCol(SKVRow *orow, int16_t colId, int8_t type, void *value) {
return 0;
}
void *tdEncodeKVRow(void *buf, SKVRow row) {
int tdEncodeKVRow(void **buf, SKVRow row) {
// May change the encode purpose
kvRowCpy(buf, row);
return POINTER_SHIFT(buf, kvRowLen(row));
if (buf != NULL) {
kvRowCpy(*buf, row);
*buf = POINTER_SHIFT(*buf, kvRowLen(row));
}
return kvRowLen(row);
}
void *tdDecodeKVRow(void *buf, SKVRow *row) {
*row = tdKVRowDup(buf);
if (*row == NULL) return NULL;
return POINTER_SHIFT(buf, kvRowLen(*row));
}
......
......@@ -183,8 +183,8 @@ TAOS_DEFINE_ERROR(TSDB_CODE_VND_NO_WRITE_AUTH, 0, 0x0214, "vnode no w
// tsdb
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_INVALID_TABLE_ID, 0, 0x0600, "tsdb invalid table id")
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_INVALID_TABLE_TYPE, 0, 0x0601, "tsdb invalid table schema version")
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_TABLE_SCHEMA_VERSION, 0, 0x0602, "tsdb invalid table schema version")
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_INVALID_TABLE_TYPE, 0, 0x0601, "tsdb invalid table type")
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_IVD_TB_SCHEMA_VERSION, 0, 0x0602, "tsdb invalid table schema version")
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_TABLE_ALREADY_EXIST, 0, 0x0603, "tsdb table already exist")
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_INVALID_CONFIG, 0, 0x0604, "tsdb invalid configuration")
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_INIT_FAILED, 0, 0x0605, "tsdb init failed")
......@@ -194,6 +194,11 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TDB_FILE_CORRUPTED, 0, 0x0608, "tsdb file
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_OUT_OF_MEMORY, 0, 0x0609, "tsdb out of memory")
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_TAG_VER_OUT_OF_DATE, 0, 0x060A, "tsdb tag version is out of date")
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_TIMESTAMP_OUT_OF_RANGE, 0, 0x060B, "tsdb timestamp is out of range")
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_SUBMIT_MSG_MSSED_UP, 0, 0x060C, "tsdb submit message is messed up")
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_INVALID_ACTION, 0, 0x060D, "tsdb invalid action")
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_INVALID_CREATE_TB_MSG, 0, 0x060E, "tsdb invalid create table message")
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_NO_TABLE_DATA_IN_MEM, 0, 0x060F, "tsdb no table data in memory skiplist")
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_FILE_ALREADY_EXISTS, 0, 0x0610, "tsdb file already exists")
// query
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_INVALID_QHANDLE, 0, 0x0700, "query invalid handle")
......
......@@ -19,11 +19,11 @@
#include <stdbool.h>
#include <stdint.h>
#include "tdataformat.h"
#include "tname.h"
#include "taosdef.h"
#include "taosmsg.h"
#include "tarray.h"
#include "tdataformat.h"
#include "tname.h"
#ifdef __cplusplus
extern "C" {
......@@ -35,7 +35,7 @@ extern "C" {
#define TSDB_INVALID_SUPER_TABLE_ID -1
#define TSDB_STATUS_COMMIT_START 1
#define TSDB_STATUS_COMMIT_OVER 2
#define TSDB_STATUS_COMMIT_OVER 2
// --------- TSDB APPLICATION HANDLE DEFINITION
typedef struct {
......@@ -53,9 +53,9 @@ typedef struct {
int32_t tsdbId;
int32_t cacheBlockSize;
int32_t totalBlocks;
int32_t maxTables; // maximum number of tables this repository can have
int32_t daysPerFile; // day per file sharding policy
int32_t keep; // day of data to keep
int32_t maxTables; // maximum number of tables this repository can have
int32_t daysPerFile; // day per file sharding policy
int32_t keep; // day of data to keep
int32_t keep1;
int32_t keep2;
int32_t minRowsPerFileBlock; // minimum rows per file block
......@@ -72,19 +72,16 @@ typedef struct {
int64_t pointsWritten; // total data points written
} STsdbStat;
typedef void TsdbRepoT; // use void to hide implementation details from outside
typedef void TSDB_REPO_T; // use void to hide implementation details from outside
void tsdbSetDefaultCfg(STsdbCfg *pCfg);
STsdbCfg *tsdbCreateDefaultCfg();
void tsdbFreeCfg(STsdbCfg *pCfg);
STsdbCfg *tsdbGetCfg(const TsdbRepoT *repo);
STsdbCfg *tsdbGetCfg(const TSDB_REPO_T *repo);
// --------- TSDB REPOSITORY DEFINITION
int tsdbCreateRepo(char *rootDir, STsdbCfg *pCfg, void *limiter);
int32_t tsdbDropRepo(TsdbRepoT *repo);
TsdbRepoT *tsdbOpenRepo(char *rootDir, STsdbAppH *pAppH);
int32_t tsdbCloseRepo(TsdbRepoT *repo, int toCommit);
int32_t tsdbConfigRepo(TsdbRepoT *repo, STsdbCfg *pCfg);
int tsdbCreateRepo(char *rootDir, STsdbCfg *pCfg);
int32_t tsdbDropRepo(char *rootDir);
TSDB_REPO_T *tsdbOpenRepo(char *rootDir, STsdbAppH *pAppH);
void tsdbCloseRepo(TSDB_REPO_T *repo, int toCommit);
int32_t tsdbConfigRepo(TSDB_REPO_T *repo, STsdbCfg *pCfg);
// --------- TSDB TABLE DEFINITION
typedef struct {
......@@ -106,28 +103,19 @@ typedef struct {
char * sql;
} STableCfg;
int tsdbInitTableCfg(STableCfg *config, ETableType type, uint64_t uid, int32_t tid);
int tsdbTableSetSuperUid(STableCfg *config, uint64_t uid);
int tsdbTableSetSchema(STableCfg *config, STSchema *pSchema, bool dup);
int tsdbTableSetTagSchema(STableCfg *config, STSchema *pSchema, bool dup);
int tsdbTableSetTagValue(STableCfg *config, SKVRow row, bool dup);
int tsdbTableSetName(STableCfg *config, char *name, bool dup);
int tsdbTableSetSName(STableCfg *config, char *sname, bool dup);
int tsdbTableSetStreamSql(STableCfg *config, char *sql, bool dup);
void tsdbClearTableCfg(STableCfg *config);
void* tsdbGetTableTagVal(TsdbRepoT* repo, const STableId* id, int32_t colId, int16_t type, int16_t bytes);
char* tsdbGetTableName(TsdbRepoT *repo, const STableId *id);
void * tsdbGetTableTagVal(TSDB_REPO_T *repo, const STableId *id, int32_t colId, int16_t type, int16_t bytes);
char * tsdbGetTableName(TSDB_REPO_T *repo, const STableId *id);
STableCfg *tsdbCreateTableCfgFromMsg(SMDCreateTableMsg *pMsg);
int tsdbCreateTable(TsdbRepoT *repo, STableCfg *pCfg);
int tsdbDropTable(TsdbRepoT *pRepo, STableId tableId);
int tsdbAlterTable(TsdbRepoT *repo, STableCfg *pCfg);
int tsdbUpdateTagValue(TsdbRepoT *repo, SUpdateTableTagValMsg *pMsg);
TSKEY tsdbGetTableLastKey(TsdbRepoT *repo, uint64_t uid);
void tsdbStartStream(TsdbRepoT *repo);
int tsdbCreateTable(TSDB_REPO_T *repo, STableCfg *pCfg);
int tsdbDropTable(TSDB_REPO_T *pRepo, STableId tableId);
int tsdbUpdateTagValue(TSDB_REPO_T *repo, SUpdateTableTagValMsg *pMsg);
TSKEY tsdbGetTableLastKey(TSDB_REPO_T *repo, uint64_t uid);
void tsdbStartStream(TSDB_REPO_T *repo);
uint32_t tsdbGetFileInfo(TsdbRepoT *repo, char *name, uint32_t *index, uint32_t eindex, int32_t *size);
uint32_t tsdbGetFileInfo(TSDB_REPO_T *repo, char *name, uint32_t *index, uint32_t eindex, int32_t *size);
// the TSDB repository info
typedef struct STsdbRepoInfo {
......@@ -137,7 +125,7 @@ typedef struct STsdbRepoInfo {
int64_t tsdbTotalDiskSize; // the total disk size taken by this TSDB repository
// TODO: Other informations to add
} STsdbRepoInfo;
STsdbRepoInfo *tsdbGetStatus(TsdbRepoT *pRepo);
STsdbRepoInfo *tsdbGetStatus(TSDB_REPO_T *pRepo);
// the meter information report structure
typedef struct {
......@@ -146,7 +134,7 @@ typedef struct {
int64_t tableTotalDataSize; // In bytes
int64_t tableTotalDiskSize; // In bytes
} STableInfo;
STableInfo *tsdbGetTableInfo(TsdbRepoT *pRepo, STableId tid);
STableInfo *tsdbGetTableInfo(TSDB_REPO_T *pRepo, STableId tid);
// -- FOR INSERT DATA
/**
......@@ -156,7 +144,7 @@ STableInfo *tsdbGetTableInfo(TsdbRepoT *pRepo, STableId tid);
*
* @return the number of points inserted, -1 for failure and the error number is set
*/
int32_t tsdbInsertData(TsdbRepoT *repo, SSubmitMsg *pMsg, SShellSubmitRspMsg * pRsp) ;
int32_t tsdbInsertData(TSDB_REPO_T *repo, SSubmitMsg *pMsg, SShellSubmitRspMsg *pRsp);
// -- FOR QUERY TIME SERIES DATA
......@@ -164,10 +152,10 @@ typedef void *TsdbQueryHandleT; // Use void to hide implementation details
// query condition to build vnode iterator
typedef struct STsdbQueryCond {
STimeWindow twindow;
int32_t order; // desc|asc order to iterate the data block
int32_t numOfCols;
SColumnInfo *colList;
STimeWindow twindow;
int32_t order; // desc|asc order to iterate the data block
int32_t numOfCols;
SColumnInfo *colList;
} STsdbQueryCond;
typedef struct SDataBlockInfo {
......@@ -199,7 +187,7 @@ typedef void *TsdbPosT;
* @param qinfo query info handle from query processor
* @return
*/
TsdbQueryHandleT *tsdbQueryTables(TsdbRepoT *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupInfo, void* qinfo);
TsdbQueryHandleT *tsdbQueryTables(TSDB_REPO_T *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupInfo, void *qinfo);
/**
* Get the last row of the given query time window for all the tables in STableGroupInfo object.
......@@ -207,15 +195,17 @@ TsdbQueryHandleT *tsdbQueryTables(TsdbRepoT *tsdb, STsdbQueryCond *pCond, STable
* all tables in this group.
*
* @param tsdb tsdb handle
* @param pCond query condition, including time window, result set order, and basic required columns for each block
* @param pCond query condition, including time window, result set order, and basic required columns for each
* block
* @param groupInfo tableId list.
* @return
*/
TsdbQueryHandleT tsdbQueryLastRow(TsdbRepoT *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupInfo, void* qinfo);
TsdbQueryHandleT tsdbQueryLastRow(TSDB_REPO_T *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupInfo, void *qinfo);
SArray* tsdbGetQueriedTableIdList(TsdbQueryHandleT *pHandle);
SArray *tsdbGetQueriedTableIdList(TsdbQueryHandleT *pHandle);
TsdbQueryHandleT tsdbQueryRowsInExternalWindow(TsdbRepoT *tsdb, STsdbQueryCond* pCond, STableGroupInfo *groupList, void* qinfo);
TsdbQueryHandleT tsdbQueryRowsInExternalWindow(TSDB_REPO_T *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList,
void *qinfo);
/**
* move to next block if exists
......@@ -293,7 +283,7 @@ SArray *tsdbGetTableList(TsdbQueryHandleT *pQueryHandle);
* @param stableid. super table sid
* @param pTagCond. tag query condition
*/
int32_t tsdbQuerySTableByTagCond(TsdbRepoT *tsdb, uint64_t uid, const char *pTagCond, size_t len,
int32_t tsdbQuerySTableByTagCond(TSDB_REPO_T *tsdb, uint64_t uid, const char *pTagCond, size_t len,
int16_t tagNameRelType, const char *tbnameCond, STableGroupInfo *pGroupList,
SColIndex *pColIndex, int32_t numOfCols);
......@@ -305,7 +295,7 @@ int32_t tsdbQuerySTableByTagCond(TsdbRepoT *tsdb, uint64_t uid, const char *pTag
* @param pGroupInfo the generated result
* @return
*/
int32_t tsdbGetOneTableGroup(TsdbRepoT *tsdb, uint64_t uid, STableGroupInfo *pGroupInfo);
int32_t tsdbGetOneTableGroup(TSDB_REPO_T *tsdb, uint64_t uid, STableGroupInfo *pGroupInfo);
/**
* clean up the query handle
......
......@@ -774,7 +774,7 @@ static void tQueryIndexlessColumn(SSkipList* pSkipList, tQueryInfo* pQueryInfo,
char * pData = SL_GET_NODE_DATA(pNode);
// todo refactor:
tstr *name = ((STableIndexElem *)pData)->pTable->name;
tstr *name = (*(STable **)pData)->name;
// todo speed up by using hash
if (pQueryInfo->colIndex == TSDB_TBNAME_COLUMN_INDEX) {
if (pQueryInfo->optr == TSDB_RELATION_IN) {
......
此差异已折叠。
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tsdb.h"
#include "tsdbMain.h"
#define POOL_IS_EMPTY(b) (listNEles((b)->bufBlockList) == 0)
static STsdbBufBlock *tsdbNewBufBlock(int bufBlockSize);
static void tsdbFreeBufBlock(STsdbBufBlock *pBufBlock);
// ---------------- INTERNAL FUNCTIONS ----------------
STsdbBufPool *tsdbNewBufPool() {
STsdbBufPool *pBufPool = (STsdbBufPool *)calloc(1, sizeof(*pBufPool));
if (pBufPool == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
goto _err;
}
int code = pthread_cond_init(&(pBufPool->poolNotEmpty), NULL);
if (code != 0) {
terrno = TAOS_SYSTEM_ERROR(code);
goto _err;
}
pBufPool->bufBlockList = tdListNew(sizeof(STsdbBufBlock *));
if (pBufPool->bufBlockList == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
goto _err;
}
return pBufPool;
_err:
tsdbFreeBufPool(pBufPool);
return NULL;
}
void tsdbFreeBufPool(STsdbBufPool *pBufPool) {
if (pBufPool) {
if (pBufPool->bufBlockList) {
ASSERT(listNEles(pBufPool->bufBlockList) == 0);
tdListFree(pBufPool->bufBlockList);
}
pthread_cond_destroy(&pBufPool->poolNotEmpty);
free(pBufPool);
}
}
int tsdbOpenBufPool(STsdbRepo *pRepo) {
STsdbCfg * pCfg = &(pRepo->config);
STsdbBufPool *pPool = pRepo->pPool;
ASSERT(pPool != NULL);
pPool->bufBlockSize = pCfg->cacheBlockSize * 1024 * 1024; // MB
pPool->tBufBlocks = pCfg->totalBlocks;
pPool->nBufBlocks = 0;
pPool->index = 0;
for (int i = 0; i < pCfg->totalBlocks; i++) {
STsdbBufBlock *pBufBlock = tsdbNewBufBlock(pPool->bufBlockSize);
if (pBufBlock == NULL) goto _err;
if (tdListAppend(pPool->bufBlockList, (void *)(&pBufBlock)) < 0) {
tsdbFreeBufBlock(pBufBlock);
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
goto _err;
}
pPool->nBufBlocks++;
}
tsdbTrace("vgId:%d buffer pool is opened! bufBlockSize:%d tBufBlocks:%d nBufBlocks:%d", REPO_ID(pRepo),
pPool->bufBlockSize, pPool->tBufBlocks, pPool->nBufBlocks);
return 0;
_err:
tsdbCloseBufPool(pRepo);
return -1;
}
void tsdbCloseBufPool(STsdbRepo *pRepo) {
if (pRepo == NULL) return;
STsdbBufPool * pBufPool = pRepo->pPool;
STsdbBufBlock *pBufBlock = NULL;
if (pBufPool) {
SListNode *pNode = NULL;
while ((pNode = tdListPopHead(pBufPool->bufBlockList)) != NULL) {
tdListNodeGetData(pBufPool->bufBlockList, pNode, (void *)(&pBufBlock));
tsdbFreeBufBlock(pBufBlock);
free(pNode);
}
}
tsdbTrace("vgId:%d buffer pool is closed", REPO_ID(pRepo));
}
SListNode *tsdbAllocBufBlockFromPool(STsdbRepo *pRepo) {
ASSERT(pRepo != NULL && pRepo->pPool != NULL);
ASSERT(IS_REPO_LOCKED(pRepo));
STsdbBufPool *pBufPool = pRepo->pPool;
while (POOL_IS_EMPTY(pBufPool)) {
pthread_cond_wait(&(pBufPool->poolNotEmpty), &(pRepo->mutex));
}
SListNode * pNode = tdListPopHead(pBufPool->bufBlockList);
STsdbBufBlock *pBufBlock = NULL;
tdListNodeGetData(pBufPool->bufBlockList, pNode, (void *)(&pBufBlock));
pBufBlock->blockId = pBufPool->index++;
pBufBlock->offset = 0;
pBufBlock->remain = pBufPool->bufBlockSize;
tsdbTrace("vgId:%d buffer block is allocated, blockId:%" PRId64, REPO_ID(pRepo), pBufBlock->blockId);
return pNode;
}
// ---------------- LOCAL FUNCTIONS ----------------
static STsdbBufBlock *tsdbNewBufBlock(int bufBlockSize) {
STsdbBufBlock *pBufBlock = (STsdbBufBlock *)malloc(sizeof(*pBufBlock) + bufBlockSize);
if (pBufBlock == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
goto _err;
}
pBufBlock->blockId = 0;
pBufBlock->offset = 0;
pBufBlock->remain = bufBlockSize;
return pBufBlock;
_err:
tsdbFreeBufBlock(pBufBlock);
return NULL;
}
static void tsdbFreeBufBlock(STsdbBufBlock *pBufBlock) { tfree(pBufBlock); }
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <stdlib.h>
#include "tsdb.h"
#include "tsdbMain.h"
static int tsdbAllocBlockFromPool(STsdbCache *pCache);
static void tsdbFreeBlockList(SList *list);
static void tsdbFreeCacheMem(SCacheMem *mem);
static int tsdbAddCacheBlockToPool(STsdbCache *pCache);
STsdbCache *tsdbInitCache(int cacheBlockSize, int totalBlocks, TsdbRepoT *pRepo) {
STsdbCache *pCache = (STsdbCache *)calloc(1, sizeof(STsdbCache));
if (pCache == NULL) return NULL;
if (cacheBlockSize < 0) cacheBlockSize = TSDB_DEFAULT_CACHE_BLOCK_SIZE;
cacheBlockSize *= (1024 * 1024);
if (totalBlocks <= 1) totalBlocks = TSDB_DEFAULT_TOTAL_BLOCKS;
pCache->cacheBlockSize = cacheBlockSize;
pCache->totalCacheBlocks = totalBlocks;
pCache->pRepo = pRepo;
STsdbBufferPool *pPool = &(pCache->pool);
pPool->index = 0;
pPool->memPool = tdListNew(sizeof(STsdbCacheBlock *));
if (pPool->memPool == NULL) goto _err;
for (int i = 0; i < totalBlocks; i++) {
if (tsdbAddCacheBlockToPool(pCache) < 0) goto _err;
}
pCache->mem = NULL;
pCache->imem = NULL;
return pCache;
_err:
tsdbFreeCache(pCache);
return NULL;
}
void tsdbFreeCache(STsdbCache *pCache) {
tsdbFreeCacheMem(pCache->imem);
tsdbFreeCacheMem(pCache->mem);
tsdbFreeBlockList(pCache->pool.memPool);
free(pCache);
}
void *tsdbAllocFromCache(STsdbCache *pCache, int bytes, TSKEY key) {
if (pCache == NULL) return NULL;
if (bytes > pCache->cacheBlockSize) return NULL;
if (pCache->curBlock == NULL || pCache->curBlock->remain < bytes) {
if (pCache->curBlock !=NULL && listNEles(pCache->mem->list) >= pCache->totalCacheBlocks/2) {
tsdbTriggerCommit(pCache->pRepo);
}
while (tsdbAllocBlockFromPool(pCache) < 0) {
// TODO: deal with the error
// printf("Failed to allocate from cache pool\n");
}
}
void *ptr = (void *)(pCache->curBlock->data + pCache->curBlock->offset);
pCache->curBlock->offset += bytes;
pCache->curBlock->remain -= bytes;
memset(ptr, 0, bytes);
if (key < pCache->mem->keyFirst) pCache->mem->keyFirst = key;
if (key > pCache->mem->keyLast) pCache->mem->keyLast = key;
pCache->mem->numOfRows++;
return ptr;
}
static void tsdbFreeBlockList(SList *list) {
SListNode * node = NULL;
STsdbCacheBlock *pBlock = NULL;
while ((node = tdListPopHead(list)) != NULL) {
tdListNodeGetData(list, node, (void *)(&pBlock));
free(pBlock);
listNodeFree(node);
}
tdListFree(list);
}
static void tsdbFreeCacheMem(SCacheMem *mem) {
if (mem == NULL) return;
SList *list = mem->list;
tsdbFreeBlockList(list);
free(mem);
}
static int tsdbAllocBlockFromPool(STsdbCache *pCache) {
STsdbBufferPool *pPool = &(pCache->pool);
tsdbLockRepo(pCache->pRepo);
if (listNEles(pPool->memPool) == 0) {
tsdbUnLockRepo(pCache->pRepo);
return -1;
}
SListNode *node = tdListPopHead(pPool->memPool);
STsdbCacheBlock *pBlock = NULL;
tdListNodeGetData(pPool->memPool, node, (void *)(&pBlock));
pBlock->blockId = pPool->index++;
pBlock->offset = 0;
pBlock->remain = pCache->cacheBlockSize;
if (pCache->mem == NULL) { // Create a new one
pCache->mem = (SCacheMem *)malloc(sizeof(SCacheMem));
if (pCache->mem == NULL) return -1;
pCache->mem->keyFirst = INT64_MAX;
pCache->mem->keyLast = 0;
pCache->mem->numOfRows = 0;
pCache->mem->list = tdListNew(sizeof(STsdbCacheBlock *));
}
tdListAppendNode(pCache->mem->list, node);
pCache->curBlock = pBlock;
tsdbUnLockRepo(pCache->pRepo);
return 0;
}
int tsdbAlterCacheTotalBlocks(STsdbRepo *pRepo, int totalBlocks) {
STsdbCache *pCache = pRepo->tsdbCache;
int oldNumOfBlocks = pCache->totalCacheBlocks;
tsdbLockRepo((TsdbRepoT *)pRepo);
ASSERT(pCache->totalCacheBlocks != totalBlocks);
if (pCache->totalCacheBlocks < totalBlocks) {
ASSERT(pCache->totalCacheBlocks == pCache->pool.numOfCacheBlocks);
int blocksToAdd = pCache->totalCacheBlocks - totalBlocks;
pCache->totalCacheBlocks = totalBlocks;
for (int i = 0; i < blocksToAdd; i++) {
if (tsdbAddCacheBlockToPool(pCache) < 0) {
tsdbUnLockRepo((TsdbRepoT *)pRepo);
tsdbError("tsdbId:%d, failed to add cache block to cache pool", pRepo->config.tsdbId);
return -1;
}
}
} else {
pCache->totalCacheBlocks = totalBlocks;
tsdbAdjustCacheBlocks(pCache);
}
pRepo->config.totalBlocks = totalBlocks;
tsdbUnLockRepo((TsdbRepoT *)pRepo);
tsdbTrace("vgId:%d, tsdb total cache blocks changed from %d to %d", pRepo->config.tsdbId, oldNumOfBlocks, totalBlocks);
return 0;
}
static int tsdbAddCacheBlockToPool(STsdbCache *pCache) {
STsdbBufferPool *pPool = &pCache->pool;
STsdbCacheBlock *pBlock = malloc(sizeof(STsdbCacheBlock) + pCache->cacheBlockSize);
if (pBlock == NULL) return -1;
pBlock->offset = 0;
pBlock->remain = pCache->cacheBlockSize;
tdListAppend(pPool->memPool, (void *)(&pBlock));
pPool->numOfCacheBlocks++;
return 0;
}
static int tsdbRemoveCacheBlockFromPool(STsdbCache *pCache) {
STsdbBufferPool *pPool = &pCache->pool;
STsdbCacheBlock *pBlock = NULL;
ASSERT(pCache->totalCacheBlocks >= 0);
SListNode *node = tdListPopHead(pPool->memPool);
if (node == NULL) return -1;
tdListNodeGetData(pPool->memPool, node, &pBlock);
free(pBlock);
listNodeFree(node);
pPool->numOfCacheBlocks--;
return 0;
}
void tsdbAdjustCacheBlocks(STsdbCache *pCache) {
while (pCache->totalCacheBlocks < pCache->pool.numOfCacheBlocks) {
if (tsdbRemoveCacheBlockFromPool(pCache) < 0) break;
}
}
\ No newline at end of file
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
......@@ -13,7 +13,7 @@ static double getCurTime() {
}
typedef struct {
TsdbRepoT *pRepo;
TSDB_REPO_T *pRepo;
bool isAscend;
int tid;
uint64_t uid;
......@@ -136,7 +136,7 @@ TEST(TsdbTest, createRepo) {
tsdbSetDefaultCfg(&config);
ASSERT_EQ(tsdbCreateRepo("/home/ubuntu/work/ttest/vnode0", &config, NULL), 0);
TsdbRepoT *pRepo = tsdbOpenRepo("/home/ubuntu/work/ttest/vnode0", NULL);
TSDB_REPO_T *pRepo = tsdbOpenRepo("/home/ubuntu/work/ttest/vnode0", NULL);
ASSERT_NE(pRepo, nullptr);
// 2. Create a normal table
......
此差异已折叠。
......@@ -25,7 +25,7 @@ typedef int (*iterFunc)(void *, void *cont, int contLen);
typedef void (*afterFunc)(void *);
typedef struct {
int64_t size;
int64_t size; // including 512 bytes of header size
int64_t tombSize;
int64_t nRecords;
int64_t nDels;
......
......@@ -57,6 +57,7 @@ SListNode *tdListPopHead(SList *list);
SListNode *tdListPopTail(SList *list);
SListNode *tdListPopNode(SList *list, SListNode *node);
void tdListMove(SList *src, SList *dst);
void tdListDiscard(SList *list);
void tdListNodeGetData(SList *list, SListNode *node, void *target);
void tdListInitIter(SList *list, SListIter *pIter, TD_LIST_DIRECTION_T direction);
......
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册