未验证 提交 0137a7dc 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #9667 from taosdata/feature/dnode3

test for create table
......@@ -996,7 +996,7 @@ typedef struct {
char encrypt;
char secret[TSDB_PASSWORD_LEN];
char ckey[TSDB_PASSWORD_LEN];
} SAuthMsg, SAuthRsp;
} SAuthReq, SAuthRsp;
typedef struct {
int8_t finished;
......
......@@ -305,18 +305,18 @@ static void dndBuildMnodeOpenOption(SDnode *pDnode, SMnodeOpt *pOption) {
memcpy(&pOption->replicas, pMgmt->replicas, sizeof(SReplica) * TSDB_MAX_REPLICA);
}
static int32_t dndBuildMnodeOptionFromMsg(SDnode *pDnode, SMnodeOpt *pOption, SDCreateMnodeMsg *pMsg) {
static int32_t dndBuildMnodeOptionFromReq(SDnode *pDnode, SMnodeOpt *pOption, SDCreateMnodeMsg *pReq) {
dndInitMnodeOption(pDnode, pOption);
pOption->dnodeId = dndGetDnodeId(pDnode);
pOption->clusterId = dndGetClusterId(pDnode);
pOption->replica = pMsg->replica;
pOption->replica = pReq->replica;
pOption->selfIndex = -1;
for (int32_t i = 0; i < pMsg->replica; ++i) {
for (int32_t i = 0; i < pReq->replica; ++i) {
SReplica *pReplica = &pOption->replicas[i];
pReplica->id = pMsg->replicas[i].id;
pReplica->port = pMsg->replicas[i].port;
memcpy(pReplica->fqdn, pMsg->replicas[i].fqdn, TSDB_FQDN_LEN);
pReplica->id = pReq->replicas[i].id;
pReplica->port = pReq->replicas[i].port;
memcpy(pReplica->fqdn, pReq->replicas[i].fqdn, TSDB_FQDN_LEN);
if (pReplica->id == pOption->dnodeId) {
pOption->selfIndex = i;
}
......@@ -423,26 +423,26 @@ static int32_t dndDropMnode(SDnode *pDnode) {
return 0;
}
static SDCreateMnodeMsg *dndParseCreateMnodeMsg(SRpcMsg *pRpcMsg) {
SDCreateMnodeMsg *pMsg = pRpcMsg->pCont;
pMsg->dnodeId = htonl(pMsg->dnodeId);
for (int32_t i = 0; i < pMsg->replica; ++i) {
pMsg->replicas[i].id = htonl(pMsg->replicas[i].id);
pMsg->replicas[i].port = htons(pMsg->replicas[i].port);
static SDCreateMnodeMsg *dndParseCreateMnodeReq(SRpcMsg *pReq) {
SDCreateMnodeMsg *pCreate = pReq->pCont;
pCreate->dnodeId = htonl(pCreate->dnodeId);
for (int32_t i = 0; i < pCreate->replica; ++i) {
pCreate->replicas[i].id = htonl(pCreate->replicas[i].id);
pCreate->replicas[i].port = htons(pCreate->replicas[i].port);
}
return pMsg;
return pCreate;
}
int32_t dndProcessCreateMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) {
SDCreateMnodeMsg *pMsg = dndParseCreateMnodeMsg(pRpcMsg);
int32_t dndProcessCreateMnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
SDCreateMnodeMsg *pCreate = dndParseCreateMnodeReq(pReq);
if (pMsg->dnodeId != dndGetDnodeId(pDnode)) {
if (pCreate->dnodeId != dndGetDnodeId(pDnode)) {
terrno = TSDB_CODE_DND_MNODE_ID_INVALID;
return -1;
} else {
SMnodeOpt option = {0};
if (dndBuildMnodeOptionFromMsg(pDnode, &option, pMsg) != 0) {
if (dndBuildMnodeOptionFromReq(pDnode, &option, pCreate) != 0) {
return -1;
}
......@@ -450,16 +450,16 @@ int32_t dndProcessCreateMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) {
}
}
int32_t dndProcessAlterMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) {
SDAlterMnodeMsg *pMsg = dndParseCreateMnodeMsg(pRpcMsg);
int32_t dndProcessAlterMnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
SDAlterMnodeMsg *pAlter = dndParseCreateMnodeReq(pReq);
if (pMsg->dnodeId != dndGetDnodeId(pDnode)) {
if (pAlter->dnodeId != dndGetDnodeId(pDnode)) {
terrno = TSDB_CODE_DND_MNODE_ID_INVALID;
return -1;
}
SMnodeOpt option = {0};
if (dndBuildMnodeOptionFromMsg(pDnode, &option, pMsg) != 0) {
if (dndBuildMnodeOptionFromReq(pDnode, &option, pAlter) != 0) {
return -1;
}
......@@ -470,11 +470,11 @@ int32_t dndProcessAlterMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) {
return dndWriteMnodeFile(pDnode);
}
int32_t dndProcessDropMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) {
SDDropMnodeMsg *pMsg = pRpcMsg->pCont;
pMsg->dnodeId = htonl(pMsg->dnodeId);
int32_t dndProcessDropMnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
SDDropMnodeMsg *pDrop = pReq->pCont;
pDrop->dnodeId = htonl(pDrop->dnodeId);
if (pMsg->dnodeId != dndGetDnodeId(pDnode)) {
if (pDrop->dnodeId != dndGetDnodeId(pDnode)) {
terrno = TSDB_CODE_DND_MNODE_ID_INVALID;
return -1;
} else {
......
......@@ -143,26 +143,26 @@ static void dndInitMsgFp(STransMgmt *pMgmt) {
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_SHOW_TABLES_FETCH)] = dndProcessVnodeFetchMsg;
}
static void dndProcessResponse(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
static void dndProcessResponse(void *parent, SRpcMsg *pRsp, SEpSet *pEpSet) {
SDnode *pDnode = parent;
STransMgmt *pMgmt = &pDnode->tmgmt;
tmsg_t msgType = pMsg->msgType;
tmsg_t msgType = pRsp->msgType;
if (dndGetStat(pDnode) == DND_STAT_STOPPED) {
if (pMsg == NULL || pMsg->pCont == NULL) return;
dTrace("RPC %p, rsp:%s is ignored since dnode is stopping", pMsg->handle, TMSG_INFO(msgType));
rpcFreeCont(pMsg->pCont);
if (pRsp == NULL || pRsp->pCont == NULL) return;
dTrace("RPC %p, rsp:%s is ignored since dnode is stopping", pRsp->handle, TMSG_INFO(msgType));
rpcFreeCont(pRsp->pCont);
return;
}
DndMsgFp fp = pMgmt->msgFp[TMSG_INDEX(msgType)];
if (fp != NULL) {
dTrace("RPC %p, rsp:%s will be processed, code:0x%x", pMsg->handle, TMSG_INFO(msgType), pMsg->code & 0XFFFF);
(*fp)(pDnode, pMsg, pEpSet);
dTrace("RPC %p, rsp:%s will be processed, code:0x%x", pRsp->handle, TMSG_INFO(msgType), pRsp->code & 0XFFFF);
(*fp)(pDnode, pRsp, pEpSet);
} else {
dError("RPC %p, rsp:%s not processed", pMsg->handle, TMSG_INFO(msgType));
rpcFreeCont(pMsg->pCont);
dError("RPC %p, rsp:%s not processed", pRsp->handle, TMSG_INFO(msgType));
rpcFreeCont(pRsp->pCont);
}
}
......@@ -201,48 +201,48 @@ static void dndCleanupClient(SDnode *pDnode) {
}
}
static void dndProcessRequest(void *param, SRpcMsg *pMsg, SEpSet *pEpSet) {
static void dndProcessRequest(void *param, SRpcMsg *pReq, SEpSet *pEpSet) {
SDnode *pDnode = param;
STransMgmt *pMgmt = &pDnode->tmgmt;
tmsg_t msgType = pMsg->msgType;
tmsg_t msgType = pReq->msgType;
if (msgType == TDMT_DND_NETWORK_TEST) {
dTrace("RPC %p, network test req, app:%p will be processed, code:0x%x", pMsg->handle, pMsg->ahandle, pMsg->code);
dndProcessStartupReq(pDnode, pMsg);
dTrace("RPC %p, network test req, app:%p will be processed, code:0x%x", pReq->handle, pReq->ahandle, pReq->code);
dndProcessStartupReq(pDnode, pReq);
return;
}
if (dndGetStat(pDnode) == DND_STAT_STOPPED) {
dError("RPC %p, req:%s app:%p is ignored since dnode exiting", pMsg->handle, TMSG_INFO(msgType), pMsg->ahandle);
SRpcMsg rspMsg = {.handle = pMsg->handle, .code = TSDB_CODE_DND_OFFLINE};
dError("RPC %p, req:%s app:%p is ignored since dnode exiting", pReq->handle, TMSG_INFO(msgType), pReq->ahandle);
SRpcMsg rspMsg = {.handle = pReq->handle, .code = TSDB_CODE_DND_OFFLINE};
rpcSendResponse(&rspMsg);
rpcFreeCont(pMsg->pCont);
rpcFreeCont(pReq->pCont);
return;
} else if (dndGetStat(pDnode) != DND_STAT_RUNNING) {
dError("RPC %p, req:%s app:%p is ignored since dnode not running", pMsg->handle, TMSG_INFO(msgType), pMsg->ahandle);
SRpcMsg rspMsg = {.handle = pMsg->handle, .code = TSDB_CODE_APP_NOT_READY};
dError("RPC %p, req:%s app:%p is ignored since dnode not running", pReq->handle, TMSG_INFO(msgType), pReq->ahandle);
SRpcMsg rspMsg = {.handle = pReq->handle, .code = TSDB_CODE_APP_NOT_READY};
rpcSendResponse(&rspMsg);
rpcFreeCont(pMsg->pCont);
rpcFreeCont(pReq->pCont);
return;
}
if (pMsg->pCont == NULL) {
dTrace("RPC %p, req:%s app:%p not processed since content is null", pMsg->handle, TMSG_INFO(msgType),
pMsg->ahandle);
SRpcMsg rspMsg = {.handle = pMsg->handle, .code = TSDB_CODE_DND_INVALID_MSG_LEN};
if (pReq->pCont == NULL) {
dTrace("RPC %p, req:%s app:%p not processed since content is null", pReq->handle, TMSG_INFO(msgType),
pReq->ahandle);
SRpcMsg rspMsg = {.handle = pReq->handle, .code = TSDB_CODE_DND_INVALID_MSG_LEN};
rpcSendResponse(&rspMsg);
return;
}
DndMsgFp fp = pMgmt->msgFp[TMSG_INDEX(msgType)];
if (fp != NULL) {
dTrace("RPC %p, req:%s app:%p will be processed", pMsg->handle, TMSG_INFO(msgType), pMsg->ahandle);
(*fp)(pDnode, pMsg, pEpSet);
dTrace("RPC %p, req:%s app:%p will be processed", pReq->handle, TMSG_INFO(msgType), pReq->ahandle);
(*fp)(pDnode, pReq, pEpSet);
} else {
dError("RPC %p, req:%s app:%p is not processed since no handle", pMsg->handle, TMSG_INFO(msgType), pMsg->ahandle);
SRpcMsg rspMsg = {.handle = pMsg->handle, .code = TSDB_CODE_MSG_NOT_PROCESSED};
dError("RPC %p, req:%s app:%p is not processed since no handle", pReq->handle, TMSG_INFO(msgType), pReq->ahandle);
SRpcMsg rspMsg = {.handle = pReq->handle, .code = TSDB_CODE_MSG_NOT_PROCESSED};
rpcSendResponse(&rspMsg);
rpcFreeCont(pMsg->pCont);
rpcFreeCont(pReq->pCont);
}
}
......@@ -254,7 +254,7 @@ static void dndSendMsgToMnodeRecv(SDnode *pDnode, SRpcMsg *pRpcMsg, SRpcMsg *pRp
rpcSendRecv(pMgmt->clientRpc, &epSet, pRpcMsg, pRpcRsp);
}
static int32_t dndAuthInternalMsg(SDnode *pDnode, char *user, char *spi, char *encrypt, char *secret, char *ckey) {
static int32_t dndAuthInternalReq(SDnode *pDnode, char *user, char *spi, char *encrypt, char *secret, char *ckey) {
if (strcmp(user, INTERNAL_USER) == 0) {
// A simple temporary implementation
char pass[TSDB_PASSWORD_LEN] = {0};
......@@ -281,7 +281,7 @@ static int32_t dndAuthInternalMsg(SDnode *pDnode, char *user, char *spi, char *e
static int32_t dndRetrieveUserAuthInfo(void *parent, char *user, char *spi, char *encrypt, char *secret, char *ckey) {
SDnode *pDnode = parent;
if (dndAuthInternalMsg(parent, user, spi, encrypt, secret, ckey) == 0) {
if (dndAuthInternalReq(parent, user, spi, encrypt, secret, ckey) == 0) {
// dTrace("get internal auth success");
return 0;
}
......@@ -298,10 +298,10 @@ static int32_t dndRetrieveUserAuthInfo(void *parent, char *user, char *spi, char
// dDebug("user:%s, send auth msg to other mnodes", user);
SAuthMsg *pMsg = rpcMallocCont(sizeof(SAuthMsg));
tstrncpy(pMsg->user, user, TSDB_USER_LEN);
SAuthReq *pReq = rpcMallocCont(sizeof(SAuthReq));
tstrncpy(pReq->user, user, TSDB_USER_LEN);
SRpcMsg rpcMsg = {.pCont = pMsg, .contLen = sizeof(SAuthMsg), .msgType = TDMT_MND_AUTH};
SRpcMsg rpcMsg = {.pCont = pReq, .contLen = sizeof(SAuthReq), .msgType = TDMT_MND_AUTH};
SRpcMsg rpcRsp = {0};
dndSendMsgToMnodeRecv(pDnode, &rpcMsg, &rpcRsp);
......@@ -381,19 +381,19 @@ void dndCleanupTrans(SDnode *pDnode) {
dInfo("dnode-transport is cleaned up");
}
int32_t dndSendReqToDnode(SDnode *pDnode, SEpSet *pEpSet, SRpcMsg *pMsg) {
int32_t dndSendReqToDnode(SDnode *pDnode, SEpSet *pEpSet, SRpcMsg *pReq) {
STransMgmt *pMgmt = &pDnode->tmgmt;
if (pMgmt->clientRpc == NULL) {
terrno = TSDB_CODE_DND_OFFLINE;
return -1;
}
rpcSendRequest(pMgmt->clientRpc, pEpSet, pMsg, NULL);
rpcSendRequest(pMgmt->clientRpc, pEpSet, pReq, NULL);
return 0;
}
int32_t dndSendReqToMnode(SDnode *pDnode, SRpcMsg *pMsg) {
int32_t dndSendReqToMnode(SDnode *pDnode, SRpcMsg *pReq) {
SEpSet epSet = {0};
dndGetMnodeEpSet(pDnode, &epSet);
return dndSendReqToDnode(pDnode, &epSet, pMsg);
return dndSendReqToDnode(pDnode, &epSet, pReq);
}
......@@ -24,3 +24,160 @@ class DndTestMnode : public ::testing::Test {
};
Testbase DndTestMnode::test;
#if 0
TEST_F(DndTestMnode, 01_Create_Mnode) {
{
int32_t contLen = sizeof(SDCreateMnodeReq);
SDCreateMnodeReq* pReq = (SDCreateMnodeReq*)rpcMallocCont(contLen);
pReq->dnodeId = htonl(2);
SRpcMsg* pRsp = test.SendReq(TDMT_DND_CREATE_MNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, TSDB_CODE_DND_MNODE_ID_INVALID);
}
{
int32_t contLen = sizeof(SDCreateMnodeReq);
SDCreateMnodeReq* pReq = (SDCreateMnodeReq*)rpcMallocCont(contLen);
pReq->dnodeId = htonl(1);
SRpcMsg* pRsp = test.SendReq(TDMT_DND_CREATE_MNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, 0);
}
{
int32_t contLen = sizeof(SDCreateMnodeReq);
SDCreateMnodeReq* pReq = (SDCreateMnodeReq*)rpcMallocCont(contLen);
pReq->dnodeId = htonl(1);
SRpcMsg* pRsp = test.SendReq(TDMT_DND_CREATE_MNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, TSDB_CODE_DND_MNODE_ALREADY_DEPLOYED);
}
test.Restart();
{
int32_t contLen = sizeof(SDCreateMnodeReq);
SDCreateMnodeReq* pReq = (SDCreateMnodeReq*)rpcMallocCont(contLen);
pReq->dnodeId = htonl(1);
SRpcMsg* pRsp = test.SendReq(TDMT_DND_CREATE_MNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, TSDB_CODE_DND_MNODE_ALREADY_DEPLOYED);
}
}
TEST_F(DndTestMnode, 02_Alter_Mnode) {
{
int32_t contLen = sizeof(SDCreateMnodeReq);
SDCreateMnodeReq* pReq = (SDCreateMnodeReq*)rpcMallocCont(contLen);
pReq->dnodeId = htonl(2);
SRpcMsg* pRsp = test.SendReq(TDMT_DND_CREATE_MNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, TSDB_CODE_DND_MNODE_ID_INVALID);
}
{
int32_t contLen = sizeof(SDCreateMnodeReq);
SDCreateMnodeReq* pReq = (SDCreateMnodeReq*)rpcMallocCont(contLen);
pReq->dnodeId = htonl(1);
SRpcMsg* pRsp = test.SendReq(TDMT_DND_CREATE_MNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, 0);
}
{
int32_t contLen = sizeof(SDCreateMnodeReq);
SDCreateMnodeReq* pReq = (SDCreateMnodeReq*)rpcMallocCont(contLen);
pReq->dnodeId = htonl(1);
SRpcMsg* pRsp = test.SendReq(TDMT_DND_CREATE_MNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, TSDB_CODE_DND_MNODE_ALREADY_DEPLOYED);
}
test.Restart();
{
int32_t contLen = sizeof(SDCreateMnodeReq);
SDCreateMnodeReq* pReq = (SDCreateMnodeReq*)rpcMallocCont(contLen);
pReq->dnodeId = htonl(1);
SRpcMsg* pRsp = test.SendReq(TDMT_DND_CREATE_MNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, TSDB_CODE_DND_MNODE_ALREADY_DEPLOYED);
}
}
TEST_F(DndTestMnode, 03_Drop_Mnode) {
{
int32_t contLen = sizeof(SDDropMnodeReq);
SDDropMnodeReq* pReq = (SDDropMnodeReq*)rpcMallocCont(contLen);
pReq->dnodeId = htonl(2);
SRpcMsg* pRsp = test.SendReq(TDMT_DND_DROP_MNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, TSDB_CODE_DND_MNODE_ID_INVALID);
}
{
int32_t contLen = sizeof(SDDropMnodeReq);
SDDropMnodeReq* pReq = (SDDropMnodeReq*)rpcMallocCont(contLen);
pReq->dnodeId = htonl(1);
SRpcMsg* pRsp = test.SendReq(TDMT_DND_DROP_MNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, 0);
}
{
int32_t contLen = sizeof(SDDropMnodeReq);
SDDropMnodeReq* pReq = (SDDropMnodeReq*)rpcMallocCont(contLen);
pReq->dnodeId = htonl(1);
SRpcMsg* pRsp = test.SendReq(TDMT_DND_DROP_MNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, TSDB_CODE_DND_MNODE_NOT_DEPLOYED);
}
test.Restart();
{
int32_t contLen = sizeof(SDDropMnodeReq);
SDDropMnodeReq* pReq = (SDDropMnodeReq*)rpcMallocCont(contLen);
pReq->dnodeId = htonl(1);
SRpcMsg* pRsp = test.SendReq(TDMT_DND_DROP_MNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, TSDB_CODE_DND_MNODE_NOT_DEPLOYED);
}
{
int32_t contLen = sizeof(SDCreateMnodeReq);
SDCreateMnodeReq* pReq = (SDCreateMnodeReq*)rpcMallocCont(contLen);
pReq->dnodeId = htonl(1);
SRpcMsg* pRsp = test.SendReq(TDMT_DND_CREATE_MNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, 0);
}
}
#endif
\ No newline at end of file
......@@ -25,20 +25,21 @@
char dbName[32] = "db";
char stbName[64] = "st";
int32_t numOfThreads = 1;
int32_t numOfTables = 10000;
int64_t numOfTables = 200000;
int32_t createTable = 1;
int32_t insertData = 0;
int32_t batchNum = 10;
int32_t batchNum = 100;
int32_t numOfVgroups = 2;
typedef struct {
int32_t tableBeginIndex;
int32_t tableEndIndex;
int64_t tableBeginIndex;
int64_t tableEndIndex;
int32_t threadIndex;
char dbName[32];
char stbName[64];
float createTableSpeed;
float insertDataSpeed;
int64_t startMs;
pthread_t thread;
} SThreadInfo;
......@@ -57,7 +58,7 @@ int32_t main(int32_t argc, char *argv[]) {
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
SThreadInfo *pInfo = (SThreadInfo *)calloc(numOfThreads, sizeof(SThreadInfo));
int32_t numOfTablesPerThread = numOfTables / numOfThreads;
int64_t numOfTablesPerThread = numOfTables / numOfThreads;
numOfTables = numOfTablesPerThread * numOfThreads;
for (int32_t i = 0; i < numOfThreads; ++i) {
pInfo[i].tableBeginIndex = i * numOfTablesPerThread;
......@@ -83,8 +84,10 @@ int32_t main(int32_t argc, char *argv[]) {
insertDataSpeed += pInfo[i].insertDataSpeed;
}
pPrint("%s total %.1f tables/second, threads:%d %s", GREEN, createTableSpeed, numOfThreads, NC);
pPrint("%s total %.1f rows/second, threads:%d %s", GREEN, insertDataSpeed, numOfThreads, NC);
pPrint("%s total %" PRId64 " tables, %.1f tables/second, threads:%d %s", GREEN, numOfTables, createTableSpeed,
numOfThreads, NC);
pPrint("%s total %" PRId64 " tables, %.1f rows/second, threads:%d %s", GREEN, numOfTables, insertDataSpeed,
numOfThreads, NC);
pthread_attr_destroy(&thattr);
free(pInfo);
......@@ -130,6 +133,26 @@ void createDbAndStb() {
taos_close(con);
}
void printCreateProgress(SThreadInfo *pInfo, int64_t t) {
int64_t endMs = taosGetTimestampMs();
int64_t totalTables = t - pInfo->tableBeginIndex;
float seconds = (endMs - pInfo->startMs) / 1000.0;
float speed = totalTables / seconds;
pInfo->createTableSpeed = speed;
pPrint("thread:%d, %" PRId64 " tables created, time:%.2f sec, speed:%.1f tables/second, ", pInfo->threadIndex,
totalTables, seconds, speed);
}
void printInsertProgress(SThreadInfo *pInfo, int64_t t) {
int64_t endMs = taosGetTimestampMs();
int64_t totalTables = t - pInfo->tableBeginIndex;
float seconds = (endMs - pInfo->startMs) / 1000.0;
float speed = totalTables / seconds;
pInfo->insertDataSpeed = speed;
pPrint("thread:%d, %" PRId64 " rows inserted, time:%.2f sec, speed:%.1f rows/second, ", pInfo->threadIndex,
totalTables, seconds, speed);
}
void *threadFunc(void *param) {
SThreadInfo *pInfo = (SThreadInfo *)param;
char *qstr = malloc(2000 * 1000);
......@@ -146,47 +169,55 @@ void *threadFunc(void *param) {
taos_free_result(pSql);
if (createTable) {
int64_t startMs = taosGetTimestampMs();
for (int32_t t = pInfo->tableBeginIndex; t < pInfo->tableEndIndex; ++t) {
int32_t batch = (pInfo->tableEndIndex - t);
pInfo->startMs = taosGetTimestampMs();
for (int64_t t = pInfo->tableBeginIndex; t < pInfo->tableEndIndex; ++t) {
int64_t batch = (pInfo->tableEndIndex - t);
batch = MIN(batch, batchNum);
int32_t len = sprintf(qstr, "create table");
for (int32_t i = 0; i < batch; ++i) {
len += sprintf(qstr + len, " t%d using %s tags(%d)", t + i, stbName, t + i);
len += sprintf(qstr + len, " t%" PRId64 " using %s tags(%" PRId64 ")", t + i, stbName, t + i);
}
TAOS_RES *pSql = taos_query(con, qstr);
code = taos_errno(pSql);
if (code != 0) {
pError("failed to create table t%d, reason:%s", t, tstrerror(code));
pError("failed to create table t%" PRId64 ", reason:%s", t, tstrerror(code));
}
taos_free_result(pSql);
if (t % 100000 == 0) {
printCreateProgress(pInfo, t);
}
t += (batch - 1);
}
int64_t endMs = taosGetTimestampMs();
int32_t totalTables = pInfo->tableEndIndex - pInfo->tableBeginIndex;
float seconds = (endMs - startMs) / 1000.0;
float speed = totalTables / seconds;
pInfo->createTableSpeed = speed;
pPrint("thread:%d, time:%.2f sec, speed:%.1f tables/second, ", pInfo->threadIndex, seconds, speed);
printCreateProgress(pInfo, pInfo->tableEndIndex);
}
if (insertData) {
int64_t startMs = taosGetTimestampMs();
for (int32_t t = pInfo->tableBeginIndex; t < pInfo->tableEndIndex; ++t) {
sprintf(qstr, "insert into %s%d values(now, 1)", stbName, t);
pInfo->startMs = taosGetTimestampMs();
for (int64_t t = pInfo->tableBeginIndex; t < pInfo->tableEndIndex; ++t) {
int64_t batch = (pInfo->tableEndIndex - t);
batch = MIN(batch, batchNum);
int32_t len = sprintf(qstr, "insert into");
for (int32_t i = 0; i < batch; ++i) {
len += sprintf(qstr + len, " t%" PRId64 " values(now, %" PRId64 ")", t + i, t + i);
}
TAOS_RES *pSql = taos_query(con, qstr);
code = taos_errno(pSql);
if (code != 0) {
pError("failed to create table %s%d, reason:%s", stbName, t, tstrerror(code));
pError("failed to insert table t%" PRId64 ", reason:%s", t, tstrerror(code));
}
taos_free_result(pSql);
if (t % 100000 == 0) {
printInsertProgress(pInfo, t);
}
t += (batch - 1);
}
int64_t endMs = taosGetTimestampMs();
int32_t totalTables = pInfo->tableEndIndex - pInfo->tableBeginIndex;
float seconds = (endMs - startMs) / 1000.0;
float speed = totalTables / seconds;
pInfo->insertDataSpeed = speed;
pPrint("thread:%d, time:%.2f sec, speed:%.1f rows/second, ", pInfo->threadIndex, seconds, speed);
printInsertProgress(pInfo, pInfo->tableEndIndex);
}
taos_close(con);
......@@ -207,7 +238,7 @@ void printHelp() {
printf("%s%s\n", indent, "-t");
printf("%s%s%s%d\n", indent, indent, "numOfThreads, default is ", numOfThreads);
printf("%s%s\n", indent, "-n");
printf("%s%s%s%d\n", indent, indent, "numOfTables, default is ", numOfTables);
printf("%s%s%s%" PRId64 "\n", indent, indent, "numOfTables, default is ", numOfTables);
printf("%s%s\n", indent, "-v");
printf("%s%s%s%d\n", indent, indent, "numOfVgroups, default is ", numOfVgroups);
printf("%s%s\n", indent, "-a");
......@@ -234,7 +265,7 @@ void parseArgument(int32_t argc, char *argv[]) {
} else if (strcmp(argv[i], "-t") == 0) {
numOfThreads = atoi(argv[++i]);
} else if (strcmp(argv[i], "-n") == 0) {
numOfTables = atoi(argv[++i]);
numOfTables = atoll(argv[++i]);
} else if (strcmp(argv[i], "-n") == 0) {
numOfVgroups = atoi(argv[++i]);
} else if (strcmp(argv[i], "-a") == 0) {
......@@ -250,7 +281,7 @@ void parseArgument(int32_t argc, char *argv[]) {
pPrint("%s dbName:%s %s", GREEN, dbName, NC);
pPrint("%s stbName:%s %s", GREEN, stbName, NC);
pPrint("%s configDir:%s %s", GREEN, configDir, NC);
pPrint("%s numOfTables:%d %s", GREEN, numOfTables, NC);
pPrint("%s numOfTables:%" PRId64 " %s", GREEN, numOfTables, NC);
pPrint("%s numOfThreads:%d %s", GREEN, numOfThreads, NC);
pPrint("%s numOfVgroups:%d %s", GREEN, numOfVgroups, NC);
pPrint("%s createTable:%d %s", GREEN, createTable, NC);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册