You need to sign in or sign up before continuing.
提交 7b0ba6bd 编写于 作者: S Shengliang Guan

Merge remote-tracking branch 'origin/develop' into feature/ad

...@@ -98,12 +98,12 @@ TDengine缺省的时间戳是毫秒精度,但通过修改配置参数enableMic ...@@ -98,12 +98,12 @@ TDengine缺省的时间戳是毫秒精度,但通过修改配置参数enableMic
KEEP参数是指修改数据文件保存的天数,缺省值为3650,取值范围[days, 365000],必须大于或等于days参数值。 KEEP参数是指修改数据文件保存的天数,缺省值为3650,取值范围[days, 365000],必须大于或等于days参数值。
```mysql ```mysql
ALTER DATABASE db_name QUORUM 365; ALTER DATABASE db_name QUORUM 2;
``` ```
QUORUM参数是指数据写入成功所需要的确认数。取值范围[1, 3]。对于异步复制,quorum设为1,具有master角色的虚拟节点自己确认即可。对于同步复制,需要至少大于等于2。原则上,Quorum >=1 并且 Quorum <= replica(副本数),这个参数在启动一个同步模块实例时需要提供。 QUORUM参数是指数据写入成功所需要的确认数。取值范围[1, 3]。对于异步复制,quorum设为1,具有master角色的虚拟节点自己确认即可。对于同步复制,需要至少大于等于2。原则上,Quorum >=1 并且 Quorum <= replica(副本数),这个参数在启动一个同步模块实例时需要提供。
```mysql ```mysql
ALTER DATABASE db_name BLOCKS 365; ALTER DATABASE db_name BLOCKS 100;
``` ```
BLOCKS参数是每个VNODE (TSDB) 中有多少cache大小的内存块,因此一个VNODE的用的内存大小粗略为(cache * blocks)。取值范围[3, 1000]。 BLOCKS参数是每个VNODE (TSDB) 中有多少cache大小的内存块,因此一个VNODE的用的内存大小粗略为(cache * blocks)。取值范围[3, 1000]。
......
...@@ -258,6 +258,8 @@ void tscDoQuery(SSqlObj* pSql); ...@@ -258,6 +258,8 @@ void tscDoQuery(SSqlObj* pSql);
*/ */
SSqlObj* createSimpleSubObj(SSqlObj* pSql, void (*fp)(), void* param, int32_t cmd); SSqlObj* createSimpleSubObj(SSqlObj* pSql, void (*fp)(), void* param, int32_t cmd);
void registerSqlObj(SSqlObj* pSql);
SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void* param, int32_t cmd, SSqlObj* pPrevSql); SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void* param, int32_t cmd, SSqlObj* pPrevSql);
void addGroupInfoForSubquery(SSqlObj* pParentObj, SSqlObj* pSql, int32_t subClauseIndex, int32_t tableIndex); void addGroupInfoForSubquery(SSqlObj* pParentObj, SSqlObj* pSql, int32_t subClauseIndex, int32_t tableIndex);
......
...@@ -89,7 +89,7 @@ typedef struct STableComInfo { ...@@ -89,7 +89,7 @@ typedef struct STableComInfo {
typedef struct SCMCorVgroupInfo { typedef struct SCMCorVgroupInfo {
int32_t version; int32_t version;
int8_t inUse; int8_t inUse;
int8_t numOfEps; int8_t numOfEps;
SEpAddr epAddr[TSDB_MAX_REPLICA]; SEpAddr epAddr[TSDB_MAX_REPLICA];
} SCMCorVgroupInfo; } SCMCorVgroupInfo;
...@@ -107,7 +107,7 @@ typedef struct STableMeta { ...@@ -107,7 +107,7 @@ typedef struct STableMeta {
} STableMeta; } STableMeta;
typedef struct STableMetaInfo { typedef struct STableMetaInfo {
STableMeta * pTableMeta; // table meta, cached in client side and acquired by name STableMeta *pTableMeta; // table meta, cached in client side and acquired by name
SVgroupsInfo *vgroupList; SVgroupsInfo *vgroupList;
SArray *pVgroupTables; // SArray<SVgroupTableInfo> SArray *pVgroupTables; // SArray<SVgroupTableInfo>
......
...@@ -51,10 +51,7 @@ void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, void (*fp)(), void* param, const ...@@ -51,10 +51,7 @@ void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, void (*fp)(), void* param, const
pSql->fp = fp; pSql->fp = fp;
pSql->fetchFp = fp; pSql->fetchFp = fp;
uint64_t handle = (uint64_t) pSql; registerSqlObj(pSql);
pSql->self = taosCachePut(tscObjCache, &handle, sizeof(uint64_t), &pSql, sizeof(uint64_t), 2*3600*1000);
T_REF_INC(pSql->pTscObj);
pSql->sqlstr = calloc(1, sqlLen + 1); pSql->sqlstr = calloc(1, sqlLen + 1);
if (pSql->sqlstr == NULL) { if (pSql->sqlstr == NULL) {
......
...@@ -545,10 +545,8 @@ int taos_stmt_prepare(TAOS_STMT* stmt, const char* sql, unsigned long length) { ...@@ -545,10 +545,8 @@ int taos_stmt_prepare(TAOS_STMT* stmt, const char* sql, unsigned long length) {
pSql->cmd.numOfParams = 0; pSql->cmd.numOfParams = 0;
pSql->cmd.batchSize = 0; pSql->cmd.batchSize = 0;
uint64_t handle = (uint64_t) pSql; registerSqlObj(pSql);
pSql->self = taosCachePut(tscObjCache, &handle, sizeof(uint64_t), &pSql, sizeof(uint64_t), 2*3600*1000);
T_REF_INC(pSql->pTscObj);
int32_t code = tsParseSql(pSql, true); int32_t code = tsParseSql(pSql, true);
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) { if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
......
...@@ -820,20 +820,20 @@ int32_t tscSetTableFullName(STableMetaInfo* pTableMetaInfo, SStrToken* pzTableNa ...@@ -820,20 +820,20 @@ int32_t tscSetTableFullName(STableMetaInfo* pTableMetaInfo, SStrToken* pzTableNa
if (hasSpecifyDB(pzTableName)) { if (hasSpecifyDB(pzTableName)) {
// db has been specified in sql string so we ignore current db path // db has been specified in sql string so we ignore current db path
code = setObjFullName(pTableMetaInfo->name, getAccountId(pSql), NULL, pzTableName, NULL); code = setObjFullName(pTableMetaInfo->name, getAccountId(pSql), NULL, pzTableName, NULL);
if (code != 0) { if (code != TSDB_CODE_SUCCESS) {
invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1); invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1);
} }
} else { // get current DB name first, then set it into path } else { // get current DB name first, then set it into path
SStrToken t = {0}; SStrToken t = {0};
getCurrentDBName(pSql, &t); getCurrentDBName(pSql, &t);
if (t.n == 0) { if (t.n == 0) {
invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2); code = invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2);
} } else {
code = setObjFullName(pTableMetaInfo->name, NULL, &t, pzTableName, NULL);
code = setObjFullName(pTableMetaInfo->name, NULL, &t, pzTableName, NULL); if (code != TSDB_CODE_SUCCESS) {
if (code != 0) { code = invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1);
invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1); }
} }
} }
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
......
...@@ -128,6 +128,7 @@ static void tscUpdateVgroupInfo(SSqlObj *pObj, SRpcEpSet *pEpSet) { ...@@ -128,6 +128,7 @@ static void tscUpdateVgroupInfo(SSqlObj *pObj, SRpcEpSet *pEpSet) {
tscDebug("after: EndPoint in use: %d", pVgroupInfo->inUse); tscDebug("after: EndPoint in use: %d", pVgroupInfo->inUse);
taosCorEndWrite(&pVgroupInfo->version); taosCorEndWrite(&pVgroupInfo->version);
} }
void tscPrintMgmtEp() { void tscPrintMgmtEp() {
SRpcEpSet dump; SRpcEpSet dump;
tscDumpMgmtEpSet(&dump); tscDumpMgmtEpSet(&dump);
...@@ -745,7 +746,6 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -745,7 +746,6 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, i); SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, i);
if (!tscValidateColumnId(pTableMetaInfo, pExpr->colInfo.colId, pExpr->numOfParams)) { if (!tscValidateColumnId(pTableMetaInfo, pExpr->colInfo.colId, pExpr->numOfParams)) {
/* column id is not valid according to the cached table meta, the table meta is expired */
tscError("%p table schema is not matched with parsed sql", pSql); tscError("%p table schema is not matched with parsed sql", pSql);
return TSDB_CODE_TSC_INVALID_SQL; return TSDB_CODE_TSC_INVALID_SQL;
} }
...@@ -1987,15 +1987,11 @@ static void createHBObj(STscObj* pObj) { ...@@ -1987,15 +1987,11 @@ static void createHBObj(STscObj* pObj) {
pSql->param = pObj; pSql->param = pObj;
pSql->pTscObj = pObj; pSql->pTscObj = pObj;
pSql->signature = pSql; pSql->signature = pSql;
pObj->pHb = pSql;
tscAddSubqueryInfo(&pObj->pHb->cmd);
int64_t ad = (int64_t) pSql; registerSqlObj(pSql);
pSql->self = taosCachePut(tscObjCache, &ad, sizeof(int64_t), &pSql, sizeof(int64_t), 2 * 60 * 1000); tscDebug("%p HB is allocated, pObj:%p", pSql, pObj);
T_REF_INC(pObj);
tscDebug("%p HB is allocated, pObj:%p", pObj->pHb, pObj); pObj->pHb = pSql;
} }
int tscProcessConnectRsp(SSqlObj *pSql) { int tscProcessConnectRsp(SSqlObj *pSql) {
...@@ -2170,11 +2166,7 @@ static int32_t getTableMetaFromMgmt(SSqlObj *pSql, STableMetaInfo *pTableMetaInf ...@@ -2170,11 +2166,7 @@ static int32_t getTableMetaFromMgmt(SSqlObj *pSql, STableMetaInfo *pTableMetaInf
pNew->signature = pNew; pNew->signature = pNew;
pNew->cmd.command = TSDB_SQL_META; pNew->cmd.command = TSDB_SQL_META;
T_REF_INC(pNew->pTscObj); registerSqlObj(pNew);
// TODO add test case on x86 platform
uint64_t adr = (uint64_t) pNew;
pNew->self = taosCachePut(tscObjCache, &adr, sizeof(uint64_t), &pNew, sizeof(uint64_t), 2*60*1000);
tscAddSubqueryInfo(&pNew->cmd); tscAddSubqueryInfo(&pNew->cmd);
...@@ -2301,10 +2293,8 @@ int tscGetSTableVgroupInfo(SSqlObj *pSql, int32_t clauseIndex) { ...@@ -2301,10 +2293,8 @@ int tscGetSTableVgroupInfo(SSqlObj *pSql, int32_t clauseIndex) {
} }
pNewQueryInfo->numOfTables = pQueryInfo->numOfTables; pNewQueryInfo->numOfTables = pQueryInfo->numOfTables;
T_REF_INC(pNew->pTscObj); registerSqlObj(pNew);
uint64_t p = (uint64_t) pNew;
pNew->self = taosCachePut(tscObjCache, &p, sizeof(uint64_t), &pNew, sizeof(uint64_t), 2 * 600 * 1000);
tscDebug("%p new sqlObj:%p to get vgroupInfo, numOfTables:%d", pSql, pNew, pNewQueryInfo->numOfTables); tscDebug("%p new sqlObj:%p to get vgroupInfo, numOfTables:%d", pSql, pNew, pNewQueryInfo->numOfTables);
pNew->fp = tscTableMetaCallBack; pNew->fp = tscTableMetaCallBack;
......
...@@ -156,10 +156,7 @@ SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pass, con ...@@ -156,10 +156,7 @@ SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pass, con
*taos = pObj; *taos = pObj;
} }
T_REF_INC(pSql->pTscObj); registerSqlObj(pSql);
uint64_t key = (uint64_t) pSql;
pSql->self = taosCachePut(tscObjCache, &key, sizeof(uint64_t), &pSql, sizeof(uint64_t), 2*3600*1000);
tsInsertHeadSize = sizeof(SMsgDesc) + sizeof(SSubmitMsg); tsInsertHeadSize = sizeof(SMsgDesc) + sizeof(SSubmitMsg);
return pSql; return pSql;
...@@ -270,7 +267,7 @@ void taos_close(TAOS *taos) { ...@@ -270,7 +267,7 @@ void taos_close(TAOS *taos) {
pHb->pRpcCtx = NULL; pHb->pRpcCtx = NULL;
} }
tscDebug("%p, HB is freed", pHb); tscDebug("%p HB is freed", pHb);
taos_free_result(pHb); taos_free_result(pHb);
} }
...@@ -789,14 +786,17 @@ int taos_validate_sql(TAOS *taos, const char *sql) { ...@@ -789,14 +786,17 @@ int taos_validate_sql(TAOS *taos, const char *sql) {
} }
SSqlObj* pSql = calloc(1, sizeof(SSqlObj)); SSqlObj* pSql = calloc(1, sizeof(SSqlObj));
pSql->pTscObj = taos; pSql->pTscObj = taos;
pSql->signature = pSql; pSql->signature = pSql;
SSqlRes *pRes = &pSql->res; SSqlRes *pRes = &pSql->res;
SSqlCmd *pCmd = &pSql->cmd; SSqlCmd *pCmd = &pSql->cmd;
pRes->numOfTotal = 0; pRes->numOfTotal = 0;
pRes->numOfClauseTotal = 0; pRes->numOfClauseTotal = 0;
tscDebug("%p Valid SQL: %s pObj:%p", pSql, sql, pObj); tscDebug("%p Valid SQL: %s pObj:%p", pSql, sql, pObj);
int32_t sqlLen = (int32_t)strlen(sql); int32_t sqlLen = (int32_t)strlen(sql);
...@@ -832,11 +832,12 @@ int taos_validate_sql(TAOS *taos, const char *sql) { ...@@ -832,11 +832,12 @@ int taos_validate_sql(TAOS *taos, const char *sql) {
tsem_wait(&pSql->rspSem); tsem_wait(&pSql->rspSem);
code = pSql->res.code; code = pSql->res.code;
} }
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
tscDebug("%p Valid SQL result:%d, %s pObj:%p", pSql, code, taos_errstr(taos), pObj); tscDebug("%p Valid SQL result:%d, %s pObj:%p", pSql, code, taos_errstr(taos), pObj);
} }
taos_free_result(pSql);
taos_free_result(pSql);
return code; return code;
} }
...@@ -935,34 +936,32 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) { ...@@ -935,34 +936,32 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) {
SSqlObj* pSql = calloc(1, sizeof(SSqlObj)); SSqlObj* pSql = calloc(1, sizeof(SSqlObj));
pSql->pTscObj = taos; pSql->pTscObj = taos;
pSql->signature = pSql; pSql->signature = pSql;
SSqlRes *pRes = &pSql->res; SSqlRes *pRes = &pSql->res;
pRes->code = 0;
pRes->numOfTotal = 0; // the number of getting table meta from server pRes->numOfTotal = 0; // the number of getting table meta from server
pRes->numOfClauseTotal = 0; pRes->numOfClauseTotal = 0;
pRes->code = 0;
assert(pSql->fp == NULL); assert(pSql->fp == NULL);
tscDebug("%p tableNameList: %s pObj:%p", pSql, tableNameList, pObj); tscDebug("%p tableNameList: %s pObj:%p", pSql, tableNameList, pObj);
int32_t tblListLen = (int32_t)strlen(tableNameList); int32_t tblListLen = (int32_t)strlen(tableNameList);
if (tblListLen > MAX_TABLE_NAME_LENGTH) { if (tblListLen > MAX_TABLE_NAME_LENGTH) {
tscError("%p tableNameList too long, length:%d, maximum allowed:%d", pSql, tblListLen, MAX_TABLE_NAME_LENGTH); tscError("%p tableNameList too long, length:%d, maximum allowed:%d", pSql, tblListLen, MAX_TABLE_NAME_LENGTH);
pRes->code = TSDB_CODE_TSC_INVALID_SQL; tscFreeSqlObj(pSql);
taosTFree(pSql); return TSDB_CODE_TSC_INVALID_SQL;
return pRes->code;
} }
char *str = calloc(1, tblListLen + 1); char *str = calloc(1, tblListLen + 1);
if (str == NULL) { if (str == NULL) {
pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
tscError("%p failed to malloc sql string buffer", pSql); tscError("%p failed to malloc sql string buffer", pSql);
taosTFree(pSql); tscFreeSqlObj(pSql);
return pRes->code; return TSDB_CODE_TSC_OUT_OF_MEMORY;
} }
strtolower(str, tableNameList); strtolower(str, tableNameList);
pRes->code = (uint8_t)tscParseTblNameList(pSql, str, tblListLen); int32_t code = (uint8_t) tscParseTblNameList(pSql, str, tblListLen);
/* /*
* set the qhandle to 0 before return in order to erase the qhandle value assigned in the previous successful query. * set the qhandle to 0 before return in order to erase the qhandle value assigned in the previous successful query.
...@@ -972,17 +971,17 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) { ...@@ -972,17 +971,17 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) {
pRes->qhandle = 0; pRes->qhandle = 0;
free(str); free(str);
if (pRes->code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
tscFreeSqlObj(pSql); tscFreeSqlObj(pSql);
return pRes->code; return code;
} }
tscDoQuery(pSql); tscDoQuery(pSql);
tscDebug("%p load multi metermeta result:%d %s pObj:%p", pSql, pRes->code, taos_errstr(taos), pObj); tscDebug("%p load multi table meta result:%d %s pObj:%p", pSql, pRes->code, taos_errstr(taos), pObj);
if (pRes->code != TSDB_CODE_SUCCESS) { if ((code = pRes->code) != TSDB_CODE_SUCCESS) {
tscPartiallyFreeSqlObj(pSql); tscFreeSqlObj(pSql);
} }
return pRes->code; return code;
} }
...@@ -510,9 +510,7 @@ static void tscCreateStream(void *param, TAOS_RES *res, int code) { ...@@ -510,9 +510,7 @@ static void tscCreateStream(void *param, TAOS_RES *res, int code) {
return; return;
} }
uint64_t handle = (uint64_t) pSql; registerSqlObj(pSql);
pSql->self = taosCachePut(tscObjCache, &handle, sizeof(uint64_t), &pSql, sizeof(uint64_t), 2*3600*1000);
T_REF_INC(pSql->pTscObj);
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
......
...@@ -105,6 +105,7 @@ static SSub* tscCreateSubscription(STscObj* pObj, const char* topic, const char* ...@@ -105,6 +105,7 @@ static SSub* tscCreateSubscription(STscObj* pObj, const char* topic, const char*
code = TAOS_SYSTEM_ERROR(errno); code = TAOS_SYSTEM_ERROR(errno);
goto fail; goto fail;
} }
tstrncpy(pSub->topic, topic, sizeof(pSub->topic)); tstrncpy(pSub->topic, topic, sizeof(pSub->topic));
pSub->progress = taosArrayInit(32, sizeof(SSubscriptionProgress)); pSub->progress = taosArrayInit(32, sizeof(SSubscriptionProgress));
if (pSub->progress == NULL) { if (pSub->progress == NULL) {
...@@ -119,6 +120,7 @@ static SSub* tscCreateSubscription(STscObj* pObj, const char* topic, const char* ...@@ -119,6 +120,7 @@ static SSub* tscCreateSubscription(STscObj* pObj, const char* topic, const char*
code = TSDB_CODE_TSC_OUT_OF_MEMORY; code = TSDB_CODE_TSC_OUT_OF_MEMORY;
goto fail; goto fail;
} }
pSql->signature = pSql; pSql->signature = pSql;
pSql->pTscObj = pObj; pSql->pTscObj = pObj;
pSql->pSubscription = pSub; pSql->pSubscription = pSub;
...@@ -142,6 +144,7 @@ static SSub* tscCreateSubscription(STscObj* pObj, const char* topic, const char* ...@@ -142,6 +144,7 @@ static SSub* tscCreateSubscription(STscObj* pObj, const char* topic, const char*
code = TSDB_CODE_TSC_OUT_OF_MEMORY; code = TSDB_CODE_TSC_OUT_OF_MEMORY;
goto fail; goto fail;
} }
strtolower(pSql->sqlstr, pSql->sqlstr); strtolower(pSql->sqlstr, pSql->sqlstr);
pRes->qhandle = 0; pRes->qhandle = 0;
pRes->numOfRows = 1; pRes->numOfRows = 1;
...@@ -152,15 +155,14 @@ static SSub* tscCreateSubscription(STscObj* pObj, const char* topic, const char* ...@@ -152,15 +155,14 @@ static SSub* tscCreateSubscription(STscObj* pObj, const char* topic, const char*
goto fail; goto fail;
} }
uint64_t handle = (uint64_t) pSql; registerSqlObj(pSql);
pSql->self = taosCachePut(tscObjCache, &handle, sizeof(uint64_t), &pSql, sizeof(uint64_t), 2*3600*1000);
T_REF_INC(pSql->pTscObj);
code = tsParseSql(pSql, false); code = tsParseSql(pSql, false);
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) { if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
tsem_wait(&pSub->sem); tsem_wait(&pSub->sem);
code = pSql->res.code; code = pSql->res.code;
} }
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
line = __LINE__; line = __LINE__;
goto fail; goto fail;
...@@ -182,8 +184,10 @@ fail: ...@@ -182,8 +184,10 @@ fail:
} else { } else {
tscFreeSqlObj(pSql); tscFreeSqlObj(pSql);
} }
pSql = NULL; pSql = NULL;
} }
if (pSub != NULL) { if (pSub != NULL) {
taosArrayDestroy(pSub->progress); taosArrayDestroy(pSub->progress);
tsem_destroy(&pSub->sem); tsem_destroy(&pSub->sem);
......
...@@ -122,11 +122,8 @@ void taos_init_imp(void) { ...@@ -122,11 +122,8 @@ void taos_init_imp(void) {
tscInitMsgsFp(); tscInitMsgsFp();
int queueSize = tsMaxConnections*2; int queueSize = tsMaxConnections*2;
if (tscEmbedded == 0) { double factor = (tscEmbedded == 0)? 2.0:4.0;
tscNumOfThreads = (int)(tsNumOfCores * tsNumOfThreadsPerCore / 2.0); tscNumOfThreads = (int)(tsNumOfCores * tsNumOfThreadsPerCore / factor);
} else {
tscNumOfThreads = (int)(tsNumOfCores * tsNumOfThreadsPerCore / 4.0);
}
if (tscNumOfThreads < 2) tscNumOfThreads = 2; if (tscNumOfThreads < 2) tscNumOfThreads = 2;
...@@ -140,11 +137,8 @@ void taos_init_imp(void) { ...@@ -140,11 +137,8 @@ void taos_init_imp(void) {
if(0 == tscEmbedded){ if(0 == tscEmbedded){
taosTmrReset(tscCheckDiskUsage, 10, NULL, tscTmr, &tscCheckDiskUsageTmr); taosTmrReset(tscCheckDiskUsage, 10, NULL, tscTmr, &tscCheckDiskUsageTmr);
} }
int64_t refreshTime = tsTableMetaKeepTimer;
refreshTime = refreshTime > 10 ? 10 : refreshTime;
refreshTime = refreshTime < 10 ? 10 : refreshTime;
int64_t refreshTime = 10; // 10 seconds by default
if (tscMetaCache == NULL) { if (tscMetaCache == NULL) {
tscMetaCache = taosCacheInit(TSDB_DATA_TYPE_BINARY, refreshTime, false, NULL, "tableMeta"); tscMetaCache = taosCacheInit(TSDB_DATA_TYPE_BINARY, refreshTime, false, NULL, "tableMeta");
tscObjCache = taosCacheInit(TSDB_DATA_TYPE_BIGINT, refreshTime/2, false, tscFreeSqlObjInCache, "sqlObj"); tscObjCache = taosCacheInit(TSDB_DATA_TYPE_BIGINT, refreshTime/2, false, tscFreeSqlObjInCache, "sqlObj");
......
...@@ -391,10 +391,21 @@ static UNUSED_FUNC void tscFreeSubobj(SSqlObj* pSql) { ...@@ -391,10 +391,21 @@ static UNUSED_FUNC void tscFreeSubobj(SSqlObj* pSql) {
*/ */
void tscFreeSqlObjInCache(void *pSql) { void tscFreeSqlObjInCache(void *pSql) {
assert(pSql != NULL); assert(pSql != NULL);
SSqlObj** p = (SSqlObj**)pSql; SSqlObj** p = (SSqlObj**)pSql;
STscObj* pTscObj = (*p)->pTscObj;
assert((*p)->self != 0 && (*p)->self == (p)); assert((*p)->self != 0 && (*p)->self == (p));
tscFreeSqlObj(*p); tscFreeSqlObj(*p);
int32_t ref = T_REF_DEC(pTscObj);
assert(ref >= 0);
tscDebug("%p free sqlObj completed, tscObj:%p ref:%d", *p, pTscObj, ref);
if (ref == 0) {
tscDebug("%p all sqlObj freed, free tscObj:%p", *p, pTscObj);
tscCloseTscObj(pTscObj);
}
} }
void tscFreeSqlObj(SSqlObj* pSql) { void tscFreeSqlObj(SSqlObj* pSql) {
...@@ -403,7 +414,6 @@ void tscFreeSqlObj(SSqlObj* pSql) { ...@@ -403,7 +414,6 @@ void tscFreeSqlObj(SSqlObj* pSql) {
} }
tscDebug("%p start to free sqlObj", pSql); tscDebug("%p start to free sqlObj", pSql);
STscObj* pTscObj = pSql->pTscObj;
tscFreeSubobj(pSql); tscFreeSubobj(pSql);
tscPartiallyFreeSqlObj(pSql); tscPartiallyFreeSqlObj(pSql);
...@@ -421,14 +431,6 @@ void tscFreeSqlObj(SSqlObj* pSql) { ...@@ -421,14 +431,6 @@ void tscFreeSqlObj(SSqlObj* pSql) {
tsem_destroy(&pSql->rspSem); tsem_destroy(&pSql->rspSem);
free(pSql); free(pSql);
tscDebug("%p free sqlObj completed", pSql);
int32_t ref = T_REF_DEC(pTscObj);
assert(ref >= 0);
if (ref == 0) {
tscCloseTscObj(pTscObj);
}
} }
void tscDestroyDataBlock(STableDataBlocks* pDataBlock) { void tscDestroyDataBlock(STableDataBlocks* pDataBlock) {
...@@ -1780,6 +1782,16 @@ void tscResetForNextRetrieve(SSqlRes* pRes) { ...@@ -1780,6 +1782,16 @@ void tscResetForNextRetrieve(SSqlRes* pRes) {
pRes->numOfRows = 0; pRes->numOfRows = 0;
} }
void registerSqlObj(SSqlObj* pSql) {
int32_t DEFAULT_LIFE_TIME = 2 * 600 * 1000; // 1200 sec
int32_t ref = T_REF_INC(pSql->pTscObj);
tscDebug("%p add to tscObj:%p, ref:%d", pSql, pSql->pTscObj, ref);
uint64_t p = (uint64_t) pSql;
pSql->self = taosCachePut(tscObjCache, &p, sizeof(uint64_t), &p, sizeof(uint64_t), DEFAULT_LIFE_TIME);
}
SSqlObj* createSimpleSubObj(SSqlObj* pSql, void (*fp)(), void* param, int32_t cmd) { SSqlObj* createSimpleSubObj(SSqlObj* pSql, void (*fp)(), void* param, int32_t cmd) {
SSqlObj* pNew = (SSqlObj*)calloc(1, sizeof(SSqlObj)); SSqlObj* pNew = (SSqlObj*)calloc(1, sizeof(SSqlObj));
if (pNew == NULL) { if (pNew == NULL) {
...@@ -1808,8 +1820,7 @@ SSqlObj* createSimpleSubObj(SSqlObj* pSql, void (*fp)(), void* param, int32_t cm ...@@ -1808,8 +1820,7 @@ SSqlObj* createSimpleSubObj(SSqlObj* pSql, void (*fp)(), void* param, int32_t cm
pNew->sqlstr = strdup(pSql->sqlstr); pNew->sqlstr = strdup(pSql->sqlstr);
if (pNew->sqlstr == NULL) { if (pNew->sqlstr == NULL) {
tscError("%p new subquery failed", pSql); tscError("%p new subquery failed", pSql);
tscFreeSqlObj(pNew);
free(pNew);
return NULL; return NULL;
} }
...@@ -1820,9 +1831,7 @@ SSqlObj* createSimpleSubObj(SSqlObj* pSql, void (*fp)(), void* param, int32_t cm ...@@ -1820,9 +1831,7 @@ SSqlObj* createSimpleSubObj(SSqlObj* pSql, void (*fp)(), void* param, int32_t cm
tscAddTableMetaInfo(pQueryInfo, pMasterTableMetaInfo->name, NULL, NULL, NULL); tscAddTableMetaInfo(pQueryInfo, pMasterTableMetaInfo->name, NULL, NULL, NULL);
T_REF_INC(pNew->pTscObj); registerSqlObj(pNew);
uint64_t p = (uint64_t) pNew;
pNew->self = taosCachePut(tscObjCache, &p, sizeof(uint64_t), &pNew, sizeof(uint64_t), 2 * 600 * 1000);
return pNew; return pNew;
} }
...@@ -1912,7 +1921,6 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void ...@@ -1912,7 +1921,6 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void
pNew->pTscObj = pSql->pTscObj; pNew->pTscObj = pSql->pTscObj;
pNew->signature = pNew; pNew->signature = pNew;
T_REF_INC(pNew->pTscObj);
pNew->sqlstr = strdup(pSql->sqlstr); pNew->sqlstr = strdup(pSql->sqlstr);
if (pNew->sqlstr == NULL) { if (pNew->sqlstr == NULL) {
...@@ -2061,10 +2069,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void ...@@ -2061,10 +2069,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void
tscDebug("%p new sub insertion: %p, vnodeIdx:%d", pSql, pNew, pTableMetaInfo->vgroupIndex); tscDebug("%p new sub insertion: %p, vnodeIdx:%d", pSql, pNew, pTableMetaInfo->vgroupIndex);
} }
T_REF_INC(pNew->pTscObj); registerSqlObj(pNew);
uint64_t p = (uint64_t) pNew;
pNew->self = taosCachePut(tscObjCache, &p, sizeof(uint64_t), &pNew, sizeof(uint64_t), 2 * 600 * 10);
return pNew; return pNew;
_error: _error:
......
...@@ -136,6 +136,7 @@ typedef struct SSkipListIterator { ...@@ -136,6 +136,7 @@ typedef struct SSkipListIterator {
SSkipListNode *cur; SSkipListNode *cur;
int32_t step; // the number of nodes that have been checked already int32_t step; // the number of nodes that have been checked already
int32_t order; // order of the iterator int32_t order; // order of the iterator
SSkipListNode *next; // next points to the true qualified node in skip list
} SSkipListIterator; } SSkipListIterator;
/** /**
......
...@@ -97,7 +97,7 @@ static FORCE_INLINE void taosCacheReleaseNode(SCacheObj *pCacheObj, SCacheDataNo ...@@ -97,7 +97,7 @@ static FORCE_INLINE void taosCacheReleaseNode(SCacheObj *pCacheObj, SCacheDataNo
int32_t size = (int32_t)taosHashGetSize(pCacheObj->pHashTable); int32_t size = (int32_t)taosHashGetSize(pCacheObj->pHashTable);
assert(size > 0); assert(size > 0);
uDebug("cache:%s, key:%p, %p is destroyed from cache, size:%dbytes, num:%d size:%" PRId64 "bytes", uDebug("cache:%s, key:%p, %p is destroyed from cache, size:%dbytes, totalNum:%d size:%" PRId64 "bytes",
pCacheObj->name, pNode->key, pNode->data, pNode->size, size - 1, pCacheObj->totalSize); pCacheObj->name, pNode->key, pNode->data, pNode->size, size - 1, pCacheObj->totalSize);
if (pCacheObj->freeFp) { if (pCacheObj->freeFp) {
......
...@@ -79,9 +79,12 @@ static SSkipListIterator* doCreateSkipListIterator(SSkipList *pSkipList, int32_t ...@@ -79,9 +79,12 @@ static SSkipListIterator* doCreateSkipListIterator(SSkipList *pSkipList, int32_t
// when order is TSDB_ORDER_ASC, return the last node with key less than val // when order is TSDB_ORDER_ASC, return the last node with key less than val
// when order is TSDB_ORDER_DESC, return the first node with key large than val // when order is TSDB_ORDER_DESC, return the first node with key large than val
static SSkipListNode* getPriorNode(SSkipList* pSkipList, const char* val, int32_t order) { static SSkipListNode* getPriorNode(SSkipList* pSkipList, const char* val, int32_t order, SSkipListNode** pCur) {
__compar_fn_t comparFn = pSkipList->comparFn; __compar_fn_t comparFn = pSkipList->comparFn;
SSkipListNode *pNode = NULL; SSkipListNode *pNode = NULL;
if (pCur != NULL) {
*pCur = NULL;
}
if (order == TSDB_ORDER_ASC) { if (order == TSDB_ORDER_ASC) {
pNode = pSkipList->pHead; pNode = pSkipList->pHead;
...@@ -93,6 +96,9 @@ static SSkipListNode* getPriorNode(SSkipList* pSkipList, const char* val, int32_ ...@@ -93,6 +96,9 @@ static SSkipListNode* getPriorNode(SSkipList* pSkipList, const char* val, int32_
pNode = p; pNode = p;
p = SL_GET_FORWARD_POINTER(p, i); p = SL_GET_FORWARD_POINTER(p, i);
} else { } else {
if (pCur != NULL) {
*pCur = p;
}
break; break;
} }
} }
...@@ -107,6 +113,9 @@ static SSkipListNode* getPriorNode(SSkipList* pSkipList, const char* val, int32_ ...@@ -107,6 +113,9 @@ static SSkipListNode* getPriorNode(SSkipList* pSkipList, const char* val, int32_
pNode = p; pNode = p;
p = SL_GET_BACKWARD_POINTER(p, i); p = SL_GET_BACKWARD_POINTER(p, i);
} else { } else {
if (pCur != NULL) {
*pCur = p;
}
break; break;
} }
} }
...@@ -295,7 +304,7 @@ SArray* tSkipListGet(SSkipList *pSkipList, SSkipListKey key) { ...@@ -295,7 +304,7 @@ SArray* tSkipListGet(SSkipList *pSkipList, SSkipListKey key) {
pthread_rwlock_wrlock(pSkipList->lock); pthread_rwlock_wrlock(pSkipList->lock);
} }
SSkipListNode* pNode = getPriorNode(pSkipList, key, TSDB_ORDER_ASC); SSkipListNode* pNode = getPriorNode(pSkipList, key, TSDB_ORDER_ASC, NULL);
while (1) { while (1) {
SSkipListNode *p = SL_GET_FORWARD_POINTER(pNode, 0); SSkipListNode *p = SL_GET_FORWARD_POINTER(pNode, 0);
if (p == pSkipList->pTail) { if (p == pSkipList->pTail) {
...@@ -452,7 +461,7 @@ uint32_t tSkipListRemove(SSkipList *pSkipList, SSkipListKey key) { ...@@ -452,7 +461,7 @@ uint32_t tSkipListRemove(SSkipList *pSkipList, SSkipListKey key) {
pthread_rwlock_wrlock(pSkipList->lock); pthread_rwlock_wrlock(pSkipList->lock);
} }
SSkipListNode* pNode = getPriorNode(pSkipList, key, TSDB_ORDER_ASC); SSkipListNode* pNode = getPriorNode(pSkipList, key, TSDB_ORDER_ASC, NULL);
while (1) { while (1) {
SSkipListNode *p = SL_GET_FORWARD_POINTER(pNode, 0); SSkipListNode *p = SL_GET_FORWARD_POINTER(pNode, 0);
if (p == pSkipList->pTail) { if (p == pSkipList->pTail) {
...@@ -545,7 +554,7 @@ SSkipListIterator *tSkipListCreateIterFromVal(SSkipList* pSkipList, const char* ...@@ -545,7 +554,7 @@ SSkipListIterator *tSkipListCreateIterFromVal(SSkipList* pSkipList, const char*
pthread_rwlock_rdlock(pSkipList->lock); pthread_rwlock_rdlock(pSkipList->lock);
} }
iter->cur = getPriorNode(pSkipList, val, order); iter->cur = getPriorNode(pSkipList, val, order, &iter->next);
if (pSkipList->lock) { if (pSkipList->lock) {
pthread_rwlock_unlock(pSkipList->lock); pthread_rwlock_unlock(pSkipList->lock);
...@@ -567,8 +576,22 @@ bool tSkipListIterNext(SSkipListIterator *iter) { ...@@ -567,8 +576,22 @@ bool tSkipListIterNext(SSkipListIterator *iter) {
if (iter->order == TSDB_ORDER_ASC) { // ascending order iterate if (iter->order == TSDB_ORDER_ASC) { // ascending order iterate
iter->cur = SL_GET_FORWARD_POINTER(iter->cur, 0); iter->cur = SL_GET_FORWARD_POINTER(iter->cur, 0);
// a new node is inserted into between iter->cur and iter->next, ignore it
if (iter->cur != iter->next && (iter->next != NULL)) {
iter->cur = iter->next;
}
iter->next = SL_GET_FORWARD_POINTER(iter->cur, 0);
} else { // descending order iterate } else { // descending order iterate
iter->cur = SL_GET_BACKWARD_POINTER(iter->cur, 0); iter->cur = SL_GET_BACKWARD_POINTER(iter->cur, 0);
// a new node is inserted into between iter->cur and iter->next, ignore it
if (iter->cur != iter->next && (iter->next != NULL)) {
iter->cur = iter->next;
}
iter->next = SL_GET_BACKWARD_POINTER(iter->cur, 0);
} }
if (pSkipList->lock) { if (pSkipList->lock) {
...@@ -715,9 +738,11 @@ SSkipListIterator* doCreateSkipListIterator(SSkipList *pSkipList, int32_t order) ...@@ -715,9 +738,11 @@ SSkipListIterator* doCreateSkipListIterator(SSkipList *pSkipList, int32_t order)
iter->order = order; iter->order = order;
if(order == TSDB_ORDER_ASC) { if(order == TSDB_ORDER_ASC) {
iter->cur = pSkipList->pHead; iter->cur = pSkipList->pHead;
iter->next = SL_GET_FORWARD_POINTER(iter->cur, 0);
} else { } else {
iter->cur = pSkipList->pTail; iter->cur = pSkipList->pTail;
iter->next = SL_GET_BACKWARD_POINTER(iter->cur, 0);
} }
return iter; return iter;
} }
\ No newline at end of file
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import sys
from util.log import *
from util.cases import *
from util.sql import *
class TDTestCase:
def init(self, conn, logSql):
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor(), logSql)
def run(self):
tdSql.prepare()
tdSql.query('select database()')
tdSql.checkData(0, 0, "db")
tdSql.execute("alter database db comp 2")
tdSql.query("show databases")
tdSql.checkData(0, 14, 2)
tdSql.execute("alter database db keep 365")
tdSql.query("show databases")
tdSql.checkData(0, 7, "3650,3650,365")
tdSql.execute("alter database db quorum 2")
tdSql.query("show databases")
tdSql.checkData(0, 5, 2)
tdSql.execute("alter database db blocks 100")
tdSql.query("show databases")
tdSql.checkData(0, 9, 100)
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())
\ No newline at end of file
...@@ -17,6 +17,7 @@ python3 ./test.py -f insert/nchar-unicode.py ...@@ -17,6 +17,7 @@ python3 ./test.py -f insert/nchar-unicode.py
python3 ./test.py -f insert/multi.py python3 ./test.py -f insert/multi.py
python3 ./test.py -f insert/randomNullCommit.py python3 ./test.py -f insert/randomNullCommit.py
python3 insert/retentionpolicy.py python3 insert/retentionpolicy.py
python3 ./test.py -f insert/alterTableAndInsert.py
python3 ./test.py -f table/column_name.py python3 ./test.py -f table/column_name.py
python3 ./test.py -f table/column_num.py python3 ./test.py -f table/column_num.py
...@@ -163,6 +164,7 @@ python3 ./test.py -f alter/alter_table_crash.py ...@@ -163,6 +164,7 @@ python3 ./test.py -f alter/alter_table_crash.py
# client # client
python3 ./test.py -f client/client.py python3 ./test.py -f client/client.py
python3 ./test.py -f client/version.py python3 ./test.py -f client/version.py
python3 ./test.py -f client/alterDatabase.py
# Misc # Misc
python3 testCompress.py python3 testCompress.py
......
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import sys
from util.log import *
from util.cases import *
from util.sql import *
class TDTestCase:
def init(self, conn, logSql):
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor(), logSql)
def run(self):
tdSql.prepare()
tdSql.execute("create table cars(ts timestamp, speed int) tags(id int)")
tdSql.execute("create table car0 using cars tags(0)")
tdSql.execute("insert into car0 values(now, 1)")
tdSql.execute("alter table cars add column c2 int")
tdSql.execute("insert into car0(ts, 'speed') values(now, 2)")
tdSql.checkAffectedRows(1)
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import sys
import requests, json
import threading
import string
import random
class RestfulInsert:
def init(self):
self.header = {'Authorization': 'Basic cm9vdDp0YW9zZGF0YQ=='}
self.url = "http://127.0.0.1:6041/rest/sql"
self.ts = 1104508800000
self.numOfThreads = 50
def get_random_string(self, length):
letters = string.ascii_lowercase
result_str = ''.join(random.choice(letters) for i in range(length))
return result_str
def insertData(self, threadID):
print("thread %d started" % threadID)
data = "create table test.tb%d(ts timestamp, name nchar(20))" % threadID
requests.post(self.url, data, headers = self.header)
name = self.get_random_string(10)
start = self.ts
while True:
start += 1
data = "insert into test.tb%d values(%d, '%s')" % (threadID, start, name)
requests.post(self.url, data, headers = self.header)
def run(self):
data = "drop database if exists test"
requests.post(self.url, data, headers = self.header)
data = "create database test keep 7300"
requests.post(self.url, data, headers = self.header)
threads = []
for i in range(self.numOfThreads):
thread = threading.Thread(target=self.insertData, args=(i,))
thread.start()
threads.append(thread)
for i in range(self.numOfThreads):
threads[i].join()
ri = RestfulInsert()
ri.init()
ri.run()
\ No newline at end of file
...@@ -43,7 +43,8 @@ class TDTestRetetion: ...@@ -43,7 +43,8 @@ class TDTestRetetion:
else: else:
caller = inspect.getframeinfo(inspect.stack()[1][0]) caller = inspect.getframeinfo(inspect.stack()[1][0])
args = (caller.filename, caller.lineno, sql, self.queryRows, expectRows) args = (caller.filename, caller.lineno, sql, self.queryRows, expectRows)
os.system("timedatectl set-ntp true") os.system("sudo timedatectl set-ntp true")
time.sleep(40)
tdLog.exit("%s(%d) failed: sql:%s, queryRows:%d != expect:%d" % args) tdLog.exit("%s(%d) failed: sql:%s, queryRows:%d != expect:%d" % args)
def run(self): def run(self):
...@@ -53,7 +54,7 @@ class TDTestRetetion: ...@@ -53,7 +54,7 @@ class TDTestRetetion:
tdSql.execute('use test;') tdSql.execute('use test;')
tdSql.execute('create table test(ts timestamp,i int);') tdSql.execute('create table test(ts timestamp,i int);')
cmd = 'insert into test values(now-2d,11)(now-1d,11)(now,11)(now+1d,11);' cmd = 'insert into test values(now-2d,1)(now-1d,2)(now,3)(now+1d,4);'
tdLog.info(cmd) tdLog.info(cmd)
tdSql.execute(cmd) tdSql.execute(cmd)
tdSql.query('select * from test') tdSql.query('select * from test')
...@@ -61,46 +62,55 @@ class TDTestRetetion: ...@@ -61,46 +62,55 @@ class TDTestRetetion:
tdLog.info("=============== step2") tdLog.info("=============== step2")
tdDnodes.stop(1) tdDnodes.stop(1)
os.system("timedatectl set-ntp false") os.system("sudo timedatectl set-ntp false")
os.system("date -s $(date -d \"${DATE} 2 days\" \"+%Y%m%d\")") os.system("sudo date -s $(date -d \"${DATE} 2 days\" \"+%Y%m%d\")")
tdDnodes.start(1) tdDnodes.start(1)
cmd = 'insert into test values(now,11);' cmd = 'insert into test values(now,5);'
tdDnodes.stop(1)
tdDnodes.start(1)
tdLog.info(cmd) tdLog.info(cmd)
tdSql.execute(cmd) tdSql.execute(cmd)
queryRows=tdSql.query('select * from test') self.queryRows=tdSql.query('select * from test')
if queryRows==4: if self.queryRows==4:
tdSql.checkRows(4) self.checkRows(4,cmd)
return 0 return 0
else: else:
tdSql.checkRows(5) self.checkRows(5,cmd)
tdLog.info("=============== step3") tdLog.info("=============== step3")
tdDnodes.stop(1) tdDnodes.stop(1)
os.system("date -s $(date -d \"${DATE} 2 days\" \"+%Y%m%d\")") os.system("sudo date -s $(date -d \"${DATE} 2 days\" \"+%Y%m%d\")")
tdDnodes.start(1) tdDnodes.start(1)
cmd = 'insert into test values(now-1d,11);'
tdLog.info(cmd) tdLog.info(cmd)
tdSql.execute(cmd) tdSql.execute(cmd)
queryRows=tdSql.query('select * from test') self.queryRows=tdSql.query('select * from test')
tdSql.checkRows(6) if self.queryRows==4:
self.checkRows(4,cmd)
return 0
cmd = 'insert into test values(now-1d,6);'
tdLog.info(cmd)
tdSql.execute(cmd)
self.queryRows=tdSql.query('select * from test')
self.checkRows(6,cmd)
tdLog.info("=============== step4") tdLog.info("=============== step4")
tdDnodes.stop(1) tdDnodes.stop(1)
tdDnodes.start(1) tdDnodes.start(1)
cmd = 'insert into test values(now,11);' cmd = 'insert into test values(now,7);'
tdLog.info(cmd) tdLog.info(cmd)
tdSql.execute(cmd) tdSql.execute(cmd)
tdSql.query('select * from test') self.queryRows=tdSql.query('select * from test')
tdSql.checkRows(7) self.checkRows(7,cmd)
tdLog.info("=============== step5") tdLog.info("=============== step5")
tdDnodes.stop(1) tdDnodes.stop(1)
tdDnodes.start(1) tdDnodes.start(1)
cmd='select * from test where ts > now-1d' cmd='select * from test where ts > now-1d'
queryRows=tdSql.query('select * from test where ts > now-1d') self.queryRows=tdSql.query('select * from test where ts > now-1d')
self.checkRows(1,cmd) self.checkRows(1,cmd)
def stop(self): def stop(self):
os.system("timedatectl set-ntp true") os.system("sudo timedatectl set-ntp true")
time.sleep(40)
tdSql.close() tdSql.close()
tdLog.success("%s successfully executed" % __file__) tdLog.success("%s successfully executed" % __file__)
......
...@@ -35,7 +35,8 @@ class TDTestCase: ...@@ -35,7 +35,8 @@ class TDTestCase:
tdSql.execute( tdSql.execute(
"insert into tb2 using stb1 tags(2,'tb2', '表2') values ('2020-04-18 15:00:02.000', 3, 2.1), ('2020-04-18 15:00:03.000', 4, 2.2)") "insert into tb2 using stb1 tags(2,'tb2', '表2') values ('2020-04-18 15:00:02.000', 3, 2.1), ('2020-04-18 15:00:03.000', 4, 2.2)")
# inner join --- bug tdSql.error("select * from tb 1")
tdSql.query("select * from tb1 a, tb2 b where a.ts = b.ts") tdSql.query("select * from tb1 a, tb2 b where a.ts = b.ts")
tdSql.checkRows(0) tdSql.checkRows(0)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册