提交 7ef0ded3 编写于 作者: S Shengliang Guan

Merge remote-tracking branch 'origin/3.0' into feature/dnode3

......@@ -106,6 +106,10 @@ pipeline {
abortPreviousBuilds()
}
pre_test()
sh'''
cd ${WKC}/tests
./test-all.sh b1fq
'''
}
}
// stage('Parallel test stage') {
......
# stub
ExternalProject_Add(stub
GIT_REPOSITORY https://github.com/coolxv/cpp-stub.git
GIT_SUBMODULES "src"
SOURCE_DIR "${CMAKE_CONTRIB_DIR}/cpp-stub"
BINARY_DIR "${CMAKE_CONTRIB_DIR}/cpp-stub/src"
CONFIGURE_COMMAND ""
BUILD_COMMAND ""
INSTALL_COMMAND ""
TEST_COMMAND ""
)
......@@ -12,6 +12,7 @@ configure_file("${CMAKE_SUPPORT_DIR}/deps_CMakeLists.txt.in" ${CONTRIB_TMP_FILE}
# googletest
if(${BUILD_TEST})
cat("${CMAKE_SUPPORT_DIR}/gtest_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
cat("${CMAKE_SUPPORT_DIR}/stub_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
endif(${BUILD_TEST})
# lz4
......@@ -79,6 +80,11 @@ execute_process(COMMAND "${CMAKE_COMMAND}" --build .
# googletest
if(${BUILD_TEST})
add_subdirectory(googletest)
target_include_directories(
gtest
PUBLIC $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/cpp-stub/src>
PUBLIC $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/cpp-stub/src_linux>
)
endif(${BUILD_TEST})
# cJson
......
......@@ -86,26 +86,32 @@ typedef struct STscObj {
SAppInstInfo *pAppInfo;
} STscObj;
typedef struct SClientResultInfo {
const char *pMsg;
typedef struct SReqResultInfo {
const char *pRspMsg;
const char *pData;
TAOS_FIELD *fields;
int32_t numOfCols;
int32_t numOfRows;
int32_t current;
uint32_t numOfCols;
int32_t *length;
TAOS_ROW row;
char **pCol;
} SClientResultInfo;
typedef struct SReqBody {
tsem_t rspSem; // not used now
void* fp;
void* param;
int32_t paramLen;
int64_t execId; // showId/queryId
SClientResultInfo* pResInfo;
} SRequestBody;
uint32_t numOfRows;
uint32_t current;
} SReqResultInfo;
typedef struct SReqMsg {
void *pMsg;
uint32_t len;
} SReqMsgInfo;
typedef struct SRequestSendRecvBody {
tsem_t rspSem; // not used now
void* fp;
int64_t execId; // showId/queryId
SReqMsgInfo requestMsg;
SReqResultInfo resInfo;
} SRequestSendRecvBody;
#define ERROR_MSG_BUF_DEFAULT_SIZE 512
......@@ -115,7 +121,7 @@ typedef struct SRequestObj {
STscObj *pTscObj;
SQueryExecMetric metric;
char *sqlstr; // sql string
SRequestBody body;
SRequestSendRecvBody body;
int64_t self;
char *msgBuf;
int32_t code;
......@@ -123,11 +129,10 @@ typedef struct SRequestObj {
} SRequestObj;
typedef struct SRequestMsgBody {
int32_t msgType;
void *pData;
int32_t msgLen;
uint64_t requestId;
uint64_t requestObjRefId;
int32_t msgType;
SReqMsgInfo msgInfo;
uint64_t requestId;
uint64_t requestObjRefId;
} SRequestMsgBody;
extern SAppInfo appInfo;
......@@ -158,7 +163,7 @@ TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass,
TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen);
void* doFetchRow(SRequestObj* pRequest);
void setResultDataPtr(SClientResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t numOfCols, int32_t numOfRows);
void setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t numOfCols, int32_t numOfRows);
#ifdef __cplusplus
}
......
......@@ -155,8 +155,7 @@ TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen) {
code = qParseQuerySql(pRequest->sqlstr, sqlLen, pRequest->requestId, &type, &output, &outputLen, pRequest->msgBuf, ERROR_MSG_BUF_DEFAULT_SIZE);
if (type == TSDB_SQL_CREATE_USER || type == TSDB_SQL_SHOW || type == TSDB_SQL_DROP_USER || type == TSDB_SQL_DROP_ACCT || type == TSDB_SQL_CREATE_DB || type == TSDB_SQL_CREATE_ACCT) {
pRequest->type = type;
pRequest->body.param = output;
pRequest->body.paramLen = outputLen;
pRequest->body.requestMsg = (SReqMsgInfo){.pMsg = output, .len = outputLen};
SRequestMsgBody body = {0};
buildRequestMsgFp[type](pRequest, &body);
......@@ -165,6 +164,8 @@ TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen) {
sendMsgToServer(pTscObj->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &body, &transporterId);
tsem_wait(&pRequest->body.rspSem);
destroyRequestMsgBody(&body);
} else {
assert(0);
......@@ -255,7 +256,7 @@ STscObj* taosConnectImpl(const char *ip, const char *user, const char *auth, con
static int32_t buildConnectMsg(SRequestObj *pRequest, SRequestMsgBody* pMsgBody) {
pMsgBody->msgType = TSDB_MSG_TYPE_CONNECT;
pMsgBody->msgLen = sizeof(SConnectMsg);
pMsgBody->msgInfo.len = sizeof(SConnectMsg);
pMsgBody->requestObjRefId = pRequest->self;
SConnectMsg *pConnect = calloc(1, sizeof(SConnectMsg));
......@@ -279,28 +280,28 @@ static int32_t buildConnectMsg(SRequestObj *pRequest, SRequestMsgBody* pMsgBody)
pConnect->startTime = htobe64(appInfo.startTime);
tstrncpy(pConnect->app, appInfo.appName, tListLen(pConnect->app));
pMsgBody->pData = pConnect;
pMsgBody->msgInfo.pMsg = pConnect;
return 0;
}
static void destroyRequestMsgBody(SRequestMsgBody* pMsgBody) {
assert(pMsgBody != NULL);
tfree(pMsgBody->pData);
tfree(pMsgBody->msgInfo.pMsg);
}
int32_t sendMsgToServer(void *pTransporter, SEpSet* epSet, const SRequestMsgBody *pBody, int64_t* pTransporterId) {
char *pMsg = rpcMallocCont(pBody->msgLen);
char *pMsg = rpcMallocCont(pBody->msgInfo.len);
if (NULL == pMsg) {
tscError("0x%"PRIx64" msg:%s malloc failed", pBody->requestId, taosMsg[pBody->msgType]);
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
return -1;
}
memcpy(pMsg, pBody->pData, pBody->msgLen);
memcpy(pMsg, pBody->msgInfo.pMsg, pBody->msgInfo.len);
SRpcMsg rpcMsg = {
.msgType = pBody->msgType,
.pCont = pMsg,
.contLen = pBody->msgLen,
.contLen = pBody->msgInfo.len,
.ahandle = (void*) pBody->requestObjRefId,
.handle = NULL,
.code = 0
......@@ -388,7 +389,7 @@ TAOS *taos_connect_l(const char *ip, int ipLen, const char *user, int userLen, c
void* doFetchRow(SRequestObj* pRequest) {
assert(pRequest != NULL);
SClientResultInfo* pResultInfo = pRequest->body.pResInfo;
SReqResultInfo* pResultInfo = &pRequest->body.resInfo;
if (pResultInfo->pData == NULL || pResultInfo->current >= pResultInfo->numOfRows) {
pRequest->type = TSDB_SQL_RETRIEVE_MNODE;
......@@ -421,7 +422,7 @@ void* doFetchRow(SRequestObj* pRequest) {
return pResultInfo->row;
}
void setResultDataPtr(SClientResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t numOfCols, int32_t numOfRows) {
void setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t numOfCols, int32_t numOfRows) {
assert(numOfCols > 0 && pFields != NULL && pResultInfo != NULL);
if (numOfRows == 0) {
return;
......
......@@ -115,12 +115,7 @@ int taos_field_count(TAOS_RES *res) {
}
SRequestObj* pRequest = (SRequestObj*) res;
SClientResultInfo* pResInfo = pRequest->body.pResInfo;
if (pResInfo == NULL) {
return 0;
}
SReqResultInfo* pResInfo = &pRequest->body.resInfo;
return pResInfo->numOfCols;
}
......@@ -133,7 +128,7 @@ TAOS_FIELD *taos_fetch_fields(TAOS_RES *res) {
return NULL;
}
SClientResultInfo* pResInfo = ((SRequestObj*) res)->body.pResInfo;
SReqResultInfo* pResInfo = &(((SRequestObj*) res)->body.resInfo);
return pResInfo->fields;
}
......@@ -248,7 +243,7 @@ int* taos_fetch_lengths(TAOS_RES *res) {
return NULL;
}
return ((SRequestObj*) res)->body.pResInfo->length;
return ((SRequestObj*) res)->body.resInfo.length;
}
const char *taos_data_type(int type) {
......
......@@ -13,11 +13,11 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <catalog.h>
#include <tname.h>
#include "os.h"
#include "catalog.h"
#include "tname.h"
#include "clientInt.h"
#include "clientLog.h"
#include "os.h"
#include "tmsgtype.h"
#include "trpc.h"
......@@ -29,16 +29,6 @@ void tscProcessActivityTimer(void *handle, void *tmrId);
static int32_t extractSTableQueryVgroupId(STableMetaInfo* pTableMetaInfo);
static int32_t minMsgSize() { return tsRpcHeadSize + 100; }
static int32_t getWaitingTimeInterval(int32_t count) {
int32_t initial = 100; // 100 ms by default
if (count <= 1) {
return 0;
}
return initial * ((2u)<<(count - 2));
}
static int32_t vgIdCompare(const void *lhs, const void *rhs) {
int32_t left = *(int32_t *)lhs;
int32_t right = *(int32_t *)rhs;
......@@ -298,36 +288,6 @@ void tscProcessActivityTimer(void *handle, void *tmrId) {
taosReleaseRef(tscRefId, rid);
}
int tscSendMsgToServer(SSqlObj *pSql) {
STscObj* pObj = pSql->pTscObj;
SSqlCmd* pCmd = &pSql->cmd;
char *pMsg = rpcMallocCont(pCmd->payloadLen);
if (NULL == pMsg) {
tscError("0x%"PRIx64" msg:%s malloc failed", pSql->self, taosMsg[pSql->cmd.msgType]);
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
// set the mgmt ip list
if (pSql->cmd.command >= TSDB_SQL_MGMT) {
tscDumpMgmtEpSet(pSql);
}
memcpy(pMsg, pSql->cmd.payload, pSql->cmd.payloadLen);
SRpcMsg rpcMsg = {
.msgType = pSql->cmd.msgType,
.pCont = pMsg,
.contLen = pSql->cmd.payloadLen,
.ahandle = (void*)pSql->self,
.handle = NULL,
.code = 0
};
rpcSendRequest(pObj->pRpcObj->pDnodeConn, &pSql->epSet, &rpcMsg, &pSql->rpcRid);
return TSDB_CODE_SUCCESS;
}
// handle three situation
// 1. epset retry, only return last failure ep
// 2. no epset retry, like 'taos -h invalidFqdn', return invalidFqdn
......@@ -354,176 +314,6 @@ void tscSetFqdnErrorMsg(SSqlObj* pSql, SRpcEpSet* pEpSet) {
}
}
void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
TSDB_CACHE_PTR_TYPE handle = (TSDB_CACHE_PTR_TYPE) rpcMsg->ahandle;
SSqlObj* pSql = (SSqlObj*)taosAcquireRef(tscObjRef, handle);
if (pSql == NULL) {
rpcFreeCont(rpcMsg->pCont);
return;
}
assert(pSql->self == handle);
STscObj *pObj = pSql->pTscObj;
SSqlRes *pRes = &pSql->res;
SSqlCmd *pCmd = &pSql->cmd;
pSql->rpcRid = -1;
if (pObj->signature != pObj) {
tscDebug("0x%"PRIx64" DB connection is closed, cmd:%d pObj:%p signature:%p", pSql->self, pCmd->command, pObj, pObj->signature);
taosRemoveRef(tscObjRef, handle);
taosReleaseRef(tscObjRef, handle);
rpcFreeCont(rpcMsg->pCont);
return;
}
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd);
if (pQueryInfo != NULL && pQueryInfo->type == TSDB_QUERY_TYPE_FREE_RESOURCE) {
tscDebug("0x%"PRIx64" sqlObj needs to be released or DB connection is closed, cmd:%d type:%d, pObj:%p signature:%p",
pSql->self, pCmd->command, pQueryInfo->type, pObj, pObj->signature);
taosRemoveRef(tscObjRef, handle);
taosReleaseRef(tscObjRef, handle);
rpcFreeCont(rpcMsg->pCont);
return;
}
if (pEpSet) {
if (!tscEpSetIsEqual(&pSql->epSet, pEpSet)) {
if (pCmd->command < TSDB_SQL_MGMT) {
tscUpdateVgroupInfo(pSql, pEpSet);
} else {
tscUpdateMgmtEpSet(pSql, pEpSet);
}
}
}
int32_t cmd = pCmd->command;
// set the flag to denote that sql string needs to be re-parsed and build submit block with table schema
if (cmd == TSDB_SQL_INSERT && rpcMsg->code == TSDB_CODE_TDB_TABLE_RECONFIGURE) {
pSql->cmd.insertParam.schemaAttached = 1;
}
// single table query error need to be handled here.
if ((cmd == TSDB_SQL_SELECT || cmd == TSDB_SQL_UPDATE_TAG_VAL) &&
(((rpcMsg->code == TSDB_CODE_TDB_INVALID_TABLE_ID || rpcMsg->code == TSDB_CODE_VND_INVALID_VGROUP_ID)) ||
rpcMsg->code == TSDB_CODE_RPC_NETWORK_UNAVAIL || rpcMsg->code == TSDB_CODE_APP_NOT_READY)) {
// 1. super table subquery
// 2. nest queries are all not updated the tablemeta and retry parse the sql after cleanup local tablemeta/vgroup id buffer
if ((TSDB_QUERY_HAS_TYPE(pQueryInfo->type, (TSDB_QUERY_TYPE_STABLE_SUBQUERY | TSDB_QUERY_TYPE_SUBQUERY |
TSDB_QUERY_TYPE_TAG_FILTER_QUERY)) &&
!TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_PROJECTION_QUERY)) ||
(TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_NEST_SUBQUERY)) || (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_STABLE_SUBQUERY) && pQueryInfo->distinct)) {
// do nothing in case of super table subquery
} else {
pSql->retry += 1;
tscWarn("0x%" PRIx64 " it shall renew table meta, code:%s, retry:%d", pSql->self, tstrerror(rpcMsg->code), pSql->retry);
pSql->res.code = rpcMsg->code; // keep the previous error code
if (pSql->retry > pSql->maxRetry) {
tscError("0x%" PRIx64 " max retry %d reached, give up", pSql->self, pSql->maxRetry);
} else {
// wait for a little bit moment and then retry
// todo do not sleep in rpc callback thread, add this process into queue to process
if (rpcMsg->code == TSDB_CODE_APP_NOT_READY || rpcMsg->code == TSDB_CODE_VND_INVALID_VGROUP_ID) {
int32_t duration = getWaitingTimeInterval(pSql->retry);
taosMsleep(duration);
}
pSql->retryReason = rpcMsg->code;
rpcMsg->code = tscRenewTableMeta(pSql, 0);
// if there is an error occurring, proceed to the following error handling procedure.
if (rpcMsg->code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
taosReleaseRef(tscObjRef, handle);
rpcFreeCont(rpcMsg->pCont);
return;
}
}
}
}
pRes->rspLen = 0;
if (pRes->code == TSDB_CODE_TSC_QUERY_CANCELLED) {
tscDebug("0x%"PRIx64" query is cancelled, code:%s", pSql->self, tstrerror(pRes->code));
} else {
pRes->code = rpcMsg->code;
}
if (pRes->code == TSDB_CODE_SUCCESS) {
tscDebug("0x%"PRIx64" reset retry counter to be 0 due to success rsp, old:%d", pSql->self, pSql->retry);
pSql->retry = 0;
}
if (pRes->code != TSDB_CODE_TSC_QUERY_CANCELLED) {
assert(rpcMsg->msgType == pCmd->msgType + 1);
pRes->code = rpcMsg->code;
pRes->rspType = rpcMsg->msgType;
pRes->rspLen = rpcMsg->contLen;
if (pRes->rspLen > 0 && rpcMsg->pCont) {
char *tmp = (char *)realloc(pRes->pRsp, pRes->rspLen);
if (tmp == NULL) {
pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
} else {
pRes->pRsp = tmp;
memcpy(pRes->pRsp, rpcMsg->pCont, pRes->rspLen);
}
} else {
tfree(pRes->pRsp);
}
/*
* There is not response callback function for submit response.
* The actual inserted number of points is the first number.
*/
if (rpcMsg->msgType == TSDB_MSG_TYPE_SUBMIT_RSP && pRes->pRsp != NULL) {
SShellSubmitRspMsg *pMsg = (SShellSubmitRspMsg*)pRes->pRsp;
pMsg->code = htonl(pMsg->code);
pMsg->numOfRows = htonl(pMsg->numOfRows);
pMsg->affectedRows = htonl(pMsg->affectedRows);
pMsg->failedRows = htonl(pMsg->failedRows);
pMsg->numOfFailedBlocks = htonl(pMsg->numOfFailedBlocks);
pRes->numOfRows += pMsg->affectedRows;
tscDebug("0x%"PRIx64" SQL cmd:%s, code:%s inserted rows:%d rspLen:%d", pSql->self, sqlCmd[pCmd->command],
tstrerror(pRes->code), pMsg->affectedRows, pRes->rspLen);
} else {
tscDebug("0x%"PRIx64" SQL cmd:%s, code:%s rspLen:%d", pSql->self, sqlCmd[pCmd->command], tstrerror(pRes->code), pRes->rspLen);
}
}
if (pRes->code == TSDB_CODE_SUCCESS && tscProcessMsgRsp[pCmd->command]) {
rpcMsg->code = (*tscProcessMsgRsp[pCmd->command])(pSql);
}
bool shouldFree = tscShouldBeFreed(pSql);
if (rpcMsg->code != TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
if (rpcMsg->code != TSDB_CODE_SUCCESS) {
pRes->code = rpcMsg->code;
}
rpcMsg->code = (pRes->code == TSDB_CODE_SUCCESS) ? (int32_t)pRes->numOfRows : pRes->code;
if (rpcMsg->code == TSDB_CODE_RPC_FQDN_ERROR) {
tscAllocPayload(pCmd, TSDB_FQDN_LEN + 64);
tscSetFqdnErrorMsg(pSql, pEpSet);
}
(*pSql->fp)(pSql->param, pSql, rpcMsg->code);
}
if (shouldFree) { // in case of table-meta/vgrouplist query, automatically free it
tscDebug("0x%"PRIx64" sqlObj is automatically freed", pSql->self);
taosRemoveRef(tscObjRef, handle);
}
taosReleaseRef(tscObjRef, handle);
rpcFreeCont(rpcMsg->pCont);
}
int doBuildAndSendMsg(SSqlObj *pSql) {
SSqlCmd *pCmd = &pSql->cmd;
SSqlRes *pRes = &pSql->res;
......@@ -2987,51 +2777,6 @@ int32_t tscGetUdfFromNode(SSqlObj *pSql, SQueryInfo* pQueryInfo) {
return code;
}
static void freeElem(void* p) {
tfree(*(char**)p);
}
/**
* retrieve table meta from mnode, and then update the local table meta hashmap.
* @param pSql sql object
* @param tableIndex table index
* @return status code
*/
int tscRenewTableMeta(SSqlObj *pSql, int32_t tableIndex) {
SSqlCmd* pCmd = &pSql->cmd;
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, tableIndex);
char name[TSDB_TABLE_FNAME_LEN] = {0};
int32_t code = tNameExtractFullName(&pTableMetaInfo->name, name);
if (code != TSDB_CODE_SUCCESS) {
tscError("0x%"PRIx64" failed to generate the table full name", pSql->self);
return TSDB_CODE_TSC_INVALID_OPERATION;
}
STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
if (pTableMeta) {
tscDebug("0x%"PRIx64" update table meta:%s, old meta numOfTags:%d, numOfCols:%d, uid:%" PRIu64, pSql->self, name,
tscGetNumOfTags(pTableMeta), tscGetNumOfColumns(pTableMeta), pTableMeta->id.uid);
}
// remove stored tableMeta info in hash table
tscResetSqlCmd(pCmd, true, pSql->self);
SArray* pNameList = taosArrayInit(1, POINTER_BYTES);
SArray* vgroupList = taosArrayInit(1, POINTER_BYTES);
char* n = strdup(name);
taosArrayPush(pNameList, &n);
code = getMultiTableMetaFromMnode(pSql, pNameList, vgroupList, NULL, tscTableMetaCallBack, true);
taosArrayDestroyEx(pNameList, freeElem);
taosArrayDestroyEx(vgroupList, freeElem);
return code;
}
static bool allVgroupInfoRetrieved(SQueryInfo* pQueryInfo) {
for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, i);
......@@ -3044,58 +2789,11 @@ static bool allVgroupInfoRetrieved(SQueryInfo* pQueryInfo) {
return true;
}
int tscGetSTableVgroupInfo(SSqlObj *pSql, SQueryInfo* pQueryInfo) {
int32_t code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
if (allVgroupInfoRetrieved(pQueryInfo)) {
return TSDB_CODE_SUCCESS;
}
SSqlObj *pNew = calloc(1, sizeof(SSqlObj));
pNew->pTscObj = pSql->pTscObj;
pNew->signature = pNew;
pNew->cmd.command = TSDB_SQL_STABLEVGROUP;
// TODO TEST IT
SQueryInfo *pNewQueryInfo = tscGetQueryInfoS(&pNew->cmd);
if (pNewQueryInfo == NULL) {
tscFreeSqlObj(pNew);
return code;
}
for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
STableMetaInfo *pMInfo = tscGetMetaInfo(pQueryInfo, i);
STableMeta* pTableMeta = tscTableMetaDup(pMInfo->pTableMeta);
tscAddTableMetaInfo(pNewQueryInfo, &pMInfo->name, pTableMeta, NULL, pMInfo->tagColList, pMInfo->pVgroupTables);
}
if ((code = tscAllocPayload(&pNew->cmd, TSDB_DEFAULT_PAYLOAD_SIZE)) != TSDB_CODE_SUCCESS) {
tscFreeSqlObj(pNew);
return code;
}
pNewQueryInfo->numOfTables = pQueryInfo->numOfTables;
registerSqlObj(pNew);
tscDebug("0x%"PRIx64" svgroupRid from %" PRId64 " to %" PRId64 , pSql->self, pSql->svgroupRid, pNew->self);
pSql->svgroupRid = pNew->self;
tscDebug("0x%"PRIx64" new sqlObj:%p to get vgroupInfo, numOfTables:%d", pSql->self, pNew, pNewQueryInfo->numOfTables);
pNew->fp = tscTableMetaCallBack;
pNew->param = (void *)pSql->self;
code = tscBuildAndSendRequest(pNew, NULL);
if (code == TSDB_CODE_SUCCESS) {
code = TSDB_CODE_TSC_ACTION_IN_PROGRESS;
}
return code;
}
#endif
int32_t buildConnectMsg(SRequestObj *pRequest, SRequestMsgBody* pMsgBody) {
pMsgBody->msgType = TSDB_MSG_TYPE_CONNECT;
pMsgBody->msgLen = sizeof(SConnectMsg);
pMsgBody->msgInfo.len = sizeof(SConnectMsg);
pMsgBody->requestObjRefId = pRequest->self;
SConnectMsg *pConnect = calloc(1, sizeof(SConnectMsg));
......@@ -3119,7 +2817,7 @@ int32_t buildConnectMsg(SRequestObj *pRequest, SRequestMsgBody* pMsgBody) {
pConnect->startTime = htobe64(appInfo.startTime);
tstrncpy(pConnect->app, appInfo.appName, tListLen(pConnect->app));
pMsgBody->pData = pConnect;
pMsgBody->msgInfo.pMsg = pConnect;
return 0;
}
......@@ -3160,17 +2858,14 @@ int processConnectRsp(SRequestObj *pRequest, const char* pMsg, int32_t msgLen) {
pTscObj->pAppInfo->clusterId = pConnect->clusterId;
atomic_add_fetch_64(&pTscObj->pAppInfo->numOfConns, 1);
pRequest->body.pResInfo = calloc(1, sizeof(SClientResultInfo));
pRequest->body.pResInfo->pMsg = pMsg;
pRequest->body.resInfo.pRspMsg = pMsg;
tscDebug("0x%" PRIx64 " clusterId:%d, totalConn:%"PRId64, pRequest->requestId, pConnect->clusterId, pTscObj->pAppInfo->numOfConns);
return 0;
}
int32_t doBuildMsgSupp(SRequestObj *pRequest, SRequestMsgBody* pMsgBody) {
pMsgBody->requestObjRefId = pRequest->self;
pMsgBody->msgLen = pRequest->body.paramLen;
pMsgBody->pData = pRequest->body.param;
pMsgBody->msgInfo = pRequest->body.requestMsg;
switch(pRequest->type) {
case TSDB_SQL_CREATE_USER:
......@@ -3188,7 +2883,7 @@ int32_t doBuildMsgSupp(SRequestObj *pRequest, SRequestMsgBody* pMsgBody) {
case TSDB_SQL_CREATE_DB: {
pMsgBody->msgType = TSDB_MSG_TYPE_CREATE_DB;
SCreateDbMsg* pCreateMsg = pRequest->body.param;
SCreateDbMsg* pCreateMsg = pRequest->body.requestMsg.pMsg;
SName name = {0};
int32_t ret = tNameSetDbName(&name, pRequest->pTscObj->acctId, pCreateMsg->db, strnlen(pCreateMsg->db, tListLen(pCreateMsg->db)));
if (ret != TSDB_CODE_SUCCESS) {
......@@ -3205,36 +2900,6 @@ int32_t doBuildMsgSupp(SRequestObj *pRequest, SRequestMsgBody* pMsgBody) {
}
}
STableMeta* createTableMetaFromMsg(STableMetaMsg* pTableMetaMsg) {
assert(pTableMetaMsg != NULL && pTableMetaMsg->numOfColumns >= 2);
size_t schemaSize = (pTableMetaMsg->numOfColumns + pTableMetaMsg->numOfTags) * sizeof(SSchema);
STableMeta* pTableMeta = calloc(1, sizeof(STableMeta) + schemaSize);
pTableMeta->tableType = pTableMetaMsg->tableType;
pTableMeta->vgId = pTableMetaMsg->vgId;
pTableMeta->suid = pTableMetaMsg->suid;
pTableMeta->uid = pTableMetaMsg->tuid;
pTableMeta->tableInfo = (STableComInfo) {
.numOfTags = pTableMetaMsg->numOfTags,
.precision = pTableMetaMsg->precision,
.numOfColumns = pTableMetaMsg->numOfColumns,
};
pTableMeta->sversion = pTableMetaMsg->sversion;
pTableMeta->tversion = pTableMetaMsg->tversion;
memcpy(pTableMeta->schema, pTableMetaMsg->pSchema, schemaSize);
int32_t numOfTotalCols = pTableMeta->tableInfo.numOfColumns;
for(int32_t i = 0; i < numOfTotalCols; ++i) {
pTableMeta->tableInfo.rowSize += pTableMeta->schema[i].bytes;
}
return pTableMeta;
}
int32_t processShowRsp(SRequestObj *pRequest, const char* pMsg, int32_t msgLen) {
SShowRsp* pShow = (SShowRsp *)pMsg;
pShow->showId = htonl(pShow->showId);
......@@ -3257,12 +2922,8 @@ int32_t processShowRsp(SRequestObj *pRequest, const char* pMsg, int32_t msgLen)
pFields[i].bytes = pSchema[i].bytes;
}
if (pRequest->body.pResInfo == NULL) {
pRequest->body.pResInfo = calloc(1, sizeof(SClientResultInfo));
}
pRequest->body.pResInfo->pMsg = pMsg;
SClientResultInfo* pResInfo = pRequest->body.pResInfo;
pRequest->body.resInfo.pRspMsg = pMsg;
SReqResultInfo* pResInfo = &pRequest->body.resInfo;
pResInfo->fields = pFields;
pResInfo->numOfCols = pMetaMsg->numOfColumns;
......@@ -3276,27 +2937,27 @@ int32_t processShowRsp(SRequestObj *pRequest, const char* pMsg, int32_t msgLen)
int buildRetrieveMnodeMsg(SRequestObj *pRequest, SRequestMsgBody* pMsgBody) {
pMsgBody->msgType = TSDB_MSG_TYPE_SHOW_RETRIEVE;
pMsgBody->msgLen = sizeof(SRetrieveTableMsg);
pMsgBody->msgInfo.len = sizeof(SRetrieveTableMsg);
pMsgBody->requestObjRefId = pRequest->self;
SRetrieveTableMsg *pRetrieveMsg = calloc(1, sizeof(SRetrieveTableMsg));
pRetrieveMsg->showId = htonl(pRequest->body.execId);
pMsgBody->pData = pRetrieveMsg;
pMsgBody->msgInfo.pMsg = pRetrieveMsg;
return TSDB_CODE_SUCCESS;
}
int32_t processRetrieveMnodeRsp(SRequestObj *pRequest, const char* pMsg, int32_t msgLen) {
assert(msgLen >= sizeof(SRetrieveTableRsp));
tfree(pRequest->body.pResInfo->pMsg);
pRequest->body.pResInfo->pMsg = pMsg;
tfree(pRequest->body.resInfo.pRspMsg);
pRequest->body.resInfo.pRspMsg = pMsg;
SRetrieveTableRsp *pRetrieve = (SRetrieveTableRsp *) pMsg;
pRetrieve->numOfRows = htonl(pRetrieve->numOfRows);
pRetrieve->precision = htons(pRetrieve->precision);
SClientResultInfo* pResInfo = pRequest->body.pResInfo;
SReqResultInfo* pResInfo = &pRequest->body.resInfo;
pResInfo->numOfRows = pRetrieve->numOfRows;
pResInfo->pData = pRetrieve->data; // todo fix this in async model
......
......@@ -170,7 +170,7 @@ void* createRequest(STscObj* pObj, __taos_async_fn_t fp, void* param, int32_t ty
pRequest->type = type;
pRequest->pTscObj = pObj;
pRequest->body.fp = fp;
pRequest->body.param = param;
// pRequest->body.requestMsg. = param;
pRequest->msgBuf = calloc(1, ERROR_MSG_BUF_DEFAULT_SIZE);
tsem_init(&pRequest->body.rspSem, 0, 0);
......@@ -188,10 +188,7 @@ static void doDestroyRequest(void* p) {
tfree(pRequest->sqlstr);
tfree(pRequest->pInfo);
if (pRequest->body.pResInfo != NULL) {
tfree(pRequest->body.pResInfo->pMsg);
tfree(pRequest->body.pResInfo);
}
tfree(pRequest->body.resInfo.pRspMsg);
deregisterRequest(pRequest);
tfree(pRequest);
......
......@@ -19,5 +19,5 @@ target_link_libraries(
# test
if(${BUILD_TEST})
#add_subdirectory(test)
add_subdirectory(test)
endif(${BUILD_TEST})
\ No newline at end of file
......@@ -71,6 +71,7 @@ struct SVnode {
SWal* pWal;
SVnodeSync* pSync;
SVnodeFS* pFs;
tsem_t canCommit;
};
int vnodeScheduleTask(SVnodeTask* task);
......
......@@ -38,9 +38,10 @@ int vnodeCommit(void *arg) {
metaCommit(pVnode->pMeta);
tqCommit(pVnode->pTq);
tsdbCommit(pVnode->pTq);
tsdbCommit(pVnode->pTsdb);
vnodeBufPoolRecycle(pVnode);
tsem_post(&(pVnode->canCommit));
// TODO
return 0;
}
......
......@@ -74,11 +74,14 @@ static SVnode *vnodeNew(const char *path, const SVnodeCfg *pVnodeCfg) {
pVnode->path = strdup(path);
vnodeOptionsCopy(&(pVnode->config), pVnodeCfg);
tsem_init(&(pVnode->canCommit), 0, 1);
return pVnode;
}
static void vnodeFree(SVnode *pVnode) {
if (pVnode) {
tsem_destroy(&(pVnode->canCommit));
tfree(pVnode->path);
free(pVnode);
}
......
......@@ -27,14 +27,14 @@ int vnodeProcessNoWalWMsgs(SVnode *pVnode, SRpcMsg *pMsg) {
}
int vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs) {
SRpcMsg *pMsg;
SRpcMsg * pMsg;
SVnodeReq *pVnodeReq;
for (int i = 0; i < taosArrayGetSize(pMsgs); i++) {
pMsg = *(SRpcMsg **)taosArrayGet(pMsgs, i);
// ser request version
void *pBuf = pMsg->pCont;
void * pBuf = pMsg->pCont;
int64_t ver = pVnode->state.processed++;
taosEncodeFixedU64(&pBuf, ver);
......@@ -45,68 +45,61 @@ int vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs) {
walFsync(pVnode->pWal, false);
// Apply each request now
for (int i = 0; i < taosArrayGetSize(pMsgs); i++) {
pMsg = *(SRpcMsg **)taosArrayGet(pMsgs, i);
SVnodeReq vReq;
// TODO: Integrate RAFT module here
// Apply the request
{
void *ptr = vnodeMalloc(pVnode, pMsg->contLen);
if (ptr == NULL) {
// TODO: handle error
}
return 0;
}
int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
SVnodeReq vReq;
void * ptr = vnodeMalloc(pVnode, pMsg->contLen);
if (ptr == NULL) {
// TODO: handle error
}
// TODO: copy here need to be extended
memcpy(ptr, pMsg->pCont, pMsg->contLen);
// TODO: copy here need to be extended
memcpy(ptr, pMsg->pCont, pMsg->contLen);
// todo: change the interface here
uint64_t ver;
taosDecodeFixedU64(pMsg->pCont, &ver);
if (tqPushMsg(pVnode->pTq, ptr, ver) < 0) {
// TODO: handle error
}
// todo: change the interface here
uint64_t ver;
taosDecodeFixedU64(pMsg->pCont, &ver);
if (tqPushMsg(pVnode->pTq, ptr, ver) < 0) {
vnodeParseReq(pMsg->pCont, &vReq, pMsg->msgType);
switch (pMsg->msgType) {
case TSDB_MSG_TYPE_CREATE_TABLE:
if (metaCreateTable(pVnode->pMeta, &(vReq.ctReq)) < 0) {
// TODO: handle error
}
vnodeParseReq(pMsg->pCont, &vReq, pMsg->msgType);
switch (pMsg->msgType) {
case TSDB_MSG_TYPE_CREATE_TABLE:
if (metaCreateTable(pVnode->pMeta, &(vReq.ctReq)) < 0) {
// TODO: handle error
}
// TODO: maybe need to clear the requst struct
break;
case TSDB_MSG_TYPE_DROP_TABLE:
if (metaDropTable(pVnode->pMeta, vReq.dtReq.uid) < 0) {
// TODO: handle error
}
break;
case TSDB_MSG_TYPE_SUBMIT:
if (tsdbInsertData(pVnode->pTsdb, (SSubmitMsg *)ptr) < 0) {
// TODO: handle error
}
break;
default:
break;
// TODO: maybe need to clear the requst struct
break;
case TSDB_MSG_TYPE_DROP_TABLE:
if (metaDropTable(pVnode->pMeta, vReq.dtReq.uid) < 0) {
// TODO: handle error
}
pVnode->state.applied = ver;
}
// Check if it needs to commit
if (vnodeShouldCommit(pVnode)) {
if (vnodeAsyncCommit(pVnode) < 0) {
break;
case TSDB_MSG_TYPE_SUBMIT:
if (tsdbInsertData(pVnode->pTsdb, (SSubmitMsg *)ptr) < 0) {
// TODO: handle error
}
}
break;
default:
break;
}
return 0;
}
pVnode->state.applied = ver;
int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
// TODO
// Check if it needs to commit
if (vnodeShouldCommit(pVnode)) {
tsem_wait(&(pVnode->canCommit));
if (vnodeAsyncCommit(pVnode) < 0) {
// TODO: handle error
}
}
return 0;
}
......
......@@ -166,6 +166,21 @@ static void vtClearMsgBatch(SArray *pMsgArr) {
taosArrayClear(pMsgArr);
}
static void vtProcessAndApplyReqs(SVnode *pVnode, SArray *pMsgArr) {
int rcode;
SRpcMsg *pReq;
SRpcMsg *pRsp;
rcode = vnodeProcessWMsgs(pVnode, pMsgArr);
GTEST_ASSERT_EQ(rcode, 0);
for (size_t i = 0; i < taosArrayGetSize(pMsgArr); i++) {
pReq = *(SRpcMsg **)taosArrayGet(pMsgArr, i);
rcode = vnodeApplyWMsg(pVnode, pReq, NULL);
GTEST_ASSERT_EQ(rcode, 0);
}
}
TEST(vnodeApiTest, vnode_simple_create_table_test) {
tb_uid_t suid = 1638166374163;
SRpcMsg *pMsg;
......@@ -189,8 +204,7 @@ TEST(vnodeApiTest, vnode_simple_create_table_test) {
sprintf(tbname, "st");
vtBuildCreateStbReq(suid, tbname, &pMsg);
taosArrayPush(pMsgArr, &pMsg);
rcode = vnodeProcessWMsgs(pVnode, pMsgArr);
ASSERT_EQ(rcode, 0);
vtProcessAndApplyReqs(pVnode, pMsgArr);
vtClearMsgBatch(pMsgArr);
// CREATE A LOT OF CHILD TABLES
......@@ -203,8 +217,7 @@ TEST(vnodeApiTest, vnode_simple_create_table_test) {
}
// Process request batch
rcode = vnodeProcessWMsgs(pVnode, pMsgArr);
ASSERT_EQ(rcode, 0);
vtProcessAndApplyReqs(pVnode, pMsgArr);
// Clear request batch
vtClearMsgBatch(pMsgArr);
......@@ -242,16 +255,14 @@ TEST(vnodeApiTest, vnode_simple_insert_test) {
sprintf(tbname, "st");
vtBuildCreateStbReq(suid, tbname, &pMsg);
taosArrayPush(pMsgArr, &pMsg);
rcode = vnodeProcessWMsgs(pVnode, pMsgArr);
GTEST_ASSERT_EQ(rcode, 0);
vtProcessAndApplyReqs(pVnode, pMsgArr);
vtClearMsgBatch(pMsgArr);
// 2. CREATE A CHILD TABLE
sprintf(tbname, "t0");
vtBuildCreateCtbReq(suid, tbname, &pMsg);
taosArrayPush(pMsgArr, &pMsg);
rcode = vnodeProcessWMsgs(pVnode, pMsgArr);
GTEST_ASSERT_EQ(rcode, 0);
vtProcessAndApplyReqs(pVnode, pMsgArr);
vtClearMsgBatch(pMsgArr);
// 3. WRITE A LOT OF TIME-SERIES DATA
......@@ -260,8 +271,7 @@ TEST(vnodeApiTest, vnode_simple_insert_test) {
vtBuildSubmitReq(&pMsg);
taosArrayPush(pMsgArr, &pMsg);
}
rcode = vnodeProcessWMsgs(pVnode, pMsgArr);
GTEST_ASSERT_EQ(rcode, 0);
vtProcessAndApplyReqs(pVnode, pMsgArr);
vtClearMsgBatch(pMsgArr);
}
......
......@@ -44,11 +44,13 @@ STQ* tqOpen(const char* path, STqCfg* tqConfig, STqLogReader* tqLogReader, SMemA
pTq->path = strdup(path);
pTq->tqConfig = tqConfig;
pTq->tqLogReader = tqLogReader;
#if 0
pTq->tqMemRef.pAlloctorFactory = allocFac;
pTq->tqMemRef.pAllocator = allocFac->create(allocFac);
if (pTq->tqMemRef.pAllocator == NULL) {
// TODO: error code of buffer pool
}
#endif
pTq->tqMeta = tqStoreOpen(path, (FTqSerialize)tqSerializeGroup, (FTqDeserialize)tqDeserializeGroup, free, 0);
if (pTq->tqMeta == NULL) {
// TODO: free STQ
......
aux_source_directory(src TSDB_SRC)
add_library(tsdb ${TSDB_SRC})
if(0)
add_library(tsdb ${TSDB_SRC})
else(0)
add_library(tsdb "")
target_sources(tsdb
PRIVATE
"src/tsdbCommit.c"
"src/tsdbMain.c"
"src/tsdbMemTable.c"
"src/tsdbOptions.c"
"src/tsdbWrite.c"
)
endif(0)
target_include_directories(
tsdb
PUBLIC "${CMAKE_SOURCE_DIR}/include/dnode/vnode/tsdb"
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
)
target_link_libraries(
tsdb
PUBLIC os
......
......@@ -16,6 +16,7 @@
#ifndef _TD_TSDB_COMMIT_H_
#define _TD_TSDB_COMMIT_H_
#if 0
typedef struct {
int minFid;
int midFid;
......@@ -53,5 +54,6 @@ static FORCE_INLINE int tsdbGetFidLevel(int fid, SRtn *pRtn) {
return -1;
}
}
#endif
#endif /* _TD_TSDB_COMMIT_H_ */
\ No newline at end of file
......@@ -19,8 +19,12 @@
extern "C" {
#endif
#if 0
void *tsdbCompactImpl(STsdbRepo *pRepo);
#endif
#ifdef __cplusplus
}
#endif
......
......@@ -16,6 +16,8 @@
#ifndef _TD_TSDB_FS_H_
#define _TD_TSDB_FS_H_
#if 0
#define TSDB_FS_VERSION 0
// ================== TSDB global config
......@@ -113,4 +115,6 @@ static FORCE_INLINE int tsdbUnLockFS(STsdbFS* pFs) {
return 0;
}
#endif
#endif /* _TD_TSDB_FS_H_ */
......@@ -16,6 +16,8 @@
#ifndef _TS_TSDB_FILE_H_
#define _TS_TSDB_FILE_H_
#if 0
#define TSDB_FILE_HEAD_SIZE 512
#define TSDB_FILE_DELIMITER 0xF00AFA0F
#define TSDB_FILE_INIT_MAGIC 0xFFFFFFFF
......@@ -364,4 +366,5 @@ static FORCE_INLINE bool tsdbFSetIsOk(SDFileSet* pSet) {
return true;
}
#endif
#endif /* _TS_TSDB_FILE_H_ */
\ No newline at end of file
......@@ -16,10 +16,14 @@
#ifndef _TD_TSDB_HEALTH_H_
#define _TD_TSDB_HEALTH_H_
#if 0
bool tsdbUrgeQueryFree(STsdbRepo* pRepo);
int32_t tsdbInsertNewBlock(STsdbRepo* pRepo);
bool tsdbIdleMemEnough();
bool tsdbAllowNewBlock(STsdbRepo* pRepo);
#endif
#endif /* _TD_TSDB_BUFFER_H_ */
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_TSDB_IDX_H_
#define _TD_TSDB_IDX_H_
#ifdef __cplusplus
extern "C" {
#endif
#ifdef __cplusplus
}
#endif
#endif /*_TD_TSDB_IDX_H_*/
\ No newline at end of file
......@@ -15,6 +15,7 @@
#ifndef _TD_TSDB_READ_IMPL_H_
#define _TD_TSDB_READ_IMPL_H_
#if 0
#include "tfs.h"
#include "tsdb.h"
......@@ -150,4 +151,6 @@ static FORCE_INLINE int tsdbMakeRoom(void **ppBuf, size_t size) {
return 0;
}
#endif
#endif /*_TD_TSDB_READ_IMPL_H_*/
......@@ -16,6 +16,8 @@
#ifndef TSDB_ROW_MERGE_BUF_H
#define TSDB_ROW_MERGE_BUF_H
#if 0
#ifdef __cplusplus
extern "C" {
#endif
......@@ -42,4 +44,6 @@ static FORCE_INLINE void tsdbFreeMergeBuf(SMergeBuf buf) {
}
#endif
#endif
#endif /* ifndef TSDB_ROW_MERGE_BUF_H */
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_TSDB_SMA_H_
#define _TD_TSDB_SMA_H_
#ifdef __cplusplus
extern "C" {
#endif
#ifdef __cplusplus
}
#endif
#endif /*_TD_TSDB_SMA_H_*/
\ No newline at end of file
......@@ -16,6 +16,7 @@
#ifndef _TD_TSDB_INT_H_
#define _TD_TSDB_INT_H_
#if 0
// // TODO: remove the include
// #include <errno.h>
// #include <fcntl.h>
......@@ -144,4 +145,5 @@ static FORCE_INLINE int tsdbGetNextMaxTables(int tid) {
}
#endif
#endif
#endif /* _TD_TSDB_INT_H_ */
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
\ No newline at end of file
......@@ -26,7 +26,19 @@
extern "C" {
#endif
// tfile header
// |<---suid--->|<---version--->|<--colLen-->|<-colName->|<---type-->|
// |<-uint64_t->|<---int32_t--->|<--int32_t->|<-colLen-->|<-uint8_t->|
typedef struct TFileReadHeader {
uint64_t suid;
int32_t version;
char colName[128]; //
uint8_t colType;
} TFileReadHeader;
#define TFILE_HEADER_SIZE (sizeof(TFILE_HEADER_SIZE) + sizeof(uint32_t));
#define TFILE_HADER_PRE_SIZE (sizeof(uint64_t) + sizeof(int32_t) + sizeof(int32_t))
typedef struct TFileCacheKey {
uint64_t suid;
......@@ -48,13 +60,13 @@ typedef struct TFileCache {
typedef struct TFileWriter {
FstBuilder *fb;
WriterCtx *wc;
WriterCtx *ctx;
} TFileWriter;
typedef struct TFileReader {
T_REF_DECLARE()
Fst *fst;
WriterCtx *ctx;
} TFileReader;
typedef struct IndexTFile {
......@@ -78,18 +90,22 @@ typedef struct TFileReaderOpt {
} TFileReaderOpt;
// tfile cache
// tfile cache, manage tindex reader
TFileCache *tfileCacheCreate(const char *path);
void tfileCacheDestroy(TFileCache *tcache);
TFileReader* tfileCacheGet(TFileCache *tcache, TFileCacheKey *key);
void tfileCachePut(TFileCache *tcache, TFileCacheKey *key, TFileReader *reader);
TFileReader* tfileReaderCreate();
void TFileReaderDestroy(TFileReader *reader);
TFileWriter *tfileWriterCreate(const char *suid, const char *colName);
void tfileWriterDestroy(TFileWriter *tw);
//
IndexTFile *indexTFileCreate(const char *path);
int indexTFilePut(void *tfile, SIndexTerm *term, uint64_t uid);
int indexTFileSearch(void *tfile, SIndexTermQuery *query, SArray *result);
......
......@@ -65,6 +65,7 @@ WriterCtx* writerCtxCreate(WriterType type, const char *path, bool readOnly, int
ctx->file.fd = tfOpenReadWrite(tmpFile);
}
if (ctx->file.fd < 0) {
goto END;
indexError("open file error %d", errno);
}
} else if (ctx->type == TMemory) {
......@@ -79,6 +80,9 @@ WriterCtx* writerCtxCreate(WriterType type, const char *path, bool readOnly, int
ctx->limit = capacity;
return ctx;
END:
if (ctx->type == TMemory) { free(ctx->mem.buf); }
free(ctx);
}
void writerCtxDestroy(WriterCtx *ctx) {
if (ctx->type == TMemory) {
......
......@@ -13,14 +13,38 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <sys/types.h>
#include <dirent.h>
//#include <sys/types.h>
//#include <dirent.h>
#include "index_tfile.h"
#include "index_fst.h"
#include "index_util.h"
#include "taosdef.h"
#include "index.h"
#include "index_fst_counting_writer.h"
// tfile name suid-colId-version.tindex
static FORCE_INLINE int tfileLoadHeader(WriterCtx *ctx, TFileReadHeader *header) {
//TODO simple tfile header later
char buf[TFILE_HADER_PRE_SIZE];
char *p = buf;
int64_t nread = ctx->read(ctx, buf, TFILE_HADER_PRE_SIZE);
assert(nread == TFILE_HADER_PRE_SIZE);
memcpy(&header->suid, p, sizeof(header->suid));
p += sizeof(header->suid);
memcpy(&header->version, p, sizeof(header->version));
p += sizeof(header->version);
int32_t colLen = 0;
memcpy(&colLen, p, sizeof(colLen));
assert(colLen < sizeof(header->colName));
nread = ctx->read(ctx, header->colName, colLen);
assert(nread == colLen);
nread = ctx->read(ctx, &header->colType, sizeof(header->colType));
return 0;
};
static int tfileGetFileList(const char *path, SArray *result) {
DIR *dir = opendir(path);
if (NULL == dir) { return -1; }
......@@ -35,6 +59,10 @@ static int tfileGetFileList(const char *path, SArray *result) {
closedir(dir);
return 0;
}
static void tfileDestroyFileName(void *elem) {
char *p = *(char **)elem;
free(p);
}
static int tfileCompare(const void *a, const void *b) {
const char *aName = *(char **)a;
const char *bName = *(char **)b;
......@@ -42,6 +70,7 @@ static int tfileCompare(const void *a, const void *b) {
size_t bLen = strlen(bName);
return strncmp(aName, bName, aLen > bLen ? aLen : bLen);
}
// tfile name suid-colId-version.tindex
static int tfileParseFileName(const char *filename, uint64_t *suid, int *colId, int *version) {
if (3 == sscanf(filename, "%" PRIu64 "-%d-%d.tindex", suid, colId, version)) {
// read suid & colid & version success
......@@ -74,14 +103,28 @@ TFileCache *tfileCacheCreate(const char *path) {
uint64_t suid;
int colId, version;
if (0 != tfileParseFileName(file, &suid, &colId, &version)) {
// invalid file, just skip
goto End;
continue;
}
free((void *)file);
TFileReader *reader = calloc(1, sizeof(TFileReader));
reader->ctx = writerCtxCreate(TFile, file, true, 1024 * 64);
if (reader->ctx == NULL) {
TFileReaderDestroy(reader);
indexError("failed to open index: %s", file);
goto End;
}
TFileReadHeader header = {0};
if (0 != tfileLoadHeader(reader->ctx, &header)) {
TFileReaderDestroy(reader);
indexError("failed to load index header, index Id: %s", file);
}
}
taosArrayDestroy(files);
taosArrayDestroyEx(files, tfileDestroyFileName);
return tcache;
End:
taosArrayDestroyEx(files, tfileDestroyFileName);
return NULL;
}
void tfileCacheDestroy(TFileCache *tcache) {
......@@ -103,13 +146,25 @@ void tfileCachePut(TFileCache *tcache, TFileCacheKey *key, TFileReader *reader)
}
TFileReader* tfileReaderCreate() {
}
void TFileReaderDestroy(TFileReader *reader) {
if (reader == NULL) { return; }
writerCtxDestroy(reader->ctx);
free(reader);
}
TFileWriter *tfileWriterCreate(const char *suid, const char *colName);
void tfileWriterDestroy(TFileWriter *tw);
IndexTFile *indexTFileCreate(const char *path) {
IndexTFile *tfile = calloc(1, sizeof(IndexTFile));
tfile->cache = tfileCacheCreate(path);
return tfile;
}
void IndexTFileDestroy(IndexTFile *tfile) {
......
......@@ -5,6 +5,7 @@
#include "taosmsg.h"
SCreateUserMsg* buildUserManipulationMsg(SSqlInfo* pInfo, int32_t* outputLen, int64_t id, char* msgBuf, int32_t msgLen);
SCreateAcctMsg* buildAcctManipulationMsg(SSqlInfo* pInfo, int32_t* outputLen, int64_t id, char* msgBuf, int32_t msgLen);
SDropUserMsg* buildDropUserMsg(SSqlInfo* pInfo, int32_t* outputLen, int64_t id, char* msgBuf, int32_t msgLen);
SShowMsg* buildShowMsg(SShowInfo* pShowInfo, int64_t id, char* msgBuf, int32_t msgLen);
SCreateDbMsg* buildCreateDbMsg(SCreateDbInfo* pCreateDbInfo, char* msgBuf, int32_t msgLen);
......
......@@ -24,6 +24,50 @@ SCreateUserMsg* buildUserManipulationMsg(SSqlInfo* pInfo, int32_t* outputLen, in
return pMsg;
}
SCreateAcctMsg* buildAcctManipulationMsg(SSqlInfo* pInfo, int32_t* outputLen, int64_t id, char* msgBuf, int32_t msgLen) {
SCreateAcctMsg* pMsg = (SCreateAcctMsg*)calloc(1, sizeof(SCreateAcctMsg));
if (pMsg == NULL) {
// tscError("0x%" PRIx64 " failed to malloc for query msg", id);
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
return NULL;
}
SCreateAcctMsg *pCreateMsg = (SCreateAcctMsg *) calloc(1, sizeof(SCreateAcctMsg));
SToken *pName = &pInfo->pMiscInfo->user.user;
SToken *pPwd = &pInfo->pMiscInfo->user.passwd;
strncpy(pCreateMsg->user, pName->z, pName->n);
strncpy(pCreateMsg->pass, pPwd->z, pPwd->n);
SCreateAcctInfo *pAcctOpt = &pInfo->pMiscInfo->acctOpt;
pCreateMsg->maxUsers = htonl(pAcctOpt->maxUsers);
pCreateMsg->maxDbs = htonl(pAcctOpt->maxDbs);
pCreateMsg->maxTimeSeries = htonl(pAcctOpt->maxTimeSeries);
pCreateMsg->maxStreams = htonl(pAcctOpt->maxStreams);
// pCreateMsg->maxPointsPerSecond = htonl(pAcctOpt->maxPointsPerSecond);
pCreateMsg->maxStorage = htobe64(pAcctOpt->maxStorage);
// pCreateMsg->maxQueryTime = htobe64(pAcctOpt->maxQueryTime);
// pCreateMsg->maxConnections = htonl(pAcctOpt->maxConnections);
if (pAcctOpt->stat.n == 0) {
pCreateMsg->accessState = -1;
} else {
if (pAcctOpt->stat.z[0] == 'r' && pAcctOpt->stat.n == 1) {
pCreateMsg->accessState = TSDB_VN_READ_ACCCESS;
} else if (pAcctOpt->stat.z[0] == 'w' && pAcctOpt->stat.n == 1) {
pCreateMsg->accessState = TSDB_VN_WRITE_ACCCESS;
} else if (strncmp(pAcctOpt->stat.z, "all", 3) == 0 && pAcctOpt->stat.n == 3) {
pCreateMsg->accessState = TSDB_VN_ALL_ACCCESS;
} else if (strncmp(pAcctOpt->stat.z, "no", 2) == 0 && pAcctOpt->stat.n == 2) {
pCreateMsg->accessState = 0;
}
}
*outputLen = sizeof(SCreateAcctMsg);
return pMsg;
}
SDropUserMsg* buildDropUserMsg(SSqlInfo* pInfo, int32_t *msgLen, int64_t id, char* msgBuf, int32_t msgBufLen) {
SToken* pName = taosArrayGet(pInfo->pMiscInfo->a, 0);
if (pName->n >= TSDB_USER_LEN) {
......
......@@ -4228,6 +4228,42 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, int64_t id, void** output, in
break;
}
case TSDB_SQL_CREATE_ACCT:
case TSDB_SQL_ALTER_ACCT: {
const char* msg1 = "invalid state option, available options[no, r, w, all]";
const char* msg2 = "invalid user/account name";
const char* msg3 = "name too long";
SToken* pName = &pInfo->pMiscInfo->user.user;
SToken* pPwd = &pInfo->pMiscInfo->user.passwd;
if (parserValidatePassword(pPwd, pMsgBuf) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_OPERATION;
}
if (pName->n >= TSDB_USER_LEN) {
return buildInvalidOperationMsg(pMsgBuf, msg3);
}
if (parserValidateNameToken(pName) != TSDB_CODE_SUCCESS) {
return buildInvalidOperationMsg(pMsgBuf, msg2);
}
SCreateAcctInfo* pAcctOpt = &pInfo->pMiscInfo->acctOpt;
if (pAcctOpt->stat.n > 0) {
if (pAcctOpt->stat.z[0] == 'r' && pAcctOpt->stat.n == 1) {
} else if (pAcctOpt->stat.z[0] == 'w' && pAcctOpt->stat.n == 1) {
} else if (strncmp(pAcctOpt->stat.z, "all", 3) == 0 && pAcctOpt->stat.n == 3) {
} else if (strncmp(pAcctOpt->stat.z, "no", 2) == 0 && pAcctOpt->stat.n == 2) {
} else {
return buildInvalidOperationMsg(pMsgBuf, msg1);
}
}
*output = buildAcctManipulationMsg(pInfo, outputLen, id, msgBuf, msgBufLen);
break;
}
case TSDB_SQL_DROP_ACCT:
case TSDB_SQL_DROP_USER: {
*output = buildDropUserMsg(pInfo, outputLen, id, msgBuf, msgBufLen);
......
......@@ -17,6 +17,9 @@
#include <iostream>
#include "stub.h"
#include "addr_any.h"
namespace {
void generateTestT1(MockCatalogService* mcs) {
......@@ -38,16 +41,36 @@ void generateTestST1(MockCatalogService* mcs) {
}
int32_t catalogGetHandle(const char *clusterId, struct SCatalog** catalogHandle) {
int32_t __catalogGetHandle(const char *clusterId, struct SCatalog** catalogHandle) {
return mockCatalogService->catalogGetHandle(clusterId, catalogHandle);
}
int32_t catalogGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, STableMeta** pTableMeta) {
int32_t __catalogGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, STableMeta** pTableMeta) {
return mockCatalogService->catalogGetTableMeta(pCatalog, pRpc, pMgmtEps, pDBName, pTableName, pTableMeta);
}
void initMetaDataEnv() {
mockCatalogService.reset(new MockCatalogService());
static Stub stub;
stub.set(catalogGetHandle, __catalogGetHandle);
stub.set(catalogGetTableMeta, __catalogGetTableMeta);
{
AddrAny any("libcatalog.so");
std::map<std::string,void*> result;
any.get_global_func_addr_dynsym("^catalogGetHandle$", result);
for (const auto& f : result) {
stub.set(f.second, __catalogGetHandle);
}
}
{
AddrAny any("libcatalog.so");
std::map<std::string,void*> result;
any.get_global_func_addr_dynsym("^catalogGetTableMeta$", result);
for (const auto& f : result) {
stub.set(f.second, __catalogGetTableMeta);
}
}
}
void generateMetaData() {
......
......@@ -23,7 +23,7 @@ void generateMetaData();
void destroyMetaDataEnv();
// mock
int32_t catalogGetHandle(const char *clusterId, struct SCatalog** catalogHandle);
int32_t catalogGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, STableMeta** pTableMeta);
// int32_t catalogGetHandle(const char *clusterId, struct SCatalog** catalogHandle);
// int32_t catalogGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, STableMeta** pTableMeta);
#endif // MOCK_CATALOG_H
......@@ -28,7 +28,7 @@
#pragma GCC diagnostic ignored "-Wunused-variable"
#pragma GCC diagnostic ignored "-Wsign-compare"
class ParserEnv : public testing::Environment {
class PlannerEnv : public testing::Environment {
public:
virtual void SetUp() {
initMetaDataEnv();
......@@ -39,12 +39,12 @@ public:
destroyMetaDataEnv();
}
ParserEnv() {}
virtual ~ParserEnv() {}
PlannerEnv() {}
virtual ~PlannerEnv() {}
};
int main(int argc, char* argv[]) {
testing::AddGlobalTestEnvironment(new ParserEnv());
testing::AddGlobalTestEnvironment(new PlannerEnv());
testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
......
CMAKE_MINIMUM_REQUIRED(VERSION 2.8...3.20)
PROJECT(TDengine)
INCLUDE_DIRECTORIES(inc)
AUX_SOURCE_DIRECTORY(src SRC)
ADD_LIBRARY(tsdb ${SRC})
TARGET_LINK_LIBRARIES(tsdb tfs common tutil)
IF (TD_TSDB_PLUGINS)
TARGET_LINK_LIBRARIES(tsdb tsdbPlugins)
ENDIF ()
IF (TD_LINUX)
# Someone has no gtest directory, so comment it
# ADD_SUBDIRECTORY(tests)
ENDIF ()
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_TSDB_BUFFER_H_
#define _TD_TSDB_BUFFER_H_
typedef struct {
int64_t blockId;
int offset;
int remain;
char data[];
} STsdbBufBlock;
typedef struct {
pthread_cond_t poolNotEmpty;
int bufBlockSize;
int tBufBlocks;
int nBufBlocks;
int nRecycleBlocks;
int nElasticBlocks;
int64_t index;
SList* bufBlockList;
} STsdbBufPool;
#define TSDB_BUFFER_RESERVE 1024 // Reseve 1K as commit threshold
STsdbBufPool* tsdbNewBufPool();
void tsdbFreeBufPool(STsdbBufPool* pBufPool);
int tsdbOpenBufPool(STsdbRepo* pRepo);
void tsdbCloseBufPool(STsdbRepo* pRepo);
SListNode* tsdbAllocBufBlockFromPool(STsdbRepo* pRepo);
int tsdbExpandPool(STsdbRepo* pRepo, int32_t oldTotalBlocks);
void tsdbRecycleBufferBlock(STsdbBufPool* pPool, SListNode *pNode, bool bELastic);
// health cite
STsdbBufBlock *tsdbNewBufBlock(int bufBlockSize);
void tsdbFreeBufBlock(STsdbBufBlock *pBufBlock);
#endif /* _TD_TSDB_BUFFER_H_ */
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_TSDB_LOG_H_
#define _TD_TSDB_LOG_H_
extern int32_t tsdbDebugFlag;
#define tsdbFatal(...) do { if (tsdbDebugFlag & DEBUG_FATAL) { taosPrintLog("TDB FATAL ", 255, __VA_ARGS__); }} while(0)
#define tsdbError(...) do { if (tsdbDebugFlag & DEBUG_ERROR) { taosPrintLog("TDB ERROR ", 255, __VA_ARGS__); }} while(0)
#define tsdbWarn(...) do { if (tsdbDebugFlag & DEBUG_WARN) { taosPrintLog("TDB WARN ", 255, __VA_ARGS__); }} while(0)
#define tsdbInfo(...) do { if (tsdbDebugFlag & DEBUG_INFO) { taosPrintLog("TDB ", 255, __VA_ARGS__); }} while(0)
#define tsdbDebug(...) do { if (tsdbDebugFlag & DEBUG_DEBUG) { taosPrintLog("TDB ", tsdbDebugFlag, __VA_ARGS__); }} while(0)
#define tsdbTrace(...) do { if (tsdbDebugFlag & DEBUG_TRACE) { taosPrintLog("TDB ", tsdbDebugFlag, __VA_ARGS__); }} while(0)
#endif /* _TD_TSDB_LOG_H_ */
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_TSDB_META_H_
#define _TD_TSDB_META_H_
#define TSDB_MAX_TABLE_SCHEMAS 16
typedef struct STable {
STableId tableId;
ETableType type;
tstr* name; // NOTE: there a flexible string here
uint64_t suid;
struct STable* pSuper; // super table pointer
SArray* schema;
STSchema* tagSchema;
SKVRow tagVal;
SSkipList* pIndex; // For TSDB_SUPER_TABLE, it is the skiplist index
void* eventHandler; // TODO
void* streamHandler; // TODO
TSKEY lastKey;
SMemRow lastRow;
char* sql;
void* cqhandle;
SRWLatch latch; // TODO: implementa latch functions
SDataCol *lastCols;
int16_t maxColNum;
int16_t restoreColumnNum;
bool hasRestoreLastColumn;
int lastColSVersion;
T_REF_DECLARE()
} STable;
typedef struct {
pthread_rwlock_t rwLock;
int32_t nTables;
int32_t maxTables;
STable** tables;
SList* superList;
SHashObj* uidMap;
int maxRowBytes;
int maxCols;
} STsdbMeta;
#define TSDB_INIT_NTABLES 1024
#define TABLE_TYPE(t) (t)->type
#define TABLE_NAME(t) (t)->name
#define TABLE_CHAR_NAME(t) TABLE_NAME(t)->data
#define TABLE_UID(t) (t)->tableId.uid
#define TABLE_TID(t) (t)->tableId.tid
#define TABLE_SUID(t) (t)->suid
// #define TSDB_META_FILE_MAGIC(m) KVSTORE_MAGIC((m)->pStore)
#define TSDB_RLOCK_TABLE(t) taosRLockLatch(&((t)->latch))
#define TSDB_RUNLOCK_TABLE(t) taosRUnLockLatch(&((t)->latch))
#define TSDB_WLOCK_TABLE(t) taosWLockLatch(&((t)->latch))
#define TSDB_WUNLOCK_TABLE(t) taosWUnLockLatch(&((t)->latch))
STsdbMeta* tsdbNewMeta(STsdbCfg* pCfg);
void tsdbFreeMeta(STsdbMeta* pMeta);
int tsdbOpenMeta(STsdbRepo* pRepo);
int tsdbCloseMeta(STsdbRepo* pRepo);
STable* tsdbGetTableByUid(STsdbMeta* pMeta, uint64_t uid);
STSchema* tsdbGetTableSchemaByVersion(STable* pTable, int16_t _version);
int tsdbWLockRepoMeta(STsdbRepo* pRepo);
int tsdbRLockRepoMeta(STsdbRepo* pRepo);
int tsdbUnlockRepoMeta(STsdbRepo* pRepo);
void tsdbRefTable(STable* pTable);
void tsdbUnRefTable(STable* pTable);
void tsdbUpdateTableSchema(STsdbRepo* pRepo, STable* pTable, STSchema* pSchema, bool insertAct);
int tsdbRestoreTable(STsdbRepo* pRepo, void* cont, int contLen);
void tsdbOrgMeta(STsdbRepo* pRepo);
int tsdbInitColIdCacheWithSchema(STable* pTable, STSchema* pSchema);
int16_t tsdbGetLastColumnsIndexByColId(STable* pTable, int16_t colId);
int tsdbUpdateLastColSchema(STable *pTable, STSchema *pNewSchema);
STSchema* tsdbGetTableLatestSchema(STable *pTable);
void tsdbFreeLastColumns(STable* pTable);
static FORCE_INLINE int tsdbCompareSchemaVersion(const void *key1, const void *key2) {
if (*(int16_t *)key1 < schemaVersion(*(STSchema **)key2)) {
return -1;
} else if (*(int16_t *)key1 > schemaVersion(*(STSchema **)key2)) {
return 1;
} else {
return 0;
}
}
static FORCE_INLINE STSchema* tsdbGetTableSchemaImpl(STable* pTable, bool lock, bool copy, int16_t _version) {
STable* pDTable = (pTable->pSuper != NULL) ? pTable->pSuper : pTable; // for performance purpose
STSchema* pSchema = NULL;
STSchema* pTSchema = NULL;
if (lock) TSDB_RLOCK_TABLE(pDTable);
if (_version < 0) { // get the latest version of schema
pTSchema = *(STSchema **)taosArrayGetLast(pDTable->schema);
} else { // get the schema with version
void* ptr = taosArraySearch(pDTable->schema, &_version, tsdbCompareSchemaVersion, TD_EQ);
if (ptr == NULL) {
terrno = TSDB_CODE_TDB_IVD_TB_SCHEMA_VERSION;
goto _exit;
}
pTSchema = *(STSchema**)ptr;
}
ASSERT(pTSchema != NULL);
if (copy) {
if ((pSchema = tdDupSchema(pTSchema)) == NULL) terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
} else {
pSchema = pTSchema;
}
_exit:
if (lock) TSDB_RUNLOCK_TABLE(pDTable);
return pSchema;
}
static FORCE_INLINE STSchema* tsdbGetTableSchema(STable* pTable) {
return tsdbGetTableSchemaImpl(pTable, false, false, -1);
}
static FORCE_INLINE STSchema *tsdbGetTableTagSchema(STable *pTable) {
if (pTable->type == TSDB_CHILD_TABLE) { // check child table first
STable *pSuper = pTable->pSuper;
if (pSuper == NULL) return NULL;
return pSuper->tagSchema;
} else if (pTable->type == TSDB_SUPER_TABLE) {
return pTable->tagSchema;
} else {
return NULL;
}
}
static FORCE_INLINE TSKEY tsdbGetTableLastKeyImpl(STable* pTable) {
ASSERT((pTable->lastRow == NULL) || (pTable->lastKey == memRowKey(pTable->lastRow)));
return pTable->lastKey;
}
#endif /* _TD_TSDB_META_H_ */
\ No newline at end of file
此差异已折叠。
此差异已折叠。
此差异已折叠。
AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST)
add_executable(tsdbTests ${SOURCE_LIST})
target_link_libraries(tsdbTests gtest gtest_main pthread common tsdb tutil trpc)
add_test(NAME unit COMMAND ${CMAKE_CURRENT_BINARY_DIR}/tsdbTests)
\ No newline at end of file
#include <gtest/gtest.h>
#include <stdlib.h>
#include <sys/time.h>
#include "tsdb.h"
#include "tsdbMain.h"
static double getCurTime() {
struct timeval tv;
gettimeofday(&tv, NULL);
return tv.tv_sec + tv.tv_usec * 1E-6;
}
typedef struct {
STsdbRepo *pRepo;
bool isAscend;
int tid;
uint64_t uid;
int sversion;
TSKEY startTime;
TSKEY interval;
int totalRows;
int rowsPerSubmit;
STSchema * pSchema;
} SInsertInfo;
static int insertData(SInsertInfo *pInfo) {
SSubmitMsg *pMsg =
(SSubmitMsg *)malloc(sizeof(SSubmitMsg) + sizeof(SSubmitBlk) + dataRowMaxBytesFromSchema(pInfo->pSchema) * pInfo->rowsPerSubmit);
if (pMsg == NULL) return -1;
TSKEY start_time = pInfo->startTime;
// Loop to write data
double stime = getCurTime();
for (int k = 0; k < pInfo->totalRows/pInfo->rowsPerSubmit; k++) {
memset((void *)pMsg, 0, sizeof(SSubmitMsg));
SSubmitBlk *pBlock = (SSubmitBlk *)pMsg->blocks;
pBlock->uid = pInfo->uid;
pBlock->tid = pInfo->tid;
pBlock->sversion = pInfo->sversion;
pBlock->dataLen = 0;
pBlock->schemaLen = 0;
pBlock->numOfRows = 0;
for (int i = 0; i < pInfo->rowsPerSubmit; i++) {
// start_time += 1000;
if (pInfo->isAscend) {
start_time += pInfo->interval;
} else {
start_time -= pInfo->interval;
}
SDataRow row = (SDataRow)(pBlock->data + pBlock->dataLen);
tdInitDataRow(row, pInfo->pSchema);
for (int j = 0; j < schemaNCols(pInfo->pSchema); j++) {
STColumn *pTCol = schemaColAt(pInfo->pSchema, j);
if (j == 0) { // Just for timestamp
tdAppendColVal(row, (void *)(&start_time), pTCol->type, pTCol->offset);
} else { // For int
int val = 10;
tdAppendColVal(row, (void *)(&val), pTCol->type, pTCol->offset);
}
}
pBlock->dataLen += dataRowLen(row);
pBlock->numOfRows++;
}
pMsg->length = sizeof(SSubmitMsg) + sizeof(SSubmitBlk) + pBlock->dataLen;
pMsg->numOfBlocks = 1;
pBlock->dataLen = htonl(pBlock->dataLen);
pBlock->numOfRows = htonl(pBlock->numOfRows);
pBlock->schemaLen = htonl(pBlock->schemaLen);
pBlock->uid = htobe64(pBlock->uid);
pBlock->tid = htonl(pBlock->tid);
pBlock->sversion = htonl(pBlock->sversion);
pBlock->padding = htonl(pBlock->padding);
pMsg->length = htonl(pMsg->length);
pMsg->numOfBlocks = htonl(pMsg->numOfBlocks);
if (tsdbInsertData(pInfo->pRepo, pMsg, NULL) < 0) {
tfree(pMsg);
return -1;
}
}
double etime = getCurTime();
printf("Spent %f seconds to write %d records\n", etime - stime, pInfo->totalRows);
tfree(pMsg);
return 0;
}
static void tsdbSetCfg(STsdbCfg *pCfg, int32_t tsdbId, int32_t cacheBlockSize, int32_t totalBlocks, int32_t maxTables,
int32_t daysPerFile, int32_t keep, int32_t minRows, int32_t maxRows, int8_t precision,
int8_t compression) {
pCfg->tsdbId = tsdbId;
pCfg->cacheBlockSize = cacheBlockSize;
pCfg->totalBlocks = totalBlocks;
// pCfg->maxTables = maxTables;
pCfg->daysPerFile = daysPerFile;
pCfg->keep = keep;
pCfg->minRowsPerFileBlock = minRows;
pCfg->maxRowsPerFileBlock = maxRows;
pCfg->precision = precision;
pCfg->compression = compression;
}
static void tsdbSetTableCfg(STableCfg *pCfg) {
STSchemaBuilder schemaBuilder = {0};
pCfg->type = TSDB_NORMAL_TABLE;
pCfg->superUid = TSDB_INVALID_SUPER_TABLE_ID;
pCfg->tableId.tid = 1;
pCfg->tableId.uid = 5849583783847394;
tdInitTSchemaBuilder(&schemaBuilder, 0);
int colId = 0;
for (int i = 0; i < 5; i++) {
tdAddColToSchema(&schemaBuilder, (colId == 0) ? TSDB_DATA_TYPE_TIMESTAMP : TSDB_DATA_TYPE_INT, colId, 0);
colId++;
}
pCfg->schema = tdGetSchemaFromBuilder(&schemaBuilder);
pCfg->name = strdup("t1");
tdDestroyTSchemaBuilder(&schemaBuilder);
}
TEST(TsdbTest, testInsertSpeed) {
int vnode = 1;
int ret = 0;
STsdbCfg tsdbCfg;
STableCfg tableCfg;
std::string testDir = "./test";
char * rootDir = strdup((testDir + "/vnode" + std::to_string(vnode)).c_str());
tsdbDebugFlag = 131; //NOTE: you must set the flag
taosRemoveDir(rootDir);
// Create and open repository
tsdbSetCfg(&tsdbCfg, 1, 16, 4, -1, -1, -1, -1, -1, -1, -1);
tsdbCreateRepo(rootDir, &tsdbCfg);
STsdbRepo *repo = tsdbOpenRepo(rootDir, NULL);
ASSERT_NE(repo, nullptr);
// Create table
tsdbSetTableCfg(&tableCfg);
tsdbCreateTable(repo, &tableCfg);
// Insert data
SInsertInfo iInfo = {repo, true, 1, 5849583783847394, 0, 1590000000000, 10, 10000000, 100, tableCfg.schema};
insertData(&iInfo);
tsdbCloseRepo(repo, 1);
}
static char *getTKey(const void *data) {
return (char *)data;
}
\ No newline at end of file
此差异已折叠。
......@@ -41,7 +41,7 @@ function dohavecore(){
else
cd ../../
if [[ $1 == 1 ]];then
tar -zcPf $corepath'taos_'`date "+%Y_%m_%d_%H_%M_%S"`.tar.gz debug/build/bin/taosd debug/build/bin/tsim debug/build/lib/libtaos*so*
#tar -zcPf $corepath'taos_'`date "+%Y_%m_%d_%H_%M_%S"`.tar.gz debug
cp -r sim ~/sim_`date "+%Y_%m_%d_%H:%M:%S" `
fi
fi
......@@ -67,8 +67,8 @@ function runSimCaseOneByOne {
else
echo -n $case
./test.sh -f $case > /dev/null 2>&1 && \
( grep -q 'script.*'$case'.*failed.*, err.*lineNum' ../../sim/tsim/log/taoslog0.0 && echo -e "${RED} failed${NC}" | tee -a out.log || echo -e "${GREEN} success${NC}" | tee -a out.log )|| \
( grep -q 'script.*success.*m$' ../../sim/tsim/log/taoslog0.0 && echo -e "${GREEN} success${NC}" | tee -a out.log ) || \
( grep -q 'script.*'$case'.*failed.*, err.*lineNum' ../sim/tsim/log/taoslog0.0 && echo -e "${RED} failed${NC}" | tee -a out.log || echo -e "${GREEN} success${NC}" | tee -a out.log )|| \
( grep -q 'script.*success.*m$' ../sim/tsim/log/taoslog0.0 && echo -e "${GREEN} success${NC}" | tee -a out.log ) || \
echo -e "${RED} failed${NC}" | tee -a out.log
fi
out_log=`tail -1 out.log `
......@@ -99,11 +99,12 @@ function runSimCaseOneByOnefq {
( grep -q 'script.*success.*m$' ../../../sim/tsim/log/taoslog0.0 && echo -e "${GREEN} success${NC}" | tee -a out.log ) || \
( echo -e "${RED} failed${NC}" | tee -a out.log && echo '=====================log=====================' && cat case.log )
else
pwd
echo -n $case
./test.sh -f $case > ../../sim/case.log 2>&1 && \
( grep -q 'script.*'$case'.*failed.*, err.*lineNum' ../../sim/tsim/log/taoslog0.0 && echo -e "${RED} failed${NC}" | tee -a out.log || echo -e "${GREEN} success${NC}" | tee -a out.log )|| \
( grep -q 'script.*success.*m$' ../../sim/tsim/log/taoslog0.0 && echo -e "${GREEN} success${NC}" | tee -a out.log ) || \
( echo -e "${RED} failed${NC}" | tee -a out.log && echo '=====================log=====================' && cat case.log )
( echo -e "${RED} failed${NC}" | tee -a out.log && echo '=====================log=====================' && pwd && cat ../../sim/case.log )
fi
out_log=`tail -1 out.log `
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册