提交 c8605a7f 编写于 作者: H Hongze Cheng

Merge branch '3.0' into feature/vnode

......@@ -889,6 +889,7 @@ typedef struct {
typedef struct {
char db[TSDB_DB_FNAME_LEN];
int64_t uid;
int32_t vgVersion;
int32_t vgNum;
int8_t hashMethod;
......@@ -934,13 +935,13 @@ typedef struct {
typedef struct {
int32_t dnodeId;
} SMCreateMnodeMsg, SMDropMnodeMsg, SDDropMnodeMsg;
} SMCreateMnodeReq, SMDropMnodeReq, SDDropMnodeReq;
typedef struct {
int32_t dnodeId;
int8_t replica;
SReplica replicas[TSDB_MAX_REPLICA];
} SDCreateMnodeMsg, SDAlterMnodeMsg;
} SDCreateMnodeReq, SDAlterMnodeReq;
typedef struct {
int32_t dnodeId;
......@@ -1014,7 +1015,7 @@ typedef struct {
char encrypt;
char secret[TSDB_PASSWORD_LEN];
char ckey[TSDB_PASSWORD_LEN];
} SAuthMsg, SAuthRsp;
} SAuthReq, SAuthRsp;
typedef struct {
int8_t finished;
......
......@@ -48,8 +48,22 @@ typedef struct SMetaData {
typedef struct SCatalogCfg {
uint32_t maxTblCacheNum;
uint32_t maxDBCacheNum;
uint32_t dbRentSec;
uint32_t stableRentSec;
} SCatalogCfg;
typedef struct SSTableMetaVersion {
uint64_t suid;
int16_t sversion;
int16_t tversion;
} SSTableMetaVersion;
typedef struct SDbVgVersion {
int64_t dbId;
int32_t vgVersion;
} SDbVgVersion;
int32_t catalogInit(SCatalogCfg *cfg);
/**
......@@ -60,19 +74,27 @@ int32_t catalogInit(SCatalogCfg *cfg);
*/
int32_t catalogGetHandle(uint64_t clusterId, struct SCatalog** catalogHandle);
/**
* Free a cluster's all catalog info, usually it's not necessary, until the application is closing.
* no current or future usage should be guaranteed by application
* @param pCatalog (input, NO more usage)
* @return error code
*/
void catalogFreeHandle(struct SCatalog* pCatalog);
int32_t catalogGetDBVgroupVersion(struct SCatalog* pCatalog, const char* dbName, int32_t* version);
/**
* Get a DB's all vgroup info.
* @param pCatalog (input, got with catalogGetHandle)
* @param pRpc (input, rpc object)
* @param pTransporter (input, rpc object)
* @param pMgmtEps (input, mnode EPs)
* @param pDBName (input, full db name)
* @param forceUpdate (input, force update db vgroup info from mnode)
* @param pVgroupList (output, vgroup info list, element is SVgroupInfo, NEED to simply free the array by caller)
* @return error code
*/
int32_t catalogGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, bool forceUpdate, SArray** pVgroupList);
int32_t catalogGetDBVgroup(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const char* pDBName, bool forceUpdate, SArray** pVgroupList);
int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbName, SDBVgroupInfo* dbInfo);
......@@ -87,15 +109,28 @@ int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbName, SDB
*/
int32_t catalogGetTableMeta(struct SCatalog* pCatalog, void * pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta);
/**
* Get a super table's meta data.
* @param pCatalog (input, got with catalogGetHandle)
* @param pTransporter (input, rpc object)
* @param pMgmtEps (input, mnode EPs)
* @param pTableName (input, table name, NOT including db name)
* @param pTableMeta(output, table meta data, NEED to free it by calller)
* @return error code
*/
int32_t catalogGetSTableMeta(struct SCatalog* pCatalog, void * pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta);
/**
* Force renew a table's local cached meta data.
* @param pCatalog (input, got with catalogGetHandle)
* @param pTransporter (input, rpc object)
* @param pMgmtEps (input, mnode EPs)
* @param pTableName (input, table name, NOT including db name)
* @param isSTable (input, is super table or not, 1:supposed to be stable, 0: supposed not to be stable, -1:not sure)
* @return error code
*/
int32_t catalogRenewTableMeta(struct SCatalog* pCatalog, void * pTransporter, const SEpSet* pMgmtEps, const SName* pTableName);
int32_t catalogRenewTableMeta(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, int32_t isSTable);
/**
* Force renew a table's local cached meta data and get the new one.
......@@ -104,21 +139,23 @@ int32_t catalogRenewTableMeta(struct SCatalog* pCatalog, void * pTransporter, co
* @param pMgmtEps (input, mnode EPs)
* @param pTableName (input, table name, NOT including db name)
* @param pTableMeta(output, table meta data, NEED to free it by calller)
* @param isSTable (input, is super table or not, 1:supposed to be stable, 0: supposed not to be stable, -1:not sure)
* @return error code
*/
int32_t catalogRenewAndGetTableMeta(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta);
int32_t catalogRenewAndGetTableMeta(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta, int32_t isSTable);
/**
* Get a table's actual vgroup, for stable it's all possible vgroup list.
* @param pCatalog (input, got with catalogGetHandle)
* @param pRpc (input, rpc object)
* @param pTransporter (input, rpc object)
* @param pMgmtEps (input, mnode EPs)
* @param pTableName (input, table name, NOT including db name)
* @param pVgroupList (output, vgroup info list, element is SVgroupInfo, NEED to simply free the array by caller)
* @return error code
*/
int32_t catalogGetTableDistVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const SName* pTableName, SArray** pVgroupList);
int32_t catalogGetTableDistVgroup(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, SArray** pVgroupList);
/**
* Get a table's vgroup from its name's hash value.
......@@ -135,17 +172,20 @@ int32_t catalogGetTableHashVgroup(struct SCatalog* pCatalog, void * pTransporter
/**
* Get all meta data required in pReq.
* @param pCatalog (input, got with catalogGetHandle)
* @param pRpc (input, rpc object)
* @param pTransporter (input, rpc object)
* @param pMgmtEps (input, mnode EPs)
* @param pReq (input, reqest info)
* @param pRsp (output, response data)
* @return error code
*/
int32_t catalogGetAllMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const SCatalogReq* pReq, SMetaData* pRsp);
int32_t catalogGetAllMeta(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SCatalogReq* pReq, SMetaData* pRsp);
int32_t catalogGetQnodeList(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, SArray* pQnodeList);
int32_t catalogGetQnodeList(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, SArray* pQnodeList);
int32_t catalogGetExpiredSTables(struct SCatalog* pCatalog, SSTableMetaVersion **stables, uint32_t *num);
int32_t catalogGetExpiredDBs(struct SCatalog* pCatalog, SDbVgVersion **dbs, uint32_t *num);
/**
......
......@@ -24,7 +24,6 @@ extern "C" {
typedef struct SParseContext {
SParseBasicCtx ctx;
int8_t schemaAttached; // denote if submit block is built with table schema or not
const char *pSql; // sql string
size_t sqlLen; // length of the sql string
char *pMsg; // extended error message if exists to help identifying the problem in sql statement.
......@@ -41,8 +40,17 @@ typedef struct SParseContext {
*/
int32_t qParseQuerySql(SParseContext* pContext, SQueryNode** pQuery);
bool qIsDdlQuery(const SQueryNode* pQuery);
/**
* Return true if it is a ddl/dcl sql statement
* @param pQuery
* @return
*/
bool qIsDdlQuery(const SQueryNode* pQueryNode);
/**
* Destroy logic query plan
* @param pQueryNode
*/
void qDestroyQuery(SQueryNode* pQueryNode);
/**
......@@ -62,8 +70,8 @@ void columnListDestroy(SArray* pColumnList);
void dropAllExprInfo(SArray** pExprInfo, int32_t numOfLevel);
typedef struct SSourceParam {
SArray *pExprNodeList; //Array<struct tExprNode*>
SArray *pColumnList; //Array<struct SColumn>
SArray *pExprNodeList; //Array<struct tExprNode*>
SArray *pColumnList; //Array<struct SColumn>
int32_t num;
} SSourceParam;
......
......@@ -50,8 +50,10 @@ struct SQueryStmtInfo;
typedef SSchema SSlotSchema;
typedef struct SDataBlockSchema {
SSlotSchema *pSchema;
int32_t numOfCols; // number of columns
SSlotSchema *pSchema;
int32_t numOfCols; // number of columns
int32_t resultRowSize;
int16_t precision;
} SDataBlockSchema;
typedef struct SQueryNodeBasicInfo {
......@@ -61,6 +63,7 @@ typedef struct SQueryNodeBasicInfo {
typedef struct SDataSink {
SQueryNodeBasicInfo info;
SDataBlockSchema schema;
} SDataSink;
typedef struct SDataDispatcher {
......@@ -139,9 +142,13 @@ typedef struct SQueryDag {
struct SQueryNode;
/**
* Create the physical plan for the query, according to the AST.
*/
/**
* Create the physical plan for the query, according to the AST.
* @param pQueryInfo
* @param pDag
* @param requestId
* @return
*/
int32_t qCreateQueryDag(const struct SQueryNode* pQueryInfo, struct SQueryDag** pDag, uint64_t requestId);
// Set datasource of this subplan, multiple calls may be made to a subplan.
......
......@@ -76,6 +76,7 @@ typedef struct STableMeta {
typedef struct SDBVgroupInfo {
SRWLatch lock;
int64_t dbId;
int32_t vgVersion;
int8_t hashMethod;
SHashObj *vgInfo; //key:vgId, value:SVgroupInfo
......@@ -86,8 +87,15 @@ typedef struct SUseDbOutput {
SDBVgroupInfo dbVgroup;
} SUseDbOutput;
enum {
META_TYPE_NON_TABLE = 1,
META_TYPE_CTABLE,
META_TYPE_TABLE,
META_TYPE_BOTH_TABLE,
};
typedef struct STableMetaOutput {
int32_t metaNum;
int32_t metaType;
char ctbFname[TSDB_TABLE_FNAME_LEN];
char tbFname[TSDB_TABLE_FNAME_LEN];
SCTableMeta ctbMeta;
......@@ -149,6 +157,11 @@ void initQueryModuleMsgHandle();
extern int32_t (*queryBuildMsg[TDMT_MAX])(void* input, char **msg, int32_t msgSize, int32_t *msgLen);
extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char *msg, int32_t msgSize);
#define SET_META_TYPE_NONE(t) (t) = META_TYPE_NON_TABLE
#define SET_META_TYPE_CTABLE(t) (t) = META_TYPE_CTABLE
#define SET_META_TYPE_TABLE(t) (t) = META_TYPE_TABLE
#define SET_META_TYPE_BOTH_TABLE(t) (t) = META_TYPE_BOTH_TABLE
#define qFatal(...) do { if (qDebugFlag & DEBUG_FATAL) { taosPrintLog("QRY FATAL ", qDebugFlag, __VA_ARGS__); }} while(0)
#define qError(...) do { if (qDebugFlag & DEBUG_ERROR) { taosPrintLog("QRY ERROR ", qDebugFlag, __VA_ARGS__); }} while(0)
#define qWarn(...) do { if (qDebugFlag & DEBUG_WARN) { taosPrintLog("QRY WARN ", qDebugFlag, __VA_ARGS__); }} while(0)
......
......@@ -75,6 +75,12 @@ int32_t scheduleExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, void
*/
int32_t scheduleAsyncExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, void** pJob);
/**
* Fetch query result from the remote query executor
* @param pJob
* @param data
* @return
*/
int32_t scheduleFetchRows(void *pJob, void **data);
......@@ -85,6 +91,10 @@ int32_t scheduleFetchRows(void *pJob, void **data);
*/
int32_t scheduleCancelJob(void *pJob);
/**
* Free the query job
* @param pJob
*/
void scheduleFreeJob(void *pJob);
void schedulerDestroy(void);
......
......@@ -259,28 +259,24 @@ int32_t* taosGetErrno();
#define TSDB_CODE_DND_DNODE_WRITE_FILE_ERROR TAOS_DEF_ERROR_CODE(0, 0x0411)
#define TSDB_CODE_DND_MNODE_ALREADY_DEPLOYED TAOS_DEF_ERROR_CODE(0, 0x0420)
#define TSDB_CODE_DND_MNODE_NOT_DEPLOYED TAOS_DEF_ERROR_CODE(0, 0x0421)
#define TSDB_CODE_DND_MNODE_ID_INVALID TAOS_DEF_ERROR_CODE(0, 0x0422)
#define TSDB_CODE_DND_MNODE_ID_NOT_FOUND TAOS_DEF_ERROR_CODE(0, 0x0423)
#define TSDB_CODE_DND_MNODE_READ_FILE_ERROR TAOS_DEF_ERROR_CODE(0, 0x0424)
#define TSDB_CODE_DND_MNODE_WRITE_FILE_ERROR TAOS_DEF_ERROR_CODE(0, 0x0425)
#define TSDB_CODE_DND_MNODE_INVALID_OPTION TAOS_DEF_ERROR_CODE(0, 0x0422)
#define TSDB_CODE_DND_MNODE_READ_FILE_ERROR TAOS_DEF_ERROR_CODE(0, 0x0423)
#define TSDB_CODE_DND_MNODE_WRITE_FILE_ERROR TAOS_DEF_ERROR_CODE(0, 0x0424)
#define TSDB_CODE_DND_QNODE_ALREADY_DEPLOYED TAOS_DEF_ERROR_CODE(0, 0x0430)
#define TSDB_CODE_DND_QNODE_NOT_DEPLOYED TAOS_DEF_ERROR_CODE(0, 0x0431)
#define TSDB_CODE_DND_QNODE_ID_INVALID TAOS_DEF_ERROR_CODE(0, 0x0432)
#define TSDB_CODE_DND_QNODE_ID_NOT_FOUND TAOS_DEF_ERROR_CODE(0, 0x0433)
#define TSDB_CODE_DND_QNODE_READ_FILE_ERROR TAOS_DEF_ERROR_CODE(0, 0x0434)
#define TSDB_CODE_DND_QNODE_WRITE_FILE_ERROR TAOS_DEF_ERROR_CODE(0, 0x0435)
#define TSDB_CODE_DND_QNODE_INVALID_OPTION TAOS_DEF_ERROR_CODE(0, 0x0432)
#define TSDB_CODE_DND_QNODE_READ_FILE_ERROR TAOS_DEF_ERROR_CODE(0, 0x0433)
#define TSDB_CODE_DND_QNODE_WRITE_FILE_ERROR TAOS_DEF_ERROR_CODE(0, 0x0434)
#define TSDB_CODE_DND_SNODE_ALREADY_DEPLOYED TAOS_DEF_ERROR_CODE(0, 0x0440)
#define TSDB_CODE_DND_SNODE_NOT_DEPLOYED TAOS_DEF_ERROR_CODE(0, 0x0441)
#define TSDB_CODE_DND_SNODE_ID_INVALID TAOS_DEF_ERROR_CODE(0, 0x0442)
#define TSDB_CODE_DND_SNODE_ID_NOT_FOUND TAOS_DEF_ERROR_CODE(0, 0x0443)
#define TSDB_CODE_DND_SNODE_READ_FILE_ERROR TAOS_DEF_ERROR_CODE(0, 0x0444)
#define TSDB_CODE_DND_SNODE_WRITE_FILE_ERROR TAOS_DEF_ERROR_CODE(0, 0x0445)
#define TSDB_CODE_DND_SNODE_INVALID_OPTION TAOS_DEF_ERROR_CODE(0, 0x0442)
#define TSDB_CODE_DND_SNODE_READ_FILE_ERROR TAOS_DEF_ERROR_CODE(0, 0x0443)
#define TSDB_CODE_DND_SNODE_WRITE_FILE_ERROR TAOS_DEF_ERROR_CODE(0, 0x0444)
#define TSDB_CODE_DND_BNODE_ALREADY_DEPLOYED TAOS_DEF_ERROR_CODE(0, 0x0450)
#define TSDB_CODE_DND_BNODE_NOT_DEPLOYED TAOS_DEF_ERROR_CODE(0, 0x0451)
#define TSDB_CODE_DND_BNODE_ID_INVALID TAOS_DEF_ERROR_CODE(0, 0x0452)
#define TSDB_CODE_DND_BNODE_ID_NOT_FOUND TAOS_DEF_ERROR_CODE(0, 0x0453)
#define TSDB_CODE_DND_BNODE_READ_FILE_ERROR TAOS_DEF_ERROR_CODE(0, 0x0454)
#define TSDB_CODE_DND_BNODE_WRITE_FILE_ERROR TAOS_DEF_ERROR_CODE(0, 0x0455)
#define TSDB_CODE_DND_BNODE_INVALID_OPTION TAOS_DEF_ERROR_CODE(0, 0x0452)
#define TSDB_CODE_DND_BNODE_READ_FILE_ERROR TAOS_DEF_ERROR_CODE(0, 0x0453)
#define TSDB_CODE_DND_BNODE_WRITE_FILE_ERROR TAOS_DEF_ERROR_CODE(0, 0x0454)
#define TSDB_CODE_DND_VNODE_TOO_MANY_VNODES TAOS_DEF_ERROR_CODE(0, 0x0460)
#define TSDB_CODE_DND_VNODE_READ_FILE_ERROR TAOS_DEF_ERROR_CODE(0, 0x0461)
#define TSDB_CODE_DND_VNODE_WRITE_FILE_ERROR TAOS_DEF_ERROR_CODE(0, 0x0462)
......
......@@ -124,6 +124,9 @@ int32_t taosHashGetSize(const SHashObj *pHashObj);
*/
int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, void *data, size_t size);
int32_t taosHashPutExt(SHashObj *pHashObj, const void *key, size_t keyLen, void *data, size_t size, bool *newAdded);
/**
* return the payload data with the specified key
*
......
......@@ -44,7 +44,6 @@ extern int32_t tsdbDebugFlag;
extern int32_t tqDebugFlag;
extern int32_t cqDebugFlag;
extern int32_t debugFlag;
extern int32_t ctgDebugFlag;
#define DEBUG_FATAL 1U
#define DEBUG_ERROR DEBUG_FATAL
......
......@@ -62,6 +62,7 @@ typedef struct SAppInstInfo {
SList *pConnList; // STscObj linked list
int64_t clusterId;
void *pTransporter;
SHeartBeatInfo hb;
} SAppInstInfo;
typedef struct SAppInfo {
......@@ -70,7 +71,7 @@ typedef struct SAppInfo {
char *ep;
int32_t pid;
int32_t numOfThreads;
SHeartBeatInfo hb;
SHashObj *pInstMap;
} SAppInfo;
......
......@@ -181,7 +181,7 @@ int32_t execDdlQuery(SRequestObj* pRequest, SQueryNode* pQuery) {
if (pDcl->msgType == TDMT_VND_SHOW_TABLES) {
SShowReqInfo* pShowReqInfo = &pRequest->body.showInfo;
if (pShowReqInfo->pArray == NULL) {
pShowReqInfo->currentIndex = 0;
pShowReqInfo->currentIndex = 0; // set the first vnode/ then iterate the next vnode
pShowReqInfo->pArray = pDcl->pExtension;
}
}
......@@ -291,10 +291,10 @@ TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen) {
nPrintTsc("%s", sql)
SRequestObj* pRequest = NULL;
SQueryNode* pQuery = NULL;
SQueryDag* pDag = NULL;
void* pJob = NULL;
SRequestObj *pRequest = NULL;
SQueryNode *pQuery = NULL;
SQueryDag *pDag = NULL;
void *pJob = NULL;
terrno = TSDB_CODE_SUCCESS;
CHECK_CODE_GOTO(buildRequest(pTscObj, sql, sqlLen, &pRequest), _return);
......
#include "os.h"
#include "tref.h"
#include "trpc.h"
#include "clientInt.h"
#include "clientLog.h"
#include "query.h"
#include "tmsg.h"
#include "tglobal.h"
#include "tref.h"
#include "trpc.h"
#include "catalog.h"
#define TSC_VAR_NOT_RELEASE 1
#define TSC_VAR_RELEASED 0
......@@ -46,6 +47,7 @@ void taos_cleanup(void) {
taosCloseRef(id);
rpcCleanup();
catalogDestroy();
taosCloseLog();
tscInfo("all local resources released");
......
......@@ -48,7 +48,7 @@ int main(int argc, char** argv) {
}
TEST(testCase, driverInit_Test) { taos_init(); }
#if 0
TEST(testCase, connect_Test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
if (pConn == NULL) {
......@@ -485,6 +485,22 @@ TEST(testCase, drop_stable_Test) {
taos_close(pConn);
}
TEST(testCase, generated_request_id_test) {
SHashObj* phash = taosHashInit(10000, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
for (int32_t i = 0; i < 50000; ++i) {
uint64_t v = generateRequestId();
void* result = taosHashGet(phash, &v, sizeof(v));
if (result != nullptr) {
printf("0x%lx, index:%d\n", v, i);
}
assert(result == nullptr);
taosHashPut(phash, &v, sizeof(v), NULL, 0);
}
taosHashCleanup(phash);
}
// TEST(testCase, create_topic_Test) {
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
// assert(pConn != NULL);
......@@ -518,46 +534,55 @@ TEST(testCase, drop_stable_Test) {
// tmq_create_topic(pConn, "test_topic_1", sql, strlen(sql));
// taos_close(pConn);
//}
TEST(testCase, generated_request_id_test) {
SHashObj* phash = taosHashInit(10000, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
for (int32_t i = 0; i < 50000; ++i) {
uint64_t v = generateRequestId();
void* result = taosHashGet(phash, &v, sizeof(v));
if (result != nullptr) {
printf("0x%lx, index:%d\n", v, i);
}
assert(result == nullptr);
taosHashPut(phash, &v, sizeof(v), NULL, 0);
}
taosHashCleanup(phash);
}
// TEST(testCase, projection_query_tables) {
//TEST(testCase, insert_test) {
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
// ASSERT_EQ(pConn, nullptr);
//
// TAOS_RES* pRes = taos_query(pConn, "use abc1");
// taos_free_result(pRes);
//
// pRes = taos_query(pConn, "select * from t_2");
// pRes = taos_query(pConn, "insert into t_2 values(now, 1)");
// if (taos_errno(pRes) != 0) {
// printf("failed to create multiple tables, reason:%s\n", taos_errstr(pRes));
// taos_free_result(pRes);
// ASSERT_TRUE(false);
// }
//
// TAOS_ROW pRow = NULL;
// TAOS_FIELD* pFields = taos_fetch_fields(pRes);
// int32_t numOfFields = taos_num_fields(pRes);
//
// char str[512] = {0};
// while((pRow = taos_fetch_row(pRes)) != NULL) {
// int32_t code = taos_print_row(str, pRow, pFields, numOfFields);
// printf("%s\n", str);
// }
//
// taos_free_result(pRes);
// taos_close(pConn);
//}
#endif
TEST(testCase, projection_query_tables) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
ASSERT_NE(pConn, nullptr);
TAOS_RES* pRes = taos_query(pConn, "use test1");
if (taos_errno(pRes) != 0) {
printf("failed to use db, reason:%s", taos_errstr(pRes));
taos_free_result(pRes);
return;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "select * from tm0");
if (taos_errno(pRes) != 0) {
printf("failed to create multiple tables, reason:%s\n", taos_errstr(pRes));
taos_free_result(pRes);
ASSERT_TRUE(false);
}
TAOS_ROW pRow = NULL;
TAOS_FIELD* pFields = taos_fetch_fields(pRes);
int32_t numOfFields = taos_num_fields(pRes);
char str[512] = {0};
while ((pRow = taos_fetch_row(pRes)) != NULL) {
int32_t code = taos_print_row(str, pRow, pFields, numOfFields);
printf("%s\n", str);
}
taos_free_result(pRes);
taos_close(pConn);
}
......@@ -927,16 +927,6 @@ static void doInitGlobalConfig(void) {
cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosAddConfigOption(cfg);
cfg.option = "ctgDebugFlag";
cfg.ptr = &ctgDebugFlag;
cfg.valType = TAOS_CFG_VTYPE_INT32;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_LOG | TSDB_CFG_CTYPE_B_CLIENT;
cfg.minValue = 0;
cfg.maxValue = 255;
cfg.ptrLength = 0;
cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosAddConfigOption(cfg);
cfg.option = "enableRecordSql";
cfg.ptr = &tsTscEnableRecordSql;
cfg.valType = TAOS_CFG_VTYPE_INT8;
......
......@@ -268,7 +268,7 @@ int32_t dndProcessCreateBnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) {
pMsg->dnodeId = htonl(pMsg->dnodeId);
if (pMsg->dnodeId != dndGetDnodeId(pDnode)) {
terrno = TSDB_CODE_DND_BNODE_ID_INVALID;
terrno = TSDB_CODE_DND_BNODE_INVALID_OPTION;
dError("failed to create bnode since %s", terrstr());
return -1;
} else {
......@@ -281,7 +281,7 @@ int32_t dndProcessDropBnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) {
pMsg->dnodeId = htonl(pMsg->dnodeId);
if (pMsg->dnodeId != dndGetDnodeId(pDnode)) {
terrno = TSDB_CODE_DND_BNODE_ID_INVALID;
terrno = TSDB_CODE_DND_BNODE_INVALID_OPTION;
dError("failed to drop bnode since %s", terrstr());
return -1;
} else {
......
......@@ -305,25 +305,24 @@ static void dndBuildMnodeOpenOption(SDnode *pDnode, SMnodeOpt *pOption) {
memcpy(&pOption->replicas, pMgmt->replicas, sizeof(SReplica) * TSDB_MAX_REPLICA);
}
static int32_t dndBuildMnodeOptionFromMsg(SDnode *pDnode, SMnodeOpt *pOption, SDCreateMnodeMsg *pMsg) {
static int32_t dndBuildMnodeOptionFromReq(SDnode *pDnode, SMnodeOpt *pOption, SDCreateMnodeReq *pCreate) {
dndInitMnodeOption(pDnode, pOption);
pOption->dnodeId = dndGetDnodeId(pDnode);
pOption->clusterId = dndGetClusterId(pDnode);
pOption->replica = pMsg->replica;
pOption->replica = pCreate->replica;
pOption->selfIndex = -1;
for (int32_t i = 0; i < pMsg->replica; ++i) {
for (int32_t i = 0; i < pCreate->replica; ++i) {
SReplica *pReplica = &pOption->replicas[i];
pReplica->id = pMsg->replicas[i].id;
pReplica->port = pMsg->replicas[i].port;
memcpy(pReplica->fqdn, pMsg->replicas[i].fqdn, TSDB_FQDN_LEN);
pReplica->id = pCreate->replicas[i].id;
pReplica->port = pCreate->replicas[i].port;
memcpy(pReplica->fqdn, pCreate->replicas[i].fqdn, TSDB_FQDN_LEN);
if (pReplica->id == pOption->dnodeId) {
pOption->selfIndex = i;
}
}
if (pOption->selfIndex == -1) {
terrno = TSDB_CODE_DND_MNODE_ID_NOT_FOUND;
dError("failed to build mnode options since %s", terrstr());
return -1;
}
......@@ -423,63 +422,97 @@ static int32_t dndDropMnode(SDnode *pDnode) {
return 0;
}
static SDCreateMnodeMsg *dndParseCreateMnodeMsg(SRpcMsg *pRpcMsg) {
SDCreateMnodeMsg *pMsg = pRpcMsg->pCont;
pMsg->dnodeId = htonl(pMsg->dnodeId);
for (int32_t i = 0; i < pMsg->replica; ++i) {
pMsg->replicas[i].id = htonl(pMsg->replicas[i].id);
pMsg->replicas[i].port = htons(pMsg->replicas[i].port);
static SDCreateMnodeReq *dndParseCreateMnodeReq(SRpcMsg *pReq) {
SDCreateMnodeReq *pCreate = pReq->pCont;
pCreate->dnodeId = htonl(pCreate->dnodeId);
for (int32_t i = 0; i < pCreate->replica; ++i) {
pCreate->replicas[i].id = htonl(pCreate->replicas[i].id);
pCreate->replicas[i].port = htons(pCreate->replicas[i].port);
}
return pMsg;
return pCreate;
}
int32_t dndProcessCreateMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) {
SDCreateMnodeMsg *pMsg = dndParseCreateMnodeMsg(pRpcMsg);
int32_t dndProcessCreateMnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
SDCreateMnodeReq *pCreate = dndParseCreateMnodeReq(pReq);
if (pMsg->dnodeId != dndGetDnodeId(pDnode)) {
terrno = TSDB_CODE_DND_MNODE_ID_INVALID;
if (pCreate->replica <= 1 || pCreate->dnodeId != dndGetDnodeId(pDnode)) {
terrno = TSDB_CODE_DND_MNODE_INVALID_OPTION;
dError("failed to create mnode since %s", terrstr());
return -1;
} else {
SMnodeOpt option = {0};
if (dndBuildMnodeOptionFromMsg(pDnode, &option, pMsg) != 0) {
return -1;
}
}
return dndOpenMnode(pDnode, &option);
SMnodeOpt option = {0};
if (dndBuildMnodeOptionFromReq(pDnode, &option, pCreate) != 0) {
terrno = TSDB_CODE_DND_MNODE_INVALID_OPTION;
dError("failed to create mnode since %s", terrstr());
return -1;
}
SMnode *pMnode = dndAcquireMnode(pDnode);
if (pMnode != NULL) {
dndReleaseMnode(pDnode, pMnode);
terrno = TSDB_CODE_DND_MNODE_ALREADY_DEPLOYED;
dError("failed to create mnode since %s", terrstr());
return -1;
}
dDebug("start to create mnode");
return dndOpenMnode(pDnode, &option);
}
int32_t dndProcessAlterMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) {
SDAlterMnodeMsg *pMsg = dndParseCreateMnodeMsg(pRpcMsg);
int32_t dndProcessAlterMnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
SDAlterMnodeReq *pAlter = dndParseCreateMnodeReq(pReq);
if (pMsg->dnodeId != dndGetDnodeId(pDnode)) {
terrno = TSDB_CODE_DND_MNODE_ID_INVALID;
if (pAlter->dnodeId != dndGetDnodeId(pDnode)) {
terrno = TSDB_CODE_DND_MNODE_INVALID_OPTION;
dError("failed to alter mnode since %s", terrstr());
return -1;
}
SMnodeOpt option = {0};
if (dndBuildMnodeOptionFromMsg(pDnode, &option, pMsg) != 0) {
if (dndBuildMnodeOptionFromReq(pDnode, &option, pAlter) != 0) {
terrno = TSDB_CODE_DND_MNODE_INVALID_OPTION;
dError("failed to alter mnode since %s", terrstr());
return -1;
}
if (dndAlterMnode(pDnode, &option) != 0) {
SMnode *pMnode = dndAcquireMnode(pDnode);
if (pMnode == NULL) {
terrno = TSDB_CODE_DND_MNODE_NOT_DEPLOYED;
dError("failed to alter mnode since %s", terrstr());
return -1;
}
return dndWriteMnodeFile(pDnode);
dDebug("start to alter mnode");
int32_t code = dndAlterMnode(pDnode, &option);
dndReleaseMnode(pDnode, pMnode);
return code;
}
int32_t dndProcessDropMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) {
SDDropMnodeMsg *pMsg = pRpcMsg->pCont;
pMsg->dnodeId = htonl(pMsg->dnodeId);
int32_t dndProcessDropMnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
SDDropMnodeReq *pDrop = pReq->pCont;
pDrop->dnodeId = htonl(pDrop->dnodeId);
if (pMsg->dnodeId != dndGetDnodeId(pDnode)) {
terrno = TSDB_CODE_DND_MNODE_ID_INVALID;
if (pDrop->dnodeId != dndGetDnodeId(pDnode)) {
terrno = TSDB_CODE_DND_MNODE_INVALID_OPTION;
dError("failed to drop mnode since %s", terrstr());
return -1;
}
SMnode *pMnode = dndAcquireMnode(pDnode);
if (pMnode != NULL) {
terrno = TSDB_CODE_DND_MNODE_NOT_DEPLOYED;
dError("failed to drop mnode since %s", terrstr());
return -1;
} else {
return dndDropMnode(pDnode);
}
dDebug("start to drop mnode");
int32_t code = dndDropMnode(pDnode);
dndReleaseMnode(pDnode, pMnode);
return code;
}
static void dndProcessMnodeQueue(SDnode *pDnode, SMnodeMsg *pMsg) {
......@@ -506,6 +539,7 @@ static void dndWriteMnodeMsgToWorker(SDnode *pDnode, SDnodeWorker *pWorker, SRpc
code = TSDB_CODE_OUT_OF_MEMORY;
} else {
code = dndWriteMsgToWorker(pWorker, pMsg, 0);
if (code != 0) code = terrno;
}
if (code != 0) {
......
......@@ -274,7 +274,7 @@ int32_t dndProcessCreateQnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) {
pMsg->dnodeId = htonl(pMsg->dnodeId);
if (pMsg->dnodeId != dndGetDnodeId(pDnode)) {
terrno = TSDB_CODE_DND_QNODE_ID_INVALID;
terrno = TSDB_CODE_DND_QNODE_INVALID_OPTION;
dError("failed to create qnode since %s", terrstr());
return -1;
} else {
......@@ -287,7 +287,7 @@ int32_t dndProcessDropQnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) {
pMsg->dnodeId = htonl(pMsg->dnodeId);
if (pMsg->dnodeId != dndGetDnodeId(pDnode)) {
terrno = TSDB_CODE_DND_QNODE_ID_INVALID;
terrno = TSDB_CODE_DND_QNODE_INVALID_OPTION;
dError("failed to drop qnode since %s", terrstr());
return -1;
} else {
......
......@@ -268,7 +268,7 @@ int32_t dndProcessCreateSnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) {
pMsg->dnodeId = htonl(pMsg->dnodeId);
if (pMsg->dnodeId != dndGetDnodeId(pDnode)) {
terrno = TSDB_CODE_DND_SNODE_ID_INVALID;
terrno = TSDB_CODE_DND_SNODE_INVALID_OPTION;
dError("failed to create snode since %s", terrstr());
return -1;
} else {
......@@ -281,7 +281,7 @@ int32_t dndProcessDropSnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) {
pMsg->dnodeId = htonl(pMsg->dnodeId);
if (pMsg->dnodeId != dndGetDnodeId(pDnode)) {
terrno = TSDB_CODE_DND_SNODE_ID_INVALID;
terrno = TSDB_CODE_DND_SNODE_INVALID_OPTION;
dError("failed to drop snode since %s", terrstr());
return -1;
} else {
......
......@@ -143,26 +143,26 @@ static void dndInitMsgFp(STransMgmt *pMgmt) {
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_SHOW_TABLES_FETCH)] = dndProcessVnodeFetchMsg;
}
static void dndProcessResponse(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
static void dndProcessResponse(void *parent, SRpcMsg *pRsp, SEpSet *pEpSet) {
SDnode *pDnode = parent;
STransMgmt *pMgmt = &pDnode->tmgmt;
tmsg_t msgType = pMsg->msgType;
tmsg_t msgType = pRsp->msgType;
if (dndGetStat(pDnode) == DND_STAT_STOPPED) {
if (pMsg == NULL || pMsg->pCont == NULL) return;
dTrace("RPC %p, rsp:%s is ignored since dnode is stopping", pMsg->handle, TMSG_INFO(msgType));
rpcFreeCont(pMsg->pCont);
if (pRsp == NULL || pRsp->pCont == NULL) return;
dTrace("RPC %p, rsp:%s is ignored since dnode is stopping", pRsp->handle, TMSG_INFO(msgType));
rpcFreeCont(pRsp->pCont);
return;
}
DndMsgFp fp = pMgmt->msgFp[TMSG_INDEX(msgType)];
if (fp != NULL) {
dTrace("RPC %p, rsp:%s will be processed, code:0x%x", pMsg->handle, TMSG_INFO(msgType), pMsg->code & 0XFFFF);
(*fp)(pDnode, pMsg, pEpSet);
dTrace("RPC %p, rsp:%s will be processed, code:0x%x", pRsp->handle, TMSG_INFO(msgType), pRsp->code & 0XFFFF);
(*fp)(pDnode, pRsp, pEpSet);
} else {
dError("RPC %p, rsp:%s not processed", pMsg->handle, TMSG_INFO(msgType));
rpcFreeCont(pMsg->pCont);
dError("RPC %p, rsp:%s not processed", pRsp->handle, TMSG_INFO(msgType));
rpcFreeCont(pRsp->pCont);
}
}
......@@ -201,48 +201,48 @@ static void dndCleanupClient(SDnode *pDnode) {
}
}
static void dndProcessRequest(void *param, SRpcMsg *pMsg, SEpSet *pEpSet) {
static void dndProcessRequest(void *param, SRpcMsg *pReq, SEpSet *pEpSet) {
SDnode *pDnode = param;
STransMgmt *pMgmt = &pDnode->tmgmt;
tmsg_t msgType = pMsg->msgType;
tmsg_t msgType = pReq->msgType;
if (msgType == TDMT_DND_NETWORK_TEST) {
dTrace("RPC %p, network test req, app:%p will be processed, code:0x%x", pMsg->handle, pMsg->ahandle, pMsg->code);
dndProcessStartupReq(pDnode, pMsg);
dTrace("RPC %p, network test req, app:%p will be processed, code:0x%x", pReq->handle, pReq->ahandle, pReq->code);
dndProcessStartupReq(pDnode, pReq);
return;
}
if (dndGetStat(pDnode) == DND_STAT_STOPPED) {
dError("RPC %p, req:%s app:%p is ignored since dnode exiting", pMsg->handle, TMSG_INFO(msgType), pMsg->ahandle);
SRpcMsg rspMsg = {.handle = pMsg->handle, .code = TSDB_CODE_DND_OFFLINE};
dError("RPC %p, req:%s app:%p is ignored since dnode exiting", pReq->handle, TMSG_INFO(msgType), pReq->ahandle);
SRpcMsg rspMsg = {.handle = pReq->handle, .code = TSDB_CODE_DND_OFFLINE};
rpcSendResponse(&rspMsg);
rpcFreeCont(pMsg->pCont);
rpcFreeCont(pReq->pCont);
return;
} else if (dndGetStat(pDnode) != DND_STAT_RUNNING) {
dError("RPC %p, req:%s app:%p is ignored since dnode not running", pMsg->handle, TMSG_INFO(msgType), pMsg->ahandle);
SRpcMsg rspMsg = {.handle = pMsg->handle, .code = TSDB_CODE_APP_NOT_READY};
dError("RPC %p, req:%s app:%p is ignored since dnode not running", pReq->handle, TMSG_INFO(msgType), pReq->ahandle);
SRpcMsg rspMsg = {.handle = pReq->handle, .code = TSDB_CODE_APP_NOT_READY};
rpcSendResponse(&rspMsg);
rpcFreeCont(pMsg->pCont);
rpcFreeCont(pReq->pCont);
return;
}
if (pMsg->pCont == NULL) {
dTrace("RPC %p, req:%s app:%p not processed since content is null", pMsg->handle, TMSG_INFO(msgType),
pMsg->ahandle);
SRpcMsg rspMsg = {.handle = pMsg->handle, .code = TSDB_CODE_DND_INVALID_MSG_LEN};
if (pReq->pCont == NULL) {
dTrace("RPC %p, req:%s app:%p not processed since content is null", pReq->handle, TMSG_INFO(msgType),
pReq->ahandle);
SRpcMsg rspMsg = {.handle = pReq->handle, .code = TSDB_CODE_DND_INVALID_MSG_LEN};
rpcSendResponse(&rspMsg);
return;
}
DndMsgFp fp = pMgmt->msgFp[TMSG_INDEX(msgType)];
if (fp != NULL) {
dTrace("RPC %p, req:%s app:%p will be processed", pMsg->handle, TMSG_INFO(msgType), pMsg->ahandle);
(*fp)(pDnode, pMsg, pEpSet);
dTrace("RPC %p, req:%s app:%p will be processed", pReq->handle, TMSG_INFO(msgType), pReq->ahandle);
(*fp)(pDnode, pReq, pEpSet);
} else {
dError("RPC %p, req:%s app:%p is not processed since no handle", pMsg->handle, TMSG_INFO(msgType), pMsg->ahandle);
SRpcMsg rspMsg = {.handle = pMsg->handle, .code = TSDB_CODE_MSG_NOT_PROCESSED};
dError("RPC %p, req:%s app:%p is not processed since no handle", pReq->handle, TMSG_INFO(msgType), pReq->ahandle);
SRpcMsg rspMsg = {.handle = pReq->handle, .code = TSDB_CODE_MSG_NOT_PROCESSED};
rpcSendResponse(&rspMsg);
rpcFreeCont(pMsg->pCont);
rpcFreeCont(pReq->pCont);
}
}
......@@ -254,7 +254,7 @@ static void dndSendMsgToMnodeRecv(SDnode *pDnode, SRpcMsg *pRpcMsg, SRpcMsg *pRp
rpcSendRecv(pMgmt->clientRpc, &epSet, pRpcMsg, pRpcRsp);
}
static int32_t dndAuthInternalMsg(SDnode *pDnode, char *user, char *spi, char *encrypt, char *secret, char *ckey) {
static int32_t dndAuthInternalReq(SDnode *pDnode, char *user, char *spi, char *encrypt, char *secret, char *ckey) {
if (strcmp(user, INTERNAL_USER) == 0) {
// A simple temporary implementation
char pass[TSDB_PASSWORD_LEN] = {0};
......@@ -281,7 +281,7 @@ static int32_t dndAuthInternalMsg(SDnode *pDnode, char *user, char *spi, char *e
static int32_t dndRetrieveUserAuthInfo(void *parent, char *user, char *spi, char *encrypt, char *secret, char *ckey) {
SDnode *pDnode = parent;
if (dndAuthInternalMsg(parent, user, spi, encrypt, secret, ckey) == 0) {
if (dndAuthInternalReq(parent, user, spi, encrypt, secret, ckey) == 0) {
// dTrace("get internal auth success");
return 0;
}
......@@ -298,10 +298,10 @@ static int32_t dndRetrieveUserAuthInfo(void *parent, char *user, char *spi, char
// dDebug("user:%s, send auth msg to other mnodes", user);
SAuthMsg *pMsg = rpcMallocCont(sizeof(SAuthMsg));
tstrncpy(pMsg->user, user, TSDB_USER_LEN);
SAuthReq *pReq = rpcMallocCont(sizeof(SAuthReq));
tstrncpy(pReq->user, user, TSDB_USER_LEN);
SRpcMsg rpcMsg = {.pCont = pMsg, .contLen = sizeof(SAuthMsg), .msgType = TDMT_MND_AUTH};
SRpcMsg rpcMsg = {.pCont = pReq, .contLen = sizeof(SAuthReq), .msgType = TDMT_MND_AUTH};
SRpcMsg rpcRsp = {0};
dndSendMsgToMnodeRecv(pDnode, &rpcMsg, &rpcRsp);
......@@ -381,19 +381,19 @@ void dndCleanupTrans(SDnode *pDnode) {
dInfo("dnode-transport is cleaned up");
}
int32_t dndSendReqToDnode(SDnode *pDnode, SEpSet *pEpSet, SRpcMsg *pMsg) {
int32_t dndSendReqToDnode(SDnode *pDnode, SEpSet *pEpSet, SRpcMsg *pReq) {
STransMgmt *pMgmt = &pDnode->tmgmt;
if (pMgmt->clientRpc == NULL) {
terrno = TSDB_CODE_DND_OFFLINE;
return -1;
}
rpcSendRequest(pMgmt->clientRpc, pEpSet, pMsg, NULL);
rpcSendRequest(pMgmt->clientRpc, pEpSet, pReq, NULL);
return 0;
}
int32_t dndSendReqToMnode(SDnode *pDnode, SRpcMsg *pMsg) {
int32_t dndSendReqToMnode(SDnode *pDnode, SRpcMsg *pReq) {
SEpSet epSet = {0};
dndGetMnodeEpSet(pDnode, &epSet);
return dndSendReqToDnode(pDnode, &epSet, pMsg);
return dndSendReqToDnode(pDnode, &epSet, pReq);
}
......@@ -197,6 +197,10 @@ static void dndCloseVnode(SDnode *pDnode, SVnodeObj *pVnode) {
dndFreeVnodeWriteQueue(pDnode, pVnode);
dndFreeVnodeApplyQueue(pDnode, pVnode);
dndFreeVnodeSyncQueue(pDnode, pVnode);
vnodeClose(pVnode->pImpl);
pVnode->pImpl = NULL;
free(pVnode->path);
free(pVnode->db);
free(pVnode);
......
......@@ -34,7 +34,7 @@ TEST_F(DndTestBnode, 01_Create_Bnode) {
SRpcMsg* pRsp = test.SendReq(TDMT_DND_CREATE_BNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, TSDB_CODE_DND_BNODE_ID_INVALID);
ASSERT_EQ(pRsp->code, TSDB_CODE_DND_BNODE_INVALID_OPTION);
}
{
......@@ -82,7 +82,7 @@ TEST_F(DndTestBnode, 01_Drop_Bnode) {
SRpcMsg* pRsp = test.SendReq(TDMT_DND_DROP_BNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, TSDB_CODE_DND_BNODE_ID_INVALID);
ASSERT_EQ(pRsp->code, TSDB_CODE_DND_BNODE_INVALID_OPTION);
}
{
......
......@@ -13,14 +13,244 @@
class DndTestMnode : public ::testing::Test {
protected:
static void SetUpTestSuite() { test.Init("/tmp/dnode_test_mnode", 9113); }
static void TearDownTestSuite() { test.Cleanup(); }
static void SetUpTestSuite() {
test.Init("/tmp/dnode_test_mnode", 9113);
const char* fqdn = "localhost";
const char* firstEp = "localhost:9113";
static Testbase test;
server2.Start("/tmp/dnode_test_mnode2", fqdn, 9114, firstEp);
}
static void TearDownTestSuite() {
server2.Stop();
test.Cleanup();
}
static Testbase test;
static TestServer server2;
public:
void SetUp() override {}
void TearDown() override {}
};
Testbase DndTestMnode::test;
Testbase DndTestMnode::test;
TestServer DndTestMnode::server2;
TEST_F(DndTestMnode, 01_Create_Dnode) {
int32_t contLen = sizeof(SCreateDnodeReq);
SCreateDnodeReq* pReq = (SCreateDnodeReq*)rpcMallocCont(contLen);
strcpy(pReq->fqdn, "localhost");
pReq->port = htonl(9114);
SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_DNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, 0);
taosMsleep(1300);
test.SendShowMetaReq(TSDB_MGMT_TABLE_DNODE, "");
test.SendShowRetrieveReq();
EXPECT_EQ(test.GetShowRows(), 2);
}
TEST_F(DndTestMnode, 01_Create_Mnode) {
{
int32_t contLen = sizeof(SDCreateMnodeReq);
SDCreateMnodeReq* pReq = (SDCreateMnodeReq*)rpcMallocCont(contLen);
pReq->dnodeId = htonl(2);
pReq->replica = 1;
pReq->replicas[0].id = htonl(1);
pReq->replicas[0].port = htonl(9113);
strcpy(pReq->replicas[0].fqdn, "localhost");
SRpcMsg* pRsp = test.SendReq(TDMT_DND_CREATE_MNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, TSDB_CODE_DND_MNODE_INVALID_OPTION);
}
{
int32_t contLen = sizeof(SDCreateMnodeReq);
SDCreateMnodeReq* pReq = (SDCreateMnodeReq*)rpcMallocCont(contLen);
pReq->dnodeId = htonl(1);
pReq->replica = 1;
pReq->replicas[0].id = htonl(2);
pReq->replicas[0].port = htonl(9113);
strcpy(pReq->replicas[0].fqdn, "localhost");
SRpcMsg* pRsp = test.SendReq(TDMT_DND_CREATE_MNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, TSDB_CODE_DND_MNODE_INVALID_OPTION);
}
{
int32_t contLen = sizeof(SDCreateMnodeReq);
SDCreateMnodeReq* pReq = (SDCreateMnodeReq*)rpcMallocCont(contLen);
pReq->dnodeId = htonl(1);
pReq->replica = 2;
pReq->replicas[0].id = htonl(1);
pReq->replicas[0].port = htonl(9113);
strcpy(pReq->replicas[0].fqdn, "localhost");
pReq->replicas[1].id = htonl(1);
pReq->replicas[1].port = htonl(9114);
strcpy(pReq->replicas[1].fqdn, "localhost");
SRpcMsg* pRsp = test.SendReq(TDMT_DND_CREATE_MNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, TSDB_CODE_DND_MNODE_ALREADY_DEPLOYED);
}
// {
// int32_t contLen = sizeof(SDCreateMnodeReq);
// SDCreateMnodeReq* pReq = (SDCreateMnodeReq*)rpcMallocCont(contLen);
// pReq->dnodeId = htonl(1);
// pReq->replica = 2;
// pReq->replicas[0].id = htonl(1);
// pReq->replicas[0].port = htonl(9113);
// pReq->replicas[0].id = htonl(1);
// pReq->replicas[0].port = htonl(9113);
// strcpy(pReq->replicas[0].fqdn, "localhost");
// SRpcMsg* pRsp = test.SendReq(TDMT_DND_CREATE_MNODE, pReq, contLen);
// ASSERT_NE(pRsp, nullptr);
// ASSERT_EQ(pRsp->code, 0);
// }
// {
// int32_t contLen = sizeof(SDCreateMnodeReq);
// SDCreateMnodeReq* pReq = (SDCreateMnodeReq*)rpcMallocCont(contLen);
// pReq->dnodeId = htonl(1);
// SRpcMsg* pRsp = test.SendReq(TDMT_DND_CREATE_MNODE, pReq, contLen);
// ASSERT_NE(pRsp, nullptr);
// ASSERT_EQ(pRsp->code, TSDB_CODE_DND_MNODE_ALREADY_DEPLOYED);
// }
// test.Restart();
// {
// int32_t contLen = sizeof(SDCreateMnodeReq);
// SDCreateMnodeReq* pReq = (SDCreateMnodeReq*)rpcMallocCont(contLen);
// pReq->dnodeId = htonl(1);
// SRpcMsg* pRsp = test.SendReq(TDMT_DND_CREATE_MNODE, pReq, contLen);
// ASSERT_NE(pRsp, nullptr);
// ASSERT_EQ(pRsp->code, TSDB_CODE_DND_MNODE_ALREADY_DEPLOYED);
// }
}
#if 0
TEST_F(DndTestMnode, 02_Alter_Mnode) {
{
int32_t contLen = sizeof(SDCreateMnodeReq);
SDCreateMnodeReq* pReq = (SDCreateMnodeReq*)rpcMallocCont(contLen);
pReq->dnodeId = htonl(2);
SRpcMsg* pRsp = test.SendReq(TDMT_DND_CREATE_MNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, TSDB_CODE_DND_MNODE_ID_INVALID);
}
{
int32_t contLen = sizeof(SDCreateMnodeReq);
SDCreateMnodeReq* pReq = (SDCreateMnodeReq*)rpcMallocCont(contLen);
pReq->dnodeId = htonl(1);
SRpcMsg* pRsp = test.SendReq(TDMT_DND_CREATE_MNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, 0);
}
{
int32_t contLen = sizeof(SDCreateMnodeReq);
SDCreateMnodeReq* pReq = (SDCreateMnodeReq*)rpcMallocCont(contLen);
pReq->dnodeId = htonl(1);
SRpcMsg* pRsp = test.SendReq(TDMT_DND_CREATE_MNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, TSDB_CODE_DND_MNODE_ALREADY_DEPLOYED);
}
test.Restart();
{
int32_t contLen = sizeof(SDCreateMnodeReq);
SDCreateMnodeReq* pReq = (SDCreateMnodeReq*)rpcMallocCont(contLen);
pReq->dnodeId = htonl(1);
SRpcMsg* pRsp = test.SendReq(TDMT_DND_CREATE_MNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, TSDB_CODE_DND_MNODE_ALREADY_DEPLOYED);
}
}
TEST_F(DndTestMnode, 03_Drop_Mnode) {
{
int32_t contLen = sizeof(SDDropMnodeReq);
SDDropMnodeReq* pReq = (SDDropMnodeReq*)rpcMallocCont(contLen);
pReq->dnodeId = htonl(2);
SRpcMsg* pRsp = test.SendReq(TDMT_DND_DROP_MNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, TSDB_CODE_DND_MNODE_ID_INVALID);
}
{
int32_t contLen = sizeof(SDDropMnodeReq);
SDDropMnodeReq* pReq = (SDDropMnodeReq*)rpcMallocCont(contLen);
pReq->dnodeId = htonl(1);
SRpcMsg* pRsp = test.SendReq(TDMT_DND_DROP_MNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, 0);
}
{
int32_t contLen = sizeof(SDDropMnodeReq);
SDDropMnodeReq* pReq = (SDDropMnodeReq*)rpcMallocCont(contLen);
pReq->dnodeId = htonl(1);
SRpcMsg* pRsp = test.SendReq(TDMT_DND_DROP_MNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, TSDB_CODE_DND_MNODE_NOT_DEPLOYED);
}
test.Restart();
{
int32_t contLen = sizeof(SDDropMnodeReq);
SDDropMnodeReq* pReq = (SDDropMnodeReq*)rpcMallocCont(contLen);
pReq->dnodeId = htonl(1);
SRpcMsg* pRsp = test.SendReq(TDMT_DND_DROP_MNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, TSDB_CODE_DND_MNODE_NOT_DEPLOYED);
}
{
int32_t contLen = sizeof(SDCreateMnodeReq);
SDCreateMnodeReq* pReq = (SDCreateMnodeReq*)rpcMallocCont(contLen);
pReq->dnodeId = htonl(1);
SRpcMsg* pRsp = test.SendReq(TDMT_DND_CREATE_MNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, 0);
}
}
#endif
\ No newline at end of file
......@@ -34,7 +34,7 @@ TEST_F(DndTestQnode, 01_Create_Qnode) {
SRpcMsg* pRsp = test.SendReq(TDMT_DND_CREATE_QNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, TSDB_CODE_DND_QNODE_ID_INVALID);
ASSERT_EQ(pRsp->code, TSDB_CODE_DND_QNODE_INVALID_OPTION);
}
{
......@@ -82,7 +82,7 @@ TEST_F(DndTestQnode, 02_Drop_Qnode) {
SRpcMsg* pRsp = test.SendReq(TDMT_DND_DROP_QNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, TSDB_CODE_DND_QNODE_ID_INVALID);
ASSERT_EQ(pRsp->code, TSDB_CODE_DND_QNODE_INVALID_OPTION);
}
{
......
......@@ -34,7 +34,7 @@ TEST_F(DndTestSnode, 01_Create_Snode) {
SRpcMsg* pRsp = test.SendReq(TDMT_DND_CREATE_SNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, TSDB_CODE_DND_SNODE_ID_INVALID);
ASSERT_EQ(pRsp->code, TSDB_CODE_DND_SNODE_INVALID_OPTION);
}
{
......@@ -82,7 +82,7 @@ TEST_F(DndTestSnode, 01_Drop_Snode) {
SRpcMsg* pRsp = test.SendReq(TDMT_DND_DROP_SNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, TSDB_CODE_DND_SNODE_ID_INVALID);
ASSERT_EQ(pRsp->code, TSDB_CODE_DND_SNODE_INVALID_OPTION);
}
{
......
......@@ -137,9 +137,9 @@ TEST_F(DndTestStb, 01_Create_Show_Meta_Drop_Restart_Stb) {
pRsp->numOfColumns = htonl(pRsp->numOfColumns);
pRsp->sversion = htonl(pRsp->sversion);
pRsp->tversion = htonl(pRsp->tversion);
pRsp->suid = htobe64(pRsp->suid);
pRsp->tuid = htobe64(pRsp->tuid);
pRsp->vgId = htobe64(pRsp->vgId);
pRsp->suid = be64toh(pRsp->suid);
pRsp->tuid = be64toh(pRsp->tuid);
pRsp->vgId = be64toh(pRsp->vgId);
for (int32_t i = 0; i < pRsp->numOfTags + pRsp->numOfColumns; ++i) {
SSchema* pSchema = &pRsp->pSchema[i];
pSchema->colId = htonl(pSchema->colId);
......@@ -156,7 +156,7 @@ TEST_F(DndTestStb, 01_Create_Show_Meta_Drop_Restart_Stb) {
EXPECT_EQ(pRsp->sversion, 1);
EXPECT_EQ(pRsp->tversion, 0);
EXPECT_GT(pRsp->suid, 0);
EXPECT_EQ(pRsp->tuid, 0);
EXPECT_GT(pRsp->tuid, 0);
EXPECT_EQ(pRsp->vgId, 0);
{
......
......@@ -16,7 +16,7 @@
#include "sut.h"
void Testbase::InitLog(const char* path) {
dDebugFlag = 0;
dDebugFlag = 143;
vDebugFlag = 0;
mDebugFlag = 143;
cDebugFlag = 0;
......
......@@ -917,6 +917,7 @@ static int32_t mndProcessUseDbMsg(SMnodeMsg *pMsg) {
}
memcpy(pRsp->db, pDb->name, TSDB_DB_FNAME_LEN);
pRsp->uid = htobe64(pDb->uid);
pRsp->vgVersion = htonl(pDb->vgVersion);
pRsp->vgNum = htonl(vindex);
pRsp->hashMethod = pDb->hashMethod;
......
......@@ -277,7 +277,7 @@ static int32_t mndSetCreateMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDno
void *pIter = NULL;
int32_t numOfReplicas = 0;
SDCreateMnodeMsg createMsg = {0};
SDCreateMnodeReq createMsg = {0};
while (1) {
SMnodeObj *pMObj = NULL;
pIter = sdbFetch(pSdb, SDB_MNODE, pIter, (void **)&pMObj);
......@@ -307,18 +307,18 @@ static int32_t mndSetCreateMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDno
STransAction action = {0};
SDAlterMnodeMsg *pMsg = malloc(sizeof(SDAlterMnodeMsg));
SDAlterMnodeReq *pMsg = malloc(sizeof(SDAlterMnodeReq));
if (pMsg == NULL) {
sdbCancelFetch(pSdb, pIter);
sdbRelease(pSdb, pMObj);
return -1;
}
memcpy(pMsg, &createMsg, sizeof(SDAlterMnodeMsg));
memcpy(pMsg, &createMsg, sizeof(SDAlterMnodeReq));
pMsg->dnodeId = htonl(pMObj->id);
action.epSet = mndGetDnodeEpset(pMObj->pDnode);
action.pCont = pMsg;
action.contLen = sizeof(SDAlterMnodeMsg);
action.contLen = sizeof(SDAlterMnodeReq);
action.msgType = TDMT_DND_ALTER_MNODE;
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
......@@ -335,14 +335,14 @@ static int32_t mndSetCreateMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDno
STransAction action = {0};
action.epSet = mndGetDnodeEpset(pDnode);
SDCreateMnodeMsg *pMsg = malloc(sizeof(SDCreateMnodeMsg));
SDCreateMnodeReq *pMsg = malloc(sizeof(SDCreateMnodeReq));
if (pMsg == NULL) return -1;
memcpy(pMsg, &createMsg, sizeof(SDAlterMnodeMsg));
memcpy(pMsg, &createMsg, sizeof(SDAlterMnodeReq));
pMsg->dnodeId = htonl(pObj->id);
action.epSet = mndGetDnodeEpset(pDnode);
action.pCont = pMsg;
action.contLen = sizeof(SDCreateMnodeMsg);
action.contLen = sizeof(SDCreateMnodeReq);
action.msgType = TDMT_DND_CREATE_MNODE;
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
free(pMsg);
......@@ -353,7 +353,7 @@ static int32_t mndSetCreateMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDno
return 0;
}
static int32_t mndCreateMnode(SMnode *pMnode, SMnodeMsg *pMsg, SDnodeObj *pDnode, SMCreateMnodeMsg *pCreate) {
static int32_t mndCreateMnode(SMnode *pMnode, SMnodeMsg *pMsg, SDnodeObj *pDnode, SMCreateMnodeReq *pCreate) {
SMnodeObj mnodeObj = {0};
mnodeObj.id = pDnode->id;
mnodeObj.createdTime = taosGetTimestampMs();
......@@ -396,7 +396,7 @@ CREATE_MNODE_OVER:
static int32_t mndProcessCreateMnodeReq(SMnodeMsg *pMsg) {
SMnode *pMnode = pMsg->pMnode;
SMCreateMnodeMsg *pCreate = pMsg->rpcMsg.pCont;
SMCreateMnodeReq *pCreate = pMsg->rpcMsg.pCont;
pCreate->dnodeId = htonl(pCreate->dnodeId);
......@@ -449,7 +449,7 @@ static int32_t mndSetDropMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDnode
void *pIter = NULL;
int32_t numOfReplicas = 0;
SDAlterMnodeMsg alterMsg = {0};
SDAlterMnodeReq alterMsg = {0};
while (1) {
SMnodeObj *pMObj = NULL;
pIter = sdbFetch(pSdb, SDB_MNODE, pIter, (void **)&pMObj);
......@@ -475,18 +475,18 @@ static int32_t mndSetDropMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDnode
if (pMObj->id != pObj->id) {
STransAction action = {0};
SDAlterMnodeMsg *pMsg = malloc(sizeof(SDAlterMnodeMsg));
SDAlterMnodeReq *pMsg = malloc(sizeof(SDAlterMnodeReq));
if (pMsg == NULL) {
sdbCancelFetch(pSdb, pIter);
sdbRelease(pSdb, pMObj);
return -1;
}
memcpy(pMsg, &alterMsg, sizeof(SDAlterMnodeMsg));
memcpy(pMsg, &alterMsg, sizeof(SDAlterMnodeReq));
pMsg->dnodeId = htonl(pMObj->id);
action.epSet = mndGetDnodeEpset(pMObj->pDnode);
action.pCont = pMsg;
action.contLen = sizeof(SDAlterMnodeMsg);
action.contLen = sizeof(SDAlterMnodeReq);
action.msgType = TDMT_DND_ALTER_MNODE;
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
......@@ -504,7 +504,7 @@ static int32_t mndSetDropMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDnode
STransAction action = {0};
action.epSet = mndGetDnodeEpset(pDnode);
SDDropMnodeMsg *pMsg = malloc(sizeof(SDDropMnodeMsg));
SDDropMnodeReq *pMsg = malloc(sizeof(SDDropMnodeReq));
if (pMsg == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
......@@ -513,7 +513,7 @@ static int32_t mndSetDropMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDnode
action.epSet = mndGetDnodeEpset(pDnode);
action.pCont = pMsg;
action.contLen = sizeof(SDDropMnodeMsg);
action.contLen = sizeof(SDDropMnodeReq);
action.msgType = TDMT_DND_DROP_MNODE;
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
free(pMsg);
......@@ -563,7 +563,7 @@ DROP_MNODE_OVER:
static int32_t mndProcessDropMnodeReq(SMnodeMsg *pMsg) {
SMnode *pMnode = pMsg->pMnode;
SMDropMnodeMsg *pDrop = pMsg->rpcMsg.pCont;
SMDropMnodeReq *pDrop = pMsg->rpcMsg.pCont;
pDrop->dnodeId = htonl(pDrop->dnodeId);
mDebug("mnode:%d, start to drop", pDrop->dnodeId);
......
......@@ -769,7 +769,8 @@ static int32_t mndProcessStbMetaMsg(SMnodeMsg *pMsg) {
pMeta->tableType = TSDB_SUPER_TABLE;
pMeta->update = pDb->cfg.update;
pMeta->sversion = htonl(pStb->version);
pMeta->suid = htonl(pStb->uid);
pMeta->suid = htobe64(pStb->uid);
pMeta->tuid = htobe64(pStb->uid);
for (int32_t i = 0; i < totalCols; ++i) {
SSchema *pSchema = &pMeta->pSchema[i];
......
......@@ -86,10 +86,6 @@ static int32_t mndRestoreWal(SMnode *pMnode) {
mndTransPullup(pMnode);
if (walBeginSnapshot(pWal, sdbVer) < 0) {
goto WAL_RESTORE_OVER;
}
if (sdbVer != lastSdbVer) {
mInfo("sdb restored from %" PRId64 " to %" PRId64 ", write file", lastSdbVer, sdbVer);
if (sdbWriteFile(pSdb) != 0) {
......@@ -97,6 +93,10 @@ static int32_t mndRestoreWal(SMnode *pMnode) {
}
}
if (walBeginSnapshot(pWal, sdbVer) < 0) {
goto WAL_RESTORE_OVER;
}
if (walEndSnapshot(pWal) < 0) {
goto WAL_RESTORE_OVER;
}
......
......@@ -72,9 +72,9 @@ TEST_F(MndTestMnode, 01_ShowDnode) {
TEST_F(MndTestMnode, 02_Create_Mnode_Invalid_Id) {
{
int32_t contLen = sizeof(SMCreateMnodeMsg);
int32_t contLen = sizeof(SMCreateMnodeReq);
SMCreateMnodeMsg* pReq = (SMCreateMnodeMsg*)rpcMallocCont(contLen);
SMCreateMnodeReq* pReq = (SMCreateMnodeReq*)rpcMallocCont(contLen);
pReq->dnodeId = htonl(1);
SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_MNODE, pReq, contLen);
......@@ -85,9 +85,9 @@ TEST_F(MndTestMnode, 02_Create_Mnode_Invalid_Id) {
TEST_F(MndTestMnode, 03_Create_Mnode_Invalid_Id) {
{
int32_t contLen = sizeof(SMCreateMnodeMsg);
int32_t contLen = sizeof(SMCreateMnodeReq);
SMCreateMnodeMsg* pReq = (SMCreateMnodeMsg*)rpcMallocCont(contLen);
SMCreateMnodeReq* pReq = (SMCreateMnodeReq*)rpcMallocCont(contLen);
pReq->dnodeId = htonl(2);
SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_MNODE, pReq, contLen);
......@@ -117,9 +117,9 @@ TEST_F(MndTestMnode, 04_Create_Mnode) {
{
// create mnode
int32_t contLen = sizeof(SMCreateMnodeMsg);
int32_t contLen = sizeof(SMCreateMnodeReq);
SMCreateMnodeMsg* pReq = (SMCreateMnodeMsg*)rpcMallocCont(contLen);
SMCreateMnodeReq* pReq = (SMCreateMnodeReq*)rpcMallocCont(contLen);
pReq->dnodeId = htonl(2);
SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_MNODE, pReq, contLen);
......@@ -144,9 +144,9 @@ TEST_F(MndTestMnode, 04_Create_Mnode) {
{
// drop mnode
int32_t contLen = sizeof(SMDropMnodeMsg);
int32_t contLen = sizeof(SMDropMnodeReq);
SMDropMnodeMsg* pReq = (SMDropMnodeMsg*)rpcMallocCont(contLen);
SMDropMnodeReq* pReq = (SMDropMnodeReq*)rpcMallocCont(contLen);
pReq->dnodeId = htonl(2);
SRpcMsg* pRsp = test.SendReq(TDMT_MND_DROP_MNODE, pReq, contLen);
......
......@@ -105,6 +105,9 @@ static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
if (pTbCfg->type == META_CHILD_TABLE) {
strcpy(pTbMetaMsg->stbFname, pStbCfg->name);
pTbMetaMsg->suid = htobe64(pTbCfg->ctbCfg.suid);
} else if (pTbCfg->type == META_SUPER_TABLE) {
strcpy(pTbMetaMsg->stbFname, pTbCfg->name);
pTbMetaMsg->suid = htobe64(uid);
}
pTbMetaMsg->numOfTags = htonl(nTagCols);
pTbMetaMsg->numOfColumns = htonl(nCols);
......
......@@ -22,20 +22,31 @@ extern "C" {
#include "catalog.h"
#include "common.h"
#include "tlog.h"
#include "query.h"
#define CTG_DEFAULT_CACHE_CLUSTER_NUMBER 6
#define CTG_DEFAULT_CACHE_VGROUP_NUMBER 100
#define CTG_DEFAULT_CACHE_DB_NUMBER 20
#define CTG_DEFAULT_CACHE_TABLEMETA_NUMBER 100000
#define CTG_DEFAULT_RENT_SECOND 10
#define CTG_DEFAULT_RENT_SLOT_SIZE 10
#define CTG_RENT_SLOT_SECOND 2
#define CTG_DEFAULT_INVALID_VERSION (-1)
#define CTG_ERR_CODE_TABLE_NOT_EXIST TSDB_CODE_TDB_INVALID_TABLE_ID
enum {
CTG_READ = 1,
CTG_WRITE,
};
enum {
CTG_RENT_DB = 1,
CTG_RENT_STABLE,
};
typedef struct SVgroupListCache {
int32_t vgroupVersion;
SHashObj *cache; // key:vgId, value:SVgroupInfo
......@@ -51,30 +62,76 @@ typedef struct STableMetaCache {
SHashObj *stableCache; //key:suid, value:STableMeta*
} STableMetaCache;
typedef struct SRentSlotInfo {
SRWLatch lock;
bool needSort;
SArray *meta; // element is SDbVgVersion or SSTableMetaVersion
} SRentSlotInfo;
typedef struct SMetaRentMgmt {
int8_t type;
uint16_t slotNum;
uint16_t slotRIdx;
int64_t lastReadMsec;
SRentSlotInfo *slots;
} SMetaRentMgmt;
typedef struct SCatalog {
uint64_t clusterId;
SDBVgroupCache dbCache;
STableMetaCache tableCache;
SMetaRentMgmt dbRent;
SMetaRentMgmt stableRent;
} SCatalog;
typedef struct SCtgApiStat {
} SCtgApiStat;
typedef struct SCtgResourceStat {
} SCtgResourceStat;
typedef struct SCtgCacheStat {
} SCtgCacheStat;
typedef struct SCatalogStat {
SCtgApiStat api;
SCtgResourceStat resource;
SCtgCacheStat cache;
} SCatalogStat;
typedef struct SCatalogMgmt {
void *pMsgSender; // used to send messsage to mnode to fetch necessary metadata
SHashObj *pCluster; // items cached for each cluster, the hash key is the cluster-id got from mgmt node
SCatalogCfg cfg;
SHashObj *pCluster; //key: clusterId, value: SCatalog*
SCatalogStat stat;
SCatalogCfg cfg;
} SCatalogMgmt;
typedef uint32_t (*tableNameHashFp)(const char *, uint32_t);
#define ctgFatal(...) do { if (ctgDebugFlag & DEBUG_FATAL) { taosPrintLog("CTG FATAL ", ctgDebugFlag, __VA_ARGS__); }} while(0)
#define ctgError(...) do { if (ctgDebugFlag & DEBUG_ERROR) { taosPrintLog("CTG ERROR ", ctgDebugFlag, __VA_ARGS__); }} while(0)
#define ctgWarn(...) do { if (ctgDebugFlag & DEBUG_WARN) { taosPrintLog("CTG WARN ", ctgDebugFlag, __VA_ARGS__); }} while(0)
#define ctgInfo(...) do { if (ctgDebugFlag & DEBUG_INFO) { taosPrintLog("CTG ", ctgDebugFlag, __VA_ARGS__); }} while(0)
#define ctgDebug(...) do { if (ctgDebugFlag & DEBUG_DEBUG) { taosPrintLog("CTG ", ctgDebugFlag, __VA_ARGS__); }} while(0)
#define ctgTrace(...) do { if (ctgDebugFlag & DEBUG_TRACE) { taosPrintLog("CTG ", ctgDebugFlag, __VA_ARGS__); }} while(0)
#define ctgDebugL(...) do { if (ctgDebugFlag & DEBUG_DEBUG) { taosPrintLongString("CTG ", ctgDebugFlag, __VA_ARGS__); }} while(0)
#define CTG_IS_META_NONE(type) ((type) == META_TYPE_NON_TABLE)
#define CTG_IS_META_CTABLE(type) ((type) == META_TYPE_CTABLE)
#define CTG_IS_META_TABLE(type) ((type) == META_TYPE_TABLE)
#define CTG_IS_META_BOTH(type) ((type) == META_TYPE_BOTH_TABLE)
#define CTG_IS_STABLE(isSTable) (1 == (isSTable))
#define CTG_IS_NOT_STABLE(isSTable) (0 == (isSTable))
#define CTG_IS_UNKNOWN_STABLE(isSTable) ((isSTable) < 0)
#define CTG_SET_STABLE(isSTable, tbType) do { (isSTable) = ((tbType) == TSDB_SUPER_TABLE) ? 1 : ((tbType) > TSDB_SUPER_TABLE ? 0 : -1); } while (0)
#define CTG_TBTYPE_MATCH(isSTable, tbType) (CTG_IS_UNKNOWN_STABLE(isSTable) || (CTG_IS_STABLE(isSTable) && (tbType) == TSDB_SUPER_TABLE) || (CTG_IS_NOT_STABLE(isSTable) && (tbType) != TSDB_SUPER_TABLE))
#define CTG_TABLE_NOT_EXIST(code) (code == CTG_ERR_CODE_TABLE_NOT_EXIST)
#define ctgFatal(param, ...) qFatal("CTG:%p " param, pCatalog, __VA_ARGS__)
#define ctgError(param, ...) qError("CTG:%p " param, pCatalog, __VA_ARGS__)
#define ctgWarn(param, ...) qWarn("CTG:%p " param, pCatalog, __VA_ARGS__)
#define ctgInfo(param, ...) qInfo("CTG:%p " param, pCatalog, __VA_ARGS__)
#define ctgDebug(param, ...) qDebug("CTG:%p " param, pCatalog, __VA_ARGS__)
#define ctgTrace(param, ...) qTrace("CTG:%p " param, pCatalog, __VA_ARGS__)
#define CTG_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; return _code; } } while (0)
#define CTG_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0)
#define CTG_ERR_LRET(c,...) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { ctgError(__VA_ARGS__); terrno = _code; return _code; } } while (0)
#define CTG_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { terrno = code; goto _return; } } while (0)
#define TD_RWLATCH_WRITE_FLAG_COPY 0x40000000
......@@ -82,15 +139,15 @@ typedef uint32_t (*tableNameHashFp)(const char *, uint32_t);
#define CTG_LOCK(type, _lock) do { \
if (CTG_READ == (type)) { \
assert(atomic_load_32((_lock)) >= 0); \
ctgDebug("CTG RLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
qDebug("CTG RLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
taosRLockLatch(_lock); \
ctgDebug("CTG RLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
qDebug("CTG RLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
assert(atomic_load_32((_lock)) > 0); \
} else { \
assert(atomic_load_32((_lock)) >= 0); \
ctgDebug("CTG WLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
qDebug("CTG WLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
taosWLockLatch(_lock); \
ctgDebug("CTG WLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
qDebug("CTG WLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
assert(atomic_load_32((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY); \
} \
} while (0)
......@@ -98,15 +155,15 @@ typedef uint32_t (*tableNameHashFp)(const char *, uint32_t);
#define CTG_UNLOCK(type, _lock) do { \
if (CTG_READ == (type)) { \
assert(atomic_load_32((_lock)) > 0); \
ctgDebug("CTG RULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
qDebug("CTG RULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
taosRUnLockLatch(_lock); \
ctgDebug("CTG RULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
qDebug("CTG RULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
assert(atomic_load_32((_lock)) >= 0); \
} else { \
assert(atomic_load_32((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY); \
ctgDebug("CTG WULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
qDebug("CTG WULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
taosWUnLockLatch(_lock); \
ctgDebug("CTG WULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
qDebug("CTG WULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
assert(atomic_load_32((_lock)) >= 0); \
} \
} while (0)
......
此差异已折叠。
......@@ -16,3 +16,8 @@ TARGET_INCLUDE_DIRECTORIES(
PUBLIC "${CMAKE_SOURCE_DIR}/include/libs/catalog/"
PRIVATE "${CMAKE_SOURCE_DIR}/source/libs/catalog/inc"
)
add_test(
NAME catalogTest
COMMAND catalogTest
)
......@@ -42,10 +42,13 @@ extern "C" int32_t ctgUpdateTableMetaCache(struct SCatalog *pCatalog, STableMeta
void ctgTestSetPrepareTableMeta();
void ctgTestSetPrepareCTableMeta();
void ctgTestSetPrepareSTableMeta();
void ctgTestSetPrepareMultiSTableMeta();
bool ctgTestStop = false;
bool ctgTestEnableSleep = false;
bool ctgTestDeadLoop = true;
bool ctgTestDeadLoop = false;
int32_t ctgTestPrintNum = 200000;
int32_t ctgTestMTRunSec = 30;
int32_t ctgTestCurrentVgVersion = 0;
int32_t ctgTestVgVersion = 1;
......@@ -54,6 +57,8 @@ int32_t ctgTestColNum = 2;
int32_t ctgTestTagNum = 1;
int32_t ctgTestSVersion = 1;
int32_t ctgTestTVersion = 1;
int32_t ctgTestSuid = 2;
int64_t ctgTestDbId = 33;
uint64_t ctgTestClusterId = 0x1;
char *ctgTestDbname = "1.db1";
......@@ -101,7 +106,6 @@ void ctgTestInitLogFile() {
const char *defaultLogFileNamePrefix = "taoslog";
const int32_t maxLogFileNum = 10;
ctgDebugFlag = 159;
tsAsyncLog = 0;
char temp[128] = {0};
......@@ -128,7 +132,7 @@ void ctgTestBuildCTableMetaOutput(STableMetaOutput *output) {
char tbFullName[TSDB_TABLE_FNAME_LEN];
tNameExtractFullName(&cn, tbFullName);
output->metaNum = 2;
SET_META_TYPE_BOTH_TABLE(output->metaType);
strcpy(output->ctbFname, tbFullName);
......@@ -183,6 +187,7 @@ void ctgTestBuildDBVgroup(SDBVgroupInfo *dbVgroup) {
ctgTestCurrentVgVersion = dbVgroup->vgVersion;
dbVgroup->hashMethod = 0;
dbVgroup->dbId = ctgTestDbId;
dbVgroup->vgInfo = taosHashInit(ctgTestVgNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
vgNum = ctgTestGetVgNumFromVgVersion(dbVgroup->vgVersion);
......@@ -216,6 +221,7 @@ void ctgTestPrepareDbVgroups(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcM
ctgTestCurrentVgVersion = ctgTestVgVersion;
rspMsg->vgNum = htonl(ctgTestVgNum);
rspMsg->hashMethod = 0;
rspMsg->uid = htobe64(ctgTestDbId);
SVgroupInfo *vg = NULL;
uint32_t hashUnit = UINT32_MAX / ctgTestVgNum;
......@@ -338,8 +344,52 @@ void ctgTestPrepareSTableMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpc
rspMsg->update = 1;
rspMsg->sversion = htonl(ctgTestSVersion);
rspMsg->tversion = htonl(ctgTestTVersion);
rspMsg->suid = htobe64(0x0000000000000002);
rspMsg->tuid = htobe64(0x0000000000000003);
rspMsg->suid = htobe64(ctgTestSuid);
rspMsg->tuid = htobe64(ctgTestSuid);
rspMsg->vgId = 0;
SSchema *s = NULL;
s = &rspMsg->pSchema[0];
s->type = TSDB_DATA_TYPE_TIMESTAMP;
s->colId = htonl(1);
s->bytes = htonl(8);
strcpy(s->name, "ts");
s = &rspMsg->pSchema[1];
s->type = TSDB_DATA_TYPE_INT;
s->colId = htonl(2);
s->bytes = htonl(4);
strcpy(s->name, "col1s");
s = &rspMsg->pSchema[2];
s->type = TSDB_DATA_TYPE_BINARY;
s->colId = htonl(3);
s->bytes = htonl(12);
strcpy(s->name, "tag1s");
return;
}
void ctgTestPrepareMultiSTableMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) {
STableMetaMsg *rspMsg = NULL; //todo
static int32_t idx = 1;
pRsp->code =0;
pRsp->contLen = sizeof(STableMetaMsg) + (ctgTestColNum + ctgTestTagNum) * sizeof(SSchema);
pRsp->pCont = calloc(1, pRsp->contLen);
rspMsg = (STableMetaMsg *)pRsp->pCont;
sprintf(rspMsg->tbFname, "%s.%s_%d", ctgTestDbname, ctgTestSTablename, idx);
sprintf(rspMsg->stbFname, "%s.%s_%d", ctgTestDbname, ctgTestSTablename, idx);
rspMsg->numOfTags = htonl(ctgTestTagNum);
rspMsg->numOfColumns = htonl(ctgTestColNum);
rspMsg->precision = 1;
rspMsg->tableType = TSDB_SUPER_TABLE;
rspMsg->update = 1;
rspMsg->sversion = htonl(ctgTestSVersion);
rspMsg->tversion = htonl(ctgTestTVersion);
rspMsg->suid = htobe64(ctgTestSuid + idx);
rspMsg->tuid = htobe64(ctgTestSuid + idx);
rspMsg->vgId = 0;
SSchema *s = NULL;
......@@ -361,10 +411,13 @@ void ctgTestPrepareSTableMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpc
s->bytes = htonl(12);
strcpy(s->name, "tag1s");
++idx;
return;
}
void ctgTestPrepareDbVgroupsAndNormalMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) {
ctgTestPrepareDbVgroups(shandle, pEpSet, pMsg, pRsp);
......@@ -390,6 +443,14 @@ void ctgTestPrepareDbVgroupsAndSuperMeta(void *shandle, SEpSet *pEpSet, SRpcMsg
return;
}
void ctgTestPrepareDbVgroupsAndMultiSuperMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) {
ctgTestPrepareDbVgroups(shandle, pEpSet, pMsg, pRsp);
ctgTestSetPrepareMultiSTableMeta();
return;
}
void ctgTestSetPrepareDbVgroups() {
......@@ -444,6 +505,20 @@ void ctgTestSetPrepareSTableMeta() {
}
}
void ctgTestSetPrepareMultiSTableMeta() {
static Stub stub;
stub.set(rpcSendRecv, ctgTestPrepareMultiSTableMeta);
{
AddrAny any("libtransport.so");
std::map<std::string,void*> result;
any.get_global_func_addr_dynsym("^rpcSendRecv$", result);
for (const auto& f : result) {
stub.set(f.second, ctgTestPrepareMultiSTableMeta);
}
}
}
void ctgTestSetPrepareDbVgroupsAndNormalMeta() {
static Stub stub;
stub.set(rpcSendRecv, ctgTestPrepareDbVgroupsAndNormalMeta);
......@@ -484,6 +559,19 @@ void ctgTestSetPrepareDbVgroupsAndSuperMeta() {
}
}
void ctgTestSetPrepareDbVgroupsAndMultiSuperMeta() {
static Stub stub;
stub.set(rpcSendRecv, ctgTestPrepareDbVgroupsAndMultiSuperMeta);
{
AddrAny any("libtransport.so");
std::map<std::string,void*> result;
any.get_global_func_addr_dynsym("^rpcSendRecv$", result);
for (const auto& f : result) {
stub.set(f.second, ctgTestPrepareDbVgroupsAndMultiSuperMeta);
}
}
}
}
......@@ -507,7 +595,7 @@ void *ctgTestGetDbVgroupThread(void *param) {
if (ctgTestEnableSleep) {
usleep(rand()%5);
}
if (++n % 50000 == 0) {
if (++n % ctgTestPrintNum == 0) {
printf("Get:%d\n", n);
}
}
......@@ -531,7 +619,7 @@ void *ctgTestSetDbVgroupThread(void *param) {
if (ctgTestEnableSleep) {
usleep(rand()%5);
}
if (++n % 50000 == 0) {
if (++n % ctgTestPrintNum == 0) {
printf("Set:%d\n", n);
}
}
......@@ -563,7 +651,7 @@ void *ctgTestGetCtableMetaThread(void *param) {
usleep(rand()%5);
}
if (++n % 50000 == 0) {
if (++n % ctgTestPrintNum == 0) {
printf("Get:%d\n", n);
}
}
......@@ -589,7 +677,7 @@ void *ctgTestSetCtableMetaThread(void *param) {
if (ctgTestEnableSleep) {
usleep(rand()%5);
}
if (++n % 50000 == 0) {
if (++n % ctgTestPrintNum == 0) {
printf("Set:%d\n", n);
}
}
......@@ -600,7 +688,6 @@ void *ctgTestSetCtableMetaThread(void *param) {
}
#if 0
TEST(tableMeta, normalTable) {
struct SCatalog* pCtg = NULL;
......@@ -628,6 +715,7 @@ TEST(tableMeta, normalTable) {
ASSERT_EQ(vgInfo.vgId, 8);
ASSERT_EQ(vgInfo.numOfEps, 3);
ctgTestSetPrepareTableMeta();
STableMeta *tableMeta = NULL;
......@@ -654,6 +742,41 @@ TEST(tableMeta, normalTable) {
ASSERT_EQ(tableMeta->tableInfo.precision, 1);
ASSERT_EQ(tableMeta->tableInfo.rowSize, 12);
SDbVgVersion *dbs = NULL;
SSTableMetaVersion *stb = NULL;
uint32_t dbNum = 0, stbNum = 0, allDbNum = 0, allStbNum = 0;
int32_t i = 0;
while (i < 5) {
++i;
code = catalogGetExpiredDBs(pCtg, &dbs, &dbNum);
ASSERT_EQ(code, 0);
code = catalogGetExpiredSTables(pCtg, &stb, &stbNum);
ASSERT_EQ(code, 0);
if (dbNum) {
printf("got expired db,dbId:%"PRId64"\n", dbs->dbId);
free(dbs);
dbs = NULL;
} else {
printf("no expired db\n");
}
if (stbNum) {
printf("got expired stb,suid:%"PRId64"\n", stb->suid);
free(stb);
stb = NULL;
} else {
printf("no expired stb\n");
}
allDbNum += dbNum;
allStbNum += stbNum;
sleep(2);
}
ASSERT_EQ(allDbNum, 1);
ASSERT_EQ(allStbNum, 0);
catalogDestroy();
}
......@@ -715,6 +838,42 @@ TEST(tableMeta, childTableCase) {
ASSERT_EQ(tableMeta->tableInfo.precision, 1);
ASSERT_EQ(tableMeta->tableInfo.rowSize, 12);
SDbVgVersion *dbs = NULL;
SSTableMetaVersion *stb = NULL;
uint32_t dbNum = 0, stbNum = 0, allDbNum = 0, allStbNum = 0;
int32_t i = 0;
while (i < 5) {
++i;
code = catalogGetExpiredDBs(pCtg, &dbs, &dbNum);
ASSERT_EQ(code, 0);
code = catalogGetExpiredSTables(pCtg, &stb, &stbNum);
ASSERT_EQ(code, 0);
if (dbNum) {
printf("got expired db,dbId:%"PRId64"\n", dbs->dbId);
free(dbs);
dbs = NULL;
} else {
printf("no expired db\n");
}
if (stbNum) {
printf("got expired stb,suid:%"PRId64"\n", stb->suid);
free(stb);
stb = NULL;
} else {
printf("no expired stb\n");
}
allDbNum += dbNum;
allStbNum += stbNum;
sleep(2);
}
ASSERT_EQ(allDbNum, 1);
ASSERT_EQ(allStbNum, 1);
catalogDestroy();
}
......@@ -745,6 +904,8 @@ TEST(tableMeta, superTableCase) {
ASSERT_EQ(tableMeta->tableType, TSDB_SUPER_TABLE);
ASSERT_EQ(tableMeta->sversion, ctgTestSVersion);
ASSERT_EQ(tableMeta->tversion, ctgTestTVersion);
ASSERT_EQ(tableMeta->uid, ctgTestSuid);
ASSERT_EQ(tableMeta->suid, ctgTestSuid);
ASSERT_EQ(tableMeta->tableInfo.numOfColumns, ctgTestColNum);
ASSERT_EQ(tableMeta->tableInfo.numOfTags, ctgTestTagNum);
ASSERT_EQ(tableMeta->tableInfo.precision, 1);
......@@ -768,7 +929,7 @@ TEST(tableMeta, superTableCase) {
ASSERT_EQ(tableMeta->tableInfo.rowSize, 12);
tableMeta = NULL;
code = catalogRenewAndGetTableMeta(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &tableMeta);
code = catalogRenewAndGetTableMeta(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &tableMeta, 0);
ASSERT_EQ(code, 0);
ASSERT_EQ(tableMeta->vgId, 9);
ASSERT_EQ(tableMeta->tableType, TSDB_CHILD_TABLE);
......@@ -779,6 +940,40 @@ TEST(tableMeta, superTableCase) {
ASSERT_EQ(tableMeta->tableInfo.precision, 1);
ASSERT_EQ(tableMeta->tableInfo.rowSize, 12);
SDbVgVersion *dbs = NULL;
SSTableMetaVersion *stb = NULL;
uint32_t dbNum = 0, stbNum = 0, allDbNum = 0, allStbNum = 0;
int32_t i = 0;
while (i < 5) {
++i;
code = catalogGetExpiredDBs(pCtg, &dbs, &dbNum);
ASSERT_EQ(code, 0);
code = catalogGetExpiredSTables(pCtg, &stb, &stbNum);
ASSERT_EQ(code, 0);
if (dbNum) {
printf("got expired db,dbId:%"PRId64"\n", dbs->dbId);
free(dbs);
dbs = NULL;
} else {
printf("no expired db\n");
}
if (stbNum) {
printf("got expired stb,suid:%"PRId64"\n", stb->suid);
free(stb);
stb = NULL;
} else {
printf("no expired stb\n");
}
allDbNum += dbNum;
allStbNum += stbNum;
sleep(2);
}
ASSERT_EQ(allDbNum, 1);
ASSERT_EQ(allStbNum, 1);
catalogDestroy();
......@@ -948,7 +1143,6 @@ TEST(dbVgroup, getSetDbVgroupCase) {
catalogDestroy();
}
TEST(multiThread, getSetDbVgroupCase) {
struct SCatalog* pCtg = NULL;
void *mockPointer = (void *)0x1;
......@@ -956,6 +1150,7 @@ TEST(multiThread, getSetDbVgroupCase) {
SVgroupInfo *pvgInfo = NULL;
SDBVgroupInfo dbVgroup = {0};
SArray *vgList = NULL;
ctgTestStop = false;
ctgTestInitLogFile();
......@@ -988,7 +1183,7 @@ TEST(multiThread, getSetDbVgroupCase) {
if (ctgTestDeadLoop) {
sleep(1);
} else {
sleep(600);
sleep(ctgTestMTRunSec);
break;
}
}
......@@ -999,9 +1194,6 @@ TEST(multiThread, getSetDbVgroupCase) {
catalogDestroy();
}
#endif
TEST(multiThread, ctableMeta) {
struct SCatalog* pCtg = NULL;
void *mockPointer = (void *)0x1;
......@@ -1009,6 +1201,7 @@ TEST(multiThread, ctableMeta) {
SVgroupInfo *pvgInfo = NULL;
SDBVgroupInfo dbVgroup = {0};
SArray *vgList = NULL;
ctgTestStop = false;
ctgTestSetPrepareDbVgroupsAndChildMeta();
......@@ -1038,7 +1231,7 @@ TEST(multiThread, ctableMeta) {
if (ctgTestDeadLoop) {
sleep(1);
} else {
sleep(600);
sleep(ctgTestMTRunSec);
break;
}
}
......@@ -1050,6 +1243,78 @@ TEST(multiThread, ctableMeta) {
}
TEST(rentTest, allRent) {
struct SCatalog* pCtg = NULL;
void *mockPointer = (void *)0x1;
SVgroupInfo vgInfo = {0};
SVgroupInfo *pvgInfo = NULL;
SDBVgroupInfo dbVgroup = {0};
SArray *vgList = NULL;
ctgTestStop = false;
SDbVgVersion *dbs = NULL;
SSTableMetaVersion *stable = NULL;
uint32_t num = 0;
ctgTestSetPrepareDbVgroupsAndMultiSuperMeta();
initQueryModuleMsgHandle();
int32_t code = catalogInit(NULL);
ASSERT_EQ(code, 0);
code = catalogGetHandle(ctgTestClusterId, &pCtg);
ASSERT_EQ(code, 0);
SName n = {.type = TSDB_TABLE_NAME_T, .acctId = 1};
strcpy(n.dbname, "db1");
for (int32_t i = 1; i <= 10; ++i) {
sprintf(n.tname, "%s_%d", ctgTestSTablename, i);
STableMeta *tableMeta = NULL;
code = catalogGetSTableMeta(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &tableMeta);
ASSERT_EQ(code, 0);
ASSERT_EQ(tableMeta->vgId, 0);
ASSERT_EQ(tableMeta->tableType, TSDB_SUPER_TABLE);
ASSERT_EQ(tableMeta->sversion, ctgTestSVersion);
ASSERT_EQ(tableMeta->tversion, ctgTestTVersion);
ASSERT_EQ(tableMeta->uid, ctgTestSuid + i);
ASSERT_EQ(tableMeta->suid, ctgTestSuid + i);
ASSERT_EQ(tableMeta->tableInfo.numOfColumns, ctgTestColNum);
ASSERT_EQ(tableMeta->tableInfo.numOfTags, ctgTestTagNum);
ASSERT_EQ(tableMeta->tableInfo.precision, 1);
ASSERT_EQ(tableMeta->tableInfo.rowSize, 12);
code = catalogGetExpiredDBs(pCtg, &dbs, &num);
ASSERT_EQ(code, 0);
printf("%d - expired dbNum:%d\n", i, num);
if (dbs) {
printf("%d - expired dbId:%"PRId64", vgVersion:%d\n", i, dbs->dbId, dbs->vgVersion);
free(dbs);
dbs = NULL;
}
code = catalogGetExpiredSTables(pCtg, &stable, &num);
ASSERT_EQ(code, 0);
printf("%d - expired stableNum:%d\n", i, num);
if (stable) {
for (int32_t n = 0; n < num; ++n) {
printf("suid:%"PRId64", sversion:%d, tversion:%d\n", stable[n].suid, stable[n].sversion, stable[n].tversion);
}
free(stable);
stable = NULL;
}
printf("*************************************************\n");
sleep(2);
}
catalogDestroy();
}
int main(int argc, char** argv) {
testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
......
......@@ -8,5 +8,5 @@ target_include_directories(
target_link_libraries(
executor
PRIVATE os util common function parser
PRIVATE os util common function parser planner qcom
)
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _DATA_SINK_INT_H
#define _DATA_SINK_INT_H
#ifdef __cplusplus
extern "C" {
#endif
#include "common.h"
#include "dataSinkMgt.h"
struct SDataSink;
struct SDataSinkHandle;
typedef int32_t (*FPutDataBlock)(struct SDataSinkHandle* pHandle, const SDataResult* pRes);
typedef int32_t (*FGetDataBlock)(struct SDataSinkHandle* pHandle, char* pData, int32_t* pLen);
typedef int32_t (*FDestroyDataSinker)(struct SDataSinkHandle* pHandle);
typedef struct SDataSinkHandle {
FPutDataBlock fPut;
FGetDataBlock fGet;
FDestroyDataSinker fDestroy;
} SDataSinkHandle;
int32_t createDataDispatcher(const struct SDataSink* pDataSink, DataSinkHandle* pHandle);
#ifdef __cplusplus
}
#endif
#endif /*_DATA_SINK_INT_H*/
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _DATA_SINK_MGT_H
#define _DATA_SINK_MGT_H
#ifdef __cplusplus
extern "C" {
#endif
#include "os.h"
#include "executorimpl.h"
#define DS_CAPACITY_ENOUGH 1
#define DS_CAPACITY_FULL 2
#define DS_NEED_SCHEDULE 3
struct SDataSink;
struct SSDataBlock;
typedef struct SDataSinkMgtCfg {
uint32_t maxDataBlockNum;
uint32_t maxDataBlockNumPerQuery;
} SDataSinkMgtCfg;
int32_t dsDataSinkMgtInit(SDataSinkMgtCfg *cfg);
typedef void* DataSinkHandle;
typedef struct SDataResult {
SQueryCostInfo profile;
const SSDataBlock* pData;
SHashObj* pTableRetrieveTsMap;
} SDataResult;
/**
* Create a subplan's datasinker handle for all later operations.
* @param pDataSink
* @param pHandle output
* @return error code
*/
int32_t dsCreateDataSinker(const struct SDataSink *pDataSink, DataSinkHandle* pHandle);
/**
* Put the result set returned by the executor into datasinker.
* @param handle
* @param pRes
* @return error code
*/
int32_t dsPutDataBlock(DataSinkHandle handle, const SDataResult* pRes);
/**
* Get the length of the data returned by the next call to dsGetDataBlock.
* @param handle
* @return data length
*/
int32_t dsGetDataLength(DataSinkHandle handle);
/**
* Get data, the caller needs to allocate data memory.
* @param handle
* @param pData output
* @param pLen output
* @return error code
*/
int32_t dsGetDataBlock(DataSinkHandle handle, char* pData, int32_t* pLen);
/**
* Get the datasinker state, after each dsPutDataBlock and dsGetDataBlock call.
* @param handle
* @return datasinker status
*/
int32_t dsGetStatus(DataSinkHandle handle);
/**
* After dsGetStatus returns DS_NEED_SCHEDULE, the caller need to put this into the work queue.
* @param ahandle
* @param pItem
*/
void dsScheduleProcess(void* ahandle, void* pItem);
/**
* Destroy the datasinker handle.
* @param handle
*/
void dsDestroyDataSinker(DataSinkHandle handle);
#ifdef __cplusplus
}
#endif
#endif /*_DATA_SINK_MGT_H*/
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "dataSinkInt.h"
#include "dataSinkMgt.h"
#include "planner.h"
#include "tcompression.h"
#include "tglobal.h"
#include "tqueue.h"
#define GET_BUF_DATA(buf) (buf)->pData + (buf)->pos
#define GET_BUF_REMAIN(buf) (buf)->remain
typedef struct SBuf {
int32_t size;
int32_t pos;
int32_t remain;
char* pData;
} SBuf;
typedef struct SDataDispatchHandle {
SDataSinkHandle sink;
SDataBlockSchema schema;
STaosQueue* pDataBlocks;
SBuf buf;
} SDataDispatchHandle;
static bool needCompress(const SSDataBlock* pData, const SDataBlockSchema* pSchema) {
if (tsCompressColData < 0 || 0 == pData->info.rows) {
return false;
}
for (int32_t col = 0; col < pSchema->numOfCols; ++col) {
SColumnInfoData* pColRes = taosArrayGet(pData->pDataBlock, col);
int32_t colSize = pColRes->info.bytes * pData->info.rows;
if (NEEDTO_COMPRESS_QUERY(colSize)) {
return true;
}
}
return false;
}
static int32_t compressQueryColData(SColumnInfoData *pColRes, int32_t numOfRows, char *data, int8_t compressed) {
int32_t colSize = pColRes->info.bytes * numOfRows;
return (*(tDataTypes[pColRes->info.type].compFunc))(
pColRes->pData, colSize, numOfRows, data, colSize + COMP_OVERFLOW_BYTES, compressed, NULL, 0);
}
static void doCopyQueryResultToMsg(const SDataResult* pRes, const SDataBlockSchema* pSchema, char* data, int8_t compressed, int32_t *compLen) {
int32_t *compSizes = (int32_t*)data;
if (compressed) {
data += pSchema->numOfCols * sizeof(int32_t);
}
for (int32_t col = 0; col < pSchema->numOfCols; ++col) {
SColumnInfoData* pColRes = taosArrayGet(pRes->pData->pDataBlock, col);
if (compressed) {
compSizes[col] = compressQueryColData(pColRes, pRes->pData->info.rows, data, compressed);
data += compSizes[col];
*compLen += compSizes[col];
compSizes[col] = htonl(compSizes[col]);
} else {
memmove(data, pColRes->pData, pColRes->info.bytes * pRes->pData->info.rows);
data += pColRes->info.bytes * pRes->pData->info.rows;
}
}
int32_t numOfTables = (int32_t) taosHashGetSize(pRes->pTableRetrieveTsMap);
*(int32_t*)data = htonl(numOfTables);
data += sizeof(int32_t);
STableIdInfo* item = taosHashIterate(pRes->pTableRetrieveTsMap, NULL);
while (item) {
STableIdInfo* pDst = (STableIdInfo*)data;
pDst->uid = htobe64(item->uid);
pDst->key = htobe64(item->key);
data += sizeof(STableIdInfo);
item = taosHashIterate(pRes->pTableRetrieveTsMap, item);
}
}
static void toRetrieveResult(SDataDispatchHandle* pHandle, const SDataResult* pRes, char* pData, int32_t* pContLen) {
SRetrieveTableRsp* pRsp = (SRetrieveTableRsp*)pData;
pRsp->useconds = htobe64(pRes->profile.elapsedTime);
pRsp->precision = htons(pHandle->schema.precision);
pRsp->compressed = (int8_t)needCompress(pRes->pData, &(pHandle->schema));
pRsp->numOfRows = htonl(pRes->pData->info.rows);
*pContLen = sizeof(int32_t) + sizeof(STableIdInfo) * taosHashGetSize(pRes->pTableRetrieveTsMap) + sizeof(SRetrieveTableRsp);
doCopyQueryResultToMsg(pRes, &pHandle->schema, pRsp->data, pRsp->compressed, &pRsp->compLen);
*pContLen += (pRsp->compressed ? pRsp->compLen : pHandle->schema.resultRowSize * pRes->pData->info.rows);
pRsp->compLen = htonl(pRsp->compLen);
// todo completed
}
static int32_t putDataBlock(SDataSinkHandle* pHandle, const SDataResult* pRes) {
SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle;
int32_t useSize = 0;
toRetrieveResult(pDispatcher, pRes, GET_BUF_DATA(&pDispatcher->buf), &useSize);
}
static int32_t getDataBlock(SDataSinkHandle* pHandle, char* pData, int32_t* pLen) {
}
static int32_t destroyDataSinker(SDataSinkHandle* pHandle) {
}
int32_t createDataDispatcher(const SDataSink* pDataSink, DataSinkHandle* pHandle) {
SDataDispatchHandle* dispatcher = calloc(1, sizeof(SDataDispatchHandle));
if (NULL == dispatcher) {
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
return TSDB_CODE_FAILED;
}
dispatcher->sink.fPut = putDataBlock;
dispatcher->sink.fGet = getDataBlock;
dispatcher->sink.fDestroy = destroyDataSinker;
dispatcher->pDataBlocks = taosOpenQueue();
if (NULL == dispatcher->pDataBlocks) {
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
return TSDB_CODE_FAILED;
}
*pHandle = dispatcher;
return TSDB_CODE_SUCCESS;
}
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "dataSinkMgt.h"
#include "dataSinkInt.h"
#include "planner.h"
int32_t dsDataSinkMgtInit(SDataSinkMgtCfg *cfg) {
// todo
}
int32_t dsCreateDataSinker(const struct SDataSink *pDataSink, DataSinkHandle* pHandle) {
if (DSINK_Dispatch == pDataSink->info.type) {
return createDataDispatcher(pDataSink, pHandle);
}
return TSDB_CODE_FAILED;
}
int32_t dsPutDataBlock(DataSinkHandle handle, const SDataResult* pRes) {
SDataSinkHandle* pHandleImpl = (SDataSinkHandle*)handle;
return pHandleImpl->fPut(pHandleImpl, pRes);
}
int32_t dsGetDataLength(DataSinkHandle handle) {
// todo
}
int32_t dsGetDataBlock(DataSinkHandle handle, char* pData, int32_t* pLen) {
SDataSinkHandle* pHandleImpl = (SDataSinkHandle*)handle;
return pHandleImpl->fGet(pHandleImpl, pData, pLen);
}
int32_t dsGetStatus(DataSinkHandle handle) {
// todo
}
void dsScheduleProcess(void* ahandle, void* pItem) {
// todo
}
void dsDestroyDataSinker(DataSinkHandle handle) {
SDataSinkHandle* pHandleImpl = (SDataSinkHandle*)handle;
pHandleImpl->fDestroy(pHandleImpl);
}
......@@ -123,7 +123,7 @@ typedef struct SCreatedTableInfo {
SToken name; // table name token
SToken stbName; // super table name token , for using clause
SArray *pTagNames; // create by using super table, tag name
SArray *pTagVals; // create by using super table, tag value
SArray *pTagVals; // create by using super table, tag value. SArray<SToken>
char *fullname; // table full name
int8_t igExist; // ignore if exists
} SCreatedTableInfo;
......
......@@ -44,14 +44,14 @@ void clearAllTableMetaInfo(SQueryStmtInfo* pQueryInfo, bool removeMeta, uint64_t
/**
* Validate the sql info, according to the corresponding metadata info from catalog.
* @param pCatalog
* @param pSqlInfo
* @param pQueryInfo a bounded AST with essential meta data from local buffer or mgmt node
* @param id
* @param msg
* @param pCtx
* @param pInfo
* @param pQueryInfo
* @param msgBuf
* @param msgBufLen
* @return
*/
int32_t qParserValidateSqlNode(struct SCatalog* pCatalog, SSqlInfo* pSqlInfo, SQueryStmtInfo* pQueryInfo, int64_t id, char* msg, int32_t msgLen);
int32_t qParserValidateSqlNode(SParseBasicCtx *pCtx, SSqlInfo* pInfo, SQueryStmtInfo* pQueryInfo, char* msgBuf, int32_t msgBufLen);
/**
* validate the ddl ast, and convert the ast to the corresponding message format
......@@ -62,6 +62,14 @@ int32_t qParserValidateSqlNode(struct SCatalog* pCatalog, SSqlInfo* pSqlInfo, SQ
*/
SDclStmtInfo* qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, char* msgBuf, int32_t msgBufLen);
/**
*
* @param pInfo
* @param pCtx
* @param msgBuf
* @param msgBufLen
* @return
*/
SVnodeModifOpStmtInfo* qParserValidateCreateTbSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, char* msgBuf, int32_t msgBufLen);
/**
......
......@@ -686,7 +686,7 @@ void destroySqlNode(SSqlNode *pSqlNode) {
void freeCreateTableInfo(void* p) {
SCreatedTableInfo* pInfo = (SCreatedTableInfo*) p;
taosArrayDestroy(pInfo->pTagNames);
taosArrayDestroyEx(pInfo->pTagVals, freeItem);
taosArrayDestroy(pInfo->pTagVals);
tfree(pInfo->fullname);
}
......
......@@ -3630,12 +3630,12 @@ int32_t evaluateSqlNode(SSqlNode* pNode, int32_t tsPrecision, SMsgBuf* pMsgBuf)
return TSDB_CODE_SUCCESS;
}
int32_t qParserValidateSqlNode(struct SCatalog* pCatalog, SSqlInfo* pInfo, SQueryStmtInfo* pQueryInfo, int64_t id, char* msgBuf, int32_t msgBufLen) {
assert(pCatalog != NULL && pInfo != NULL);
int32_t qParserValidateSqlNode(SParseBasicCtx *pCtx, SSqlInfo* pInfo, SQueryStmtInfo* pQueryInfo, char* msgBuf, int32_t msgBufLen) {
assert(pCtx != NULL && pInfo != NULL);
int32_t code = 0;
SMsgBuf m = {.buf = msgBuf, .len = msgBufLen};
SMsgBuf *pMsgBuf = &m;
SMsgBuf m = {.buf = msgBuf, .len = msgBufLen};
SMsgBuf* pMsgBuf = &m;
switch (pInfo->type) {
#if 0
......@@ -3682,22 +3682,6 @@ int32_t qParserValidateSqlNode(struct SCatalog* pCatalog, SSqlInfo* pInfo, SQuer
break;
}
case TSDB_SQL_USE_DB: {
const char* msg = "invalid db name";
SToken* pToken = taosArrayGet(pInfo->pMiscInfo->a, 0);
if (tscValidateName(pToken) != TSDB_CODE_SUCCESS) {
return buildInvalidOperationMsg(pMsgBuf, msg);
}
int32_t ret = tNameSetDbName(&pTableMetaInfo->name, getAccountId(pSql), pToken);
if (ret != TSDB_CODE_SUCCESS) {
return buildInvalidOperationMsg(pMsgBuf, msg);
}
break;
}
case TSDB_SQL_RESET_CACHE: {
return TSDB_CODE_SUCCESS;
}
......@@ -3712,55 +3696,6 @@ int32_t qParserValidateSqlNode(struct SCatalog* pCatalog, SSqlInfo* pInfo, SQuer
break;
}
case TSDB_SQL_CREATE_DNODE: {
const char* msg = "invalid host name (ip address)";
if (taosArrayGetSize(pInfo->pMiscInfo->a) > 1) {
return buildInvalidOperationMsg(pMsgBuf, msg);
}
SToken* id = taosArrayGet(pInfo->pMiscInfo->a, 0);
if (id->type == TK_STRING) {
id->n = strdequote(id->z);
}
break;
}
case TSDB_SQL_CREATE_ACCT:
case TSDB_SQL_ALTER_ACCT: {
const char* msg1 = "invalid state option, available options[no, r, w, all]";
const char* msg2 = "invalid user/account name";
const char* msg3 = "name too long";
SToken* pName = &pInfo->pMiscInfo->user.user;
SToken* pPwd = &pInfo->pMiscInfo->user.passwd;
if (handlePassword(pCmd, pPwd) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_OPERATION;
}
if (pName->n >= TSDB_USER_LEN) {
return buildInvalidOperationMsg(pMsgBuf, msg3);
}
if (tscValidateName(pName) != TSDB_CODE_SUCCESS) {
return buildInvalidOperationMsg(pMsgBuf, msg2);
}
SCreateAcctInfo* pAcctOpt = &pInfo->pMiscInfo->acctOpt;
if (pAcctOpt->stat.n > 0) {
if (pAcctOpt->stat.z[0] == 'r' && pAcctOpt->stat.n == 1) {
} else if (pAcctOpt->stat.z[0] == 'w' && pAcctOpt->stat.n == 1) {
} else if (strncmp(pAcctOpt->stat.z, "all", 3) == 0 && pAcctOpt->stat.n == 3) {
} else if (strncmp(pAcctOpt->stat.z, "no", 2) == 0 && pAcctOpt->stat.n == 2) {
} else {
return buildInvalidOperationMsg(pMsgBuf, msg1);
}
}
break;
}
case TSDB_SQL_DESCRIBE_TABLE: {
const char* msg1 = "invalid table name";
......@@ -3865,29 +3800,6 @@ int32_t qParserValidateSqlNode(struct SCatalog* pCatalog, SSqlInfo* pInfo, SQuer
return TSDB_CODE_SUCCESS;
}
case TSDB_SQL_CREATE_TABLE: {
SCreateTableSql* pCreateTable = pInfo->pCreateTableInfo;
if (pCreateTable->type == TSQL_CREATE_TABLE || pCreateTable->type == TSQL_CREATE_STABLE) {
if ((code = doCheckForCreateTable(pSql, 0, pInfo)) != TSDB_CODE_SUCCESS) {
return code;
}
} else if (pCreateTable->type == TSQL_CREATE_TABLE_FROM_STABLE) {
assert(pCmd->numOfCols == 0);
if ((code = doCheckForCreateFromStable(pSql, pInfo)) != TSDB_CODE_SUCCESS) {
return code;
}
} else if (pCreateTable->type == TSQL_CREATE_STREAM) {
if ((code = doCheckForStream(pSql, pInfo)) != TSDB_CODE_SUCCESS) {
return code;
}
}
break;
}
case TSDB_SQL_SELECT: {
const char * msg1 = "no nested query supported in union clause";
code = loadAllTableMeta(pSql, pInfo);
......@@ -3981,13 +3893,14 @@ int32_t qParserValidateSqlNode(struct SCatalog* pCatalog, SSqlInfo* pInfo, SQuer
}
break;
}
#endif
default:
return buildInvalidOperationMsg(pMsgBuf, "not support sql expression");
}
#endif
}
SCatalogReq req = {0};
SMetaData data = {0};
SCatalogReq req = {0};
SMetaData data = {0};
// TODO: check if the qnode info has been cached already
req.qNodeRequired = true;
......@@ -3997,7 +3910,7 @@ int32_t qParserValidateSqlNode(struct SCatalog* pCatalog, SSqlInfo* pInfo, SQuer
}
// load the meta data from catalog
code = catalogGetAllMeta(pCatalog, NULL, NULL, &req, &data);
code = catalogGetAllMeta(pCtx->pCatalog, pCtx->pTransporter, &pCtx->mgmtEpSet, &req, &data);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
......
#include <astGenerator.h>
#include <tmsg.h>
#include "astToMsg.h"
#include "tmsg.h"
#include "tglobal.h"
#include "parserInt.h"
#include "ttime.h"
#include "astToMsg.h"
#include "astGenerator.h"
#include "parserUtil.h"
#include "queryInfoUtil.h"
#include "tglobal.h"
#include "tmsg.h"
#include "ttime.h"
/* is contained in pFieldList or not */
static bool has(SArray* pFieldList, int32_t startIndex, const char* name) {
......@@ -332,7 +331,6 @@ static int32_t doParseSerializeTagValue(SSchema* pTagSchema, int32_t numOfInputT
char* endPtr = NULL;
char tmpTokenBuf[TSDB_MAX_TAGS_LEN] = {0};
SKvParam param = {.builder = pKvRowBuilder, .schema = pSchema};
SToken* pItem = taosArrayGet(pTagValList, i);
......
......@@ -624,12 +624,11 @@ int32_t parseInsertSql(SParseContext* pContext, SVnodeModifOpStmtInfo** pInfo) {
if (NULL == context.pVgroupsHashObj || NULL == context.pTableBlockHashObj || NULL == context.pOutput) {
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
return TSDB_CODE_FAILED;
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
*pInfo = context.pOutput;
context.pOutput->nodeType = TSDB_SQL_INSERT;
context.pOutput->schemaAttache = pContext->schemaAttached;
context.pOutput->payloadType = PAYLOAD_TYPE_KV;
int32_t code = skipInsertInto(&context);
......@@ -638,5 +637,5 @@ int32_t parseInsertSql(SParseContext* pContext, SVnodeModifOpStmtInfo** pInfo) {
}
destroyInsertParseContext(&context);
terrno = code;
return (TSDB_CODE_SUCCESS == code ? TSDB_CODE_SUCCESS : TSDB_CODE_FAILED);
return code;
}
......@@ -31,8 +31,8 @@ bool isInsertSql(const char* pStr, size_t length) {
} while (1);
}
bool qIsDdlQuery(const SQueryNode* pQuery) {
return TSDB_SQL_INSERT != pQuery->type && TSDB_SQL_SELECT != pQuery->type && TSDB_SQL_CREATE_TABLE != pQuery->type;
bool qIsDdlQuery(const SQueryNode* pQueryNode) {
return TSDB_SQL_INSERT != pQueryNode->type && TSDB_SQL_SELECT != pQueryNode->type && TSDB_SQL_CREATE_TABLE != pQueryNode->type;
}
int32_t parseQuerySql(SParseContext* pCxt, SQueryNode** pQuery) {
......@@ -67,7 +67,7 @@ int32_t parseQuerySql(SParseContext* pCxt, SQueryNode** pQuery) {
return terrno;
}
int32_t code = qParserValidateSqlNode(pCxt->ctx.pCatalog, &info, pQueryInfo, pCxt->ctx.requestId, pCxt->pMsg, pCxt->msgLen);
int32_t code = qParserValidateSqlNode(&pCxt->ctx, &info, pQueryInfo, pCxt->pMsg, pCxt->msgLen);
if (code == TSDB_CODE_SUCCESS) {
*pQuery = (SQueryNode*)pQueryInfo;
}
......
......@@ -97,6 +97,7 @@ int32_t queryProcessUseDBRsp(void* output, char *msg, int32_t msgSize) {
pRsp->vgVersion = ntohl(pRsp->vgVersion);
pRsp->vgNum = ntohl(pRsp->vgNum);
pRsp->uid = be64toh(pRsp->uid);
if (pRsp->vgNum < 0) {
qError("invalid db[%s] vgroup number[%d]", pRsp->db, pRsp->vgNum);
......@@ -111,6 +112,7 @@ int32_t queryProcessUseDBRsp(void* output, char *msg, int32_t msgSize) {
pOut->dbVgroup.vgVersion = pRsp->vgVersion;
pOut->dbVgroup.hashMethod = pRsp->hashMethod;
pOut->dbVgroup.dbId = pRsp->uid;
pOut->dbVgroup.vgInfo = taosHashInit(pRsp->vgNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
if (NULL == pOut->dbVgroup.vgInfo) {
qError("hash init[%d] failed", pRsp->vgNum);
......@@ -149,8 +151,8 @@ static int32_t queryConvertTableMetaMsg(STableMetaMsg* pMetaMsg) {
pMetaMsg->numOfColumns = ntohl(pMetaMsg->numOfColumns);
pMetaMsg->sversion = ntohl(pMetaMsg->sversion);
pMetaMsg->tversion = ntohl(pMetaMsg->tversion);
pMetaMsg->tuid = htobe64(pMetaMsg->tuid);
pMetaMsg->suid = htobe64(pMetaMsg->suid);
pMetaMsg->tuid = be64toh(pMetaMsg->tuid);
pMetaMsg->suid = be64toh(pMetaMsg->suid);
pMetaMsg->vgId = ntohl(pMetaMsg->vgId);
if (pMetaMsg->numOfTags < 0 || pMetaMsg->numOfTags > TSDB_MAX_TAGS) {
......@@ -208,7 +210,7 @@ int32_t queryCreateTableMetaFromMsg(STableMetaMsg* msg, bool isSuperTable, STabl
pTableMeta->vgId = isSuperTable ? 0 : msg->vgId;
pTableMeta->tableType = isSuperTable ? TSDB_SUPER_TABLE : msg->tableType;
pTableMeta->uid = msg->tuid;
pTableMeta->uid = isSuperTable ? msg->suid : msg->tuid;
pTableMeta->suid = msg->suid;
pTableMeta->sversion = msg->sversion;
pTableMeta->tversion = msg->tversion;
......@@ -244,7 +246,7 @@ int32_t queryProcessTableMetaRsp(void* output, char *msg, int32_t msgSize) {
}
if (pMetaMsg->tableType == TSDB_CHILD_TABLE) {
pOut->metaNum = 2;
SET_META_TYPE_BOTH_TABLE(pOut->metaType);
if (pMetaMsg->dbFname[0]) {
snprintf(pOut->ctbFname, sizeof(pOut->ctbFname), "%s.%s", pMetaMsg->dbFname, pMetaMsg->tbFname);
......@@ -261,7 +263,7 @@ int32_t queryProcessTableMetaRsp(void* output, char *msg, int32_t msgSize) {
code = queryCreateTableMetaFromMsg(pMetaMsg, true, &pOut->tbMeta);
} else {
pOut->metaNum = 1;
SET_META_TYPE_TABLE(pOut->metaType);
if (pMetaMsg->dbFname[0]) {
snprintf(pOut->tbFname, sizeof(pOut->tbFname), "%s.%s", pMetaMsg->dbFname, pMetaMsg->tbFname);
......
......@@ -505,7 +505,7 @@ int32_t schProcessRspMsg(SSchJob *job, SSchTask *task, int32_t msgType, char *ms
break;
}
case TDMT_VND_SUBMIT_RSP: {
if (rspCode != TSDB_CODE_SUCCESS) {
if (rspCode != TSDB_CODE_SUCCESS || NULL == msg) {
SCH_ERR_JRET(schProcessOnTaskFailure(job, task, rspCode));
} else {
SShellSubmitRspMsg *rsp = (SShellSubmitRspMsg *)msg;
......@@ -521,7 +521,7 @@ int32_t schProcessRspMsg(SSchJob *job, SSchTask *task, int32_t msgType, char *ms
case TDMT_VND_QUERY_RSP: {
SQueryTableRsp *rsp = (SQueryTableRsp *)msg;
if (rsp->code != TSDB_CODE_SUCCESS) {
if (rsp->code != TSDB_CODE_SUCCESS || NULL == msg) {
SCH_ERR_JRET(schProcessOnTaskFailure(job, task, rsp->code));
} else {
code = schBuildAndSendMsg(job, task, TDMT_VND_RES_READY);
......@@ -534,7 +534,7 @@ int32_t schProcessRspMsg(SSchJob *job, SSchTask *task, int32_t msgType, char *ms
case TDMT_VND_RES_READY_RSP: {
SResReadyRsp *rsp = (SResReadyRsp *)msg;
if (rsp->code != TSDB_CODE_SUCCESS) {
if (rsp->code != TSDB_CODE_SUCCESS || NULL == msg) {
SCH_ERR_JRET(schProcessOnTaskFailure(job, task, rsp->code));
} else {
code = schProcessOnTaskSuccess(job, task);
......@@ -549,7 +549,9 @@ int32_t schProcessRspMsg(SSchJob *job, SSchTask *task, int32_t msgType, char *ms
SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)msg;
job->res = rsp;
job->resNumOfRows = rsp->numOfRows;
if (rsp) {
job->resNumOfRows = rsp->numOfRows;
}
SCH_ERR_JRET(schProcessOnDataFetched(job));
break;
......@@ -1100,6 +1102,7 @@ void scheduleFreeJob(void *pJob) {
taosHashCleanup(job->failTasks);
taosHashCleanup(job->succTasks);
taosArrayDestroy(job->levels);
tfree(job);
}
......
......@@ -259,26 +259,22 @@ TAOS_DEFINE_ERROR(TSDB_CODE_DND_DNODE_READ_FILE_ERROR, "Read dnode.json error
TAOS_DEFINE_ERROR(TSDB_CODE_DND_DNODE_WRITE_FILE_ERROR, "Write dnode.json error")
TAOS_DEFINE_ERROR(TSDB_CODE_DND_MNODE_ALREADY_DEPLOYED, "Mnode already deployed")
TAOS_DEFINE_ERROR(TSDB_CODE_DND_MNODE_NOT_DEPLOYED, "Mnode not deployed")
TAOS_DEFINE_ERROR(TSDB_CODE_DND_MNODE_ID_INVALID, "Mnode Id invalid")
TAOS_DEFINE_ERROR(TSDB_CODE_DND_MNODE_ID_NOT_FOUND, "Mnode Id not found")
TAOS_DEFINE_ERROR(TSDB_CODE_DND_MNODE_INVALID_OPTION, "Mnode option invalid")
TAOS_DEFINE_ERROR(TSDB_CODE_DND_MNODE_READ_FILE_ERROR, "Read mnode.json error")
TAOS_DEFINE_ERROR(TSDB_CODE_DND_MNODE_WRITE_FILE_ERROR, "Write mnode.json error")
TAOS_DEFINE_ERROR(TSDB_CODE_DND_QNODE_ALREADY_DEPLOYED, "Qnode already deployed")
TAOS_DEFINE_ERROR(TSDB_CODE_DND_QNODE_NOT_DEPLOYED, "Qnode not deployed")
TAOS_DEFINE_ERROR(TSDB_CODE_DND_QNODE_ID_INVALID, "Qnode Id invalid")
TAOS_DEFINE_ERROR(TSDB_CODE_DND_QNODE_ID_NOT_FOUND, "Qnode Id not found")
TAOS_DEFINE_ERROR(TSDB_CODE_DND_QNODE_INVALID_OPTION, "Qnode option invalid")
TAOS_DEFINE_ERROR(TSDB_CODE_DND_QNODE_READ_FILE_ERROR, "Read qnode.json error")
TAOS_DEFINE_ERROR(TSDB_CODE_DND_QNODE_WRITE_FILE_ERROR, "Write qnode.json error")
TAOS_DEFINE_ERROR(TSDB_CODE_DND_SNODE_ALREADY_DEPLOYED, "Snode already deployed")
TAOS_DEFINE_ERROR(TSDB_CODE_DND_SNODE_NOT_DEPLOYED, "Snode not deployed")
TAOS_DEFINE_ERROR(TSDB_CODE_DND_SNODE_ID_INVALID, "Snode Id invalid")
TAOS_DEFINE_ERROR(TSDB_CODE_DND_SNODE_ID_NOT_FOUND, "Snode Id not found")
TAOS_DEFINE_ERROR(TSDB_CODE_DND_SNODE_INVALID_OPTION, "Snode option invalid")
TAOS_DEFINE_ERROR(TSDB_CODE_DND_SNODE_READ_FILE_ERROR, "Read snode.json error")
TAOS_DEFINE_ERROR(TSDB_CODE_DND_SNODE_WRITE_FILE_ERROR, "Write snode.json error")
TAOS_DEFINE_ERROR(TSDB_CODE_DND_BNODE_ALREADY_DEPLOYED, "Bnode already deployed")
TAOS_DEFINE_ERROR(TSDB_CODE_DND_BNODE_NOT_DEPLOYED, "Bnode not deployed")
TAOS_DEFINE_ERROR(TSDB_CODE_DND_BNODE_ID_INVALID, "Bnode Id invalid")
TAOS_DEFINE_ERROR(TSDB_CODE_DND_BNODE_ID_NOT_FOUND, "Bnode Id not found")
TAOS_DEFINE_ERROR(TSDB_CODE_DND_BNODE_INVALID_OPTION, "Bnode option invalid")
TAOS_DEFINE_ERROR(TSDB_CODE_DND_BNODE_READ_FILE_ERROR, "Read bnode.json error")
TAOS_DEFINE_ERROR(TSDB_CODE_DND_BNODE_WRITE_FILE_ERROR, "Write bnode.json error")
TAOS_DEFINE_ERROR(TSDB_CODE_DND_VNODE_TOO_MANY_VNODES, "Too many vnode directories")
......
......@@ -215,7 +215,7 @@ static FORCE_INLINE bool taosHashTableEmpty(const SHashObj *pHashObj) {
return taosHashGetSize(pHashObj) == 0;
}
int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, void *data, size_t size) {
int32_t taosHashPutImpl(SHashObj *pHashObj, const void *key, size_t keyLen, void *data, size_t size, bool *newAdded) {
uint32_t hashVal = (*pHashObj->hashFp)(key, (uint32_t)keyLen);
SHashNode *pNewNode = doCreateHashNode(key, keyLen, data, size, hashVal);
if (pNewNode == NULL) {
......@@ -274,6 +274,10 @@ int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, void *da
__rd_unlock((void*) &pHashObj->lock, pHashObj->type);
atomic_add_fetch_32(&pHashObj->size, 1);
if (newAdded) {
*newAdded = true;
}
return 0;
} else {
// not support the update operation, return error
......@@ -290,10 +294,23 @@ int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, void *da
// enable resize
__rd_unlock((void*) &pHashObj->lock, pHashObj->type);
if (newAdded) {
*newAdded = false;
}
return pHashObj->enableUpdate ? 0 : -2;
}
}
int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, void *data, size_t size) {
return taosHashPutImpl(pHashObj, key, keyLen, data, size, NULL);
}
int32_t taosHashPutExt(SHashObj *pHashObj, const void *key, size_t keyLen, void *data, size_t size, bool *newAdded) {
return taosHashPutImpl(pHashObj, key, keyLen, data, size, newAdded);
}
void *taosHashGet(SHashObj *pHashObj, const void *key, size_t keyLen) {
return taosHashGetClone(pHashObj, key, keyLen, NULL);
}
......
......@@ -95,7 +95,6 @@ int32_t tsdbDebugFlag = 131;
int32_t tqDebugFlag = 131;
int32_t cqDebugFlag = 131;
int32_t fsDebugFlag = 135;
int32_t ctgDebugFlag = 131;
int64_t dbgEmptyW = 0;
int64_t dbgWN = 0;
......
......@@ -62,13 +62,12 @@ print $data00 $data01 $data02
print $data10 $data11 $data22
print $data20 $data11 $data22
return
print =============== insert data
sql insert into c1 values(now+1s, 1)
sql insert into c1 values(now+2s, 2)
sql insert into c1 values(now+3s, 3)
return
print =============== query data
sql select * from c1
if $rows != 3 then
......
......@@ -25,20 +25,21 @@
char dbName[32] = "db";
char stbName[64] = "st";
int32_t numOfThreads = 1;
int32_t numOfTables = 10000;
int64_t numOfTables = 200000;
int32_t createTable = 1;
int32_t insertData = 0;
int32_t batchNum = 10;
int32_t batchNum = 100;
int32_t numOfVgroups = 2;
typedef struct {
int32_t tableBeginIndex;
int32_t tableEndIndex;
int64_t tableBeginIndex;
int64_t tableEndIndex;
int32_t threadIndex;
char dbName[32];
char stbName[64];
float createTableSpeed;
float insertDataSpeed;
int64_t startMs;
pthread_t thread;
} SThreadInfo;
......@@ -57,7 +58,7 @@ int32_t main(int32_t argc, char *argv[]) {
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
SThreadInfo *pInfo = (SThreadInfo *)calloc(numOfThreads, sizeof(SThreadInfo));
int32_t numOfTablesPerThread = numOfTables / numOfThreads;
int64_t numOfTablesPerThread = numOfTables / numOfThreads;
numOfTables = numOfTablesPerThread * numOfThreads;
for (int32_t i = 0; i < numOfThreads; ++i) {
pInfo[i].tableBeginIndex = i * numOfTablesPerThread;
......@@ -83,8 +84,10 @@ int32_t main(int32_t argc, char *argv[]) {
insertDataSpeed += pInfo[i].insertDataSpeed;
}
pPrint("%s total %.1f tables/second, threads:%d %s", GREEN, createTableSpeed, numOfThreads, NC);
pPrint("%s total %.1f rows/second, threads:%d %s", GREEN, insertDataSpeed, numOfThreads, NC);
pPrint("%s total %" PRId64 " tables, %.1f tables/second, threads:%d %s", GREEN, numOfTables, createTableSpeed,
numOfThreads, NC);
pPrint("%s total %" PRId64 " tables, %.1f rows/second, threads:%d %s", GREEN, numOfTables, insertDataSpeed,
numOfThreads, NC);
pthread_attr_destroy(&thattr);
free(pInfo);
......@@ -130,6 +133,26 @@ void createDbAndStb() {
taos_close(con);
}
void printCreateProgress(SThreadInfo *pInfo, int64_t t) {
int64_t endMs = taosGetTimestampMs();
int64_t totalTables = t - pInfo->tableBeginIndex;
float seconds = (endMs - pInfo->startMs) / 1000.0;
float speed = totalTables / seconds;
pInfo->createTableSpeed = speed;
pPrint("thread:%d, %" PRId64 " tables created, time:%.2f sec, speed:%.1f tables/second, ", pInfo->threadIndex,
totalTables, seconds, speed);
}
void printInsertProgress(SThreadInfo *pInfo, int64_t t) {
int64_t endMs = taosGetTimestampMs();
int64_t totalTables = t - pInfo->tableBeginIndex;
float seconds = (endMs - pInfo->startMs) / 1000.0;
float speed = totalTables / seconds;
pInfo->insertDataSpeed = speed;
pPrint("thread:%d, %" PRId64 " rows inserted, time:%.2f sec, speed:%.1f rows/second, ", pInfo->threadIndex,
totalTables, seconds, speed);
}
void *threadFunc(void *param) {
SThreadInfo *pInfo = (SThreadInfo *)param;
char *qstr = malloc(2000 * 1000);
......@@ -146,47 +169,55 @@ void *threadFunc(void *param) {
taos_free_result(pSql);
if (createTable) {
int64_t startMs = taosGetTimestampMs();
for (int32_t t = pInfo->tableBeginIndex; t < pInfo->tableEndIndex; ++t) {
int32_t batch = (pInfo->tableEndIndex - t);
pInfo->startMs = taosGetTimestampMs();
for (int64_t t = pInfo->tableBeginIndex; t < pInfo->tableEndIndex; ++t) {
int64_t batch = (pInfo->tableEndIndex - t);
batch = MIN(batch, batchNum);
int32_t len = sprintf(qstr, "create table");
for (int32_t i = 0; i < batch; ++i) {
len += sprintf(qstr + len, " t%d using %s tags(%d)", t + i, stbName, t + i);
len += sprintf(qstr + len, " t%" PRId64 " using %s tags(%" PRId64 ")", t + i, stbName, t + i);
}
TAOS_RES *pSql = taos_query(con, qstr);
code = taos_errno(pSql);
if (code != 0) {
pError("failed to create table t%d, reason:%s", t, tstrerror(code));
pError("failed to create table t%" PRId64 ", reason:%s", t, tstrerror(code));
}
taos_free_result(pSql);
if (t % 100000 == 0) {
printCreateProgress(pInfo, t);
}
t += (batch - 1);
}
int64_t endMs = taosGetTimestampMs();
int32_t totalTables = pInfo->tableEndIndex - pInfo->tableBeginIndex;
float seconds = (endMs - startMs) / 1000.0;
float speed = totalTables / seconds;
pInfo->createTableSpeed = speed;
pPrint("thread:%d, time:%.2f sec, speed:%.1f tables/second, ", pInfo->threadIndex, seconds, speed);
printCreateProgress(pInfo, pInfo->tableEndIndex);
}
if (insertData) {
int64_t startMs = taosGetTimestampMs();
for (int32_t t = pInfo->tableBeginIndex; t < pInfo->tableEndIndex; ++t) {
sprintf(qstr, "insert into %s%d values(now, 1)", stbName, t);
pInfo->startMs = taosGetTimestampMs();
for (int64_t t = pInfo->tableBeginIndex; t < pInfo->tableEndIndex; ++t) {
int64_t batch = (pInfo->tableEndIndex - t);
batch = MIN(batch, batchNum);
int32_t len = sprintf(qstr, "insert into");
for (int32_t i = 0; i < batch; ++i) {
len += sprintf(qstr + len, " t%" PRId64 " values(now, %" PRId64 ")", t + i, t + i);
}
TAOS_RES *pSql = taos_query(con, qstr);
code = taos_errno(pSql);
if (code != 0) {
pError("failed to create table %s%d, reason:%s", stbName, t, tstrerror(code));
pError("failed to insert table t%" PRId64 ", reason:%s", t, tstrerror(code));
}
taos_free_result(pSql);
if (t % 100000 == 0) {
printInsertProgress(pInfo, t);
}
t += (batch - 1);
}
int64_t endMs = taosGetTimestampMs();
int32_t totalTables = pInfo->tableEndIndex - pInfo->tableBeginIndex;
float seconds = (endMs - startMs) / 1000.0;
float speed = totalTables / seconds;
pInfo->insertDataSpeed = speed;
pPrint("thread:%d, time:%.2f sec, speed:%.1f rows/second, ", pInfo->threadIndex, seconds, speed);
printInsertProgress(pInfo, pInfo->tableEndIndex);
}
taos_close(con);
......@@ -207,7 +238,7 @@ void printHelp() {
printf("%s%s\n", indent, "-t");
printf("%s%s%s%d\n", indent, indent, "numOfThreads, default is ", numOfThreads);
printf("%s%s\n", indent, "-n");
printf("%s%s%s%d\n", indent, indent, "numOfTables, default is ", numOfTables);
printf("%s%s%s%" PRId64 "\n", indent, indent, "numOfTables, default is ", numOfTables);
printf("%s%s\n", indent, "-v");
printf("%s%s%s%d\n", indent, indent, "numOfVgroups, default is ", numOfVgroups);
printf("%s%s\n", indent, "-a");
......@@ -234,7 +265,7 @@ void parseArgument(int32_t argc, char *argv[]) {
} else if (strcmp(argv[i], "-t") == 0) {
numOfThreads = atoi(argv[++i]);
} else if (strcmp(argv[i], "-n") == 0) {
numOfTables = atoi(argv[++i]);
numOfTables = atoll(argv[++i]);
} else if (strcmp(argv[i], "-n") == 0) {
numOfVgroups = atoi(argv[++i]);
} else if (strcmp(argv[i], "-a") == 0) {
......@@ -250,7 +281,7 @@ void parseArgument(int32_t argc, char *argv[]) {
pPrint("%s dbName:%s %s", GREEN, dbName, NC);
pPrint("%s stbName:%s %s", GREEN, stbName, NC);
pPrint("%s configDir:%s %s", GREEN, configDir, NC);
pPrint("%s numOfTables:%d %s", GREEN, numOfTables, NC);
pPrint("%s numOfTables:%" PRId64 " %s", GREEN, numOfTables, NC);
pPrint("%s numOfThreads:%d %s", GREEN, numOfThreads, NC);
pPrint("%s numOfVgroups:%d %s", GREEN, numOfVgroups, NC);
pPrint("%s createTable:%d %s", GREEN, createTable, NC);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册