提交 6be31856 编写于 作者: H Haojun Liao

Merge branch 'develop' into feature/query

...@@ -404,6 +404,7 @@ void tscKillSTableQuery(SSqlObj *pSql); ...@@ -404,6 +404,7 @@ void tscKillSTableQuery(SSqlObj *pSql);
void tscInitResObjForLocalQuery(SSqlObj *pObj, int32_t numOfRes, int32_t rowLen); void tscInitResObjForLocalQuery(SSqlObj *pObj, int32_t numOfRes, int32_t rowLen);
bool tscIsUpdateQuery(SSqlObj* pSql); bool tscIsUpdateQuery(SSqlObj* pSql);
bool tscHasReachLimitation(SQueryInfo *pQueryInfo, SSqlRes *pRes); bool tscHasReachLimitation(SQueryInfo *pQueryInfo, SSqlRes *pRes);
bool tscResultsetFetchCompleted(TAOS_RES *result);
char *tscGetErrorMsgPayload(SSqlCmd *pCmd); char *tscGetErrorMsgPayload(SSqlCmd *pCmd);
......
...@@ -182,21 +182,25 @@ static SArray* getTableList( SSqlObj* pSql ) { ...@@ -182,21 +182,25 @@ static SArray* getTableList( SSqlObj* pSql ) {
char* sql = alloca(strlen(p) + 32); char* sql = alloca(strlen(p) + 32);
sprintf(sql, "select tbid(tbname)%s", p); sprintf(sql, "select tbid(tbname)%s", p);
SSqlObj* pSql1 = taos_query(pSql->pTscObj, sql); SSqlObj* pNew = taos_query(pSql->pTscObj, sql);
if (terrno != TSDB_CODE_SUCCESS) { if (pNew == NULL) {
tscError("failed to retrieve table id: %s", tstrerror(terrno)); tscError("failed to retrieve table id: cannot create new sql object.");
return NULL;
} else if (taos_errno(pNew) != TSDB_CODE_SUCCESS) {
tscError("failed to retrieve table id: %s", tstrerror(taos_errno(pNew)));
return NULL; return NULL;
} }
TAOS_ROW row; TAOS_ROW row;
SArray* result = taosArrayInit( 128, sizeof(STidTags) ); SArray* result = taosArrayInit( 128, sizeof(STidTags) );
while ((row = taos_fetch_row(pSql1))) { while ((row = taos_fetch_row(pNew))) {
STidTags tags; STidTags tags;
memcpy(&tags, row[0], sizeof(tags)); memcpy(&tags, row[0], sizeof(tags));
taosArrayPush(result, &tags); taosArrayPush(result, &tags);
} }
taos_free_result(pSql1); taos_free_result(pNew);
return result; return result;
} }
...@@ -222,6 +226,9 @@ static int tscUpdateSubscription(STscObj* pObj, SSub* pSub) { ...@@ -222,6 +226,9 @@ static int tscUpdateSubscription(STscObj* pObj, SSub* pSub) {
} }
SArray* tables = getTableList(pSql); SArray* tables = getTableList(pSql);
if (tables == NULL) {
return 0;
}
size_t numOfTables = taosArrayGetSize(tables); size_t numOfTables = taosArrayGetSize(tables);
SArray* progress = taosArrayInit(numOfTables, sizeof(SSubscriptionProgress)); SArray* progress = taosArrayInit(numOfTables, sizeof(SSubscriptionProgress));
...@@ -242,6 +249,7 @@ static int tscUpdateSubscription(STscObj* pObj, SSub* pSub) { ...@@ -242,6 +249,7 @@ static int tscUpdateSubscription(STscObj* pObj, SSub* pSub) {
} }
taosArrayDestroy(tables); taosArrayDestroy(tables);
TSDB_QUERY_SET_TYPE(tscGetQueryInfoDetail(pCmd, 0)->type, TSDB_QUERY_TYPE_MULTITABLE_QUERY);
return 1; return 1;
} }
...@@ -413,7 +421,7 @@ TAOS_RES *taos_consume(TAOS_SUB *tsub) { ...@@ -413,7 +421,7 @@ TAOS_RES *taos_consume(TAOS_SUB *tsub) {
} }
if (pRes->code != TSDB_CODE_SUCCESS) { if (pRes->code != TSDB_CODE_SUCCESS) {
tscError("failed to query data, error code=%d", pRes->code); tscError("failed to query data: %s", tstrerror(pRes->code));
tscRemoveFromSqlList(pSql); tscRemoveFromSqlList(pSql);
return NULL; return NULL;
} }
......
...@@ -1984,6 +1984,11 @@ bool tscHasReachLimitation(SQueryInfo* pQueryInfo, SSqlRes* pRes) { ...@@ -1984,6 +1984,11 @@ bool tscHasReachLimitation(SQueryInfo* pQueryInfo, SSqlRes* pRes) {
return (pQueryInfo->clauseLimit > 0 && pRes->numOfClauseTotal >= pQueryInfo->clauseLimit); return (pQueryInfo->clauseLimit > 0 && pRes->numOfClauseTotal >= pQueryInfo->clauseLimit);
} }
bool tscResultsetFetchCompleted(TAOS_RES *result) {
SSqlRes* pRes = result;
return pRes->completed;
}
char* tscGetErrorMsgPayload(SSqlCmd* pCmd) { return pCmd->payload; } char* tscGetErrorMsgPayload(SSqlCmd* pCmd) { return pCmd->payload; }
/** /**
......
...@@ -83,8 +83,8 @@ typedef struct { ...@@ -83,8 +83,8 @@ typedef struct {
#define tdFreeSchema(s) tfree((s)) #define tdFreeSchema(s) tfree((s))
STSchema *tdDupSchema(STSchema *pSchema); STSchema *tdDupSchema(STSchema *pSchema);
void * tdEncodeSchema(void *dst, STSchema *pSchema); int tdEncodeSchema(void **buf, STSchema *pSchema);
STSchema *tdDecodeSchema(void **psrc); void * tdDecodeSchema(void *buf, STSchema **pRSchema);
static FORCE_INLINE int comparColId(const void *key1, const void *key2) { static FORCE_INLINE int comparColId(const void *key1, const void *key2) {
if (*(int16_t *)key1 > ((STColumn *)key2)->colId) { if (*(int16_t *)key1 > ((STColumn *)key2)->colId) {
...@@ -288,7 +288,7 @@ typedef struct { ...@@ -288,7 +288,7 @@ typedef struct {
SKVRow tdKVRowDup(SKVRow row); SKVRow tdKVRowDup(SKVRow row);
int tdSetKVRowDataOfCol(SKVRow *orow, int16_t colId, int8_t type, void *value); int tdSetKVRowDataOfCol(SKVRow *orow, int16_t colId, int8_t type, void *value);
void * tdEncodeKVRow(void *buf, SKVRow row); int tdEncodeKVRow(void **buf, SKVRow row);
void * tdDecodeKVRow(void *buf, SKVRow *row); void * tdDecodeKVRow(void *buf, SKVRow *row);
static FORCE_INLINE int comparTagId(const void *key1, const void *key2) { static FORCE_INLINE int comparTagId(const void *key1, const void *key2) {
......
...@@ -14,6 +14,7 @@ ...@@ -14,6 +14,7 @@
*/ */
#include "tdataformat.h" #include "tdataformat.h"
#include "talgo.h" #include "talgo.h"
#include "tcoding.h"
#include "wchar.h" #include "wchar.h"
/** /**
...@@ -33,50 +34,50 @@ STSchema *tdDupSchema(STSchema *pSchema) { ...@@ -33,50 +34,50 @@ STSchema *tdDupSchema(STSchema *pSchema) {
/** /**
* Encode a schema to dst, and return the next pointer * Encode a schema to dst, and return the next pointer
*/ */
void *tdEncodeSchema(void *dst, STSchema *pSchema) { int tdEncodeSchema(void **buf, STSchema *pSchema) {
int tlen = 0;
tlen += taosEncodeFixedI32(buf, schemaVersion(pSchema));
tlen += taosEncodeFixedI32(buf, schemaNCols(pSchema));
T_APPEND_MEMBER(dst, pSchema, STSchema, version);
T_APPEND_MEMBER(dst, pSchema, STSchema, numOfCols);
for (int i = 0; i < schemaNCols(pSchema); i++) { for (int i = 0; i < schemaNCols(pSchema); i++) {
STColumn *pCol = schemaColAt(pSchema, i); STColumn *pCol = schemaColAt(pSchema, i);
T_APPEND_MEMBER(dst, pCol, STColumn, type); tlen += taosEncodeFixedI8(buf, colType(pCol));
T_APPEND_MEMBER(dst, pCol, STColumn, colId); tlen += taosEncodeFixedI16(buf, colColId(pCol));
T_APPEND_MEMBER(dst, pCol, STColumn, bytes); tlen += taosEncodeFixedI32(buf, colBytes(pCol));
} }
return dst; return tlen;
} }
/** /**
* Decode a schema from a binary. * Decode a schema from a binary.
*/ */
STSchema *tdDecodeSchema(void **psrc) { void *tdDecodeSchema(void *buf, STSchema **pRSchema) {
int totalCols = 0;
int version = 0; int version = 0;
STSchemaBuilder schemaBuilder = {0}; int numOfCols = 0;
STSchemaBuilder schemaBuilder;
T_READ_MEMBER(*psrc, int, version); buf = taosDecodeFixedI32(buf, &version);
T_READ_MEMBER(*psrc, int, totalCols); buf = taosDecodeFixedI32(buf, &numOfCols);
if (tdInitTSchemaBuilder(&schemaBuilder, version) < 0) return NULL; if (tdInitTSchemaBuilder(&schemaBuilder, version) < 0) return NULL;
for (int i = 0; i < totalCols; i++) { for (int i = 0; i < numOfCols; i++) {
int8_t type = 0; int8_t type = 0;
int16_t colId = 0; int16_t colId = 0;
int32_t bytes = 0; int32_t bytes = 0;
T_READ_MEMBER(*psrc, int8_t, type); buf = taosDecodeFixedI8(buf, &type);
T_READ_MEMBER(*psrc, int16_t, colId); buf = taosDecodeFixedI16(buf, &colId);
T_READ_MEMBER(*psrc, int32_t, bytes); buf = taosDecodeFixedI32(buf, &bytes);
if (tdAddColToSchema(&schemaBuilder, type, colId, bytes) < 0) { if (tdAddColToSchema(&schemaBuilder, type, colId, bytes) < 0) {
tdDestroyTSchemaBuilder(&schemaBuilder); tdDestroyTSchemaBuilder(&schemaBuilder);
return NULL; return NULL;
} }
} }
STSchema *pSchema = tdGetSchemaFromBuilder(&schemaBuilder); *pRSchema = tdGetSchemaFromBuilder(&schemaBuilder);
tdDestroyTSchemaBuilder(&schemaBuilder); tdDestroyTSchemaBuilder(&schemaBuilder);
return pSchema; return buf;
} }
int tdInitTSchemaBuilder(STSchemaBuilder *pBuilder, int32_t version) { int tdInitTSchemaBuilder(STSchemaBuilder *pBuilder, int32_t version) {
...@@ -605,14 +606,19 @@ int tdSetKVRowDataOfCol(SKVRow *orow, int16_t colId, int8_t type, void *value) { ...@@ -605,14 +606,19 @@ int tdSetKVRowDataOfCol(SKVRow *orow, int16_t colId, int8_t type, void *value) {
return 0; return 0;
} }
void *tdEncodeKVRow(void *buf, SKVRow row) { int tdEncodeKVRow(void **buf, SKVRow row) {
// May change the encode purpose // May change the encode purpose
kvRowCpy(buf, row); if (buf != NULL) {
return POINTER_SHIFT(buf, kvRowLen(row)); kvRowCpy(*buf, row);
*buf = POINTER_SHIFT(*buf, kvRowLen(row));
}
return kvRowLen(row);
} }
void *tdDecodeKVRow(void *buf, SKVRow *row) { void *tdDecodeKVRow(void *buf, SKVRow *row) {
*row = tdKVRowDup(buf); *row = tdKVRowDup(buf);
if (*row == NULL) return NULL;
return POINTER_SHIFT(buf, kvRowLen(*row)); return POINTER_SHIFT(buf, kvRowLen(*row));
} }
......
...@@ -183,8 +183,8 @@ TAOS_DEFINE_ERROR(TSDB_CODE_VND_NO_WRITE_AUTH, 0, 0x0214, "vnode no w ...@@ -183,8 +183,8 @@ TAOS_DEFINE_ERROR(TSDB_CODE_VND_NO_WRITE_AUTH, 0, 0x0214, "vnode no w
// tsdb // tsdb
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_INVALID_TABLE_ID, 0, 0x0600, "tsdb invalid table id") TAOS_DEFINE_ERROR(TSDB_CODE_TDB_INVALID_TABLE_ID, 0, 0x0600, "tsdb invalid table id")
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_INVALID_TABLE_TYPE, 0, 0x0601, "tsdb invalid table schema version") TAOS_DEFINE_ERROR(TSDB_CODE_TDB_INVALID_TABLE_TYPE, 0, 0x0601, "tsdb invalid table type")
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_TABLE_SCHEMA_VERSION, 0, 0x0602, "tsdb invalid table schema version") TAOS_DEFINE_ERROR(TSDB_CODE_TDB_IVD_TB_SCHEMA_VERSION, 0, 0x0602, "tsdb invalid table schema version")
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_TABLE_ALREADY_EXIST, 0, 0x0603, "tsdb table already exist") TAOS_DEFINE_ERROR(TSDB_CODE_TDB_TABLE_ALREADY_EXIST, 0, 0x0603, "tsdb table already exist")
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_INVALID_CONFIG, 0, 0x0604, "tsdb invalid configuration") TAOS_DEFINE_ERROR(TSDB_CODE_TDB_INVALID_CONFIG, 0, 0x0604, "tsdb invalid configuration")
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_INIT_FAILED, 0, 0x0605, "tsdb init failed") TAOS_DEFINE_ERROR(TSDB_CODE_TDB_INIT_FAILED, 0, 0x0605, "tsdb init failed")
...@@ -194,6 +194,11 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TDB_FILE_CORRUPTED, 0, 0x0608, "tsdb file ...@@ -194,6 +194,11 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TDB_FILE_CORRUPTED, 0, 0x0608, "tsdb file
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_OUT_OF_MEMORY, 0, 0x0609, "tsdb out of memory") TAOS_DEFINE_ERROR(TSDB_CODE_TDB_OUT_OF_MEMORY, 0, 0x0609, "tsdb out of memory")
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_TAG_VER_OUT_OF_DATE, 0, 0x060A, "tsdb tag version is out of date") TAOS_DEFINE_ERROR(TSDB_CODE_TDB_TAG_VER_OUT_OF_DATE, 0, 0x060A, "tsdb tag version is out of date")
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_TIMESTAMP_OUT_OF_RANGE, 0, 0x060B, "tsdb timestamp is out of range") TAOS_DEFINE_ERROR(TSDB_CODE_TDB_TIMESTAMP_OUT_OF_RANGE, 0, 0x060B, "tsdb timestamp is out of range")
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_SUBMIT_MSG_MSSED_UP, 0, 0x060C, "tsdb submit message is messed up")
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_INVALID_ACTION, 0, 0x060D, "tsdb invalid action")
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_INVALID_CREATE_TB_MSG, 0, 0x060E, "tsdb invalid create table message")
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_NO_TABLE_DATA_IN_MEM, 0, 0x060F, "tsdb no table data in memory skiplist")
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_FILE_ALREADY_EXISTS, 0, 0x0610, "tsdb file already exists")
// query // query
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_INVALID_QHANDLE, 0, 0x0700, "query invalid handle") TAOS_DEFINE_ERROR(TSDB_CODE_QRY_INVALID_QHANDLE, 0, 0x0700, "query invalid handle")
......
...@@ -19,11 +19,11 @@ ...@@ -19,11 +19,11 @@
#include <stdbool.h> #include <stdbool.h>
#include <stdint.h> #include <stdint.h>
#include "tdataformat.h"
#include "tname.h"
#include "taosdef.h" #include "taosdef.h"
#include "taosmsg.h" #include "taosmsg.h"
#include "tarray.h" #include "tarray.h"
#include "tdataformat.h"
#include "tname.h"
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
...@@ -72,19 +72,16 @@ typedef struct { ...@@ -72,19 +72,16 @@ typedef struct {
int64_t pointsWritten; // total data points written int64_t pointsWritten; // total data points written
} STsdbStat; } STsdbStat;
typedef void TsdbRepoT; // use void to hide implementation details from outside typedef void TSDB_REPO_T; // use void to hide implementation details from outside
void tsdbSetDefaultCfg(STsdbCfg *pCfg); STsdbCfg *tsdbGetCfg(const TSDB_REPO_T *repo);
STsdbCfg *tsdbCreateDefaultCfg();
void tsdbFreeCfg(STsdbCfg *pCfg);
STsdbCfg *tsdbGetCfg(const TsdbRepoT *repo);
// --------- TSDB REPOSITORY DEFINITION // --------- TSDB REPOSITORY DEFINITION
int tsdbCreateRepo(char *rootDir, STsdbCfg *pCfg, void *limiter); int tsdbCreateRepo(char *rootDir, STsdbCfg *pCfg);
int32_t tsdbDropRepo(TsdbRepoT *repo); int32_t tsdbDropRepo(char *rootDir);
TsdbRepoT *tsdbOpenRepo(char *rootDir, STsdbAppH *pAppH); TSDB_REPO_T *tsdbOpenRepo(char *rootDir, STsdbAppH *pAppH);
int32_t tsdbCloseRepo(TsdbRepoT *repo, int toCommit); void tsdbCloseRepo(TSDB_REPO_T *repo, int toCommit);
int32_t tsdbConfigRepo(TsdbRepoT *repo, STsdbCfg *pCfg); int32_t tsdbConfigRepo(TSDB_REPO_T *repo, STsdbCfg *pCfg);
// --------- TSDB TABLE DEFINITION // --------- TSDB TABLE DEFINITION
typedef struct { typedef struct {
...@@ -106,28 +103,19 @@ typedef struct { ...@@ -106,28 +103,19 @@ typedef struct {
char * sql; char * sql;
} STableCfg; } STableCfg;
int tsdbInitTableCfg(STableCfg *config, ETableType type, uint64_t uid, int32_t tid);
int tsdbTableSetSuperUid(STableCfg *config, uint64_t uid);
int tsdbTableSetSchema(STableCfg *config, STSchema *pSchema, bool dup);
int tsdbTableSetTagSchema(STableCfg *config, STSchema *pSchema, bool dup);
int tsdbTableSetTagValue(STableCfg *config, SKVRow row, bool dup);
int tsdbTableSetName(STableCfg *config, char *name, bool dup);
int tsdbTableSetSName(STableCfg *config, char *sname, bool dup);
int tsdbTableSetStreamSql(STableCfg *config, char *sql, bool dup);
void tsdbClearTableCfg(STableCfg *config); void tsdbClearTableCfg(STableCfg *config);
void* tsdbGetTableTagVal(TsdbRepoT* repo, const STableId* id, int32_t colId, int16_t type, int16_t bytes); void * tsdbGetTableTagVal(TSDB_REPO_T *repo, const STableId *id, int32_t colId, int16_t type, int16_t bytes);
char* tsdbGetTableName(TsdbRepoT *repo, const STableId *id); char * tsdbGetTableName(TSDB_REPO_T *repo, const STableId *id);
STableCfg *tsdbCreateTableCfgFromMsg(SMDCreateTableMsg *pMsg); STableCfg *tsdbCreateTableCfgFromMsg(SMDCreateTableMsg *pMsg);
int tsdbCreateTable(TsdbRepoT *repo, STableCfg *pCfg); int tsdbCreateTable(TSDB_REPO_T *repo, STableCfg *pCfg);
int tsdbDropTable(TsdbRepoT *pRepo, STableId tableId); int tsdbDropTable(TSDB_REPO_T *pRepo, STableId tableId);
int tsdbAlterTable(TsdbRepoT *repo, STableCfg *pCfg); int tsdbUpdateTagValue(TSDB_REPO_T *repo, SUpdateTableTagValMsg *pMsg);
int tsdbUpdateTagValue(TsdbRepoT *repo, SUpdateTableTagValMsg *pMsg); TSKEY tsdbGetTableLastKey(TSDB_REPO_T *repo, uint64_t uid);
TSKEY tsdbGetTableLastKey(TsdbRepoT *repo, uint64_t uid); void tsdbStartStream(TSDB_REPO_T *repo);
void tsdbStartStream(TsdbRepoT *repo);
uint32_t tsdbGetFileInfo(TsdbRepoT *repo, char *name, uint32_t *index, uint32_t eindex, int32_t *size); uint32_t tsdbGetFileInfo(TSDB_REPO_T *repo, char *name, uint32_t *index, uint32_t eindex, int32_t *size);
// the TSDB repository info // the TSDB repository info
typedef struct STsdbRepoInfo { typedef struct STsdbRepoInfo {
...@@ -137,7 +125,7 @@ typedef struct STsdbRepoInfo { ...@@ -137,7 +125,7 @@ typedef struct STsdbRepoInfo {
int64_t tsdbTotalDiskSize; // the total disk size taken by this TSDB repository int64_t tsdbTotalDiskSize; // the total disk size taken by this TSDB repository
// TODO: Other informations to add // TODO: Other informations to add
} STsdbRepoInfo; } STsdbRepoInfo;
STsdbRepoInfo *tsdbGetStatus(TsdbRepoT *pRepo); STsdbRepoInfo *tsdbGetStatus(TSDB_REPO_T *pRepo);
// the meter information report structure // the meter information report structure
typedef struct { typedef struct {
...@@ -146,7 +134,7 @@ typedef struct { ...@@ -146,7 +134,7 @@ typedef struct {
int64_t tableTotalDataSize; // In bytes int64_t tableTotalDataSize; // In bytes
int64_t tableTotalDiskSize; // In bytes int64_t tableTotalDiskSize; // In bytes
} STableInfo; } STableInfo;
STableInfo *tsdbGetTableInfo(TsdbRepoT *pRepo, STableId tid); STableInfo *tsdbGetTableInfo(TSDB_REPO_T *pRepo, STableId tid);
// -- FOR INSERT DATA // -- FOR INSERT DATA
/** /**
...@@ -156,7 +144,7 @@ STableInfo *tsdbGetTableInfo(TsdbRepoT *pRepo, STableId tid); ...@@ -156,7 +144,7 @@ STableInfo *tsdbGetTableInfo(TsdbRepoT *pRepo, STableId tid);
* *
* @return the number of points inserted, -1 for failure and the error number is set * @return the number of points inserted, -1 for failure and the error number is set
*/ */
int32_t tsdbInsertData(TsdbRepoT *repo, SSubmitMsg *pMsg, SShellSubmitRspMsg * pRsp) ; int32_t tsdbInsertData(TSDB_REPO_T *repo, SSubmitMsg *pMsg, SShellSubmitRspMsg *pRsp);
// -- FOR QUERY TIME SERIES DATA // -- FOR QUERY TIME SERIES DATA
...@@ -199,7 +187,7 @@ typedef void *TsdbPosT; ...@@ -199,7 +187,7 @@ typedef void *TsdbPosT;
* @param qinfo query info handle from query processor * @param qinfo query info handle from query processor
* @return * @return
*/ */
TsdbQueryHandleT *tsdbQueryTables(TsdbRepoT *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupInfo, void* qinfo); TsdbQueryHandleT *tsdbQueryTables(TSDB_REPO_T *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupInfo, void *qinfo);
/** /**
* Get the last row of the given query time window for all the tables in STableGroupInfo object. * Get the last row of the given query time window for all the tables in STableGroupInfo object.
...@@ -207,15 +195,17 @@ TsdbQueryHandleT *tsdbQueryTables(TsdbRepoT *tsdb, STsdbQueryCond *pCond, STable ...@@ -207,15 +195,17 @@ TsdbQueryHandleT *tsdbQueryTables(TsdbRepoT *tsdb, STsdbQueryCond *pCond, STable
* all tables in this group. * all tables in this group.
* *
* @param tsdb tsdb handle * @param tsdb tsdb handle
* @param pCond query condition, including time window, result set order, and basic required columns for each block * @param pCond query condition, including time window, result set order, and basic required columns for each
* block
* @param groupInfo tableId list. * @param groupInfo tableId list.
* @return * @return
*/ */
TsdbQueryHandleT tsdbQueryLastRow(TsdbRepoT *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupInfo, void* qinfo); TsdbQueryHandleT tsdbQueryLastRow(TSDB_REPO_T *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupInfo, void *qinfo);
SArray* tsdbGetQueriedTableIdList(TsdbQueryHandleT *pHandle); SArray *tsdbGetQueriedTableIdList(TsdbQueryHandleT *pHandle);
TsdbQueryHandleT tsdbQueryRowsInExternalWindow(TsdbRepoT *tsdb, STsdbQueryCond* pCond, STableGroupInfo *groupList, void* qinfo); TsdbQueryHandleT tsdbQueryRowsInExternalWindow(TSDB_REPO_T *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList,
void *qinfo);
/** /**
* move to next block if exists * move to next block if exists
...@@ -293,7 +283,7 @@ SArray *tsdbGetTableList(TsdbQueryHandleT *pQueryHandle); ...@@ -293,7 +283,7 @@ SArray *tsdbGetTableList(TsdbQueryHandleT *pQueryHandle);
* @param stableid. super table sid * @param stableid. super table sid
* @param pTagCond. tag query condition * @param pTagCond. tag query condition
*/ */
int32_t tsdbQuerySTableByTagCond(TsdbRepoT *tsdb, uint64_t uid, const char *pTagCond, size_t len, int32_t tsdbQuerySTableByTagCond(TSDB_REPO_T *tsdb, uint64_t uid, const char *pTagCond, size_t len,
int16_t tagNameRelType, const char *tbnameCond, STableGroupInfo *pGroupList, int16_t tagNameRelType, const char *tbnameCond, STableGroupInfo *pGroupList,
SColIndex *pColIndex, int32_t numOfCols); SColIndex *pColIndex, int32_t numOfCols);
...@@ -305,7 +295,7 @@ int32_t tsdbQuerySTableByTagCond(TsdbRepoT *tsdb, uint64_t uid, const char *pTag ...@@ -305,7 +295,7 @@ int32_t tsdbQuerySTableByTagCond(TsdbRepoT *tsdb, uint64_t uid, const char *pTag
* @param pGroupInfo the generated result * @param pGroupInfo the generated result
* @return * @return
*/ */
int32_t tsdbGetOneTableGroup(TsdbRepoT *tsdb, uint64_t uid, STableGroupInfo *pGroupInfo); int32_t tsdbGetOneTableGroup(TSDB_REPO_T *tsdb, uint64_t uid, STableGroupInfo *pGroupInfo);
/** /**
* clean up the query handle * clean up the query handle
......
...@@ -82,7 +82,8 @@ typedef struct STableObj { ...@@ -82,7 +82,8 @@ typedef struct STableObj {
typedef struct SSuperTableObj { typedef struct SSuperTableObj {
STableObj info; STableObj info;
int8_t reserved0[3]; // for fill struct STableObj to 4byte align int8_t reserved0[1]; // for fill struct STableObj to 4byte align
int16_t nextColId;
int32_t sversion; int32_t sversion;
uint64_t uid; uint64_t uid;
int64_t createdTime; int64_t createdTime;
...@@ -95,13 +96,13 @@ typedef struct SSuperTableObj { ...@@ -95,13 +96,13 @@ typedef struct SSuperTableObj {
int32_t numOfTables; int32_t numOfTables;
SSchema * schema; SSchema * schema;
void * vgHash; void * vgHash;
int16_t nextColId;
int8_t reserved2[6]; int8_t reserved2[6];
} SSuperTableObj; } SSuperTableObj;
typedef struct { typedef struct {
STableObj info; STableObj info;
int8_t reserved0[3]; // for fill struct STableObj to 4byte align int8_t reserved0[1]; // for fill struct STableObj to 4byte align
int16_t nextColId; //used by normal table
int32_t sversion; //used by normal table int32_t sversion; //used by normal table
uint64_t uid; uint64_t uid;
uint64_t suid; uint64_t suid;
...@@ -112,7 +113,6 @@ typedef struct { ...@@ -112,7 +113,6 @@ typedef struct {
int32_t sqlLen; int32_t sqlLen;
int8_t updateEnd[1]; int8_t updateEnd[1];
int8_t reserved1[1]; int8_t reserved1[1];
int16_t nextColId; //used by normal table
int32_t refCount; int32_t refCount;
char* sql; //used by normal table char* sql; //used by normal table
SSchema* schema; //used by normal table SSchema* schema; //used by normal table
......
...@@ -678,29 +678,16 @@ int32_t sdbDeleteRow(SSdbOper *pOper) { ...@@ -678,29 +678,16 @@ int32_t sdbDeleteRow(SSdbOper *pOper) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
void * key = sdbGetObjKey(pTable, pOper->pObj); int32_t size = sizeof(SSdbOper) + sizeof(SWalHead) + pTable->maxRowSize + SDB_SYNC_HACK;
int32_t keySize = 0;
switch (pTable->keyType) {
case SDB_KEY_STRING:
case SDB_KEY_VAR_STRING:
keySize = strlen((char *)key) + 1;
break;
case SDB_KEY_INT:
case SDB_KEY_AUTO:
keySize = sizeof(uint32_t);
break;
default:
return TSDB_CODE_MND_SDB_INVAID_KEY_TYPE;
}
int32_t size = sizeof(SSdbOper) + sizeof(SWalHead) + keySize + SDB_SYNC_HACK;
SSdbOper *pNewOper = taosAllocateQitem(size); SSdbOper *pNewOper = taosAllocateQitem(size);
SWalHead *pHead = (void *)pNewOper + sizeof(SSdbOper) + SDB_SYNC_HACK; SWalHead *pHead = (void *)pNewOper + sizeof(SSdbOper) + SDB_SYNC_HACK;
pHead->version = 0; pHead->version = 0;
pHead->len = keySize;
pHead->msgType = pTable->tableId * 10 + SDB_ACTION_DELETE; pHead->msgType = pTable->tableId * 10 + SDB_ACTION_DELETE;
memcpy(pHead->cont, key, keySize);
pOper->rowData = pHead->cont;
(*pTable->encodeFp)(pOper);
pHead->len = pOper->rowSize;
memcpy(pNewOper, pOper, sizeof(SSdbOper)); memcpy(pNewOper, pOper, sizeof(SSdbOper));
......
...@@ -65,7 +65,7 @@ int32_t mnodeInitShow() { ...@@ -65,7 +65,7 @@ int32_t mnodeInitShow() {
mnodeAddReadMsgHandle(TSDB_MSG_TYPE_CM_CONNECT, mnodeProcessConnectMsg); mnodeAddReadMsgHandle(TSDB_MSG_TYPE_CM_CONNECT, mnodeProcessConnectMsg);
mnodeAddReadMsgHandle(TSDB_MSG_TYPE_CM_USE_DB, mnodeProcessUseMsg); mnodeAddReadMsgHandle(TSDB_MSG_TYPE_CM_USE_DB, mnodeProcessUseMsg);
tsMnodeShowCache = taosCacheInitWithCb(10, mnodeFreeShowObj); tsMnodeShowCache = taosCacheInitWithCb(5, mnodeFreeShowObj);
return 0; return 0;
} }
...@@ -139,7 +139,7 @@ static int32_t mnodeProcessShowMsg(SMnodeMsg *pMsg) { ...@@ -139,7 +139,7 @@ static int32_t mnodeProcessShowMsg(SMnodeMsg *pMsg) {
pShowRsp->qhandle = htobe64((uint64_t) pShow); pShowRsp->qhandle = htobe64((uint64_t) pShow);
int32_t code = (*tsMnodeShowMetaFp[pShowMsg->type])(&pShowRsp->tableMeta, pShow, pMsg->rpcMsg.handle); int32_t code = (*tsMnodeShowMetaFp[pShowMsg->type])(&pShowRsp->tableMeta, pShow, pMsg->rpcMsg.handle);
mTrace("%p, show type:%s index:%d, get meta finished, rows:%d cols:%d result:%s", pShow, mTrace("%p, show type:%s index:%d, get meta finished, numOfRows:%d cols:%d result:%s", pShow,
mnodeGetShowType(pShowMsg->type), pShow->index, pShow->numOfRows, pShow->numOfColumns, tstrerror(code)); mnodeGetShowType(pShowMsg->type), pShow->index, pShow->numOfRows, pShow->numOfColumns, tstrerror(code));
if (code == TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
...@@ -219,8 +219,10 @@ static int32_t mnodeProcessRetrieveMsg(SMnodeMsg *pMsg) { ...@@ -219,8 +219,10 @@ static int32_t mnodeProcessRetrieveMsg(SMnodeMsg *pMsg) {
if (rowsToRead == 0 || (rowsRead == rowsToRead && pShow->numOfRows == pShow->numOfReads)) { if (rowsToRead == 0 || (rowsRead == rowsToRead && pShow->numOfRows == pShow->numOfReads)) {
pRsp->completed = 1; pRsp->completed = 1;
mTrace("%p, retrieve completed", pShow);
mnodeReleaseShowObj(pShow, true); mnodeReleaseShowObj(pShow, true);
} else { } else {
mTrace("%p, retrieve not completed yet", pShow);
mnodeReleaseShowObj(pShow, false); mnodeReleaseShowObj(pShow, false);
} }
...@@ -379,10 +381,10 @@ static void *mnodePutShowObj(SShowObj *pShow, int32_t size) { ...@@ -379,10 +381,10 @@ static void *mnodePutShowObj(SShowObj *pShow, int32_t size) {
pShow->index = atomic_add_fetch_32(&tsShowObjIndex, 1); pShow->index = atomic_add_fetch_32(&tsShowObjIndex, 1);
sprintf(key, "%d", pShow->index); sprintf(key, "%d", pShow->index);
SShowObj *newQhandle = taosCachePut(tsMnodeShowCache, key, pShow, size, 60); SShowObj *newQhandle = taosCachePut(tsMnodeShowCache, key, pShow, size, 6);
free(pShow); free(pShow);
mTrace("%p, show is put into cache", newQhandle); mTrace("%p, show is put into cache, index:%s", newQhandle, key);
return newQhandle; return newQhandle;
} }
......
...@@ -1753,11 +1753,43 @@ static int32_t mnodeFindNormalTableColumnIndex(SChildTableObj *pTable, char *col ...@@ -1753,11 +1753,43 @@ static int32_t mnodeFindNormalTableColumnIndex(SChildTableObj *pTable, char *col
return -1; return -1;
} }
static int32_t mnodeAddNormalTableColumnCb(SMnodeMsg *pMsg, int32_t code) { static int32_t mnodeAlterNormalTableColumnCb(SMnodeMsg *pMsg, int32_t code) {
SChildTableObj *pTable = (SChildTableObj *)pMsg->pTable; SChildTableObj *pTable = (SChildTableObj *)pMsg->pTable;
mLPrint("app:%p:%p, ctable %s, add column result:%s", pMsg->rpcMsg.ahandle, pMsg, pTable->info.tableId, if (code != TSDB_CODE_SUCCESS) {
mError("app:%p:%p, ctable %s, failed to alter column, reason:%s", pMsg->rpcMsg.ahandle, pMsg, pTable->info.tableId,
tstrerror(code)); tstrerror(code));
return code; return code;
}
SMDCreateTableMsg *pMDCreate = mnodeBuildCreateChildTableMsg(NULL, pTable);
if (pMDCreate == NULL) {
return terrno;
}
if (pMsg->pVgroup == NULL) {
pMsg->pVgroup = mnodeGetVgroup(pTable->vgId);
if (pMsg->pVgroup == NULL) {
rpcFreeCont(pMDCreate);
mError("app:%p:%p, ctable %s, vgId:%d not exist in mnode", pMsg->rpcMsg.ahandle, pMsg, pTable->info.tableId,
pTable->vgId);
return TSDB_CODE_MND_VGROUP_NOT_EXIST;
}
}
SRpcIpSet ipSet = mnodeGetIpSetFromVgroup(pMsg->pVgroup);
SRpcMsg rpcMsg = {
.handle = pMsg,
.pCont = pMDCreate,
.contLen = htonl(pMDCreate->contLen),
.code = 0,
.msgType = TSDB_MSG_TYPE_MD_ALTER_TABLE
};
mTrace("app:%p:%p, ctable %s, send alter column msg to vgId:%d", pMsg->rpcMsg.ahandle, pMsg, pTable->info.tableId,
pMsg->pVgroup->vgId);
dnodeSendMsgToDnode(&ipSet, &rpcMsg);
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
} }
static int32_t mnodeAddNormalTableColumn(SMnodeMsg *pMsg, SSchema schema[], int32_t ncols) { static int32_t mnodeAddNormalTableColumn(SMnodeMsg *pMsg, SSchema schema[], int32_t ncols) {
...@@ -1802,7 +1834,7 @@ static int32_t mnodeAddNormalTableColumn(SMnodeMsg *pMsg, SSchema schema[], int3 ...@@ -1802,7 +1834,7 @@ static int32_t mnodeAddNormalTableColumn(SMnodeMsg *pMsg, SSchema schema[], int3
.table = tsChildTableSdb, .table = tsChildTableSdb,
.pObj = pTable, .pObj = pTable,
.pMsg = pMsg, .pMsg = pMsg,
.cb = mnodeAddNormalTableColumnCb .cb = mnodeAlterNormalTableColumnCb
}; };
int32_t code = sdbUpdateRow(&oper); int32_t code = sdbUpdateRow(&oper);
...@@ -1813,13 +1845,6 @@ static int32_t mnodeAddNormalTableColumn(SMnodeMsg *pMsg, SSchema schema[], int3 ...@@ -1813,13 +1845,6 @@ static int32_t mnodeAddNormalTableColumn(SMnodeMsg *pMsg, SSchema schema[], int3
return code; return code;
} }
static int32_t mnodeDropNormalTableColumnCb(SMnodeMsg *pMsg, int32_t code) {
SChildTableObj *pTable = (SChildTableObj *)pMsg->pTable;
mLPrint("app:%p:%p, ctable %s, drop column result:%s", pMsg->rpcMsg.ahandle, pMsg, pTable->info.tableId,
tstrerror(code));
return code;
}
static int32_t mnodeDropNormalTableColumn(SMnodeMsg *pMsg, char *colName) { static int32_t mnodeDropNormalTableColumn(SMnodeMsg *pMsg, char *colName) {
SDbObj *pDb = pMsg->pDb; SDbObj *pDb = pMsg->pDb;
SChildTableObj *pTable = (SChildTableObj *)pMsg->pTable; SChildTableObj *pTable = (SChildTableObj *)pMsg->pTable;
...@@ -1847,7 +1872,7 @@ static int32_t mnodeDropNormalTableColumn(SMnodeMsg *pMsg, char *colName) { ...@@ -1847,7 +1872,7 @@ static int32_t mnodeDropNormalTableColumn(SMnodeMsg *pMsg, char *colName) {
.table = tsChildTableSdb, .table = tsChildTableSdb,
.pObj = pTable, .pObj = pTable,
.pMsg = pMsg, .pMsg = pMsg,
.cb = mnodeDropNormalTableColumnCb .cb = mnodeAlterNormalTableColumnCb
}; };
int32_t code = sdbUpdateRow(&oper); int32_t code = sdbUpdateRow(&oper);
...@@ -2185,9 +2210,33 @@ static void mnodeProcessCreateChildTableRsp(SRpcMsg *rpcMsg) { ...@@ -2185,9 +2210,33 @@ static void mnodeProcessCreateChildTableRsp(SRpcMsg *rpcMsg) {
} }
} }
// not implemented yet
static void mnodeProcessAlterTableRsp(SRpcMsg *rpcMsg) { static void mnodeProcessAlterTableRsp(SRpcMsg *rpcMsg) {
mTrace("alter table rsp received, handle:%p code:%s", rpcMsg->handle, tstrerror(rpcMsg->code)); if (rpcMsg->handle == NULL) return;
SMnodeMsg *mnodeMsg = rpcMsg->handle;
mnodeMsg->received++;
SChildTableObj *pTable = (SChildTableObj *)mnodeMsg->pTable;
assert(pTable);
if (rpcMsg->code == TSDB_CODE_SUCCESS || rpcMsg->code == TSDB_CODE_TDB_TABLE_ALREADY_EXIST) {
mTrace("app:%p:%p, ctable:%s, altered in dnode, thandle:%p result:%s", mnodeMsg->rpcMsg.ahandle, mnodeMsg,
pTable->info.tableId, mnodeMsg->rpcMsg.handle, tstrerror(rpcMsg->code));
dnodeSendRpcMnodeWriteRsp(mnodeMsg, TSDB_CODE_SUCCESS);
} else {
if (mnodeMsg->retry++ < 3) {
mTrace("app:%p:%p, table:%s, alter table rsp received, need retry, times:%d result:%s thandle:%p",
mnodeMsg->rpcMsg.ahandle, mnodeMsg, pTable->info.tableId, mnodeMsg->retry, tstrerror(rpcMsg->code),
mnodeMsg->rpcMsg.handle);
dnodeDelayReprocessMnodeWriteMsg(mnodeMsg);
} else {
mError("app:%p:%p, table:%s, failed to alter in dnode, result:%s thandle:%p", mnodeMsg->rpcMsg.ahandle, mnodeMsg,
pTable->info.tableId, tstrerror(rpcMsg->code), mnodeMsg->rpcMsg.handle);
dnodeSendRpcMnodeWriteRsp(mnodeMsg, rpcMsg->code);
}
}
} }
static int32_t mnodeProcessMultiTableMetaMsg(SMnodeMsg *pMsg) { static int32_t mnodeProcessMultiTableMetaMsg(SMnodeMsg *pMsg) {
......
...@@ -258,7 +258,37 @@ void mnodeUpdateVgroup(SVgObj *pVgroup) { ...@@ -258,7 +258,37 @@ void mnodeUpdateVgroup(SVgObj *pVgroup) {
mnodeSendCreateVgroupMsg(pVgroup, NULL); mnodeSendCreateVgroupMsg(pVgroup, NULL);
} }
void mnodeCheckUnCreatedVgroup(SDnodeObj *pDnode, SVnodeLoad *pVloads, int32_t openVnodes) {} /*
Traverse all vgroups on mnode, if there no such vgId on a dnode, so send msg to this dnode for re-creating this vgId/vnode
*/
void mnodeCheckUnCreatedVgroup(SDnodeObj *pDnode, SVnodeLoad *pVloads, int32_t openVnodes) {
SVnodeLoad *pNextV = NULL;
void *pIter = NULL;
while (1) {
SVgObj *pVgroup;
pIter = mnodeGetNextVgroup(pIter, &pVgroup);
if (pVgroup == NULL) break;
pNextV = pVloads;
int32_t i;
for (i = 0; i < openVnodes; ++i) {
if ((pVgroup->vnodeGid[i].pDnode == pDnode) && (pVgroup->vgId == pNextV->vgId)) {
break;
}
pNextV++;
}
if (i == openVnodes) {
mnodeSendCreateVgroupMsg(pVgroup, NULL);
}
mnodeDecVgroupRef(pVgroup);
}
sdbFreeIter(pIter);
return;
}
void mnodeUpdateVgroupStatus(SVgObj *pVgroup, SDnodeObj *pDnode, SVnodeLoad *pVload) { void mnodeUpdateVgroupStatus(SVgObj *pVgroup, SDnodeObj *pDnode, SVnodeLoad *pVload) {
bool dnodeExist = false; bool dnodeExist = false;
......
...@@ -205,6 +205,10 @@ void httpProcessSingleSqlRetrieveCallBack(void *param, TAOS_RES *result, int num ...@@ -205,6 +205,10 @@ void httpProcessSingleSqlRetrieveCallBack(void *param, TAOS_RES *result, int num
} }
} }
if (tscResultsetFetchCompleted(result)) {
isContinue = false;
}
if (isContinue) { if (isContinue) {
// retrieve next batch of rows // retrieve next batch of rows
httpTrace("context:%p, fd:%d, ip:%s, user:%s, continue retrieve, numOfRows:%d", pContext, pContext->fd, httpTrace("context:%p, fd:%d, ip:%s, user:%s, continue retrieve, numOfRows:%d", pContext, pContext->fd,
......
...@@ -269,14 +269,20 @@ int tgReadSchema(char *fileName) { ...@@ -269,14 +269,20 @@ int tgReadSchema(char *fileName) {
httpPrint("open telegraf schema file:%s success", fileName); httpPrint("open telegraf schema file:%s success", fileName);
fseek(fp, 0, SEEK_END); fseek(fp, 0, SEEK_END);
int32_t contentSize = (int32_t)ftell(fp); int32_t contentSize = (int32_t)ftell(fp);
if (contentSize <= 0) {
fclose(fp);
return 0;
}
rewind(fp); rewind(fp);
char * content = (char *)calloc(contentSize + 1, 1); char * content = (char *)calloc(contentSize + 1, 1);
int32_t result = fread(content, 1, contentSize, fp); int32_t result = fread(content, 1, contentSize, fp);
if (result != contentSize) { if (result != contentSize) {
httpError("failed to read telegraf schema file:%s", fileName); httpError("failed to read telegraf schema file:%s", fileName);
fclose(fp); fclose(fp);
free(content); free(content);
return -1; return 0;
} }
content[contentSize] = 0; content[contentSize] = 0;
......
...@@ -774,7 +774,7 @@ static void tQueryIndexlessColumn(SSkipList* pSkipList, tQueryInfo* pQueryInfo, ...@@ -774,7 +774,7 @@ static void tQueryIndexlessColumn(SSkipList* pSkipList, tQueryInfo* pQueryInfo,
char * pData = SL_GET_NODE_DATA(pNode); char * pData = SL_GET_NODE_DATA(pNode);
// todo refactor: // todo refactor:
tstr *name = ((STableIndexElem *)pData)->pTable->name; tstr *name = (*(STable **)pData)->name;
// todo speed up by using hash // todo speed up by using hash
if (pQueryInfo->colIndex == TSDB_TBNAME_COLUMN_INDEX) { if (pQueryInfo->colIndex == TSDB_TBNAME_COLUMN_INDEX) {
if (pQueryInfo->optr == TSDB_RELATION_IN) { if (pQueryInfo->optr == TSDB_RELATION_IN) {
......
...@@ -437,7 +437,7 @@ static void *taosProcessTcpData(void *param) { ...@@ -437,7 +437,7 @@ static void *taosProcessTcpData(void *param) {
while (1) { while (1) {
int fdNum = epoll_wait(pThreadObj->pollFd, events, maxEvents, -1); int fdNum = epoll_wait(pThreadObj->pollFd, events, maxEvents, -1);
if (pThreadObj->stop) { if (pThreadObj->stop) {
tTrace("%s, tcp thread get stop event, exiting...", pThreadObj->label); tTrace("%s TCP thread get stop event, exiting...", pThreadObj->label);
break; break;
} }
if (fdNum < 0) continue; if (fdNum < 0) continue;
......
...@@ -142,16 +142,15 @@ void taosCleanUpUdpConnection(void *handle) { ...@@ -142,16 +142,15 @@ void taosCleanUpUdpConnection(void *handle) {
pConn = pSet->udpConn + i; pConn = pSet->udpConn + i;
pConn->signature = NULL; pConn->signature = NULL;
// shutdown to signal the thread to exit if (pConn->fd >=0) shutdown(pConn->fd, SHUT_RDWR);
if ( pConn->fd >=0) shutdown(pConn->fd, SHUT_RD); if (pConn->fd >=0) taosCloseSocket(pConn->fd);
} }
for (int i = 0; i < pSet->threads; ++i) { for (int i = 0; i < pSet->threads; ++i) {
pConn = pSet->udpConn + i; pConn = pSet->udpConn + i;
if (pConn->thread) pthread_join(pConn->thread, NULL); if (pConn->thread) pthread_join(pConn->thread, NULL);
if (pConn->fd >=0) taosCloseSocket(pConn->fd);
tfree(pConn->buffer); tfree(pConn->buffer);
tTrace("UDP chandle:%p is closed", pConn); tTrace("%s UDP thread is closed, inedx:%d", pConn->label, i);
} }
tfree(pSet); tfree(pSet);
...@@ -185,15 +184,15 @@ static void *taosRecvUdpData(void *param) { ...@@ -185,15 +184,15 @@ static void *taosRecvUdpData(void *param) {
while (1) { while (1) {
dataLen = recvfrom(pConn->fd, pConn->buffer, RPC_MAX_UDP_SIZE, 0, (struct sockaddr *)&sourceAdd, &addLen); dataLen = recvfrom(pConn->fd, pConn->buffer, RPC_MAX_UDP_SIZE, 0, (struct sockaddr *)&sourceAdd, &addLen);
if(dataLen == 0) { if(dataLen <= 0) {
tTrace("data length is 0, socket was closed, exiting"); tTrace("%s UDP socket was closed, exiting", pConn->label);
break; break;
} }
port = ntohs(sourceAdd.sin_port); port = ntohs(sourceAdd.sin_port);
if (dataLen < sizeof(SRpcHead)) { if (dataLen < sizeof(SRpcHead)) {
tError("%s recvfrom failed, reason:%s\n", pConn->label, strerror(errno)); tError("%s recvfrom failed(%s)", pConn->label, strerror(errno));
continue; continue;
} }
......
此差异已折叠。
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tsdb.h"
#include "tsdbMain.h"
#define POOL_IS_EMPTY(b) (listNEles((b)->bufBlockList) == 0)
static STsdbBufBlock *tsdbNewBufBlock(int bufBlockSize);
static void tsdbFreeBufBlock(STsdbBufBlock *pBufBlock);
// ---------------- INTERNAL FUNCTIONS ----------------
STsdbBufPool *tsdbNewBufPool() {
STsdbBufPool *pBufPool = (STsdbBufPool *)calloc(1, sizeof(*pBufPool));
if (pBufPool == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
goto _err;
}
int code = pthread_cond_init(&(pBufPool->poolNotEmpty), NULL);
if (code != 0) {
terrno = TAOS_SYSTEM_ERROR(code);
goto _err;
}
pBufPool->bufBlockList = tdListNew(sizeof(STsdbBufBlock *));
if (pBufPool->bufBlockList == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
goto _err;
}
return pBufPool;
_err:
tsdbFreeBufPool(pBufPool);
return NULL;
}
void tsdbFreeBufPool(STsdbBufPool *pBufPool) {
if (pBufPool) {
if (pBufPool->bufBlockList) {
ASSERT(listNEles(pBufPool->bufBlockList) == 0);
tdListFree(pBufPool->bufBlockList);
}
pthread_cond_destroy(&pBufPool->poolNotEmpty);
free(pBufPool);
}
}
int tsdbOpenBufPool(STsdbRepo *pRepo) {
STsdbCfg * pCfg = &(pRepo->config);
STsdbBufPool *pPool = pRepo->pPool;
ASSERT(pPool != NULL);
pPool->bufBlockSize = pCfg->cacheBlockSize * 1024 * 1024; // MB
pPool->tBufBlocks = pCfg->totalBlocks;
pPool->nBufBlocks = 0;
pPool->index = 0;
for (int i = 0; i < pCfg->totalBlocks; i++) {
STsdbBufBlock *pBufBlock = tsdbNewBufBlock(pPool->bufBlockSize);
if (pBufBlock == NULL) goto _err;
if (tdListAppend(pPool->bufBlockList, (void *)(&pBufBlock)) < 0) {
tsdbFreeBufBlock(pBufBlock);
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
goto _err;
}
pPool->nBufBlocks++;
}
tsdbTrace("vgId:%d buffer pool is opened! bufBlockSize:%d tBufBlocks:%d nBufBlocks:%d", REPO_ID(pRepo),
pPool->bufBlockSize, pPool->tBufBlocks, pPool->nBufBlocks);
return 0;
_err:
tsdbCloseBufPool(pRepo);
return -1;
}
void tsdbCloseBufPool(STsdbRepo *pRepo) {
if (pRepo == NULL) return;
STsdbBufPool * pBufPool = pRepo->pPool;
STsdbBufBlock *pBufBlock = NULL;
if (pBufPool) {
SListNode *pNode = NULL;
while ((pNode = tdListPopHead(pBufPool->bufBlockList)) != NULL) {
tdListNodeGetData(pBufPool->bufBlockList, pNode, (void *)(&pBufBlock));
tsdbFreeBufBlock(pBufBlock);
free(pNode);
}
}
tsdbTrace("vgId:%d buffer pool is closed", REPO_ID(pRepo));
}
SListNode *tsdbAllocBufBlockFromPool(STsdbRepo *pRepo) {
ASSERT(pRepo != NULL && pRepo->pPool != NULL);
ASSERT(IS_REPO_LOCKED(pRepo));
STsdbBufPool *pBufPool = pRepo->pPool;
while (POOL_IS_EMPTY(pBufPool)) {
pthread_cond_wait(&(pBufPool->poolNotEmpty), &(pRepo->mutex));
}
SListNode * pNode = tdListPopHead(pBufPool->bufBlockList);
STsdbBufBlock *pBufBlock = NULL;
tdListNodeGetData(pBufPool->bufBlockList, pNode, (void *)(&pBufBlock));
pBufBlock->blockId = pBufPool->index++;
pBufBlock->offset = 0;
pBufBlock->remain = pBufPool->bufBlockSize;
tsdbTrace("vgId:%d buffer block is allocated, blockId:%" PRId64, REPO_ID(pRepo), pBufBlock->blockId);
return pNode;
}
// ---------------- LOCAL FUNCTIONS ----------------
static STsdbBufBlock *tsdbNewBufBlock(int bufBlockSize) {
STsdbBufBlock *pBufBlock = (STsdbBufBlock *)malloc(sizeof(*pBufBlock) + bufBlockSize);
if (pBufBlock == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
goto _err;
}
pBufBlock->blockId = 0;
pBufBlock->offset = 0;
pBufBlock->remain = bufBlockSize;
return pBufBlock;
_err:
tsdbFreeBufBlock(pBufBlock);
return NULL;
}
static void tsdbFreeBufBlock(STsdbBufBlock *pBufBlock) { tfree(pBufBlock); }
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <stdlib.h>
#include "tsdb.h"
#include "tsdbMain.h"
static int tsdbAllocBlockFromPool(STsdbCache *pCache);
static void tsdbFreeBlockList(SList *list);
static void tsdbFreeCacheMem(SCacheMem *mem);
static int tsdbAddCacheBlockToPool(STsdbCache *pCache);
STsdbCache *tsdbInitCache(int cacheBlockSize, int totalBlocks, TsdbRepoT *pRepo) {
STsdbCache *pCache = (STsdbCache *)calloc(1, sizeof(STsdbCache));
if (pCache == NULL) return NULL;
if (cacheBlockSize < 0) cacheBlockSize = TSDB_DEFAULT_CACHE_BLOCK_SIZE;
cacheBlockSize *= (1024 * 1024);
if (totalBlocks <= 1) totalBlocks = TSDB_DEFAULT_TOTAL_BLOCKS;
pCache->cacheBlockSize = cacheBlockSize;
pCache->totalCacheBlocks = totalBlocks;
pCache->pRepo = pRepo;
STsdbBufferPool *pPool = &(pCache->pool);
pPool->index = 0;
pPool->memPool = tdListNew(sizeof(STsdbCacheBlock *));
if (pPool->memPool == NULL) goto _err;
for (int i = 0; i < totalBlocks; i++) {
if (tsdbAddCacheBlockToPool(pCache) < 0) goto _err;
}
pCache->mem = NULL;
pCache->imem = NULL;
return pCache;
_err:
tsdbFreeCache(pCache);
return NULL;
}
void tsdbFreeCache(STsdbCache *pCache) {
tsdbFreeCacheMem(pCache->imem);
tsdbFreeCacheMem(pCache->mem);
tsdbFreeBlockList(pCache->pool.memPool);
free(pCache);
}
void *tsdbAllocFromCache(STsdbCache *pCache, int bytes, TSKEY key) {
if (pCache == NULL) return NULL;
if (bytes > pCache->cacheBlockSize) return NULL;
if (pCache->curBlock == NULL || pCache->curBlock->remain < bytes) {
if (pCache->curBlock !=NULL && listNEles(pCache->mem->list) >= pCache->totalCacheBlocks/2) {
tsdbTriggerCommit(pCache->pRepo);
}
while (tsdbAllocBlockFromPool(pCache) < 0) {
// TODO: deal with the error
// printf("Failed to allocate from cache pool\n");
}
}
void *ptr = (void *)(pCache->curBlock->data + pCache->curBlock->offset);
pCache->curBlock->offset += bytes;
pCache->curBlock->remain -= bytes;
memset(ptr, 0, bytes);
if (key < pCache->mem->keyFirst) pCache->mem->keyFirst = key;
if (key > pCache->mem->keyLast) pCache->mem->keyLast = key;
pCache->mem->numOfRows++;
return ptr;
}
static void tsdbFreeBlockList(SList *list) {
SListNode * node = NULL;
STsdbCacheBlock *pBlock = NULL;
while ((node = tdListPopHead(list)) != NULL) {
tdListNodeGetData(list, node, (void *)(&pBlock));
free(pBlock);
listNodeFree(node);
}
tdListFree(list);
}
static void tsdbFreeCacheMem(SCacheMem *mem) {
if (mem == NULL) return;
SList *list = mem->list;
tsdbFreeBlockList(list);
free(mem);
}
static int tsdbAllocBlockFromPool(STsdbCache *pCache) {
STsdbBufferPool *pPool = &(pCache->pool);
tsdbLockRepo(pCache->pRepo);
if (listNEles(pPool->memPool) == 0) {
tsdbUnLockRepo(pCache->pRepo);
return -1;
}
SListNode *node = tdListPopHead(pPool->memPool);
STsdbCacheBlock *pBlock = NULL;
tdListNodeGetData(pPool->memPool, node, (void *)(&pBlock));
pBlock->blockId = pPool->index++;
pBlock->offset = 0;
pBlock->remain = pCache->cacheBlockSize;
if (pCache->mem == NULL) { // Create a new one
pCache->mem = (SCacheMem *)malloc(sizeof(SCacheMem));
if (pCache->mem == NULL) return -1;
pCache->mem->keyFirst = INT64_MAX;
pCache->mem->keyLast = 0;
pCache->mem->numOfRows = 0;
pCache->mem->list = tdListNew(sizeof(STsdbCacheBlock *));
}
tdListAppendNode(pCache->mem->list, node);
pCache->curBlock = pBlock;
tsdbUnLockRepo(pCache->pRepo);
return 0;
}
int tsdbAlterCacheTotalBlocks(STsdbRepo *pRepo, int totalBlocks) {
STsdbCache *pCache = pRepo->tsdbCache;
int oldNumOfBlocks = pCache->totalCacheBlocks;
tsdbLockRepo((TsdbRepoT *)pRepo);
ASSERT(pCache->totalCacheBlocks != totalBlocks);
if (pCache->totalCacheBlocks < totalBlocks) {
ASSERT(pCache->totalCacheBlocks == pCache->pool.numOfCacheBlocks);
int blocksToAdd = pCache->totalCacheBlocks - totalBlocks;
pCache->totalCacheBlocks = totalBlocks;
for (int i = 0; i < blocksToAdd; i++) {
if (tsdbAddCacheBlockToPool(pCache) < 0) {
tsdbUnLockRepo((TsdbRepoT *)pRepo);
tsdbError("tsdbId:%d, failed to add cache block to cache pool", pRepo->config.tsdbId);
return -1;
}
}
} else {
pCache->totalCacheBlocks = totalBlocks;
tsdbAdjustCacheBlocks(pCache);
}
pRepo->config.totalBlocks = totalBlocks;
tsdbUnLockRepo((TsdbRepoT *)pRepo);
tsdbTrace("vgId:%d, tsdb total cache blocks changed from %d to %d", pRepo->config.tsdbId, oldNumOfBlocks, totalBlocks);
return 0;
}
static int tsdbAddCacheBlockToPool(STsdbCache *pCache) {
STsdbBufferPool *pPool = &pCache->pool;
STsdbCacheBlock *pBlock = malloc(sizeof(STsdbCacheBlock) + pCache->cacheBlockSize);
if (pBlock == NULL) return -1;
pBlock->offset = 0;
pBlock->remain = pCache->cacheBlockSize;
tdListAppend(pPool->memPool, (void *)(&pBlock));
pPool->numOfCacheBlocks++;
return 0;
}
static int tsdbRemoveCacheBlockFromPool(STsdbCache *pCache) {
STsdbBufferPool *pPool = &pCache->pool;
STsdbCacheBlock *pBlock = NULL;
ASSERT(pCache->totalCacheBlocks >= 0);
SListNode *node = tdListPopHead(pPool->memPool);
if (node == NULL) return -1;
tdListNodeGetData(pPool->memPool, node, &pBlock);
free(pBlock);
listNodeFree(node);
pPool->numOfCacheBlocks--;
return 0;
}
void tsdbAdjustCacheBlocks(STsdbCache *pCache) {
while (pCache->totalCacheBlocks < pCache->pool.numOfCacheBlocks) {
if (tsdbRemoveCacheBlockFromPool(pCache) < 0) break;
}
}
\ No newline at end of file
...@@ -29,178 +29,172 @@ ...@@ -29,178 +29,172 @@
#include "tutil.h" #include "tutil.h"
#include "ttime.h" #include "ttime.h"
const char *tsdbFileSuffix[] = { const char *tsdbFileSuffix[] = {".head", ".data", ".last", "", ".h", ".h"};
".head", // TSDB_FILE_TYPE_HEAD
".data", // TSDB_FILE_TYPE_DATA
".last" // TSDB_FILE_TYPE_LAST
};
static int tsdbInitFile(SFile *pFile, STsdbRepo *pRepo, int fid, int type);
static void tsdbDestroyFile(SFile *pFile);
static int compFGroup(const void *arg1, const void *arg2); static int compFGroup(const void *arg1, const void *arg2);
static int tsdbOpenFGroup(STsdbFileH *pFileH, char *dataDir, int fid); static int keyFGroupCompFunc(const void *key, const void *fgroup);
STsdbFileH *tsdbInitFileH(char *dataDir, STsdbCfg *pCfg) { // ---------------- INTERNAL FUNCTIONS ----------------
STsdbFileH *pFileH = (STsdbFileH *)calloc(1, sizeof(STsdbFileH)); STsdbFileH *tsdbNewFileH(STsdbCfg *pCfg) {
if (pFileH == NULL) { // TODO: deal with ERROR here STsdbFileH *pFileH = (STsdbFileH *)calloc(1, sizeof(*pFileH));
return NULL; if (pFileH == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
goto _err;
} }
pFileH->maxFGroups = pCfg->keep / pCfg->daysPerFile + 3; int code = pthread_rwlock_init(&(pFileH->fhlock), NULL);
if (code != 0) {
pFileH->fGroup = (SFileGroup *)calloc(pFileH->maxFGroups, sizeof(SFileGroup)); tsdbError("vgId:%d failed to init file handle lock since %s", pCfg->tsdbId, strerror(code));
if (pFileH->fGroup == NULL) { terrno = TAOS_SYSTEM_ERROR(code);
free(pFileH); goto _err;
return NULL;
} }
DIR *dir = opendir(dataDir); pFileH->maxFGroups = TSDB_MAX_FILE(pCfg->keep, pCfg->daysPerFile);
if (dir == NULL) {
free(pFileH);
return NULL;
}
struct dirent *dp = NULL; pFileH->pFGroup = (SFileGroup *)calloc(pFileH->maxFGroups, sizeof(SFileGroup));
while ((dp = readdir(dir)) != NULL) { if (pFileH->pFGroup == NULL) {
if (strncmp(dp->d_name, ".", 1) == 0 || strncmp(dp->d_name, "..", 1) == 0) continue; terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
int fid = 0; goto _err;
sscanf(dp->d_name, "f%d", &fid);
if (tsdbOpenFGroup(pFileH, dataDir, fid) < 0) {
break;
// TODO
}
} }
closedir(dir);
return pFileH; return pFileH;
_err:
tsdbFreeFileH(pFileH);
return NULL;
} }
void tsdbCloseFileH(STsdbFileH *pFileH) { void tsdbFreeFileH(STsdbFileH *pFileH) {
if (pFileH) { if (pFileH) {
tfree(pFileH->fGroup); pthread_rwlock_destroy(&pFileH->fhlock);
tfree(pFileH->pFGroup);
free(pFileH); free(pFileH);
} }
} }
static int tsdbInitFile(char *dataDir, int fid, const char *suffix, SFile *pFile) { int tsdbOpenFileH(STsdbRepo *pRepo) {
uint32_t version; ASSERT(pRepo != NULL && pRepo->tsdbFileH != NULL);
char buf[512] = "\0";
tsdbGetFileName(dataDir, fid, suffix, pFile->fname); char *tDataDir = NULL;
if (access(pFile->fname, F_OK|R_OK|W_OK) < 0) return -1; DIR * dir = NULL;
pFile->fd = -1; int fid = 0;
if (tsdbOpenFile(pFile, O_RDONLY) < 0) return -1; int vid = 0;
if (tread(pFile->fd, buf, TSDB_FILE_HEAD_SIZE) < TSDB_FILE_HEAD_SIZE) return -1; SFileGroup fileGroup = {0};
if (!taosCheckChecksumWhole((uint8_t *)buf, TSDB_FILE_HEAD_SIZE)) return -1; STsdbFileH *pFileH = pRepo->tsdbFileH;
void *pBuf = buf; tDataDir = tsdbGetDataDirName(pRepo->rootDir);
pBuf = taosDecodeFixedU32(pBuf, &version); if (tDataDir == NULL) {
pBuf = tsdbDecodeSFileInfo(pBuf, &(pFile->info)); terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
goto _err;
}
tsdbCloseFile(pFile); dir = opendir(tDataDir);
if (dir == NULL) {
tsdbError("vgId:%d failed to open directory %s since %s", REPO_ID(pRepo), tDataDir, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
struct dirent *dp = NULL;
while ((dp = readdir(dir)) != NULL) {
if (strncmp(dp->d_name, ".", 1) == 0 || strncmp(dp->d_name, "..", 2) == 0) continue;
sscanf(dp->d_name, "v%df%d", &vid, &fid);
if (tsdbSearchFGroup(pRepo->tsdbFileH, fid, TD_EQ) != NULL) continue;
memset((void *)(&fileGroup), 0, sizeof(SFileGroup));
fileGroup.fileId = fid;
for (int type = TSDB_FILE_TYPE_HEAD; type < TSDB_FILE_TYPE_MAX; type++) {
if (tsdbInitFile(&fileGroup.files[type], pRepo, fid, type) < 0) {
tsdbError("vgId:%d failed to init file fid %d type %d", REPO_ID(pRepo), fid, type);
goto _err;
}
}
tsdbTrace("vgId:%d file group %d init", REPO_ID(pRepo), fid);
pFileH->pFGroup[pFileH->nFGroups++] = fileGroup;
qsort((void *)(pFileH->pFGroup), pFileH->nFGroups, sizeof(SFileGroup), compFGroup);
}
tfree(tDataDir);
closedir(dir);
return 0; return 0;
}
static int tsdbOpenFGroup(STsdbFileH *pFileH, char *dataDir, int fid) { _err:
if (tsdbSearchFGroup(pFileH, fid) != NULL) return 0; for (int type = TSDB_FILE_TYPE_HEAD; type < TSDB_FILE_TYPE_MAX; type++) tsdbDestroyFile(&fileGroup.files[type]);
SFileGroup fGroup = {0}; tfree(tDataDir);
fGroup.fileId = fid; if (dir != NULL) closedir(dir);
tsdbCloseFileH(pRepo);
return -1;
}
void tsdbCloseFileH(STsdbRepo *pRepo) {
STsdbFileH *pFileH = pRepo->tsdbFileH;
for (int i = 0; i < pFileH->nFGroups; i++) {
SFileGroup *pFGroup = pFileH->pFGroup + i;
for (int type = TSDB_FILE_TYPE_HEAD; type < TSDB_FILE_TYPE_MAX; type++) { for (int type = TSDB_FILE_TYPE_HEAD; type < TSDB_FILE_TYPE_MAX; type++) {
if (tsdbInitFile(dataDir, fid, tsdbFileSuffix[type], &fGroup.files[type]) < 0) return -1; tsdbDestroyFile(&pFGroup->files[type]);
}
} }
pFileH->fGroup[pFileH->numOfFGroups++] = fGroup;
qsort((void *)(pFileH->fGroup), pFileH->numOfFGroups, sizeof(SFileGroup), compFGroup);
return 0;
} }
/** SFileGroup *tsdbCreateFGroupIfNeed(STsdbRepo *pRepo, char *dataDir, int fid, int maxTables) {
* Create the file group if the file group not exists. STsdbFileH *pFileH = pRepo->tsdbFileH;
*
* @return A pointer to if (pFileH->nFGroups >= pFileH->maxFGroups) return NULL;
*/
SFileGroup *tsdbCreateFGroup(STsdbFileH *pFileH, char *dataDir, int fid, int maxTables) {
if (pFileH->numOfFGroups >= pFileH->maxFGroups) return NULL;
SFileGroup fGroup; SFileGroup fGroup;
SFileGroup *pFGroup = &fGroup; SFileGroup *pFGroup = &fGroup;
SFileGroup *pGroup = tsdbSearchFGroup(pFileH, fid); SFileGroup *pGroup = tsdbSearchFGroup(pFileH, fid, TD_EQ);
if (pGroup == NULL) { // if not exists, create one if (pGroup == NULL) { // if not exists, create one
pFGroup->fileId = fid; pFGroup->fileId = fid;
for (int type = TSDB_FILE_TYPE_HEAD; type < TSDB_FILE_TYPE_MAX; type++) { for (int type = TSDB_FILE_TYPE_HEAD; type < TSDB_FILE_TYPE_MAX; type++) {
if (tsdbCreateFile(dataDir, fid, tsdbFileSuffix[type], &(pFGroup->files[type])) < 0) if (tsdbCreateFile(&pFGroup->files[type], pRepo, fid, type) < 0)
goto _err; goto _err;
} }
pFileH->fGroup[pFileH->numOfFGroups++] = fGroup; pFileH->pFGroup[pFileH->nFGroups++] = fGroup;
qsort((void *)(pFileH->fGroup), pFileH->numOfFGroups, sizeof(SFileGroup), compFGroup); qsort((void *)(pFileH->pFGroup), pFileH->nFGroups, sizeof(SFileGroup), compFGroup);
return tsdbSearchFGroup(pFileH, fid); return tsdbSearchFGroup(pFileH, fid, TD_EQ);
} }
return pGroup; return pGroup;
_err: _err:
// TODO: deal with the err here for (int type = TSDB_FILE_TYPE_HEAD; type < TSDB_FILE_TYPE_MAX; type++) tsdbDestroyFile(&pGroup->files[type]);
return NULL; return NULL;
} }
int tsdbRemoveFileGroup(STsdbFileH *pFileH, int fid) { void tsdbInitFileGroupIter(STsdbFileH *pFileH, SFileGroupIter *pIter, int direction) { // TODO
SFileGroup *pGroup =
bsearch((void *)&fid, (void *)(pFileH->fGroup), pFileH->numOfFGroups, sizeof(SFileGroup), compFGroupKey);
if (pGroup == NULL) return -1;
// Remove from disk
for (int type = TSDB_FILE_TYPE_HEAD; type < TSDB_FILE_TYPE_MAX; type++) {
remove(pGroup->files[type].fname);
}
// Adjust the memory
int filesBehind = pFileH->numOfFGroups - (((char *)pGroup - (char *)(pFileH->fGroup)) / sizeof(SFileGroup) + 1);
if (filesBehind > 0) {
memmove((void *)pGroup, (void *)((char *)pGroup + sizeof(SFileGroup)), sizeof(SFileGroup) * filesBehind);
}
pFileH->numOfFGroups--;
return 0;
}
void tsdbInitFileGroupIter(STsdbFileH *pFileH, SFileGroupIter *pIter, int direction) {
pIter->direction = direction; pIter->direction = direction;
pIter->base = pFileH->fGroup; pIter->base = pFileH->pFGroup;
pIter->numOfFGroups = pFileH->numOfFGroups; pIter->numOfFGroups = pFileH->nFGroups;
if (pFileH->numOfFGroups == 0){ if (pFileH->nFGroups == 0) {
pIter->pFileGroup = NULL; pIter->pFileGroup = NULL;
} else { } else {
if (direction == TSDB_FGROUP_ITER_FORWARD) { if (direction == TSDB_FGROUP_ITER_FORWARD) {
pIter->pFileGroup = pFileH->fGroup; pIter->pFileGroup = pFileH->pFGroup;
} else { } else {
pIter->pFileGroup = pFileH->fGroup + pFileH->numOfFGroups - 1; pIter->pFileGroup = pFileH->pFGroup + pFileH->nFGroups - 1;
} }
} }
} }
void tsdbFitRetention(STsdbRepo *pRepo) { void tsdbSeekFileGroupIter(SFileGroupIter *pIter, int fid) { // TODO
STsdbFileH *pFileH = pRepo->tsdbFileH;
SFileGroup *pGroup = pFileH->fGroup;
int mfid =
tsdbGetKeyFileId(taosGetTimestamp(pRepo->config.precision), pRepo->config.daysPerFile, pRepo->config.precision) - pFileH->maxFGroups + 3;
while (pFileH->numOfFGroups > 0 && pGroup[0].fileId < mfid) {
tsdbRemoveFileGroup(pFileH, pGroup[0].fileId);
}
}
void tsdbSeekFileGroupIter(SFileGroupIter *pIter, int fid) {
if (pIter->numOfFGroups == 0) { if (pIter->numOfFGroups == 0) {
assert(pIter->pFileGroup == NULL); assert(pIter->pFileGroup == NULL);
return; return;
} }
int flags = (pIter->direction == TSDB_FGROUP_ITER_FORWARD) ? TD_GE : TD_LE; int flags = (pIter->direction == TSDB_FGROUP_ITER_FORWARD) ? TD_GE : TD_LE;
void *ptr = taosbsearch(&fid, pIter->base, pIter->numOfFGroups, sizeof(SFileGroup), compFGroupKey, flags); void *ptr = taosbsearch(&fid, pIter->base, pIter->numOfFGroups, sizeof(SFileGroup), keyFGroupCompFunc, flags);
if (ptr == NULL) { if (ptr == NULL) {
pIter->pFileGroup = NULL; pIter->pFileGroup = NULL;
} else { } else {
...@@ -208,7 +202,7 @@ void tsdbSeekFileGroupIter(SFileGroupIter *pIter, int fid) { ...@@ -208,7 +202,7 @@ void tsdbSeekFileGroupIter(SFileGroupIter *pIter, int fid) {
} }
} }
SFileGroup *tsdbGetFileGroupNext(SFileGroupIter *pIter) { SFileGroup *tsdbGetFileGroupNext(SFileGroupIter *pIter) {//TODO
SFileGroup *ret = pIter->pFileGroup; SFileGroup *ret = pIter->pFileGroup;
if (ret == NULL) return NULL; if (ret == NULL) return NULL;
...@@ -228,147 +222,202 @@ SFileGroup *tsdbGetFileGroupNext(SFileGroupIter *pIter) { ...@@ -228,147 +222,202 @@ SFileGroup *tsdbGetFileGroupNext(SFileGroupIter *pIter) {
return ret; return ret;
} }
// int tsdbLoadDataBlock(SFile *pFile, SCompBlock *pStartBlock, int numOfBlocks, SDataCols *pCols, SCompData *pCompData) { int tsdbOpenFile(SFile *pFile, int oflag) {
// SCompBlock *pBlock = pStartBlock; ASSERT(!TSDB_IS_FILE_OPENED(pFile));
// for (int i = 0; i < numOfBlocks; i++) {
// if (tsdbLoadCompCols(pFile, pBlock, (void *)pCompData) < 0) return -1; pFile->fd = open(pFile->fname, oflag, 0755);
// pCols->numOfRows += (pCompData->cols[0].len / 8); if (pFile->fd < 0) {
// for (int iCol = 0; iCol < pBlock->numOfCols; iCol++) { tsdbError("failed to open file %s since %s", pFile->fname, strerror(errno));
// SCompCol *pCompCol = &(pCompData->cols[iCol]); terrno = TAOS_SYSTEM_ERROR(errno);
// // pCols->numOfRows += pBlock->numOfRows; return -1;
// int k = 0; }
// for (; k < pCols->numOfCols; k++) {
// if (pCompCol->colId == pCols->cols[k].colId) break;
// }
// if (tsdbLoadColData(pFile, pCompCol, pBlock->offset,
// (void *)((char *)(pCols->cols[k].pData) + pCols->cols[k].len)) < 0)
// return -1;
// }
// pStartBlock++;
// }
// return 0;
// }
int tsdbCopyBlockDataInFile(SFile *pOutFile, SFile *pInFile, SCompInfo *pCompInfo, int idx, int isLast, SDataCols *pCols) {
SCompBlock *pSuperBlock = TSDB_COMPBLOCK_AT(pCompInfo, idx);
SCompBlock *pStartBlock = NULL;
SCompBlock *pBlock = NULL;
int numOfBlocks = pSuperBlock->numOfSubBlocks;
if (numOfBlocks == 1)
pStartBlock = pSuperBlock;
else
pStartBlock = TSDB_COMPBLOCK_AT(pCompInfo, pSuperBlock->offset);
int maxNumOfCols = 0;
pBlock = pStartBlock;
for (int i = 0; i < numOfBlocks; i++) {
if (pBlock->numOfCols > maxNumOfCols) maxNumOfCols = pBlock->numOfCols;
pBlock++;
}
SCompData *pCompData = (SCompData *)malloc(sizeof(SCompData) + sizeof(SCompCol) * maxNumOfCols);
if (pCompData == NULL) return -1;
// Load data from the block
// if (tsdbLoadDataBlock(pOutFile, pStartBlock, numOfBlocks, pCols, pCompData));
// Write data block to the file
{
// TODO
}
if (pCompData) free(pCompData);
return 0; return 0;
} }
int compFGroupKey(const void *key, const void *fgroup) { void tsdbCloseFile(SFile *pFile) {
int fid = *(int *)key; if (TSDB_IS_FILE_OPENED(pFile)) {
SFileGroup *pFGroup = (SFileGroup *)fgroup; close(pFile->fd);
if (fid == pFGroup->fileId) { pFile->fd = -1;
return 0;
} else {
return fid > pFGroup->fileId? 1:-1;
} }
} }
static int compFGroup(const void *arg1, const void *arg2) { int tsdbCreateFile(SFile *pFile, STsdbRepo *pRepo, int fid, int type) {
return ((SFileGroup *)arg1)->fileId - ((SFileGroup *)arg2)->fileId; memset((void *)pFile, 0, sizeof(SFile));
} pFile->fd = -1;
int tsdbGetFileName(char *dataDir, int fileId, const char *suffix, char *fname) { tsdbGetDataFileName(pRepo, fid, type, pFile->fname);
if (dataDir == NULL || fname == NULL) return -1;
sprintf(fname, "%s/f%d%s", dataDir, fileId, suffix); if (access(pFile->fname, F_OK) == 0) {
tsdbError("vgId:%d file %s already exists", REPO_ID(pRepo), pFile->fname);
terrno = TSDB_CODE_TDB_FILE_ALREADY_EXISTS;
goto _err;
}
return 0; if (tsdbOpenFile(pFile, O_RDWR | O_CREAT) < 0) {
} goto _err;
}
int tsdbOpenFile(SFile *pFile, int oflag) { // TODO: change the function pFile->info.size = TSDB_FILE_HEAD_SIZE;
if (TSDB_IS_FILE_OPENED(pFile)) return -1;
pFile->fd = open(pFile->fname, oflag, 0755); if (tsdbUpdateFileHeader(pFile, 0) < 0) {
if (pFile->fd < 0) return -1; tsdbCloseFile(pFile);
return -1;
}
tsdbCloseFile(pFile);
return 0; return 0;
_err:
return -1;
} }
int tsdbCloseFile(SFile *pFile) { SFileGroup *tsdbSearchFGroup(STsdbFileH *pFileH, int fid, int flags) {
int ret = close(pFile->fd); void *ptr =
pFile->fd = -1; taosbsearch((void *)(&fid), (void *)(pFileH->pFGroup), pFileH->nFGroups, sizeof(SFileGroup), keyFGroupCompFunc, flags);
return ret; if (ptr == NULL) return NULL;
return (SFileGroup *)ptr;
} }
SFileGroup * tsdbOpenFilesForCommit(STsdbFileH *pFileH, int fid) { void tsdbFitRetention(STsdbRepo *pRepo) {
SFileGroup *pGroup = tsdbSearchFGroup(pFileH, fid); STsdbCfg *pCfg = &(pRepo->config);
if (pGroup == NULL) return NULL; STsdbFileH *pFileH = pRepo->tsdbFileH;
SFileGroup *pGroup = pFileH->pFGroup;
for (int type = TSDB_FILE_TYPE_HEAD; type < TSDB_FILE_TYPE_MAX; type++) { int mfid = TSDB_KEY_FILEID(taosGetTimestamp(pCfg->precision), pCfg->daysPerFile, pCfg->precision) -
tsdbOpenFile(&(pGroup->files[type]), O_RDWR); TSDB_MAX_FILE(pCfg->keep, pCfg->daysPerFile);
pthread_rwlock_wrlock(&(pFileH->fhlock));
while (pFileH->nFGroups > 0 && pGroup[0].fileId < mfid) {
tsdbRemoveFileGroup(pRepo, pGroup);
} }
return pGroup;
pthread_rwlock_unlock(&(pFileH->fhlock));
} }
int tsdbCreateFile(char *dataDir, int fileId, const char *suffix, SFile *pFile) { int tsdbUpdateFileHeader(SFile *pFile, uint32_t version) {
memset((void *)pFile, 0, sizeof(SFile)); char buf[TSDB_FILE_HEAD_SIZE] = "\0";
pFile->fd = -1;
tsdbGetFileName(dataDir, fileId, suffix, pFile->fname); void *pBuf = (void *)buf;
taosEncodeFixedU32((void *)(&pBuf), version);
tsdbEncodeSFileInfo((void *)(&pBuf), &(pFile->info));
if (access(pFile->fname, F_OK) == 0) { taosCalcChecksumAppend(0, (uint8_t *)buf, TSDB_FILE_HEAD_SIZE);
// File already exists
if (lseek(pFile->fd, 0, SEEK_SET) < 0) {
tsdbError("failed to lseek file %s since %s", pFile->fname, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
return -1; return -1;
} }
if (twrite(pFile->fd, (void *)buf, TSDB_FILE_HEAD_SIZE) < TSDB_FILE_HEAD_SIZE) {
if (tsdbOpenFile(pFile, O_RDWR | O_CREAT) < 0) { tsdbError("failed to write %d bytes to file %s since %s", TSDB_FILE_HEAD_SIZE, pFile->fname, strerror(errno));
// TODO: deal with the ERROR here terrno = TAOS_SYSTEM_ERROR(errno);
return -1; return -1;
} }
pFile->info.size = TSDB_FILE_HEAD_SIZE; return 0;
}
if (tsdbUpdateFileHeader(pFile, 0) < 0) { int tsdbEncodeSFileInfo(void **buf, const STsdbFileInfo *pInfo) {
tsdbCloseFile(pFile); int tlen = 0;
return -1; tlen += taosEncodeFixedU32(buf, pInfo->offset);
tlen += taosEncodeFixedU32(buf, pInfo->len);
tlen += taosEncodeFixedU64(buf, pInfo->size);
tlen += taosEncodeFixedU64(buf, pInfo->tombSize);
tlen += taosEncodeFixedU32(buf, pInfo->totalBlocks);
tlen += taosEncodeFixedU32(buf, pInfo->totalSubBlocks);
return tlen;
}
void *tsdbDecodeSFileInfo(void *buf, STsdbFileInfo *pInfo) {
buf = taosDecodeFixedU32(buf, &(pInfo->offset));
buf = taosDecodeFixedU32(buf, &(pInfo->len));
buf = taosDecodeFixedU64(buf, &(pInfo->size));
buf = taosDecodeFixedU64(buf, &(pInfo->tombSize));
buf = taosDecodeFixedU32(buf, &(pInfo->totalBlocks));
buf = taosDecodeFixedU32(buf, &(pInfo->totalSubBlocks));
return buf;
}
void tsdbRemoveFileGroup(STsdbRepo *pRepo, SFileGroup *pFGroup) {
ASSERT(pFGroup != NULL);
STsdbFileH *pFileH = pRepo->tsdbFileH;
SFileGroup fileGroup = *pFGroup;
int nFilesLeft = pFileH->nFGroups - (POINTER_DISTANCE(pFGroup, pFileH->pFGroup) / sizeof(SFileGroup) + 1);
if (nFilesLeft > 0) {
memmove((void *)pFGroup, POINTER_SHIFT(pFGroup, sizeof(SFileGroup)), sizeof(SFileGroup) * nFilesLeft);
}
pFileH->nFGroups--;
ASSERT(pFileH->nFGroups >= 0);
for (int type = TSDB_FILE_TYPE_HEAD; type < TSDB_FILE_TYPE_MAX; type++) {
remove(fileGroup.files[type].fname);
tsdbDestroyFile(&fileGroup.files[type]);
}
}
// ---------------- LOCAL FUNCTIONS ----------------
static int tsdbInitFile(SFile *pFile, STsdbRepo *pRepo, int fid, int type) {
uint32_t version;
char buf[512] = "\0";
tsdbGetDataFileName(pRepo, fid, type, pFile->fname);
pFile->fd = -1;
if (tsdbOpenFile(pFile, O_RDONLY) < 0) goto _err;
if (tread(pFile->fd, buf, TSDB_FILE_HEAD_SIZE) < TSDB_FILE_HEAD_SIZE) {
tsdbError("vgId:%d failed to read %d bytes from file %s since %s", REPO_ID(pRepo), TSDB_FILE_HEAD_SIZE,
pFile->fname, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
if (!taosCheckChecksumWhole((uint8_t *)buf, TSDB_FILE_HEAD_SIZE)) {
tsdbError("vgId:%d file %s head part is corrupted", REPO_ID(pRepo), pFile->fname);
terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
goto _err;
} }
void *pBuf = buf;
pBuf = taosDecodeFixedU32(pBuf, &version);
pBuf = tsdbDecodeSFileInfo(pBuf, &(pFile->info));
tsdbCloseFile(pFile); tsdbCloseFile(pFile);
return 0; return 0;
_err:
tsdbDestroyFile(pFile);
return -1;
} }
void tsdbGetKeyRangeOfFileId(int32_t daysPerFile, int8_t precision, int32_t fileId, TSKEY *minKey, static void tsdbDestroyFile(SFile *pFile) { tsdbCloseFile(pFile); }
TSKEY *maxKey) {
*minKey = fileId * daysPerFile * tsMsPerDay[precision]; static int compFGroup(const void *arg1, const void *arg2) {
*maxKey = *minKey + daysPerFile * tsMsPerDay[precision] - 1; int val1 = ((SFileGroup *)arg1)->fileId;
int val2 = ((SFileGroup *)arg2)->fileId;
if (val1 < val2) {
return -1;
} else if (val1 > val2) {
return 1;
} else {
return 0;
}
} }
SFileGroup *tsdbSearchFGroup(STsdbFileH *pFileH, int fid) { static int keyFGroupCompFunc(const void *key, const void *fgroup) {
if (pFileH->numOfFGroups == 0 || fid < pFileH->fGroup[0].fileId || fid > pFileH->fGroup[pFileH->numOfFGroups - 1].fileId) int fid = *(int *)key;
return NULL; SFileGroup *pFGroup = (SFileGroup *)fgroup;
void *ptr = bsearch((void *)&fid, (void *)(pFileH->fGroup), pFileH->numOfFGroups, sizeof(SFileGroup), compFGroupKey); if (fid == pFGroup->fileId) {
if (ptr == NULL) return NULL; return 0;
return (SFileGroup *)ptr; } else {
return fid > pFGroup->fileId ? 1 : -1;
}
} }
此差异已折叠。
此差异已折叠。
此差异已折叠。
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "os.h"
#include "taosdef.h"
#include "hash.h"
#include "tsdbMain.h"
#define TSDB_META_FILE_VERSION_MAJOR 1
#define TSDB_META_FILE_VERSION_MINOR 0
#define TSDB_META_FILE_HEADER_SIZE 512
typedef struct {
int32_t offset;
int32_t size;
uint64_t uid;
} SRecordInfo;
// static int32_t tsdbGetMetaFileName(char *rootDir, char *fname);
// static int32_t tsdbCheckMetaHeader(int fd);
static int32_t tsdbWriteMetaHeader(int fd);
static int tsdbCreateMetaFile(char *fname);
static int tsdbRestoreFromMetaFile(char *fname, SMetaFile *mfh);
SMetaFile *tsdbInitMetaFile(char *rootDir, int32_t maxTables, iterFunc iFunc, afterFunc aFunc, void *appH) {
char fname[128] = "\0";
if (tsdbGetMetaFileName(rootDir, fname) < 0) return NULL;
SMetaFile *mfh = (SMetaFile *)calloc(1, sizeof(SMetaFile));
if (mfh == NULL) return NULL;
mfh->iFunc = iFunc;
mfh->aFunc = aFunc;
mfh->appH = appH;
mfh->nDel = 0;
mfh->tombSize = 0;
mfh->size = 0;
// OPEN MAP
mfh->map =
taosHashInit(maxTables * TSDB_META_HASH_FRACTION, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false);
if (mfh->map == NULL) {
free(mfh);
return NULL;
}
// OPEN FILE
if (access(fname, F_OK) < 0) { // file not exists
mfh->fd = tsdbCreateMetaFile(fname);
if (mfh->fd < 0) {
taosHashCleanup(mfh->map);
free(mfh);
return NULL;
}
mfh->size += TSDB_META_FILE_HEADER_SIZE;
} else { // file exists, recover from file
if (tsdbRestoreFromMetaFile(fname, mfh) < 0) {
taosHashCleanup(mfh->map);
free(mfh);
return NULL;
}
}
return mfh;
}
int32_t tsdbInsertMetaRecord(SMetaFile *mfh, uint64_t uid, void *cont, int32_t contLen) {
if (taosHashGet(mfh->map, (char *)(&uid), sizeof(uid)) != NULL) {
return -1;
}
SRecordInfo info;
info.offset = mfh->size;
info.size = contLen;
info.uid = uid;
mfh->size += (contLen + sizeof(SRecordInfo));
if (taosHashPut(mfh->map, (char *)(&uid), sizeof(uid), (void *)(&info), sizeof(SRecordInfo)) < 0) {
return -1;
}
// TODO: make below a function to implement
if (lseek(mfh->fd, info.offset, SEEK_SET) < 0) {
return -1;
}
if (write(mfh->fd, (void *)(&info), sizeof(SRecordInfo)) < 0) {
return -1;
}
if (write(mfh->fd, cont, contLen) < 0) {
return -1;
}
// fsync(mfh->fd);
mfh->tombSize++;
return 0;
}
int32_t tsdbDeleteMetaRecord(SMetaFile *mfh, uint64_t uid) {
char *ptr = taosHashGet(mfh->map, (char *)(&uid), sizeof(uid));
if (ptr == NULL) return -1;
SRecordInfo info = *(SRecordInfo *)ptr;
// Remove record from hash table
taosHashRemove(mfh->map, (char *)(&uid), sizeof(uid));
// Remove record from file
info.offset = -info.offset;
if (lseek(mfh->fd, -info.offset, SEEK_CUR) < 0) {
return -1;
}
if (write(mfh->fd, (void *)(&info), sizeof(SRecordInfo)) < 0) {
return -1;
}
// fsync(mfh->fd);
mfh->nDel++;
return 0;
}
int32_t tsdbUpdateMetaRecord(SMetaFile *mfh, uint64_t uid, void *cont, int32_t contLen) {
char *ptr = taosHashGet(mfh->map, (char *)(&uid), sizeof(uid));
if (ptr == NULL) return -1;
SRecordInfo info = *(SRecordInfo *)ptr;
// Update the hash table
if (taosHashPut(mfh->map, (char *)(&uid), sizeof(uid), (void *)(&info), sizeof(SRecordInfo)) < 0) {
return -1;
}
// Update record in file
if (info.size >= contLen) { // Just update it in place
info.size = contLen;
} else { // Just append to the end of file
info.offset = mfh->size;
info.size = contLen;
mfh->size += contLen;
}
if (lseek(mfh->fd, -info.offset, SEEK_CUR) < 0) {
return -1;
}
if (write(mfh->fd, (void *)(&info), sizeof(SRecordInfo)) < 0) {
return -1;
}
// fsync(mfh->fd);
return 0;
}
void tsdbCloseMetaFile(SMetaFile *mfh) {
if (mfh == NULL) return;
close(mfh->fd);
taosHashCleanup(mfh->map);
tfree(mfh);
}
int32_t tsdbGetMetaFileName(char *rootDir, char *fname) {
if (rootDir == NULL) return -1;
sprintf(fname, "%s/%s", rootDir, TSDB_META_FILE_NAME);
return 0;
}
// static int32_t tsdbCheckMetaHeader(int fd) {
// // TODO: write the meta file header check function
// return 0;
// }
static int32_t tsdbWriteMetaHeader(int fd) {
// TODO: write the meta file header to file
char head[TSDB_META_FILE_HEADER_SIZE] = "\0";
sprintf(head, "version: %d.%d", TSDB_META_FILE_VERSION_MAJOR, TSDB_META_FILE_VERSION_MINOR);
write(fd, (void *)head, TSDB_META_FILE_HEADER_SIZE);
return 0;
}
// static int32_t tsdbReadMetaHeader(int fd) {
// lseek(fd, TSDB_META_FILE_HEADER_SIZE, SEEK_SET);
// return 0;
// }
static int tsdbCreateMetaFile(char *fname) {
int fd = open(fname, O_RDWR | O_CREAT, 0755);
if (fd < 0) return -1;
if (tsdbWriteMetaHeader(fd) < 0) {
close(fd);
return -1;
}
return fd;
}
static int tsdbCheckMetaFileIntegrety(int fd) {
// TODO
return 0;
}
static int tsdbRestoreFromMetaFile(char *fname, SMetaFile *mfh) {
int fd = open(fname, O_RDWR);
if (fd < 0) return -1;
if (tsdbCheckMetaFileIntegrety(fd) < 0) {
// TODO: decide if to auto-recover the file
close(fd);
return -1;
}
if (lseek(fd, TSDB_META_FILE_HEADER_SIZE, SEEK_SET) < 0) {
// TODO: deal with the error
close(fd);
return -1;
}
mfh->size += TSDB_META_FILE_HEADER_SIZE;
mfh->fd = fd;
void *buf = NULL;
// int buf_size = 0;
SRecordInfo info;
while (1) {
if (read(mfh->fd, (void *)(&info), sizeof(SRecordInfo)) == 0) break;
if (info.offset < 0) {
mfh->size += (info.size + sizeof(SRecordInfo));
mfh->tombSize += (info.size + sizeof(SRecordInfo));
lseek(mfh->fd, info.size, SEEK_CUR);
mfh->size = mfh->size + sizeof(SRecordInfo) + info.size;
mfh->tombSize = mfh->tombSize + sizeof(SRecordInfo) + info.size;
} else {
if (taosHashPut(mfh->map, (char *)(&info.uid), sizeof(info.uid), (void *)(&info), sizeof(SRecordInfo)) < 0) {
if (buf) free(buf);
return -1;
}
buf = realloc(buf, info.size);
if (buf == NULL) return -1;
if (read(mfh->fd, buf, info.size) < 0) {
if (buf) free(buf);
return -1;
}
(*mfh->iFunc)(mfh->appH, buf, info.size);
mfh->size = mfh->size + sizeof(SRecordInfo) + info.size;
}
}
(*mfh->aFunc)(mfh->appH);
if (buf) free(buf);
return 0;
}
\ No newline at end of file
此差异已折叠。
...@@ -72,9 +72,11 @@ typedef struct STableCheckInfo { ...@@ -72,9 +72,11 @@ typedef struct STableCheckInfo {
int32_t compSize; int32_t compSize;
int32_t numOfBlocks; // number of qualified data blocks not the original blocks int32_t numOfBlocks; // number of qualified data blocks not the original blocks
SDataCols* pDataCols; SDataCols* pDataCols;
int32_t chosen; // indicate which iterator should move forward int32_t chosen; // indicate which iterator should move forward
bool initBuf; // whether to initialize the in-memory skip list iterator or not bool initBuf; // whether to initialize the in-memory skip list iterator or not
SMemTable* mem; // in-mem buffer, hold the ref count
SMemTable* imem; // imem buffer, hold the ref count to avoid release
SSkipListIterator* iter; // mem buffer skip list iterator SSkipListIterator* iter; // mem buffer skip list iterator
SSkipListIterator* iiter; // imem buffer skip list iterator SSkipListIterator* iiter; // imem buffer skip list iterator
} STableCheckInfo; } STableCheckInfo;
...@@ -135,7 +137,7 @@ static void tsdbInitCompBlockLoadInfo(SLoadCompBlockInfo* pCompBlockLoadInfo) { ...@@ -135,7 +137,7 @@ static void tsdbInitCompBlockLoadInfo(SLoadCompBlockInfo* pCompBlockLoadInfo) {
pCompBlockLoadInfo->fileId = -1; pCompBlockLoadInfo->fileId = -1;
} }
TsdbQueryHandleT* tsdbQueryTables(TsdbRepoT* tsdb, STsdbQueryCond* pCond, STableGroupInfo* groupList, void* qinfo) { TsdbQueryHandleT* tsdbQueryTables(TSDB_REPO_T* tsdb, STsdbQueryCond* pCond, STableGroupInfo* groupList, void* qinfo) {
// todo 1. filter not exist table // todo 1. filter not exist table
// todo 2. add the reference count for each table that is involved in query // todo 2. add the reference count for each table that is involved in query
...@@ -203,7 +205,7 @@ TsdbQueryHandleT* tsdbQueryTables(TsdbRepoT* tsdb, STsdbQueryCond* pCond, STable ...@@ -203,7 +205,7 @@ TsdbQueryHandleT* tsdbQueryTables(TsdbRepoT* tsdb, STsdbQueryCond* pCond, STable
return (TsdbQueryHandleT) pQueryHandle; return (TsdbQueryHandleT) pQueryHandle;
} }
TsdbQueryHandleT tsdbQueryLastRow(TsdbRepoT *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList, void* qinfo) { TsdbQueryHandleT tsdbQueryLastRow(TSDB_REPO_T *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList, void* qinfo) {
STsdbQueryHandle *pQueryHandle = (STsdbQueryHandle*) tsdbQueryTables(tsdb, pCond, groupList, qinfo); STsdbQueryHandle *pQueryHandle = (STsdbQueryHandle*) tsdbQueryTables(tsdb, pCond, groupList, qinfo);
pQueryHandle->type = TSDB_QUERY_TYPE_LAST; pQueryHandle->type = TSDB_QUERY_TYPE_LAST;
...@@ -229,7 +231,7 @@ SArray* tsdbGetQueriedTableIdList(TsdbQueryHandleT *pHandle) { ...@@ -229,7 +231,7 @@ SArray* tsdbGetQueriedTableIdList(TsdbQueryHandleT *pHandle) {
return res; return res;
} }
TsdbQueryHandleT tsdbQueryRowsInExternalWindow(TsdbRepoT *tsdb, STsdbQueryCond* pCond, STableGroupInfo *groupList, void* qinfo) { TsdbQueryHandleT tsdbQueryRowsInExternalWindow(TSDB_REPO_T *tsdb, STsdbQueryCond* pCond, STableGroupInfo *groupList, void* qinfo) {
STsdbQueryHandle *pQueryHandle = (STsdbQueryHandle*) tsdbQueryTables(tsdb, pCond, groupList, qinfo); STsdbQueryHandle *pQueryHandle = (STsdbQueryHandle*) tsdbQueryTables(tsdb, pCond, groupList, qinfo);
pQueryHandle->type = TSDB_QUERY_TYPE_EXTERNAL; pQueryHandle->type = TSDB_QUERY_TYPE_EXTERNAL;
...@@ -248,21 +250,23 @@ static bool initTableMemIterator(STsdbQueryHandle* pHandle, STableCheckInfo* pCh ...@@ -248,21 +250,23 @@ static bool initTableMemIterator(STsdbQueryHandle* pHandle, STableCheckInfo* pCh
pCheckInfo->initBuf = true; pCheckInfo->initBuf = true;
int32_t order = pHandle->order; int32_t order = pHandle->order;
tsdbTakeMemSnapshot(pHandle->pTsdb, &pCheckInfo->mem, &pCheckInfo->imem);
// no data in buffer, abort // no data in buffer, abort
if (pTable->mem == NULL && pTable->imem == NULL) { if (pCheckInfo->mem == NULL && pCheckInfo->imem == NULL) {
return false; return false;
} }
assert(pCheckInfo->iter == NULL && pCheckInfo->iiter == NULL); assert(pCheckInfo->iter == NULL && pCheckInfo->iiter == NULL);
if (pTable->mem) { if (pCheckInfo->mem && pCheckInfo->mem->tData[pCheckInfo->tableId.tid] != NULL) {
pCheckInfo->iter = tSkipListCreateIterFromVal(pTable->mem->pData, (const char*) &pCheckInfo->lastKey, pCheckInfo->iter = tSkipListCreateIterFromVal(pCheckInfo->mem->tData[pCheckInfo->tableId.tid]->pData,
TSDB_DATA_TYPE_TIMESTAMP, order); (const char*) &pCheckInfo->lastKey, TSDB_DATA_TYPE_TIMESTAMP, order);
} }
if (pTable->imem) { if (pCheckInfo->imem && pCheckInfo->imem->tData[pCheckInfo->tableId.tid] != NULL) {
pCheckInfo->iiter = tSkipListCreateIterFromVal(pTable->imem->pData, (const char*) &pCheckInfo->lastKey, pCheckInfo->iiter = tSkipListCreateIterFromVal(pCheckInfo->imem->tData[pCheckInfo->tableId.tid]->pData,
TSDB_DATA_TYPE_TIMESTAMP, order); (const char*) &pCheckInfo->lastKey, TSDB_DATA_TYPE_TIMESTAMP, order);
} }
// both iterators are NULL, no data in buffer right now // both iterators are NULL, no data in buffer right now
...@@ -586,10 +590,12 @@ static bool doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlo ...@@ -586,10 +590,12 @@ static bool doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlo
if (pCheckInfo->pDataCols == NULL) { if (pCheckInfo->pDataCols == NULL) {
STsdbMeta* pMeta = tsdbGetMeta(pRepo); STsdbMeta* pMeta = tsdbGetMeta(pRepo);
pCheckInfo->pDataCols = tdNewDataCols(pMeta->maxRowBytes, pMeta->maxCols, pRepo->config.maxRowsPerFileBlock); // TODO
pCheckInfo->pDataCols =
tdNewDataCols(pMeta->maxRowBytes, pMeta->maxCols, pRepo->config.maxRowsPerFileBlock);
} }
tdInitDataCols(pCheckInfo->pDataCols, tsdbGetTableSchema(tsdbGetMeta(pQueryHandle->pTsdb), pCheckInfo->pTableObj)); tdInitDataCols(pCheckInfo->pDataCols, tsdbGetTableSchema(pCheckInfo->pTableObj));
if (tsdbLoadBlockData(&(pQueryHandle->rhelper), pBlock, NULL) == 0) { if (tsdbLoadBlockData(&(pQueryHandle->rhelper), pBlock, NULL) == 0) {
SDataBlockLoadInfo* pBlockLoadInfo = &pQueryHandle->dataBlockLoadInfo; SDataBlockLoadInfo* pBlockLoadInfo = &pQueryHandle->dataBlockLoadInfo;
...@@ -875,7 +881,7 @@ static void copyOneRowFromMem(STsdbQueryHandle* pQueryHandle, int32_t capacity, ...@@ -875,7 +881,7 @@ static void copyOneRowFromMem(STsdbQueryHandle* pQueryHandle, int32_t capacity,
char* pData = NULL; char* pData = NULL;
// the schema version info is embeded in SDataRow // the schema version info is embeded in SDataRow
STSchema* pSchema = tsdbGetTableSchemaByVersion(pMeta, pTable, dataRowVersion(row)); STSchema* pSchema = tsdbGetTableSchemaByVersion(pTable, dataRowVersion(row));
int32_t numOfRowCols = schemaNCols(pSchema); int32_t numOfRowCols = schemaNCols(pSchema);
int32_t i = 0, j = 0; int32_t i = 0, j = 0;
...@@ -1861,8 +1867,8 @@ static int32_t getAllTableIdList(STable* pSuperTable, SArray* list) { ...@@ -1861,8 +1867,8 @@ static int32_t getAllTableIdList(STable* pSuperTable, SArray* list) {
while (tSkipListIterNext(iter)) { while (tSkipListIterNext(iter)) {
SSkipListNode* pNode = tSkipListIterGet(iter); SSkipListNode* pNode = tSkipListIterGet(iter);
STableIndexElem* elem = (STableIndexElem*)(SL_GET_NODE_DATA((SSkipListNode*) pNode)); STable** pTable = (STable**) SL_GET_NODE_DATA((SSkipListNode*) pNode);
taosArrayPush(list, &elem->pTable->tableId); taosArrayPush(list, &(*pTable)->tableId);
} }
tSkipListDestroyIter(iter); tSkipListDestroyIter(iter);
...@@ -1881,8 +1887,8 @@ static void convertQueryResult(SArray* pRes, SArray* pTableList) { ...@@ -1881,8 +1887,8 @@ static void convertQueryResult(SArray* pRes, SArray* pTableList) {
size_t size = taosArrayGetSize(pTableList); size_t size = taosArrayGetSize(pTableList);
for (int32_t i = 0; i < size; ++i) { // todo speedup by using reserve space. for (int32_t i = 0; i < size; ++i) { // todo speedup by using reserve space.
STableIndexElem* elem = taosArrayGet(pTableList, i); STable* pTable = taosArrayGetP(pTableList, i);
taosArrayPush(pRes, &elem->pTable->tableId); taosArrayPush(pRes, &pTable->tableId);
} }
} }
...@@ -2041,7 +2047,7 @@ void createTableGroupImpl(SArray* pGroups, SArray* pTableIdList, size_t numOfTab ...@@ -2041,7 +2047,7 @@ void createTableGroupImpl(SArray* pGroups, SArray* pTableIdList, size_t numOfTab
} }
SArray* createTableGroup(SArray* pTableList, STSchema* pTagSchema, SColIndex* pCols, int32_t numOfOrderCols, SArray* createTableGroup(SArray* pTableList, STSchema* pTagSchema, SColIndex* pCols, int32_t numOfOrderCols,
TsdbRepoT* tsdb) { TSDB_REPO_T* tsdb) {
assert(pTableList != NULL); assert(pTableList != NULL);
SArray* pTableGroup = taosArrayInit(1, POINTER_BYTES); SArray* pTableGroup = taosArrayInit(1, POINTER_BYTES);
...@@ -2078,16 +2084,16 @@ SArray* createTableGroup(SArray* pTableList, STSchema* pTagSchema, SColIndex* pC ...@@ -2078,16 +2084,16 @@ SArray* createTableGroup(SArray* pTableList, STSchema* pTagSchema, SColIndex* pC
bool indexedNodeFilterFp(const void* pNode, void* param) { bool indexedNodeFilterFp(const void* pNode, void* param) {
tQueryInfo* pInfo = (tQueryInfo*) param; tQueryInfo* pInfo = (tQueryInfo*) param;
STableIndexElem* elem = (STableIndexElem*)(SL_GET_NODE_DATA((SSkipListNode*)pNode)); STable* pTable = *(STable**)(SL_GET_NODE_DATA((SSkipListNode*)pNode));
char* val = NULL; char* val = NULL;
int8_t type = pInfo->sch.type; int8_t type = pInfo->sch.type;
if (pInfo->colIndex == TSDB_TBNAME_COLUMN_INDEX) { if (pInfo->colIndex == TSDB_TBNAME_COLUMN_INDEX) {
val = (char*) elem->pTable->name; val = (char*) pTable->name;
type = TSDB_DATA_TYPE_BINARY; type = TSDB_DATA_TYPE_BINARY;
} else { } else {
val = tdGetKVRowValOfCol(elem->pTable->tagVal, pInfo->sch.colId); val = tdGetKVRowValOfCol(pTable->tagVal, pInfo->sch.colId);
} }
//todo :the val is possible to be null, so check it out carefully //todo :the val is possible to be null, so check it out carefully
...@@ -2143,7 +2149,7 @@ static int32_t doQueryTableList(STable* pSTable, SArray* pRes, tExprNode* pExpr) ...@@ -2143,7 +2149,7 @@ static int32_t doQueryTableList(STable* pSTable, SArray* pRes, tExprNode* pExpr)
.pExtInfo = pSTable->tagSchema, .pExtInfo = pSTable->tagSchema,
}; };
SArray* pTableList = taosArrayInit(8, sizeof(STableIndexElem)); SArray* pTableList = taosArrayInit(8, POINTER_BYTES);
tExprTreeTraverse(pExpr, pSTable->pIndex, pTableList, &supp); tExprTreeTraverse(pExpr, pSTable->pIndex, pTableList, &supp);
tExprTreeDestroy(&pExpr, destroyHelper); tExprTreeDestroy(&pExpr, destroyHelper);
...@@ -2153,7 +2159,7 @@ static int32_t doQueryTableList(STable* pSTable, SArray* pRes, tExprNode* pExpr) ...@@ -2153,7 +2159,7 @@ static int32_t doQueryTableList(STable* pSTable, SArray* pRes, tExprNode* pExpr)
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t tsdbQuerySTableByTagCond(TsdbRepoT* tsdb, uint64_t uid, const char* pTagCond, size_t len, int32_t tsdbQuerySTableByTagCond(TSDB_REPO_T* tsdb, uint64_t uid, const char* pTagCond, size_t len,
int16_t tagNameRelType, const char* tbnameCond, STableGroupInfo* pGroupInfo, int16_t tagNameRelType, const char* tbnameCond, STableGroupInfo* pGroupInfo,
SColIndex* pColIndex, int32_t numOfCols) { SColIndex* pColIndex, int32_t numOfCols) {
STable* pTable = tsdbGetTableByUid(tsdbGetMeta(tsdb), uid); STable* pTable = tsdbGetTableByUid(tsdbGetMeta(tsdb), uid);
...@@ -2170,7 +2176,7 @@ int32_t tsdbQuerySTableByTagCond(TsdbRepoT* tsdb, uint64_t uid, const char* pTag ...@@ -2170,7 +2176,7 @@ int32_t tsdbQuerySTableByTagCond(TsdbRepoT* tsdb, uint64_t uid, const char* pTag
} }
SArray* res = taosArrayInit(8, sizeof(STableId)); SArray* res = taosArrayInit(8, sizeof(STableId));
STSchema* pTagSchema = tsdbGetTableTagSchema(tsdbGetMeta(tsdb), pTable); STSchema* pTagSchema = tsdbGetTableTagSchema(pTable);
// no tags and tbname condition, all child tables of this stable are involved // no tags and tbname condition, all child tables of this stable are involved
if (tbnameCond == NULL && (pTagCond == NULL || len == 0)) { if (tbnameCond == NULL && (pTagCond == NULL || len == 0)) {
...@@ -2230,7 +2236,7 @@ int32_t tsdbQuerySTableByTagCond(TsdbRepoT* tsdb, uint64_t uid, const char* pTag ...@@ -2230,7 +2236,7 @@ int32_t tsdbQuerySTableByTagCond(TsdbRepoT* tsdb, uint64_t uid, const char* pTag
return ret; return ret;
} }
int32_t tsdbGetOneTableGroup(TsdbRepoT* tsdb, uint64_t uid, STableGroupInfo* pGroupInfo) { int32_t tsdbGetOneTableGroup(TSDB_REPO_T* tsdb, uint64_t uid, STableGroupInfo* pGroupInfo) {
STable* pTable = tsdbGetTableByUid(tsdbGetMeta(tsdb), uid); STable* pTable = tsdbGetTableByUid(tsdbGetMeta(tsdb), uid);
if (pTable == NULL) { if (pTable == NULL) {
return TSDB_CODE_TDB_INVALID_TABLE_ID; return TSDB_CODE_TDB_INVALID_TABLE_ID;
...@@ -2257,6 +2263,10 @@ void tsdbCleanupQueryHandle(TsdbQueryHandleT queryHandle) { ...@@ -2257,6 +2263,10 @@ void tsdbCleanupQueryHandle(TsdbQueryHandleT queryHandle) {
size_t size = taosArrayGetSize(pQueryHandle->pTableCheckInfo); size_t size = taosArrayGetSize(pQueryHandle->pTableCheckInfo);
for (int32_t i = 0; i < size; ++i) { for (int32_t i = 0; i < size; ++i) {
STableCheckInfo* pTableCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, i); STableCheckInfo* pTableCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, i);
tsdbUnRefMemTable(pQueryHandle->pTsdb, pTableCheckInfo->mem);
tsdbUnRefMemTable(pQueryHandle->pTsdb, pTableCheckInfo->imem);
tSkipListDestroyIter(pTableCheckInfo->iter); tSkipListDestroyIter(pTableCheckInfo->iter);
if (pTableCheckInfo->pDataCols != NULL) { if (pTableCheckInfo->pDataCols != NULL) {
......
...@@ -13,7 +13,7 @@ static double getCurTime() { ...@@ -13,7 +13,7 @@ static double getCurTime() {
} }
typedef struct { typedef struct {
TsdbRepoT *pRepo; TSDB_REPO_T *pRepo;
bool isAscend; bool isAscend;
int tid; int tid;
uint64_t uid; uint64_t uid;
...@@ -136,7 +136,7 @@ TEST(TsdbTest, createRepo) { ...@@ -136,7 +136,7 @@ TEST(TsdbTest, createRepo) {
tsdbSetDefaultCfg(&config); tsdbSetDefaultCfg(&config);
ASSERT_EQ(tsdbCreateRepo("/home/ubuntu/work/ttest/vnode0", &config, NULL), 0); ASSERT_EQ(tsdbCreateRepo("/home/ubuntu/work/ttest/vnode0", &config, NULL), 0);
TsdbRepoT *pRepo = tsdbOpenRepo("/home/ubuntu/work/ttest/vnode0", NULL); TSDB_REPO_T *pRepo = tsdbOpenRepo("/home/ubuntu/work/ttest/vnode0", NULL);
ASSERT_NE(pRepo, nullptr); ASSERT_NE(pRepo, nullptr);
// 2. Create a normal table // 2. Create a normal table
......
此差异已折叠。
...@@ -25,7 +25,7 @@ typedef int (*iterFunc)(void *, void *cont, int contLen); ...@@ -25,7 +25,7 @@ typedef int (*iterFunc)(void *, void *cont, int contLen);
typedef void (*afterFunc)(void *); typedef void (*afterFunc)(void *);
typedef struct { typedef struct {
int64_t size; int64_t size; // including 512 bytes of header size
int64_t tombSize; int64_t tombSize;
int64_t nRecords; int64_t nRecords;
int64_t nDels; int64_t nDels;
......
...@@ -57,6 +57,7 @@ SListNode *tdListPopHead(SList *list); ...@@ -57,6 +57,7 @@ SListNode *tdListPopHead(SList *list);
SListNode *tdListPopTail(SList *list); SListNode *tdListPopTail(SList *list);
SListNode *tdListPopNode(SList *list, SListNode *node); SListNode *tdListPopNode(SList *list, SListNode *node);
void tdListMove(SList *src, SList *dst); void tdListMove(SList *src, SList *dst);
void tdListDiscard(SList *list);
void tdListNodeGetData(SList *list, SListNode *node, void *target); void tdListNodeGetData(SList *list, SListNode *node, void *target);
void tdListInitIter(SList *list, SListIter *pIter, TD_LIST_DIRECTION_T direction); void tdListInitIter(SList *list, SListIter *pIter, TD_LIST_DIRECTION_T direction);
......
...@@ -31,7 +31,7 @@ typedef void (*_ref_fn_t)(const void* pObj); ...@@ -31,7 +31,7 @@ typedef void (*_ref_fn_t)(const void* pObj);
_ref_fn_t end; \ _ref_fn_t end; \
} _ref_func = {.begin = (s), .end = (e)}; } _ref_func = {.begin = (s), .end = (e)};
#define T_REF_INC(x) (atomic_add_fetch_16(&((x)->_ref.val), 1)); #define T_REF_INC(x) (atomic_add_fetch_16(&((x)->_ref.val), 1))
#define T_REF_INC_WITH_CB(x, p) \ #define T_REF_INC_WITH_CB(x, p) \
do { \ do { \
...@@ -41,7 +41,7 @@ typedef void (*_ref_fn_t)(const void* pObj); ...@@ -41,7 +41,7 @@ typedef void (*_ref_fn_t)(const void* pObj);
} \ } \
} while (0) } while (0)
#define T_REF_DEC(x) (atomic_sub_fetch_16(&((x)->_ref.val), 1)); #define T_REF_DEC(x) (atomic_sub_fetch_16(&((x)->_ref.val), 1))
#define T_REF_DEC_WITH_CB(x, p) \ #define T_REF_DEC_WITH_CB(x, p) \
do { \ do { \
......
...@@ -309,9 +309,7 @@ void taosHashRemove(SHashObj *pHashObj, const void *key, size_t keyLen) { ...@@ -309,9 +309,7 @@ void taosHashRemove(SHashObj *pHashObj, const void *key, size_t keyLen) {
} }
void taosHashCleanup(SHashObj *pHashObj) { void taosHashCleanup(SHashObj *pHashObj) {
if (pHashObj == NULL || pHashObj->capacity <= 0) { if (pHashObj == NULL) return;
return;
}
SHashNode *pNode, *pNext; SHashNode *pNode, *pNext;
......
此差异已折叠。
...@@ -39,8 +39,10 @@ void tdListEmpty(SList *list) { ...@@ -39,8 +39,10 @@ void tdListEmpty(SList *list) {
} }
void tdListFree(SList *list) { void tdListFree(SList *list) {
if (list) {
tdListEmpty(list); tdListEmpty(list);
free(list); free(list);
}
} }
void tdListPrependNode(SList *list, SListNode *node) { void tdListPrependNode(SList *list, SListNode *node) {
...@@ -81,7 +83,7 @@ int tdListPrepend(SList *list, void *data) { ...@@ -81,7 +83,7 @@ int tdListPrepend(SList *list, void *data) {
} }
int tdListAppend(SList *list, void *data) { int tdListAppend(SList *list, void *data) {
SListNode *node = (SListNode *)malloc(sizeof(SListNode) + list->eleSize); SListNode *node = (SListNode *)calloc(1, sizeof(SListNode) + list->eleSize);
if (node == NULL) return -1; if (node == NULL) return -1;
memcpy((void *)(node->data), data, list->eleSize); memcpy((void *)(node->data), data, list->eleSize);
...@@ -148,6 +150,13 @@ void tdListMove(SList *src, SList *dst) { ...@@ -148,6 +150,13 @@ void tdListMove(SList *src, SList *dst) {
} }
} }
void tdListDiscard(SList *list) {
if (list) {
list->head = list->tail = NULL;
list->numOfEles = 0;
}
}
void tdListNodeGetData(SList *list, SListNode *node, void *target) { memcpy(target, node->data, list->eleSize); } void tdListNodeGetData(SList *list, SListNode *node, void *target) { memcpy(target, node->data, list->eleSize); }
void tdListInitIter(SList *list, SListIter *pIter, TD_LIST_DIRECTION_T direction) { void tdListInitIter(SList *list, SListIter *pIter, TD_LIST_DIRECTION_T direction) {
......
...@@ -428,7 +428,11 @@ void taosPrintLongString(const char *flags, int32_t dflag, const char *format, . ...@@ -428,7 +428,11 @@ void taosPrintLongString(const char *flags, int32_t dflag, const char *format, .
buffer[len] = 0; buffer[len] = 0;
if ((dflag & DEBUG_FILE) && tsLogObj.logHandle && tsLogObj.logHandle->fd >= 0) { if ((dflag & DEBUG_FILE) && tsLogObj.logHandle && tsLogObj.logHandle->fd >= 0) {
if (tsAsyncLog) {
taosPushLogBuffer(tsLogObj.logHandle, buffer, len); taosPushLogBuffer(tsLogObj.logHandle, buffer, len);
} else {
twrite(tsLogObj.logHandle->fd, buffer, len);
}
if (tsLogObj.maxLines > 0) { if (tsLogObj.maxLines > 0) {
atomic_add_fetch_32(&tsLogObj.lines, 1); atomic_add_fetch_32(&tsLogObj.lines, 1);
......
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
文件模式从 100755 更改为 100644
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册