提交 d37ead6e 编写于 作者: H hjxilinx

merge code, and fix memory leaks

......@@ -208,7 +208,7 @@ Since the native language of TDengine is C, the necessary TDengine library shoul
* taos.dll (Windows)
After TDengine client is installed, the library `taos.dll` will be automatically copied to the `C:/Windows/System32`, which is the system's default search path.
> Note: Please make sure that TDengine Windows client has been installed if developing on Windows.
> Note: Please make sure that [TDengine Windows client][14] has been installed if developing on Windows. Now although TDengine client would be defaultly installed together with TDengine server, it can also be installed [alone][15].
Since TDengine is time-series database, there are still some differences compared with traditional databases in using TDengine JDBC driver:
* TDengine doesn't allow to delete/modify a single record, and thus JDBC driver also has no such method.
......@@ -882,4 +882,5 @@ An example of using the NodeJS connector to achieve the same things but without
[11]: https://github.com/taosdata/TDengine/tree/develop/tests/examples/JDBC/SpringJdbcTemplate
[12]: https://github.com/taosdata/TDengine/tree/develop/tests/examples/JDBC/springbootdemo
[13]: https://www.taosdata.com/cn/documentation/administrator/#%E5%AE%A2%E6%88%B7%E7%AB%AF%E9%85%8D%E7%BD%AE
[14]: https://www.taosdata.com/cn/documentation/connector/#Windows%E5%AE%A2%E6%88%B7%E7%AB%AF%E5%8F%8A%E7%A8%8B%E5%BA%8F%E6%8E%A5%E5%8F%A3
[15]: https://www.taosdata.com/cn/getting-started/#%E5%BF%AB%E9%80%9F%E4%B8%8A%E6%89%8B
......@@ -198,7 +198,7 @@ TDengine 为了方便 Java 应用使用,提供了遵循 JDBC 标准(3.0)API
* taos.dll
在 windows 系统中安装完客户端之后,驱动包依赖的 taos.dll 文件会自动拷贝到系统默认搜索路径 C:/Windows/System32 下,同样无需要单独指定。
> 注意:在 windows 环境开发时需要安装 TDengine 对应的 windows 版本客户端,由于目前没有提供 Linux 环境单独的客户端,需要安装 TDengine 才能使用
> 注意:在 windows 环境开发时需要安装 TDengine 对应的 [windows 客户端][14],Linux 服务器安装完 TDengine 之后默认已安装 client,也可以单独安装 [Linux 客户端][15] 连接远程 TDengine Server
TDengine 的 JDBC 驱动实现尽可能的与关系型数据库驱动保持一致,但时序空间数据库与关系对象型数据库服务的对象和技术特征的差异导致 taos-jdbcdriver 并未完全实现 JDBC 标准规范。在使用时需要注意以下几点:
......@@ -1135,3 +1135,5 @@ TDengine在Window系统上提供的API与Linux系统是相同的, 应用程序
[11]: https://github.com/taosdata/TDengine/tree/develop/tests/examples/JDBC/SpringJdbcTemplate
[12]: https://github.com/taosdata/TDengine/tree/develop/tests/examples/JDBC/springbootdemo
[13]: https://www.taosdata.com/cn/documentation/administrator/#%E5%AE%A2%E6%88%B7%E7%AB%AF%E9%85%8D%E7%BD%AE
[14]: https://www.taosdata.com/cn/documentation/connector/#Windows%E5%AE%A2%E6%88%B7%E7%AB%AF%E5%8F%8A%E7%A8%8B%E5%BA%8F%E6%8E%A5%E5%8F%A3
[15]: https://www.taosdata.com/cn/getting-started/#%E5%BF%AB%E9%80%9F%E4%B8%8A%E6%89%8B
......@@ -97,6 +97,10 @@ go build -o bin/taosimport app/main.go
是否保存统计信息到 tdengine 的 statistic 表中,1 是,0 否, 默认 0。
* -savetb int
当 save 为 1 时保存统计信息的表名, 默认 statistic。
* -auto int
是否自动生成样例数据中的主键时间戳,1 是,0 否, 默认 0。
......
......@@ -28,6 +28,7 @@ const (
DEFAULT_STARTTIME int64 = -1
DEFAULT_INTERVAL int64 = 1*1000
DEFAULT_DELAY int64 = -1
DEFAULT_STATISTIC_TABLE = "statistic"
JSON_FORMAT = "json"
CSV_FORMAT = "csv"
......@@ -37,7 +38,6 @@ const (
DRIVER_NAME = "taosSql"
STARTTIME_LAYOUT = "2006-01-02 15:04:05.000"
INSERT_PREFIX = "insert into "
STATISTIC_TABLE = "statistic"
)
var (
......@@ -75,6 +75,7 @@ var (
delay int64 // default 10 milliseconds
tick int64
save int
saveTable string
)
type superTableConfig struct {
......@@ -278,9 +279,9 @@ func staticSpeed(){
if save == 1 {
connection.Exec("use " + db)
_, err := connection.Exec("create table if not exists " + STATISTIC_TABLE +"(ts timestamp, speed int)")
_, err := connection.Exec("create table if not exists " + saveTable +"(ts timestamp, speed int)")
if err != nil {
log.Fatalf("create %s Table error: %s\n", STATISTIC_TABLE, err)
log.Fatalf("create %s Table error: %s\n", saveTable, err)
}
}
......@@ -297,7 +298,7 @@ func staticSpeed(){
log.Printf("insert %d rows, used %d ms, speed %d rows/s", currentSuccessRows, usedTime/1e6, speed)
if save == 1 {
insertSql := fmt.Sprintf("insert into %s values(%d, %d)", STATISTIC_TABLE, currentTime.UnixNano()/1e6, speed)
insertSql := fmt.Sprintf("insert into %s values(%d, %d)", saveTable, currentTime.UnixNano()/1e6, speed)
connection.Exec(insertSql)
}
......@@ -353,7 +354,7 @@ func createStatisticTable(){
connection := getConnection()
defer connection.Close()
_, err := connection.Exec("create table if not exist " + db + "."+ STATISTIC_TABLE +"(ts timestamp, speed int)")
_, err := connection.Exec("create table if not exist " + db + "."+ saveTable +"(ts timestamp, speed int)")
if err != nil {
log.Fatalf("createStatisticTable error: %s\n", err)
}
......@@ -1037,6 +1038,7 @@ func parseArg() {
flag.Int64Var(&delay, "delay", DEFAULT_DELAY, "the delay time interval(millisecond) to continue generating data when vnum set 0.")
flag.Int64Var(&tick, "tick", 2000, "the tick time interval(millisecond) to print statistic info.")
flag.IntVar(&save, "save", 0, "whether to save the statistical info into 'statistic' table. 0 is disabled and 1 is enabled.")
flag.StringVar(&saveTable, "savetb", DEFAULT_STATISTIC_TABLE, "the table to save 'statistic' info when save set 1.")
flag.IntVar(&thread, "thread", 10, "number of threads to import data.")
flag.IntVar(&batch, "batch", 100, "rows of records in one import batch.")
flag.IntVar(&auto, "auto", 0, "whether to use the starttime and interval specified by users when simulating the data. 0 is disabled and 1 is enabled.")
......@@ -1062,6 +1064,7 @@ func printArg() {
fmt.Println("-delay:", delay)
fmt.Println("-tick:", tick)
fmt.Println("-save:", save)
fmt.Println("-savetb:", saveTable)
fmt.Println("-thread:", thread)
fmt.Println("-batch:", batch)
fmt.Println("-auto:", auto)
......
......@@ -68,6 +68,7 @@ enum _sql_cmd {
TSDB_SQL_RETRIEVE_METRIC,
TSDB_SQL_METRIC_JOIN_RETRIEVE,
TSDB_SQL_RETRIEVE_TAGS,
/*
* build empty result instead of accessing dnode to fetch result
* reset the client cache
......
......@@ -148,6 +148,8 @@ SSqlExpr* tscSqlExprUpdate(SQueryInfo* pQueryInfo, int32_t index, int16_t functi
SSqlExpr* tscSqlExprGet(SQueryInfo* pQueryInfo, int32_t index);
void tscSqlExprCopy(SSqlExprInfo* dst, const SSqlExprInfo* src, uint64_t uid);
void* tscSqlExprDestroy(SSqlExpr* pExpr);
void tscSqlExprInfoDestroy(SSqlExprInfo* pExprInfo);
SColumnBase* tscColumnBaseInfoInsert(SQueryInfo* pQueryInfo, SColumnIndex* colIndex);
void tscColumnFilterInfoCopy(SColumnFilterInfo* dst, const SColumnFilterInfo* src);
......@@ -236,7 +238,7 @@ TAOS* taos_connect_a(char* ip, char* user, char* pass, char* db, uint16_t port,
void sortRemoveDuplicates(STableDataBlocks* dataBuf);
void tscPrintSelectClause(SSqlCmd* pCmd, int32_t subClauseIndex);
void tscPrintSelectClause(SSqlObj* pSql, int32_t subClauseIndex);
bool hasMoreVnodesToTry(SSqlObj *pSql);
void tscTryQueryNextVnode(SSqlObj *pSql, __async_cb_func_t fp);
......
......@@ -171,6 +171,7 @@ typedef struct STableDataBlocks {
int32_t rowSize; // row size for current table
uint32_t nAllocSize;
uint32_t headerSize; // header for metadata (submit metadata)
uint32_t size;
/*
......@@ -248,7 +249,9 @@ typedef struct {
};
int32_t clauseIndex; // index of multiple subclause query
int8_t isParseFinish;
short numOfCols;
uint32_t allocSize;
char * payload;
int payloadLen;
......
......@@ -477,7 +477,7 @@ void tscMeterMetaCallBack(void *param, TAOS_RES *res, int code) {
}
if (pSql->pStream == NULL) {
// check if it is a sub-query of metric query first, if true, enter another routine
// check if it is a sub-query of super table query first, if true, enter another routine
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
if ((pQueryInfo->type & TSDB_QUERY_TYPE_STABLE_SUBQUERY) == TSDB_QUERY_TYPE_STABLE_SUBQUERY) {
......@@ -490,7 +490,7 @@ void tscMeterMetaCallBack(void *param, TAOS_RES *res, int code) {
assert(pParObj->signature == pParObj && trs->subqueryIndex == pMeterMetaInfo->vnodeIndex &&
pMeterMetaInfo->pMeterMeta->numOfTags != 0);
tscTrace("%p get metricMeta during metric query successfully", pSql);
tscTrace("%p get metricMeta during super table query successfully", pSql);
code = tscGetMeterMeta(pSql, pMeterMetaInfo);
pRes->code = code;
......@@ -502,8 +502,21 @@ void tscMeterMetaCallBack(void *param, TAOS_RES *res, int code) {
if (code == TSDB_CODE_ACTION_IN_PROGRESS) return;
} else { // normal async query continues
code = tsParseSql(pSql, false);
if (code == TSDB_CODE_ACTION_IN_PROGRESS) return;
if (pCmd->isParseFinish) {
tscTrace("%p resend data to vnode in metermeta callback since sql has been parsed completed", pSql);
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0);
code = tscGetMeterMeta(pSql, pMeterMetaInfo);
assert(code == TSDB_CODE_SUCCESS);
if (pMeterMetaInfo->pMeterMeta) {
code = tscSendMsgToServer(pSql);
if (code == TSDB_CODE_SUCCESS) return;
}
} else {
code = tsParseSql(pSql, false);
if (code == TSDB_CODE_ACTION_IN_PROGRESS) return;
}
}
} else { // stream computing
......@@ -514,7 +527,7 @@ void tscMeterMetaCallBack(void *param, TAOS_RES *res, int code) {
if (code == TSDB_CODE_ACTION_IN_PROGRESS) return;
if (code == TSDB_CODE_SUCCESS && UTIL_METER_IS_SUPERTABLE(pMeterMetaInfo)) {
code = tscGetMetricMeta(pSql, 0);
code = tscGetMetricMeta(pSql, pCmd->clauseIndex);
pRes->code = code;
if (code == TSDB_CODE_ACTION_IN_PROGRESS) return;
......@@ -531,7 +544,7 @@ void tscMeterMetaCallBack(void *param, TAOS_RES *res, int code) {
tscTrace("%p stream:%p meta is updated, start new query, command:%d", pSql, pSql->pStream, pSql->cmd.command);
/*
* NOTE:
* transfer the sql function for metric query before get meter/metric meta,
* transfer the sql function for super table query before get meter/metric meta,
* since in callback functions, only tscProcessSql(pStream->pSql) is executed!
*/
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
......
......@@ -199,7 +199,7 @@ void tscDestroyJoinSupporter(SJoinSubquerySupporter* pSupporter) {
return;
}
tfree(pSupporter->exprsInfo.pExprs);
tscSqlExprInfoDestroy(&pSupporter->exprsInfo);
tscColumnBaseInfoDestroy(&pSupporter->colList);
tscClearFieldInfo(&pSupporter->fieldsInfo);
......@@ -264,19 +264,25 @@ int32_t tscLaunchSecondSubquery(SSqlObj* pSql) {
pState->numOfCompleted = (pSql->numOfSubs - numOfSub);
for (int32_t i = 0; i < pSql->numOfSubs; ++i) {
SSqlObj* pSub = pSql->pSubs[i];
pSupporter = pSub->param;
SSqlObj* pPrevSub = pSql->pSubs[i];
pSupporter = pPrevSub->param;
if (pSupporter->exprsInfo.numOfExprs == 0) {
tscTrace("%p subquery %d, not need to launch query, ignore it", pSql, i);
tscDestroyJoinSupporter(pSupporter);
tscFreeSqlObj(pSub);
tscFreeSqlObj(pPrevSub);
pSql->pSubs[i] = NULL;
continue;
}
SQueryInfo* pSubQueryInfo = tscGetQueryInfoDetail(&pPrevSub->cmd, 0);
STSBuf* pTSBuf = pSubQueryInfo->tsBuf;
pSubQueryInfo->tsBuf = NULL;
taos_free_result(pPrevSub);
// todo refactor to avoid the memory problem handling
SSqlObj* pNew = createSubqueryObj(pSql, (int16_t)i, tscJoinQueryCallback, pSupporter, NULL);
if (pNew == NULL) {
......@@ -289,16 +295,10 @@ int32_t tscLaunchSecondSubquery(SSqlObj* pSql) {
}
tscClearSubqueryInfo(&pNew->cmd);
pSql->pSubs[i] = pNew;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0);
SQueryInfo* pSubQueryInfo = tscGetQueryInfoDetail(&pSub->cmd, 0);
pQueryInfo->tsBuf = pSubQueryInfo->tsBuf;
pSubQueryInfo->tsBuf = NULL;
taos_free_result(pSub);
pQueryInfo->tsBuf = pTSBuf; // transfer the ownership of timestamp comp-z data to the new created object
// set the second stage sub query for join process
pQueryInfo->type |= TSDB_QUERY_TYPE_JOIN_SEC_STAGE;
......@@ -321,7 +321,6 @@ int32_t tscLaunchSecondSubquery(SSqlObj* pSql) {
SQueryInfo* pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0);
assert(pNew->numOfSubs == 0 && pNew->cmd.numOfClause == 1 && pNewQueryInfo->numOfTables == 1);
tscAddTimestampColumn(pQueryInfo, TSDB_FUNC_TS, 0);
tscFieldInfoCalOffset(pNewQueryInfo);
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pNewQueryInfo, 0);
......@@ -343,9 +342,7 @@ int32_t tscLaunchSecondSubquery(SSqlObj* pSql) {
pExpr->numOfParams = 1;
}
#ifdef _DEBUG_VIEW
tscPrintSelectClause(&pNew->cmd, 0);
#endif
tscPrintSelectClause(pNew, 0);
tscTrace("%p subquery:%p tableIndex:%d, vnodeIdx:%d, type:%d, transfer to ts_comp query to retrieve timestamps, "
"exprInfo:%d, colList:%d, fieldsInfo:%d, name:%s",
......
......@@ -303,11 +303,11 @@ int32_t tsParseOneColumnData(SSchema *pSchema, SSQLToken *pToken, char *payload,
// binary data cannot be null-terminated char string, otherwise the last char of the string is lost
if (pToken->type == TK_NULL) {
*payload = TSDB_DATA_BINARY_NULL;
} else { // too long values will return invalid sql, not be truncated automatically
} else { // too long values will return invalid sql, not be truncated automatically
if (pToken->n > pSchema->bytes) {
return tscInvalidSQLErrMsg(msg, "string data overflow", pToken->z);
}
strncpy(payload, pToken->z, pToken->n);
if (pToken->n < pSchema->bytes) {
......@@ -325,7 +325,7 @@ int32_t tsParseOneColumnData(SSchema *pSchema, SSQLToken *pToken, char *payload,
if (!taosMbsToUcs4(pToken->z, pToken->n, payload, pSchema->bytes)) {
char buf[512] = {0};
snprintf(buf, 512, "%s", strerror(errno));
return tscInvalidSQLErrMsg(msg, buf, pToken->z);
}
}
......@@ -343,7 +343,7 @@ int32_t tsParseOneColumnData(SSchema *pSchema, SSQLToken *pToken, char *payload,
if (tsParseTime(pToken, &temp, str, msg, timePrec) != TSDB_CODE_SUCCESS) {
return tscInvalidSQLErrMsg(msg, "invalid timestamp", pToken->z);
}
*((int64_t *)payload) = temp;
}
......@@ -526,8 +526,7 @@ int tsParseValues(char **str, STableDataBlocks *pDataBlock, SMeterMeta *pMeterMe
*code = retcode;
return -1;
}
assert(tSize > maxRows);
ASSERT(tSize > maxRows);
maxRows = tSize;
}
......@@ -576,8 +575,9 @@ static void tscSetAssignedColumnInfo(SParsedDataColInfo *spd, SSchema *pSchema,
int32_t tscAllocateMemIfNeed(STableDataBlocks *pDataBlock, int32_t rowSize, int32_t * numOfRows) {
size_t remain = pDataBlock->nAllocSize - pDataBlock->size;
const int factor = 5;
uint32_t nAllocSizeOld = pDataBlock->nAllocSize;
uint32_t nAllocSizeOld = pDataBlock->nAllocSize;
assert(pDataBlock->headerSize >= 0);
// expand the allocated size
if (remain < rowSize * factor) {
while (remain < rowSize * factor) {
......@@ -590,15 +590,14 @@ int32_t tscAllocateMemIfNeed(STableDataBlocks *pDataBlock, int32_t rowSize, int3
pDataBlock->pData = tmp;
memset(pDataBlock->pData + pDataBlock->size, 0, pDataBlock->nAllocSize - pDataBlock->size);
} else {
// assert(false);
// do nothing
// do nothing, if allocate more memory failed
pDataBlock->nAllocSize = nAllocSizeOld;
*numOfRows = (int32_t)(pDataBlock->nAllocSize) / rowSize;
*numOfRows = (int32_t)(pDataBlock->nAllocSize - pDataBlock->headerSize) / rowSize;
return TSDB_CODE_CLI_OUT_OF_MEMORY;
}
}
*numOfRows = (int32_t)(pDataBlock->nAllocSize) / rowSize;
*numOfRows = (int32_t)(pDataBlock->nAllocSize - pDataBlock->headerSize) / rowSize;
return TSDB_CODE_SUCCESS;
}
......@@ -925,6 +924,10 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql) {
createTable = true;
code = tscGetMeterMetaEx(pSql, pMeterMetaInfo, true);
if (TSDB_CODE_ACTION_IN_PROGRESS == code) {
return code;
}
} else {
if (cstart != NULL) {
sql = cstart;
......@@ -995,7 +998,7 @@ int doParseInsertSql(SSqlObj *pSql, char *str) {
return code;
}
ASSERT(((NULL == pSql->asyncTblPos) && (NULL == pSql->pTableHashList))
assert(((NULL == pSql->asyncTblPos) && (NULL == pSql->pTableHashList))
|| ((NULL != pSql->asyncTblPos) && (NULL != pSql->pTableHashList)));
if ((NULL == pSql->asyncTblPos) && (NULL == pSql->pTableHashList)) {
......@@ -1007,11 +1010,11 @@ int doParseInsertSql(SSqlObj *pSql, char *str) {
goto _error_clean;
}
} else {
ASSERT((NULL != pSql->asyncTblPos) && (NULL != pSql->pTableHashList));
assert((NULL != pSql->asyncTblPos) && (NULL != pSql->pTableHashList));
str = pSql->asyncTblPos;
}
tscTrace("%p create data block list for submit data, %p", pSql, pSql->cmd.pDataBlocks);
tscTrace("%p create data block list for submit data:%p, asyncTblPos:%p, pTableHashList:%p", pSql, pSql->cmd.pDataBlocks, pSql->asyncTblPos, pSql->pTableHashList);
while (1) {
int32_t index = 0;
......@@ -1067,7 +1070,8 @@ int doParseInsertSql(SSqlObj *pSql, char *str) {
return code;
}
tscTrace("async insert parse error, code:%d, %s", code, tsError[code]);
// todo add to return
tscError("async insert parse error, code:%d, %s", code, tsError[code]);
pSql->asyncTblPos = NULL;
}
......@@ -1257,7 +1261,11 @@ _error_clean:
_clean:
taosCleanUpHashTable(pSql->pTableHashList);
pSql->pTableHashList = NULL;
pSql->asyncTblPos = NULL;
pCmd->isParseFinish = 1;
return code;
}
......
......@@ -200,8 +200,8 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) {
}
int32_t code = tscGetQueryInfoDetailSafely(pCmd, pCmd->clauseIndex, &pQueryInfo);
assert(pQueryInfo->numOfTables == 0);
SMeterMetaInfo* pMeterMetaInfo = tscAddEmptyMeterMetaInfo(pQueryInfo);
pCmd->command = pInfo->type;
......@@ -496,7 +496,7 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) {
case TSDB_SQL_SELECT: {
assert(pCmd->numOfClause == 1);
const char* msg1 = "columns in select caluse not identical";
const char* msg1 = "columns in select clause not identical";
for (int32_t i = pCmd->numOfClause; i < pInfo->subclauseInfo.numOfClause; ++i) {
SQueryInfo* pqi = NULL;
......@@ -513,7 +513,7 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) {
return code;
}
tscPrintSelectClause(pCmd, i);
tscPrintSelectClause(pSql, i);
}
// set the command/global limit parameters from the first subclause to the sqlcmd object
......@@ -590,7 +590,7 @@ int32_t parseIntervalClause(SQueryInfo* pQueryInfo, SQuerySQL* pQuerySql) {
return TSDB_CODE_INVALID_SQL;
}
/* revised the time precision according to the flag */
// if the unit of time window value is millisecond, change the value from microsecond
if (pMeterMetaInfo->pMeterMeta->precision == TSDB_TIME_PRECISION_MILLI) {
pQueryInfo->nAggTimeInterval = pQueryInfo->nAggTimeInterval / 1000;
}
......@@ -608,13 +608,24 @@ int32_t parseIntervalClause(SQueryInfo* pQueryInfo, SQuerySQL* pQuerySql) {
return TSDB_CODE_SUCCESS;
}
// check the invalid sql expresssion: select count(tbname)/count(tag1)/count(tag2) from super_table interval(1d);
/*
* check invalid SQL:
* select count(tbname)/count(tag1)/count(tag2) from super_table_name interval(1d);
*/
for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutputCols; ++i) {
SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i);
if (pExpr->functionId == TSDB_FUNC_COUNT && TSDB_COL_IS_TAG(pExpr->colInfo.flag)) {
return invalidSqlErrMsg(pQueryInfo->msg, msg1);
}
}
/*
* check invalid SQL:
* select tbname, tags_fields from super_table_name interval(1s)
*/
if (tscQueryMetricTags(pQueryInfo) && pQueryInfo->nAggTimeInterval > 0) {
return invalidSqlErrMsg(pQueryInfo->msg, msg1);
}
// need to add timestamp column in result set, if interval is existed
uint64_t uid = tscSqlExprGet(pQueryInfo, 0)->uid;
......@@ -5119,33 +5130,32 @@ int32_t tscCheckCreateDbParams(SSqlCmd* pCmd, SCreateDbMsg* pCreate) {
}
// for debug purpose
void tscPrintSelectClause(SSqlCmd* pCmd, int32_t subClauseIndex) {
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, subClauseIndex);
void tscPrintSelectClause(SSqlObj* pSql, int32_t subClauseIndex) {
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, subClauseIndex);
if (pCmd == NULL || pQueryInfo->exprsInfo.numOfExprs == 0) {
if (pQueryInfo->exprsInfo.numOfExprs == 0) {
return;
}
char* str = calloc(1, 10240);
int32_t totalBufSize = 1024;
char str[1024] = {0};
int32_t offset = 0;
offset += sprintf(str, "%d [", pQueryInfo->exprsInfo.numOfExprs);
offset += sprintf(str, "num:%d [", pQueryInfo->exprsInfo.numOfExprs);
for (int32_t i = 0; i < pQueryInfo->exprsInfo.numOfExprs; ++i) {
SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i);
int32_t size = sprintf(str + offset, "%s(uid:%" PRId64 ", %d)", aAggs[pExpr->functionId].aName,
offset += snprintf(str + offset, totalBufSize - offset, "%s(uid:%" PRId64 ", %d)", aAggs[pExpr->functionId].aName,
pExpr->uid, pExpr->colInfo.colId);
offset += size;
if (i < pQueryInfo->exprsInfo.numOfExprs - 1) {
str[offset++] = ',';
}
}
str[offset] = ']';
printf("%s\n", str);
free(str);
tscTrace("%p select clause:%s", pSql, str);
}
int32_t doCheckForCreateTable(SSqlObj* pSql, int32_t subClauseIndex, SSqlInfo* pInfo) {
......
......@@ -731,16 +731,15 @@ int32_t tscLaunchJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSubquerySu
pSql, pNew, tableIndex, pMeterMetaInfo->vnodeIndex, pNewQueryInfo->type,
pNewQueryInfo->exprsInfo.numOfExprs, pNewQueryInfo->colList.numOfCols,
pNewQueryInfo->fieldsInfo.numOfOutputCols, pNewQueryInfo->pMeterInfo[0]->name);
tscPrintSelectClause(pNew, 0);
} else {
SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0);
pNewQueryInfo->type |= TSDB_QUERY_TYPE_SUBQUERY;
}
#ifdef _DEBUG_VIEW
tscPrintSelectClause(&pNew->cmd, 0);
tscPrintSelectClause(pNew, 0);
#endif
return tscProcessSql(pNew);
}
......@@ -1391,10 +1390,14 @@ static SSqlObj *tscCreateSqlObjForSubquery(SSqlObj *pSql, SRetrieveSupport *trsu
void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) {
SRetrieveSupport *trsupport = (SRetrieveSupport *)param;
SSqlObj * pSql = (SSqlObj *)tres;
SSqlObj* pParentSql = trsupport->pParentSqlObj;
SSqlObj* pSql = (SSqlObj *)tres;
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(&pSql->cmd, 0, 0);
int32_t idx = pMeterMetaInfo->vnodeIndex;
assert(pSql->cmd.numOfClause == 1 && pSql->cmd.pQueryInfo[0]->numOfTables == 1);
int32_t idx = pMeterMetaInfo->vnodeIndex;
SVnodeSidList *vnodeInfo = NULL;
SVPeerDesc * pSvd = NULL;
......@@ -1405,17 +1408,17 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) {
SSubqueryState* pState = trsupport->pState;
assert(pState->numOfCompleted < pState->numOfTotal && pState->numOfCompleted >= 0 &&
trsupport->pParentSqlObj->numOfSubs == pState->numOfTotal);
pParentSql->numOfSubs == pState->numOfTotal);
if (trsupport->pParentSqlObj->res.code != TSDB_CODE_SUCCESS || pState->code != TSDB_CODE_SUCCESS) {
if (pParentSql->res.code != TSDB_CODE_SUCCESS || pState->code != TSDB_CODE_SUCCESS) {
// metric query is killed, Note: code must be less than 0
trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY;
if (trsupport->pParentSqlObj->res.code != TSDB_CODE_SUCCESS) {
code = -(int)(trsupport->pParentSqlObj->res.code);
if (pParentSql->res.code != TSDB_CODE_SUCCESS) {
code = -(int)(pParentSql->res.code);
} else {
code = pState->code;
}
tscTrace("%p query cancelled or failed, sub:%p, orderOfSub:%d abort, code:%d", trsupport->pParentSqlObj, pSql,
tscTrace("%p query cancelled or failed, sub:%p, orderOfSub:%d abort, code:%d", pParentSql, pSql,
trsupport->subqueryIndex, code);
}
......@@ -1428,15 +1431,15 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) {
*/
if (code != TSDB_CODE_SUCCESS) {
if (trsupport->numOfRetry++ >= MAX_NUM_OF_SUBQUERY_RETRY) {
tscTrace("%p sub:%p reach the max retry count,set global code:%d", trsupport->pParentSqlObj, pSql, code);
tscTrace("%p sub:%p reach the max retry count,set global code:%d", pParentSql, pSql, code);
atomic_val_compare_exchange_32(&pState->code, 0, code);
} else { // does not reach the maximum retry count, go on
tscTrace("%p sub:%p failed code:%d, retry:%d", trsupport->pParentSqlObj, pSql, code, trsupport->numOfRetry);
tscTrace("%p sub:%p failed code:%d, retry:%d", pParentSql, pSql, code, trsupport->numOfRetry);
SSqlObj *pNew = tscCreateSqlObjForSubquery(trsupport->pParentSqlObj, trsupport, pSql);
SSqlObj *pNew = tscCreateSqlObjForSubquery(pParentSql, trsupport, pSql);
if (pNew == NULL) {
tscError("%p sub:%p failed to create new subquery due to out of memory, abort retry, vid:%d, orderOfSub:%d",
trsupport->pParentSqlObj, pSql, pSvd->vnode, trsupport->subqueryIndex);
pParentSql, pSql, pSvd->vnode, trsupport->subqueryIndex);
pState->code = -TSDB_CODE_CLI_OUT_OF_MEMORY;
trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY;
......@@ -1451,17 +1454,17 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) {
if (pState->code != TSDB_CODE_SUCCESS) { // failed, abort
if (vnodeInfo != NULL) {
tscTrace("%p sub:%p query failed,ip:%u,vid:%d,orderOfSub:%d,global code:%d", trsupport->pParentSqlObj, pSql,
tscTrace("%p sub:%p query failed,ip:%u,vid:%d,orderOfSub:%d,global code:%d", pParentSql, pSql,
vnodeInfo->vpeerDesc[vnodeInfo->index].ip, vnodeInfo->vpeerDesc[vnodeInfo->index].vnode,
trsupport->subqueryIndex, pState->code);
} else {
tscTrace("%p sub:%p query failed,orderOfSub:%d,global code:%d", trsupport->pParentSqlObj, pSql,
tscTrace("%p sub:%p query failed,orderOfSub:%d,global code:%d", pParentSql, pSql,
trsupport->subqueryIndex, pState->code);
}
tscRetrieveFromVnodeCallBack(param, tres, pState->code);
} else { // success, proceed to retrieve data from dnode
tscTrace("%p sub:%p query complete,ip:%u,vid:%d,orderOfSub:%d,retrieve data", trsupport->pParentSqlObj, pSql,
tscTrace("%p sub:%p query complete,ip:%u,vid:%d,orderOfSub:%d,retrieve data", pParentSql, pSql,
vnodeInfo->vpeerDesc[vnodeInfo->index].ip, vnodeInfo->vpeerDesc[vnodeInfo->index].vnode,
trsupport->subqueryIndex);
......@@ -1471,7 +1474,6 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) {
int tscBuildRetrieveMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
char *pMsg, *pStart;
int msgLen = 0;
pStart = pSql->cmd.payload + tsRpcHeadSize;
pMsg = pStart;
......@@ -2143,7 +2145,6 @@ int32_t tscBuildDropDnodeMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
int32_t tscBuildDropAcctMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SDropUserMsg *pDropMsg;
char * pMsg, *pStart;
int msgLen = 0;
SSqlCmd *pCmd = &pSql->cmd;
......@@ -2604,7 +2605,6 @@ int tscProcessEmptyResultRsp(SSqlObj *pSql) { return tscLocalResultCommonBuilder
int tscBuildConnectMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SConnectMsg *pConnect;
char * pMsg, *pStart;
int msgLen = 0;
SSqlCmd *pCmd = &pSql->cmd;
STscObj *pObj = pSql->pTscObj;
......@@ -3697,6 +3697,11 @@ int tscGetMetricMeta(SSqlObj *pSql, int32_t clauseIndex) {
pNewQueryInfo->slimit = pQueryInfo->slimit;
pNewQueryInfo->order = pQueryInfo->order;
STagCond* pTagCond = &pNewQueryInfo->tagCond;
tscTrace("%p new sqlobj:%p info, numOfTables:%d, slimit:%" PRId64 ", soffset:%" PRId64 ", order:%d, tbname cond:%s",
pSql, pNew, pNewQueryInfo->numOfTables, pNewQueryInfo->slimit.limit, pNewQueryInfo->slimit.offset,
pNewQueryInfo->order.order, pTagCond->tbnameCond.cond)
// if (pSql->fp != NULL && pSql->pStream == NULL) {
// pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks);
......@@ -3725,7 +3730,6 @@ int tscGetMetricMeta(SSqlObj *pSql, int32_t clauseIndex) {
pMeterMetaInfo->pMetricMeta = (SMetricMeta *)taosGetDataFromCache(tscCacheHandle, tagstr);
}
}
tscFreeSqlObj(pNew);
} else {
......
......@@ -386,7 +386,14 @@ int taos_fetch_block_impl(TAOS_RES *res, TAOS_ROW *rows) {
static void **doSetResultRowData(SSqlObj *pSql) {
SSqlCmd *pCmd = &pSql->cmd;
SSqlRes *pRes = &pSql->res;
assert(pRes->row >= 0 && pRes->row <= pRes->numOfRows);
if (pRes->row >= pRes->numOfRows) { // all the results has returned to invoker
tfree(pRes->tsrow);
return pRes->tsrow;
}
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
int32_t num = 0;
......@@ -399,7 +406,6 @@ static void **doSetResultRowData(SSqlObj *pSql) {
}
TAOS_FIELD *pField = tscFieldInfoGetField(pQueryInfo, i);
if (isNull(pRes->tsrow[i], pField->type)) {
pRes->tsrow[i] = NULL;
} else if (pField->type == TSDB_DATA_TYPE_NCHAR) {
......@@ -419,21 +425,14 @@ static void **doSetResultRowData(SSqlObj *pSql) {
tscError("%p charset:%s to %s. val:%ls convert failed.", pSql, DEFAULT_UNICODE_ENCODEC, tsCharset, pRes->tsrow);
pRes->tsrow[i] = NULL;
}
num++;
}
}
assert(num <= pQueryInfo->fieldsInfo.numOfOutputCols);
return pRes->tsrow;
}
static void **getOneRowFromBuf(SSqlObj *pSql) {
doSetResultRowData(pSql);
SSqlRes *pRes = &pSql->res;
pRes->row++;
pRes->row++; // index increase one-step
return pRes->tsrow;
}
......@@ -493,7 +492,7 @@ static void **tscBuildResFromSubqueries(SSqlObj *pSql) {
while (1) {
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex);
if (pRes->tsrow == NULL) {
pRes->tsrow = malloc(POINTER_BYTES * pQueryInfo->exprsInfo.numOfExprs);
pRes->tsrow = calloc(pQueryInfo->exprsInfo.numOfExprs, POINTER_BYTES);
}
bool success = false;
......@@ -506,29 +505,21 @@ static void **tscBuildResFromSubqueries(SSqlObj *pSql) {
}
if (numOfTableHasRes >= 2) { // do merge result
SSqlRes *pRes1 = &pSql->pSubs[0]->res;
SSqlRes *pRes2 = &pSql->pSubs[1]->res;
if (pRes1->row < pRes1->numOfRows && pRes2->row < pRes2->numOfRows) {
doSetResultRowData(pSql->pSubs[0]);
doSetResultRowData(pSql->pSubs[1]);
success = (doSetResultRowData(pSql->pSubs[0]) != NULL) &&
(doSetResultRowData(pSql->pSubs[1]) != NULL);
// TSKEY key1 = *(TSKEY *)pRes1->tsrow[0];
// TSKEY key2 = *(TSKEY *)pRes2->tsrow[0];
// printf("first:%" PRId64 ", second:%" PRId64 "\n", key1, key2);
success = true;
pRes1->row++;
pRes2->row++;
}
} else { // only one subquery
SSqlObj *pSub = pSql->pSubs[0];
if (pSub == NULL) {
pSub = pSql->pSubs[1];
}
SSqlRes *pRes1 = &pSub->res;
doSetResultRowData(pSub);
success = (doSetResultRowData(pSub) != NULL);
success = (pRes1->row++ < pRes1->numOfRows);
// success = (pRes1->row++ < pRes1->numOfRows);
}
if (success) { // current row of final output has been built, return to app
......@@ -629,7 +620,7 @@ TAOS_ROW taos_fetch_row_impl(TAOS_RES *res) {
}
}
return getOneRowFromBuf(pSql);
return doSetResultRowData(pSql);
}
TAOS_ROW taos_fetch_row(TAOS_RES *res) {
......
......@@ -48,6 +48,7 @@ static pthread_once_t tscinit = PTHREAD_ONCE_INIT;
extern int tsTscEnableRecordSql;
extern int tsNumOfLogLines;
void taosInitNote(int numOfNoteLines, int maxNotes, char* lable);
void deltaToUtcInitOnce();
void tscCheckDiskUsage(void *para, void *unused) {
taosGetDisk();
......@@ -60,6 +61,7 @@ void taos_init_imp() {
SRpcInit rpcInit;
srand(taosGetTimestampSec());
deltaToUtcInitOnce();
if (tscEmbedded == 0) {
/*
......
......@@ -590,6 +590,11 @@ int32_t tscCreateDataBlock(size_t initialSize, int32_t rowSize, int32_t startOff
}
dataBuf->nAllocSize = (uint32_t)initialSize;
dataBuf->headerSize = startOffset; // the header size will always be the startOffset value, reserved for the subumit block header
if (dataBuf->nAllocSize <= dataBuf->headerSize) {
dataBuf->nAllocSize = dataBuf->headerSize*2;
}
dataBuf->pData = calloc(1, dataBuf->nAllocSize);
dataBuf->ordered = true;
dataBuf->prevTS = INT64_MIN;
......@@ -1083,6 +1088,37 @@ SSqlExpr* tscSqlExprGet(SQueryInfo* pQueryInfo, int32_t index) {
return &pQueryInfo->exprsInfo.pExprs[index];
}
void* tscSqlExprDestroy(SSqlExpr* pExpr) {
if (pExpr == NULL) {
return NULL;
}
for(int32_t i = 0; i < tListLen(pExpr->param); ++i) {
tVariantDestroy(&pExpr->param[i]);
}
return NULL;
}
/*
* NOTE: Does not release SSqlExprInfo here.
*/
void tscSqlExprInfoDestroy(SSqlExprInfo* pExprInfo) {
if (pExprInfo->numOfAlloc == 0) {
return;
}
for(int32_t i = 0; i < pExprInfo->numOfAlloc; ++i) {
tscSqlExprDestroy(&pExprInfo->pExprs[i]);
}
tfree(pExprInfo->pExprs);
pExprInfo->numOfAlloc = 0;
pExprInfo->numOfExprs = 0;
}
void tscSqlExprCopy(SSqlExprInfo* dst, const SSqlExprInfo* src, uint64_t tableuid) {
if (src == NULL) {
return;
......@@ -1090,7 +1126,7 @@ void tscSqlExprCopy(SSqlExprInfo* dst, const SSqlExprInfo* src, uint64_t tableui
*dst = *src;
dst->pExprs = malloc(sizeof(SSqlExpr) * dst->numOfAlloc);
dst->pExprs = calloc(dst->numOfAlloc, sizeof(SSqlExpr));
int16_t num = 0;
for (int32_t i = 0; i < src->numOfExprs; ++i) {
if (src->pExprs[i].uid == tableuid) {
......@@ -1667,7 +1703,7 @@ static void doClearSubqueryInfo(SQueryInfo* pQueryInfo) {
tscTagCondRelease(&pQueryInfo->tagCond);
tscClearFieldInfo(&pQueryInfo->fieldsInfo);
tfree(pQueryInfo->exprsInfo.pExprs);
tscSqlExprInfoDestroy(&pQueryInfo->exprsInfo);
memset(&pQueryInfo->exprsInfo, 0, sizeof(pQueryInfo->exprsInfo));
tscColumnBaseInfoDestroy(&pQueryInfo->colList);
......@@ -1910,12 +1946,15 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void
if (UTIL_METER_IS_SUPERTABLE(pMeterMetaInfo)) {
assert(pFinalInfo->pMetricMeta != NULL);
}
tscTrace(
"%p new subquery %p, tableIndex:%d, vnodeIdx:%d, type:%d, exprInfo:%d, colList:%d,"
"fieldInfo:%d, name:%s",
"%p new subquery: %p, tableIndex:%d, vnodeIdx:%d, type:%d, exprInfo:%d, colList:%d,"
"fieldInfo:%d, name:%s, qrang:%" PRId64 " - %" PRId64 " order:%d, limit:%" PRId64,
pSql, pNew, tableIndex, pMeterMetaInfo->vnodeIndex, pNewQueryInfo->type, pNewQueryInfo->exprsInfo.numOfExprs,
pNewQueryInfo->colList.numOfCols, pNewQueryInfo->fieldsInfo.numOfOutputCols, pFinalInfo->name);
pNewQueryInfo->colList.numOfCols, pNewQueryInfo->fieldsInfo.numOfOutputCols, pFinalInfo->name, pNewQueryInfo->stime,
pNewQueryInfo->etime, pNewQueryInfo->order.order, pNewQueryInfo->limit.limit);
tscPrintSelectClause(pNew, 0);
return pNew;
}
......@@ -1923,7 +1962,9 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void
void tscDoQuery(SSqlObj* pSql) {
SSqlCmd* pCmd = &pSql->cmd;
void* fp = pSql->fp;
assert(pSql->res.code == TSDB_CODE_SUCCESS);
if (pCmd->command > TSDB_SQL_LOCAL) {
tscProcessLocalCmd(pSql);
} else {
......@@ -2110,10 +2151,10 @@ void tscTryQueryNextClause(SSqlObj* pSql, void (*queryFp)()) {
// current subclause is completed, try the next subclause
assert(pCmd->clauseIndex < pCmd->numOfClause - 1);
pCmd->clauseIndex++;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
pSql->cmd.command = pQueryInfo->command;
pCmd->clauseIndex++;
pRes->numOfTotal += pRes->numOfTotalInCurrentClause;
pRes->numOfTotalInCurrentClause = 0;
......
......@@ -42,6 +42,7 @@ int64_t taosGetTimestamp(int32_t precision);
int32_t getTimestampInUsFromStr(char* token, int32_t tokenlen, int64_t* ts);
int32_t taosParseTime(char* timestr, int64_t* time, int32_t len, int32_t timePrec);
void deltaToUtcInitOnce();
#ifdef __cplusplus
}
......
......@@ -329,7 +329,7 @@ int32_t mgmtDoJoin(SMetricMetaMsg* pMetricMetaMsg, tQueryResultset* pRes) {
bool allEmpty = false;
for (int32_t i = 0; i < pMetricMetaMsg->numOfMeters; ++i) {
if (pRes->num == 0) { // all results are empty if one of them is empty
if (pRes[i].num == 0) { // all results are empty if one of them is empty
allEmpty = true;
break;
}
......
......@@ -5997,7 +5997,7 @@ int32_t getDataBlocksForMeters(SMeterQuerySupportObj *pSupporter, SQuery *pQuery
int32_t size = compInfo.numOfBlocks * sizeof(SCompBlock);
size_t bufferSize = size + sizeof(TSCKSUM);
pMeterDataInfo[j]->numOfBlocks = compInfo.numOfBlocks; // set to be the initial value
pMeterDataInfo[j]->numOfBlocks = compInfo.numOfBlocks;
pMeterDataInfo[j]->pBlock = calloc(1, bufferSize);
if (pMeterDataInfo[j]->pBlock == NULL) {
clearAllMeterDataBlockInfo(pMeterDataInfo, 0, j);
......@@ -6030,6 +6030,7 @@ int32_t getDataBlocksForMeters(SMeterQuerySupportObj *pSupporter, SQuery *pQuery
int32_t end = 0;
if (!getValidDataBlocksRangeIndex(pMeterDataInfo[j], pQuery, pMeterDataInfo[j]->pBlock, compInfo.numOfBlocks,
minval, maxval, &end)) {
// current table has no qualified data blocks, erase its information.
clearAllMeterDataBlockInfo(pMeterDataInfo, j, j + 1);
continue;
}
......
......@@ -1108,9 +1108,9 @@ void tsSetAllDebugFlag() {
* In case that the setLocale failed to be executed, the right charset needs to be set.
*/
void tsSetLocale() {
char msgLocale[] = "Invalid locale:%s, please set the valid locale in config file";
char msgCharset[] = "Invalid charset:%s, please set the valid charset in config file";
char msgCharset1[] = "failed to get charset, please set the valid charset in config file";
char msgLocale[] = "Invalid locale:%s, please set the valid locale in config file\n";
char msgCharset[] = "Invalid charset:%s, please set the valid charset in config file\n";
char msgCharset1[] = "failed to get charset, please set the valid charset in config file\n";
char *locale = setlocale(LC_CTYPE, tsLocale);
......
......@@ -45,6 +45,7 @@ typedef struct {
uint32_t uDebugFlag = 131; // all the messages
short tsAsyncLog = 1;
static pid_t logPid = 0;
static SLogBuff *logHandle = NULL;
static int taosLogFileNum = 1;
static int taosLogMaxLines = 0;
......@@ -82,6 +83,11 @@ int taosStartLog() {
}
int taosInitLog(char *logName, int numOfLogLines, int maxFiles) {
#ifdef LINUX
logPid = (pid_t)syscall(SYS_gettid);
#endif
logHandle = taosLogBuffNew(TSDB_DEFAULT_LOG_BUF_SIZE);
if (logHandle == NULL) return -1;
......@@ -306,8 +312,8 @@ char *tprefix(char *prefix) {
sprintf(prefix, "%02d/%02d %02d:%02d:%02d.%06d 0x%lld ", ptm->tm_mon + 1, ptm->tm_mday, ptm->tm_hour, ptm->tm_min,
ptm->tm_sec, (int)timeSecs.tv_usec, taosGetPthreadId());
#else
sprintf(prefix, "%02d/%02d %02d:%02d:%02d.%06d 0x%lx ", ptm->tm_mon + 1, ptm->tm_mday, ptm->tm_hour, ptm->tm_min,
ptm->tm_sec, (int)timeSecs.tv_usec, pthread_self());
sprintf(prefix, "%02d/%02d %02d:%02d:%02d.%06d %d 0x%lx ", ptm->tm_mon + 1, ptm->tm_mday, ptm->tm_hour, ptm->tm_min,
ptm->tm_sec, (int)timeSecs.tv_usec, logPid, pthread_self());
#endif
return prefix;
}
......@@ -333,8 +339,8 @@ void tprintf(const char *const flags, int dflag, const char *const format, ...)
len = sprintf(buffer, "%02d/%02d %02d:%02d:%02d.%06d 0x%lld ", ptm->tm_mon + 1, ptm->tm_mday, ptm->tm_hour,
ptm->tm_min, ptm->tm_sec, (int)timeSecs.tv_usec, taosGetPthreadId());
#else
len = sprintf(buffer, "%02d/%02d %02d:%02d:%02d.%06d %lx ", ptm->tm_mon + 1, ptm->tm_mday, ptm->tm_hour, ptm->tm_min,
ptm->tm_sec, (int)timeSecs.tv_usec, pthread_self());
len = sprintf(buffer, "%02d/%02d %02d:%02d:%02d.%06d %d %lx ", ptm->tm_mon + 1, ptm->tm_mday, ptm->tm_hour, ptm->tm_min,
ptm->tm_sec, (int)timeSecs.tv_usec, logPid, pthread_self());
#endif
len += sprintf(buffer + len, "%s", flags);
......@@ -424,8 +430,8 @@ void taosPrintLongString(const char *const flags, int dflag, const char *const f
len = sprintf(buffer, "%02d/%02d %02d:%02d:%02d.%06d 0x%lld ", ptm->tm_mon + 1, ptm->tm_mday, ptm->tm_hour,
ptm->tm_min, ptm->tm_sec, (int)timeSecs.tv_usec, taosGetPthreadId());
#else
len = sprintf(buffer, "%02d/%02d %02d:%02d:%02d.%06d %lx ", ptm->tm_mon + 1, ptm->tm_mday, ptm->tm_hour, ptm->tm_min,
ptm->tm_sec, (int)timeSecs.tv_usec, pthread_self());
len = sprintf(buffer, "%02d/%02d %02d:%02d:%02d.%06d %d %lx ", ptm->tm_mon + 1, ptm->tm_mday, ptm->tm_hour, ptm->tm_min,
ptm->tm_sec, (int)timeSecs.tv_usec, logPid, pthread_self());
#endif
len += sprintf(buffer + len, "%s", flags);
......
......@@ -24,6 +24,61 @@
#include "ttime.h"
#include "tutil.h"
// ==== mktime() kernel code =================//
static int64_t m_deltaUtc = 0;
void deltaToUtcInitOnce() {
struct tm tm = {0};
(void)strptime("1970-01-01 00:00:00", (const char *)("%Y-%m-%d %H:%M:%S"), &tm);
m_deltaUtc = (int64_t)mktime(&tm);
//printf("====delta:%lld\n\n", seconds);
return;
}
int64_t user_mktime(struct tm * tm)
{
#define TAOS_MINUTE 60
#define TAOS_HOUR (60*TAOS_MINUTE)
#define TAOS_DAY (24*TAOS_HOUR)
#define TAOS_YEAR (365*TAOS_DAY)
static int month[12] = {
0,
TAOS_DAY*(31),
TAOS_DAY*(31+29),
TAOS_DAY*(31+29+31),
TAOS_DAY*(31+29+31+30),
TAOS_DAY*(31+29+31+30+31),
TAOS_DAY*(31+29+31+30+31+30),
TAOS_DAY*(31+29+31+30+31+30+31),
TAOS_DAY*(31+29+31+30+31+30+31+31),
TAOS_DAY*(31+29+31+30+31+30+31+31+30),
TAOS_DAY*(31+29+31+30+31+30+31+31+30+31),
TAOS_DAY*(31+29+31+30+31+30+31+31+30+31+30)
};
int64_t res;
int year;
year= tm->tm_year - 70;
res= TAOS_YEAR*year + TAOS_DAY*((year+1)/4);
res+= month[tm->tm_mon];
if(tm->tm_mon > 1 && ((year+2)%4)) {
res-= TAOS_DAY;
}
res+= TAOS_DAY*(tm->tm_mday-1);
res+= TAOS_HOUR*tm->tm_hour;
res+= TAOS_MINUTE*tm->tm_min;
res+= tm->tm_sec;
return res + m_deltaUtc;
}
static int64_t parseFraction(char* str, char** end, int32_t timePrec);
static int32_t parseTimeWithTz(char* timestr, int64_t* time, int32_t timePrec);
static int32_t parseLocaltime(char* timestr, int64_t* time, int32_t timePrec);
......@@ -238,6 +293,8 @@ int32_t parseLocaltime(char* timestr, int64_t* time, int32_t timePrec) {
/* mktime will be affected by TZ, set by using taos_options */
int64_t seconds = mktime(&tm);
//int64_t seconds = (int64_t)user_mktime(&tm);
int64_t fraction = 0;
if (*str == '.') {
......
......@@ -162,12 +162,13 @@ void* insert_rows(void *sarg)
}
// insert data
int index = 0;
int64_t begin = (int64_t)time(NULL);
int index = 0;
while (1) {
if (g_thread_exit_flag) break;
index++;
sprintf(command, "insert into %s values (%ld, %d)", winfo->tbl_name, 1546300800000+index*1000, index);
sprintf(command, "insert into %s values (%ld, %d)", winfo->tbl_name, (begin + index) * 1000, index);
if (taos_query(taos, command)) {
printf("failed to insert row [%s], reason:%s\n", command, taos_errstr(taos));
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册