Commit 6beb221c authored by H Hongze Cheng

Merge branch '3.0' of https://github.com/taosdata/TDengine into feat/tsdb_snapshot

......@@ -223,8 +223,8 @@ typedef struct SRequestObj {
SArray* tableList;
SQueryExecMetric metric;
SRequestSendRecvBody body;
-bool stableQuery; // todo refactor
-bool validateOnly; // todo refactor
+bool stableQuery; // todo refactor
+bool validateOnly; // todo refactor
bool killed;
uint32_t prevCode; // previous error code: todo refactor, add update flag for catalog
......@@ -325,7 +325,8 @@ int32_t parseSql(SRequestObj* pRequest, bool topicQuery, SQuery** pQuery, SStmtC
int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArray* pNodeList);
-int32_t buildRequest(uint64_t connId, const char* sql, int sqlLen, void* param, bool validateSql, SRequestObj** pRequest);
+int32_t buildRequest(uint64_t connId, const char* sql, int sqlLen, void* param, bool validateSql,
+                     SRequestObj** pRequest);
void taos_close_internal(void* taos);
......@@ -359,9 +360,6 @@ int32_t removeMeta(STscObj* pTscObj, SArray* tbList); // todo move to clie
int32_t handleAlterTbExecRes(void* res, struct SCatalog* pCatalog); // todo move to xxx
bool qnodeRequired(SRequestObj* pRequest);
-void initTscQhandle();
-void cleanupTscQhandle();
#ifdef __cplusplus
}
#endif
......
......@@ -35,22 +35,10 @@ SAppInfo appInfo;
int32_t clientReqRefPool = -1;
int32_t clientConnRefPool = -1;
-void *tscQhandle = NULL;
static TdThreadOnce tscinit = PTHREAD_ONCE_INIT;
volatile int32_t tscInitRes = 0;
-void initTscQhandle() {
-  // init handle
-  tscQhandle = taosInitScheduler(4096, 5, "tsc");
-}
-
-void cleanupTscQhandle() {
-  // destroy handle
-  taosCleanUpScheduler(tscQhandle);
-}
-static int32_t registerRequest(SRequestObj *pRequest, STscObj* pTscObj) {
+static int32_t registerRequest(SRequestObj *pRequest, STscObj *pTscObj) {
// connection has been released already, abort creating request.
pRequest->self = taosAddRef(clientReqRefPool, pRequest);
......@@ -72,7 +60,7 @@ static int32_t registerRequest(SRequestObj *pRequest, STscObj* pTscObj) {
static void deregisterRequest(SRequestObj *pRequest) {
assert(pRequest != NULL);
-STscObj * pTscObj = pRequest->pTscObj;
+STscObj *pTscObj = pRequest->pTscObj;
SAppClusterSummary *pActivity = &pTscObj->pAppInfo->summary;
int32_t currentInst = atomic_sub_fetch_64((int64_t *)&pActivity->currentRequests, 1);
......@@ -97,7 +85,8 @@ void closeTransporter(SAppInstInfo *pAppInfo) {
static bool clientRpcRfp(int32_t code, tmsg_t msgType) {
if (NEED_REDIRECT_ERROR(code)) {
-if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY || msgType == TDMT_SCH_FETCH || msgType == TDMT_SCH_MERGE_FETCH) {
+if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY || msgType == TDMT_SCH_FETCH ||
+    msgType == TDMT_SCH_MERGE_FETCH) {
return false;
}
return true;
......@@ -251,7 +240,7 @@ void *createRequest(uint64_t connId, int32_t type) {
return NULL;
}
-STscObj* pTscObj = acquireTscObj(connId);
+STscObj *pTscObj = acquireTscObj(connId);
if (pTscObj == NULL) {
terrno = TSDB_CODE_TSC_DISCONNECTED;
return NULL;
......@@ -348,7 +337,6 @@ void taos_init_imp(void) {
// In the APIs of other program language, taos_cleanup is not available yet.
// So, to make sure taos_cleanup will be invoked to clean up the allocated resource to suppress the valgrind warning.
atexit(taos_cleanup);
-initTscQhandle();
errno = TSDB_CODE_SUCCESS;
taosSeedRand(taosGetTimestampSec());
......@@ -407,7 +395,7 @@ int taos_options_imp(TSDB_OPTION option, const char *str) {
return 0;
}
-SConfig * pCfg = taosGetCfg();
+SConfig *pCfg = taosGetCfg();
SConfigItem *pItem = NULL;
switch (option) {
......
......@@ -1274,8 +1274,8 @@ typedef struct SchedArg {
SEpSet* pEpset;
} SchedArg;
-void doProcessMsgFromServer(SSchedMsg* schedMsg) {
-  SchedArg* arg = (SchedArg*)schedMsg->ahandle;
+int32_t doProcessMsgFromServer(void* param) {
+  SchedArg* arg = (SchedArg*)param;
SRpcMsg* pMsg = &arg->msg;
SEpSet* pEpSet = arg->pEpset;
......@@ -1328,11 +1328,10 @@ void doProcessMsgFromServer(SSchedMsg* schedMsg) {
rpcFreeCont(pMsg->pCont);
destroySendMsgInfo(pSendInfo);
taosMemoryFree(arg);
+return TSDB_CODE_SUCCESS;
}
void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
-SSchedMsg schedMsg = {0};
SEpSet* tEpSet = NULL;
if (pEpSet != NULL) {
tEpSet = taosMemoryCalloc(1, sizeof(SEpSet));
......@@ -1343,9 +1342,7 @@ void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
arg->msg = *pMsg;
arg->pEpset = tEpSet;
-schedMsg.fp = doProcessMsgFromServer;
-schedMsg.ahandle = arg;
-taosScheduleTask(tscQhandle, &schedMsg);
+taosAsyncExec(doProcessMsgFromServer, arg, NULL);
}
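Review note: with the dedicated client scheduler removed, responses from the server are no longer packed into an SSchedMsg on tscQhandle; they are handed straight to the shared async executor, and the callback now returns a status code and owns its argument. A minimal sketch of the pattern (assumption: taosAsyncExec queues an int32_t (*)(void*) job on a worker pool, as the call above suggests; the names below are illustrative, not the real client code):

    #include <stdint.h>
    #include <stdlib.h>

    typedef int32_t (*AsyncFn)(void *param);

    // Stand-in for taosAsyncExec: the real call enqueues the job on a worker
    // pool; this sketch simply runs it inline.
    static int32_t asyncExecSketch(AsyncFn fn, void *arg) { return fn(arg); }

    static int32_t onMsgSketch(void *param) {
      // The job owns `param` and must free it before returning, exactly as
      // doProcessMsgFromServer frees its SchedArg above.
      free(param);
      return 0;  // TSDB_CODE_SUCCESS
    }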
TAOS* taos_connect_auth(const char* ip, const char* user, const char* auth, const char* db, uint16_t port) {
......
......@@ -72,7 +72,6 @@ void taos_cleanup(void) {
catalogDestroy();
schedulerDestroy();
-cleanupTscQhandle();
rpcCleanup();
tscInfo("all local resources released");
taosCleanupCfg();
......@@ -242,7 +241,7 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) {
#endif
} else if (TD_RES_TMQ(res)) {
-SMqRspObj * msg = ((SMqRspObj *)res);
+SMqRspObj *msg = ((SMqRspObj *)res);
SReqResultInfo *pResultInfo;
if (msg->resIter == -1) {
pResultInfo = tmqGetNextResInfo(res, true);
......@@ -418,7 +417,7 @@ int taos_affected_rows(TAOS_RES *res) {
return 0;
}
-SRequestObj * pRequest = (SRequestObj *)res;
+SRequestObj *pRequest = (SRequestObj *)res;
SReqResultInfo *pResInfo = &pRequest->body.resInfo;
return pResInfo->numOfRows;
}
......@@ -601,7 +600,7 @@ int *taos_get_column_data_offset(TAOS_RES *res, int columnIndex) {
}
SReqResultInfo *pResInfo = tscGetCurResInfo(res);
-TAOS_FIELD * pField = &pResInfo->userFields[columnIndex];
+TAOS_FIELD *pField = &pResInfo->userFields[columnIndex];
if (!IS_VAR_DATA_TYPE(pField->type)) {
return 0;
}
......@@ -645,8 +644,8 @@ const char *taos_get_server_info(TAOS *taos) {
typedef struct SqlParseWrapper {
SParseContext *pCtx;
SCatalogReq catalogReq;
-SRequestObj * pRequest;
-SQuery * pQuery;
+SRequestObj *pRequest;
+SQuery *pQuery;
} SqlParseWrapper;
static void destorySqlParseWrapper(SqlParseWrapper *pWrapper) {
......@@ -665,8 +664,8 @@ static void destorySqlParseWrapper(SqlParseWrapper *pWrapper) {
void retrieveMetaCallback(SMetaData *pResultMeta, void *param, int32_t code) {
SqlParseWrapper *pWrapper = (SqlParseWrapper *)param;
-SQuery * pQuery = pWrapper->pQuery;
-SRequestObj * pRequest = pWrapper->pRequest;
+SQuery *pQuery = pWrapper->pQuery;
+SRequestObj *pRequest = pWrapper->pRequest;
if (code == TSDB_CODE_SUCCESS) {
code = qAnalyseSqlSemantic(pWrapper->pCtx, &pWrapper->catalogReq, pResultMeta, pQuery);
......@@ -684,7 +683,8 @@ void retrieveMetaCallback(SMetaData *pResultMeta, void *param, int32_t code) {
destorySqlParseWrapper(pWrapper);
tscDebug("0x%"PRIx64" analysis semantics completed, start async query, reqId:0x%"PRIx64, pRequest->self, pRequest->requestId);
tscDebug("0x%" PRIx64 " analysis semantics completed, start async query, reqId:0x%" PRIx64, pRequest->self,
pRequest->requestId);
launchAsyncQuery(pRequest, pQuery, pResultMeta);
} else {
destorySqlParseWrapper(pWrapper);
......@@ -705,7 +705,7 @@ void retrieveMetaCallback(SMetaData *pResultMeta, void *param, int32_t code) {
}
void taos_query_a(TAOS *taos, const char *sql, __taos_async_fn_t fp, void *param) {
-int64_t connId = *(int64_t*)taos;
+int64_t connId = *(int64_t *)taos;
taosAsyncQueryImpl(connId, sql, fp, param, false);
}
......@@ -739,7 +739,7 @@ int32_t createParseContext(const SRequestObj *pRequest, SParseContext **pCxt) {
void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) {
SParseContext *pCxt = NULL;
-STscObj * pTscObj = pRequest->pTscObj;
+STscObj *pTscObj = pRequest->pTscObj;
int32_t code = 0;
if (pRequest->retry++ > REQUEST_TOTAL_EXEC_TIMES) {
......@@ -874,9 +874,9 @@ void taos_fetch_rows_a(TAOS_RES *res, __taos_async_fn_t fp, void *param) {
SSchedulerReq req = {
-.syncReq = false,
-.fetchFp = fetchCallback,
-.cbParam = pRequest,
+.syncReq = false,
+.fetchFp = fetchCallback,
+.cbParam = pRequest,
};
schedulerFetchRows(pRequest->body.queryJob, &req);
......@@ -886,7 +886,7 @@ void taos_fetch_raw_block_a(TAOS_RES *res, __taos_async_fn_t fp, void *param) {
ASSERT(res != NULL && fp != NULL);
ASSERT(TD_RES_QUERY(res));
-SRequestObj *pRequest = res;
+SRequestObj *pRequest = res;
SReqResultInfo *pResultInfo = &pRequest->body.resInfo;
// set the current block is all consumed
......@@ -928,7 +928,7 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) {
int64_t connId = *(int64_t *)taos;
const int32_t MAX_TABLE_NAME_LENGTH = 12 * 1024 * 1024; // 12MB list
int32_t code = 0;
-SRequestObj * pRequest = NULL;
+SRequestObj *pRequest = NULL;
SCatalogReq catalogReq = {0};
if (NULL == tableNameList) {
......@@ -950,7 +950,7 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) {
goto _return;
}
-STscObj* pTscObj = pRequest->pTscObj;
+STscObj *pTscObj = pRequest->pTscObj;
code = transferTableNameList(tableNameList, pTscObj->acctId, pTscObj->db, &catalogReq.pTableMeta);
if (code) {
goto _return;
......@@ -972,7 +972,7 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) {
goto _return;
}
-SSyncQueryParam* pParam = pRequest->body.param;
+SSyncQueryParam *pParam = pRequest->body.param;
tsem_wait(&pParam->sem);
_return:
......
......@@ -179,7 +179,6 @@ int32_t processUseDbRsp(void* param, SDataBuf* pMsg, int32_t code) {
if (code != 0) {
terrno = code;
if (output.dbVgroup) taosHashCleanup(output.dbVgroup->vgHash);
-taosMemoryFreeClear(output.dbVgroup);
tscError("0x%" PRIx64 " failed to build use db output since %s", pRequest->requestId, terrstr());
} else if (output.dbVgroup && output.dbVgroup->vgHash) {
......@@ -189,12 +188,14 @@ int32_t processUseDbRsp(void* param, SDataBuf* pMsg, int32_t code) {
if (code1 != TSDB_CODE_SUCCESS) {
tscWarn("catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", pRequest->pTscObj->pAppInfo->clusterId,
tstrerror(code1));
+taosMemoryFreeClear(output.dbVgroup);
} else {
catalogUpdateDBVgInfo(pCatalog, output.db, output.dbId, output.dbVgroup);
output.dbVgroup = NULL;
}
}
+taosMemoryFreeClear(output.dbVgroup);
tFreeSUsedbRsp(&usedbRsp);
char db[TSDB_DB_NAME_LEN] = {0};
......
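Review note: the vgroup payload now has a single owner at each exit. On the success path catalogUpdateDBVgInfo takes output.dbVgroup and the pointer is nulled first, so the unconditional taosMemoryFreeClear that follows only releases it on the paths that never handed it off. A small sketch of this transfer-or-free pattern (illustrative names, not the real catalog API):

    #include <stdlib.h>

    // Either the consumer takes ownership of *pObj, or the common cleanup frees it.
    static void handOffOrFree(void **pObj, int consumerTookIt) {
      if (consumerTookIt) {
        *pObj = NULL;   // ownership transferred: common cleanup becomes a no-op
      }
      free(*pObj);      // free(NULL) is defined to do nothing
      *pObj = NULL;
    }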
......@@ -22,7 +22,6 @@ typedef struct SLastrowReader {
SVnode* pVnode;
STSchema* pSchema;
uint64_t uid;
-// int32_t* pSlotIds;
char** transferBuf; // todo remove it soon
int32_t numOfCols;
int32_t type;
......@@ -31,6 +30,7 @@ typedef struct SLastrowReader {
} SLastrowReader;
static void saveOneRow(STSRow* pRow, SSDataBlock* pBlock, SLastrowReader* pReader, const int32_t* slotIds) {
+ASSERT(pReader->numOfCols <= taosArrayGetSize(pBlock->pDataBlock));
int32_t numOfRows = pBlock->info.rows;
SColVal colVal = {0};
......@@ -40,15 +40,17 @@ static void saveOneRow(STSRow* pRow, SSDataBlock* pBlock, SLastrowReader* pReade
if (slotIds[i] == -1) {
colDataAppend(pColInfoData, numOfRows, (const char*)&pRow->ts, false);
} else {
-tTSRowGetVal(pRow, pReader->pSchema, slotIds[i], &colVal);
+int32_t slotId = slotIds[i];
+
+tTSRowGetVal(pRow, pReader->pSchema, slotId, &colVal);
if (IS_VAR_DATA_TYPE(colVal.type)) {
if (colVal.isNull || colVal.isNone) {
colDataAppendNULL(pColInfoData, numOfRows);
} else {
-varDataSetLen(pReader->transferBuf[i], colVal.value.nData);
-memcpy(varDataVal(pReader->transferBuf[i]), colVal.value.pData, colVal.value.nData);
-colDataAppend(pColInfoData, numOfRows, pReader->transferBuf[i], false);
+varDataSetLen(pReader->transferBuf[slotId], colVal.value.nData);
+memcpy(varDataVal(pReader->transferBuf[slotId]), colVal.value.pData, colVal.value.nData);
+colDataAppend(pColInfoData, numOfRows, pReader->transferBuf[slotId], false);
}
} else {
colDataAppend(pColInfoData, numOfRows, (const char*)&colVal.value, colVal.isNull || colVal.isNone);
......@@ -68,7 +70,6 @@ int32_t tsdbLastRowReaderOpen(void* pVnode, int32_t type, SArray* pTableIdList,
p->type = type;
p->pVnode = pVnode;
p->numOfCols = numOfCols;
-p->transferBuf = taosMemoryCalloc(p->numOfCols, POINTER_BYTES);
if (taosArrayGetSize(pTableIdList) == 0) {
*pReader = p;
......@@ -79,7 +80,8 @@ int32_t tsdbLastRowReaderOpen(void* pVnode, int32_t type, SArray* pTableIdList,
p->pSchema = metaGetTbTSchema(p->pVnode->pMeta, pKeyInfo->uid, -1);
p->pTableList = pTableIdList;
-for (int32_t i = 0; i < p->numOfCols; ++i) {
+p->transferBuf = taosMemoryCalloc(p->pSchema->numOfCols, POINTER_BYTES);
+for (int32_t i = 0; i < p->pSchema->numOfCols; ++i) {
if (IS_VAR_DATA_TYPE(p->pSchema->columns[i].type)) {
p->transferBuf[i] = taosMemoryMalloc(p->pSchema->columns[i].bytes);
}
......@@ -92,10 +94,11 @@ int32_t tsdbLastRowReaderOpen(void* pVnode, int32_t type, SArray* pTableIdList,
int32_t tsdbLastrowReaderClose(void* pReader) {
SLastrowReader* p = pReader;
-for (int32_t i = 0; i < p->numOfCols; ++i) {
+for (int32_t i = 0; i < p->pSchema->numOfCols; ++i) {
taosMemoryFreeClear(p->transferBuf[i]);
}
+taosMemoryFree(p->pSchema);
taosMemoryFree(p->transferBuf);
taosMemoryFree(pReader);
return TSDB_CODE_SUCCESS;
......
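Review note: transferBuf is now sized and iterated by pSchema->numOfCols instead of the reader's numOfCols, because saveOneRow indexes it with slotIds[i], which is a schema slot id; with a projection narrower than the schema, the old per-query sizing could index past the array. A sketch of the corrected allocation (illustrative names; the var-type filtering of the real code is noted in comments):

    #include <stdlib.h>

    // One staging buffer per *schema* column, so buf[slotId] is always in range.
    static char **allocTransferBuf(int numOfSchemaCols, const int *bytesPerCol) {
      char **buf = calloc((size_t)numOfSchemaCols, sizeof(char *));
      if (buf == NULL) return NULL;
      for (int i = 0; i < numOfSchemaCols; ++i) {
        buf[i] = malloc((size_t)bytesPerCol[i]);  // real code allocates only var-length columns
      }
      return buf;  // index as buf[slotIds[i]], never by the query-projection position
    }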
......@@ -63,10 +63,10 @@ typedef struct SBlockLoadSuppInfo {
} SBlockLoadSuppInfo;
typedef struct SFilesetIter {
-int32_t numOfFiles; // number of total files
-int32_t index; // current accessed index in the list
-SArray* pFileList; // data file list
-int32_t order;
+int32_t numOfFiles; // number of total files
+int32_t index; // current accessed index in the list
+SArray* pFileList; // data file list
+int32_t order;
} SFilesetIter;
typedef struct SFileDataBlockInfo {
......@@ -302,7 +302,10 @@ static bool filesetIteratorNext(SFilesetIter* pIter, STsdbReader* pReader) {
STimeWindow win = {0};
while (1) {
-/*if (pReader->pFileReader != NULL) tsdbDataFReaderClose(&pReader->pFileReader);*/
+// if (pReader->pFileReader != NULL) {
+//   tsdbDataFReaderClose(&pReader->pFileReader);
+// }
pReader->status.pCurrentFileset = (SDFileSet*)taosArrayGet(pIter->pFileList, pIter->index);
int32_t code = tsdbDataFReaderOpen(&pReader->pFileReader, pReader->pTsdb, pReader->status.pCurrentFileset);
......@@ -2434,6 +2437,7 @@ static int32_t doMergeRowsInFileBlockImpl(SBlockData* pBlockData, int32_t rowInd
SVersionRange* pVerRange, int32_t step) {
while (pBlockData->aTSKEY[rowIndex] == key && rowIndex < pBlockData->nRow && rowIndex >= 0) {
if (pBlockData->aVersion[rowIndex] > pVerRange->maxVer || pBlockData->aVersion[rowIndex] < pVerRange->minVer) {
+rowIndex += step;
continue;
}
......@@ -2832,7 +2836,10 @@ void tsdbReaderClose(STsdbReader* pReader) {
destroyBlockScanInfo(pReader->status.pTableMap);
blockDataDestroy(pReader->pResBlock);
-if (pReader->pFileReader != NULL) tsdbDataFReaderClose(&pReader->pFileReader);
+if (pReader->pFileReader != NULL) {
+  tsdbDataFReaderClose(&pReader->pFileReader);
+}
#if 0
// if (pReader->status.pTableScanInfo != NULL) {
// pReader->status.pTableScanInfo = destroyTableCheckInfo(pReader->status.pTableScanInfo);
......@@ -3020,6 +3027,7 @@ int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) {
memset(pReader->suppInfo.plist, 0, POINTER_BYTES);
pReader->suppInfo.tsColAgg.colId = PRIMARYKEY_TIMESTAMP_COL_ID;
+tsdbDataFReaderClose(&pReader->pFileReader);
// todo set the correct numOfTables
int32_t numOfTables = 1;
......
......@@ -642,6 +642,7 @@ void ctgFreeSTableIndex(void *info);
void ctgClearSubTaskRes(SCtgSubRes *pRes);
void ctgFreeQNode(SCtgQNode *node);
void ctgClearHandle(SCatalog* pCtg);
+void ctgFreeTbCacheImpl(SCtgTbCache *pCache);
extern SCatalogMgmt gCtgMgmt;
......
......@@ -647,6 +647,8 @@ int32_t ctgEnqueue(SCatalog* pCtg, SCtgCacheOperation *operation) {
CTG_RET(TSDB_CODE_OUT_OF_MEMORY);
}
+bool syncOp = operation->syncOp;
+char* opName = gCtgCacheOperation[operation->opId].name;
if (operation->syncOp) {
tsem_init(&operation->rspSem, 0, 0);
}
......@@ -664,14 +666,14 @@ int32_t ctgEnqueue(SCatalog* pCtg, SCtgCacheOperation *operation) {
gCtgMgmt.queue.tail = node;
CTG_UNLOCK(CTG_WRITE, &gCtgMgmt.queue.qlock);
ctgDebug("action [%s] added into queue", opName);
CTG_QUEUE_INC();
CTG_RT_STAT_INC(numOfOpEnqueue, 1);
tsem_post(&gCtgMgmt.queue.reqSem);
ctgDebug("action [%s] added into queue", gCtgCacheOperation[operation->opId].name);
if (operation->syncOp) {
if (syncOp) {
tsem_wait(&operation->rspSem);
taosMemoryFree(operation);
}
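Review note: syncOp and opName are snapshotted into locals before the node is linked into the queue, and the debug log now uses those copies. Once the operation is visible to the consumer thread it may be processed and freed at any moment (the async branch of ctgUpdateThreadFunc below now frees it), so dereferencing operation after enqueue was a use-after-free. The rule, sketched with illustrative types:

    #include <stdbool.h>
    #include <stdio.h>
    #include <stdlib.h>

    typedef struct { bool sync; const char *name; } Op;

    static void publish(Op *op) { free(op); }  // stand-in consumer: may free immediately

    static void enqueueSafely(Op *op) {
      bool        sync = op->sync;   // snapshot while this thread still owns `op`
      const char *name = op->name;   // in the real code this points into a static table
      publish(op);                   // from here on, `op` may already be gone
      printf("action [%s] added into queue\n", name);  // safe: local copy, not op->name
      (void)sync;                    // real code waits on rspSem (and frees op) when sync
    }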
......@@ -840,6 +842,7 @@ _return:
ctgFreeVgInfo(dbInfo);
taosMemoryFreeClear(op->data);
+taosMemoryFreeClear(op);
CTG_RET(code);
}
......@@ -852,7 +855,7 @@ int32_t ctgUpdateTbMetaEnqueue(SCatalog* pCtg, STableMetaOutput *output, bool sy
SCtgUpdateTbMetaMsg *msg = taosMemoryMalloc(sizeof(SCtgUpdateTbMetaMsg));
if (NULL == msg) {
ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateTbMetaMsg));
-CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
+CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
}
char *p = strchr(output->dbFName, '.');
......@@ -871,6 +874,11 @@ int32_t ctgUpdateTbMetaEnqueue(SCatalog* pCtg, STableMetaOutput *output, bool sy
_return:
+if (output) {
+  taosMemoryFree(output->tbMeta);
+  taosMemoryFree(output);
+}
taosMemoryFreeClear(msg);
CTG_RET(code);
......@@ -1753,6 +1761,16 @@ int32_t ctgOpDropStbMeta(SCtgCacheOperation *operation) {
CTG_CACHE_STAT_DEC(numOfStb, 1);
}
+SCtgTbCache* pTbCache = taosHashGet(dbCache->tbCache, msg->stbName, strlen(msg->stbName));
+if (NULL == pTbCache) {
+  ctgDebug("stb %s already not in cache", msg->stbName);
+  goto _return;
+}
+
+CTG_LOCK(CTG_WRITE, &pTbCache->metaLock);
+ctgFreeTbCacheImpl(pTbCache);
+CTG_UNLOCK(CTG_WRITE, &pTbCache->metaLock);
if (taosHashRemove(dbCache->tbCache, msg->stbName, strlen(msg->stbName))) {
ctgError("stb not exist in cache, dbFName:%s, stb:%s, suid:0x%"PRIx64, msg->dbFName, msg->stbName, msg->suid);
} else {
......@@ -1780,14 +1798,24 @@ int32_t ctgOpDropTbMeta(SCtgCacheOperation *operation) {
SCtgDBCache *dbCache = NULL;
ctgGetDBCache(pCtg, msg->dbFName, &dbCache);
if (NULL == dbCache) {
-return TSDB_CODE_SUCCESS;
+goto _return;
}
if (dbCache->dbId != msg->dbId) {
ctgDebug("dbId 0x%" PRIx64 " not match with curId 0x%"PRIx64", dbFName:%s, tbName:%s", msg->dbId, dbCache->dbId, msg->dbFName, msg->tbName);
-return TSDB_CODE_SUCCESS;
+goto _return;
}
+SCtgTbCache* pTbCache = taosHashGet(dbCache->tbCache, msg->tbName, strlen(msg->tbName));
+if (NULL == pTbCache) {
+  ctgDebug("tb %s already not in cache", msg->tbName);
+  goto _return;
+}
+
+CTG_LOCK(CTG_WRITE, &pTbCache->metaLock);
+ctgFreeTbCacheImpl(pTbCache);
+CTG_UNLOCK(CTG_WRITE, &pTbCache->metaLock);
if (taosHashRemove(dbCache->tbCache, msg->tbName, strlen(msg->tbName))) {
ctgError("tb %s not exist in cache, dbFName:%s", msg->tbName, msg->dbFName);
CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
......@@ -2063,6 +2091,8 @@ void* ctgUpdateThreadFunc(void* param) {
if (operation->syncOp) {
tsem_post(&operation->rspSem);
+} else {
+  taosMemoryFreeClear(operation);
}
CTG_RT_STAT_INC(numOfOpDequeue, 1);
......
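Review note: both drop paths now release the cached entry's payload under its write lock (via the newly exported ctgFreeTbCacheImpl) before removing it from the hash, so a concurrent reader holding the entry cannot observe freed meta. The shape of the pattern, with illustrative types rather than the real catalog structs:

    #include <pthread.h>
    #include <stdlib.h>

    typedef struct {
      pthread_rwlock_t metaLock;
      void            *pMeta;
    } CacheEntrySketch;

    // Tear down the payload under the write lock, then unlink the entry.
    static void dropEntrySketch(CacheEntrySketch *e) {
      pthread_rwlock_wrlock(&e->metaLock);
      free(e->pMeta);                      // ctgFreeTbCacheImpl() in the real code
      e->pMeta = NULL;
      pthread_rwlock_unlock(&e->metaLock);
      /* ...then remove `e` from the hash map (taosHashRemove above) */
    }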
......@@ -261,6 +261,8 @@ int32_t ctgHandleMsgCallback(void *param, SDataBuf *pMsg, int32_t rspCode) {
_return:
taosMemoryFree(pMsg->pData);
+if (pJob) {
+  taosReleaseRef(gCtgMgmt.jobPool, cbParam->refId);
+}
......
......@@ -152,6 +152,7 @@ void ctgFreeStbMetaCache(SCtgDBCache *dbCache) {
}
void ctgFreeTbCacheImpl(SCtgTbCache *pCache) {
qDebug("tbMeta freed, p:%p", pCache->pMeta);
taosMemoryFreeClear(pCache->pMeta);
if (pCache->pIndex) {
taosArrayDestroyEx(pCache->pIndex->pIndex, tFreeSTableIndexInfo);
......@@ -831,6 +832,7 @@ int32_t ctgCloneMetaOutput(STableMetaOutput *output, STableMetaOutput **pOutput)
if (output->tbMeta) {
int32_t metaSize = CTG_META_SIZE(output->tbMeta);
(*pOutput)->tbMeta = taosMemoryMalloc(metaSize);
qDebug("tbMeta cloned, size:%d, p:%p", metaSize, (*pOutput)->tbMeta);
if (NULL == (*pOutput)->tbMeta) {
qError("malloc %d failed", (int32_t)sizeof(STableMetaOutput));
taosMemoryFreeClear(*pOutput);
......
......@@ -2217,7 +2217,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
{
.name = "last_row",
.type = FUNCTION_TYPE_LAST_ROW,
-.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_TIMELINE_FUNC,
+.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC,
.translateFunc = translateFirstLast,
.getEnvFunc = getFirstLastFuncEnv,
.initFunc = functionSetup,
......
......@@ -80,8 +80,10 @@ typedef struct STopBotRes {
} STopBotRes;
typedef struct SFirstLastRes {
-bool hasResult;
-bool isNull; // used for last_row function only
+bool hasResult;
+// used for last_row function only; isNullRes in SResultRowEntry can not be passed to downstream. So,
+// this attribute is required
+bool isNull;
int32_t bytes;
char buf[];
} SFirstLastRes;
......@@ -2948,7 +2950,7 @@ int32_t firstLastFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
pResInfo->isNullRes = (pResInfo->numOfRes == 0) ? 1 : 0;
SFirstLastRes* pRes = GET_ROWCELL_INTERBUF(pResInfo);
-colDataAppend(pCol, pBlock->info.rows, pRes->buf, pResInfo->isNullRes);
+colDataAppend(pCol, pBlock->info.rows, pRes->buf, pRes->isNull || pResInfo->isNullRes);
// handle selectivity
STuplePos* pTuplePos = (STuplePos*)(pRes->buf + pRes->bytes + sizeof(TSKEY));
setSelectivityValue(pCtx, pBlock, pTuplePos, pBlock->info.rows);
......@@ -5988,24 +5990,28 @@ int32_t lastrowFunction(SqlFunctionCtx* pCtx) {
int32_t type = pInputCol->info.type;
int32_t bytes = pInputCol->info.bytes;
+pInfo->bytes = bytes;
+
+// last_row function does not ignore the null value
for (int32_t i = pInput->numOfRows + pInput->startRowIndex - 1; i >= pInput->startRowIndex; --i) {
-if (pInputCol->hasNull && colDataIsNull_s(pInputCol, i)) {
-  continue;
-}
numOfElems++;
char* data = colDataGetData(pInputCol, i);
TSKEY cts = getRowPTs(pInput->pPTS, i);
if (pResInfo->numOfRes == 0 || *(TSKEY*)(pInfo->buf + bytes) < cts) {
-if (IS_VAR_DATA_TYPE(type)) {
-  bytes = varDataTLen(data);
-  pInfo->bytes = bytes;
-}
-memcpy(pInfo->buf, data, bytes);
+if (colDataIsNull_s(pInputCol, i)) {
+  pInfo->isNull = true;
+} else {
+  if (IS_VAR_DATA_TYPE(type)) {
+    bytes = varDataTLen(data);
+    pInfo->bytes = bytes;
+  }
+  memcpy(pInfo->buf, data, bytes);
+}
*(TSKEY*)(pInfo->buf + bytes) = cts;
pInfo->hasResult = true;
......
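Review note: last_row must report the newest row even when its value is NULL, so the loop above no longer skips null rows; it records pInfo->isNull and still advances the saved timestamp, and firstLastFinalize appends NULL when either the per-row flag or the entry-level isNullRes is set. Roughly, with illustrative types:

    #include <stdbool.h>

    typedef struct { long long ts; double val; bool isNull; bool hasResult; } LastRowSketch;

    // Newest timestamp wins, NULL or not; the value is copied only when present.
    static void lastRowStep(LastRowSketch *r, long long ts, double val, bool isNull) {
      if (!r->hasResult || ts > r->ts) {
        r->ts = ts;
        r->isNull = isNull;
        if (!isNull) r->val = val;
        r->hasResult = true;
      }
    }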
......@@ -282,7 +282,7 @@ int32_t dataConverToStr(char* str, int type, void* buf, int32_t bufSize, int32_t
}
*str = '"';
-int32_t length = taosUcs4ToMbs((TdUcs4 *)buf, bufSize, str + 1);
+int32_t length = taosUcs4ToMbs((TdUcs4*)buf, bufSize, str + 1);
if (length <= 0) {
return TSDB_CODE_TSC_INVALID_VALUE;
}
......@@ -310,15 +310,15 @@ int32_t dataConverToStr(char* str, int type, void* buf, int32_t bufSize, int32_t
return TSDB_CODE_TSC_INVALID_VALUE;
}
-if(len) *len = n;
+if (len) *len = n;
return TSDB_CODE_SUCCESS;
}
char* parseTagDatatoJson(void* p) {
-char* string = NULL;
+char* string = NULL;
SArray* pTagVals = NULL;
-cJSON* json = NULL;
+cJSON* json = NULL;
if (tTagToValArray((const STag*)p, &pTagVals) != 0) {
goto end;
}
......@@ -327,7 +327,7 @@ char* parseTagDatatoJson(void* p) {
if (nCols == 0) {
goto end;
}
-char tagJsonKey[256] = {0};
+char tagJsonKey[256] = {0};
json = cJSON_CreateObject();
if (json == NULL) {
goto end;
......@@ -390,7 +390,7 @@ char* parseTagDatatoJson(void* p) {
end:
cJSON_Delete(json);
taosArrayDestroy(pTagVals);
-if(string == NULL){
+if (string == NULL) {
string = strdup(TSDB_DATA_NULL_STR_L);
}
return string;
......
......@@ -41,6 +41,8 @@ void schFreeTask(SSchJob *pJob, SSchTask *pTask) {
if (pTask->execNodes) {
taosHashCleanup(pTask->execNodes);
}
+taosMemoryFree(pTask->profile.execTime);
}
int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel *pLevel, int32_t levelNum) {
......
......@@ -170,6 +170,7 @@
# --- valgrind
./test.sh -f tsim/valgrind/checkError1.sim
./test.sh -f tsim/valgrind/checkError2.sim
+./test.sh -f tsim/valgrind/checkError3.sim
# --- vnode
# ./test.sh -f tsim/vnode/replica3_basic.sim
......
import taos
import sys
import time
import socket
import os
import threading
import math
from util.log import *
from util.sql import *
from util.cases import *
from util.dnodes import *
from util.common import *
sys.path.append("./7-tmq")
from tmqCommon import *
class TDTestCase:
def __init__(self):
self.vgroups = 1
self.ctbNum = 100
self.rowsPerTbl = 10000
def init(self, conn, logSql):
tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor(), False)
def prepareTestEnv(self):
tdLog.printNoPrefix("======== prepare test env include database, stable, ctables, and insert data: ")
paraDict = {'dbName': 'dbt',
'dropFlag': 1,
'event': '',
'vgroups': 1,
'stbName': 'stb',
'colPrefix': 'c',
'tagPrefix': 't',
'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}],
'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
'ctbPrefix': 'ctb',
'ctbStartIdx': 0,
'ctbNum': 10,
'rowsPerTbl': 10000,
'batchNum': 1000,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 10,
'showMsg': 1,
'showRow': 1,
'snapshot': 1}
paraDict['vgroups'] = self.vgroups
paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl
tmqCom.initConsumerTable()
tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1)
tdLog.info("create stb")
tmqCom.create_stable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"])
tdLog.info("create ctb")
tmqCom.create_ctable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict['ctbPrefix'],
ctbNum=paraDict["ctbNum"],ctbStartIdx=paraDict['ctbStartIdx'])
tdLog.info("insert data")
tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"],
ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
tdLog.info("restart taosd to ensure that the data falls into the disk")
# tdDnodes.stop(1)
# tdDnodes.start(1)
tdSql.query("flush database %s"%(paraDict['dbName']))
return
def tmqCase1(self):
tdLog.printNoPrefix("======== test case 1: ")
paraDict = {'dbName': 'dbt',
'dropFlag': 1,
'event': '',
'vgroups': 1,
'stbName': 'stb',
'colPrefix': 'c',
'tagPrefix': 't',
'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}],
'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
'ctbPrefix': 'ctb',
'ctbStartIdx': 0,
'ctbNum': 10,
'rowsPerTbl': 10000,
'batchNum': 10,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 3,
'showMsg': 1,
'showRow': 1,
'snapshot': 1}
paraDict['vgroups'] = self.vgroups
paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl
paraDict['batchNum'] = 100
paraDict['startTs'] = paraDict['startTs'] + self.rowsPerTbl
topicNameList = ['topic1']
expectRowsList = []
tmqCom.initConsumerTable()
tdLog.info("create topics from stb with filter")
queryString = "select * from %s.%s"%(paraDict['dbName'], paraDict['stbName'])
# sqlString = "create topic %s as stable %s" %(topicNameList[0], paraDict['stbName'])
sqlString = "create topic %s as %s" %(topicNameList[0], queryString)
tdLog.info("create topic sql: %s"%sqlString)
tdSql.execute(sqlString)
# tdSql.query(queryString)
# expectRowsList.append(tdSql.getRows())
# init consume info, and start tmq_sim, then check consume result
tdLog.info("insert consume info to consume processor")
consumerId = 0
expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"] * 2
topicList = topicNameList[0]
ifcheckdata = 1
ifManualCommit = 1
keyList = 'group.id:cgrp1, enable.auto.commit:true, auto.commit.interval.ms:1000, auto.offset.reset:earliest'
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
tdLog.info("start consume processor")
tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
# after start consume, continue insert some data
tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"],
ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
#
tdSql.query(queryString)
expectRowsList.append(tdSql.getRows())
tdLog.info("wait the consume result")
expectRows = 1
resultList = tmqCom.selectConsumeResult(expectRows)
if expectRowsList[0] != resultList[0]:
tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[0], resultList[0]))
tdLog.exit("%d tmq consume rows error!"%consumerId)
tmqCom.checkFileContent(consumerId, queryString)
time.sleep(10)
for i in range(len(topicNameList)):
tdSql.query("drop topic %s"%topicNameList[i])
tdLog.printNoPrefix("======== test case 1 end ...... ")
def tmqCase2(self):
tdLog.printNoPrefix("======== test case 2: ")
paraDict = {'dbName': 'dbt',
'dropFlag': 1,
'event': '',
'vgroups': 1,
'stbName': 'stb',
'colPrefix': 'c',
'tagPrefix': 't',
'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}],
'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
'ctbPrefix': 'ctb',
'ctbStartIdx': 0,
'ctbNum': 10,
'rowsPerTbl': 10000,
'batchNum': 10,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 3,
'showMsg': 1,
'showRow': 1,
'snapshot': 1}
paraDict['vgroups'] = self.vgroups
paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl
topicNameList = ['topic1']
expectRowsList = []
tmqCom.initConsumerTable()
tdLog.info("create topics from stb with filter")
queryString = "select * from %s.%s"%(paraDict['dbName'], paraDict['stbName'])
# sqlString = "create topic %s as stable %s" %(topicNameList[0], paraDict['stbName'])
sqlString = "create topic %s as %s" %(topicNameList[0], queryString)
tdLog.info("create topic sql: %s"%sqlString)
tdSql.execute(sqlString)
tdSql.query(queryString)
expectRowsList.append(tdSql.getRows())
totalRowsInserted = expectRowsList[0]
# init consume info, and start tmq_sim, then check consume result
tdLog.info("insert consume info to consume processor")
consumerId = 1
expectrowcnt = math.ceil(paraDict["rowsPerTbl"] * paraDict["ctbNum"] / 3)
topicList = topicNameList[0]
ifcheckdata = 1
ifManualCommit = 1
keyList = 'group.id:cgrp1, enable.auto.commit:true, auto.commit.interval.ms:1000, auto.offset.reset:earliest'
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
tdLog.info("start consume processor 0")
tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
tdLog.info("wait the consume result")
expectRows = 1
resultList = tmqCom.selectConsumeResult(expectRows)
if not (expectrowcnt <= resultList[0] and totalRowsInserted >= resultList[0]):
tdLog.info("act consume rows: %d, expect consume rows between %d and %d"%(resultList[0], expectrowcnt, totalRowsInserted))
tdLog.exit("%d tmq consume rows error!"%consumerId)
firstConsumeRows = resultList[0]
# reinit consume info, and start tmq_sim, then check consume result
tmqCom.initConsumerTable()
consumerId = 2
expectrowcnt = math.ceil(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * 2/3)
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
tdLog.info("start consume processor 1")
tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
tdLog.info("wait the consume result")
expectRows = 1
resultList = tmqCom.selectConsumeResult(expectRows)
actConsumeTotalRows = firstConsumeRows + resultList[0]
if not (expectrowcnt >= resultList[0] and totalRowsInserted == actConsumeTotalRows):
tdLog.info("act consume rows, first: %d, second: %d "%(firstConsumeRows, resultList[0]))
tdLog.info("and sum of two consume rows: %d should be equal to total inserted rows: %d"%(actConsumeTotalRows, totalRowsInserted))
tdLog.exit("%d tmq consume rows error!"%consumerId)
time.sleep(10)
for i in range(len(topicNameList)):
tdSql.query("drop topic %s"%topicNameList[i])
tdLog.printNoPrefix("======== test case 2 end ...... ")
def run(self):
tdSql.prepare()
self.prepareTestEnv()
self.tmqCase1()
# self.tmqCase2()
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
event = threading.Event()
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())
import taos
import sys
import time
import socket
import os
import threading
from enum import Enum
from util.log import *
from util.sql import *
from util.cases import *
from util.dnodes import *
sys.path.append("./7-tmq")
from tmqCommon import *
class TDTestCase:
def __init__(self):
self.vgroups = 4
self.ctbNum = 500
self.rowsPerTbl = 1000
def init(self, conn, logSql):
tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor(), False)
def prepareTestEnv(self):
tdLog.printNoPrefix("======== prepare test env include database, stable, ctables, and insert data: ")
paraDict = {'dbName': 'dbt',
'dropFlag': 1,
'event': '',
'vgroups': 4,
'stbName': 'stb',
'colPrefix': 'c',
'tagPrefix': 't',
'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}],
'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
'ctbPrefix': 'ctb',
'ctbStartIdx': 0,
'ctbNum': 1000,
'rowsPerTbl': 1000,
'batchNum': 400,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 3,
'showMsg': 1,
'showRow': 1,
'snapshot': 0}
paraDict['vgroups'] = self.vgroups
paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl
tmqCom.initConsumerTable()
tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1)
tdLog.info("create stb")
tmqCom.create_stable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"])
tdLog.info("create ctb")
tmqCom.create_ctable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict['ctbPrefix'],
ctbNum=paraDict["ctbNum"],ctbStartIdx=paraDict['ctbStartIdx'])
tdLog.info("insert data")
tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"],
ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix="ctbx",
ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
# tdLog.info("restart taosd to ensure that the data falls into the disk")
# tdSql.query("flush database %s"%(paraDict['dbName']))
return
# insert data via auto-created child tables, then start consuming
def tmqCase1(self):
tdLog.printNoPrefix("======== test case 1: ")
paraDict = {'dbName': 'dbt',
'dropFlag': 1,
'event': '',
'vgroups': 4,
'stbName': 'stb',
'colPrefix': 'c',
'tagPrefix': 't',
'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}],
'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
'ctbPrefix': 'ctb',
'ctbStartIdx': 0,
'ctbNum': 1000,
'rowsPerTbl': 1000,
'batchNum': 400,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 5,
'showMsg': 1,
'showRow': 1,
'snapshot': 0}
paraDict['vgroups'] = self.vgroups
paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl
# update to half tables
paraDict['ctbNum'] = int(self.ctbNum / 2)
tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix="ctbx",
ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"],
ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
tdLog.info("create topics from stb1")
topicFromStb1 = 'topic_stb1'
queryString = "select ts, c1, c2 from %s.%s"%(paraDict['dbName'], paraDict['stbName'])
sqlString = "create topic %s as %s" %(topicFromStb1, queryString)
tdLog.info("create topic sql: %s"%sqlString)
tdSql.execute(sqlString)
paraDict['ctbNum'] = self.ctbNum
consumerId = 0
expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * 3)
topicList = topicFromStb1
ifcheckdata = 0
ifManualCommit = 0
keyList = 'group.id:cgrp1,\
enable.auto.commit:true,\
auto.commit.interval.ms:1000,\
auto.offset.reset:earliest'
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
tdLog.info("start consume processor")
tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
tdLog.info("insert process end, and start to check consume result")
expectRows = 1
resultList = tmqCom.selectConsumeResult(expectRows)
totalConsumeRows = 0
for i in range(expectRows):
totalConsumeRows += resultList[i]
tdSql.query(queryString)
totalRowsInserted = tdSql.getRows()
tdLog.info("act consume rows: %d, expect consume rows: %d, act insert rows: %d"%(totalConsumeRows, expectrowcnt, totalRowsInserted))
if totalConsumeRows != expectrowcnt:
tdLog.exit("tmq consume rows error!")
tdSql.query("drop topic %s"%topicFromStb1)
tdLog.printNoPrefix("======== test case 1 end ...... ")
def tmqCase2(self):
tdLog.printNoPrefix("======== test case 2: ")
paraDict = {'dbName': 'dbt',
'dropFlag': 1,
'event': '',
'vgroups': 4,
'stbName': 'stb',
'colPrefix': 'c',
'tagPrefix': 't',
'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}],
'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
'ctbPrefix': 'ctb',
'ctbStartIdx': 0,
'ctbNum': 1000,
'rowsPerTbl': 1000,
'batchNum': 1000,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 5,
'showMsg': 1,
'showRow': 1,
'snapshot': 1}
paraDict['vgroups'] = self.vgroups
paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl
tdLog.info("restart taosd to ensure that the data falls into the disk")
tdSql.query("flush database %s"%(paraDict['dbName']))
# update to half tables
paraDict['ctbNum'] = int(self.ctbNum / 2)
tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix="ctbx",
ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']+paraDict['ctbNum'])
tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"],
ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']+paraDict['ctbNum'])
tmqCom.initConsumerTable()
tdLog.info("create topics from stb1")
topicFromStb1 = 'topic_stb1'
queryString = "select ts, c1, c2 from %s.%s"%(paraDict['dbName'], paraDict['stbName'])
sqlString = "create topic %s as %s" %(topicFromStb1, queryString)
tdLog.info("create topic sql: %s"%sqlString)
tdSql.execute(sqlString)
paraDict['ctbNum'] = self.ctbNum
consumerId = 0
expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"] * 2 * 2
topicList = topicFromStb1
ifcheckdata = 0
ifManualCommit = 0
keyList = 'group.id:cgrp1,\
enable.auto.commit:true,\
auto.commit.interval.ms:1000,\
auto.offset.reset:earliest'
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
tdLog.info("start consume processor")
tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
tdLog.info("insert process end, and start to check consume result")
expectRows = 1
resultList = tmqCom.selectConsumeResult(expectRows)
totalConsumeRows = 0
for i in range(expectRows):
totalConsumeRows += resultList[i]
tdSql.query(queryString)
totalRowsInserted = tdSql.getRows()
tdLog.info("act consume rows: %d, act insert rows: %d, expect consume rows: %d, "%(totalConsumeRows, totalRowsInserted, expectrowcnt))
if totalConsumeRows != totalRowsInserted:
tdLog.exit("tmq consume rows error!")
tmqCom.checkFileContent(consumerId, queryString)
tdSql.query("drop topic %s"%topicFromStb1)
tdLog.printNoPrefix("======== test case 2 end ...... ")
def run(self):
tdSql.prepare()
self.prepareTestEnv()
self.tmqCase1()
self.tmqCase2()
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
event = threading.Event()
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())
import taos
import sys
import time
import socket
import os
import threading
from enum import Enum
from util.log import *
from util.sql import *
from util.cases import *
from util.dnodes import *
sys.path.append("./7-tmq")
from tmqCommon import *
class TDTestCase:
def __init__(self):
self.vgroups = 4
self.ctbNum = 1000
self.rowsPerTbl = 1000
def init(self, conn, logSql):
tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor(), False)
def prepareTestEnv(self):
tdLog.printNoPrefix("======== prepare test env include database, stable, ctables, and insert data: ")
paraDict = {'dbName': 'dbt',
'dropFlag': 1,
'event': '',
'vgroups': 4,
'stbName': 'stb',
'colPrefix': 'c',
'tagPrefix': 't',
'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}],
'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
'ctbPrefix': 'ctb',
'ctbStartIdx': 0,
'ctbNum': 1000,
'rowsPerTbl': 1000,
'batchNum': 400,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 3,
'showMsg': 1,
'showRow': 1,
'snapshot': 0}
paraDict['vgroups'] = self.vgroups
paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl
tmqCom.initConsumerTable()
tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1)
tdLog.info("create stb")
tmqCom.create_stable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"])
tdLog.info("create ctb")
tmqCom.create_ctable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict['ctbPrefix'],
ctbNum=paraDict["ctbNum"],ctbStartIdx=paraDict['ctbStartIdx'])
tdLog.info("insert data")
paraDict['ctbNum'] = int(self.ctbNum / 2)
tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"],
ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix="ctbx",
ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
# tdLog.info("restart taosd to ensure that the data falls into the disk")
# tdSql.query("flush database %s"%(paraDict['dbName']))
return
def tmqCase1(self):
tdLog.printNoPrefix("======== test case 1: ")
paraDict = {'dbName': 'dbt',
'dropFlag': 1,
'event': '',
'vgroups': 4,
'stbName': 'stb',
'colPrefix': 'c',
'tagPrefix': 't',
'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}],
'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
'ctbPrefix': 'ctb',
'ctbStartIdx': 0,
'ctbNum': 1000,
'rowsPerTbl': 1000,
'batchNum': 1000,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 5,
'showMsg': 1,
'showRow': 1,
'snapshot': 1}
paraDict['vgroups'] = self.vgroups
paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl
tdLog.info("restart taosd to ensure that the data falls into the disk")
tdSql.query("flush database %s"%(paraDict['dbName']))
# update to half tables
paraDict['ctbNum'] = int(self.ctbNum / 4)
tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix="ctbx",
ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']+paraDict['ctbNum'])
tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"],
ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']+paraDict['ctbNum'])
tmqCom.initConsumerTable()
tdLog.info("create topics from stb1")
topicFromStb1 = 'topic_stb1'
queryString = "select ts, c1, c2 from %s.%s"%(paraDict['dbName'], paraDict['stbName'])
sqlString = "create topic %s as %s" %(topicFromStb1, queryString)
tdLog.info("create topic sql: %s"%sqlString)
tdSql.execute(sqlString)
paraDict['ctbNum'] = self.ctbNum
consumerId = 0
expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"] * (1 + 1)
topicList = topicFromStb1
ifcheckdata = 0
ifManualCommit = 0
keyList = 'group.id:cgrp1,\
enable.auto.commit:true,\
auto.commit.interval.ms:1000,\
auto.offset.reset:earliest'
tmqCom.insertConsumerInfo(consumerId, expectrowcnt + paraDict["rowsPerTbl"] * paraDict["ctbNum"],topicList,keyList,ifcheckdata,ifManualCommit)
tdLog.info("start consume processor")
tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
paraDict['ctbNum'] = int(self.ctbNum / 2)
paraDict['ctbStartIdx'] += paraDict['ctbNum']
_ = tmqCom.asyncInsertDataByInterlace(paraDict)
time.sleep(3)
pthread = tmqCom.asyncInsertDataByInterlace(paraDict)
pthread.join()
tdLog.info("insert process end, and start to check consume result")
expectRows = 1
resultList = tmqCom.selectConsumeResult(expectRows)
totalConsumeRows = 0
for i in range(expectRows):
totalConsumeRows += resultList[i]
tdSql.query(queryString)
totalRowsInserted = tdSql.getRows()
tdLog.info("act consume rows: %d, act insert rows: %d, expect consume rows: %d, "%(totalConsumeRows, totalRowsInserted, expectrowcnt))
if totalConsumeRows <= totalRowsInserted or totalConsumeRows != expectrowcnt:
tdLog.exit("tmq consume rows error!")
tdSql.query("drop topic %s"%topicFromStb1)
tdLog.printNoPrefix("======== test case 1 end ...... ")
def run(self):
tdSql.prepare()
self.prepareTestEnv()
self.tmqCase1()
# self.tmqCase2()
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
event = threading.Event()
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())
Subproject commit 6dccac192a2ae7dd78718ab926201aab5419327a
Subproject commit 7a94ffab45f08e16f09b3f430fe75d717054adb6