提交 de2b09fa 编写于 作者: haoranc's avatar haoranc

Merge branch '3.0' of github.com:taosdata/TDengine into test/chr/TD-14699

...@@ -67,7 +67,13 @@ ELSE () ...@@ -67,7 +67,13 @@ ELSE ()
IF (${CMAKE_SYSTEM_PROCESSOR} MATCHES "arm64" OR ${CMAKE_SYSTEM_PROCESSOR} MATCHES "aarch64") IF (${CMAKE_SYSTEM_PROCESSOR} MATCHES "arm64" OR ${CMAKE_SYSTEM_PROCESSOR} MATCHES "aarch64")
ADD_DEFINITIONS("-D_TD_ARM_") ADD_DEFINITIONS("-D_TD_ARM_")
ELSE () ELSE ()
ADD_DEFINITIONS("-msse4.2 -mfma") ADD_DEFINITIONS("-msse4.2")
IF("${FMA_SUPPORT}" MATCHES "true")
MESSAGE(STATUS "turn fma function support on")
ADD_DEFINITIONS("-mfma")
ELSE ()
MESSAGE(STATUS "turn fma function support off")
ENDIF()
ENDIF () ENDIF ()
ENDIF () ENDIF ()
...@@ -20,8 +20,8 @@ ...@@ -20,8 +20,8 @@
extern "C" { extern "C" {
#endif #endif
#include "querynodes.h"
#include "query.h" #include "query.h"
#include "querynodes.h"
typedef struct SStmtCallback { typedef struct SStmtCallback {
TAOS_STMT* pStmt; TAOS_STMT* pStmt;
...@@ -34,24 +34,26 @@ typedef struct SStmtCallback { ...@@ -34,24 +34,26 @@ typedef struct SStmtCallback {
typedef struct SParseContext { typedef struct SParseContext {
uint64_t requestId; uint64_t requestId;
int32_t acctId; int32_t acctId;
const char *db; const char* db;
bool topicQuery; bool topicQuery;
void *pTransporter; void* pTransporter;
SEpSet mgmtEpSet; SEpSet mgmtEpSet;
const char *pSql; // sql string const char* pSql; // sql string
size_t sqlLen; // length of the sql string size_t sqlLen; // length of the sql string
char *pMsg; // extended error message if exists to help identifying the problem in sql statement. char* pMsg; // extended error message if exists to help identifying the problem in sql statement.
int32_t msgLen; // max length of the msg int32_t msgLen; // max length of the msg
struct SCatalog *pCatalog; struct SCatalog* pCatalog;
SStmtCallback *pStmtCb; SStmtCallback* pStmtCb;
const char* pUser;
bool isSuperUser;
} SParseContext; } SParseContext;
typedef struct SCmdMsgInfo { typedef struct SCmdMsgInfo {
int16_t msgType; int16_t msgType;
SEpSet epSet; SEpSet epSet;
void* pMsg; void* pMsg;
int32_t msgLen; int32_t msgLen;
void* pExtension; // todo remove it soon void* pExtension; // todo remove it soon
} SCmdMsgInfo; } SCmdMsgInfo;
typedef enum EQueryExecMode { typedef enum EQueryExecMode {
...@@ -63,21 +65,21 @@ typedef enum EQueryExecMode { ...@@ -63,21 +65,21 @@ typedef enum EQueryExecMode {
typedef struct SQuery { typedef struct SQuery {
EQueryExecMode execMode; EQueryExecMode execMode;
bool haveResultSet; bool haveResultSet;
SNode* pRoot; SNode* pRoot;
int32_t numOfResCols; int32_t numOfResCols;
SSchema* pResSchema; SSchema* pResSchema;
int8_t precision; int8_t precision;
SCmdMsgInfo* pCmdMsg; SCmdMsgInfo* pCmdMsg;
int32_t msgType; int32_t msgType;
SArray* pDbList; SArray* pDbList;
SArray* pTableList; SArray* pTableList;
bool showRewrite; bool showRewrite;
int32_t placeholderNum; int32_t placeholderNum;
} SQuery; } SQuery;
int32_t qParseQuerySql(SParseContext* pCxt, SQuery** pQuery); int32_t qParseQuerySql(SParseContext* pCxt, SQuery** pQuery);
bool isInsertSql(const char* pStr, size_t length); bool isInsertSql(const char* pStr, size_t length);
void qDestroyQuery(SQuery* pQueryNode); void qDestroyQuery(SQuery* pQueryNode);
...@@ -89,14 +91,16 @@ int32_t qCloneStmtDataBlock(void** pDst, void* pSrc); ...@@ -89,14 +91,16 @@ int32_t qCloneStmtDataBlock(void** pDst, void* pSrc);
void qFreeStmtDataBlock(void* pDataBlock); void qFreeStmtDataBlock(void* pDataBlock);
int32_t qRebuildStmtDataBlock(void** pDst, void* pSrc); int32_t qRebuildStmtDataBlock(void** pDst, void* pSrc);
void qDestroyStmtDataBlock(void* pBlock); void qDestroyStmtDataBlock(void* pBlock);
int32_t qBindStmtColsValue(void *pBlock, TAOS_MULTI_BIND *bind, char *msgBuf, int32_t msgBufLen); int32_t qBindStmtColsValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBuf, int32_t msgBufLen);
int32_t qBindStmtSingleColValue(void *pBlock, TAOS_MULTI_BIND *bind, char *msgBuf, int32_t msgBufLen, int32_t colIdx, int32_t rowNum); int32_t qBindStmtSingleColValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBuf, int32_t msgBufLen, int32_t colIdx,
int32_t qBuildStmtColFields(void *pDataBlock, int32_t *fieldNum, TAOS_FIELD** fields); int32_t rowNum);
int32_t qBuildStmtTagFields(void *pBlock, void *boundTags, int32_t *fieldNum, TAOS_FIELD** fields); int32_t qBuildStmtColFields(void* pDataBlock, int32_t* fieldNum, TAOS_FIELD** fields);
int32_t qBindStmtTagsValue(void *pBlock, void *boundTags, int64_t suid, SName *pName, TAOS_MULTI_BIND *bind, char *msgBuf, int32_t msgBufLen); int32_t qBuildStmtTagFields(void* pBlock, void* boundTags, int32_t* fieldNum, TAOS_FIELD** fields);
void destroyBoundColumnInfo(void* pBoundInfo); int32_t qBindStmtTagsValue(void* pBlock, void* boundTags, int64_t suid, SName* pName, TAOS_MULTI_BIND* bind,
int32_t qCreateSName(SName* pName, const char* pTableName, int32_t acctId, char* dbName, char *msgBuf, int32_t msgBufLen); char* msgBuf, int32_t msgBufLen);
void destroyBoundColumnInfo(void* pBoundInfo);
int32_t qCreateSName(SName* pName, const char* pTableName, int32_t acctId, char* dbName, char* msgBuf,
int32_t msgBufLen);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_UTIL_SKIPLIST2_H_
#define _TD_UTIL_SKIPLIST2_H_
#include "os.h"
#ifdef __cplusplus
extern "C" {
#endif
#define SL_MAX_LEVEL 15
typedef struct SSkipList2 SSkipList2;
typedef struct SSLCursor SSLCursor;
typedef struct SSLCfg SSLCfg;
typedef struct SSLNode SSLNode;
typedef int32_t (*tslCmprFn)(const void *pKey1, int32_t nKey1, const void *pKey2, int32_t nKey2);
// SSkipList2
int32_t slOpen(const SSLCfg *pCfg, SSkipList2 **ppSl);
int32_t slClose(SSkipList2 *pSl);
int32_t slClear(SSkipList2 *pSl);
// SSLCursor
int32_t slcOpen(SSkipList2 *pSl, SSLCursor *pSlc);
int32_t slcClose(SSLCursor *pSlc);
int32_t slcMoveTo(SSLCursor *pSlc, const void *pKey, int32_t nKey);
int32_t slcMoveToNext(SSLCursor *pSlc);
int32_t slcMoveToPrev(SSLCursor *pSlc);
int32_t slcMoveToFirst(SSLCursor *pSlc);
int32_t slcMoveToLast(SSLCursor *pSlc);
int32_t slcPut(SSLCursor *pSlc, const void *pKey, int32_t nKey, const void *pData, int32_t nData);
int32_t slcGet(SSLCursor *pSlc, const void **ppKey, int32_t *nKey, const void **ppData, int32_t *nData);
int32_t slcDrop(SSLCursor *pSlc);
// struct
struct SSLCfg {
int8_t maxLevel;
int32_t nKey;
int32_t nData;
tslCmprFn cmprFn;
void *pPool;
void *(*xMalloc)(void *, int32_t size);
void (*xFree)(void *, void *);
};
struct SSLCursor {
SSkipList2 *pSl;
SSLNode **forwards[SL_MAX_LEVEL];
};
#ifdef __cplusplus
}
#endif
#endif /*_TD_UTIL_SKIPLIST2_H_*/
\ No newline at end of file
...@@ -649,6 +649,19 @@ int stmtGetParamNum(TAOS_STMT *stmt, int *nums) { ...@@ -649,6 +649,19 @@ int stmtGetParamNum(TAOS_STMT *stmt, int *nums) {
STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_FETCH_FIELDS)); STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_FETCH_FIELDS));
if (pStmt->bInfo.needParse && pStmt->sql.runTimes && pStmt->sql.type > 0 && STMT_TYPE_MULTI_INSERT != pStmt->sql.type) {
pStmt->bInfo.needParse = false;
}
if (pStmt->exec.pRequest && STMT_TYPE_QUERY == pStmt->sql.type && pStmt->sql.runTimes) {
taos_free_result(pStmt->exec.pRequest);
pStmt->exec.pRequest = NULL;
}
if (NULL == pStmt->exec.pRequest) {
STMT_ERR_RET(buildRequest(pStmt->taos, pStmt->sql.sqlStr, pStmt->sql.sqlLen, &pStmt->exec.pRequest));
}
if (pStmt->bInfo.needParse) { if (pStmt->bInfo.needParse) {
STMT_ERR_RET(stmtParseSql(pStmt)); STMT_ERR_RET(stmtParseSql(pStmt));
} }
...@@ -658,8 +671,11 @@ int stmtGetParamNum(TAOS_STMT *stmt, int *nums) { ...@@ -658,8 +671,11 @@ int stmtGetParamNum(TAOS_STMT *stmt, int *nums) {
STMT_ERR_RET(getQueryPlan(pStmt->exec.pRequest, pStmt->sql.pQuery, &pStmt->sql.nodeList)); STMT_ERR_RET(getQueryPlan(pStmt->exec.pRequest, pStmt->sql.pQuery, &pStmt->sql.nodeList));
pStmt->sql.pQueryPlan = pStmt->exec.pRequest->body.pDag; pStmt->sql.pQueryPlan = pStmt->exec.pRequest->body.pDag;
pStmt->exec.pRequest->body.pDag = NULL; pStmt->exec.pRequest->body.pDag = NULL;
STMT_ERR_RET(stmtBackupQueryFields(pStmt));
} else {
STMT_ERR_RET(stmtRestoreQueryFields(pStmt));
} }
*nums = taosArrayGetSize(pStmt->sql.pQueryPlan->pPlaceholderValues); *nums = taosArrayGetSize(pStmt->sql.pQueryPlan->pPlaceholderValues);
} else { } else {
STMT_ERR_RET(stmtFetchColFields(stmt, nums, NULL)); STMT_ERR_RET(stmtFetchColFields(stmt, nums, NULL));
......
...@@ -224,7 +224,8 @@ struct SConfig *taosGetCfg() { ...@@ -224,7 +224,8 @@ struct SConfig *taosGetCfg() {
return tsCfg; return tsCfg;
} }
static int32_t taosLoadCfg(SConfig *pCfg, const char **envCmd, const char *inputCfgDir, const char *envFile, char *apolloUrl) { static int32_t taosLoadCfg(SConfig *pCfg, const char **envCmd, const char *inputCfgDir, const char *envFile,
char *apolloUrl) {
char cfgDir[PATH_MAX] = {0}; char cfgDir[PATH_MAX] = {0};
char cfgFile[PATH_MAX + 100] = {0}; char cfgFile[PATH_MAX + 100] = {0};
...@@ -300,15 +301,10 @@ static int32_t taosAddServerLogCfg(SConfig *pCfg) { ...@@ -300,15 +301,10 @@ static int32_t taosAddServerLogCfg(SConfig *pCfg) {
static int32_t taosAddClientCfg(SConfig *pCfg) { static int32_t taosAddClientCfg(SConfig *pCfg) {
char defaultFqdn[TSDB_FQDN_LEN] = {0}; char defaultFqdn[TSDB_FQDN_LEN] = {0};
int32_t defaultServerPort = 6030; int32_t defaultServerPort = 6030;
char defaultFirstEp[TSDB_EP_LEN] = {0};
char defaultSecondEp[TSDB_EP_LEN] = {0};
if (taosGetFqdn(defaultFqdn) != 0) return -1; if (taosGetFqdn(defaultFqdn) != 0) return -1;
snprintf(defaultFirstEp, TSDB_EP_LEN, "%s:%d", defaultFqdn, defaultServerPort);
snprintf(defaultSecondEp, TSDB_EP_LEN, "%s:%d", defaultFqdn, defaultServerPort);
if (cfgAddString(pCfg, "firstEp", defaultFirstEp, 1) != 0) return -1; if (cfgAddString(pCfg, "firstEp", "", 1) != 0) return -1;
if (cfgAddString(pCfg, "secondEp", defaultSecondEp, 1) != 0) return -1; if (cfgAddString(pCfg, "secondEp", "", 1) != 0) return -1;
if (cfgAddString(pCfg, "fqdn", defaultFqdn, 1) != 0) return -1; if (cfgAddString(pCfg, "fqdn", defaultFqdn, 1) != 0) return -1;
if (cfgAddInt32(pCfg, "serverPort", defaultServerPort, 1, 65056, 1) != 0) return -1; if (cfgAddInt32(pCfg, "serverPort", defaultServerPort, 1, 65056, 1) != 0) return -1;
if (cfgAddDir(pCfg, "tempDir", tsTempDir, 1) != 0) return -1; if (cfgAddDir(pCfg, "tempDir", tsTempDir, 1) != 0) return -1;
...@@ -478,15 +474,18 @@ static int32_t taosSetClientCfg(SConfig *pCfg) { ...@@ -478,15 +474,18 @@ static int32_t taosSetClientCfg(SConfig *pCfg) {
tsServerPort = (uint16_t)cfgGetItem(pCfg, "serverPort")->i32; tsServerPort = (uint16_t)cfgGetItem(pCfg, "serverPort")->i32;
snprintf(tsLocalEp, sizeof(tsLocalEp), "%s:%u", tsLocalFqdn, tsServerPort); snprintf(tsLocalEp, sizeof(tsLocalEp), "%s:%u", tsLocalFqdn, tsServerPort);
char defaultFirstEp[TSDB_EP_LEN] = {0};
snprintf(defaultFirstEp, TSDB_EP_LEN, "%s:%u", tsLocalFqdn, tsServerPort);
SConfigItem *pFirstEpItem = cfgGetItem(pCfg, "firstEp"); SConfigItem *pFirstEpItem = cfgGetItem(pCfg, "firstEp");
SEp firstEp = {0}; SEp firstEp = {0};
taosGetFqdnPortFromEp(pFirstEpItem->str, &firstEp); taosGetFqdnPortFromEp(strlen(pFirstEpItem->str) == 0 ? defaultFirstEp : pFirstEpItem->str, &firstEp);
snprintf(tsFirst, sizeof(tsFirst), "%s:%u", firstEp.fqdn, firstEp.port); snprintf(tsFirst, sizeof(tsFirst), "%s:%u", firstEp.fqdn, firstEp.port);
cfgSetItem(pCfg, "firstEp", tsFirst, pFirstEpItem->stype); cfgSetItem(pCfg, "firstEp", tsFirst, pFirstEpItem->stype);
SConfigItem *pSecondpItem = cfgGetItem(pCfg, "secondEp"); SConfigItem *pSecondpItem = cfgGetItem(pCfg, "secondEp");
SEp secondEp = {0}; SEp secondEp = {0};
taosGetFqdnPortFromEp(pSecondpItem->str, &secondEp); taosGetFqdnPortFromEp(strlen(pSecondpItem->str) == 0 ? defaultFirstEp : pSecondpItem->str, &secondEp);
snprintf(tsSecond, sizeof(tsSecond), "%s:%u", secondEp.fqdn, secondEp.port); snprintf(tsSecond, sizeof(tsSecond), "%s:%u", secondEp.fqdn, secondEp.port);
cfgSetItem(pCfg, "secondEp", tsSecond, pSecondpItem->stype); cfgSetItem(pCfg, "secondEp", tsSecond, pSecondpItem->stype);
...@@ -583,8 +582,8 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { ...@@ -583,8 +582,8 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
return 0; return 0;
} }
int32_t taosCreateLog(const char *logname, int32_t logFileNum, const char *cfgDir, const char **envCmd, const char *envFile, int32_t taosCreateLog(const char *logname, int32_t logFileNum, const char *cfgDir, const char **envCmd,
char *apolloUrl, SArray *pArgs, bool tsc) { const char *envFile, char *apolloUrl, SArray *pArgs, bool tsc) {
osDefaultInit(); osDefaultInit();
SConfig *pCfg = cfgInit(); SConfig *pCfg = cfgInit();
...@@ -636,7 +635,24 @@ int32_t taosCreateLog(const char *logname, int32_t logFileNum, const char *cfgDi ...@@ -636,7 +635,24 @@ int32_t taosCreateLog(const char *logname, int32_t logFileNum, const char *cfgDi
return 0; return 0;
} }
int32_t taosInitCfg(const char *cfgDir, const char **envCmd, const char *envFile, char *apolloUrl, SArray *pArgs, bool tsc) { static int32_t taosCheckGlobalCfg() {
uint32_t ipv4 = taosGetIpv4FromFqdn(tsLocalFqdn);
if (ipv4 == 0xffffffff) {
terrno = TAOS_SYSTEM_ERROR(errno);
uError("failed to get ip from fqdn:%s since %s, dnode can not be initialized", tsLocalFqdn, terrstr());
return -1;
}
if (tsServerPort <= 0) {
uError("invalid server port:%u, dnode can not be initialized", tsServerPort);
return -1;
}
return 0;
}
int32_t taosInitCfg(const char *cfgDir, const char **envCmd, const char *envFile, char *apolloUrl, SArray *pArgs,
bool tsc) {
if (tsCfg != NULL) return 0; if (tsCfg != NULL) return 0;
tsCfg = cfgInit(); tsCfg = cfgInit();
...@@ -674,6 +690,11 @@ int32_t taosInitCfg(const char *cfgDir, const char **envCmd, const char *envFile ...@@ -674,6 +690,11 @@ int32_t taosInitCfg(const char *cfgDir, const char **envCmd, const char *envFile
taosSetSystemCfg(tsCfg); taosSetSystemCfg(tsCfg);
cfgDumpCfg(tsCfg, tsc, false); cfgDumpCfg(tsCfg, tsc, false);
if (taosCheckGlobalCfg() != 0) {
return -1;
}
return 0; return 0;
} }
......
...@@ -61,6 +61,7 @@ void mndTransSetDbInfo(STrans *pTrans, SDbObj *pDb); ...@@ -61,6 +61,7 @@ void mndTransSetDbInfo(STrans *pTrans, SDbObj *pDb);
int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans); int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans);
void mndTransProcessRsp(SNodeMsg *pRsp); void mndTransProcessRsp(SNodeMsg *pRsp);
void mndTransPullup(SMnode *pMnode); void mndTransPullup(SMnode *pMnode);
int32_t mndKillTrans(SMnode *pMnode, STrans *pTrans);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -997,7 +997,12 @@ static int32_t mndTransSendActionMsg(SMnode *pMnode, STrans *pTrans, SArray *pAr ...@@ -997,7 +997,12 @@ static int32_t mndTransSendActionMsg(SMnode *pMnode, STrans *pTrans, SArray *pAr
pAction->msgReceived = 0; pAction->msgReceived = 0;
pAction->errCode = 0; pAction->errCode = 0;
} else { } else {
if (terrno == TSDB_CODE_INVALID_PTR) rpcFreeCont(rpcMsg.pCont); pAction->msgSent = 0;
pAction->msgReceived = 0;
pAction->errCode = terrno;
if (terrno == TSDB_CODE_INVALID_PTR || terrno == TSDB_CODE_NODE_OFFLINE) {
rpcFreeCont(rpcMsg.pCont);
}
mError("trans:%d, action:%d not send since %s", pTrans->id, action, terrstr()); mError("trans:%d, action:%d not send since %s", pTrans->id, action, terrstr());
return -1; return -1;
} }
...@@ -1275,7 +1280,7 @@ static int32_t mndProcessTransReq(SNodeMsg *pReq) { ...@@ -1275,7 +1280,7 @@ static int32_t mndProcessTransReq(SNodeMsg *pReq) {
return 0; return 0;
} }
static int32_t mndKillTrans(SMnode *pMnode, STrans *pTrans) { int32_t mndKillTrans(SMnode *pMnode, STrans *pTrans) {
SArray *pArray = NULL; SArray *pArray = NULL;
if (pTrans->stage == TRN_STAGE_REDO_ACTION) { if (pTrans->stage == TRN_STAGE_REDO_ACTION) {
pArray = pTrans->redoActions; pArray = pTrans->redoActions;
...@@ -1293,14 +1298,14 @@ static int32_t mndKillTrans(SMnode *pMnode, STrans *pTrans) { ...@@ -1293,14 +1298,14 @@ static int32_t mndKillTrans(SMnode *pMnode, STrans *pTrans) {
if (pAction == NULL) continue; if (pAction == NULL) continue;
if (pAction->msgReceived == 0) { if (pAction->msgReceived == 0) {
mInfo("trans:%d, action:%d set processed", pTrans->id, i); mInfo("trans:%d, action:%d set processed for kill msg received", pTrans->id, i);
pAction->msgSent = 1; pAction->msgSent = 1;
pAction->msgReceived = 1; pAction->msgReceived = 1;
pAction->errCode = 0; pAction->errCode = 0;
} }
if (pAction->errCode != 0) { if (pAction->errCode != 0) {
mInfo("trans:%d, action:%d set processed, errCode from %s to success", pTrans->id, i, mInfo("trans:%d, action:%d set processed for kill msg received, errCode from %s to success", pTrans->id, i,
tstrerror(pAction->errCode)); tstrerror(pAction->errCode));
pAction->msgSent = 1; pAction->msgSent = 1;
pAction->msgReceived = 1; pAction->msgReceived = 1;
......
...@@ -86,7 +86,7 @@ class MndTestTrans2 : public ::testing::Test { ...@@ -86,7 +86,7 @@ class MndTestTrans2 : public ::testing::Test {
void SetUp() override {} void SetUp() override {}
void TearDown() override {} void TearDown() override {}
int32_t CreateUserLog(const char *acct, const char *user) { int32_t CreateUserLog(const char *acct, const char *user, ETrnType type, SDbObj *pDb) {
SUserObj userObj = {0}; SUserObj userObj = {0};
taosEncryptPass_c((uint8_t *)"taosdata", strlen("taosdata"), userObj.pass); taosEncryptPass_c((uint8_t *)"taosdata", strlen("taosdata"), userObj.pass);
tstrncpy(userObj.user, user, TSDB_USER_LEN); tstrncpy(userObj.user, user, TSDB_USER_LEN);
...@@ -96,7 +96,7 @@ class MndTestTrans2 : public ::testing::Test { ...@@ -96,7 +96,7 @@ class MndTestTrans2 : public ::testing::Test {
userObj.superUser = 1; userObj.superUser = 1;
SRpcMsg rpcMsg = {0}; SRpcMsg rpcMsg = {0};
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_CREATE_USER, &rpcMsg); STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, type, &rpcMsg);
SSdbRaw *pRedoRaw = mndUserActionEncode(&userObj); SSdbRaw *pRedoRaw = mndUserActionEncode(&userObj);
mndTransAppendRedolog(pTrans, pRedoRaw); mndTransAppendRedolog(pTrans, pRedoRaw);
sdbSetRawStatus(pRedoRaw, SDB_STATUS_READY); sdbSetRawStatus(pRedoRaw, SDB_STATUS_READY);
...@@ -108,13 +108,18 @@ class MndTestTrans2 : public ::testing::Test { ...@@ -108,13 +108,18 @@ class MndTestTrans2 : public ::testing::Test {
char *param = strdup("====> test log <====="); char *param = strdup("====> test log <=====");
mndTransSetCb(pTrans, TEST_TRANS_START_FUNC, TEST_TRANS_STOP_FUNC, param, strlen(param) + 1); mndTransSetCb(pTrans, TEST_TRANS_START_FUNC, TEST_TRANS_STOP_FUNC, param, strlen(param) + 1);
if (pDb != NULL) {
mndTransSetDbInfo(pTrans, pDb);
}
int32_t code = mndTransPrepare(pMnode, pTrans); int32_t code = mndTransPrepare(pMnode, pTrans);
mndTransDrop(pTrans); mndTransDrop(pTrans);
return code; return code;
} }
int32_t CreateUserAction(const char *acct, const char *user, bool hasUndoAction, ETrnPolicy policy) { int32_t CreateUserAction(const char *acct, const char *user, bool hasUndoAction, ETrnPolicy policy, ETrnType type,
SDbObj *pDb) {
SUserObj userObj = {0}; SUserObj userObj = {0};
taosEncryptPass_c((uint8_t *)"taosdata", strlen("taosdata"), userObj.pass); taosEncryptPass_c((uint8_t *)"taosdata", strlen("taosdata"), userObj.pass);
tstrncpy(userObj.user, user, TSDB_USER_LEN); tstrncpy(userObj.user, user, TSDB_USER_LEN);
...@@ -124,7 +129,7 @@ class MndTestTrans2 : public ::testing::Test { ...@@ -124,7 +129,7 @@ class MndTestTrans2 : public ::testing::Test {
userObj.superUser = 1; userObj.superUser = 1;
SRpcMsg rpcMsg = {0}; SRpcMsg rpcMsg = {0};
STrans *pTrans = mndTransCreate(pMnode, policy, TRN_TYPE_CREATE_USER, &rpcMsg); STrans *pTrans = mndTransCreate(pMnode, policy, type, &rpcMsg);
SSdbRaw *pRedoRaw = mndUserActionEncode(&userObj); SSdbRaw *pRedoRaw = mndUserActionEncode(&userObj);
mndTransAppendRedolog(pTrans, pRedoRaw); mndTransAppendRedolog(pTrans, pRedoRaw);
sdbSetRawStatus(pRedoRaw, SDB_STATUS_READY); sdbSetRawStatus(pRedoRaw, SDB_STATUS_READY);
...@@ -170,6 +175,44 @@ class MndTestTrans2 : public ::testing::Test { ...@@ -170,6 +175,44 @@ class MndTestTrans2 : public ::testing::Test {
mndTransAppendUndoAction(pTrans, &action); mndTransAppendUndoAction(pTrans, &action);
} }
{
void *pRsp = taosMemoryCalloc(1, 256);
strcpy((char *)pRsp, "simple rsponse");
mndTransSetRpcRsp(pTrans, pRsp, 256);
}
if (pDb != NULL) {
mndTransSetDbInfo(pTrans, pDb);
}
int32_t code = mndTransPrepare(pMnode, pTrans);
mndTransDrop(pTrans);
return code;
}
int32_t CreateUserGlobal(const char *acct, const char *user) {
SUserObj userObj = {0};
taosEncryptPass_c((uint8_t *)"taosdata", strlen("taosdata"), userObj.pass);
tstrncpy(userObj.user, user, TSDB_USER_LEN);
tstrncpy(userObj.acct, acct, TSDB_USER_LEN);
userObj.createdTime = taosGetTimestampMs();
userObj.updateTime = userObj.createdTime;
userObj.superUser = 1;
SRpcMsg rpcMsg = {0};
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_CREATE_USER, &rpcMsg);
SSdbRaw *pRedoRaw = mndUserActionEncode(&userObj);
mndTransAppendRedolog(pTrans, pRedoRaw);
sdbSetRawStatus(pRedoRaw, SDB_STATUS_READY);
SSdbRaw *pUndoRaw = mndUserActionEncode(&userObj);
mndTransAppendUndolog(pTrans, pUndoRaw);
sdbSetRawStatus(pUndoRaw, SDB_STATUS_DROPPED);
char *param = strdup("====> test log <=====");
mndTransSetCb(pTrans, TEST_TRANS_START_FUNC, TEST_TRANS_STOP_FUNC, param, strlen(param) + 1);
int32_t code = mndTransPrepare(pMnode, pTrans); int32_t code = mndTransPrepare(pMnode, pTrans);
mndTransDrop(pTrans); mndTransDrop(pTrans);
...@@ -189,12 +232,12 @@ TEST_F(MndTestTrans2, 01_Log) { ...@@ -189,12 +232,12 @@ TEST_F(MndTestTrans2, 01_Log) {
ASSERT_NE(pMnode, nullptr); ASSERT_NE(pMnode, nullptr);
EXPECT_EQ(CreateUserLog(acct, user1), 0); EXPECT_EQ(CreateUserLog(acct, user1, TRN_TYPE_CREATE_USER, NULL), 0);
pUser1 = mndAcquireUser(pMnode, user1); pUser1 = mndAcquireUser(pMnode, user1);
ASSERT_NE(pUser1, nullptr); ASSERT_NE(pUser1, nullptr);
// failed to create user and rollback // failed to create user and rollback
EXPECT_EQ(CreateUserLog(acct_invalid, user2), 0); EXPECT_EQ(CreateUserLog(acct_invalid, user2, TRN_TYPE_CREATE_USER, NULL), 0);
pUser2 = mndAcquireUser(pMnode, user2); pUser2 = mndAcquireUser(pMnode, user2);
ASSERT_EQ(pUser2, nullptr); ASSERT_EQ(pUser2, nullptr);
...@@ -214,44 +257,46 @@ TEST_F(MndTestTrans2, 02_Action) { ...@@ -214,44 +257,46 @@ TEST_F(MndTestTrans2, 02_Action) {
ASSERT_NE(pMnode, nullptr); ASSERT_NE(pMnode, nullptr);
// failed to create user and rollback
EXPECT_EQ(CreateUserAction(acct, user1, false, TRN_POLICY_ROLLBACK), 0);
pUser1 = mndAcquireUser(pMnode, user1);
ASSERT_EQ(pUser1, nullptr);
mndReleaseUser(pMnode, pUser1);
// create user, and fake a response
{ {
EXPECT_EQ(CreateUserAction(acct, user1, true, TRN_POLICY_ROLLBACK), 0); // failed to create user and rollback
EXPECT_EQ(CreateUserAction(acct, user1, false, TRN_POLICY_ROLLBACK, TRN_TYPE_CREATE_USER, NULL), 0);
pUser1 = mndAcquireUser(pMnode, user1); pUser1 = mndAcquireUser(pMnode, user1);
ASSERT_NE(pUser1, nullptr); ASSERT_EQ(pUser1, nullptr);
mndReleaseUser(pMnode, pUser1); mndReleaseUser(pMnode, pUser1);
transId = 4; // create user, and fake a response
pTrans = mndAcquireTrans(pMnode, transId); {
EXPECT_EQ(pTrans->code, TSDB_CODE_INVALID_PTR); EXPECT_EQ(CreateUserAction(acct, user1, true, TRN_POLICY_ROLLBACK, TRN_TYPE_CREATE_USER, NULL), 0);
EXPECT_EQ(pTrans->stage, TRN_STAGE_UNDO_ACTION); pUser1 = mndAcquireUser(pMnode, user1);
EXPECT_EQ(pTrans->failedTimes, 1); ASSERT_NE(pUser1, nullptr);
mndReleaseUser(pMnode, pUser1);
STransAction *pAction = (STransAction *)taosArrayGet(pTrans->undoActions, action); transId = 4;
pAction->msgSent = 1; pTrans = mndAcquireTrans(pMnode, transId);
EXPECT_EQ(pTrans->code, TSDB_CODE_INVALID_PTR);
EXPECT_EQ(pTrans->stage, TRN_STAGE_UNDO_ACTION);
EXPECT_EQ(pTrans->failedTimes, 1);
SNodeMsg rspMsg = {0}; STransAction *pAction = (STransAction *)taosArrayGet(pTrans->undoActions, action);
rspMsg.pNode = pMnode; pAction->msgSent = 1;
int64_t signature = transId;
signature = (signature << 32);
signature += action;
rspMsg.rpcMsg.ahandle = (void *)signature;
mndTransProcessRsp(&rspMsg);
mndReleaseTrans(pMnode, pTrans);
pUser1 = mndAcquireUser(pMnode, user1); SNodeMsg rspMsg = {0};
ASSERT_EQ(pUser1, nullptr); rspMsg.pNode = pMnode;
mndReleaseUser(pMnode, pUser1); int64_t signature = transId;
signature = (signature << 32);
signature += action;
rspMsg.rpcMsg.ahandle = (void *)signature;
mndTransProcessRsp(&rspMsg);
mndReleaseTrans(pMnode, pTrans);
pUser1 = mndAcquireUser(pMnode, user1);
ASSERT_EQ(pUser1, nullptr);
mndReleaseUser(pMnode, pUser1);
}
} }
{ {
EXPECT_EQ(CreateUserAction(acct, user1, false, TRN_POLICY_RETRY), 0); EXPECT_EQ(CreateUserAction(acct, user1, false, TRN_POLICY_RETRY, TRN_TYPE_CREATE_USER, NULL), 0);
pUser1 = mndAcquireUser(pMnode, user1); pUser1 = mndAcquireUser(pMnode, user1);
ASSERT_NE(pUser1, nullptr); ASSERT_NE(pUser1, nullptr);
mndReleaseUser(pMnode, pUser1); mndReleaseUser(pMnode, pUser1);
...@@ -305,4 +350,164 @@ TEST_F(MndTestTrans2, 02_Action) { ...@@ -305,4 +350,164 @@ TEST_F(MndTestTrans2, 02_Action) {
mndReleaseUser(pMnode, pUser1); mndReleaseUser(pMnode, pUser1);
} }
} }
{
EXPECT_EQ(CreateUserAction(acct, user2, true, TRN_POLICY_ROLLBACK, TRN_TYPE_CREATE_USER, NULL), 0);
SUserObj *pUser2 = (SUserObj *)sdbAcquire(pMnode->pSdb, SDB_USER, user2);
ASSERT_NE(pUser2, nullptr);
mndReleaseUser(pMnode, pUser2);
{
transId = 6;
pTrans = mndAcquireTrans(pMnode, transId);
EXPECT_EQ(pTrans->code, TSDB_CODE_INVALID_PTR);
EXPECT_EQ(pTrans->stage, TRN_STAGE_UNDO_ACTION);
EXPECT_EQ(pTrans->failedTimes, 1);
SNodeMsg rspMsg = {0};
rspMsg.pNode = pMnode;
int64_t signature = transId;
signature = (signature << 32);
signature += action;
rspMsg.rpcMsg.ahandle = (void *)signature;
rspMsg.rpcMsg.code = 0;
mndTransProcessRsp(&rspMsg);
mndReleaseTrans(pMnode, pTrans);
pUser2 = mndAcquireUser(pMnode, user2);
ASSERT_NE(pUser2, nullptr);
mndReleaseUser(pMnode, pUser2);
}
{
transId = 6;
pTrans = mndAcquireTrans(pMnode, transId);
EXPECT_EQ(pTrans->code, TSDB_CODE_INVALID_PTR);
EXPECT_EQ(pTrans->stage, TRN_STAGE_UNDO_ACTION);
EXPECT_EQ(pTrans->failedTimes, 2);
STransAction *pAction = (STransAction *)taosArrayGet(pTrans->undoActions, action);
pAction->msgSent = 1;
SNodeMsg rspMsg = {0};
rspMsg.pNode = pMnode;
int64_t signature = transId;
signature = (signature << 32);
signature += action;
rspMsg.rpcMsg.ahandle = (void *)signature;
mndTransProcessRsp(&rspMsg);
mndReleaseTrans(pMnode, pTrans);
pUser2 = mndAcquireUser(pMnode, user2);
ASSERT_EQ(pUser2, nullptr);
mndReleaseUser(pMnode, pUser2);
}
}
} }
TEST_F(MndTestTrans2, 03_Kill) {
const char *acct = "root";
const char *user1 = "kill1";
const char *user2 = "kill2";
SUserObj *pUser1 = NULL;
SUserObj *pUser2 = NULL;
STrans *pTrans = NULL;
int32_t transId = 0;
int32_t action = 0;
ASSERT_NE(pMnode, nullptr);
{
EXPECT_EQ(CreateUserAction(acct, user1, true, TRN_POLICY_ROLLBACK, TRN_TYPE_CREATE_USER, NULL), 0);
pUser1 = mndAcquireUser(pMnode, user1);
ASSERT_NE(pUser1, nullptr);
mndReleaseUser(pMnode, pUser1);
transId = 7;
pTrans = mndAcquireTrans(pMnode, transId);
EXPECT_EQ(pTrans->code, TSDB_CODE_INVALID_PTR);
EXPECT_EQ(pTrans->stage, TRN_STAGE_UNDO_ACTION);
EXPECT_EQ(pTrans->failedTimes, 1);
mndKillTrans(pMnode, pTrans);
mndReleaseTrans(pMnode, pTrans);
pUser1 = mndAcquireUser(pMnode, user1);
ASSERT_EQ(pUser1, nullptr);
mndReleaseUser(pMnode, pUser1);
}
}
TEST_F(MndTestTrans2, 04_Conflict) {
const char *acct = "root";
const char *user1 = "conflict1";
const char *user2 = "conflict2";
const char *user3 = "conflict3";
const char *user4 = "conflict4";
const char *user5 = "conflict5";
const char *user6 = "conflict6";
const char *user7 = "conflict7";
const char *user8 = "conflict8";
SUserObj *pUser = NULL;
STrans *pTrans = NULL;
int32_t transId = 0;
int32_t code = 0;
ASSERT_NE(pMnode, nullptr);
{
SDbObj dbObj1 = {0};
dbObj1.uid = 9521;
strcpy(dbObj1.name, "db");
SDbObj dbObj2 = {0};
dbObj2.uid = 9522;
strcpy(dbObj2.name, "conflict db");
EXPECT_EQ(CreateUserAction(acct, user1, true, TRN_POLICY_ROLLBACK, TRN_TYPE_CREATE_STB, &dbObj1), 0);
pUser = mndAcquireUser(pMnode, user1);
ASSERT_NE(pUser, nullptr);
mndReleaseUser(pMnode, pUser);
transId = 8;
pTrans = mndAcquireTrans(pMnode, transId);
EXPECT_EQ(pTrans->code, TSDB_CODE_INVALID_PTR);
EXPECT_EQ(pTrans->stage, TRN_STAGE_UNDO_ACTION);
// stb scope
EXPECT_EQ(CreateUserLog(acct, user2, TRN_TYPE_CREATE_DNODE, NULL), -1);
code = terrno;
EXPECT_EQ(code, TSDB_CODE_MND_TRANS_CONFLICT);
EXPECT_EQ(CreateUserLog(acct, user2, TRN_TYPE_CREATE_DB, &dbObj1), -1);
EXPECT_EQ(CreateUserLog(acct, user2, TRN_TYPE_CREATE_DB, &dbObj2), 0);
EXPECT_EQ(CreateUserLog(acct, user3, TRN_TYPE_CREATE_STB, &dbObj1), 0);
// db scope
pTrans->type = TRN_TYPE_CREATE_DB;
EXPECT_EQ(CreateUserLog(acct, user4, TRN_TYPE_CREATE_DNODE, NULL), -1);
EXPECT_EQ(CreateUserLog(acct, user4, TRN_TYPE_CREATE_DB, &dbObj1), -1);
EXPECT_EQ(CreateUserLog(acct, user4, TRN_TYPE_CREATE_DB, &dbObj2), 0);
EXPECT_EQ(CreateUserLog(acct, user5, TRN_TYPE_CREATE_STB, &dbObj1), -1);
EXPECT_EQ(CreateUserLog(acct, user5, TRN_TYPE_CREATE_STB, &dbObj2), 0);
// global scope
pTrans->type = TRN_TYPE_CREATE_DNODE;
EXPECT_EQ(CreateUserLog(acct, user6, TRN_TYPE_CREATE_DNODE, NULL), 0);
EXPECT_EQ(CreateUserLog(acct, user7, TRN_TYPE_CREATE_DB, &dbObj1), -1);
EXPECT_EQ(CreateUserLog(acct, user7, TRN_TYPE_CREATE_DB, &dbObj2), -1);
EXPECT_EQ(CreateUserLog(acct, user7, TRN_TYPE_CREATE_STB, &dbObj1), -1);
EXPECT_EQ(CreateUserLog(acct, user7, TRN_TYPE_CREATE_STB, &dbObj2), -1);
// global scope
pTrans->type = TRN_TYPE_CREATE_USER;
EXPECT_EQ(CreateUserLog(acct, user7, TRN_TYPE_CREATE_DB, &dbObj1), 0);
EXPECT_EQ(CreateUserLog(acct, user8, TRN_TYPE_CREATE_DB, &dbObj2), 0);
mndKillTrans(pMnode, pTrans);
mndReleaseTrans(pMnode, pTrans);
pUser = mndAcquireUser(pMnode, user1);
ASSERT_EQ(pUser, nullptr);
mndReleaseUser(pMnode, pUser);
}
}
\ No newline at end of file
...@@ -103,8 +103,6 @@ typedef struct { ...@@ -103,8 +103,6 @@ typedef struct {
#if 1 #if 1
// int metaCreateTable(SMeta* pMeta, STbCfg* pTbCfg, STbDdlH* pHandle);
int metaDropTable(SMeta* pMeta, tb_uid_t uid);
SMSmaCursor* metaOpenSmaCursor(SMeta* pMeta, tb_uid_t uid); SMSmaCursor* metaOpenSmaCursor(SMeta* pMeta, tb_uid_t uid);
void metaCloseSmaCursor(SMSmaCursor* pSmaCur); void metaCloseSmaCursor(SMSmaCursor* pSmaCur);
int64_t metaSmaCursorNext(SMSmaCursor* pSmaCur); int64_t metaSmaCursorNext(SMSmaCursor* pSmaCur);
......
...@@ -46,13 +46,13 @@ ...@@ -46,13 +46,13 @@
extern "C" { extern "C" {
#endif #endif
typedef struct SVnodeInfo SVnodeInfo; typedef struct SVnodeInfo SVnodeInfo;
typedef struct SMeta SMeta; typedef struct SMeta SMeta;
typedef struct STsdb STsdb; typedef struct STsdb STsdb;
typedef struct STQ STQ; typedef struct STQ STQ;
typedef struct SVState SVState; typedef struct SVState SVState;
typedef struct SVBufPool SVBufPool; typedef struct SVBufPool SVBufPool;
typedef struct SQWorker SQHandle; typedef struct SQWorker SQHandle;
#define VNODE_META_DIR "meta" #define VNODE_META_DIR "meta"
#define VNODE_TSDB_DIR "tsdb" #define VNODE_TSDB_DIR "tsdb"
...@@ -77,6 +77,7 @@ int metaCommit(SMeta* pMeta); ...@@ -77,6 +77,7 @@ int metaCommit(SMeta* pMeta);
int metaCreateSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* pReq); int metaCreateSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* pReq);
int metaDropSTable(SMeta* pMeta, int64_t verison, SVDropStbReq* pReq); int metaDropSTable(SMeta* pMeta, int64_t verison, SVDropStbReq* pReq);
int metaCreateTable(SMeta* pMeta, int64_t version, SVCreateTbReq* pReq); int metaCreateTable(SMeta* pMeta, int64_t version, SVCreateTbReq* pReq);
int metaDropTable(SMeta* pMeta, int64_t version, SVDropTbReq* pReq);
SSchemaWrapper* metaGetTableSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver, bool isinline); SSchemaWrapper* metaGetTableSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver, bool isinline);
STSchema* metaGetTbTSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver); STSchema* metaGetTbTSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver);
int metaGetTableEntryByName(SMetaReader* pReader, const char* name); int metaGetTableEntryByName(SMetaReader* pReader, const char* name);
...@@ -100,7 +101,7 @@ int32_t tsdbCreateTSma(STsdb* pTsdb, char* pMsg); ...@@ -100,7 +101,7 @@ int32_t tsdbCreateTSma(STsdb* pTsdb, char* pMsg);
int32_t tsdbInsertTSmaData(STsdb* pTsdb, int64_t indexUid, const char* msg); int32_t tsdbInsertTSmaData(STsdb* pTsdb, int64_t indexUid, const char* msg);
int tsdbInsertData(STsdb* pTsdb, int64_t version, SSubmitReq* pMsg, SSubmitRsp* pRsp); int tsdbInsertData(STsdb* pTsdb, int64_t version, SSubmitReq* pMsg, SSubmitRsp* pRsp);
tsdbReaderT* tsdbQueryTables(SVnode* pVnode, SQueryTableDataCond* pCond, STableGroupInfo* groupList, uint64_t qId, tsdbReaderT* tsdbQueryTables(SVnode* pVnode, SQueryTableDataCond* pCond, STableGroupInfo* groupList, uint64_t qId,
uint64_t taskId); uint64_t taskId);
tsdbReaderT tsdbQueryCacheLastT(STsdb* tsdb, SQueryTableDataCond* pCond, STableGroupInfo* groupList, uint64_t qId, tsdbReaderT tsdbQueryCacheLastT(STsdb* tsdb, SQueryTableDataCond* pCond, STableGroupInfo* groupList, uint64_t qId,
void* pMemRef); void* pMemRef);
int32_t tsdbGetTableGroupFromIdListT(STsdb* tsdb, SArray* pTableIdList, STableGroupInfo* pGroupInfo); int32_t tsdbGetTableGroupFromIdListT(STsdb* tsdb, SArray* pTableIdList, STableGroupInfo* pGroupInfo);
...@@ -189,7 +190,6 @@ struct STbUidStore { ...@@ -189,7 +190,6 @@ struct STbUidStore {
#define TD_VID(PVNODE) (PVNODE)->config.vgId #define TD_VID(PVNODE) (PVNODE)->config.vgId
static FORCE_INLINE bool vnodeIsRollup(SVnode* pVnode) { static FORCE_INLINE bool vnodeIsRollup(SVnode* pVnode) {
SRetention* pRetention = &(pVnode->config.tsdbCfg.retentions[0]); SRetention* pRetention = &(pVnode->config.tsdbCfg.retentions[0]);
return (pRetention->freq > 0 && pRetention->keep > 0); return (pRetention->freq > 0 && pRetention->keep > 0);
......
...@@ -289,7 +289,7 @@ int metaSaveTableToDB(SMeta *pMeta, STbCfg *pTbCfg, STbDdlH *pHandle) { ...@@ -289,7 +289,7 @@ int metaSaveTableToDB(SMeta *pMeta, STbCfg *pTbCfg, STbDdlH *pHandle) {
pVal = pBuf = buf; pVal = pBuf = buf;
metaEncodeTbInfo(&pBuf, pTbCfg); metaEncodeTbInfo(&pBuf, pTbCfg);
vLen = POINTER_DISTANCE(pBuf, buf); vLen = POINTER_DISTANCE(pBuf, buf);
ret = tdbDbPut(pMetaDb->pTbDB, pKey, kLen, pVal, vLen, &pMetaDb->txn); ret = tdbDbInsert(pMetaDb->pTbDB, pKey, kLen, pVal, vLen, &pMetaDb->txn);
if (ret < 0) { if (ret < 0) {
return -1; return -1;
} }
...@@ -311,7 +311,7 @@ int metaSaveTableToDB(SMeta *pMeta, STbCfg *pTbCfg, STbDdlH *pHandle) { ...@@ -311,7 +311,7 @@ int metaSaveTableToDB(SMeta *pMeta, STbCfg *pTbCfg, STbDdlH *pHandle) {
pVal = pBuf = buf; pVal = pBuf = buf;
metaEncodeSchemaEx(&pBuf, &schemaWrapper); metaEncodeSchemaEx(&pBuf, &schemaWrapper);
vLen = POINTER_DISTANCE(pBuf, buf); vLen = POINTER_DISTANCE(pBuf, buf);
ret = tdbDbPut(pMetaDb->pSchemaDB, pKey, kLen, pVal, vLen, &pMeta->pDB->txn); ret = tdbDbInsert(pMetaDb->pSchemaDB, pKey, kLen, pVal, vLen, &pMeta->pDB->txn);
if (ret < 0) { if (ret < 0) {
return -1; return -1;
} }
...@@ -325,7 +325,7 @@ int metaSaveTableToDB(SMeta *pMeta, STbCfg *pTbCfg, STbDdlH *pHandle) { ...@@ -325,7 +325,7 @@ int metaSaveTableToDB(SMeta *pMeta, STbCfg *pTbCfg, STbDdlH *pHandle) {
kLen = nameLen + 1 + sizeof(uid); kLen = nameLen + 1 + sizeof(uid);
pVal = NULL; pVal = NULL;
vLen = 0; vLen = 0;
ret = tdbDbPut(pMetaDb->pNameIdx, pKey, kLen, pVal, vLen, &pMetaDb->txn); ret = tdbDbInsert(pMetaDb->pNameIdx, pKey, kLen, pVal, vLen, &pMetaDb->txn);
if (ret < 0) { if (ret < 0) {
return -1; return -1;
} }
...@@ -336,7 +336,7 @@ int metaSaveTableToDB(SMeta *pMeta, STbCfg *pTbCfg, STbDdlH *pHandle) { ...@@ -336,7 +336,7 @@ int metaSaveTableToDB(SMeta *pMeta, STbCfg *pTbCfg, STbDdlH *pHandle) {
kLen = sizeof(uid); kLen = sizeof(uid);
pVal = NULL; pVal = NULL;
vLen = 0; vLen = 0;
ret = tdbDbPut(pMetaDb->pStbIdx, pKey, kLen, pVal, vLen, &pMetaDb->txn); ret = tdbDbInsert(pMetaDb->pStbIdx, pKey, kLen, pVal, vLen, &pMetaDb->txn);
if (ret < 0) { if (ret < 0) {
return -1; return -1;
} }
...@@ -347,7 +347,7 @@ int metaSaveTableToDB(SMeta *pMeta, STbCfg *pTbCfg, STbDdlH *pHandle) { ...@@ -347,7 +347,7 @@ int metaSaveTableToDB(SMeta *pMeta, STbCfg *pTbCfg, STbDdlH *pHandle) {
kLen = sizeof(ctbIdxKey); kLen = sizeof(ctbIdxKey);
pVal = NULL; pVal = NULL;
vLen = 0; vLen = 0;
ret = tdbDbPut(pMetaDb->pCtbIdx, pKey, kLen, pVal, vLen, &pMetaDb->txn); ret = tdbDbInsert(pMetaDb->pCtbIdx, pKey, kLen, pVal, vLen, &pMetaDb->txn);
if (ret < 0) { if (ret < 0) {
return -1; return -1;
} }
...@@ -362,7 +362,7 @@ int metaSaveTableToDB(SMeta *pMeta, STbCfg *pTbCfg, STbDdlH *pHandle) { ...@@ -362,7 +362,7 @@ int metaSaveTableToDB(SMeta *pMeta, STbCfg *pTbCfg, STbDdlH *pHandle) {
kLen = sizeof(uid); kLen = sizeof(uid);
pVal = NULL; pVal = NULL;
vLen = 0; vLen = 0;
ret = tdbDbPut(pMetaDb->pNtbIdx, pKey, kLen, pVal, vLen, &pMetaDb->txn); ret = tdbDbInsert(pMetaDb->pNtbIdx, pKey, kLen, pVal, vLen, &pMetaDb->txn);
if (ret < 0) { if (ret < 0) {
return -1; return -1;
} }
...@@ -530,7 +530,7 @@ int metaSaveSmaToDB(SMeta *pMeta, STSma *pSmaCfg) { ...@@ -530,7 +530,7 @@ int metaSaveSmaToDB(SMeta *pMeta, STSma *pSmaCfg) {
int32_t kLen = sizeof(pSmaCfg->indexUid); int32_t kLen = sizeof(pSmaCfg->indexUid);
int32_t vLen = POINTER_DISTANCE(qBuf, pBuf); int32_t vLen = POINTER_DISTANCE(qBuf, pBuf);
ret = tdbDbPut(pMeta->pDB->pSmaDB, key, kLen, val, vLen, &pMetaDb->txn); ret = tdbDbInsert(pMeta->pDB->pSmaDB, key, kLen, val, vLen, &pMetaDb->txn);
if (ret < 0) { if (ret < 0) {
taosMemoryFreeClear(pBuf); taosMemoryFreeClear(pBuf);
return -1; return -1;
...@@ -545,7 +545,7 @@ int metaSaveSmaToDB(SMeta *pMeta, STSma *pSmaCfg) { ...@@ -545,7 +545,7 @@ int metaSaveSmaToDB(SMeta *pMeta, STSma *pSmaCfg) {
val = NULL; val = NULL;
vLen = 0; vLen = 0;
ret = tdbDbPut(pMeta->pDB->pSmaIdx, key, kLen, val, vLen, &pMetaDb->txn); ret = tdbDbInsert(pMeta->pDB->pSmaIdx, key, kLen, val, vLen, &pMetaDb->txn);
if (ret < 0) { if (ret < 0) {
taosMemoryFreeClear(pBuf); taosMemoryFreeClear(pBuf);
return -1; return -1;
......
...@@ -72,44 +72,61 @@ _err: ...@@ -72,44 +72,61 @@ _err:
} }
int metaDropSTable(SMeta *pMeta, int64_t verison, SVDropStbReq *pReq) { int metaDropSTable(SMeta *pMeta, int64_t verison, SVDropStbReq *pReq) {
SMetaReader mr = {0}; TDBC *pNameIdxc = NULL;
TDBC *pUidIdxc = NULL;
// validate req TDBC *pCtbIdxc = NULL;
metaReaderInit(&mr, pMeta, 0); SCtbIdxKey *pCtbIdxKey;
if (metaGetTableEntryByUid(&mr, pReq->suid) < 0) { const void *pKey = NULL;
terrno = TSDB_CODE_VND_TABLE_NOT_EXIST; int nKey;
const void *pData = NULL;
int nData;
int c, ret;
// prepare uid idx cursor
tdbDbcOpen(pMeta->pUidIdx, &pUidIdxc, &pMeta->txn);
ret = tdbDbcMoveTo(pUidIdxc, &pReq->suid, sizeof(tb_uid_t), &c);
if (ret < 0 || c != 0) {
terrno = TSDB_CODE_VND_TB_NOT_EXIST;
tdbDbcClose(pUidIdxc);
goto _err; goto _err;
} }
// do drop // prepare name idx cursor
// drop from pTbDb tdbDbcOpen(pMeta->pNameIdx, &pNameIdxc, &pMeta->txn);
// drop from pSkmDb ret = tdbDbcMoveTo(pNameIdxc, pReq->name, strlen(pReq->name) + 1, &c);
// drop from pUidIdx if (ret < 0 || c != 0) {
// drop from pNameIdx ASSERT(0);
// { }
// TDBC *pDbc1 = NULL;
// void *pKey = NULL; tdbDbcDelete(pUidIdxc);
// void *pVal = NULL; tdbDbcDelete(pNameIdxc);
// int kLen = 0; tdbDbcClose(pUidIdxc);
// int vLen = 0; tdbDbcClose(pNameIdxc);
// int ret = 0;
// loop to drop each child table
// // drop from pCtbIdx tdbDbcOpen(pMeta->pCtbIdx, &pCtbIdxc, &pMeta->txn);
// ret = tdbDbcOpen(pMeta->pCtbIdx, &pDbc1); ret = tdbDbcMoveTo(pCtbIdxc, &(SCtbIdxKey){.suid = pReq->suid, .uid = INT64_MIN}, sizeof(SCtbIdxKey), &c);
// tdbDbcMoveTo(pDbc1, &pReq->suid, sizeof(pReq->suid), NULL /*cmpr*/, 0 /*TDB_FORWARD_SEARCH*/); if (ret < 0 || (c < 0 && tdbDbcMoveToNext(pCtbIdxc) < 0)) {
// tdbDbcGet(pDbc1, &pKey, &kLen, &pVal, vLen); tdbDbcClose(pCtbIdxc);
// tdbDbcDrop(pDbc1); goto _exit;
// // drop from pTagIdx }
// // drop from pTtlIdx
// } for (;;) {
tdbDbcGet(pCtbIdxc, &pKey, &nKey, NULL, NULL);
// clear and return pCtbIdxKey = (SCtbIdxKey *)pKey;
metaReaderClear(&mr);
metaError("vgId:%d super table %s uid:%" PRId64 " is dropped", TD_VID(pMeta->pVnode), pReq->name, pReq->suid); if (pCtbIdxKey->suid > pReq->suid) break;
// drop the child table (TODO)
if (tdbDbcMoveToNext(pCtbIdxc) < 0) break;
}
_exit:
metaDebug("vgId:%d super table %s uid:%" PRId64 " is dropped", TD_VID(pMeta->pVnode), pReq->name, pReq->suid);
return 0; return 0;
_err: _err:
metaReaderClear(&mr);
metaError("vgId:%d failed to drop super table %s uid:%" PRId64 " since %s", TD_VID(pMeta->pVnode), pReq->name, metaError("vgId:%d failed to drop super table %s uid:%" PRId64 " since %s", TD_VID(pMeta->pVnode), pReq->name,
pReq->suid, tstrerror(terrno)); pReq->suid, tstrerror(terrno));
return -1; return -1;
...@@ -166,18 +183,122 @@ _err: ...@@ -166,18 +183,122 @@ _err:
return -1; return -1;
} }
int metaDropTable(SMeta *pMeta, tb_uid_t uid) { int metaDropTable(SMeta *pMeta, int64_t version, SVDropTbReq *pReq) {
#if 0 TDBC *pTbDbc = NULL;
if (metaRemoveTableFromIdx(pMeta, uid) < 0) { TDBC *pUidIdxc = NULL;
// TODO: handle error TDBC *pNameIdxc = NULL;
const void *pData;
int nData;
tb_uid_t uid;
int64_t tver;
SMetaEntry me = {0};
SCoder coder = {0};
int8_t type;
int64_t ctime;
tb_uid_t suid;
int c, ret;
// search & delete the name idx
tdbDbcOpen(pMeta->pNameIdx, &pNameIdxc, &pMeta->txn);
ret = tdbDbcMoveTo(pNameIdxc, pReq->name, strlen(pReq->name) + 1, &c);
if (ret < 0 || c) {
tdbDbcClose(pNameIdxc);
terrno = TSDB_CODE_VND_TABLE_NOT_EXIST;
return -1; return -1;
} }
if (metaRemoveTableFromIdx(pMeta, uid) < 0) { ret = tdbDbcGet(pNameIdxc, NULL, NULL, &pData, &nData);
// TODO if (ret < 0) {
ASSERT(0);
return -1; return -1;
} }
#endif
uid = *(tb_uid_t *)pData;
tdbDbcDelete(pNameIdxc);
tdbDbcClose(pNameIdxc);
// search & delete uid idx
tdbDbcOpen(pMeta->pUidIdx, &pUidIdxc, &pMeta->txn);
ret = tdbDbcMoveTo(pUidIdxc, &uid, sizeof(uid), &c);
if (ret < 0 || c != 0) {
ASSERT(0);
return -1;
}
ret = tdbDbcGet(pUidIdxc, NULL, NULL, &pData, &nData);
if (ret < 0) {
ASSERT(0);
return -1;
}
tver = *(int64_t *)pData;
tdbDbcDelete(pUidIdxc);
tdbDbcClose(pUidIdxc);
// search and get meta entry
tdbDbcOpen(pMeta->pTbDb, &pTbDbc, &pMeta->txn);
ret = tdbDbcMoveTo(pTbDbc, &(STbDbKey){.uid = uid, .version = tver}, sizeof(STbDbKey), &c);
if (ret < 0 || c != 0) {
ASSERT(0);
return -1;
}
ret = tdbDbcGet(pTbDbc, NULL, NULL, &pData, &nData);
if (ret < 0) {
ASSERT(0);
return -1;
}
// decode entry
void *pDataCopy = taosMemoryMalloc(nData); // remove the copy (todo)
memcpy(pDataCopy, pData, nData);
tCoderInit(&coder, TD_LITTLE_ENDIAN, pDataCopy, nData, TD_DECODER);
ret = metaDecodeEntry(&coder, &me);
if (ret < 0) {
ASSERT(0);
return -1;
}
type = me.type;
if (type == TSDB_CHILD_TABLE) {
ctime = me.ctbEntry.ctime;
suid = me.ctbEntry.suid;
} else if (type == TSDB_NORMAL_TABLE) {
ctime = me.ntbEntry.ctime;
suid = 0;
} else {
ASSERT(0);
}
taosMemoryFree(pDataCopy);
tCoderClear(&coder);
tdbDbcClose(pTbDbc);
if (type == TSDB_CHILD_TABLE) {
// remove the pCtbIdx
TDBC *pCtbIdxc = NULL;
tdbDbcOpen(pMeta->pCtbIdx, &pCtbIdxc, &pMeta->txn);
ret = tdbDbcMoveTo(pCtbIdxc, &(SCtbIdxKey){.suid = suid, .uid = uid}, sizeof(SCtbIdxKey), &c);
if (ret < 0 || c != 0) {
ASSERT(0);
return -1;
}
tdbDbcDelete(pCtbIdxc);
tdbDbcClose(pCtbIdxc);
// remove tags from pTagIdx (todo)
} else if (type == TSDB_NORMAL_TABLE) {
// remove from pSkmDb
} else {
ASSERT(0);
}
// remove from ttl (todo)
if (ctime > 0) {
}
return 0; return 0;
} }
...@@ -218,7 +339,7 @@ static int metaSaveToTbDb(SMeta *pMeta, const SMetaEntry *pME) { ...@@ -218,7 +339,7 @@ static int metaSaveToTbDb(SMeta *pMeta, const SMetaEntry *pME) {
tCoderClear(&coder); tCoderClear(&coder);
// write to table.db // write to table.db
if (tdbDbPut(pMeta->pTbDb, pKey, kLen, pVal, vLen, &pMeta->txn) < 0) { if (tdbDbInsert(pMeta->pTbDb, pKey, kLen, pVal, vLen, &pMeta->txn) < 0) {
goto _err; goto _err;
} }
...@@ -231,11 +352,11 @@ _err: ...@@ -231,11 +352,11 @@ _err:
} }
static int metaUpdateUidIdx(SMeta *pMeta, const SMetaEntry *pME) { static int metaUpdateUidIdx(SMeta *pMeta, const SMetaEntry *pME) {
return tdbDbPut(pMeta->pUidIdx, &pME->uid, sizeof(tb_uid_t), &pME->version, sizeof(int64_t), &pMeta->txn); return tdbDbInsert(pMeta->pUidIdx, &pME->uid, sizeof(tb_uid_t), &pME->version, sizeof(int64_t), &pMeta->txn);
} }
static int metaUpdateNameIdx(SMeta *pMeta, const SMetaEntry *pME) { static int metaUpdateNameIdx(SMeta *pMeta, const SMetaEntry *pME) {
return tdbDbPut(pMeta->pNameIdx, pME->name, strlen(pME->name) + 1, &pME->uid, sizeof(tb_uid_t), &pMeta->txn); return tdbDbInsert(pMeta->pNameIdx, pME->name, strlen(pME->name) + 1, &pME->uid, sizeof(tb_uid_t), &pMeta->txn);
} }
static int metaUpdateTtlIdx(SMeta *pMeta, const SMetaEntry *pME) { static int metaUpdateTtlIdx(SMeta *pMeta, const SMetaEntry *pME) {
...@@ -258,12 +379,12 @@ static int metaUpdateTtlIdx(SMeta *pMeta, const SMetaEntry *pME) { ...@@ -258,12 +379,12 @@ static int metaUpdateTtlIdx(SMeta *pMeta, const SMetaEntry *pME) {
ttlKey.dtime = ctime + ttlDays * 24 * 60 * 60; ttlKey.dtime = ctime + ttlDays * 24 * 60 * 60;
ttlKey.uid = pME->uid; ttlKey.uid = pME->uid;
return tdbDbPut(pMeta->pTtlIdx, &ttlKey, sizeof(ttlKey), NULL, 0, &pMeta->txn); return tdbDbInsert(pMeta->pTtlIdx, &ttlKey, sizeof(ttlKey), NULL, 0, &pMeta->txn);
} }
static int metaUpdateCtbIdx(SMeta *pMeta, const SMetaEntry *pME) { static int metaUpdateCtbIdx(SMeta *pMeta, const SMetaEntry *pME) {
SCtbIdxKey ctbIdxKey = {.suid = pME->ctbEntry.suid, .uid = pME->uid}; SCtbIdxKey ctbIdxKey = {.suid = pME->ctbEntry.suid, .uid = pME->uid};
return tdbDbPut(pMeta->pCtbIdx, &ctbIdxKey, sizeof(ctbIdxKey), NULL, 0, &pMeta->txn); return tdbDbInsert(pMeta->pCtbIdx, &ctbIdxKey, sizeof(ctbIdxKey), NULL, 0, &pMeta->txn);
} }
static int metaUpdateTagIdx(SMeta *pMeta, const SMetaEntry *pME) { static int metaUpdateTagIdx(SMeta *pMeta, const SMetaEntry *pME) {
...@@ -304,7 +425,7 @@ static int metaSaveToSkmDb(SMeta *pMeta, const SMetaEntry *pME) { ...@@ -304,7 +425,7 @@ static int metaSaveToSkmDb(SMeta *pMeta, const SMetaEntry *pME) {
tCoderInit(&coder, TD_LITTLE_ENDIAN, pVal, vLen, TD_ENCODER); tCoderInit(&coder, TD_LITTLE_ENDIAN, pVal, vLen, TD_ENCODER);
tEncodeSSchemaWrapper(&coder, pSW); tEncodeSSchemaWrapper(&coder, pSW);
if (tdbDbPut(pMeta->pSkmDb, &skmDbKey, sizeof(skmDbKey), pVal, vLen, &pMeta->txn) < 0) { if (tdbDbInsert(pMeta->pSkmDb, &skmDbKey, sizeof(skmDbKey), pVal, vLen, &pMeta->txn) < 0) {
rcode = -1; rcode = -1;
goto _exit; goto _exit;
} }
......
...@@ -97,7 +97,7 @@ int32_t tsdbCloseDBF(SDBFile *pDBF) { ...@@ -97,7 +97,7 @@ int32_t tsdbCloseDBF(SDBFile *pDBF) {
int32_t tsdbSaveSmaToDB(SDBFile *pDBF, void *pKey, int32_t keyLen, void *pVal, int32_t valLen, TXN *txn) { int32_t tsdbSaveSmaToDB(SDBFile *pDBF, void *pKey, int32_t keyLen, void *pVal, int32_t valLen, TXN *txn) {
int32_t ret; int32_t ret;
ret = tdbDbPut(pDBF->pDB, pKey, keyLen, pVal, valLen, txn); ret = tdbDbInsert(pDBF->pDB, pKey, keyLen, pVal, valLen, txn);
if (ret < 0) { if (ret < 0) {
tsdbError("Failed to create insert sma data into db, ret = %d", ret); tsdbError("Failed to create insert sma data into db, ret = %d", ret);
return -1; return -1;
......
...@@ -445,14 +445,45 @@ static int vnodeProcessAlterTbReq(SVnode *pVnode, void *pReq, int32_t len, SRpcM ...@@ -445,14 +445,45 @@ static int vnodeProcessAlterTbReq(SVnode *pVnode, void *pReq, int32_t len, SRpcM
} }
static int vnodeProcessDropTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) { static int vnodeProcessDropTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
SVDropTbReq req = {0}; SVDropTbBatchReq req = {0};
SVDropTbReq rsp = {0}; SVDropTbBatchRsp rsp = {0};
SCoder coder = {0};
int ret;
pRsp->msgType = TDMT_VND_CREATE_STB_RSP;
pRsp->pCont = NULL;
pRsp->contLen = 0;
pRsp->code = TSDB_CODE_SUCCESS;
// decode req // decode req
tCoderInit(&coder, TD_LITTLE_ENDIAN, pReq, len, TD_DECODER);
ret = tDecodeSVDropTbBatchReq(&coder, &req);
if (ret < 0) {
terrno = TSDB_CODE_INVALID_MSG;
pRsp->code = terrno;
goto _exit;
}
// process req // process req
rsp.pArray = taosArrayInit(sizeof(SVDropTbRsp), req.nReqs);
for (int iReq = 0; iReq < req.nReqs; iReq++) {
SVDropTbReq *pDropTbReq = req.pReqs + iReq;
SVDropTbRsp dropTbRsp = {0};
// return rsp /* code */
ret = metaDropTable(pVnode->pMeta, version, pDropTbReq);
if (ret < 0) {
dropTbRsp.code = TSDB_CODE_SUCCESS;
} else {
dropTbRsp.code = terrno;
}
taosArrayPush(rsp.pArray, &dropTbRsp);
}
_exit:
tCoderClear(&coder);
// encode rsp (TODO)
return 0; return 0;
} }
...@@ -482,7 +513,7 @@ static int vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, in ...@@ -482,7 +513,7 @@ static int vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, in
} }
int32_t tsdbProcessSubmitReq(STsdb *pTsdb, int64_t version, void *pReq) { int32_t tsdbProcessSubmitReq(STsdb *pTsdb, int64_t version, void *pReq) {
if(!pReq) { if (!pReq) {
terrno = TSDB_CODE_INVALID_PTR; terrno = TSDB_CODE_INVALID_PTR;
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
......
...@@ -4830,6 +4830,7 @@ static SArray* createSortInfo(SNodeList* pNodeList, SNodeList* pNodeListTarget); ...@@ -4830,6 +4830,7 @@ static SArray* createSortInfo(SNodeList* pNodeList, SNodeList* pNodeListTarget);
static SArray* createIndexMap(SNodeList* pNodeList); static SArray* createIndexMap(SNodeList* pNodeList);
static SArray* extractPartitionColInfo(SNodeList* pNodeList); static SArray* extractPartitionColInfo(SNodeList* pNodeList);
static int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysiNode* pTableScanNode); static int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysiNode* pTableScanNode);
static void setJoinColumnInfo(SColumnInfo* pInfo, const SColumnNode* pLeftNode);
static SInterval extractIntervalInfo(const STableScanPhysiNode* pTableScanNode) { static SInterval extractIntervalInfo(const STableScanPhysiNode* pTableScanNode) {
SInterval interval = { SInterval interval = {
...@@ -5624,25 +5625,29 @@ SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOf ...@@ -5624,25 +5625,29 @@ SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOf
goto _error; goto _error;
} }
pOperator->resultInfo.capacity = 4096; initResultSizeInfo(pOperator, 4096);
pOperator->resultInfo.threshold = 4096 * 0.75;
// initResultRowInf
// o(&pInfo->binfo.resultRowInfo, 8);
pInfo->pRes = pResBlock;
pOperator->name = "JoinOperator"; pInfo->pRes = pResBlock;
pOperator->name = "MergeJoinOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_JOIN; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_JOIN;
pOperator->blocking = false; pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED; pOperator->status = OP_NOT_OPENED;
pOperator->pExpr = pExprInfo; pOperator->pExpr = pExprInfo;
pOperator->numOfOutput = numOfCols; pOperator->numOfOutput = numOfCols;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->pTaskInfo = pTaskInfo; pOperator->pTaskInfo = pTaskInfo;
SOperatorNode* pNode = (SOperatorNode*)pOnCondition;
setJoinColumnInfo(&pInfo->leftCol, (SColumnNode*)pNode->pLeft);
setJoinColumnInfo(&pInfo->rightCol, (SColumnNode*)pNode->pRight);
pOperator->fpSet = pOperator->fpSet =
createOperatorFpSet(operatorDummyOpenFn, doMergeJoin, NULL, NULL, destroyBasicOperatorInfo, NULL, NULL, NULL); createOperatorFpSet(operatorDummyOpenFn, doMergeJoin, NULL, NULL, destroyBasicOperatorInfo, NULL, NULL, NULL);
int32_t code = appendDownstream(pOperator, pDownstream, numOfDownstream); int32_t code = appendDownstream(pOperator, pDownstream, numOfDownstream);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
return pOperator; return pOperator;
_error: _error:
...@@ -5651,3 +5656,11 @@ _error: ...@@ -5651,3 +5656,11 @@ _error:
pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY; pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
return NULL; return NULL;
} }
void setJoinColumnInfo(SColumnInfo* pColumn, const SColumnNode* pColumnNode) {
pColumn->slotId = pColumnNode->slotId;
pColumn->type = pColumnNode->node.resType.type;
pColumn->bytes = pColumnNode->node.resType.bytes;
pColumn->precision = pColumnNode->node.resType.precision;
pColumn->scale = pColumnNode->node.resType.scale;
}
...@@ -913,7 +913,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { ...@@ -913,7 +913,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
{ {
.name = "tbname", .name = "tbname",
.type = FUNCTION_TYPE_TBNAME, .type = FUNCTION_TYPE_TBNAME,
.classification = FUNC_MGT_PSEUDO_COLUMN_FUNC, .classification = FUNC_MGT_PSEUDO_COLUMN_FUNC | FUNC_MGT_SCAN_PC_FUNC,
.translateFunc = translateTbnameColumn, .translateFunc = translateTbnameColumn,
.getEnvFunc = NULL, .getEnvFunc = NULL,
.initFunc = NULL, .initFunc = NULL,
......
...@@ -100,6 +100,17 @@ void generateInformationSchema(MockCatalogService* mcs) { ...@@ -100,6 +100,17 @@ void generateInformationSchema(MockCatalogService* mcs) {
} }
} }
/*
* Table:t1
* Field | Type | DataType | Bytes |
* ==========================================================================
* ts | column | TIMESTAMP | 8 |
* c1 | column | INT | 4 |
* c2 | column | VARCHAR | 20 |
* c3 | column | BIGINT | 8 |
* c4 | column | DOUBLE | 8 |
* c5 | column | DOUBLE | 8 |
*/
void generateTestT1(MockCatalogService* mcs) { void generateTestT1(MockCatalogService* mcs) {
ITableBuilder& builder = mcs->createTableBuilder("test", "t1", TSDB_NORMAL_TABLE, 6) ITableBuilder& builder = mcs->createTableBuilder("test", "t1", TSDB_NORMAL_TABLE, 6)
.setPrecision(TSDB_TIME_PRECISION_MILLI) .setPrecision(TSDB_TIME_PRECISION_MILLI)
...@@ -113,6 +124,17 @@ void generateTestT1(MockCatalogService* mcs) { ...@@ -113,6 +124,17 @@ void generateTestT1(MockCatalogService* mcs) {
builder.done(); builder.done();
} }
/*
* Super Table: st1
* Field | Type | DataType | Bytes |
* ==========================================================================
* ts | column | TIMESTAMP | 8 |
* c1 | column | INT | 4 |
* c2 | column | VARCHAR | 20 |
* tag1 | tag | INT | 4 |
* tag2 | tag | VARCHAR | 20 |
* Child Table: st1s1, st1s2
*/
void generateTestST1(MockCatalogService* mcs) { void generateTestST1(MockCatalogService* mcs) {
ITableBuilder& builder = mcs->createTableBuilder("test", "st1", TSDB_SUPER_TABLE, 3, 2) ITableBuilder& builder = mcs->createTableBuilder("test", "st1", TSDB_SUPER_TABLE, 3, 2)
.setPrecision(TSDB_TIME_PRECISION_MILLI) .setPrecision(TSDB_TIME_PRECISION_MILLI)
......
...@@ -23,4 +23,6 @@ TEST_F(PlanSuperTableTest, tbname) { ...@@ -23,4 +23,6 @@ TEST_F(PlanSuperTableTest, tbname) {
useDb("root", "test"); useDb("root", "test");
run("select tbname from st1"); run("select tbname from st1");
run("select tbname, tag1, tag2 from st1");
} }
...@@ -36,11 +36,11 @@ class PlannerEnv : public testing::Environment { ...@@ -36,11 +36,11 @@ class PlannerEnv : public testing::Environment {
static void parseArg(int argc, char* argv[]) { static void parseArg(int argc, char* argv[]) {
int opt = 0; int opt = 0;
const char* optstring = ""; const char* optstring = "";
static struct option long_options[] = {{"dump", no_argument, NULL, 'd'}, {0, 0, 0, 0}}; static struct option long_options[] = {{"dump", optional_argument, NULL, 'd'}, {0, 0, 0, 0}};
while ((opt = getopt_long(argc, argv, optstring, long_options, NULL)) != -1) { while ((opt = getopt_long(argc, argv, optstring, long_options, NULL)) != -1) {
switch (opt) { switch (opt) {
case 'd': case 'd':
g_isDump = true; setDumpModule(optarg);
break; break;
default: default:
break; break;
......
...@@ -34,7 +34,41 @@ using namespace testing; ...@@ -34,7 +34,41 @@ using namespace testing;
} \ } \
} while (0); } while (0);
bool g_isDump = false; enum DumpModule {
DUMP_MODULE_NOTHING = 1,
DUMP_MODULE_PARSER,
DUMP_MODULE_LOGIC,
DUMP_MODULE_OPTIMIZED,
DUMP_MODULE_SPLIT,
DUMP_MODULE_SCALED,
DUMP_MODULE_PHYSICAL,
DUMP_MODULE_SUBPLAN,
DUMP_MODULE_ALL
};
DumpModule g_dumpModule = DUMP_MODULE_NOTHING;
void setDumpModule(const char* pModule) {
if (NULL == pModule) {
g_dumpModule = DUMP_MODULE_ALL;
} else if (0 == strncasecmp(pModule, "parser", strlen(pModule))) {
g_dumpModule = DUMP_MODULE_PARSER;
} else if (0 == strncasecmp(pModule, "logic", strlen(pModule))) {
g_dumpModule = DUMP_MODULE_LOGIC;
} else if (0 == strncasecmp(pModule, "optimized", strlen(pModule))) {
g_dumpModule = DUMP_MODULE_OPTIMIZED;
} else if (0 == strncasecmp(pModule, "split", strlen(pModule))) {
g_dumpModule = DUMP_MODULE_SPLIT;
} else if (0 == strncasecmp(pModule, "scaled", strlen(pModule))) {
g_dumpModule = DUMP_MODULE_SCALED;
} else if (0 == strncasecmp(pModule, "physical", strlen(pModule))) {
g_dumpModule = DUMP_MODULE_PHYSICAL;
} else if (0 == strncasecmp(pModule, "subplan", strlen(pModule))) {
g_dumpModule = DUMP_MODULE_SUBPLAN;
} else if (0 == strncasecmp(pModule, "all", strlen(pModule))) {
g_dumpModule = DUMP_MODULE_PHYSICAL;
}
}
class PlannerTestBaseImpl { class PlannerTestBaseImpl {
public: public:
...@@ -66,11 +100,9 @@ class PlannerTestBaseImpl { ...@@ -66,11 +100,9 @@ class PlannerTestBaseImpl {
SQueryPlan* pPlan = nullptr; SQueryPlan* pPlan = nullptr;
doCreatePhysiPlan(&cxt, pLogicPlan, &pPlan); doCreatePhysiPlan(&cxt, pLogicPlan, &pPlan);
if (g_isDump) { dump(g_dumpModule);
dump();
}
} catch (...) { } catch (...) {
dump(); dump(DUMP_MODULE_ALL);
throw; throw;
} }
} }
...@@ -109,23 +141,48 @@ class PlannerTestBaseImpl { ...@@ -109,23 +141,48 @@ class PlannerTestBaseImpl {
res_.physiSubplans_.clear(); res_.physiSubplans_.clear();
} }
void dump() { void dump(DumpModule module) {
if (DUMP_MODULE_NOTHING == module) {
return;
}
cout << "==========================================sql : [" << stmtEnv_.sql_ << "]" << endl; cout << "==========================================sql : [" << stmtEnv_.sql_ << "]" << endl;
cout << "syntax tree : " << endl;
cout << res_.ast_ << endl; if (DUMP_MODULE_ALL == module || DUMP_MODULE_PARSER == module) {
cout << "raw logic plan : " << endl; cout << "syntax tree : " << endl;
cout << res_.rawLogicPlan_ << endl; cout << res_.ast_ << endl;
cout << "optimized logic plan : " << endl; }
cout << res_.optimizedLogicPlan_ << endl;
cout << "split logic plan : " << endl; if (DUMP_MODULE_ALL == module || DUMP_MODULE_LOGIC == module) {
cout << res_.splitLogicPlan_ << endl; cout << "raw logic plan : " << endl;
cout << "scaled logic plan : " << endl; cout << res_.rawLogicPlan_ << endl;
cout << res_.scaledLogicPlan_ << endl; }
cout << "physical plan : " << endl;
cout << res_.physiPlan_ << endl; if (DUMP_MODULE_ALL == module || DUMP_MODULE_OPTIMIZED == module) {
cout << "physical subplan : " << endl; cout << "optimized logic plan : " << endl;
for (const auto& subplan : res_.physiSubplans_) { cout << res_.optimizedLogicPlan_ << endl;
cout << subplan << endl; }
if (DUMP_MODULE_ALL == module || DUMP_MODULE_SPLIT == module) {
cout << "split logic plan : " << endl;
cout << res_.splitLogicPlan_ << endl;
}
if (DUMP_MODULE_ALL == module || DUMP_MODULE_SCALED == module) {
cout << "scaled logic plan : " << endl;
cout << res_.scaledLogicPlan_ << endl;
}
if (DUMP_MODULE_ALL == module || DUMP_MODULE_PHYSICAL == module) {
cout << "physical plan : " << endl;
cout << res_.physiPlan_ << endl;
}
if (DUMP_MODULE_ALL == module || DUMP_MODULE_SUBPLAN == module) {
cout << "physical subplan : " << endl;
for (const auto& subplan : res_.physiSubplans_) {
cout << subplan << endl;
}
} }
} }
......
...@@ -32,6 +32,6 @@ class PlannerTestBase : public testing::Test { ...@@ -32,6 +32,6 @@ class PlannerTestBase : public testing::Test {
std::unique_ptr<PlannerTestBaseImpl> impl_; std::unique_ptr<PlannerTestBaseImpl> impl_;
}; };
extern bool g_isDump; extern void setDumpModule(const char* pModule);
#endif // PLAN_TEST_UTIL_H #endif // PLAN_TEST_UTIL_H
...@@ -297,6 +297,22 @@ static FORCE_INLINE void varToNchar(char* buf, SScalarParam* pOut, int32_t rowIn ...@@ -297,6 +297,22 @@ static FORCE_INLINE void varToNchar(char* buf, SScalarParam* pOut, int32_t rowIn
taosMemoryFree(t); taosMemoryFree(t);
} }
static FORCE_INLINE void ncharToVar(char* buf, SScalarParam* pOut, int32_t rowIndex) {
int32_t inputLen = varDataLen(buf);
char* t = taosMemoryCalloc(1, inputLen + VARSTR_HEADER_SIZE);
int32_t len = taosUcs4ToMbs((TdUcs4 *)varDataVal(buf), varDataLen(buf), varDataVal(t));
if (len < 0) {
taosMemoryFree(t);
return;
}
varDataSetLen(t, len);
colDataAppend(pOut->columnData, rowIndex, t, false);
taosMemoryFree(t);
}
//TODO opt performance, tmp is not needed. //TODO opt performance, tmp is not needed.
int32_t vectorConvertFromVarData(const SScalarParam* pIn, SScalarParam* pOut, int32_t inType, int32_t outType) { int32_t vectorConvertFromVarData(const SScalarParam* pIn, SScalarParam* pOut, int32_t inType, int32_t outType) {
int32_t bufSize = pIn->columnData->info.bytes; int32_t bufSize = pIn->columnData->info.bytes;
...@@ -313,6 +329,10 @@ int32_t vectorConvertFromVarData(const SScalarParam* pIn, SScalarParam* pOut, in ...@@ -313,6 +329,10 @@ int32_t vectorConvertFromVarData(const SScalarParam* pIn, SScalarParam* pOut, in
func = varToUnsigned; func = varToUnsigned;
} else if (IS_FLOAT_TYPE(outType)) { } else if (IS_FLOAT_TYPE(outType)) {
func = varToFloat; func = varToFloat;
} else if (outType == TSDB_DATA_TYPE_BINARY) { // nchar -> binary
ASSERT(inType == TSDB_DATA_TYPE_NCHAR);
func = ncharToVar;
vton = true;
} else if (outType == TSDB_DATA_TYPE_NCHAR) { // binary -> nchar } else if (outType == TSDB_DATA_TYPE_NCHAR) { // binary -> nchar
ASSERT(inType == TSDB_DATA_TYPE_VARCHAR); ASSERT(inType == TSDB_DATA_TYPE_VARCHAR);
func = varToNchar; func = varToNchar;
...@@ -608,7 +628,7 @@ int8_t gConvertTypes[TSDB_DATA_TYPE_BLOB+1][TSDB_DATA_TYPE_BLOB+1] = { ...@@ -608,7 +628,7 @@ int8_t gConvertTypes[TSDB_DATA_TYPE_BLOB+1][TSDB_DATA_TYPE_BLOB+1] = {
/*BIGI*/ 0, 0, 0, 0, 0, 0, 6, 7, 7, 0, 7, 5, 5, 5, 7, 0, 7, 0, 0, /*BIGI*/ 0, 0, 0, 0, 0, 0, 6, 7, 7, 0, 7, 5, 5, 5, 7, 0, 7, 0, 0,
/*FLOA*/ 0, 0, 0, 0, 0, 0, 0, 7, 7, 6, 7, 6, 6, 6, 6, 0, 7, 0, 0, /*FLOA*/ 0, 0, 0, 0, 0, 0, 0, 7, 7, 6, 7, 6, 6, 6, 6, 0, 7, 0, 0,
/*DOUB*/ 0, 0, 0, 0, 0, 0, 0, 0, 7, 7, 7, 7, 7, 7, 7, 0, 7, 0, 0, /*DOUB*/ 0, 0, 0, 0, 0, 0, 0, 0, 7, 7, 7, 7, 7, 7, 7, 0, 7, 0, 0,
/*VARC*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 9, 0, 7, 7, 7, 7, 0, 0, 0, 0, /*VARC*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 9, 8, 7, 7, 7, 7, 0, 0, 0, 0,
/*TIME*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 9, 9, 9, 9, 7, 0, 7, 0, 0, /*TIME*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 9, 9, 9, 9, 7, 0, 7, 0, 0,
/*NCHA*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 7, 7, 7, 7, 0, 0, 0, 0, /*NCHA*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 7, 7, 7, 7, 0, 0, 0, 0,
/*UTIN*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 12, 13, 14, 0, 7, 0, 0, /*UTIN*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 12, 13, 14, 0, 7, 0, 0,
......
...@@ -40,7 +40,9 @@ int tdbCommit(TENV *pEnv, TXN *pTxn); ...@@ -40,7 +40,9 @@ int tdbCommit(TENV *pEnv, TXN *pTxn);
int tdbDbOpen(const char *fname, int keyLen, int valLen, tdb_cmpr_fn_t keyCmprFn, TENV *pEnv, TDB **ppDb); int tdbDbOpen(const char *fname, int keyLen, int valLen, tdb_cmpr_fn_t keyCmprFn, TENV *pEnv, TDB **ppDb);
int tdbDbClose(TDB *pDb); int tdbDbClose(TDB *pDb);
int tdbDbDrop(TDB *pDb); int tdbDbDrop(TDB *pDb);
int tdbDbPut(TDB *pDb, const void *pKey, int keyLen, const void *pVal, int valLen, TXN *pTxn); int tdbDbInsert(TDB *pDb, const void *pKey, int keyLen, const void *pVal, int valLen, TXN *pTxn);
int tdbDbDelete(TDB *pDb, const void *pKey, int kLen, TXN *pTxn);
int tdbDbUpsert(TDB *pDb, const void *pKey, int kLen, const void *pVal, int vLen, TXN *pTxn);
int tdbDbGet(TDB *pDb, const void *pKey, int kLen, void **ppVal, int *vLen); int tdbDbGet(TDB *pDb, const void *pKey, int kLen, void **ppVal, int *vLen);
int tdbDbPGet(TDB *pDb, const void *pKey, int kLen, void **ppKey, int *pkLen, void **ppVal, int *vLen); int tdbDbPGet(TDB *pDb, const void *pKey, int kLen, void **ppKey, int *pkLen, void **ppVal, int *vLen);
...@@ -53,11 +55,9 @@ int tdbDbcMoveToLast(TDBC *pDbc); ...@@ -53,11 +55,9 @@ int tdbDbcMoveToLast(TDBC *pDbc);
int tdbDbcMoveToNext(TDBC *pDbc); int tdbDbcMoveToNext(TDBC *pDbc);
int tdbDbcMoveToPrev(TDBC *pDbc); int tdbDbcMoveToPrev(TDBC *pDbc);
int tdbDbcGet(TDBC *pDbc, const void **ppKey, int *pkLen, const void **ppVal, int *pvLen); int tdbDbcGet(TDBC *pDbc, const void **ppKey, int *pkLen, const void **ppVal, int *pvLen);
int tdbDbcDelete(TDBC *pDbc);
int tdbDbcPut(TDBC *pDbc, const void *pKey, int keyLen, const void *pVal, int valLen);
int tdbDbcUpdate(TDBC *pDbc, const void *pKey, int kLen, const void *pVal, int vLen);
int tdbDbcDrop(TDBC *pDbc);
int tdbDbcNext(TDBC *pDbc, void **ppKey, int *kLen, void **ppVal, int *vLen); int tdbDbcNext(TDBC *pDbc, void **ppKey, int *kLen, void **ppVal, int *vLen);
int tdbDbcUpsert(TDBC *pDbc, const void *pKey, int nKey, const void *pData, int nData, int insert);
// TXN // TXN
#define TDB_TXN_WRITE 0x1 #define TDB_TXN_WRITE 0x1
......
...@@ -138,67 +138,90 @@ int tdbBtreeInsert(SBTree *pBt, const void *pKey, int kLen, const void *pVal, in ...@@ -138,67 +138,90 @@ int tdbBtreeInsert(SBTree *pBt, const void *pKey, int kLen, const void *pVal, in
} }
if (btc.idx == -1) { if (btc.idx == -1) {
idx = 0; btc.idx = 0;
} else { } else {
if (c > 0) { if (c > 0) {
idx = btc.idx + 1; btc.idx++;
} else if (c < 0) { } else if (c == 0) {
idx = btc.idx; // dup key not allowed
} else {
// TDB does NOT allow same key
tdbBtcClose(&btc);
ASSERT(0); ASSERT(0);
return -1; return -1;
} }
} }
// make sure enough space to hold the cell ret = tdbBtcUpsert(&btc, pKey, kLen, pVal, vLen, 1);
szBuf = kLen + vLen + 14; if (ret < 0) {
pBuf = tdbRealloc(pBt->pBuf, pBt->pageSize > szBuf ? szBuf : pBt->pageSize);
if (pBuf == NULL) {
tdbBtcClose(&btc);
ASSERT(0); ASSERT(0);
tdbBtcClose(&btc);
return -1; return -1;
} }
pBt->pBuf = pBuf;
pCell = (SCell *)pBt->pBuf;
// encode cell tdbBtcClose(&btc);
ret = tdbBtreeEncodeCell(btc.pPage, pKey, kLen, pVal, vLen, pCell, &szCell); return 0;
}
int tdbBtreeDelete(SBTree *pBt, const void *pKey, int kLen, TXN *pTxn) {
SBTC btc;
int c;
int ret;
tdbBtcOpen(&btc, pBt, pTxn);
// move the cursor
ret = tdbBtcMoveTo(&btc, pKey, kLen, &c);
if (ret < 0) { if (ret < 0) {
tdbBtcClose(&btc); tdbBtcClose(&btc);
ASSERT(0); ASSERT(0);
return -1; return -1;
} }
// mark the page dirty if (btc.idx < 0 || c != 0) {
ret = tdbPagerWrite(pBt->pPager, btc.pPage);
if (ret < 0) {
tdbBtcClose(&btc); tdbBtcClose(&btc);
ASSERT(0);
return -1; return -1;
} }
// insert the cell // delete the key
ret = tdbPageInsertCell(btc.pPage, idx, pCell, szCell, 0); if (tdbBtcDelete(&btc) < 0) {
if (ret < 0) {
tdbBtcClose(&btc); tdbBtcClose(&btc);
return -1;
}
tdbBtcClose(&btc);
return 0;
}
int tdbBtreeUpsert(SBTree *pBt, const void *pKey, int nKey, const void *pData, int nData, TXN *pTxn) {
SBTC btc;
int c;
int ret;
tdbBtcOpen(&btc, pBt, pTxn);
// move the cursor
ret = tdbBtcMoveTo(&btc, pKey, nKey, &c);
if (ret < 0) {
ASSERT(0); ASSERT(0);
tdbBtcClose(&btc);
return -1; return -1;
} }
// check if need balance if (btc.idx == -1) {
if (btc.pPage->nOverflow > 0) { btc.idx = 0;
ret = tdbBtreeBalance(&btc); c = 1;
if (ret < 0) { } else {
tdbBtcClose(&btc); if (c > 0) {
ASSERT(0); btc.idx = btc.idx + 1;
return -1;
} }
} }
tdbBtcClose(&btc); ret = tdbBtcUpsert(&btc, pKey, nKey, pData, nData, c);
if (ret < 0) {
ASSERT(0);
tdbBtcClose(&btc);
return -1;
}
tdbBtcClose(&btc);
return 0; return 0;
} }
...@@ -552,14 +575,14 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx, TXN *pTx ...@@ -552,14 +575,14 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx, TXN *pTx
SCell *pCell; SCell *pCell;
int szLCell, szRCell; int szLCell, szRCell;
// balance page (iNew) and (iNew-1)
for (;;) { for (;;) {
pCell = tdbPageGetCell(pOlds[infoNews[iNew - 1].iPage], infoNews[iNew - 1].oIdx); pCell = tdbPageGetCell(pOlds[infoNews[iNew - 1].iPage], infoNews[iNew - 1].oIdx);
if (childNotLeaf) { szLCell = tdbBtreeCellSize(pOlds[infoNews[iNew - 1].iPage], pCell);
szLCell = szRCell = tdbBtreeCellSize(pOlds[infoNews[iNew - 1].iPage], pCell); if (!childNotLeaf) {
szRCell = szLCell;
} else { } else {
szLCell = tdbBtreeCellSize(pOlds[infoNews[iNew - 1].iPage], pCell);
int iPage = infoNews[iNew - 1].iPage; int iPage = infoNews[iNew - 1].iPage;
int oIdx = infoNews[iNew - 1].oIdx + 1; int oIdx = infoNews[iNew - 1].oIdx + 1;
SPage *pPage; SPage *pPage;
...@@ -736,6 +759,13 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx, TXN *pTx ...@@ -736,6 +759,13 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx, TXN *pTx
} }
} }
if (TDB_BTREE_PAGE_IS_ROOT(pParent) && TDB_PAGE_TOTAL_CELLS(pParent) == 0) {
i8 flags = TDB_BTREE_ROOT | TDB_BTREE_PAGE_IS_LEAF(pNews[0]);
// copy content to the parent page
tdbBtreeInitPage(pParent, &(SBtreeInitPageArg){.flags = flags, .pBt = pBt}, 0);
tdbPageCopy(pNews[0], pParent);
}
for (int i = 0; i < 3; i++) { for (int i = 0; i < 3; i++) {
if (pDivCell[i]) { if (pDivCell[i]) {
tdbOsFree(pDivCell[i]); tdbOsFree(pDivCell[i]);
...@@ -1357,7 +1387,143 @@ int tdbBtcGet(SBTC *pBtc, const void **ppKey, int *kLen, const void **ppVal, int ...@@ -1357,7 +1387,143 @@ int tdbBtcGet(SBTC *pBtc, const void **ppKey, int *kLen, const void **ppVal, int
if (ppVal) { if (ppVal) {
*ppVal = (void *)pBtc->coder.pVal; *ppVal = (void *)pBtc->coder.pVal;
*kLen = pBtc->coder.vLen; *vLen = pBtc->coder.vLen;
}
return 0;
}
int tdbBtcDelete(SBTC *pBtc) {
int idx = pBtc->idx;
int nCells = TDB_PAGE_TOTAL_CELLS(pBtc->pPage);
SPager *pPager = pBtc->pBt->pPager;
const void *pKey;
i8 iPage;
SPage *pPage;
SPgno pgno;
SCell *pCell;
int szCell;
int nKey;
int ret;
ASSERT(idx >= 0 && idx < nCells);
// drop the cell on the leaf
ret = tdbPagerWrite(pPager, pBtc->pPage);
if (ret < 0) {
ASSERT(0);
return -1;
}
tdbPageDropCell(pBtc->pPage, idx);
// update interior page or do balance
if (idx == nCells - 1) {
if (idx) {
pBtc->idx--;
tdbBtcGet(pBtc, &pKey, &nKey, NULL, NULL);
// loop to update the interial page
pgno = TDB_PAGE_PGNO(pBtc->pPage);
for (iPage = pBtc->iPage - 1; iPage >= 0; iPage--) {
pPage = pBtc->pgStack[iPage];
idx = pBtc->idxStack[iPage];
nCells = TDB_PAGE_TOTAL_CELLS(pPage);
if (idx < nCells) {
ret = tdbPagerWrite(pPager, pPage);
if (ret < 0) {
ASSERT(0);
return -1;
}
// update the cell with new key
pCell = tdbOsMalloc(nKey + 9);
tdbBtreeEncodeCell(pPage, pKey, nKey, &pgno, sizeof(pgno), pCell, &szCell);
ret = tdbPageUpdateCell(pPage, idx, pCell, szCell);
if (ret < 0) {
tdbOsFree(pCell);
ASSERT(0);
return -1;
}
tdbOsFree(pCell);
break;
} else {
pgno = TDB_PAGE_PGNO(pPage);
}
}
} else {
// delete the leaf page and do balance
ASSERT(TDB_PAGE_TOTAL_CELLS(pBtc->pPage) == 0);
ret = tdbBtreeBalance(pBtc);
if (ret < 0) {
ASSERT(0);
return -1;
}
}
}
return 0;
}
int tdbBtcUpsert(SBTC *pBtc, const void *pKey, int kLen, const void *pData, int nData, int insert) {
SCell *pCell;
int szCell;
int nCells = TDB_PAGE_TOTAL_CELLS(pBtc->pPage);
int szBuf;
void *pBuf;
int ret;
ASSERT(pBtc->idx >= 0);
// alloc space
szBuf = kLen + nData + 14;
pBuf = tdbRealloc(pBtc->pBt->pBuf, pBtc->pBt->pageSize > szBuf ? szBuf : pBtc->pBt->pageSize);
if (pBuf == NULL) {
ASSERT(0);
return -1;
}
pBtc->pBt->pBuf = pBuf;
pCell = (SCell *)pBtc->pBt->pBuf;
// encode cell
ret = tdbBtreeEncodeCell(pBtc->pPage, pKey, kLen, pData, nData, pCell, &szCell);
if (ret < 0) {
ASSERT(0);
return -1;
}
// mark dirty
ret = tdbPagerWrite(pBtc->pBt->pPager, pBtc->pPage);
if (ret < 0) {
ASSERT(0);
return -1;
}
// insert or update
if (insert) {
ASSERT(pBtc->idx <= nCells);
ret = tdbPageInsertCell(pBtc->pPage, pBtc->idx, pCell, szCell, 0);
} else {
ASSERT(pBtc->idx < nCells);
ret = tdbPageUpdateCell(pBtc->pPage, pBtc->idx, pCell, szCell);
}
if (ret < 0) {
ASSERT(0);
return -1;
}
// check balance
if (pBtc->pPage->nOverflow > 0) {
ret = tdbBtreeBalance(pBtc);
if (ret < 0) {
ASSERT(0);
return -1;
}
} }
return 0; return 0;
......
...@@ -75,10 +75,16 @@ int tdbDbDrop(TDB *pDb) { ...@@ -75,10 +75,16 @@ int tdbDbDrop(TDB *pDb) {
return 0; return 0;
} }
int tdbDbPut(TDB *pDb, const void *pKey, int keyLen, const void *pVal, int valLen, TXN *pTxn) { int tdbDbInsert(TDB *pDb, const void *pKey, int keyLen, const void *pVal, int valLen, TXN *pTxn) {
return tdbBtreeInsert(pDb->pBt, pKey, keyLen, pVal, valLen, pTxn); return tdbBtreeInsert(pDb->pBt, pKey, keyLen, pVal, valLen, pTxn);
} }
int tdbDbDelete(TDB *pDb, const void *pKey, int kLen, TXN *pTxn) { return tdbBtreeDelete(pDb->pBt, pKey, kLen, pTxn); }
int tdbDbUpsert(TDB *pDb, const void *pKey, int kLen, const void *pVal, int vLen, TXN *pTxn) {
return tdbBtreeUpsert(pDb->pBt, pKey, kLen, pVal, vLen, pTxn);
}
int tdbDbGet(TDB *pDb, const void *pKey, int kLen, void **ppVal, int *vLen) { int tdbDbGet(TDB *pDb, const void *pKey, int kLen, void **ppVal, int *vLen) {
return tdbBtreeGet(pDb->pBt, pKey, kLen, ppVal, vLen); return tdbBtreeGet(pDb->pBt, pKey, kLen, ppVal, vLen);
} }
...@@ -117,28 +123,16 @@ int tdbDbcGet(TDBC *pDbc, const void **ppKey, int *pkLen, const void **ppVal, in ...@@ -117,28 +123,16 @@ int tdbDbcGet(TDBC *pDbc, const void **ppKey, int *pkLen, const void **ppVal, in
return tdbBtcGet(&pDbc->btc, ppKey, pkLen, ppVal, pvLen); return tdbBtcGet(&pDbc->btc, ppKey, pkLen, ppVal, pvLen);
} }
int tdbDbcPut(TDBC *pDbc, const void *pKey, int keyLen, const void *pVal, int valLen) { int tdbDbcDelete(TDBC *pDbc) { return tdbBtcDelete(&pDbc->btc); }
// TODO
ASSERT(0);
return 0;
}
int tdbDbcUpdate(TDBC *pDbc, const void *pKey, int kLen, const void *pVal, int vLen) {
// TODO
ASSERT(0);
return 0;
}
int tdbDbcDrop(TDBC *pDbc) {
// TODO
ASSERT(0);
return 0;
}
int tdbDbcNext(TDBC *pDbc, void **ppKey, int *kLen, void **ppVal, int *vLen) { int tdbDbcNext(TDBC *pDbc, void **ppKey, int *kLen, void **ppVal, int *vLen) {
return tdbBtreeNext(&pDbc->btc, ppKey, kLen, ppVal, vLen); return tdbBtreeNext(&pDbc->btc, ppKey, kLen, ppVal, vLen);
} }
int tdbDbcUpsert(TDBC *pDbc, const void *pKey, int nKey, const void *pData, int nData, int insert) {
return tdbBtcUpsert(&pDbc->btc, pKey, nKey, pData, nData, insert);
}
int tdbDbcClose(TDBC *pDbc) { int tdbDbcClose(TDBC *pDbc) {
if (pDbc) { if (pDbc) {
tdbBtcClose(&pDbc->btc); tdbBtcClose(&pDbc->btc);
......
...@@ -171,6 +171,11 @@ int tdbPageInsertCell(SPage *pPage, int idx, SCell *pCell, int szCell, u8 asOvfl ...@@ -171,6 +171,11 @@ int tdbPageInsertCell(SPage *pPage, int idx, SCell *pCell, int szCell, u8 asOvfl
return 0; return 0;
} }
int tdbPageUpdateCell(SPage *pPage, int idx, SCell *pCell, int szCell) {
tdbPageDropCell(pPage, idx);
return tdbPageInsertCell(pPage, idx, pCell, szCell, 0);
}
int tdbPageDropCell(SPage *pPage, int idx) { int tdbPageDropCell(SPage *pPage, int idx) {
int lidx; int lidx;
SCell *pCell; SCell *pCell;
......
...@@ -128,6 +128,8 @@ struct SBTC { ...@@ -128,6 +128,8 @@ struct SBTC {
int tdbBtreeOpen(int keyLen, int valLen, SPager *pFile, tdb_cmpr_fn_t kcmpr, SBTree **ppBt); int tdbBtreeOpen(int keyLen, int valLen, SPager *pFile, tdb_cmpr_fn_t kcmpr, SBTree **ppBt);
int tdbBtreeClose(SBTree *pBt); int tdbBtreeClose(SBTree *pBt);
int tdbBtreeInsert(SBTree *pBt, const void *pKey, int kLen, const void *pVal, int vLen, TXN *pTxn); int tdbBtreeInsert(SBTree *pBt, const void *pKey, int kLen, const void *pVal, int vLen, TXN *pTxn);
int tdbBtreeDelete(SBTree *pBt, const void *pKey, int kLen, TXN *pTxn);
int tdbBtreeUpsert(SBTree *pBt, const void *pKey, int nKey, const void *pData, int nData, TXN *pTxn);
int tdbBtreeGet(SBTree *pBt, const void *pKey, int kLen, void **ppVal, int *vLen); int tdbBtreeGet(SBTree *pBt, const void *pKey, int kLen, void **ppVal, int *vLen);
int tdbBtreePGet(SBTree *pBt, const void *pKey, int kLen, void **ppKey, int *pkLen, void **ppVal, int *vLen); int tdbBtreePGet(SBTree *pBt, const void *pKey, int kLen, void **ppKey, int *pkLen, void **ppVal, int *vLen);
...@@ -141,6 +143,8 @@ int tdbBtcMoveToNext(SBTC *pBtc); ...@@ -141,6 +143,8 @@ int tdbBtcMoveToNext(SBTC *pBtc);
int tdbBtcMoveToPrev(SBTC *pBtc); int tdbBtcMoveToPrev(SBTC *pBtc);
int tdbBtreeNext(SBTC *pBtc, void **ppKey, int *kLen, void **ppVal, int *vLen); int tdbBtreeNext(SBTC *pBtc, void **ppKey, int *kLen, void **ppVal, int *vLen);
int tdbBtcGet(SBTC *pBtc, const void **ppKey, int *kLen, const void **ppVal, int *vLen); int tdbBtcGet(SBTC *pBtc, const void **ppKey, int *kLen, const void **ppVal, int *vLen);
int tdbBtcDelete(SBTC *pBtc);
int tdbBtcUpsert(SBTC *pBtc, const void *pKey, int kLen, const void *pData, int nData, int insert);
// tdbPager.c ==================================== // tdbPager.c ====================================
...@@ -278,6 +282,7 @@ void tdbPageZero(SPage *pPage, u8 szAmHdr, int (*xCellSize)(const SPage *, SCell ...@@ -278,6 +282,7 @@ void tdbPageZero(SPage *pPage, u8 szAmHdr, int (*xCellSize)(const SPage *, SCell
void tdbPageInit(SPage *pPage, u8 szAmHdr, int (*xCellSize)(const SPage *, SCell *)); void tdbPageInit(SPage *pPage, u8 szAmHdr, int (*xCellSize)(const SPage *, SCell *));
int tdbPageInsertCell(SPage *pPage, int idx, SCell *pCell, int szCell, u8 asOvfl); int tdbPageInsertCell(SPage *pPage, int idx, SCell *pCell, int szCell, u8 asOvfl);
int tdbPageDropCell(SPage *pPage, int idx); int tdbPageDropCell(SPage *pPage, int idx);
int tdbPageUpdateCell(SPage *pPage, int idx, SCell *pCell, int szCell);
void tdbPageCopy(SPage *pFromPage, SPage *pToPage); void tdbPageCopy(SPage *pFromPage, SPage *pToPage);
int tdbPageCapacity(int pageSize, int amHdrSize); int tdbPageCapacity(int pageSize, int amHdrSize);
......
...@@ -115,12 +115,12 @@ static int tDefaultKeyCmpr(const void *pKey1, int keyLen1, const void *pKey2, in ...@@ -115,12 +115,12 @@ static int tDefaultKeyCmpr(const void *pKey1, int keyLen1, const void *pKey2, in
return cret; return cret;
} }
TEST(tdb_test, simple_test) { TEST(tdb_test, simple_insert1) {
int ret; int ret;
TENV *pEnv; TENV *pEnv;
TDB *pDb; TDB *pDb;
tdb_cmpr_fn_t compFunc; tdb_cmpr_fn_t compFunc;
int nData = 10000000; int nData = 1000000;
TXN txn; TXN txn;
taosRemoveDir("tdb"); taosRemoveDir("tdb");
...@@ -152,7 +152,7 @@ TEST(tdb_test, simple_test) { ...@@ -152,7 +152,7 @@ TEST(tdb_test, simple_test) {
for (int iData = 1; iData <= nData; iData++) { for (int iData = 1; iData <= nData; iData++) {
sprintf(key, "key%d", iData); sprintf(key, "key%d", iData);
sprintf(val, "value%d", iData); sprintf(val, "value%d", iData);
ret = tdbDbPut(pDb, key, strlen(key), val, strlen(val), &txn); ret = tdbDbInsert(pDb, key, strlen(key), val, strlen(val), &txn);
GTEST_ASSERT_EQ(ret, 0); GTEST_ASSERT_EQ(ret, 0);
// if pool is full, commit the transaction and start a new one // if pool is full, commit the transaction and start a new one
...@@ -202,6 +202,8 @@ TEST(tdb_test, simple_test) { ...@@ -202,6 +202,8 @@ TEST(tdb_test, simple_test) {
ret = tdbDbcOpen(pDb, &pDBC, NULL); ret = tdbDbcOpen(pDb, &pDBC, NULL);
GTEST_ASSERT_EQ(ret, 0); GTEST_ASSERT_EQ(ret, 0);
tdbDbcMoveToFirst(pDBC);
for (;;) { for (;;) {
ret = tdbDbcNext(pDBC, &pKey, &kLen, &pVal, &vLen); ret = tdbDbcNext(pDBC, &pKey, &kLen, &pVal, &vLen);
if (ret < 0) break; if (ret < 0) break;
...@@ -233,7 +235,7 @@ TEST(tdb_test, simple_test) { ...@@ -233,7 +235,7 @@ TEST(tdb_test, simple_test) {
GTEST_ASSERT_EQ(ret, 0); GTEST_ASSERT_EQ(ret, 0);
} }
TEST(tdb_test, simple_test2) { TEST(tdb_test, simple_insert2) {
int ret; int ret;
TENV *pEnv; TENV *pEnv;
TDB *pDb; TDB *pDb;
...@@ -269,7 +271,7 @@ TEST(tdb_test, simple_test2) { ...@@ -269,7 +271,7 @@ TEST(tdb_test, simple_test2) {
for (int iData = 1; iData <= nData; iData++) { for (int iData = 1; iData <= nData; iData++) {
sprintf(key, "key%d", iData); sprintf(key, "key%d", iData);
sprintf(val, "value%d", iData); sprintf(val, "value%d", iData);
ret = tdbDbPut(pDb, key, strlen(key), val, strlen(val), &txn); ret = tdbDbInsert(pDb, key, strlen(key), val, strlen(val), &txn);
GTEST_ASSERT_EQ(ret, 0); GTEST_ASSERT_EQ(ret, 0);
} }
...@@ -283,13 +285,15 @@ TEST(tdb_test, simple_test2) { ...@@ -283,13 +285,15 @@ TEST(tdb_test, simple_test2) {
ret = tdbDbcOpen(pDb, &pDBC, NULL); ret = tdbDbcOpen(pDb, &pDBC, NULL);
GTEST_ASSERT_EQ(ret, 0); GTEST_ASSERT_EQ(ret, 0);
tdbDbcMoveToFirst(pDBC);
for (;;) { for (;;) {
ret = tdbDbcNext(pDBC, &pKey, &kLen, &pVal, &vLen); ret = tdbDbcNext(pDBC, &pKey, &kLen, &pVal, &vLen);
if (ret < 0) break; if (ret < 0) break;
std::cout.write((char *)pKey, kLen) /* << " " << kLen */ << " "; // std::cout.write((char *)pKey, kLen) /* << " " << kLen */ << " ";
std::cout.write((char *)pVal, vLen) /* << " " << vLen */; // std::cout.write((char *)pVal, vLen) /* << " " << vLen */;
std::cout << std::endl; // std::cout << std::endl;
count++; count++;
} }
...@@ -316,4 +320,164 @@ TEST(tdb_test, simple_test2) { ...@@ -316,4 +320,164 @@ TEST(tdb_test, simple_test2) {
// Close Env // Close Env
ret = tdbEnvClose(pEnv); ret = tdbEnvClose(pEnv);
GTEST_ASSERT_EQ(ret, 0); GTEST_ASSERT_EQ(ret, 0);
}
TEST(tdb_test, simple_delete1) {
int ret;
TDB *pDb;
char key[128];
char data[128];
TXN txn;
TENV *pEnv;
SPoolMem *pPool;
void *pKey = NULL;
void *pData = NULL;
int nKey;
TDBC *pDbc;
int nData;
int nKV = 69;
taosRemoveDir("tdb");
pPool = openPool();
// open env
ret = tdbEnvOpen("tdb", 1024, 256, &pEnv);
GTEST_ASSERT_EQ(ret, 0);
// open database
ret = tdbDbOpen("db.db", -1, -1, tKeyCmpr, pEnv, &pDb);
GTEST_ASSERT_EQ(ret, 0);
tdbTxnOpen(&txn, 0, poolMalloc, poolFree, pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
tdbBegin(pEnv, &txn);
// loop to insert batch data
for (int iData = 0; iData < nKV; iData++) {
sprintf(key, "key%d", iData);
sprintf(data, "data%d", iData);
ret = tdbDbInsert(pDb, key, strlen(key), data, strlen(data), &txn);
GTEST_ASSERT_EQ(ret, 0);
}
// query the data
for (int iData = 0; iData < nKV; iData++) {
sprintf(key, "key%d", iData);
sprintf(data, "data%d", iData);
ret = tdbDbGet(pDb, key, strlen(key), &pData, &nData);
GTEST_ASSERT_EQ(ret, 0);
GTEST_ASSERT_EQ(memcmp(data, pData, nData), 0);
}
// loop to delete some data
for (int iData = nKV - 1; iData > 30; iData--) {
sprintf(key, "key%d", iData);
ret = tdbDbDelete(pDb, key, strlen(key), &txn);
GTEST_ASSERT_EQ(ret, 0);
}
// query the data
for (int iData = 0; iData < nKV; iData++) {
sprintf(key, "key%d", iData);
ret = tdbDbGet(pDb, key, strlen(key), &pData, &nData);
if (iData <= 30) {
GTEST_ASSERT_EQ(ret, 0);
} else {
GTEST_ASSERT_EQ(ret, -1);
}
}
// loop to iterate the data
tdbDbcOpen(pDb, &pDbc, NULL);
ret = tdbDbcMoveToFirst(pDbc);
GTEST_ASSERT_EQ(ret, 0);
pKey = NULL;
pData = NULL;
for (;;) {
ret = tdbDbcNext(pDbc, &pKey, &nKey, &pData, &nData);
if (ret < 0) break;
std::cout.write((char *)pKey, nKey) /* << " " << kLen */ << " ";
std::cout.write((char *)pData, nData) /* << " " << vLen */;
std::cout << std::endl;
}
tdbDbcClose(pDbc);
tdbCommit(pEnv, &txn);
closePool(pPool);
tdbDbClose(pDb);
tdbEnvClose(pEnv);
}
TEST(tdb_test, simple_upsert1) {
int ret;
TENV *pEnv;
TDB *pDb;
int nData = 100000;
char key[64];
char data[64];
void *pData = NULL;
SPoolMem *pPool;
TXN txn;
taosRemoveDir("tdb");
// open env
ret = tdbEnvOpen("tdb", 4096, 64, &pEnv);
GTEST_ASSERT_EQ(ret, 0);
// open database
ret = tdbDbOpen("db.db", -1, -1, NULL, pEnv, &pDb);
GTEST_ASSERT_EQ(ret, 0);
pPool = openPool();
// insert some data
tdbTxnOpen(&txn, 0, poolMalloc, poolFree, pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
tdbBegin(pEnv, &txn);
for (int iData = 0; iData < nData; iData++) {
sprintf(key, "key%d", iData);
sprintf(data, "data%d", iData);
ret = tdbDbInsert(pDb, key, strlen(key), data, strlen(data), &txn);
GTEST_ASSERT_EQ(ret, 0);
}
// query the data
for (int iData = 0; iData < nData; iData++) {
sprintf(key, "key%d", iData);
sprintf(data, "data%d", iData);
ret = tdbDbGet(pDb, key, strlen(key), &pData, &nData);
GTEST_ASSERT_EQ(ret, 0);
GTEST_ASSERT_EQ(memcmp(pData, data, nData), 0);
}
// upsert some data
for (int iData = 0; iData < nData; iData++) {
sprintf(key, "key%d", iData);
sprintf(data, "data%d-u", iData);
ret = tdbDbUpsert(pDb, key, strlen(key), data, strlen(data), &txn);
GTEST_ASSERT_EQ(ret, 0);
}
tdbCommit(pEnv, &txn);
// query the data
for (int iData = 0; iData < nData; iData++) {
sprintf(key, "key%d", iData);
sprintf(data, "data%d-u", iData);
ret = tdbDbGet(pDb, key, strlen(key), &pData, &nData);
GTEST_ASSERT_EQ(ret, 0);
GTEST_ASSERT_EQ(memcmp(pData, data, nData), 0);
}
tdbDbClose(pDb);
tdbEnvClose(pEnv);
} }
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tskiplist2.h"
struct SSLNode {
int8_t level;
SSLNode *forwards[];
};
struct SSkipList2 {
int8_t level;
uint32_t seed;
int32_t size;
const SSLCfg *pCfg;
SSLNode *pHead[];
};
static void *slMalloc(void *pPool, int32_t size);
static void slFree(void *pPool, void *p);
static int32_t slCmprFn(const void *pKey, int32_t nKey, const void *pData, int32_t nData);
const SSLCfg slDefaultCfg = {.maxLevel = SL_MAX_LEVEL,
.nKey = -1,
.nData = -1,
.cmprFn = slCmprFn,
.pPool = NULL,
.xMalloc = slMalloc,
.xFree = slFree};
int32_t slOpen(const SSLCfg *pCfg, SSkipList2 **ppSl) {
SSkipList2 *pSl = NULL;
int32_t size;
*ppSl = NULL;
if (pCfg == NULL) pCfg = &slDefaultCfg;
// check config (TODO)
// malloc handle
size = sizeof(*pSl) + sizeof(SSLNode *) * pCfg->maxLevel * 2;
pSl = pCfg->xMalloc(pCfg->pPool, size);
if (pSl == NULL) {
return -1;
}
pSl->level = 0;
pSl->seed = taosRand();
pSl->size = 0;
pSl->pCfg = pCfg;
// init an empty skiplist
for (int32_t i = 0; i < pCfg->maxLevel * 2; i++) {
pSl->pHead[i] = NULL;
}
*ppSl = pSl;
return 0;
}
int32_t slClose(SSkipList2 *pSl) {
if (pSl) {
slClear(pSl);
if (pSl->pCfg->xFree) {
pSl->pCfg->xFree(pSl->pCfg->pPool, pSl);
}
}
return 0;
}
int32_t slClear(SSkipList2 *pSl) {
// loop to clear sl
for (;;) {
// (TODO)
}
// init sl (TODO)
return 0;
}
int32_t slcOpen(SSkipList2 *pSl, SSLCursor *pSlc) {
pSlc->pSl = pSl;
for (int i = 0; i < SL_MAX_LEVEL; i++) {
if (i < pSl->pCfg->maxLevel) {
} else {
pSlc->forwards[i] = NULL;
}
}
// TODO
return 0;
}
int32_t slcClose(SSLCursor *pSlc) {
// TODO
return 0;
}
int32_t slcMoveTo(SSLCursor *pSlc, const void *pKey, int32_t nKey) {
// TODO
return 0;
}
int32_t slcMoveToNext(SSLCursor *pSlc) {
// TODO
return 0;
}
int32_t slcMoveToPrev(SSLCursor *pSlc) {
// TODO
return 0;
}
int32_t slcMoveToFirst(SSLCursor *pSlc) {
// TODO
return 0;
}
int32_t slcMoveToLast(SSLCursor *pSlc) {
// TODO
return 0;
}
int32_t slcPut(SSLCursor *pSlc, const void *pKey, int32_t nKey, const void *pData, int32_t nData) {
// TODO
return 0;
}
int32_t slcGet(SSLCursor *pSlc, const void **ppKey, int32_t *nKey, const void **ppData, int32_t *nData) {
// TODO
return 0;
}
int32_t slcDrop(SSLCursor *pSlc) {
// TODO
return 0;
}
static FORCE_INLINE void *slMalloc(void *pPool, int32_t size) { return taosMemoryMalloc(size); }
static FORCE_INLINE void slFree(void *pPool, void *p) { taosMemoryFree(p); }
static int32_t slCmprFn(const void *pKey1, int32_t nKey1, const void *pKey2, int32_t nKey2) {
ASSERT(nKey1 >= 0 && nKey2 >= 0);
int32_t nKey = nKey1 > nKey2 ? nKey2 : nKey1;
int32_t c;
c = memcmp(pKey1, pKey2, nKey);
if (c == 0) {
if (nKey1 > nKey2) {
c = 1;
} else if (nKey1 < nKey2) {
c = -1;
}
}
return c;
}
\ No newline at end of file
此差异已折叠。
Subproject commit bf6c766986c61ff4fc80421fdea682a8fd4b5b32 Subproject commit 2f3dfddd4d9a869e706ba3cf98fb6d769404cd7c
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册