提交 d1b9a866 编写于 作者: H hzcheng

Merge branch 'develop' into feature/2.0tsdb

......@@ -41,12 +41,29 @@ addons:
branch_pattern: coverity_scan
before_script:
- mkdir build
- cd build
- mkdir debug
- cd debug
script:
- cmake ..
- cmake --build .
- cmake --build . || exit $?
- |-
case $TRAVIS_OS_NAME in
linux)
cd ../tests/script
sudo ./test.sh 2>&1 | tee out.txt
cat out.txt
grep success out.txt
total_success=`grep success out.txt | wc -l`
echo "Total $total_success success"
grep failed out.txt
total_failed=`grep failed out.txt | wc -l`
echo "Total $total_failed failed"
if [ "$total_failed" -ne "0" ]; then
exit $total_failed
fi
;;
esac
#
# Build Matrix
......@@ -58,6 +75,7 @@ matrix:
packages:
- build-essential
- cmake
- net-tools
# - os: osx
# addons:
......
......@@ -3,6 +3,8 @@ PROJECT(TDengine)
SET(TD_CLUSTER FALSE)
SET(TD_ACCOUNT FALSE)
SET(TD_VPEER FALSE)
SET(TD_MPEER FALSE)
SET(TD_GRANT FALSE)
SET(TD_COVER FALSE)
SET(TD_PAGMODE_LITE FALSE)
......
......@@ -107,12 +107,12 @@ IF (TD_LINUX_64)
SET(RELEASE_FLAGS "-O0")
IF (NOT TD_ARM)
IF (${CMAKE_CXX_COMPILER_ID} MATCHES "Clang")
SET(COMMON_FLAGS "-std=gnu99 -Wall -fPIC -malign-double -g3 -gdwarf-2 -msse4.2 -D_FILE_OFFSET_BITS=64 -D_LARGE_FILE")
SET(COMMON_FLAGS "-std=gnu99 -Wall -Werror -Wno-missing-braces -fPIC -g3 -gdwarf-2 -msse4.2 -D_FILE_OFFSET_BITS=64 -D_LARGE_FILE")
ELSE ()
SET(COMMON_FLAGS "-std=gnu99 -Wall -fPIC -malign-double -g3 -gdwarf-2 -malign-stringops -msse4.2 -D_FILE_OFFSET_BITS=64 -D_LARGE_FILE")
SET(COMMON_FLAGS "-std=gnu99 -Wall -Werror -fPIC -malign-double -g3 -gdwarf-2 -malign-stringops -msse4.2 -D_FILE_OFFSET_BITS=64 -D_LARGE_FILE")
ENDIF ()
ELSE ()
SET(COMMON_FLAGS "-std=gnu99 -Wall -fPIC -g -fsigned-char -fpack-struct=8 -D_FILE_OFFSET_BITS=64 -D_LARGE_FILE")
SET(COMMON_FLAGS "-std=gnu99 -Wall -Werror -fPIC -g -fsigned-char -fpack-struct=8 -D_FILE_OFFSET_BITS=64 -D_LARGE_FILE")
ENDIF ()
ADD_DEFINITIONS(-DLINUX)
ADD_DEFINITIONS(-D_REENTRANT -D__USE_POSIX -D_LIBC_REENTRANT)
......@@ -128,7 +128,7 @@ IF (TD_LINUX_64)
ENDIF ()
SET(DEBUG_FLAGS "-O0 -DDEBUG")
SET(RELEASE_FLAGS "-O0")
SET(COMMON_FLAGS "-std=gnu99 -Wall -fPIC -g -fsigned-char -munaligned-access -fpack-struct=8 -latomic -D_FILE_OFFSET_BITS=64 -D_LARGE_FILE")
SET(COMMON_FLAGS "-std=gnu99 -Wall -Werror -fPIC -g -fsigned-char -munaligned-access -fpack-struct=8 -latomic -D_FILE_OFFSET_BITS=64 -D_LARGE_FILE")
ADD_DEFINITIONS(-DLINUX)
ADD_DEFINITIONS(-D_REENTRANT -D__USE_POSIX -D_LIBC_REENTRANT)
ADD_DEFINITIONS(-DUSE_LIBICONV)
......@@ -141,7 +141,7 @@ IF (TD_LINUX_64)
ELSEIF (TD_WINDOWS_64)
SET(CMAKE_GENERATOR "NMake Makefiles" CACHE INTERNAL "" FORCE)
IF (NOT TD_GODLL)
SET(COMMON_FLAGS "/nologo /WX- /Oi /Oy- /Gm- /EHsc /MT /GS /Gy /fp:precise /Zc:wchar_t /Zc:forScope /Gd /errorReport:prompt /analyze-")
SET(COMMON_FLAGS "/nologo /WX /Oi /Oy- /Gm- /EHsc /MT /GS /Gy /fp:precise /Zc:wchar_t /Zc:forScope /Gd /errorReport:prompt /analyze-")
SET(DEBUG_FLAGS "/Zi /W3 /GL")
SET(RELEASE_FLAGS "/W0 /GL")
ENDIF ()
......@@ -151,7 +151,7 @@ IF (TD_LINUX_64)
ADD_DEFINITIONS(-DPTW32_BUILD)
ADD_DEFINITIONS(-D_MBCS -D_CRT_SECURE_NO_DEPRECATE -D_CRT_NONSTDC_NO_DEPRECATE)
ELSEIF (TD_DARWIN_64)
SET(COMMON_FLAGS "-std=gnu99 -Wall -fPIC -malign-double -g -msse4.2 -D_FILE_OFFSET_BITS=64 -D_LARGE_FILE")
SET(COMMON_FLAGS "-std=gnu99 -Wall -Werror -Wno-missing-braces -fPIC -g -msse4.2 -D_FILE_OFFSET_BITS=64 -D_LARGE_FILE")
SET(DEBUG_FLAGS "-O0 -DDEBUG")
SET(RELEASE_FLAGS "-O0")
ADD_DEFINITIONS(-DDARWIN)
......@@ -159,4 +159,4 @@ IF (TD_LINUX_64)
ELSE ()
MESSAGE(FATAL_ERROR "The current platform is not support yet, stop compile")
EXIT ()
ENDIF ()
\ No newline at end of file
ENDIF ()
......@@ -213,8 +213,6 @@ typedef struct SDataBlockList {
int32_t idx;
uint32_t nSize;
uint32_t nAlloc;
char * userParam; /* user assigned parameters for async query */
void * udfp; /* user defined function pointer, used in async model */
STableDataBlocks **pData;
} SDataBlockList;
......@@ -451,7 +449,6 @@ void tscCloseTscObj(STscObj *pObj);
void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, void (*fp)(), void* param, const char* sqlstr, size_t sqlLen);
void tscProcessMultiVnodesInsert(SSqlObj *pSql);
void tscProcessMultiVnodesInsertFromFile(SSqlObj *pSql);
void tscKillMetricQuery(SSqlObj *pSql);
void tscInitResObjForLocalQuery(SSqlObj *pObj, int32_t numOfRes, int32_t rowLen);
......
......@@ -342,8 +342,8 @@ void tscProcessAsyncRes(SSchedMsg *pMsg) {
(*pSql->fp)(pSql->param, taosres, code);
if (shouldFree) {
tscFreeSqlObj(pSql);
tscTrace("%p Async sql is automatically freed in async res", pSql);
tscFreeSqlObj(pSql);
}
}
......
......@@ -941,6 +941,10 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql) {
sql = sToken.z;
}
code = tscGetTableMeta(pSql, pTableMetaInfo);
if (pSql->asyncTblPos == NULL) {
assert(code == TSDB_CODE_ACTION_IN_PROGRESS);
}
}
int32_t len = cend - cstart + 1;
......@@ -1064,8 +1068,8 @@ int doParseInsertSql(SSqlObj *pSql, char *str) {
if ((code = tscCheckIfCreateTable(&str, pSql)) != TSDB_CODE_SUCCESS) {
/*
* For async insert, after get the metermeta from server, the sql string will not be
* parsed using the new metermeta to avoid the overhead cause by get metermeta data information.
* For async insert, after get the table meta from server, the sql string will not be
* parsed using the new table meta to avoid the overhead cause by get table meta data information.
* And during the getMeterMetaCallback function, the sql string will be parsed from the
* interrupted position.
*/
......
......@@ -292,7 +292,6 @@ void tscKillConnection(STscObj *pObj) {
pthread_mutex_unlock(&pObj->mutex);
taos_close(pObj);
tscTrace("connection:%p is killed", pObj);
taos_close(pObj);
}
......@@ -210,7 +210,7 @@ char* tsGetTagsValue(STableMeta* pTableMeta) {
}
// todo refactor
static FORCE_INLINE char* skipSegments(char* input, char delim, int32_t num) {
__attribute__ ((unused))static FORCE_INLINE char* skipSegments(char* input, char delim, int32_t num) {
for (int32_t i = 0; i < num; ++i) {
while (*input != 0 && *input++ != delim) {
};
......@@ -218,7 +218,7 @@ static FORCE_INLINE char* skipSegments(char* input, char delim, int32_t num) {
return input;
}
static FORCE_INLINE size_t copy(char* dst, const char* src, char delimiter) {
__attribute__ ((unused)) static FORCE_INLINE size_t copy(char* dst, const char* src, char delimiter) {
size_t len = 0;
while (*src != delimiter && *src != 0) {
*dst++ = *src++;
......
......@@ -343,8 +343,8 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg) {
(*pSql->fp)(pSql->param, taosres, rpcMsg->code);
if (shouldFree) {
tscFreeSqlObj(pSql);
tscTrace("%p Async sql is automatically freed", pSql);
tscFreeSqlObj(pSql);
}
}
......@@ -1787,6 +1787,7 @@ int tscBuildHeartBeatMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
size = tscEstimateHeartBeatMsgLength(pSql);
if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) {
pthread_mutex_unlock(&pObj->mutex);
tscError("%p failed to malloc for heartbeat msg", pSql);
return -1;
}
......@@ -1835,7 +1836,7 @@ int tscProcessTableMetaRsp(SSqlObj *pSql) {
}
for (int i = 0; i < TSDB_VNODES_SUPPORT; ++i) {
pMetaMsg->vpeerDesc[i].vnode = htonl(pMetaMsg->vpeerDesc[i].vnode);
pMetaMsg->vpeerDesc[i].vgId = htonl(pMetaMsg->vpeerDesc[i].vgId);
}
SSchema* pSchema = pMetaMsg->schema;
......@@ -2399,8 +2400,8 @@ int32_t tscGetTableMeta(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo) {
pTableMetaInfo->pTableMeta = (STableMeta *)taosCacheAcquireByName(tscCacheHandle, pTableMetaInfo->name);
if (pTableMetaInfo->pTableMeta != NULL) {
STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);
tscTrace("%p retrieve tableMeta from cache, the number of columns:%d, numOfTags:%d", pSql, tinfo.numOfColumns,
tinfo.numOfTags);
tscTrace("%p retrieve table Meta from cache, the number of columns:%d, numOfTags:%d, %p", pSql, tinfo.numOfColumns,
tinfo.numOfTags, pTableMetaInfo->pTableMeta);
return TSDB_CODE_SUCCESS;
}
......
......@@ -757,8 +757,8 @@ void taos_free_result_imp(TAOS_RES *res, int keepCmd) {
tscTrace("%p qhandle is null, abort free, fp:%p", pSql, pSql->fp);
if (tscShouldFreeAsyncSqlObj(pSql)) {
tscFreeSqlObj(pSql);
tscTrace("%p Async SqlObj is freed by app", pSql);
tscFreeSqlObj(pSql);
} else {
if (keepCmd) {
tscFreeSqlResult(pSql);
......
......@@ -582,10 +582,12 @@ void taos_close_stream(TAOS_STREAM *handle) {
tscRemoveFromStreamList(pStream, pSql);
taosTmrStopA(&(pStream->pTimer));
tscTrace("%p stream:%p is closed", pSql, pStream);
tscFreeSqlObj(pSql);
pStream->pSql = NULL;
tscTrace("%p stream:%p is closed", pSql, pStream);
tfree(pStream);
}
}
......@@ -108,7 +108,7 @@ static SSub* tscCreateSubscription(STscObj* pObj, const char* topic, const char*
if (pSql == NULL) {
terrno = TSDB_CODE_CLI_OUT_OF_MEMORY;
tscError("failed to allocate SSqlObj for subscription");
goto failed;
goto _pSql_failed;
}
pSql->signature = pSql;
......@@ -137,13 +137,11 @@ static SSub* tscCreateSubscription(STscObj* pObj, const char* topic, const char*
return pSub;
failed:
if (sqlstr != NULL) {
free(sqlstr);
}
if (pSql != NULL) {
free(pSql);
}
free(pSub);
tfree(sqlstr);
_pSql_failed:
tfree(pSql);
tfree(pSub);
return NULL;
}
......
......@@ -1381,6 +1381,7 @@ static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfR
} else { // all data has been retrieved to client
tscAllDataRetrievedFromDnode(trsupport, pSql);
}
pthread_mutex_unlock(&trsupport->queryMutex);
}
static SSqlObj *tscCreateSqlObjForSubquery(SSqlObj *pSql, SRetrieveSupport *trsupport, SSqlObj *prevSqlObj) {
......@@ -1454,7 +1455,7 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) {
SSqlObj *pNew = tscCreateSqlObjForSubquery(pParentSql, trsupport, pSql);
if (pNew == NULL) {
tscError("%p sub:%p failed to create new subquery due to out of memory, abort retry, vid:%d, orderOfSub:%d",
trsupport->pParentSqlObj, pSql, pSvd != NULL ? pSvd->vnode : -1, trsupport->subqueryIndex);
trsupport->pParentSqlObj, pSql, pSvd != NULL ? pSvd->vgId : -1, trsupport->subqueryIndex);
pState->code = -TSDB_CODE_CLI_OUT_OF_MEMORY;
trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY;
......@@ -1470,7 +1471,7 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) {
if (pState->code != TSDB_CODE_SUCCESS) { // failed, abort
if (vnodeInfo != NULL) {
tscTrace("%p sub:%p query failed,ip:%u,vid:%d,orderOfSub:%d,global code:%d", pParentSql, pSql,
vnodeInfo->vpeerDesc[vnodeInfo->index].ip, vnodeInfo->vpeerDesc[vnodeInfo->index].vnode,
vnodeInfo->vpeerDesc[vnodeInfo->index].ip, vnodeInfo->vpeerDesc[vnodeInfo->index].vgId,
trsupport->subqueryIndex, pState->code);
} else {
tscTrace("%p sub:%p query failed,orderOfSub:%d,global code:%d", pParentSql, pSql,
......@@ -1481,7 +1482,7 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) {
} else { // success, proceed to retrieve data from dnode
if (vnodeInfo != NULL) {
tscTrace("%p sub:%p query complete,ip:%u,vid:%d,orderOfSub:%d,retrieve data", trsupport->pParentSqlObj, pSql,
vnodeInfo->vpeerDesc[vnodeInfo->index].ip, vnodeInfo->vpeerDesc[vnodeInfo->index].vnode,
vnodeInfo->vpeerDesc[vnodeInfo->index].ip, vnodeInfo->vpeerDesc[vnodeInfo->index].vgId,
trsupport->subqueryIndex);
} else {
tscTrace("%p sub:%p query complete, orderOfSub:%d,retrieve data", trsupport->pParentSqlObj, pSql,
......
......@@ -6,7 +6,7 @@
#include "ttokendef.h"
// todo refactor
static FORCE_INLINE const char* skipSegments(const char* input, char delim, int32_t num) {
__attribute__((unused)) static FORCE_INLINE const char* skipSegments(const char* input, char delim, int32_t num) {
for (int32_t i = 0; i < num; ++i) {
while (*input != 0 && *input++ != delim) {
};
......@@ -14,7 +14,7 @@ static FORCE_INLINE const char* skipSegments(const char* input, char delim, int3
return input;
}
static FORCE_INLINE size_t copy(char* dst, const char* src, char delimiter) {
__attribute__((unused)) static FORCE_INLINE size_t copy(char* dst, const char* src, char delimiter) {
size_t len = 0;
while (*src != delimiter && *src != 0) {
*dst++ = *src++;
......
......@@ -6,6 +6,7 @@ IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM))
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/util/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/query/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/mnode/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/vnode/tsdb/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/common/inc)
INCLUDE_DIRECTORIES(${TD_ENTERPRISE_DIR}/src/inc)
......@@ -25,6 +26,10 @@ IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM))
IF (TD_CLUSTER)
TARGET_LINK_LIBRARIES(taosd cluster)
ENDIF ()
IF (TD_VPEER)
TARGET_LINK_LIBRARIES(taosd balance)
ENDIF ()
SET(PREPARE_ENV_CMD "prepare_env_cmd")
SET(PREPARE_ENV_TARGET "prepare_env_target")
......
......@@ -28,9 +28,11 @@
#include "dnodeRead.h"
#include "dnodeShell.h"
#include "dnodeWrite.h"
#include "mgmtGrant.h"
static int32_t dnodeInitSystem();
static int32_t dnodeInitStorage();
extern void grantParseParameter();
static void dnodeCleanupStorage();
static void dnodeCleanUpSystem();
static void dnodeSetRunStatus(SDnodeRunStatus status);
......@@ -77,7 +79,7 @@ int32_t main(int32_t argc, char *argv[]) {
}
/* Set termination handler. */
struct sigaction act;
struct sigaction act = {0};
act.sa_flags = SA_SIGINFO;
act.sa_sigaction = signal_handler;
sigaction(SIGTERM, &act, NULL);
......
......@@ -64,7 +64,7 @@ int32_t dnodeInitMgmt() {
dError("failed to init dnode timer");
return -1;
}
int32_t code = dnodeOpenVnodes();
if (code != TSDB_CODE_SUCCESS) {
return -1;
......@@ -104,7 +104,7 @@ void dnodeMgmt(SRpcMsg *pMsg) {
rpcFreeCont(pMsg->pCont);
}
static int dnodeGetVnodeList(int32_t vnodeList[]) {
static int32_t dnodeGetVnodeList(int32_t vnodeList[]) {
DIR *dir = opendir(tsVnodeDir);
if (dir == NULL) {
return TSDB_CODE_NO_WRITE_ACCESS;
......@@ -129,47 +129,59 @@ static int dnodeGetVnodeList(int32_t vnodeList[]) {
}
static int32_t dnodeOpenVnodes() {
char vnodeDir[TSDB_FILENAME_LEN * 3];
int failed = 0;
char vnodeDir[TSDB_FILENAME_LEN * 3];
int32_t failed = 0;
int32_t *vnodeList = (int32_t *) malloc(sizeof(int32_t) * 10000);
int numOfVnodes = dnodeGetVnodeList(vnodeList);
for (int i=0; i<numOfVnodes; ++i) {
snprintf(vnodeDir, TSDB_FILENAME_LEN * 3, "%s/vnode%d", tsVnodeDir, vnodeList[i]);
if (vnodeOpen(vnodeList[i], vnodeDir) <0) failed++;
}
int32_t *vnodeList = (int32_t *)malloc(sizeof(int32_t) * 10000);
int32_t numOfVnodes = dnodeGetVnodeList(vnodeList);
for (int32_t i = 0; i < numOfVnodes; ++i) {
snprintf(vnodeDir, TSDB_FILENAME_LEN * 3, "%s/vnode%d", tsVnodeDir, vnodeList[i]);
if (vnodeOpen(vnodeList[i], vnodeDir) < 0) failed++;
}
free(vnodeList);
free(vnodeList);
dPrint("there are total vnodes:%d, failed to open:%d", numOfVnodes, failed);
return TSDB_CODE_SUCCESS;
dPrint("there are total vnodes:%d, failed to open:%d", numOfVnodes, failed);
return TSDB_CODE_SUCCESS;
}
static void dnodeCloseVnodes() {
int32_t *vnodeList = (int32_t *) malloc(sizeof(int32_t) * 10000);
int numOfVnodes = dnodeGetVnodeList(vnodeList);
for (int i=0; i<numOfVnodes; ++i)
vnodeClose(vnodeList[i]);
int32_t *vnodeList = (int32_t *)malloc(sizeof(int32_t) * 10000);
int32_t numOfVnodes = dnodeGetVnodeList(vnodeList);
free(vnodeList);
dPrint("total vnodes:%d are all closed", numOfVnodes);
for (int32_t i = 0; i < numOfVnodes; ++i) {
vnodeClose(vnodeList[i]);
}
free(vnodeList);
dPrint("total vnodes:%d are all closed", numOfVnodes);
}
static int32_t dnodeProcessCreateVnodeMsg(SRpcMsg *rpcMsg) {
SMDCreateVnodeMsg *pCreate = rpcMsg->pCont;
pCreate->cfg.vgId = htonl(pCreate->cfg.vgId);
pCreate->cfg.maxSessions = htonl(pCreate->cfg.maxSessions);
pCreate->cfg.daysPerFile = htonl(pCreate->cfg.daysPerFile);
pCreate->cfg.commitLog = pCreate->cfg.commitLog;
pCreate->cfg.vgId = htonl(pCreate->cfg.vgId);
pCreate->cfg.maxSessions = htonl(pCreate->cfg.maxSessions);
pCreate->cfg.cacheBlockSize = htonl(pCreate->cfg.cacheBlockSize);
pCreate->cfg.daysPerFile = htonl(pCreate->cfg.daysPerFile);
pCreate->cfg.daysToKeep1 = htonl(pCreate->cfg.daysToKeep1);
pCreate->cfg.daysToKeep2 = htonl(pCreate->cfg.daysToKeep2);
pCreate->cfg.daysToKeep = htonl(pCreate->cfg.daysToKeep);
pCreate->cfg.commitTime = htonl(pCreate->cfg.commitTime);
pCreate->cfg.rowsInFileBlock = htonl(pCreate->cfg.rowsInFileBlock);
pCreate->cfg.blocksPerTable = htons(pCreate->cfg.blocksPerTable);
pCreate->cfg.cacheNumOfBlocks.totalBlocks = htonl(pCreate->cfg.cacheNumOfBlocks.totalBlocks);
for (int32_t j = 0; j < pCreate->cfg.replications; ++j) {
pCreate->vpeerDesc[j].vgId = htonl(pCreate->vpeerDesc[j].vgId);
pCreate->vpeerDesc[j].dnodeId = htonl(pCreate->vpeerDesc[j].dnodeId);
pCreate->vpeerDesc[j].ip = htonl(pCreate->vpeerDesc[j].ip);
}
return vnodeCreate(pCreate);
}
static int32_t dnodeProcessDropVnodeMsg(SRpcMsg *rpcMsg) {
SMDDropVnodeMsg *pDrop = rpcMsg->pCont;
pDrop->vgId = htonl(pDrop->vgId);
......@@ -177,7 +189,6 @@ static int32_t dnodeProcessDropVnodeMsg(SRpcMsg *rpcMsg) {
}
static int32_t dnodeProcessAlterVnodeMsg(SRpcMsg *rpcMsg) {
SMDCreateVnodeMsg *pCreate = rpcMsg->pCont;
pCreate->cfg.vgId = htonl(pCreate->cfg.vgId);
pCreate->cfg.maxSessions = htonl(pCreate->cfg.maxSessions);
......@@ -206,7 +217,7 @@ static int32_t dnodeProcessConfigDnodeMsg(SRpcMsg *pMsg) {
static void dnodeSendStatusMsg(void *handle, void *tmrId) {
if (tsDnodeTmr == NULL) {
dError("dnode timer is already released");
dError("dnode timer is already released");
return;
}
......
......@@ -15,38 +15,25 @@
#define _DEFAULT_SOURCE
#include "os.h"
#include "taoserror.h"
#include "taosmsg.h"
#include "tlog.h"
#include "tqueue.h"
#include "trpc.h"
#include "twal.h"
#include "dnodeMgmt.h"
#include "dnodeRead.h"
#include "queryExecutor.h"
#include "vnode.h"
typedef struct {
int32_t code;
int32_t count;
int32_t numOfVnodes;
} SRpcContext;
typedef struct {
void *pCont;
int32_t contLen;
SRpcMsg rpcMsg;
SRpcContext *pRpcContext; // RPC message context
SRspRet rspRet;
void *pCont;
int32_t contLen;
SRpcMsg rpcMsg;
} SReadMsg;
static void *dnodeProcessReadQueue(void *param);
static void dnodeProcessReadResult(void *pVnode, SReadMsg *pRead);
static void dnodeHandleIdleReadWorker();
static void dnodeProcessQueryMsg(void *pVnode, SReadMsg *pMsg);
static void dnodeProcessRetrieveMsg(void *pVnode, SReadMsg *pMsg);
static void(*dnodeProcessReadMsgFp[TSDB_MSG_TYPE_MAX])(void *pVnode, SReadMsg *pNode);
// module global variable
static taos_qset readQset;
......@@ -55,14 +42,11 @@ static int32_t maxThreads;
static int32_t minThreads;
int32_t dnodeInitRead() {
dnodeProcessReadMsgFp[TSDB_MSG_TYPE_QUERY] = dnodeProcessQueryMsg;
dnodeProcessReadMsgFp[TSDB_MSG_TYPE_RETRIEVE] = dnodeProcessRetrieveMsg;
readQset = taosOpenQset();
minThreads = 3;
maxThreads = tsNumOfCores*tsNumOfThreadsPerCore;
if (maxThreads <= minThreads*2) maxThreads = 2*minThreads;
maxThreads = tsNumOfCores * tsNumOfThreadsPerCore;
if (maxThreads <= minThreads * 2) maxThreads = 2 * minThreads;
dPrint("dnode read is opened");
return 0;
......@@ -77,21 +61,21 @@ void dnodeRead(SRpcMsg *pMsg) {
int32_t queuedMsgNum = 0;
int32_t leftLen = pMsg->contLen;
char *pCont = (char *) pMsg->pCont;
SRpcContext *pRpcContext = NULL;
void *pVnode;
dTrace("dnode %s msg incoming, thandle:%p", taosMsg[pMsg->msgType], pMsg->handle);
if (pMsg->msgType == TSDB_MSG_TYPE_RETRIEVE) {
queuedMsgNum = 0;
}
while (leftLen > 0) {
SMsgHead *pHead = (SMsgHead *) pCont;
pHead->vgId = htonl(pHead->vgId);
pHead->contLen = htonl(pHead->contLen);
pVnode = vnodeGetVnode(pHead->vgId);
if (pMsg->msgType == TSDB_MSG_TYPE_RETRIEVE) {
pVnode = vnodeGetVnode(pHead->vgId);
} else {
pVnode = vnodeAccquireVnode(pHead->vgId);
}
if (pVnode == NULL) {
leftLen -= pHead->contLen;
pCont -= pHead->contLen;
......@@ -104,7 +88,6 @@ void dnodeRead(SRpcMsg *pMsg) {
pRead->rpcMsg = *pMsg;
pRead->pCont = pCont;
pRead->contLen = pHead->contLen;
pRead->pRpcContext = pRpcContext;
taosWriteQitem(queue, TAOS_QTYPE_RPC, pRead);
......@@ -155,6 +138,34 @@ void dnodeFreeRqueue(void *rqueue) {
// dynamically adjust the number of threads
}
static void dnodeContinueExecuteQuery(void* pVnode, void* qhandle, SReadMsg *pMsg) {
SReadMsg *pRead = (SReadMsg *)taosAllocateQitem(sizeof(SReadMsg));
pRead->rpcMsg = pMsg->rpcMsg;
pRead->pCont = qhandle;
pRead->contLen = 0;
pRead->rpcMsg.msgType = TSDB_MSG_TYPE_QUERY;
taos_queue queue = vnodeGetRqueue(pVnode);
taosWriteQitem(queue, TAOS_QTYPE_RPC, pRead);
}
void dnodeSendRpcReadRsp(void *pVnode, SReadMsg *pRead, int32_t code) {
if (code == TSDB_CODE_ACTION_IN_PROGRESS) return;
if (code == TSDB_CODE_ACTION_NEED_REPROCESSED) {
dnodeContinueExecuteQuery(pVnode, pRead->rspRet.qhandle, pRead);
}
SRpcMsg rpcRsp = {
.handle = pRead->rpcMsg.handle,
.pCont = pRead->rspRet.rsp,
.contLen = pRead->rspRet.len,
.code = pRead->rspRet.code,
};
rpcSendResponse(&rpcRsp);
rpcFreeCont(pRead->rpcMsg.pCont);
}
static void *dnodeProcessReadQueue(void *param) {
taos_qset qset = (taos_qset)param;
SReadMsg *pReadMsg;
......@@ -167,13 +178,8 @@ static void *dnodeProcessReadQueue(void *param) {
continue;
}
terrno = 0;
if (dnodeProcessReadMsgFp[pReadMsg->rpcMsg.msgType]) {
(*dnodeProcessReadMsgFp[pReadMsg->rpcMsg.msgType]) (pVnode, pReadMsg);
} else {
terrno = TSDB_CODE_MSG_NOT_PROCESSED;
}
int32_t code = vnodeProcessRead(pVnode, pReadMsg->rpcMsg.msgType, pReadMsg->pCont, pReadMsg->contLen, &pReadMsg->rspRet);
dnodeSendRpcReadRsp(pVnode, pReadMsg, code);
taosFreeQitem(pReadMsg);
}
......@@ -192,118 +198,3 @@ static void dnodeHandleIdleReadWorker() {
}
}
static void dnodeProcessReadResult(void *pVnode, SReadMsg *pRead) {
SRpcContext *pRpcContext = pRead->pRpcContext;
int32_t code = 0;
if (pRpcContext) {
if (terrno) {
if (pRpcContext->code == 0) pRpcContext->code = terrno;
}
int32_t count = atomic_add_fetch_32(&pRpcContext->count, 1);
if (count < pRpcContext->numOfVnodes) {
// not over yet, multiple vnodes
return;
}
// over, result can be merged now
code = pRpcContext->code;
} else {
code = terrno;
}
//TODO: query handle is returned by dnodeProcessQueryMsg
if (0) {
SRpcMsg rsp;
rsp.handle = pRead->rpcMsg.handle;
rsp.code = code;
rsp.pCont = NULL;
rpcSendResponse(&rsp);
}
rpcFreeCont(pRead->rpcMsg.pCont); // free the received message
}
static void dnodeContinueExecuteQuery(void* pVnode, void* qhandle, SReadMsg *pMsg) {
SReadMsg *pRead = (SReadMsg *)taosAllocateQitem(sizeof(SReadMsg));
pRead->rpcMsg = pMsg->rpcMsg;
pRead->pCont = qhandle;
pRead->contLen = 0;
pRead->pRpcContext = pMsg->pRpcContext;
pRead->rpcMsg.msgType = TSDB_MSG_TYPE_QUERY;
taos_queue queue = vnodeGetRqueue(pVnode);
taosWriteQitem(queue, TAOS_QTYPE_RPC, pRead);
}
static void dnodeProcessQueryMsg(void *pVnode, SReadMsg *pMsg) {
SQueryTableMsg* pQueryTableMsg = (SQueryTableMsg*) pMsg->pCont;
SQInfo* pQInfo = NULL;
if (pMsg->contLen != 0) {
void* tsdb = vnodeGetTsdb(pVnode);
int32_t code = qCreateQueryInfo(tsdb, pQueryTableMsg, &pQInfo);
SQueryTableRsp *pRsp = (SQueryTableRsp *) rpcMallocCont(sizeof(SQueryTableRsp));
pRsp->code = code;
pRsp->qhandle = htobe64((uint64_t) (pQInfo));
SRpcMsg rpcRsp = {
.handle = pMsg->rpcMsg.handle,
.pCont = pRsp,
.contLen = sizeof(SQueryTableRsp),
.code = code,
.msgType = 0
};
rpcSendResponse(&rpcRsp);
dTrace("dnode query msg disposed, thandle:%p", pMsg->rpcMsg.handle);
vnodeRelease(pVnode);
} else {
pQInfo = pMsg->pCont;
}
qTableQuery(pQInfo); // do execute query
}
static void dnodeProcessRetrieveMsg(void *pVnode, SReadMsg *pMsg) {
SRetrieveTableMsg *pRetrieve = pMsg->pCont;
void *pQInfo = (void*) htobe64(pRetrieve->qhandle);
dTrace("QInfo:%p vgId:%d, retrieve msg is received", pQInfo, pRetrieve->header.vgId);
int32_t contLen = 0;
SRetrieveTableRsp *pRsp = NULL;
int32_t code = qRetrieveQueryResultInfo(pQInfo);
if (code != TSDB_CODE_SUCCESS) {
contLen = sizeof(SRetrieveTableRsp);
pRsp = (SRetrieveTableRsp *)rpcMallocCont(contLen);
memset(pRsp, 0, sizeof(SRetrieveTableRsp));
} else {
// todo check code and handle error in build result set
code = qDumpRetrieveResult(pQInfo, &pRsp, &contLen);
if (qHasMoreResultsToRetrieve(pQInfo)) {
dnodeContinueExecuteQuery(pVnode, pQInfo, pMsg);
} else { // no further execution invoked, release the ref to vnode
dnodeProcessReadResult(pVnode, pMsg);
//vnodeRelease(pVnode);
}
}
SRpcMsg rpcRsp = (SRpcMsg) {
.handle = pMsg->rpcMsg.handle,
.pCont = pRsp,
.contLen = contLen,
.code = code,
.msgType = 0
};
rpcSendResponse(&rpcRsp);
dTrace("dnode retrieve msg disposed, thandle:%p", pMsg->rpcMsg.handle);
vnodeRelease(pVnode);
}
......@@ -37,7 +37,7 @@ int32_t dnodeInitShell() {
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_QUERY] = dnodeRead;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_RETRIEVE] = dnodeRead;
int numOfThreads = tsNumOfCores * tsNumOfThreadsPerCore;
int32_t numOfThreads = tsNumOfCores * tsNumOfThreadsPerCore;
numOfThreads = (int32_t) ((1.0 - tsRatioOfQueryThreads) * numOfThreads / 2.0);
if (numOfThreads < 1) {
numOfThreads = 1;
......
......@@ -52,7 +52,6 @@ static void dnodeHandleIdleWorker(SWriteWorker *pWorker);
SWriteWorkerPool wWorkerPool;
int32_t dnodeInitWrite() {
wWorkerPool.max = tsNumOfCores;
wWorkerPool.writeWorker = (SWriteWorker *)calloc(sizeof(SWriteWorker), wWorkerPool.max);
if (wWorkerPool.writeWorker == NULL) return -1;
......@@ -71,7 +70,7 @@ void dnodeCleanupWrite() {
}
void dnodeWrite(SRpcMsg *pMsg) {
char *pCont = (char *) pMsg->pCont;
char *pCont = (char *)pMsg->pCont;
if (pMsg->msgType == TSDB_MSG_TYPE_SUBMIT || pMsg->msgType == TSDB_MSG_TYPE_MD_DROP_STABLE) {
SMsgDesc *pDesc = (SMsgDesc *)pCont;
......@@ -80,16 +79,16 @@ void dnodeWrite(SRpcMsg *pMsg) {
}
SMsgHead *pHead = (SMsgHead *) pCont;
pHead->vgId = htonl(pHead->vgId);
pHead->contLen = htonl(pHead->contLen);
pHead->vgId = htonl(pHead->vgId);
pHead->contLen = htonl(pHead->contLen);
taos_queue queue = vnodeGetWqueue(pHead->vgId);
if (queue) {
// put message into queue
SWriteMsg *pWrite = (SWriteMsg *)taosAllocateQitem(sizeof(SWriteMsg));
pWrite->rpcMsg = *pMsg;
pWrite->pCont = pCont;
pWrite->contLen = pHead->contLen;
pWrite->rpcMsg = *pMsg;
pWrite->pCont = pCont;
pWrite->contLen = pHead->contLen;
taosWriteQitem(queue, TAOS_QTYPE_RPC, pWrite);
} else {
......@@ -227,4 +226,3 @@ static void dnodeHandleIdleWorker(SWriteWorker *pWorker) {
pthread_exit(NULL);
}
}
......@@ -98,7 +98,6 @@ typedef struct {
typedef struct {
int32_t dnodeId;
int32_t vnode;
uint32_t privateIp;
uint32_t publicIp;
} SVnodeGid;
......@@ -106,10 +105,10 @@ typedef struct {
typedef struct {
char tableId[TSDB_TABLE_ID_LEN + 1];
int8_t type;
} STableInfo;
} STableObj;
typedef struct SSuperTableObj {
STableInfo info;
STableObj info;
uint64_t uid;
int64_t createdTime;
int32_t sversion;
......@@ -124,7 +123,7 @@ typedef struct SSuperTableObj {
} SSuperTableObj;
typedef struct {
STableInfo info;
STableObj info;
uint64_t uid;
int64_t createdTime;
int32_t sversion; //used by normal table
......@@ -255,7 +254,7 @@ typedef struct {
SUserObj *pUser;
SDbObj *pDb;
SVgObj *pVgroup;
STableInfo *pTable;
STableObj *pTable;
} SQueuedMsg;
int32_t mgmtInitSystem();
......
......@@ -47,6 +47,7 @@ static STaosError errors[] = {
// rpc
TAOS_DEFINE_ERROR(TSDB_CODE_ACTION_IN_PROGRESS, 0, 1, "action in progress")
TAOS_DEFINE_ERROR(TSDB_CODE_ACTION_NEED_REPROCESSED, 0, 3, "action need to be reprocessed")
TAOS_DEFINE_ERROR(TSDB_CODE_MSG_NOT_PROCESSED, 0, 4, "message not processed")
TAOS_DEFINE_ERROR(TSDB_CODE_ALREADY_PROCESSED, 0, 5, "message already processed")
TAOS_DEFINE_ERROR(TSDB_CODE_REDIRECT, 0, 6, "redirect")
......@@ -156,28 +157,9 @@ TAOS_DEFINE_ERROR(TSDB_CODE_QUERY_CANCELLED, 0, 110, "query cancelled
TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_IE, 0, 111, "invalid ie")
TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_VALUE, 0, 112, "invalid value")
// TAOS_DEFINE_ERROR(TSDB_CODE_SYNC_REQUIRED, 0, 99, "sync required")
// TAOS_DEFINE_ERROR(TSDB_CODE_UNSYNCED, 0, 100, "unsyned")
// TAOS_DEFINE_ERROR(TSDB_CODE_DATA_ALREADY_IMPORTED, 0, 75, "data already imported")
// TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_COMMIT_LOG, 0, 109, "invalid commit log") // commit log init failed
// TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_VNODE_STATUS, 0, 116, "invalid vnode status")
// TAOS_DEFINE_ERROR(TSDB_CODE_TIMESTAMP_OUT_OF_RANGE, 0, 105, "timestamp out of range")
// TAOS_DEFINE_ERROR(TSDB_CODE_DUPLICATE_TAGS, 0, 112, "duplicate tags") // tags value for join not unique
// TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_SUBMIT_MSG, 0, 113, "invalid submit message")
// TAOS_DEFINE_ERROR(TSDB_CODE_FAILED_TO_LOCK_RESOURCES, 0, 117, "failed to lock resources")
// TAOS_DEFINE_ERROR(TSDB_CODE_FILE_BLOCK_TS_DISORDERED, 0, 108, "file block ts disordered") // time stamp in file block is disordered
// TAOS_DEFINE_ERROR(TSDB_CODE_BATCH_SIZE_TOO_BIG, 0, 104, "batch size too big")
// TAOS_DEFINE_ERROR(TSDB_CODE_WRONG_SCHEMA, 0, 53, "wrong schema")
// TAOS_DEFINE_ERROR(TSDB_CODE_NO_QSUMMARY, 0, 68, "no qsummery")
// TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_METER_ID, 0, 27, "invalid meter id")
// TAOS_DEFINE_ERROR(TSDB_CODE_METRICMETA_EXPIRED, 0, 63, "metricmeta expired") // local cached metric-meta expired causes error in metric query
// TAOS_DEFINE_ERROR(TSDB_CODE_SESSION_ALREADY_EXIST, 0, 67, "session already exist")
// TAOS_DEFINE_ERROR(TSDB_CODE_SESSION_NOT_READY, 0, 103, "session not ready") // table NOT in ready state
// TAOS_DEFINE_ERROR(TSDB_CODE_DATA_OVERFLOW, 0, 82, "data overflow")
// TAOS_DEFINE_ERROR(TSDB_CODE_ACTION_TRANS_NOT_FINISHED, 0, 17, "action transaction not finished")
// TAOS_DEFINE_ERROR(TSDB_CODE_ACTION_NOT_ONLINE, 0, 18, "action not online")
// TAOS_DEFINE_ERROR(TSDB_CODE_ACTION_SEND_FAILD, 0, 19, "action send failed")
// TAOS_DEFINE_ERROR(TSDB_CODE_NOT_ACTIVE_SESSION, 0, 20, "not active session")
// others
TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_FILE_FORMAT, 0, 120, "invalid file format")
#ifdef TAOS_ERROR_C
};
......
......@@ -241,7 +241,8 @@ typedef struct SSchema {
} SSchema;
typedef struct {
int32_t vnode; // the index of vnode
int32_t vgId;
int32_t dnodeId;
uint32_t ip;
} SVnodeDesc;
......@@ -752,12 +753,12 @@ typedef struct {
typedef struct {
int32_t numOfQueries;
SQueryDesc qdesc[];
SQueryDesc *qdesc;
} SQqueryList;
typedef struct {
int32_t numOfStreams;
SStreamDesc sdesc[];
SStreamDesc *sdesc;
} SStreamList;
typedef struct {
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_SYNC_H
#define TDENGINE_SYNC_H
#ifdef __cplusplus
extern "C" {
#endif
#define TAOS_SYNC_MAX_REPLICA 5
typedef enum _TAOS_SYNC_ROLE {
TAOS_SYNC_ROLE_OFFLINE,
TAOS_SYNC_ROLE_UNSYNCED,
TAOS_SYNC_ROLE_SLAVE,
TAOS_SYNC_ROLE_MASTER,
} ESyncRole;
typedef enum _TAOS_SYNC_STATUS {
TAOS_SYNC_STATUS_INIT,
TAOS_SYNC_STATUS_START,
TAOS_SYNC_STATUS_FILE,
TAOS_SYNC_STATUS_CACHE,
} ESyncStatus;
typedef struct {
uint32_t nodeId; // node ID assigned by TDengine
uint32_t nodeIp; // node IP address
char name[TSDB_FILENAME_LEN]; // external node name
} SNodeInfo;
typedef struct {
uint32_t arbitratorIp; // arbitrator IP address
int8_t quorum; // number of confirms required, >=1
int8_t replica; // number of replications, >=1
SNodeInfo nodeInfo[TAOS_SYNC_MAX_REPLICA];
} SSyncCfg;
typedef struct {
int selfIndex;
uint32_t nodeId[TAOS_SYNC_MAX_REPLICA];
int role[TAOS_SYNC_MAX_REPLICA];
} SNodesRole;
typedef struct {
char label[20]; // for debug purpose
char path[128]; // path to the file
int8_t replica; // number of replications, >=1
int8_t quorum; // number of confirms required, >=1
int32_t vgId; // vgroup ID
void *ahandle; // handle provided by APP
uint64_t version; // initial version
uint32_t arbitratorIp;
SNodeInfo nodeInfo[TAOS_SYNC_MAX_REPLICA];
// if name is null, get the file from index or after, used by master
// if name is provided, get the named file at the specified index, used by unsynced node
// it returns the file magic number and size, if file not there, magic shall be 0.
uint32_t (*getFileInfo)(char *name, int *index, int *size);
// get the wal file from index or after
// return value, -1: error, 1:more wal files, 0:last WAL. if name[0]==0, no WAL file
int (*getWalInfo)(char *name, int *index);
// when a forward pkt is received, call this to handle data
int (*writeToCache)(void *ahandle, SWalHead *, int type);
// when forward is confirmed by peer, master call this API to notify app
void (*confirmForward)(void *ahandle, void *mhandle, int32_t code);
// when role is changed, call this to notify app
void (*notifyRole)(void *ahandle, int8_t role);
} SSyncInfo;
typedef void* tsync_h;
tsync_h syncStart(SSyncInfo *);
void syncStop(tsync_h shandle);
int syncReconfig(tsync_h shandle, SSyncInfo *);
int syncForwardToPeer(tsync_h shandle, SWalHead *pHead, void *mhandle);
void syncConfirmForward(tsync_h shandle, uint64_t version, int32_t code);
void syncRecover(tsync_h shandle); // recover from other nodes:
int syncGetNodesRole(tsync_h shandle, SNodesRole *);
extern char *syncRole[];
extern int tsMaxSyncNum;
extern int tsSyncTcpThreads;
extern int tsMaxWatchFiles;
extern short tsSyncPort;
extern int tsMaxFwdInfo;
#ifdef __cplusplus
}
#endif
#endif // TDENGINE_SYNC_H
......@@ -33,6 +33,11 @@ typedef struct {
char cont[];
} SWalHead;
typedef struct {
int8_t commitLog; // commitLog
int8_t wals; // number of WAL files;
} SWalCfg;
typedef void* twal_h; // WAL HANDLE
twal_h walOpen(char *path, int max, int level);
......
......@@ -22,7 +22,9 @@ extern "C" {
typedef struct {
int len;
int code;
void *rsp;
void *qhandle; //used by query and retrieve msg
} SRspRet;
int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg);
......@@ -31,7 +33,8 @@ int32_t vnodeOpen(int32_t vgId, char *rootDir);
int32_t vnodeClose(int32_t vgId);
void vnodeRelease(void *pVnode);
void* vnodeGetVnode(int32_t vgId);
void* vnodeAccquireVnode(int32_t vgId); // add refcount
void* vnodeGetVnode(int32_t vgId); // keep refcount unchanged
void* vnodeGetRqueue(void *);
void* vnodeGetWqueue(int32_t vgId);
......@@ -41,6 +44,8 @@ void* vnodeGetTsdb(void *pVnode);
int32_t vnodeProcessWrite(void *pVnode, int qtype, SWalHead *pHead, void *item);
void vnodeBuildStatusMsg(void * param);
int32_t vnodeProcessRead(void *pVnode, int msgType, void *pCont, int32_t contLen, SRspRet *ret);
#ifdef __cplusplus
}
#endif
......
......@@ -23,7 +23,7 @@ extern "C" {
int32_t mgmtInitBalance();
void mgmtCleanupBalance();
void mgmtStartBalanceTimer(int32_t afterMs) ;
void mgmtBalanceNotify() ;
int32_t mgmtAllocVnodes(SVgObj *pVgroup);
#ifdef __cplusplus
......
......@@ -14,7 +14,7 @@
*/
#ifndef TDENGINE_MGMT_GRANT_H
#define TDENGINE_MGMT_GTANT_H
#define TDENGINE_MGMT_GRANT_H
#ifdef __cplusplus
"C" {
......
......@@ -27,7 +27,7 @@ extern "C" {
int32_t mgmtInitTables();
void mgmtCleanUpTables();
STableInfo* mgmtGetTable(char* tableId);
STableObj* mgmtGetTable(char* tableId);
void mgmtIncTableRef(void *pTable);
void mgmtDecTableRef(void *pTable);
void mgmtDropAllChildTables(SDbObj *pDropDb);
......
......@@ -18,11 +18,35 @@
#include "mgmtBalance.h"
#include "mgmtDnode.h"
int32_t mgmtInitBalance() { return 0; }
void mgmtCleanupBalance() {}
void mgmtStartBalanceTimer(int32_t afterMs) {}
extern int32_t balanceInit();
extern void balanceCleanUp();
extern void balanceNotify();
extern int32_t balanceAllocVnodes(SVgObj *pVgroup);
int32_t mgmtInitBalance() {
#ifdef _VPEER
return balanceInit();
#else
return 0;
#endif
}
void mgmtCleanupBalance() {
#ifdef _VPEER
balanceCleanUp();
#endif
}
void mgmtBalanceNotify() {
#ifdef _VPEER
balanceNotify();
#endif
}
int32_t mgmtAllocVnodes(SVgObj *pVgroup) {
#ifdef _VPEER
return balanceAllocVnodes(pVgroup);
#else
void * pNode = NULL;
SDnodeObj *pDnode = NULL;
SDnodeObj *pSelDnode = NULL;
......@@ -53,4 +77,5 @@ int32_t mgmtAllocVnodes(SVgObj *pVgroup) {
mTrace("dnode:%d, alloc one vnode to vgroup, openVnodes:%d", pSelDnode->dnodeId, pSelDnode->openVnodes);
return TSDB_CODE_SUCCESS;
#endif
}
......@@ -41,7 +41,7 @@ static int32_t mgmtRetrieveDnodes(SShowObj *pShow, char *data, int32_t rows, voi
extern int32_t clusterInit();
extern void clusterCleanUp();
extern int32_t clusterGetDnodesNum();
extern void * clusterGetNextDnode(void *pNode, void **pDnode);
extern void * clusterGetNextDnode(void *pNode, SDnodeObj **pDnode);
extern void clusterIncDnodeRef(SDnodeObj *pDnode);
extern void clusterDecDnodeRef(SDnodeObj *pDnode);
extern SDnodeObj* clusterGetDnode(int32_t dnodeId);
......@@ -251,7 +251,7 @@ void mgmtProcessDnodeStatusMsg(SRpcMsg *rpcMsg) {
if (pDnode->status != TSDB_DN_STATUS_READY) {
mTrace("dnode:%d, from offline to online", pDnode->dnodeId);
pDnode->status = TSDB_DN_STATUS_READY;
mgmtStartBalanceTimer(200);
mgmtBalanceNotify();
}
mgmtDecDnodeRef(pDnode);
......
......@@ -695,6 +695,7 @@ int32_t sdbUpdateRow(SSdbOperDesc *pOper) {
int32_t total_size = sizeof(SRowHead) + pTable->maxRowSize + sizeof(TSCKSUM);
SRowHead *rowHead = (SRowHead *)calloc(1, total_size);
if (rowHead == NULL) {
pthread_mutex_unlock(&pTable->mutex);
sdbError("table:%s, failed to allocate row head memory", pTable->tableName);
return -1;
}
......
......@@ -486,8 +486,8 @@ static void *mgmtGetSuperTable(char *tableId) {
return sdbGetRow(tsSuperTableSdb, tableId);
}
STableInfo *mgmtGetTable(char *tableId) {
STableInfo *tableInfo = sdbGetRow(tsSuperTableSdb, tableId);
STableObj *mgmtGetTable(char *tableId) {
STableObj *tableInfo = sdbGetRow(tsSuperTableSdb, tableId);
if (tableInfo != NULL) {
return tableInfo;
}
......@@ -501,7 +501,7 @@ STableInfo *mgmtGetTable(char *tableId) {
}
void mgmtIncTableRef(void *p1) {
STableInfo *pTable = (STableInfo *)p1;
STableObj *pTable = (STableObj *)p1;
if (pTable->type == TSDB_SUPER_TABLE) {
sdbIncRef(tsSuperTableSdb, pTable);
} else {
......@@ -512,7 +512,7 @@ void mgmtIncTableRef(void *p1) {
void mgmtDecTableRef(void *p1) {
if (p1 == NULL) return;
STableInfo *pTable = (STableInfo *)p1;
STableObj *pTable = (STableObj *)p1;
if (pTable->type == TSDB_SUPER_TABLE) {
sdbDecRef(tsSuperTableSdb, pTable);
} else {
......@@ -1302,7 +1302,7 @@ static void mgmtProcessCreateChildTableMsg(SQueuedMsg *pMsg) {
return;
}
pMsg->pTable = (STableInfo *)mgmtDoCreateChildTable(pCreate, pVgroup, sid);
pMsg->pTable = (STableObj *)mgmtDoCreateChildTable(pCreate, pVgroup, sid);
if (pMsg->pTable == NULL) {
mgmtSendSimpleResp(pMsg->thandle, terrno);
return;
......@@ -1512,7 +1512,8 @@ static int32_t mgmtDoGetChildTableMeta(SQueuedMsg *pMsg, STableMetaMsg *pMeta) {
} else {
pMeta->vpeerDesc[i].ip = pVgroup->vnodeGid[i].privateIp;
}
pMeta->vpeerDesc[i].vnode = htonl(pVgroup->vnodeGid[i].vnode);
pMeta->vpeerDesc[i].vgId = htonl(pVgroup->vgId);
pMeta->vpeerDesc[i].dnodeId = htonl(pVgroup->vnodeGid[i].dnodeId);
}
pMeta->numOfVpeers = pVgroup->numOfVnodes;
......@@ -1640,7 +1641,7 @@ static SChildTableObj* mgmtGetTableByPos(uint32_t dnodeId, int32_t vnode, int32_
}
SChildTableObj *pTable = pVgroup->tableList[sid];
mgmtIncTableRef((STableInfo *)pTable);
mgmtIncTableRef((STableObj *)pTable);
mgmtDecVgroupRef(pVgroup);
return pTable;
}
......
......@@ -32,7 +32,7 @@
#include "mgmtVgroup.h"
void *tsVgroupSdb = NULL;
static int32_t tsVgUpdateSize = 0;
int32_t tsVgUpdateSize = 0;
static int32_t mgmtGetVgroupMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn);
static int32_t mgmtRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, void *pConn);
......@@ -93,11 +93,12 @@ static int32_t mgmtVgroupActionInsert(SSdbOperDesc *pOper) {
for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) {
SDnodeObj *pDnode = mgmtGetDnode(pVgroup->vnodeGid[i].dnodeId);
pVgroup->vnodeGid[i].privateIp = pDnode->privateIp;
pVgroup->vnodeGid[i].publicIp = pDnode->publicIp;
pVgroup->vnodeGid[i].vnode = pVgroup->vgId;
atomic_add_fetch_32(&pDnode->openVnodes, 1);
mgmtDecDnodeRef(pDnode);
if (pDnode != NULL) {
pVgroup->vnodeGid[i].privateIp = pDnode->privateIp;
pVgroup->vnodeGid[i].publicIp = pDnode->publicIp;
atomic_add_fetch_32(&pDnode->openVnodes, 1);
mgmtDecDnodeRef(pDnode);
}
}
mgmtAddVgroupIntoDb(pVgroup);
......@@ -236,7 +237,7 @@ void mgmtCreateVgroup(SQueuedMsg *pMsg, SDbObj *pDb) {
mPrint("vgroup:%d, is created in mnode, db:%s replica:%d", pVgroup->vgId, pDb->name, pVgroup->numOfVnodes);
for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) {
mPrint("vgroup:%d, index:%d, dnode:%d vnode:%d", pVgroup->vgId, i, pVgroup->vnodeGid[i].dnodeId, pVgroup->vnodeGid[i].vnode);
mPrint("vgroup:%d, index:%d, dnode:%d", pVgroup->vgId, i, pVgroup->vnodeGid[i].dnodeId);
}
pMsg->ahandle = pVgroup;
......@@ -292,7 +293,7 @@ int32_t mgmtGetVgroupMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) {
int32_t maxReplica = 0;
SVgObj *pVgroup = NULL;
STableInfo *pTable = NULL;
STableObj *pTable = NULL;
if (pShow->payloadLen > 0 ) {
pTable = mgmtGetTable(pShow->payload);
if (NULL == pTable || pTable->type == TSDB_SUPER_TABLE) {
......@@ -312,27 +313,21 @@ int32_t mgmtGetVgroupMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) {
}
for (int32_t i = 0; i < maxReplica; ++i) {
pShow->bytes[cols] = 16;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "ip");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 2;
pSchema[cols].type = TSDB_DATA_TYPE_SMALLINT;
strcpy(pSchema[cols].name, "vnode");
strcpy(pSchema[cols].name, "dnode");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 9;
pShow->bytes[cols] = 16;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "vnode status");
strcpy(pSchema[cols].name, "ip");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 16;
pShow->bytes[cols] = 9;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "public ip");
strcpy(pSchema[cols].name, "vstatus");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
}
......@@ -416,13 +411,13 @@ int32_t mgmtRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, void *pCo
cols++;
for (int32_t i = 0; i < maxReplica; ++i) {
tinet_ntoa(ipstr, pVgroup->vnodeGid[i].privateIp);
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
strcpy(pWrite, ipstr);
*(int16_t *) pWrite = pVgroup->vnodeGid[i].dnodeId;
cols++;
tinet_ntoa(ipstr, pVgroup->vnodeGid[i].privateIp);
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int16_t *) pWrite = pVgroup->vnodeGid[i].vnode;
strcpy(pWrite, ipstr);
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
......@@ -433,11 +428,6 @@ int32_t mgmtRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, void *pCo
strcpy(pWrite, "null");
}
cols++;
tinet_ntoa(ipstr, pVgroup->vnodeGid[i].publicIp);
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
strcpy(pWrite, ipstr);
cols++;
}
numOfRows++;
......@@ -490,15 +480,15 @@ SMDCreateVnodeMsg *mgmtBuildCreateVnodeMsg(SVgObj *pVgroup) {
pCfg->daysToKeep2 = htonl(pCfg->daysToKeep2);
pCfg->daysToKeep = htonl(pCfg->daysToKeep);
pCfg->commitTime = htonl(pCfg->commitTime);
pCfg->commitLog = pCfg->commitLog;
pCfg->blocksPerTable = htons(pCfg->blocksPerTable);
pCfg->replications = (char) pVgroup->numOfVnodes;
pCfg->rowsInFileBlock = htonl(pCfg->rowsInFileBlock);
pCfg->blocksPerTable = htons(pCfg->blocksPerTable);
pCfg->replications = (int8_t) pVgroup->numOfVnodes;
SVnodeDesc *vpeerDesc = pVnode->vpeerDesc;
for (int32_t j = 0; j < pVgroup->numOfVnodes; ++j) {
vpeerDesc[j].vnode = htonl(pVgroup->vnodeGid[j].vnode);
vpeerDesc[j].ip = htonl(pVgroup->vnodeGid[j].privateIp);
vpeerDesc[j].vgId = htonl(pVgroup->vgId);
vpeerDesc[j].dnodeId = htonl(pVgroup->vnodeGid[j].dnodeId);
vpeerDesc[j].ip = htonl(pVgroup->vnodeGid[j].privateIp);
}
return pVnode;
......
......@@ -14,4 +14,6 @@ IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM))
TARGET_LINK_LIBRARIES(query tsdb tutil m rt)
ENDIF ()
ADD_SUBDIRECTORY(tests)
\ No newline at end of file
ADD_SUBDIRECTORY(tests)
SET_SOURCE_FILES_PROPERTIES(src/sql.c PROPERTIES COMPILE_FLAGS -w)
......@@ -198,6 +198,12 @@ typedef struct SQInfo {
*/
int32_t qCreateQueryInfo(void* pVnode, SQueryTableMsg* pQueryTableMsg, SQInfo** pQInfo);
/**
* destroy the query info struct
* @param pQInfo
*/
void qDestroyQueryInfo(SQInfo* pQInfo);
/**
* query on single table
* @param pReadMsg
......
......@@ -38,7 +38,7 @@ typedef struct SLoserTreeInfo {
SLoserTreeNode *pNode;
} SLoserTreeInfo;
uint8_t tLoserTreeCreate(SLoserTreeInfo **pTree, int32_t numOfEntries, void *param, __merge_compare_fn_t compareFn);
uint32_t tLoserTreeCreate(SLoserTreeInfo **pTree, int32_t numOfEntries, void *param, __merge_compare_fn_t compareFn);
void tLoserTreeInit(SLoserTreeInfo *pTree);
......
......@@ -1566,6 +1566,7 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) {
}
destroyResultBuf(pRuntimeEnv->pResultBuf);
tsdbCleanupQueryHandle(pRuntimeEnv->pQueryHandle);
pRuntimeEnv->pTSBuf = tsBufDestory(pRuntimeEnv->pTSBuf);
}
......@@ -2238,27 +2239,6 @@ char *getPosInResultPage(SQueryRuntimeEnv *pRuntimeEnv, int32_t columnIndex, SWi
pQuery->pSelectExpr[columnIndex].resBytes * realRowId;
}
void vnodeQueryFreeQInfoEx(SQInfo *pQInfo) {
if (pQInfo == NULL) {
return;
}
SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
teardownQueryRuntimeEnv(&pQInfo->runtimeEnv);
// tSidSetDestroy(&pQInfo->pSidSet);
if (pQInfo->pTableDataInfo != NULL) {
// size_t num = taosHashGetSize(pQInfo->pTableIdList);
for (int32_t j = 0; j < 0; ++j) {
destroyMeterQueryInfo(pQInfo->pTableDataInfo[j].pTableQInfo, pQuery->numOfOutputCols);
}
}
tfree(pQInfo->pTableDataInfo);
}
int32_t vnodeSTableQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param) {
if ((QUERY_IS_ASC_QUERY(pQuery) && (pQuery->window.skey > pQuery->window.ekey)) ||
(!QUERY_IS_ASC_QUERY(pQuery) && (pQuery->window.ekey > pQuery->window.skey))) {
......@@ -2266,14 +2246,10 @@ int32_t vnodeSTableQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param) {
pQuery->window.ekey, pQuery->order.order);
sem_post(&pQInfo->dataReady);
// pQInfo->over = 1;
return TSDB_CODE_SUCCESS;
}
pQuery->status = 0;
pQuery->rec = (SResultRec){0};
pQuery->rec = (SResultRec){0};
changeExecuteScanOrder(pQuery, true);
......@@ -4202,8 +4178,9 @@ int32_t doInitializeQInfo(SQInfo *pQInfo, void *param, void* tsdb, bool isSTable
}
pRuntimeEnv->pQueryHandle = tsdbQueryByTableId(tsdb, &cond, pQInfo->pTableIdList, cols);
taosArrayDestroy(cols);
pRuntimeEnv->pQuery = pQuery;
pRuntimeEnv->pTSBuf = param;
pRuntimeEnv->cur.vnodeIndex = -1;
if (param != NULL) {
......@@ -5169,7 +5146,8 @@ static void singleTableQueryImpl(SQInfo* pQInfo) {
if (isQueryKilled(pQInfo)) {
dTrace("QInfo:%p query is killed", pQInfo);
} else {
dTrace("QInfo:%p query task completed, %d points are returned", pQInfo, pQuery->rec.size);
dTrace("QInfo:%p query task completed, %" PRId64 " rows will returned, total:%" PRId64 " rows", pQInfo, pQuery->rec.size,
pQuery->rec.total);
}
sem_post(&pQInfo->dataReady);
......@@ -5444,13 +5422,11 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList,
*tagCond = calloc(1, pQueryMsg->tagCondLen * TSDB_NCHAR_SIZE);
memcpy(*tagCond, pMsg, pQueryMsg->tagCondLen * TSDB_NCHAR_SIZE);
}
dTrace("qmsg:%p query on %d meter(s), qrange:%" PRId64 "-%" PRId64 ", numOfGroupbyTagCols:%d, numOfTagCols:%d, "
"timestamp order:%d, tags order:%d, tags order col:%d, numOfOutputCols:%d, numOfCols:%d, interval:%" PRId64
", fillType:%d, comptslen:%d, limit:%" PRId64 ", offset:%" PRId64,
dTrace("qmsg:%p query on %d table(s), qrange:%" PRId64 "-%" PRId64 ", numOfGroupbyTagCols:%d, ts order:%d, "
"outputCols:%d, numOfCols:%d, interval:%d" PRId64 ", fillType:%d, comptsLen:%d, limit:%" PRId64 ", offset:%" PRId64,
pQueryMsg, pQueryMsg->numOfTables, pQueryMsg->window.skey, pQueryMsg->window.ekey,
pQueryMsg->numOfGroupCols, pQueryMsg->order, pQueryMsg->orderType,
pQueryMsg->orderByIdx, pQueryMsg->numOfOutputCols,
pQueryMsg->numOfGroupCols, pQueryMsg->order, pQueryMsg->numOfOutputCols,
pQueryMsg->numOfCols, pQueryMsg->intervalTime, pQueryMsg->interpoType, pQueryMsg->tsLen,
pQueryMsg->limit, pQueryMsg->offset);
......@@ -5731,7 +5707,7 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou
SArray *pTableIdList) {
SQInfo *pQInfo = (SQInfo *)calloc(1, sizeof(SQInfo));
if (pQInfo == NULL) {
goto _clean_memory;
goto _clean_pQInfo_memory;
}
SQuery *pQuery = calloc(1, sizeof(SQuery));
......@@ -5841,7 +5817,7 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou
vnodeParametersSafetyCheck(pQuery);
dTrace("qmsg:%p create QInfo:%p, QInfo created", pQueryMsg, pQInfo);
dTrace("qmsg:%p QInfo:%p created", pQueryMsg, pQInfo);
return pQInfo;
_clean_memory:
......@@ -5860,6 +5836,7 @@ _clean_memory:
tfree(pExprs);
tfree(pGroupbyExpr);
_clean_pQInfo_memory:
tfree(pQInfo);
return NULL;
......@@ -5927,7 +5904,7 @@ static void freeQInfo(SQInfo *pQInfo) {
SQuery* pQuery = pQInfo->runtimeEnv.pQuery;
setQueryKilled(pQInfo);
dTrace("QInfo:%p start to free SQInfo", pQInfo);
dTrace("QInfo:%p start to free QInfo", pQInfo);
for (int32_t col = 0; col < pQuery->numOfOutputCols; ++col) {
tfree(pQuery->sdata[col]);
}
......@@ -5941,7 +5918,16 @@ static void freeQInfo(SQInfo *pQInfo) {
// }
sem_destroy(&(pQInfo->dataReady));
vnodeQueryFreeQInfoEx(pQInfo);
teardownQueryRuntimeEnv(&pQInfo->runtimeEnv);
if (pQInfo->pTableDataInfo != NULL) {
// size_t num = taosHashGetSize(pQInfo->pTableIdList);
for (int32_t j = 0; j < 0; ++j) {
destroyMeterQueryInfo(pQInfo->pTableDataInfo[j].pTableQInfo, pQuery->numOfOutputCols);
}
}
tfree(pQInfo->pTableDataInfo);
for (int32_t i = 0; i < pQuery->numOfFilterCols; ++i) {
SSingleColumnFilterInfo *pColFilter = &pQuery->pFilterInfo[i];
......@@ -5974,6 +5960,8 @@ static void freeQInfo(SQInfo *pQInfo) {
tfree(pQuery->pGroupbyExpr);
tfree(pQuery);
taosArrayDestroy(pQInfo->pTableIdList);
dTrace("QInfo:%p QInfo is freed", pQInfo);
// destroy signature, in order to avoid the query process pass the object safety check
......@@ -6120,6 +6108,11 @@ _query_over:
return TSDB_CODE_SUCCESS;
}
void qDestroyQueryInfo(SQInfo* pQInfo) {
dTrace("QInfo:%p query completed", pQInfo);
freeQInfo(pQInfo);
}
void qTableQuery(SQInfo *pQInfo) {
if (pQInfo == NULL || pQInfo->signature != pQInfo) {
dTrace("%p freed abort query", pQInfo);
......@@ -6133,6 +6126,9 @@ void qTableQuery(SQInfo *pQInfo) {
dTrace("QInfo:%p query task is launched", pQInfo);
// sem_post(&pQInfo->dataReady);
// pQInfo->runtimeEnv.pQuery->status = QUERY_OVER;
int32_t numOfTables = taosArrayGetSize(pQInfo->pTableIdList);
if (numOfTables == 1) {
singleTableQueryImpl(pQInfo);
......@@ -6151,18 +6147,14 @@ int32_t qRetrieveQueryResultInfo(SQInfo *pQInfo) {
SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
if (isQueryKilled(pQInfo)) {
dTrace("QInfo:%p query is killed, code:%d", pQInfo, pQInfo->code);
if (pQInfo->code == TSDB_CODE_SUCCESS) {
return TSDB_CODE_QUERY_CANCELLED;
} else { // in case of not TSDB_CODE_SUCCESS, return the code to client
return (pQInfo->code >= 0)? pQInfo->code:(-pQInfo->code);
}
return pQInfo->code;
}
sem_wait(&pQInfo->dataReady);
dTrace("QInfo:%p retrieve result info, rowsize:%d, rows:%d, code:%d", pQInfo, pQuery->rowSize, pQuery->rec.size,
pQInfo->code);
return (pQInfo->code >= 0)? pQInfo->code:(-pQInfo->code);
return pQInfo->code;
}
bool qHasMoreResultsToRetrieve(SQInfo* pQInfo) {
......@@ -6207,12 +6199,12 @@ int32_t qDumpRetrieveResult(SQInfo *pQInfo, SRetrieveTableRsp** pRsp, int32_t* c
if (pQuery->rec.size > 0 && code == TSDB_CODE_SUCCESS) {
code = doDumpQueryResult(pQInfo, (*pRsp)->data);
} else {
setQueryStatus(pQuery, QUERY_OVER);
code = pQInfo->code;
}
if (isQueryKilled(pQInfo) || Q_STATUS_EQUAL(pQuery->status, QUERY_OVER)) {
(*pRsp)->completed = 1; // notify no more result to client
freeQInfo(pQInfo);
}
return code;
......@@ -6222,4 +6214,4 @@ int32_t qDumpRetrieveResult(SQInfo *pQInfo, SRetrieveTableRsp** pRsp, int32_t* c
// vnodeDecRefCount(pObj->qhandle);
// pObj->qhandle = NULL;
// }
}
\ No newline at end of file
}
......@@ -40,7 +40,7 @@ void tLoserTreeDisplay(SLoserTreeInfo* pTree) {
printf("\n");
}
uint8_t tLoserTreeCreate(SLoserTreeInfo** pTree, int32_t numOfEntries, void* param, __merge_compare_fn_t compareFn) {
uint32_t tLoserTreeCreate(SLoserTreeInfo** pTree, int32_t numOfEntries, void* param, __merge_compare_fn_t compareFn) {
int32_t totalEntries = numOfEntries << 1;
*pTree = (SLoserTreeInfo*)calloc(1, sizeof(SLoserTreeInfo) + sizeof(SLoserTreeNode) * totalEntries);
......
......@@ -331,6 +331,7 @@ void *rpcReallocCont(void *ptr, int contLen) {
char *start = ((char *)ptr) - sizeof(SRpcReqContext) - sizeof(SRpcHead);
if (contLen == 0 ) {
free(start);
return NULL;
}
int size = contLen + RPC_MSG_OVERHEAD;
......
......@@ -472,7 +472,7 @@ void *taosCacheAcquireByData(SCacheObj *pCacheObj, void *data) {
}
int32_t ref = T_REF_INC(ptNode);
pTrace("%p add data ref in cache, refcnt:%d", ptNode, ref)
pTrace("%p acquired by data in cache, refcnt:%d", ptNode, ref)
// the data if referenced by at least one object, so the reference count must be greater than the value of 2.
assert(ref >= 2);
......@@ -516,7 +516,7 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) {
*data = NULL;
int16_t ref = T_REF_DEC(pNode);
pTrace("%p is released, refcnt:%d", pNode, ref);
pTrace("%p data released, refcnt:%d", pNode, ref);
if (_remove) {
__cache_wr_lock(pCacheObj);
......
......@@ -126,7 +126,7 @@ int taosUpdateIdPool(id_pool_t *handle, int maxId) {
return -1;
}
int *idList = calloc(maxId, sizeof(bool));
bool *idList = calloc(maxId, sizeof(bool));
if (idList == NULL) {
return -1;
}
......@@ -137,7 +137,7 @@ int taosUpdateIdPool(id_pool_t *handle, int maxId) {
pIdPool->numOfFree += (maxId - pIdPool->maxId);
pIdPool->maxId = maxId;
int *oldIdList = pIdPool->freeList;
bool *oldIdList = pIdPool->freeList;
pIdPool->freeList = idList;
free(oldIdList);
......
......@@ -347,7 +347,7 @@ void tprintf(const char *const flags, int dflag, const char *const format, ...)
va_start(argpointer, format);
int writeLen = vsnprintf(buffer + len, MAX_LOGLINE_CONTENT_SIZE, format, argpointer);
if (writeLen <= 0) {
char tmp[MAX_LOGLINE_DUMP_BUFFER_SIZE];
char tmp[MAX_LOGLINE_DUMP_BUFFER_SIZE] = {0};
writeLen = vsnprintf(tmp, MAX_LOGLINE_DUMP_CONTENT_SIZE, format, argpointer);
strncpy(buffer + len, tmp, MAX_LOGLINE_CONTENT_SIZE);
len += MAX_LOGLINE_CONTENT_SIZE;
......
......@@ -77,7 +77,7 @@ void taosUnLockNote(int fd, taosNoteInfo * pNote)
void *taosThreadToOpenNewNote(void *param)
{
char name[NOTE_FILE_NAME_LEN];
char name[NOTE_FILE_NAME_LEN * 2];
taosNoteInfo * pNote = (taosNoteInfo *)param;
pNote->taosNoteFlag ^= 1;
......@@ -170,7 +170,7 @@ void taosGetNoteName(char *fn, taosNoteInfo * pNote)
int taosOpenNoteWithMaxLines(char *fn, int maxLines, int maxNoteNum, taosNoteInfo * pNote)
{
char name[NOTE_FILE_NAME_LEN] = "\0";
char name[NOTE_FILE_NAME_LEN * 2] = "\0";
struct stat notestat0, notestat1;
int size;
......
......@@ -18,7 +18,7 @@
#include "tskiplist.h"
#include "tutil.h"
static FORCE_INLINE void recordNodeEachLevel(SSkipList *pSkipList, int32_t level) { // record link count in each level
__attribute__ ((unused)) static FORCE_INLINE void recordNodeEachLevel(SSkipList *pSkipList, int32_t level) { // record link count in each level
#if SKIP_LIST_RECORD_PERFORMANCE
for (int32_t i = 0; i < level; ++i) {
pSkipList->state.nLevelNodeCnt[i]++;
......@@ -26,7 +26,7 @@ static FORCE_INLINE void recordNodeEachLevel(SSkipList *pSkipList, int32_t level
#endif
}
static FORCE_INLINE void removeNodeEachLevel(SSkipList *pSkipList, int32_t level) {
__attribute__ ((unused)) static FORCE_INLINE void removeNodeEachLevel(SSkipList *pSkipList, int32_t level) {
#if SKIP_LIST_RECORD_PERFORMANCE
for (int32_t i = 0; i < level; ++i) {
pSkipList->state.nLevelNodeCnt[i]--;
......
......@@ -20,6 +20,9 @@
extern "C" {
#endif
#include "tsync.h"
#include "twal.h"
typedef enum _VN_STATUS {
VN_STATUS_INIT,
VN_STATUS_CREATING,
......@@ -41,10 +44,14 @@ typedef struct {
void *sync;
void *events;
void *cq; // continuous query
STsdbCfg tsdbCfg;
SSyncCfg syncCfg;
SWalCfg walCfg;
} SVnodeObj;
int vnodeWriteToQueue(void *param, SWalHead *pHead, int type);
void vnodeInitWriteFp(void);
void vnodeInitReadFp(void);
#ifdef __cplusplus
}
......
......@@ -29,19 +29,21 @@
#include "vnode.h"
#include "vnodeInt.h"
static void *tsDnodeVnodesHash;
static void vnodeCleanUp(SVnodeObj *pVnode);
static void vnodeBuildVloadMsg(char *pNode, void * param);
static int vnodeWALCallback(void *arg);
static int tsOpennedVnodes;
static void *tsDnodeVnodesHash;
static void vnodeCleanUp(SVnodeObj *pVnode);
static void vnodeBuildVloadMsg(char *pNode, void * param);
static int vnodeWALCallback(void *arg);
static int32_t vnodeSaveCfg(SMDCreateVnodeMsg *pVnodeCfg);
static int32_t vnodeReadCfg(SVnodeObj *pVnode);
static int32_t tsOpennedVnodes;
static pthread_once_t vnodeModuleInit = PTHREAD_ONCE_INIT;
static void vnodeInit() {
vnodeInitWriteFp();
vnodeInitReadFp();
tsDnodeVnodesHash = taosInitIntHash(TSDB_MAX_VNODES, sizeof(SVnodeObj), taosHashInt);
tsDnodeVnodesHash = taosInitIntHash(TSDB_MAX_VNODES, sizeof(SVnodeObj *), taosHashInt);
if (tsDnodeVnodesHash == NULL) {
dError("failed to init vnode list");
}
......@@ -51,22 +53,12 @@ int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg) {
int32_t code;
pthread_once(&vnodeModuleInit, vnodeInit);
SVnodeObj *pTemp = (SVnodeObj *) taosGetIntHashData(tsDnodeVnodesHash, pVnodeCfg->cfg.vgId);
SVnodeObj *pTemp = (SVnodeObj *)taosGetIntHashData(tsDnodeVnodesHash, pVnodeCfg->cfg.vgId);
if (pTemp != NULL) {
dPrint("vgId:%d, vnode already exist, pVnode:%p", pVnodeCfg->cfg.vgId, pTemp);
return TSDB_CODE_SUCCESS;
}
STsdbCfg tsdbCfg = {0};
tsdbCfg.precision = pVnodeCfg->cfg.precision;
tsdbCfg.tsdbId = pVnodeCfg->cfg.vgId;
tsdbCfg.maxTables = pVnodeCfg->cfg.maxSessions;
tsdbCfg.daysPerFile = pVnodeCfg->cfg.daysPerFile;
tsdbCfg.minRowsPerFileBlock = -1;
tsdbCfg.maxRowsPerFileBlock = -1;
tsdbCfg.keep = -1;
tsdbCfg.maxCacheSize = -1;
}
char rootDir[TSDB_FILENAME_LEN] = {0};
sprintf(rootDir, "%s/vnode%d", tsVnodeDir, pVnodeCfg->cfg.vgId);
......@@ -81,12 +73,28 @@ int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg) {
}
}
code = vnodeSaveCfg(pVnodeCfg);
if (code != TSDB_CODE_SUCCESS) {
dError("vgId:%d, failed to save vnode cfg, reason:%s", pVnodeCfg->cfg.vgId, tstrerror(code));
return code;
}
STsdbCfg tsdbCfg = {0};
tsdbCfg.precision = pVnodeCfg->cfg.precision;
tsdbCfg.tsdbId = pVnodeCfg->cfg.vgId;
tsdbCfg.maxTables = pVnodeCfg->cfg.maxSessions;
tsdbCfg.daysPerFile = pVnodeCfg->cfg.daysPerFile;
tsdbCfg.minRowsPerFileBlock = -1;
tsdbCfg.maxRowsPerFileBlock = -1;
tsdbCfg.keep = -1;
tsdbCfg.maxCacheSize = -1;
char tsdbDir[TSDB_FILENAME_LEN] = {0};
sprintf(tsdbDir, "%s/vnode%d/tsdb", tsVnodeDir, pVnodeCfg->cfg.vgId);
code = tsdbCreateRepo(tsdbDir, &tsdbCfg, NULL);
if (code <0) {
if (code != TSDB_CODE_SUCCESS) {
dError("vgId:%d, failed to create tsdb in vnode, reason:%s", pVnodeCfg->cfg.vgId, tstrerror(terrno));
return code;
return terrno;
}
dPrint("vgId:%d, vnode is created, clog:%d", pVnodeCfg->cfg.vgId, pVnodeCfg->cfg.commitLog);
......@@ -96,8 +104,7 @@ int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg) {
}
int32_t vnodeDrop(int32_t vgId) {
SVnodeObj *pVnode = (SVnodeObj *) taosGetIntHashData(tsDnodeVnodesHash, vgId);
SVnodeObj *pVnode = *(SVnodeObj **)taosGetIntHashData(tsDnodeVnodesHash, vgId);
if (pVnode == NULL) {
dTrace("vgId:%d, failed to drop, vgId not exist", vgId);
return TSDB_CODE_INVALID_VGROUP_ID;
......@@ -114,18 +121,25 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
char temp[TSDB_FILENAME_LEN];
pthread_once(&vnodeModuleInit, vnodeInit);
SVnodeObj vnodeObj = {0};
vnodeObj.vgId = vnode;
vnodeObj.status = VN_STATUS_INIT;
vnodeObj.refCount = 1;
vnodeObj.version = 0;
SVnodeObj *pVnode = (SVnodeObj *)taosAddIntHash(tsDnodeVnodesHash, vnodeObj.vgId, (char *)(&vnodeObj));
SVnodeObj *pVnode = calloc(sizeof(SVnodeObj), 1);
pVnode->vgId = vnode;
pVnode->status = VN_STATUS_INIT;
pVnode->refCount = 1;
pVnode->version = 0;
taosAddIntHash(tsDnodeVnodesHash, pVnode->vgId, (char *)(&pVnode));
int32_t code = vnodeReadCfg(pVnode);
if (code != TSDB_CODE_SUCCESS) {
dError("pVnode:%p vgId:%d, failed to read cfg file", pVnode, pVnode->vgId);
taosDeleteIntHash(tsDnodeVnodesHash, pVnode->vgId);
return code;
}
pVnode->wqueue = dnodeAllocateWqueue(pVnode);
pVnode->rqueue = dnodeAllocateRqueue(pVnode);
sprintf(temp, "%s/wal", rootDir);
pVnode->wal = walOpen(temp, 3, tsCommitLog);
pVnode->wal = walOpen(temp, pVnode->walCfg.wals, pVnode->walCfg.commitLog);
pVnode->sync = NULL;
pVnode->events = NULL;
pVnode->cq = NULL;
......@@ -155,7 +169,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
int32_t vnodeClose(int32_t vgId) {
SVnodeObj *pVnode = (SVnodeObj *) taosGetIntHashData(tsDnodeVnodesHash, vgId);
SVnodeObj *pVnode = *(SVnodeObj **)taosGetIntHashData(tsDnodeVnodesHash, vgId);
if (pVnode == NULL) return 0;
dTrace("pVnode:%p vgId:%d, vnode will be closed", pVnode, pVnode->vgId);
......@@ -188,6 +202,7 @@ void vnodeRelease(void *pVnodeRaw) {
}
dTrace("pVnode:%p vgId:%d, vnode is released", pVnode, pVnode->vgId);
free(pVnode);
tsOpennedVnodes--;
if (tsOpennedVnodes <= 0) {
......@@ -198,12 +213,19 @@ void vnodeRelease(void *pVnodeRaw) {
}
void *vnodeGetVnode(int32_t vgId) {
SVnodeObj *pVnode = (SVnodeObj *) taosGetIntHashData(tsDnodeVnodesHash, vgId);
SVnodeObj *pVnode = *(SVnodeObj **) taosGetIntHashData(tsDnodeVnodesHash, vgId);
if (pVnode == NULL) {
terrno = TSDB_CODE_INVALID_VGROUP_ID;
return NULL;
}
return pVnode;
}
void *vnodeAccquireVnode(int32_t vgId) {
SVnodeObj *pVnode = vnodeGetVnode(vgId);
if (pVnode == NULL) return pVnode;
atomic_add_fetch_32(&pVnode->refCount, 1);
dTrace("pVnode:%p vgId:%d, get vnode, refCount:%d", pVnode, pVnode->vgId, pVnode->refCount);
......@@ -215,7 +237,7 @@ void *vnodeGetRqueue(void *pVnode) {
}
void *vnodeGetWqueue(int32_t vgId) {
SVnodeObj *pVnode = vnodeGetVnode(vgId);
SVnodeObj *pVnode = vnodeAccquireVnode(vgId);
if (pVnode == NULL) return NULL;
return pVnode->wqueue;
}
......@@ -234,7 +256,7 @@ void vnodeBuildStatusMsg(void *param) {
}
static void vnodeBuildVloadMsg(char *pNode, void * param) {
SVnodeObj *pVnode = (SVnodeObj *) pNode;
SVnodeObj *pVnode = *(SVnodeObj **) pNode;
if (pVnode->status == VN_STATUS_DELETING) return;
SDMStatusMsg *pStatus = param;
......@@ -247,8 +269,9 @@ static void vnodeBuildVloadMsg(char *pNode, void * param) {
}
static void vnodeCleanUp(SVnodeObj *pVnode) {
taosDeleteIntHash(tsDnodeVnodesHash, pVnode->vgId);
taosDeleteIntHash(tsDnodeVnodesHash, pVnode->vgId);
//syncStop(pVnode->sync);
tsdbCloseRepo(pVnode->tsdb);
walClose(pVnode->wal);
......@@ -256,7 +279,94 @@ static void vnodeCleanUp(SVnodeObj *pVnode) {
vnodeRelease(pVnode);
}
// TODO: this is a simple implement
static int vnodeWALCallback(void *arg) {
SVnodeObj *pVnode = arg;
return walRenew(pVnode->wal);
}
\ No newline at end of file
}
static int32_t vnodeSaveCfg(SMDCreateVnodeMsg *pVnodeCfg) {
char cfgFile[TSDB_FILENAME_LEN * 2] = {0};
sprintf(cfgFile, "%s/vnode%d/config", tsVnodeDir, pVnodeCfg->cfg.vgId);
FILE *fp = fopen(cfgFile, "w");
if (!fp) return errno;
fprintf(fp, "commitLog %d\n", pVnodeCfg->cfg.commitLog);
fprintf(fp, "wals %d\n", 3);
fprintf(fp, "arbitratorIp %d\n", pVnodeCfg->vpeerDesc[0].ip);
fprintf(fp, "quorum %d\n", 1);
fprintf(fp, "replica %d\n", pVnodeCfg->cfg.replications);
for (int32_t i = 0; i < pVnodeCfg->cfg.replications; i++) {
fprintf(fp, "index%d nodeId %d nodeIp %u name n%d\n", i, pVnodeCfg->vpeerDesc[i].dnodeId, pVnodeCfg->vpeerDesc[i].ip, pVnodeCfg->vpeerDesc[i].dnodeId);
}
fclose(fp);
dTrace("vgId:%d, save vnode cfg successed", pVnodeCfg->cfg.vgId);
return TSDB_CODE_SUCCESS;
}
// TODO: this is a simple implement
static int32_t vnodeReadCfg(SVnodeObj *pVnode) {
char option[5][16] = {0};
char cfgFile[TSDB_FILENAME_LEN * 2] = {0};
sprintf(cfgFile, "%s/vnode%d/config", tsVnodeDir, pVnode->vgId);
FILE *fp = fopen(cfgFile, "r");
if (!fp) return errno;
int32_t commitLog = -1;
int32_t num = fscanf(fp, "%s %d", option[0], &commitLog);
if (num != 2) return TSDB_CODE_INVALID_FILE_FORMAT;
if (strcmp(option[0], "commitLog") != 0) return TSDB_CODE_INVALID_FILE_FORMAT;
if (commitLog == -1) return TSDB_CODE_INVALID_FILE_FORMAT;
pVnode->walCfg.commitLog = (int8_t)commitLog;
int32_t wals = -1;
num = fscanf(fp, "%s %d", option[0], &wals);
if (num != 2) return TSDB_CODE_INVALID_FILE_FORMAT;
if (strcmp(option[0], "wals") != 0) return TSDB_CODE_INVALID_FILE_FORMAT;
if (wals == -1) return TSDB_CODE_INVALID_FILE_FORMAT;
pVnode->walCfg.wals = (int8_t)wals;
int32_t arbitratorIp = -1;
num = fscanf(fp, "%s %u", option[0], &arbitratorIp);
if (num != 2) return TSDB_CODE_INVALID_FILE_FORMAT;
if (strcmp(option[0], "arbitratorIp") != 0) return TSDB_CODE_INVALID_FILE_FORMAT;
if (arbitratorIp == -1) return TSDB_CODE_INVALID_FILE_FORMAT;
pVnode->syncCfg.arbitratorIp = arbitratorIp;
int32_t quorum = -1;
num = fscanf(fp, "%s %d", option[0], &quorum);
if (num != 2) return TSDB_CODE_INVALID_FILE_FORMAT;
if (strcmp(option[0], "quorum") != 0) return TSDB_CODE_INVALID_FILE_FORMAT;
if (quorum == -1) return TSDB_CODE_INVALID_FILE_FORMAT;
pVnode->syncCfg.quorum = (int8_t)quorum;
int32_t replica = -1;
num = fscanf(fp, "%s %d", option[0], &replica);
if (num != 2) return TSDB_CODE_INVALID_FILE_FORMAT;
if (strcmp(option[0], "replica") != 0) return TSDB_CODE_INVALID_FILE_FORMAT;
if (replica == -1) return TSDB_CODE_INVALID_FILE_FORMAT;
pVnode->syncCfg.replica = (int8_t)replica;
for (int32_t i = 0; i < replica; ++i) {
int32_t dnodeId = -1;
uint32_t dnodeIp = -1;
num = fscanf(fp, "%s %s %d %s %u %s %s", option[0], option[1], &dnodeId, option[2], &dnodeIp, option[3], pVnode->syncCfg.nodeInfo[i].name);
if (num != 7) return TSDB_CODE_INVALID_FILE_FORMAT;
if (strcmp(option[1], "nodeId") != 0) return TSDB_CODE_INVALID_FILE_FORMAT;
if (strcmp(option[2], "nodeIp") != 0) return TSDB_CODE_INVALID_FILE_FORMAT;
if (strcmp(option[3], "name") != 0) return TSDB_CODE_INVALID_FILE_FORMAT;
if (dnodeId == -1) return TSDB_CODE_INVALID_FILE_FORMAT;
if (dnodeIp == -1) return TSDB_CODE_INVALID_FILE_FORMAT;
pVnode->syncCfg.nodeInfo[i].nodeId = dnodeId;
pVnode->syncCfg.nodeInfo[i].nodeIp = dnodeIp;
}
fclose(fp);
dTrace("pVnode:%p vgId:%d, read vnode cfg successed", pVnode, pVnode->vgId);
return TSDB_CODE_SUCCESS;
}
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "os.h"
#include "taosmsg.h"
#include "taoserror.h"
#include "tlog.h"
#include "tqueue.h"
#include "trpc.h"
#include "tsdb.h"
#include "twal.h"
#include "dataformat.h"
#include "vnode.h"
#include "vnodeInt.h"
#include "queryExecutor.h"
static int32_t (*vnodeProcessReadMsgFp[TSDB_MSG_TYPE_MAX])(SVnodeObj *, void *pCont, int32_t contLen, SRspRet *pRet);
static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, void *pCont, int32_t contLen, SRspRet *pRet);
static int32_t vnodeProcessRetrieveMsg(SVnodeObj *pVnode, void *pCont, int32_t contLen, SRspRet *pRet);
void vnodeInitReadFp(void) {
vnodeProcessReadMsgFp[TSDB_MSG_TYPE_QUERY] = vnodeProcessQueryMsg;
vnodeProcessReadMsgFp[TSDB_MSG_TYPE_RETRIEVE] = vnodeProcessRetrieveMsg;
}
int32_t vnodeProcessRead(void *param, int msgType, void *pCont, int32_t contLen, SRspRet *ret) {
SVnodeObj *pVnode = (SVnodeObj *)param;
if (vnodeProcessReadMsgFp[msgType] == NULL)
return TSDB_CODE_MSG_NOT_PROCESSED;
if (pVnode->status == VN_STATUS_DELETING || pVnode->status == VN_STATUS_CLOSING)
return TSDB_CODE_NOT_ACTIVE_VNODE;
return (*vnodeProcessReadMsgFp[msgType])(pVnode, pCont, contLen, ret);
}
static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, void *pCont, int32_t contLen, SRspRet *pRet) {
SQueryTableMsg* pQueryTableMsg = (SQueryTableMsg*) pCont;
memset(pRet, 0, sizeof(SRspRet));
int32_t code = TSDB_CODE_SUCCESS;
SQInfo* pQInfo = NULL;
if (contLen != 0) {
void* tsdb = vnodeGetTsdb(pVnode);
pRet->code = qCreateQueryInfo(tsdb, pQueryTableMsg, &pQInfo);
SQueryTableRsp *pRsp = (SQueryTableRsp *) rpcMallocCont(sizeof(SQueryTableRsp));
pRsp->qhandle = htobe64((uint64_t) (pQInfo));
pRsp->code = pRet->code;
pRet->len = sizeof(SQueryTableRsp);
pRet->rsp = pRsp;
dTrace("pVnode:%p vgId:%d QInfo:%p, dnode query msg disposed", pVnode, pVnode->vgId, pQInfo);
} else {
pQInfo = pCont;
code = TSDB_CODE_ACTION_IN_PROGRESS;
}
qTableQuery(pQInfo); // do execute query
return code;
}
static int32_t vnodeProcessRetrieveMsg(SVnodeObj *pVnode, void *pCont, int32_t contLen, SRspRet *pRet) {
SRetrieveTableMsg *pRetrieve = pCont;
void *pQInfo = (void*) htobe64(pRetrieve->qhandle);
memset(pRet, 0, sizeof(SRspRet));
int32_t code = TSDB_CODE_SUCCESS;
dTrace("pVnode:%p vgId:%d QInfo:%p, retrieve msg is received", pVnode, pVnode->vgId, pQInfo);
pRet->code = qRetrieveQueryResultInfo(pQInfo);
if (pRet->code != TSDB_CODE_SUCCESS) {
//TODO
pRet->rsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp));
memset(pRet->rsp, 0, sizeof(SRetrieveTableRsp));
} else {
// todo check code and handle error in build result set
pRet->code = qDumpRetrieveResult(pQInfo, (SRetrieveTableRsp **)&pRet->rsp, &pRet->len);
if (qHasMoreResultsToRetrieve(pQInfo)) {
pRet->qhandle = pQInfo;
code = TSDB_CODE_ACTION_NEED_REPROCESSED;
} else {
// no further execution invoked, release the ref to vnode
qDestroyQueryInfo(pQInfo);
vnodeRelease(pVnode);
}
}
dTrace("pVnode:%p vgId:%d QInfo:%p, retrieve msg is disposed", pVnode, pVnode->vgId, pQInfo);
return code;
}
......@@ -337,6 +337,12 @@ SArray *tsdbGetTableList(tsdb_query_handle_t *pQueryHandle);
*/
SArray *tsdbQueryTableList(tsdb_repo_t* tsdb, int64_t uid, const wchar_t *pTagCond, size_t len);
/**
* clean up the query handle
* @param queryHandle
*/
void tsdbCleanupQueryHandle(tsdb_query_handle_t queryHandle);
#ifdef __cplusplus
}
#endif
......
......@@ -367,14 +367,16 @@ int32_t tsdbInsertData(tsdb_repo_t *repo, SSubmitMsg *pMsg) {
SSubmitMsgIter msgIter;
tsdbInitSubmitMsgIter(pMsg, &msgIter);
SSubmitBlk *pBlock;
SSubmitBlk *pBlock = NULL;
int32_t code = TSDB_CODE_SUCCESS;
while ((pBlock = tsdbGetSubmitMsgNext(&msgIter)) != NULL) {
if (tsdbInsertDataToTable(repo, pBlock) < 0) {
return -1;
if ((code = tsdbInsertDataToTable(repo, pBlock)) != TSDB_CODE_SUCCESS) {
return code;
}
}
return 0;
return code;
}
/**
......@@ -654,7 +656,7 @@ static int32_t tsdbSetRepoEnv(STsdbRepo *pRepo) {
}
static int32_t tsdbDestroyRepoEnv(STsdbRepo *pRepo) {
char fname[128];
char fname[260];
if (pRepo == NULL) return 0;
char *dirName = calloc(1, strlen(pRepo->rootDir) + strlen("tsdb") + 2);
if (dirName == NULL) {
......@@ -735,7 +737,9 @@ static int32_t tsdbInsertDataToTable(tsdb_repo_t *repo, SSubmitBlk *pBlock) {
STableId tableId = {.uid = pBlock->uid, .tid = pBlock->tid};
STable *pTable = tsdbIsValidTableToInsert(pRepo->tsdbMeta, tableId);
if (pTable == NULL) return -1;
if (pTable == NULL) {
return TSDB_CODE_INVALID_TABLE_ID;
}
SSubmitBlkIter blkIter;
SDataRow row;
......@@ -747,7 +751,7 @@ static int32_t tsdbInsertDataToTable(tsdb_repo_t *repo, SSubmitBlk *pBlock) {
}
}
return 0;
return TSDB_CODE_SUCCESS;
}
static int tsdbReadRowsFromCache(SSkipListIterator *pIter, TSKEY maxKey, int maxRowsToRead, SDataCols *pCols) {
......
......@@ -49,9 +49,8 @@ typedef struct SQueryFilePos {
} SQueryFilePos;
typedef struct SDataBlockLoadInfo {
int32_t fileListIndex;
int32_t fileId;
int32_t slotIdx;
SFileGroup* fileGroup;
int32_t slot;
int32_t sid;
SArray *pLoadedCols;
} SDataBlockLoadInfo;
......@@ -190,10 +189,9 @@ static void initQueryFileInfoFD(SQueryFilesInfo *pVnodeFilesInfo) {
}
static void vnodeInitDataBlockLoadInfo(SDataBlockLoadInfo *pBlockLoadInfo) {
pBlockLoadInfo->slotIdx = -1;
pBlockLoadInfo->fileId = -1;
pBlockLoadInfo->slot = -1;
pBlockLoadInfo->sid = -1;
pBlockLoadInfo->fileListIndex = -1;
pBlockLoadInfo->fileGroup = NULL;
}
static void vnodeInitCompBlockLoadInfo(SLoadCompBlockInfo *pCompBlockLoadInfo) {
......@@ -202,76 +200,6 @@ static void vnodeInitCompBlockLoadInfo(SLoadCompBlockInfo *pCompBlockLoadInfo) {
pCompBlockLoadInfo->fileListIndex = -1;
}
static int fileOrderComparFn(const void *p1, const void *p2) {
SHeaderFileInfo *pInfo1 = (SHeaderFileInfo *)p1;
SHeaderFileInfo *pInfo2 = (SHeaderFileInfo *)p2;
if (pInfo1->fileId == pInfo2->fileId) {
return 0;
}
return (pInfo1->fileId > pInfo2->fileId) ? 1 : -1;
}
void vnodeRecordAllFiles(int32_t vnodeId, SQueryFilesInfo *pVnodeFilesInfo) {
char suffix[] = ".head";
pVnodeFilesInfo->pFileInfo = taosArrayInit(4, sizeof(int32_t));
struct dirent *pEntry = NULL;
pVnodeFilesInfo->vnodeId = vnodeId;
char* tsDirectory = "";
sprintf(pVnodeFilesInfo->dbFilePathPrefix, "%s/vnode%d/db/", tsDirectory, vnodeId);
DIR *pDir = opendir(pVnodeFilesInfo->dbFilePathPrefix);
if (pDir == NULL) {
// dError("QInfo:%p failed to open directory:%s, %s", pQInfo, pVnodeFilesInfo->dbFilePathPrefix,
// strerror(errno));
return;
}
while ((pEntry = readdir(pDir)) != NULL) {
if ((pEntry->d_name[0] == '.' && pEntry->d_name[1] == '\0') || (strcmp(pEntry->d_name, "..") == 0)) {
continue;
}
if (pEntry->d_type & DT_DIR) {
continue;
}
size_t len = strlen(pEntry->d_name);
if (strcasecmp(&pEntry->d_name[len - 5], suffix) != 0) {
continue;
}
int32_t vid = 0;
int32_t fid = 0;
sscanf(pEntry->d_name, "v%df%d", &vid, &fid);
if (vid != vnodeId) { /* ignore error files */
// dError("QInfo:%p error data file:%s in vid:%d, ignore", pQInfo, pEntry->d_name, vnodeId);
continue;
}
// int32_t firstFid = pVnode->fileId - pVnode->numOfFiles + 1;
// if (fid > pVnode->fileId || fid < firstFid) {
// dError("QInfo:%p error data file:%s in vid:%d, fid:%d, fid range:%d-%d", pQInfo, pEntry->d_name, vnodeId,
// fid, firstFid, pVnode->fileId);
// continue;
// }
assert(fid >= 0 && vid >= 0);
taosArrayPush(pVnodeFilesInfo->pFileInfo, &fid);
}
closedir(pDir);
// dTrace("QInfo:%p find %d data files in %s to be checked", pQInfo, pVnodeFilesInfo->numOfFiles,
// pVnodeFilesInfo->dbFilePathPrefix);
// order the files information according their names */
size_t numOfFiles = taosArrayGetSize(pVnodeFilesInfo->pFileInfo);
qsort(pVnodeFilesInfo->pFileInfo->pData, numOfFiles, sizeof(SHeaderFileInfo), fileOrderComparFn);
}
tsdb_query_handle_t *tsdbQueryByTableId(tsdb_repo_t* tsdb, STsdbQueryCond *pCond, SArray *idList, SArray *pColumnInfo) {
// todo 1. filter not exist table
......@@ -282,7 +210,6 @@ tsdb_query_handle_t *tsdbQueryByTableId(tsdb_repo_t* tsdb, STsdbQueryCond *pCond
pQueryHandle->window = pCond->twindow;
pQueryHandle->pTsdb = tsdb;
pQueryHandle->pColumns = pColumnInfo;
pQueryHandle->loadDataAfterSeek = false;
pQueryHandle->isFirstSlot = true;
......@@ -331,9 +258,6 @@ tsdb_query_handle_t *tsdbQueryByTableId(tsdb_repo_t* tsdb, STsdbQueryCond *pCond
vnodeInitDataBlockLoadInfo(&pQueryHandle->dataBlockLoadInfo);
vnodeInitCompBlockLoadInfo(&pQueryHandle->compBlockLoadInfo);
int32_t vnodeId = 1;
vnodeRecordAllFiles(vnodeId, &pQueryHandle->vnodeFileInfo);
return (tsdb_query_handle_t)pQueryHandle;
}
......@@ -468,6 +392,7 @@ static bool loadQualifiedDataFromFileBlock(STsdbQueryHandle *pQueryHandle) {
}
}
taosArrayDestroy(sa);
return pQueryHandle->realNumOfRows > 0;
}
......@@ -513,10 +438,10 @@ bool moveToNextBlock(STsdbQueryHandle *pQueryHandle, int32_t step) {
return true;
}
} else { // check data in cache
pQueryHandle->cur.fid = -1;
return hasMoreDataInCacheForSingleModel(pQueryHandle);
}
} else {
// next block in the same file
} else { // next block in the same file
cur->slot += step;
SCompBlock* pBlock = &pCheckInfo->pCompInfo->blocks[cur->slot];
......@@ -601,9 +526,11 @@ static void filterDataInDataBlock(STsdbQueryHandle *pQueryHandle, SDataCols* pCo
if (QUERY_IS_ASC_QUERY(pQueryHandle->order) && pQueryHandle->window.ekey > blockInfo.window.ekey) {
endPos = blockInfo.size - 1;
pQueryHandle->realNumOfRows = endPos - cur->pos + 1;
pCheckInfo->lastKey = blockInfo.window.ekey + 1;
} else if (!QUERY_IS_ASC_QUERY(pQueryHandle->order) && pQueryHandle->window.ekey < blockInfo.window.skey) {
endPos = 0;
pQueryHandle->realNumOfRows = cur->pos + 1;
pCheckInfo->lastKey = blockInfo.window.ekey - 1;
} else {
endPos = vnodeBinarySearchKey(pCols->cols[0].pData, pCols->numOfPoints, pQueryHandle->window.ekey, pQueryHandle->order);
......@@ -614,6 +541,8 @@ static void filterDataInDataBlock(STsdbQueryHandle *pQueryHandle, SDataCols* pCo
} else {
pQueryHandle->realNumOfRows = endPos - cur->pos;
}
pCheckInfo->lastKey = ((int64_t*)(pCols->cols[0].pData))[endPos] + 1;
} else {
if (endPos > cur->pos) {
pQueryHandle->realNumOfRows = 0;
......@@ -621,6 +550,8 @@ static void filterDataInDataBlock(STsdbQueryHandle *pQueryHandle, SDataCols* pCo
} else {
pQueryHandle->realNumOfRows = cur->pos - endPos;
}
assert(0);
}
}
......@@ -751,7 +682,7 @@ static bool getQualifiedDataBlock(STsdbQueryHandle *pQueryHandle, STableCheckInf
int32_t tid = pCheckInfo->tableId.tid;
while (pCheckInfo->pFileGroup != NULL) {
if ((fid = getFileCompInfo(pCheckInfo, pCheckInfo->pFileGroup)) < 0) {
if (getFileCompInfo(pCheckInfo, pCheckInfo->pFileGroup) != TSDB_CODE_SUCCESS) {
break;
}
......@@ -761,7 +692,6 @@ static bool getQualifiedDataBlock(STsdbQueryHandle *pQueryHandle, STableCheckInf
pCheckInfo->pFileGroup->fileId, tid);
pCheckInfo->pFileGroup = tsdbGetFileGroupNext(&pCheckInfo->fileIter);
continue;
}
......@@ -790,7 +720,7 @@ static bool getQualifiedDataBlock(STsdbQueryHandle *pQueryHandle, STableCheckInf
assert(index >= 0 && index < pCheckInfo->compIndex[tid].numOfSuperBlocks);
// load first data block into memory failed, caused by disk block error
bool blockLoaded = false;
bool blockLoaded = false;
SArray *sa = getDefaultLoadColumns(pQueryHandle, true);
// todo no need to loaded at all
......@@ -810,27 +740,41 @@ static bool getQualifiedDataBlock(STsdbQueryHandle *pQueryHandle, STableCheckInf
pFile->fd = open(pFile->fname, O_RDONLY);
}
// if (tsdbLoadDataBlock(pFile, &pCheckInfo->pCompInfo->blocks[cur->slot], 1,
// pCheckInfo->pDataCols, data) == 0) {
// blockLoaded = true;
// }
if (tsdbLoadDataBlock(pFile, pBlock, 1, pCheckInfo->pDataCols, data) == 0) {
SDataBlockLoadInfo* pBlockLoadInfo = &pQueryHandle->dataBlockLoadInfo;
pBlockLoadInfo->fileGroup = pCheckInfo->pFileGroup;
pBlockLoadInfo->slot = pQueryHandle->cur.slot;
pBlockLoadInfo->sid = pCheckInfo->pTableObj->tableId.tid;
blockLoaded = true;
}
// dError("QInfo:%p fileId:%d total numOfBlks:%d blockId:%d load into memory failed due to error in disk files",
// GET_QINFO_ADDR(pQuery), pQuery->fileId, pQuery->numOfBlocks, blkIdx);
// failed to load data from disk, abort current query
if (blockLoaded == false) {
taosArrayDestroy(sa);
tfree(data);
return false;
}
// todo search qualified points in blk, according to primary key (timestamp) column
SDataCols* pDataCols = pCheckInfo->pDataCols;
TSKEY* d = (TSKEY*) pDataCols->cols[PRIMARYKEY_TIMESTAMP_COL_INDEX].pData;
assert(d[0] == pBlock->keyFirst && d[pBlock->numOfPoints - 1] == pBlock->keyLast);
cur->pos = binarySearchForKey(pDataCols->cols[0].pData, pBlock->numOfPoints, key, pQueryHandle->order);
cur->fid = pCheckInfo->pFileGroup->fileId;
assert(cur->pos >= 0 && cur->fid >= 0 && cur->slot >= 0);
filterDataInDataBlock(pQueryHandle, pCheckInfo->pDataCols, sa);
taosArrayDestroy(sa);
tfree(data);
return pQueryHandle->realNumOfRows > 0;
}
......@@ -838,8 +782,6 @@ static bool hasMoreDataInFileForSingleTableModel(STsdbQueryHandle* pHandle) {
assert(pHandle->activeIndex == 0 && taosArrayGetSize(pHandle->pTableCheckInfo) == 1);
STsdbFileH* pFileHandle = tsdbGetFile(pHandle->pTsdb);
// SQueryFilePos* cur = &pHandle->cur;
STableCheckInfo* pCheckInfo = taosArrayGet(pHandle->pTableCheckInfo, pHandle->activeIndex);
if (!pCheckInfo->checkFirstFileBlock) {
......@@ -952,7 +894,7 @@ SDataBlockInfo tsdbRetrieveDataBlockInfo(tsdb_query_handle_t *pQueryHandle) {
rows = pHandle->realNumOfRows;
skey = *(TSKEY*) pColInfoEx->pData;
ekey = *(TSKEY*) pColInfoEx->pData + TSDB_KEYSIZE * (rows - 1);
ekey = *(TSKEY*) ((char*)pColInfoEx->pData + TSDB_KEYSIZE * (rows - 1));
}
} else {
if (pTable->mem != NULL) {
......@@ -990,6 +932,10 @@ SArray *tsdbRetrieveDataBlock(tsdb_query_handle_t *pQueryHandle, SArray *pIdList
STsdbQueryHandle* pHandle = (STsdbQueryHandle*) pQueryHandle;
if (pHandle->cur.fid < 0) {
return pHandle->pColumns;
} else {
STableCheckInfo* pCheckInfo = taosArrayGet(pHandle->pTableCheckInfo, pHandle->activeIndex);
......@@ -1001,10 +947,17 @@ SArray *tsdbRetrieveDataBlock(tsdb_query_handle_t *pQueryHandle, SArray *pIdList
return pHandle->pColumns;
} else {
SArray *sa = getDefaultLoadColumns(pHandle, true);
doLoadDataFromFileBlock(pHandle);
filterDataInDataBlock(pHandle, pCheckInfo->pDataCols, sa);
return pHandle->pColumns;
// data block has been loaded, todo extract method
SDataBlockLoadInfo* pBlockLoadInfo = &pHandle->dataBlockLoadInfo;
if (pBlockLoadInfo->slot == pHandle->cur.slot && pBlockLoadInfo->sid == pCheckInfo->pTableObj->tableId.tid) {
return pHandle->pColumns;
} else {
doLoadDataFromFileBlock(pHandle);
filterDataInDataBlock(pHandle, pCheckInfo->pDataCols, sa);
return pHandle->pColumns;
}
}
}
}
......@@ -1351,3 +1304,36 @@ SArray *tsdbQueryTableList(tsdb_repo_t* tsdb, int64_t uid, const wchar_t *pTagCo
return result;
}
}
void tsdbCleanupQueryHandle(tsdb_query_handle_t queryHandle) {
STsdbQueryHandle* pQueryHandle = (STsdbQueryHandle*) queryHandle;
size_t size = taosArrayGetSize(pQueryHandle->pTableCheckInfo);
for(int32_t i = 0; i < size; ++i) {
STableCheckInfo* pTableCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, i);
tSkipListDestroyIter(pTableCheckInfo->iter);
if (pTableCheckInfo->pDataCols != NULL) {
tfree(pTableCheckInfo->pDataCols->buf);
}
tfree(pTableCheckInfo->pDataCols);
tfree(pTableCheckInfo->pCompInfo);
tfree(pTableCheckInfo->compIndex);
}
taosArrayDestroy(pQueryHandle->pTableCheckInfo);
size_t cols = taosArrayGetSize(pQueryHandle->pColumns);
for(int32_t i = 0; i < cols; ++i) {
SColumnInfoEx *pColInfo = taosArrayGet(pQueryHandle->pColumns, i);
tfree(pColInfo->pData);
}
taosArrayDestroy(pQueryHandle->pColumns);
tfree(pQueryHandle->unzipBuffer);
tfree(pQueryHandle->secondaryUnzipBuffer);
tfree(pQueryHandle);
}
......@@ -80,7 +80,8 @@ void *walOpen(char *path, int max, int level) {
}
void walClose(void *handle) {
if (handle == NULL) return;
SWal *pWal = (SWal *)handle;
close(pWal->fd);
......@@ -125,7 +126,7 @@ int walRenew(twal_h handle) {
if (pWal->num > pWal->max) {
// remove the oldest wal file
char name[TSDB_FILENAME_LEN];
char name[TSDB_FILENAME_LEN * 3];
sprintf(name, "%s/%s%d", pWal->path, walPrefix, pWal->id - pWal->max);
if (remove(name) <0) {
wError("wal:%s, failed to remove(%s)", name, strerror(errno));
......@@ -150,7 +151,7 @@ int walWrite(void *handle, SWalHead *pHead) {
if (pWal->level == TAOS_WAL_NOLOG) return 0;
pHead->signature = walSignature;
taosCalcChecksumAppend(0, (uint8_t *)pHead, sizeof(SWal));
taosCalcChecksumAppend(0, (uint8_t *)pHead, sizeof(SWalHead));
int contLen = pHead->len + sizeof(SWalHead);
if(write(pWal->fd, pHead, contLen) != contLen) {
......@@ -177,7 +178,7 @@ int walRestore(void *handle, void *pVnode, int (*writeFp)(void *, SWalHead *, in
uint32_t maxId = 0, minId = -1, index =0;
int plen = strlen(walPrefix);
char opath[TSDB_FILENAME_LEN];
char opath[TSDB_FILENAME_LEN+5];
sprintf(opath, "%s/old", pWal->path);
// is there old directory?
......@@ -272,7 +273,7 @@ static int walRestoreWalFile(char *name, void *pVnode, int (*writeFp)(void *, SW
break;
}
if (taosCheckChecksumWhole((uint8_t *)pHead, sizeof(SWalHead))) {
if (!taosCheckChecksumWhole((uint8_t *)pHead, sizeof(SWalHead))) {
wWarn("wal:%s, cksum is messed up, skip the rest of file", name);
break;
}
......@@ -294,8 +295,8 @@ static int walRestoreWalFile(char *name, void *pVnode, int (*writeFp)(void *, SW
int walHandleExistingFiles(char *path) {
int code = 0;
char oname[TSDB_FILENAME_LEN];
char nname[TSDB_FILENAME_LEN];
char oname[TSDB_FILENAME_LEN * 3];
char nname[TSDB_FILENAME_LEN * 3];
char opath[TSDB_FILENAME_LEN];
sprintf(opath, "%s/old", path);
......@@ -336,7 +337,7 @@ int walHandleExistingFiles(char *path) {
static int walRemoveWalFiles(char *path) {
int plen = strlen(walPrefix);
char name[TSDB_FILENAME_LEN];
char name[TSDB_FILENAME_LEN * 3];
int code = 0;
if (access(path, F_OK) != 0) return 0;
......
......@@ -24,17 +24,45 @@
void taosMsleep(int mseconds);
static int32_t doQuery(TAOS* taos, const char* sql) {
int32_t code = taos_query(taos, sql);
if (code != 0) {
printf("failed to execute query, reason:%s\n", taos_errstr(taos));
return -1;
}
TAOS_RES* res = taos_use_result(taos);
TAOS_ROW row = NULL;
char buf[512] = {0};
int32_t numOfFields = taos_num_fields(res);
TAOS_FIELD* pFields = taos_fetch_fields(res);
while((row = taos_fetch_row(res)) != NULL) {
taos_print_row(buf, row, pFields, numOfFields);
printf("%s\n", buf);
memset(buf, 0, 512);
}
taos_free_result(res);
return 0;
}
int main(int argc, char *argv[]) {
TAOS * taos;
char qstr[1024];
TAOS_RES *result;
// connect to server
if (argc < 2) {
printf("please input server-ip \n");
return 0;
}
taos_options(TSDB_OPTION_CONFIGDIR, "~/sec/cfg");
// init TAOS
taos_init();
......@@ -45,6 +73,22 @@ int main(int argc, char *argv[]) {
}
printf("success to connect to server\n");
doQuery(taos, "create database if not exists test");
doQuery(taos, "use test");
doQuery(taos, "create table if not exists tm0 (ts timestamp, k int);");
doQuery(taos, "insert into tm0 values('2020-1-1 1:1:1', 1);");
doQuery(taos, "insert into tm0 values('2020-1-1 1:1:2', 2);");
doQuery(taos, "insert into tm0 values('2020-1-1 1:1:3', 3);");
doQuery(taos, "insert into tm0 values('2020-1-1 1:1:4', 4);");
doQuery(taos, "insert into tm0 values('2020-1-1 1:1:5', 5);");
doQuery(taos, "insert into tm0 values('2020-1-1 1:1:6', 6);");
doQuery(taos, "insert into tm0 values('2020-1-1 1:1:7', 7);");
doQuery(taos, "insert into tm0 values('2020-1-1 1:1:8', 8);");
doQuery(taos, "insert into tm0 values('2020-1-1 1:1:9', 9);");
doQuery(taos, "select * from tm0;");
taos_close(taos);
return 0;
taos_query(taos, "drop database demo");
if (taos_query(taos, "create database demo") != 0) {
......@@ -53,8 +97,10 @@ int main(int argc, char *argv[]) {
}
printf("success to create database\n");
taos_query(taos, "use demo");
// create table
if (taos_query(taos, "create table m1 (ts timestamp, speed int)") != 0) {
printf("failed to create table, reason:%s\n", taos_errstr(taos));
......@@ -62,9 +108,11 @@ int main(int argc, char *argv[]) {
}
printf("success to create table\n");
// sleep for one second to make sure table is created on data node
// taosMsleep(1000);
// insert 10 records
int i = 0;
for (i = 0; i < 10; ++i) {
......@@ -76,6 +124,7 @@ int main(int argc, char *argv[]) {
}
printf("success to insert rows, total %d rows\n", i);
// query the records
sprintf(qstr, "SELECT * FROM m1");
if (taos_query(taos, qstr) != 0) {
......@@ -83,12 +132,16 @@ int main(int argc, char *argv[]) {
exit(1);
}
result = taos_use_result(taos);
if (result == NULL) {
printf("failed to get result, reason:%s\n", taos_errstr(taos));
exit(1);
}
// TAOS_ROW row;
TAOS_ROW row;
int rows = 0;
......@@ -96,6 +149,7 @@ int main(int argc, char *argv[]) {
TAOS_FIELD *fields = taos_fetch_fields(result);
char temp[256];
printf("select * from table, result:\n");
// fetch the records row by row
while ((row = taos_fetch_row(result))) {
......
#################################
run general/user/basic1.sim
run general/show/dnodes.sim
run general/db/basic1.sim
run general/db/basic2.sim
run general/db/basic3.sim
run general/db/basic4.sim
run general/db/basic5.sim
run general/table/basic1.sim
run general/table/basic2.sim
run general/table/basic3.sim
#run general/table/basic2.sim
#run general/table/basic3.sim
##################################
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -m 192.168.0.1 -i 192.168.0.1
system ifconfig
system sh/exec.sh -n dnode1 -s start
sql connect
......@@ -43,9 +44,9 @@ print $data10 $data11 $data22
print $data20 $data11 $data22
print =============== insert data
sql insert into d1.n1 values(now, 1)
sql insert into d1.n1 values(now, 2)
sql insert into d1.n1 values(now, 3)
sql insert into d1.n1 values(now+1s, 1)
sql insert into d1.n1 values(now+2s, 2)
sql insert into d1.n1 values(now+3s, 3)
print =============== query data
sql select * from d1.n1
......@@ -69,7 +70,3 @@ if $data21 != 3 then
return -1
endi
sql drop database d1
system sh/exec.sh -n dnode1 -s stop -x SIGINT
......@@ -64,6 +64,3 @@ if $data21 != 3 then
return -1
endi
sql drop database d1
system sh/exec.sh -n dnode1 -s stop -x SIGINT
\ No newline at end of file
......@@ -43,9 +43,9 @@ print $data10 $data11 $data22
print $data20 $data11 $data22
print =============== insert data
sql insert into db.n1 values(now, 1)
sql insert into db.n1 values(now, 2)
sql insert into db.n1 values(now, 3)
sql insert into db.n1 values(now+1s, 1)
sql insert into db.n1 values(now+2s, 2)
sql insert into db.n1 values(now+3s, 3)
print =============== query data
sql select * from db.n1
......
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -m 192.168.0.1 -i 192.168.0.1
system sh/deploy.sh -n dnode2 -m 192.168.0.1 -i 192.168.0.2
system sh/exec_up.sh -n dnode1 -s start
system sh/exec_up.sh -n dnode2 -s start
sql connect
\ No newline at end of file
......@@ -290,7 +290,7 @@ loop_end:
input_buffer->offset += (size_t)(after_end - number_c_string);
strncpy(item->numberstring, number_c_string, 12);
strncpy(item->numberstring, (const char *)number_c_string, 12);
return true;
}
......
......@@ -576,6 +576,7 @@ bool simCreateRestFulConnect(SScript *script, char *user, char *pass) {
bool simCreateNativeConnect(SScript *script, char *user, char *pass) {
simCloseTaosdConnect(script);
void *taos = NULL;
taosMsleep(2000);
for (int attempt = 0; attempt < 10; ++attempt) {
taos = taos_connect(NULL, user, pass, NULL, tsMnodeShellPort);
if (taos == NULL) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册