提交 0f7d9f3a 编写于 作者: W wenzhouwww@live.cn

Merge branch '3.0' into test/3.0_math_functions

...@@ -110,7 +110,7 @@ execute_process(COMMAND "${CMAKE_COMMAND}" --build . ...@@ -110,7 +110,7 @@ execute_process(COMMAND "${CMAKE_COMMAND}" --build .
# ================================================================================================ # ================================================================================================
# googletest # googletest
if(${BUILD_TEST}) if(${BUILD_TEST})
add_subdirectory(googletest) add_subdirectory(googletest EXCLUDE_FROM_ALL)
target_include_directories( target_include_directories(
gtest gtest
PUBLIC $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/cpp-stub/src> PUBLIC $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/cpp-stub/src>
...@@ -143,7 +143,7 @@ set(CMAKE_PROJECT_INCLUDE_BEFORE "${TD_SUPPORT_DIR}/EnableCMP0048.txt.in") ...@@ -143,7 +143,7 @@ set(CMAKE_PROJECT_INCLUDE_BEFORE "${TD_SUPPORT_DIR}/EnableCMP0048.txt.in")
option(ENABLE_CJSON_TEST "Enable building cJSON test" OFF) option(ENABLE_CJSON_TEST "Enable building cJSON test" OFF)
option(CJSON_OVERRIDE_BUILD_SHARED_LIBS "Override BUILD_SHARED_LIBS with CJSON_BUILD_SHARED_LIBS" ON) option(CJSON_OVERRIDE_BUILD_SHARED_LIBS "Override BUILD_SHARED_LIBS with CJSON_BUILD_SHARED_LIBS" ON)
option(CJSON_BUILD_SHARED_LIBS "Overrides BUILD_SHARED_LIBS if CJSON_OVERRIDE_BUILD_SHARED_LIBS is enabled" OFF) option(CJSON_BUILD_SHARED_LIBS "Overrides BUILD_SHARED_LIBS if CJSON_OVERRIDE_BUILD_SHARED_LIBS is enabled" OFF)
add_subdirectory(cJson) add_subdirectory(cJson EXCLUDE_FROM_ALL)
target_include_directories( target_include_directories(
cjson cjson
# see https://stackoverflow.com/questions/25676277/cmake-target-include-directories-prints-an-error-when-i-try-to-add-the-source # see https://stackoverflow.com/questions/25676277/cmake-target-include-directories-prints-an-error-when-i-try-to-add-the-source
...@@ -152,7 +152,7 @@ target_include_directories( ...@@ -152,7 +152,7 @@ target_include_directories(
unset(CMAKE_PROJECT_INCLUDE_BEFORE) unset(CMAKE_PROJECT_INCLUDE_BEFORE)
# lz4 # lz4
add_subdirectory(lz4/build/cmake) add_subdirectory(lz4/build/cmake EXCLUDE_FROM_ALL)
target_include_directories( target_include_directories(
lz4_static lz4_static
PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/lz4/lib PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/lz4/lib
...@@ -160,7 +160,7 @@ target_include_directories( ...@@ -160,7 +160,7 @@ target_include_directories(
# zlib # zlib
set(CMAKE_PROJECT_INCLUDE_BEFORE "${TD_SUPPORT_DIR}/EnableCMP0048.txt.in") set(CMAKE_PROJECT_INCLUDE_BEFORE "${TD_SUPPORT_DIR}/EnableCMP0048.txt.in")
add_subdirectory(zlib) add_subdirectory(zlib EXCLUDE_FROM_ALL)
target_include_directories( target_include_directories(
zlibstatic zlibstatic
PUBLIC ${CMAKE_CURRENT_BINARY_DIR}/zlib PUBLIC ${CMAKE_CURRENT_BINARY_DIR}/zlib
...@@ -176,7 +176,7 @@ unset(CMAKE_PROJECT_INCLUDE_BEFORE) ...@@ -176,7 +176,7 @@ unset(CMAKE_PROJECT_INCLUDE_BEFORE)
# leveldb # leveldb
if(${BUILD_WITH_LEVELDB}) if(${BUILD_WITH_LEVELDB})
option(LEVELDB_BUILD_TESTS "" OFF) option(LEVELDB_BUILD_TESTS "" OFF)
add_subdirectory(leveldb) add_subdirectory(leveldb EXCLUDE_FROM_ALL)
target_include_directories( target_include_directories(
leveldb leveldb
PUBLIC $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/leveldb/include> PUBLIC $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/leveldb/include>
...@@ -192,7 +192,7 @@ if(${BUILD_WITH_ROCKSDB}) ...@@ -192,7 +192,7 @@ if(${BUILD_WITH_ROCKSDB})
option(WITH_TOOLS "" OFF) option(WITH_TOOLS "" OFF)
option(WITH_LIBURING "" OFF) option(WITH_LIBURING "" OFF)
option(ROCKSDB_BUILD_SHARED "Build shared versions of the RocksDB libraries" OFF) option(ROCKSDB_BUILD_SHARED "Build shared versions of the RocksDB libraries" OFF)
add_subdirectory(rocksdb) add_subdirectory(rocksdb EXCLUDE_FROM_ALL)
target_include_directories( target_include_directories(
rocksdb rocksdb
PUBLIC $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/rocksdb/include> PUBLIC $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/rocksdb/include>
...@@ -203,7 +203,7 @@ endif(${BUILD_WITH_ROCKSDB}) ...@@ -203,7 +203,7 @@ endif(${BUILD_WITH_ROCKSDB})
# To support build on ubuntu: sudo apt-get install libboost-all-dev # To support build on ubuntu: sudo apt-get install libboost-all-dev
if(${BUILD_WITH_LUCENE}) if(${BUILD_WITH_LUCENE})
option(ENABLE_TEST "Enable the tests" OFF) option(ENABLE_TEST "Enable the tests" OFF)
add_subdirectory(lucene) add_subdirectory(lucene EXCLUDE_FROM_ALL)
target_include_directories( target_include_directories(
lucene++ lucene++
PUBLIC $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/lucene/include> PUBLIC $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/lucene/include>
...@@ -213,14 +213,14 @@ endif(${BUILD_WITH_LUCENE}) ...@@ -213,14 +213,14 @@ endif(${BUILD_WITH_LUCENE})
# NuRaft # NuRaft
if(${BUILD_WITH_NURAFT}) if(${BUILD_WITH_NURAFT})
add_subdirectory(nuraft) add_subdirectory(nuraft EXCLUDE_FROM_ALL)
endif(${BUILD_WITH_NURAFT}) endif(${BUILD_WITH_NURAFT})
# pthread # pthread
if(${BUILD_PTHREAD}) if(${BUILD_PTHREAD})
set(CMAKE_BUILD_TYPE release) set(CMAKE_BUILD_TYPE release)
add_definitions(-DPTW32_STATIC_LIB) add_definitions(-DPTW32_STATIC_LIB)
add_subdirectory(pthread) add_subdirectory(pthread EXCLUDE_FROM_ALL)
set_target_properties(libpthreadVC3 PROPERTIES OUTPUT_NAME pthread) set_target_properties(libpthreadVC3 PROPERTIES OUTPUT_NAME pthread)
add_library(pthread STATIC IMPORTED GLOBAL) add_library(pthread STATIC IMPORTED GLOBAL)
SET_PROPERTY(TARGET pthread PROPERTY IMPORTED_LOCATION ${LIBRARY_OUTPUT_PATH}/pthread.lib) SET_PROPERTY(TARGET pthread PROPERTY IMPORTED_LOCATION ${LIBRARY_OUTPUT_PATH}/pthread.lib)
...@@ -228,12 +228,12 @@ endif() ...@@ -228,12 +228,12 @@ endif()
# iconv # iconv
if(${BUILD_WITH_ICONV}) if(${BUILD_WITH_ICONV})
add_subdirectory(iconv) add_subdirectory(iconv EXCLUDE_FROM_ALL)
endif(${BUILD_WITH_ICONV}) endif(${BUILD_WITH_ICONV})
# wingetopt # wingetopt
if(${BUILD_WINGETOPT}) if(${BUILD_WINGETOPT})
add_subdirectory(wingetopt) add_subdirectory(wingetopt EXCLUDE_FROM_ALL)
endif(${BUILD_WINGETOPT}) endif(${BUILD_WINGETOPT})
# msvcregex # msvcregex
...@@ -293,7 +293,7 @@ if(${BUILD_WITH_UV}) ...@@ -293,7 +293,7 @@ if(${BUILD_WITH_UV})
MESSAGE("Windows need set no-sign-compare") MESSAGE("Windows need set no-sign-compare")
add_compile_options(-Wno-sign-compare) add_compile_options(-Wno-sign-compare)
endif () endif ()
add_subdirectory(libuv) add_subdirectory(libuv EXCLUDE_FROM_ALL)
endif(${BUILD_WITH_UV}) endif(${BUILD_WITH_UV})
# BDB # BDB
...@@ -334,5 +334,5 @@ endif(${BUILD_WITH_SQLITE}) ...@@ -334,5 +334,5 @@ endif(${BUILD_WITH_SQLITE})
# Build test # Build test
# ================================================================================================ # ================================================================================================
if(${BUILD_DEPENDENCY_TESTS}) if(${BUILD_DEPENDENCY_TESTS})
add_subdirectory(test) add_subdirectory(test EXCLUDE_FROM_ALL)
endif(${BUILD_DEPENDENCY_TESTS}) endif(${BUILD_DEPENDENCY_TESTS})
...@@ -18,6 +18,11 @@ ...@@ -18,6 +18,11 @@
#include "os.h" #include "os.h"
#include "thash.h"
#include "tlockfree.h"
#include "tlog.h"
#include "tmsg.h"
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
...@@ -135,7 +140,7 @@ typedef enum { ...@@ -135,7 +140,7 @@ typedef enum {
typedef struct SSdb SSdb; typedef struct SSdb SSdb;
typedef int32_t (*SdbInsertFp)(SSdb *pSdb, void *pObj); typedef int32_t (*SdbInsertFp)(SSdb *pSdb, void *pObj);
typedef int32_t (*SdbUpdateFp)(SSdb *pSdb, void *pSrcObj, void *pDstObj); typedef int32_t (*SdbUpdateFp)(SSdb *pSdb, void *pSrcObj, void *pDstObj);
typedef int32_t (*SdbDeleteFp)(SSdb *pSdb, void *pObj); typedef int32_t (*SdbDeleteFp)(SSdb *pSdb, void *pObj, bool callFunc);
typedef int32_t (*SdbDeployFp)(SMnode *pMnode); typedef int32_t (*SdbDeployFp)(SMnode *pMnode);
typedef SSdbRow *(*SdbDecodeFp)(SSdbRaw *pRaw); typedef SSdbRow *(*SdbDecodeFp)(SSdbRaw *pRaw);
typedef SSdbRaw *(*SdbEncodeFp)(void *pObj); typedef SSdbRaw *(*SdbEncodeFp)(void *pObj);
...@@ -326,9 +331,29 @@ int32_t sdbGetRawSoftVer(SSdbRaw *pRaw, int8_t *sver); ...@@ -326,9 +331,29 @@ int32_t sdbGetRawSoftVer(SSdbRaw *pRaw, int8_t *sver);
int32_t sdbGetRawTotalSize(SSdbRaw *pRaw); int32_t sdbGetRawTotalSize(SSdbRaw *pRaw);
SSdbRow *sdbAllocRow(int32_t objSize); SSdbRow *sdbAllocRow(int32_t objSize);
void sdbFreeRow(SSdb *pSdb, SSdbRow *pRow); void sdbFreeRow(SSdb *pSdb, SSdbRow *pRow, bool callFunc);
void *sdbGetRowObj(SSdbRow *pRow); void *sdbGetRowObj(SSdbRow *pRow);
typedef struct SSdb {
SMnode *pMnode;
char *currDir;
char *syncDir;
char *tmpDir;
int64_t lastCommitVer;
int64_t curVer;
int64_t tableVer[SDB_MAX];
int64_t maxId[SDB_MAX];
EKeyType keyTypes[SDB_MAX];
SHashObj *hashObjs[SDB_MAX];
SRWLatch locks[SDB_MAX];
SdbInsertFp insertFps[SDB_MAX];
SdbUpdateFp updateFps[SDB_MAX];
SdbDeleteFp deleteFps[SDB_MAX];
SdbDeployFp deployFps[SDB_MAX];
SdbEncodeFp encodeFps[SDB_MAX];
SdbDecodeFp decodeFps[SDB_MAX];
} SSdb;
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -117,22 +117,23 @@ int32_t colDataAppend(SColumnInfoData* pColumnInfoData, uint32_t currentRow, con ...@@ -117,22 +117,23 @@ int32_t colDataAppend(SColumnInfoData* pColumnInfoData, uint32_t currentRow, con
int32_t type = pColumnInfoData->info.type; int32_t type = pColumnInfoData->info.type;
if (IS_VAR_DATA_TYPE(type)) { if (IS_VAR_DATA_TYPE(type)) {
int32_t dataLen = varDataTLen(pData); int32_t dataLen = varDataTLen(pData);
if(type == TSDB_DATA_TYPE_JSON) { if (type == TSDB_DATA_TYPE_JSON) {
if(*pData == TSDB_DATA_TYPE_NULL) { if (*pData == TSDB_DATA_TYPE_NULL) {
dataLen = 0; dataLen = 0;
}else if(*pData == TSDB_DATA_TYPE_NCHAR) { } else if (*pData == TSDB_DATA_TYPE_NCHAR) {
dataLen = varDataTLen(pData+CHAR_BYTES); dataLen = varDataTLen(pData + CHAR_BYTES);
}else if(*pData == TSDB_DATA_TYPE_BIGINT || *pData == TSDB_DATA_TYPE_DOUBLE) { } else if (*pData == TSDB_DATA_TYPE_BIGINT || *pData == TSDB_DATA_TYPE_DOUBLE) {
dataLen = LONG_BYTES; dataLen = LONG_BYTES;
}else if(*pData == TSDB_DATA_TYPE_BOOL) { } else if (*pData == TSDB_DATA_TYPE_BOOL) {
dataLen = CHAR_BYTES; dataLen = CHAR_BYTES;
} }
dataLen += CHAR_BYTES; dataLen += CHAR_BYTES;
} }
SVarColAttr* pAttr = &pColumnInfoData->varmeta; SVarColAttr* pAttr = &pColumnInfoData->varmeta;
if (pAttr->allocLen < pAttr->length + dataLen) { if (pAttr->allocLen < pAttr->length + dataLen) {
uint32_t newSize = pAttr->allocLen; uint32_t newSize = pAttr->allocLen;
if (newSize == 0) { if (newSize <= 1) {
newSize = 8; newSize = 8;
} }
......
...@@ -69,8 +69,8 @@ static void dmSetSignalHandle() { ...@@ -69,8 +69,8 @@ static void dmSetSignalHandle() {
static int32_t dmParseArgs(int32_t argc, char const *argv[]) { static int32_t dmParseArgs(int32_t argc, char const *argv[]) {
int32_t cmdEnvIndex = 0; int32_t cmdEnvIndex = 0;
if (argc < 2) return 0; if (argc < 2) return 0;
global.envCmd = taosMemoryMalloc(argc-1); global.envCmd = taosMemoryMalloc((argc-1)*sizeof(char*));
memset(global.envCmd, 0, argc-1); memset(global.envCmd, 0, (argc-1)*sizeof(char*));
for (int32_t i = 1; i < argc; ++i) { for (int32_t i = 1; i < argc; ++i) {
if (strcmp(argv[i], "-c") == 0) { if (strcmp(argv[i], "-c") == 0) {
if (i < argc - 1) { if (i < argc - 1) {
......
...@@ -126,8 +126,6 @@ typedef enum { ...@@ -126,8 +126,6 @@ typedef enum {
DND_REASON_OTHERS DND_REASON_OTHERS
} EDndReason; } EDndReason;
typedef void (*TransCbFp)(SMnode* pMnode, void* param);
typedef struct { typedef struct {
int32_t id; int32_t id;
ETrnStage stage; ETrnStage stage;
...@@ -150,8 +148,10 @@ typedef struct { ...@@ -150,8 +148,10 @@ typedef struct {
int64_t dbUid; int64_t dbUid;
char dbname[TSDB_DB_FNAME_LEN]; char dbname[TSDB_DB_FNAME_LEN];
char lastError[TSDB_TRANS_ERROR_LEN]; char lastError[TSDB_TRANS_ERROR_LEN];
TransCbFp transCbFp; int32_t startFunc;
void* transCbParam; int32_t stopFunc;
int32_t paramLen;
void* param;
} STrans; } STrans;
typedef struct { typedef struct {
......
...@@ -33,6 +33,15 @@ typedef struct { ...@@ -33,6 +33,15 @@ typedef struct {
void *pCont; void *pCont;
} STransAction; } STransAction;
typedef enum {
TEST_TRANS_START_FUNC = 1,
TEST_TRANS_STOP_FUNC = 2,
CONSUME_TRANS_START_FUNC = 3,
CONSUME_TRANS_STOP_FUNC = 4,
} ETrnFuncType;
typedef void (*TransCbFp)(SMnode *pMnode, void *param, int32_t paramLen);
int32_t mndInitTrans(SMnode *pMnode); int32_t mndInitTrans(SMnode *pMnode);
void mndCleanupTrans(SMnode *pMnode); void mndCleanupTrans(SMnode *pMnode);
...@@ -44,7 +53,7 @@ int32_t mndTransAppendCommitlog(STrans *pTrans, SSdbRaw *pRaw); ...@@ -44,7 +53,7 @@ int32_t mndTransAppendCommitlog(STrans *pTrans, SSdbRaw *pRaw);
int32_t mndTransAppendRedoAction(STrans *pTrans, STransAction *pAction); int32_t mndTransAppendRedoAction(STrans *pTrans, STransAction *pAction);
int32_t mndTransAppendUndoAction(STrans *pTrans, STransAction *pAction); int32_t mndTransAppendUndoAction(STrans *pTrans, STransAction *pAction);
void mndTransSetRpcRsp(STrans *pTrans, void *pCont, int32_t contLen); void mndTransSetRpcRsp(STrans *pTrans, void *pCont, int32_t contLen);
void mndTransSetCb(STrans *pTrans, TransCbFp fp, void *param); void mndTransSetCb(STrans *pTrans, ETrnFuncType startFunc, ETrnFuncType stopFunc, void *param, int32_t paramLen);
void mndTransSetDbInfo(STrans *pTrans, SDbObj *pDb); void mndTransSetDbInfo(STrans *pTrans, SDbObj *pDb);
int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans); int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans);
......
...@@ -29,7 +29,7 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans); ...@@ -29,7 +29,7 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans);
static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw); static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw);
static int32_t mndTransActionInsert(SSdb *pSdb, STrans *pTrans); static int32_t mndTransActionInsert(SSdb *pSdb, STrans *pTrans);
static int32_t mndTransActionUpdate(SSdb *pSdb, STrans *OldTrans, STrans *pOld); static int32_t mndTransActionUpdate(SSdb *pSdb, STrans *OldTrans, STrans *pOld);
static int32_t mndTransActionDelete(SSdb *pSdb, STrans *pTrans); static int32_t mndTransActionDelete(SSdb *pSdb, STrans *pTrans, bool callFunc);
static int32_t mndTransAppendLog(SArray *pArray, SSdbRaw *pRaw); static int32_t mndTransAppendLog(SArray *pArray, SSdbRaw *pRaw);
static int32_t mndTransAppendAction(SArray *pArray, STransAction *pAction); static int32_t mndTransAppendAction(SArray *pArray, STransAction *pAction);
...@@ -174,6 +174,13 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans) { ...@@ -174,6 +174,13 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans) {
SDB_SET_BINARY(pRaw, dataPos, (void *)pAction->pCont, pAction->contLen, TRANS_ENCODE_OVER) SDB_SET_BINARY(pRaw, dataPos, (void *)pAction->pCont, pAction->contLen, TRANS_ENCODE_OVER)
} }
SDB_SET_INT32(pRaw, dataPos, pTrans->startFunc, TRANS_ENCODE_OVER)
SDB_SET_INT32(pRaw, dataPos, pTrans->stopFunc, TRANS_ENCODE_OVER)
SDB_SET_INT32(pRaw, dataPos, pTrans->paramLen, TRANS_ENCODE_OVER)
if (pTrans->param != NULL) {
SDB_SET_BINARY(pRaw, dataPos, pTrans->param, pTrans->paramLen, TRANS_ENCODE_OVER)
}
SDB_SET_RESERVE(pRaw, dataPos, MND_TRANS_RESERVE_SIZE, TRANS_ENCODE_OVER) SDB_SET_RESERVE(pRaw, dataPos, MND_TRANS_RESERVE_SIZE, TRANS_ENCODE_OVER)
SDB_SET_DATALEN(pRaw, dataPos, TRANS_ENCODE_OVER) SDB_SET_DATALEN(pRaw, dataPos, TRANS_ENCODE_OVER)
...@@ -305,6 +312,14 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) { ...@@ -305,6 +312,14 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
action.pCont = NULL; action.pCont = NULL;
} }
SDB_GET_INT32(pRaw, dataPos, &pTrans->startFunc, TRANS_DECODE_OVER)
SDB_GET_INT32(pRaw, dataPos, &pTrans->stopFunc, TRANS_DECODE_OVER)
SDB_GET_INT32(pRaw, dataPos, &pTrans->paramLen, TRANS_DECODE_OVER)
if (pTrans->paramLen != 0) {
pTrans->param = taosMemoryMalloc(pTrans->paramLen);
SDB_GET_BINARY(pRaw, dataPos, pTrans->param, pTrans->paramLen, TRANS_DECODE_OVER);
}
SDB_GET_RESERVE(pRaw, dataPos, MND_TRANS_RESERVE_SIZE, TRANS_DECODE_OVER) SDB_GET_RESERVE(pRaw, dataPos, MND_TRANS_RESERVE_SIZE, TRANS_DECODE_OVER)
terrno = 0; terrno = 0;
...@@ -413,9 +428,36 @@ static const char *mndTransType(ETrnType type) { ...@@ -413,9 +428,36 @@ static const char *mndTransType(ETrnType type) {
} }
} }
static void mndTransTestStartFunc(SMnode *pMnode, void *param, int32_t paramLen) {
mInfo("test trans start, param:%s, len:%d", (char *)param, paramLen);
}
static void mndTransTestStopFunc(SMnode *pMnode, void *param, int32_t paramLen) {
mInfo("test trans stop, param:%s, len:%d", (char *)param, paramLen);
}
static TransCbFp mndTransGetCbFp(ETrnFuncType ftype) {
switch (ftype) {
case TEST_TRANS_START_FUNC:
return mndTransTestStartFunc;
case TEST_TRANS_STOP_FUNC:
return mndTransTestStopFunc;
default:
return NULL;
}
}
static int32_t mndTransActionInsert(SSdb *pSdb, STrans *pTrans) { static int32_t mndTransActionInsert(SSdb *pSdb, STrans *pTrans) {
// pTrans->stage = TRN_STAGE_PREPARE; // pTrans->stage = TRN_STAGE_PREPARE;
mTrace("trans:%d, perform insert action, row:%p stage:%s", pTrans->id, pTrans, mndTransStr(pTrans->stage)); mTrace("trans:%d, perform insert action, row:%p stage:%s", pTrans->id, pTrans, mndTransStr(pTrans->stage));
if (pTrans->startFunc > 0) {
TransCbFp fp = mndTransGetCbFp(pTrans->startFunc);
if (fp) {
(*fp)(pSdb->pMnode, pTrans->param, pTrans->paramLen);
}
}
return 0; return 0;
} }
...@@ -430,10 +472,23 @@ static void mndTransDropData(STrans *pTrans) { ...@@ -430,10 +472,23 @@ static void mndTransDropData(STrans *pTrans) {
pTrans->rpcRsp = NULL; pTrans->rpcRsp = NULL;
pTrans->rpcRspLen = 0; pTrans->rpcRspLen = 0;
} }
if (pTrans->param != NULL) {
taosMemoryFree(pTrans->param);
pTrans->param = NULL;
pTrans->paramLen = 0;
}
} }
static int32_t mndTransActionDelete(SSdb *pSdb, STrans *pTrans) { static int32_t mndTransActionDelete(SSdb *pSdb, STrans *pTrans, bool callFunc) {
mTrace("trans:%d, perform delete action, row:%p stage:%s", pTrans->id, pTrans, mndTransStr(pTrans->stage)); mDebug("trans:%d, perform delete action, row:%p stage:%s callfunc:%d", pTrans->id, pTrans, mndTransStr(pTrans->stage),
callFunc);
if (pTrans->stopFunc > 0 && callFunc) {
TransCbFp fp = mndTransGetCbFp(pTrans->stopFunc);
if (fp) {
(*fp)(pSdb->pMnode, pTrans->param, pTrans->paramLen);
}
}
mndTransDropData(pTrans); mndTransDropData(pTrans);
return 0; return 0;
} }
...@@ -498,7 +553,7 @@ STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, ETrnType type, const S ...@@ -498,7 +553,7 @@ STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, ETrnType type, const S
return NULL; return NULL;
} }
mDebug("trans:%d, is created, data:%p", pTrans->id, pTrans); mDebug("trans:%d, local var is created, data:%p", pTrans->id, pTrans);
return pTrans; return pTrans;
} }
...@@ -525,7 +580,7 @@ static void mndTransDropActions(SArray *pArray) { ...@@ -525,7 +580,7 @@ static void mndTransDropActions(SArray *pArray) {
void mndTransDrop(STrans *pTrans) { void mndTransDrop(STrans *pTrans) {
if (pTrans != NULL) { if (pTrans != NULL) {
mndTransDropData(pTrans); mndTransDropData(pTrans);
mDebug("trans:%d, is dropped, data:%p", pTrans->id, pTrans); mDebug("trans:%d, local var is freed, data:%p", pTrans->id, pTrans);
taosMemoryFreeClear(pTrans); taosMemoryFreeClear(pTrans);
} }
} }
...@@ -574,9 +629,11 @@ void mndTransSetRpcRsp(STrans *pTrans, void *pCont, int32_t contLen) { ...@@ -574,9 +629,11 @@ void mndTransSetRpcRsp(STrans *pTrans, void *pCont, int32_t contLen) {
pTrans->rpcRspLen = contLen; pTrans->rpcRspLen = contLen;
} }
void mndTransSetCb(STrans *pTrans, TransCbFp fp, void *param) { void mndTransSetCb(STrans *pTrans, ETrnFuncType startFunc, ETrnFuncType stopFunc, void *param, int32_t paramLen) {
pTrans->transCbFp = fp; pTrans->startFunc = startFunc;
pTrans->transCbParam = param; pTrans->stopFunc = stopFunc;
pTrans->param = param;
pTrans->paramLen = paramLen;
} }
void mndTransSetDbInfo(STrans *pTrans, SDbObj *pDb) { void mndTransSetDbInfo(STrans *pTrans, SDbObj *pDb) {
...@@ -712,8 +769,6 @@ int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans) { ...@@ -712,8 +769,6 @@ int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans) {
pNew->rpcRefId = pTrans->rpcRefId; pNew->rpcRefId = pTrans->rpcRefId;
pNew->rpcRsp = pTrans->rpcRsp; pNew->rpcRsp = pTrans->rpcRsp;
pNew->rpcRspLen = pTrans->rpcRspLen; pNew->rpcRspLen = pTrans->rpcRspLen;
pNew->transCbFp = pTrans->transCbFp;
pNew->transCbParam = pTrans->transCbParam;
pTrans->rpcRsp = NULL; pTrans->rpcRsp = NULL;
pTrans->rpcRspLen = 0; pTrans->rpcRspLen = 0;
...@@ -1125,10 +1180,6 @@ static bool mndTransPerfromFinishedStage(SMnode *pMnode, STrans *pTrans) { ...@@ -1125,10 +1180,6 @@ static bool mndTransPerfromFinishedStage(SMnode *pMnode, STrans *pTrans) {
mDebug("trans:%d, finished, code:0x%04x, failedTimes:%d", pTrans->id, pTrans->code, pTrans->failedTimes); mDebug("trans:%d, finished, code:0x%04x, failedTimes:%d", pTrans->id, pTrans->code, pTrans->failedTimes);
if (pTrans->transCbFp != NULL) {
(*pTrans->transCbFp)(pMnode, pTrans->transCbParam);
}
return continueExec; return continueExec;
} }
......
...@@ -96,36 +96,36 @@ static SSdbRaw *mndUserActionEncode(SUserObj *pUser) { ...@@ -96,36 +96,36 @@ static SSdbRaw *mndUserActionEncode(SUserObj *pUser) {
int32_t size = sizeof(SUserObj) + USER_RESERVE_SIZE + (numOfReadDbs + numOfWriteDbs) * TSDB_DB_FNAME_LEN; int32_t size = sizeof(SUserObj) + USER_RESERVE_SIZE + (numOfReadDbs + numOfWriteDbs) * TSDB_DB_FNAME_LEN;
SSdbRaw *pRaw = sdbAllocRaw(SDB_USER, USER_VER_NUMBER, size); SSdbRaw *pRaw = sdbAllocRaw(SDB_USER, USER_VER_NUMBER, size);
if (pRaw == NULL) goto USER_ENCODE_OVER; if (pRaw == NULL) goto _OVER;
int32_t dataPos = 0; int32_t dataPos = 0;
SDB_SET_BINARY(pRaw, dataPos, pUser->user, TSDB_USER_LEN, USER_ENCODE_OVER) SDB_SET_BINARY(pRaw, dataPos, pUser->user, TSDB_USER_LEN, _OVER)
SDB_SET_BINARY(pRaw, dataPos, pUser->pass, TSDB_PASSWORD_LEN, USER_ENCODE_OVER) SDB_SET_BINARY(pRaw, dataPos, pUser->pass, TSDB_PASSWORD_LEN, _OVER)
SDB_SET_BINARY(pRaw, dataPos, pUser->acct, TSDB_USER_LEN, USER_ENCODE_OVER) SDB_SET_BINARY(pRaw, dataPos, pUser->acct, TSDB_USER_LEN, _OVER)
SDB_SET_INT64(pRaw, dataPos, pUser->createdTime, USER_ENCODE_OVER) SDB_SET_INT64(pRaw, dataPos, pUser->createdTime, _OVER)
SDB_SET_INT64(pRaw, dataPos, pUser->updateTime, USER_ENCODE_OVER) SDB_SET_INT64(pRaw, dataPos, pUser->updateTime, _OVER)
SDB_SET_INT8(pRaw, dataPos, pUser->superUser, USER_ENCODE_OVER) SDB_SET_INT8(pRaw, dataPos, pUser->superUser, _OVER)
SDB_SET_INT32(pRaw, dataPos, numOfReadDbs, USER_ENCODE_OVER) SDB_SET_INT32(pRaw, dataPos, numOfReadDbs, _OVER)
SDB_SET_INT32(pRaw, dataPos, numOfWriteDbs, USER_ENCODE_OVER) SDB_SET_INT32(pRaw, dataPos, numOfWriteDbs, _OVER)
char *db = taosHashIterate(pUser->readDbs, NULL); char *db = taosHashIterate(pUser->readDbs, NULL);
while (db != NULL) { while (db != NULL) {
SDB_SET_BINARY(pRaw, dataPos, db, TSDB_DB_FNAME_LEN, USER_ENCODE_OVER); SDB_SET_BINARY(pRaw, dataPos, db, TSDB_DB_FNAME_LEN, _OVER);
db = taosHashIterate(pUser->readDbs, db); db = taosHashIterate(pUser->readDbs, db);
} }
db = taosHashIterate(pUser->writeDbs, NULL); db = taosHashIterate(pUser->writeDbs, NULL);
while (db != NULL) { while (db != NULL) {
SDB_SET_BINARY(pRaw, dataPos, db, TSDB_DB_FNAME_LEN, USER_ENCODE_OVER); SDB_SET_BINARY(pRaw, dataPos, db, TSDB_DB_FNAME_LEN, _OVER);
db = taosHashIterate(pUser->writeDbs, db); db = taosHashIterate(pUser->writeDbs, db);
} }
SDB_SET_RESERVE(pRaw, dataPos, USER_RESERVE_SIZE, USER_ENCODE_OVER) SDB_SET_RESERVE(pRaw, dataPos, USER_RESERVE_SIZE, _OVER)
SDB_SET_DATALEN(pRaw, dataPos, USER_ENCODE_OVER) SDB_SET_DATALEN(pRaw, dataPos, _OVER)
terrno = 0; terrno = 0;
USER_ENCODE_OVER: _OVER:
if (terrno != 0) { if (terrno != 0) {
mError("user:%s, failed to encode to raw:%p since %s", pUser->user, pRaw, terrstr()); mError("user:%s, failed to encode to raw:%p since %s", pUser->user, pRaw, terrstr());
sdbFreeRaw(pRaw); sdbFreeRaw(pRaw);
...@@ -140,55 +140,54 @@ static SSdbRow *mndUserActionDecode(SSdbRaw *pRaw) { ...@@ -140,55 +140,54 @@ static SSdbRow *mndUserActionDecode(SSdbRaw *pRaw) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
int8_t sver = 0; int8_t sver = 0;
if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto USER_DECODE_OVER; if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto _OVER;
if (sver != USER_VER_NUMBER) { if (sver != USER_VER_NUMBER) {
terrno = TSDB_CODE_SDB_INVALID_DATA_VER; terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
goto USER_DECODE_OVER; goto _OVER;
} }
SSdbRow *pRow = sdbAllocRow(sizeof(SUserObj)); SSdbRow *pRow = sdbAllocRow(sizeof(SUserObj));
if (pRow == NULL) goto USER_DECODE_OVER; if (pRow == NULL) goto _OVER;
SUserObj *pUser = sdbGetRowObj(pRow); SUserObj *pUser = sdbGetRowObj(pRow);
if (pUser == NULL) goto USER_DECODE_OVER; if (pUser == NULL) goto _OVER;
pUser->readDbs = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, true);
pUser->writeDbs = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, true);
if (pUser->readDbs == NULL || pUser->writeDbs == NULL) goto USER_DECODE_OVER;
int32_t dataPos = 0; int32_t dataPos = 0;
SDB_GET_BINARY(pRaw, dataPos, pUser->user, TSDB_USER_LEN, USER_DECODE_OVER) SDB_GET_BINARY(pRaw, dataPos, pUser->user, TSDB_USER_LEN, _OVER)
SDB_GET_BINARY(pRaw, dataPos, pUser->pass, TSDB_PASSWORD_LEN, USER_DECODE_OVER) SDB_GET_BINARY(pRaw, dataPos, pUser->pass, TSDB_PASSWORD_LEN, _OVER)
SDB_GET_BINARY(pRaw, dataPos, pUser->acct, TSDB_USER_LEN, USER_DECODE_OVER) SDB_GET_BINARY(pRaw, dataPos, pUser->acct, TSDB_USER_LEN, _OVER)
SDB_GET_INT64(pRaw, dataPos, &pUser->createdTime, USER_DECODE_OVER) SDB_GET_INT64(pRaw, dataPos, &pUser->createdTime, _OVER)
SDB_GET_INT64(pRaw, dataPos, &pUser->updateTime, USER_DECODE_OVER) SDB_GET_INT64(pRaw, dataPos, &pUser->updateTime, _OVER)
SDB_GET_INT8(pRaw, dataPos, &pUser->superUser, USER_DECODE_OVER) SDB_GET_INT8(pRaw, dataPos, &pUser->superUser, _OVER)
int32_t numOfReadDbs = 0; int32_t numOfReadDbs = 0;
int32_t numOfWriteDbs = 0; int32_t numOfWriteDbs = 0;
SDB_GET_INT32(pRaw, dataPos, &numOfReadDbs, USER_DECODE_OVER) SDB_GET_INT32(pRaw, dataPos, &numOfReadDbs, _OVER)
SDB_GET_INT32(pRaw, dataPos, &numOfWriteDbs, USER_DECODE_OVER) SDB_GET_INT32(pRaw, dataPos, &numOfWriteDbs, _OVER)
pUser->readDbs = taosHashInit(numOfReadDbs, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, true);
pUser->writeDbs = taosHashInit(numOfWriteDbs, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, true);
if (pUser->readDbs == NULL || pUser->writeDbs == NULL) goto _OVER;
for (int32_t i = 0; i < numOfReadDbs; ++i) { for (int32_t i = 0; i < numOfReadDbs; ++i) {
char db[TSDB_DB_FNAME_LEN] = {0}; char db[TSDB_DB_FNAME_LEN] = {0};
SDB_GET_BINARY(pRaw, dataPos, db, TSDB_DB_FNAME_LEN, USER_DECODE_OVER) SDB_GET_BINARY(pRaw, dataPos, db, TSDB_DB_FNAME_LEN, _OVER)
int32_t len = strlen(db) + 1; int32_t len = strlen(db) + 1;
taosHashPut(pUser->readDbs, db, len, db, TSDB_DB_FNAME_LEN); taosHashPut(pUser->readDbs, db, len, db, TSDB_DB_FNAME_LEN);
} }
for (int32_t i = 0; i < numOfWriteDbs; ++i) { for (int32_t i = 0; i < numOfWriteDbs; ++i) {
char db[TSDB_DB_FNAME_LEN] = {0}; char db[TSDB_DB_FNAME_LEN] = {0};
SDB_GET_BINARY(pRaw, dataPos, db, TSDB_DB_FNAME_LEN, USER_DECODE_OVER) SDB_GET_BINARY(pRaw, dataPos, db, TSDB_DB_FNAME_LEN, _OVER)
int32_t len = strlen(db) + 1; int32_t len = strlen(db) + 1;
taosHashPut(pUser->writeDbs, db, len, db, TSDB_DB_FNAME_LEN); taosHashPut(pUser->writeDbs, db, len, db, TSDB_DB_FNAME_LEN);
} }
SDB_GET_RESERVE(pRaw, dataPos, USER_RESERVE_SIZE, USER_DECODE_OVER) SDB_GET_RESERVE(pRaw, dataPos, USER_RESERVE_SIZE, _OVER)
terrno = 0; terrno = 0;
USER_DECODE_OVER: _OVER:
if (terrno != 0) { if (terrno != 0) {
mError("user:%s, failed to decode from raw:%p since %s", pUser->user, pRaw, terrstr()); mError("user:%s, failed to decode from raw:%p since %s", pUser->user, pRaw, terrstr());
taosHashCleanup(pUser->readDbs); taosHashCleanup(pUser->readDbs);
...@@ -220,6 +219,8 @@ static int32_t mndUserActionDelete(SSdb *pSdb, SUserObj *pUser) { ...@@ -220,6 +219,8 @@ static int32_t mndUserActionDelete(SSdb *pSdb, SUserObj *pUser) {
mTrace("user:%s, perform delete action, row:%p", pUser->user, pUser); mTrace("user:%s, perform delete action, row:%p", pUser->user, pUser);
taosHashCleanup(pUser->readDbs); taosHashCleanup(pUser->readDbs);
taosHashCleanup(pUser->writeDbs); taosHashCleanup(pUser->writeDbs);
pUser->readDbs = NULL;
pUser->writeDbs = NULL;
return 0; return 0;
} }
...@@ -228,13 +229,8 @@ static int32_t mndUserActionUpdate(SSdb *pSdb, SUserObj *pOld, SUserObj *pNew) { ...@@ -228,13 +229,8 @@ static int32_t mndUserActionUpdate(SSdb *pSdb, SUserObj *pOld, SUserObj *pNew) {
memcpy(pOld->pass, pNew->pass, TSDB_PASSWORD_LEN); memcpy(pOld->pass, pNew->pass, TSDB_PASSWORD_LEN);
pOld->updateTime = pNew->updateTime; pOld->updateTime = pNew->updateTime;
void *tmp1 = pOld->readDbs; TSWAP(pOld->readDbs, pNew->readDbs, (void *));
pOld->readDbs = pNew->readDbs; TSWAP(pOld->writeDbs, pNew->writeDbs, (void *));
pNew->readDbs = tmp1;
void *tmp2 = pOld->writeDbs;
pOld->writeDbs = pNew->writeDbs;
pNew->writeDbs = tmp2;
return 0; return 0;
} }
...@@ -277,6 +273,9 @@ static int32_t mndCreateUser(SMnode *pMnode, char *acct, SCreateUserReq *pCreate ...@@ -277,6 +273,9 @@ static int32_t mndCreateUser(SMnode *pMnode, char *acct, SCreateUserReq *pCreate
} }
sdbSetRawStatus(pRedoRaw, SDB_STATUS_READY); sdbSetRawStatus(pRedoRaw, SDB_STATUS_READY);
char *param = strdup("====> test code to be deleted later <=====");
mndTransSetCb(pTrans, TEST_TRANS_START_FUNC, TEST_TRANS_STOP_FUNC, param, strlen(param) + 1);
if (mndTransPrepare(pMnode, pTrans) != 0) { if (mndTransPrepare(pMnode, pTrans) != 0) {
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
mndTransDrop(pTrans); mndTransDrop(pTrans);
...@@ -296,41 +295,41 @@ static int32_t mndProcessCreateUserReq(SNodeMsg *pReq) { ...@@ -296,41 +295,41 @@ static int32_t mndProcessCreateUserReq(SNodeMsg *pReq) {
if (tDeserializeSCreateUserReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &createReq) != 0) { if (tDeserializeSCreateUserReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &createReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG; terrno = TSDB_CODE_INVALID_MSG;
goto CREATE_USER_OVER; goto _OVER;
} }
mDebug("user:%s, start to create", createReq.user); mDebug("user:%s, start to create", createReq.user);
if (createReq.user[0] == 0) { if (createReq.user[0] == 0) {
terrno = TSDB_CODE_MND_INVALID_USER_FORMAT; terrno = TSDB_CODE_MND_INVALID_USER_FORMAT;
goto CREATE_USER_OVER; goto _OVER;
} }
if (createReq.pass[0] == 0) { if (createReq.pass[0] == 0) {
terrno = TSDB_CODE_MND_INVALID_PASS_FORMAT; terrno = TSDB_CODE_MND_INVALID_PASS_FORMAT;
goto CREATE_USER_OVER; goto _OVER;
} }
pUser = mndAcquireUser(pMnode, createReq.user); pUser = mndAcquireUser(pMnode, createReq.user);
if (pUser != NULL) { if (pUser != NULL) {
terrno = TSDB_CODE_MND_USER_ALREADY_EXIST; terrno = TSDB_CODE_MND_USER_ALREADY_EXIST;
goto CREATE_USER_OVER; goto _OVER;
} }
pOperUser = mndAcquireUser(pMnode, pReq->user); pOperUser = mndAcquireUser(pMnode, pReq->user);
if (pOperUser == NULL) { if (pOperUser == NULL) {
terrno = TSDB_CODE_MND_NO_USER_FROM_CONN; terrno = TSDB_CODE_MND_NO_USER_FROM_CONN;
goto CREATE_USER_OVER; goto _OVER;
} }
if (mndCheckCreateUserAuth(pOperUser) != 0) { if (mndCheckCreateUserAuth(pOperUser) != 0) {
goto CREATE_USER_OVER; goto _OVER;
} }
code = mndCreateUser(pMnode, pOperUser->acct, &createReq, pReq); code = mndCreateUser(pMnode, pOperUser->acct, &createReq, pReq);
if (code == 0) code = TSDB_CODE_MND_ACTION_IN_PROGRESS; if (code == 0) code = TSDB_CODE_MND_ACTION_IN_PROGRESS;
CREATE_USER_OVER: _OVER:
if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
mError("user:%s, failed to create since %s", createReq.user, terrstr()); mError("user:%s, failed to create since %s", createReq.user, terrstr());
} }
...@@ -399,38 +398,38 @@ static int32_t mndProcessAlterUserReq(SNodeMsg *pReq) { ...@@ -399,38 +398,38 @@ static int32_t mndProcessAlterUserReq(SNodeMsg *pReq) {
if (tDeserializeSAlterUserReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &alterReq) != 0) { if (tDeserializeSAlterUserReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &alterReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG; terrno = TSDB_CODE_INVALID_MSG;
goto ALTER_USER_OVER; goto _OVER;
} }
mDebug("user:%s, start to alter", alterReq.user); mDebug("user:%s, start to alter", alterReq.user);
if (alterReq.user[0] == 0) { if (alterReq.user[0] == 0) {
terrno = TSDB_CODE_MND_INVALID_USER_FORMAT; terrno = TSDB_CODE_MND_INVALID_USER_FORMAT;
goto ALTER_USER_OVER; goto _OVER;
} }
if (alterReq.pass[0] == 0) { if (alterReq.pass[0] == 0) {
terrno = TSDB_CODE_MND_INVALID_PASS_FORMAT; terrno = TSDB_CODE_MND_INVALID_PASS_FORMAT;
goto ALTER_USER_OVER; goto _OVER;
} }
pUser = mndAcquireUser(pMnode, alterReq.user); pUser = mndAcquireUser(pMnode, alterReq.user);
if (pUser == NULL) { if (pUser == NULL) {
terrno = TSDB_CODE_MND_USER_NOT_EXIST; terrno = TSDB_CODE_MND_USER_NOT_EXIST;
goto ALTER_USER_OVER; goto _OVER;
} }
pOperUser = mndAcquireUser(pMnode, pReq->user); pOperUser = mndAcquireUser(pMnode, pReq->user);
if (pOperUser == NULL) { if (pOperUser == NULL) {
terrno = TSDB_CODE_MND_NO_USER_FROM_CONN; terrno = TSDB_CODE_MND_NO_USER_FROM_CONN;
goto ALTER_USER_OVER; goto _OVER;
} }
memcpy(&newUser, pUser, sizeof(SUserObj)); memcpy(&newUser, pUser, sizeof(SUserObj));
newUser.readDbs = mndDupDbHash(pUser->readDbs); newUser.readDbs = mndDupDbHash(pUser->readDbs);
newUser.writeDbs = mndDupDbHash(pUser->writeDbs); newUser.writeDbs = mndDupDbHash(pUser->writeDbs);
if (newUser.readDbs == NULL || newUser.writeDbs == NULL) { if (newUser.readDbs == NULL || newUser.writeDbs == NULL) {
goto ALTER_USER_OVER; goto _OVER;
} }
int32_t len = strlen(alterReq.dbname) + 1; int32_t len = strlen(alterReq.dbname) + 1;
...@@ -446,50 +445,50 @@ static int32_t mndProcessAlterUserReq(SNodeMsg *pReq) { ...@@ -446,50 +445,50 @@ static int32_t mndProcessAlterUserReq(SNodeMsg *pReq) {
} else if (alterReq.alterType == TSDB_ALTER_USER_ADD_READ_DB) { } else if (alterReq.alterType == TSDB_ALTER_USER_ADD_READ_DB) {
if (pDb == NULL) { if (pDb == NULL) {
terrno = TSDB_CODE_MND_DB_NOT_EXIST; terrno = TSDB_CODE_MND_DB_NOT_EXIST;
goto ALTER_USER_OVER; goto _OVER;
} }
if (taosHashPut(newUser.readDbs, alterReq.dbname, len, alterReq.dbname, TSDB_DB_FNAME_LEN) != 0) { if (taosHashPut(newUser.readDbs, alterReq.dbname, len, alterReq.dbname, TSDB_DB_FNAME_LEN) != 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
goto ALTER_USER_OVER; goto _OVER;
} }
} else if (alterReq.alterType == TSDB_ALTER_USER_REMOVE_READ_DB) { } else if (alterReq.alterType == TSDB_ALTER_USER_REMOVE_READ_DB) {
if (taosHashRemove(newUser.readDbs, alterReq.dbname, len) != 0) { if (taosHashRemove(newUser.readDbs, alterReq.dbname, len) != 0) {
terrno = TSDB_CODE_MND_DB_NOT_EXIST; terrno = TSDB_CODE_MND_DB_NOT_EXIST;
goto ALTER_USER_OVER; goto _OVER;
} }
} else if (alterReq.alterType == TSDB_ALTER_USER_CLEAR_READ_DB) { } else if (alterReq.alterType == TSDB_ALTER_USER_CLEAR_READ_DB) {
taosHashClear(newUser.readDbs); taosHashClear(newUser.readDbs);
} else if (alterReq.alterType == TSDB_ALTER_USER_ADD_WRITE_DB) { } else if (alterReq.alterType == TSDB_ALTER_USER_ADD_WRITE_DB) {
if (pDb == NULL) { if (pDb == NULL) {
terrno = TSDB_CODE_MND_DB_NOT_EXIST; terrno = TSDB_CODE_MND_DB_NOT_EXIST;
goto ALTER_USER_OVER; goto _OVER;
} }
if (taosHashPut(newUser.writeDbs, alterReq.dbname, len, alterReq.dbname, TSDB_DB_FNAME_LEN) != 0) { if (taosHashPut(newUser.writeDbs, alterReq.dbname, len, alterReq.dbname, TSDB_DB_FNAME_LEN) != 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
goto ALTER_USER_OVER; goto _OVER;
} }
} else if (alterReq.alterType == TSDB_ALTER_USER_REMOVE_WRITE_DB) { } else if (alterReq.alterType == TSDB_ALTER_USER_REMOVE_WRITE_DB) {
if (taosHashRemove(newUser.writeDbs, alterReq.dbname, len) != 0) { if (taosHashRemove(newUser.writeDbs, alterReq.dbname, len) != 0) {
terrno = TSDB_CODE_MND_DB_NOT_EXIST; terrno = TSDB_CODE_MND_DB_NOT_EXIST;
goto ALTER_USER_OVER; goto _OVER;
} }
} else if (alterReq.alterType == TSDB_ALTER_USER_CLEAR_WRITE_DB) { } else if (alterReq.alterType == TSDB_ALTER_USER_CLEAR_WRITE_DB) {
taosHashClear(newUser.writeDbs); taosHashClear(newUser.writeDbs);
} else { } else {
terrno = TSDB_CODE_MND_INVALID_ALTER_OPER; terrno = TSDB_CODE_MND_INVALID_ALTER_OPER;
goto ALTER_USER_OVER; goto _OVER;
} }
newUser.updateTime = taosGetTimestampMs(); newUser.updateTime = taosGetTimestampMs();
if (mndCheckAlterUserAuth(pOperUser, pUser, pDb, &alterReq) != 0) { if (mndCheckAlterUserAuth(pOperUser, pUser, pDb, &alterReq) != 0) {
goto ALTER_USER_OVER; goto _OVER;
} }
code = mndUpdateUser(pMnode, pUser, &newUser, pReq); code = mndUpdateUser(pMnode, pUser, &newUser, pReq);
if (code == 0) code = TSDB_CODE_MND_ACTION_IN_PROGRESS; if (code == 0) code = TSDB_CODE_MND_ACTION_IN_PROGRESS;
ALTER_USER_OVER: _OVER:
if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
mError("user:%s, failed to alter since %s", alterReq.user, terrstr()); mError("user:%s, failed to alter since %s", alterReq.user, terrstr());
} }
...@@ -537,36 +536,36 @@ static int32_t mndProcessDropUserReq(SNodeMsg *pReq) { ...@@ -537,36 +536,36 @@ static int32_t mndProcessDropUserReq(SNodeMsg *pReq) {
if (tDeserializeSDropUserReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &dropReq) != 0) { if (tDeserializeSDropUserReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &dropReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG; terrno = TSDB_CODE_INVALID_MSG;
goto DROP_USER_OVER; goto _OVER;
} }
mDebug("user:%s, start to drop", dropReq.user); mDebug("user:%s, start to drop", dropReq.user);
if (dropReq.user[0] == 0) { if (dropReq.user[0] == 0) {
terrno = TSDB_CODE_MND_INVALID_USER_FORMAT; terrno = TSDB_CODE_MND_INVALID_USER_FORMAT;
goto DROP_USER_OVER; goto _OVER;
} }
pUser = mndAcquireUser(pMnode, dropReq.user); pUser = mndAcquireUser(pMnode, dropReq.user);
if (pUser == NULL) { if (pUser == NULL) {
terrno = TSDB_CODE_MND_USER_NOT_EXIST; terrno = TSDB_CODE_MND_USER_NOT_EXIST;
goto DROP_USER_OVER; goto _OVER;
} }
pOperUser = mndAcquireUser(pMnode, pReq->user); pOperUser = mndAcquireUser(pMnode, pReq->user);
if (pOperUser == NULL) { if (pOperUser == NULL) {
terrno = TSDB_CODE_MND_NO_USER_FROM_CONN; terrno = TSDB_CODE_MND_NO_USER_FROM_CONN;
goto DROP_USER_OVER; goto _OVER;
} }
if (mndCheckDropUserAuth(pOperUser) != 0) { if (mndCheckDropUserAuth(pOperUser) != 0) {
goto DROP_USER_OVER; goto _OVER;
} }
code = mndDropUser(pMnode, pReq, pUser); code = mndDropUser(pMnode, pReq, pUser);
if (code == 0) code = TSDB_CODE_MND_ACTION_IN_PROGRESS; if (code == 0) code = TSDB_CODE_MND_ACTION_IN_PROGRESS;
DROP_USER_OVER: _OVER:
if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
mError("user:%s, failed to drop since %s", dropReq.user, terrstr()); mError("user:%s, failed to drop since %s", dropReq.user, terrstr());
} }
...@@ -586,7 +585,7 @@ static int32_t mndProcessGetUserAuthReq(SNodeMsg *pReq) { ...@@ -586,7 +585,7 @@ static int32_t mndProcessGetUserAuthReq(SNodeMsg *pReq) {
if (tDeserializeSGetUserAuthReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &authReq) != 0) { if (tDeserializeSGetUserAuthReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &authReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG; terrno = TSDB_CODE_INVALID_MSG;
goto GET_AUTH_OVER; goto _OVER;
} }
mTrace("user:%s, start to get auth", authReq.user); mTrace("user:%s, start to get auth", authReq.user);
...@@ -594,7 +593,7 @@ static int32_t mndProcessGetUserAuthReq(SNodeMsg *pReq) { ...@@ -594,7 +593,7 @@ static int32_t mndProcessGetUserAuthReq(SNodeMsg *pReq) {
pUser = mndAcquireUser(pMnode, authReq.user); pUser = mndAcquireUser(pMnode, authReq.user);
if (pUser == NULL) { if (pUser == NULL) {
terrno = TSDB_CODE_MND_USER_NOT_EXIST; terrno = TSDB_CODE_MND_USER_NOT_EXIST;
goto GET_AUTH_OVER; goto _OVER;
} }
memcpy(authRsp.user, pUser->user, TSDB_USER_LEN); memcpy(authRsp.user, pUser->user, TSDB_USER_LEN);
...@@ -622,7 +621,7 @@ static int32_t mndProcessGetUserAuthReq(SNodeMsg *pReq) { ...@@ -622,7 +621,7 @@ static int32_t mndProcessGetUserAuthReq(SNodeMsg *pReq) {
void *pRsp = rpcMallocCont(contLen); void *pRsp = rpcMallocCont(contLen);
if (pRsp == NULL) { if (pRsp == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
goto GET_AUTH_OVER; goto _OVER;
} }
tSerializeSGetUserAuthRsp(pRsp, contLen, &authRsp); tSerializeSGetUserAuthRsp(pRsp, contLen, &authRsp);
...@@ -631,7 +630,7 @@ static int32_t mndProcessGetUserAuthReq(SNodeMsg *pReq) { ...@@ -631,7 +630,7 @@ static int32_t mndProcessGetUserAuthReq(SNodeMsg *pReq) {
pReq->rspLen = contLen; pReq->rspLen = contLen;
code = 0; code = 0;
GET_AUTH_OVER: _OVER:
mndReleaseUser(pMnode, pUser); mndReleaseUser(pMnode, pUser);
taosHashCleanup(authRsp.readDbs); taosHashCleanup(authRsp.readDbs);
taosHashCleanup(authRsp.writeDbs); taosHashCleanup(authRsp.writeDbs);
......
...@@ -45,8 +45,8 @@ int32_t mndInitVgroup(SMnode *pMnode) { ...@@ -45,8 +45,8 @@ int32_t mndInitVgroup(SMnode *pMnode) {
.encodeFp = (SdbEncodeFp)mndVgroupActionEncode, .encodeFp = (SdbEncodeFp)mndVgroupActionEncode,
.decodeFp = (SdbDecodeFp)mndVgroupActionDecode, .decodeFp = (SdbDecodeFp)mndVgroupActionDecode,
.insertFp = (SdbInsertFp)mndVgroupActionInsert, .insertFp = (SdbInsertFp)mndVgroupActionInsert,
.updateFp = (SdbUpdateFp)mndVgroupActionDelete, .updateFp = (SdbUpdateFp)mndVgroupActionUpdate,
.deleteFp = (SdbDeleteFp)mndVgroupActionUpdate}; .deleteFp = (SdbDeleteFp)mndVgroupActionDelete};
mndSetMsgHandle(pMnode, TDMT_DND_CREATE_VNODE_RSP, mndProcessCreateVnodeRsp); mndSetMsgHandle(pMnode, TDMT_DND_CREATE_VNODE_RSP, mndProcessCreateVnodeRsp);
mndSetMsgHandle(pMnode, TDMT_VND_ALTER_VNODE_RSP, mndProcessAlterVnodeRsp); mndSetMsgHandle(pMnode, TDMT_VND_ALTER_VNODE_RSP, mndProcessAlterVnodeRsp);
......
...@@ -19,10 +19,6 @@ ...@@ -19,10 +19,6 @@
#include "os.h" #include "os.h"
#include "sdb.h" #include "sdb.h"
#include "thash.h"
#include "tlockfree.h"
#include "tlog.h"
#include "tmsg.h"
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
...@@ -53,26 +49,6 @@ typedef struct SSdbRow { ...@@ -53,26 +49,6 @@ typedef struct SSdbRow {
char pObj[]; char pObj[];
} SSdbRow; } SSdbRow;
typedef struct SSdb {
SMnode *pMnode;
char *currDir;
char *syncDir;
char *tmpDir;
int64_t lastCommitVer;
int64_t curVer;
int64_t tableVer[SDB_MAX];
int64_t maxId[SDB_MAX];
EKeyType keyTypes[SDB_MAX];
SHashObj *hashObjs[SDB_MAX];
SRWLatch locks[SDB_MAX];
SdbInsertFp insertFps[SDB_MAX];
SdbUpdateFp updateFps[SDB_MAX];
SdbDeleteFp deleteFps[SDB_MAX];
SdbDeployFp deployFps[SDB_MAX];
SdbEncodeFp encodeFps[SDB_MAX];
SdbDecodeFp decodeFps[SDB_MAX];
} SSdb;
const char *sdbTableName(ESdbType type); const char *sdbTableName(ESdbType type);
void sdbPrintOper(SSdb *pSdb, SSdbRow *pRow, const char *oper); void sdbPrintOper(SSdb *pSdb, SSdbRow *pRow, const char *oper);
......
...@@ -87,7 +87,7 @@ void sdbCleanup(SSdb *pSdb) { ...@@ -87,7 +87,7 @@ void sdbCleanup(SSdb *pSdb) {
SSdbRow *pRow = *ppRow; SSdbRow *pRow = *ppRow;
if (pRow == NULL) continue; if (pRow == NULL) continue;
sdbFreeRow(pSdb, pRow); sdbFreeRow(pSdb, pRow, true);
ppRow = taosHashIterate(hash, ppRow); ppRow = taosHashIterate(hash, ppRow);
} }
} }
......
...@@ -137,7 +137,7 @@ static int32_t sdbInsertRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow * ...@@ -137,7 +137,7 @@ static int32_t sdbInsertRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *
SSdbRow *pOldRow = taosHashGet(hash, pRow->pObj, keySize); SSdbRow *pOldRow = taosHashGet(hash, pRow->pObj, keySize);
if (pOldRow != NULL) { if (pOldRow != NULL) {
taosWUnLockLatch(pLock); taosWUnLockLatch(pLock);
sdbFreeRow(pSdb, pRow); sdbFreeRow(pSdb, pRow, false);
terrno = TSDB_CODE_SDB_OBJ_ALREADY_THERE; terrno = TSDB_CODE_SDB_OBJ_ALREADY_THERE;
return terrno; return terrno;
} }
...@@ -148,7 +148,7 @@ static int32_t sdbInsertRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow * ...@@ -148,7 +148,7 @@ static int32_t sdbInsertRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *
if (taosHashPut(hash, pRow->pObj, keySize, &pRow, sizeof(void *)) != 0) { if (taosHashPut(hash, pRow->pObj, keySize, &pRow, sizeof(void *)) != 0) {
taosWUnLockLatch(pLock); taosWUnLockLatch(pLock);
sdbFreeRow(pSdb, pRow); sdbFreeRow(pSdb, pRow, false);
terrno = TSDB_CODE_SDB_OBJ_ALREADY_THERE; terrno = TSDB_CODE_SDB_OBJ_ALREADY_THERE;
return terrno; return terrno;
} }
...@@ -164,7 +164,7 @@ static int32_t sdbInsertRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow * ...@@ -164,7 +164,7 @@ static int32_t sdbInsertRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *
taosWLockLatch(pLock); taosWLockLatch(pLock);
taosHashRemove(hash, pRow->pObj, keySize); taosHashRemove(hash, pRow->pObj, keySize);
taosWUnLockLatch(pLock); taosWUnLockLatch(pLock);
sdbFreeRow(pSdb, pRow); sdbFreeRow(pSdb, pRow, false);
terrno = code; terrno = code;
return terrno; return terrno;
} }
...@@ -202,7 +202,7 @@ static int32_t sdbUpdateRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow * ...@@ -202,7 +202,7 @@ static int32_t sdbUpdateRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *
code = (*updateFp)(pSdb, pOldRow->pObj, pNewRow->pObj); code = (*updateFp)(pSdb, pOldRow->pObj, pNewRow->pObj);
} }
sdbFreeRow(pSdb, pNewRow); sdbFreeRow(pSdb, pNewRow, false);
pSdb->tableVer[pOldRow->type]++; pSdb->tableVer[pOldRow->type]++;
return code; return code;
...@@ -215,7 +215,7 @@ static int32_t sdbDeleteRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow * ...@@ -215,7 +215,7 @@ static int32_t sdbDeleteRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *
SSdbRow **ppOldRow = taosHashGet(hash, pRow->pObj, keySize); SSdbRow **ppOldRow = taosHashGet(hash, pRow->pObj, keySize);
if (ppOldRow == NULL || *ppOldRow == NULL) { if (ppOldRow == NULL || *ppOldRow == NULL) {
taosWUnLockLatch(pLock); taosWUnLockLatch(pLock);
sdbFreeRow(pSdb, pRow); sdbFreeRow(pSdb, pRow, false);
terrno = TSDB_CODE_SDB_OBJ_NOT_THERE; terrno = TSDB_CODE_SDB_OBJ_NOT_THERE;
return terrno; return terrno;
} }
...@@ -228,7 +228,7 @@ static int32_t sdbDeleteRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow * ...@@ -228,7 +228,7 @@ static int32_t sdbDeleteRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *
taosWUnLockLatch(pLock); taosWUnLockLatch(pLock);
pSdb->tableVer[pOldRow->type]++; pSdb->tableVer[pOldRow->type]++;
sdbFreeRow(pSdb, pRow); sdbFreeRow(pSdb, pRow, false);
sdbCheck(pSdb, pOldRow); sdbCheck(pSdb, pOldRow);
// sdbRelease(pSdb, pOldRow->pObj); // sdbRelease(pSdb, pOldRow->pObj);
...@@ -322,7 +322,7 @@ static void sdbCheck(SSdb *pSdb, SSdbRow *pRow) { ...@@ -322,7 +322,7 @@ static void sdbCheck(SSdb *pSdb, SSdbRow *pRow) {
int32_t ref = atomic_load_32(&pRow->refCount); int32_t ref = atomic_load_32(&pRow->refCount);
sdbPrintOper(pSdb, pRow, "check"); sdbPrintOper(pSdb, pRow, "check");
if (ref <= 0 && pRow->status == SDB_STATUS_DROPPED) { if (ref <= 0 && pRow->status == SDB_STATUS_DROPPED) {
sdbFreeRow(pSdb, pRow); sdbFreeRow(pSdb, pRow, true);
} }
taosRUnLockLatch(pLock); taosRUnLockLatch(pLock);
...@@ -340,7 +340,7 @@ void sdbRelease(SSdb *pSdb, void *pObj) { ...@@ -340,7 +340,7 @@ void sdbRelease(SSdb *pSdb, void *pObj) {
int32_t ref = atomic_sub_fetch_32(&pRow->refCount, 1); int32_t ref = atomic_sub_fetch_32(&pRow->refCount, 1);
sdbPrintOper(pSdb, pRow, "release"); sdbPrintOper(pSdb, pRow, "release");
if (ref <= 0 && pRow->status == SDB_STATUS_DROPPED) { if (ref <= 0 && pRow->status == SDB_STATUS_DROPPED) {
sdbFreeRow(pSdb, pRow); sdbFreeRow(pSdb, pRow, true);
} }
taosRUnLockLatch(pLock); taosRUnLockLatch(pLock);
......
...@@ -107,7 +107,9 @@ int32_t sdbSetRawBinary(SSdbRaw *pRaw, int32_t dataPos, const char *pVal, int32_ ...@@ -107,7 +107,9 @@ int32_t sdbSetRawBinary(SSdbRaw *pRaw, int32_t dataPos, const char *pVal, int32_
return -1; return -1;
} }
memcpy(pRaw->pData + dataPos, pVal, valLen); if (pVal != NULL) {
memcpy(pRaw->pData + dataPos, pVal, valLen);
}
return 0; return 0;
} }
......
...@@ -36,11 +36,11 @@ void *sdbGetRowObj(SSdbRow *pRow) { ...@@ -36,11 +36,11 @@ void *sdbGetRowObj(SSdbRow *pRow) {
return pRow->pObj; return pRow->pObj;
} }
void sdbFreeRow(SSdb *pSdb, SSdbRow *pRow) { void sdbFreeRow(SSdb *pSdb, SSdbRow *pRow, bool callFunc) {
// remove attached object such as trans // remove attached object such as trans
SdbDeleteFp deleteFp = pSdb->deleteFps[pRow->type]; SdbDeleteFp deleteFp = pSdb->deleteFps[pRow->type];
if (deleteFp != NULL) { if (deleteFp != NULL) {
(*deleteFp)(pSdb, pRow->pObj); (*deleteFp)(pSdb, pRow->pObj, callFunc);
} }
sdbPrintOper(pSdb, pRow, "free"); sdbPrintOper(pSdb, pRow, "free");
......
...@@ -1034,7 +1034,7 @@ void setInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlo ...@@ -1034,7 +1034,7 @@ void setInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlo
} }
} }
static int32_t doCreateConstantValColumnInfo(SInputColumnInfoData* pInput, SFunctParam* pFuncParam, int32_t type, static int32_t doCreateConstantValColumnInfo(SInputColumnInfoData* pInput, SFunctParam* pFuncParam,
int32_t paramIndex, int32_t numOfRows) { int32_t paramIndex, int32_t numOfRows) {
SColumnInfoData* pColInfo = NULL; SColumnInfoData* pColInfo = NULL;
if (pInput->pData[paramIndex] == NULL) { if (pInput->pData[paramIndex] == NULL) {
...@@ -1044,17 +1044,17 @@ static int32_t doCreateConstantValColumnInfo(SInputColumnInfoData* pInput, SFunc ...@@ -1044,17 +1044,17 @@ static int32_t doCreateConstantValColumnInfo(SInputColumnInfoData* pInput, SFunc
} }
// Set the correct column info (data type and bytes) // Set the correct column info (data type and bytes)
pColInfo->info.type = type; pColInfo->info.type = pFuncParam->param.nType;
pColInfo->info.bytes = tDataTypes[type].bytes; pColInfo->info.bytes = pFuncParam->param.nLen;
pInput->pData[paramIndex] = pColInfo; pInput->pData[paramIndex] = pColInfo;
} else { } else {
pColInfo = pInput->pData[paramIndex]; pColInfo = pInput->pData[paramIndex];
} }
ASSERT(!IS_VAR_DATA_TYPE(type));
colInfoDataEnsureCapacity(pColInfo, 0, numOfRows); colInfoDataEnsureCapacity(pColInfo, 0, numOfRows);
int8_t type = pFuncParam->param.nType;
if (type == TSDB_DATA_TYPE_BIGINT || type == TSDB_DATA_TYPE_UBIGINT) { if (type == TSDB_DATA_TYPE_BIGINT || type == TSDB_DATA_TYPE_UBIGINT) {
int64_t v = pFuncParam->param.i; int64_t v = pFuncParam->param.i;
for (int32_t i = 0; i < numOfRows; ++i) { for (int32_t i = 0; i < numOfRows; ++i) {
...@@ -1065,6 +1065,12 @@ static int32_t doCreateConstantValColumnInfo(SInputColumnInfoData* pInput, SFunc ...@@ -1065,6 +1065,12 @@ static int32_t doCreateConstantValColumnInfo(SInputColumnInfoData* pInput, SFunc
for (int32_t i = 0; i < numOfRows; ++i) { for (int32_t i = 0; i < numOfRows; ++i) {
colDataAppendDouble(pColInfo, i, &v); colDataAppendDouble(pColInfo, i, &v);
} }
} else if (type == TSDB_DATA_TYPE_VARCHAR) {
char *tmp = taosMemoryMalloc(pFuncParam->param.nLen + VARSTR_HEADER_SIZE);
STR_WITH_SIZE_TO_VARSTR(tmp, pFuncParam->param.pz, pFuncParam->param.nLen);
for(int32_t i = 0; i < numOfRows; ++i) {
colDataAppend(pColInfo, i, tmp, false);
}
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -1104,7 +1110,7 @@ static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCt ...@@ -1104,7 +1110,7 @@ static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCt
pInput->numOfRows = pBlock->info.rows; pInput->numOfRows = pBlock->info.rows;
pInput->startRowIndex = 0; pInput->startRowIndex = 0;
code = doCreateConstantValColumnInfo(pInput, pFuncParam, pFuncParam->param.nType, j, pBlock->info.rows); code = doCreateConstantValColumnInfo(pInput, pFuncParam, j, pBlock->info.rows);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
...@@ -6684,18 +6690,32 @@ SArray* extractColumnInfo(SNodeList* pNodeList) { ...@@ -6684,18 +6690,32 @@ SArray* extractColumnInfo(SNodeList* pNodeList) {
for (int32_t i = 0; i < numOfCols; ++i) { for (int32_t i = 0; i < numOfCols; ++i) {
STargetNode* pNode = (STargetNode*)nodesListGetNode(pNodeList, i); STargetNode* pNode = (STargetNode*)nodesListGetNode(pNodeList, i);
SColumnNode* pColNode = (SColumnNode*)pNode->pExpr;
// todo extract method
SColumn c = {0};
c.slotId = pColNode->slotId;
c.colId = pColNode->colId;
c.type = pColNode->node.resType.type;
c.bytes = pColNode->node.resType.bytes;
c.precision = pColNode->node.resType.precision;
c.scale = pColNode->node.resType.scale;
taosArrayPush(pList, &c); if (nodeType(pNode->pExpr) == QUERY_NODE_COLUMN) {
SColumnNode* pColNode = (SColumnNode*)pNode->pExpr;
// todo extract method
SColumn c = {0};
c.slotId = pColNode->slotId;
c.colId = pColNode->colId;
c.type = pColNode->node.resType.type;
c.bytes = pColNode->node.resType.bytes;
c.scale = pColNode->node.resType.scale;
c.precision = pColNode->node.resType.precision;
taosArrayPush(pList, &c);
} else if (nodeType(pNode->pExpr) == QUERY_NODE_VALUE) {
SValueNode* pValNode = (SValueNode*) pNode->pExpr;
SColumn c = {0};
c.slotId = pNode->slotId;
c.colId = pNode->slotId;
c.type = pValNode->node.type;
c.bytes = pValNode->node.resType.bytes;
c.scale = pValNode->node.resType.scale;
c.precision = pValNode->node.resType.precision;
taosArrayPush(pList, &c);
}
} }
return pList; return pList;
......
...@@ -213,7 +213,7 @@ enum { ...@@ -213,7 +213,7 @@ enum {
int32_t getUdfdPipeName(char* pipeName, int32_t size) { int32_t getUdfdPipeName(char* pipeName, int32_t size) {
char dnodeId[8] = {0}; char dnodeId[8] = {0};
size_t dnodeIdSize; size_t dnodeIdSize = sizeof(dnodeId);
int32_t err = uv_os_getenv(UDF_DNODE_ID_ENV_NAME, dnodeId, &dnodeIdSize); int32_t err = uv_os_getenv(UDF_DNODE_ID_ENV_NAME, dnodeId, &dnodeIdSize);
if (err != 0) { if (err != 0) {
dnodeId[0] = '1'; dnodeId[0] = '1';
......
...@@ -823,7 +823,7 @@ int32_t cfgLoadFromApollUrl(SConfig *pConfig, const char *url) { ...@@ -823,7 +823,7 @@ int32_t cfgLoadFromApollUrl(SConfig *pConfig, const char *url) {
} }
p++; p++;
if (memcmp(url, "jsonFile", 8) == 0) { if (strncmp(url, "jsonFile", 8) == 0) {
char *filepath = p; char *filepath = p;
if (!taosCheckExistFile(filepath)) { if (!taosCheckExistFile(filepath)) {
uError("fial to load json file: %s", filepath); uError("fial to load json file: %s", filepath);
...@@ -893,8 +893,8 @@ int32_t cfgLoadFromApollUrl(SConfig *pConfig, const char *url) { ...@@ -893,8 +893,8 @@ int32_t cfgLoadFromApollUrl(SConfig *pConfig, const char *url) {
} }
tjsonDelete(pJson); tjsonDelete(pJson);
// } else if (memcmp(url, "jsonUrl", 7) == 0) { // } else if (strncmp(url, "jsonUrl", 7) == 0) {
// } else if (memcmp(url, "etcdUrl", 7) == 0) { // } else if (strncmp(url, "etcdUrl", 7) == 0) {
} else { } else {
uError("Unsupported url: %s", url); uError("Unsupported url: %s", url);
return -1; return -1;
...@@ -908,7 +908,7 @@ int32_t cfgGetApollUrl(const char **envCmd, const char *envFile, char* apolloUrl ...@@ -908,7 +908,7 @@ int32_t cfgGetApollUrl(const char **envCmd, const char *envFile, char* apolloUrl
int32_t index = 0; int32_t index = 0;
if (envCmd == NULL) return 0; if (envCmd == NULL) return 0;
while (envCmd[index]!=NULL) { while (envCmd[index]!=NULL) {
if (memcmp(envCmd[index], "TAOS_APOLLO_URL", 14) == 0) { if (strncmp(envCmd[index], "TAOS_APOLLO_URL", 14) == 0) {
char *p = strchr(envCmd[index], '='); char *p = strchr(envCmd[index], '=');
if (p != NULL) { if (p != NULL) {
p++; p++;
...@@ -934,7 +934,7 @@ int32_t cfgGetApollUrl(const char **envCmd, const char *envFile, char* apolloUrl ...@@ -934,7 +934,7 @@ int32_t cfgGetApollUrl(const char **envCmd, const char *envFile, char* apolloUrl
break; break;
} }
if(line[_bytes - 1] == '\n') line[_bytes - 1] = 0; if(line[_bytes - 1] == '\n') line[_bytes - 1] = 0;
if (memcmp(line, "TAOS_APOLLO_URL", 14) == 0) { if (strncmp(line, "TAOS_APOLLO_URL", 14) == 0) {
char *p = strchr(line, '='); char *p = strchr(line, '=');
if (p != NULL) { if (p != NULL) {
p++; p++;
...@@ -975,7 +975,7 @@ int32_t cfgGetApollUrl(const char **envCmd, const char *envFile, char* apolloUrl ...@@ -975,7 +975,7 @@ int32_t cfgGetApollUrl(const char **envCmd, const char *envFile, char* apolloUrl
break; break;
} }
if(line[_bytes - 1] == '\n') line[_bytes - 1] = 0; if(line[_bytes - 1] == '\n') line[_bytes - 1] = 0;
if (memcmp(line, "TAOS_APOLLO_URL", 14) == 0) { if (strncmp(line, "TAOS_APOLLO_URL", 14) == 0) {
char *p = strchr(line, '='); char *p = strchr(line, '=');
if (p != NULL) { if (p != NULL) {
p++; p++;
......
...@@ -63,7 +63,7 @@ class TDSimClient: ...@@ -63,7 +63,7 @@ class TDSimClient:
if os.system(cmd) != 0: if os.system(cmd) != 0:
tdLog.exit(cmd) tdLog.exit(cmd)
def deploy(self): def deploy(self, *updatecfgDict):
self.logDir = "%s/sim/psim/log" % (self.path) self.logDir = "%s/sim/psim/log" % (self.path)
self.cfgDir = "%s/sim/psim/cfg" % (self.path) self.cfgDir = "%s/sim/psim/cfg" % (self.path)
self.cfgPath = "%s/sim/psim/cfg/taos.cfg" % (self.path) self.cfgPath = "%s/sim/psim/cfg/taos.cfg" % (self.path)
...@@ -95,6 +95,14 @@ class TDSimClient: ...@@ -95,6 +95,14 @@ class TDSimClient:
for key, value in self.cfgDict.items(): for key, value in self.cfgDict.items():
self.cfg(key, value) self.cfg(key, value)
try:
if bool(updatecfgDict) and updatecfgDict[0] and updatecfgDict[0][0]:
clientCfg = dict (updatecfgDict[0][0].get('clientCfg'))
for key, value in clientCfg.items():
self.cfg(key, value)
except Exception:
pass
tdLog.debug("psim is deployed and configured by %s" % (self.cfgPath)) tdLog.debug("psim is deployed and configured by %s" % (self.cfgPath))
...@@ -214,9 +222,11 @@ class TDDnode: ...@@ -214,9 +222,11 @@ class TDDnode:
# self.cfg("logDir",self.logDir) # self.cfg("logDir",self.logDir)
# print(updatecfgDict) # print(updatecfgDict)
isFirstDir = 1 isFirstDir = 1
if updatecfgDict[0] and updatecfgDict[0][0]: if bool(updatecfgDict) and updatecfgDict[0] and updatecfgDict[0][0]:
print(updatecfgDict[0][0]) print(updatecfgDict[0][0])
for key, value in updatecfgDict[0][0].items(): for key, value in updatecfgDict[0][0].items():
if key == "clientCfg":
continue
if value == 'dataDir': if value == 'dataDir':
if isFirstDir: if isFirstDir:
self.cfgDict.pop('dataDir') self.cfgDict.pop('dataDir')
...@@ -491,7 +501,7 @@ class TDDnodes: ...@@ -491,7 +501,7 @@ class TDDnodes:
self.sim.setTestCluster(self.testCluster) self.sim.setTestCluster(self.testCluster)
if (self.simDeployed == False): if (self.simDeployed == False):
self.sim.deploy() self.sim.deploy(updatecfgDict)
self.simDeployed = True self.simDeployed = True
self.check(index) self.check(index)
......
...@@ -6,7 +6,7 @@ ...@@ -6,7 +6,7 @@
# ---- db # ---- db
./test.sh -f tsim/db/create_all_options.sim ./test.sh -f tsim/db/create_all_options.sim
#./test.sh -f tsim/db/alter_option.sim ./test.sh -f tsim/db/alter_option.sim
./test.sh -f tsim/db/basic1.sim ./test.sh -f tsim/db/basic1.sim
./test.sh -f tsim/db/basic2.sim ./test.sh -f tsim/db/basic2.sim
./test.sh -f tsim/db/basic3.sim ./test.sh -f tsim/db/basic3.sim
...@@ -87,6 +87,6 @@ ...@@ -87,6 +87,6 @@
# ./test.sh -f tsim/sma/tsmaCreateInsertData.sim # ./test.sh -f tsim/sma/tsmaCreateInsertData.sim
# --- valgrind # --- valgrind
#./test.sh -f tsim/valgrind/checkError.sim -v ./test.sh -f tsim/valgrind/checkError.sim -v
#======================b1-end=============== #======================b1-end===============
...@@ -66,7 +66,7 @@ print ============= create database ...@@ -66,7 +66,7 @@ print ============= create database
# | REPLICA value [1 | 3] # | REPLICA value [1 | 3]
# | WAL value [1 | 2] # | WAL value [1 | 2]
sql create database db BLOCKS 7 CACHE 3 CACHELAST 3 COMP 0 DAYS 345600 FSYNC 1000 MAXROWS 8000 MINROWS 10 KEEP 1440000 PRECISION 'ns' REPLICA 3 TTL 7 WAL 2 VGROUPS 6 SINGLE_STABLE 1 STREAM_MODE 1 sql create database db BLOCKS 7 CACHE 3 CACHELAST 3 COMP 0 DAYS 345600 FSYNC 1000 MAXROWS 8000 MINROWS 10 KEEP 1440000 PRECISION 'ns' REPLICA 1 TTL 7 WAL 2 VGROUPS 6 SINGLE_STABLE 1 STREAM_MODE 1
sql show databases sql show databases
print rows: $rows print rows: $rows
print $data00 $data01 $data02 $data03 $data04 $data05 $data06 $data07 $data08 $data09 print $data00 $data01 $data02 $data03 $data04 $data05 $data06 $data07 $data08 $data09
...@@ -86,7 +86,7 @@ endi ...@@ -86,7 +86,7 @@ endi
if $data3_db != 0 then # ntables if $data3_db != 0 then # ntables
return -1 return -1
endi endi
if $data4_db != 3 then # replica if $data4_db != 1 then # replica
return -1 return -1
endi endi
if $data5_db != nostrict then # strict if $data5_db != nostrict then # strict
...@@ -127,47 +127,47 @@ if $data16_db != ns then # precision ...@@ -127,47 +127,47 @@ if $data16_db != ns then # precision
endi endi
sleep 3000 sleep 3000
sql show db.vgroups #sql show db.vgroups
if $data[0][4] == LEADER then #if $data[0][4] == LEADER then
if $data[0][6] != FOLLOWER then # if $data[0][6] != FOLLOWER then
return -1 # return -1
endi # endi
if $data[0][8] != FOLLOWER then # if $data[0][8] != FOLLOWER then
return -1 # return -1
endi # endi
endi #endi
if $data[0][6] == LEADER then #if $data[0][6] == LEADER then
if $data[0][4] != FOLLOWER then # if $data[0][4] != FOLLOWER then
return -1 # return -1
endi # endi
if $data[0][8] != FOLLOWER then # if $data[0][8] != FOLLOWER then
return -1 # return -1
endi # endi
endi #endi
if $data[0][8] == LEADER then #if $data[0][8] == LEADER then
if $data[0][4] != FOLLOWER then # if $data[0][4] != FOLLOWER then
return -1 # return -1
endi # endi
if $data[0][6] != FOLLOWER then # if $data[0][6] != FOLLOWER then
return -1 # return -1
endi # endi
endi #endi
#
if $data[0][4] != LEADER then #if $data[0][4] != LEADER then
if $data[0][4] != FOLLOWER then # if $data[0][4] != FOLLOWER then
return -1 # return -1
endi # endi
endi #endi
if $data[0][6] != LEADER then #if $data[0][6] != LEADER then
if $data[0][6] != FOLLOWER then # if $data[0][6] != FOLLOWER then
return -1 # return -1
endi # endi
endi #endi
if $data[0][8] != LEADER then #if $data[0][8] != LEADER then
if $data[0][8] != FOLLOWER then # if $data[0][8] != FOLLOWER then
return -1 # return -1
endi # endi
endi #endi
print ============== not support modify options: name, create_time, vgroups, ntables print ============== not support modify options: name, create_time, vgroups, ntables
sql_error alter database db name dba sql_error alter database db name dba
......
...@@ -42,6 +42,8 @@ int main(int argc, char *argv[]) { ...@@ -42,6 +42,8 @@ int main(int argc, char *argv[]) {
return 0; return 0;
} }
taos_init();
if (shell.args.is_dump_config) { if (shell.args.is_dump_config) {
shellDumpConfig(); shellDumpConfig();
taos_cleanup(); taos_cleanup();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册