diff --git a/documentation20/webdocs/markdowndocs/TAOS SQL-ch.md b/documentation20/webdocs/markdowndocs/TAOS SQL-ch.md index e0acaee137eb8f4d65e95ae97be7bbc8988cb122..66d17c6aa686a7ed07ff1b865000f7b4a43f3834 100644 --- a/documentation20/webdocs/markdowndocs/TAOS SQL-ch.md +++ b/documentation20/webdocs/markdowndocs/TAOS SQL-ch.md @@ -98,12 +98,12 @@ TDengine缺省的时间戳是毫秒精度,但通过修改配置参数enableMic KEEP参数是指修改数据文件保存的天数,缺省值为3650,取值范围[days, 365000],必须大于或等于days参数值。 ```mysql - ALTER DATABASE db_name QUORUM 365; + ALTER DATABASE db_name QUORUM 2; ``` QUORUM参数是指数据写入成功所需要的确认数。取值范围[1, 3]。对于异步复制,quorum设为1,具有master角色的虚拟节点自己确认即可。对于同步复制,需要至少大于等于2。原则上,Quorum >=1 并且 Quorum <= replica(副本数),这个参数在启动一个同步模块实例时需要提供。 ```mysql - ALTER DATABASE db_name BLOCKS 365; + ALTER DATABASE db_name BLOCKS 100; ``` BLOCKS参数是每个VNODE (TSDB) 中有多少cache大小的内存块,因此一个VNODE的用的内存大小粗略为(cache * blocks)。取值范围[3, 1000]。 diff --git a/src/client/inc/tscUtil.h b/src/client/inc/tscUtil.h index 79a792ab65a5327d68f3e75da4ee0610d1f7a6fd..0323434a992ae25b9f3fe54fb596b56e865bdf55 100644 --- a/src/client/inc/tscUtil.h +++ b/src/client/inc/tscUtil.h @@ -258,6 +258,8 @@ void tscDoQuery(SSqlObj* pSql); */ SSqlObj* createSimpleSubObj(SSqlObj* pSql, void (*fp)(), void* param, int32_t cmd); +void registerSqlObj(SSqlObj* pSql); + SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void* param, int32_t cmd, SSqlObj* pPrevSql); void addGroupInfoForSubquery(SSqlObj* pParentObj, SSqlObj* pSql, int32_t subClauseIndex, int32_t tableIndex); diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index 55ca02dfb5d67ea150fc0725191fdc570a6c28c8..ea42b0f6a339c241aefd9c16da30a6b98d3ca09b 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -89,7 +89,7 @@ typedef struct STableComInfo { typedef struct SCMCorVgroupInfo { int32_t version; - int8_t inUse; + int8_t inUse; int8_t numOfEps; SEpAddr epAddr[TSDB_MAX_REPLICA]; } SCMCorVgroupInfo; @@ -107,7 +107,7 @@ typedef struct STableMeta { } STableMeta; typedef struct STableMetaInfo { - STableMeta * pTableMeta; // table meta, cached in client side and acquired by name + STableMeta *pTableMeta; // table meta, cached in client side and acquired by name SVgroupsInfo *vgroupList; SArray *pVgroupTables; // SArray diff --git a/src/client/src/tscAsync.c b/src/client/src/tscAsync.c index 09610575f619be3f02059b6e6ca1ba2c56edcb62..c5d622e2452d42e6863a294ce83fb850d21c5818 100644 --- a/src/client/src/tscAsync.c +++ b/src/client/src/tscAsync.c @@ -51,10 +51,7 @@ void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, void (*fp)(), void* param, const pSql->fp = fp; pSql->fetchFp = fp; - uint64_t handle = (uint64_t) pSql; - pSql->self = taosCachePut(tscObjCache, &handle, sizeof(uint64_t), &pSql, sizeof(uint64_t), 2*3600*1000); - - T_REF_INC(pSql->pTscObj); + registerSqlObj(pSql); pSql->sqlstr = calloc(1, sqlLen + 1); if (pSql->sqlstr == NULL) { diff --git a/src/client/src/tscPrepare.c b/src/client/src/tscPrepare.c index c4ca6793ff14a86039738ffce1ffc04dfcc42aec..cdbd8685df55d74836b2bcc768e52bf22b972d72 100644 --- a/src/client/src/tscPrepare.c +++ b/src/client/src/tscPrepare.c @@ -545,10 +545,8 @@ int taos_stmt_prepare(TAOS_STMT* stmt, const char* sql, unsigned long length) { pSql->cmd.numOfParams = 0; pSql->cmd.batchSize = 0; - - uint64_t handle = (uint64_t) pSql; - pSql->self = taosCachePut(tscObjCache, &handle, sizeof(uint64_t), &pSql, sizeof(uint64_t), 2*3600*1000); - T_REF_INC(pSql->pTscObj); + + registerSqlObj(pSql); int32_t code = tsParseSql(pSql, true); if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) { diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index b9d4cc13d1692a9cb349ea348ca6596e3053c71f..f690d13164bb82749ec7ce9f6229376bfb3f61ac 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -820,20 +820,20 @@ int32_t tscSetTableFullName(STableMetaInfo* pTableMetaInfo, SStrToken* pzTableNa if (hasSpecifyDB(pzTableName)) { // db has been specified in sql string so we ignore current db path code = setObjFullName(pTableMetaInfo->name, getAccountId(pSql), NULL, pzTableName, NULL); - if (code != 0) { + if (code != TSDB_CODE_SUCCESS) { invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1); } } else { // get current DB name first, then set it into path SStrToken t = {0}; getCurrentDBName(pSql, &t); if (t.n == 0) { - invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2); - } - - code = setObjFullName(pTableMetaInfo->name, NULL, &t, pzTableName, NULL); - if (code != 0) { - invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1); - } + code = invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2); + } else { + code = setObjFullName(pTableMetaInfo->name, NULL, &t, pzTableName, NULL); + if (code != TSDB_CODE_SUCCESS) { + code = invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1); + } + } } if (code != TSDB_CODE_SUCCESS) { diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 8eaa406bce95a4831a80366e62b34efea0cc2a5a..3cbb0d936e23bb5e90773f1cb2b0e32159de938c 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -128,6 +128,7 @@ static void tscUpdateVgroupInfo(SSqlObj *pObj, SRpcEpSet *pEpSet) { tscDebug("after: EndPoint in use: %d", pVgroupInfo->inUse); taosCorEndWrite(&pVgroupInfo->version); } + void tscPrintMgmtEp() { SRpcEpSet dump; tscDumpMgmtEpSet(&dump); @@ -745,7 +746,6 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, i); if (!tscValidateColumnId(pTableMetaInfo, pExpr->colInfo.colId, pExpr->numOfParams)) { - /* column id is not valid according to the cached table meta, the table meta is expired */ tscError("%p table schema is not matched with parsed sql", pSql); return TSDB_CODE_TSC_INVALID_SQL; } @@ -1987,15 +1987,11 @@ static void createHBObj(STscObj* pObj) { pSql->param = pObj; pSql->pTscObj = pObj; pSql->signature = pSql; - pObj->pHb = pSql; - - tscAddSubqueryInfo(&pObj->pHb->cmd); - int64_t ad = (int64_t) pSql; - pSql->self = taosCachePut(tscObjCache, &ad, sizeof(int64_t), &pSql, sizeof(int64_t), 2 * 60 * 1000); - T_REF_INC(pObj); + registerSqlObj(pSql); + tscDebug("%p HB is allocated, pObj:%p", pSql, pObj); - tscDebug("%p HB is allocated, pObj:%p", pObj->pHb, pObj); + pObj->pHb = pSql; } int tscProcessConnectRsp(SSqlObj *pSql) { @@ -2170,11 +2166,7 @@ static int32_t getTableMetaFromMgmt(SSqlObj *pSql, STableMetaInfo *pTableMetaInf pNew->signature = pNew; pNew->cmd.command = TSDB_SQL_META; - T_REF_INC(pNew->pTscObj); - - // TODO add test case on x86 platform - uint64_t adr = (uint64_t) pNew; - pNew->self = taosCachePut(tscObjCache, &adr, sizeof(uint64_t), &pNew, sizeof(uint64_t), 2*60*1000); + registerSqlObj(pNew); tscAddSubqueryInfo(&pNew->cmd); @@ -2301,10 +2293,8 @@ int tscGetSTableVgroupInfo(SSqlObj *pSql, int32_t clauseIndex) { } pNewQueryInfo->numOfTables = pQueryInfo->numOfTables; - T_REF_INC(pNew->pTscObj); + registerSqlObj(pNew); - uint64_t p = (uint64_t) pNew; - pNew->self = taosCachePut(tscObjCache, &p, sizeof(uint64_t), &pNew, sizeof(uint64_t), 2 * 600 * 1000); tscDebug("%p new sqlObj:%p to get vgroupInfo, numOfTables:%d", pSql, pNew, pNewQueryInfo->numOfTables); pNew->fp = tscTableMetaCallBack; diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index e1a07fdcfe7f9ff2e24ae9f6469bad181dd0786f..430a762321718eb78f6271e43086390ea1f2439b 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -156,10 +156,7 @@ SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pass, con *taos = pObj; } - T_REF_INC(pSql->pTscObj); - - uint64_t key = (uint64_t) pSql; - pSql->self = taosCachePut(tscObjCache, &key, sizeof(uint64_t), &pSql, sizeof(uint64_t), 2*3600*1000); + registerSqlObj(pSql); tsInsertHeadSize = sizeof(SMsgDesc) + sizeof(SSubmitMsg); return pSql; @@ -270,7 +267,7 @@ void taos_close(TAOS *taos) { pHb->pRpcCtx = NULL; } - tscDebug("%p, HB is freed", pHb); + tscDebug("%p HB is freed", pHb); taos_free_result(pHb); } @@ -789,14 +786,17 @@ int taos_validate_sql(TAOS *taos, const char *sql) { } SSqlObj* pSql = calloc(1, sizeof(SSqlObj)); + pSql->pTscObj = taos; pSql->signature = pSql; + SSqlRes *pRes = &pSql->res; SSqlCmd *pCmd = &pSql->cmd; pRes->numOfTotal = 0; pRes->numOfClauseTotal = 0; + tscDebug("%p Valid SQL: %s pObj:%p", pSql, sql, pObj); int32_t sqlLen = (int32_t)strlen(sql); @@ -832,11 +832,12 @@ int taos_validate_sql(TAOS *taos, const char *sql) { tsem_wait(&pSql->rspSem); code = pSql->res.code; } + if (code != TSDB_CODE_SUCCESS) { tscDebug("%p Valid SQL result:%d, %s pObj:%p", pSql, code, taos_errstr(taos), pObj); } - taos_free_result(pSql); + taos_free_result(pSql); return code; } @@ -935,34 +936,32 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) { SSqlObj* pSql = calloc(1, sizeof(SSqlObj)); pSql->pTscObj = taos; pSql->signature = pSql; + SSqlRes *pRes = &pSql->res; + pRes->code = 0; pRes->numOfTotal = 0; // the number of getting table meta from server pRes->numOfClauseTotal = 0; - pRes->code = 0; - assert(pSql->fp == NULL); tscDebug("%p tableNameList: %s pObj:%p", pSql, tableNameList, pObj); int32_t tblListLen = (int32_t)strlen(tableNameList); if (tblListLen > MAX_TABLE_NAME_LENGTH) { tscError("%p tableNameList too long, length:%d, maximum allowed:%d", pSql, tblListLen, MAX_TABLE_NAME_LENGTH); - pRes->code = TSDB_CODE_TSC_INVALID_SQL; - taosTFree(pSql); - return pRes->code; + tscFreeSqlObj(pSql); + return TSDB_CODE_TSC_INVALID_SQL; } char *str = calloc(1, tblListLen + 1); if (str == NULL) { - pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY; tscError("%p failed to malloc sql string buffer", pSql); - taosTFree(pSql); - return pRes->code; + tscFreeSqlObj(pSql); + return TSDB_CODE_TSC_OUT_OF_MEMORY; } strtolower(str, tableNameList); - pRes->code = (uint8_t)tscParseTblNameList(pSql, str, tblListLen); + int32_t code = (uint8_t) tscParseTblNameList(pSql, str, tblListLen); /* * set the qhandle to 0 before return in order to erase the qhandle value assigned in the previous successful query. @@ -972,17 +971,17 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) { pRes->qhandle = 0; free(str); - if (pRes->code != TSDB_CODE_SUCCESS) { + if (code != TSDB_CODE_SUCCESS) { tscFreeSqlObj(pSql); - return pRes->code; + return code; } tscDoQuery(pSql); - tscDebug("%p load multi metermeta result:%d %s pObj:%p", pSql, pRes->code, taos_errstr(taos), pObj); - if (pRes->code != TSDB_CODE_SUCCESS) { - tscPartiallyFreeSqlObj(pSql); + tscDebug("%p load multi table meta result:%d %s pObj:%p", pSql, pRes->code, taos_errstr(taos), pObj); + if ((code = pRes->code) != TSDB_CODE_SUCCESS) { + tscFreeSqlObj(pSql); } - return pRes->code; + return code; } diff --git a/src/client/src/tscStream.c b/src/client/src/tscStream.c index 35cd09a0336dbb3580d4025aed363fc65acaf4b7..81b8cf7359a93764551cc4765efebc19be4d62ad 100644 --- a/src/client/src/tscStream.c +++ b/src/client/src/tscStream.c @@ -510,9 +510,7 @@ static void tscCreateStream(void *param, TAOS_RES *res, int code) { return; } - uint64_t handle = (uint64_t) pSql; - pSql->self = taosCachePut(tscObjCache, &handle, sizeof(uint64_t), &pSql, sizeof(uint64_t), 2*3600*1000); - T_REF_INC(pSql->pTscObj); + registerSqlObj(pSql); SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); diff --git a/src/client/src/tscSub.c b/src/client/src/tscSub.c index 760c5f5a514af8f90ce3e6461dfff019a766c682..2c81bd7c7ce25535dfffe9c1fd6f8a0197fb900b 100644 --- a/src/client/src/tscSub.c +++ b/src/client/src/tscSub.c @@ -105,6 +105,7 @@ static SSub* tscCreateSubscription(STscObj* pObj, const char* topic, const char* code = TAOS_SYSTEM_ERROR(errno); goto fail; } + tstrncpy(pSub->topic, topic, sizeof(pSub->topic)); pSub->progress = taosArrayInit(32, sizeof(SSubscriptionProgress)); if (pSub->progress == NULL) { @@ -119,6 +120,7 @@ static SSub* tscCreateSubscription(STscObj* pObj, const char* topic, const char* code = TSDB_CODE_TSC_OUT_OF_MEMORY; goto fail; } + pSql->signature = pSql; pSql->pTscObj = pObj; pSql->pSubscription = pSub; @@ -142,6 +144,7 @@ static SSub* tscCreateSubscription(STscObj* pObj, const char* topic, const char* code = TSDB_CODE_TSC_OUT_OF_MEMORY; goto fail; } + strtolower(pSql->sqlstr, pSql->sqlstr); pRes->qhandle = 0; pRes->numOfRows = 1; @@ -152,15 +155,14 @@ static SSub* tscCreateSubscription(STscObj* pObj, const char* topic, const char* goto fail; } - uint64_t handle = (uint64_t) pSql; - pSql->self = taosCachePut(tscObjCache, &handle, sizeof(uint64_t), &pSql, sizeof(uint64_t), 2*3600*1000); - T_REF_INC(pSql->pTscObj); + registerSqlObj(pSql); code = tsParseSql(pSql, false); if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) { tsem_wait(&pSub->sem); code = pSql->res.code; } + if (code != TSDB_CODE_SUCCESS) { line = __LINE__; goto fail; @@ -182,8 +184,10 @@ fail: } else { tscFreeSqlObj(pSql); } + pSql = NULL; } + if (pSub != NULL) { taosArrayDestroy(pSub->progress); tsem_destroy(&pSub->sem); diff --git a/src/client/src/tscSystem.c b/src/client/src/tscSystem.c index 82ce1d36791e2a5e79177ea1405bf90145f2776d..620fe13a9fd37c44b9fa9d25183d5134304a08c3 100644 --- a/src/client/src/tscSystem.c +++ b/src/client/src/tscSystem.c @@ -122,11 +122,8 @@ void taos_init_imp(void) { tscInitMsgsFp(); int queueSize = tsMaxConnections*2; - if (tscEmbedded == 0) { - tscNumOfThreads = (int)(tsNumOfCores * tsNumOfThreadsPerCore / 2.0); - } else { - tscNumOfThreads = (int)(tsNumOfCores * tsNumOfThreadsPerCore / 4.0); - } + double factor = (tscEmbedded == 0)? 2.0:4.0; + tscNumOfThreads = (int)(tsNumOfCores * tsNumOfThreadsPerCore / factor); if (tscNumOfThreads < 2) tscNumOfThreads = 2; @@ -140,11 +137,8 @@ void taos_init_imp(void) { if(0 == tscEmbedded){ taosTmrReset(tscCheckDiskUsage, 10, NULL, tscTmr, &tscCheckDiskUsageTmr); } - - int64_t refreshTime = tsTableMetaKeepTimer; - refreshTime = refreshTime > 10 ? 10 : refreshTime; - refreshTime = refreshTime < 10 ? 10 : refreshTime; + int64_t refreshTime = 10; // 10 seconds by default if (tscMetaCache == NULL) { tscMetaCache = taosCacheInit(TSDB_DATA_TYPE_BINARY, refreshTime, false, NULL, "tableMeta"); tscObjCache = taosCacheInit(TSDB_DATA_TYPE_BIGINT, refreshTime/2, false, tscFreeSqlObjInCache, "sqlObj"); diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 23f412ddd1de1b7ae0f35b702337a9173f984959..33362409cf1fe35384e38fab75282c24707dbb01 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -391,10 +391,21 @@ static UNUSED_FUNC void tscFreeSubobj(SSqlObj* pSql) { */ void tscFreeSqlObjInCache(void *pSql) { assert(pSql != NULL); + SSqlObj** p = (SSqlObj**)pSql; + STscObj* pTscObj = (*p)->pTscObj; assert((*p)->self != 0 && (*p)->self == (p)); tscFreeSqlObj(*p); + + int32_t ref = T_REF_DEC(pTscObj); + assert(ref >= 0); + + tscDebug("%p free sqlObj completed, tscObj:%p ref:%d", *p, pTscObj, ref); + if (ref == 0) { + tscDebug("%p all sqlObj freed, free tscObj:%p", *p, pTscObj); + tscCloseTscObj(pTscObj); + } } void tscFreeSqlObj(SSqlObj* pSql) { @@ -403,7 +414,6 @@ void tscFreeSqlObj(SSqlObj* pSql) { } tscDebug("%p start to free sqlObj", pSql); - STscObj* pTscObj = pSql->pTscObj; tscFreeSubobj(pSql); tscPartiallyFreeSqlObj(pSql); @@ -421,14 +431,6 @@ void tscFreeSqlObj(SSqlObj* pSql) { tsem_destroy(&pSql->rspSem); free(pSql); - tscDebug("%p free sqlObj completed", pSql); - - int32_t ref = T_REF_DEC(pTscObj); - assert(ref >= 0); - - if (ref == 0) { - tscCloseTscObj(pTscObj); - } } void tscDestroyDataBlock(STableDataBlocks* pDataBlock) { @@ -1780,6 +1782,16 @@ void tscResetForNextRetrieve(SSqlRes* pRes) { pRes->numOfRows = 0; } +void registerSqlObj(SSqlObj* pSql) { + int32_t DEFAULT_LIFE_TIME = 2 * 600 * 1000; // 1200 sec + + int32_t ref = T_REF_INC(pSql->pTscObj); + tscDebug("%p add to tscObj:%p, ref:%d", pSql, pSql->pTscObj, ref); + + uint64_t p = (uint64_t) pSql; + pSql->self = taosCachePut(tscObjCache, &p, sizeof(uint64_t), &p, sizeof(uint64_t), DEFAULT_LIFE_TIME); +} + SSqlObj* createSimpleSubObj(SSqlObj* pSql, void (*fp)(), void* param, int32_t cmd) { SSqlObj* pNew = (SSqlObj*)calloc(1, sizeof(SSqlObj)); if (pNew == NULL) { @@ -1808,8 +1820,7 @@ SSqlObj* createSimpleSubObj(SSqlObj* pSql, void (*fp)(), void* param, int32_t cm pNew->sqlstr = strdup(pSql->sqlstr); if (pNew->sqlstr == NULL) { tscError("%p new subquery failed", pSql); - - free(pNew); + tscFreeSqlObj(pNew); return NULL; } @@ -1820,9 +1831,7 @@ SSqlObj* createSimpleSubObj(SSqlObj* pSql, void (*fp)(), void* param, int32_t cm tscAddTableMetaInfo(pQueryInfo, pMasterTableMetaInfo->name, NULL, NULL, NULL); - T_REF_INC(pNew->pTscObj); - uint64_t p = (uint64_t) pNew; - pNew->self = taosCachePut(tscObjCache, &p, sizeof(uint64_t), &pNew, sizeof(uint64_t), 2 * 600 * 1000); + registerSqlObj(pNew); return pNew; } @@ -1912,7 +1921,6 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void pNew->pTscObj = pSql->pTscObj; pNew->signature = pNew; - T_REF_INC(pNew->pTscObj); pNew->sqlstr = strdup(pSql->sqlstr); if (pNew->sqlstr == NULL) { @@ -2061,10 +2069,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void tscDebug("%p new sub insertion: %p, vnodeIdx:%d", pSql, pNew, pTableMetaInfo->vgroupIndex); } - T_REF_INC(pNew->pTscObj); - - uint64_t p = (uint64_t) pNew; - pNew->self = taosCachePut(tscObjCache, &p, sizeof(uint64_t), &pNew, sizeof(uint64_t), 2 * 600 * 10); + registerSqlObj(pNew); return pNew; _error: diff --git a/src/util/inc/tskiplist.h b/src/util/inc/tskiplist.h index 4ba620dce03c51cd49faeb0ed113d64fe2d24959..a14a8565617bc08eefc989168a93f523f2e6caff 100644 --- a/src/util/inc/tskiplist.h +++ b/src/util/inc/tskiplist.h @@ -136,6 +136,7 @@ typedef struct SSkipListIterator { SSkipListNode *cur; int32_t step; // the number of nodes that have been checked already int32_t order; // order of the iterator + SSkipListNode *next; // next points to the true qualified node in skip list } SSkipListIterator; /** diff --git a/src/util/src/tcache.c b/src/util/src/tcache.c index 49b9996cf4fcb813f3c0db00697277a5352e9116..2637699adb1c6b7508418fe10e189ff37a969b2a 100644 --- a/src/util/src/tcache.c +++ b/src/util/src/tcache.c @@ -97,7 +97,7 @@ static FORCE_INLINE void taosCacheReleaseNode(SCacheObj *pCacheObj, SCacheDataNo int32_t size = (int32_t)taosHashGetSize(pCacheObj->pHashTable); assert(size > 0); - uDebug("cache:%s, key:%p, %p is destroyed from cache, size:%dbytes, num:%d size:%" PRId64 "bytes", + uDebug("cache:%s, key:%p, %p is destroyed from cache, size:%dbytes, totalNum:%d size:%" PRId64 "bytes", pCacheObj->name, pNode->key, pNode->data, pNode->size, size - 1, pCacheObj->totalSize); if (pCacheObj->freeFp) { diff --git a/src/util/src/tskiplist.c b/src/util/src/tskiplist.c index 303c2440bf9b59282b8347604d58d3368dd312c0..bacdaef6c8da1e42218331aaa34d25aa40ab5dc4 100644 --- a/src/util/src/tskiplist.c +++ b/src/util/src/tskiplist.c @@ -79,9 +79,12 @@ static SSkipListIterator* doCreateSkipListIterator(SSkipList *pSkipList, int32_t // when order is TSDB_ORDER_ASC, return the last node with key less than val // when order is TSDB_ORDER_DESC, return the first node with key large than val -static SSkipListNode* getPriorNode(SSkipList* pSkipList, const char* val, int32_t order) { +static SSkipListNode* getPriorNode(SSkipList* pSkipList, const char* val, int32_t order, SSkipListNode** pCur) { __compar_fn_t comparFn = pSkipList->comparFn; SSkipListNode *pNode = NULL; + if (pCur != NULL) { + *pCur = NULL; + } if (order == TSDB_ORDER_ASC) { pNode = pSkipList->pHead; @@ -93,6 +96,9 @@ static SSkipListNode* getPriorNode(SSkipList* pSkipList, const char* val, int32_ pNode = p; p = SL_GET_FORWARD_POINTER(p, i); } else { + if (pCur != NULL) { + *pCur = p; + } break; } } @@ -107,6 +113,9 @@ static SSkipListNode* getPriorNode(SSkipList* pSkipList, const char* val, int32_ pNode = p; p = SL_GET_BACKWARD_POINTER(p, i); } else { + if (pCur != NULL) { + *pCur = p; + } break; } } @@ -295,7 +304,7 @@ SArray* tSkipListGet(SSkipList *pSkipList, SSkipListKey key) { pthread_rwlock_wrlock(pSkipList->lock); } - SSkipListNode* pNode = getPriorNode(pSkipList, key, TSDB_ORDER_ASC); + SSkipListNode* pNode = getPriorNode(pSkipList, key, TSDB_ORDER_ASC, NULL); while (1) { SSkipListNode *p = SL_GET_FORWARD_POINTER(pNode, 0); if (p == pSkipList->pTail) { @@ -452,7 +461,7 @@ uint32_t tSkipListRemove(SSkipList *pSkipList, SSkipListKey key) { pthread_rwlock_wrlock(pSkipList->lock); } - SSkipListNode* pNode = getPriorNode(pSkipList, key, TSDB_ORDER_ASC); + SSkipListNode* pNode = getPriorNode(pSkipList, key, TSDB_ORDER_ASC, NULL); while (1) { SSkipListNode *p = SL_GET_FORWARD_POINTER(pNode, 0); if (p == pSkipList->pTail) { @@ -545,7 +554,7 @@ SSkipListIterator *tSkipListCreateIterFromVal(SSkipList* pSkipList, const char* pthread_rwlock_rdlock(pSkipList->lock); } - iter->cur = getPriorNode(pSkipList, val, order); + iter->cur = getPriorNode(pSkipList, val, order, &iter->next); if (pSkipList->lock) { pthread_rwlock_unlock(pSkipList->lock); @@ -567,8 +576,22 @@ bool tSkipListIterNext(SSkipListIterator *iter) { if (iter->order == TSDB_ORDER_ASC) { // ascending order iterate iter->cur = SL_GET_FORWARD_POINTER(iter->cur, 0); + + // a new node is inserted into between iter->cur and iter->next, ignore it + if (iter->cur != iter->next && (iter->next != NULL)) { + iter->cur = iter->next; + } + + iter->next = SL_GET_FORWARD_POINTER(iter->cur, 0); } else { // descending order iterate iter->cur = SL_GET_BACKWARD_POINTER(iter->cur, 0); + + // a new node is inserted into between iter->cur and iter->next, ignore it + if (iter->cur != iter->next && (iter->next != NULL)) { + iter->cur = iter->next; + } + + iter->next = SL_GET_BACKWARD_POINTER(iter->cur, 0); } if (pSkipList->lock) { @@ -715,9 +738,11 @@ SSkipListIterator* doCreateSkipListIterator(SSkipList *pSkipList, int32_t order) iter->order = order; if(order == TSDB_ORDER_ASC) { iter->cur = pSkipList->pHead; + iter->next = SL_GET_FORWARD_POINTER(iter->cur, 0); } else { iter->cur = pSkipList->pTail; + iter->next = SL_GET_BACKWARD_POINTER(iter->cur, 0); } - + return iter; } \ No newline at end of file diff --git a/tests/pytest/client/alterDatabase.py b/tests/pytest/client/alterDatabase.py new file mode 100644 index 0000000000000000000000000000000000000000..fa397d16c57625d53f6f1b2772afa99ade314e1a --- /dev/null +++ b/tests/pytest/client/alterDatabase.py @@ -0,0 +1,55 @@ +################################################################### +# Copyright (c) 2016 by TAOS Technologies, Inc. +# All rights reserved. +# +# This file is proprietary and confidential to TAOS Technologies. +# No part of this file may be reproduced, stored, transmitted, +# disclosed or used in any form or by any means other than as +# expressly provided by the written permission from Jianhui Tao +# +################################################################### + +# -*- coding: utf-8 -*- + +import sys +from util.log import * +from util.cases import * +from util.sql import * + + +class TDTestCase: + def init(self, conn, logSql): + tdLog.debug("start to execute %s" % __file__) + tdSql.init(conn.cursor(), logSql) + + def run(self): + tdSql.prepare() + + tdSql.query('select database()') + tdSql.checkData(0, 0, "db") + + tdSql.execute("alter database db comp 2") + tdSql.query("show databases") + tdSql.checkData(0, 14, 2) + + tdSql.execute("alter database db keep 365") + tdSql.query("show databases") + tdSql.checkData(0, 7, "3650,3650,365") + + tdSql.execute("alter database db quorum 2") + tdSql.query("show databases") + tdSql.checkData(0, 5, 2) + + tdSql.execute("alter database db blocks 100") + tdSql.query("show databases") + tdSql.checkData(0, 9, 100) + + + + def stop(self): + tdSql.close() + tdLog.success("%s successfully executed" % __file__) + + +tdCases.addWindows(__file__, TDTestCase()) +tdCases.addLinux(__file__, TDTestCase()) \ No newline at end of file diff --git a/tests/pytest/fulltest.sh b/tests/pytest/fulltest.sh index 597102a0f0b356f921d284ceaace4d99c6850f24..b679942054c000050271fcb96328d18adea60b42 100755 --- a/tests/pytest/fulltest.sh +++ b/tests/pytest/fulltest.sh @@ -17,6 +17,7 @@ python3 ./test.py -f insert/nchar-unicode.py python3 ./test.py -f insert/multi.py python3 ./test.py -f insert/randomNullCommit.py python3 insert/retentionpolicy.py +python3 ./test.py -f insert/alterTableAndInsert.py python3 ./test.py -f table/column_name.py python3 ./test.py -f table/column_num.py @@ -163,6 +164,7 @@ python3 ./test.py -f alter/alter_table_crash.py # client python3 ./test.py -f client/client.py python3 ./test.py -f client/version.py +python3 ./test.py -f client/alterDatabase.py # Misc python3 testCompress.py diff --git a/tests/pytest/insert/alterTableAndInsert.py b/tests/pytest/insert/alterTableAndInsert.py new file mode 100644 index 0000000000000000000000000000000000000000..a0447704f3cfa594f3bda01cb0dd14b2e795bb5f --- /dev/null +++ b/tests/pytest/insert/alterTableAndInsert.py @@ -0,0 +1,40 @@ +################################################################### +# Copyright (c) 2016 by TAOS Technologies, Inc. +# All rights reserved. +# +# This file is proprietary and confidential to TAOS Technologies. +# No part of this file may be reproduced, stored, transmitted, +# disclosed or used in any form or by any means other than as +# expressly provided by the written permission from Jianhui Tao +# +################################################################### + +# -*- coding: utf-8 -*- + +import sys +from util.log import * +from util.cases import * +from util.sql import * + + +class TDTestCase: + def init(self, conn, logSql): + tdLog.debug("start to execute %s" % __file__) + tdSql.init(conn.cursor(), logSql) + + def run(self): + tdSql.prepare() + + tdSql.execute("create table cars(ts timestamp, speed int) tags(id int)") + tdSql.execute("create table car0 using cars tags(0)") + tdSql.execute("insert into car0 values(now, 1)") + tdSql.execute("alter table cars add column c2 int") + tdSql.execute("insert into car0(ts, 'speed') values(now, 2)") + tdSql.checkAffectedRows(1) + + def stop(self): + tdSql.close() + tdLog.success("%s successfully executed" % __file__) + +tdCases.addWindows(__file__, TDTestCase()) +tdCases.addLinux(__file__, TDTestCase()) diff --git a/tests/pytest/insert/restful.py b/tests/pytest/insert/restful.py new file mode 100644 index 0000000000000000000000000000000000000000..bf9bde99f0f97276e9a00aaa3290258274da988b --- /dev/null +++ b/tests/pytest/insert/restful.py @@ -0,0 +1,60 @@ +################################################################### +# Copyright (c) 2016 by TAOS Technologies, Inc. +# All rights reserved. +# +# This file is proprietary and confidential to TAOS Technologies. +# No part of this file may be reproduced, stored, transmitted, +# disclosed or used in any form or by any means other than as +# expressly provided by the written permission from Jianhui Tao +# +################################################################### + +# -*- coding: utf-8 -*- + +import sys +import requests, json +import threading +import string +import random + +class RestfulInsert: + def init(self): + self.header = {'Authorization': 'Basic cm9vdDp0YW9zZGF0YQ=='} + self.url = "http://127.0.0.1:6041/rest/sql" + self.ts = 1104508800000 + self.numOfThreads = 50 + + def get_random_string(self, length): + letters = string.ascii_lowercase + result_str = ''.join(random.choice(letters) for i in range(length)) + return result_str + + def insertData(self, threadID): + print("thread %d started" % threadID) + data = "create table test.tb%d(ts timestamp, name nchar(20))" % threadID + requests.post(self.url, data, headers = self.header) + name = self.get_random_string(10) + start = self.ts + while True: + start += 1 + data = "insert into test.tb%d values(%d, '%s')" % (threadID, start, name) + requests.post(self.url, data, headers = self.header) + + def run(self): + data = "drop database if exists test" + requests.post(self.url, data, headers = self.header) + data = "create database test keep 7300" + requests.post(self.url, data, headers = self.header) + + threads = [] + for i in range(self.numOfThreads): + thread = threading.Thread(target=self.insertData, args=(i,)) + thread.start() + threads.append(thread) + + for i in range(self.numOfThreads): + threads[i].join() + +ri = RestfulInsert() +ri.init() +ri.run() \ No newline at end of file diff --git a/tests/pytest/insert/retentionpolicy.py b/tests/pytest/insert/retentionpolicy.py index c50de31cc468b6a2d48c226dae24bc3d1fe4f033..c69060b5aec16b8d58aa2742e24c44d1ebe543ab 100644 --- a/tests/pytest/insert/retentionpolicy.py +++ b/tests/pytest/insert/retentionpolicy.py @@ -43,7 +43,8 @@ class TDTestRetetion: else: caller = inspect.getframeinfo(inspect.stack()[1][0]) args = (caller.filename, caller.lineno, sql, self.queryRows, expectRows) - os.system("timedatectl set-ntp true") + os.system("sudo timedatectl set-ntp true") + time.sleep(40) tdLog.exit("%s(%d) failed: sql:%s, queryRows:%d != expect:%d" % args) def run(self): @@ -53,7 +54,7 @@ class TDTestRetetion: tdSql.execute('use test;') tdSql.execute('create table test(ts timestamp,i int);') - cmd = 'insert into test values(now-2d,11)(now-1d,11)(now,11)(now+1d,11);' + cmd = 'insert into test values(now-2d,1)(now-1d,2)(now,3)(now+1d,4);' tdLog.info(cmd) tdSql.execute(cmd) tdSql.query('select * from test') @@ -61,46 +62,55 @@ class TDTestRetetion: tdLog.info("=============== step2") tdDnodes.stop(1) - os.system("timedatectl set-ntp false") - os.system("date -s $(date -d \"${DATE} 2 days\" \"+%Y%m%d\")") + os.system("sudo timedatectl set-ntp false") + os.system("sudo date -s $(date -d \"${DATE} 2 days\" \"+%Y%m%d\")") tdDnodes.start(1) - cmd = 'insert into test values(now,11);' + cmd = 'insert into test values(now,5);' + tdDnodes.stop(1) + tdDnodes.start(1) + tdLog.info(cmd) tdSql.execute(cmd) - queryRows=tdSql.query('select * from test') - if queryRows==4: - tdSql.checkRows(4) + self.queryRows=tdSql.query('select * from test') + if self.queryRows==4: + self.checkRows(4,cmd) return 0 else: - tdSql.checkRows(5) - + self.checkRows(5,cmd) tdLog.info("=============== step3") tdDnodes.stop(1) - os.system("date -s $(date -d \"${DATE} 2 days\" \"+%Y%m%d\")") + os.system("sudo date -s $(date -d \"${DATE} 2 days\" \"+%Y%m%d\")") tdDnodes.start(1) - cmd = 'insert into test values(now-1d,11);' tdLog.info(cmd) tdSql.execute(cmd) - queryRows=tdSql.query('select * from test') - tdSql.checkRows(6) + self.queryRows=tdSql.query('select * from test') + if self.queryRows==4: + self.checkRows(4,cmd) + return 0 + cmd = 'insert into test values(now-1d,6);' + tdLog.info(cmd) + tdSql.execute(cmd) + self.queryRows=tdSql.query('select * from test') + self.checkRows(6,cmd) tdLog.info("=============== step4") tdDnodes.stop(1) tdDnodes.start(1) - cmd = 'insert into test values(now,11);' + cmd = 'insert into test values(now,7);' tdLog.info(cmd) tdSql.execute(cmd) - tdSql.query('select * from test') - tdSql.checkRows(7) + self.queryRows=tdSql.query('select * from test') + self.checkRows(7,cmd) tdLog.info("=============== step5") tdDnodes.stop(1) tdDnodes.start(1) cmd='select * from test where ts > now-1d' - queryRows=tdSql.query('select * from test where ts > now-1d') + self.queryRows=tdSql.query('select * from test where ts > now-1d') self.checkRows(1,cmd) def stop(self): - os.system("timedatectl set-ntp true") + os.system("sudo timedatectl set-ntp true") + time.sleep(40) tdSql.close() tdLog.success("%s successfully executed" % __file__) diff --git a/tests/pytest/query/queryNormal.py b/tests/pytest/query/queryNormal.py index 208ac54ecd5192aec7880c49637794c1fa8e17b8..13393117d642ec1fbdf839d59429428ffee79a27 100644 --- a/tests/pytest/query/queryNormal.py +++ b/tests/pytest/query/queryNormal.py @@ -35,7 +35,8 @@ class TDTestCase: tdSql.execute( "insert into tb2 using stb1 tags(2,'tb2', '表2') values ('2020-04-18 15:00:02.000', 3, 2.1), ('2020-04-18 15:00:03.000', 4, 2.2)") - # inner join --- bug + tdSql.error("select * from tb 1") + tdSql.query("select * from tb1 a, tb2 b where a.ts = b.ts") tdSql.checkRows(0)