提交 11f5653b 编写于 作者: S Steven Li

Merge remote-tracking branch 'origin/develop' into feature/crash_gen

...@@ -480,9 +480,9 @@ TDengine支持针对数据的聚合查询。提供支持的聚合和选择函数 ...@@ -480,9 +480,9 @@ TDengine支持针对数据的聚合查询。提供支持的聚合和选择函数
- **LEASTSQUARES** - **LEASTSQUARES**
```mysql ```mysql
SELECT LEASTSQUARES(field_name) FROM tb_name [WHERE clause] SELECT LEASTSQUARES(field_name, start_val, step_val) FROM tb_name [WHERE clause]
``` ```
功能说明:统计表中某列的值是主键(时间戳)的拟合直线方程。 功能说明:统计表中某列的值是主键(时间戳)的拟合直线方程。start_val是自变量初始值,step_val是自变量的步长值。
返回结果数据类型:字符串表达式(斜率, 截距)。 返回结果数据类型:字符串表达式(斜率, 截距)。
应用字段:不能应用在timestamp、binary、nchar、bool类型字段。 应用字段:不能应用在timestamp、binary、nchar、bool类型字段。
说明:自变量是时间戳,因变量是该列的值。 说明:自变量是时间戳,因变量是该列的值。
......
...@@ -412,7 +412,7 @@ TDengine supports aggregations over numerical values, they are listed below: ...@@ -412,7 +412,7 @@ TDengine supports aggregations over numerical values, they are listed below:
SELECT PERCENTILE(field_name, P) FROM { tb_name | stb_name } [WHERE clause] SELECT PERCENTILE(field_name, P) FROM { tb_name | stb_name } [WHERE clause]
``` ```
Function: the value of the specified column below which `P` percent of the data points fall. Function: the value of the specified column below which `P` percent of the data points fall.
Return Data Type: the same data type. Return Data Type: double.
Applicable Data Types: all types except `timestamp`, `binary`, `nchar`, `bool`. Applicable Data Types: all types except `timestamp`, `binary`, `nchar`, `bool`.
Applied to: table/STable. Applied to: table/STable.
Note: The range of `P` is `[0, 100]`. When `P=0` , `PERCENTILE` returns the equal value as `MIN`; when `P=100`, `PERCENTILE` returns the equal value as `MAX`. Note: The range of `P` is `[0, 100]`. When `P=0` , `PERCENTILE` returns the equal value as `MIN`; when `P=100`, `PERCENTILE` returns the equal value as `MAX`.
...@@ -446,7 +446,7 @@ TDengine supports aggregations over numerical values, they are listed below: ...@@ -446,7 +446,7 @@ TDengine supports aggregations over numerical values, they are listed below:
SELECT SPREAD(field_name) FROM { tb_name | stb_name } [WHERE clause] SELECT SPREAD(field_name) FROM { tb_name | stb_name } [WHERE clause]
``` ```
Function: return the difference between the maximum and the mimimum value. Function: return the difference between the maximum and the mimimum value.
Return Data Type: the same data type. Return Data Type: double.
Applicable Data Types: all types except `timestamp`, `binary`, `nchar`, `bool`. Applicable Data Types: all types except `timestamp`, `binary`, `nchar`, `bool`.
Applied to: table/STable. Applied to: table/STable.
Note: spread gives the range of data variation in a table/supertable; it is equivalent to `MAX()` - `MIN()` Note: spread gives the range of data variation in a table/supertable; it is equivalent to `MAX()` - `MIN()`
......
...@@ -31,9 +31,7 @@ extern int32_t tscEmbedded; ...@@ -31,9 +31,7 @@ extern int32_t tscEmbedded;
#define tscInfo(...) { if (cDebugFlag & DEBUG_INFO) { taosPrintLog("TSC INFO ", tscEmbedded ? 255 : cDebugFlag, __VA_ARGS__); }} #define tscInfo(...) { if (cDebugFlag & DEBUG_INFO) { taosPrintLog("TSC INFO ", tscEmbedded ? 255 : cDebugFlag, __VA_ARGS__); }}
#define tscDebug(...) { if (cDebugFlag & DEBUG_DEBUG) { taosPrintLog("TSC DEBUG ", cDebugFlag, __VA_ARGS__); }} #define tscDebug(...) { if (cDebugFlag & DEBUG_DEBUG) { taosPrintLog("TSC DEBUG ", cDebugFlag, __VA_ARGS__); }}
#define tscTrace(...) { if (cDebugFlag & DEBUG_TRACE) { taosPrintLog("TSC TRACE ", cDebugFlag, __VA_ARGS__); }} #define tscTrace(...) { if (cDebugFlag & DEBUG_TRACE) { taosPrintLog("TSC TRACE ", cDebugFlag, __VA_ARGS__); }}
#define tscDebugL(...){ if (cDebugFlag & DEBUG_DEBUG) { taosPrintLongString("TSC DEBUG ", cDebugFlag, __VA_ARGS__); }}
#define tscDebugDump(...) { if (cDebugFlag & DEBUG_DEBUG) { taosPrintLongString("TSC DEBUG ", cDebugFlag, __VA_ARGS__); }}
#define tscTraceDump(...) { if (cDebugFlag & DEBUG_TRACE) { taosPrintLongString("TSC TRACE ", cDebugFlag, __VA_ARGS__); }}
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -110,8 +110,6 @@ STableMeta* tscCreateTableMetaFromMsg(STableMetaMsg* pTableMetaMsg, size_t* size ...@@ -110,8 +110,6 @@ STableMeta* tscCreateTableMetaFromMsg(STableMetaMsg* pTableMetaMsg, size_t* size
//todo tags value as well as the table id structure needs refactor //todo tags value as well as the table id structure needs refactor
char *tsGetTagsValue(STableMeta *pMeta); char *tsGetTagsValue(STableMeta *pMeta);
void extractTableNameFromToken(SSQLToken *pToken, SSQLToken* pTable);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -29,9 +29,6 @@ ...@@ -29,9 +29,6 @@
#define jniDebug(...) { if (jniDebugFlag & DEBUG_DEBUG) { taosPrintLog("JNI DEBUG ", jniDebugFlag, __VA_ARGS__); }} #define jniDebug(...) { if (jniDebugFlag & DEBUG_DEBUG) { taosPrintLog("JNI DEBUG ", jniDebugFlag, __VA_ARGS__); }}
#define jniTrace(...) { if (jniDebugFlag & DEBUG_TRACE) { taosPrintLog("JNI TRACE ", jniDebugFlag, __VA_ARGS__); }} #define jniTrace(...) { if (jniDebugFlag & DEBUG_TRACE) { taosPrintLog("JNI TRACE ", jniDebugFlag, __VA_ARGS__); }}
#define jniDebugDump(...) { if (jniDebugFlag & DEBUG_DEBUG) { taosPrintLongString("JNI DEBUG ", jniDebugFlag, __VA_ARGS__); }}
#define jniTraceDump(...) { if (jniDebugFlag & DEBUG_TRACE) { taosPrintLongString("JNI DEBUG ", jniDebugFlag, __VA_ARGS__); }}
int __init = 0; int __init = 0;
JavaVM *g_vm = NULL; JavaVM *g_vm = NULL;
......
...@@ -55,7 +55,7 @@ void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, void (*fp)(), void* param, const ...@@ -55,7 +55,7 @@ void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, void (*fp)(), void* param, const
strtolower(pSql->sqlstr, sqlstr); strtolower(pSql->sqlstr, sqlstr);
tscDebugDump("%p SQL: %s", pSql, pSql->sqlstr); tscDebugL("%p SQL: %s", pSql, pSql->sqlstr);
pSql->cmd.curSql = pSql->sqlstr; pSql->cmd.curSql = pSql->sqlstr;
int32_t code = tsParseSql(pSql, true); int32_t code = tsParseSql(pSql, true);
...@@ -471,7 +471,7 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) { ...@@ -471,7 +471,7 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
} }
// in case of insert, redo parsing the sql string and build new submit data block for two reasons: // in case of insert, redo parsing the sql string and build new submit data block for two reasons:
// 1. the table Id(tid & uid) may have been update, the submit block needs to be updated // 1. the table Id(tid & uid) may have been update, the submit block needs to be updated accordingly.
// 2. vnode may need the schema information along with submit block to update its local table schema. // 2. vnode may need the schema information along with submit block to update its local table schema.
if (pCmd->command == TSDB_SQL_INSERT) { if (pCmd->command == TSDB_SQL_INSERT) {
tscDebug("%p redo parse sql string to build submit block", pSql); tscDebug("%p redo parse sql string to build submit block", pSql);
......
...@@ -14,7 +14,6 @@ ...@@ -14,7 +14,6 @@
*/ */
#include "os.h" #include "os.h"
#include "qast.h"
#include "qextbuffer.h" #include "qextbuffer.h"
#include "qfill.h" #include "qfill.h"
#include "qhistogram.h" #include "qhistogram.h"
...@@ -23,6 +22,7 @@ ...@@ -23,6 +22,7 @@
#include "qtsbuf.h" #include "qtsbuf.h"
#include "taosdef.h" #include "taosdef.h"
#include "taosmsg.h" #include "taosmsg.h"
#include "qast.h"
#include "tscLog.h" #include "tscLog.h"
#include "tscSubquery.h" #include "tscSubquery.h"
#include "tscompression.h" #include "tscompression.h"
......
...@@ -364,7 +364,7 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd ...@@ -364,7 +364,7 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd
TSKEY stime = MIN(pQueryInfo->window.skey, pQueryInfo->window.ekey); TSKEY stime = MIN(pQueryInfo->window.skey, pQueryInfo->window.ekey);
int64_t revisedSTime = int64_t revisedSTime =
taosGetIntervalStartTimestamp(stime, pQueryInfo->intervalTime, pQueryInfo->slidingTimeUnit, tinfo.precision); taosGetIntervalStartTimestamp(stime, pQueryInfo->slidingTime, pQueryInfo->intervalTime, pQueryInfo->slidingTimeUnit, tinfo.precision);
if (pQueryInfo->fillType != TSDB_FILL_NONE) { if (pQueryInfo->fillType != TSDB_FILL_NONE) {
SFillColInfo* pFillCol = createFillColInfo(pQueryInfo); SFillColInfo* pFillCol = createFillColInfo(pQueryInfo);
...@@ -831,7 +831,7 @@ void savePrevRecordAndSetupInterpoInfo(SLocalReducer *pLocalReducer, SQueryInfo ...@@ -831,7 +831,7 @@ void savePrevRecordAndSetupInterpoInfo(SLocalReducer *pLocalReducer, SQueryInfo
if (pFillInfo != NULL) { if (pFillInfo != NULL) {
int64_t stime = (pQueryInfo->window.skey < pQueryInfo->window.ekey) ? pQueryInfo->window.skey : pQueryInfo->window.ekey; int64_t stime = (pQueryInfo->window.skey < pQueryInfo->window.ekey) ? pQueryInfo->window.skey : pQueryInfo->window.ekey;
int64_t revisedSTime = int64_t revisedSTime =
taosGetIntervalStartTimestamp(stime, pQueryInfo->slidingTime, pQueryInfo->slidingTimeUnit, tinfo.precision); taosGetIntervalStartTimestamp(stime, pQueryInfo->slidingTime, pQueryInfo->intervalTime, pQueryInfo->slidingTimeUnit, tinfo.precision);
taosResetFillInfo(pFillInfo, revisedSTime); taosResetFillInfo(pFillInfo, revisedSTime);
} }
...@@ -1301,9 +1301,7 @@ static void resetEnvForNewResultset(SSqlRes *pRes, SSqlCmd *pCmd, SLocalReducer ...@@ -1301,9 +1301,7 @@ static void resetEnvForNewResultset(SSqlRes *pRes, SSqlCmd *pCmd, SLocalReducer
if (pQueryInfo->fillType != TSDB_FILL_NONE) { if (pQueryInfo->fillType != TSDB_FILL_NONE) {
TSKEY skey = MIN(pQueryInfo->window.skey, pQueryInfo->window.ekey); TSKEY skey = MIN(pQueryInfo->window.skey, pQueryInfo->window.ekey);
int64_t newTime = int64_t newTime =
taosGetIntervalStartTimestamp(skey, pQueryInfo->intervalTime, pQueryInfo->slidingTimeUnit, precision); taosGetIntervalStartTimestamp(skey, pQueryInfo->slidingTime, pQueryInfo->intervalTime, pQueryInfo->slidingTimeUnit, precision);
// taosResetFillInfo(pLocalReducer->pFillInfo, pQueryInfo->order.order, newTime,
// pQueryInfo->groupbyExpr.numOfGroupCols, 4096, 0, NULL, pLocalReducer->rowSize);
taosResetFillInfo(pLocalReducer->pFillInfo, newTime); taosResetFillInfo(pLocalReducer->pFillInfo, newTime);
} }
} }
......
...@@ -538,7 +538,7 @@ int taos_stmt_prepare(TAOS_STMT* stmt, const char* sql, unsigned long length) { ...@@ -538,7 +538,7 @@ int taos_stmt_prepare(TAOS_STMT* stmt, const char* sql, unsigned long length) {
pRes->numOfRows = 1; pRes->numOfRows = 1;
strtolower(pSql->sqlstr, sql); strtolower(pSql->sqlstr, sql);
tscDebugDump("%p SQL: %s", pSql, pSql->sqlstr); tscDebugL("%p SQL: %s", pSql, pSql->sqlstr);
if (tscIsInsertData(pSql->sqlstr)) { if (tscIsInsertData(pSql->sqlstr)) {
pStmt->isInsert = true; pStmt->isInsert = true;
......
...@@ -18,19 +18,19 @@ ...@@ -18,19 +18,19 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "os.h" #include "os.h"
#include "qast.h"
#include "taos.h" #include "taos.h"
#include "taosmsg.h" #include "taosmsg.h"
#include "tstoken.h" #include "qast.h"
#include "tstrbuild.h" #include "tcompare.h"
#include "ttime.h" #include "tname.h"
#include "tscLog.h" #include "tscLog.h"
#include "tscUtil.h" #include "tscUtil.h"
#include "tschemautil.h" #include "tschemautil.h"
#include "tsclient.h" #include "tsclient.h"
#include "tstoken.h"
#include "tstrbuild.h"
#include "ttime.h"
#include "ttokendef.h" #include "ttokendef.h"
#include "tname.h"
#include "tcompare.h"
#define DEFAULT_PRIMARY_TIMESTAMP_COL_NAME "_c0" #define DEFAULT_PRIMARY_TIMESTAMP_COL_NAME "_c0"
...@@ -4487,10 +4487,12 @@ int32_t setAlterTableInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) { ...@@ -4487,10 +4487,12 @@ int32_t setAlterTableInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
SUpdateTableTagValMsg* pUpdateMsg = (SUpdateTableTagValMsg*) pCmd->payload; SUpdateTableTagValMsg* pUpdateMsg = (SUpdateTableTagValMsg*) pCmd->payload;
pUpdateMsg->head.vgId = htonl(pTableMeta->vgroupInfo.vgId); pUpdateMsg->head.vgId = htonl(pTableMeta->vgroupInfo.vgId);
pUpdateMsg->tid = htonl(pTableMeta->sid); pUpdateMsg->tid = htonl(pTableMeta->sid);
pUpdateMsg->uid = htobe64(pTableMeta->uid); pUpdateMsg->uid = htobe64(pTableMeta->uid);
pUpdateMsg->colId = htons(pTagsSchema->colId); pUpdateMsg->colId = htons(pTagsSchema->colId);
pUpdateMsg->tversion = htons(pTableMeta->tversion); pUpdateMsg->type = pTagsSchema->type;
pUpdateMsg->bytes = htons(pTagsSchema->bytes);
pUpdateMsg->tversion = htons(pTableMeta->tversion);
pUpdateMsg->numOfTags = htons(numOfTags); pUpdateMsg->numOfTags = htons(numOfTags);
pUpdateMsg->schemaLen = htonl(schemaLen); pUpdateMsg->schemaLen = htonl(schemaLen);
......
...@@ -215,25 +215,3 @@ __attribute__ ((unused)) static FORCE_INLINE size_t copy(char* dst, const char* ...@@ -215,25 +215,3 @@ __attribute__ ((unused)) static FORCE_INLINE size_t copy(char* dst, const char*
return len; return len;
} }
/*
* tablePrefix.columnName
* extract table name and save it in pTable, with only column name in pToken
*/
void extractTableNameFromToken(SSQLToken* pToken, SSQLToken* pTable) {
const char sep = TS_PATH_DELIMITER[0];
if (pToken == pTable || pToken == NULL || pTable == NULL) {
return;
}
char* r = strnchr(pToken->z, sep, pToken->n, false);
if (r != NULL) { // record the table name token
pTable->n = r - pToken->z;
pTable->z = pToken->z;
r += 1;
pToken->n -= (r - pToken->z);
pToken->z = r;
}
}
...@@ -247,7 +247,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcIpSet *pIpSet) { ...@@ -247,7 +247,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcIpSet *pIpSet) {
} else { } else {
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0); STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
if (rpcMsg->code == TSDB_CODE_TDB_INVALID_TABLE_ID || rpcMsg->code == TSDB_CODE_VND_INVALID_VGROUP_ID || if (rpcMsg->code == TSDB_CODE_TDB_INVALID_TABLE_ID || rpcMsg->code == TSDB_CODE_VND_INVALID_VGROUP_ID ||
rpcMsg->code == TSDB_CODE_RPC_NETWORK_UNAVAIL) { rpcMsg->code == TSDB_CODE_RPC_NETWORK_UNAVAIL || rpcMsg->code == TSDB_CODE_TDB_TABLE_RECONFIGURE) {
if (pCmd->command == TSDB_SQL_CONNECT) { if (pCmd->command == TSDB_SQL_CONNECT) {
rpcMsg->code = TSDB_CODE_RPC_NETWORK_UNAVAIL; rpcMsg->code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
rpcFreeCont(rpcMsg->pCont); rpcFreeCont(rpcMsg->pCont);
...@@ -260,7 +260,12 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcIpSet *pIpSet) { ...@@ -260,7 +260,12 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcIpSet *pIpSet) {
// get table meta query will not retry, do nothing // get table meta query will not retry, do nothing
} else { } else {
tscWarn("%p it shall renew table meta, code:%s, retry:%d", pSql, tstrerror(rpcMsg->code), ++pSql->retry); tscWarn("%p it shall renew table meta, code:%s, retry:%d", pSql, tstrerror(rpcMsg->code), ++pSql->retry);
// set the flag to denote that sql string needs to be re-parsed and build submit block with table schema
if (rpcMsg->code == TSDB_CODE_TDB_TABLE_RECONFIGURE) {
pSql->cmd.submitSchema = 1;
}
pSql->res.code = rpcMsg->code; // keep the previous error code pSql->res.code = rpcMsg->code; // keep the previous error code
if (pSql->retry > pSql->maxRetry) { if (pSql->retry > pSql->maxRetry) {
tscError("%p max retry %d reached, give up", pSql, pSql->maxRetry); tscError("%p max retry %d reached, give up", pSql, pSql->maxRetry);
...@@ -433,8 +438,9 @@ void tscKillSTableQuery(SSqlObj *pSql) { ...@@ -433,8 +438,9 @@ void tscKillSTableQuery(SSqlObj *pSql) {
* here, we cannot set the command = TSDB_SQL_KILL_QUERY. Otherwise, it may cause * here, we cannot set the command = TSDB_SQL_KILL_QUERY. Otherwise, it may cause
* sub-queries not correctly released and master sql object of super table query reaches an abnormal state. * sub-queries not correctly released and master sql object of super table query reaches an abnormal state.
*/ */
pSql->pSubs[i]->res.code = TSDB_CODE_TSC_QUERY_CANCELLED; rpcCancelRequest(pSub->pRpcCtx);
rpcCancelRequest(pSql->pSubs[i]->pRpcCtx); pSub->res.code = TSDB_CODE_TSC_QUERY_CANCELLED;
tscQueueAsyncRes(pSub);
} }
/* /*
......
...@@ -617,19 +617,18 @@ void taos_stop_query(TAOS_RES *res) { ...@@ -617,19 +617,18 @@ void taos_stop_query(TAOS_RES *res) {
if (pSql->signature != pSql) return; if (pSql->signature != pSql) return;
tscDebug("%p start to cancel query", res); tscDebug("%p start to cancel query", res);
pSql->res.code = TSDB_CODE_TSC_QUERY_CANCELLED;
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
if (tscIsTwoStageSTableQuery(pQueryInfo, 0)) { if (tscIsTwoStageSTableQuery(pQueryInfo, 0)) {
tscKillSTableQuery(pSql); tscKillSTableQuery(pSql);
return;
} }
if (pSql->cmd.command >= TSDB_SQL_LOCAL) { if (pSql->cmd.command < TSDB_SQL_LOCAL) {
return; rpcCancelRequest(pSql->pRpcCtx);
} }
pSql->res.code = TSDB_CODE_TSC_QUERY_CANCELLED;
tscQueueAsyncRes(pSql);
rpcCancelRequest(pSql->pRpcCtx);
tscDebug("%p query is cancelled", res); tscDebug("%p query is cancelled", res);
} }
......
...@@ -503,7 +503,7 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p ...@@ -503,7 +503,7 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p
} }
strtolower(pSql->sqlstr, sqlstr); strtolower(pSql->sqlstr, sqlstr);
tscDebugDump("%p SQL: %s", pSql, pSql->sqlstr); tscDebugL("%p SQL: %s", pSql, pSql->sqlstr);
tsem_init(&pSql->rspSem, 0, 0); tsem_init(&pSql->rspSem, 0, 0);
int32_t code = tsParseSql(pSql, true); int32_t code = tsParseSql(pSql, true);
......
...@@ -14,12 +14,12 @@ ...@@ -14,12 +14,12 @@
*/ */
#include "os.h" #include "os.h"
#include "tscSubquery.h" #include "qtsbuf.h"
#include "qast.h" #include "qast.h"
#include "tcompare.h" #include "tcompare.h"
#include "tschemautil.h"
#include "qtsbuf.h"
#include "tscLog.h" #include "tscLog.h"
#include "tscSubquery.h"
#include "tschemautil.h"
#include "tsclient.h" #include "tsclient.h"
typedef struct SInsertSupporter { typedef struct SInsertSupporter {
...@@ -57,10 +57,15 @@ static int64_t doTSBlockIntersect(SSqlObj* pSql, SJoinSupporter* pSupporter1, SJ ...@@ -57,10 +57,15 @@ static int64_t doTSBlockIntersect(SSqlObj* pSql, SJoinSupporter* pSupporter1, SJ
pSubQueryInfo1->tsBuf = output1; pSubQueryInfo1->tsBuf = output1;
pSubQueryInfo2->tsBuf = output2; pSubQueryInfo2->tsBuf = output2;
// no result generated, return directly
if (pSupporter1->pTSBuf == NULL || pSupporter2->pTSBuf == NULL) {
tscDebug("%p at least one ts-comp is empty, 0 for secondary query after ts blocks intersecting", pSql);
return 0;
}
tsBufResetPos(pSupporter1->pTSBuf); tsBufResetPos(pSupporter1->pTSBuf);
tsBufResetPos(pSupporter2->pTSBuf); tsBufResetPos(pSupporter2->pTSBuf);
// TODO add more details information
if (!tsBufNextPos(pSupporter1->pTSBuf)) { if (!tsBufNextPos(pSupporter1->pTSBuf)) {
tsBufFlush(output1); tsBufFlush(output1);
tsBufFlush(output2); tsBufFlush(output2);
...@@ -210,6 +215,7 @@ static void tscDestroyJoinSupporter(SJoinSupporter* pSupporter) { ...@@ -210,6 +215,7 @@ static void tscDestroyJoinSupporter(SJoinSupporter* pSupporter) {
pSupporter->f = NULL; pSupporter->f = NULL;
} }
tfree(pSupporter->pIdTagList);
tscTagCondRelease(&pSupporter->tagCond); tscTagCondRelease(&pSupporter->tagCond);
free(pSupporter); free(pSupporter);
} }
...@@ -420,43 +426,6 @@ static void updateQueryTimeRange(SQueryInfo* pQueryInfo, STimeWindow* win) { ...@@ -420,43 +426,6 @@ static void updateQueryTimeRange(SQueryInfo* pQueryInfo, STimeWindow* win) {
pQueryInfo->window = *win; pQueryInfo->window = *win;
} }
static UNUSED_FUNC void tSIntersectionAndLaunchSecQuery(SJoinSupporter* pSupporter, SSqlObj* pSql) {
SSqlObj* pParentSql = pSupporter->pObj;
SQueryInfo* pParentQueryInfo = tscGetQueryInfoDetail(&pParentSql->cmd, pParentSql->cmd.clauseIndex);
// if (tscNonOrderedProjectionQueryOnSTable(pParentQueryInfo, 0)) {
// STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
// assert(pQueryInfo->numOfTables == 1);
//
// // for projection query, need to try next vnode
//// int32_t totalVnode = pTableMetaInfo->pMetricMeta->numOfVnodes;
// int32_t totalVnode = 0;
// if ((++pTableMetaInfo->vgroupIndex) < totalVnode) {
// tscDebug("%p current vnode:%d exhausted, try next:%d. total vnode:%d. current numOfRes:%d", pSql,
// pTableMetaInfo->vgroupIndex - 1, pTableMetaInfo->vgroupIndex, totalVnode, pRes->numOfTotal);
//
// pSql->cmd.command = TSDB_SQL_SELECT;
// pSql->fp = tscJoinQueryCallback;
// tscProcessSql(pSql);
//
// return;
// }
// }
SJoinSupporter* p1 = pParentSql->pSubs[0]->param;
SJoinSupporter* p2 = pParentSql->pSubs[1]->param;
STimeWindow win = TSWINDOW_INITIALIZER;
int64_t num = doTSBlockIntersect(pParentSql, p1, p2, &win);
if (num <= 0) { // no result during ts intersect
tscDebug("%p free all sub SqlObj and quit", pParentSql);
freeJoinSubqueryObj(pParentSql);
} else {
updateQueryTimeRange(pParentQueryInfo, &win);
tscLaunchRealSubqueries(pParentSql);
}
}
int32_t tscCompareTidTags(const void* p1, const void* p2) { int32_t tscCompareTidTags(const void* p1, const void* p2) {
const STidTags* t1 = (const STidTags*) varDataVal(p1); const STidTags* t1 = (const STidTags*) varDataVal(p1);
const STidTags* t2 = (const STidTags*) varDataVal(p2); const STidTags* t2 = (const STidTags*) varDataVal(p2);
...@@ -713,9 +682,12 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow ...@@ -713,9 +682,12 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
SArray *s1 = NULL, *s2 = NULL; SArray *s1 = NULL, *s2 = NULL;
getIntersectionOfTableTuple(pQueryInfo, pParentSql, &s1, &s2); getIntersectionOfTableTuple(pQueryInfo, pParentSql, &s1, &s2);
if (taosArrayGetSize(s1) == 0 || taosArrayGetSize(s2) == 0) { // no results,return. if (taosArrayGetSize(s1) == 0 || taosArrayGetSize(s2) == 0) { // no results,return.
tscDebug("%p free all sub SqlObj and quit", pParentSql); tscDebug("%p tag intersect does not generated qualified tables for join, free all sub SqlObj and quit", pParentSql);
freeJoinSubqueryObj(pParentSql); freeJoinSubqueryObj(pParentSql);
// set no result command
pParentSql->cmd.command = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
(*pParentSql->fp)(pParentSql->param, pParentSql, 0);
} else { } else {
// proceed to for ts_comp query // proceed to for ts_comp query
SSqlCmd* pSubCmd1 = &pParentSql->pSubs[0]->cmd; SSqlCmd* pSubCmd1 = &pParentSql->pSubs[0]->cmd;
...@@ -846,7 +818,10 @@ static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow ...@@ -846,7 +818,10 @@ static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
if (num <= 0) { // no result during ts intersect if (num <= 0) { // no result during ts intersect
tscDebug("%p no results generated in ts intersection, free all sub SqlObj and quit", pParentSql); tscDebug("%p no results generated in ts intersection, free all sub SqlObj and quit", pParentSql);
freeJoinSubqueryObj(pParentSql); freeJoinSubqueryObj(pParentSql);
// set no result command
pParentSql->cmd.command = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
(*pParentSql->fp)(pParentSql->param, pParentSql, 0);
return; return;
} }
......
...@@ -14,21 +14,21 @@ ...@@ -14,21 +14,21 @@
*/ */
#include "os.h" #include "os.h"
#include "qast.h" #include "hash.h"
#include "tscUtil.h"
#include "taosmsg.h" #include "taosmsg.h"
#include "qast.h"
#include "tcache.h" #include "tcache.h"
#include "tkey.h" #include "tkey.h"
#include "tmd5.h" #include "tmd5.h"
#include "tscProfile.h"
#include "tscLocalMerge.h" #include "tscLocalMerge.h"
#include "tscLog.h"
#include "tscProfile.h"
#include "tscSubquery.h" #include "tscSubquery.h"
#include "tschemautil.h" #include "tschemautil.h"
#include "tsclient.h" #include "tsclient.h"
#include "ttimer.h" #include "ttimer.h"
#include "ttokendef.h" #include "ttokendef.h"
#include "tscLog.h"
#include "tscUtil.h"
#include "hash.h"
static void freeQueryInfoImpl(SQueryInfo* pQueryInfo); static void freeQueryInfoImpl(SQueryInfo* pQueryInfo);
static void clearAllTableMetaInfo(SQueryInfo* pQueryInfo, const char* address, bool removeFromCache); static void clearAllTableMetaInfo(SQueryInfo* pQueryInfo, const char* address, bool removeFromCache);
...@@ -579,9 +579,9 @@ static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock, bo ...@@ -579,9 +579,9 @@ static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock, bo
int32_t numOfCols = tscGetNumOfColumns(pTableDataBlock->pTableMeta); int32_t numOfCols = tscGetNumOfColumns(pTableDataBlock->pTableMeta);
for(int32_t j = 0; j < numOfCols; ++j) { for(int32_t j = 0; j < numOfCols; ++j) {
STColumn* pCol = (STColumn*) pDataBlock; STColumn* pCol = (STColumn*) pDataBlock;
pCol->colId = pSchema[j].colId; pCol->colId = htons(pSchema[j].colId);
pCol->type = pSchema[j].type; pCol->type = pSchema[j].type;
pCol->bytes = pSchema[j].bytes; pCol->bytes = htons(pSchema[j].bytes);
pCol->offset = 0; pCol->offset = 0;
pDataBlock += sizeof(STColumn); pDataBlock += sizeof(STColumn);
...@@ -663,7 +663,7 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SArray* pTableDataBlockList) { ...@@ -663,7 +663,7 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SArray* pTableDataBlockList) {
} }
SSubmitBlk* pBlocks = (SSubmitBlk*) pOneTableBlock->pData; SSubmitBlk* pBlocks = (SSubmitBlk*) pOneTableBlock->pData;
int64_t destSize = dataBuf->size + pOneTableBlock->size + pBlocks->numOfRows * expandSize; int64_t destSize = dataBuf->size + pOneTableBlock->size + pBlocks->numOfRows * expandSize + sizeof(STColumn) * tscGetNumOfColumns(pOneTableBlock->pTableMeta);
if (dataBuf->nAllocSize < destSize) { if (dataBuf->nAllocSize < destSize) {
while (dataBuf->nAllocSize < destSize) { while (dataBuf->nAllocSize < destSize) {
...@@ -691,7 +691,7 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SArray* pTableDataBlockList) { ...@@ -691,7 +691,7 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SArray* pTableDataBlockList) {
tscDebug("%p tableId:%s, sid:%d rows:%d sversion:%d skey:%" PRId64 ", ekey:%" PRId64, pSql, pOneTableBlock->tableId, tscDebug("%p tableId:%s, sid:%d rows:%d sversion:%d skey:%" PRId64 ", ekey:%" PRId64, pSql, pOneTableBlock->tableId,
pBlocks->tid, pBlocks->numOfRows, pBlocks->sversion, GET_INT64_VAL(pBlocks->data), GET_INT64_VAL(ekey)); pBlocks->tid, pBlocks->numOfRows, pBlocks->sversion, GET_INT64_VAL(pBlocks->data), GET_INT64_VAL(ekey));
int32_t len = pBlocks->numOfRows * (pOneTableBlock->rowSize + expandSize); int32_t len = pBlocks->numOfRows * (pOneTableBlock->rowSize + expandSize) + sizeof(STColumn) * tscGetNumOfColumns(pOneTableBlock->pTableMeta);
pBlocks->tid = htonl(pBlocks->tid); pBlocks->tid = htonl(pBlocks->tid);
pBlocks->uid = htobe64(pBlocks->uid); pBlocks->uid = htobe64(pBlocks->uid);
......
...@@ -3,6 +3,7 @@ ...@@ -3,6 +3,7 @@
#include "os.h" #include "os.h"
#include "taosmsg.h" #include "taosmsg.h"
#include "tstoken.h"
typedef struct SDataStatis { typedef struct SDataStatis {
int16_t colId; int16_t colId;
...@@ -23,10 +24,14 @@ void extractTableName(const char *tableId, char *name); ...@@ -23,10 +24,14 @@ void extractTableName(const char *tableId, char *name);
char* extractDBName(const char *tableId, char *name); char* extractDBName(const char *tableId, char *name);
void extractTableNameFromToken(SSQLToken *pToken, SSQLToken* pTable);
SSchema tGetTableNameColumnSchema(); SSchema tGetTableNameColumnSchema();
bool tscValidateTableNameLength(size_t len); bool tscValidateTableNameLength(size_t len);
SColumnFilterInfo* tscFilterInfoClone(const SColumnFilterInfo* src, int32_t numOfFilters); SColumnFilterInfo* tscFilterInfoClone(const SColumnFilterInfo* src, int32_t numOfFilters);
int64_t taosGetIntervalStartTimestamp(int64_t startTime, int64_t slidingTime, int64_t intervalTime, char timeUnit, int16_t precision);
#endif // TDENGINE_NAME_H #endif // TDENGINE_NAME_H
...@@ -32,9 +32,6 @@ extern int32_t tscEmbedded; ...@@ -32,9 +32,6 @@ extern int32_t tscEmbedded;
#define uDebug(...) { if (uDebugFlag & DEBUG_DEBUG) { taosPrintLog("UTL DEBUG ", uDebugFlag, __VA_ARGS__); }} #define uDebug(...) { if (uDebugFlag & DEBUG_DEBUG) { taosPrintLog("UTL DEBUG ", uDebugFlag, __VA_ARGS__); }}
#define uTrace(...) { if (uDebugFlag & DEBUG_TRACE) { taosPrintLog("UTL TRACE ", uDebugFlag, __VA_ARGS__); }} #define uTrace(...) { if (uDebugFlag & DEBUG_TRACE) { taosPrintLog("UTL TRACE ", uDebugFlag, __VA_ARGS__); }}
#define uDebugDump(...) { if (uDebugFlag & DEBUG_DEBUG) { taosPrintLongString("UTL DEBUG ", uDebugFlag, __VA_ARGS__); }}
#define uTraceDump(...) { if (uDebugFlag & DEBUG_TRACE) { taosPrintLongString("UTL TRACE ", uDebugFlag, __VA_ARGS__); }}
#define pError(...) { taosPrintLog("APP ERROR ", 255, __VA_ARGS__); } #define pError(...) { taosPrintLog("APP ERROR ", 255, __VA_ARGS__); }
#define pPrint(...) { taosPrintLog("APP INFO ", 255, __VA_ARGS__); } #define pPrint(...) { taosPrintLog("APP INFO ", 255, __VA_ARGS__); }
......
...@@ -1210,7 +1210,7 @@ void taosInitGlobalCfg() { ...@@ -1210,7 +1210,7 @@ void taosInitGlobalCfg() {
} }
bool taosCheckGlobalCfg() { bool taosCheckGlobalCfg() {
if (debugFlag & DEBUG_TRACE || debugFlag & DEBUG_DEBUG) { if (debugFlag & DEBUG_TRACE || debugFlag & DEBUG_DEBUG || debugFlag & DEBUG_DUMP) {
taosSetAllDebugFlag(); taosSetAllDebugFlag();
} }
......
...@@ -75,3 +75,56 @@ SColumnFilterInfo* tscFilterInfoClone(const SColumnFilterInfo* src, int32_t numO ...@@ -75,3 +75,56 @@ SColumnFilterInfo* tscFilterInfoClone(const SColumnFilterInfo* src, int32_t numO
return pFilter; return pFilter;
} }
int64_t taosGetIntervalStartTimestamp(int64_t startTime, int64_t slidingTime, int64_t intervalTime, char timeUnit, int16_t precision) {
if (slidingTime == 0) {
return startTime;
}
int64_t start = ((startTime - slidingTime) / slidingTime + 1) * slidingTime;
if (!(timeUnit == 'a' || timeUnit == 'm' || timeUnit == 's' || timeUnit == 'h')) {
/*
* here we revised the start time of day according to the local time zone,
* but in case of DST, the start time of one day need to be dynamically decided.
*/
// todo refactor to extract function that is available for Linux/Windows/Mac platform
#if defined(WINDOWS) && _MSC_VER >= 1900
// see https://docs.microsoft.com/en-us/cpp/c-runtime-library/daylight-dstbias-timezone-and-tzname?view=vs-2019
int64_t timezone = _timezone;
int32_t daylight = _daylight;
char** tzname = _tzname;
#endif
int64_t t = (precision == TSDB_TIME_PRECISION_MILLI) ? MILLISECOND_PER_SECOND : MILLISECOND_PER_SECOND * 1000L;
start += timezone * t;
}
int64_t end = start + intervalTime - 1;
if (end < startTime) {
start += slidingTime;
}
return start;
}
/*
* tablePrefix.columnName
* extract table name and save it in pTable, with only column name in pToken
*/
void extractTableNameFromToken(SSQLToken* pToken, SSQLToken* pTable) {
const char sep = TS_PATH_DELIMITER[0];
if (pToken == pTable || pToken == NULL || pTable == NULL) {
return;
}
char* r = strnchr(pToken->z, sep, pToken->n, false);
if (r != NULL) { // record the table name token
pTable->n = r - pToken->z;
pTable->z = pToken->z;
r += 1;
pToken->n -= (r - pToken->z);
pToken->z = r;
}
}
...@@ -131,8 +131,8 @@ static void dnodeFreeMnodeWriteMsg(SMnodeMsg *pWrite) { ...@@ -131,8 +131,8 @@ static void dnodeFreeMnodeWriteMsg(SMnodeMsg *pWrite) {
taosFreeQitem(pWrite); taosFreeQitem(pWrite);
} }
void dnodeSendRpcMnodeWriteRsp(void *pRaw, int32_t code) { void dnodeSendRpcMnodeWriteRsp(void *pMsg, int32_t code) {
SMnodeMsg *pWrite = pRaw; SMnodeMsg *pWrite = pMsg;
if (pWrite == NULL) return; if (pWrite == NULL) return;
if (code == TSDB_CODE_MND_ACTION_IN_PROGRESS) return; if (code == TSDB_CODE_MND_ACTION_IN_PROGRESS) return;
if (code == TSDB_CODE_MND_ACTION_NEED_REPROCESSED) { if (code == TSDB_CODE_MND_ACTION_NEED_REPROCESSED) {
......
...@@ -106,6 +106,12 @@ int32_t dnodeInitMgmt() { ...@@ -106,6 +106,12 @@ int32_t dnodeInitMgmt() {
} }
} }
int32_t code = vnodeInitResources();
if (code != TSDB_CODE_SUCCESS) {
dnodeCleanupMgmt();
return -1;
}
// create the queue and thread to handle the message // create the queue and thread to handle the message
tsMgmtQset = taosOpenQset(); tsMgmtQset = taosOpenQset();
if (tsMgmtQset == NULL) { if (tsMgmtQset == NULL) {
...@@ -127,7 +133,7 @@ int32_t dnodeInitMgmt() { ...@@ -127,7 +133,7 @@ int32_t dnodeInitMgmt() {
pthread_attr_init(&thAttr); pthread_attr_init(&thAttr);
pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE); pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE);
int32_t code = pthread_create(&tsQthread, &thAttr, dnodeProcessMgmtQueue, NULL); code = pthread_create(&tsQthread, &thAttr, dnodeProcessMgmtQueue, NULL);
pthread_attr_destroy(&thAttr); pthread_attr_destroy(&thAttr);
if (code != 0) { if (code != 0) {
dError("failed to create thread to process mgmt queue, reason:%s", strerror(errno)); dError("failed to create thread to process mgmt queue, reason:%s", strerror(errno));
...@@ -282,13 +288,12 @@ static void *dnodeOpenVnode(void *param) { ...@@ -282,13 +288,12 @@ static void *dnodeOpenVnode(void *param) {
} }
static int32_t dnodeOpenVnodes() { static int32_t dnodeOpenVnodes() {
int32_t *vnodeList = calloc(TSDB_MAX_VNODES, sizeof(int32_t)); int32_t vnodeList[TSDB_MAX_VNODES] = {0};
int32_t numOfVnodes = 0; int32_t numOfVnodes = 0;
int32_t status = dnodeGetVnodeList(vnodeList, &numOfVnodes); int32_t status = dnodeGetVnodeList(vnodeList, &numOfVnodes);
if (status != TSDB_CODE_SUCCESS) { if (status != TSDB_CODE_SUCCESS) {
dInfo("get dnode list failed"); dInfo("get dnode list failed");
free(vnodeList);
return status; return status;
} }
...@@ -334,7 +339,6 @@ static int32_t dnodeOpenVnodes() { ...@@ -334,7 +339,6 @@ static int32_t dnodeOpenVnodes() {
free(pThread->vnodeList); free(pThread->vnodeList);
} }
free(vnodeList);
free(threads); free(threads);
dInfo("there are total vnodes:%d, openned:%d failed:%d", numOfVnodes, openVnodes, failedVnodes); dInfo("there are total vnodes:%d, openned:%d failed:%d", numOfVnodes, openVnodes, failedVnodes);
...@@ -342,7 +346,7 @@ static int32_t dnodeOpenVnodes() { ...@@ -342,7 +346,7 @@ static int32_t dnodeOpenVnodes() {
} }
void dnodeStartStream() { void dnodeStartStream() {
int32_t vnodeList[TSDB_MAX_VNODES]; int32_t vnodeList[TSDB_MAX_VNODES] = {0};
int32_t numOfVnodes = 0; int32_t numOfVnodes = 0;
int32_t status = vnodeGetVnodeList(vnodeList, &numOfVnodes); int32_t status = vnodeGetVnodeList(vnodeList, &numOfVnodes);
...@@ -359,7 +363,7 @@ void dnodeStartStream() { ...@@ -359,7 +363,7 @@ void dnodeStartStream() {
} }
static void dnodeCloseVnodes() { static void dnodeCloseVnodes() {
int32_t vnodeList[TSDB_MAX_VNODES]; int32_t vnodeList[TSDB_MAX_VNODES]= {0};
int32_t numOfVnodes = 0; int32_t numOfVnodes = 0;
int32_t status; int32_t status;
......
...@@ -210,6 +210,7 @@ static void *dnodeProcessWriteQueue(void *param) { ...@@ -210,6 +210,7 @@ static void *dnodeProcessWriteQueue(void *param) {
int32_t numOfMsgs; int32_t numOfMsgs;
int type; int type;
void *pVnode, *item; void *pVnode, *item;
SRspRet *pRspRet;
dDebug("write worker:%d is running", pWorker->workerId); dDebug("write worker:%d is running", pWorker->workerId);
...@@ -222,9 +223,11 @@ static void *dnodeProcessWriteQueue(void *param) { ...@@ -222,9 +223,11 @@ static void *dnodeProcessWriteQueue(void *param) {
for (int32_t i = 0; i < numOfMsgs; ++i) { for (int32_t i = 0; i < numOfMsgs; ++i) {
pWrite = NULL; pWrite = NULL;
pRspRet = NULL;
taosGetQitem(pWorker->qall, &type, &item); taosGetQitem(pWorker->qall, &type, &item);
if (type == TAOS_QTYPE_RPC) { if (type == TAOS_QTYPE_RPC) {
pWrite = (SWriteMsg *)item; pWrite = (SWriteMsg *)item;
pRspRet = &pWrite->rspRet;
pHead = (SWalHead *)(pWrite->pCont - sizeof(SWalHead)); pHead = (SWalHead *)(pWrite->pCont - sizeof(SWalHead));
pHead->msgType = pWrite->rpcMsg.msgType; pHead->msgType = pWrite->rpcMsg.msgType;
pHead->version = 0; pHead->version = 0;
...@@ -234,7 +237,7 @@ static void *dnodeProcessWriteQueue(void *param) { ...@@ -234,7 +237,7 @@ static void *dnodeProcessWriteQueue(void *param) {
pHead = (SWalHead *)item; pHead = (SWalHead *)item;
} }
int32_t code = vnodeProcessWrite(pVnode, type, pHead, item); int32_t code = vnodeProcessWrite(pVnode, type, pHead, pRspRet);
if (pWrite) pWrite->rpcMsg.code = code; if (pWrite) pWrite->rpcMsg.code = code;
} }
...@@ -247,6 +250,11 @@ static void *dnodeProcessWriteQueue(void *param) { ...@@ -247,6 +250,11 @@ static void *dnodeProcessWriteQueue(void *param) {
if (type == TAOS_QTYPE_RPC) { if (type == TAOS_QTYPE_RPC) {
pWrite = (SWriteMsg *)item; pWrite = (SWriteMsg *)item;
dnodeSendRpcVnodeWriteRsp(pVnode, item, pWrite->rpcMsg.code); dnodeSendRpcVnodeWriteRsp(pVnode, item, pWrite->rpcMsg.code);
} else if (type == TAOS_QTYPE_FWD) {
pHead = (SWalHead *)item;
vnodeConfirmForward(pVnode, pHead->version, 0);
taosFreeQitem(item);
vnodeRelease(pVnode);
} else { } else {
taosFreeQitem(item); taosFreeQitem(item);
vnodeRelease(pVnode); vnodeRelease(pVnode);
......
...@@ -180,7 +180,9 @@ TAOS_DEFINE_ERROR(TSDB_CODE_VND_NO_DISK_PERMISSIONS, 0, 0x0506, "vnode no d ...@@ -180,7 +180,9 @@ TAOS_DEFINE_ERROR(TSDB_CODE_VND_NO_DISK_PERMISSIONS, 0, 0x0506, "vnode no d
TAOS_DEFINE_ERROR(TSDB_CODE_VND_NO_SUCH_FILE_OR_DIR, 0, 0x0507, "vnode no such file or directory") TAOS_DEFINE_ERROR(TSDB_CODE_VND_NO_SUCH_FILE_OR_DIR, 0, 0x0507, "vnode no such file or directory")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_OUT_OF_MEMORY, 0, 0x0508, "vnode out of memory") TAOS_DEFINE_ERROR(TSDB_CODE_VND_OUT_OF_MEMORY, 0, 0x0508, "vnode out of memory")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_APP_ERROR, 0, 0x0509, "vnode app error") TAOS_DEFINE_ERROR(TSDB_CODE_VND_APP_ERROR, 0, 0x0509, "vnode app error")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_NO_WRITE_AUTH, 0, 0x0214, "vnode no write auth") TAOS_DEFINE_ERROR(TSDB_CODE_VND_INVALID_STATUS, 0, 0x0510, "vnode not in ready state")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_NOT_SYNCED, 0, 0x0511, "vnode not in synced state")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_NO_WRITE_AUTH, 0, 0x0512, "vnode no write auth")
// tsdb // tsdb
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_INVALID_TABLE_ID, 0, 0x0600, "tsdb invalid table id") TAOS_DEFINE_ERROR(TSDB_CODE_TDB_INVALID_TABLE_ID, 0, 0x0600, "tsdb invalid table id")
...@@ -200,6 +202,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TDB_INVALID_ACTION, 0, 0x060D, "tsdb inval ...@@ -200,6 +202,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TDB_INVALID_ACTION, 0, 0x060D, "tsdb inval
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_INVALID_CREATE_TB_MSG, 0, 0x060E, "tsdb invalid create table message") TAOS_DEFINE_ERROR(TSDB_CODE_TDB_INVALID_CREATE_TB_MSG, 0, 0x060E, "tsdb invalid create table message")
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_NO_TABLE_DATA_IN_MEM, 0, 0x060F, "tsdb no table data in memory skiplist") TAOS_DEFINE_ERROR(TSDB_CODE_TDB_NO_TABLE_DATA_IN_MEM, 0, 0x060F, "tsdb no table data in memory skiplist")
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_FILE_ALREADY_EXISTS, 0, 0x0610, "tsdb file already exists") TAOS_DEFINE_ERROR(TSDB_CODE_TDB_FILE_ALREADY_EXISTS, 0, 0x0610, "tsdb file already exists")
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_TABLE_RECONFIGURE, 0, 0x0611, "tsdb need to reconfigure table")
// query // query
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_INVALID_QHANDLE, 0, 0x0700, "query invalid handle") TAOS_DEFINE_ERROR(TSDB_CODE_QRY_INVALID_QHANDLE, 0, 0x0700, "query invalid handle")
......
...@@ -203,8 +203,7 @@ typedef struct SSubmitBlk { ...@@ -203,8 +203,7 @@ typedef struct SSubmitBlk {
typedef struct SSubmitMsg { typedef struct SSubmitMsg {
SMsgHead header; SMsgHead header;
int32_t length; int32_t length;
int32_t compressed : 2; int32_t numOfBlocks;
int32_t numOfBlocks : 30;
SSubmitBlk blocks[]; SSubmitBlk blocks[];
} SSubmitMsg; } SSubmitMsg;
...@@ -285,6 +284,8 @@ typedef struct { ...@@ -285,6 +284,8 @@ typedef struct {
int32_t tid; int32_t tid;
int16_t tversion; int16_t tversion;
int16_t colId; int16_t colId;
int8_t type;
int16_t bytes;
int32_t tagValLen; int32_t tagValLen;
int16_t numOfTags; int16_t numOfTags;
int32_t schemaLen; int32_t schemaLen;
......
...@@ -108,12 +108,14 @@ void tsdbClearTableCfg(STableCfg *config); ...@@ -108,12 +108,14 @@ void tsdbClearTableCfg(STableCfg *config);
void* tsdbGetTableTagVal(const void* pTable, int32_t colId, int16_t type, int16_t bytes); void* tsdbGetTableTagVal(const void* pTable, int32_t colId, int16_t type, int16_t bytes);
char* tsdbGetTableName(void *pTable); char* tsdbGetTableName(void *pTable);
STableId tsdbGetTableId(void *pTable);
#define TSDB_TABLEID(_table) ((STableId*) (_table))
STableCfg *tsdbCreateTableCfgFromMsg(SMDCreateTableMsg *pMsg); STableCfg *tsdbCreateTableCfgFromMsg(SMDCreateTableMsg *pMsg);
int tsdbCreateTable(TSDB_REPO_T *repo, STableCfg *pCfg); int tsdbCreateTable(TSDB_REPO_T *repo, STableCfg *pCfg);
int tsdbDropTable(TSDB_REPO_T *pRepo, STableId tableId); int tsdbDropTable(TSDB_REPO_T *pRepo, STableId tableId);
int tsdbUpdateTagValue(TSDB_REPO_T *repo, SUpdateTableTagValMsg *pMsg); int tsdbUpdateTableTagValue(TSDB_REPO_T *repo, SUpdateTableTagValMsg *pMsg);
TSKEY tsdbGetTableLastKey(TSDB_REPO_T *repo, uint64_t uid); TSKEY tsdbGetTableLastKey(TSDB_REPO_T *repo, uint64_t uid);
void tsdbStartStream(TSDB_REPO_T *repo); void tsdbStartStream(TSDB_REPO_T *repo);
...@@ -233,9 +235,10 @@ bool tsdbNextDataBlock(TsdbQueryHandleT *pQueryHandle); ...@@ -233,9 +235,10 @@ bool tsdbNextDataBlock(TsdbQueryHandleT *pQueryHandle);
* Get current data block information * Get current data block information
* *
* @param pQueryHandle * @param pQueryHandle
* @param pBlockInfo
* @return * @return
*/ */
SDataBlockInfo tsdbRetrieveDataBlockInfo(TsdbQueryHandleT *pQueryHandle); void tsdbRetrieveDataBlockInfo(TsdbQueryHandleT *pQueryHandle, SDataBlockInfo* pBlockInfo);
/** /**
* *
......
...@@ -60,7 +60,10 @@ void* vnodeGetWal(void *pVnode); ...@@ -60,7 +60,10 @@ void* vnodeGetWal(void *pVnode);
int32_t vnodeProcessWrite(void *pVnode, int qtype, void *pHead, void *item); int32_t vnodeProcessWrite(void *pVnode, int qtype, void *pHead, void *item);
int32_t vnodeGetVnodeList(int32_t vnodeList[], int32_t *numOfVnodes); int32_t vnodeGetVnodeList(int32_t vnodeList[], int32_t *numOfVnodes);
void vnodeBuildStatusMsg(void *param); void vnodeBuildStatusMsg(void *param);
void vnodeConfirmForward(void *param, uint64_t version, int32_t code);
void vnodeSetAccess(SDMVgroupAccess *pAccess, int32_t numOfVnodes); void vnodeSetAccess(SDMVgroupAccess *pAccess, int32_t numOfVnodes);
int32_t vnodeInitResources();
void vnodeCleanupResources(); void vnodeCleanupResources();
int32_t vnodeProcessRead(void *pVnode, SReadMsg *pReadMsg); int32_t vnodeProcessRead(void *pVnode, SReadMsg *pReadMsg);
......
...@@ -206,9 +206,10 @@ static void shellSourceFile(TAOS *con, char *fptr) { ...@@ -206,9 +206,10 @@ static void shellSourceFile(TAOS *con, char *fptr) {
if (code != 0) { if (code != 0) {
fprintf(stderr, "DB error: %s: %s (%d)\n", taos_errstr(con), fname, lineNo); fprintf(stderr, "DB error: %s: %s (%d)\n", taos_errstr(con), fname, lineNo);
/* free local resouce: allocated memory/metric-meta refcnt */
taos_free_result(pSql);
} }
/* free local resouce: allocated memory/metric-meta refcnt */
taos_free_result(pSql);
memset(cmd, 0, MAX_COMMAND_SIZE); memset(cmd, 0, MAX_COMMAND_SIZE);
cmd_len = 0; cmd_len = 0;
......
...@@ -520,9 +520,8 @@ int main(int argc, char *argv[]) { ...@@ -520,9 +520,8 @@ int main(int argc, char *argv[]) {
snprintf(command, BUFFER_SIZE, "create table if not exists %s.meters (ts timestamp%s tags (areaid int, loc binary(10))", db_name, cols); snprintf(command, BUFFER_SIZE, "create table if not exists %s.meters (ts timestamp%s tags (areaid int, loc binary(10))", db_name, cols);
queryDB(taos, command); queryDB(taos, command);
printf("meters created!\n"); printf("meters created!\n");
taos_close(taos);
} }
taos_close(taos);
/* Wait for table to create */ /* Wait for table to create */
multiThreadCreateTable(cols, use_metric, threads, ntables, db_name, tb_prefix, ip_addr, port, user, pass); multiThreadCreateTable(cols, use_metric, threads, ntables, db_name, tb_prefix, ip_addr, port, user, pass);
...@@ -792,9 +791,6 @@ void * createTable(void *sarg) ...@@ -792,9 +791,6 @@ void * createTable(void *sarg)
snprintf(command, BUFFER_SIZE, "create table if not exists %s.%s%d (ts timestamp%s;", winfo->db_name, winfo->tb_prefix, i, winfo->cols); snprintf(command, BUFFER_SIZE, "create table if not exists %s.%s%d (ts timestamp%s;", winfo->db_name, winfo->tb_prefix, i, winfo->cols);
queryDB(winfo->taos, command); queryDB(winfo->taos, command);
} }
taos_close(winfo->taos);
} else { } else {
/* Create all the tables; */ /* Create all the tables; */
printf("Creating table from %d to %d\n", winfo->start_table_id, winfo->end_table_id); printf("Creating table from %d to %d\n", winfo->start_table_id, winfo->end_table_id);
...@@ -812,7 +808,6 @@ void * createTable(void *sarg) ...@@ -812,7 +808,6 @@ void * createTable(void *sarg)
} }
queryDB(winfo->taos, command); queryDB(winfo->taos, command);
} }
taos_close(winfo->taos);
} }
return NULL; return NULL;
...@@ -875,6 +870,11 @@ void *readTable(void *sarg) { ...@@ -875,6 +870,11 @@ void *readTable(void *sarg) {
int64_t sTime = rinfo->start_time; int64_t sTime = rinfo->start_time;
char *tb_prefix = rinfo->tb_prefix; char *tb_prefix = rinfo->tb_prefix;
FILE *fp = fopen(rinfo->fp, "a"); FILE *fp = fopen(rinfo->fp, "a");
if (NULL == fp) {
printf("fopen %s fail, reason:%s.\n", rinfo->fp, strerror(errno));
return NULL;
}
int num_of_DPT = rinfo->nrecords_per_table; int num_of_DPT = rinfo->nrecords_per_table;
int num_of_tables = rinfo->end_table_id - rinfo->start_table_id + 1; int num_of_tables = rinfo->end_table_id - rinfo->start_table_id + 1;
int totalData = num_of_DPT * num_of_tables; int totalData = num_of_DPT * num_of_tables;
...@@ -930,6 +930,11 @@ void *readMetric(void *sarg) { ...@@ -930,6 +930,11 @@ void *readMetric(void *sarg) {
TAOS *taos = rinfo->taos; TAOS *taos = rinfo->taos;
char command[BUFFER_SIZE] = "\0"; char command[BUFFER_SIZE] = "\0";
FILE *fp = fopen(rinfo->fp, "a"); FILE *fp = fopen(rinfo->fp, "a");
if (NULL == fp) {
printf("fopen %s fail, reason:%s.\n", rinfo->fp, strerror(errno));
return NULL;
}
int num_of_DPT = rinfo->nrecords_per_table; int num_of_DPT = rinfo->nrecords_per_table;
int num_of_tables = rinfo->end_table_id - rinfo->start_table_id + 1; int num_of_tables = rinfo->end_table_id - rinfo->start_table_id + 1;
int totalData = num_of_DPT * num_of_tables; int totalData = num_of_DPT * num_of_tables;
......
此差异已折叠。
...@@ -51,6 +51,7 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) { ...@@ -51,6 +51,7 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) {
break; break;
case 'f': case 'f':
arguments->fqdn = arg; arguments->fqdn = arg;
break;
case 'g': case 'g':
arguments->dnodeGroups = arg; arguments->dnodeGroups = arg;
break; break;
......
...@@ -96,6 +96,7 @@ void walModWalFile(char* walfile) { ...@@ -96,6 +96,7 @@ void walModWalFile(char* walfile) {
if (wfd < 0) { if (wfd < 0) {
printf("wal:%s, failed to open(%s)\n", newWalFile, strerror(errno)); printf("wal:%s, failed to open(%s)\n", newWalFile, strerror(errno));
free(buffer); free(buffer);
close(rfd);
return ; return ;
} }
...@@ -116,6 +117,11 @@ void walModWalFile(char* walfile) { ...@@ -116,6 +117,11 @@ void walModWalFile(char* walfile) {
break; break;
} }
if (pHead->len >= 1024000 - sizeof(SWalHead)) {
printf("wal:%s, SWalHead.len(%d) overflow, skip the rest of file\n", walfile, pHead->len);
break;
}
ret = read(rfd, pHead->cont, pHead->len); ret = read(rfd, pHead->cont, pHead->len);
if ( ret != pHead->len) { if ( ret != pHead->len) {
printf("wal:%s, failed to read body, skip, len:%d ret:%d\n", walfile, pHead->len, ret); printf("wal:%s, failed to read body, skip, len:%d ret:%d\n", walfile, pHead->len, ret);
......
...@@ -99,6 +99,8 @@ static int32_t readVnodeCfg(SVnodeObj *pVnode, char* cfgFile) ...@@ -99,6 +99,8 @@ static int32_t readVnodeCfg(SVnodeObj *pVnode, char* cfgFile)
goto PARSE_OVER; goto PARSE_OVER;
} }
content[maxLen] = (char)0;
root = cJSON_Parse(content); root = cJSON_Parse(content);
if (root == NULL) { if (root == NULL) {
printf("failed to json parse %s, invalid json format\n", cfgFile); printf("failed to json parse %s, invalid json format\n", cfgFile);
......
...@@ -53,6 +53,7 @@ typedef struct { ...@@ -53,6 +53,7 @@ typedef struct {
void * rowData; void * rowData;
int32_t rowSize; int32_t rowSize;
int32_t retCode; // for callback in sdb queue int32_t retCode; // for callback in sdb queue
int32_t processedCount; // for sync fwd callback
int32_t (*cb)(struct SMnodeMsg *pMsg, int32_t code); int32_t (*cb)(struct SMnodeMsg *pMsg, int32_t code);
struct SMnodeMsg *pMsg; struct SMnodeMsg *pMsg;
} SSdbOper; } SSdbOper;
......
...@@ -88,13 +88,13 @@ static int32_t mnodeDnodeActionDelete(SSdbOper *pOper) { ...@@ -88,13 +88,13 @@ static int32_t mnodeDnodeActionDelete(SSdbOper *pOper) {
} }
static int32_t mnodeDnodeActionUpdate(SSdbOper *pOper) { static int32_t mnodeDnodeActionUpdate(SSdbOper *pOper) {
SDnodeObj *pDnode = pOper->pObj; SDnodeObj *pNew = pOper->pObj;
SDnodeObj *pSaved = mnodeGetDnode(pDnode->dnodeId); SDnodeObj *pDnode = mnodeGetDnode(pNew->dnodeId);
if (pSaved != NULL && pDnode != pSaved) { if (pDnode != NULL && pNew != pDnode) {
memcpy(pSaved, pDnode, pOper->rowSize); memcpy(pDnode, pNew, pOper->rowSize);
free(pDnode); free(pNew);
mnodeDecDnodeRef(pSaved);
} }
mnodeDecDnodeRef(pDnode);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -334,7 +334,7 @@ static int32_t mnodeProcessDnodeStatusMsg(SMnodeMsg *pMsg) { ...@@ -334,7 +334,7 @@ static int32_t mnodeProcessDnodeStatusMsg(SMnodeMsg *pMsg) {
if (pStatus->dnodeId == 0) { if (pStatus->dnodeId == 0) {
mDebug("dnode:%d %s, first access", pDnode->dnodeId, pDnode->dnodeEp); mDebug("dnode:%d %s, first access", pDnode->dnodeId, pDnode->dnodeEp);
} else { } else {
//mDebug("dnode:%d, status received, access times %d", pDnode->dnodeId, pDnode->lastAccess); mTrace("dnode:%d, status received, access times %d", pDnode->dnodeId, pDnode->lastAccess);
} }
int32_t openVnodes = htons(pStatus->openVnodes); int32_t openVnodes = htons(pStatus->openVnodes);
......
...@@ -72,8 +72,6 @@ typedef struct { ...@@ -72,8 +72,6 @@ typedef struct {
void * sync; void * sync;
void * wal; void * wal;
SSyncCfg cfg; SSyncCfg cfg;
sem_t sem;
int32_t code;
int32_t numOfTables; int32_t numOfTables;
SSdbTable *tableList[SDB_TABLE_MAX]; SSdbTable *tableList[SDB_TABLE_MAX];
pthread_mutex_t mutex; pthread_mutex_t mutex;
...@@ -244,27 +242,36 @@ static void sdbNotifyRole(void *ahandle, int8_t role) { ...@@ -244,27 +242,36 @@ static void sdbNotifyRole(void *ahandle, int8_t role) {
sdbUpdateMnodeRoles(); sdbUpdateMnodeRoles();
} }
FORCE_INLINE
static void sdbConfirmForward(void *ahandle, void *param, int32_t code) { static void sdbConfirmForward(void *ahandle, void *param, int32_t code) {
tsSdbObj.code = code; assert(param);
sem_post(&tsSdbObj.sem); SSdbOper * pOper = param;
sdbDebug("forward request confirmed, version:%" PRIu64 ", result:%s", (int64_t)param, tstrerror(code)); SMnodeMsg *pMsg = pOper->pMsg;
} if (code <= 0) pOper->retCode = code;
int32_t processedCount = atomic_add_fetch_32(&pOper->processedCount, 1);
if (processedCount <= 1) {
if (pMsg != NULL) {
sdbDebug("app:%p:%p, waiting for confirm this operation, count:%d", pMsg->rpcMsg.ahandle, pMsg, processedCount);
}
return;
}
static int32_t sdbForwardToPeer(SWalHead *pHead) { if (pMsg != NULL) {
if (tsSdbObj.sync == NULL) return TSDB_CODE_SUCCESS; sdbDebug("app:%p:%p, is confirmed and will do callback func", pMsg->rpcMsg.ahandle, pMsg);
}
int32_t code = syncForwardToPeer(tsSdbObj.sync, pHead, (void*)pHead->version, TAOS_QTYPE_RPC); if (pOper->cb != NULL) {
if (code > 0) { pOper->retCode = (*pOper->cb)(pMsg, pOper->retCode);
sdbDebug("forward request is sent, version:%" PRIu64 ", code:%d", pHead->version, code); }
sem_wait(&tsSdbObj.sem);
return tsSdbObj.code; dnodeSendRpcMnodeWriteRsp(pMsg, pOper->retCode);
} taosFreeQitem(pOper);
return code;
} }
void sdbUpdateSync() { void sdbUpdateSync() {
SSyncCfg syncCfg = {0}; SSyncCfg syncCfg = {0};
int32_t index = 0; int32_t index = 0;
SDMMnodeInfos *mnodes = dnodeGetMnodeInfos(); SDMMnodeInfos *mnodes = dnodeGetMnodeInfos();
for (int32_t i = 0; i < mnodes->nodeNum; ++i) { for (int32_t i = 0; i < mnodes->nodeNum; ++i) {
...@@ -298,7 +305,7 @@ void sdbUpdateSync() { ...@@ -298,7 +305,7 @@ void sdbUpdateSync() {
} }
syncCfg.replica = index; syncCfg.replica = index;
syncCfg.quorum = (syncCfg.replica == 1) ? 1:2; syncCfg.quorum = (syncCfg.replica == 1) ? 1 : 2;
bool hasThisDnode = false; bool hasThisDnode = false;
for (int32_t i = 0; i < syncCfg.replica; ++i) { for (int32_t i = 0; i < syncCfg.replica; ++i) {
...@@ -325,10 +332,10 @@ void sdbUpdateSync() { ...@@ -325,10 +332,10 @@ void sdbUpdateSync() {
syncInfo.getWalInfo = sdbGetWalInfo; syncInfo.getWalInfo = sdbGetWalInfo;
syncInfo.getFileInfo = sdbGetFileInfo; syncInfo.getFileInfo = sdbGetFileInfo;
syncInfo.writeToCache = sdbWriteToQueue; syncInfo.writeToCache = sdbWriteToQueue;
syncInfo.confirmForward = sdbConfirmForward; syncInfo.confirmForward = sdbConfirmForward;
syncInfo.notifyRole = sdbNotifyRole; syncInfo.notifyRole = sdbNotifyRole;
tsSdbObj.cfg = syncCfg; tsSdbObj.cfg = syncCfg;
if (tsSdbObj.sync) { if (tsSdbObj.sync) {
syncReconfig(tsSdbObj.sync, &syncCfg); syncReconfig(tsSdbObj.sync, &syncCfg);
} else { } else {
...@@ -339,7 +346,6 @@ void sdbUpdateSync() { ...@@ -339,7 +346,6 @@ void sdbUpdateSync() {
int32_t sdbInit() { int32_t sdbInit() {
pthread_mutex_init(&tsSdbObj.mutex, NULL); pthread_mutex_init(&tsSdbObj.mutex, NULL);
sem_init(&tsSdbObj.sem, 0, 0);
if (sdbInitWriteWorker() != 0) { if (sdbInitWriteWorker() != 0) {
return -1; return -1;
...@@ -379,7 +385,6 @@ void sdbCleanUp() { ...@@ -379,7 +385,6 @@ void sdbCleanUp() {
tsSdbObj.wal = NULL; tsSdbObj.wal = NULL;
} }
sem_destroy(&tsSdbObj.sem);
pthread_mutex_destroy(&tsSdbObj.mutex); pthread_mutex_destroy(&tsSdbObj.mutex);
} }
...@@ -513,24 +518,22 @@ static int sdbWrite(void *param, void *data, int type) { ...@@ -513,24 +518,22 @@ static int sdbWrite(void *param, void *data, int type) {
assert(pTable != NULL); assert(pTable != NULL);
pthread_mutex_lock(&tsSdbObj.mutex); pthread_mutex_lock(&tsSdbObj.mutex);
if (pHead->version == 0) { if (pHead->version == 0) {
// assign version // assign version
tsSdbObj.version++; tsSdbObj.version++;
pHead->version = tsSdbObj.version; pHead->version = tsSdbObj.version;
} else { } else {
// for data from WAL or forward, version may be smaller // for data from WAL or forward, version may be smaller
if (pHead->version <= tsSdbObj.version) { if (pHead->version <= tsSdbObj.version) {
pthread_mutex_unlock(&tsSdbObj.mutex); pthread_mutex_unlock(&tsSdbObj.mutex);
if (type == TAOS_QTYPE_FWD && tsSdbObj.sync != NULL) { sdbDebug("table:%s, failed to restore %s record:%s from source(%d), version:%" PRId64 " too large, sdb version:%" PRId64,
sdbDebug("forward request is received, version:%" PRIu64 " confirm it", pHead->version); pTable->tableName, sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), type, pHead->version, tsSdbObj.version);
syncConfirmForward(tsSdbObj.sync, pHead->version, TSDB_CODE_SUCCESS);
}
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} else if (pHead->version != tsSdbObj.version + 1) { } else if (pHead->version != tsSdbObj.version + 1) {
pthread_mutex_unlock(&tsSdbObj.mutex); pthread_mutex_unlock(&tsSdbObj.mutex);
sdbError("table:%s, failed to restore %s record:%s from wal, version:%" PRId64 " too large, sdb version:%" PRId64, sdbError("table:%s, failed to restore %s record:%s from source(%d), version:%" PRId64 " too large, sdb version:%" PRId64,
pTable->tableName, sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version, pTable->tableName, sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), type, pHead->version, tsSdbObj.version);
tsSdbObj.version);
return TSDB_CODE_MND_APP_ERROR; return TSDB_CODE_MND_APP_ERROR;
} else { } else {
tsSdbObj.version = pHead->version; tsSdbObj.version = pHead->version;
...@@ -542,28 +545,36 @@ static int sdbWrite(void *param, void *data, int type) { ...@@ -542,28 +545,36 @@ static int sdbWrite(void *param, void *data, int type) {
pthread_mutex_unlock(&tsSdbObj.mutex); pthread_mutex_unlock(&tsSdbObj.mutex);
return code; return code;
} }
code = sdbForwardToPeer(pHead);
pthread_mutex_unlock(&tsSdbObj.mutex); pthread_mutex_unlock(&tsSdbObj.mutex);
// from app, oper is created // from app, oper is created
if (pOper != NULL) { if (pOper != NULL) {
sdbTrace("record from app is disposed, table:%s action:%s record:%s version:%" PRIu64 " result:%s", // forward to peers
pTable->tableName, sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version, pOper->processedCount = 0;
tstrerror(code)); int32_t syncCode = syncForwardToPeer(tsSdbObj.sync, pHead, pOper, TAOS_QTYPE_RPC);
return code; if (syncCode <= 0) pOper->processedCount = 1;
if (syncCode < 0) {
sdbError("table:%s, failed to forward request, result:%s action:%s record:%s version:%" PRId64, pTable->tableName,
tstrerror(syncCode), sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version);
} else if (syncCode > 0) {
sdbDebug("table:%s, forward request is sent, action:%s record:%s version:%" PRId64, pTable->tableName,
sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version);
} else {
sdbTrace("table:%s, no need to send fwd request, action:%s record:%s version:%" PRId64, pTable->tableName,
sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version);
}
return syncCode;
} }
// from wal or forward msg, oper not created, should add into hash sdbDebug("table:%s, record from wal/fwd is disposed, action:%s record:%s version:%" PRId64, pTable->tableName,
if (tsSdbObj.sync != NULL) { sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version);
sdbTrace("record from wal forward is disposed, table:%s action:%s record:%s version:%" PRIu64 " confirm it",
pTable->tableName, sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version);
syncConfirmForward(tsSdbObj.sync, pHead->version, code);
} else {
sdbTrace("record from wal restore is disposed, table:%s action:%s record:%s version:%" PRIu64, pTable->tableName,
sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version);
}
// even it is WAL/FWD, it shall be called to update version in sync
syncForwardToPeer(tsSdbObj.sync, pHead, pOper, TAOS_QTYPE_RPC);
// from wal or forward msg, oper not created, should add into hash
if (action == SDB_ACTION_INSERT) { if (action == SDB_ACTION_INSERT) {
SSdbOper oper = {.rowSize = pHead->len, .rowData = pHead->cont, .table = pTable}; SSdbOper oper = {.rowSize = pHead->len, .rowData = pHead->cont, .table = pTable};
code = (*pTable->decodeFp)(&oper); code = (*pTable->decodeFp)(&oper);
...@@ -627,7 +638,7 @@ int32_t sdbInsertRow(SSdbOper *pOper) { ...@@ -627,7 +638,7 @@ int32_t sdbInsertRow(SSdbOper *pOper) {
memcpy(pNewOper, pOper, sizeof(SSdbOper)); memcpy(pNewOper, pOper, sizeof(SSdbOper));
if (pNewOper->pMsg != NULL) { if (pNewOper->pMsg != NULL) {
sdbDebug("app:%p:%p, table:%s record:%p:%s, insert action is add to sdb queue, ", pNewOper->pMsg->rpcMsg.ahandle, sdbDebug("app:%p:%p, table:%s record:%p:%s, insert action is add to sdb queue", pNewOper->pMsg->rpcMsg.ahandle,
pNewOper->pMsg, pTable->tableName, pOper->pObj, sdbGetKeyStrFromObj(pTable, pOper->pObj)); pNewOper->pMsg, pTable->tableName, pOper->pObj, sdbGetKeyStrFromObj(pTable, pOper->pObj));
} }
...@@ -677,7 +688,7 @@ int32_t sdbDeleteRow(SSdbOper *pOper) { ...@@ -677,7 +688,7 @@ int32_t sdbDeleteRow(SSdbOper *pOper) {
memcpy(pNewOper, pOper, sizeof(SSdbOper)); memcpy(pNewOper, pOper, sizeof(SSdbOper));
if (pNewOper->pMsg != NULL) { if (pNewOper->pMsg != NULL) {
sdbDebug("app:%p:%p, table:%s record:%p:%s, delete action is add to sdb queue, ", pNewOper->pMsg->rpcMsg.ahandle, sdbDebug("app:%p:%p, table:%s record:%p:%s, delete action is add to sdb queue", pNewOper->pMsg->rpcMsg.ahandle,
pNewOper->pMsg, pTable->tableName, pOper->pObj, sdbGetKeyStrFromObj(pTable, pOper->pObj)); pNewOper->pMsg, pTable->tableName, pOper->pObj, sdbGetKeyStrFromObj(pTable, pOper->pObj));
} }
...@@ -727,7 +738,7 @@ int32_t sdbUpdateRow(SSdbOper *pOper) { ...@@ -727,7 +738,7 @@ int32_t sdbUpdateRow(SSdbOper *pOper) {
memcpy(pNewOper, pOper, sizeof(SSdbOper)); memcpy(pNewOper, pOper, sizeof(SSdbOper));
if (pNewOper->pMsg != NULL) { if (pNewOper->pMsg != NULL) {
sdbDebug("app:%p:%p, table:%s record:%p:%s, update action is add to sdb queue, ", pNewOper->pMsg->rpcMsg.ahandle, sdbDebug("app:%p:%p, table:%s record:%p:%s, update action is add to sdb queue", pNewOper->pMsg->rpcMsg.ahandle,
pNewOper->pMsg, pTable->tableName, pOper->pObj, sdbGetKeyStrFromObj(pTable, pOper->pObj)); pNewOper->pMsg, pTable->tableName, pOper->pObj, sdbGetKeyStrFromObj(pTable, pOper->pObj));
} }
...@@ -943,20 +954,20 @@ static void *sdbWorkerFp(void *param) { ...@@ -943,20 +954,20 @@ static void *sdbWorkerFp(void *param) {
taosGetQitem(tsSdbWriteQall, &type, &item); taosGetQitem(tsSdbWriteQall, &type, &item);
if (type == TAOS_QTYPE_RPC) { if (type == TAOS_QTYPE_RPC) {
pOper = (SSdbOper *)item; pOper = (SSdbOper *)item;
pOper->processedCount = 1;
pHead = (void *)pOper + sizeof(SSdbOper) + SDB_SYNC_HACK; pHead = (void *)pOper + sizeof(SSdbOper) + SDB_SYNC_HACK;
if (pOper->pMsg != NULL) {
sdbDebug("app:%p:%p, table:%s record:%p:%s version:%" PRIu64 ", will be processed in sdb queue",
pOper->pMsg->rpcMsg.ahandle, pOper->pMsg, ((SSdbTable *)pOper->table)->tableName, pOper->pObj,
sdbGetKeyStr(pOper->table, pHead->cont), pHead->version);
}
} else { } else {
pHead = (SWalHead *)item; pHead = (SWalHead *)item;
pOper = NULL; pOper = NULL;
} }
if (pOper != NULL && pOper->pMsg != NULL) {
sdbDebug("app:%p:%p, table:%s record:%p:%s version:%" PRIu64 ", will be processed in sdb queue",
pOper->pMsg->rpcMsg.ahandle, pOper->pMsg, ((SSdbTable *)pOper->table)->tableName, pOper->pObj,
sdbGetKeyStr(pOper->table, pHead->cont), pHead->version);
}
int32_t code = sdbWrite(pOper, pHead, type); int32_t code = sdbWrite(pOper, pHead, type);
if (pOper) pOper->retCode = code; if (pOper && code <= 0) pOper->retCode = code;
} }
walFsync(tsSdbObj.wal); walFsync(tsSdbObj.wal);
...@@ -965,25 +976,17 @@ static void *sdbWorkerFp(void *param) { ...@@ -965,25 +976,17 @@ static void *sdbWorkerFp(void *param) {
taosResetQitems(tsSdbWriteQall); taosResetQitems(tsSdbWriteQall);
for (int32_t i = 0; i < numOfMsgs; ++i) { for (int32_t i = 0; i < numOfMsgs; ++i) {
taosGetQitem(tsSdbWriteQall, &type, &item); taosGetQitem(tsSdbWriteQall, &type, &item);
if (type == TAOS_QTYPE_RPC) { if (type == TAOS_QTYPE_RPC) {
pOper = (SSdbOper *)item; pOper = (SSdbOper *)item;
if (pOper != NULL && pOper->cb != NULL) { sdbDecRef(pOper->table, pOper->pObj);
sdbTrace("app:%p:%p, will do callback func, index:%d", pOper->pMsg->rpcMsg.ahandle, pOper->pMsg, i); sdbConfirmForward(NULL, pOper, pOper->retCode);
pOper->retCode = (*pOper->cb)(pOper->pMsg, pOper->retCode); } else if (type == TAOS_QTYPE_FWD) {
} syncConfirmForward(tsSdbObj.sync, pHead->version, TSDB_CODE_SUCCESS);
taosFreeQitem(item);
if (pOper != NULL && pOper->pMsg != NULL) { } else {
sdbTrace("app:%p:%p, msg is processed, result:%s", pOper->pMsg->rpcMsg.ahandle, pOper->pMsg, taosFreeQitem(item);
tstrerror(pOper->retCode));
}
if (pOper != NULL) {
sdbDecRef(pOper->table, pOper->pObj);
}
dnodeSendRpcMnodeWriteRsp(pOper->pMsg, pOper->retCode);
} }
taosFreeQitem(item);
} }
} }
......
...@@ -783,9 +783,15 @@ static int32_t mnodeProcessTableMetaMsg(SMnodeMsg *pMsg) { ...@@ -783,9 +783,15 @@ static int32_t mnodeProcessTableMetaMsg(SMnodeMsg *pMsg) {
static int32_t mnodeCreateSuperTableCb(SMnodeMsg *pMsg, int32_t code) { static int32_t mnodeCreateSuperTableCb(SMnodeMsg *pMsg, int32_t code) {
SSuperTableObj *pTable = (SSuperTableObj *)pMsg->pTable; SSuperTableObj *pTable = (SSuperTableObj *)pMsg->pTable;
if (pTable != NULL) { assert(pTable);
mLInfo("app:%p:%p, stable:%s, is created in sdb, result:%s", pMsg->rpcMsg.ahandle, pMsg, pTable->info.tableId,
tstrerror(code)); if (code == TSDB_CODE_SUCCESS) {
mLInfo("stable:%s, is created in sdb", pTable->info.tableId);
} else {
mError("app:%p:%p, stable:%s, failed to create in sdb, reason:%s", pMsg->rpcMsg.ahandle, pMsg, pTable->info.tableId,
tstrerror(code));
SSdbOper desc = {.type = SDB_OPER_GLOBAL, .pObj = pTable, .table = tsSuperTableSdb};
sdbDeleteRow(&desc);
} }
return code; return code;
...@@ -1561,10 +1567,16 @@ static int32_t mnodeDoCreateChildTableCb(SMnodeMsg *pMsg, int32_t code) { ...@@ -1561,10 +1567,16 @@ static int32_t mnodeDoCreateChildTableCb(SMnodeMsg *pMsg, int32_t code) {
SChildTableObj *pTable = (SChildTableObj *)pMsg->pTable; SChildTableObj *pTable = (SChildTableObj *)pMsg->pTable;
assert(pTable); assert(pTable);
mDebug("app:%p:%p, table:%s, create table in id:%d, uid:%" PRIu64 ", result:%s", pMsg->rpcMsg.ahandle, pMsg, if (code == TSDB_CODE_SUCCESS) {
pTable->info.tableId, pTable->sid, pTable->uid, tstrerror(code)); mDebug("app:%p:%p, table:%s, create table in sid:%d, uid:%" PRIu64, pMsg->rpcMsg.ahandle, pMsg, pTable->info.tableId,
pTable->sid, pTable->uid);
if (code != TSDB_CODE_SUCCESS) return code; } else {
mError("app:%p:%p, table:%s, failed to create table sid:%d, uid:%" PRIu64 ", reason:%s", pMsg->rpcMsg.ahandle, pMsg,
pTable->info.tableId, pTable->sid, pTable->uid, tstrerror(code));
SSdbOper desc = {.type = SDB_OPER_GLOBAL, .pObj = pTable, .table = tsChildTableSdb};
sdbDeleteRow(&desc);
return code;
}
SCMCreateTableMsg *pCreate = pMsg->rpcMsg.pCont; SCMCreateTableMsg *pCreate = pMsg->rpcMsg.pCont;
SMDCreateTableMsg *pMDCreate = mnodeBuildCreateChildTableMsg(pCreate, pTable); SMDCreateTableMsg *pMDCreate = mnodeBuildCreateChildTableMsg(pCreate, pTable);
......
...@@ -165,10 +165,18 @@ static int32_t mnodeVgroupActionUpdate(SSdbOper *pOper) { ...@@ -165,10 +165,18 @@ static int32_t mnodeVgroupActionUpdate(SSdbOper *pOper) {
} }
mnodeDecDnodeRef(pDnode); mnodeDecDnodeRef(pDnode);
} }
free(pNew);
} }
mnodeVgroupUpdateIdPool(pVgroup); mnodeVgroupUpdateIdPool(pVgroup);
// reset vgid status on vgroup changed
mDebug("vgId:%d, reset sync status to unsynced", pVgroup->vgId);
for (int32_t v = 0; v < pVgroup->numOfVnodes; ++v) {
pVgroup->vnodeGid[v].role = TAOS_SYNC_ROLE_UNSYNCED;
}
mnodeDecVgroupRef(pVgroup); mnodeDecVgroupRef(pVgroup);
mDebug("vgId:%d, is updated, numOfVnode:%d", pVgroup->vgId, pVgroup->numOfVnodes); mDebug("vgId:%d, is updated, numOfVnode:%d", pVgroup->vgId, pVgroup->numOfVnodes);
...@@ -300,6 +308,7 @@ void mnodeUpdateVgroupStatus(SVgObj *pVgroup, SDnodeObj *pDnode, SVnodeLoad *pVl ...@@ -300,6 +308,7 @@ void mnodeUpdateVgroupStatus(SVgObj *pVgroup, SDnodeObj *pDnode, SVnodeLoad *pVl
for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) { for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) {
SVnodeGid *pVgid = &pVgroup->vnodeGid[i]; SVnodeGid *pVgid = &pVgroup->vnodeGid[i];
if (pVgid->pDnode == pDnode) { if (pVgid->pDnode == pDnode) {
mTrace("dnode:%d, receive status from dnode, vgId:%d status is %d", pVgroup->vgId, pDnode->dnodeId, pVgid->role);
pVgid->role = pVload->role; pVgid->role = pVload->role;
if (pVload->role == TAOS_SYNC_ROLE_MASTER) { if (pVload->role == TAOS_SYNC_ROLE_MASTER) {
pVgroup->inUse = i; pVgroup->inUse = i;
...@@ -339,17 +348,23 @@ void *mnodeGetNextVgroup(void *pIter, SVgObj **pVgroup) { ...@@ -339,17 +348,23 @@ void *mnodeGetNextVgroup(void *pIter, SVgObj **pVgroup) {
} }
static int32_t mnodeCreateVgroupCb(SMnodeMsg *pMsg, int32_t code) { static int32_t mnodeCreateVgroupCb(SMnodeMsg *pMsg, int32_t code) {
SVgObj *pVgroup = pMsg->pVgroup;
SDbObj *pDb = pMsg->pDb;
assert(pVgroup);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
pMsg->pVgroup = NULL; mError("app:%p:%p, vgId:%d, failed to create in sdb, reason:%s", pMsg->rpcMsg.ahandle, pMsg, pVgroup->vgId,
tstrerror(code));
SSdbOper desc = {.type = SDB_OPER_GLOBAL, .pObj = pVgroup, .table = tsVgroupSdb};
sdbDeleteRow(&desc);
return code; return code;
} }
SVgObj *pVgroup = pMsg->pVgroup; mInfo("app:%p:%p, vgId:%d, is created in mnode, db:%s replica:%d", pMsg->rpcMsg.ahandle, pMsg, pVgroup->vgId,
SDbObj *pDb = pMsg->pDb; pDb->name, pVgroup->numOfVnodes);
mInfo("vgId:%d, is created in mnode, db:%s replica:%d", pVgroup->vgId, pDb->name, pVgroup->numOfVnodes);
for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) { for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) {
mInfo("vgId:%d, index:%d, dnode:%d", pVgroup->vgId, i, pVgroup->vnodeGid[i].dnodeId); mInfo("app:%p:%p, vgId:%d, index:%d, dnode:%d", pMsg->rpcMsg.ahandle, pMsg, pVgroup->vgId, i,
pVgroup->vnodeGid[i].dnodeId);
} }
mnodeIncVgroupRef(pVgroup); mnodeIncVgroupRef(pVgroup);
......
...@@ -26,8 +26,6 @@ extern int32_t httpDebugFlag; ...@@ -26,8 +26,6 @@ extern int32_t httpDebugFlag;
#define httpInfo(...) { if (httpDebugFlag & DEBUG_INFO) { taosPrintLog("HTP INFO ", 255, __VA_ARGS__); }} #define httpInfo(...) { if (httpDebugFlag & DEBUG_INFO) { taosPrintLog("HTP INFO ", 255, __VA_ARGS__); }}
#define httpDebug(...) { if (httpDebugFlag & DEBUG_DEBUG) { taosPrintLog("HTP DEBUG ", httpDebugFlag, __VA_ARGS__); }} #define httpDebug(...) { if (httpDebugFlag & DEBUG_DEBUG) { taosPrintLog("HTP DEBUG ", httpDebugFlag, __VA_ARGS__); }}
#define httpTrace(...) { if (httpDebugFlag & DEBUG_TRACE) { taosPrintLog("HTP TRACE ", httpDebugFlag, __VA_ARGS__); }} #define httpTrace(...) { if (httpDebugFlag & DEBUG_TRACE) { taosPrintLog("HTP TRACE ", httpDebugFlag, __VA_ARGS__); }}
#define httpTraceL(...){ if (httpDebugFlag & DEBUG_TRACE) { taosPrintLongString("HTP TRACE ", httpDebugFlag, __VA_ARGS__); }}
#define httpDebugDump(...) { if (httpDebugFlag & DEBUG_DEBUG) { taosPrintLongString("HTP DEBUG ", httpDebugFlag, __VA_ARGS__); }}
#define httpTraceDump(...) { if (httpDebugFlag & DEBUG_TRACE) { taosPrintLongString("HTP TRACE ", httpDebugFlag, __VA_ARGS__); }}
#endif #endif
...@@ -313,9 +313,9 @@ bool httpParseRequest(HttpContext* pContext) { ...@@ -313,9 +313,9 @@ bool httpParseRequest(HttpContext* pContext) {
return true; return true;
} }
httpTraceDump("context:%p, fd:%d, ip:%s, thread:%s, numOfFds:%d, read size:%d, raw data:\n%s", pContext, pContext->fd, httpTraceL("context:%p, fd:%d, ip:%s, thread:%s, numOfFds:%d, read size:%d, raw data:\n%s", pContext, pContext->fd,
pContext->ipstr, pContext->pThread->label, pContext->pThread->numOfFds, pContext->parser.bufsize, pContext->ipstr, pContext->pThread->label, pContext->pThread->numOfFds, pContext->parser.bufsize,
pContext->parser.buffer); pContext->parser.buffer);
if (!httpGetHttpMethod(pContext)) { if (!httpGetHttpMethod(pContext)) {
return false; return false;
......
...@@ -108,7 +108,7 @@ bool httpReadDataImp(HttpContext *pContext) { ...@@ -108,7 +108,7 @@ bool httpReadDataImp(HttpContext *pContext) {
static bool httpDecompressData(HttpContext *pContext) { static bool httpDecompressData(HttpContext *pContext) {
if (pContext->contentEncoding != HTTP_COMPRESS_GZIP) { if (pContext->contentEncoding != HTTP_COMPRESS_GZIP) {
httpTraceDump("context:%p, fd:%d, ip:%s, content:%s", pContext, pContext->fd, pContext->ipstr, pContext->parser.data.pos); httpTraceL("context:%p, fd:%d, ip:%s, content:%s", pContext, pContext->fd, pContext->ipstr, pContext->parser.data.pos);
return true; return true;
} }
...@@ -124,8 +124,8 @@ static bool httpDecompressData(HttpContext *pContext) { ...@@ -124,8 +124,8 @@ static bool httpDecompressData(HttpContext *pContext) {
if (ret == 0) { if (ret == 0) {
memcpy(pContext->parser.data.pos, decompressBuf, decompressBufLen); memcpy(pContext->parser.data.pos, decompressBuf, decompressBufLen);
pContext->parser.data.pos[decompressBufLen] = 0; pContext->parser.data.pos[decompressBufLen] = 0;
httpTraceDump("context:%p, fd:%d, ip:%s, rawSize:%d, decompressSize:%d, content:%s", httpTraceL("context:%p, fd:%d, ip:%s, rawSize:%d, decompressSize:%d, content:%s", pContext, pContext->fd,
pContext, pContext->fd, pContext->ipstr, pContext->parser.data.len, decompressBufLen, decompressBuf); pContext->ipstr, pContext->parser.data.len, decompressBufLen, decompressBuf);
pContext->parser.data.len = decompressBufLen; pContext->parser.data.len = decompressBufLen;
} else { } else {
httpError("context:%p, fd:%d, ip:%s, failed to decompress data, rawSize:%d, error:%d", httpError("context:%p, fd:%d, ip:%s, failed to decompress data, rawSize:%d, error:%d",
......
...@@ -166,8 +166,8 @@ void httpProcessMultiSql(HttpContext *pContext) { ...@@ -166,8 +166,8 @@ void httpProcessMultiSql(HttpContext *pContext) {
HttpSqlCmd *cmd = multiCmds->cmds + multiCmds->pos; HttpSqlCmd *cmd = multiCmds->cmds + multiCmds->pos;
char *sql = httpGetCmdsString(pContext, cmd->sql); char *sql = httpGetCmdsString(pContext, cmd->sql);
httpTraceDump("context:%p, fd:%d, ip:%s, user:%s, process pos:%d, start query, sql:%s", pContext, pContext->fd, httpTraceL("context:%p, fd:%d, ip:%s, user:%s, process pos:%d, start query, sql:%s", pContext, pContext->fd,
pContext->ipstr, pContext->user, multiCmds->pos, sql); pContext->ipstr, pContext->user, multiCmds->pos, sql);
taosNotePrintHttp(sql); taosNotePrintHttp(sql);
taos_query_a(pContext->session->taos, sql, httpProcessMultiSqlCallBack, (void *)pContext); taos_query_a(pContext->session->taos, sql, httpProcessMultiSqlCallBack, (void *)pContext);
} }
...@@ -306,8 +306,8 @@ void httpProcessSingleSqlCmd(HttpContext *pContext) { ...@@ -306,8 +306,8 @@ void httpProcessSingleSqlCmd(HttpContext *pContext) {
return; return;
} }
httpTraceDump("context:%p, fd:%d, ip:%s, user:%s, start query, sql:%s", pContext, pContext->fd, pContext->ipstr, httpTraceL("context:%p, fd:%d, ip:%s, user:%s, start query, sql:%s", pContext, pContext->fd, pContext->ipstr,
pContext->user, sql); pContext->user, sql);
taosNotePrintHttp(sql); taosNotePrintHttp(sql);
taos_query_a(pSession->taos, sql, httpProcessSingleSqlCallBack, (void *)pContext); taos_query_a(pSession->taos, sql, httpProcessSingleSqlCallBack, (void *)pContext);
} }
......
...@@ -35,9 +35,6 @@ ...@@ -35,9 +35,6 @@
#define monitorDebug(...) { if (monitorDebugFlag & DEBUG_DEBUG) { taosPrintLog("MON DEBUG ", monitorDebugFlag, __VA_ARGS__); }} #define monitorDebug(...) { if (monitorDebugFlag & DEBUG_DEBUG) { taosPrintLog("MON DEBUG ", monitorDebugFlag, __VA_ARGS__); }}
#define monitorTrace(...) { if (monitorDebugFlag & DEBUG_TRACE) { taosPrintLog("MON TRACE ", monitorDebugFlag, __VA_ARGS__); }} #define monitorTrace(...) { if (monitorDebugFlag & DEBUG_TRACE) { taosPrintLog("MON TRACE ", monitorDebugFlag, __VA_ARGS__); }}
#define monitorDebugDump(...) { if (monitorDebugFlag & DEBUG_DEBUG) { taosPrintLongString("MON DEBUG ", monitorDebugFlag, __VA_ARGS__); }}
#define monitorTraceDump(...) { if (monitorDebugFlag & DEBUG_TRACE) { taosPrintLongString("MON TRACE ", monitorDebugFlag, __VA_ARGS__); }}
#define SQL_LENGTH 1024 #define SQL_LENGTH 1024
#define LOG_LEN_STR 100 #define LOG_LEN_STR 100
#define IP_LEN_STR 18 #define IP_LEN_STR 18
......
...@@ -27,7 +27,4 @@ extern int32_t mqttDebugFlag; ...@@ -27,7 +27,4 @@ extern int32_t mqttDebugFlag;
#define mqttDebug(...) { if (mqttDebugFlag & DEBUG_DEBUG) { taosPrintLog("MQT DEBUG ", mqttDebugFlag, __VA_ARGS__); }} #define mqttDebug(...) { if (mqttDebugFlag & DEBUG_DEBUG) { taosPrintLog("MQT DEBUG ", mqttDebugFlag, __VA_ARGS__); }}
#define mqttTrace(...) { if (mqttDebugFlag & DEBUG_TRACE) { taosPrintLog("MQT TRACE ", mqttDebugFlag, __VA_ARGS__); }} #define mqttTrace(...) { if (mqttDebugFlag & DEBUG_TRACE) { taosPrintLog("MQT TRACE ", mqttDebugFlag, __VA_ARGS__); }}
#define mqttDebugDump(...) { if (mqttDebugFlag & DEBUG_DEBUG) { taosPrintLongString("MQT DEBUG ", mqttDebugFlag, __VA_ARGS__); }}
#define mqttTraceDump(...) { if (mqttDebugFlag & DEBUG_TRACE) { taosPrintLongString("MQT DEBUG ", mqttDebugFlag, __VA_ARGS__); }}
#endif #endif
...@@ -154,6 +154,7 @@ typedef struct SQuery { ...@@ -154,6 +154,7 @@ typedef struct SQuery {
} SQuery; } SQuery;
typedef struct SQueryRuntimeEnv { typedef struct SQueryRuntimeEnv {
jmp_buf env;
SResultInfo* resultInfo; // todo refactor to merge with SWindowResInfo SResultInfo* resultInfo; // todo refactor to merge with SWindowResInfo
SQuery* pQuery; SQuery* pQuery;
SQLFunctionCtx* pCtx; SQLFunctionCtx* pCtx;
...@@ -169,6 +170,8 @@ typedef struct SQueryRuntimeEnv { ...@@ -169,6 +170,8 @@ typedef struct SQueryRuntimeEnv {
void* pSecQueryHandle; // another thread for void* pSecQueryHandle; // another thread for
bool stableQuery; // super table query or not bool stableQuery; // super table query or not
bool topBotQuery; // false bool topBotQuery; // false
bool groupbyNormalCol; // denote if this is a groupby normal column query
bool hasTagResults; // if there are tag values in final result or not
int32_t prevGroupId; // previous executed group id int32_t prevGroupId; // previous executed group id
SDiskbasedResultBuf* pResultBuf; // query result buffer based on blocked-wised disk file SDiskbasedResultBuf* pResultBuf; // query result buffer based on blocked-wised disk file
} SQueryRuntimeEnv; } SQueryRuntimeEnv;
...@@ -197,8 +200,10 @@ typedef struct SQInfo { ...@@ -197,8 +200,10 @@ typedef struct SQInfo {
*/ */
int32_t tableIndex; int32_t tableIndex;
int32_t numOfGroupResultPages; int32_t numOfGroupResultPages;
_qinfo_free_fn_t freeFn; _qinfo_free_fn_t freeFn; //todo remove it
jmp_buf env;
void* pBuf; // allocated buffer for STableQueryInfo, sizeof(STableQueryInfo)*numOfTables;
} SQInfo; } SQInfo;
#endif // TDENGINE_QUERYEXECUTOR_H #endif // TDENGINE_QUERYEXECUTOR_H
...@@ -16,16 +16,16 @@ ...@@ -16,16 +16,16 @@
#ifndef TDENGINE_TAST_H #ifndef TDENGINE_TAST_H
#define TDENGINE_TAST_H #define TDENGINE_TAST_H
#include <tbuffer.h>
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
#include <tskiplist.h>
#include "os.h" #include "os.h"
#include "taosmsg.h" #include "taosmsg.h"
#include "taosdef.h" #include "taosdef.h"
#include "tskiplist.h"
#include "tbuffer.h"
#include "tvariant.h" #include "tvariant.h"
struct tExprNode; struct tExprNode;
...@@ -75,10 +75,6 @@ typedef struct tExprNode { ...@@ -75,10 +75,6 @@ typedef struct tExprNode {
}; };
} tExprNode; } tExprNode;
void tSQLBinaryExprFromString(tExprNode **pExpr, SSchema *pSchema, int32_t numOfCols, char *src, int32_t len);
void tSQLBinaryExprToString(tExprNode *pExpr, char *dst, int32_t *len);
void tExprTreeDestroy(tExprNode **pExprs, void (*fp)(void*)); void tExprTreeDestroy(tExprNode **pExprs, void (*fp)(void*));
void tExprTreeTraverse(tExprNode *pExpr, SSkipList *pSkipList, SArray *result, SExprTraverseSupp *param); void tExprTreeTraverse(tExprNode *pExpr, SSkipList *pSkipList, SArray *result, SExprTraverseSupp *param);
...@@ -86,12 +82,9 @@ void tExprTreeTraverse(tExprNode *pExpr, SSkipList *pSkipList, SArray *result, S ...@@ -86,12 +82,9 @@ void tExprTreeTraverse(tExprNode *pExpr, SSkipList *pSkipList, SArray *result, S
void tExprTreeCalcTraverse(tExprNode *pExprs, int32_t numOfRows, char *pOutput, void *param, int32_t order, void tExprTreeCalcTraverse(tExprNode *pExprs, int32_t numOfRows, char *pOutput, void *param, int32_t order,
char *(*cb)(void *, const char*, int32_t)); char *(*cb)(void *, const char*, int32_t));
// todo refactor: remove it
void tSQLBinaryExprTrv(tExprNode *pExprs, SArray* res);
uint8_t getBinaryExprOptr(SSQLToken *pToken); uint8_t getBinaryExprOptr(SSQLToken *pToken);
void tExprNodeDestroy(tExprNode *pNode, void (*fp)(void *)); void tExprNodeDestroy(tExprNode *pNode, void (*fp)(void *));
void exprTreeToBinary(SBufferWriter* bw, tExprNode* pExprTree); void exprTreeToBinary(SBufferWriter* bw, tExprNode* pExprTree);
tExprNode* exprTreeFromBinary(const void* data, size_t size); tExprNode* exprTreeFromBinary(const void* data, size_t size);
......
...@@ -60,8 +60,6 @@ typedef struct SPoint { ...@@ -60,8 +60,6 @@ typedef struct SPoint {
void * val; void * val;
} SPoint; } SPoint;
int64_t taosGetIntervalStartTimestamp(int64_t startTime, int64_t slidingTime, char timeUnit, int16_t precision);
SFillInfo* taosInitFillInfo(int32_t order, TSKEY skey, int32_t numOfTags, int32_t capacity, int32_t numOfCols, SFillInfo* taosInitFillInfo(int32_t order, TSKEY skey, int32_t numOfTags, int32_t capacity, int32_t numOfCols,
int64_t slidingTime, int8_t slidingUnit, int8_t precision, int32_t fillType, int64_t slidingTime, int8_t slidingUnit, int8_t precision, int32_t fillType,
SFillColInfo* pFillCol); SFillColInfo* pFillCol);
......
此差异已折叠。
...@@ -32,7 +32,6 @@ int32_t initWindowResInfo(SWindowResInfo *pWindowResInfo, SQueryRuntimeEnv *pRun ...@@ -32,7 +32,6 @@ int32_t initWindowResInfo(SWindowResInfo *pWindowResInfo, SQueryRuntimeEnv *pRun
pWindowResInfo->threshold = threshold; pWindowResInfo->threshold = threshold;
pWindowResInfo->type = type; pWindowResInfo->type = type;
_hash_fn_t fn = taosGetDefaultHashFunction(type); _hash_fn_t fn = taosGetDefaultHashFunction(type);
pWindowResInfo->hashList = taosHashInit(threshold, fn, false); pWindowResInfo->hashList = taosHashInit(threshold, fn, false);
...@@ -54,7 +53,8 @@ void destroyTimeWindowRes(SWindowResult *pWindowRes, int32_t nOutputCols) { ...@@ -54,7 +53,8 @@ void destroyTimeWindowRes(SWindowResult *pWindowRes, int32_t nOutputCols) {
if (pWindowRes == NULL) { if (pWindowRes == NULL) {
return; return;
} }
// TODO opt malloc strategy
for (int32_t i = 0; i < nOutputCols; ++i) { for (int32_t i = 0; i < nOutputCols; ++i) {
free(pWindowRes->resultInfo[i].interResultBuf); free(pWindowRes->resultInfo[i].interResultBuf);
} }
...@@ -180,19 +180,33 @@ void closeAllTimeWindow(SWindowResInfo *pWindowResInfo) { ...@@ -180,19 +180,33 @@ void closeAllTimeWindow(SWindowResInfo *pWindowResInfo) {
/* /*
* remove the results that are not the FIRST time window that spreads beyond the * remove the results that are not the FIRST time window that spreads beyond the
* the last qualified time stamp in case of sliding query, which the sliding time is not equalled to the interval time * the last qualified time stamp in case of sliding query, which the sliding time is not equalled to the interval time.
* NOTE: remove redundant, only when the result set order equals to traverse order
*/ */
void removeRedundantWindow(SWindowResInfo *pWindowResInfo, TSKEY lastKey, int32_t order) { void removeRedundantWindow(SWindowResInfo *pWindowResInfo, TSKEY lastKey, int32_t order) {
assert(pWindowResInfo->size >= 0 && pWindowResInfo->capacity >= pWindowResInfo->size); assert(pWindowResInfo->size >= 0 && pWindowResInfo->capacity >= pWindowResInfo->size);
if (pWindowResInfo->size <= 1) {
return;
}
// get the result order
int32_t resultOrder = (pWindowResInfo->pResult[0].window.skey < pWindowResInfo->pResult[1].window.skey)? 1:-1;
if (order != resultOrder) {
return;
}
int32_t i = 0; int32_t i = 0;
while (i < pWindowResInfo->size && if (order == QUERY_ASC_FORWARD_STEP) {
((pWindowResInfo->pResult[i].window.ekey < lastKey && order == QUERY_ASC_FORWARD_STEP) || while (i < pWindowResInfo->size && (pWindowResInfo->pResult[i].window.ekey < lastKey)) {
(pWindowResInfo->pResult[i].window.skey > lastKey && order == QUERY_DESC_FORWARD_STEP))) { ++i;
++i; }
} else if (order == QUERY_DESC_FORWARD_STEP) {
while (i < pWindowResInfo->size && (pWindowResInfo->pResult[i].window.skey > lastKey)) {
++i;
}
} }
// assert(i < pWindowResInfo->size);
if (i < pWindowResInfo->size) { if (i < pWindowResInfo->size) {
pWindowResInfo->size = (i + 1); pWindowResInfo->size = (i + 1);
} }
......
...@@ -13,25 +13,26 @@ ...@@ -13,25 +13,26 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "os.h" #include "os.h"
#include "tulog.h"
#include "tutil.h" #include "tname.h"
#include "tbuffer.h"
#include "qast.h" #include "qast.h"
#include "tcompare.h" #include "tsdb.h"
#include "exception.h"
#include "qsqlparser.h" #include "qsqlparser.h"
#include "qsyntaxtreefunction.h" #include "qsyntaxtreefunction.h"
#include "taosdef.h" #include "taosdef.h"
#include "taosmsg.h" #include "taosmsg.h"
#include "tarray.h"
#include "tbuffer.h"
#include "tcompare.h"
#include "tskiplist.h"
#include "tsqlfunction.h" #include "tsqlfunction.h"
#include "tstoken.h" #include "tstoken.h"
#include "ttokendef.h" #include "ttokendef.h"
#include "tschemautil.h" #include "tulog.h"
#include "tarray.h" #include "tutil.h"
#include "tskiplist.h"
#include "queryLog.h"
#include "tsdbMain.h"
#include "exception.h"
/* /*
* *
...@@ -327,104 +328,6 @@ static tExprNode *createSyntaxTree(SSchema *pSchema, int32_t numOfCols, char *st ...@@ -327,104 +328,6 @@ static tExprNode *createSyntaxTree(SSchema *pSchema, int32_t numOfCols, char *st
} }
} }
void tSQLBinaryExprFromString(tExprNode **pExpr, SSchema *pSchema, int32_t numOfCols, char *src, int32_t len) {
*pExpr = NULL;
if (len <= 0 || src == NULL || pSchema == NULL || numOfCols <= 0) {
return;
}
int32_t pos = 0;
*pExpr = createSyntaxTree(pSchema, numOfCols, src, &pos);
if (*pExpr != NULL) {
assert((*pExpr)->nodeType == TSQL_NODE_EXPR);
}
}
int32_t tSQLBinaryExprToStringImpl(tExprNode *pNode, char *dst, uint8_t type) {
int32_t len = 0;
if (type == TSQL_NODE_EXPR) {
*dst = '(';
tSQLBinaryExprToString(pNode, dst + 1, &len);
len += 2;
*(dst + len - 1) = ')';
} else if (type == TSQL_NODE_COL) {
len = sprintf(dst, "%s", pNode->pSchema->name);
} else {
len = tVariantToString(pNode->pVal, dst);
}
return len;
}
// TODO REFACTOR WITH SQL PARSER
static char *tSQLOptrToString(uint8_t optr, char *dst) {
switch (optr) {
case TSDB_RELATION_LESS: {
*dst = '<';
dst += 1;
break;
}
case TSDB_RELATION_LESS_EQUAL: {
*dst = '<';
*(dst + 1) = '=';
dst += 2;
break;
}
case TSDB_RELATION_EQUAL: {
*dst = '=';
dst += 1;
break;
}
case TSDB_RELATION_GREATER: {
*dst = '>';
dst += 1;
break;
}
case TSDB_RELATION_GREATER_EQUAL: {
*dst = '>';
*(dst + 1) = '=';
dst += 2;
break;
}
case TSDB_RELATION_NOT_EQUAL: {
*dst = '<';
*(dst + 1) = '>';
dst += 2;
break;
}
case TSDB_RELATION_OR: {
memcpy(dst, "or", 2);
dst += 2;
break;
}
case TSDB_RELATION_AND: {
memcpy(dst, "and", 3);
dst += 3;
break;
}
default:;
}
return dst;
}
void tSQLBinaryExprToString(tExprNode *pExpr, char *dst, int32_t *len) {
if (pExpr == NULL) {
*dst = 0;
*len = 0;
return;
}
int32_t lhs = tSQLBinaryExprToStringImpl(pExpr->_node.pLeft, dst, pExpr->_node.pLeft->nodeType);
dst += lhs;
*len = lhs;
char *start = tSQLOptrToString(pExpr->_node.optr, dst);
*len += (start - dst);
*len += tSQLBinaryExprToStringImpl(pExpr->_node.pRight, start, pExpr->_node.pRight->nodeType);
}
static void UNUSED_FUNC destroySyntaxTree(tExprNode *pNode) { tExprNodeDestroy(pNode, NULL); } static void UNUSED_FUNC destroySyntaxTree(tExprNode *pNode) { tExprNodeDestroy(pNode, NULL); }
void tExprNodeDestroy(tExprNode *pNode, void (*fp)(void *)) { void tExprNodeDestroy(tExprNode *pNode, void (*fp)(void *)) {
...@@ -773,8 +676,7 @@ static void tQueryIndexlessColumn(SSkipList* pSkipList, tQueryInfo* pQueryInfo, ...@@ -773,8 +676,7 @@ static void tQueryIndexlessColumn(SSkipList* pSkipList, tQueryInfo* pQueryInfo,
SSkipListNode *pNode = tSkipListIterGet(iter); SSkipListNode *pNode = tSkipListIterGet(iter);
char * pData = SL_GET_NODE_DATA(pNode); char * pData = SL_GET_NODE_DATA(pNode);
// todo refactor: tstr *name = (tstr*) tsdbGetTableName(*(void**) pData);
tstr *name = (*(STable **)pData)->name;
// todo speed up by using hash // todo speed up by using hash
if (pQueryInfo->colIndex == TSDB_TBNAME_COLUMN_INDEX) { if (pQueryInfo->colIndex == TSDB_TBNAME_COLUMN_INDEX) {
if (pQueryInfo->optr == TSDB_RELATION_IN) { if (pQueryInfo->optr == TSDB_RELATION_IN) {
...@@ -976,27 +878,27 @@ void tExprTreeCalcTraverse(tExprNode *pExprs, int32_t numOfRows, char *pOutput, ...@@ -976,27 +878,27 @@ void tExprTreeCalcTraverse(tExprNode *pExprs, int32_t numOfRows, char *pOutput,
free(pRightOutput); free(pRightOutput);
} }
void tSQLBinaryExprTrv(tExprNode *pExprs, SArray* res) { //void tSQLBinaryExprTrv(tExprNode *pExprs, SArray* res) {
if (pExprs == NULL) { // if (pExprs == NULL) {
return; // return;
} // }
//
tExprNode *pLeft = pExprs->_node.pLeft; // tExprNode *pLeft = pExprs->_node.pLeft;
tExprNode *pRight = pExprs->_node.pRight; // tExprNode *pRight = pExprs->_node.pRight;
//
// recursive traverse left child branch // // recursive traverse left child branch
if (pLeft->nodeType == TSQL_NODE_EXPR) { // if (pLeft->nodeType == TSQL_NODE_EXPR) {
tSQLBinaryExprTrv(pLeft, res); // tSQLBinaryExprTrv(pLeft, res);
} else if (pLeft->nodeType == TSQL_NODE_COL) { // } else if (pLeft->nodeType == TSQL_NODE_COL) {
taosArrayPush(res, &pLeft->pSchema->colId); // taosArrayPush(res, &pLeft->pSchema->colId);
} // }
//
if (pRight->nodeType == TSQL_NODE_EXPR) { // if (pRight->nodeType == TSQL_NODE_EXPR) {
tSQLBinaryExprTrv(pRight, res); // tSQLBinaryExprTrv(pRight, res);
} else if (pRight->nodeType == TSQL_NODE_COL) { // } else if (pRight->nodeType == TSQL_NODE_COL) {
taosArrayPush(res, &pRight->pSchema->colId); // taosArrayPush(res, &pRight->pSchema->colId);
} // }
} //}
static void exprTreeToBinaryImpl(SBufferWriter* bw, tExprNode* expr) { static void exprTreeToBinaryImpl(SBufferWriter* bw, tExprNode* expr) {
tbufWriteUint8(bw, expr->nodeType); tbufWriteUint8(bw, expr->nodeType);
......
...@@ -118,7 +118,7 @@ static bool tExtMemBufferAlloc(tExtMemBuffer *pMemBuffer) { ...@@ -118,7 +118,7 @@ static bool tExtMemBufferAlloc(tExtMemBuffer *pMemBuffer) {
* To flush data to disk to accommodate more data * To flush data to disk to accommodate more data
*/ */
if (pMemBuffer->numOfInMemPages > 0 && pMemBuffer->numOfInMemPages == pMemBuffer->inMemCapacity) { if (pMemBuffer->numOfInMemPages > 0 && pMemBuffer->numOfInMemPages == pMemBuffer->inMemCapacity) {
if (!tExtMemBufferFlush(pMemBuffer)) { if (tExtMemBufferFlush(pMemBuffer) != 0) {
return false; return false;
} }
} }
...@@ -268,6 +268,7 @@ int32_t tExtMemBufferFlush(tExtMemBuffer *pMemBuffer) { ...@@ -268,6 +268,7 @@ int32_t tExtMemBufferFlush(tExtMemBuffer *pMemBuffer) {
size_t retVal = fwrite((char *)&(first->item), pMemBuffer->pageSize, 1, pMemBuffer->file); size_t retVal = fwrite((char *)&(first->item), pMemBuffer->pageSize, 1, pMemBuffer->file);
if (retVal <= 0) { // failed to write to buffer, may be not enough space if (retVal <= 0) { // failed to write to buffer, may be not enough space
ret = TAOS_SYSTEM_ERROR(errno); ret = TAOS_SYSTEM_ERROR(errno);
return ret;
} }
pMemBuffer->fileMeta.numOfElemsInFile += first->item.num; pMemBuffer->fileMeta.numOfElemsInFile += first->item.num;
......
...@@ -22,41 +22,6 @@ ...@@ -22,41 +22,6 @@
#define FILL_IS_ASC_FILL(_f) ((_f)->order == TSDB_ORDER_ASC) #define FILL_IS_ASC_FILL(_f) ((_f)->order == TSDB_ORDER_ASC)
int64_t taosGetIntervalStartTimestamp(int64_t startTime, int64_t slidingTime, char timeUnit, int16_t precision) {
if (slidingTime == 0) {
return startTime;
}
if (timeUnit == 'a' || timeUnit == 'm' || timeUnit == 's' || timeUnit == 'h') {
return (startTime / slidingTime) * slidingTime;
} else {
/*
* here we revised the start time of day according to the local time zone,
* but in case of DST, the start time of one day need to be dynamically decided.
*
* TODO dynamically decide the start time of a day, move to common module
*/
// todo refactor to extract function that is available for Linux/Windows/Mac platform
#if defined(WINDOWS) && _MSC_VER >= 1900
// see https://docs.microsoft.com/en-us/cpp/c-runtime-library/daylight-dstbias-timezone-and-tzname?view=vs-2019
int64_t timezone = _timezone;
int32_t daylight = _daylight;
char** tzname = _tzname;
#endif
int64_t t = (precision == TSDB_TIME_PRECISION_MILLI) ? MILLISECOND_PER_SECOND : MILLISECOND_PER_SECOND * 1000L;
int64_t revStartime = (startTime / slidingTime) * slidingTime + timezone * t;
int64_t revEndtime = revStartime + slidingTime - 1;
if (revEndtime < startTime) {
revStartime += slidingTime;
}
return revStartime;
}
}
SFillInfo* taosInitFillInfo(int32_t order, TSKEY skey, int32_t numOfTags, int32_t capacity, int32_t numOfCols, SFillInfo* taosInitFillInfo(int32_t order, TSKEY skey, int32_t numOfTags, int32_t capacity, int32_t numOfCols,
int64_t slidingTime, int8_t slidingUnit, int8_t precision, int32_t fillType, SFillColInfo* pFillCol) { int64_t slidingTime, int8_t slidingUnit, int8_t precision, int32_t fillType, SFillColInfo* pFillCol) {
if (fillType == TSDB_FILL_NONE) { if (fillType == TSDB_FILL_NONE) {
...@@ -128,7 +93,7 @@ static TSKEY taosGetRevisedEndKey(TSKEY ekey, int32_t order, int64_t timeInterva ...@@ -128,7 +93,7 @@ static TSKEY taosGetRevisedEndKey(TSKEY ekey, int32_t order, int64_t timeInterva
if (order == TSDB_ORDER_ASC) { if (order == TSDB_ORDER_ASC) {
return ekey; return ekey;
} else { } else {
return taosGetIntervalStartTimestamp(ekey, timeInterval, slidingTimeUnit, precision); return taosGetIntervalStartTimestamp(ekey, timeInterval, timeInterval, slidingTimeUnit, precision);
} }
} }
......
此差异已折叠。
...@@ -31,9 +31,7 @@ extern int32_t tscEmbedded; ...@@ -31,9 +31,7 @@ extern int32_t tscEmbedded;
#define tInfo(...) { if (rpcDebugFlag & DEBUG_INFO) { taosPrintLog("RPC INFO ", tscEmbedded ? 255 : rpcDebugFlag, __VA_ARGS__); }} #define tInfo(...) { if (rpcDebugFlag & DEBUG_INFO) { taosPrintLog("RPC INFO ", tscEmbedded ? 255 : rpcDebugFlag, __VA_ARGS__); }}
#define tDebug(...) { if (rpcDebugFlag & DEBUG_DEBUG) { taosPrintLog("RPC DEBUG ", rpcDebugFlag, __VA_ARGS__); }} #define tDebug(...) { if (rpcDebugFlag & DEBUG_DEBUG) { taosPrintLog("RPC DEBUG ", rpcDebugFlag, __VA_ARGS__); }}
#define tTrace(...) { if (rpcDebugFlag & DEBUG_TRACE) { taosPrintLog("RPC TRACE ", rpcDebugFlag, __VA_ARGS__); }} #define tTrace(...) { if (rpcDebugFlag & DEBUG_TRACE) { taosPrintLog("RPC TRACE ", rpcDebugFlag, __VA_ARGS__); }}
#define tDump(x, y) { if (rpcDebugFlag & DEBUG_DUMP) { taosDumpData((unsigned char *)x, y); }}
#define tDebugDump(x, y) { if (rpcDebugFlag & DEBUG_DEBUG) { taosDumpData((unsigned char *)x, y); }}
#define tTraceDump(x, y) { if (rpcDebugFlag & DEBUG_TRACE) { taosDumpData((unsigned char *)x, y); }}
#ifdef __cplusplus #ifdef __cplusplus
} }
......
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册