提交 368c41be 编写于 作者: S Shengliang Guan

Merge remote-tracking branch 'origin/3.0' into feature/dnode

......@@ -61,7 +61,7 @@ int32_t init_env() {
taos_free_result(pRes);
pRes =
taos_query(pConn, "create stable if not exists st1 (ts timestamp, c1 int, c2 float, c3 binary(10)) tags(t1 int)");
taos_query(pConn, "create stable if not exists st1 (ts timestamp, c1 int, c2 float, c3 binary(16)) tags(t1 int)");
if (taos_errno(pRes) != 0) {
printf("failed to create super table st1, reason:%s\n", taos_errstr(pRes));
return -1;
......@@ -106,8 +106,8 @@ int32_t create_topic() {
}
taos_free_result(pRes);
/*pRes = taos_query(pConn, "create topic topic_ctb_column as abc1");*/
pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1, c2, c3 from st1");
pRes = taos_query(pConn, "create topic topic_ctb_column as abc1");
/*pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1, c2, c3 from st1");*/
if (taos_errno(pRes) != 0) {
printf("failed to create topic topic_ctb_column, reason:%s\n", taos_errstr(pRes));
return -1;
......
......@@ -185,6 +185,7 @@ typedef struct {
int32_t async;
tsem_t rspSem;
tmq_resp_err_t rspErr;
SArray* offsets;
} SMqCommitCbParam;
tmq_conf_t* tmq_conf_new() {
......@@ -246,10 +247,13 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value
if (strcmp(key, "msg.with.table.name") == 0) {
if (strcmp(value, "true") == 0) {
conf->withTbName = 1;
return TMQ_CONF_OK;
} else if (strcmp(value, "false") == 0) {
conf->withTbName = 0;
return TMQ_CONF_OK;
} else if (strcmp(value, "none") == 0) {
conf->withTbName = -1;
return TMQ_CONF_OK;
} else {
return TMQ_CONF_INVALID;
}
......@@ -395,6 +399,9 @@ int32_t tmqCommitCb(void* param, const SDataBuf* pMsg, int32_t code) {
if (!pParam->async)
tsem_post(&pParam->rspSem);
else {
if (pParam->offsets) {
taosArrayDestroy(pParam->offsets);
}
tsem_destroy(&pParam->rspSem);
/*if (pParam->pArray) {*/
/*taosArrayDestroy(pParam->pArray);*/
......@@ -540,10 +547,10 @@ tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, in
// build msg
// send to mnode
SMqCMCommitOffsetReq req;
SArray* pArray = NULL;
SArray* pOffsets = NULL;
if (offsets == NULL) {
pArray = taosArrayInit(0, sizeof(SMqOffset));
pOffsets = taosArrayInit(0, sizeof(SMqOffset));
for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
for (int j = 0; j < taosArrayGetSize(pTopic->vgs); j++) {
......@@ -553,11 +560,11 @@ tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, in
strcpy(offset.cgroup, tmq->groupId);
offset.vgId = pVg->vgId;
offset.offset = pVg->currentOffset;
taosArrayPush(pArray, &offset);
taosArrayPush(pOffsets, &offset);
}
}
req.num = pArray->size;
req.offsets = pArray->pData;
req.num = pOffsets->size;
req.offsets = pOffsets->pData;
} else {
req.num = taosArrayGetSize(&offsets->container);
req.offsets = (SMqOffset*)offsets->container.pData;
......@@ -591,6 +598,7 @@ tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, in
pParam->tmq = tmq;
tsem_init(&pParam->rspSem, 0, 0);
pParam->async = async;
pParam->offsets = pOffsets;
pRequest->body.requestMsg = (SDataBuf){
.pData = buf,
......@@ -613,8 +621,8 @@ tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, in
tsem_destroy(&pParam->rspSem);
taosMemoryFree(pParam);
if (pArray) {
taosArrayDestroy(pArray);
if (pOffsets) {
taosArrayDestroy(pOffsets);
}
}
......@@ -1015,7 +1023,7 @@ int32_t tmqAskEp(tmq_t* tmq, bool async) {
atomic_store_32(&tmq->epSkipCnt, 0);
#endif
int32_t tlen = sizeof(SMqAskEpReq);
SMqAskEpReq* req = taosMemoryMalloc(tlen);
SMqAskEpReq* req = taosMemoryCalloc(1, tlen);
if (req == NULL) {
tscError("failed to malloc get subscribe ep buf");
/*atomic_store_8(&tmq->epStatus, 0);*/
......@@ -1025,7 +1033,7 @@ int32_t tmqAskEp(tmq_t* tmq, bool async) {
req->epoch = htonl(tmq->epoch);
strcpy(req->cgroup, tmq->groupId);
SMqAskEpCbParam* pParam = taosMemoryMalloc(sizeof(SMqAskEpCbParam));
SMqAskEpCbParam* pParam = taosMemoryCalloc(1, sizeof(SMqAskEpCbParam));
if (pParam == NULL) {
tscError("failed to malloc subscribe param");
taosMemoryFree(req);
......@@ -1107,7 +1115,7 @@ SMqPollReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t waitTime, SMqClientTopic*
reqOffset = tmq->resetOffsetCfg;
}
SMqPollReq* pReq = taosMemoryMalloc(sizeof(SMqPollReq));
SMqPollReq* pReq = taosMemoryCalloc(1, sizeof(SMqPollReq));
if (pReq == NULL) {
return NULL;
}
......
......@@ -47,27 +47,23 @@ typedef struct {
int32_t vgId;
int32_t vgVersion;
int8_t dropped;
uint64_t dbUid;
char db[TSDB_DB_FNAME_LEN];
char path[PATH_MAX + 20];
} SWrapperCfg;
typedef struct {
int32_t vgId;
int32_t refCount;
int32_t vgVersion;
int8_t dropped;
int8_t accessState;
uint64_t dbUid;
char *db;
char *path;
SVnode *pImpl;
STaosQueue *pWriteQ;
STaosQueue *pSyncQ;
STaosQueue *pApplyQ;
STaosQueue *pQueryQ;
STaosQueue *pFetchQ;
STaosQueue *pMergeQ;
int32_t vgId;
int32_t refCount;
int32_t vgVersion;
int8_t dropped;
int8_t accessState;
char *path;
SVnode *pImpl;
STaosQueue *pWriteQ;
STaosQueue *pSyncQ;
STaosQueue *pApplyQ;
STaosQueue *pQueryQ;
STaosQueue *pFetchQ;
STaosQueue *pMergeQ;
} SVnodeObj;
typedef struct {
......
......@@ -47,7 +47,7 @@ SVnodeObj **vmGetVnodeListFromHash(SVnodeMgmt *pMgmt, int32_t *numOfVnodes) {
int32_t vmGetVnodeListFromFile(SVnodeMgmt *pMgmt, SWrapperCfg **ppCfgs, int32_t *numOfVnodes) {
int32_t code = TSDB_CODE_INVALID_JSON_FORMAT;
int32_t len = 0;
int32_t maxLen = 30000;
int32_t maxLen = 1024 * 1024;
char *content = taosMemoryCalloc(1, maxLen + 1);
cJSON *root = NULL;
FILE *fp = NULL;
......@@ -64,6 +64,11 @@ int32_t vmGetVnodeListFromFile(SVnodeMgmt *pMgmt, SWrapperCfg **ppCfgs, int32_t
goto _OVER;
}
if (content == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
len = (int32_t)taosReadFile(pFile, content, maxLen);
if (len <= 0) {
dError("failed to read %s since content is null", file);
......@@ -116,20 +121,6 @@ int32_t vmGetVnodeListFromFile(SVnodeMgmt *pMgmt, SWrapperCfg **ppCfgs, int32_t
goto _OVER;
}
pCfg->vgVersion = vgVersion->valueint;
cJSON *dbUid = cJSON_GetObjectItem(vnode, "dbUid");
if (!dbUid || dbUid->type != cJSON_String) {
dError("failed to read %s since dbUid not found", file);
goto _OVER;
}
pCfg->dbUid = atoll(dbUid->valuestring);
cJSON *db = cJSON_GetObjectItem(vnode, "db");
if (!db || db->type != cJSON_String) {
dError("failed to read %s since db not found", file);
goto _OVER;
}
tstrncpy(pCfg->db, db->valuestring, TSDB_DB_FNAME_LEN);
}
*ppCfgs = pCfgs;
......@@ -165,8 +156,12 @@ int32_t vmWriteVnodeListToFile(SVnodeMgmt *pMgmt) {
SVnodeObj **pVnodes = vmGetVnodeListFromHash(pMgmt, &numOfVnodes);
int32_t len = 0;
int32_t maxLen = 65536;
int32_t maxLen = 1024 * 1024;
char *content = taosMemoryCalloc(1, maxLen + 1);
if (content == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
len += snprintf(content + len, maxLen - len, "{\n");
len += snprintf(content + len, maxLen - len, " \"vnodes\": [\n");
......@@ -175,9 +170,7 @@ int32_t vmWriteVnodeListToFile(SVnodeMgmt *pMgmt) {
len += snprintf(content + len, maxLen - len, " {\n");
len += snprintf(content + len, maxLen - len, " \"vgId\": %d,\n", pVnode->vgId);
len += snprintf(content + len, maxLen - len, " \"dropped\": %d,\n", pVnode->dropped);
len += snprintf(content + len, maxLen - len, " \"vgVersion\": %d,\n", pVnode->vgVersion);
len += snprintf(content + len, maxLen - len, " \"dbUid\": \"%" PRIu64 "\",\n", pVnode->dbUid);
len += snprintf(content + len, maxLen - len, " \"db\": \"%s\"\n", pVnode->db);
len += snprintf(content + len, maxLen - len, " \"vgVersion\": %d\n", pVnode->vgVersion);
if (i < numOfVnodes - 1) {
len += snprintf(content + len, maxLen - len, " },\n");
} else {
......
......@@ -170,8 +170,6 @@ static void vmGenerateWrapperCfg(SVnodeMgmt *pMgmt, SCreateVnodeReq *pCreate, SW
pCfg->vgId = pCreate->vgId;
pCfg->vgVersion = pCreate->vgVersion;
pCfg->dropped = 0;
pCfg->dbUid = pCreate->dbUid;
tstrncpy(pCfg->db, pCreate->db, TSDB_DB_FNAME_LEN);
snprintf(pCfg->path, sizeof(pCfg->path), "%s%svnode%d", pMgmt->path, TD_DIRSEP, pCreate->vgId);
}
......@@ -213,6 +211,7 @@ int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SNodeMsg *pMsg) {
SVnode *pImpl = vnodeOpen(path, pMgmt->pTfs, pMgmt->msgCb);
if (pImpl == NULL) {
dError("vgId:%d, failed to create vnode since %s", createReq.vgId, terrstr());
code = terrno;
goto _OVER;
}
......
......@@ -57,12 +57,10 @@ int32_t vmOpenVnode(SVnodeMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl) {
pVnode->vgVersion = pCfg->vgVersion;
pVnode->dropped = 0;
pVnode->accessState = TSDB_VN_ALL_ACCCESS;
pVnode->dbUid = pCfg->dbUid;
pVnode->db = tstrdup(pCfg->db);
pVnode->path = tstrdup(pCfg->path);
pVnode->pImpl = pImpl;
if (pVnode->path == NULL || pVnode->db == NULL) {
if (pVnode->path == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
......@@ -108,7 +106,6 @@ void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
}
taosMemoryFree(pVnode->path);
taosMemoryFree(pVnode->db);
taosMemoryFree(pVnode);
}
......
......@@ -1051,7 +1051,7 @@ static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pA
static int32_t mndTransExecuteRedoActions(SMnode *pMnode, STrans *pTrans) {
int32_t code = mndTransExecuteActions(pMnode, pTrans, pTrans->redoActions);
if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
mError("failed to execute redoActions since %s", terrstr());
mError("failed to execute redoActions since:%s, code:0x%x", terrstr(), terrno);
}
return code;
}
......
......@@ -93,6 +93,7 @@ struct STqReadHandle {
SMeta* pVnodeMeta;
SArray* pColIdList; // SArray<int16_t>
int32_t sver;
int64_t cachedSchemaUid;
SSchemaWrapper* pSchemaWrapper;
STSchema* pSchema;
};
......
......@@ -559,6 +559,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
}
// db subscribe
} else if (pExec->subType == TOPIC_SUB_TYPE__DB) {
rsp.withSchema = 1;
STqReadHandle* pReader = pExec->pExecReader[workerId];
tqReadHandleSetMsg(pReader, pCont, 0);
while (tqNextDataBlock(pReader)) {
......
......@@ -25,6 +25,7 @@ STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta) {
pReadHandle->ver = -1;
pReadHandle->pColIdList = NULL;
pReadHandle->sver = -1;
pReadHandle->cachedSchemaUid = -1;
pReadHandle->pSchema = NULL;
pReadHandle->pSchemaWrapper = NULL;
pReadHandle->tbIdHash = NULL;
......@@ -84,19 +85,20 @@ bool tqNextDataBlock(STqReadHandle* pHandle) {
return false;
}
int32_t tqRetrieveDataBlock(SArray** ppCols, STqReadHandle* pHandle, uint64_t* pGroupId, uint64_t* pUid, int32_t* pNumOfRows,
int16_t* pNumOfCols) {
int32_t tqRetrieveDataBlock(SArray** ppCols, STqReadHandle* pHandle, uint64_t* pGroupId, uint64_t* pUid,
int32_t* pNumOfRows, int16_t* pNumOfCols) {
/*int32_t sversion = pHandle->pBlock->sversion;*/
// TODO set to real sversion
*pUid = 0;
int32_t sversion = 0;
if (pHandle->sver != sversion) {
if (pHandle->sver != sversion || pHandle->cachedSchemaUid != pHandle->msgIter.suid) {
pHandle->pSchema = metaGetTbTSchema(pHandle->pVnodeMeta, pHandle->msgIter.uid, sversion);
// this interface use suid instead of uid
pHandle->pSchemaWrapper = metaGetTableSchema(pHandle->pVnodeMeta, pHandle->msgIter.suid, sversion, true);
pHandle->sver = sversion;
pHandle->cachedSchemaUid = pHandle->msgIter.suid;
}
STSchema* pTschema = pHandle->pSchema;
......
......@@ -137,18 +137,21 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) {
// open query
if (vnodeQueryOpen(pVnode)) {
vError("vgId:%d failed to open vnode query since %s", TD_VID(pVnode), tstrerror(terrno));
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
// vnode begin
if (vnodeBegin(pVnode) < 0) {
vError("vgId:%d failed to begin since %s", TD_VID(pVnode), tstrerror(terrno));
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
// open sync
if (vnodeSyncOpen(pVnode, dir)) {
vError("vgId:%d failed to open sync since %s", TD_VID(pVnode), tstrerror(terrno));
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
......
......@@ -13,12 +13,11 @@ from util.dnodes import *
class TDTestCase:
hostname = socket.gethostname()
#rpcDebugFlagVal = '143'
#clientCfgDict = {'serverPort': '', 'firstEp': '', 'secondEp':'', 'rpcDebugFlag':'135', 'fqdn':''}
#clientCfgDict["rpcDebugFlag"] = rpcDebugFlagVal
#updatecfgDict = {'clientCfg': {}, 'serverPort': '', 'firstEp': '', 'secondEp':'', 'rpcDebugFlag':'135', 'fqdn':''}
#updatecfgDict["rpcDebugFlag"] = rpcDebugFlagVal
#print ("===================: ", updatecfgDict)
clientCfgDict = {'qdebugflag':'143'}
updatecfgDict = {'clientCfg': {}, 'qdebugflag':'143'}
updatecfgDict["clientCfg"] = clientCfgDict
print ("===================: ", updatecfgDict)
def init(self, conn, logSql):
tdLog.debug(f"start to excute {__file__}")
......
......@@ -49,6 +49,19 @@ class TDTestCase:
print(cur)
return cur
def startTmqSimProcess(self,buildPath,cfgPath,pollDelay,dbName,showMsg,showRow,cdbName,valgrind):
shellCmd = 'nohup '
if valgrind == 1:
logFile = cfgPath + '/../log/valgrind-tmq.log'
shellCmd = 'nohup valgrind --log-file=' + logFile
shellCmd += '--tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all --num-callers=20 -v --workaround-gcc296-bugs=yes '
shellCmd += buildPath + '/build/bin/tmq_sim -c ' + cfgPath
shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName)
shellCmd += "> /dev/null 2>&1 &"
tdLog.info(shellCmd)
os.system(shellCmd)
def create_tables(self,tsql, dbName,vgroups,stbName,ctbNum,rowsPerTbl):
tsql.execute("create database if not exists %s vgroups %d"%(dbName, vgroups))
tsql.execute("use %s" %dbName)
......@@ -113,9 +126,8 @@ class TDTestCase:
parameterDict["startTs"])
return
def tmqCase1(self, cfgPath, buildPath):
tdLog.printNoPrefix("======== test case 1: Produce while consume to subscribe one db")
tdLog.printNoPrefix("======== test case 1: Produce while one consume to subscribe one db")
tdLog.info("step 1: create database, stb, ctb and insert data")
# create and start thread
parameterDict = {'cfg': '', \
......@@ -153,7 +165,7 @@ class TDTestCase:
auto.offset.reset:earliest'
sql = "insert into %s.consumeinfo values "%cdbName
sql += "(now, %d, '%s', '%s', %d, %d)"%(consumerId, topicList, keyList, expectrowcnt, ifcheckdata)
tdSql.query(sql)
tdSql.query(sql)
event.wait()
......@@ -162,11 +174,8 @@ class TDTestCase:
showMsg = 1
showRow = 1
shellCmd = 'nohup ' + buildPath + '/build/bin/tmq_sim -c ' + cfgPath
shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, parameterDict["dbName"], showMsg, showRow, cdbName)
shellCmd += "> /dev/null 2>&1 &"
tdLog.info(shellCmd)
os.system(shellCmd)
valgrind = 1
self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow, cdbName,valgrind)
# wait for data ready
prepareEnvThread.join()
......@@ -190,6 +199,187 @@ class TDTestCase:
tdLog.printNoPrefix("======== test case 1 end ...... ")
def tmqCase2(self, cfgPath, buildPath):
tdLog.printNoPrefix("======== test case 2: Produce while two consumers to subscribe one db")
tdLog.info("step 1: create database, stb, ctb and insert data")
# create and start thread
parameterDict = {'cfg': '', \
'dbName': 'db2', \
'vgroups': 4, \
'stbName': 'stb', \
'ctbNum': 10, \
'rowsPerTbl': 100000, \
'batchNum': 100, \
'startTs': 1640966400000} # 2022-01-01 00:00:00.000
parameterDict['cfg'] = cfgPath
tdSql.execute("create database if not exists %s vgroups %d" %(parameterDict['dbName'], parameterDict['vgroups']))
prepareEnvThread = threading.Thread(target=self.prepareEnv, kwargs=parameterDict)
prepareEnvThread.start()
tdLog.info("create topics from db")
topicName1 = 'topic_db1'
tdSql.execute("create topic %s as %s" %(topicName1, parameterDict['dbName']))
tdLog.info("create consume info table and consume result table")
cdbName = parameterDict["dbName"]
tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)"%cdbName)
tdSql.query("create table %s.consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)"%cdbName)
consumerId = 0
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"]
topicList = topicName1
ifcheckdata = 0
keyList = 'group.id:cgrp1,\
enable.auto.commit:false,\
auto.commit.interval.ms:6000,\
auto.offset.reset:earliest'
sql = "insert into %s.consumeinfo values "%cdbName
sql += "(now, %d, '%s', '%s', %d, %d)"%(consumerId, topicList, keyList, expectrowcnt, ifcheckdata)
tdSql.query(sql)
consumerId = 1
sql = "insert into %s.consumeinfo values "%cdbName
sql += "(now, %d, '%s', '%s', %d, %d)"%(consumerId, topicList, keyList, expectrowcnt, ifcheckdata)
tdSql.query(sql)
event.wait()
tdLog.info("start consume processor")
pollDelay = 5
showMsg = 1
showRow = 1
valgrind = 1
self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow, cdbName,valgrind)
# wait for data ready
prepareEnvThread.join()
tdLog.info("insert process end, and start to check consume result")
while 1:
tdSql.query("select * from %s.consumeresult"%cdbName)
#tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3))
if tdSql.getRows() == 2:
break
else:
time.sleep(5)
consumerId0 = tdSql.getData(0 , 1)
consumerId1 = tdSql.getData(1 , 1)
actConsumeRows0 = tdSql.getData(0 , 3)
actConsumeRows1 = tdSql.getData(1 , 3)
tdLog.info("consumer %d rows: %d"%(consumerId0, actConsumeRows0))
tdLog.info("consumer %d rows: %d"%(consumerId1, actConsumeRows1))
totalConsumeRows = actConsumeRows0 + actConsumeRows1
if totalConsumeRows != expectrowcnt:
tdLog.exit("tmq consume rows error!")
tdSql.query("drop topic %s"%topicName1)
tdLog.printNoPrefix("======== test case 2 end ...... ")
def tmqCase3(self, cfgPath, buildPath):
tdLog.printNoPrefix("======== test case 3: Produce while one consumers to subscribe one db, include 2 stb")
tdLog.info("step 1: create database, stb, ctb and insert data")
# create and start thread
parameterDict = {'cfg': '', \
'dbName': 'db3', \
'vgroups': 4, \
'stbName': 'stb', \
'ctbNum': 10, \
'rowsPerTbl': 100000, \
'batchNum': 100, \
'startTs': 1640966400000} # 2022-01-01 00:00:00.000
parameterDict['cfg'] = cfgPath
tdSql.execute("create database if not exists %s vgroups %d" %(parameterDict['dbName'], parameterDict['vgroups']))
prepareEnvThread = threading.Thread(target=self.prepareEnv, kwargs=parameterDict)
prepareEnvThread.start()
parameterDict2 = {'cfg': '', \
'dbName': 'db3', \
'vgroups': 4, \
'stbName': 'stb2', \
'ctbNum': 10, \
'rowsPerTbl': 100000, \
'batchNum': 100, \
'startTs': 1640966400000} # 2022-01-01 00:00:00.000
parameterDict['cfg'] = cfgPath
prepareEnvThread2 = threading.Thread(target=self.prepareEnv, kwargs=parameterDict2)
prepareEnvThread2.start()
tdLog.info("create topics from db")
topicName1 = 'topic_db1'
tdSql.execute("create topic %s as %s" %(topicName1, parameterDict['dbName']))
tdLog.info("create consume info table and consume result table")
cdbName = parameterDict["dbName"]
tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)"%cdbName)
tdSql.query("create table %s.consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)"%cdbName)
consumerId = 0
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] + parameterDict2["rowsPerTbl"] * parameterDict2["ctbNum"]
topicList = topicName1
ifcheckdata = 0
keyList = 'group.id:cgrp1,\
enable.auto.commit:false,\
auto.commit.interval.ms:6000,\
auto.offset.reset:earliest'
sql = "insert into %s.consumeinfo values "%cdbName
sql += "(now, %d, '%s', '%s', %d, %d)"%(consumerId, topicList, keyList, expectrowcnt, ifcheckdata)
tdSql.query(sql)
# consumerId = 1
# sql = "insert into %s.consumeinfo values "%cdbName
# sql += "(now, %d, '%s', '%s', %d, %d)"%(consumerId, topicList, keyList, expectrowcnt, ifcheckdata)
# tdSql.query(sql)
event.wait()
tdLog.info("start consume processor")
pollDelay = 5
showMsg = 1
showRow = 1
valgrind = 1
self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow, cdbName,valgrind)
# wait for data ready
prepareEnvThread.join()
prepareEnvThread2.join()
tdLog.info("insert process end, and start to check consume result")
while 1:
tdSql.query("select * from %s.consumeresult"%cdbName)
#tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3))
if tdSql.getRows() == 1:
break
else:
time.sleep(5)
consumerId0 = tdSql.getData(0 , 1)
#consumerId1 = tdSql.getData(1 , 1)
actConsumeRows0 = tdSql.getData(0 , 3)
#actConsumeRows1 = tdSql.getData(1 , 3)
tdLog.info("consumer %d rows: %d"%(consumerId0, actConsumeRows0))
#tdLog.info("consumer %d rows: %d"%(consumerId1, actConsumeRows1))
#totalConsumeRows = actConsumeRows0 + actConsumeRows1
if actConsumeRows0 != expectrowcnt:
tdLog.exit("tmq consume rows error!")
tdSql.query("drop topic %s"%topicName1)
tdLog.printNoPrefix("======== test case 3 end ...... ")
def run(self):
tdSql.prepare()
......
......@@ -332,7 +332,7 @@ int32_t shellParseArgs(int32_t argc, char *argv[]) {
shellInitArgs(argc, argv);
shell.info.clientVersion =
"Welcome to the TDengine shell from %s, Client Version:%s\n"
"Copyright (c) 2020 by TAOS Data, Inc. All rights reserved.\n\n";
"Copyright (c) 2022 by TAOS Data, Inc. All rights reserved.\n\n";
shell.info.promptHeader = "taos> ";
shell.info.promptContinue = " -> ";
shell.info.promptSize = 6;
......
......@@ -29,11 +29,11 @@ static void shellDumpFieldToFile(TdFilePtr pFile, const char *val, TAOS_FIELD
static int32_t shellDumpResultToFile(const char *fname, TAOS_RES *tres);
static void shellPrintNChar(const char *str, int32_t length, int32_t width);
static void shellPrintField(const char *val, TAOS_FIELD *field, int32_t width, int32_t length, int32_t precision);
static int32_t shellVerticalPrintResult(TAOS_RES *tres);
static int32_t shellVerticalPrintResult(TAOS_RES *tres, const char *sql);
static int32_t shellCalcColWidth(TAOS_FIELD *field, int32_t precision);
static void shellPrintHeader(TAOS_FIELD *fields, int32_t *width, int32_t num_fields);
static int32_t shellHorizontalPrintResult(TAOS_RES *tres);
static int32_t shellDumpResult(TAOS_RES *tres, char *fname, int32_t *error_no, bool vertical);
static int32_t shellHorizontalPrintResult(TAOS_RES *tres, const char *sql);
static int32_t shellDumpResult(TAOS_RES *tres, char *fname, int32_t *error_no, bool vertical, const char *sql);
static void shellReadHistory();
static void shellWriteHistory();
static void shellPrintError(TAOS_RES *tres, int64_t st);
......@@ -121,7 +121,7 @@ int32_t shellRunCommand(char *command) {
char quote = 0, *cmd = command;
for (char c = *command++; c != 0; c = *command++) {
if (c == '\\' && (*command == '\'' || *command == '"' || *command == '`')) {
command ++;
command++;
continue;
}
......@@ -190,7 +190,7 @@ void shellRunSingleCommandImp(char *command) {
if (pFields != NULL) { // select and show kinds of commands
int32_t error_no = 0;
int32_t numOfRows = shellDumpResult(pSql, fname, &error_no, printMode);
int32_t numOfRows = shellDumpResult(pSql, fname, &error_no, printMode, command);
if (numOfRows < 0) return;
et = taosGetTimestampUs();
......@@ -272,6 +272,7 @@ void shellDumpFieldToFile(TdFilePtr pFile, const char *val, TAOS_FIELD *field, i
return;
}
int n;
char buf[TSDB_MAX_BYTES_PER_ROW];
switch (field->type) {
case TSDB_DATA_TYPE_BOOL:
......@@ -280,20 +281,37 @@ void shellDumpFieldToFile(TdFilePtr pFile, const char *val, TAOS_FIELD *field, i
case TSDB_DATA_TYPE_TINYINT:
taosFprintfFile(pFile, "%d", *((int8_t *)val));
break;
case TSDB_DATA_TYPE_UTINYINT:
taosFprintfFile(pFile, "%u", *((uint8_t *)val));
break;
case TSDB_DATA_TYPE_SMALLINT:
taosFprintfFile(pFile, "%d", *((int16_t *)val));
break;
case TSDB_DATA_TYPE_USMALLINT:
taosFprintfFile(pFile, "%u", *((uint16_t *)val));
break;
case TSDB_DATA_TYPE_INT:
taosFprintfFile(pFile, "%d", *((int32_t *)val));
break;
case TSDB_DATA_TYPE_UINT:
taosFprintfFile(pFile, "%u", *((uint32_t *)val));
break;
case TSDB_DATA_TYPE_BIGINT:
taosFprintfFile(pFile, "%" PRId64, *((int64_t *)val));
break;
case TSDB_DATA_TYPE_UBIGINT:
taosFprintfFile(pFile, "%" PRIu64, *((uint64_t *)val));
break;
case TSDB_DATA_TYPE_FLOAT:
taosFprintfFile(pFile, "%.5f", GET_FLOAT_VAL(val));
break;
case TSDB_DATA_TYPE_DOUBLE:
taosFprintfFile(pFile, "%.9f", GET_DOUBLE_VAL(val));
n = snprintf(buf, TSDB_MAX_BYTES_PER_ROW, "%*.9f", length, GET_DOUBLE_VAL(val));
if (n > MAX(25, length)) {
taosFprintfFile(pFile, "%*.15e", length, GET_DOUBLE_VAL(val));
} else {
taosFprintfFile(pFile, "%s", buf);
}
break;
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR:
......@@ -435,6 +453,7 @@ void shellPrintField(const char *val, TAOS_FIELD *field, int32_t width, int32_t
return;
}
int n;
char buf[TSDB_MAX_BYTES_PER_ROW];
switch (field->type) {
case TSDB_DATA_TYPE_BOOL:
......@@ -468,7 +487,12 @@ void shellPrintField(const char *val, TAOS_FIELD *field, int32_t width, int32_t
printf("%*.5f", width, GET_FLOAT_VAL(val));
break;
case TSDB_DATA_TYPE_DOUBLE:
printf("%*.9f", width, GET_DOUBLE_VAL(val));
n = snprintf(buf, TSDB_MAX_BYTES_PER_ROW, "%*.9f", width, GET_DOUBLE_VAL(val));
if (n > MAX(25, width)) {
printf("%*.15e", width, GET_DOUBLE_VAL(val));
} else {
printf("%s", buf);
}
break;
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR:
......@@ -483,7 +507,16 @@ void shellPrintField(const char *val, TAOS_FIELD *field, int32_t width, int32_t
}
}
int32_t shellVerticalPrintResult(TAOS_RES *tres) {
bool shellIsLimitQuery(const char *sql) {
//todo refactor
if (strstr(sql, "limit") != NULL || strstr(sql, "LIMIT") != NULL) {
return true;
}
return false;
}
int32_t shellVerticalPrintResult(TAOS_RES *tres, const char *sql) {
TAOS_ROW row = taos_fetch_row(tres);
if (row == NULL) {
return 0;
......@@ -503,7 +536,7 @@ int32_t shellVerticalPrintResult(TAOS_RES *tres) {
uint64_t resShowMaxNum = UINT64_MAX;
if (shell.args.commands == NULL && shell.args.file[0] == 0) {
if (shell.args.commands == NULL && shell.args.file[0] == 0 && !shellIsLimitQuery(sql)) {
resShowMaxNum = SHELL_DEFAULT_RES_SHOW_NUM;
}
......@@ -525,8 +558,13 @@ int32_t shellVerticalPrintResult(TAOS_RES *tres) {
putchar('\n');
}
} else if (showMore) {
printf("[100 Rows showed, and more rows are fetching but will not be showed. You can ctrl+c to stop or wait.]\n");
printf("[You can add limit statement to get more or redirect results to specific file to get all.]\n");
printf("\n");
printf(" Notice: The result shows only the first %d rows.\n", SHELL_DEFAULT_RES_SHOW_NUM);
printf(" You can use the `LIMIT` clause to get fewer result to show.\n");
printf(" Or use '>>' to redirect the whole set of the result to a specified file.\n");
printf("\n");
printf(" You can use Ctrl+C to stop the underway fetching.\n");
printf("\n");
showMore = 0;
}
......@@ -618,7 +656,7 @@ void shellPrintHeader(TAOS_FIELD *fields, int32_t *width, int32_t num_fields) {
putchar('\n');
}
int32_t shellHorizontalPrintResult(TAOS_RES *tres) {
int32_t shellHorizontalPrintResult(TAOS_RES *tres, const char *sql) {
TAOS_ROW row = taos_fetch_row(tres);
if (row == NULL) {
return 0;
......@@ -637,7 +675,7 @@ int32_t shellHorizontalPrintResult(TAOS_RES *tres) {
uint64_t resShowMaxNum = UINT64_MAX;
if (shell.args.commands == NULL && shell.args.file[0] == 0) {
if (shell.args.commands == NULL && shell.args.file[0] == 0 && !shellIsLimitQuery(sql)) {
resShowMaxNum = SHELL_DEFAULT_RES_SHOW_NUM;
}
......@@ -655,8 +693,13 @@ int32_t shellHorizontalPrintResult(TAOS_RES *tres) {
}
putchar('\n');
} else if (showMore) {
printf("[100 Rows showed, and more rows are fetching but will not be showed. You can ctrl+c to stop or wait.]\n");
printf("[You can add limit statement to show more or redirect results to specific file to get all.]\n");
printf("\n");
printf(" Notice: The result shows only the first %d rows.\n", SHELL_DEFAULT_RES_SHOW_NUM);
printf(" You can use the `LIMIT` clause to get fewer result to show.\n");
printf(" Or use '>>' to redirect the whole set of the result to a specified file.\n");
printf("\n");
printf(" You can use Ctrl+C to stop the underway fetching.\n");
printf("\n");
showMore = 0;
}
......@@ -667,14 +710,14 @@ int32_t shellHorizontalPrintResult(TAOS_RES *tres) {
return numOfRows;
}
int32_t shellDumpResult(TAOS_RES *tres, char *fname, int32_t *error_no, bool vertical) {
int32_t shellDumpResult(TAOS_RES *tres, char *fname, int32_t *error_no, bool vertical, const char *sql) {
int32_t numOfRows = 0;
if (fname != NULL) {
numOfRows = shellDumpResultToFile(fname, tres);
} else if (vertical) {
numOfRows = shellVerticalPrintResult(tres);
numOfRows = shellVerticalPrintResult(tres, sql);
} else {
numOfRows = shellHorizontalPrintResult(tres);
numOfRows = shellHorizontalPrintResult(tres, sql);
}
*error_no = taos_errno(tres);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册