Commit 8f0e016f authored by: S Shengliang Guan

Merge remote-tracking branch 'origin/develop' into feature/http

......@@ -98,12 +98,12 @@ TDengine缺省的时间戳是毫秒精度,但通过修改配置参数enableMic
The KEEP parameter changes the number of days data files are retained. The default value is 3650, the valid range is [days, 365000], and it must be greater than or equal to the days parameter.
```mysql
ALTER DATABASE db_name QUORUM 365;
ALTER DATABASE db_name QUORUM 2;
```
The QUORUM parameter specifies the number of acknowledgments required for a data write to be considered successful. The valid range is [1, 3]. For asynchronous replication, set quorum to 1: the virtual node with the master role confirms the write by itself. For synchronous replication, quorum must be at least 2. In principle, Quorum >= 1 and Quorum <= replica (the number of replicas); this parameter must be provided when starting a synchronous replication module instance.
```mysql
ALTER DATABASE db_name BLOCKS 365;
ALTER DATABASE db_name BLOCKS 100;
```
The BLOCKS parameter is the number of cache-sized memory blocks in each VNODE (TSDB), so the memory used by one vnode is roughly (cache * blocks). The valid range is [3, 1000].
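As a rough worked example of this estimate (the values below are purely illustrative and assume cache is expressed in MB, as it is by default):
```mysql
-- illustrative values only: with cache = 16 (MB) and blocks = 6,
-- one vnode uses roughly 16 * 6 = 96 MB of write cache
ALTER DATABASE db_name BLOCKS 6;
```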
......
......@@ -39,7 +39,7 @@ create table D1002 using meters tags ("Beijing.Haidian", 2);
We have already seen that the following SQL statement computes the average voltage of these meters using a one-minute time window with a 30-second forward increment.
```sql
select avg(voltage) from meters interval(1m) sliding(30s)
select avg(voltage) from meters interval(1m) sliding(30s);
```
Every time this statement is executed, all the data is recomputed.
......@@ -47,14 +47,14 @@ select avg(voltage) from meters interval(1m) sliding(30s);
The statement above can be improved as follows, using a different `startTime` each time and executing it periodically:
```sql
select avg(voltage) from meters where ts > {startTime} interval(1m) sliding(30s)
select avg(voltage) from meters where ts > {startTime} interval(1m) sliding(30s);
```
This works, but TDengine offers a simpler approach:
just prepend `create table {tableName} as ` to the original query, for example:
```sql
create table avg_vol as select avg(voltage) from meters interval(1m) sliding(30s)
create table avg_vol as select avg(voltage) from meters interval(1m) sliding(30s);
```
This automatically creates a new table named `avg_vol`, and every 30 seconds TDengine incrementally executes the SQL statement after `as`,
......@@ -80,7 +80,7 @@ taos> select * from avg_vol;
For example, the continuous query created with the following SQL will run for one hour and then stop automatically.
```sql
create table avg_vol as select avg(voltage) from meters where ts > now and ts <= now + 1h interval(1m) sliding(30s)
create table avg_vol as select avg(voltage) from meters where ts > now and ts <= now + 1h interval(1m) sliding(30s);
```
Note that `now` in the example above refers to the time the continuous query is created, not the time the query is executed; otherwise, the query could never stop automatically.
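If the continuous query needs to be stopped before its end condition is reached, a minimal sketch is to drop the destination table, which should also remove the query (this assumes the drop-table behavior of your TDengine version removes the associated continuous query; please verify):
```sql
DROP TABLE avg_vol;
```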
......@@ -396,7 +396,7 @@ ts: 1597465200000 current: 11.2 voltage: 220 phase: 1 location: Beijing.Haidian
```sql
# taos
taos> use power
taos> use power;
taos> insert into d1001 values("2020-08-15 12:40:00.000", 12.4, 220, 1);
```
......
......@@ -276,14 +276,14 @@ SQL语句的解析和校验工作在客户端完成。解析SQL语句并生成
TDengine introduces the keyword interval to partition the time axis into fixed-length time windows and to aggregate the data falling into each window as required. For example:
```mysql
select count(*) from d1001 interval(1h)
select count(*) from d1001 interval(1h);
```
For the data collected by device d1001, this returns the number of records stored in each one-hour time window.
In applications that need continuous query results, missing data in a given time range means the result for that range is missing as well. TDengine provides interpolation strategies for time-axis aggregation results: using the keyword fill, the aggregated results can be interpolated. For example:
```mysql
select count(*) from d1001 interval(1h) fill(prev)
select count(*) from d1001 interval(1h) fill(prev);
```
This counts the records collected by device d1001 for each hour; if no data exists in a given hour, the statistics of the previous hour are returned. TDengine provides forward filling (prev), linear interpolation (linear), NULL filling (NULL), and filling with a specific value (value).
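As a sketch of the other fill modes listed above (the constant 0 passed to value is an arbitrary placeholder, not taken from the original text):
```mysql
select count(*) from d1001 interval(1h) fill(value, 0);
select count(*) from d1001 interval(1h) fill(linear);
```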
......
......@@ -89,7 +89,7 @@ taos>
2. On the first data node, use the CLI program taos to log in to TDengine and execute the command:
```
CREATE DNODE "h2.taos.com:6030"
CREATE DNODE "h2.taos.com:6030";
```
This adds the End Point of the new data node (obtained in step 4 of the preparation) to the cluster's EP list. **"fqdn:port" must be enclosed in double quotes**, otherwise an error occurs. Be sure to replace the example "h2.taos.com:6030" with the End Point of this new data node.
......@@ -97,7 +97,7 @@ taos>
3. Then execute the command
```
SHOW DNODES
SHOW DNODES;
```
to check whether the new node has been added successfully. If the newly added data node is offline, perform the following two checks:
......@@ -122,7 +122,7 @@ taos>
Run the CLI program taos, log in to the system as root, and execute:
```
CREATE DNODE "fqdn:port"
CREATE DNODE "fqdn:port";
```
This adds the End Point of the new data node to the cluster's EP list. **"fqdn:port" must be enclosed in double quotes**, otherwise an error occurs. The fqdn and port that a data node serves externally can be configured in the configuration file taos.cfg; by default they are obtained automatically. [Configuring the FQDN by automatic acquisition is strongly discouraged, as the resulting End Point of the data node may not be what you expect.]
......
......@@ -46,7 +46,7 @@ taos>
5. On the first node, use the CLI program taos to log in to TDengine and use the command:
```
CREATE DNODE "h2.taos.com:6030"
CREATE DNODE "h2.taos.com:6030";
```
This adds the End Point of the new node to the cluster's EP list. **"fqdn:port" must be enclosed in double quotes**, otherwise an error occurs. Be sure to replace the example "h2.taos.com:6030" with the End Point of your own first node.
......@@ -54,7 +54,7 @@ taos>
6. Use the command
```
SHOW DNODES
SHOW DNODES;
```
to check whether the new node has been added successfully.
......@@ -71,7 +71,7 @@ taos>
### Adding a node
Run the CLI program taos, log in to the system as root, and execute:
```
CREATE DNODE "fqdn:port"
CREATE DNODE "fqdn:port";
```
This adds the End Point of the new node to the cluster's EP list. **"fqdn:port" must be enclosed in double quotes**, otherwise an error occurs. The fqdn and port that a node serves externally can be configured in the configuration file taos.cfg; by default they are obtained automatically.
......
......@@ -191,6 +191,7 @@ SColumn* tscColumnListInsert(SArray* pColList, SColumnIndex* colIndex);
SArray* tscColumnListClone(const SArray* src, int16_t tableIndex);
void tscColumnListDestroy(SArray* pColList);
void tscDequoteAndTrimToken(SStrToken* pToken);
int32_t tscValidateName(SStrToken* pToken);
void tscIncStreamExecutionCount(void* pStream);
......@@ -257,6 +258,8 @@ void tscDoQuery(SSqlObj* pSql);
*/
SSqlObj* createSimpleSubObj(SSqlObj* pSql, void (*fp)(), void* param, int32_t cmd);
void registerSqlObj(SSqlObj* pSql);
SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void* param, int32_t cmd, SSqlObj* pPrevSql);
void addGroupInfoForSubquery(SSqlObj* pParentObj, SSqlObj* pSql, int32_t subClauseIndex, int32_t tableIndex);
......
......@@ -89,7 +89,7 @@ typedef struct STableComInfo {
typedef struct SCMCorVgroupInfo {
int32_t version;
int8_t inUse;
int8_t inUse;
int8_t numOfEps;
SEpAddr epAddr[TSDB_MAX_REPLICA];
} SCMCorVgroupInfo;
......@@ -107,7 +107,7 @@ typedef struct STableMeta {
} STableMeta;
typedef struct STableMetaInfo {
STableMeta * pTableMeta; // table meta, cached in client side and acquired by name
STableMeta *pTableMeta; // table meta, cached in client side and acquired by name
SVgroupsInfo *vgroupList;
SArray *pVgroupTables; // SArray<SVgroupTableInfo>
......@@ -261,7 +261,7 @@ typedef struct {
};
int32_t insertType;
int32_t clauseIndex; // index of multiple subclause query
int32_t clauseIndex; // index of multiple subclause query
char * curSql; // current sql, resume position of sql after parsing paused
int8_t parseFinished;
......@@ -276,7 +276,8 @@ typedef struct {
int32_t numOfParams;
int8_t dataSourceType; // load data from file or not
int8_t submitSchema; // submit block is built with table schema
int8_t submitSchema; // submit block is built with table schema
STagData tagData;
SHashObj *pTableList; // referred table involved in sql
SArray *pDataBlocks; // SArray<STableDataBlocks*> submit data blocks after parsing sql
} SSqlCmd;
......
......@@ -51,10 +51,7 @@ void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, void (*fp)(), void* param, const
pSql->fp = fp;
pSql->fetchFp = fp;
uint64_t handle = (uint64_t) pSql;
pSql->self = taosCachePut(tscObjCache, &handle, sizeof(uint64_t), &pSql, sizeof(uint64_t), 2*3600*1000);
T_REF_INC(pSql->pTscObj);
registerSqlObj(pSql);
pSql->sqlstr = calloc(1, sqlLen + 1);
if (pSql->sqlstr == NULL) {
......
......@@ -791,7 +791,7 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql) {
sql += index;
tscAllocPayload(pCmd, sizeof(STagData));
STagData *pTag = (STagData *) pCmd->payload;
STagData *pTag = &pCmd->tagData;
memset(pTag, 0, sizeof(STagData));
......@@ -946,7 +946,6 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql) {
return tscSQLSyntaxErrMsg(pCmd->payload, ") expected", sToken.z);
}
pCmd->payloadLen = sizeof(pTag->name) + sizeof(pTag->dataLen) + pTag->dataLen;
pTag->dataLen = htonl(pTag->dataLen);
if (tscValidateName(&tableToken) != TSDB_CODE_SUCCESS) {
......@@ -1192,8 +1191,7 @@ int tsParseInsertSql(SSqlObj *pSql) {
str += index;
if (TK_STRING == sToken.type) {
strdequote(sToken.z);
sToken.n = (uint32_t)strtrim(sToken.z);
tscDequoteAndTrimToken(&sToken);
}
if (sToken.type == TK_RP) {
......
......@@ -545,10 +545,8 @@ int taos_stmt_prepare(TAOS_STMT* stmt, const char* sql, unsigned long length) {
pSql->cmd.numOfParams = 0;
pSql->cmd.batchSize = 0;
uint64_t handle = (uint64_t) pSql;
pSql->self = taosCachePut(tscObjCache, &handle, sizeof(uint64_t), &pSql, sizeof(uint64_t), 2*3600*1000);
T_REF_INC(pSql->pTscObj);
registerSqlObj(pSql);
int32_t code = tsParseSql(pSql, true);
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
......
......@@ -820,20 +820,20 @@ int32_t tscSetTableFullName(STableMetaInfo* pTableMetaInfo, SStrToken* pzTableNa
if (hasSpecifyDB(pzTableName)) {
// db has been specified in sql string so we ignore current db path
code = setObjFullName(pTableMetaInfo->name, getAccountId(pSql), NULL, pzTableName, NULL);
if (code != 0) {
if (code != TSDB_CODE_SUCCESS) {
invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1);
}
} else { // get current DB name first, then set it into path
SStrToken t = {0};
getCurrentDBName(pSql, &t);
if (t.n == 0) {
invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2);
}
code = setObjFullName(pTableMetaInfo->name, NULL, &t, pzTableName, NULL);
if (code != 0) {
invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1);
}
code = invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2);
} else {
code = setObjFullName(pTableMetaInfo->name, NULL, &t, pzTableName, NULL);
if (code != TSDB_CODE_SUCCESS) {
code = invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1);
}
}
}
if (code != TSDB_CODE_SUCCESS) {
......
......@@ -128,6 +128,7 @@ static void tscUpdateVgroupInfo(SSqlObj *pObj, SRpcEpSet *pEpSet) {
tscDebug("after: EndPoint in use: %d", pVgroupInfo->inUse);
taosCorEndWrite(&pVgroupInfo->version);
}
void tscPrintMgmtEp() {
SRpcEpSet dump;
tscDumpMgmtEpSet(&dump);
......@@ -745,7 +746,6 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, i);
if (!tscValidateColumnId(pTableMetaInfo, pExpr->colInfo.colId, pExpr->numOfParams)) {
/* column id is not valid according to the cached table meta, the table meta is expired */
tscError("%p table schema is not matched with parsed sql", pSql);
return TSDB_CODE_TSC_INVALID_SQL;
}
......@@ -1495,43 +1495,29 @@ int tscBuildConnectMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
}
int tscBuildTableMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SCMTableInfoMsg *pInfoMsg;
char * pMsg;
int msgLen = 0;
char *tmpData = NULL;
uint32_t len = pSql->cmd.payloadLen;
if (len > 0) {
if ((tmpData = calloc(1, len)) == NULL) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
// STagData is in binary format, strncpy is not available
memcpy(tmpData, pSql->cmd.payload, len);
}
SSqlCmd * pCmd = &pSql->cmd;
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
pInfoMsg = (SCMTableInfoMsg *)pCmd->payload;
SCMTableInfoMsg* pInfoMsg = (SCMTableInfoMsg *)pCmd->payload;
strcpy(pInfoMsg->tableId, pTableMetaInfo->name);
pInfoMsg->createFlag = htons(pSql->cmd.autoCreated ? 1 : 0);
pMsg = (char*)pInfoMsg + sizeof(SCMTableInfoMsg);
char* pMsg = (char*)pInfoMsg + sizeof(SCMTableInfoMsg);
if (pSql->cmd.autoCreated && len > 0) {
memcpy(pInfoMsg->tags, tmpData, len);
pMsg += len;
size_t len = htonl(pCmd->tagData.dataLen);
if (pSql->cmd.autoCreated) {
if (len > 0) {
len += sizeof(pCmd->tagData.name) + sizeof(pCmd->tagData.dataLen);
memcpy(pInfoMsg->tags, &pCmd->tagData, len);
pMsg += len;
}
}
pCmd->payloadLen = (int32_t)(pMsg - (char*)pInfoMsg);
pCmd->msgType = TSDB_MSG_TYPE_CM_TABLE_META;
taosTFree(tmpData);
assert(msgLen + minMsgSize() <= (int32_t)pCmd->allocSize);
return TSDB_CODE_SUCCESS;
}
......@@ -1987,15 +1973,11 @@ static void createHBObj(STscObj* pObj) {
pSql->param = pObj;
pSql->pTscObj = pObj;
pSql->signature = pSql;
pObj->pHb = pSql;
tscAddSubqueryInfo(&pObj->pHb->cmd);
registerSqlObj(pSql);
tscDebug("%p HB is allocated, pObj:%p", pSql, pObj);
int64_t ad = (int64_t) pSql;
pSql->self = taosCachePut(tscObjCache, &ad, sizeof(int64_t), &pSql, sizeof(int64_t), 2 * 60 * 1000);
T_REF_INC(pObj);
tscDebug("%p HB is allocated, pObj:%p", pObj->pHb, pObj);
pObj->pHb = pSql;
}
int tscProcessConnectRsp(SSqlObj *pSql) {
......@@ -2170,11 +2152,7 @@ static int32_t getTableMetaFromMgmt(SSqlObj *pSql, STableMetaInfo *pTableMetaInf
pNew->signature = pNew;
pNew->cmd.command = TSDB_SQL_META;
T_REF_INC(pNew->pTscObj);
// TODO add test case on x86 platform
uint64_t adr = (uint64_t) pNew;
pNew->self = taosCachePut(tscObjCache, &adr, sizeof(uint64_t), &pNew, sizeof(uint64_t), 2*60*1000);
registerSqlObj(pNew);
tscAddSubqueryInfo(&pNew->cmd);
......@@ -2192,8 +2170,7 @@ static int32_t getTableMetaFromMgmt(SSqlObj *pSql, STableMetaInfo *pTableMetaInf
assert(pNew->cmd.numOfClause == 1 && pNewQueryInfo->numOfTables == 1);
tstrncpy(pNewMeterMetaInfo->name, pTableMetaInfo->name, sizeof(pNewMeterMetaInfo->name));
memcpy(pNew->cmd.payload, pSql->cmd.payload, pSql->cmd.payloadLen); // tag information if table does not exists.
pNew->cmd.payloadLen = pSql->cmd.payloadLen;
memcpy(&pNew->cmd.tagData, &pSql->cmd.tagData, sizeof(pSql->cmd.tagData));
tscDebug("%p new pSqlObj:%p to get tableMeta, auto create:%d", pSql, pNew, pNew->cmd.autoCreated);
pNew->fp = tscTableMetaCallBack;
......@@ -2301,10 +2278,8 @@ int tscGetSTableVgroupInfo(SSqlObj *pSql, int32_t clauseIndex) {
}
pNewQueryInfo->numOfTables = pQueryInfo->numOfTables;
T_REF_INC(pNew->pTscObj);
registerSqlObj(pNew);
uint64_t p = (uint64_t) pNew;
pNew->self = taosCachePut(tscObjCache, &p, sizeof(uint64_t), &pNew, sizeof(uint64_t), 2 * 600 * 1000);
tscDebug("%p new sqlObj:%p to get vgroupInfo, numOfTables:%d", pSql, pNew, pNewQueryInfo->numOfTables);
pNew->fp = tscTableMetaCallBack;
......
......@@ -156,10 +156,7 @@ SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pass, con
*taos = pObj;
}
T_REF_INC(pSql->pTscObj);
uint64_t key = (uint64_t) pSql;
pSql->self = taosCachePut(tscObjCache, &key, sizeof(uint64_t), &pSql, sizeof(uint64_t), 2*3600*1000);
registerSqlObj(pSql);
tsInsertHeadSize = sizeof(SMsgDesc) + sizeof(SSubmitMsg);
return pSql;
......@@ -270,7 +267,7 @@ void taos_close(TAOS *taos) {
pHb->pRpcCtx = NULL;
}
tscDebug("%p, HB is freed", pHb);
tscDebug("%p HB is freed", pHb);
taos_free_result(pHb);
}
......@@ -789,14 +786,17 @@ int taos_validate_sql(TAOS *taos, const char *sql) {
}
SSqlObj* pSql = calloc(1, sizeof(SSqlObj));
pSql->pTscObj = taos;
pSql->signature = pSql;
SSqlRes *pRes = &pSql->res;
SSqlCmd *pCmd = &pSql->cmd;
pRes->numOfTotal = 0;
pRes->numOfClauseTotal = 0;
tscDebug("%p Valid SQL: %s pObj:%p", pSql, sql, pObj);
int32_t sqlLen = (int32_t)strlen(sql);
......@@ -832,11 +832,12 @@ int taos_validate_sql(TAOS *taos, const char *sql) {
tsem_wait(&pSql->rspSem);
code = pSql->res.code;
}
if (code != TSDB_CODE_SUCCESS) {
tscDebug("%p Valid SQL result:%d, %s pObj:%p", pSql, code, taos_errstr(taos), pObj);
}
taos_free_result(pSql);
taos_free_result(pSql);
return code;
}
......@@ -935,34 +936,32 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) {
SSqlObj* pSql = calloc(1, sizeof(SSqlObj));
pSql->pTscObj = taos;
pSql->signature = pSql;
SSqlRes *pRes = &pSql->res;
pRes->code = 0;
pRes->numOfTotal = 0; // the number of getting table meta from server
pRes->numOfClauseTotal = 0;
pRes->code = 0;
assert(pSql->fp == NULL);
tscDebug("%p tableNameList: %s pObj:%p", pSql, tableNameList, pObj);
int32_t tblListLen = (int32_t)strlen(tableNameList);
if (tblListLen > MAX_TABLE_NAME_LENGTH) {
tscError("%p tableNameList too long, length:%d, maximum allowed:%d", pSql, tblListLen, MAX_TABLE_NAME_LENGTH);
pRes->code = TSDB_CODE_TSC_INVALID_SQL;
taosTFree(pSql);
return pRes->code;
tscFreeSqlObj(pSql);
return TSDB_CODE_TSC_INVALID_SQL;
}
char *str = calloc(1, tblListLen + 1);
if (str == NULL) {
pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
tscError("%p failed to malloc sql string buffer", pSql);
taosTFree(pSql);
return pRes->code;
tscFreeSqlObj(pSql);
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
strtolower(str, tableNameList);
pRes->code = (uint8_t)tscParseTblNameList(pSql, str, tblListLen);
int32_t code = (uint8_t) tscParseTblNameList(pSql, str, tblListLen);
/*
* set the qhandle to 0 before return in order to erase the qhandle value assigned in the previous successful query.
......@@ -972,17 +971,17 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) {
pRes->qhandle = 0;
free(str);
if (pRes->code != TSDB_CODE_SUCCESS) {
if (code != TSDB_CODE_SUCCESS) {
tscFreeSqlObj(pSql);
return pRes->code;
return code;
}
tscDoQuery(pSql);
tscDebug("%p load multi metermeta result:%d %s pObj:%p", pSql, pRes->code, taos_errstr(taos), pObj);
if (pRes->code != TSDB_CODE_SUCCESS) {
tscPartiallyFreeSqlObj(pSql);
tscDebug("%p load multi table meta result:%d %s pObj:%p", pSql, pRes->code, taos_errstr(taos), pObj);
if ((code = pRes->code) != TSDB_CODE_SUCCESS) {
tscFreeSqlObj(pSql);
}
return pRes->code;
return code;
}
......@@ -510,9 +510,7 @@ static void tscCreateStream(void *param, TAOS_RES *res, int code) {
return;
}
uint64_t handle = (uint64_t) pSql;
pSql->self = taosCachePut(tscObjCache, &handle, sizeof(uint64_t), &pSql, sizeof(uint64_t), 2*3600*1000);
T_REF_INC(pSql->pTscObj);
registerSqlObj(pSql);
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
......
......@@ -105,6 +105,7 @@ static SSub* tscCreateSubscription(STscObj* pObj, const char* topic, const char*
code = TAOS_SYSTEM_ERROR(errno);
goto fail;
}
tstrncpy(pSub->topic, topic, sizeof(pSub->topic));
pSub->progress = taosArrayInit(32, sizeof(SSubscriptionProgress));
if (pSub->progress == NULL) {
......@@ -119,6 +120,7 @@ static SSub* tscCreateSubscription(STscObj* pObj, const char* topic, const char*
code = TSDB_CODE_TSC_OUT_OF_MEMORY;
goto fail;
}
pSql->signature = pSql;
pSql->pTscObj = pObj;
pSql->pSubscription = pSub;
......@@ -142,6 +144,7 @@ static SSub* tscCreateSubscription(STscObj* pObj, const char* topic, const char*
code = TSDB_CODE_TSC_OUT_OF_MEMORY;
goto fail;
}
strtolower(pSql->sqlstr, pSql->sqlstr);
pRes->qhandle = 0;
pRes->numOfRows = 1;
......@@ -152,15 +155,14 @@ static SSub* tscCreateSubscription(STscObj* pObj, const char* topic, const char*
goto fail;
}
uint64_t handle = (uint64_t) pSql;
pSql->self = taosCachePut(tscObjCache, &handle, sizeof(uint64_t), &pSql, sizeof(uint64_t), 2*3600*1000);
T_REF_INC(pSql->pTscObj);
registerSqlObj(pSql);
code = tsParseSql(pSql, false);
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
tsem_wait(&pSub->sem);
code = pSql->res.code;
}
if (code != TSDB_CODE_SUCCESS) {
line = __LINE__;
goto fail;
......@@ -182,8 +184,10 @@ fail:
} else {
tscFreeSqlObj(pSql);
}
pSql = NULL;
}
if (pSub != NULL) {
taosArrayDestroy(pSub->progress);
tsem_destroy(&pSub->sem);
......
......@@ -122,11 +122,8 @@ void taos_init_imp(void) {
tscInitMsgsFp();
int queueSize = tsMaxConnections*2;
if (tscEmbedded == 0) {
tscNumOfThreads = (int)(tsNumOfCores * tsNumOfThreadsPerCore / 2.0);
} else {
tscNumOfThreads = (int)(tsNumOfCores * tsNumOfThreadsPerCore / 4.0);
}
double factor = (tscEmbedded == 0)? 2.0:4.0;
tscNumOfThreads = (int)(tsNumOfCores * tsNumOfThreadsPerCore / factor);
if (tscNumOfThreads < 2) tscNumOfThreads = 2;
......@@ -140,11 +137,8 @@ void taos_init_imp(void) {
if(0 == tscEmbedded){
taosTmrReset(tscCheckDiskUsage, 10, NULL, tscTmr, &tscCheckDiskUsageTmr);
}
int64_t refreshTime = tsTableMetaKeepTimer;
refreshTime = refreshTime > 10 ? 10 : refreshTime;
refreshTime = refreshTime < 10 ? 10 : refreshTime;
int64_t refreshTime = 10; // 10 seconds by default
if (tscMetaCache == NULL) {
tscMetaCache = taosCacheInit(TSDB_DATA_TYPE_BINARY, refreshTime, false, NULL, "tableMeta");
tscObjCache = taosCacheInit(TSDB_DATA_TYPE_BIGINT, refreshTime/2, false, tscFreeSqlObjInCache, "sqlObj");
......
......@@ -391,10 +391,21 @@ static UNUSED_FUNC void tscFreeSubobj(SSqlObj* pSql) {
*/
void tscFreeSqlObjInCache(void *pSql) {
assert(pSql != NULL);
SSqlObj** p = (SSqlObj**)pSql;
STscObj* pTscObj = (*p)->pTscObj;
assert((*p)->self != 0 && (*p)->self == (p));
tscFreeSqlObj(*p);
int32_t ref = T_REF_DEC(pTscObj);
assert(ref >= 0);
tscDebug("%p free sqlObj completed, tscObj:%p ref:%d", *p, pTscObj, ref);
if (ref == 0) {
tscDebug("%p all sqlObj freed, free tscObj:%p", *p, pTscObj);
tscCloseTscObj(pTscObj);
}
}
void tscFreeSqlObj(SSqlObj* pSql) {
......@@ -403,7 +414,6 @@ void tscFreeSqlObj(SSqlObj* pSql) {
}
tscDebug("%p start to free sqlObj", pSql);
STscObj* pTscObj = pSql->pTscObj;
tscFreeSubobj(pSql);
tscPartiallyFreeSqlObj(pSql);
......@@ -421,14 +431,6 @@ void tscFreeSqlObj(SSqlObj* pSql) {
tsem_destroy(&pSql->rspSem);
free(pSql);
tscDebug("%p free sqlObj completed", pSql);
int32_t ref = T_REF_DEC(pTscObj);
assert(ref >= 0);
if (ref == 0) {
tscCloseTscObj(pTscObj);
}
}
void tscDestroyDataBlock(STableDataBlocks* pDataBlock) {
......@@ -1265,6 +1267,51 @@ static int32_t validateQuoteToken(SStrToken* pToken) {
return TSDB_CODE_SUCCESS;
}
void tscDequoteAndTrimToken(SStrToken* pToken) {
assert(pToken->type == TK_STRING);
uint32_t first = 0, last = pToken->n;
// trim leading spaces
while (first < last) {
char c = pToken->z[first];
if (c != ' ' && c != '\t') {
break;
}
first++;
}
// trim ending spaces
while (first < last) {
char c = pToken->z[last - 1];
if (c != ' ' && c != '\t') {
break;
}
last--;
}
// there are still at least two characters
if (first < last - 1) {
char c = pToken->z[first];
// dequote
if ((c == '\'' || c == '"') && c == pToken->z[last - 1]) {
first++;
last--;
}
}
// left shift the string and pad spaces
for (uint32_t i = 0; i + first < last; i++) {
pToken->z[i] = pToken->z[first + i];
}
for (uint32_t i = last - first; i < pToken->n; i++) {
pToken->z[i] = ' ';
}
// adjust token length
pToken->n = last - first;
}
int32_t tscValidateName(SStrToken* pToken) {
if (pToken->type != TK_STRING && pToken->type != TK_ID) {
return TSDB_CODE_TSC_INVALID_SQL;
......@@ -1735,6 +1782,16 @@ void tscResetForNextRetrieve(SSqlRes* pRes) {
pRes->numOfRows = 0;
}
void registerSqlObj(SSqlObj* pSql) {
int32_t DEFAULT_LIFE_TIME = 2 * 600 * 1000; // 1200 sec
int32_t ref = T_REF_INC(pSql->pTscObj);
tscDebug("%p add to tscObj:%p, ref:%d", pSql, pSql->pTscObj, ref);
uint64_t p = (uint64_t) pSql;
pSql->self = taosCachePut(tscObjCache, &p, sizeof(uint64_t), &p, sizeof(uint64_t), DEFAULT_LIFE_TIME);
}
SSqlObj* createSimpleSubObj(SSqlObj* pSql, void (*fp)(), void* param, int32_t cmd) {
SSqlObj* pNew = (SSqlObj*)calloc(1, sizeof(SSqlObj));
if (pNew == NULL) {
......@@ -1748,6 +1805,8 @@ SSqlObj* createSimpleSubObj(SSqlObj* pSql, void (*fp)(), void* param, int32_t cm
SSqlCmd* pCmd = &pNew->cmd;
pCmd->command = cmd;
pCmd->parseFinished = 1;
pCmd->autoCreated = pSql->cmd.autoCreated;
memcpy(&pCmd->tagData, &pSql->cmd.tagData, sizeof(pCmd->tagData));
if (tscAddSubqueryInfo(pCmd) != TSDB_CODE_SUCCESS) {
tscFreeSqlObj(pNew);
......@@ -1762,8 +1821,7 @@ SSqlObj* createSimpleSubObj(SSqlObj* pSql, void (*fp)(), void* param, int32_t cm
pNew->sqlstr = strdup(pSql->sqlstr);
if (pNew->sqlstr == NULL) {
tscError("%p new subquery failed", pSql);
free(pNew);
tscFreeSqlObj(pNew);
return NULL;
}
......@@ -1774,9 +1832,7 @@ SSqlObj* createSimpleSubObj(SSqlObj* pSql, void (*fp)(), void* param, int32_t cm
tscAddTableMetaInfo(pQueryInfo, pMasterTableMetaInfo->name, NULL, NULL, NULL);
T_REF_INC(pNew->pTscObj);
uint64_t p = (uint64_t) pNew;
pNew->self = taosCachePut(tscObjCache, &p, sizeof(uint64_t), &pNew, sizeof(uint64_t), 2 * 600 * 1000);
registerSqlObj(pNew);
return pNew;
}
......@@ -1866,7 +1922,6 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void
pNew->pTscObj = pSql->pTscObj;
pNew->signature = pNew;
T_REF_INC(pNew->pTscObj);
pNew->sqlstr = strdup(pSql->sqlstr);
if (pNew->sqlstr == NULL) {
......@@ -2015,10 +2070,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void
tscDebug("%p new sub insertion: %p, vnodeIdx:%d", pSql, pNew, pTableMetaInfo->vgroupIndex);
}
T_REF_INC(pNew->pTscObj);
uint64_t p = (uint64_t) pNew;
pNew->self = taosCachePut(tscObjCache, &p, sizeof(uint64_t), &pNew, sizeof(uint64_t), 2 * 600 * 10);
registerSqlObj(pNew);
return pNew;
_error:
......
Subproject commit 06ec30a0f1762e8169bf6b9045c82bcaa52bcdf0
Subproject commit 8d7bf743852897110cbdcc7c4322cd7a74d4167b
......@@ -5,10 +5,9 @@
<groupId>com.taosdata.jdbc</groupId>
<artifactId>taos-jdbcdriver</artifactId>
<version>2.0.0-SNAPSHOT</version>
<version>2.0.6</version>
<packaging>jar</packaging>
<name>JDBCDriver</name>
<url>https://github.com/taosdata/TDengine/tree/master/src/connector/jdbc</url>
<description>TDengine JDBC Driver</description>
......
......@@ -291,8 +291,8 @@ public class TSDBDriver implements java.sql.Driver {
return null;
}
String lowerUrl = url.toLowerCase();
if (!lowerUrl.startsWith(URL_PREFIX) && !lowerUrl.startsWith(URL_PREFIX1)) {
// String lowerUrl = url.toLowerCase();
if (!url.startsWith(URL_PREFIX) && !url.startsWith(URL_PREFIX1)) {
return null;
}
......
......@@ -660,13 +660,13 @@ static int32_t mnodeGetVgroupMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *p
for (int32_t i = 0; i < pShow->maxReplica; ++i) {
pShow->bytes[cols] = 2;
pSchema[cols].type = TSDB_DATA_TYPE_SMALLINT;
strcpy(pSchema[cols].name, "dnode");
snprintf(pSchema[cols].name, TSDB_COL_NAME_LEN, "dnode%d", i + 1);
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 9 + VARSTR_HEADER_SIZE;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "vstatus");
snprintf(pSchema[cols].name, TSDB_COL_NAME_LEN, "v%dstatus", i + 1);
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
}
......
......@@ -136,6 +136,7 @@ typedef struct SSkipListIterator {
SSkipListNode *cur;
int32_t step; // the number of nodes that have been checked already
int32_t order; // order of the iterator
SSkipListNode *next; // next points to the true qualified node in skip list
} SSkipListIterator;
/**
......
......@@ -97,7 +97,7 @@ static FORCE_INLINE void taosCacheReleaseNode(SCacheObj *pCacheObj, SCacheDataNo
int32_t size = (int32_t)taosHashGetSize(pCacheObj->pHashTable);
assert(size > 0);
uDebug("cache:%s, key:%p, %p is destroyed from cache, size:%dbytes, num:%d size:%" PRId64 "bytes",
uDebug("cache:%s, key:%p, %p is destroyed from cache, size:%dbytes, totalNum:%d size:%" PRId64 "bytes",
pCacheObj->name, pNode->key, pNode->data, pNode->size, size - 1, pCacheObj->totalSize);
if (pCacheObj->freeFp) {
......
......@@ -79,9 +79,12 @@ static SSkipListIterator* doCreateSkipListIterator(SSkipList *pSkipList, int32_t
// when order is TSDB_ORDER_ASC, return the last node with key less than val
// when order is TSDB_ORDER_DESC, return the first node with key large than val
static SSkipListNode* getPriorNode(SSkipList* pSkipList, const char* val, int32_t order) {
static SSkipListNode* getPriorNode(SSkipList* pSkipList, const char* val, int32_t order, SSkipListNode** pCur) {
__compar_fn_t comparFn = pSkipList->comparFn;
SSkipListNode *pNode = NULL;
if (pCur != NULL) {
*pCur = NULL;
}
if (order == TSDB_ORDER_ASC) {
pNode = pSkipList->pHead;
......@@ -93,6 +96,9 @@ static SSkipListNode* getPriorNode(SSkipList* pSkipList, const char* val, int32_
pNode = p;
p = SL_GET_FORWARD_POINTER(p, i);
} else {
if (pCur != NULL) {
*pCur = p;
}
break;
}
}
......@@ -107,6 +113,9 @@ static SSkipListNode* getPriorNode(SSkipList* pSkipList, const char* val, int32_
pNode = p;
p = SL_GET_BACKWARD_POINTER(p, i);
} else {
if (pCur != NULL) {
*pCur = p;
}
break;
}
}
......@@ -295,7 +304,7 @@ SArray* tSkipListGet(SSkipList *pSkipList, SSkipListKey key) {
pthread_rwlock_wrlock(pSkipList->lock);
}
SSkipListNode* pNode = getPriorNode(pSkipList, key, TSDB_ORDER_ASC);
SSkipListNode* pNode = getPriorNode(pSkipList, key, TSDB_ORDER_ASC, NULL);
while (1) {
SSkipListNode *p = SL_GET_FORWARD_POINTER(pNode, 0);
if (p == pSkipList->pTail) {
......@@ -452,7 +461,7 @@ uint32_t tSkipListRemove(SSkipList *pSkipList, SSkipListKey key) {
pthread_rwlock_wrlock(pSkipList->lock);
}
SSkipListNode* pNode = getPriorNode(pSkipList, key, TSDB_ORDER_ASC);
SSkipListNode* pNode = getPriorNode(pSkipList, key, TSDB_ORDER_ASC, NULL);
while (1) {
SSkipListNode *p = SL_GET_FORWARD_POINTER(pNode, 0);
if (p == pSkipList->pTail) {
......@@ -545,7 +554,7 @@ SSkipListIterator *tSkipListCreateIterFromVal(SSkipList* pSkipList, const char*
pthread_rwlock_rdlock(pSkipList->lock);
}
iter->cur = getPriorNode(pSkipList, val, order);
iter->cur = getPriorNode(pSkipList, val, order, &iter->next);
if (pSkipList->lock) {
pthread_rwlock_unlock(pSkipList->lock);
......@@ -567,8 +576,22 @@ bool tSkipListIterNext(SSkipListIterator *iter) {
if (iter->order == TSDB_ORDER_ASC) { // ascending order iterate
iter->cur = SL_GET_FORWARD_POINTER(iter->cur, 0);
// a new node is inserted into between iter->cur and iter->next, ignore it
if (iter->cur != iter->next && (iter->next != NULL)) {
iter->cur = iter->next;
}
iter->next = SL_GET_FORWARD_POINTER(iter->cur, 0);
} else { // descending order iterate
iter->cur = SL_GET_BACKWARD_POINTER(iter->cur, 0);
// a new node is inserted into between iter->cur and iter->next, ignore it
if (iter->cur != iter->next && (iter->next != NULL)) {
iter->cur = iter->next;
}
iter->next = SL_GET_BACKWARD_POINTER(iter->cur, 0);
}
if (pSkipList->lock) {
......@@ -715,9 +738,11 @@ SSkipListIterator* doCreateSkipListIterator(SSkipList *pSkipList, int32_t order)
iter->order = order;
if(order == TSDB_ORDER_ASC) {
iter->cur = pSkipList->pHead;
iter->next = SL_GET_FORWARD_POINTER(iter->cur, 0);
} else {
iter->cur = pSkipList->pTail;
iter->next = SL_GET_BACKWARD_POINTER(iter->cur, 0);
}
return iter;
}
\ No newline at end of file
# custom
/out/
/logs/
*.jar
# Created by .ignore support plugin (hsz.mobi)
.gitignore
# Build Artifacts
.gradle/*
build/*
target/*
bin/*
dependency-reduced-pom.xml
# Eclipse Project Files
.classpath
.project
.settings/*
......@@ -65,5 +65,10 @@
<artifactId>taos-jdbcdriver</artifactId>
<version>2.0.4</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
</dependencies>
</project>
......@@ -11,7 +11,6 @@ public class JDBCConnectorChecker {
private static String tbName = "weather";
private Connection connection;
/**
* get connection
**/
......@@ -170,5 +169,4 @@ public class JDBCConnectorChecker {
checker.close();
}
}
package com.taosdata.example.jdbcTaosdemo;
import com.taosdata.example.jdbcTaosdemo.domain.JdbcTaosdemoConfig;
import com.taosdata.example.jdbcTaosdemo.task.CreateTableTask;
import com.taosdata.example.jdbcTaosdemo.task.InsertTableDatetimeTask;
import com.taosdata.example.jdbcTaosdemo.task.InsertTableTask;
import com.taosdata.example.jdbcTaosdemo.utils.TimeStampUtil;
import com.taosdata.jdbc.TSDBDriver;
import org.apache.log4j.Logger;
import java.sql.*;
import java.util.*;
import java.util.concurrent.atomic.AtomicLong;
public class JdbcTaosdemo {
private static Logger logger = Logger.getLogger(JdbcTaosdemo.class);
private static AtomicLong beginTimestamp = new AtomicLong(TimeStampUtil.datetimeToLong("2005-01-01 00:00:00.000"));
private final JdbcTaosdemoConfig config;
private Connection connection;
private static final String[] locations = {"Beijing", "Shanghai", "Guangzhou", "Shenzhen", "HangZhou", "Tianjin", "Wuhan", "Changsha", "Nanjing", "Xian"};
private static Random random = new Random(System.currentTimeMillis());
public JdbcTaosdemo(JdbcTaosdemoConfig config) {
this.config = config;
}
public static void main(String[] args) {
JdbcTaosdemoConfig config = JdbcTaosdemoConfig.build(args);
boolean isHelp = Arrays.asList(args).contains("--help");
if (isHelp) {
JdbcTaosdemoConfig.printHelp();
return;
}
if (config.getHost() == null) {
JdbcTaosdemoConfig.printHelp();
return;
}
JdbcTaosdemo taosdemo = new JdbcTaosdemo(config);
taosdemo.init();
taosdemo.dropDatabase();
taosdemo.createDatabase();
taosdemo.useDatabase();
taosdemo.createSuperTable();
taosdemo.createTableMultiThreads();
boolean infinite = Arrays.asList(args).contains("--infinite");
if (infinite) {
logger.info("!!! Infinite Insert Mode Started. !!!!");
taosdemo.insertInfinite();
} else {
taosdemo.insertMultiThreads();
taosdemo.countFromSuperTable();
if (config.isDeleteTable())
taosdemo.dropSuperTable();
taosdemo.close();
}
}
/**
* establish the connection
*/
private void init() {
try {
Class.forName("com.taosdata.jdbc.TSDBDriver");
connection = getConnection(config);
if (connection != null)
logger.info("[ OK ] Connection established.");
} catch (ClassNotFoundException | SQLException e) {
logger.error(e.getMessage());
throw new RuntimeException("connection failed: " + config.getHost());
}
}
public static Connection getConnection(JdbcTaosdemoConfig config) throws SQLException {
Properties properties = new Properties();
properties.setProperty(TSDBDriver.PROPERTY_KEY_HOST, config.getHost());
properties.setProperty(TSDBDriver.PROPERTY_KEY_USER, config.getUser());
properties.setProperty(TSDBDriver.PROPERTY_KEY_PASSWORD, config.getPassword());
properties.setProperty(TSDBDriver.PROPERTY_KEY_CHARSET, "UTF-8");
properties.setProperty(TSDBDriver.PROPERTY_KEY_LOCALE, "en_US.UTF-8");
properties.setProperty(TSDBDriver.PROPERTY_KEY_TIME_ZONE, "UTC-8");
return DriverManager.getConnection("jdbc:TAOS://" + config.getHost() + ":" + config.getPort() + "/" + config.getDbName() + "", properties);
}
/**
* create database
*/
private void createDatabase() {
String sql = "create database if not exists " + config.getDbName() + " keep " + config.getKeep() + " days " + config.getDays();
execute(sql);
}
private void dropDatabase() {
String sql = "drop database if exists " + config.getDbName();
execute(sql);
}
/**
* use database
*/
private void useDatabase() {
String sql = "use " + config.getDbName();
execute(sql);
}
private void createSuperTable() {
String sql = "create table if not exists " + config.getStbName() + "(ts timestamp, current float, voltage int, phase float) tags(location binary(64), groupId int)";
execute(sql);
}
/**
* create table use super table with multi threads
*/
private void createTableMultiThreads() {
try {
final int tableSize = config.getNumberOfTable() / config.getNumberOfThreads();
List<Thread> threads = new ArrayList<>();
for (int i = 0; i < config.getNumberOfThreads(); i++) {
Thread thread = new Thread(new CreateTableTask(config, i * tableSize, tableSize), "Thread-" + i);
threads.add(thread);
thread.start();
}
for (Thread thread : threads) {
thread.join();
}
logger.info(">>> Multi Threads create table finished.");
} catch (InterruptedException e) {
logger.error(e.getMessage());
e.printStackTrace();
}
}
private void insertInfinite() {
try {
final long startDatetime = TimeStampUtil.datetimeToLong("2005-01-01 00:00:00.000");
final long finishDatetime = TimeStampUtil.datetimeToLong("2030-01-01 00:00:00.000");
final int tableSize = config.getNumberOfTable() / config.getNumberOfThreads();
List<Thread> threads = new ArrayList<>();
for (int i = 0; i < config.getNumberOfThreads(); i++) {
Thread thread = new Thread(new InsertTableDatetimeTask(config, i * tableSize, tableSize, startDatetime, finishDatetime), "Thread-" + i);
threads.add(thread);
thread.start();
}
for (Thread thread : threads) {
thread.join();
}
logger.info(">>> Multi Threads insert table finished.");
} catch (InterruptedException e) {
logger.error(e.getMessage());
e.printStackTrace();
}
}
private void insertMultiThreads() {
try {
final int tableSize = config.getNumberOfTable() / config.getNumberOfThreads();
final int numberOfRecordsPerTable = config.getNumberOfRecordsPerTable();
List<Thread> threads = new ArrayList<>();
for (int i = 0; i < config.getNumberOfThreads(); i++) {
Thread thread = new Thread(new InsertTableTask(config, i * tableSize, tableSize, numberOfRecordsPerTable), "Thread-" + i);
threads.add(thread);
thread.start();
}
for (Thread thread : threads) {
thread.join();
}
logger.info(">>> Multi Threads insert table finished.");
} catch (InterruptedException e) {
logger.error(e.getMessage());
e.printStackTrace();
}
}
public static String insertSql(int tableIndex, JdbcTaosdemoConfig config) {
float current = 10 + random.nextFloat();
int voltage = 200 + random.nextInt(20);
float phase = random.nextFloat();
String sql = "insert into " + config.getDbName() + "." + config.getTbPrefix() + "" + tableIndex + " " +
"values(" + beginTimestamp.getAndIncrement() + ", " + current + ", " + voltage + ", " + phase + ") ";
return sql;
}
public static String insertSql(int tableIndex, long ts, JdbcTaosdemoConfig config) {
float current = 10 + random.nextFloat();
int voltage = 200 + random.nextInt(20);
float phase = random.nextFloat();
String sql = "insert into " + config.getDbName() + "." + config.getTbPrefix() + "" + tableIndex + " " +
"values(" + ts + ", " + current + ", " + voltage + ", " + phase + ") ";
return sql;
}
public static String batchInsertSql(int tableIndex, long ts, int valueCnt, JdbcTaosdemoConfig config) {
float current = 10 + random.nextFloat();
int voltage = 200 + random.nextInt(20);
float phase = random.nextFloat();
StringBuilder sb = new StringBuilder();
sb.append("insert into " + config.getDbName() + "." + config.getTbPrefix() + "" + tableIndex + " " + "values");
for (int i = 0; i < valueCnt; i++) {
sb.append("(" + (ts + i) + ", " + current + ", " + voltage + ", " + phase + ") ");
}
return sb.toString();
}
public static String createTableSql(int tableIndex, JdbcTaosdemoConfig config) {
String location = locations[random.nextInt(locations.length)];
return "create table d" + tableIndex + " using " + config.getDbName() + "." + config.getStbName() + " tags('" + location + "'," + tableIndex + ")";
}
private void countFromSuperTable() {
String sql = "select count(*) from " + config.getDbName() + "." + config.getStbName();
executeQuery(sql);
}
private void close() {
try {
if (connection != null) {
this.connection.close();
logger.info("connection closed.");
}
} catch (SQLException e) {
logger.error(e.getMessage());
e.printStackTrace();
}
}
/**
* drop super table
*/
private void dropSuperTable() {
String sql = "drop table if exists " + config.getDbName() + "." + config.getStbName();
execute(sql);
}
/**
* execute sql, use this method when sql is create, alter, drop..
*/
private void execute(String sql) {
try (Statement statement = connection.createStatement()) {
long start = System.currentTimeMillis();
boolean execute = statement.execute(sql);
long end = System.currentTimeMillis();
printSql(sql, execute, (end - start));
} catch (SQLException e) {
logger.error(e.getMessage());
e.printStackTrace();
}
}
private static void printSql(String sql, boolean succeed, long cost) {
logger.info("[ " + (succeed ? "OK" : "ERROR!") + " ] time cost: " + cost + " ms, execute statement ====> " + sql);
}
private void executeQuery(String sql) {
try (Statement statement = connection.createStatement()) {
long start = System.currentTimeMillis();
ResultSet resultSet = statement.executeQuery(sql);
long end = System.currentTimeMillis();
printSql(sql, true, (end - start));
printResult(resultSet);
} catch (SQLException e) {
logger.error(e.getMessage());
e.printStackTrace();
}
}
private static void printResult(ResultSet resultSet) throws SQLException {
ResultSetMetaData metaData = resultSet.getMetaData();
while (resultSet.next()) {
StringBuilder sb = new StringBuilder();
for (int i = 1; i <= metaData.getColumnCount(); i++) {
String columnLabel = metaData.getColumnLabel(i);
String value = resultSet.getString(i);
sb.append(columnLabel + ": " + value + "\t");
}
logger.info(sb.toString());
}
}
}
package com.taosdata.example.jdbcTaosdemo.domain;
public class JdbcTaosdemoConfig {
//The host to connect to TDengine. Must insert one
private String host;
//The TCP/IP port number to use for the connection. Default is 6030.
private int port = 6030;
//The TDengine user name to use when connecting to the server. Default is 'root'
private String user = "root";
//The password to use when connecting to the server. Default is 'taosdata'
private String password = "taosdata";
//Destination database. Default is 'test'
private String dbName = "test";
//keep
private int keep = 365 * 20;
//
private int days = 30;
//Super table Name. Default is 'meters'
private String stbName = "meters";
//Table name prefix. Default is 'd'
private String tbPrefix = "d";
//The number of threads. Default is 10.
private int numberOfThreads = 10;
//The number of tables. Default is 10000.
private int numberOfTable = 10000;
//The number of records per table. Default is 100000
private int numberOfRecordsPerTable = 100000;
//Delete data. Default is false
private boolean deleteTable = true;
public static void printHelp() {
System.out.println("Usage: java -jar JDBCConnectorChecker.jar -h host [OPTION...]");
System.out.println("-p port The TCP/IP port number to use for the connection. Default is 6030");
System.out.println("-u user The TDengine user name to use when connecting to the server. Default is 'root'");
System.out.println("-P password The password to use when connecting to the server.Default is 'taosdata'");
System.out.println("-d database Destination database. Default is 'test'");
System.out.println("-m tablePrefix Table prefix name. Default is 'd'");
System.out.println("-T num_of_threads The number of threads. Default is 10");
System.out.println("-t num_of_tables The number of tables. Default is 10000");
System.out.println("-n num_of_records_per_table The number of records per table. Default is 100000");
System.out.println("-D delete table Delete data methods. Default is false");
System.out.println("--help Give this help list");
}
/**
* parse args from command line
*
* @param args command line args
* @return JdbcTaosdemoConfig
*/
public static JdbcTaosdemoConfig build(String[] args) {
JdbcTaosdemoConfig config = new JdbcTaosdemoConfig();
for (int i = 0; i < args.length; i++) {
if ("-h".equals(args[i]) && i < args.length - 1) {
config.setHost(args[++i]);
}
if ("-p".equals(args[i]) && i < args.length - 1) {
config.setPort(Integer.parseInt(args[++i]));
}
if ("-u".equals(args[i]) && i < args.length - 1) {
config.setUser(args[++i]);
}
if ("-P".equals(args[i]) && i < args.length - 1) {
config.setPassword(args[++i]);
}
if ("-d".equals(args[i]) && i < args.length - 1) {
config.setDbName(args[++i]);
}
if ("-m".equals(args[i]) && i < args.length - 1) {
config.setTbPrefix(args[++i]);
}
if ("-T".equals(args[i]) && i < args.length - 1) {
config.setNumberOfThreads(Integer.parseInt(args[++i]));
}
if ("-t".equals(args[i]) && i < args.length - 1) {
config.setNumberOfTable(Integer.parseInt(args[++i]));
}
if ("-n".equals(args[i]) && i < args.length - 1) {
config.setNumberOfRecordsPerTable(Integer.parseInt(args[++i]));
}
if ("-D".equals(args[i]) && i < args.length - 1) {
config.setDeleteTable(Boolean.parseBoolean(args[++i]));
}
}
return config;
}
public void setHost(String host) {
this.host = host;
}
public String getHost() {
return host;
}
public String getDbName() {
return dbName;
}
public void setDbName(String dbName) {
this.dbName = dbName;
}
public int getPort() {
return port;
}
public void setPort(int port) {
this.port = port;
}
public String getUser() {
return user;
}
public void setUser(String user) {
this.user = user;
}
public String getPassword() {
return password;
}
public void setPassword(String password) {
this.password = password;
}
public String getStbName() {
return stbName;
}
public void setStbName(String stbName) {
this.stbName = stbName;
}
public String getTbPrefix() {
return tbPrefix;
}
public void setTbPrefix(String tbPrefix) {
this.tbPrefix = tbPrefix;
}
public int getNumberOfThreads() {
return numberOfThreads;
}
public void setNumberOfThreads(int numberOfThreads) {
this.numberOfThreads = numberOfThreads;
}
public int getNumberOfTable() {
return numberOfTable;
}
public void setNumberOfTable(int numberOfTable) {
this.numberOfTable = numberOfTable;
}
public int getNumberOfRecordsPerTable() {
return numberOfRecordsPerTable;
}
public void setNumberOfRecordsPerTable(int numberOfRecordsPerTable) {
this.numberOfRecordsPerTable = numberOfRecordsPerTable;
}
public boolean isDeleteTable() {
return deleteTable;
}
public void setDeleteTable(boolean deleteTable) {
this.deleteTable = deleteTable;
}
public int getKeep() {
return keep;
}
public void setKeep(int keep) {
this.keep = keep;
}
public int getDays() {
return days;
}
public void setDays(int days) {
this.days = days;
}
}
package com.taosdata.example.jdbcTaosdemo.task;
import com.taosdata.example.jdbcTaosdemo.JdbcTaosdemo;
import com.taosdata.example.jdbcTaosdemo.domain.JdbcTaosdemoConfig;
import org.apache.log4j.Logger;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
public class CreateTableTask implements Runnable {
private static Logger logger = Logger.getLogger(CreateTableTask.class);
private final JdbcTaosdemoConfig config;
private final int startIndex;
private final int tableNumber;
public CreateTableTask(JdbcTaosdemoConfig config, int startIndex, int tableNumber) {
this.config = config;
this.startIndex = startIndex;
this.tableNumber = tableNumber;
}
@Override
public void run() {
try {
Connection connection = JdbcTaosdemo.getConnection(config);
for (int i = startIndex; i < startIndex + tableNumber; i++) {
Statement statement = connection.createStatement();
String sql = JdbcTaosdemo.createTableSql(i + 1, config);
// long start = System.currentTimeMillis();
boolean execute = statement.execute(sql);
// long end = System.currentTimeMillis();
// printSql(sql, execute, (end - start));
statement.close();
logger.info(">>> " + sql);
}
connection.close();
} catch (SQLException e) {
logger.error(e.getMessage());
e.printStackTrace();
}
}
}
package com.taosdata.example.jdbcTaosdemo.task;
import com.taosdata.example.jdbcTaosdemo.JdbcTaosdemo;
import com.taosdata.example.jdbcTaosdemo.domain.JdbcTaosdemoConfig;
import org.apache.log4j.Logger;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
public class InsertTableDatetimeTask implements Runnable {
private static Logger logger = Logger.getLogger(InsertTableDatetimeTask.class);
private final JdbcTaosdemoConfig config;
private final int startTableIndex;
private final int tableNumber;
private final long startDatetime;
private final long finishedDatetime;
public InsertTableDatetimeTask(JdbcTaosdemoConfig config, int startTableIndex, int tableNumber, long startDatetime, long finishedDatetime) {
this.config = config;
this.startTableIndex = startTableIndex;
this.tableNumber = tableNumber;
this.startDatetime = startDatetime;
this.finishedDatetime = finishedDatetime;
}
@Override
public void run() {
try {
Connection connection = JdbcTaosdemo.getConnection(config);
int valueCnt = 100;
for (long ts = startDatetime; ts < finishedDatetime; ts+= valueCnt) {
for (int i = startTableIndex; i < startTableIndex + tableNumber; i++) {
// String sql = JdbcTaosdemo.insertSql(i + 1, ts, config);
String sql = JdbcTaosdemo.batchInsertSql(i + 1, ts, valueCnt, config);
Statement statement = connection.createStatement();
statement.execute(sql);
statement.close();
logger.info(Thread.currentThread().getName() + ">>> " + sql);
}
}
connection.close();
} catch (SQLException e) {
logger.error(e.getMessage());
e.printStackTrace();
}
}
}
package com.taosdata.example.jdbcTaosdemo.task;
import com.taosdata.example.jdbcTaosdemo.JdbcTaosdemo;
import com.taosdata.example.jdbcTaosdemo.domain.JdbcTaosdemoConfig;
import org.apache.log4j.Logger;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
public class InsertTableTask implements Runnable {
private static final Logger logger = Logger.getLogger(InsertTableTask.class);
private final JdbcTaosdemoConfig config;
private final int startIndex;
private final int tableNumber;
private final int recordsNumber;
public InsertTableTask(JdbcTaosdemoConfig config, int startIndex, int tableNumber, int recordsNumber) {
this.config = config;
this.startIndex = startIndex;
this.tableNumber = tableNumber;
this.recordsNumber = recordsNumber;
}
@Override
public void run() {
try {
Connection connection = JdbcTaosdemo.getConnection(config);
for (int i = startIndex; i < startIndex + tableNumber; i++) {
for (int j = 0; j < recordsNumber; j++) {
String sql = JdbcTaosdemo.insertSql(i + 1, config);
Statement statement = connection.createStatement();
statement.execute(sql);
statement.close();
logger.info(Thread.currentThread().getName() + ">>> " + sql);
}
}
connection.close();
} catch (SQLException e) {
logger.error(e.getMessage());
e.printStackTrace();
}
}
}
package com.taosdata.example.jdbcTaosdemo.utils;
import java.sql.Date;
import java.text.ParseException;
import java.text.SimpleDateFormat;
public class TimeStampUtil {
private static final String datetimeFormat = "yyyy-MM-dd HH:mm:ss.SSS";
public static long datetimeToLong(String dateTime) {
SimpleDateFormat sdf = new SimpleDateFormat(datetimeFormat);
try {
return sdf.parse(dateTime).getTime();
} catch (ParseException e) {
throw new RuntimeException(e);
}
}
public static String longToDatetime(long time) {
SimpleDateFormat sdf = new SimpleDateFormat(datetimeFormat);
return sdf.format(new Date(time));
}
public static void main(String[] args) {
final String startTime = "2005-01-01 00:00:00.000";
long start = TimeStampUtil.datetimeToLong(startTime);
System.out.println(start);
String datetime = TimeStampUtil.longToDatetime(1519833600000L);
System.out.println(datetime);
}
}
### Settings ###
#log4j.rootLogger=debug,stdout,DebugLog,ErrorLog
log4j.rootLogger=debug,DebugLog,ErrorLog
### Output messages to the console ###
#log4j.appender.stdout=org.apache.log4j.ConsoleAppender
#log4j.appender.stdout.Target=System.out
#log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
#log4j.appender.stdout.layout.ConversionPattern=[%-5p] %d{yyyy-MM-dd HH:mm:ss,SSS} method:%l%n%m%n
### Write logs at DEBUG level and above to logs/debug.log ###
log4j.appender.DebugLog=org.apache.log4j.DailyRollingFileAppender
log4j.appender.DebugLog.File=logs/debug.log
log4j.appender.DebugLog.Append=true
log4j.appender.DebugLog.Threshold=DEBUG
log4j.appender.DebugLog.layout=org.apache.log4j.PatternLayout
log4j.appender.DebugLog.layout.ConversionPattern=%-d{yyyy-MM-dd HH:mm:ss} [ %t:%r ] - [ %p ] %m%n
### Write logs at ERROR level and above to logs/error.log ###
log4j.appender.ErrorLog=org.apache.log4j.DailyRollingFileAppender
log4j.appender.ErrorLog.File=logs/error.log
log4j.appender.ErrorLog.Append=true
log4j.appender.ErrorLog.Threshold=ERROR
log4j.appender.ErrorLog.layout=org.apache.log4j.PatternLayout
log4j.appender.ErrorLog.layout.ConversionPattern=%-d{yyyy-MM-dd HH:mm:ss} [ %t:%r ] - [ %p ] %m%n
\ No newline at end of file
......@@ -22,8 +22,12 @@ if __name__ == '__main__':
# @password : Password
# @database : Database to use when connecting to TDengine server
# @config : Configuration directory
conn = taos.connect(host="127.0.0.1", user="root", password="taosdata", config="/etc/taos")
if len(sys.argv)>1:
hostname=sys.argv[1]
conn = taos.connect(host=hostname, user="root", password="taosdata", config="/etc/taos")
else:
conn = taos.connect(host="127.0.0.1", user="root", password="taosdata", config="/etc/taos")
# Generate a cursor object to run SQL commands
c1 = conn.cursor()
# Create a database named db
......
#!/bin/bash
bash ./case001/case001.sh
#bash ./case002/case002.sh
#bash ./case003/case003.sh
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package main
import (
"database/sql"
"fmt"
_ "github.com/taosdata/driver-go/taosSql"
"log"
"time"
)
func main() {
taosDriverName := "taosSql"
demodb := "demodb"
demot := "demot"
fmt.Printf("\n======== start demo test ========\n")
// open connect to taos server
db, err := sql.Open(taosDriverName, "root:taosdata@/tcp(192.168.1.217:7100)/")
if err != nil {
log.Fatalf("Open database error: %s\n", err)
}
defer db.Close()
drop_database(db, demodb)
create_database(db, demodb)
use_database(db, demodb)
create_table(db, demot)
insert_data(db, demot)
select_data(db, demot)
fmt.Printf("\n======== start stmt mode test ========\n")
demodbStmt := "demodbStmt"
demotStmt := "demotStmt"
drop_database_stmt(db, demodbStmt)
create_database_stmt(db, demodbStmt)
use_database_stmt(db, demodbStmt)
create_table_stmt(db, demotStmt)
insert_data_stmt(db, demotStmt)
select_data_stmt(db, demotStmt)
fmt.Printf("\n======== end demo test ========\n")
}
func drop_database(db *sql.DB, demodb string) {
st := time.Now().Nanosecond()
res, err := db.Exec("drop database if exists " + demodb)
checkErr(err, "drop database if exists "+demodb)
affectd, err := res.RowsAffected()
checkErr(err, "drop db, res.RowsAffected")
et := time.Now().Nanosecond()
fmt.Printf("drop database result:\n %d row(s) affectd (%6.6fs)\n\n", affectd, (float32(et-st))/1e9)
// sleep for 50 milliseconds
time.Sleep(time.Duration(50)* time.Millisecond)
}
func create_database(db *sql.DB, demodb string) {
st := time.Now().Nanosecond()
// create database
res, err := db.Exec("create database " + demodb)
checkErr(err, "create db, db.Exec")
affectd, err := res.RowsAffected()
checkErr(err, "create db, res.RowsAffected")
et := time.Now().Nanosecond()
fmt.Printf("create database result:\n %d row(s) affectd (%6.6fs)\n\n", affectd, (float32(et-st))/1e9)
return
}
func use_database(db *sql.DB, demodb string) {
st := time.Now().Nanosecond()
// use database
res, err := db.Exec("use " + demodb) // notes: must no quote to db name
checkErr(err, "use db db.Exec")
affectd, err := res.RowsAffected()
checkErr(err, "use db, res.RowsAffected")
et := time.Now().Nanosecond()
fmt.Printf("use database result:\n %d row(s) affectd (%6.6fs)\n\n", affectd, (float32(et-st))/1e9)
}
func create_table(db *sql.DB, demot string) {
st := time.Now().Nanosecond()
// create table
res, err := db.Exec("create table " + demot + " (ts timestamp, id int, name binary(8), len tinyint, flag bool, notes binary(8), fv float, dv double)")
checkErr(err, "create table db.Exec")
affectd, err := res.RowsAffected()
checkErr(err, "create table res.RowsAffected")
et := time.Now().Nanosecond()
fmt.Printf("create table result:\n %d row(s) affectd (%6.6fs)\n\n", affectd, (float32(et-st))/1e9)
}
func insert_data(db *sql.DB, demot string) {
st := time.Now().Nanosecond()
// insert data
res, err := db.Exec("insert into " + demot +
" values (now, 100, 'beijing', 10, true, 'one', 123.456, 123.456)" +
" (now+1s, 101, 'shanghai', 11, true, 'two', 789.123, 789.123)" +
" (now+2s, 102, 'shenzhen', 12, false, 'three', 456.789, 456.789)")
checkErr(err, "insert data, db.Exec")
affectd, err := res.RowsAffected()
checkErr(err, "insert data res.RowsAffected")
et := time.Now().Nanosecond()
fmt.Printf("insert data result:\n %d row(s) affectd (%6.6fs)\n\n", affectd, (float32(et-st))/1e9)
}
func select_data(db *sql.DB, demot string) {
st := time.Now().Nanosecond()
rows, err := db.Query("select * from ? ", demot) // go text mode
checkErr(err, "select db.Query")
fmt.Printf("%10s%s%8s %5s %9s%s %s %8s%s %7s%s %8s%s %4s%s %5s%s\n", " ", "ts", " ", "id", " ", "name", " ", "len", " ", "flag", " ", "notes", " ", "fv", " ", " ", "dv")
var affectd int
//decoder := mahonia.NewDecoder("gbk") // decode characters from the original ANSI-format text file using gbk
for rows.Next() {
var ts string
var name string
var id int
var len int8
var flag bool
var notes string
var fv float32
var dv float64
err = rows.Scan(&ts, &id, &name, &len, &flag, &notes, &fv, &dv)
checkErr(err, "select rows.Scan")
fmt.Printf("%s|\t", ts)
fmt.Printf("%d|\t", id)
fmt.Printf("%10s|\t", name)
fmt.Printf("%d|\t", len)
fmt.Printf("%t|\t", flag)
fmt.Printf("%s|\t", notes)
fmt.Printf("%06.3f|\t", fv)
fmt.Printf("%09.6f|\n\n", dv)
affectd++
}
et := time.Now().Nanosecond()
fmt.Printf("select data result:\n %d row(s) affectd (%6.6fs)\n\n", affectd, (float32(et-st))/1e9)
//fmt.Printf("insert data result:\n %d row(s) affectd (%6.6fs)\n\n", affectd, (float32(et-st))/1E9)
}
func drop_database_stmt(db *sql.DB, demodb string) {
st := time.Now().Nanosecond()
// drop test db
res, err := db.Exec("drop database if exists " + demodb)
checkErr(err, "drop database "+demodb)
affectd, err := res.RowsAffected()
checkErr(err, "drop db, res.RowsAffected")
et := time.Now().Nanosecond()
fmt.Printf("drop database result:\n %d row(s) affectd (%6.6fs)\n\n", affectd, (float32(et-st))/1e9)
}
func create_database_stmt(db *sql.DB, demodb string) {
st := time.Now().Nanosecond()
// create database
//var stmt interface{}
stmt, err := db.Prepare("create database ?")
checkErr(err, "create db, db.Prepare")
//var res driver.Result
res, err := stmt.Exec(demodb)
checkErr(err, "create db, stmt.Exec")
//fmt.Printf("Query OK, %d row(s) affected()", res.RowsAffected())
affectd, err := res.RowsAffected()
checkErr(err, "create db, res.RowsAffected")
et := time.Now().Nanosecond()
fmt.Printf("create database result:\n %d row(s) affectd (%6.6fs)\n\n", affectd, (float32(et-st))/1e9)
}
func use_database_stmt(db *sql.DB, demodb string) {
st := time.Now().Nanosecond()
// create database
//var stmt interface{}
stmt, err := db.Prepare("use " + demodb)
checkErr(err, "use db, db.Prepare")
res, err := stmt.Exec()
checkErr(err, "use db, stmt.Exec")
affectd, err := res.RowsAffected()
checkErr(err, "use db, res.RowsAffected")
et := time.Now().Nanosecond()
fmt.Printf("use database result:\n %d row(s) affectd (%6.6fs)\n\n", affectd, (float32(et-st))/1e9)
}
func create_table_stmt(db *sql.DB, demot string) {
st := time.Now().Nanosecond()
// create table
// (ts timestamp, id int, name binary(8), len tinyint, flag bool, notes binary(8), fv float, dv double)
stmt, err := db.Prepare("create table ? (? timestamp, ? int, ? binary(10), ? tinyint, ? bool, ? binary(8), ? float, ? double)")
checkErr(err, "create table db.Prepare")
res, err := stmt.Exec(demot, "ts", "id", "name", "len", "flag", "notes", "fv", "dv")
checkErr(err, "create table stmt.Exec")
affectd, err := res.RowsAffected()
checkErr(err, "create table res.RowsAffected")
et := time.Now().Nanosecond()
fmt.Printf("create table result:\n %d row(s) affectd (%6.6fs)\n\n", affectd, (float32(et-st))/1e9)
}
func insert_data_stmt(db *sql.DB, demot string) {
st := time.Now().Nanosecond()
// insert data into table
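// Everything, including the table name and the string literals (already wrapped in
// single quotes), is bound as a statement parameter for the three-row insert below.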
stmt, err := db.Prepare("insert into ? values(?, ?, ?, ?, ?, ?, ?, ?) (?, ?, ?, ?, ?, ?, ?, ?) (?, ?, ?, ?, ?, ?, ?, ?)")
checkErr(err, "insert db.Prepare")
res, err := stmt.Exec(demot, "now", 1000, "'haidian'", 6, true, "'AI world'", 6987.654, 321.987,
"now+1s", 1001, "'changyang'", 7, false, "'DeepMode'", 12356.456, 128634.456,
"now+2s", 1002, "'chuangping'", 8, true, "'database'", 3879.456, 65433478.456)
checkErr(err, "insert data, stmt.Exec")
affectd, err := res.RowsAffected()
checkErr(err, "res.RowsAffected")
et := time.Now().Nanosecond()
fmt.Printf("insert data result:\n %d row(s) affectd (%6.6fs)\n\n", affectd, (float32(et-st))/1e9)
}
func select_data_stmt(db *sql.DB, demot string) {
st := time.Now().Nanosecond()
stmt, err := db.Prepare("select ?, ?, ?, ?, ?, ?, ?, ? from ?") // go binary mode
checkErr(err, "db.Prepare")
rows, err := stmt.Query("ts", "id", "name", "len", "flag", "notes", "fv", "dv", demot)
checkErr(err, "stmt.Query")
fmt.Printf("%10s%s%8s %5s %8s%s %s %10s%s %7s%s %8s%s %11s%s %14s%s\n", " ", "ts", " ", "id", " ", "name", " ", "len", " ", "flag", " ", "notes", " ", "fv", " ", " ", "dv")
var affectd int
for rows.Next() {
var ts string
var name string
var id int
var len int8
var flag bool
var notes string
var fv float32
var dv float64
err = rows.Scan(&ts, &id, &name, &len, &flag, &notes, &fv, &dv)
//fmt.Println("start scan fields from row.rs, &fv:", &fv)
//err = rows.Scan(&fv)
checkErr(err, "rows.Scan")
fmt.Printf("%s|\t", ts)
fmt.Printf("%d|\t", id)
fmt.Printf("%10s|\t", name)
fmt.Printf("%d|\t", len)
fmt.Printf("%t|\t", flag)
fmt.Printf("%s|\t", notes)
fmt.Printf("%06.3f|\t", fv)
fmt.Printf("%09.6f|\n", dv)
affectd++
}
et := time.Now().Nanosecond()
fmt.Printf("select data result:\n %d row(s) affectd (%6.6fs)\n\n", affectd, (float32(et-st))/1e9)
}
func checkErr(err error, prompt string) {
if err != nil {
fmt.Printf("%s\n", prompt)
panic(err)
}
}
#!/bin/bash
##################################################
#
# Do go test
#
##################################################
set +e
#set -x
script_dir="$(dirname $(readlink -f $0))"
#echo "pwd: $script_dir, para0: $0"
execName=$0
execName=`echo ${execName##*/}`
goName=`echo ${execName%.*}`
###### step 1: start one taosd
scriptDir=$script_dir/../../script/sh
bash $scriptDir/stop_dnodes.sh
bash $scriptDir/deploy.sh -n dnode1 -i 1
bash $scriptDir/cfg.sh -n dnode1 -c walLevel -v 0
bash $scriptDir/exec.sh -n dnode1 -s start
###### step 2: set config item
TAOS_CFG=/etc/taos/taos.cfg
HOSTNAME=`hostname -f`
if [ ! -f ${TAOS_CFG} ]; then
touch -f $TAOS_CFG
fi
echo " " > $TAOS_CFG
echo "firstEp ${HOSTNAME}:7100" >> $TAOS_CFG
echo "secondEp ${HOSTNAME}:7200" >> $TAOS_CFG
echo "serverPort 7100" >> $TAOS_CFG
#echo "dataDir $DATA_DIR" >> $TAOS_CFG
#echo "logDir $LOG_DIR" >> $TAOS_CFG
#echo "scriptDir ${CODE_DIR}/../script" >> $TAOS_CFG
echo "numOfLogLines 100000000" >> $TAOS_CFG
echo "dDebugFlag 135" >> $TAOS_CFG
echo "mDebugFlag 135" >> $TAOS_CFG
echo "sdbDebugFlag 135" >> $TAOS_CFG
echo "rpcDebugFlag 135" >> $TAOS_CFG
echo "tmrDebugFlag 131" >> $TAOS_CFG
echo "cDebugFlag 135" >> $TAOS_CFG
echo "httpDebugFlag 135" >> $TAOS_CFG
echo "monitorDebugFlag 135" >> $TAOS_CFG
echo "udebugFlag 135" >> $TAOS_CFG
echo "tablemetakeeptimer 5" >> $TAOS_CFG
echo "wal 0" >> $TAOS_CFG
echo "asyncLog 0" >> $TAOS_CFG
echo "locale en_US.UTF-8" >> $TAOS_CFG
echo "enableCoreFile 1" >> $TAOS_CFG
echo " " >> $TAOS_CFG
ulimit -n 600000
ulimit -c unlimited
#
##sudo sysctl -w kernel.core_pattern=$TOP_DIR/core.%p.%e
#
###### step 3: start build
cd $script_dir
rm -f go.*
go mod init $goName
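# go build produces a binary named after the module ($goName), which is executed below with sudo.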
go build
sleep 1s
sudo ./$goName
#!/bin/bash
##################################################
#
# Do go test
#
##################################################
set +e
#set -x
FILE_NAME=
RELEASE=0
while getopts "f:" arg
do
case $arg in
f)
FILE_NAME=$OPTARG
echo "input file: $FILE_NAME"
;;
?)
echo "unknow argument"
;;
esac
done
# start one taosd
bash ../script/sh/stop_dnodes.sh
bash ../script/sh/deploy.sh -n dnode1 -i 1
bash ../script/sh/cfg.sh -n dnode1 -c walLevel -v 0
bash ../script/sh/exec.sh -n dnode1 -s start
# build and run the test's Go file
caseDir=`echo ${FILE_NAME%/*}`
echo "caseDir: $caseDir"
cd $caseDir
rm go.*
go mod init $caseDir
go build
sleep 1s
./$caseDir
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import sys
from util.log import *
from util.cases import *
from util.sql import *
class TDTestCase:
    def init(self, conn, logSql):
        tdLog.debug("start to execute %s" % __file__)
        tdSql.init(conn.cursor(), logSql)
    def run(self):
        tdSql.prepare()
        tdSql.query('select database()')
        tdSql.checkData(0, 0, "db")
tdSql.execute("alter database db comp 2")
tdSql.query("show databases")
tdSql.checkData(0, 14, 2)
tdSql.execute("alter database db keep 365")
tdSql.query("show databases")
tdSql.checkData(0, 7, "3650,3650,365")
tdSql.execute("alter database db quorum 2")
tdSql.query("show databases")
tdSql.checkData(0, 5, 2)
tdSql.execute("alter database db blocks 100")
tdSql.query("show databases")
tdSql.checkData(0, 9, 100)
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())
\ No newline at end of file
......@@ -17,6 +17,7 @@ python3 ./test.py -f insert/nchar-unicode.py
python3 ./test.py -f insert/multi.py
python3 ./test.py -f insert/randomNullCommit.py
python3 insert/retentionpolicy.py
python3 ./test.py -f insert/alterTableAndInsert.py
python3 ./test.py -f table/column_name.py
python3 ./test.py -f table/column_num.py
......@@ -163,6 +164,7 @@ python3 ./test.py -f alter/alter_table_crash.py
# client
python3 ./test.py -f client/client.py
python3 ./test.py -f client/version.py
python3 ./test.py -f client/alterDatabase.py
# Misc
python3 testCompress.py
......@@ -197,5 +199,5 @@ python3 test.py -f tools/taosdemo.py
# subscribe
python3 test.py -f subscribe/singlemeter.py
python3 test.py -f subscribe/stability.py
#python3 test.py -f subscribe/stability.py
python3 test.py -f subscribe/supertable.py
\ No newline at end of file
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import sys
from util.log import *
from util.cases import *
from util.sql import *
class TDTestCase:
    def init(self, conn, logSql):
        tdLog.debug("start to execute %s" % __file__)
        tdSql.init(conn.cursor(), logSql)
    def run(self):
        tdSql.prepare()
        tdSql.execute("create table cars(ts timestamp, speed int) tags(id int)")
        tdSql.execute("create table car0 using cars tags(0)")
        tdSql.execute("insert into car0 values(now, 1)")
        tdSql.execute("alter table cars add column c2 int")
tdSql.execute("insert into car0(ts, 'speed') values(now, 2)")
tdSql.checkAffectedRows(1)
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import sys
import requests, json
import threading
import string
import random
class RestfulInsert:
    def init(self):
        self.header = {'Authorization': 'Basic cm9vdDp0YW9zZGF0YQ=='}
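        # base64 of "root:taosdata", the default TDengine credentials, used for HTTP basic auth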
self.url = "http://127.0.0.1:6041/rest/sql"
self.ts = 1104508800000
self.numOfThreads = 50
def get_random_string(self, length):
letters = string.ascii_lowercase
result_str = ''.join(random.choice(letters) for i in range(length))
return result_str
def insertData(self, threadID):
print("thread %d started" % threadID)
data = "create table test.tb%d(ts timestamp, name nchar(20))" % threadID
requests.post(self.url, data, headers = self.header)
name = self.get_random_string(10)
start = self.ts
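        # Each thread inserts rows with a monotonically increasing timestamp; the loop
        # runs until the process is stopped externally.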
        while True:
            start += 1
            data = "insert into test.tb%d values(%d, '%s')" % (threadID, start, name)
            requests.post(self.url, data, headers = self.header)
    def run(self):
        data = "drop database if exists test"
        requests.post(self.url, data, headers = self.header)
        data = "create database test keep 7300"
        requests.post(self.url, data, headers = self.header)
        threads = []
        for i in range(self.numOfThreads):
            thread = threading.Thread(target=self.insertData, args=(i,))
            thread.start()
            threads.append(thread)
        for i in range(self.numOfThreads):
            threads[i].join()

ri = RestfulInsert()
ri.init()
ri.run()
\ No newline at end of file
......@@ -43,7 +43,8 @@ class TDTestRetetion:
else:
caller = inspect.getframeinfo(inspect.stack()[1][0])
args = (caller.filename, caller.lineno, sql, self.queryRows, expectRows)
os.system("timedatectl set-ntp true")
os.system("sudo timedatectl set-ntp true")
time.sleep(40)
tdLog.exit("%s(%d) failed: sql:%s, queryRows:%d != expect:%d" % args)
def run(self):
......@@ -53,7 +54,7 @@ class TDTestRetetion:
tdSql.execute('use test;')
tdSql.execute('create table test(ts timestamp,i int);')
cmd = 'insert into test values(now-2d,11)(now-1d,11)(now,11)(now+1d,11);'
cmd = 'insert into test values(now-2d,1)(now-1d,2)(now,3)(now+1d,4);'
tdLog.info(cmd)
tdSql.execute(cmd)
tdSql.query('select * from test')
......@@ -61,46 +62,55 @@ class TDTestRetetion:
tdLog.info("=============== step2")
tdDnodes.stop(1)
os.system("timedatectl set-ntp false")
os.system("date -s $(date -d \"${DATE} 2 days\" \"+%Y%m%d\")")
os.system("sudo timedatectl set-ntp false")
os.system("sudo date -s $(date -d \"${DATE} 2 days\" \"+%Y%m%d\")")
tdDnodes.start(1)
cmd = 'insert into test values(now,11);'
cmd = 'insert into test values(now,5);'
tdDnodes.stop(1)
tdDnodes.start(1)
tdLog.info(cmd)
tdSql.execute(cmd)
queryRows=tdSql.query('select * from test')
if queryRows==4:
tdSql.checkRows(4)
self.queryRows=tdSql.query('select * from test')
if self.queryRows==4:
self.checkRows(4,cmd)
return 0
else:
tdSql.checkRows(5)
self.checkRows(5,cmd)
tdLog.info("=============== step3")
tdDnodes.stop(1)
os.system("date -s $(date -d \"${DATE} 2 days\" \"+%Y%m%d\")")
os.system("sudo date -s $(date -d \"${DATE} 2 days\" \"+%Y%m%d\")")
tdDnodes.start(1)
cmd = 'insert into test values(now-1d,11);'
tdLog.info(cmd)
tdSql.execute(cmd)
queryRows=tdSql.query('select * from test')
tdSql.checkRows(6)
self.queryRows=tdSql.query('select * from test')
if self.queryRows==4:
self.checkRows(4,cmd)
return 0
cmd = 'insert into test values(now-1d,6);'
tdLog.info(cmd)
tdSql.execute(cmd)
self.queryRows=tdSql.query('select * from test')
self.checkRows(6,cmd)
tdLog.info("=============== step4")
tdDnodes.stop(1)
tdDnodes.start(1)
cmd = 'insert into test values(now,11);'
cmd = 'insert into test values(now,7);'
tdLog.info(cmd)
tdSql.execute(cmd)
tdSql.query('select * from test')
tdSql.checkRows(7)
self.queryRows=tdSql.query('select * from test')
self.checkRows(7,cmd)
tdLog.info("=============== step5")
tdDnodes.stop(1)
tdDnodes.start(1)
cmd='select * from test where ts > now-1d'
queryRows=tdSql.query('select * from test where ts > now-1d')
self.queryRows=tdSql.query('select * from test where ts > now-1d')
self.checkRows(1,cmd)
def stop(self):
os.system("timedatectl set-ntp true")
os.system("sudo timedatectl set-ntp true")
time.sleep(40)
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
......
......@@ -35,7 +35,8 @@ class TDTestCase:
tdSql.execute(
"insert into tb2 using stb1 tags(2,'tb2', '表2') values ('2020-04-18 15:00:02.000', 3, 2.1), ('2020-04-18 15:00:03.000', 4, 2.2)")
# inner join --- bug
tdSql.error("select * from tb 1")
tdSql.query("select * from tb1 a, tb2 b where a.ts = b.ts")
tdSql.checkRows(0)
......
......@@ -16,6 +16,7 @@ import os
from util.log import *
from util.cases import *
from util.sql import *
from util.dnodes import *
class TDTestCase:
......@@ -25,11 +26,30 @@ class TDTestCase:
self.numberOfTables = 10000
self.numberOfRecords = 100
def getBuildPath(self):
selfPath = os.path.dirname(os.path.realpath(__file__))
if ("community" in selfPath):
projPath = selfPath[:selfPath.find("community")]
else:
projPath = selfPath[:selfPath.find("tests")]
for root, dirs, files in os.walk(projPath):
if ("taosd" in files):
rootRealPath = os.path.dirname(os.path.realpath(root))
if ("packaging" not in rootRealPath):
buildPath = root[:len(root)-len("/build/bin")]
break
return buildPath
def run(self):
tdSql.prepare()
os.system("yes | taosdemo -t %d -n %d" % (self.numberOfTables, self.numberOfRecords))
buildPath = self.getBuildPath()
if (buildPath == ""):
tdLog.exit("taosd not found!")
else:
tdLog.info("taosd found in %s" % buildPath)
binPath = buildPath+ "/build/bin/"
os.system("yes | %staosdemo -t %d -n %d" % (binPath,self.numberOfTables, self.numberOfRecords))
tdSql.execute("use test")
tdSql.query("select count(*) from meters")
......