提交 0a281a4f 编写于 作者: D dapan1121

Merge remote-tracking branch 'origin/3.0' into feature/qnode

...@@ -31,8 +31,12 @@ extern "C" { ...@@ -31,8 +31,12 @@ extern "C" {
/* ------------------------ TYPES EXPOSED ------------------------ */ /* ------------------------ TYPES EXPOSED ------------------------ */
typedef struct SVnode SVnode; typedef struct SVnode SVnode;
typedef struct SDnode SDnode;
typedef int32_t (*PutReqToVQueryQFp)(SDnode *pDnode, struct SRpcMsg *pReq);
typedef struct SVnodeCfg { typedef struct SVnodeCfg {
int32_t vgId; int32_t vgId;
SDnode *pDnode;
/** vnode buffer pool options */ /** vnode buffer pool options */
struct { struct {
...@@ -66,15 +70,23 @@ typedef struct SVnodeCfg { ...@@ -66,15 +70,23 @@ typedef struct SVnodeCfg {
SWalCfg walCfg; SWalCfg walCfg;
} SVnodeCfg; } SVnodeCfg;
typedef struct {
int32_t sver;
char *timezone;
char *locale;
char *charset;
uint16_t nthreads; // number of commit threads. 0 for no threads and a schedule queue should be given (TODO)
PutReqToVQueryQFp putReqToVQueryQFp;
} SVnodeOpt;
/* ------------------------ SVnode ------------------------ */ /* ------------------------ SVnode ------------------------ */
/** /**
* @brief Initialize the vnode module * @brief Initialize the vnode module
* *
* @param nthreads number of commit threads. 0 for no threads and * @param pOption Option of the vnode mnodule
* a schedule queue should be given (TODO)
* @return int 0 for success and -1 for failure * @return int 0 for success and -1 for failure
*/ */
int vnodeInit(uint16_t nthreads); int vnodeInit(const SVnodeOpt *pOption);
/** /**
* @brief clear a vnode * @brief clear a vnode
...@@ -89,7 +101,7 @@ void vnodeClear(); ...@@ -89,7 +101,7 @@ void vnodeClear();
* @param pVnodeCfg options of the vnode * @param pVnodeCfg options of the vnode
* @return SVnode* The vnode object * @return SVnode* The vnode object
*/ */
SVnode *vnodeOpen(const char *path, const SVnodeCfg *pVnodeCfg, int32_t vid); SVnode *vnodeOpen(const char *path, const SVnodeCfg *pVnodeCfg);
/** /**
* @brief Close a VNODE * @brief Close a VNODE
......
...@@ -94,6 +94,7 @@ typedef struct { ...@@ -94,6 +94,7 @@ typedef struct {
pthread_t *threadId; pthread_t *threadId;
SRWLatch latch; SRWLatch latch;
SDnodeWorker mgmtWorker; SDnodeWorker mgmtWorker;
SDnodeWorker statusWorker;
} SDnodeMgmt; } SDnodeMgmt;
typedef struct { typedef struct {
......
...@@ -36,6 +36,8 @@ int32_t dndProcessAuthVnodeReq(SDnode *pDnode, SRpcMsg *pReq); ...@@ -36,6 +36,8 @@ int32_t dndProcessAuthVnodeReq(SDnode *pDnode, SRpcMsg *pReq);
int32_t dndProcessSyncVnodeReq(SDnode *pDnode, SRpcMsg *pReq); int32_t dndProcessSyncVnodeReq(SDnode *pDnode, SRpcMsg *pReq);
int32_t dndProcessCompactVnodeReq(SDnode *pDnode, SRpcMsg *pReq); int32_t dndProcessCompactVnodeReq(SDnode *pDnode, SRpcMsg *pReq);
int32_t dndPutReqToVQueryQ(SDnode *pDnode, SRpcMsg *pReq);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -536,6 +536,11 @@ int32_t dndInitMgmt(SDnode *pDnode) { ...@@ -536,6 +536,11 @@ int32_t dndInitMgmt(SDnode *pDnode) {
return -1; return -1;
} }
if (dndInitWorker(pDnode, &pMgmt->statusWorker, DND_WORKER_SINGLE, "dnode-status", 1, 1, dndProcessMgmtQueue) != 0) {
dError("failed to start dnode mgmt worker since %s", terrstr());
return -1;
}
pMgmt->threadId = taosCreateThread(dnodeThreadRoutine, pDnode); pMgmt->threadId = taosCreateThread(dnodeThreadRoutine, pDnode);
if (pMgmt->threadId == NULL) { if (pMgmt->threadId == NULL) {
dError("failed to init dnode thread"); dError("failed to init dnode thread");
...@@ -550,6 +555,7 @@ int32_t dndInitMgmt(SDnode *pDnode) { ...@@ -550,6 +555,7 @@ int32_t dndInitMgmt(SDnode *pDnode) {
void dndStopMgmt(SDnode *pDnode) { void dndStopMgmt(SDnode *pDnode) {
SDnodeMgmt *pMgmt = &pDnode->dmgmt; SDnodeMgmt *pMgmt = &pDnode->dmgmt;
dndCleanupWorker(&pMgmt->mgmtWorker); dndCleanupWorker(&pMgmt->mgmtWorker);
dndCleanupWorker(&pMgmt->statusWorker);
if (pMgmt->threadId != NULL) { if (pMgmt->threadId != NULL) {
taosDestoryThread(pMgmt->threadId); taosDestoryThread(pMgmt->threadId);
...@@ -587,7 +593,12 @@ void dndProcessMgmtMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { ...@@ -587,7 +593,12 @@ void dndProcessMgmtMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
dndUpdateMnodeEpSet(pDnode, pEpSet); dndUpdateMnodeEpSet(pDnode, pEpSet);
} }
if (dndWriteMsgToWorker(&pMgmt->mgmtWorker, pMsg, sizeof(SRpcMsg)) != 0) { SDnodeWorker *pWorker = &pMgmt->mgmtWorker;
if (pMsg->msgType == TDMT_MND_STATUS_RSP) {
pWorker = &pMgmt->statusWorker;
}
if (dndWriteMsgToWorker(pWorker, pMsg, sizeof(SRpcMsg)) != 0) {
if (pMsg->msgType & 1u) { if (pMsg->msgType & 1u) {
SRpcMsg rsp = {.handle = pMsg->handle, .code = TSDB_CODE_OUT_OF_MEMORY}; SRpcMsg rsp = {.handle = pMsg->handle, .code = TSDB_CODE_OUT_OF_MEMORY};
rpcSendResponse(&rsp); rpcSendResponse(&rsp);
......
...@@ -381,7 +381,8 @@ static void *dnodeOpenVnodeFunc(void *param) { ...@@ -381,7 +381,8 @@ static void *dnodeOpenVnodeFunc(void *param) {
pMgmt->openVnodes, pMgmt->totalVnodes); pMgmt->openVnodes, pMgmt->totalVnodes);
dndReportStartup(pDnode, "open-vnodes", stepDesc); dndReportStartup(pDnode, "open-vnodes", stepDesc);
SVnode *pImpl = vnodeOpen(pCfg->path, NULL, pCfg->vgId); SVnodeCfg cfg = {.pDnode = pDnode, .vgId = pCfg->vgId};
SVnode *pImpl = vnodeOpen(pCfg->path, &cfg);
if (pImpl == NULL) { if (pImpl == NULL) {
dError("vgId:%d, failed to open vnode by thread:%d", pCfg->vgId, pThread->threadIndex); dError("vgId:%d, failed to open vnode by thread:%d", pCfg->vgId, pThread->threadIndex);
pThread->failed++; pThread->failed++;
...@@ -581,7 +582,8 @@ int32_t dndProcessCreateVnodeReq(SDnode *pDnode, SRpcMsg *pReq) { ...@@ -581,7 +582,8 @@ int32_t dndProcessCreateVnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
return -1; return -1;
} }
SVnode *pImpl = vnodeOpen(wrapperCfg.path, NULL /*pCfg*/, pCreate->vgId); vnodeCfg.pDnode = pDnode;
SVnode *pImpl = vnodeOpen(wrapperCfg.path, &vnodeCfg);
if (pImpl == NULL) { if (pImpl == NULL) {
dError("vgId:%d, failed to create vnode since %s", pCreate->vgId, terrstr()); dError("vgId:%d, failed to create vnode since %s", pCreate->vgId, terrstr());
return -1; return -1;
...@@ -800,7 +802,7 @@ static void dndProcessVnodeSyncQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t ...@@ -800,7 +802,7 @@ static void dndProcessVnodeSyncQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t
} }
} }
static int32_t dndWriteRpcMsgToVnodeQueue(STaosQueue *pQueue, SRpcMsg *pRpcMsg) { static int32_t dndWriteRpcMsgToVnodeQueue(STaosQueue *pQueue, SRpcMsg *pRpcMsg, bool sendRsp) {
int32_t code = 0; int32_t code = 0;
if (pQueue == NULL) { if (pQueue == NULL) {
...@@ -817,13 +819,15 @@ static int32_t dndWriteRpcMsgToVnodeQueue(STaosQueue *pQueue, SRpcMsg *pRpcMsg) ...@@ -817,13 +819,15 @@ static int32_t dndWriteRpcMsgToVnodeQueue(STaosQueue *pQueue, SRpcMsg *pRpcMsg)
} }
} }
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS && sendRsp) {
if (pRpcMsg->msgType & 1u) { if (pRpcMsg->msgType & 1u) {
SRpcMsg rsp = {.handle = pRpcMsg->handle, .code = code}; SRpcMsg rsp = {.handle = pRpcMsg->handle, .code = code};
rpcSendResponse(&rsp); rpcSendResponse(&rsp);
} }
rpcFreeCont(pRpcMsg->pCont); rpcFreeCont(pRpcMsg->pCont);
} }
return code;
} }
static SVnodeObj *dndAcquireVnodeFromMsg(SDnode *pDnode, SRpcMsg *pMsg) { static SVnodeObj *dndAcquireVnodeFromMsg(SDnode *pDnode, SRpcMsg *pMsg) {
...@@ -846,7 +850,7 @@ static SVnodeObj *dndAcquireVnodeFromMsg(SDnode *pDnode, SRpcMsg *pMsg) { ...@@ -846,7 +850,7 @@ static SVnodeObj *dndAcquireVnodeFromMsg(SDnode *pDnode, SRpcMsg *pMsg) {
void dndProcessVnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { void dndProcessVnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
SVnodeObj *pVnode = dndAcquireVnodeFromMsg(pDnode, pMsg); SVnodeObj *pVnode = dndAcquireVnodeFromMsg(pDnode, pMsg);
if (pVnode != NULL) { if (pVnode != NULL) {
dndWriteRpcMsgToVnodeQueue(pVnode->pWriteQ, pMsg); (void)dndWriteRpcMsgToVnodeQueue(pVnode->pWriteQ, pMsg, true);
dndReleaseVnode(pDnode, pVnode); dndReleaseVnode(pDnode, pVnode);
} }
} }
...@@ -854,7 +858,7 @@ void dndProcessVnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { ...@@ -854,7 +858,7 @@ void dndProcessVnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
void dndProcessVnodeSyncMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { void dndProcessVnodeSyncMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
SVnodeObj *pVnode = dndAcquireVnodeFromMsg(pDnode, pMsg); SVnodeObj *pVnode = dndAcquireVnodeFromMsg(pDnode, pMsg);
if (pVnode != NULL) { if (pVnode != NULL) {
dndWriteRpcMsgToVnodeQueue(pVnode->pSyncQ, pMsg); (void)dndWriteRpcMsgToVnodeQueue(pVnode->pSyncQ, pMsg, true);
dndReleaseVnode(pDnode, pVnode); dndReleaseVnode(pDnode, pVnode);
} }
} }
...@@ -862,7 +866,7 @@ void dndProcessVnodeSyncMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { ...@@ -862,7 +866,7 @@ void dndProcessVnodeSyncMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
void dndProcessVnodeQueryMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { void dndProcessVnodeQueryMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
SVnodeObj *pVnode = dndAcquireVnodeFromMsg(pDnode, pMsg); SVnodeObj *pVnode = dndAcquireVnodeFromMsg(pDnode, pMsg);
if (pVnode != NULL) { if (pVnode != NULL) {
dndWriteRpcMsgToVnodeQueue(pVnode->pQueryQ, pMsg); (void)dndWriteRpcMsgToVnodeQueue(pVnode->pQueryQ, pMsg, true);
dndReleaseVnode(pDnode, pVnode); dndReleaseVnode(pDnode, pVnode);
} }
} }
...@@ -870,11 +874,23 @@ void dndProcessVnodeQueryMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { ...@@ -870,11 +874,23 @@ void dndProcessVnodeQueryMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
void dndProcessVnodeFetchMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { void dndProcessVnodeFetchMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
SVnodeObj *pVnode = dndAcquireVnodeFromMsg(pDnode, pMsg); SVnodeObj *pVnode = dndAcquireVnodeFromMsg(pDnode, pMsg);
if (pVnode != NULL) { if (pVnode != NULL) {
dndWriteRpcMsgToVnodeQueue(pVnode->pFetchQ, pMsg); (void)dndWriteRpcMsgToVnodeQueue(pVnode->pFetchQ, pMsg, true);
dndReleaseVnode(pDnode, pVnode); dndReleaseVnode(pDnode, pVnode);
} }
} }
int32_t dndPutReqToVQueryQ(SDnode *pDnode, SRpcMsg *pMsg) {
SMsgHead *pHead = pMsg->pCont;
// pHead->vgId = htonl(pHead->vgId);
SVnodeObj *pVnode = dndAcquireVnode(pDnode, pHead->vgId);
if (pVnode == NULL) return -1;
int32_t code = dndWriteRpcMsgToVnodeQueue(pVnode->pFetchQ, pMsg, false);
dndReleaseVnode(pDnode, pVnode);
return code;
}
static int32_t dndPutMsgIntoVnodeApplyQueue(SDnode *pDnode, int32_t vgId, SRpcMsg *pMsg) { static int32_t dndPutMsgIntoVnodeApplyQueue(SDnode *pDnode, int32_t vgId, SRpcMsg *pMsg) {
SVnodeObj *pVnode = dndAcquireVnode(pDnode, vgId); SVnodeObj *pVnode = dndAcquireVnode(pDnode, vgId);
if (pVnode == NULL) return -1; if (pVnode == NULL) return -1;
......
...@@ -22,8 +22,8 @@ ...@@ -22,8 +22,8 @@
#include "dndTransport.h" #include "dndTransport.h"
#include "dndVnodes.h" #include "dndVnodes.h"
#include "sync.h" #include "sync.h"
#include "wal.h"
#include "tfs.h" #include "tfs.h"
#include "wal.h"
EStat dndGetStat(SDnode *pDnode) { return pDnode->stat; } EStat dndGetStat(SDnode *pDnode) { return pDnode->stat; }
...@@ -196,7 +196,15 @@ SDnode *dndInit(SDnodeOpt *pOption) { ...@@ -196,7 +196,15 @@ SDnode *dndInit(SDnodeOpt *pOption) {
return NULL; return NULL;
} }
if (vnodeInit(pDnode->opt.numOfCommitThreads) != 0) { SVnodeOpt vnodeOpt = {
.sver = pDnode->opt.sver,
.timezone = pDnode->opt.timezone,
.locale = pDnode->opt.locale,
.charset = pDnode->opt.charset,
.nthreads = pDnode->opt.numOfCommitThreads,
.putReqToVQueryQFp = dndPutReqToVQueryQ,
};
if (vnodeInit(&vnodeOpt) != 0) {
dError("failed to init vnode env"); dError("failed to init vnode env");
dndCleanup(pDnode); dndCleanup(pDnode);
return NULL; return NULL;
......
...@@ -740,7 +740,7 @@ static int32_t mndBuildDropVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj * ...@@ -740,7 +740,7 @@ static int32_t mndBuildDropVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *
if (pReq == NULL) return -1; if (pReq == NULL) return -1;
action.pCont = pReq; action.pCont = pReq;
action.contLen = sizeof(SCreateVnodeReq); action.contLen = sizeof(SDropVnodeReq);
action.msgType = TDMT_DND_DROP_VNODE; action.msgType = TDMT_DND_DROP_VNODE;
action.acceptableCode = TSDB_CODE_DND_VNODE_NOT_DEPLOYED; action.acceptableCode = TSDB_CODE_DND_VNODE_NOT_DEPLOYED;
if (mndTransAppendRedoAction(pTrans, &action) != 0) { if (mndTransAppendRedoAction(pTrans, &action) != 0) {
......
...@@ -319,6 +319,14 @@ static int32_t mndGetAvailableDnode(SMnode *pMnode, SVgObj *pVgroup, SArray *pAr ...@@ -319,6 +319,14 @@ static int32_t mndGetAvailableDnode(SMnode *pMnode, SVgObj *pVgroup, SArray *pAr
taosArraySort(pArray, (__compar_fn_t)mndCompareDnodeVnodes); taosArraySort(pArray, (__compar_fn_t)mndCompareDnodeVnodes);
int32_t size = taosArrayGetSize(pArray);
if (size < pVgroup->replica) {
mError("db:%s, vgId:%d, no enough online dnodes:%d to alloc %d replica", pVgroup->dbName, pVgroup->vgId, size,
pVgroup->replica);
terrno = TSDB_CODE_MND_NO_ENOUGH_DNODES;
return -1;
}
for (int32_t v = 0; v < pVgroup->replica; ++v) { for (int32_t v = 0; v < pVgroup->replica; ++v) {
SVnodeGid *pVgid = &pVgroup->vnodeGid[v]; SVnodeGid *pVgid = &pVgroup->vnodeGid[v];
SDnodeObj *pDnode = taosArrayGet(pArray, v); SDnodeObj *pDnode = taosArrayGet(pArray, v);
......
...@@ -13,7 +13,7 @@ ...@@ -13,7 +13,7 @@
class MndTestProfile : public ::testing::Test { class MndTestProfile : public ::testing::Test {
protected: protected:
static void SetUpTestSuite() { test.Init("/tmp/mnode_test_profile", 9022); } static void SetUpTestSuite() { test.Init("/tmp/mnode_test_profile", 9031); }
static void TearDownTestSuite() { test.Cleanup(); } static void TearDownTestSuite() { test.Cleanup(); }
static Testbase test; static Testbase test;
...@@ -53,7 +53,7 @@ TEST_F(MndTestProfile, 01_ConnectMsg) { ...@@ -53,7 +53,7 @@ TEST_F(MndTestProfile, 01_ConnectMsg) {
EXPECT_EQ(pRsp->epSet.inUse, 0); EXPECT_EQ(pRsp->epSet.inUse, 0);
EXPECT_EQ(pRsp->epSet.numOfEps, 1); EXPECT_EQ(pRsp->epSet.numOfEps, 1);
EXPECT_EQ(pRsp->epSet.port[0], 9022); EXPECT_EQ(pRsp->epSet.port[0], 9031);
EXPECT_STREQ(pRsp->epSet.fqdn[0], "localhost"); EXPECT_STREQ(pRsp->epSet.fqdn[0], "localhost");
connId = pRsp->connId; connId = pRsp->connId;
...@@ -127,7 +127,7 @@ TEST_F(MndTestProfile, 04_HeartBeatMsg) { ...@@ -127,7 +127,7 @@ TEST_F(MndTestProfile, 04_HeartBeatMsg) {
EXPECT_EQ(pRsp->epSet.inUse, 0); EXPECT_EQ(pRsp->epSet.inUse, 0);
EXPECT_EQ(pRsp->epSet.numOfEps, 1); EXPECT_EQ(pRsp->epSet.numOfEps, 1);
EXPECT_EQ(pRsp->epSet.port[0], 9022); EXPECT_EQ(pRsp->epSet.port[0], 9031);
EXPECT_STREQ(pRsp->epSet.fqdn[0], "localhost"); EXPECT_STREQ(pRsp->epSet.fqdn[0], "localhost");
} }
...@@ -185,7 +185,7 @@ TEST_F(MndTestProfile, 05_KillConnMsg) { ...@@ -185,7 +185,7 @@ TEST_F(MndTestProfile, 05_KillConnMsg) {
EXPECT_EQ(pRsp->epSet.inUse, 0); EXPECT_EQ(pRsp->epSet.inUse, 0);
EXPECT_EQ(pRsp->epSet.numOfEps, 1); EXPECT_EQ(pRsp->epSet.numOfEps, 1);
EXPECT_EQ(pRsp->epSet.port[0], 9022); EXPECT_EQ(pRsp->epSet.port[0], 9031);
EXPECT_STREQ(pRsp->epSet.fqdn[0], "localhost"); EXPECT_STREQ(pRsp->epSet.fqdn[0], "localhost");
connId = pRsp->connId; connId = pRsp->connId;
...@@ -249,7 +249,7 @@ TEST_F(MndTestProfile, 07_KillQueryMsg) { ...@@ -249,7 +249,7 @@ TEST_F(MndTestProfile, 07_KillQueryMsg) {
EXPECT_EQ(pRsp->epSet.inUse, 0); EXPECT_EQ(pRsp->epSet.inUse, 0);
EXPECT_EQ(pRsp->epSet.numOfEps, 1); EXPECT_EQ(pRsp->epSet.numOfEps, 1);
EXPECT_EQ(pRsp->epSet.port[0], 9022); EXPECT_EQ(pRsp->epSet.port[0], 9031);
EXPECT_STREQ(pRsp->epSet.fqdn[0], "localhost"); EXPECT_STREQ(pRsp->epSet.fqdn[0], "localhost");
} }
} }
......
...@@ -57,6 +57,8 @@ typedef struct SVnodeMgr { ...@@ -57,6 +57,8 @@ typedef struct SVnodeMgr {
pthread_cond_t hasTask; pthread_cond_t hasTask;
TD_DLIST(SVnodeTask) queue; TD_DLIST(SVnodeTask) queue;
// For vnode Mgmt // For vnode Mgmt
SDnode* pDnode;
PutReqToVQueryQFp putReqToVQueryQFp;
} SVnodeMgr; } SVnodeMgr;
extern SVnodeMgr vnodeMgr; extern SVnodeMgr vnodeMgr;
...@@ -75,10 +77,13 @@ struct SVnode { ...@@ -75,10 +77,13 @@ struct SVnode {
SVnodeFS* pFs; SVnodeFS* pFs;
tsem_t canCommit; tsem_t canCommit;
SQHandle* pQuery; SQHandle* pQuery;
SDnode* pDnode;
}; };
int vnodeScheduleTask(SVnodeTask* task); int vnodeScheduleTask(SVnodeTask* task);
int32_t vnodePutReqToVQueryQ(SVnode *pVnode, struct SRpcMsg *pReq);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -15,27 +15,29 @@ ...@@ -15,27 +15,29 @@
#include "vnodeDef.h" #include "vnodeDef.h"
static SVnode *vnodeNew(const char *path, const SVnodeCfg *pVnodeCfg, int32_t vid); static SVnode *vnodeNew(const char *path, const SVnodeCfg *pVnodeCfg);
static void vnodeFree(SVnode *pVnode); static void vnodeFree(SVnode *pVnode);
static int vnodeOpenImpl(SVnode *pVnode); static int vnodeOpenImpl(SVnode *pVnode);
static void vnodeCloseImpl(SVnode *pVnode); static void vnodeCloseImpl(SVnode *pVnode);
SVnode *vnodeOpen(const char *path, const SVnodeCfg *pVnodeCfg, int32_t vid) { SVnode *vnodeOpen(const char *path, const SVnodeCfg *pVnodeCfg) {
SVnode *pVnode = NULL; SVnode *pVnode = NULL;
// Set default options // Set default options
//if (pVnodeCfg == NULL) { SVnodeCfg cfg = defaultVnodeOptions;
pVnodeCfg = &defaultVnodeOptions; if (pVnodeCfg != NULL) {
//} cfg.vgId = pVnodeCfg->vgId;
cfg.pDnode = pVnodeCfg->pDnode;
}
// Validate options // Validate options
if (vnodeValidateOptions(pVnodeCfg) < 0) { if (vnodeValidateOptions(&cfg) < 0) {
// TODO // TODO
return NULL; return NULL;
} }
// Create the handle // Create the handle
pVnode = vnodeNew(path, pVnodeCfg, vid); pVnode = vnodeNew(path, &cfg);
if (pVnode == NULL) { if (pVnode == NULL) {
// TODO: handle error // TODO: handle error
return NULL; return NULL;
...@@ -62,7 +64,7 @@ void vnodeClose(SVnode *pVnode) { ...@@ -62,7 +64,7 @@ void vnodeClose(SVnode *pVnode) {
void vnodeDestroy(const char *path) { taosRemoveDir(path); } void vnodeDestroy(const char *path) { taosRemoveDir(path); }
/* ------------------------ STATIC METHODS ------------------------ */ /* ------------------------ STATIC METHODS ------------------------ */
static SVnode *vnodeNew(const char *path, const SVnodeCfg *pVnodeCfg, int32_t vid) { static SVnode *vnodeNew(const char *path, const SVnodeCfg *pVnodeCfg) {
SVnode *pVnode = NULL; SVnode *pVnode = NULL;
pVnode = (SVnode *)calloc(1, sizeof(*pVnode)); pVnode = (SVnode *)calloc(1, sizeof(*pVnode));
...@@ -71,7 +73,8 @@ static SVnode *vnodeNew(const char *path, const SVnodeCfg *pVnodeCfg, int32_t vi ...@@ -71,7 +73,8 @@ static SVnode *vnodeNew(const char *path, const SVnodeCfg *pVnodeCfg, int32_t vi
return NULL; return NULL;
} }
pVnode->vgId = vid; pVnode->vgId = pVnodeCfg->vgId;
pVnode->pDnode = pVnodeCfg->pDnode;
pVnode->path = strdup(path); pVnode->path = strdup(path);
vnodeOptionsCopy(&(pVnode->config), pVnodeCfg); vnodeOptionsCopy(&(pVnode->config), pVnodeCfg);
......
...@@ -19,17 +19,18 @@ SVnodeMgr vnodeMgr = {.vnodeInitFlag = TD_MOD_UNINITIALIZED}; ...@@ -19,17 +19,18 @@ SVnodeMgr vnodeMgr = {.vnodeInitFlag = TD_MOD_UNINITIALIZED};
static void* loop(void* arg); static void* loop(void* arg);
int vnodeInit(uint16_t nthreads) { int vnodeInit(const SVnodeOpt *pOption) {
if (TD_CHECK_AND_SET_MODE_INIT(&(vnodeMgr.vnodeInitFlag)) == TD_MOD_INITIALIZED) { if (TD_CHECK_AND_SET_MODE_INIT(&(vnodeMgr.vnodeInitFlag)) == TD_MOD_INITIALIZED) {
return 0; return 0;
} }
vnodeMgr.stop = false; vnodeMgr.stop = false;
vnodeMgr.putReqToVQueryQFp = pOption->putReqToVQueryQFp;
// Start commit handers // Start commit handers
if (nthreads > 0) { if (pOption->nthreads > 0) {
vnodeMgr.nthreads = nthreads; vnodeMgr.nthreads = pOption->nthreads;
vnodeMgr.threads = (pthread_t*)calloc(nthreads, sizeof(pthread_t)); vnodeMgr.threads = (pthread_t*)calloc(pOption->nthreads, sizeof(pthread_t));
if (vnodeMgr.threads == NULL) { if (vnodeMgr.threads == NULL) {
return -1; return -1;
} }
...@@ -38,7 +39,7 @@ int vnodeInit(uint16_t nthreads) { ...@@ -38,7 +39,7 @@ int vnodeInit(uint16_t nthreads) {
pthread_cond_init(&(vnodeMgr.hasTask), NULL); pthread_cond_init(&(vnodeMgr.hasTask), NULL);
TD_DLIST_INIT(&(vnodeMgr.queue)); TD_DLIST_INIT(&(vnodeMgr.queue));
for (uint16_t i = 0; i < nthreads; i++) { for (uint16_t i = 0; i < pOption->nthreads; i++) {
pthread_create(&(vnodeMgr.threads[i]), NULL, loop, NULL); pthread_create(&(vnodeMgr.threads[i]), NULL, loop, NULL);
pthread_setname_np(vnodeMgr.threads[i], "VND Commit Thread"); pthread_setname_np(vnodeMgr.threads[i], "VND Commit Thread");
} }
...@@ -89,6 +90,14 @@ int vnodeScheduleTask(SVnodeTask* pTask) { ...@@ -89,6 +90,14 @@ int vnodeScheduleTask(SVnodeTask* pTask) {
return 0; return 0;
} }
int32_t vnodePutReqToVQueryQ(SVnode* pVnode, struct SRpcMsg* pReq) {
if (pVnode == NULL || pVnode->pDnode == NULL || vnodeMgr.putReqToVQueryQFp == NULL) {
terrno = TSDB_CODE_VND_APP_ERROR;
return -1;
}
return (*vnodeMgr.putReqToVQueryQFp)(pVnode->pDnode, pReq);
}
/* ------------------------ STATIC METHODS ------------------------ */ /* ------------------------ STATIC METHODS ------------------------ */
static void* loop(void* arg) { static void* loop(void* arg) {
SVnodeTask* pTask; SVnodeTask* pTask;
......
...@@ -5,8 +5,9 @@ ...@@ -5,8 +5,9 @@
./test.sh -f sim/user/basic1.sim ./test.sh -f sim/user/basic1.sim
# ---- db # ---- db
./test.sh -f sim/db/basic1.sim ./test.sh -f sim/db/basic1.sim
./test.sh -f sim/db/error1.sim ./test.sh -f sim/db/basic6.sim
./test.sh -f sim/db/error1.sim
# ---- table # ---- table
./test.sh -f sim/table/basic1.sim ./test.sh -f sim/table/basic1.sim
......
...@@ -30,7 +30,7 @@ do ...@@ -30,7 +30,7 @@ do
CLEAR_OPTION="clear" CLEAR_OPTION="clear"
;; ;;
v) v)
SHELL_OPTION="true" VALGRIND_OPTION="true"
;; ;;
u) u)
USERS=$OPTARG USERS=$OPTARG
...@@ -99,7 +99,7 @@ fi ...@@ -99,7 +99,7 @@ fi
if [ "$EXEC_OPTON" = "start" ]; then if [ "$EXEC_OPTON" = "start" ]; then
echo "ExcuteCmd:" $EXE_DIR/taosd -c $CFG_DIR echo "ExcuteCmd:" $EXE_DIR/taosd -c $CFG_DIR
if [ "$SHELL_OPTION" = "true" ]; then if [ "$VALGRIND_OPTION" = "true" ]; then
TT=`date +%s` TT=`date +%s`
mkdir ${LOG_DIR}/${TT} mkdir ${LOG_DIR}/${TT}
nohup valgrind --log-file=${LOG_DIR}/${TT}/valgrind.log --tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all -v --workaround-gcc296-bugs=yes $EXE_DIR/taosd -c $CFG_DIR > /dev/null 2>&1 & nohup valgrind --log-file=${LOG_DIR}/${TT}/valgrind.log --tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all -v --workaround-gcc296-bugs=yes $EXE_DIR/taosd -c $CFG_DIR > /dev/null 2>&1 &
......
system sh/stop_dnodes.sh system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1 system sh/deploy.sh -n dnode1 -i 1
system sh/cfg.sh -n dnode1 -c wallevel -v 0
system sh/cfg.sh -n dnode1 -c maxVgroupsPerDb -v 4
system sh/cfg.sh -n dnode1 -c maxTablesPerVnode -v 1000
system sh/exec.sh -n dnode1 -s start system sh/exec.sh -n dnode1 -s start
sleep 2000
sql connect sql connect
print ============================ dnode1 start print ============================ dnode1 start
$i = 0 $i = 0
$dbPrefix = ob_db_db $dbPrefix = db
$tbPrefix = ob_db_tb $tbPrefix = tb
$db = $dbPrefix . $i $db = $dbPrefix . $i
$tb = $tbPrefix . $i $tb = $tbPrefix . $i
print =============== step1 print =============== step1
sql create database $db replica 1 days 20 keep 2000 cache 16 sql create database $db replica 1 days 20 keep 2000 cache 16 vgroups 4
sql show databases sql show databases
print $data00 $data01 $data02 $data03 $data04 $data05 $data06 $data07 print $data00 $data01 $data02 $data03 $data04 $data05 $data06 $data07
if $data00 != $db then if $data00 != $db then
return -1 return -1
endi endi
if $data02 != 0 then if $data02 != 4 then
return -1 return -1
endi endi
if $data03 != 0 then if $data03 != 0 then
...@@ -63,9 +58,6 @@ print $data00 $data01 $data02 $data03 $data04 $data05 $data06 $data07 ...@@ -63,9 +58,6 @@ print $data00 $data01 $data02 $data03 $data04 $data05 $data06 $data07
if $data00 != $db then if $data00 != $db then
return -1 return -1
endi endi
if $data02 != 0 then
return -1
endi
if $data03 != 0 then if $data03 != 0 then
return -1 return -1
endi endi
...@@ -77,30 +69,14 @@ if $data06 != 15 then ...@@ -77,30 +69,14 @@ if $data06 != 15 then
endi endi
print =============== step6 print =============== step6
sql use $db $i = $i + 1
sql create table $tb (ts timestamp, speed int) while $i < 5
$i = 1
while $i < 4
$db = $dbPrefix . $i $db = $dbPrefix . $i
$tb = $tbPrefix . $i
sql create database $db sql create database $db
sql use $db sql use $db
sql create table $tb (ts timestamp, speed int)
$i = $i + 1 $i = $i + 1
endw endw
sql show databases
if $rows != 4 then
return -1
endi
$i = 4
$db = $dbPrefix . $i
$tb = $tbPrefix . $i
sql create database $db
sql use $db
sql create table $tb (ts timestamp, speed int)
print =============== step7 print =============== step7
$i = 0 $i = 0
while $i < 5 while $i < 5
...@@ -115,7 +91,12 @@ $db = $dbPrefix . $i ...@@ -115,7 +91,12 @@ $db = $dbPrefix . $i
$tb = $tbPrefix . $i $tb = $tbPrefix . $i
sql create database $db sql create database $db
sql use $db sql use $db
sql create table $tb (ts timestamp, speed int) sql create table st (ts timestamp, i int) tags (j int)
sql create table $tb using st tags(1)
return
system sh/exec.sh -n dnode1 -s stop -x SIGINT
sql show tables sql show tables
if $rows != 1 then if $rows != 1 then
return -1 return -1
...@@ -133,7 +114,8 @@ if $rows != 0 then ...@@ -133,7 +114,8 @@ if $rows != 0 then
endi endi
print =============== step11 print =============== step11
sql create table $tb (ts timestamp, speed int) sql create table st (ts timestamp, i int) tags (j int)
sql create table $tb using st tags(1)
sql show tables sql show tables
if $rows != 1 then if $rows != 1 then
return -1 return -1
...@@ -149,16 +131,23 @@ sql show tables ...@@ -149,16 +131,23 @@ sql show tables
if $rows != 0 then if $rows != 0 then
return -1 return -1
endi endi
sql create table $tb (ts timestamp, speed int)
sql create table st (ts timestamp, i int) tags (j int)
sql create table $tb using st tags(1)
sql show tables sql show tables
if $rows != 1 then if $rows != 1 then
return -1 return -1
endi endi
sql insert into $tb values (now+1a, 0) sql insert into $tb values (now+1a, 0)
sql insert into $tb values (now+2a, 1) sql insert into $tb values (now+2a, 1)
sql insert into $tb values (now+3a, 2) sql insert into $tb values (now+3a, 2)
sql insert into $tb values (now+4a, 3) sql insert into $tb values (now+4a, 3)
sql insert into $tb values (now+5a, 4) sql insert into $tb values (now+5a, 4)
return
sql select * from $tb sql select * from $tb
if $rows != 5 then if $rows != 5 then
return -1 return -1
...@@ -176,7 +165,8 @@ if $rows != 0 then ...@@ -176,7 +165,8 @@ if $rows != 0 then
endi endi
print =============== step16 print =============== step16
sql create table $tb (ts timestamp, speed int) sql create table st (ts timestamp, i int) tags (j int)
sql create table $tb using st tags(1)
sql show tables sql show tables
if $rows != 1 then if $rows != 1 then
return -1 return -1
......
...@@ -40,6 +40,8 @@ typedef struct { ...@@ -40,6 +40,8 @@ typedef struct {
float createTableSpeed; float createTableSpeed;
float insertDataSpeed; float insertDataSpeed;
int64_t startMs; int64_t startMs;
int64_t maxDelay;
int64_t minDelay;
pthread_t thread; pthread_t thread;
} SThreadInfo; } SThreadInfo;
...@@ -58,12 +60,30 @@ int32_t main(int32_t argc, char *argv[]) { ...@@ -58,12 +60,30 @@ int32_t main(int32_t argc, char *argv[]) {
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE); pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
SThreadInfo *pInfo = (SThreadInfo *)calloc(numOfThreads, sizeof(SThreadInfo)); SThreadInfo *pInfo = (SThreadInfo *)calloc(numOfThreads, sizeof(SThreadInfo));
int64_t numOfTablesPerThread = numOfTables / numOfThreads; //int64_t numOfTablesPerThread = numOfTables / numOfThreads;
numOfTables = numOfTablesPerThread * numOfThreads; //numOfTables = numOfTablesPerThread * numOfThreads;
if (numOfThreads < 1) {
numOfThreads = 1;
}
int64_t a = numOfTables / numOfThreads;
if (a < 1) {
numOfThreads = numOfTables;
a = 1;
}
int64_t b = 0;
b = numOfTables % numOfThreads;
int64_t tableFrom = 0;
for (int32_t i = 0; i < numOfThreads; ++i) { for (int32_t i = 0; i < numOfThreads; ++i) {
pInfo[i].tableBeginIndex = i * numOfTablesPerThread; pInfo[i].tableBeginIndex = tableFrom;
pInfo[i].tableEndIndex = (i + 1) * numOfTablesPerThread; pInfo[i].tableEndIndex = i < b ? tableFrom + a : tableFrom + a - 1;
tableFrom = pInfo[i].tableEndIndex + 1;
pInfo[i].threadIndex = i; pInfo[i].threadIndex = i;
pInfo[i].minDelay = INT64_MAX;
strcpy(pInfo[i].dbName, dbName); strcpy(pInfo[i].dbName, dbName);
strcpy(pInfo[i].stbName, stbName); strcpy(pInfo[i].stbName, stbName);
pthread_create(&(pInfo[i].thread), &thattr, threadFunc, (void *)(pInfo + i)); pthread_create(&(pInfo[i].thread), &thattr, threadFunc, (void *)(pInfo + i));
...@@ -74,9 +94,15 @@ int32_t main(int32_t argc, char *argv[]) { ...@@ -74,9 +94,15 @@ int32_t main(int32_t argc, char *argv[]) {
pthread_join(pInfo[i].thread, NULL); pthread_join(pInfo[i].thread, NULL);
} }
int64_t maxDelay = 0;
int64_t minDelay = INT64_MAX;
float createTableSpeed = 0; float createTableSpeed = 0;
for (int32_t i = 0; i < numOfThreads; ++i) { for (int32_t i = 0; i < numOfThreads; ++i) {
createTableSpeed += pInfo[i].createTableSpeed; createTableSpeed += pInfo[i].createTableSpeed;
if (pInfo[i].maxDelay > maxDelay) maxDelay = pInfo[i].maxDelay;
if (pInfo[i].minDelay < minDelay) minDelay = pInfo[i].minDelay;
} }
float insertDataSpeed = 0; float insertDataSpeed = 0;
...@@ -84,10 +110,19 @@ int32_t main(int32_t argc, char *argv[]) { ...@@ -84,10 +110,19 @@ int32_t main(int32_t argc, char *argv[]) {
insertDataSpeed += pInfo[i].insertDataSpeed; insertDataSpeed += pInfo[i].insertDataSpeed;
} }
pPrint("%s total %" PRId64 " tables, %.1f tables/second, threads:%d %s", GREEN, numOfTables, createTableSpeed, pPrint("%s total %" PRId64 " tables, %.1f tables/second, threads:%d, maxDelay: %" PRId64 "us, minDelay: %" PRId64 "us %s",
numOfThreads, NC); GREEN,
pPrint("%s total %" PRId64 " tables, %.1f rows/second, threads:%d %s", GREEN, numOfTables, insertDataSpeed, numOfTables,
createTableSpeed,
numOfThreads,
maxDelay,
minDelay,
NC);
if (insertData) {
pPrint("%s total %" PRId64 " tables, %.1f rows/second, threads:%d %s", GREEN, numOfTables, insertDataSpeed,
numOfThreads, NC); numOfThreads, NC);
}
pthread_attr_destroy(&thattr); pthread_attr_destroy(&thattr);
free(pInfo); free(pInfo);
...@@ -99,36 +134,36 @@ void createDbAndStb() { ...@@ -99,36 +134,36 @@ void createDbAndStb() {
TAOS *con = taos_connect(NULL, "root", "taosdata", NULL, 0); TAOS *con = taos_connect(NULL, "root", "taosdata", NULL, 0);
if (con == NULL) { if (con == NULL) {
pError("failed to connect to DB, reason:%s", taos_errstr(con)); pError("failed to connect to DB, reason:%s", taos_errstr(NULL));
exit(1); exit(1);
} }
sprintf(qstr, "create database if not exists %s vgroups %d", dbName, numOfVgroups); sprintf(qstr, "create database if not exists %s vgroups %d", dbName, numOfVgroups);
TAOS_RES *pSql = taos_query(con, qstr); TAOS_RES *pRes = taos_query(con, qstr);
int32_t code = taos_errno(pSql); int32_t code = taos_errno(pRes);
if (code != 0) { if (code != 0) {
pError("failed to create database:%s, sql:%s, code:%d reason:%s", dbName, qstr, taos_errno(con), taos_errstr(con)); pError("failed to create database:%s, sql:%s, code:%d reason:%s", dbName, qstr, taos_errno(pRes), taos_errstr(pRes));
exit(0); exit(0);
} }
taos_free_result(pSql); taos_free_result(pRes);
sprintf(qstr, "use %s", dbName); sprintf(qstr, "use %s", dbName);
pSql = taos_query(con, qstr); pRes = taos_query(con, qstr);
code = taos_errno(pSql); code = taos_errno(pRes);
if (code != 0) { if (code != 0) {
pError("failed to use db, code:%d reason:%s", taos_errno(con), taos_errstr(con)); pError("failed to use db, code:%d reason:%s", taos_errno(pRes), taos_errstr(pRes));
exit(0); exit(0);
} }
taos_free_result(pSql); taos_free_result(pRes);
sprintf(qstr, "create table %s (ts timestamp, i int) tags (j int)", stbName); sprintf(qstr, "create table %s (ts timestamp, i int) tags (j int)", stbName);
pSql = taos_query(con, qstr); pRes = taos_query(con, qstr);
code = taos_errno(pSql); code = taos_errno(pRes);
if (code != 0) { if (code != 0) {
pError("failed to use db, code:%d reason:%s", taos_errno(con), taos_errstr(con)); pError("failed to use db, code:%d reason:%s", taos_errno(pRes), taos_errstr(pRes));
exit(0); exit(0);
} }
taos_free_result(pSql); taos_free_result(pRes);
taos_close(con); taos_close(con);
} }
...@@ -160,16 +195,20 @@ void *threadFunc(void *param) { ...@@ -160,16 +195,20 @@ void *threadFunc(void *param) {
TAOS *con = taos_connect(NULL, "root", "taosdata", NULL, 0); TAOS *con = taos_connect(NULL, "root", "taosdata", NULL, 0);
if (con == NULL) { if (con == NULL) {
pError("index:%d, failed to connect to DB, reason:%s", pInfo->threadIndex, taos_errstr(con)); pError("index:%d, failed to connect to DB, reason:%s", pInfo->threadIndex, taos_errstr(NULL));
exit(1); exit(1);
} }
//printf("thread:%d, table range: %"PRId64 " - %"PRId64 "\n", pInfo->threadIndex, pInfo->tableBeginIndex, pInfo->tableEndIndex);
sprintf(qstr, "use %s", pInfo->dbName); sprintf(qstr, "use %s", pInfo->dbName);
TAOS_RES *pSql = taos_query(con, qstr); TAOS_RES *pRes = taos_query(con, qstr);
taos_free_result(pSql); taos_free_result(pRes);
if (createTable) { if (createTable) {
pInfo->startMs = taosGetTimestampMs(); int64_t curMs = 0;
int64_t beginMs = taosGetTimestampMs();
pInfo->startMs = beginMs;
for (int64_t t = pInfo->tableBeginIndex; t < pInfo->tableEndIndex; ++t) { for (int64_t t = pInfo->tableBeginIndex; t < pInfo->tableEndIndex; ++t) {
int64_t batch = (pInfo->tableEndIndex - t); int64_t batch = (pInfo->tableEndIndex - t);
batch = MIN(batch, batchNum); batch = MIN(batch, batchNum);
...@@ -179,14 +218,22 @@ void *threadFunc(void *param) { ...@@ -179,14 +218,22 @@ void *threadFunc(void *param) {
len += sprintf(qstr + len, " t%" PRId64 " using %s tags(%" PRId64 ")", t + i, stbName, t + i); len += sprintf(qstr + len, " t%" PRId64 " using %s tags(%" PRId64 ")", t + i, stbName, t + i);
} }
TAOS_RES *pSql = taos_query(con, qstr); int64_t startTs = taosGetTimestampUs();
code = taos_errno(pSql); TAOS_RES *pRes = taos_query(con, qstr);
code = taos_errno(pRes);
if (code != 0) { if (code != 0) {
pError("failed to create table t%" PRId64 ", reason:%s", t, tstrerror(code)); pError("failed to create table t%" PRId64 ", reason:%s", t, tstrerror(code));
} }
taos_free_result(pSql); taos_free_result(pRes);
int64_t endTs = taosGetTimestampUs();
if (t % 100000 == 0) { int64_t delay = endTs - startTs;
//printf("==== %"PRId64" - %"PRId64", %"PRId64"\n", startTs, endTs, delay);
if (delay > pInfo->maxDelay) pInfo->maxDelay = delay;
if (delay < pInfo->minDelay) pInfo->minDelay = delay;
curMs = taosGetTimestampMs();
if (curMs - beginMs > 10000) {
beginMs = curMs;
printCreateProgress(pInfo, t); printCreateProgress(pInfo, t);
} }
t += (batch - 1); t += (batch - 1);
...@@ -195,6 +242,9 @@ void *threadFunc(void *param) { ...@@ -195,6 +242,9 @@ void *threadFunc(void *param) {
} }
if (insertData) { if (insertData) {
int64_t curMs = 0;
int64_t beginMs = taosGetTimestampMs();;
pInfo->startMs = taosGetTimestampMs(); pInfo->startMs = taosGetTimestampMs();
for (int64_t t = pInfo->tableBeginIndex; t < pInfo->tableEndIndex; ++t) { for (int64_t t = pInfo->tableBeginIndex; t < pInfo->tableEndIndex; ++t) {
int64_t batch = (pInfo->tableEndIndex - t); int64_t batch = (pInfo->tableEndIndex - t);
...@@ -205,14 +255,15 @@ void *threadFunc(void *param) { ...@@ -205,14 +255,15 @@ void *threadFunc(void *param) {
len += sprintf(qstr + len, " t%" PRId64 " values(now, %" PRId64 ")", t + i, t + i); len += sprintf(qstr + len, " t%" PRId64 " values(now, %" PRId64 ")", t + i, t + i);
} }
TAOS_RES *pSql = taos_query(con, qstr); TAOS_RES *pRes = taos_query(con, qstr);
code = taos_errno(pSql); code = taos_errno(pRes);
if (code != 0) { if (code != 0) {
pError("failed to insert table t%" PRId64 ", reason:%s", t, tstrerror(code)); pError("failed to insert table t%" PRId64 ", reason:%s", t, tstrerror(code));
} }
taos_free_result(pSql); taos_free_result(pRes);
if (t % 100000 == 0) { curMs = taosGetTimestampMs();
if (curMs - beginMs > 10000) {
printInsertProgress(pInfo, t); printInsertProgress(pInfo, t);
} }
t += (batch - 1); t += (batch - 1);
...@@ -266,7 +317,7 @@ void parseArgument(int32_t argc, char *argv[]) { ...@@ -266,7 +317,7 @@ void parseArgument(int32_t argc, char *argv[]) {
numOfThreads = atoi(argv[++i]); numOfThreads = atoi(argv[++i]);
} else if (strcmp(argv[i], "-n") == 0) { } else if (strcmp(argv[i], "-n") == 0) {
numOfTables = atoll(argv[++i]); numOfTables = atoll(argv[++i]);
} else if (strcmp(argv[i], "-n") == 0) { } else if (strcmp(argv[i], "-v") == 0) {
numOfVgroups = atoi(argv[++i]); numOfVgroups = atoi(argv[++i]);
} else if (strcmp(argv[i], "-a") == 0) { } else if (strcmp(argv[i], "-a") == 0) {
createTable = atoi(argv[++i]); createTable = atoi(argv[++i]);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册