diff --git a/include/common/taosmsg.h b/include/common/taosmsg.h index 2dc08e85dafa2ee5f804cca8b1aa30c98e601671..9dce9193c59fd2972160a60db4106ca2680db9fa 100644 --- a/include/common/taosmsg.h +++ b/include/common/taosmsg.h @@ -219,6 +219,13 @@ typedef struct SBuildTableMetaInput { char *tableFullName; } SBuildTableMetaInput; +typedef struct SBuildUseDBInput { + char db[TSDB_TABLE_FNAME_LEN]; + int32_t vgroupVersion; + int32_t dbGroupVersion; +} SBuildUseDBInput; + + #pragma pack(push, 1) // null-terminated string instead of char array to avoid too many memory consumption in case of more than 1M tableMeta @@ -620,9 +627,12 @@ typedef struct { typedef struct { char db[TSDB_TABLE_FNAME_LEN]; int8_t ignoreNotExists; + int32_t vgroupVersion; + int32_t dbGroupVersion; int32_t reserve[8]; } SUseDbMsg; + typedef struct { char db[TSDB_TABLE_FNAME_LEN]; int32_t reserve[8]; @@ -809,8 +819,6 @@ typedef struct SVgroupListRspMsg { SVgroupInfo vgroupInfo[]; } SVgroupListRspMsg; -typedef SVgroupListRspMsg SVgroupListInfo; - typedef struct { int32_t vgId; int8_t numOfEps; @@ -855,6 +863,19 @@ typedef struct { char *data; } STagData; +typedef struct { + int32_t vgroupNum; + int32_t vgroupVersion; + char db[TSDB_TABLE_FNAME_LEN]; + int32_t dbVgroupVersion; + int32_t dbVgroupNum; + int32_t dbHashRange; + SVgroupInfo vgroupInfo[]; +//int32_t vgIdList[]; +} SUseDbRspMsg; + + + /* * sql: show tables like '%a_%' * payload is the query condition, e.g., '%a_%' diff --git a/include/libs/catalog/catalog.h b/include/libs/catalog/catalog.h index 0b45f715574f0621bd0d093e57fa8ac382d3597c..f9d3b3c8c18c627df6decfbca87e969aa759fe27 100644 --- a/include/libs/catalog/catalog.h +++ b/include/libs/catalog/catalog.h @@ -27,16 +27,10 @@ extern "C" { #include "transport.h" #include "common.h" #include "taosmsg.h" +#include "query.h" struct SCatalog; -typedef struct SDBVgroupInfo { - int32_t vgroupVersion; - SArray *vgId; - int32_t hashRange; - int32_t hashNum; -} SDBVgroupInfo; - typedef struct SCatalogReq 
{ char clusterId[TSDB_CLUSTER_ID_LEN]; //???? SArray *pTableName; // table full name diff --git a/include/libs/planner/planner.h b/include/libs/planner/planner.h index 1ff3f02da5ac9a30d6b338f797805dbc2a0feae7..8f217a0deb9ff509a4cc39e0f851510129c8bb62 100644 --- a/include/libs/planner/planner.h +++ b/include/libs/planner/planner.h @@ -22,6 +22,7 @@ extern "C" { #define QUERY_TYPE_MERGE 1 #define QUERY_TYPE_PARTIAL 2 +#define QUERY_TYPE_SCAN 3 enum OPERATOR_TYPE_E { OP_TableScan = 1, @@ -54,90 +55,37 @@ enum OPERATOR_TYPE_E { struct SEpSet; struct SQueryPlanNode; -struct SQueryDistPlanNode; +struct SPhyNode; struct SQueryStmtInfo; -typedef struct SSubquery { - int64_t queryId; // the subquery id created by qnode - int32_t type; // QUERY_TYPE_MERGE|QUERY_TYPE_PARTIAL - int32_t level; // the execution level of current subquery, starting from 0. - SArray *pUpstream; // the upstream,from which to fetch the result - struct SQueryDistPlanNode *pNode; // physical plan of current subquery -} SSubquery; +typedef struct SSubplan { + int32_t type; // QUERY_TYPE_MERGE|QUERY_TYPE_PARTIAL|QUERY_TYPE_SCAN + SArray *pDatasource; // the datasource subplan,from which to fetch the result + struct SPhyNode *pNode; // physical plan of current subplan +} SSubplan; -typedef struct SQueryJob { - SArray **pSubqueries; - int32_t numOfLevels; - int32_t currentLevel; -} SQueryJob; - - -/** - * Optimize the query execution plan, currently not implement yet. - * @param pQueryNode - * @return - */ -int32_t qOptimizeQueryPlan(struct SQueryPlanNode* pQueryNode); - -/** - * Create the query plan according to the bound AST, which is in the form of pQueryInfo - * @param pQueryInfo - * @param pQueryNode - * @return - */ -int32_t qCreateQueryPlan(const struct SQueryStmtInfo* pQueryInfo, struct SQueryPlanNode** pQueryNode); - -/** - * Convert the query plan to string, in order to display it in the shell. 
- * @param pQueryNode - * @return - */ -int32_t qQueryPlanToString(struct SQueryPlanNode* pQueryNode, char** str); +typedef struct SQueryDag { + SArray **pSubplans; +} SQueryDag; /** - * Restore the SQL statement according to the logic query plan. - * @param pQueryNode - * @param sql - * @return + * Create the physical plan for the query, according to the AST. */ -int32_t qQueryPlanToSql(struct SQueryPlanNode* pQueryNode, char** sql); +int32_t qCreateQueryDag(const struct SQueryStmtInfo* pQueryInfo, struct SEpSet* pQnode, struct SQueryDag** pDag); -/** - * Create the physical plan for the query, according to the logic plan. - * @param pQueryNode - * @param pPhyNode - * @return - */ -int32_t qCreatePhysicalPlan(struct SQueryPlanNode* pQueryNode, struct SEpSet* pQnode, struct SQueryDistPlanNode *pPhyNode); +int32_t qExplainQuery(const struct SQueryStmtInfo* pQueryInfo, struct SEpSet* pQnode, char** str); /** - * Convert to physical plan to string to enable to print it out in the shell. - * @param pPhyNode - * @param str - * @return + * Convert to subplan to string for the scheduler to send to the executor */ -int32_t qPhyPlanToString(struct SQueryDistPlanNode *pPhyNode, char** str); - -/** - * Destroy the query plan object. - * @return - */ -void* qDestroyQueryPlan(struct SQueryPlanNode* pQueryNode); +int32_t qSubPlanToString(struct SSubplan *pPhyNode, char** str); /** * Destroy the physical plan. 
* @param pQueryPhyNode * @return */ -void* qDestroyQueryPhyPlan(struct SQueryDistPlanNode* pQueryPhyNode); - -/** - * Create the query job from the physical execution plan - * @param pPhyNode - * @param pJob - * @return - */ -int32_t qCreateQueryJob(const struct SQueryDistPlanNode* pPhyNode, struct SQueryJob** pJob); +void* qDestroyQueryDag(struct SQueryDag* pDag); #ifdef __cplusplus } diff --git a/include/libs/query/query.h b/include/libs/query/query.h index 866a69ed8e72378b7937d5ef46ef730c9ce4d5ee..02ae70887484428710cd224ae389b0de8b18c9bf 100644 --- a/include/libs/query/query.h +++ b/include/libs/query/query.h @@ -20,6 +20,22 @@ extern "C" { #endif +#include "tarray.h" + +typedef SVgroupListRspMsg SVgroupListInfo; + +typedef struct SDBVgroupInfo { + int32_t vgroupVersion; + SArray *vgId; + int32_t hashRange; +} SDBVgroupInfo; + +typedef struct SUseDbOutput { + SVgroupListInfo *vgroupList; + char db[TSDB_TABLE_FNAME_LEN]; + SDBVgroupInfo *dbVgroup; +} SUseDbOutput; + extern int32_t (*queryBuildMsg[TSDB_MSG_TYPE_MAX])(void* input, char **msg, int32_t msgSize, int32_t *msgLen); extern int32_t (*queryProcessMsgRsp[TSDB_MSG_TYPE_MAX])(void* output, char *msg, int32_t msgSize); diff --git a/include/libs/scheduler/scheduler.h b/include/libs/scheduler/scheduler.h index d9653046cfadc84683e8922b2ad94ebe2149017b..6b3c9ed021c6629684bf1e5a7a7fca7bca2b9551 100644 --- a/include/libs/scheduler/scheduler.h +++ b/include/libs/scheduler/scheduler.h @@ -20,7 +20,42 @@ extern "C" { #endif -struct SQueryJob; +typedef struct SQueryProfileSummary { + int64_t startTs; // Object created and added into the message queue + int64_t endTs; // the timestamp when the task is completed + int64_t cputime; // total cpu cost, not execute elapsed time + + int64_t loadRemoteDataDuration; // remote io time + int64_t loadNativeDataDuration; // native disk io time + + uint64_t loadNativeData; // blocks + SMA + header files + uint64_t loadRemoteData; // remote data acquired by exchange operator. 
+ + uint64_t waitDuration; // the time to waiting to be scheduled in queue does matter, so we need to record it + int64_t addQTs; // the time to be added into the message queue, used to calculate the waiting duration in queue. + + uint64_t totalRows; + uint64_t loadRows; + uint32_t totalBlocks; + uint32_t loadBlocks; + uint32_t loadBlockAgg; + uint32_t skipBlocks; + uint64_t resultSize; // generated result size in Kb. +} SQueryProfileSummary; + +typedef struct SQueryTask { + uint64_t queryId; // query id + uint64_t taskId; // task id + char *pSubplan; // operator tree + uint64_t status; // task status + SQueryProfileSummary summary; // task execution summary + void *pOutputHandle; // result buffer handle, to temporarily keep the output result for next stage +} SQueryTask; + +typedef struct SQueryJob { + SArray **pSubtasks; + // todo +} SQueryJob; /** * Process the query job, generated according to the query physical plan. diff --git a/include/libs/wal/wal.h b/include/libs/wal/wal.h index 744275e6ff66937ef0a61068bdb7e5aadf61f7da..e19d65837a822fd7e8f484944aa6675f97beb7ba 100644 --- a/include/libs/wal/wal.h +++ b/include/libs/wal/wal.h @@ -32,23 +32,19 @@ extern int32_t wDebugFlag; #define wDebug(...) { if (wDebugFlag & DEBUG_DEBUG) { taosPrintLog("WAL ", wDebugFlag, __VA_ARGS__); }} #define wTrace(...) 
{ if (wDebugFlag & DEBUG_TRACE) { taosPrintLog("WAL ", wDebugFlag, __VA_ARGS__); }} -#define WAL_PREFIX "wal" -#define WAL_PREFIX_LEN 3 +#define WAL_HEAD_VER 0 #define WAL_NOSUFFIX_LEN 20 #define WAL_SUFFIX_AT (WAL_NOSUFFIX_LEN+1) #define WAL_LOG_SUFFIX "log" #define WAL_INDEX_SUFFIX "idx" #define WAL_REFRESH_MS 1000 -#define WAL_MAX_SIZE (TSDB_MAX_WAL_SIZE + sizeof(SWalHead) + 16) +#define WAL_MAX_SIZE (TSDB_MAX_WAL_SIZE + sizeof(SWalHead)) #define WAL_PATH_LEN (TSDB_FILENAME_LEN + 12) #define WAL_FILE_LEN (WAL_PATH_LEN + 32) -#define WAL_IDX_ENTRY_SIZE (sizeof(int64_t)*2) -#define WAL_CUR_POS_WRITABLE 1 -#define WAL_CUR_FILE_WRITABLE 2 -#define WAL_CUR_FAILED 4 +#define WAL_CUR_FAILED 1 -#pragma pack(push,1) +#pragma pack(push, 1) typedef enum { TAOS_WAL_NOLOG = 0, TAOS_WAL_WRITE = 1, @@ -56,11 +52,11 @@ typedef enum { } EWalType; typedef struct SWalReadHead { - int8_t sver; + int8_t headVer; uint8_t msgType; int8_t reserved[2]; int32_t len; - //int64_t ingestTs; //not implemented + int64_t ingestTs; //not implemented int64_t version; char body[]; } SWalReadHead; @@ -72,18 +68,10 @@ typedef struct { int32_t rollPeriod; // secs int64_t retentionSize; int64_t segSize; - EWalType level; // wal level + EWalType level; // wal level } SWalCfg; typedef struct { - //union { - //uint32_t info; - //struct { - //uint32_t sver:3; - //uint32_t msgtype: 5; - //uint32_t reserved : 24; - //}; - //}; uint32_t cksumHead; uint32_t cksumBody; SWalReadHead head; @@ -102,16 +90,16 @@ typedef struct SWal { SWalCfg cfg; SWalVer vers; //file set - int32_t writeCur; int64_t writeLogTfd; int64_t writeIdxTfd; + int32_t writeCur; SArray* fileInfoSet; + //statistics + int64_t totSize; + int64_t lastRollSeq; //ctl - int32_t curStatus; int32_t fsyncSeq; - int64_t totSize; int64_t refId; - int64_t lastRollSeq; pthread_mutex_t mutex; //path char path[WAL_PATH_LEN]; @@ -131,7 +119,7 @@ typedef struct SWalReadHandle { } SWalReadHandle; #pragma pack(pop) -typedef int32_t (*FWalWrite)(void *ahandle, 
void *pHead); +//typedef int32_t (*FWalWrite)(void *ahandle, void *pHead); // module initialization int32_t walInit(); @@ -151,8 +139,8 @@ int32_t walCommit(SWal *, int64_t ver); // truncate after int32_t walRollback(SWal *, int64_t ver); // notify that previous logs can be pruned safely -int32_t walBeginTakeSnapshot(SWal *, int64_t ver); -int32_t walEndTakeSnapshot(SWal *); +int32_t walBeginSnapshot(SWal *, int64_t ver); +int32_t walEndSnapshot(SWal *); //int32_t walDataCorrupted(SWal*); // read @@ -161,7 +149,7 @@ void walCloseReadHandle(SWalReadHandle *); int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver); int32_t walRead(SWal *, SWalHead **, int64_t ver); -int32_t walReadWithFp(SWal *, FWalWrite writeFp, int64_t verStart, int32_t readNum); +//int32_t walReadWithFp(SWal *, FWalWrite writeFp, int64_t verStart, int32_t readNum); // lifecycle check int64_t walGetFirstVer(SWal *); diff --git a/include/os/osMemory.h b/include/os/osMemory.h index 10c90cd9aaf13ba073b7124fd046b8d5c5e7b6ba..5f1d5a9a8a6a7028295cac28d6711f6171be7307 100644 --- a/include/os/osMemory.h +++ b/include/os/osMemory.h @@ -23,8 +23,8 @@ extern "C" { #define tfree(x) \ do { \ if (x) { \ - free((void *)x); \ - x = 0; \ + free((void *)(x)); \ + (x) = 0; \ } \ } while (0) diff --git a/include/util/tref.h b/include/util/tref.h index cc7d075f5209dfb3ab463a1c2cdb4ec39d0bbcf2..6680204d63f58a75c61c45c340cc14c9129cd7c0 100644 --- a/include/util/tref.h +++ b/include/util/tref.h @@ -17,6 +17,8 @@ #ifndef _TD_UTIL_REF_H #define _TD_UTIL_REF_H +#include "os.h" + #ifdef __cplusplus extern "C" { #endif diff --git a/source/libs/catalog/inc/catalogInt.h b/source/libs/catalog/inc/catalogInt.h index 720f197782f7ee59dbe525d9cc9d1f5733dde71c..a08b64f9a91b8db5b16bfae330dab3dd9b9388c8 100644 --- a/source/libs/catalog/inc/catalogInt.h +++ b/source/libs/catalog/inc/catalogInt.h @@ -26,6 +26,7 @@ extern "C" { #define CTG_DEFAULT_CLUSTER_NUMBER 6 #define CTG_DEFAULT_VGROUP_NUMBER 100 +#define CTG_DEFAULT_DB_NUMBER 
20
 
 #define CTG_DEFAULT_INVALID_VERSION (-1)
 
diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c
index b488ab8101c49d6d446fc625bb16e6ed564914a2..92b60945291184afc39c151b6d7067acef8c8192 100644
--- a/source/libs/catalog/src/catalog.c
+++ b/source/libs/catalog/src/catalog.c
@@ -63,21 +63,74 @@ int32_t ctgGetVgroupFromCache(SCatalog* pCatalog, SArray** pVgroupList, int32_t*
 }
 
 
-int32_t ctgGetDBVgroupFromCache(SCatalog* pCatalog, char *dbName, SDBVgroupInfo **dbInfo, int32_t *exist) {
-/*
+// Looks up dbName in the db cache. *exist becomes 1 only when an entry is found whose
+// vgroupVersion is not older than pCatalog->vgroupCache.vgroupVersion; in that case, if dbInfo
+// is non-NULL, *dbInfo receives a newly allocated copy that has its own vgId array.
+int32_t ctgGetDBVgroupFromCache(SCatalog* pCatalog, const char *dbName, SDBVgroupInfo **dbInfo, int32_t *exist) {
   if (NULL == pCatalog->dbCache.cache) {
     *exist = 0;
     return TSDB_CODE_SUCCESS;
   }
 
-  taosHashGet(SHashObj * pHashObj, const void * key, size_t keyLen)
+  SDBVgroupInfo *info = taosHashGet(pCatalog->dbCache.cache, dbName, strlen(dbName));
+
+  if (NULL == info || info->vgroupVersion < pCatalog->vgroupCache.vgroupVersion) {
+    *exist = 0;
+    return TSDB_CODE_SUCCESS;
+  }
 
   if (dbInfo) {
-    *pVgroupList = taosArrayDup(pCatalog->vgroupCache.arrayCache);
+    *dbInfo = calloc(1, sizeof(**dbInfo));
+    if (NULL == *dbInfo) {
+      ctgError("calloc size[%d] failed", (int32_t)sizeof(**dbInfo));
+      return TSDB_CODE_CTG_MEM_ERROR;
+    }
+
+    (*dbInfo)->vgId = taosArrayDup(info->vgId);
+    if (NULL == (*dbInfo)->vgId) {
+      ctgError("taos array duplicate failed");
+      tfree(*dbInfo);
+      return TSDB_CODE_CTG_MEM_ERROR;
+    }
+
+    (*dbInfo)->vgroupVersion = info->vgroupVersion;
+    (*dbInfo)->hashRange = info->hashRange;
   }
 
   *exist = 1;
-*/
+
+  return TSDB_CODE_SUCCESS;
+}
+
+
+
+// Builds a TSDB_MSG_TYPE_USE_DB request from input, sends it with rpcSendRecv and decodes the
+// reply into out. NOTE(review): the outcome of rpcSendRecv is not inspected here; presumably
+// the response handler rejects an empty reply -- confirm.
+int32_t ctgGetDBVgroupFromMnode(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, SBuildUseDBInput *input, SUseDbOutput *out) {
+  char *msg = NULL;
+  int32_t msgLen = 0;
+
+  int32_t code = queryBuildMsg[TSDB_MSG_TYPE_USE_DB](input, &msg, 0, &msgLen);
+  if (code) {
+    return code;
+  }
+
+  SRpcMsg rpcMsg = {
+    .msgType = TSDB_MSG_TYPE_USE_DB,
+    .pCont = msg,
+    .contLen = msgLen,
+  };
+
+  SRpcMsg rpcRsp = {0};
+
+  rpcSendRecv(pRpc, (SEpSet*)pMgmtEps, &rpcMsg, &rpcRsp);
+
+  code = queryProcessMsgRsp[TSDB_MSG_TYPE_USE_DB](out, rpcRsp.pCont, rpcRsp.contLen);
+  if (code) {
+    return code;
+  }
+
   return TSDB_CODE_SUCCESS;
 }
 
@@ -144,7 +197,7 @@ int32_t catalogGetVgroupVersion(struct SCatalog* pCatalog, int32_t* version) {
 
 int32_t catalogUpdateVgroup(struct SCatalog* pCatalog, SVgroupListInfo* pVgroup) {
   if (NULL == pVgroup) {
-    ctgError("vgroup get from mnode succeed, but no output");
+    ctgError("no valid vgroup list info to update");
     return TSDB_CODE_CTG_INTERNAL_ERROR;
   }
 
@@ -262,7 +315,36 @@ int32_t catalogGetDBVgroupVersion(struct SCatalog* pCatalog, const char* dbName,
 }
 
+// Caches dbInfo under dbName; a negative dbInfo->vgroupVersion removes the cached entry instead.
+// NOTE(review): taosHashPut is handed sizeof(*dbInfo) bytes, so the cached entry presumably keeps
+// pointing at dbInfo->vgId -- confirm who owns that array.
 int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbName, SDBVgroupInfo* dbInfo) {
+  if (NULL == pCatalog || NULL == dbName || NULL == dbInfo) {
+    return TSDB_CODE_CTG_INVALID_INPUT;
+  }
+
+  if (dbInfo->vgroupVersion < 0) {
+    if (pCatalog->dbCache.cache) {
+      taosHashRemove(pCatalog->dbCache.cache, dbName, strlen(dbName));
+    }
+
+    ctgWarn("remove db [%s] from cache", dbName);
+    return TSDB_CODE_SUCCESS;
+  }
 
+  if (NULL == pCatalog->dbCache.cache) {
+    pCatalog->dbCache.cache = taosHashInit(CTG_DEFAULT_DB_NUMBER, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
+    if (NULL == pCatalog->dbCache.cache) {
+      ctgError("init hash[%d] for db cache failed", CTG_DEFAULT_DB_NUMBER);
+      return TSDB_CODE_CTG_MEM_ERROR;
+    }
+  }
+
+  if (taosHashPut(pCatalog->dbCache.cache, dbName, strlen(dbName), dbInfo, sizeof(*dbInfo)) != 0) {
+    ctgError("push to vgroup hash cache failed");
+    return TSDB_CODE_CTG_MEM_ERROR;
+  }
+
+  return TSDB_CODE_SUCCESS;
 }
 
 
@@ -273,8 +355,8 @@ int32_t catalogGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet*
     return TSDB_CODE_CTG_INVALID_INPUT;
   }
 
-/*
   int32_t exist = 0;
+  int32_t code = 0;
 
   if (0 == forceUpdate) {
     CTG_ERR_RET(ctgGetDBVgroupFromCache(pCatalog, dbName, dbInfo, &exist));
@@ -284,18 +366,46 @@ int32_t catalogGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet*
     }
   }
 
-  SDBVgroupInfo* newDbInfo = NULL;
+  SUseDbOutput DbOut = {0};
+  SBuildUseDBInput input = {0};
+
+  strncpy(input.db, dbName, sizeof(input.db));
+  input.db[sizeof(input.db) - 1] = 0;
+  input.vgroupVersion = pCatalog->vgroupCache.vgroupVersion;
+  input.dbGroupVersion = CTG_DEFAULT_INVALID_VERSION;
 
-  CTG_ERR_RET(ctgGetDBVgroupFromMnode(pCatalog, pRpc, pMgmtEps, dbName, &newDbInfo));
+  CTG_ERR_RET(ctgGetDBVgroupFromMnode(pCatalog, pRpc, pMgmtEps, &input, &DbOut));
 
-  CTG_ERR_RET(catalogUpdateDBVgroup(pCatalog, dbName, newDbInfo));
+  if (DbOut.vgroupList) {
+    CTG_ERR_JRET(catalogUpdateVgroup(pCatalog, DbOut.vgroupList));
+  }
+
+  if (DbOut.dbVgroup) {
+    CTG_ERR_JRET(catalogUpdateDBVgroup(pCatalog, dbName, DbOut.dbVgroup));
+  }
 
   if (dbInfo) {
-    *dbInfo = newDbInfo;
+    // catalogUpdateDBVgroup stored sizeof(*dbInfo) bytes of *DbOut.dbVgroup in the cache, so the
+    // cached entry refers to the same vgId array; hand the caller a private array instead, as
+    // ctgGetDBVgroupFromCache does.
+    if (DbOut.dbVgroup && DbOut.dbVgroup->vgroupVersion >= 0 && DbOut.dbVgroup->vgId) {
+      SArray *vgId = taosArrayDup(DbOut.dbVgroup->vgId);
+      if (NULL == vgId) {
+        ctgError("taos array duplicate failed");
+        code = TSDB_CODE_CTG_MEM_ERROR;
+        goto _return;
+      }
+      DbOut.dbVgroup->vgId = vgId;
+    }
+
+    *dbInfo = DbOut.dbVgroup;
+    DbOut.dbVgroup = NULL;
   }
 
-*/
-  return TSDB_CODE_SUCCESS;
+_return:
+  tfree(DbOut.dbVgroup);
+  tfree(DbOut.vgroupList);
+
+  return code;
 }
 
 
diff --git a/source/libs/index/inc/index_fst_automation.h b/source/libs/index/inc/index_fst_automation.h
index 7eb2ba97aefbe8203d8727861f6587bc8548780d..8050b85b0878e378bbe40098a4e2fbd0b1535b8a 100644
--- a/source/libs/index/inc/index_fst_automation.h
+++ b/source/libs/index/inc/index_fst_automation.h
@@ -40,7 +40,8 @@ typedef struct Complement {
 // automation
 typedef struct AutomationCtx {
   AutomationType type;
-  void *data;
+  void *stdata;
+  char *data;
 } AutomationCtx;
 
 
@@ -58,7 +59,9 @@ typedef struct StartWithStateValue {
   } ;
 } StartWithStateValue;
 
+StartWithStateValue *startWithStateValueCreate(StartWithStateKind kind, ValueType ty, void *val);
 StartWithStateValue *startWithStateValueDump(StartWithStateValue *sv);
+void startWithStateValueDestroy(void *sv);
 
 
 typedef struct AutomationFunc {
@@ -70,7 +73,7 @@ typedef struct AutomationFunc {
   void* (*acceptEof)(AutomationCtx *ct, void *state);
 } AutomationFunc;
 
-AutomationCtx *automCtxCreate(void *data, AutomationType type);
+AutomationCtx *automCtxCreate(void *data, AutomationType atype);
 void automCtxDestroy(AutomationCtx *ctx);
 
 extern AutomationFunc automFuncs[];
diff --git a/source/libs/index/src/index_fst.c b/source/libs/index/src/index_fst.c
index e2708c59192902511fba03405b86ea8931149cd2..37bdcb0ecfa15a0a2726de07d4a69470b44f4b0e 100644
--- a/source/libs/index/src/index_fst.c
+++ b/source/libs/index/src/index_fst.c
@@ -17,6 +17,7 @@
 #include "tcoding.h"
 #include "tchecksum.h"
 #include "indexInt.h"
+#include "index_fst_automation.h"
 
 
 static void fstPackDeltaIn(FstCountingWriter *wrt, CompiledAddr nodeAddr, CompiledAddr transAddr, uint8_t nBytes) {
@@ -1402,6 +1403,7 @@ void swsResultDestroy(StreamWithStateResult *result) {
   if (NULL == result) { return; }
 
   fstSliceDestroy(&result->data);
+  startWithStateValueDestroy(result->state);
   free(result);
 }
 
diff --git a/source/libs/index/src/index_fst_automation.c b/source/libs/index/src/index_fst_automation.c
index bb0f0da76d1aef65702f72a2ab7282c836ff74db..d905147654593b74dc9898fb44c4838aa17927fb 100644
--- a/source/libs/index/src/index_fst_automation.c
+++ b/source/libs/index/src/index_fst_automation.c
@@ -34,7 +34,8 @@ StartWithStateValue *startWithStateValueCreate(StartWithStateKind kind, ValueTyp
   }
   return nsv;
 }
-void startWithStateValueDestroy(StartWithStateValue *sv) {
+void startWithStateValueDestroy(void *val) {
+  StartWithStateValue *sv = (StartWithStateValue *)val;
   if (sv == NULL) { return; }
 
   if (sv->type == FST_INT) {
@@ -68,19 +69,32 @@ StartWithStateValue *startWithStateValueDump(StartWithStateValue *sv) {
 
+// Start state of the prefix automaton: a copy of the state value kept in the context.
 static void* prefixStart(AutomationCtx *ctx) {
-  StartWithStateValue *data = (StartWithStateValue *)(ctx->data);
-
-  return data;
+  // ctx->data is the pattern string; the state value is stored in ctx->stdata
+  StartWithStateValue *data = (StartWithStateValue *)(ctx->stdata);
+  return startWithStateValueDump(data);
 };
-static bool prefixIsMatch(AutomationCtx *ctx, void *data) {
-  return true;
+// True when the state's position equals the pattern length; a NULL state never matches.
+static bool prefixIsMatch(AutomationCtx *ctx, void *sv) {
+  StartWithStateValue* ssv = (StartWithStateValue *)sv;
+  return ssv != NULL && ssv->val == strlen(ctx->data);
 }
-static bool prefixCanMatch(AutomationCtx *ctx, void *data) {
-  return true;
+// prefixAccept returns NULL on a mismatch, so a NULL state cannot lead to a match.
+static bool prefixCanMatch(AutomationCtx *ctx, void *sv) {
+  StartWithStateValue* ssv = (StartWithStateValue *)sv;
+  return ssv != NULL && ssv->val >= 0;
 }
 static bool prefixWillAlwaysMatch(AutomationCtx *ctx, void *state) {
   return true;
 }
 static void* prefixAccept(AutomationCtx *ctx, void *state, uint8_t byte) {
+  StartWithStateValue* ssv = (StartWithStateValue *)state;
+  if (ssv == NULL || ctx == NULL) {return NULL;}
+
+  char *data = ctx->data;
+  if ((strlen(data) > ssv->val) && data[ssv->val] == byte) {
+    int val = ssv->val + 1;
+    return startWithStateValueCreate(Running, FST_INT, &val);
+  }
   return NULL;
 }
 static void* prefixAcceptEof(AutomationCtx *ctx, void *state) {
@@ -129,28 +143,46 @@ AutomationFunc automFuncs[] = {{
   // add more search type
 };
 
-AutomationCtx* automCtxCreate(void *data, AutomationType type) {
+// Creates an automaton context of the given type for the NUL-terminated pattern in data.
+// The pattern is copied, so the caller keeps ownership of data. Returns NULL when data is
+// NULL or when allocating the context or the pattern copy fails.
+AutomationCtx* automCtxCreate(void *data,AutomationType atype) {
+  if (data == NULL) { return NULL; }
+
   AutomationCtx *ctx = calloc(1, sizeof(AutomationCtx));
   if (ctx == NULL) { return NULL; }
 
-  if (type == AUTOMATION_PREFIX) {
-    StartWithStateValue *swsv = (StartWithStateValue *)calloc(1, sizeof(StartWithStateValue));
-    swsv->kind = Done;
-    //swsv->value = NULL;
-    ctx->data = (void *)swsv;
-  } else if (type == AUTMMATION_MATCH) {
+  StartWithStateValue *sv = NULL;
+  if (atype == AUTOMATION_PREFIX) {
+    // an FST_INT value is passed by address (as prefixAccept does), not as a literal 0
+    int val = 0;
+    sv = startWithStateValueCreate(Running, FST_INT, &val);
+  } else if (atype == AUTMMATION_MATCH) {
   } else {
     // add more search type
   }
 
-  ctx->type = type;
+  char* src = (char *)data;
+  size_t len = strlen(src);
+  char* dst = (char *)malloc(len * sizeof(char) + 1);
+  if (dst == NULL) {
+    startWithStateValueDestroy(sv);
+    free(ctx);
+    return NULL;
+  }
+  memcpy(dst, src, len);
+  dst[len] = 0;
+
+  ctx->data = dst;
+  ctx->type = atype;
+  ctx->stdata = (void *)sv;
   return ctx;
 }
+// Releases the context together with the state value and pattern copy it holds; NULL is ignored.
 void automCtxDestroy(AutomationCtx *ctx) {
-  if (ctx->type == AUTOMATION_PREFIX) {
-    free(ctx->data);
-  } else if (ctx->type == AUTMMATION_MATCH) {
-  }
+  if (ctx == NULL) { return; }
+  startWithStateValueDestroy(ctx->stdata);
+  free(ctx->data);
   free(ctx);
 }
 
diff --git 
a/source/libs/parser/CMakeLists.txt b/source/libs/parser/CMakeLists.txt index 155b72c1f99e7faf9ac533f23f2d98d84911e187..5e635aa6a16a1c987cbdc6932597628003df36d8 100644 --- a/source/libs/parser/CMakeLists.txt +++ b/source/libs/parser/CMakeLists.txt @@ -8,7 +8,7 @@ target_include_directories( target_link_libraries( parser - PRIVATE os util common catalog function transport + PRIVATE os util common catalog function transport query ) -ADD_SUBDIRECTORY(test) \ No newline at end of file +ADD_SUBDIRECTORY(test) diff --git a/source/libs/parser/test/CMakeLists.txt b/source/libs/parser/test/CMakeLists.txt index f7d71132434eeeb4cfb375eb959a694b43162b69..4b9e586be3409336c6a0d8c3e4a56c47685d1912 100644 --- a/source/libs/parser/test/CMakeLists.txt +++ b/source/libs/parser/test/CMakeLists.txt @@ -8,7 +8,7 @@ AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST) ADD_EXECUTABLE(parserTest ${SOURCE_LIST}) TARGET_LINK_LIBRARIES( parserTest - PUBLIC os util common parser catalog transport gtest function planner + PUBLIC os util common parser catalog transport gtest function planner query ) TARGET_INCLUDE_DIRECTORIES( diff --git a/source/libs/parser/test/plannerTest.cpp b/source/libs/parser/test/plannerTest.cpp index bb9271a3c80e928eadee8ee2c42bbfe22b6e4dce..a2078defdaf1acbe5b9e0824f08a934371345d6c 100644 --- a/source/libs/parser/test/plannerTest.cpp +++ b/source/libs/parser/test/plannerTest.cpp @@ -30,6 +30,7 @@ #include "tdef.h" #include "tvariant.h" #include "planner.h" +#include "../../planner/inc/plannerInt.h" namespace { void setSchema(SSchema* p, int32_t type, int32_t bytes, const char* name, int32_t colId) { @@ -92,10 +93,10 @@ void generateLogicplan(const char* sql) { ASSERT_EQ(ret, 0); struct SQueryPlanNode* n = nullptr; - code = qCreateQueryPlan(pQueryInfo, &n); + code = createQueryPlan(pQueryInfo, &n); char* str = NULL; - qQueryPlanToString(n, &str); + queryPlanToString(n, &str); printf("--------SQL:%s\n", sql); printf("%s\n", str); @@ -155,10 +156,10 @@ 
TEST(testCase, planner_test) { ASSERT_EQ(pQueryInfo->fieldsInfo.numOfOutput, 2); struct SQueryPlanNode* n = nullptr; - code = qCreateQueryPlan(pQueryInfo, &n); + code = createQueryPlan(pQueryInfo, &n); char* str = NULL; - qQueryPlanToString(n, &str); + queryPlanToString(n, &str); printf("%s\n", str); destroyQueryInfo(pQueryInfo); diff --git a/source/libs/planner/CMakeLists.txt b/source/libs/planner/CMakeLists.txt index 23efce38f44aeee2151a985e2351c54b1b164e7b..4e0d03d07ae55ca85ddb44fc621ad8cce0056f27 100644 --- a/source/libs/planner/CMakeLists.txt +++ b/source/libs/planner/CMakeLists.txt @@ -8,7 +8,7 @@ target_include_directories( target_link_libraries( planner - PRIVATE os util common catalog parser transport function + PRIVATE os util common catalog parser transport function query ) -ADD_SUBDIRECTORY(test) \ No newline at end of file +ADD_SUBDIRECTORY(test) diff --git a/source/libs/planner/inc/plannerInt.h b/source/libs/planner/inc/plannerInt.h index 6bd89905b1831eeeaa08049c992889dbc833be76..2231c933629e5acdd3ea7e08b66bb9e8495108e8 100644 --- a/source/libs/planner/inc/plannerInt.h +++ b/source/libs/planner/inc/plannerInt.h @@ -25,6 +25,19 @@ extern "C" { #include "planner.h" #include "taosmsg.h" +enum LOGIC_PLAN_E { + LP_SCAN = 1, + LP_SESSION = 2, + LP_STATE = 3, + LP_INTERVAL = 4, + LP_FILL = 5, + LP_AGG = 6, + LP_JOIN = 7, + LP_PROJECT = 8, + LP_DISTINCT = 9, + LP_ORDER = 10 +}; + typedef struct SQueryNodeBasicInfo { int32_t type; // operator type char *name; // operator name @@ -57,50 +70,94 @@ typedef struct SQueryPlanNode { struct SQueryPlanNode *nextNode; } SQueryPlanNode; -typedef struct SQueryDistPlanNode { +typedef SSchema SSlotSchema; + +typedef struct SDataBlockSchema { + int32_t index; + SSlotSchema *pSchema; + int32_t numOfCols; // number of columns +} SDataBlockSchema; + +typedef struct SPhyNode { SQueryNodeBasicInfo info; - SSchema *pSchema; // the schema of the input SSDatablock - int32_t numOfCols; // number of input columns - SArray *pExpr; // 
the query functions or sql aggregations - int32_t numOfExpr; // number of result columns, which is also the number of pExprs - void *pExtInfo; // additional information + SArray *pTargets; // target list to be computed or scanned at this node + SArray *pConditions; // implicitly-ANDed qual conditions + SDataBlockSchema targetSchema; + // children plan to generated result for current node to process + // in case of join, multiple plan nodes exist. + SArray *pChildren; +} SPhyNode; + +typedef struct SScanPhyNode { + SPhyNode node; + uint64_t uid; // unique id of the table +} SScanPhyNode; + +typedef SScanPhyNode STagScanPhyNode; + +typedef SScanPhyNode SSystemTableScanPhyNode; + +typedef struct SMultiTableScanPhyNode { + SScanPhyNode scan; + SArray *pTagsConditions; // implicitly-ANDed tag qual conditions +} SMultiTableScanPhyNode; + +typedef SMultiTableScanPhyNode SMultiTableSeqScanPhyNode; + +typedef struct SProjectPhyNode { + SPhyNode node; +} SProjectPhyNode; + +/** + * Optimize the query execution plan, currently not implement yet. + * @param pQueryNode + * @return + */ +int32_t optimizeQueryPlan(struct SQueryPlanNode* pQueryNode); - // previous operator to generated result for current node to process - // in case of join, multiple prev nodes exist. - SArray *pPrevNodes; // upstream nodes, or exchange operator to load data from multiple sources. -} SQueryDistPlanNode; - -typedef struct SQueryCostSummary { - int64_t startTs; // Object created and added into the message queue - int64_t endTs; // the timestamp when the task is completed - int64_t cputime; // total cpu cost, not execute elapsed time - - int64_t loadRemoteDataDuration; // remote io time - int64_t loadNativeDataDuration; // native disk io time - - uint64_t loadNativeData; // blocks + SMA + header files - uint64_t loadRemoteData; // remote data acquired by exchange operator. 
- - uint64_t waitDuration; // the time to waiting to be scheduled in queue does matter, so we need to record it - int64_t addQTs; // the time to be added into the message queue, used to calculate the waiting duration in queue. - - uint64_t totalRows; - uint64_t loadRows; - uint32_t totalBlocks; - uint32_t loadBlocks; - uint32_t loadBlockAgg; - uint32_t skipBlocks; - uint64_t resultSize; // generated result size in Kb. -} SQueryCostSummary; - -typedef struct SQueryTask { - uint64_t queryId; // query id - uint64_t taskId; // task id - SQueryDistPlanNode *pNode; // operator tree - uint64_t status; // task status - SQueryCostSummary summary; // task execution summary - void *pOutputHandle; // result buffer handle, to temporarily keep the output result for next stage -} SQueryTask; +/** + * Create the query plan according to the bound AST, which is in the form of pQueryInfo + * @param pQueryInfo + * @param pQueryNode + * @return + */ +int32_t createQueryPlan(const struct SQueryStmtInfo* pQueryInfo, struct SQueryPlanNode** pQueryNode); + +/** + * Convert the query plan to string, in order to display it in the shell. + * @param pQueryNode + * @return + */ +int32_t queryPlanToString(struct SQueryPlanNode* pQueryNode, char** str); + +/** + * Restore the SQL statement according to the logic query plan. + * @param pQueryNode + * @param sql + * @return + */ +int32_t queryPlanToSql(struct SQueryPlanNode* pQueryNode, char** sql); + +/** + * Convert to physical plan to string to enable to print it out in the shell. + * @param pPhyNode + * @param str + * @return + */ +int32_t phyPlanToString(struct SPhyNode *pPhyNode, char** str); + +/** + * Destroy the query plan object. + * @return + */ +void* destroyQueryPlan(struct SQueryPlanNode* pQueryNode); + +/** + * Destroy the physical plan. 
+ * @param pQueryPhyNode + * @return + */ +void* destroyQueryPhyPlan(struct SPhyNode* pQueryPhyNode); #ifdef __cplusplus } diff --git a/source/libs/planner/src/physicalPlan.c b/source/libs/planner/src/physicalPlan.c new file mode 100644 index 0000000000000000000000000000000000000000..2bdc159af856aabc45fd0ccf508bf139c6d7b622 --- /dev/null +++ b/source/libs/planner/src/physicalPlan.c @@ -0,0 +1,36 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "plannerInt.h" + +SPhyNode* createScanNode(SQueryPlanNode* pPlanNode) { + return NULL; +} + +SPhyNode* createPhyNode(SQueryPlanNode* node) { + switch (node->info.type) { + case LP_SCAN: + return createScanNode(node); + } + return NULL; +} + +SPhyNode* createSubplan(SQueryPlanNode* pSubquery) { + return NULL; +} + +int32_t createDag(struct SQueryPlanNode* pQueryNode, struct SEpSet* pQnode, struct SQueryDag** pDag) { + return 0; +} diff --git a/source/libs/planner/src/planner.c b/source/libs/planner/src/planner.c index 79c7691698f670a0ade9d7ae1dd38f5923ce41f5..e54b8472304bf48d62e93470467714c7aec0fc03 100644 --- a/source/libs/planner/src/planner.c +++ b/source/libs/planner/src/planner.c @@ -48,11 +48,11 @@ static SArray* createQueryPlanImpl(SQueryStmtInfo* pQueryInfo); static void doDestroyQueryNode(SQueryPlanNode* pQueryNode); int32_t printExprInfo(const char* buf, const SQueryPlanNode* pQueryNode, int32_t len); -int32_t qOptimizeQueryPlan(struct SQueryPlanNode* pQueryNode) { 
+int32_t optimizeQueryPlan(struct SQueryPlanNode* pQueryNode) { return 0; } -int32_t qCreateQueryPlan(const struct SQueryStmtInfo* pQueryInfo, struct SQueryPlanNode** pQueryNode) { +int32_t createQueryPlan(const struct SQueryStmtInfo* pQueryInfo, struct SQueryPlanNode** pQueryNode) { SArray* upstream = createQueryPlanImpl((struct SQueryStmtInfo*) pQueryInfo); assert(taosArrayGetSize(upstream) == 1); @@ -62,19 +62,20 @@ int32_t qCreateQueryPlan(const struct SQueryStmtInfo* pQueryInfo, struct SQueryP return TSDB_CODE_SUCCESS; } -int32_t qQueryPlanToSql(struct SQueryPlanNode* pQueryNode, char** sql) { +int32_t queryPlanToSql(struct SQueryPlanNode* pQueryNode, char** sql) { return 0; } -int32_t qCreatePhysicalPlan(struct SQueryPlanNode* pQueryNode, struct SEpSet* pQnode, struct SQueryDistPlanNode *pPhyNode) { +int32_t qCreatePhysicalPlan(struct SQueryPlanNode* pQueryNode, struct SEpSet* pQnode, struct SQueryDag** pDag) { + return 0; } -int32_t qPhyPlanToString(struct SQueryDistPlanNode *pPhyNode, char** str) { +int32_t phyPlanToString(struct SPhyNode *pPhyNode, char** str) { return 0; } -void* qDestroyQueryPlan(SQueryPlanNode* pQueryNode) { +void* destroyQueryPlan(SQueryPlanNode* pQueryNode) { if (pQueryNode == NULL) { return NULL; } @@ -83,14 +84,10 @@ void* qDestroyQueryPlan(SQueryPlanNode* pQueryNode) { return NULL; } -void* qDestroyQueryPhyPlan(struct SQueryDistPlanNode* pQueryPhyNode) { +void* destroyQueryPhyPlan(struct SPhyNode* pQueryPhyNode) { return NULL; } -int32_t qCreateQueryJob(const struct SQueryDistPlanNode* pPhyNode, struct SQueryJob** pJob) { - return 0; -} - //====================================================================================================================== static SQueryPlanNode* createQueryNode(int32_t type, const char* name, SQueryPlanNode** prev, int32_t numOfPrev, @@ -619,7 +616,7 @@ int32_t queryPlanToStringImpl(char* buf, SQueryPlanNode* pQueryNode, int32_t lev return len; } -int32_t qQueryPlanToString(struct SQueryPlanNode* 
pQueryNode, char** str) { +int32_t queryPlanToString(struct SQueryPlanNode* pQueryNode, char** str) { assert(pQueryNode); *str = calloc(1, 4096); diff --git a/source/libs/planner/test/CMakeLists.txt b/source/libs/planner/test/CMakeLists.txt index a83d7a39d99a36fa1869dbc6874024155ef308f4..f00adfaeb25ce7ea8d1ea72b16f58ade153e7d66 100644 --- a/source/libs/planner/test/CMakeLists.txt +++ b/source/libs/planner/test/CMakeLists.txt @@ -8,7 +8,7 @@ AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST) ADD_EXECUTABLE(plannerTest ${SOURCE_LIST}) TARGET_LINK_LIBRARIES( plannerTest - PUBLIC os util common planner parser catalog transport gtest function + PUBLIC os util common planner parser catalog transport gtest function query ) TARGET_INCLUDE_DIRECTORIES( diff --git a/source/libs/query/src/querymsg.c b/source/libs/query/src/querymsg.c index 924878c872bd91f39b340aa91c5062ddfacf7a00..8f35fd9c3ef43732dfae9c19423baa91306cdc13 100644 --- a/source/libs/query/src/querymsg.c +++ b/source/libs/query/src/querymsg.c @@ -15,7 +15,7 @@ #include "taosmsg.h" #include "queryInt.h" - +#include "query.h" int32_t (*queryBuildMsg[TSDB_MSG_TYPE_MAX])(void* input, char **msg, int32_t msgSize, int32_t *msgLen) = {0}; @@ -60,6 +60,36 @@ int32_t queryBuildTableMetaReqMsg(void* input, char **msg, int32_t msgSize, int3 return TSDB_CODE_SUCCESS; } +int32_t queryBuildUseDbMsg(void* input, char **msg, int32_t msgSize, int32_t *msgLen) { + if (NULL == input || NULL == msg || NULL == msgLen) { + return TSDB_CODE_TSC_INVALID_INPUT; + } + + SBuildUseDBInput* bInput = (SBuildUseDBInput *)input; + + int32_t estimateSize = sizeof(SUseDbMsg); + if (NULL == *msg || msgSize < estimateSize) { + tfree(*msg); + *msg = calloc(1, estimateSize); + if (NULL == *msg) { + return TSDB_CODE_TSC_OUT_OF_MEMORY; + } + } + + SUseDbMsg *bMsg = (SUseDbMsg *)*msg; + + strncpy(bMsg->db, bInput->db, sizeof(bMsg->db)); + bMsg->db[sizeof(bMsg->db) - 1] = 0; + + bMsg->vgroupVersion = bInput->vgroupVersion; + 
bMsg->dbGroupVersion = bInput->dbGroupVersion; + + *msgLen = (int32_t)sizeof(*bMsg); + + return TSDB_CODE_SUCCESS; +} + + int32_t queryProcessVgroupListRsp(void* output, char *msg, int32_t msgSize) { if (NULL == output || NULL == msg || msgSize <= 0) { @@ -103,12 +133,126 @@ int32_t queryProcessVgroupListRsp(void* output, char *msg, int32_t msgSize) { return TSDB_CODE_SUCCESS; } + + + +int32_t queryProcessUseDBRsp(void* output, char *msg, int32_t msgSize) { + if (NULL == output || NULL == msg || msgSize <= 0) { + return TSDB_CODE_TSC_INVALID_INPUT; + } + + SUseDbRspMsg *pRsp = (SUseDbRspMsg *)msg; + SUseDbOutput *pOut = (SUseDbOutput *)output; + int32_t code = 0; + + if (msgSize <= sizeof(*pRsp)) { + qError("invalid use db rsp msg size, msgSize:%d", msgSize); + return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE; + } + + pRsp->vgroupVersion = htonl(pRsp->vgroupVersion); + pRsp->dbVgroupVersion = htonl(pRsp->dbVgroupVersion); + + pRsp->vgroupNum = htonl(pRsp->vgroupNum); + pRsp->dbVgroupNum = htonl(pRsp->dbVgroupNum); + + if (pRsp->vgroupNum < 0) { + qError("invalid vgroup number[%d]", pRsp->vgroupNum); + return TSDB_CODE_TSC_INVALID_VALUE; + } + + if (pRsp->dbVgroupNum < 0) { + qError("invalid db vgroup number[%d]", pRsp->dbVgroupNum); + return TSDB_CODE_TSC_INVALID_VALUE; + } + + int32_t expectSize = pRsp->vgroupNum * sizeof(pRsp->vgroupInfo[0]) + pRsp->dbVgroupNum * sizeof(int32_t) + sizeof(*pRsp); + if (msgSize != expectSize) { + qError("vgroup list msg size mis-match, msgSize:%d, expected:%d, vgroup number:%d, db vgroup number:%d", msgSize, expectSize, pRsp->vgroupNum, pRsp->dbVgroupNum); + return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE; + } + + if (pRsp->vgroupVersion < 0) { + qInfo("no new vgroup list info"); + if (pRsp->vgroupNum != 0) { + qError("invalid vgroup number[%d] for no new vgroup list case", pRsp->vgroupNum); + return TSDB_CODE_TSC_INVALID_VALUE; + } + } else { + int32_t s = sizeof(*pOut->vgroupList) + sizeof(pOut->vgroupList->vgroupInfo[0]) * pRsp->vgroupNum; + 
pOut->vgroupList = calloc(1, s); + if (NULL == pOut->vgroupList) { + qError("calloc size[%d] failed", s); + return TSDB_CODE_TSC_OUT_OF_MEMORY; + } + + pOut->vgroupList->vgroupNum = pRsp->vgroupNum; + pOut->vgroupList->vgroupVersion = pRsp->vgroupVersion; + + for (int32_t i = 0; i < pRsp->vgroupNum; ++i) { + pRsp->vgroupInfo[i].vgId = htonl(pRsp->vgroupInfo[i].vgId); + for (int32_t n = 0; n < pRsp->vgroupInfo[i].numOfEps; ++n) { + pRsp->vgroupInfo[i].epAddr[n].port = htonl(pRsp->vgroupInfo[i].epAddr[n].port); + } + + memcpy(&pOut->vgroupList->vgroupInfo[i], &pRsp->vgroupInfo[i], sizeof(pRsp->vgroupInfo[i])); + } + } + + int32_t *vgIdList = (int32_t *)((char *)pRsp->vgroupInfo + sizeof(pRsp->vgroupInfo[0]) * pRsp->vgroupNum); + + memcpy(pOut->db, pRsp->db, sizeof(pOut->db)); + + if (pRsp->dbVgroupVersion < 0) { + qInfo("no new vgroup info for db[%s]", pRsp->db); + } else { + pOut->dbVgroup = calloc(1, sizeof(*pOut->dbVgroup)); + if (NULL == pOut->dbVgroup) { + qError("calloc size[%d] failed", (int32_t)sizeof(*pOut->dbVgroup)); + code = TSDB_CODE_TSC_OUT_OF_MEMORY; + goto _exit; + } + + pOut->dbVgroup->vgId = taosArrayInit(pRsp->dbVgroupNum, sizeof(int32_t)); + if (NULL == pOut->dbVgroup->vgId) { + qError("taosArrayInit size[%d] failed", pRsp->dbVgroupNum); + code = TSDB_CODE_TSC_OUT_OF_MEMORY; + goto _exit; + } + + pOut->dbVgroup->vgroupVersion = pRsp->dbVgroupVersion; + pOut->dbVgroup->hashRange = htonl(pRsp->dbHashRange); + + for (int32_t i = 0; i < pRsp->dbVgroupNum; ++i) { + *(vgIdList + i) = htonl(*(vgIdList + i)); + + taosArrayPush(pOut->dbVgroup->vgId, vgIdList + i) ; + } + } + + return code; + +_exit: + if (pOut->dbVgroup && pOut->dbVgroup->vgId) { + taosArrayDestroy(pOut->dbVgroup->vgId); + pOut->dbVgroup->vgId = NULL; + } + + tfree(pOut->dbVgroup); + tfree(pOut->vgroupList); + + return code; +} + + void msgInit() { queryBuildMsg[TSDB_MSG_TYPE_TABLE_META] = queryBuildTableMetaReqMsg; queryBuildMsg[TSDB_MSG_TYPE_VGROUP_LIST] = queryBuildVgroupListReqMsg; + 
queryBuildMsg[TSDB_MSG_TYPE_USE_DB] = queryBuildUseDbMsg; //tscProcessMsgRsp[TSDB_MSG_TYPE_TABLE_META] = tscProcessTableMetaRsp; queryProcessMsgRsp[TSDB_MSG_TYPE_VGROUP_LIST] = queryProcessVgroupListRsp; + queryProcessMsgRsp[TSDB_MSG_TYPE_USE_DB] = queryProcessUseDBRsp; /* tscBuildMsg[TSDB_SQL_SELECT] = tscBuildQueryMsg; diff --git a/source/libs/wal/inc/walInt.h b/source/libs/wal/inc/walInt.h index e546a87326952e8927c752da409dc6cdc812e985..48142878c3bb7ba911582a15dda3986714e364aa 100644 --- a/source/libs/wal/inc/walInt.h +++ b/source/libs/wal/inc/walInt.h @@ -33,12 +33,10 @@ typedef struct WalFileInfo { int64_t fileSize; } WalFileInfo; -#pragma pack(push,1) typedef struct WalIdxEntry { int64_t ver; int64_t offset; } WalIdxEntry; -#pragma pack(pop) static inline int32_t compareWalFileInfo(const void* pLeft, const void* pRight) { WalFileInfo* pInfoLeft = (WalFileInfo*)pLeft; @@ -107,8 +105,16 @@ static inline uint32_t walCalcBodyCksum(const void* body, uint32_t len) { return taosCalcChecksum(0, (uint8_t*)body, len); } -int walReadMeta(SWal* pWal); -int walWriteMeta(SWal* pWal); +static inline void walResetVer(SWalVer* pVer) { + pVer->firstVer = -1; + pVer->verInSnapshotting = -1; + pVer->snapshotVer = -1; + pVer->commitVer = -1; + pVer->lastVer = -1; +} + +int walLoadMeta(SWal* pWal); +int walSaveMeta(SWal* pWal); int walRollFileInfo(SWal* pWal); char* walMetaSerialize(SWal* pWal); diff --git a/source/libs/wal/src/walMeta.c b/source/libs/wal/src/walMeta.c index 49f4fde3a0331ea19b10599b1fd52a2429d979ac..aa592b4fe8cded905de7bd56fd09f0c0e52254f1 100644 --- a/source/libs/wal/src/walMeta.c +++ b/source/libs/wal/src/walMeta.c @@ -24,18 +24,22 @@ #include #include -int64_t walGetFirstVer(SWal *pWal) { +int64_t inline walGetFirstVer(SWal *pWal) { return pWal->vers.firstVer; } -int64_t walGetSnaphostVer(SWal *pWal) { +int64_t inline walGetSnaphostVer(SWal *pWal) { return pWal->vers.snapshotVer; } -int64_t walGetLastVer(SWal *pWal) { +int64_t inline walGetLastVer(SWal *pWal) { 
return pWal->vers.lastVer; } +static inline int walBuildMetaName(SWal* pWal, int metaVer, char* buf) { + return sprintf(buf, "%s/meta-ver%d", pWal->path, metaVer); +} + int walRollFileInfo(SWal* pWal) { int64_t ts = taosGetTimestampSec(); @@ -150,10 +154,6 @@ int walMetaDeserialize(SWal* pWal, const char* bytes) { return 0; } -static inline int walBuildMetaName(SWal* pWal, int metaVer, char* buf) { - return sprintf(buf, "%s/meta-ver%d", pWal->path, metaVer); -} - static int walFindCurMetaVer(SWal* pWal) { const char * pattern = "^meta-ver[0-9]+$"; regex_t walMetaRegexPattern; @@ -182,7 +182,7 @@ static int walFindCurMetaVer(SWal* pWal) { return metaVer; } -int walWriteMeta(SWal* pWal) { +int walSaveMeta(SWal* pWal) { int metaVer = walFindCurMetaVer(pWal); char fnameStr[WAL_FILE_LEN]; walBuildMetaName(pWal, metaVer+1, fnameStr); @@ -207,7 +207,7 @@ int walWriteMeta(SWal* pWal) { return 0; } -int walReadMeta(SWal* pWal) { +int walLoadMeta(SWal* pWal) { ASSERT(pWal->fileInfoSet->size == 0); //find existing meta file int metaVer = walFindCurMetaVer(pWal); diff --git a/source/libs/wal/src/walMgmt.c b/source/libs/wal/src/walMgmt.c index 629451a7223cb0530f775a550151d3aaf07468eb..9efeb83cf04d539ab13e2bffc768e0e50432ea9a 100644 --- a/source/libs/wal/src/walMgmt.c +++ b/source/libs/wal/src/walMgmt.c @@ -21,23 +21,17 @@ #include "compare.h" #include "walInt.h" -//internal -int32_t walGetNextFile(SWal *pWal, int64_t *nextFileId); -int32_t walGetOldFile(SWal *pWal, int64_t curFileId, int32_t minDiff, int64_t *oldFileId); -int32_t walGetNewFile(SWal *pWal, int64_t *newFileId); - typedef struct { - int32_t refSetId; - uint32_t seq; int8_t stop; int8_t inited; + uint32_t seq; + int32_t refSetId; pthread_t thread; } SWalMgmt; static SWalMgmt tsWal = {0, .seq = 1}; static int32_t walCreateThread(); static void walStopThread(); -static int32_t walInitObj(SWal *pWal); static void walFreeObj(void *pWal); int64_t walGetSeq() { @@ -68,7 +62,7 @@ int32_t walInit() { } void walCleanUp() { 
- int old = atomic_val_compare_exchange_8(&tsWal.inited, 1, 0); + int8_t old = atomic_val_compare_exchange_8(&tsWal.inited, 1, 0); if(old == 0) { return; } @@ -83,48 +77,59 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) { terrno = TAOS_SYSTEM_ERROR(errno); return NULL; } - memset(pWal, 0, sizeof(SWal)); - pWal->writeLogTfd = -1; - pWal->writeIdxTfd = -1; - pWal->writeCur = -1; //set config memcpy(&pWal->cfg, pCfg, sizeof(SWalCfg)); + pWal->fsyncSeq = pCfg->fsyncPeriod / 1000; + if(pWal->fsyncSeq <= 0) pWal->fsyncSeq = 1; - //init version info - pWal->vers.firstVer = -1; - pWal->vers.commitVer = -1; - pWal->vers.snapshotVer = -1; - pWal->vers.lastVer = -1; - - pWal->vers.verInSnapshotting = -1; + tstrncpy(pWal->path, path, sizeof(pWal->path)); + if(taosMkDir(pWal->path) != 0) { + wError("vgId:%d, path:%s, failed to create directory since %s", pWal->cfg.vgId, pWal->path, strerror(errno)); + return NULL; + } - pWal->totSize = 0; + //open meta + pWal->writeLogTfd = -1; + pWal->writeIdxTfd = -1; + pWal->writeCur = -1; + pWal->fileInfoSet = taosArrayInit(8, sizeof(WalFileInfo)); + if(pWal->fileInfoSet == NULL) { + wError("vgId:%d, path:%s, failed to init taosArray %s", pWal->cfg.vgId, pWal->path, strerror(errno)); + free(pWal); + return NULL; + } //init status + walResetVer(&pWal->vers); + pWal->totSize = 0; pWal->lastRollSeq = -1; //init write buffer memset(&pWal->writeHead, 0, sizeof(SWalHead)); - pWal->writeHead.head.sver = 0; + pWal->writeHead.head.headVer = WAL_HEAD_VER; - tstrncpy(pWal->path, path, sizeof(pWal->path)); - pthread_mutex_init(&pWal->mutex, NULL); - - pWal->fsyncSeq = pCfg->fsyncPeriod / 1000; - if (pWal->fsyncSeq <= 0) pWal->fsyncSeq = 1; + if(pthread_mutex_init(&pWal->mutex, NULL) < 0) { + taosArrayDestroy(pWal->fileInfoSet); + free(pWal); + return NULL; + } - if (walInitObj(pWal) != 0) { - walFreeObj(pWal); + pWal->refId = taosAddRef(tsWal.refSetId, pWal); + if(pWal->refId < 0) { + pthread_mutex_destroy(&pWal->mutex); + 
taosArrayDestroy(pWal->fileInfoSet); + free(pWal); return NULL; } - pWal->refId = taosAddRef(tsWal.refSetId, pWal); - if (pWal->refId < 0) { - walFreeObj(pWal); + if(walLoadMeta(pWal) < 0) { + taosRemoveRef(tsWal.refSetId, pWal->refId); + pthread_mutex_destroy(&pWal->mutex); + taosArrayDestroy(pWal->fileInfoSet); + free(pWal); return NULL; } - walReadMeta(pWal); wDebug("vgId:%d, wal:%p is opened, level:%d fsyncPeriod:%d", pWal->cfg.vgId, pWal, pWal->cfg.level, pWal->cfg.fsyncPeriod); @@ -152,43 +157,23 @@ int32_t walAlter(SWal *pWal, SWalCfg *pCfg) { } void walClose(SWal *pWal) { - if (pWal == NULL) return; - pthread_mutex_lock(&pWal->mutex); tfClose(pWal->writeLogTfd); pWal->writeLogTfd = -1; tfClose(pWal->writeIdxTfd); pWal->writeIdxTfd = -1; - walWriteMeta(pWal); + walSaveMeta(pWal); taosArrayDestroy(pWal->fileInfoSet); pWal->fileInfoSet = NULL; pthread_mutex_unlock(&pWal->mutex); - taosRemoveRef(tsWal.refSetId, pWal->refId); -} - -static int32_t walInitObj(SWal *pWal) { - if (taosMkDir(pWal->path) != 0) { - wError("vgId:%d, path:%s, failed to create directory since %s", pWal->cfg.vgId, pWal->path, strerror(errno)); - return TAOS_SYSTEM_ERROR(errno); - } - pWal->fileInfoSet = taosArrayInit(8, sizeof(WalFileInfo)); - if(pWal->fileInfoSet == NULL) { - wError("vgId:%d, path:%s, failed to init taosArray %s", pWal->cfg.vgId, pWal->path, strerror(errno)); - return TAOS_SYSTEM_ERROR(errno); - } - wDebug("vgId:%d, object is initialized", pWal->cfg.vgId); - return 0; + taosRemoveRef(tsWal.refSetId, pWal->refId); } static void walFreeObj(void *wal) { SWal *pWal = wal; wDebug("vgId:%d, wal:%p is freed", pWal->cfg.vgId, pWal); - tfClose(pWal->writeLogTfd); - tfClose(pWal->writeIdxTfd); - taosArrayDestroy(pWal->fileInfoSet); - pWal->fileInfoSet = NULL; pthread_mutex_destroy(&pWal->mutex); tfree(pWal); } diff --git a/source/libs/wal/src/walRead.c b/source/libs/wal/src/walRead.c index b6aafedea3b4ac9150370d7012407c34a9c7e311..42fcb8c375802f468f7e6061971d99bcd884d16d 100644 --- 
a/source/libs/wal/src/walRead.c +++ b/source/libs/wal/src/walRead.c @@ -54,7 +54,7 @@ static int32_t walReadSeekFilePos(SWalReadHandle *pRead, int64_t fileFirstVer, i int64_t logTfd = pRead->readLogTfd; //seek position - int64_t offset = (ver - fileFirstVer) * WAL_IDX_ENTRY_SIZE; + int64_t offset = (ver - fileFirstVer) * sizeof(WalIdxEntry); code = tfLseek(idxTfd, offset, SEEK_SET); if(code < 0) { return -1; @@ -210,6 +210,6 @@ int32_t walRead(SWal *pWal, SWalHead **ppHead, int64_t ver) { return 0; } -int32_t walReadWithFp(SWal *pWal, FWalWrite writeFp, int64_t verStart, int32_t readNum) { - return 0; -} +/*int32_t walReadWithFp(SWal *pWal, FWalWrite writeFp, int64_t verStart, int32_t readNum) {*/ + /*return 0;*/ +/*}*/ diff --git a/source/libs/wal/src/walSeek.c b/source/libs/wal/src/walSeek.c index 953aae703c75764acf047851022b15c79b88cc77..7db5b90c1db869697b85c0550b88344c6e17b912 100644 --- a/source/libs/wal/src/walSeek.c +++ b/source/libs/wal/src/walSeek.c @@ -27,7 +27,7 @@ static int walSeekFilePos(SWal* pWal, int64_t ver) { int64_t logTfd = pWal->writeLogTfd; //seek position - int64_t offset = (ver - walGetCurFileFirstVer(pWal)) * WAL_IDX_ENTRY_SIZE; + int64_t offset = (ver - walGetCurFileFirstVer(pWal)) * sizeof(WalIdxEntry); code = tfLseek(idxTfd, offset, SEEK_SET); if(code != 0) { return -1; @@ -66,8 +66,6 @@ int walChangeFileToLast(SWal *pWal) { //switch file pWal->writeIdxTfd = idxTfd; pWal->writeLogTfd = logTfd; - //change status - pWal->curStatus = WAL_CUR_FILE_WRITABLE; return 0; } @@ -93,13 +91,11 @@ int walChangeFile(SWal *pWal, int64_t ver) { int64_t fileFirstVer = pRet->firstVer; //closed if(taosArrayGetLast(pWal->fileInfoSet) != pRet) { - pWal->curStatus &= ~WAL_CUR_FILE_WRITABLE; walBuildIdxName(pWal, fileFirstVer, fnameStr); idxTfd = tfOpenRead(fnameStr); walBuildLogName(pWal, fileFirstVer, fnameStr); logTfd = tfOpenRead(fnameStr); } else { - pWal->curStatus |= WAL_CUR_FILE_WRITABLE; walBuildIdxName(pWal, fileFirstVer, fnameStr); idxTfd = 
tfOpenReadWrite(fnameStr); walBuildLogName(pWal, fileFirstVer, fnameStr); diff --git a/source/libs/wal/src/walUtil.c b/source/libs/wal/src/walUtil.c deleted file mode 100644 index 849d0c3e5189acdd6fdd50a6ef3210351ffbb64d..0000000000000000000000000000000000000000 --- a/source/libs/wal/src/walUtil.c +++ /dev/null @@ -1,120 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#define _DEFAULT_SOURCE -#include "os.h" -#include "walInt.h" - -#if 0 -int32_t walGetNextFile(SWal *pWal, int64_t *nextFileId) { - int64_t curFileId = *nextFileId; - int64_t minFileId = INT64_MAX; - - DIR *dir = opendir(pWal->path); - if (dir == NULL) { - wError("vgId:%d, path:%s, failed to open since %s", pWal->vgId, pWal->path, strerror(errno)); - return -1; - } - - struct dirent *ent; - while ((ent = readdir(dir)) != NULL) { - char *name = ent->d_name; - - if (strncmp(name, WAL_PREFIX, WAL_PREFIX_LEN) == 0) { - int64_t id = atoll(name + WAL_PREFIX_LEN); - if (id <= curFileId) continue; - - if (id < minFileId) { - minFileId = id; - } - } - } - closedir(dir); - - if (minFileId == INT64_MAX) return -1; - - *nextFileId = minFileId; - wTrace("vgId:%d, path:%s, curFileId:%" PRId64 " nextFileId:%" PRId64, pWal->vgId, pWal->path, curFileId, *nextFileId); - - return 0; -} - -int32_t walGetOldFile(SWal *pWal, int64_t curFileId, int32_t minDiff, int64_t *oldFileId) { - int64_t minFileId = INT64_MAX; - - DIR *dir = opendir(pWal->path); - if (dir == NULL) { - 
wError("vgId:%d, path:%s, failed to open since %s", pWal->vgId, pWal->path, strerror(errno)); - return -1; - } - - struct dirent *ent; - while ((ent = readdir(dir)) != NULL) { - char *name = ent->d_name; - - if (strncmp(name, WAL_PREFIX, WAL_PREFIX_LEN) == 0) { - int64_t id = atoll(name + WAL_PREFIX_LEN); - if (id >= curFileId) continue; - - minDiff--; - if (id < minFileId) { - minFileId = id; - } - } - } - closedir(dir); - - if (minFileId == INT64_MAX) return -1; - if (minDiff > 0) return -1; - - *oldFileId = minFileId; - wTrace("vgId:%d, path:%s, curFileId:%" PRId64 " oldFildId:%" PRId64, pWal->vgId, pWal->path, curFileId, *oldFileId); - - return 0; -} - -int32_t walGetNewFile(SWal *pWal, int64_t *newFileId) { - int64_t maxFileId = INT64_MIN; - - DIR *dir = opendir(pWal->path); - if (dir == NULL) { - wError("vgId:%d, path:%s, failed to open since %s", pWal->vgId, pWal->path, strerror(errno)); - return -1; - } - - struct dirent *ent; - while ((ent = readdir(dir)) != NULL) { - char *name = ent->d_name; - - if (strncmp(name, WAL_PREFIX, WAL_PREFIX_LEN) == 0) { - int64_t id = atoll(name + WAL_PREFIX_LEN); - if (id > maxFileId) { - maxFileId = id; - } - } - } - closedir(dir); - - if (maxFileId == INT64_MIN) { - *newFileId = 0; - } else { - *newFileId = maxFileId; - } - - wTrace("vgId:%d, path:%s, newFileId:%" PRId64, pWal->vgId, pWal->path, *newFileId); - - return 0; -} -#endif diff --git a/source/libs/wal/src/walWrite.c b/source/libs/wal/src/walWrite.c index 994b8fc33323a001578bb38b0741a3a7c0eac5cf..ffbb19c6b7a8b293dd3d0f7a29d06a6ec2fc30e8 100644 --- a/source/libs/wal/src/walWrite.c +++ b/source/libs/wal/src/walWrite.c @@ -21,98 +21,6 @@ #include "tfile.h" #include "walInt.h" - -#if 0 -static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, char *name, int64_t fileId); - -int32_t walRenew(void *handle) { - if (handle == NULL) return 0; - - SWal * pWal = handle; - int32_t code = 0; - - /*if (pWal->stop) {*/ - /*wDebug("vgId:%d, do not create a 
new wal file", pWal->vgId);*/ - /*return 0;*/ - /*}*/ - - pthread_mutex_lock(&pWal->mutex); - - if (tfValid(pWal->logTfd)) { - tfClose(pWal->logTfd); - wDebug("vgId:%d, file:%s, it is closed while renew", pWal->vgId, pWal->logName); - } - - /*if (pWal->keep == TAOS_WAL_KEEP) {*/ - /*pWal->fileId = 0;*/ - /*} else {*/ - /*if (walGetNewFile(pWal, &pWal->fileId) != 0) pWal->fileId = 0;*/ - /*pWal->fileId++;*/ - /*}*/ - - snprintf(pWal->logName, sizeof(pWal->logName), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, pWal->curFileId); - pWal->logTfd = tfOpenCreateWrite(pWal->logName); - - if (!tfValid(pWal->logTfd)) { - code = TAOS_SYSTEM_ERROR(errno); - wError("vgId:%d, file:%s, failed to open since %s", pWal->vgId, pWal->logName, strerror(errno)); - } else { - wDebug("vgId:%d, file:%s, it is created and open while renew", pWal->vgId, pWal->logName); - } - - pthread_mutex_unlock(&pWal->mutex); - - return code; -} - -void walRemoveOneOldFile(void *handle) { - SWal *pWal = handle; - if (pWal == NULL) return; - /*if (pWal->keep == TAOS_WAL_KEEP) return;*/ - if (!tfValid(pWal->logTfd)) return; - - pthread_mutex_lock(&pWal->mutex); - - // remove the oldest wal file - int64_t oldFileId = -1; - if (walGetOldFile(pWal, pWal->curFileId, WAL_FILE_NUM, &oldFileId) == 0) { - char walName[WAL_FILE_LEN] = {0}; - snprintf(walName, sizeof(walName), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, oldFileId); - - if (remove(walName) < 0) { - wError("vgId:%d, file:%s, failed to remove since %s", pWal->vgId, walName, strerror(errno)); - } else { - wInfo("vgId:%d, file:%s, it is removed", pWal->vgId, walName); - } - } - - pthread_mutex_unlock(&pWal->mutex); -} - -void walRemoveAllOldFiles(void *handle) { - if (handle == NULL) return; - - SWal * pWal = handle; - int64_t fileId = -1; - - pthread_mutex_lock(&pWal->mutex); - - tfClose(pWal->logTfd); - wDebug("vgId:%d, file:%s, it is closed before remove all wals", pWal->vgId, pWal->logName); - - while (walGetNextFile(pWal, &fileId) >= 0) { - 
snprintf(pWal->logName, sizeof(pWal->logName), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, fileId); - - if (remove(pWal->logName) < 0) { - wError("vgId:%d, wal:%p file:%s, failed to remove since %s", pWal->vgId, pWal, pWal->logName, strerror(errno)); - } else { - wInfo("vgId:%d, wal:%p file:%s, it is removed", pWal->vgId, pWal, pWal->logName); - } - } - pthread_mutex_unlock(&pWal->mutex); -} -#endif - int32_t walCommit(SWal *pWal, int64_t ver) { ASSERT(pWal->vers.commitVer >= pWal->vers.snapshotVer); ASSERT(pWal->vers.commitVer <= pWal->vers.lastVer); @@ -166,7 +74,7 @@ int32_t walRollback(SWal *pWal, int64_t ver) { pthread_mutex_unlock(&pWal->mutex); return -1; } - int idxOff = (ver - walGetCurFileFirstVer(pWal)) * WAL_IDX_ENTRY_SIZE; + int idxOff = (ver - walGetCurFileFirstVer(pWal)) * sizeof(WalIdxEntry); code = tfLseek(idxTfd, idxOff, SEEK_SET); if(code < 0) { pthread_mutex_unlock(&pWal->mutex); @@ -229,7 +137,7 @@ int32_t walRollback(SWal *pWal, int64_t ver) { return 0; } -int32_t walBeginTakeSnapshot(SWal* pWal, int64_t ver) { +int32_t walBeginSnapshot(SWal* pWal, int64_t ver) { pWal->vers.verInSnapshotting = ver; //check file rolling if(pWal->cfg.retentionPeriod == 0) { @@ -239,7 +147,7 @@ int32_t walBeginTakeSnapshot(SWal* pWal, int64_t ver) { return 0; } -int32_t walEndTakeSnapshot(SWal *pWal) { +int32_t walEndSnapshot(SWal *pWal) { int64_t ver = pWal->vers.verInSnapshotting; if(ver == -1) return -1; @@ -287,7 +195,7 @@ int32_t walEndTakeSnapshot(SWal *pWal) { pWal->vers.verInSnapshotting = -1; //save snapshot ver, commit ver - int code = walWriteMeta(pWal); + int code = walSaveMeta(pWal); if(code != 0) { return -1; } @@ -314,13 +222,13 @@ int walRoll(SWal *pWal) { int64_t newFileFirstVersion = pWal->vers.lastVer + 1; char fnameStr[WAL_FILE_LEN]; walBuildIdxName(pWal, newFileFirstVersion, fnameStr); - idxTfd = tfOpenCreateWrite(fnameStr); + idxTfd = tfOpenCreateWriteAppend(fnameStr); if(idxTfd < 0) { ASSERT(0); return -1; } walBuildLogName(pWal, 
newFileFirstVersion, fnameStr); - logTfd = tfOpenCreateWrite(fnameStr); + logTfd = tfOpenCreateWriteAppend(fnameStr); if(logTfd < 0) { ASSERT(0); return -1; @@ -335,8 +243,6 @@ int walRoll(SWal *pWal) { pWal->writeIdxTfd = idxTfd; pWal->writeLogTfd = logTfd; pWal->writeCur = taosArrayGetSize(pWal->fileInfoSet) - 1; - //change status - pWal->curStatus = WAL_CUR_FILE_WRITABLE & WAL_CUR_POS_WRITABLE; pWal->lastRollSeq = walGetSeq(); return 0; @@ -425,74 +331,6 @@ void walFsync(SWal *pWal, bool forceFsync) { } } -#if 0 -int32_t walRestore(void *handle, void *pVnode, FWalWrite writeFp) { - if (handle == NULL) return -1; - - SWal * pWal = handle; - int32_t count = 0; - int32_t code = 0; - int64_t fileId = -1; - - while ((code = walGetNextFile(pWal, &fileId)) >= 0) { - /*if (fileId == pWal->curFileId) continue;*/ - - char walName[WAL_FILE_LEN]; - snprintf(walName, sizeof(pWal->logName), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, fileId); - - wInfo("vgId:%d, file:%s, will be restored", pWal->vgId, walName); - code = walRestoreWalFile(pWal, pVnode, writeFp, walName, fileId); - if (code != TSDB_CODE_SUCCESS) { - wError("vgId:%d, file:%s, failed to restore since %s", pWal->vgId, walName, tstrerror(code)); - continue; - } - - wInfo("vgId:%d, file:%s, restore success, wver:%" PRIu64, pWal->vgId, walName, pWal->curVersion); - - count++; - } - - /*if (pWal->keep != TAOS_WAL_KEEP) return TSDB_CODE_SUCCESS;*/ - - if (count == 0) { - wDebug("vgId:%d, wal file not exist, renew it", pWal->vgId); - return walRenew(pWal); - } else { - // open the existing WAL file in append mode - /*pWal->curFileId = 0;*/ - snprintf(pWal->logName, sizeof(pWal->logName), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, pWal->curFileId); - pWal->logTfd = tfOpenCreateWriteAppend(pWal->logName); - if (!tfValid(pWal->logTfd)) { - wError("vgId:%d, file:%s, failed to open since %s", pWal->vgId, pWal->logName, strerror(errno)); - return TAOS_SYSTEM_ERROR(errno); - } - wDebug("vgId:%d, file:%s, it is created and open 
while restore", pWal->vgId, pWal->logName); - } - - return TSDB_CODE_SUCCESS; -} - -int32_t walGetWalFile(void *handle, char *fileName, int64_t *fileId) { - if (handle == NULL) return -1; - SWal *pWal = handle; - - if (*fileId == 0) *fileId = -1; - - pthread_mutex_lock(&(pWal->mutex)); - - int32_t code = walGetNextFile(pWal, fileId); - if (code >= 0) { - sprintf(fileName, "wal/%s%" PRId64, WAL_PREFIX, *fileId); - /*code = (*fileId == pWal->curFileId) ? 0 : 1;*/ - } - - wDebug("vgId:%d, get wal file, code:%d curId:%" PRId64 " outId:%" PRId64, pWal->vgId, code, pWal->curFileId, *fileId); - pthread_mutex_unlock(&(pWal->mutex)); - - return code; -} -#endif - /*static int walValidateOffset(SWal* pWal, int64_t ver) {*/ /*int code = 0;*/ /*SWalHead *pHead = NULL;*/ @@ -516,139 +354,3 @@ int32_t walGetWalFile(void *handle, char *fileName, int64_t *fileId) { /*return 0;*/ /*}*/ - -#if 0 -static int32_t walSkipCorruptedRecord(SWal *pWal, SWalHead *pHead, int64_t tfd, int64_t *offset) { - int64_t pos = *offset; - while (1) { - pos++; - - if (tfLseek(tfd, pos, SEEK_SET) < 0) { - wError("vgId:%d, failed to seek from corrupted wal file since %s", pWal->vgId, strerror(errno)); - return TSDB_CODE_WAL_FILE_CORRUPTED; - } - - if (tfRead(tfd, pHead, sizeof(SWalHead)) <= 0) { - wError("vgId:%d, read to end of corrupted wal file, offset:%" PRId64, pWal->vgId, pos); - return TSDB_CODE_WAL_FILE_CORRUPTED; - } - - if (pHead->signature != WAL_SIGNATURE) { - continue; - } - - if (pHead->sver >= 1) { - if (tfRead(tfd, pHead->cont, pHead->len) < pHead->len) { - wError("vgId:%d, read to end of corrupted wal file, offset:%" PRId64, pWal->vgId, pos); - return TSDB_CODE_WAL_FILE_CORRUPTED; - } - - if (walValidateChecksum(pHead)) { - wInfo("vgId:%d, wal whole cksum check passed, offset:%" PRId64, pWal->vgId, pos); - *offset = pos; - return TSDB_CODE_SUCCESS; - } - } - } - - return TSDB_CODE_WAL_FILE_CORRUPTED; -} - -static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, char 
*name, int64_t fileId) { - int32_t size = WAL_MAX_SIZE; - void * buffer = malloc(size); - if (buffer == NULL) { - wError("vgId:%d, file:%s, failed to open for restore since %s", pWal->vgId, name, strerror(errno)); - return TAOS_SYSTEM_ERROR(errno); - } - - int64_t tfd = tfOpenReadWrite(name); - if (!tfValid(tfd)) { - wError("vgId:%d, file:%s, failed to open for restore since %s", pWal->vgId, name, strerror(errno)); - tfree(buffer); - return TAOS_SYSTEM_ERROR(errno); - } else { - wDebug("vgId:%d, file:%s, open for restore", pWal->vgId, name); - } - - int32_t code = TSDB_CODE_SUCCESS; - int64_t offset = 0; - SWalHead *pHead = buffer; - - while (1) { - int32_t ret = (int32_t)tfRead(tfd, pHead, sizeof(SWalHead)); - if (ret == 0) break; - - if (ret < 0) { - wError("vgId:%d, file:%s, failed to read wal head since %s", pWal->vgId, name, strerror(errno)); - code = TAOS_SYSTEM_ERROR(errno); - break; - } - - if (ret < sizeof(SWalHead)) { - wError("vgId:%d, file:%s, failed to read wal head, ret is %d", pWal->vgId, name, ret); - walFtruncate(pWal, tfd, offset); - break; - } - - if ((pHead->sver == 0 && !walValidateChecksum(pHead)) || pHead->sver < 0 || pHead->sver > 2) { - wError("vgId:%d, file:%s, wal head cksum is messed up, hver:%" PRIu64 " len:%d offset:%" PRId64, pWal->vgId, name, - pHead->version, pHead->len, offset); - code = walSkipCorruptedRecord(pWal, pHead, tfd, &offset); - if (code != TSDB_CODE_SUCCESS) { - walFtruncate(pWal, tfd, offset); - break; - } - } - - if (pHead->len < 0 || pHead->len > size - sizeof(SWalHead)) { - wError("vgId:%d, file:%s, wal head len out of range, hver:%" PRIu64 " len:%d offset:%" PRId64, pWal->vgId, name, - pHead->version, pHead->len, offset); - code = walSkipCorruptedRecord(pWal, pHead, tfd, &offset); - if (code != TSDB_CODE_SUCCESS) { - walFtruncate(pWal, tfd, offset); - break; - } - } - - ret = (int32_t)tfRead(tfd, pHead->cont, pHead->len); - if (ret < 0) { - wError("vgId:%d, file:%s, failed to read wal body since %s", pWal->vgId, 
name, strerror(errno)); - code = TAOS_SYSTEM_ERROR(errno); - break; - } - - if (ret < pHead->len) { - wError("vgId:%d, file:%s, failed to read wal body, ret:%d len:%d", pWal->vgId, name, ret, pHead->len); - offset += sizeof(SWalHead); - continue; - } - - if ((pHead->sver >= 1) && !walValidateChecksum(pHead)) { - wError("vgId:%d, file:%s, wal whole cksum is messed up, hver:%" PRIu64 " len:%d offset:%" PRId64, pWal->vgId, name, - pHead->version, pHead->len, offset); - code = walSkipCorruptedRecord(pWal, pHead, tfd, &offset); - if (code != TSDB_CODE_SUCCESS) { - walFtruncate(pWal, tfd, offset); - break; - } - } - - offset = offset + sizeof(SWalHead) + pHead->len; - - wTrace("vgId:%d, restore wal, fileId:%" PRId64 " hver:%" PRIu64 " wver:%" PRIu64 " len:%d offset:%" PRId64, - pWal->vgId, fileId, pHead->version, pWal->curVersion, pHead->len, offset); - - pWal->curVersion = pHead->version; - - // wInfo("writeFp: %ld", offset); - (*writeFp)(pVnode, pHead); - } - - tfClose(tfd); - tfree(buffer); - - wDebug("vgId:%d, file:%s, it is closed after restore", pWal->vgId, name); - return code; -} -#endif diff --git a/source/libs/wal/test/walMetaTest.cpp b/source/libs/wal/test/walMetaTest.cpp index 200bf39c5a087af9edd7cfefea0ed0a5e1c846e6..d06388201eb04be5894d48fb85a8ec16215ce392 100644 --- a/source/libs/wal/test/walMetaTest.cpp +++ b/source/libs/wal/test/walMetaTest.cpp @@ -142,7 +142,7 @@ TEST_F(WalCleanEnv, serialize) { char*ss = walMetaSerialize(pWal); printf("%s\n", ss); free(ss); - code = walWriteMeta(pWal); + code = walSaveMeta(pWal); ASSERT(code == 0); } @@ -150,11 +150,11 @@ TEST_F(WalCleanEnv, removeOldMeta) { int code = walRollFileInfo(pWal); ASSERT(code == 0); ASSERT(pWal->fileInfoSet != NULL); - code = walWriteMeta(pWal); + code = walSaveMeta(pWal); ASSERT(code == 0); code = walRollFileInfo(pWal); ASSERT(code == 0); - code = walWriteMeta(pWal); + code = walSaveMeta(pWal); ASSERT(code == 0); } @@ -199,7 +199,7 @@ TEST_F(WalCleanEnv, write) { ASSERT_EQ(code, -1); 
ASSERT_EQ(pWal->vers.lastVer, i); } - code = walWriteMeta(pWal); + code = walSaveMeta(pWal); ASSERT_EQ(code, 0); } @@ -216,7 +216,7 @@ TEST_F(WalCleanEnv, rollback) { code = walRollback(pWal, 3); ASSERT_EQ(code, 0); ASSERT_EQ(pWal->vers.lastVer, 2); - code = walWriteMeta(pWal); + code = walSaveMeta(pWal); ASSERT_EQ(code, 0); } @@ -231,9 +231,9 @@ TEST_F(WalCleanDeleteEnv, roll) { ASSERT_EQ(pWal->vers.commitVer, i); } - walBeginTakeSnapshot(pWal, i-1); + walBeginSnapshot(pWal, i-1); ASSERT_EQ(pWal->vers.verInSnapshotting, i-1); - walEndTakeSnapshot(pWal); + walEndSnapshot(pWal); ASSERT_EQ(pWal->vers.snapshotVer, i-1); ASSERT_EQ(pWal->vers.verInSnapshotting, -1); @@ -247,9 +247,9 @@ TEST_F(WalCleanDeleteEnv, roll) { ASSERT_EQ(pWal->vers.commitVer, i); } - code = walBeginTakeSnapshot(pWal, i - 1); + code = walBeginSnapshot(pWal, i - 1); ASSERT_EQ(code, 0); - code = walEndTakeSnapshot(pWal); + code = walEndSnapshot(pWal); ASSERT_EQ(code, 0); }