提交 9ef37af5 编写于 作者: D dapan1121

Merge remote-tracking branch 'origin/3.0' into feature/qnode

......@@ -1648,6 +1648,15 @@ typedef struct {
int32_t tSerializeSMDropTopicReq(void* buf, int32_t bufLen, SMDropTopicReq* pReq);
int32_t tDeserializeSMDropTopicReq(void* buf, int32_t bufLen, SMDropTopicReq* pReq);
typedef struct {
char topic[TSDB_TOPIC_FNAME_LEN];
char cgroup[TSDB_CGROUP_LEN];
int8_t igNotExists;
} SMDropCgroupReq;
int32_t tSerializeSMDropCgroupReq(void* buf, int32_t bufLen, SMDropCgroupReq* pReq);
int32_t tDeserializeSMDropCgroupReq(void* buf, int32_t bufLen, SMDropCgroupReq* pReq);
typedef struct {
char name[TSDB_TABLE_FNAME_LEN];
int8_t alterType;
......
......@@ -333,23 +333,23 @@ SSdbRow *sdbAllocRow(int32_t objSize);
void *sdbGetRowObj(SSdbRow *pRow);
typedef struct SSdb {
SMnode *pMnode;
char *currDir;
char *syncDir;
char *tmpDir;
int64_t lastCommitVer;
int64_t curVer;
int64_t tableVer[SDB_MAX];
int64_t maxId[SDB_MAX];
EKeyType keyTypes[SDB_MAX];
SHashObj *hashObjs[SDB_MAX];
SRWLatch locks[SDB_MAX];
SdbInsertFp insertFps[SDB_MAX];
SdbUpdateFp updateFps[SDB_MAX];
SdbDeleteFp deleteFps[SDB_MAX];
SdbDeployFp deployFps[SDB_MAX];
SdbEncodeFp encodeFps[SDB_MAX];
SdbDecodeFp decodeFps[SDB_MAX];
SMnode *pMnode;
char *currDir;
char *syncDir;
char *tmpDir;
int64_t lastCommitVer;
int64_t curVer;
int64_t tableVer[SDB_MAX];
int64_t maxId[SDB_MAX];
EKeyType keyTypes[SDB_MAX];
SHashObj *hashObjs[SDB_MAX];
TdThreadRwlock locks[SDB_MAX];
SdbInsertFp insertFps[SDB_MAX];
SdbUpdateFp updateFps[SDB_MAX];
SdbDeleteFp deleteFps[SDB_MAX];
SdbDeployFp deployFps[SDB_MAX];
SdbEncodeFp encodeFps[SDB_MAX];
SdbDecodeFp decodeFps[SDB_MAX];
} SSdb;
#ifdef __cplusplus
......
......@@ -95,6 +95,15 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo
int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, struct SSubplan* pPlan,
qTaskInfo_t* pTaskInfo, DataSinkHandle* handle, EOPTR_EXEC_MODEL model);
/**
*
* @param tinfo
* @param sversion
* @param tversion
* @return
*/
int32_t qGetQueriedTableSchemaVersion(qTaskInfo_t tinfo, char* dbName, char* tableName, int32_t* sversion, int32_t* tversion);
/**
* The main task execution function, including query on both table and multiple tables,
* which are decided according to the tag or table name query conditions
......
......@@ -1435,7 +1435,7 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t wait_time) {
while (1) {
tmqHandleAllDelayedTask(tmq);
tmqPollImpl(tmq, wait_time);
if (tmqPollImpl(tmq, wait_time) < 0) return NULL;
rspObj = tmqHandleAllRsp(tmq, wait_time, false);
if (rspObj) {
......
......@@ -2627,6 +2627,35 @@ int32_t tDeserializeSMDropTopicReq(void *buf, int32_t bufLen, SMDropTopicReq *pR
return 0;
}
int32_t tSerializeSMDropCgroupReq(void *buf, int32_t bufLen, SMDropCgroupReq *pReq) {
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->topic) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->cgroup) < 0) return -1;
if (tEncodeI8(&encoder, pReq->igNotExists) < 0) return -1;
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
tEncoderClear(&encoder);
return tlen;
}
int32_t tDeserializeSMDropCgroupReq(void *buf, int32_t bufLen, SMDropCgroupReq *pReq) {
SDecoder decoder = {0};
tDecoderInit(&decoder, buf, bufLen);
if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->topic) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->cgroup) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->igNotExists) < 0) return -1;
tEndDecode(&decoder);
tDecoderClear(&decoder);
return 0;
}
int32_t tSerializeSCMCreateTopicReq(void *buf, int32_t bufLen, const SCMCreateTopicReq *pReq) {
int32_t sqlLen = 0;
int32_t astLen = 0;
......
......@@ -36,9 +36,9 @@ typedef struct SBnodeMgmt {
// bmHandle.c
SArray *bmGetMsgHandles();
int32_t bmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pReq);
int32_t bmProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pReq);
int32_t bmProcessGetMonBmInfoReq(SBnodeMgmt *pMgmt, SRpcMsg *pReq);
int32_t bmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg);
int32_t bmProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg);
int32_t bmProcessGetMonBmInfoReq(SBnodeMgmt *pMgmt, SRpcMsg *pMsg);
// bmWorker.c
int32_t bmStartWorker(SBnodeMgmt *pMgmt);
......
......@@ -18,7 +18,7 @@
void bmGetMonitorInfo(SBnodeMgmt *pMgmt, SMonBmInfo *bmInfo) {}
int32_t bmProcessGetMonBmInfoReq(SBnodeMgmt *pMgmt, SRpcMsg *pReq) {
int32_t bmProcessGetMonBmInfoReq(SBnodeMgmt *pMgmt, SRpcMsg *pMsg) {
SMonBmInfo bmInfo = {0};
bmGetMonitorInfo(pMgmt, &bmInfo);
dmGetMonitorSystemInfo(&bmInfo.sys);
......@@ -37,17 +37,15 @@ int32_t bmProcessGetMonBmInfoReq(SBnodeMgmt *pMgmt, SRpcMsg *pReq) {
}
tSerializeSMonBmInfo(pRsp, rspLen, &bmInfo);
pReq->info.rsp = pRsp;
pReq->info.rspLen = rspLen;
pMsg->info.rsp = pRsp;
pMsg->info.rspLen = rspLen;
tFreeSMonBmInfo(&bmInfo);
return 0;
}
int32_t bmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
SRpcMsg *pReq = pMsg;
SDCreateBnodeReq createReq = {0};
if (tDeserializeSCreateDropMQSBNodeReq(pReq->pCont, pReq->contLen, &createReq) != 0) {
if (tDeserializeSCreateDropMQSBNodeReq(pMsg->pCont, pMsg->contLen, &createReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
return -1;
}
......@@ -68,10 +66,8 @@ int32_t bmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
}
int32_t bmProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
SRpcMsg *pReq = pMsg;
SDDropBnodeReq dropReq = {0};
if (tDeserializeSCreateDropMQSBNodeReq(pReq->pCont, pReq->contLen, &dropReq) != 0) {
if (tDeserializeSCreateDropMQSBNodeReq(pMsg->pCont, pMsg->contLen, &dropReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
return -1;
}
......
......@@ -19,11 +19,11 @@
static void dmUpdateDnodeCfg(SDnodeMgmt *pMgmt, SDnodeCfg *pCfg) {
if (pMgmt->pData->dnodeId == 0 || pMgmt->pData->clusterId == 0) {
dInfo("set dnodeId:%d clusterId:%" PRId64, pCfg->dnodeId, pCfg->clusterId);
taosWLockLatch(&pMgmt->pData->latch);
taosThreadRwlockWrlock(&pMgmt->pData->lock);
pMgmt->pData->dnodeId = pCfg->dnodeId;
pMgmt->pData->clusterId = pCfg->clusterId;
dmWriteEps(pMgmt->pData);
taosWUnLockLatch(&pMgmt->pData->latch);
taosThreadRwlockUnlock(&pMgmt->pData->lock);
}
}
......@@ -50,7 +50,7 @@ static void dmProcessStatusRsp(SDnodeMgmt *pMgmt, SRpcMsg *pRsp) {
void dmSendStatusReq(SDnodeMgmt *pMgmt) {
SStatusReq req = {0};
taosRLockLatch(&pMgmt->pData->latch);
taosThreadRwlockRdlock(&pMgmt->pData->lock);
req.sver = tsVersion;
req.dnodeVer = pMgmt->pData->dnodeVer;
req.dnodeId = pMgmt->pData->dnodeId;
......@@ -69,7 +69,7 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) {
memcpy(req.clusterCfg.timezone, tsTimezoneStr, TD_TIMEZONE_LEN);
memcpy(req.clusterCfg.locale, tsLocale, TD_LOCALE_LEN);
memcpy(req.clusterCfg.charset, tsCharset, TD_LOCALE_LEN);
taosRUnLockLatch(&pMgmt->pData->latch);
taosThreadRwlockUnlock(&pMgmt->pData->lock);
SMonVloadInfo vinfo = {0};
(*pMgmt->getVnodeLoadsFp)(&vinfo);
......
......@@ -41,18 +41,18 @@ typedef struct SMnodeMgmt {
// mmFile.c
int32_t mmReadFile(SMnodeMgmt *pMgmt, bool *pDeployed);
int32_t mmWriteFile(SMnodeMgmt *pMgmt, SDCreateMnodeReq *pReq, bool deployed);
int32_t mmWriteFile(SMnodeMgmt *pMgmt, SDCreateMnodeReq *pMsg, bool deployed);
// mmInt.c
int32_t mmAlter(SMnodeMgmt *pMgmt, SDAlterMnodeReq *pReq);
int32_t mmAlter(SMnodeMgmt *pMgmt, SDAlterMnodeReq *pMsg);
// mmHandle.c
SArray *mmGetMsgHandles();
int32_t mmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg);
int32_t mmProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg);
int32_t mmProcessAlterReq(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t mmProcessGetMonitorInfoReq(SMnodeMgmt *pMgmt, SRpcMsg *pReq);
int32_t mmProcessGetLoadsReq(SMnodeMgmt *pMgmt, SRpcMsg *pReq);
int32_t mmProcessGetMonitorInfoReq(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t mmProcessGetLoadsReq(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
// mmWorker.c
int32_t mmStartWorker(SMnodeMgmt *pMgmt);
......@@ -62,10 +62,10 @@ int32_t mmPutNodeMsgToSyncQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t mmPutNodeMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t mmPutNodeMsgToQueryQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t mmPutNodeMsgToMonitorQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t mmPutRpcMsgToQueryQueue(SMnodeMgmt *pMgmt, SRpcMsg *pRpc);
int32_t mmPutRpcMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pRpc);
int32_t mmPutRpcMsgToWriteQueue(SMnodeMgmt *pMgmt, SRpcMsg *pRpc);
int32_t mmPutRpcMsgToSyncQueue(SMnodeMgmt *pMgmt, SRpcMsg *pRpc);
int32_t mmPutRpcMsgToQueryQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t mmPutRpcMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t mmPutRpcMsgToWriteQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t mmPutRpcMsgToSyncQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
#ifdef __cplusplus
}
......
......@@ -104,7 +104,7 @@ _OVER:
return code;
}
int32_t mmWriteFile(SMnodeMgmt *pMgmt, SDCreateMnodeReq *pReq, bool deployed) {
int32_t mmWriteFile(SMnodeMgmt *pMgmt, SDCreateMnodeReq *pMsg, bool deployed) {
char file[PATH_MAX] = {0};
char realfile[PATH_MAX] = {0};
snprintf(file, sizeof(file), "%s%smnode.json.bak", pMgmt->path, TD_DIRSEP);
......@@ -124,11 +124,11 @@ int32_t mmWriteFile(SMnodeMgmt *pMgmt, SDCreateMnodeReq *pReq, bool deployed) {
len += snprintf(content + len, maxLen - len, "{\n");
len += snprintf(content + len, maxLen - len, " \"mnodes\": [{\n");
int8_t replica = (pReq != NULL ? pReq->replica : pMgmt->replica);
int8_t replica = (pMsg != NULL ? pMsg->replica : pMgmt->replica);
for (int32_t i = 0; i < replica; ++i) {
SReplica *pReplica = &pMgmt->replicas[i];
if (pReq != NULL) {
pReplica = &pReq->replicas[i];
if (pMsg != NULL) {
pReplica = &pMsg->replicas[i];
}
len += snprintf(content + len, maxLen - len, " \"id\": %d,\n", pReplica->id);
len += snprintf(content + len, maxLen - len, " \"fqdn\": \"%s\",\n", pReplica->fqdn);
......
......@@ -25,7 +25,7 @@ void mmGetMnodeLoads(SMnodeMgmt *pMgmt, SMonMloadInfo *pInfo) {
mndGetLoad(pMgmt->pMnode, &pInfo->load);
}
int32_t mmProcessGetMonitorInfoReq(SMnodeMgmt *pMgmt, SRpcMsg *pReq) {
int32_t mmProcessGetMonitorInfoReq(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
SMonMmInfo mmInfo = {0};
mmGetMonitorInfo(pMgmt, &mmInfo);
dmGetMonitorSystemInfo(&mmInfo.sys);
......@@ -44,13 +44,13 @@ int32_t mmProcessGetMonitorInfoReq(SMnodeMgmt *pMgmt, SRpcMsg *pReq) {
}
tSerializeSMonMmInfo(pRsp, rspLen, &mmInfo);
pReq->info.rsp = pRsp;
pReq->info.rspLen = rspLen;
pMsg->info.rsp = pRsp;
pMsg->info.rspLen = rspLen;
tFreeSMonMmInfo(&mmInfo);
return 0;
}
int32_t mmProcessGetLoadsReq(SMnodeMgmt *pMgmt, SRpcMsg *pReq) {
int32_t mmProcessGetLoadsReq(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
SMonMloadInfo mloads = {0};
mmGetMnodeLoads(pMgmt, &mloads);
......@@ -67,16 +67,14 @@ int32_t mmProcessGetLoadsReq(SMnodeMgmt *pMgmt, SRpcMsg *pReq) {
}
tSerializeSMonMloadInfo(pRsp, rspLen, &mloads);
pReq->info.rsp = pRsp;
pReq->info.rspLen = rspLen;
pMsg->info.rsp = pRsp;
pMsg->info.rspLen = rspLen;
return 0;
}
int32_t mmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
SRpcMsg *pReq = pMsg;
SDCreateMnodeReq createReq = {0};
if (tDeserializeSDCreateMnodeReq(pReq->pCont, pReq->contLen, &createReq) != 0) {
if (tDeserializeSDCreateMnodeReq(pMsg->pCont, pMsg->contLen, &createReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
return -1;
}
......@@ -101,10 +99,8 @@ int32_t mmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
}
int32_t mmProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
SRpcMsg *pReq = pMsg;
SDDropMnodeReq dropReq = {0};
if (tDeserializeSCreateDropMQSBNodeReq(pReq->pCont, pReq->contLen, &dropReq) != 0) {
if (tDeserializeSCreateDropMQSBNodeReq(pMsg->pCont, pMsg->contLen, &dropReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
return -1;
}
......@@ -129,10 +125,8 @@ int32_t mmProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
}
int32_t mmProcessAlterReq(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
SRpcMsg *pReq = pMsg;
SDAlterMnodeReq alterReq = {0};
if (tDeserializeSDCreateMnodeReq(pReq->pCont, pReq->contLen, &alterReq) != 0) {
if (tDeserializeSDCreateMnodeReq(pMsg->pCont, pMsg->contLen, &alterReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
return -1;
}
......
......@@ -87,9 +87,9 @@ static int32_t mmBuildOptionFromReq(SMnodeMgmt *pMgmt, SMnodeOpt *pOption, SDCre
return 0;
}
int32_t mmAlter(SMnodeMgmt *pMgmt, SDAlterMnodeReq *pReq) {
int32_t mmAlter(SMnodeMgmt *pMgmt, SDAlterMnodeReq *pMsg) {
SMnodeOpt option = {0};
if (mmBuildOptionFromReq(pMgmt, &option, pReq) != 0) {
if (mmBuildOptionFromReq(pMgmt, &option, pMsg) != 0) {
return -1;
}
......@@ -98,7 +98,7 @@ int32_t mmAlter(SMnodeMgmt *pMgmt, SDAlterMnodeReq *pReq) {
}
bool deployed = true;
if (mmWriteFile(pMgmt, pReq, deployed) != 0) {
if (mmWriteFile(pMgmt, pMsg, deployed) != 0) {
dError("failed to write mnode file since %s", terrstr());
return -1;
}
......@@ -135,7 +135,7 @@ static int32_t mmOpen(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
pMgmt->msgCb.queueFps[QUERY_QUEUE] = (PutToQueueFp)mmPutRpcMsgToQueryQueue;
pMgmt->msgCb.queueFps[READ_QUEUE] = (PutToQueueFp)mmPutRpcMsgToReadQueue;
pMgmt->msgCb.queueFps[WRITE_QUEUE] = (PutToQueueFp)mmPutRpcMsgToWriteQueue;
pMgmt->msgCb.queueFps[SYNC_QUEUE] = (PutToQueueFp)mmPutRpcMsgToWriteQueue;
pMgmt->msgCb.queueFps[SYNC_QUEUE] = (PutToQueueFp)mmPutRpcMsgToSyncQueue;
pMgmt->msgCb.mgmt = pMgmt;
bool deployed = false;
......
......@@ -126,7 +126,7 @@ int32_t mmPutRpcMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
return mmPutRpcMsgToWorker(&pMgmt->readWorker, pMsg);
}
int32_t mmPutMsgToSyncQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { return mmPutRpcMsgToWorker(&pMgmt->syncWorker, pMsg); }
int32_t mmPutRpcMsgToSyncQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { return mmPutRpcMsgToWorker(&pMgmt->syncWorker, pMsg); }
int32_t mmStartWorker(SMnodeMgmt *pMgmt) {
SSingleWorkerCfg qCfg = {
......
......@@ -39,7 +39,7 @@ typedef struct SQnodeMgmt {
SArray *qmGetMsgHandles();
int32_t qmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg);
int32_t qmProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg);
int32_t qmProcessGetMonitorInfoReq(SQnodeMgmt *pMgmt, SRpcMsg *pReq);
int32_t qmProcessGetMonitorInfoReq(SQnodeMgmt *pMgmt, SRpcMsg *pMsg);
// qmWorker.c
int32_t qmPutRpcMsgToQueryQueue(SQnodeMgmt *pMgmt, SRpcMsg *pMsg);
......
......@@ -18,7 +18,7 @@
void qmGetMonitorInfo(SQnodeMgmt *pMgmt, SMonQmInfo *qmInfo) {}
int32_t qmProcessGetMonitorInfoReq(SQnodeMgmt *pMgmt, SRpcMsg *pReq) {
int32_t qmProcessGetMonitorInfoReq(SQnodeMgmt *pMgmt, SRpcMsg *pMsg) {
SMonQmInfo qmInfo = {0};
qmGetMonitorInfo(pMgmt, &qmInfo);
dmGetMonitorSystemInfo(&qmInfo.sys);
......@@ -37,17 +37,15 @@ int32_t qmProcessGetMonitorInfoReq(SQnodeMgmt *pMgmt, SRpcMsg *pReq) {
}
tSerializeSMonQmInfo(pRsp, rspLen, &qmInfo);
pReq->info.rsp = pRsp;
pReq->info.rspLen = rspLen;
pMsg->info.rsp = pRsp;
pMsg->info.rspLen = rspLen;
tFreeSMonQmInfo(&qmInfo);
return 0;
}
int32_t qmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
SRpcMsg *pReq = pMsg;
SDCreateQnodeReq createReq = {0};
if (tDeserializeSCreateDropMQSBNodeReq(pReq->pCont, pReq->contLen, &createReq) != 0) {
if (tDeserializeSCreateDropMQSBNodeReq(pMsg->pCont, pMsg->contLen, &createReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
return -1;
}
......@@ -68,10 +66,8 @@ int32_t qmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
}
int32_t qmProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
SRpcMsg *pReq = pMsg;
SDDropQnodeReq dropReq = {0};
if (tDeserializeSCreateDropMQSBNodeReq(pReq->pCont, pReq->contLen, &dropReq) != 0) {
if (tDeserializeSCreateDropMQSBNodeReq(pMsg->pCont, pMsg->contLen, &dropReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
return -1;
}
......
......@@ -30,7 +30,6 @@ typedef struct SSnodeMgmt {
SMsgCb msgCb;
const char *path;
const char *name;
SRWLatch latch;
int8_t uniqueWorkerInUse;
SArray *uniqueWorkers; // SArray<SMultiWorker*>
SSingleWorker sharedWorker;
......@@ -41,7 +40,7 @@ typedef struct SSnodeMgmt {
SArray *smGetMsgHandles();
int32_t smProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg);
int32_t smProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg);
int32_t smProcessGetMonitorInfoReq(SSnodeMgmt *pMgmt, SRpcMsg *pReq);
int32_t smProcessGetMonitorInfoReq(SSnodeMgmt *pMgmt, SRpcMsg *pMsg);
// smWorker.c
int32_t smStartWorker(SSnodeMgmt *pMgmt);
......
......@@ -18,7 +18,7 @@
void smGetMonitorInfo(SSnodeMgmt *pMgmt, SMonSmInfo *smInfo) {}
int32_t smProcessGetMonitorInfoReq(SSnodeMgmt *pMgmt, SRpcMsg *pReq) {
int32_t smProcessGetMonitorInfoReq(SSnodeMgmt *pMgmt, SRpcMsg *pMsg) {
SMonSmInfo smInfo = {0};
smGetMonitorInfo(pMgmt, &smInfo);
dmGetMonitorSystemInfo(&smInfo.sys);
......@@ -37,17 +37,15 @@ int32_t smProcessGetMonitorInfoReq(SSnodeMgmt *pMgmt, SRpcMsg *pReq) {
}
tSerializeSMonSmInfo(pRsp, rspLen, &smInfo);
pReq->info.rsp = pRsp;
pReq->info.rspLen = rspLen;
pMsg->info.rsp = pRsp;
pMsg->info.rspLen = rspLen;
tFreeSMonSmInfo(&smInfo);
return 0;
}
int32_t smProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
SRpcMsg *pReq = pMsg;
SDCreateSnodeReq createReq = {0};
if (tDeserializeSCreateDropMQSBNodeReq(pReq->pCont, pReq->contLen, &createReq) != 0) {
if (tDeserializeSCreateDropMQSBNodeReq(pMsg->pCont, pMsg->contLen, &createReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
return -1;
}
......@@ -68,10 +66,8 @@ int32_t smProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
}
int32_t smProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
SRpcMsg *pReq = pMsg;
SDDropSnodeReq dropReq = {0};
if (tDeserializeSCreateDropMQSBNodeReq(pReq->pCont, pReq->contLen, &dropReq) != 0) {
if (tDeserializeSCreateDropMQSBNodeReq(pMsg->pCont, pMsg->contLen, &dropReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
return -1;
}
......
......@@ -26,21 +26,21 @@ extern "C" {
#endif
typedef struct SVnodeMgmt {
SDnodeData *pData;
SMsgCb msgCb;
const char *path;
const char *name;
SQWorkerPool queryPool;
SQWorkerPool fetchPool;
SWWorkerPool syncPool;
SWWorkerPool writePool;
SWWorkerPool mergePool;
SSingleWorker mgmtWorker;
SSingleWorker monitorWorker;
SHashObj *hash;
SRWLatch latch;
SVnodesStat state;
STfs *pTfs;
SDnodeData *pData;
SMsgCb msgCb;
const char *path;
const char *name;
SQWorkerPool queryPool;
SQWorkerPool fetchPool;
SWWorkerPool syncPool;
SWWorkerPool writePool;
SWWorkerPool mergePool;
SSingleWorker mgmtWorker;
SSingleWorker monitorWorker;
SHashObj *hash;
TdThreadRwlock lock;
SVnodesStat state;
STfs *pTfs;
} SVnodeMgmt;
typedef struct {
......@@ -84,10 +84,10 @@ void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode);
// vmHandle.c
SArray *vmGetMsgHandles();
int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pReq);
int32_t vmProcessDropVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pReq);
int32_t vmProcessGetMonitorInfoReq(SVnodeMgmt *pMgmt, SRpcMsg *pReq);
int32_t vmProcessGetLoadsReq(SVnodeMgmt *pMgmt, SRpcMsg *pReq);
int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t vmProcessDropVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t vmProcessGetMonitorInfoReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t vmProcessGetLoadsReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
// vmFile.c
int32_t vmGetVnodeListFromFile(SVnodeMgmt *pMgmt, SWrapperCfg **ppCfgs, int32_t *numOfVnodes);
......
......@@ -17,7 +17,7 @@
#include "vmInt.h"
SVnodeObj **vmGetVnodeListFromHash(SVnodeMgmt *pMgmt, int32_t *numOfVnodes) {
taosRLockLatch(&pMgmt->latch);
taosThreadRwlockRdlock(&pMgmt->lock);
int32_t num = 0;
int32_t size = taosHashGetSize(pMgmt->hash);
......@@ -38,7 +38,7 @@ SVnodeObj **vmGetVnodeListFromHash(SVnodeMgmt *pMgmt, int32_t *numOfVnodes) {
}
}
taosRUnLockLatch(&pMgmt->latch);
taosThreadRwlockUnlock(&pMgmt->lock);
*numOfVnodes = num;
return pVnodes;
......
......@@ -20,7 +20,7 @@ void vmGetVnodeLoads(SVnodeMgmt *pMgmt, SMonVloadInfo *pInfo) {
pInfo->pVloads = taosArrayInit(pMgmt->state.totalVnodes, sizeof(SVnodeLoad));
if (pInfo->pVloads == NULL) return;
taosRLockLatch(&pMgmt->latch);
taosThreadRwlockRdlock(&pMgmt->lock);
void *pIter = taosHashIterate(pMgmt->hash, NULL);
while (pIter) {
......@@ -34,7 +34,7 @@ void vmGetVnodeLoads(SVnodeMgmt *pMgmt, SMonVloadInfo *pInfo) {
pIter = taosHashIterate(pMgmt->hash, pIter);
}
taosRUnLockLatch(&pMgmt->latch);
taosThreadRwlockUnlock(&pMgmt->lock);
}
void vmGetMonitorInfo(SVnodeMgmt *pMgmt, SMonVmInfo *pInfo) {
......@@ -82,7 +82,7 @@ void vmGetMonitorInfo(SVnodeMgmt *pMgmt, SMonVmInfo *pInfo) {
taosArrayDestroy(pVloads);
}
int32_t vmProcessGetMonitorInfoReq(SVnodeMgmt *pMgmt, SRpcMsg *pReq) {
int32_t vmProcessGetMonitorInfoReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
SMonVmInfo vmInfo = {0};
vmGetMonitorInfo(pMgmt, &vmInfo);
dmGetMonitorSystemInfo(&vmInfo.sys);
......@@ -101,13 +101,13 @@ int32_t vmProcessGetMonitorInfoReq(SVnodeMgmt *pMgmt, SRpcMsg *pReq) {
}
tSerializeSMonVmInfo(pRsp, rspLen, &vmInfo);
pReq->info.rsp = pRsp;
pReq->info.rspLen = rspLen;
pMsg->info.rsp = pRsp;
pMsg->info.rspLen = rspLen;
tFreeSMonVmInfo(&vmInfo);
return 0;
}
int32_t vmProcessGetLoadsReq(SVnodeMgmt *pMgmt, SRpcMsg *pReq) {
int32_t vmProcessGetLoadsReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
SMonVloadInfo vloads = {0};
vmGetVnodeLoads(pMgmt, &vloads);
......@@ -124,8 +124,8 @@ int32_t vmProcessGetLoadsReq(SVnodeMgmt *pMgmt, SRpcMsg *pReq) {
}
tSerializeSMonVloadInfo(pRsp, rspLen, &vloads);
pReq->info.rsp = pRsp;
pReq->info.rspLen = rspLen;
pMsg->info.rsp = pRsp;
pMsg->info.rspLen = rspLen;
tFreeSMonVloadInfo(&vloads);
return 0;
}
......@@ -174,12 +174,11 @@ static void vmGenerateWrapperCfg(SVnodeMgmt *pMgmt, SCreateVnodeReq *pCreate, SW
}
int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
SRpcMsg *pReq = pMsg;
SCreateVnodeReq createReq = {0};
int32_t code = -1;
char path[TSDB_FILENAME_LEN] = {0};
if (tDeserializeSCreateVnodeReq(pReq->pCont, pReq->contLen, &createReq) != 0) {
if (tDeserializeSCreateVnodeReq(pMsg->pCont, pMsg->contLen, &createReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
return -1;
}
......@@ -242,9 +241,8 @@ _OVER:
}
int32_t vmProcessDropVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
SRpcMsg *pReq = pMsg;
SDropVnodeReq dropReq = {0};
if (tDeserializeSDropVnodeReq(pReq->pCont, pReq->contLen, &dropReq) != 0) {
if (tDeserializeSDropVnodeReq(pMsg->pCont, pMsg->contLen, &dropReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
return -1;
}
......
......@@ -20,14 +20,14 @@ SVnodeObj *vmAcquireVnode(SVnodeMgmt *pMgmt, int32_t vgId) {
SVnodeObj *pVnode = NULL;
int32_t refCount = 0;
taosRLockLatch(&pMgmt->latch);
taosThreadRwlockRdlock(&pMgmt->lock);
taosHashGetDup(pMgmt->hash, &vgId, sizeof(int32_t), (void *)&pVnode);
if (pVnode == NULL) {
terrno = TSDB_CODE_VND_INVALID_VGROUP_ID;
} else {
refCount = atomic_add_fetch_32(&pVnode->refCount, 1);
}
taosRUnLockLatch(&pMgmt->latch);
taosThreadRwlockUnlock(&pMgmt->lock);
if (pVnode != NULL) {
dTrace("vgId:%d, acquire vnode, refCount:%d", pVnode->vgId, refCount);
......@@ -39,9 +39,9 @@ SVnodeObj *vmAcquireVnode(SVnodeMgmt *pMgmt, int32_t vgId) {
void vmReleaseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
if (pVnode == NULL) return;
taosRLockLatch(&pMgmt->latch);
taosThreadRwlockRdlock(&pMgmt->lock);
int32_t refCount = atomic_sub_fetch_32(&pVnode->refCount, 1);
taosRUnLockLatch(&pMgmt->latch);
taosThreadRwlockUnlock(&pMgmt->lock);
dTrace("vgId:%d, release vnode, refCount:%d", pVnode->vgId, refCount);
}
......@@ -70,9 +70,9 @@ int32_t vmOpenVnode(SVnodeMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl) {
return -1;
}
taosWLockLatch(&pMgmt->latch);
taosThreadRwlockWrlock(&pMgmt->lock);
int32_t code = taosHashPut(pMgmt->hash, &pVnode->vgId, sizeof(int32_t), &pVnode, sizeof(SVnodeObj *));
taosWUnLockLatch(&pMgmt->latch);
taosThreadRwlockUnlock(&pMgmt->lock);
return code;
}
......@@ -80,9 +80,9 @@ int32_t vmOpenVnode(SVnodeMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl) {
void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
char path[TSDB_FILENAME_LEN] = {0};
taosWLockLatch(&pMgmt->latch);
taosThreadRwlockWrlock(&pMgmt->lock);
taosHashRemove(pMgmt->hash, &pVnode->vgId, sizeof(int32_t));
taosWUnLockLatch(&pMgmt->latch);
taosThreadRwlockUnlock(&pMgmt->lock);
vmReleaseVnode(pMgmt, pVnode);
while (pVnode->refCount > 0) taosMsleep(10);
......@@ -239,6 +239,7 @@ static void vmCleanup(SVnodeMgmt *pMgmt) {
vmStopWorker(pMgmt);
vnodeCleanup();
tfsClose(pMgmt->pTfs);
taosThreadRwlockDestroy(&pMgmt->lock);
taosMemoryFree(pMgmt);
}
......@@ -260,7 +261,7 @@ static int32_t vmInit(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
pMgmt->msgCb.queueFps[MERGE_QUEUE] = (PutToQueueFp)vmPutRpcMsgToMergeQueue;
pMgmt->msgCb.qsizeFp = (GetQueueSizeFp)vmGetQueueSize;
pMgmt->msgCb.mgmt = pMgmt;
taosInitRWLatch(&pMgmt->latch);
taosThreadRwlockInit(&pMgmt->lock, NULL);
SDiskCfg dCfg = {0};
tstrncpy(dCfg.dir, tsDataDir, TSDB_FILENAME_LEN);
......@@ -334,19 +335,23 @@ static int32_t vmRequire(const SMgmtInputOpt *pInput, bool *required) {
}
static int32_t vmStart(SVnodeMgmt *pMgmt) {
taosRLockLatch(&pMgmt->latch);
void *pIter = taosHashIterate(pMgmt->hash, NULL);
while (pIter) {
SVnodeObj **ppVnode = pIter;
if (ppVnode == NULL || *ppVnode == NULL) continue;
int32_t numOfVnodes = 0;
SVnodeObj **pVnodes = vmGetVnodeListFromHash(pMgmt, &numOfVnodes);
SVnodeObj *pVnode = *ppVnode;
for (int32_t i = 0; i < numOfVnodes; ++i) {
SVnodeObj *pVnode = pVnodes[i];
vnodeStart(pVnode->pImpl);
pIter = taosHashIterate(pMgmt->hash, pIter);
}
taosRUnLockLatch(&pMgmt->latch);
for (int32_t i = 0; i < numOfVnodes; ++i) {
SVnodeObj *pVnode = pVnodes[i];
vmReleaseVnode(pMgmt, pVnode);
}
if (pVnodes != NULL) {
taosMemoryFree(pVnodes);
}
return 0;
}
......
......@@ -70,7 +70,7 @@ typedef struct SMgmtWrapper {
const char *name;
char *path;
int32_t refCount;
SRWLatch latch;
TdThreadRwlock lock;
EDndNodeType ntype;
bool deployed;
bool required;
......
......@@ -91,7 +91,7 @@ static int32_t dmInitVars(SDnode *pDnode, EDndNodeType rtype) {
return -1;
}
taosInitRWLatch(&pData->latch);
taosThreadRwlockInit(&pData->lock, NULL);
taosThreadMutexInit(&pDnode->mutex, NULL);
return 0;
}
......@@ -100,6 +100,7 @@ static void dmClearVars(SDnode *pDnode) {
for (EDndNodeType ntype = DNODE; ntype < NODE_END; ++ntype) {
SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype];
taosMemoryFreeClear(pWrapper->path);
taosThreadRwlockDestroy(&pWrapper->lock);
}
if (pDnode->lockfile != NULL) {
taosUnLockFile(pDnode->lockfile);
......@@ -108,7 +109,7 @@ static void dmClearVars(SDnode *pDnode) {
}
SDnodeData *pData = &pDnode->data;
taosWLockLatch(&pData->latch);
taosThreadRwlockWrlock(&pData->lock);
if (pData->dnodeEps != NULL) {
taosArrayDestroy(pData->dnodeEps);
pData->dnodeEps = NULL;
......@@ -117,8 +118,9 @@ static void dmClearVars(SDnode *pDnode) {
taosHashCleanup(pData->dnodeHash);
pData->dnodeHash = NULL;
}
taosWUnLockLatch(&pData->latch);
taosThreadRwlockUnlock(&pData->lock);
taosThreadRwlockDestroy(&pData->lock);
taosThreadMutexDestroy(&pDnode->mutex);
memset(&pDnode->mutex, 0, sizeof(pDnode->mutex));
}
......@@ -151,7 +153,7 @@ int32_t dmInitDnode(SDnode *pDnode, EDndNodeType rtype) {
if (ntype == DNODE) {
pWrapper->proc.ptype = DND_PROC_SINGLE;
}
taosInitRWLatch(&pWrapper->latch);
taosThreadRwlockInit(&pWrapper->lock, NULL);
snprintf(path, sizeof(path), "%s%s%s", tsDataDir, TD_DIRSEP, pWrapper->name);
pWrapper->path = strdup(path);
......@@ -223,7 +225,7 @@ SMgmtWrapper *dmAcquireWrapper(SDnode *pDnode, EDndNodeType ntype) {
SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype];
SMgmtWrapper *pRetWrapper = pWrapper;
taosRLockLatch(&pWrapper->latch);
taosThreadRwlockRdlock(&pWrapper->lock);
if (pWrapper->deployed) {
int32_t refCount = atomic_add_fetch_32(&pWrapper->refCount, 1);
dTrace("node:%s, is acquired, ref:%d", pWrapper->name, refCount);
......@@ -231,7 +233,7 @@ SMgmtWrapper *dmAcquireWrapper(SDnode *pDnode, EDndNodeType ntype) {
terrno = TSDB_CODE_NODE_NOT_DEPLOYED;
pRetWrapper = NULL;
}
taosRUnLockLatch(&pWrapper->latch);
taosThreadRwlockUnlock(&pWrapper->lock);
return pRetWrapper;
}
......@@ -239,7 +241,7 @@ SMgmtWrapper *dmAcquireWrapper(SDnode *pDnode, EDndNodeType ntype) {
int32_t dmMarkWrapper(SMgmtWrapper *pWrapper) {
int32_t code = 0;
taosRLockLatch(&pWrapper->latch);
taosThreadRwlockRdlock(&pWrapper->lock);
if (pWrapper->deployed || (InParentProc(pWrapper) && pWrapper->required)) {
int32_t refCount = atomic_add_fetch_32(&pWrapper->refCount, 1);
dTrace("node:%s, is marked, ref:%d", pWrapper->name, refCount);
......@@ -247,7 +249,7 @@ int32_t dmMarkWrapper(SMgmtWrapper *pWrapper) {
terrno = TSDB_CODE_NODE_NOT_DEPLOYED;
code = -1;
}
taosRUnLockLatch(&pWrapper->latch);
taosThreadRwlockUnlock(&pWrapper->lock);
return code;
}
......@@ -255,9 +257,9 @@ int32_t dmMarkWrapper(SMgmtWrapper *pWrapper) {
void dmReleaseWrapper(SMgmtWrapper *pWrapper) {
if (pWrapper == NULL) return;
taosRLockLatch(&pWrapper->latch);
taosThreadRwlockRdlock(&pWrapper->lock);
int32_t refCount = atomic_sub_fetch_32(&pWrapper->refCount, 1);
taosRUnLockLatch(&pWrapper->latch);
taosThreadRwlockUnlock(&pWrapper->lock);
dTrace("node:%s, is released, ref:%d", pWrapper->name, refCount);
}
......
......@@ -186,12 +186,12 @@ void dmCloseNode(SMgmtWrapper *pWrapper) {
}
}
taosWLockLatch(&pWrapper->latch);
taosThreadRwlockWrlock(&pWrapper->lock);
if (pWrapper->pMgmt != NULL) {
(*pWrapper->func.closeFp)(pWrapper->pMgmt);
pWrapper->pMgmt = NULL;
}
taosWUnLockLatch(&pWrapper->latch);
taosThreadRwlockUnlock(&pWrapper->lock);
if (!OnlyInSingleProc(pWrapper)) {
dmCleanupProc(pWrapper);
......
......@@ -200,12 +200,12 @@ int32_t dmInitMsgHandle(SDnode *pDnode) {
return 0;
}
static void dmSendRpcRedirectRsp(const SRpcMsg *pReq) {
static void dmSendRpcRedirectRsp(const SRpcMsg *pMsg) {
SDnode *pDnode = dmInstance();
SEpSet epSet = {0};
dmGetMnodeEpSet(&pDnode->data, &epSet);
dDebug("RPC %p, req is redirected, num:%d use:%d", pReq->info.handle, epSet.numOfEps, epSet.inUse);
dDebug("RPC %p, req is redirected, num:%d use:%d", pMsg->info.handle, epSet.numOfEps, epSet.inUse);
for (int32_t i = 0; i < epSet.numOfEps; ++i) {
dDebug("mnode index:%d %s:%u", i, epSet.eps[i].fqdn, epSet.eps[i].port);
if (strcmp(epSet.eps[i].fqdn, tsLocalFqdn) == 0 && epSet.eps[i].port == tsServerPort) {
......@@ -220,12 +220,14 @@ static void dmSendRpcRedirectRsp(const SRpcMsg *pReq) {
SRpcMsg rsp = {
.code = TSDB_CODE_RPC_REDIRECT,
.info = pReq->info,
.info = pMsg->info,
.contLen = len,
};
rsp.pCont = rpcMallocCont(len);
tSerializeSMEpSet(rsp.pCont, len, &msg);
rpcSendResponse(&rsp);
rpcFreeCont(pMsg->pCont);
}
static inline void dmSendRecv(SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp) {
......@@ -239,16 +241,16 @@ static inline void dmSendRecv(SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp) {
}
}
static inline int32_t dmSendReq(const SEpSet *pEpSet, SRpcMsg *pReq) {
static inline int32_t dmSendReq(const SEpSet *pEpSet, SRpcMsg *pMsg) {
SDnode *pDnode = dmInstance();
if (pDnode->status != DND_STAT_RUNNING) {
rpcFreeCont(pReq->pCont);
pReq->pCont = NULL;
rpcFreeCont(pMsg->pCont);
pMsg->pCont = NULL;
terrno = TSDB_CODE_NODE_OFFLINE;
dError("failed to send rpc msg since %s, handle:%p", terrstr(), pReq->info.handle);
dError("failed to send rpc msg since %s, handle:%p", terrstr(), pMsg->info.handle);
return -1;
} else {
rpcSendRequest(pDnode->trans.clientRpc, pEpSet, pReq, NULL);
rpcSendRequest(pDnode->trans.clientRpc, pEpSet, pMsg, NULL);
return 0;
}
}
......
......@@ -94,18 +94,18 @@ typedef void (*GetVnodeLoadsFp)();
typedef void (*GetMnodeLoadsFp)();
typedef struct {
int32_t dnodeId;
int64_t clusterId;
int64_t dnodeVer;
int64_t updateTime;
int64_t rebootTime;
bool dropped;
bool stopped;
SEpSet mnodeEps;
SArray *dnodeEps;
SHashObj *dnodeHash;
SRWLatch latch;
SMsgCb msgCb;
int32_t dnodeId;
int64_t clusterId;
int64_t dnodeVer;
int64_t updateTime;
int64_t rebootTime;
bool dropped;
bool stopped;
SEpSet mnodeEps;
SArray *dnodeEps;
SHashObj *dnodeHash;
TdThreadRwlock lock;
SMsgCb msgCb;
} SDnodeData;
typedef struct {
......
......@@ -21,7 +21,7 @@ static bool dmIsEpChanged(SDnodeData *pData, int32_t dnodeId, const char *ep);
static void dmResetEps(SDnodeData *pData, SArray *dnodeEps);
static void dmGetDnodeEp(SDnodeData *pData, int32_t dnodeId, char *pEp, char *pFqdn, uint16_t *pPort) {
taosRLockLatch(&pData->latch);
taosThreadRwlockRdlock(&pData->lock);
SDnodeEp *pDnodeEp = taosHashGet(pData->dnodeHash, &dnodeId, sizeof(int32_t));
if (pDnodeEp != NULL) {
......@@ -36,7 +36,7 @@ static void dmGetDnodeEp(SDnodeData *pData, int32_t dnodeId, char *pEp, char *pF
}
}
taosRUnLockLatch(&pData->latch);
taosThreadRwlockUnlock(&pData->lock);
}
int32_t dmReadEps(SDnodeData *pData) {
......@@ -232,7 +232,7 @@ void dmUpdateEps(SDnodeData *pData, SArray *eps) {
int32_t numOfEps = taosArrayGetSize(eps);
if (numOfEps <= 0) return;
taosWLockLatch(&pData->latch);
taosThreadRwlockWrlock(&pData->lock);
int32_t numOfEpsOld = (int32_t)taosArrayGetSize(pData->dnodeEps);
if (numOfEps != numOfEpsOld) {
......@@ -246,7 +246,7 @@ void dmUpdateEps(SDnodeData *pData, SArray *eps) {
}
}
taosWUnLockLatch(&pData->latch);
taosThreadRwlockUnlock(&pData->lock);
}
static void dmResetEps(SDnodeData *pData, SArray *dnodeEps) {
......@@ -292,7 +292,7 @@ static void dmPrintEps(SDnodeData *pData) {
static bool dmIsEpChanged(SDnodeData *pData, int32_t dnodeId, const char *ep) {
bool changed = false;
if (dnodeId == 0) return changed;
taosRLockLatch(&pData->latch);
taosThreadRwlockRdlock(&pData->lock);
SDnodeEp *pDnodeEp = taosHashGet(pData->dnodeHash, &dnodeId, sizeof(int32_t));
if (pDnodeEp != NULL) {
......@@ -304,24 +304,23 @@ static bool dmIsEpChanged(SDnodeData *pData, int32_t dnodeId, const char *ep) {
}
}
taosRUnLockLatch(&pData->latch);
taosThreadRwlockUnlock(&pData->lock);
return changed;
}
void dmGetMnodeEpSet(SDnodeData *pData, SEpSet *pEpSet) {
taosRLockLatch(&pData->latch);
taosThreadRwlockRdlock(&pData->lock);
*pEpSet = pData->mnodeEps;
taosRUnLockLatch(&pData->latch);
taosThreadRwlockUnlock(&pData->lock);
}
void dmSetMnodeEpSet(SDnodeData *pData, SEpSet *pEpSet) {
dInfo("mnode is changed, num:%d use:%d", pEpSet->numOfEps, pEpSet->inUse);
taosWLockLatch(&pData->latch);
taosThreadRwlockWrlock(&pData->lock);
pData->mnodeEps = *pEpSet;
taosThreadRwlockUnlock(&pData->lock);
dInfo("mnode is changed, num:%d use:%d", pEpSet->numOfEps, pEpSet->inUse);
for (int32_t i = 0; i < pEpSet->numOfEps; ++i) {
dInfo("mnode index:%d %s:%u", i, pEpSet->eps[i].fqdn, pEpSet->eps[i].port);
}
taosWUnLockLatch(&pData->latch);
}
......@@ -848,7 +848,7 @@ static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)cgroup, false);
// app id
// client id
char clientId[TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE] = {0};
tstrncpy(varDataVal(clientId), pConsumer->clientId, TSDB_CGROUP_LEN);
varDataSetLen(clientId, strlen(varDataVal(clientId)));
......
......@@ -624,14 +624,12 @@ static int32_t mndProcessConfigDnodeReq(SRpcMsg *pReq) {
SRpcMsg rpcMsg = {.msgType = TDMT_DND_CONFIG_DNODE, .pCont = pBuf, .contLen = bufLen, .info = pReq->info};
mInfo("dnode:%d, app:%p config:%s req send to dnode", cfgReq.dnodeId, rpcMsg.info.ahandle, cfgReq.config);
tmsgSendReq(&epSet, &rpcMsg);
return 0;
mDebug("dnode:%d, send config req to dnode, app:%p", cfgReq.dnodeId, rpcMsg.info.ahandle);
return tmsgSendReq(&epSet, &rpcMsg);
}
static int32_t mndProcessConfigDnodeRsp(SRpcMsg *pRsp) {
mInfo("app:%p config rsp from dnode", pRsp->info.ahandle);
mDebug("config rsp from dnode, app:%p", pRsp->info.ahandle);
return TSDB_CODE_SUCCESS;
}
......
......@@ -48,7 +48,7 @@ SSdb *sdbInit(SSdbOpt *pOption) {
}
for (ESdbType i = 0; i < SDB_MAX; ++i) {
taosInitRWLatch(&pSdb->locks[i]);
taosThreadRwlockInit(&pSdb->locks[i], NULL);
pSdb->maxId[i] = 0;
pSdb->tableVer[i] = 0;
pSdb->keyTypes[i] = SDB_KEY_INT32;
......@@ -98,7 +98,10 @@ void sdbCleanup(SSdb *pSdb) {
taosHashClear(hash);
taosHashCleanup(hash);
taosThreadRwlockDestroy(&pSdb->locks[i]);
pSdb->hashObjs[i] = NULL;
memset(&pSdb->locks[i], 0, sizeof(pSdb->locks[i]));
mDebug("sdb table:%s is cleaned up", sdbTableName(i));
}
......@@ -134,7 +137,6 @@ int32_t sdbSetTable(SSdb *pSdb, SSdbTable table) {
pSdb->maxId[sdbType] = 0;
pSdb->hashObjs[sdbType] = hash;
taosInitRWLatch(&pSdb->locks[sdbType]);
mDebug("sdb table:%s is initialized", sdbTableName(sdbType));
return 0;
......
......@@ -257,8 +257,8 @@ static int32_t sdbWriteFileImp(SSdb *pSdb) {
mTrace("write %s to file, total %d rows", sdbTableName(i), sdbGetSize(pSdb, i));
SHashObj *hash = pSdb->hashObjs[i];
SRWLatch *pLock = &pSdb->locks[i];
taosWLockLatch(pLock);
TdThreadRwlock *pLock = &pSdb->locks[i];
taosThreadRwlockWrlock(pLock);
SSdbRow **ppRow = taosHashIterate(hash, NULL);
while (ppRow != NULL) {
......@@ -303,7 +303,7 @@ static int32_t sdbWriteFileImp(SSdb *pSdb) {
sdbFreeRaw(pRaw);
ppRow = taosHashIterate(hash, ppRow);
}
taosWUnLockLatch(pLock);
taosThreadRwlockUnlock(pLock);
}
if (code == 0) {
......
......@@ -129,12 +129,12 @@ static int32_t sdbGetkeySize(SSdb *pSdb, ESdbType type, const void *pKey) {
}
static int32_t sdbInsertRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *pRow, int32_t keySize) {
SRWLatch *pLock = &pSdb->locks[pRow->type];
taosWLockLatch(pLock);
TdThreadRwlock *pLock = &pSdb->locks[pRow->type];
taosThreadRwlockWrlock(pLock);
SSdbRow *pOldRow = taosHashGet(hash, pRow->pObj, keySize);
if (pOldRow != NULL) {
taosWUnLockLatch(pLock);
taosThreadRwlockUnlock(pLock);
sdbFreeRow(pSdb, pRow, false);
terrno = TSDB_CODE_SDB_OBJ_ALREADY_THERE;
return terrno;
......@@ -145,13 +145,13 @@ static int32_t sdbInsertRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *
sdbPrintOper(pSdb, pRow, "insert");
if (taosHashPut(hash, pRow->pObj, keySize, &pRow, sizeof(void *)) != 0) {
taosWUnLockLatch(pLock);
taosThreadRwlockUnlock(pLock);
sdbFreeRow(pSdb, pRow, false);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return terrno;
}
taosWUnLockLatch(pLock);
taosThreadRwlockUnlock(pLock);
int32_t code = 0;
SdbInsertFp insertFp = pSdb->insertFps[pRow->type];
......@@ -159,9 +159,9 @@ static int32_t sdbInsertRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *
code = (*insertFp)(pSdb, pRow->pObj);
if (code != 0) {
code = terrno;
taosWLockLatch(pLock);
taosThreadRwlockWrlock(pLock);
taosHashRemove(hash, pRow->pObj, keySize);
taosWUnLockLatch(pLock);
taosThreadRwlockUnlock(pLock);
sdbFreeRow(pSdb, pRow, false);
terrno = code;
return terrno;
......@@ -180,19 +180,19 @@ static int32_t sdbInsertRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *
}
static int32_t sdbUpdateRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *pNewRow, int32_t keySize) {
SRWLatch *pLock = &pSdb->locks[pNewRow->type];
taosWLockLatch(pLock);
TdThreadRwlock *pLock = &pSdb->locks[pNewRow->type];
taosThreadRwlockWrlock(pLock);
SSdbRow **ppOldRow = taosHashGet(hash, pNewRow->pObj, keySize);
if (ppOldRow == NULL || *ppOldRow == NULL) {
taosWUnLockLatch(pLock);
taosThreadRwlockUnlock(pLock);
return sdbInsertRow(pSdb, hash, pRaw, pNewRow, keySize);
}
SSdbRow *pOldRow = *ppOldRow;
pOldRow->status = pRaw->status;
sdbPrintOper(pSdb, pOldRow, "update");
taosWUnLockLatch(pLock);
taosThreadRwlockUnlock(pLock);
int32_t code = 0;
SdbUpdateFp updateFp = pSdb->updateFps[pNewRow->type];
......@@ -207,12 +207,12 @@ static int32_t sdbUpdateRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *
}
static int32_t sdbDeleteRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *pRow, int32_t keySize) {
SRWLatch *pLock = &pSdb->locks[pRow->type];
taosWLockLatch(pLock);
TdThreadRwlock *pLock = &pSdb->locks[pRow->type];
taosThreadRwlockWrlock(pLock);
SSdbRow **ppOldRow = taosHashGet(hash, pRow->pObj, keySize);
if (ppOldRow == NULL || *ppOldRow == NULL) {
taosWUnLockLatch(pLock);
taosThreadRwlockUnlock(pLock);
sdbFreeRow(pSdb, pRow, false);
terrno = TSDB_CODE_SDB_OBJ_NOT_THERE;
return terrno;
......@@ -223,7 +223,7 @@ static int32_t sdbDeleteRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *
sdbPrintOper(pSdb, pOldRow, "delete");
taosHashRemove(hash, pOldRow->pObj, keySize);
taosWUnLockLatch(pLock);
taosThreadRwlockUnlock(pLock);
pSdb->tableVer[pOldRow->type]++;
sdbFreeRow(pSdb, pRow, false);
......@@ -278,12 +278,12 @@ void *sdbAcquire(SSdb *pSdb, ESdbType type, const void *pKey) {
void *pRet = NULL;
int32_t keySize = sdbGetkeySize(pSdb, type, pKey);
SRWLatch *pLock = &pSdb->locks[type];
taosRLockLatch(pLock);
TdThreadRwlock *pLock = &pSdb->locks[type];
taosThreadRwlockRdlock(pLock);
SSdbRow **ppRow = taosHashGet(hash, pKey, keySize);
if (ppRow == NULL || *ppRow == NULL) {
taosRUnLockLatch(pLock);
taosThreadRwlockUnlock(pLock);
terrno = TSDB_CODE_SDB_OBJ_NOT_THERE;
return NULL;
}
......@@ -306,13 +306,13 @@ void *sdbAcquire(SSdb *pSdb, ESdbType type, const void *pKey) {
break;
}
taosRUnLockLatch(pLock);
taosThreadRwlockUnlock(pLock);
return pRet;
}
static void sdbCheckRow(SSdb *pSdb, SSdbRow *pRow) {
SRWLatch *pLock = &pSdb->locks[pRow->type];
taosWLockLatch(pLock);
TdThreadRwlock *pLock = &pSdb->locks[pRow->type];
taosThreadRwlockWrlock(pLock);
int32_t ref = atomic_load_32(&pRow->refCount);
sdbPrintOper(pSdb, pRow, "check");
......@@ -320,7 +320,7 @@ static void sdbCheckRow(SSdb *pSdb, SSdbRow *pRow) {
sdbFreeRow(pSdb, pRow, true);
}
taosWUnLockLatch(pLock);
taosThreadRwlockUnlock(pLock);
}
void sdbRelease(SSdb *pSdb, void *pObj) {
......@@ -329,8 +329,8 @@ void sdbRelease(SSdb *pSdb, void *pObj) {
SSdbRow *pRow = (SSdbRow *)((char *)pObj - sizeof(SSdbRow));
if (pRow->type >= SDB_MAX) return;
SRWLatch *pLock = &pSdb->locks[pRow->type];
taosWLockLatch(pLock);
TdThreadRwlock *pLock = &pSdb->locks[pRow->type];
taosThreadRwlockWrlock(pLock);
int32_t ref = atomic_sub_fetch_32(&pRow->refCount, 1);
sdbPrintOper(pSdb, pRow, "release");
......@@ -338,7 +338,7 @@ void sdbRelease(SSdb *pSdb, void *pObj) {
sdbFreeRow(pSdb, pRow, true);
}
taosWUnLockLatch(pLock);
taosThreadRwlockUnlock(pLock);
}
void *sdbFetch(SSdb *pSdb, ESdbType type, void *pIter, void **ppObj) {
......@@ -347,8 +347,8 @@ void *sdbFetch(SSdb *pSdb, ESdbType type, void *pIter, void **ppObj) {
SHashObj *hash = sdbGetHash(pSdb, type);
if (hash == NULL) return NULL;
SRWLatch *pLock = &pSdb->locks[type];
taosRLockLatch(pLock);
TdThreadRwlock *pLock = &pSdb->locks[type];
taosThreadRwlockRdlock(pLock);
SSdbRow **ppRow = taosHashIterate(hash, pIter);
while (ppRow != NULL) {
......@@ -363,7 +363,7 @@ void *sdbFetch(SSdb *pSdb, ESdbType type, void *pIter, void **ppObj) {
*ppObj = pRow->pObj;
break;
}
taosRUnLockLatch(pLock);
taosThreadRwlockUnlock(pLock);
return ppRow;
}
......@@ -374,18 +374,18 @@ void sdbCancelFetch(SSdb *pSdb, void *pIter) {
SHashObj *hash = sdbGetHash(pSdb, pRow->type);
if (hash == NULL) return;
SRWLatch *pLock = &pSdb->locks[pRow->type];
taosRLockLatch(pLock);
TdThreadRwlock *pLock = &pSdb->locks[pRow->type];
taosThreadRwlockRdlock(pLock);
taosHashCancelIterate(hash, pIter);
taosRUnLockLatch(pLock);
taosThreadRwlockUnlock(pLock);
}
void sdbTraverse(SSdb *pSdb, ESdbType type, sdbTraverseFp fp, void *p1, void *p2, void *p3) {
SHashObj *hash = sdbGetHash(pSdb, type);
if (hash == NULL) return;
SRWLatch *pLock = &pSdb->locks[type];
taosRLockLatch(pLock);
TdThreadRwlock *pLock = &pSdb->locks[type];
taosThreadRwlockRdlock(pLock);
SSdbRow **ppRow = taosHashIterate(hash, NULL);
while (ppRow != NULL) {
......@@ -401,17 +401,17 @@ void sdbTraverse(SSdb *pSdb, ESdbType type, sdbTraverseFp fp, void *p1, void *p2
ppRow = taosHashIterate(hash, ppRow);
}
taosRUnLockLatch(pLock);
taosThreadRwlockUnlock(pLock);
}
int32_t sdbGetSize(SSdb *pSdb, ESdbType type) {
SHashObj *hash = sdbGetHash(pSdb, type);
if (hash == NULL) return 0;
SRWLatch *pLock = &pSdb->locks[type];
taosRLockLatch(pLock);
TdThreadRwlock *pLock = &pSdb->locks[type];
taosThreadRwlockRdlock(pLock);
int32_t size = taosHashGetSize(hash);
taosRUnLockLatch(pLock);
taosThreadRwlockUnlock(pLock);
return size;
}
......@@ -424,8 +424,8 @@ int32_t sdbGetMaxId(SSdb *pSdb, ESdbType type) {
int32_t maxId = 0;
SRWLatch *pLock = &pSdb->locks[type];
taosRLockLatch(pLock);
TdThreadRwlock *pLock = &pSdb->locks[type];
taosThreadRwlockRdlock(pLock);
SSdbRow **ppRow = taosHashIterate(hash, NULL);
while (ppRow != NULL) {
......@@ -435,7 +435,7 @@ int32_t sdbGetMaxId(SSdb *pSdb, ESdbType type) {
ppRow = taosHashIterate(hash, ppRow);
}
taosRUnLockLatch(pLock);
taosThreadRwlockUnlock(pLock);
maxId = TMAX(maxId, pSdb->maxId[type]);
return maxId + 1;
......
......@@ -128,6 +128,7 @@ int tqReadHandleSetTbUidList(STqReadHandle *pHandle, const SArray *tbUidList
int tqReadHandleAddTbUidList(STqReadHandle *pHandle, const SArray *tbUidList);
int32_t tqReadHandleSetMsg(STqReadHandle *pHandle, SSubmitReq *pMsg, int64_t ver);
bool tqNextDataBlock(STqReadHandle *pHandle);
bool tqNextDataBlockFilterOut(STqReadHandle *pHandle, SHashObj *filterOutUids);
int32_t tqRetrieveDataBlock(SArray **ppCols, STqReadHandle *pHandle, uint64_t *pGroupId, uint64_t *pUid,
int32_t *pNumOfRows, int16_t *pNumOfCols);
......
......@@ -163,6 +163,7 @@ typedef struct {
int8_t withSchema;
int8_t withTag;
char* qmsg;
SHashObj* pDropTbUid;
STqPushHandle pushHandle;
// SRWLatch lock;
SWalReadHandle* pWalReader;
......
......@@ -82,7 +82,7 @@ int metaCreateSTable(SMeta* pMeta, int64_t version, SVCreateStbReq*
int metaAlterSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* pReq);
int metaDropSTable(SMeta* pMeta, int64_t verison, SVDropStbReq* pReq);
int metaCreateTable(SMeta* pMeta, int64_t version, SVCreateTbReq* pReq);
int metaDropTable(SMeta* pMeta, int64_t version, SVDropTbReq* pReq);
int metaDropTable(SMeta* pMeta, int64_t version, SVDropTbReq* pReq, SArray* tbUids);
int metaAlterTable(SMeta* pMeta, int64_t version, SVAlterTbReq* pReq);
SSchemaWrapper* metaGetTableSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver, bool isinline);
STSchema* metaGetTbTSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver);
......@@ -104,7 +104,7 @@ int tsdbOpen(SVnode* pVnode, STsdb** ppTsdb, const char* dir, STsdbKeep
int tsdbClose(STsdb** pTsdb);
int tsdbBegin(STsdb* pTsdb);
int tsdbCommit(STsdb* pTsdb);
int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, const SSubmitReq *pMsg);
int tsdbScanAndConvertSubmitMsg(STsdb* pTsdb, const SSubmitReq* pMsg);
int tsdbInsertData(STsdb* pTsdb, int64_t version, SSubmitReq* pMsg, SSubmitRsp* pRsp);
int tsdbInsertTableData(STsdb* pTsdb, SSubmitMsgIter* pMsgIter, SSubmitBlk* pBlock, SSubmitBlkRsp* pRsp);
tsdbReaderT* tsdbQueryTables(SVnode* pVnode, SQueryTableDataCond* pCond, STableGroupInfo* groupList, uint64_t qId,
......@@ -118,7 +118,7 @@ STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal);
void tqClose(STQ*);
int tqPushMsg(STQ*, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver);
int tqCommit(STQ*);
int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList);
int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd);
int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen);
int32_t tqProcessVgDeleteReq(STQ* pTq, char* msg, int32_t msgLen);
int32_t tqProcessTaskExec(STQ* pTq, char* msg, int32_t msgLen, int32_t workerId);
......
......@@ -44,6 +44,7 @@ int metaOpen(SVnode *pVnode, SMeta **ppMeta) {
pMeta->path = (char *)&pMeta[1];
sprintf(pMeta->path, "%s%s%s%s%s", tfsGetPrimaryPath(pVnode->pTfs), TD_DIRSEP, pVnode->path, TD_DIRSEP,
VNODE_META_DIR);
taosRealPath(pMeta->path, NULL, slen);
pMeta->pVnode = pVnode;
// create path if not created yet
......
......@@ -255,7 +255,7 @@ _err:
return -1;
}
int metaDropTable(SMeta *pMeta, int64_t version, SVDropTbReq *pReq) {
int metaDropTable(SMeta *pMeta, int64_t version, SVDropTbReq *pReq, SArray *tbUids) {
TBC *pTbDbc = NULL;
TBC *pUidIdxc = NULL;
TBC *pNameIdxc = NULL;
......@@ -336,6 +336,7 @@ int metaDropTable(SMeta *pMeta, int64_t version, SVDropTbReq *pReq) {
if (type == TSDB_CHILD_TABLE) {
ctime = me.ctbEntry.ctime;
suid = me.ctbEntry.suid;
taosArrayPush(tbUids, &me.uid);
} else if (type == TSDB_NORMAL_TABLE) {
ctime = me.ntbEntry.ctime;
suid = 0;
......@@ -906,4 +907,4 @@ static int metaHandleEntry(SMeta *pMeta, const SMetaEntry *pME) {
_err:
metaULock(pMeta);
return -1;
}
\ No newline at end of file
}
......@@ -104,16 +104,26 @@ static void tdSRowDemo() {
taosMemoryFree(pTSChema);
}
int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList) {
int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) {
void* pIter = NULL;
STqExec* pExec = NULL;
while (1) {
pIter = taosHashIterate(pTq->execs, pIter);
if (pIter == NULL) break;
pExec = (STqExec*)pIter;
if (pExec->subType == TOPIC_SUB_TYPE__DB) continue;
if (pExec->subType == TOPIC_SUB_TYPE__DB) {
if (isAdd) {
continue;
} else {
int32_t sz = taosArrayGetSize(tbUidList);
for (int32_t i = 0; i < sz; i++) {
int64_t tbUid = *(int64_t*)taosArrayGet(tbUidList, i);
taosHashPut(pExec->pDropTbUid, &tbUid, sizeof(int64_t), NULL, 0);
}
}
}
for (int32_t i = 0; i < 5; i++) {
int32_t code = qUpdateQualifiedTableId(pExec->task[i], tbUidList, true);
int32_t code = qUpdateQualifiedTableId(pExec->task[i], tbUidList, isAdd);
ASSERT(code == 0);
}
}
......@@ -582,7 +592,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
rsp.withSchema = 1;
STqReadHandle* pReader = pExec->pExecReader[workerId];
tqReadHandleSetMsg(pReader, pCont, 0);
while (tqNextDataBlock(pReader)) {
while (tqNextDataBlockFilterOut(pReader, pExec->pDropTbUid)) {
SSDataBlock block = {0};
if (tqRetrieveDataBlock(&block.pDataBlock, pReader, &block.info.groupId, &block.info.uid, &block.info.rows,
&block.info.numOfCols) < 0) {
......@@ -915,9 +925,10 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
req.qmsg = NULL;
pExec->pWalReader = walOpenReadHandle(pTq->pVnode->pWal);
for (int32_t i = 0; i < 5; i++) {
pExec->pExecReader[i] = tqInitSubmitMsgScanner(pTq->pVnode->pMeta);
if (pExec->subType == TOPIC_SUB_TYPE__TABLE) {
if (pExec->subType == TOPIC_SUB_TYPE__TABLE) {
for (int32_t i = 0; i < 5; i++) {
pExec->pExecReader[i] = tqInitSubmitMsgScanner(pTq->pVnode->pMeta);
SReadHandle handle = {
.reader = pExec->pExecReader[i],
.meta = pTq->pVnode->pMeta,
......@@ -925,9 +936,12 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
};
pExec->task[i] = qCreateStreamExecTaskInfo(pExec->qmsg, &handle);
ASSERT(pExec->task[i]);
} else {
pExec->task[i] = NULL;
}
} else {
for (int32_t i = 0; i < 5; i++) {
pExec->pExecReader[i] = tqInitSubmitMsgScanner(pTq->pVnode->pMeta);
}
pExec->pDropTbUid = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
}
taosHashPut(pTq->execs, req.subKey, strlen(req.subKey), pExec, sizeof(STqExec));
return 0;
......
......@@ -64,22 +64,28 @@ bool tqNextDataBlock(STqReadHandle* pHandle) {
}
if (pHandle->pBlock == NULL) return false;
/*pHandle->pBlock->uid = htobe64(pHandle->pBlock->uid);*/
/*if (pHandle->tbUid == pHandle->pBlock->uid) {*/
if (pHandle->tbIdHash == NULL) {
return true;
}
void* ret = taosHashGet(pHandle->tbIdHash, &pHandle->msgIter.uid, sizeof(int64_t));
if (ret != NULL) {
/*printf("retrieve one tb %ld\n", pHandle->pBlock->uid);*/
/*pHandle->pBlock->tid = htonl(pHandle->pBlock->tid);*/
/*pHandle->pBlock->sversion = htonl(pHandle->pBlock->sversion);*/
/*pHandle->pBlock->dataLen = htonl(pHandle->pBlock->dataLen);*/
/*pHandle->pBlock->schemaLen = htonl(pHandle->pBlock->schemaLen);*/
/*pHandle->pBlock->numOfRows = htons(pHandle->pBlock->numOfRows);*/
return true;
/*} else {*/
/*printf("skip one tb %ld\n", pHandle->pBlock->uid);*/
}
}
return false;
}
bool tqNextDataBlockFilterOut(STqReadHandle* pHandle, SHashObj* filterOutUids) {
while (1) {
if (tGetSubmitMsgNext(&pHandle->msgIter, &pHandle->pBlock) < 0) {
return false;
}
if (pHandle->pBlock == NULL) return false;
ASSERT(pHandle->tbIdHash == NULL);
void* ret = taosHashGet(filterOutUids, &pHandle->msgIter.uid, sizeof(int64_t));
if (ret == NULL) {
return true;
}
}
return false;
......
......@@ -55,6 +55,7 @@ int tsdbOpen(SVnode *pVnode, STsdb **ppTsdb, const char *dir, STsdbKeepCfg *pKee
memcpy(pTsdb->dir, dir, strlen(dir));
pTsdb->path = (char *)&pTsdb[1];
sprintf(pTsdb->path, "%s%s%s%s%s", tfsGetPrimaryPath(pVnode->pTfs), TD_DIRSEP, pVnode->path, TD_DIRSEP, dir);
taosRealPath(pTsdb->path, NULL, slen);
pTsdb->pVnode = pVnode;
pTsdb->repoLocked = false;
taosThreadMutexInit(&pTsdb->mutex, NULL);
......
......@@ -420,6 +420,11 @@ static STsdbReadHandle* tsdbQueryTablesImpl(SVnode* pVnode, SQueryTableDataCond*
setQueryTimewindow(pReadHandle, pCond);
if (pCond->numOfCols > 0) {
int32_t rowLen = 0;
for(int32_t i = 0; i < pCond->numOfCols; ++i) {
rowLen += pCond->colList[i].bytes;
}
// allocate buffer in order to load data blocks from file
pReadHandle->suppInfo.pstatis = taosMemoryCalloc(pCond->numOfCols, sizeof(SColumnDataAgg));
if (pReadHandle->suppInfo.pstatis == NULL) {
......
......@@ -109,6 +109,7 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) {
// open wal
sprintf(tdir, "%s%s%s", dir, TD_DIRSEP, VNODE_WAL_DIR);
taosRealPath(tdir, NULL, sizeof(tdir));
pVnode->pWal = walOpen(tdir, &(pVnode->config.walCfg));
if (pVnode->pWal == NULL) {
vError("vgId:%d failed to open vnode wal since %s", TD_VID(pVnode), tstrerror(terrno));
......@@ -117,6 +118,7 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) {
// open tq
sprintf(tdir, "%s%s%s", dir, TD_DIRSEP, VNODE_TQ_DIR);
taosRealPath(tdir, NULL, sizeof(tdir));
pVnode->pTq = tqOpen(tdir, pVnode, pVnode->pWal);
if (pVnode->pTq == NULL) {
vError("vgId:%d failed to open vnode tq since %s", TD_VID(pVnode), tstrerror(terrno));
......
......@@ -62,11 +62,6 @@ int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg
pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
len = pMsg->contLen - sizeof(SMsgHead);
if (tqPushMsg(pVnode->pTq, pMsg->pCont, pMsg->contLen, pMsg->msgType, version) < 0) {
vError("vgId:%d failed to push msg to TQ since %s", TD_VID(pVnode), tstrerror(terrno));
return -1;
}
switch (pMsg->msgType) {
/* META */
case TDMT_VND_CREATE_STB:
......@@ -125,6 +120,11 @@ int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg
vDebug("vgId:%d process %s request success, version: %" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType), version);
if (tqPushMsg(pVnode->pTq, pMsg->pCont, pMsg->contLen, pMsg->msgType, version) < 0) {
vError("vgId:%d failed to push msg to TQ since %s", TD_VID(pVnode), tstrerror(terrno));
return -1;
}
// commit if need
if (vnodeShouldCommit(pVnode)) {
vInfo("vgId:%d commit at version %" PRId64, TD_VID(pVnode), version);
......@@ -386,7 +386,7 @@ static int vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pReq,
tDecoderClear(&decoder);
tqUpdateTbUidList(pVnode->pTq, tbUids);
tqUpdateTbUidList(pVnode->pTq, tbUids, true);
tdUpdateTbUidList(pVnode->pSma, pStore);
tdUidStoreFree(pStore);
......@@ -517,6 +517,7 @@ static int vnodeProcessDropTbReq(SVnode *pVnode, int64_t version, void *pReq, in
SDecoder decoder = {0};
SEncoder encoder = {0};
int ret;
SArray *tbUids = NULL;
pRsp->msgType = TDMT_VND_DROP_TABLE_RSP;
pRsp->pCont = NULL;
......@@ -533,13 +534,16 @@ static int vnodeProcessDropTbReq(SVnode *pVnode, int64_t version, void *pReq, in
}
// process req
tbUids = taosArrayInit(req.nReqs, sizeof(int64_t));
rsp.pArray = taosArrayInit(req.nReqs, sizeof(SVDropTbRsp));
if (tbUids == NULL || rsp.pArray == NULL) goto _exit;
for (int iReq = 0; iReq < req.nReqs; iReq++) {
SVDropTbReq *pDropTbReq = req.pReqs + iReq;
SVDropTbRsp dropTbRsp = {0};
/* code */
ret = metaDropTable(pVnode->pMeta, version, pDropTbReq);
ret = metaDropTable(pVnode->pMeta, version, pDropTbReq, tbUids);
if (ret < 0) {
if (pDropTbReq->igNotExists && terrno == TSDB_CODE_VND_TABLE_NOT_EXIST) {
dropTbRsp.code = TSDB_CODE_SUCCESS;
......@@ -553,7 +557,10 @@ static int vnodeProcessDropTbReq(SVnode *pVnode, int64_t version, void *pReq, in
taosArrayPush(rsp.pArray, &dropTbRsp);
}
tqUpdateTbUidList(pVnode->pTq, tbUids, false);
_exit:
taosArrayDestroy(tbUids);
tDecoderClear(&decoder);
tEncodeSize(tEncodeSVDropTbBatchRsp, &rsp, pRsp->contLen, ret);
pRsp->pCont = rpcMallocCont(pRsp->contLen);
......
......@@ -217,6 +217,13 @@ typedef struct SExecTaskInfo {
int64_t owner; // if it is in execution
int32_t code;
uint64_t totalRows; // total number of rows
struct {
char *tablename;
char *dbname;
int32_t sversion;
int32_t tversion;
} schemaVer;
STableGroupInfo tableqinfoGroupInfo; // this is a group array list, including SArray<STableQueryInfo*> structure
char* sql; // query sql string
jmp_buf env; // jump to this position when error happens.
......@@ -689,7 +696,8 @@ SSDataBlock* loadNextDataBlock(void* param);
void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowCellInfoOffset);
SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNodeList, int32_t* numOfOutputCols,
int32_t type);
SExecTaskInfo* pTaskInfo, int32_t type);
SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t* numOfExprs);
SSDataBlock* createResDataBlock(SDataBlockDescNode* pNode);
int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysiNode* pTableScanNode);
......
......@@ -14,7 +14,6 @@
*/
#include "executor.h"
#include <vnode.h>
#include "executorimpl.h"
#include "planner.h"
#include "tdatablock.h"
......@@ -171,3 +170,15 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo
return TSDB_CODE_SUCCESS;
}
int32_t qGetQueriedTableSchemaVersion(qTaskInfo_t tinfo, char* dbName, char* tableName, int32_t* sversion, int32_t* tversion) {
ASSERT(tinfo != NULL && dbName != NULL && tableName != NULL);
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*) tinfo;
*sversion = pTaskInfo->schemaVer.sversion;
*tversion = pTaskInfo->schemaVer.tversion;
strcpy(dbName, pTaskInfo->schemaVer.dbname);
strcpy(tableName, pTaskInfo->schemaVer.tablename);
return 0;
}
\ No newline at end of file
......@@ -13,6 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <executorimpl.h>
#include "filter.h"
#include "function.h"
#include "functionMgt.h"
......@@ -585,8 +586,7 @@ void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, STimeWindow
int32_t numOfRows = pCtx[k].input.numOfRows;
int32_t startOffset = pCtx[k].input.startRowIndex;
int32_t pos = (order == TSDB_ORDER_ASC) ? offset : offset - (forwardStep - 1);
pCtx[k].input.startRowIndex = pos;
pCtx[k].input.startRowIndex = offset;
pCtx[k].input.numOfRows = forwardStep;
if (tsCol != NULL) {
......@@ -757,45 +757,6 @@ static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCt
}
}
}
// setBlockStatisInfo(&pCtx[i], pBlock, pOperator->pExpr[i].base.pColumns);
// uint32_t flag = pOperator->pExpr[i].base.pParam[0].pCol->flag;
// if (TSDB_COL_IS_NORMAL_COL(flag) /*|| (pCtx[i].functionId == FUNCTION_BLKINFO) ||
// (TSDB_COL_IS_TAG(flag) && pOperator->pRuntimeEnv->scanFlag == MERGE_STAGE)*/) {
// SColumn* pCol = pOperator->pExpr[i].base.pParam[0].pCol;
// if (pCtx[i].columnIndex == -1) {
// for(int32_t j = 0; j < pBlock->info.numOfCols; ++j) {
// SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, j);
// if (pColData->info.colId == pCol->colId) {
// pCtx[i].columnIndex = j;
// break;
// }
// }
// }
// uint32_t status = aAggs[pCtx[i].functionId].status;
// if ((status & (FUNCSTATE_SELECTIVITY | FUNCSTATE_NEED_TS)) != 0) {
// SColumnInfoData* tsInfo = taosArrayGet(pBlock->pDataBlock, 0);
// In case of the top/bottom query again the nest query result, which has no timestamp column
// don't set the ptsList attribute.
// if (tsInfo->info.type == TSDB_DATA_TYPE_TIMESTAMP) {
// pCtx[i].ptsList = (int64_t*) tsInfo->pData;
// } else {
// pCtx[i].ptsList = NULL;
// }
// }
// } else if (TSDB_COL_IS_UD_COL(pCol->flag) && (pOperator->pRuntimeEnv->scanFlag == MERGE_STAGE)) {
// SColIndex* pColIndex = &pOperator->pExpr[i].base.colInfo;
// SColumnInfoData* p = taosArrayGet(pBlock->pDataBlock, pColIndex->colIndex);
//
// pCtx[i].pInput = p->pData;
// assert(p->info.colId == pColIndex->info.colId && pCtx[i].inputType == p->info.type);
// for(int32_t j = 0; j < pBlock->info.rows; ++j) {
// char* dst = p->pData + j * p->info.bytes;
// taosVariantDump(&pOperator->pExpr[i].base.param[1], dst, p->info.type, true);
// }
// }
}
return code;
......@@ -874,7 +835,7 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc
taosArrayDestroy(pBlockList);
return code;
}
int32_t startOffset = createNewColModel ? 0 : pResult->info.rows;
colInfoDataEnsureCapacity(pResColData, startOffset, pResult->info.capacity);
colDataMergeCol(pResColData, startOffset, &pResult->info.capacity, &idata, dest.numOfRows);
......@@ -1831,12 +1792,6 @@ void setFunctionResultOutput(SOptrBasicInfo* pInfo, SAggSupporter* pSup, int32_t
pCtx[i].resultInfo = pEntry;
pCtx[i].scanFlag = stage;
// set the timestamp output buffer for top/bottom/diff query
// int32_t fid = pCtx[i].functionId;
// if (fid == FUNCTION_TOP || fid == FUNCTION_BOTTOM || fid == FUNCTION_DIFF || fid == FUNCTION_DERIVATIVE) {
// if (i > 0) pCtx[i].pTsOutput = pCtx[i-1].pOutput;
// }
}
initCtxOutputBuffer(pCtx, pDataBlock->info.numOfCols);
......@@ -2847,9 +2802,11 @@ int32_t setSDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadI
for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData idata = {0};
idata.info.type = pSchema[i].type;
idata.info.type = pSchema[i].type;
idata.info.bytes = pSchema[i].bytes;
idata.info.colId = pSchema[i].colId;
idata.hasNull = true;
taosArrayPush(pBlock->pDataBlock, &idata);
if (IS_VAR_DATA_TYPE(idata.info.type)) {
......@@ -4635,6 +4592,28 @@ static SArray* extractColumnInfo(SNodeList* pNodeList);
static SArray* createSortInfo(SNodeList* pNodeList);
static SArray* extractPartitionColInfo(SNodeList* pNodeList);
void extractTableSchemaVersion(SReadHandle *pHandle, uint64_t uid, SExecTaskInfo* pTaskInfo) {
SMetaReader mr = {0};
metaReaderInit(&mr, pHandle->meta, 0);
metaGetTableEntryByUid(&mr, uid);
pTaskInfo->schemaVer.tablename = strdup(mr.me.name);
if (mr.me.type == TSDB_SUPER_TABLE) {
pTaskInfo->schemaVer.sversion = mr.me.stbEntry.schema.sver;
pTaskInfo->schemaVer.tversion = mr.me.stbEntry.schemaTag.sver;
} else if (mr.me.type == TSDB_CHILD_TABLE) {
tb_uid_t suid = mr.me.ctbEntry.suid;
metaGetTableEntryByUid(&mr, suid);
pTaskInfo->schemaVer.sversion = mr.me.stbEntry.schema.sver;
pTaskInfo->schemaVer.tversion = mr.me.stbEntry.schemaTag.sver;
} else {
pTaskInfo->schemaVer.sversion = mr.me.ntbEntry.schema.sver;
}
metaReaderClear(&mr);
}
SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle,
uint64_t queryId, uint64_t taskId, STableGroupInfo* pTableGroupInfo) {
int32_t type = nodeType(pPhyNode);
......@@ -4648,6 +4627,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
return NULL;
}
extractTableSchemaVersion(pHandle, pTableScanNode->scan.uid, pTaskInfo);
SOperatorInfo* pOperator = createTableScanOperatorInfo(pTableScanNode, pDataReader, pHandle, pTaskInfo);
STableScanInfo* pScanInfo = pOperator->info;
......@@ -4685,7 +4665,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
SArray* tableIdList = extractTableIdList(pTableGroupInfo);
SSDataBlock* pResBlock = createResDataBlock(pDescNode);
SArray* pCols = extractColMatchInfo(pScanPhyNode->pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID);
SArray* pCols = extractColMatchInfo(pScanPhyNode->pScanCols, pDescNode, &numOfCols, pTaskInfo, COL_MATCH_FROM_COL_ID);
SOperatorInfo* pOperator = createStreamScanOperatorInfo(pHandle->reader, pDataReader, pHandle, pScanPhyNode->uid, pResBlock, pCols, tableIdList, pTaskInfo,
pScanPhyNode->node.pConditions, pOperatorDumy);
......@@ -4700,7 +4680,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
SSDataBlock* pResBlock = createResDataBlock(pDescNode);
int32_t numOfOutputCols = 0;
SArray* colList = extractColMatchInfo(pScanNode->pScanCols, pDescNode, &numOfOutputCols, COL_MATCH_FROM_COL_ID);
SArray* colList = extractColMatchInfo(pScanNode->pScanCols, pDescNode, &numOfOutputCols, pTaskInfo, COL_MATCH_FROM_COL_ID);
SOperatorInfo* pOperator = createSysTableScanOperatorInfo(
pHandle, pResBlock, &pScanNode->tableName, pScanNode->node.pConditions, pSysScanPhyNode->mgmtEpSet, colList,
pTaskInfo, pSysScanPhyNode->showRewrite, pSysScanPhyNode->accountId);
......@@ -4723,7 +4703,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
int32_t numOfOutputCols = 0;
SArray* colList =
extractColMatchInfo(pScanPhyNode->pScanPseudoCols, pDescNode, &numOfOutputCols, COL_MATCH_FROM_COL_ID);
extractColMatchInfo(pScanPhyNode->pScanPseudoCols, pDescNode, &numOfOutputCols, pTaskInfo, COL_MATCH_FROM_COL_ID);
SOperatorInfo* pOperator =
createTagScanOperatorInfo(pHandle, pExprInfo, num, pResBlock, colList, pTableGroupInfo, pTaskInfo);
......@@ -4805,7 +4785,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
SExprInfo* pExprInfo = createExprInfo(pSortPhyNode->pExprs, NULL, &numOfCols);
int32_t numOfOutputCols = 0;
SArray* pColList = extractColMatchInfo(pSortPhyNode->pTargets, pDescNode, &numOfOutputCols, COL_MATCH_FROM_SLOT_ID);
SArray* pColList = extractColMatchInfo(pSortPhyNode->pTargets, pDescNode, &numOfOutputCols, pTaskInfo, COL_MATCH_FROM_SLOT_ID);
pOptr = createSortOperatorInfo(ops[0], pResBlock, info, pExprInfo, numOfCols, pColList, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW == type) {
......@@ -4996,7 +4976,7 @@ SArray* createSortInfo(SNodeList* pNodeList) {
}
SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNodeList, int32_t* numOfOutputCols,
int32_t type) {
SExecTaskInfo* pTaskInfo, int32_t type) {
size_t numOfCols = LIST_LENGTH(pNodeList);
SArray* pList = taosArrayInit(numOfCols, sizeof(SColMatchInfo));
if (pList == NULL) {
......@@ -5004,10 +4984,16 @@ SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNod
return NULL;
}
const char* tname = pTaskInfo->schemaVer.tablename;
for (int32_t i = 0; i < numOfCols; ++i) {
STargetNode* pNode = (STargetNode*)nodesListGetNode(pNodeList, i);
SColumnNode* pColNode = (SColumnNode*)pNode->pExpr;
if (tname != NULL && (pTaskInfo->schemaVer.dbname == NULL) &&
strncmp(pColNode->tableName, tname, tListLen(pColNode->tableName)) == 0) {
pTaskInfo->schemaVer.dbname = strdup(pColNode->dbName);
}
SColMatchInfo c = {0};
c.output = true;
c.colId = pColNode->colId;
......@@ -5201,6 +5187,8 @@ void doDestroyTask(SExecTaskInfo* pTaskInfo) {
// taosArrayDestroy(pTaskInfo->summary.queryProfEvents);
// taosHashCleanup(pTaskInfo->summary.operatorProfResults);
taosMemoryFree(pTaskInfo->schemaVer.dbname);
taosMemoryFree(pTaskInfo->schemaVer.tablename);
taosMemoryFreeClear(pTaskInfo->sql);
taosMemoryFreeClear(pTaskInfo->id.str);
taosMemoryFreeClear(pTaskInfo);
......@@ -5315,4 +5303,4 @@ int32_t initCatchSupporter(SCatchSupporter* pCatchSup, size_t rowSize, size_t ke
pCatchSup->pWindowHashTable = taosHashInit(10000, hashFn, true, HASH_NO_LOCK);;
return TSDB_CODE_SUCCESS;
}
......@@ -460,7 +460,7 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode,
SDataBlockDescNode* pDescNode = pTableScanNode->scan.node.pOutputDataBlockDesc;
int32_t numOfCols = 0;
SArray* pColList = extractColMatchInfo(pTableScanNode->scan.pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID);
SArray* pColList = extractColMatchInfo(pTableScanNode->scan.pScanCols, pDescNode, &numOfCols, pTaskInfo, COL_MATCH_FROM_COL_ID);
int32_t code = initQueryTableDataCond(&pInfo->cond, pTableScanNode);
if (code != TSDB_CODE_SUCCESS) {
......@@ -898,11 +898,11 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
SSDataBlock* upRes = getUpdateDataBlock(pInfo, true); //TODO(liuyao) get invertible from plan
if (upRes) {
pInfo->pUpdateRes = upRes;
if (upRes->info.type = STREAM_REPROCESS) {
if (upRes->info.type == STREAM_REPROCESS) {
pInfo->updateResIndex = 0;
prepareDataScan(pInfo);
pInfo->scanMode = STREAM_SCAN_FROM_UPDATERES;
} else if (upRes->info.type = STREAM_INVERT) {
} else if (upRes->info.type == STREAM_INVERT) {
pInfo->scanMode = STREAM_SCAN_FROM_RES;
return upRes;
}
......
......@@ -1072,7 +1072,9 @@ static void doClearWindows(SAggSupporter* pSup, SOptrBasicInfo* pBinfo,
static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
SIntervalAggOperatorInfo* pInfo = pOperator->info;
int32_t order = TSDB_ORDER_ASC;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
pInfo->order = TSDB_ORDER_ASC;
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
......@@ -1086,11 +1088,9 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
return pInfo->binfo.pRes->info.rows == 0 ? NULL : pInfo->binfo.pRes;
}
// STimeWindow win = {0};
SOperatorInfo* downstream = pOperator->pDownstream[0];
SArray* pUpdated = NULL;
while (1) {
publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
......@@ -1103,16 +1103,18 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
// The timewindows that overlaps the timestamps of the input pBlock need to be recalculated and return to the
// caller. Note that all the time window are not close till now.
// the pDataBlock are always the same one, no need to call this again
setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, order, MAIN_SCAN, true);
setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, pInfo->order, MAIN_SCAN, true);
if (pInfo->invertible) {
setInverFunction(pInfo->binfo.pCtx, pOperator->numOfExprs, pBlock->info.type);
}
if (pBlock->info.type == STREAM_REPROCESS) {
doClearWindows(&pInfo->aggSup, &pInfo->binfo, &pInfo->interval,
pInfo->primaryTsIndex, pOperator->numOfExprs, pBlock);
qDebug("%s clear existed time window results for updates checked", GET_TASKID(pTaskInfo));
continue;
}
pInfo->order = TSDB_ORDER_ASC;
pUpdated = hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, 0);
}
......
......@@ -207,6 +207,9 @@ bool fmIsInvertible(int32_t funcId) {
case FUNCTION_TYPE_SUM:
case FUNCTION_TYPE_STDDEV:
case FUNCTION_TYPE_AVG:
case FUNCTION_TYPE_WSTARTTS:
case FUNCTION_TYPE_WENDTS:
case FUNCTION_TYPE_WDURATION:
res = true;
break;
default:
......
......@@ -64,7 +64,11 @@ int32_t udf2(SUdfDataBlock* block, SUdfInterBuf *interBuf, SUdfInterBuf *newInte
*(double*)(newInterBuf->buf) = sumSquares;
newInterBuf->bufLen = sizeof(double);
}
newInterBuf->numOfResult = numOutput;
if (interBuf->numOfResult == 0 && numOutput == 0) {
newInterBuf->numOfResult = 0;
} else {
newInterBuf->numOfResult = 1;
}
return 0;
}
......
......@@ -214,6 +214,8 @@ int tdbPagerCommit(SPager *pPager, TXN *pTxn) {
}
}
pPager->dbOrigSize = pPager->dbFileSize;
// release the page
for (pPage = pPager->pDirty; pPage; pPage = pPager->pDirty) {
pPager->pDirty = pPage->pDirtyNext;
......@@ -230,7 +232,6 @@ int tdbPagerCommit(SPager *pPager, TXN *pTxn) {
// remote the journal file
tdbOsClose(pPager->jfd);
tdbOsRemove(pPager->jFileName);
pPager->dbOrigSize = pPager->dbFileSize;
pPager->inTran = 0;
return 0;
......
#include <gtest/gtest.h>
#define ALLOW_FORBID_FUNC
#include "os.h"
#include "tdb.h"
#include <string>
#include <thread>
#include <vector>
typedef struct SPoolMem {
int64_t size;
......@@ -480,4 +483,118 @@ TEST(tdb_test, simple_upsert1) {
tdbTbClose(pDb);
tdbClose(pEnv);
}
TEST(tdb_test, multi_thread_query) {
int ret;
TDB *pEnv;
TTB *pDb;
tdb_cmpr_fn_t compFunc;
int nData = 20000;
TXN txn;
taosRemoveDir("tdb");
// Open Env
ret = tdbOpen("tdb", 512, 1, &pEnv);
GTEST_ASSERT_EQ(ret, 0);
// Create a database
compFunc = tKeyCmpr;
ret = tdbTbOpen("db.db", -1, -1, compFunc, pEnv, &pDb);
GTEST_ASSERT_EQ(ret, 0);
char key[64];
char val[64];
int64_t poolLimit = 4096; // 1M pool limit
int64_t txnid = 0;
SPoolMem *pPool;
// open the pool
pPool = openPool();
// start a transaction
txnid++;
txn.flags = TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED;
txn.txnId = -1;
txn.xMalloc = poolMalloc;
txn.xFree = poolFree;
txn.xArg = pPool;
// tdbTxnOpen(&txn, txnid, poolMalloc, poolFree, pPool, );
tdbBegin(pEnv, &txn);
for (int iData = 1; iData <= nData; iData++) {
sprintf(key, "key%d", iData);
sprintf(val, "value%d", iData);
ret = tdbTbInsert(pDb, key, strlen(key), val, strlen(val), &txn);
GTEST_ASSERT_EQ(ret, 0);
}
auto f = [](TTB *pDb, int nData) {
TBC *pDBC;
void *pKey = NULL;
void *pVal = NULL;
int vLen, kLen;
int count = 0;
int ret;
TXN txn;
SPoolMem *pPool = openPool();
txn.flags = 0;
txn.txnId = 0;
txn.xMalloc = poolMalloc;
txn.xFree = poolFree;
txn.xArg = pPool;
ret = tdbTbcOpen(pDb, &pDBC, &txn);
GTEST_ASSERT_EQ(ret, 0);
tdbTbcMoveToFirst(pDBC);
for (;;) {
ret = tdbTbcNext(pDBC, &pKey, &kLen, &pVal, &vLen);
if (ret < 0) break;
// std::cout.write((char *)pKey, kLen) /* << " " << kLen */ << " ";
// std::cout.write((char *)pVal, vLen) /* << " " << vLen */;
// std::cout << std::endl;
count++;
}
GTEST_ASSERT_EQ(count, nData);
tdbTbcClose(pDBC);
tdbFree(pKey);
tdbFree(pVal);
};
// tdbCommit(pEnv, &txn);
// multi-thread query
int nThreads = 20;
std::vector<std::thread> threads;
for (int i = 0; i < nThreads; i++) {
if (i == 0) {
threads.push_back(std::thread(tdbCommit, pEnv, &txn));
} else {
threads.push_back(std::thread(f, pDb, nData));
}
}
for (auto &th : threads) {
th.join();
}
// commit the transaction
tdbCommit(pEnv, &txn);
tdbTxnClose(&txn);
// Close a database
tdbTbClose(pDb);
// Close Env
ret = tdbClose(pEnv);
GTEST_ASSERT_EQ(ret, 0);
}
\ No newline at end of file
......@@ -160,7 +160,14 @@ bool tfsIsSameFile(const STfsFile *pFile1, const STfsFile *pFile2) {
if (pFile1 == NULL || pFile2 == NULL || pFile1->pTfs != pFile2->pTfs) return false;
if (pFile1->did.level != pFile2->did.level) return false;
if (pFile1->did.id != pFile2->did.id) return false;
if (strncmp(pFile1->rname, pFile2->rname, TSDB_FILENAME_LEN) != 0) return false;
char nameBuf1[TMPNAME_LEN], nameBuf2[TMPNAME_LEN];
strncpy(nameBuf1, pFile1->rname, TMPNAME_LEN);
strncpy(nameBuf2, pFile2->rname, TMPNAME_LEN);
nameBuf1[TMPNAME_LEN - 1] = 0;
nameBuf2[TMPNAME_LEN - 1] = 0;
taosRealPath(nameBuf1, NULL, TMPNAME_LEN);
taosRealPath(nameBuf2, NULL, TMPNAME_LEN);
if (strncmp(nameBuf1, nameBuf2, TMPNAME_LEN) != 0) return false;
return true;
}
......
......@@ -31,7 +31,7 @@ if $data[0][4] != ready then
goto check_dnode_ready
endi
#sql connect
sql connect
sql create dnode $hostname port 7200
sql create dnode $hostname port 7300
sql create dnode $hostname port 7400
......@@ -83,7 +83,7 @@ print $data(db)[13] $data(db)[14] $data(db)[15] $data(db)[16] $data(db)[17] $dat
if $rows != 3 then
return -1
endi
if $data(db)[19] != ready then
if $data(db)[19] != nostrict then
goto check_db_ready
endi
......@@ -93,49 +93,48 @@ $loop_cnt = 0
check_vg_ready:
$loop_cnt = $loop_cnt + 1
sleep 200
if $loop_cnt == 10 then
if $loop_cnt == 40 then
print ====> vgroups not ready!
return -1
endi
sql show vgroups
print ===> rows: $rows
print $data(2)[0] $data(2)[1] $data(2)[2] $data(2)[3] $data(2)[4] $data(2)[5] $data(2)[6] $data(2)[7] $data(2)[8] $data(2)[9] $data(2)[10] $data(2)[11] $data(2)[12] $data(2)[13]
print $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6] $data[0][7] $data[0][8] $data[0][9] $data[10][6] $data[0][11] $data[0][12] $data[0][13]
if $rows != $vgroups then
return -1
endi
if $data[0][4] == LEADER then
if $data[0][6] != FLLOWER then
if $data[0][6] != FOLLOWER then
goto check_vg_ready
endi
if $data[0][8] != FLLOWER then
if $data[0][8] != FOLLOWER then
goto check_vg_ready
endi
print ---- vgroup $data[0][0] leader locate on dnode $data[0][3]
goto vg_ready
endi
if $data[0][6] == LEADER then
if $data[0][4] != FLLOWER then
goto vg_ready
elif $data[0][6] == LEADER then
if $data[0][4] != FOLLOWER then
goto check_vg_ready
endi
if $data[0][8] != FLLOWER then
if $data[0][8] != FOLLOWER then
goto check_vg_ready
endi
print ---- vgroup $data[0][0] leader locate on dnode $data[0][5]
goto vg_ready
endi
if $data[0][8] == LEADER then
if $data[0][4] != FLLOWER then
goto vg_ready
elif $data[0][8] == LEADER then
if $data[0][4] != FOLLOWER then
goto check_vg_ready
endi
if $data[0][6] != FLLOWER then
if $data[0][6] != FOLLOWER then
goto check_vg_ready
endi
print ---- vgroup $data[0][0] leader locate on dnode $data[0][7]
goto vg_ready
goto vg_ready
else
goto check_vg_ready
endi
vg_ready:
vg_ready:
print ====> create stable/child table
sql create table stb (ts timestamp, c1 int, c2 float, c3 binary(10)) tags (t1 int)
......@@ -185,7 +184,7 @@ print ====> create a normal table for interaction between main and back threads
sql create table interaction (ts timestamp, flag binary(10), childrows int, stbrows int)
print ====> start to run_back to insert data
run_back tsim/tmq/insertDataByRunBack.sim
run_back tsim/sync/insertDataByRunBack.sim
print ====> waiting insert thread starting insert data
......@@ -239,34 +238,34 @@ if $rows != $vgroups then
return -1
endi
if $data[0][4] == LEADER then
if $data[0][6] != FLLOWER then
if $data[0][6] != FOLLOWER then
goto check_vg_ready_2
endi
if $data[0][8] != FLLOWER then
if $data[0][8] != FOLLOWER then
goto check_vg_ready_2
endi
print ---- vgroup $data[0][0] leader switch to dnode $data[0][3]
goto vg_ready_2
endi
if $data[0][6] == LEADER then
if $data[0][4] != FLLOWER then
elif $data[0][6] == LEADER then
if $data[0][4] != FOLLOWER then
goto check_vg_ready_2
endi
if $data[0][8] != FLLOWER then
if $data[0][8] != FOLLOWER then
goto check_vg_ready_2
endi
print ---- vgroup $data[0][0] leader switch to dnode $data[0][5]
goto vg_ready_2
endi
if $data[0][8] == LEADER then
if $data[0][4] != FLLOWER then
elif $data[0][8] == LEADER then
if $data[0][4] != FOLLOWER then
goto check_vg_ready_2
endi
if $data[0][6] != FLLOWER then
if $data[0][6] != FOLLOWER then
goto check_vg_ready_2
endi
print ---- vgroup $data[0][0] leader switch to dnode $data[0][7]
goto vg_ready_2
else
goto check_vg_ready_2
endi
vg_ready_2:
......@@ -344,28 +343,28 @@ if $rows != $vgroups then
return -1
endi
if $data[0][4] == LEADER then
if $data[0][6] != FLLOWER then
if $data[0][6] != FOLLOWER then
goto check_vg_ready_1
endi
if $data[0][8] != FLLOWER then
if $data[0][8] != FOLLOWER then
goto check_vg_ready_1
endi
goto vg_ready_1
endi
if $data[0][6] == LEADER then
if $data[0][4] != FLLOWER then
if $data[0][4] != FOLLOWER then
goto check_vg_ready_1
endi
if $data[0][8] != FLLOWER then
if $data[0][8] != FOLLOWER then
goto check_vg_ready_1
endi
goto vg_ready_1
endi
if $data[0][8] == LEADER then
if $data[0][4] != FLLOWER then
if $data[0][4] != FOLLOWER then
goto check_vg_ready_1
endi
if $data[0][6] != FLLOWER then
if $data[0][6] != FOLLOWER then
goto check_vg_ready_1
endi
goto vg_ready_1
......
......@@ -137,7 +137,11 @@ endi
sql insert into t1 values(1648791223001,12,14,13,11.1);
sleep 500
sql select `_wstartts`, c1, c2 ,c3 ,c4, c5 from streamt;
sql select * from streamt;
print count(*) , count(d) , sum(a) , max(b) , min(c)
print 0: $data00 , $data01 , $data02 , $data03 , $data04 , $data05
print 1: $data10 , $data11 , $data12 , $data13 , $data14 , $data15
if $rows != 4 then
print ======$rows
......
......@@ -13,7 +13,7 @@
import sys
import os
import threading
import threading as thd
import multiprocessing as mp
from numpy.lib.function_base import insert
import taos
......@@ -30,7 +30,10 @@ class TDTestCase:
#
# --------------- main frame -------------------
#
clientCfgDict = {'queryproxy': '1'}
clientCfgDict["queryproxy"] = '2'
updatecfgDict = {'clientCfg': {}}
updatecfgDict["clientCfg"] = clientCfgDict
def caseDescription(self):
'''
limit and offset keyword function test cases;
......@@ -63,53 +66,13 @@ class TDTestCase:
# self.create_tables();
self.ts = 1500000000000
# run case
def run(self):
# # test base case
# self.test_case1()
# tdLog.debug(" LIMIT test_case1 ............ [OK]")
# test case
# self.test_case2()
# tdLog.debug(" LIMIT test_case2 ............ [OK]")
# test case
self.test_case3()
tdLog.debug(" LIMIT test_case3 ............ [OK]")
# stop
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
# --------------- case -------------------
# create tables
def create_tables(self,dbname,stbname,count):
tdSql.execute("use %s" %dbname)
tdSql.execute("create stable %s(ts timestamp, c1 int, c2 binary(10)) tags(t1 int)"%stbname)
pre_create = "create table"
sql = pre_create
tdLog.debug("doing create one stable %s and %d child table in %s ..." %(stbname, count ,dbname))
# print(time.time())
exeStartTime=time.time()
for i in range(count):
sql += " %s_%d using %s tags(%d)"%(stbname,i,stbname,i+1)
if i >0 and i%3000 == 0:
tdSql.execute(sql)
sql = pre_create
# print(time.time())
# end sql
if sql != pre_create:
tdSql.execute(sql)
exeEndTime=time.time()
spendTime=exeEndTime-exeStartTime
speedCreate=count/spendTime
tdLog.debug("spent %.2fs to create 1 stable and %d table, create speed is %.2f table/s... [OK]"% (spendTime,count,speedCreate))
return
# --------------- case -------------------
def newcur(self,host,cfg):
user = "root"
......@@ -120,28 +83,23 @@ class TDTestCase:
print(cur)
return cur
def new_create_tables(self,dbname,vgroups,stbname,tcountStart,tcountStop):
host = "localhost"
# create tables
def create_tables(self,host,dbname,stbname,count):
buildPath = self.getBuildPath()
config = buildPath+ "../sim/dnode1/cfg/"
tsql=self.newcur(host,config)
tsql.execute("drop database if exists %s"%dbname)
tsql.execute("create database %s vgroups %d"%(dbname,vgroups))
tsql.execute("use %s" %dbname)
tsql.execute("create stable %s(ts timestamp, c1 int, c2 binary(10)) tags(t1 int)"%stbname)
pre_create = "create table"
sql = pre_create
tcountStop=int(tcountStop)
tcountStart=int(tcountStart)
count=tcountStop-tcountStart
count=int(count)
tdLog.debug("doing create one stable %s and %d child table in %s ..." %(stbname, count ,dbname))
# print(time.time())
exeStartTime=time.time()
# print(type(tcountStop),type(tcountStart))
for i in range(tcountStart,tcountStop):
for i in range(0,count):
sql += " %s_%d using %s tags(%d)"%(stbname,i,stbname,i+1)
if i >0 and i%20000 == 0:
# print(sql)
......@@ -158,11 +116,78 @@ class TDTestCase:
# tdLog.debug("spent %.2fs to create 1 stable and %d table, create speed is %.2f table/s... [OK]"% (spendTime,count,speedCreate))
return
def mutiThread_create_tables(self,host,dbname,stbname,vgroups,threadNumbers,count):
buildPath = self.getBuildPath()
config = buildPath+ "../sim/dnode1/cfg/"
tsql=self.newcur(host,config)
tdLog.debug("create database %s"%dbname)
tsql.execute("drop database if exists %s"%dbname)
tsql.execute("create database %s vgroups %d"%(dbname,vgroups))
tsql.execute("use %s" %dbname)
count=int(count)
threads = []
for i in range(threadNumbers):
tsql.execute("create stable %s%d(ts timestamp, c1 int, c2 binary(10)) tags(t1 int)"%(stbname,i))
threads.append(thd.Thread(target=self.create_tables, args=(host, dbname, stbname+"%d"%i, count,)))
start_time = time.time()
for tr in threads:
tr.start()
for tr in threads:
tr.join()
end_time = time.time()
spendTime=end_time-start_time
speedCreate=count/spendTime
tdLog.debug("spent %.2fs to create %d stable and %d table, create speed is %.2f table/s... [OK]"% (spendTime,threadNumbers,threadNumbers*count,speedCreate))
return
# def create_tables(self,host,dbname,stbname,vgroups,tcountStart,tcountStop):
# insert data
def insert_data(self, dbname, stbname, ts_start, tcountStart,tcountStop,rowCount):
tdSql.execute("use %s" %dbname)
def insert_data(self, host, dbname, stbname, ts_start,rowCount):
buildPath = self.getBuildPath()
config = buildPath+ "../sim/dnode1/cfg/"
tsql=self.newcur(host,config)
tdLog.debug("ready to inser data")
tsql.execute("use %s" %dbname)
pre_insert = "insert into "
sql = pre_insert
tcount=int(tcount)
allRows=tcount*rowCount
tdLog.debug("doing insert data into stable-index:%s rows:%d ..."%(stbname, allRows))
exeStartTime=time.time()
for i in range(0,tcount):
sql += " %s_%d values "%(stbname,i)
for j in range(rowCount):
sql += "(%d, %d, 'taos_%d') "%(ts_start + j*1000, j, j)
if j >0 and j%5000 == 0:
# print(sql)
tdSql.execute(sql)
sql = "insert into %s_%d values " %(stbname,i)
# end sql
if sql != pre_insert:
# print(sql)
tdSql.execute(sql)
exeEndTime=time.time()
spendTime=exeEndTime-exeStartTime
speedInsert=allRows/spendTime
# tdLog.debug("spent %.2fs to INSERT %d rows , insert rate is %.2f rows/s... [OK]"% (spendTime,allRows,speedInsert))
tdLog.debug("INSERT TABLE DATA ............ [OK]")
return
def mutiThread_insert_data(self, host, dbname, stbname, threadNumbers, ts_start, tcountStart,tcountStop,rowCount):
buildPath = self.getBuildPath()
config = buildPath+ "../sim/dnode1/cfg/"
tsql=self.newcur(host,config)
tdLog.debug("ready to inser data")
tsql.execute("use %s" %dbname)
pre_insert = "insert into "
sql = pre_insert
tcount=tcountStop-tcountStart
......@@ -187,8 +212,30 @@ class TDTestCase:
# tdLog.debug("spent %.2fs to INSERT %d rows , insert rate is %.2f rows/s... [OK]"% (spendTime,allRows,speedInsert))
tdLog.debug("INSERT TABLE DATA ............ [OK]")
buildPath = self.getBuildPath()
config = buildPath+ "../sim/dnode1/cfg/"
tsql=self.newcur(host,config)
tsql.execute("use %s" %dbname)
count=int(count)
threads = []
for i in range(threadNumbers):
tsql.execute("create stable %s%d(ts timestamp, c1 int, c2 binary(10)) tags(t1 int)"%(stbname,i))
threads.append(thd.Thread(target=self.create_tables, args=(host, dbname, stbname+"%d"%i, count,)))
start_time = time.time()
for tr in threads:
tr.start()
for tr in threads:
tr.join()
end_time = time.time()
spendTime=end_time-start_time
speedCreate=count/spendTime
tdLog.debug("spent %.2fs to create %d stable and %d table, create speed is %.2f table/s... [OK]"% (spendTime,threadNumbers,threadNumbers*count,speedCreate))
return
def taosBench(self,jsonFile):
buildPath = self.getBuildPath()
if (buildPath == ""):
......@@ -199,16 +246,10 @@ class TDTestCase:
os.system("%s -f %s -y " %(taosBenchbin,jsonFile))
return
def taosBenchCreate(self,host,dropdb,dbname,stbname,vgroups,threadNumbers,count):
def taosBenchCreate(self,host,dropdb,dbname,stbname,vgroups,processNumbers,count):
# count=50000
buildPath = self.getBuildPath()
if (buildPath == ""):
tdLog.exit("taosd not found!")
else:
tdLog.info("taosd found in %s" % buildPath)
taosBenchbin = buildPath+ "/build/bin/taosBenchmark"
buildPath = self.getBuildPath()
config = buildPath+ "../sim/dnode1/cfg/"
tsql=self.newcur(host,config)
......@@ -222,8 +263,7 @@ class TDTestCase:
tsql.execute("use %s" %dbname)
threads = []
# threadNumbers=2
for i in range(threadNumbers):
for i in range(processNumbers):
jsonfile="1-insert/Vgroups%d%d.json"%(vgroups,i)
os.system("cp -f 1-insert/manyVgroups.json %s"%(jsonfile))
os.system("sed -i 's/\"name\": \"db\",/\"name\": \"%s\",/g' %s"%(dbname,jsonfile))
......@@ -246,68 +286,15 @@ class TDTestCase:
return
# test case1 base
def test_case1(self):
tdLog.debug("-----create database and tables test------- ")
tdSql.execute("drop database if exists db1")
tdSql.execute("drop database if exists db4")
tdSql.execute("drop database if exists db6")
tdSql.execute("drop database if exists db8")
tdSql.execute("drop database if exists db12")
tdSql.execute("drop database if exists db16")
#create database and tables;
# tdSql.execute("create database db11 vgroups 1")
# # self.create_tables("db1", "stb1", 30*10000)
# tdSql.execute("use db1")
# tdSql.execute("create stable stb1(ts timestamp, c1 int, c2 binary(10)) tags(t1 int)")
# tdSql.execute("create database db12 vgroups 1")
# # self.create_tables("db1", "stb1", 30*10000)
# tdSql.execute("use db1")
# t1 = threading.Thread(target=self.new_create_tables("db1", "stb1", 15*10000), args=(1,))
# t2 = threading.Thread(target=self.new_create_tables("db1", "stb1", 15*10000), args=(2,))
# t1 = mp.Process(target=self.new_create_tables, args=("db1", "stb1", 0,count/2,))
# t2 = mp.Process(target=self.new_create_tables, args=("db1", "stb1", count/2,count,))
count=50000
vgroups=1
threads = []
threadNumbers=2
for i in range(threadNumbers):
threads.append(mp.Process(target=self.new_create_tables, args=("db1%d"%i, vgroups, "stb1", 0,count,)))
start_time = time.time()
for tr in threads:
tr.start()
for tr in threads:
tr.join()
end_time = time.time()
spendTime=end_time-start_time
speedCreate=count/spendTime
tdLog.debug("spent %.2fs to create 1 stable and %d table, create speed is %.2f table/s... [OK]"% (spendTime,count,speedCreate))
# self.new_create_tables("db1", "stb1", 15*10000)
# self.new_create_tables("db1", "stb1", 15*10000)
# tdSql.execute("create database db4 vgroups 4")
# self.create_tables("db4", "stb4", 30*10000)
# tdSql.execute("create database db6 vgroups 6")
# self.create_tables("db6", "stb6", 30*10000)
# tdSql.execute("create database db8 vgroups 8")
# self.create_tables("db8", "stb8", 30*10000)
# tdSql.execute("create database db12 vgroups 12")
# self.create_tables("db12", "stb12", 30*10000)
# tdSql.execute("create database db16 vgroups 16")
# self.create_tables("db16", "stb16", 30*10000)
tdLog.debug("-----create database and muti-thread create tables test------- ")
#host,dbname,stbname,vgroups,threadNumbers,tcountStart,tcountStop
self.mutiThread_create_tables(host="localhost",dbname="db2",stbname="stb2", vgroups=1, threadNumbers=5, count=10000)
return
# test case2 base:insert data
def test_case2(self):
tdLog.debug("-----insert data test------- ")
tdLog.debug("-----muti-thread insert data test------- ")
# drop database
tdSql.execute("drop database if exists db1")
tdSql.execute("drop database if exists db4")
......@@ -321,32 +308,54 @@ class TDTestCase:
tdSql.execute("create database db1 vgroups 1")
self.create_tables("db1", "stb1", 1*100)
self.insert_data("db1", "stb1", self.ts, 1*50,1*10000)
return
tdSql.execute("create database db4 vgroups 4")
self.create_tables("db4", "stb4", 1*100)
self.insert_data("db4", "stb4", self.ts, 1*100,1*10000)
def test_case3(self):
self.taosBenchCreate("127.0.0.1","no","db1", "stb1", 1, 8, 1*10000)
# self.taosBenchCreate("test209","no","db2", "stb2", 1, 8, 1*10000)
tdSql.execute("create database db6 vgroups 6")
self.create_tables("db6", "stb6", 1*100)
self.insert_data("db6", "stb6", self.ts, 1*100,1*10000)
# self.taosBenchCreate("chenhaoran02","no","db1", "stb1", 1, 8, 1*10000)
tdSql.execute("create database db8 vgroups 8")
self.create_tables("db8", "stb8", 1*100)
self.insert_data("db8", "stb8", self.ts, 1*100,1*10000)
# self.taosBenchCreate("db1", "stb1", 4, 5, 100*10000)
# self.taosBenchCreate("db1", "stb1", 1, 5, 100*10000)
tdSql.execute("create database db12 vgroups 12")
self.create_tables("db12", "stb12", 1*100)
self.insert_data("db12", "stb12", self.ts, 1*100,1*10000)
return
def test_case4(self):
self.taosBenchCreate("127.0.0.1","no","db1", "stb1", 1, 2, 1*10)
tdSql.execute("use db1;")
tdSql.query("show dnodes;")
dnodeId=tdSql.getData(0,0)
print(dnodeId)
tdSql.execute("create qnode on dnode %s"%dnodeId)
tdSql.query("select max(c1) from stb10;")
maxQnode=tdSql.getData(0,0)
tdSql.query("select min(c1) from stb11;")
minQnode=tdSql.getData(0,0)
tdSql.query("select c0,c1 from stb11_1 where (c0>1000) union select c0,c1 from stb11_1 where c0>2000;")
unionQnode=tdSql.queryResult
tdSql.query("select c0,c1 from stb11_1 where (c0>1000) union all select c0,c1 from stb11_1 where c0>2000;")
unionallQnode=tdSql.queryResult
# tdSql.query("show qnodes;")
# qnodeId=tdSql.getData(0,0)
tdSql.execute("drop qnode on dnode %s"%dnodeId)
tdSql.execute("reset query cache")
tdSql.query("select max(c1) from stb10;")
tdSql.checkData(0, 0, "%s"%maxQnode)
tdSql.query("select min(c1) from stb11;")
tdSql.checkData(0, 0, "%s"%minQnode)
tdSql.query("select c0,c1 from stb11_1 where (c0>1000) union select c0,c1 from stb11_1 where c0>2000;")
unionVnode=tdSql.queryResult
assert unionQnode == unionVnode
tdSql.query("select c0,c1 from stb11_1 where (c0>1000) union all select c0,c1 from stb11_1 where c0>2000;")
unionallVnode=tdSql.queryResult
assert unionallQnode == unionallVnode
# tdSql.execute("create qnode on dnode %s"%dnodeId)
tdSql.execute("create database db16 vgroups 16")
self.create_tables("db16", "stb16", 1*100)
self.insert_data("db16", "stb16", self.ts, 1*100,1*10000)
return
def test_case3(self):
self.taosBenchCreate("127.0.0.1","no","db1", "stb1", 1, 8, 1*10000)
# self.taosBenchCreate("test209","no","db2", "stb2", 1, 8, 1*10000)
# self.taosBenchCreate("chenhaoran02","no","db1", "stb1", 1, 8, 1*10000)
......@@ -354,8 +363,28 @@ class TDTestCase:
# self.taosBenchCreate("db1", "stb1", 4, 5, 100*10000)
# self.taosBenchCreate("db1", "stb1", 1, 5, 100*10000)
return
# run case
def run(self):
# # test base case
# self.test_case1()
# tdLog.debug(" LIMIT test_case1 ............ [OK]")
# test case
# self.test_case2()
# tdLog.debug(" LIMIT test_case2 ............ [OK]")
# test case
self.test_case3()
tdLog.debug(" LIMIT test_case3 ............ [OK]")
# # test qnode
# self.test_case4()
# tdLog.debug(" LIMIT test_case3 ............ [OK]")
return
#
# add case with filename
#
......
......@@ -11,7 +11,7 @@
"confirm_parameter_prompt": "no",
"insert_interval": 0,
"interlace_rows": 100000,
"num_of_records_per_req": 100000,
"num_of_records_per_req": 100,
"databases": [
{
"dbinfo": {
......
......@@ -13,7 +13,7 @@ class TDTestCase:
"wDebugFlag":143,"sDebugFlag":143,"tsdbDebugFlag":143,"tqDebugFlag":143 ,"fsDebugFlag":143 ,"fnDebugFlag":143}
def init(self, conn, logSql):
tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor())
tdSql.init(conn.cursor(), True)
def prepare_datas(self):
tdSql.execute(
......
......@@ -354,7 +354,7 @@ class TDTestCase:
event.wait()
tdLog.info("start consume processor")
pollDelay = 5
pollDelay = 15
showMsg = 1
showRow = 1
self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow)
......
......@@ -183,6 +183,7 @@ SScript *simParseScript(char *fileName) {
strcpy(name, fileName);
} else {
sprintf(name, "%s" TD_DIRSEP "%s", simScriptDir, fileName);
taosRealPath(name, NULL, sizeof(name));
}
// if ((fd = fopen(name, "r")) == NULL) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册