提交 f1aa756d 编写于 作者: P Ping Xiao

Merge branch 'develop' into jdbcfixes

......@@ -18,7 +18,7 @@ import (
"sync"
"time"
_ "github.com/taosdata/TDengine/src/connector/go/taosSql"
_ "github.com/taosdata/driver-go/taosSql"
)
const (
......
......@@ -110,8 +110,6 @@ STableMeta* tscCreateTableMetaFromMsg(STableMetaMsg* pTableMetaMsg, size_t* size
//todo tags value as well as the table id structure needs refactor
char *tsGetTagsValue(STableMeta *pMeta);
void extractTableNameFromToken(SSQLToken *pToken, SSQLToken* pTable);
#ifdef __cplusplus
}
#endif
......
......@@ -320,6 +320,8 @@ typedef struct SSqlStream {
SSqlObj *pSql;
uint32_t streamId;
char listed;
bool isProject;
int16_t precision;
int64_t num; // number of computing count
/*
......@@ -334,7 +336,6 @@ typedef struct SSqlStream {
int64_t etime; // stream end query time, when time is larger then etime, the stream will be closed
int64_t interval;
int64_t slidingTime;
int16_t precision;
void * pTimer;
void (*fp)();
......
......@@ -14,7 +14,6 @@
*/
#include "os.h"
#include "qast.h"
#include "qextbuffer.h"
#include "qfill.h"
#include "qhistogram.h"
......@@ -23,6 +22,7 @@
#include "qtsbuf.h"
#include "taosdef.h"
#include "taosmsg.h"
#include "qast.h"
#include "tscLog.h"
#include "tscSubquery.h"
#include "tscompression.h"
......
......@@ -406,7 +406,7 @@ int tscProcessLocalCmd(SSqlObj *pSql) {
pSql->res.qhandle = 0x1;
pSql->res.numOfRows = 0;
} else if (pCmd->command == TSDB_SQL_RESET_CACHE) {
taosCacheEmpty(tscCacheHandle,false);
taosCacheEmpty(tscCacheHandle);
} else if (pCmd->command == TSDB_SQL_SERV_VERSION) {
tscProcessServerVer(pSql);
} else if (pCmd->command == TSDB_SQL_CLI_VERSION) {
......
......@@ -18,19 +18,19 @@
#define _DEFAULT_SOURCE
#include "os.h"
#include "qast.h"
#include "taos.h"
#include "taosmsg.h"
#include "tstoken.h"
#include "tstrbuild.h"
#include "ttime.h"
#include "qast.h"
#include "tcompare.h"
#include "tname.h"
#include "tscLog.h"
#include "tscUtil.h"
#include "tschemautil.h"
#include "tsclient.h"
#include "tstoken.h"
#include "tstrbuild.h"
#include "ttime.h"
#include "ttokendef.h"
#include "tname.h"
#include "tcompare.h"
#define DEFAULT_PRIMARY_TIMESTAMP_COL_NAME "_c0"
......@@ -90,6 +90,7 @@ static int32_t validateSqlFunctionInStreamSql(SSqlCmd* pCmd, SQueryInfo* pQueryI
static int32_t buildArithmeticExprString(tSQLExpr* pExpr, char** exprString);
static int32_t validateFunctionsInIntervalOrGroupbyQuery(SSqlCmd* pCmd, SQueryInfo* pQueryInfo);
static int32_t validateArithmeticSQLExpr(SSqlCmd* pCmd, tSQLExpr* pExpr, SQueryInfo* pQueryInfo, SColumnList* pList, int32_t* type);
static int32_t validateEp(char* ep);
static int32_t validateDNodeConfig(tDCLSQL* pOptions);
static int32_t validateLocalConfig(tDCLSQL* pOptions);
static int32_t validateColumnName(char* name);
......@@ -359,6 +360,7 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) {
case TSDB_SQL_CFG_DNODE: {
const char* msg2 = "invalid configure options or values";
const char* msg3 = "invalid dnode ep";
/* validate the ip address */
tDCLSQL* pDCL = pInfo->pDCLInfo;
......@@ -375,6 +377,10 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) {
strncpy(pCfg->ep, pDCL->a[0].z, pDCL->a[0].n);
if (validateEp(pCfg->ep) != TSDB_CODE_SUCCESS) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg3);
}
strncpy(pCfg->config, pDCL->a[1].z, pDCL->a[1].n);
if (pDCL->nTokens == 3) {
......@@ -654,11 +660,14 @@ int32_t parseIntervalClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySQL* pQ
int32_t parseSlidingClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySQL* pQuerySql) {
const char* msg0 = "sliding value too small";
const char* msg1 = "sliding value no larger than the interval value";
const char* msg2 = "sliding value can not less than 1% of interval value";
const static int32_t INTERVAL_SLIDING_FACTOR = 100;
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
SSQLToken* pSliding = &pQuerySql->sliding;
STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);
SSQLToken* pSliding = &pQuerySql->sliding;
if (pSliding->n != 0) {
getTimestampInUsFromStr(pSliding->z, pSliding->n, &pQueryInfo->slidingTime);
if (tinfo.precision == TSDB_TIME_PRECISION_MILLI) {
......@@ -676,6 +685,10 @@ int32_t parseSlidingClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySQL* pQu
pQueryInfo->slidingTime = pQueryInfo->intervalTime;
}
if ((pQueryInfo->intervalTime != 0) && (pQueryInfo->intervalTime/pQueryInfo->slidingTime > INTERVAL_SLIDING_FACTOR)) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2);
}
return TSDB_CODE_SUCCESS;
}
......@@ -4629,6 +4642,24 @@ typedef struct SDNodeDynConfOption {
int32_t len; // name string length
} SDNodeDynConfOption;
int32_t validateEp(char* ep) {
char buf[TSDB_EP_LEN + 1] = {0};
tstrncpy(buf, ep, TSDB_EP_LEN);
char *pos = strchr(buf, ':');
if (NULL == pos) {
return TSDB_CODE_TSC_INVALID_SQL;
}
uint16_t port = atoi(pos+1);
if (0 == port) {
return TSDB_CODE_TSC_INVALID_SQL;
}
return TSDB_CODE_SUCCESS;
}
int32_t validateDNodeConfig(tDCLSQL* pOptions) {
if (pOptions->nTokens < 2 || pOptions->nTokens > 3) {
return TSDB_CODE_TSC_INVALID_SQL;
......@@ -6096,16 +6127,12 @@ int32_t exprTreeFromSqlExpr(SSqlCmd* pCmd, tExprNode **pExpr, const tSQLExpr* pS
}
}
// NOTE: binary|nchar data allows the >|< type filter
if ((*pExpr)->_node.optr != TSDB_RELATION_EQUAL && (*pExpr)->_node.optr != TSDB_RELATION_NOT_EQUAL) {
if (pRight->nodeType == TSQL_NODE_VALUE) {
if (pRight->pVal->nType == TSDB_DATA_TYPE_BOOL) {
return TSDB_CODE_TSC_INVALID_SQL;
}
if ((pRight->pVal->nType == TSDB_DATA_TYPE_BINARY || pRight->pVal->nType == TSDB_DATA_TYPE_NCHAR)
&& (*pExpr)->_node.optr != TSDB_RELATION_LIKE) {
return TSDB_CODE_TSC_INVALID_SQL;
}
}
}
}
......
......@@ -215,25 +215,3 @@ __attribute__ ((unused)) static FORCE_INLINE size_t copy(char* dst, const char*
return len;
}
/*
* tablePrefix.columnName
* extract table name and save it in pTable, with only column name in pToken
*/
void extractTableNameFromToken(SSQLToken* pToken, SSQLToken* pTable) {
const char sep = TS_PATH_DELIMITER[0];
if (pToken == pTable || pToken == NULL || pTable == NULL) {
return;
}
char* r = strnchr(pToken->z, sep, pToken->n, false);
if (r != NULL) { // record the table name token
pTable->n = r - pToken->z;
pTable->z = pToken->z;
r += 1;
pToken->n -= (r - pToken->z);
pToken->z = r;
}
}
......@@ -438,8 +438,9 @@ void tscKillSTableQuery(SSqlObj *pSql) {
* here, we cannot set the command = TSDB_SQL_KILL_QUERY. Otherwise, it may cause
* sub-queries not correctly released and master sql object of super table query reaches an abnormal state.
*/
pSql->pSubs[i]->res.code = TSDB_CODE_TSC_QUERY_CANCELLED;
rpcCancelRequest(pSql->pSubs[i]->pRpcCtx);
rpcCancelRequest(pSub->pRpcCtx);
pSub->res.code = TSDB_CODE_TSC_QUERY_CANCELLED;
tscQueueAsyncRes(pSub);
}
/*
......@@ -1955,7 +1956,7 @@ int tscProcessUseDbRsp(SSqlObj *pSql) {
}
int tscProcessDropDbRsp(SSqlObj *UNUSED_PARAM(pSql)) {
taosCacheEmpty(tscCacheHandle, false);
taosCacheEmpty(tscCacheHandle);
return 0;
}
......@@ -2001,7 +2002,7 @@ int tscProcessAlterTableMsgRsp(SSqlObj *pSql) {
if (isSuperTable) { // if it is a super table, reset whole query cache
tscDebug("%p reset query cache since table:%s is stable", pSql, pTableMetaInfo->name);
taosCacheEmpty(tscCacheHandle, false);
taosCacheEmpty(tscCacheHandle);
}
}
......
......@@ -617,19 +617,18 @@ void taos_stop_query(TAOS_RES *res) {
if (pSql->signature != pSql) return;
tscDebug("%p start to cancel query", res);
pSql->res.code = TSDB_CODE_TSC_QUERY_CANCELLED;
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
if (tscIsTwoStageSTableQuery(pQueryInfo, 0)) {
tscKillSTableQuery(pSql);
return;
}
if (pSql->cmd.command >= TSDB_SQL_LOCAL) {
return;
if (pSql->cmd.command < TSDB_SQL_LOCAL) {
rpcCancelRequest(pSql->pRpcCtx);
}
pSql->res.code = TSDB_CODE_TSC_QUERY_CANCELLED;
tscQueueAsyncRes(pSql);
rpcCancelRequest(pSql->pRpcCtx);
tscDebug("%p query is cancelled", res);
}
......
......@@ -71,6 +71,7 @@ static void tscProcessStreamLaunchQuery(SSchedMsg *pMsg) {
pSql->fp = tscProcessStreamQueryCallback;
pSql->param = pStream;
pSql->res.completed = false;
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
......@@ -86,7 +87,7 @@ static void tscProcessStreamLaunchQuery(SSchedMsg *pMsg) {
// failed to get meter/metric meta, retry in 10sec.
if (code != TSDB_CODE_SUCCESS) {
int64_t retryDelayTime = tscGetRetryDelayTime(pStream->slidingTime, pStream->precision);
tscError("%p stream:%p,get metermeta failed, retry in %" PRId64 "ms", pStream->pSql, pStream, retryDelayTime);
tscDebug("%p stream:%p,get metermeta failed, retry in %" PRId64 "ms", pStream->pSql, pStream, retryDelayTime);
tscSetRetryTimer(pStream, pSql, retryDelayTime);
} else {
......@@ -108,7 +109,7 @@ static void tscProcessStreamTimer(void *handle, void *tmrId) {
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
tscDebug("%p add into timer", pSql);
if (isProjectStream(pQueryInfo)) {
if (pStream->isProject) {
/*
* pQueryInfo->window.ekey, which is the start time, does not change in case of
* repeat first execution, once the first execution failed.
......@@ -121,7 +122,19 @@ static void tscProcessStreamTimer(void *handle, void *tmrId) {
}
} else {
pQueryInfo->window.skey = pStream->stime - pStream->interval;
pQueryInfo->window.ekey = pStream->stime - 1;
int64_t etime = taosGetTimestamp(pStream->precision);
// delay to wait all data in last time window
if (pStream->precision == TSDB_TIME_PRECISION_MICRO) {
etime -= tsMaxStreamComputDelay * 1000l;
} else {
etime -= tsMaxStreamComputDelay;
}
if (etime > pStream->etime) {
etime = pStream->etime;
} else {
etime = pStream->stime + (etime - pStream->stime) / pStream->interval * pStream->interval;
}
pQueryInfo->window.ekey = etime;
}
// launch stream computing in a new thread
......@@ -137,7 +150,7 @@ static void tscProcessStreamQueryCallback(void *param, TAOS_RES *tres, int numOf
SSqlStream *pStream = (SSqlStream *)param;
if (tres == NULL || numOfRows < 0) {
int64_t retryDelay = tscGetRetryDelayTime(pStream->slidingTime, pStream->precision);
tscError("%p stream:%p, query data failed, code:%d, retry in %" PRId64 "ms", pStream->pSql, pStream, numOfRows,
tscError("%p stream:%p, query data failed, code:0x%08x, retry in %" PRId64 "ms", pStream->pSql, pStream, numOfRows,
retryDelay);
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pStream->pSql->cmd, 0, 0);
......@@ -151,17 +164,45 @@ static void tscProcessStreamQueryCallback(void *param, TAOS_RES *tres, int numOf
taos_fetch_rows_a(tres, tscProcessStreamRetrieveResult, param);
}
static void tscSetTimestampForRes(SSqlStream *pStream, SSqlObj *pSql) {
// no need to be called as this is alreay done in the query
static void tscStreamFillTimeGap(SSqlStream* pStream, TSKEY ts) {
#if 0
SSqlObj * pSql = pStream->pSql;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
if (pQueryInfo->fillType != TSDB_FILL_SET_VALUE && pQueryInfo->fillType != TSDB_FILL_NULL) {
return;
}
SSqlRes *pRes = &pSql->res;
/* failed to retrieve any result in this retrieve */
pSql->res.numOfRows = 1;
void *row[TSDB_MAX_COLUMNS] = {0};
char tmpRes[TSDB_MAX_BYTES_PER_ROW] = {0};
void *oldPtr = pSql->res.data;
pSql->res.data = tmpRes;
int32_t rowNum = 0;
int64_t timestamp = *(int64_t *)pRes->data;
int64_t actualTimestamp = pStream->stime - pStream->interval;
while (pStream->stime + pStream->slidingTime < ts) {
pStream->stime += pStream->slidingTime;
*(TSKEY*)row[0] = pStream->stime;
for (int32_t i = 1; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) {
int16_t offset = tscFieldInfoGetOffset(pQueryInfo, i);
TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i);
assignVal(pSql->res.data + offset, (char *)(&pQueryInfo->fillVal[i]), pField->bytes, pField->type);
row[i] = pSql->res.data + offset;
}
(*pStream->fp)(pStream->param, pSql, row);
++rowNum;
}
if (timestamp != actualTimestamp) {
// reset the timestamp of each agg point by using start time of each interval
*((int64_t *)pRes->data) = actualTimestamp;
tscWarn("%p stream:%p, timestamp of points is:%" PRId64 ", reset to %" PRId64, pSql, pStream, timestamp, actualTimestamp);
if (rowNum > 0) {
tscDebug("%p stream:%p %d rows padded", pSql, pStream, rowNum);
}
pRes->numOfRows = 0;
pRes->data = oldPtr;
#endif
}
static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOfRows) {
......@@ -170,7 +211,7 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf
if (pSql == NULL || numOfRows < 0) {
int64_t retryDelayTime = tscGetRetryDelayTime(pStream->slidingTime, pStream->precision);
tscError("%p stream:%p, retrieve data failed, code:%d, retry in %" PRId64 "ms", pSql, pStream, numOfRows, retryDelayTime);
tscError("%p stream:%p, retrieve data failed, code:0x%08x, retry in %" PRId64 "ms", pSql, pStream, numOfRows, retryDelayTime);
tscSetRetryTimer(pStream, pStream->pSql, retryDelayTime);
return;
......@@ -180,16 +221,11 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf
if (numOfRows > 0) { // when reaching here the first execution of stream computing is successful.
pStream->numOfRes += numOfRows;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
for(int32_t i = 0; i < numOfRows; ++i) {
TAOS_ROW row = taos_fetch_row(res);
tscDebug("%p stream:%p fetch result", pSql, pStream);
if (isProjectStream(pQueryInfo)) {
tscStreamFillTimeGap(pStream, *(TSKEY*)row[0]);
pStream->stime = *(TSKEY *)row[0];
} else {
tscSetTimestampForRes(pStream, pSql);
}
// user callback function
(*pStream->fp)(pStream->param, res, row);
......@@ -199,56 +235,19 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf
taos_fetch_rows_a(res, tscProcessStreamRetrieveResult, pStream);
} else { // numOfRows == 0, all data has been retrieved
pStream->useconds += pSql->res.useconds;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
if (pStream->numOfRes == 0) {
if (pQueryInfo->fillType == TSDB_FILL_SET_VALUE || pQueryInfo->fillType == TSDB_FILL_NULL) {
SSqlRes *pRes = &pSql->res;
/* failed to retrieve any result in this retrieve */
pSql->res.numOfRows = 1;
void *row[TSDB_MAX_COLUMNS] = {0};
char tmpRes[TSDB_MAX_BYTES_PER_ROW] = {0};
void *oldPtr = pSql->res.data;
pSql->res.data = tmpRes;
for (int32_t i = 1; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) {
int16_t offset = tscFieldInfoGetOffset(pQueryInfo, i);
TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i);
assignVal(pSql->res.data + offset, (char *)(&pQueryInfo->fillVal[i]), pField->bytes, pField->type);
row[i] = pSql->res.data + offset;
}
tscSetTimestampForRes(pStream, pSql);
row[0] = pRes->data;
// char result[512] = {0};
// taos_print_row(result, row, pQueryInfo->fieldsInfo.pFields, pQueryInfo->fieldsInfo.numOfOutput);
// tscInfo("%p stream:%p query result: %s", pSql, pStream, result);
tscDebug("%p stream:%p fetch result", pSql, pStream);
// user callback function
(*pStream->fp)(pStream->param, res, row);
pRes->numOfRows = 0;
pRes->data = oldPtr;
} else if (isProjectStream(pQueryInfo)) {
if (pStream->isProject) {
/* no resuls in the query range, retry */
// todo set retry dynamic time
int32_t retry = tsProjectExecInterval;
tscError("%p stream:%p, retrieve no data, code:%d, retry in %" PRId32 "ms", pSql, pStream, numOfRows, retry);
tscError("%p stream:%p, retrieve no data, code:0x%08x, retry in %" PRId32 "ms", pSql, pStream, numOfRows, retry);
tscSetRetryTimer(pStream, pStream->pSql, retry);
return;
}
} else {
if (isProjectStream(pQueryInfo)) {
} else if (pStream->isProject) {
pStream->stime += 1;
}
}
tscDebug("%p stream:%p, query on:%s, fetch result completed, fetched rows:%" PRId64, pSql, pStream, pTableMetaInfo->name,
pStream->numOfRes);
......@@ -262,10 +261,9 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf
}
static void tscSetRetryTimer(SSqlStream *pStream, SSqlObj *pSql, int64_t timer) {
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
int64_t delay = getDelayValueAfterTimewindowClosed(pStream, timer);
if (isProjectStream(pQueryInfo)) {
if (pStream->isProject) {
int64_t now = taosGetTimestamp(pStream->precision);
int64_t etime = now > pStream->etime ? pStream->etime : now;
......@@ -323,8 +321,7 @@ static int64_t getLaunchTimeDelay(const SSqlStream* pStream) {
static void tscSetNextLaunchTimer(SSqlStream *pStream, SSqlObj *pSql) {
int64_t timer = 0;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
if (isProjectStream(pQueryInfo)) {
if (pStream->isProject) {
/*
* for project query, no mater fetch data successfully or not, next launch will issue
* more than the sliding time window
......@@ -342,7 +339,6 @@ static void tscSetNextLaunchTimer(SSqlStream *pStream, SSqlObj *pSql) {
return;
}
} else {
pStream->stime += pStream->slidingTime;
if ((pStream->stime - pStream->interval) >= pStream->etime) {
tscDebug("%p stream:%p, stime:%" PRId64 " is larger than end time: %" PRId64 ", stop the stream", pStream->pSql, pStream,
pStream->stime, pStream->etime);
......@@ -409,14 +405,16 @@ static void tscSetSlidingWindowInfo(SSqlObj *pSql, SSqlStream *pStream) {
pStream->slidingTime = pQueryInfo->slidingTime;
if (pStream->isProject) {
pQueryInfo->intervalTime = 0; // clear the interval value to avoid the force time window split by query processor
pQueryInfo->slidingTime = 0;
}
}
static int64_t tscGetStreamStartTimestamp(SSqlObj *pSql, SSqlStream *pStream, int64_t stime) {
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
if (isProjectStream(pQueryInfo)) {
if (pStream->isProject) {
// no data in table, flush all data till now to destination meter, 10sec delay
pStream->interval = tsProjectExecInterval;
pStream->slidingTime = tsProjectExecInterval;
......@@ -489,7 +487,7 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p
SSqlStream *pStream = (SSqlStream *)calloc(1, sizeof(SSqlStream));
if (pStream == NULL) {
tscError("%p open stream failed, sql:%s, reason:%s, code:%d", pSql, sqlstr, pCmd->payload, pRes->code);
tscError("%p open stream failed, sql:%s, reason:%s, code:0x%08x", pSql, sqlstr, pCmd->payload, pRes->code);
tscFreeSqlObj(pSql);
return NULL;
}
......@@ -514,7 +512,7 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p
if (pRes->code != TSDB_CODE_SUCCESS) {
setErrorInfo(pSql, pRes->code, pCmd->payload);
tscError("%p open stream failed, sql:%s, reason:%s, code:%d", pSql, sqlstr, pCmd->payload, pRes->code);
tscError("%p open stream failed, sql:%s, reason:%s, code:0x%08x", pSql, sqlstr, pCmd->payload, pRes->code);
tscFreeSqlObj(pSql);
return NULL;
}
......@@ -523,6 +521,7 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);
pStream->isProject = isProjectStream(pQueryInfo);
pStream->fp = fp;
pStream->callback = callback;
pStream->param = param;
......@@ -565,6 +564,8 @@ void taos_close_stream(TAOS_STREAM *handle) {
taosTmrStopA(&(pStream->pTimer));
tscDebug("%p stream:%p is closed", pSql, pStream);
// notify CQ to release the pStream object
pStream->fp(pStream->param, NULL, NULL);
tscFreeSqlObj(pSql);
pStream->pSql = NULL;
......
......@@ -14,12 +14,12 @@
*/
#include "os.h"
#include "tscSubquery.h"
#include "qtsbuf.h"
#include "qast.h"
#include "tcompare.h"
#include "tschemautil.h"
#include "qtsbuf.h"
#include "tscLog.h"
#include "tscSubquery.h"
#include "tschemautil.h"
#include "tsclient.h"
typedef struct SInsertSupporter {
......@@ -57,10 +57,15 @@ static int64_t doTSBlockIntersect(SSqlObj* pSql, SJoinSupporter* pSupporter1, SJ
pSubQueryInfo1->tsBuf = output1;
pSubQueryInfo2->tsBuf = output2;
// no result generated, return directly
if (pSupporter1->pTSBuf == NULL || pSupporter2->pTSBuf == NULL) {
tscDebug("%p at least one ts-comp is empty, 0 for secondary query after ts blocks intersecting", pSql);
return 0;
}
tsBufResetPos(pSupporter1->pTSBuf);
tsBufResetPos(pSupporter2->pTSBuf);
// TODO add more details information
if (!tsBufNextPos(pSupporter1->pTSBuf)) {
tsBufFlush(output1);
tsBufFlush(output2);
......@@ -210,6 +215,7 @@ static void tscDestroyJoinSupporter(SJoinSupporter* pSupporter) {
pSupporter->f = NULL;
}
tfree(pSupporter->pIdTagList);
tscTagCondRelease(&pSupporter->tagCond);
free(pSupporter);
}
......@@ -420,43 +426,6 @@ static void updateQueryTimeRange(SQueryInfo* pQueryInfo, STimeWindow* win) {
pQueryInfo->window = *win;
}
static UNUSED_FUNC void tSIntersectionAndLaunchSecQuery(SJoinSupporter* pSupporter, SSqlObj* pSql) {
SSqlObj* pParentSql = pSupporter->pObj;
SQueryInfo* pParentQueryInfo = tscGetQueryInfoDetail(&pParentSql->cmd, pParentSql->cmd.clauseIndex);
// if (tscNonOrderedProjectionQueryOnSTable(pParentQueryInfo, 0)) {
// STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
// assert(pQueryInfo->numOfTables == 1);
//
// // for projection query, need to try next vnode
//// int32_t totalVnode = pTableMetaInfo->pMetricMeta->numOfVnodes;
// int32_t totalVnode = 0;
// if ((++pTableMetaInfo->vgroupIndex) < totalVnode) {
// tscDebug("%p current vnode:%d exhausted, try next:%d. total vnode:%d. current numOfRes:%d", pSql,
// pTableMetaInfo->vgroupIndex - 1, pTableMetaInfo->vgroupIndex, totalVnode, pRes->numOfTotal);
//
// pSql->cmd.command = TSDB_SQL_SELECT;
// pSql->fp = tscJoinQueryCallback;
// tscProcessSql(pSql);
//
// return;
// }
// }
SJoinSupporter* p1 = pParentSql->pSubs[0]->param;
SJoinSupporter* p2 = pParentSql->pSubs[1]->param;
STimeWindow win = TSWINDOW_INITIALIZER;
int64_t num = doTSBlockIntersect(pParentSql, p1, p2, &win);
if (num <= 0) { // no result during ts intersect
tscDebug("%p free all sub SqlObj and quit", pParentSql);
freeJoinSubqueryObj(pParentSql);
} else {
updateQueryTimeRange(pParentQueryInfo, &win);
tscLaunchRealSubqueries(pParentSql);
}
}
int32_t tscCompareTidTags(const void* p1, const void* p2) {
const STidTags* t1 = (const STidTags*) varDataVal(p1);
const STidTags* t2 = (const STidTags*) varDataVal(p2);
......@@ -713,9 +682,12 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
SArray *s1 = NULL, *s2 = NULL;
getIntersectionOfTableTuple(pQueryInfo, pParentSql, &s1, &s2);
if (taosArrayGetSize(s1) == 0 || taosArrayGetSize(s2) == 0) { // no results,return.
tscDebug("%p free all sub SqlObj and quit", pParentSql);
tscDebug("%p tag intersect does not generated qualified tables for join, free all sub SqlObj and quit", pParentSql);
freeJoinSubqueryObj(pParentSql);
// set no result command
pParentSql->cmd.command = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
(*pParentSql->fp)(pParentSql->param, pParentSql, 0);
} else {
// proceed to for ts_comp query
SSqlCmd* pSubCmd1 = &pParentSql->pSubs[0]->cmd;
......@@ -847,6 +819,9 @@ static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
tscDebug("%p no results generated in ts intersection, free all sub SqlObj and quit", pParentSql);
freeJoinSubqueryObj(pParentSql);
// set no result command
pParentSql->cmd.command = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
(*pParentSql->fp)(pParentSql->param, pParentSql, 0);
return;
}
......
......@@ -14,21 +14,21 @@
*/
#include "os.h"
#include "qast.h"
#include "hash.h"
#include "tscUtil.h"
#include "taosmsg.h"
#include "qast.h"
#include "tcache.h"
#include "tkey.h"
#include "tmd5.h"
#include "tscProfile.h"
#include "tscLocalMerge.h"
#include "tscLog.h"
#include "tscProfile.h"
#include "tscSubquery.h"
#include "tschemautil.h"
#include "tsclient.h"
#include "ttimer.h"
#include "ttokendef.h"
#include "tscLog.h"
#include "tscUtil.h"
#include "hash.h"
static void freeQueryInfoImpl(SQueryInfo* pQueryInfo);
static void clearAllTableMetaInfo(SQueryInfo* pQueryInfo, const char* address, bool removeFromCache);
......
......@@ -3,6 +3,7 @@
#include "os.h"
#include "taosmsg.h"
#include "tstoken.h"
typedef struct SDataStatis {
int16_t colId;
......@@ -23,6 +24,8 @@ void extractTableName(const char *tableId, char *name);
char* extractDBName(const char *tableId, char *name);
void extractTableNameFromToken(SSQLToken *pToken, SSQLToken* pTable);
SSchema tGetTableNameColumnSchema();
bool tscValidateTableNameLength(size_t len);
......
......@@ -105,3 +105,26 @@ int64_t taosGetIntervalStartTimestamp(int64_t startTime, int64_t slidingTime, in
}
return start;
}
/*
* tablePrefix.columnName
* extract table name and save it in pTable, with only column name in pToken
*/
void extractTableNameFromToken(SSQLToken* pToken, SSQLToken* pTable) {
const char sep = TS_PATH_DELIMITER[0];
if (pToken == pTable || pToken == NULL || pTable == NULL) {
return;
}
char* r = strnchr(pToken->z, sep, pToken->n, false);
if (r != NULL) { // record the table name token
pTable->n = r - pToken->z;
pTable->z = pToken->z;
r += 1;
pToken->n -= (r - pToken->z);
pToken->z = r;
}
}
#!/bin/bash
ulimit -c unlimited
function buildTDengine {
cd /root/TDengine
git remote update
REMOTE_COMMIT=`git rev-parse --short remotes/origin/develop`
LOCAL_COMMIT=`git rev-parse --short @`
echo " LOCAL: $LOCAL_COMMIT"
echo "REMOTE: $REMOTE_COMMIT"
if [ "$LOCAL_COMMIT" == "$REMOTE_COMMIT" ]; then
echo "repo up-to-date"
else
echo "repo need to pull"
git pull
LOCAL_COMMIT=`git rev-parse --short @`
cd /root/TDengine/debug
rm -rf /root/TDengine/debug/*
cmake ..
make > /dev/null
make install
fi
}
function restartTaosd {
systemctl stop taosd
pkill -KILL -x taosd
sleep 10
logDir=`grep 'logDir' /etc/taos/taos.cfg|awk 'END{print $2}'`
dataDir=`grep 'dataDir' /etc/taos/taos.cfg|awk '{print $2}'`
rm -rf $logDir/*
rm -rf $dataDir/*
taosd 2>&1 > /dev/null &
sleep 10
}
buildTDengine
restartTaosd
......@@ -10,7 +10,6 @@ public class TDNode {
private int running;
private int deployed;
private boolean testCluster;
private int valgrind;
private String path;
private String cfgDir;
private String dataDir;
......@@ -22,17 +21,12 @@ public class TDNode {
running = 0;
deployed = 0;
testCluster = false;
valgrind = 0;
}
public void setPath(String path) {
this.path = path;
}
public void setValgrind(int valgrind) {
this.valgrind = valgrind;
}
public void setTestCluster(boolean testCluster) {
this.testCluster = testCluster;
}
......@@ -58,7 +52,7 @@ public class TDNode {
public void start() {
String selfPath = System.getProperty("user.dir");
String binPath = "";
String projDir = selfPath + "../../../../";
String projDir = selfPath + "/../../../../";
try {
ArrayList<String> taosdPath = new ArrayList<>();
......@@ -95,14 +89,8 @@ public class TDNode {
return;
}
String cmd = "";
if(this.valgrind == 0) {
cmd = "nohup " + binPath + " -c " + cfgDir + " > /dev/null 2>&1 & ";
String cmd = "nohup " + binPath + " -c " + cfgDir + " > /dev/null 2>&1 & ";
System.out.println("start taosd cmd: " + cmd);
} else {
String valgrindCmdline = "valgrind --tool=memcheck --leak-check=full --show-reac∏hable=no --track-origins=yes --show-leak-kinds=all -v --workaround-gcc296-bugs=yes";
cmd = "nohup " + valgrindCmdline + " " + binPath + " -c " + this.cfgDir + " 2>&1 & ";
}
try{
Runtime.getRuntime().exec(cmd);
......@@ -115,12 +103,7 @@ public class TDNode {
}
public void stop() {
String toBeKilled = "";
if (this.valgrind == 0) {
toBeKilled = "taosd";
} else {
toBeKilled = "valgrind.bin";
}
String toBeKilled = "taosd";
if (this.running != 0) {
String psCmd = "ps -ef|grep -w %s| grep -v grep | awk '{print " + toBeKilled + "}'";
......@@ -136,10 +119,6 @@ public class TDNode {
String fuserCmd = "fuser -k -n tcp " + port;
Runtime.getRuntime().exec(fuserCmd).waitFor();
}
if (this.valgrind == 1) {
TimeUnit.SECONDS.sleep(2);
}
} catch (Exception e) {
e.printStackTrace();
}
......
......@@ -6,7 +6,6 @@ import java.util.*;
public class TDNodes {
private ArrayList<TDNode> tdNodes;
private boolean testCluster;
private int valgrind;
public TDNodes () {
tdNodes = new ArrayList<>();
......@@ -23,12 +22,6 @@ public class TDNodes {
String killCmd = "kill -9 " + ps.pid();
Runtime.getRuntime().exec(killCmd).waitFor();
psCmd = "ps -ef|grep -w valgrind.bin| grep -v grep | awk '{print $2}'";
ps = Runtime.getRuntime().exec(psCmd);
ps.waitFor();
killCmd = "kill -9 " + ps.pid();
Runtime.getRuntime().exec(killCmd).waitFor();
String binPath = System.getProperty("user.dir");
binPath += "/../../../debug";
System.out.println("binPath: " + binPath);
......@@ -54,10 +47,6 @@ public class TDNodes {
this.testCluster = testCluster;
}
public void setValgrid(int valgrind) {
this.valgrind = valgrind;
}
public void check(int index) {
if(index < 1 || index > 10) {
System.out.println("index: " + index + " should on a scale of [1, 10]");
......@@ -71,7 +60,6 @@ public class TDNodes {
String projectRealPath = file.getCanonicalPath();
check(index);
tdNodes.get(index - 1).setTestCluster(this.testCluster);
tdNodes.get(index - 1).setValgrind(valgrind);
tdNodes.get(index - 1).setPath(projectRealPath);
tdNodes.get(index - 1).deploy();
} catch (Exception e) {
......
......@@ -10,19 +10,17 @@ public class BaseTest {
private static boolean testCluster = false;
private static String deployPath = System.getProperty("user.dir");
private static int valgrind = 0;
private static TDNodes tdNodes = new TDNodes();
@BeforeClass
public static void setUpEvn() {
public static void setupEnv() {
try{
File file = new File(deployPath + "/../../../");
String rootPath = file.getCanonicalPath();
tdNodes.setPath(rootPath);
tdNodes.setTestCluster(testCluster);
tdNodes.setValgrid(valgrind);
tdNodes.deploy(1);
tdNodes.start(1);
......
from .cinterface import CTaosInterface
from .error import *
from .constants import FieldType
import threading
# querySeqNum = 0
......@@ -37,6 +38,7 @@ class TDengineCursor(object):
self._block_iter = 0
self._affected_rows = 0
self._logfile = ""
self._threadId = threading.get_ident()
if connection is not None:
self._connection = connection
......@@ -103,6 +105,12 @@ class TDengineCursor(object):
def execute(self, operation, params=None):
"""Prepare and execute a database operation (query or command).
"""
# if threading.get_ident() != self._threadId:
# info ="Cursor execute:Thread ID not match,creater:"+str(self._threadId)+" caller:"+str(threading.get_ident())
# raise OperationalError(info)
# print(info)
# return None
if not operation:
return None
......@@ -188,6 +196,11 @@ class TDengineCursor(object):
def fetchall(self):
"""Fetch all (remaining) rows of a query result, returning them as a sequence of sequences (e.g. a list of tuples). Note that the cursor's arraysize attribute can affect the performance of this operation.
"""
# if threading.get_ident() != self._threadId:
# info ="[WARNING] Cursor fetchall:Thread ID not match,creater:"+str(self._threadId)+" caller:"+str(threading.get_ident())
# raise OperationalError(info)
# print(info)
# return None
if self._result is None or self._fields is None:
raise OperationalError("Invalid use of fetchall")
......@@ -232,6 +245,12 @@ class TDengineCursor(object):
def _handle_result(self):
"""Handle the return result from query.
"""
# if threading.get_ident() != self._threadId:
# info = "Cursor handleresult:Thread ID not match,creater:"+str(self._threadId)+" caller:"+str(threading.get_ident())
# raise OperationalError(info)
# print(info)
# return None
self._description = []
for ele in self._fields:
self._description.append(
......
......@@ -109,6 +109,8 @@ void cqClose(void *handle) {
while (pObj) {
SCqObj *pTemp = pObj;
pObj = pObj->next;
tdFreeSchema(pTemp->pSchema);
tfree(pTemp->sqlStr);
free(pTemp);
}
......@@ -242,6 +244,10 @@ static void cqCreateStream(SCqContext *pContext, SCqObj *pObj) {
static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row) {
SCqObj *pObj = (SCqObj *)param;
if (tres == NULL && row == NULL) {
pObj->pStream = NULL;
return;
}
SCqContext *pContext = pObj->pContext;
STSchema *pSchema = pObj->pSchema;
if (pObj->pStream == NULL) return;
......@@ -263,8 +269,14 @@ static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row) {
void* val = row[i];
if (val == NULL) {
val = getNullValue(c->type);
} else if (IS_VAR_DATA_TYPE(c->type)) {
} else if (c->type == TSDB_DATA_TYPE_BINARY) {
val = ((char*)val) - sizeof(VarDataLenT);
} else if (c->type == TSDB_DATA_TYPE_NCHAR) {
char buf[TSDB_MAX_NCHAR_LEN];
size_t len = taos_fetch_lengths(tres)[i];
taosMbsToUcs4(val, len, buf, sizeof(buf), &len);
memcpy(val + sizeof(VarDataLenT), buf, len);
varDataLen(val) = len;
}
tdAppendColVal(trow, val, c->type, c->bytes, c->offset);
}
......
......@@ -131,8 +131,8 @@ static void dnodeFreeMnodeWriteMsg(SMnodeMsg *pWrite) {
taosFreeQitem(pWrite);
}
void dnodeSendRpcMnodeWriteRsp(void *pRaw, int32_t code) {
SMnodeMsg *pWrite = pRaw;
void dnodeSendRpcMnodeWriteRsp(void *pMsg, int32_t code) {
SMnodeMsg *pWrite = pMsg;
if (pWrite == NULL) return;
if (code == TSDB_CODE_MND_ACTION_IN_PROGRESS) return;
if (code == TSDB_CODE_MND_ACTION_NEED_REPROCESSED) {
......
......@@ -402,6 +402,7 @@ static int32_t dnodeProcessCreateVnodeMsg(SRpcMsg *rpcMsg) {
void *pVnode = vnodeAcquireVnode(pCreate->cfg.vgId);
if (pVnode != NULL) {
dDebug("vgId:%d, already exist, processed as alter msg", pCreate->cfg.vgId);
int32_t code = vnodeAlter(pVnode, pCreate);
vnodeRelease(pVnode);
return code;
......
......@@ -87,8 +87,8 @@ int32_t qKillQuery(qinfo_t qinfo);
void* qOpenQueryMgmt(int32_t vgId);
void qSetQueryMgmtClosed(void* pExecutor);
void qCleanupQueryMgmt(void* pExecutor);
void** qRegisterQInfo(void* pMgmt, void* qInfo);
void** qAcquireQInfo(void* pMgmt, void** key);
void** qRegisterQInfo(void* pMgmt, uint64_t qInfo);
void** qAcquireQInfo(void* pMgmt, uint64_t key);
void** qReleaseQInfo(void* pMgmt, void* pQInfo, bool needFree);
#ifdef __cplusplus
......
......@@ -129,9 +129,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_CLUSTER_CFG_INCONSISTENT, 0, 0x0335, "mnode clus
TAOS_DEFINE_ERROR(TSDB_CODE_MND_ACCT_ALREADY_EXIST, 0, 0x0340, "mnode accounts already exist")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_ACCT, 0, 0x0341, "mnode invalid account")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_ACCT_PARA, 0, 0x0342, "mnode invalid account parameter")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_ACCT_OPTION, 0, 0x0343, "mnode invalid acct option")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOO_MANY_ACCTS, 0, 0x0344, "mnode too many accounts")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_ACCT_OPTION, 0, 0x0342, "mnode invalid acct option")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_USER_ALREADY_EXIST, 0, 0x0350, "mnode user already exist")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_USER, 0, 0x0351, "mnode invalid user")
......@@ -145,7 +143,6 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_TABLE_ID, 0, 0x0361, "mnode inva
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_TABLE_NAME, 0, 0x0362, "mnode invalid table name")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_TABLE_TYPE, 0, 0x0363, "mnode invalid table type")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOO_MANY_TAGS, 0, 0x0364, "mnode too many tags")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOO_MANY_TABLES, 0, 0x0365, "mnode too many tables")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOO_MANY_TIMESERIES, 0, 0x0366, "mnode not enough time series")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_NOT_SUPER_TABLE, 0, 0x0367, "mnode no super table") // operation only available for super table
TAOS_DEFINE_ERROR(TSDB_CODE_MND_COL_NAME_TOO_LONG, 0, 0x0368, "mnode column name too long")
......@@ -161,13 +158,13 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_DB_OPTION, 0, 0x0382, "mnode inva
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_DB, 0, 0x0383, "mnode invalid database")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_MONITOR_DB_FORBIDDEN, 0, 0x0384, "mnode monitor db forbidden")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOO_MANY_DATABASES, 0, 0x0385, "mnode too many databases")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_DB_IN_DROPPING, 0, 0x0386, "mnode db in dropping")
// dnode
TAOS_DEFINE_ERROR(TSDB_CODE_DND_MSG_NOT_PROCESSED, 0, 0x0400, "dnode message not processed")
TAOS_DEFINE_ERROR(TSDB_CODE_DND_OUT_OF_MEMORY, 0, 0x0401, "dnode out of memory")
TAOS_DEFINE_ERROR(TSDB_CODE_DND_NO_WRITE_ACCESS, 0, 0x0402, "dnode no disk write access")
TAOS_DEFINE_ERROR(TSDB_CODE_DND_INVALID_MSG_LEN, 0, 0x0403, "dnode invalid message length")
TAOS_DEFINE_ERROR(TSDB_CODE_DND_INVALID_FILE_FORMAT, 0, 0x0404, "dnode invalid file format")
// vnode
TAOS_DEFINE_ERROR(TSDB_CODE_VND_ACTION_IN_PROGRESS, 0, 0x0500, "vnode action in progress")
......
......@@ -473,7 +473,7 @@ typedef struct {
typedef struct {
int32_t code;
uint64_t qhandle;
uint64_t qhandle; // query handle
} SQueryTableRsp;
typedef struct {
......
......@@ -235,9 +235,10 @@ bool tsdbNextDataBlock(TsdbQueryHandleT *pQueryHandle);
* Get current data block information
*
* @param pQueryHandle
* @param pBlockInfo
* @return
*/
SDataBlockInfo tsdbRetrieveDataBlockInfo(TsdbQueryHandleT *pQueryHandle);
void tsdbRetrieveDataBlockInfo(TsdbQueryHandleT *pQueryHandle, SDataBlockInfo* pBlockInfo);
/**
*
......
......@@ -206,9 +206,10 @@ static void shellSourceFile(TAOS *con, char *fptr) {
if (code != 0) {
fprintf(stderr, "DB error: %s: %s (%d)\n", taos_errstr(con), fname, lineNo);
}
/* free local resouce: allocated memory/metric-meta refcnt */
taos_free_result(pSql);
}
memset(cmd, 0, MAX_COMMAND_SIZE);
cmd_len = 0;
......
......@@ -520,9 +520,8 @@ int main(int argc, char *argv[]) {
snprintf(command, BUFFER_SIZE, "create table if not exists %s.meters (ts timestamp%s tags (areaid int, loc binary(10))", db_name, cols);
queryDB(taos, command);
printf("meters created!\n");
taos_close(taos);
}
taos_close(taos);
/* Wait for table to create */
multiThreadCreateTable(cols, use_metric, threads, ntables, db_name, tb_prefix, ip_addr, port, user, pass);
......@@ -792,9 +791,6 @@ void * createTable(void *sarg)
snprintf(command, BUFFER_SIZE, "create table if not exists %s.%s%d (ts timestamp%s;", winfo->db_name, winfo->tb_prefix, i, winfo->cols);
queryDB(winfo->taos, command);
}
taos_close(winfo->taos);
} else {
/* Create all the tables; */
printf("Creating table from %d to %d\n", winfo->start_table_id, winfo->end_table_id);
......@@ -812,7 +808,6 @@ void * createTable(void *sarg)
}
queryDB(winfo->taos, command);
}
taos_close(winfo->taos);
}
return NULL;
......@@ -857,7 +852,6 @@ void multiThreadCreateTable(char* cols, bool use_metric, int threads, int ntable
for (int i = 0; i < threads; i++) {
info *t_info = infos + i;
taos_close(t_info->taos);
sem_destroy(&(t_info->mutex_sem));
sem_destroy(&(t_info->lock_sem));
}
......@@ -875,6 +869,11 @@ void *readTable(void *sarg) {
int64_t sTime = rinfo->start_time;
char *tb_prefix = rinfo->tb_prefix;
FILE *fp = fopen(rinfo->fp, "a");
if (NULL == fp) {
printf("fopen %s fail, reason:%s.\n", rinfo->fp, strerror(errno));
return NULL;
}
int num_of_DPT = rinfo->nrecords_per_table;
int num_of_tables = rinfo->end_table_id - rinfo->start_table_id + 1;
int totalData = num_of_DPT * num_of_tables;
......@@ -930,6 +929,11 @@ void *readMetric(void *sarg) {
TAOS *taos = rinfo->taos;
char command[BUFFER_SIZE] = "\0";
FILE *fp = fopen(rinfo->fp, "a");
if (NULL == fp) {
printf("fopen %s fail, reason:%s.\n", rinfo->fp, strerror(errno));
return NULL;
}
int num_of_DPT = rinfo->nrecords_per_table;
int num_of_tables = rinfo->end_table_id - rinfo->start_table_id + 1;
int totalData = num_of_DPT * num_of_tables;
......
此差异已折叠。
......@@ -40,7 +40,7 @@ struct arguments {
static error_t parse_opt(int key, char *arg, struct argp_state *state) {
struct arguments *arguments = state->input;
switch (key) {
case 'w':
case 'r':
arguments->dataDir = arg;
break;
case 'd':
......@@ -51,6 +51,7 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) {
break;
case 'f':
arguments->fqdn = arg;
break;
case 'g':
arguments->dnodeGroups = arg;
break;
......
......@@ -96,6 +96,7 @@ void walModWalFile(char* walfile) {
if (wfd < 0) {
printf("wal:%s, failed to open(%s)\n", newWalFile, strerror(errno));
free(buffer);
close(rfd);
return ;
}
......@@ -116,6 +117,11 @@ void walModWalFile(char* walfile) {
break;
}
if (pHead->len >= 1024000 - sizeof(SWalHead)) {
printf("wal:%s, SWalHead.len(%d) overflow, skip the rest of file\n", walfile, pHead->len);
break;
}
ret = read(rfd, pHead->cont, pHead->len);
if ( ret != pHead->len) {
printf("wal:%s, failed to read body, skip, len:%d ret:%d\n", walfile, pHead->len, ret);
......
......@@ -99,6 +99,8 @@ static int32_t readVnodeCfg(SVnodeObj *pVnode, char* cfgFile)
goto PARSE_OVER;
}
content[maxLen] = (char)0;
root = cJSON_Parse(content);
if (root == NULL) {
printf("failed to json parse %s, invalid json format\n", cfgFile);
......
......@@ -44,6 +44,7 @@ void mnodeDecMnodeRef(struct SMnodeObj *pMnode);
char * mnodeGetMnodeRoleStr();
void mnodeGetMnodeIpSetForPeer(SRpcIpSet *ipSet);
void mnodeGetMnodeIpSetForShell(SRpcIpSet *ipSet);
char* mnodeGetMnodeMasterEp();
void mnodeGetMnodeInfos(void *mnodes);
void mnodeUpdateMnodeIpSet();
......
......@@ -53,6 +53,7 @@ typedef struct {
void * rowData;
int32_t rowSize;
int32_t retCode; // for callback in sdb queue
int32_t processedCount; // for sync fwd callback
int32_t (*cb)(struct SMnodeMsg *pMsg, int32_t code);
struct SMnodeMsg *pMsg;
} SSdbOper;
......
......@@ -128,6 +128,7 @@ int32_t mnodeInitAccts() {
void mnodeCleanupAccts() {
acctCleanUp();
sdbCloseTable(tsAcctSdb);
tsAcctSdb = NULL;
}
void *mnodeGetAcct(char *name) {
......
......@@ -459,6 +459,7 @@ void mnodeMoveVgroupToHead(SVgObj *pVgroup) {
void mnodeCleanupDbs() {
sdbCloseTable(tsDbSdb);
tsDbSdb = NULL;
}
static int32_t mnodeGetDbMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) {
......@@ -965,6 +966,11 @@ static int32_t mnodeProcessAlterDbMsg(SMnodeMsg *pMsg) {
return TSDB_CODE_MND_INVALID_DB;
}
if (pMsg->pDb->status != TSDB_DB_STATUS_READY) {
mError("db:%s, status:%d, in dropping", pAlter->db, pMsg->pDb->status);
return TSDB_CODE_MND_DB_IN_DROPPING;
}
return mnodeAlterDb(pMsg->pDb, pAlter, pMsg);
}
......
......@@ -88,13 +88,13 @@ static int32_t mnodeDnodeActionDelete(SSdbOper *pOper) {
}
static int32_t mnodeDnodeActionUpdate(SSdbOper *pOper) {
SDnodeObj *pDnode = pOper->pObj;
SDnodeObj *pSaved = mnodeGetDnode(pDnode->dnodeId);
if (pSaved != NULL && pDnode != pSaved) {
memcpy(pSaved, pDnode, pOper->rowSize);
free(pDnode);
mnodeDecDnodeRef(pSaved);
SDnodeObj *pNew = pOper->pObj;
SDnodeObj *pDnode = mnodeGetDnode(pNew->dnodeId);
if (pDnode != NULL && pNew != pDnode) {
memcpy(pDnode, pNew, pOper->rowSize);
free(pNew);
}
mnodeDecDnodeRef(pDnode);
return TSDB_CODE_SUCCESS;
}
......@@ -176,6 +176,7 @@ int32_t mnodeInitDnodes() {
void mnodeCleanupDnodes() {
sdbCloseTable(tsDnodeSdb);
tsDnodeSdb = NULL;
}
void *mnodeGetNextDnode(void *pIter, SDnodeObj **pDnode) {
......@@ -334,7 +335,7 @@ static int32_t mnodeProcessDnodeStatusMsg(SMnodeMsg *pMsg) {
if (pStatus->dnodeId == 0) {
mDebug("dnode:%d %s, first access", pDnode->dnodeId, pDnode->dnodeEp);
} else {
//mDebug("dnode:%d, status received, access times %d", pDnode->dnodeId, pDnode->lastAccess);
mTrace("dnode:%d, status received, access times %d", pDnode->dnodeId, pDnode->lastAccess);
}
int32_t openVnodes = htons(pStatus->openVnodes);
......@@ -468,7 +469,7 @@ static int32_t mnodeDropDnodeByEp(char *ep, SMnodeMsg *pMsg) {
}
mnodeDecDnodeRef(pDnode);
if (strcmp(pDnode->dnodeEp, dnodeGetMnodeMasterEp()) == 0) {
if (strcmp(pDnode->dnodeEp, mnodeGetMnodeMasterEp()) == 0) {
mError("dnode:%d, can't drop dnode:%s which is master", pDnode->dnodeId, ep);
return TSDB_CODE_MND_NO_REMOVE_MASTER;
}
......
......@@ -121,8 +121,8 @@ void mnodeCleanupSystem() {
dnodeFreeMnodeWqueue();
dnodeFreeMnodeRqueue();
dnodeFreeMnodePqueue();
mnodeCleanupComponents(sizeof(tsMnodeComponents) / sizeof(tsMnodeComponents[0]) - 1);
mnodeCleanupTimer();
mnodeCleanupComponents(sizeof(tsMnodeComponents) / sizeof(tsMnodeComponents[0]) - 1);
mInfo("mnode is cleaned up");
}
......
......@@ -165,6 +165,7 @@ int32_t mnodeInitMnodes() {
void mnodeCleanupMnodes() {
sdbCloseTable(tsMnodeSdb);
tsMnodeSdb = NULL;
mnodeMnodeDestroyLock();
}
......@@ -267,6 +268,10 @@ void mnodeGetMnodeIpSetForShell(SRpcIpSet *ipSet) {
mnodeMnodeUnLock();
}
char* mnodeGetMnodeMasterEp() {
return tsMnodeInfos.nodeInfos[tsMnodeInfos.inUse].nodeEp;
}
void mnodeGetMnodeInfos(void *mnodeInfos) {
mnodeMnodeRdLock();
*(SDMMnodeInfos *)mnodeInfos = tsMnodeInfos;
......
......@@ -72,8 +72,6 @@ typedef struct {
void * sync;
void * wal;
SSyncCfg cfg;
sem_t sem;
int32_t code;
int32_t numOfTables;
SSdbTable *tableList[SDB_TABLE_MAX];
pthread_mutex_t mutex;
......@@ -201,7 +199,7 @@ static void sdbRestoreTables() {
sdbDebug("table:%s, is restored, numOfRows:%" PRId64, pTable->tableName, pTable->numOfRows);
}
sdbInfo("sdb is restored, version:%" PRId64 " totalRows:%d numOfTables:%d", tsSdbObj.version, totalRows, numOfTables);
sdbInfo("sdb is restored, ver:%" PRId64 " totalRows:%d numOfTables:%d", tsSdbObj.version, totalRows, numOfTables);
}
void sdbUpdateMnodeRoles() {
......@@ -244,22 +242,31 @@ static void sdbNotifyRole(void *ahandle, int8_t role) {
sdbUpdateMnodeRoles();
}
FORCE_INLINE
static void sdbConfirmForward(void *ahandle, void *param, int32_t code) {
tsSdbObj.code = code;
sem_post(&tsSdbObj.sem);
sdbDebug("forward request confirmed, version:%" PRIu64 ", result:%s", (int64_t)param, tstrerror(code));
}
assert(param);
SSdbOper * pOper = param;
SMnodeMsg *pMsg = pOper->pMsg;
if (code <= 0) pOper->retCode = code;
static int32_t sdbForwardToPeer(SWalHead *pHead) {
if (tsSdbObj.sync == NULL) return TSDB_CODE_SUCCESS;
int32_t processedCount = atomic_add_fetch_32(&pOper->processedCount, 1);
if (processedCount <= 1) {
if (pMsg != NULL) {
sdbDebug("app:%p:%p, waiting for confirm this operation, count:%d", pMsg->rpcMsg.ahandle, pMsg, processedCount);
}
return;
}
int32_t code = syncForwardToPeer(tsSdbObj.sync, pHead, (void*)pHead->version, TAOS_QTYPE_RPC);
if (code > 0) {
sdbDebug("forward request is sent, version:%" PRIu64 ", code:%d", pHead->version, code);
sem_wait(&tsSdbObj.sem);
return tsSdbObj.code;
if (pMsg != NULL) {
sdbDebug("app:%p:%p, is confirmed and will do callback func", pMsg->rpcMsg.ahandle, pMsg);
}
return code;
if (pOper->cb != NULL) {
pOper->retCode = (*pOper->cb)(pMsg, pOper->retCode);
}
dnodeSendRpcMnodeWriteRsp(pMsg, pOper->retCode);
taosFreeQitem(pOper);
}
void sdbUpdateSync() {
......@@ -298,7 +305,7 @@ void sdbUpdateSync() {
}
syncCfg.replica = index;
syncCfg.quorum = (syncCfg.replica == 1) ? 1:2;
syncCfg.quorum = (syncCfg.replica == 1) ? 1 : 2;
bool hasThisDnode = false;
for (int32_t i = 0; i < syncCfg.replica; ++i) {
......@@ -339,7 +346,6 @@ void sdbUpdateSync() {
int32_t sdbInit() {
pthread_mutex_init(&tsSdbObj.mutex, NULL);
sem_init(&tsSdbObj.sem, 0, 0);
if (sdbInitWriteWorker() != 0) {
return -1;
......@@ -367,7 +373,7 @@ void sdbCleanUp() {
tsSdbObj.status = SDB_STATUS_CLOSING;
sdbCleanupWriteWorker();
sdbDebug("sdb will be closed, version:%" PRId64, tsSdbObj.version);
sdbDebug("sdb will be closed, ver:%" PRId64, tsSdbObj.version);
if (tsSdbObj.sync) {
syncStop(tsSdbObj.sync);
......@@ -379,7 +385,6 @@ void sdbCleanUp() {
tsSdbObj.wal = NULL;
}
sem_destroy(&tsSdbObj.sem);
pthread_mutex_destroy(&tsSdbObj.mutex);
}
......@@ -466,8 +471,8 @@ static int32_t sdbInsertHash(SSdbTable *pTable, SSdbOper *pOper) {
atomic_add_fetch_32(&pTable->autoIndex, 1);
}
sdbDebug("table:%s, insert record:%s to hash, rowSize:%d numOfRows:%" PRId64 " version:%" PRIu64, pTable->tableName,
sdbGetKeyStrFromObj(pTable, pOper->pObj), pOper->rowSize, pTable->numOfRows, sdbGetVersion());
sdbDebug("table:%s, insert record:%s to hash, rowSize:%d numOfRows:%" PRId64 " ver:%" PRIu64 ", msg:%p", pTable->tableName,
sdbGetKeyStrFromObj(pTable, pOper->pObj), pOper->rowSize, pTable->numOfRows, sdbGetVersion(), pOper->pMsg);
(*pTable->insertFp)(pOper);
return TSDB_CODE_SUCCESS;
......@@ -485,8 +490,8 @@ static int32_t sdbDeleteHash(SSdbTable *pTable, SSdbOper *pOper) {
taosHashRemove(pTable->iHandle, key, keySize);
atomic_sub_fetch_32(&pTable->numOfRows, 1);
sdbDebug("table:%s, delete record:%s from hash, numOfRows:%" PRId64 " version:%" PRIu64, pTable->tableName,
sdbGetKeyStrFromObj(pTable, pOper->pObj), pTable->numOfRows, sdbGetVersion());
sdbDebug("table:%s, delete record:%s from hash, numOfRows:%" PRId64 " ver:%" PRIu64 ", msg:%p", pTable->tableName,
sdbGetKeyStrFromObj(pTable, pOper->pObj), pTable->numOfRows, sdbGetVersion(), pOper->pMsg);
int8_t *updateEnd = pOper->pObj + pTable->refCountPos - 1;
*updateEnd = 1;
......@@ -496,8 +501,8 @@ static int32_t sdbDeleteHash(SSdbTable *pTable, SSdbOper *pOper) {
}
static int32_t sdbUpdateHash(SSdbTable *pTable, SSdbOper *pOper) {
sdbDebug("table:%s, update record:%s in hash, numOfRows:%" PRId64 " version:%" PRIu64, pTable->tableName,
sdbGetKeyStrFromObj(pTable, pOper->pObj), pTable->numOfRows, sdbGetVersion());
sdbDebug("table:%s, update record:%s in hash, numOfRows:%" PRId64 " ver:%" PRIu64 ", msg:%p", pTable->tableName,
sdbGetKeyStrFromObj(pTable, pOper->pObj), pTable->numOfRows, sdbGetVersion(), pOper->pMsg);
(*pTable->updateFp)(pOper);
return TSDB_CODE_SUCCESS;
......@@ -513,6 +518,7 @@ static int sdbWrite(void *param, void *data, int type) {
assert(pTable != NULL);
pthread_mutex_lock(&tsSdbObj.mutex);
if (pHead->version == 0) {
// assign version
tsSdbObj.version++;
......@@ -521,16 +527,13 @@ static int sdbWrite(void *param, void *data, int type) {
// for data from WAL or forward, version may be smaller
if (pHead->version <= tsSdbObj.version) {
pthread_mutex_unlock(&tsSdbObj.mutex);
if (type == TAOS_QTYPE_FWD && tsSdbObj.sync != NULL) {
sdbDebug("forward request is received, version:%" PRIu64 " confirm it", pHead->version);
syncConfirmForward(tsSdbObj.sync, pHead->version, TSDB_CODE_SUCCESS);
}
sdbDebug("table:%s, failed to restore %s record:%s from source(%d), ver:%" PRId64 " too large, sdb ver:%" PRId64,
pTable->tableName, sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), type, pHead->version, tsSdbObj.version);
return TSDB_CODE_SUCCESS;
} else if (pHead->version != tsSdbObj.version + 1) {
pthread_mutex_unlock(&tsSdbObj.mutex);
sdbError("table:%s, failed to restore %s record:%s from wal, version:%" PRId64 " too large, sdb version:%" PRId64,
pTable->tableName, sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version,
tsSdbObj.version);
sdbError("table:%s, failed to restore %s record:%s from source(%d), ver:%" PRId64 " too large, sdb ver:%" PRId64,
pTable->tableName, sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), type, pHead->version, tsSdbObj.version);
return TSDB_CODE_MND_APP_ERROR;
} else {
tsSdbObj.version = pHead->version;
......@@ -543,27 +546,35 @@ static int sdbWrite(void *param, void *data, int type) {
return code;
}
code = sdbForwardToPeer(pHead);
pthread_mutex_unlock(&tsSdbObj.mutex);
// from app, oper is created
if (pOper != NULL) {
sdbTrace("record from app is disposed, table:%s action:%s record:%s version:%" PRIu64 " result:%s",
pTable->tableName, sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version,
tstrerror(code));
return code;
// forward to peers
pOper->processedCount = 0;
int32_t syncCode = syncForwardToPeer(tsSdbObj.sync, pHead, pOper, TAOS_QTYPE_RPC);
if (syncCode <= 0) pOper->processedCount = 1;
if (syncCode < 0) {
sdbError("table:%s, failed to forward request, result:%s action:%s record:%s ver:%" PRId64 ", msg:%p", pTable->tableName,
tstrerror(syncCode), sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version, pOper->pMsg);
} else if (syncCode > 0) {
sdbDebug("table:%s, forward request is sent, action:%s record:%s ver:%" PRId64 ", msg:%p", pTable->tableName,
sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version, pOper->pMsg);
} else {
sdbTrace("table:%s, no need to send fwd request, action:%s record:%s ver:%" PRId64 ", msg:%p", pTable->tableName,
sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version, pOper->pMsg);
}
return syncCode;
}
// from wal or forward msg, oper not created, should add into hash
if (tsSdbObj.sync != NULL) {
sdbTrace("record from wal forward is disposed, table:%s action:%s record:%s version:%" PRIu64 " confirm it",
pTable->tableName, sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version);
syncConfirmForward(tsSdbObj.sync, pHead->version, code);
} else {
sdbTrace("record from wal restore is disposed, table:%s action:%s record:%s version:%" PRIu64, pTable->tableName,
sdbDebug("table:%s, record from wal/fwd is disposed, action:%s record:%s ver:%" PRId64, pTable->tableName,
sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), pHead->version);
}
// even it is WAL/FWD, it shall be called to update version in sync
syncForwardToPeer(tsSdbObj.sync, pHead, pOper, TAOS_QTYPE_RPC);
// from wal or forward msg, oper not created, should add into hash
if (action == SDB_ACTION_INSERT) {
SSdbOper oper = {.rowSize = pHead->len, .rowData = pHead->cont, .table = pTable};
code = (*pTable->decodeFp)(&oper);
......@@ -627,7 +638,7 @@ int32_t sdbInsertRow(SSdbOper *pOper) {
memcpy(pNewOper, pOper, sizeof(SSdbOper));
if (pNewOper->pMsg != NULL) {
sdbDebug("app:%p:%p, table:%s record:%p:%s, insert action is add to sdb queue, ", pNewOper->pMsg->rpcMsg.ahandle,
sdbDebug("app:%p:%p, table:%s record:%p:%s, insert action is add to sdb queue", pNewOper->pMsg->rpcMsg.ahandle,
pNewOper->pMsg, pTable->tableName, pOper->pObj, sdbGetKeyStrFromObj(pTable, pOper->pObj));
}
......@@ -677,7 +688,7 @@ int32_t sdbDeleteRow(SSdbOper *pOper) {
memcpy(pNewOper, pOper, sizeof(SSdbOper));
if (pNewOper->pMsg != NULL) {
sdbDebug("app:%p:%p, table:%s record:%p:%s, delete action is add to sdb queue, ", pNewOper->pMsg->rpcMsg.ahandle,
sdbDebug("app:%p:%p, table:%s record:%p:%s, delete action is add to sdb queue", pNewOper->pMsg->rpcMsg.ahandle,
pNewOper->pMsg, pTable->tableName, pOper->pObj, sdbGetKeyStrFromObj(pTable, pOper->pObj));
}
......@@ -727,7 +738,7 @@ int32_t sdbUpdateRow(SSdbOper *pOper) {
memcpy(pNewOper, pOper, sizeof(SSdbOper));
if (pNewOper->pMsg != NULL) {
sdbDebug("app:%p:%p, table:%s record:%p:%s, update action is add to sdb queue, ", pNewOper->pMsg->rpcMsg.ahandle,
sdbDebug("app:%p:%p, table:%s record:%p:%s, update action is add to sdb queue", pNewOper->pMsg->rpcMsg.ahandle,
pNewOper->pMsg, pTable->tableName, pOper->pObj, sdbGetKeyStrFromObj(pTable, pOper->pObj));
}
......@@ -943,20 +954,20 @@ static void *sdbWorkerFp(void *param) {
taosGetQitem(tsSdbWriteQall, &type, &item);
if (type == TAOS_QTYPE_RPC) {
pOper = (SSdbOper *)item;
pOper->processedCount = 1;
pHead = (void *)pOper + sizeof(SSdbOper) + SDB_SYNC_HACK;
if (pOper->pMsg != NULL) {
sdbDebug("app:%p:%p, table:%s record:%p:%s ver:%" PRIu64 ", will be processed in sdb queue",
pOper->pMsg->rpcMsg.ahandle, pOper->pMsg, ((SSdbTable *)pOper->table)->tableName, pOper->pObj,
sdbGetKeyStr(pOper->table, pHead->cont), pHead->version);
}
} else {
pHead = (SWalHead *)item;
pOper = NULL;
}
if (pOper != NULL && pOper->pMsg != NULL) {
sdbDebug("app:%p:%p, table:%s record:%p:%s version:%" PRIu64 ", will be processed in sdb queue",
pOper->pMsg->rpcMsg.ahandle, pOper->pMsg, ((SSdbTable *)pOper->table)->tableName, pOper->pObj,
sdbGetKeyStr(pOper->table, pHead->cont), pHead->version);
}
int32_t code = sdbWrite(pOper, pHead, type);
if (pOper) pOper->retCode = code;
if (pOper && code <= 0) pOper->retCode = code;
}
walFsync(tsSdbObj.wal);
......@@ -965,27 +976,19 @@ static void *sdbWorkerFp(void *param) {
taosResetQitems(tsSdbWriteQall);
for (int32_t i = 0; i < numOfMsgs; ++i) {
taosGetQitem(tsSdbWriteQall, &type, &item);
if (type == TAOS_QTYPE_RPC) {
pOper = (SSdbOper *)item;
if (pOper != NULL && pOper->cb != NULL) {
sdbTrace("app:%p:%p, will do callback func, index:%d", pOper->pMsg->rpcMsg.ahandle, pOper->pMsg, i);
pOper->retCode = (*pOper->cb)(pOper->pMsg, pOper->retCode);
}
if (pOper != NULL && pOper->pMsg != NULL) {
sdbTrace("app:%p:%p, msg is processed, result:%s", pOper->pMsg->rpcMsg.ahandle, pOper->pMsg,
tstrerror(pOper->retCode));
}
if (pOper != NULL) {
sdbDecRef(pOper->table, pOper->pObj);
}
dnodeSendRpcMnodeWriteRsp(pOper->pMsg, pOper->retCode);
}
sdbConfirmForward(NULL, pOper, pOper->retCode);
} else if (type == TAOS_QTYPE_FWD) {
syncConfirmForward(tsSdbObj.sync, pHead->version, TSDB_CODE_SUCCESS);
taosFreeQitem(item);
} else {
taosFreeQitem(item);
}
}
}
return NULL;
}
......@@ -307,6 +307,11 @@ static int32_t mnodeProcessConnectMsg(SMnodeMsg *pMsg) {
code = TSDB_CODE_MND_INVALID_DB;
goto connect_over;
}
if (pDb->status != TSDB_DB_STATUS_READY) {
mError("db:%s, status:%d, in dropping", pDb->name, pDb->status);
return TSDB_CODE_MND_DB_IN_DROPPING;
}
mnodeDecDbRef(pDb);
}
......@@ -353,6 +358,11 @@ static int32_t mnodeProcessUseMsg(SMnodeMsg *pMsg) {
code = TSDB_CODE_MND_INVALID_DB;
}
if (pMsg->pDb->status != TSDB_DB_STATUS_READY) {
mError("db:%s, status:%d, in dropping", pMsg->pDb->name, pMsg->pDb->status);
return TSDB_CODE_MND_DB_IN_DROPPING;
}
return code;
}
......
......@@ -116,6 +116,11 @@ static int32_t mnodeChildTableActionInsert(SSdbOper *pOper) {
mError("ctable:%s, vgId:%d not in db:%s", pTable->info.tableId, pVgroup->vgId, pVgroup->dbName);
return TSDB_CODE_MND_INVALID_DB;
}
if (pDb->status != TSDB_DB_STATUS_READY) {
mError("db:%s, status:%d, in dropping", pDb->name, pDb->status);
return TSDB_CODE_MND_DB_IN_DROPPING;
}
mnodeDecDbRef(pDb);
SAcctObj *pAcct = mnodeGetAcct(pDb->acct);
......@@ -284,8 +289,8 @@ static int32_t mnodeChildTableActionRestored() {
if (pTable == NULL) break;
SDbObj *pDb = mnodeGetDbByTableId(pTable->info.tableId);
if (pDb == NULL) {
mError("ctable:%s, failed to get db, discard it", pTable->info.tableId);
if (pDb == NULL || pDb->status != TSDB_DB_STATUS_READY) {
mError("ctable:%s, failed to get db or db in dropping, discard it", pTable->info.tableId);
SSdbOper desc = {.type = SDB_OPER_LOCAL, .pObj = pTable, .table = tsChildTableSdb};
sdbDeleteRow(&desc);
mnodeDecTableRef(pTable);
......@@ -376,6 +381,7 @@ static int32_t mnodeInitChildTables() {
static void mnodeCleanupChildTables() {
sdbCloseTable(tsChildTableSdb);
tsChildTableSdb = NULL;
}
static void mnodeAddTableIntoStable(SSuperTableObj *pStable, SChildTableObj *pCtable) {
......@@ -422,7 +428,7 @@ static int32_t mnodeSuperTableActionDestroy(SSdbOper *pOper) {
static int32_t mnodeSuperTableActionInsert(SSdbOper *pOper) {
SSuperTableObj *pStable = pOper->pObj;
SDbObj *pDb = mnodeGetDbByTableId(pStable->info.tableId);
if (pDb != NULL) {
if (pDb != NULL && pDb->status == TSDB_DB_STATUS_READY) {
mnodeAddSuperTableIntoDb(pDb);
}
mnodeDecDbRef(pDb);
......@@ -554,6 +560,7 @@ static int32_t mnodeInitSuperTables() {
static void mnodeCleanupSuperTables() {
sdbCloseTable(tsSuperTableSdb);
tsSuperTableSdb = NULL;
}
int32_t mnodeInitTables() {
......@@ -683,11 +690,16 @@ static int32_t mnodeProcessCreateTableMsg(SMnodeMsg *pMsg) {
SCMCreateTableMsg *pCreate = pMsg->rpcMsg.pCont;
if (pMsg->pDb == NULL) pMsg->pDb = mnodeGetDb(pCreate->db);
if (pMsg->pDb == NULL || pMsg->pDb->status != TSDB_DB_STATUS_READY) {
if (pMsg->pDb == NULL) {
mError("app:%p:%p, table:%s, failed to create, db not selected", pMsg->rpcMsg.ahandle, pMsg, pCreate->tableId);
return TSDB_CODE_MND_DB_NOT_SELECTED;
}
if (pMsg->pDb->status != TSDB_DB_STATUS_READY) {
mError("db:%s, status:%d, in dropping", pMsg->pDb->name, pMsg->pDb->status);
return TSDB_CODE_MND_DB_IN_DROPPING;
}
if (pMsg->pTable == NULL) pMsg->pTable = mnodeGetTable(pCreate->tableId);
if (pMsg->pTable != NULL && pMsg->retry == 0) {
if (pCreate->getMeta) {
......@@ -717,11 +729,16 @@ static int32_t mnodeProcessCreateTableMsg(SMnodeMsg *pMsg) {
static int32_t mnodeProcessDropTableMsg(SMnodeMsg *pMsg) {
SCMDropTableMsg *pDrop = pMsg->rpcMsg.pCont;
if (pMsg->pDb == NULL) pMsg->pDb = mnodeGetDbByTableId(pDrop->tableId);
if (pMsg->pDb == NULL || pMsg->pDb->status != TSDB_DB_STATUS_READY) {
mError("app:%p:%p, table:%s, failed to drop table, db not selected", pMsg->rpcMsg.ahandle, pMsg, pDrop->tableId);
if (pMsg->pDb == NULL) {
mError("app:%p:%p, table:%s, failed to drop table, db not selected or db in dropping", pMsg->rpcMsg.ahandle, pMsg, pDrop->tableId);
return TSDB_CODE_MND_DB_NOT_SELECTED;
}
if (pMsg->pDb->status != TSDB_DB_STATUS_READY) {
mError("db:%s, status:%d, in dropping", pMsg->pDb->name, pMsg->pDb->status);
return TSDB_CODE_MND_DB_IN_DROPPING;
}
if (mnodeCheckIsMonitorDB(pMsg->pDb->name, tsMonitorDbName)) {
mError("app:%p:%p, table:%s, failed to drop table, in monitor database", pMsg->rpcMsg.ahandle, pMsg,
pDrop->tableId);
......@@ -755,12 +772,17 @@ static int32_t mnodeProcessTableMetaMsg(SMnodeMsg *pMsg) {
pInfo->tableId, pMsg->rpcMsg.handle, pInfo->createFlag);
if (pMsg->pDb == NULL) pMsg->pDb = mnodeGetDbByTableId(pInfo->tableId);
if (pMsg->pDb == NULL || pMsg->pDb->status != TSDB_DB_STATUS_READY) {
if (pMsg->pDb == NULL) {
mError("app:%p:%p, table:%s, failed to get table meta, db not selected", pMsg->rpcMsg.ahandle, pMsg,
pInfo->tableId);
return TSDB_CODE_MND_DB_NOT_SELECTED;
}
if (pMsg->pDb->status != TSDB_DB_STATUS_READY) {
mError("db:%s, status:%d, in dropping", pMsg->pDb->name, pMsg->pDb->status);
return TSDB_CODE_MND_DB_IN_DROPPING;
}
if (pMsg->pTable == NULL) pMsg->pTable = mnodeGetTable(pInfo->tableId);
if (pMsg->pTable == NULL) {
if (!pInfo->createFlag) {
......@@ -783,9 +805,15 @@ static int32_t mnodeProcessTableMetaMsg(SMnodeMsg *pMsg) {
static int32_t mnodeCreateSuperTableCb(SMnodeMsg *pMsg, int32_t code) {
SSuperTableObj *pTable = (SSuperTableObj *)pMsg->pTable;
if (pTable != NULL) {
mLInfo("app:%p:%p, stable:%s, is created in sdb, result:%s", pMsg->rpcMsg.ahandle, pMsg, pTable->info.tableId,
assert(pTable);
if (code == TSDB_CODE_SUCCESS) {
mLInfo("stable:%s, is created in sdb", pTable->info.tableId);
} else {
mError("app:%p:%p, stable:%s, failed to create in sdb, reason:%s", pMsg->rpcMsg.ahandle, pMsg, pTable->info.tableId,
tstrerror(code));
SSdbOper desc = {.type = SDB_OPER_GLOBAL, .pObj = pTable, .table = tsSuperTableSdb};
sdbDeleteRow(&desc);
}
return code;
......@@ -1202,6 +1230,11 @@ static int32_t mnodeGetShowSuperTableMeta(STableMetaMsg *pMeta, SShowObj *pShow,
SDbObj *pDb = mnodeGetDb(pShow->db);
if (pDb == NULL) return TSDB_CODE_MND_DB_NOT_SELECTED;
if (pDb->status != TSDB_DB_STATUS_READY) {
mError("db:%s, status:%d, in dropping", pDb->name, pDb->status);
return TSDB_CODE_MND_DB_IN_DROPPING;
}
int32_t cols = 0;
SSchema *pSchema = pMeta->schema;
......@@ -1261,6 +1294,11 @@ int32_t mnodeRetrieveShowSuperTables(SShowObj *pShow, char *data, int32_t rows,
SDbObj *pDb = mnodeGetDb(pShow->db);
if (pDb == NULL) return 0;
if (pDb->status != TSDB_DB_STATUS_READY) {
mError("db:%s, status:%d, in dropping", pDb->name, pDb->status);
return 0;
}
tstrncpy(prefix, pDb->name, 64);
strcat(prefix, TS_PATH_DELIMITER);
prefixLen = strlen(prefix);
......@@ -1561,10 +1599,16 @@ static int32_t mnodeDoCreateChildTableCb(SMnodeMsg *pMsg, int32_t code) {
SChildTableObj *pTable = (SChildTableObj *)pMsg->pTable;
assert(pTable);
mDebug("app:%p:%p, table:%s, create table in id:%d, uid:%" PRIu64 ", result:%s", pMsg->rpcMsg.ahandle, pMsg,
if (code == TSDB_CODE_SUCCESS) {
mDebug("app:%p:%p, table:%s, create table in sid:%d, uid:%" PRIu64, pMsg->rpcMsg.ahandle, pMsg, pTable->info.tableId,
pTable->sid, pTable->uid);
} else {
mError("app:%p:%p, table:%s, failed to create table sid:%d, uid:%" PRIu64 ", reason:%s", pMsg->rpcMsg.ahandle, pMsg,
pTable->info.tableId, pTable->sid, pTable->uid, tstrerror(code));
if (code != TSDB_CODE_SUCCESS) return code;
SSdbOper desc = {.type = SDB_OPER_GLOBAL, .pObj = pTable, .table = tsChildTableSdb};
sdbDeleteRow(&desc);
return code;
}
SCMCreateTableMsg *pCreate = pMsg->rpcMsg.pCont;
SMDCreateTableMsg *pMDCreate = mnodeBuildCreateChildTableMsg(pCreate, pTable);
......@@ -2285,7 +2329,7 @@ static int32_t mnodeProcessMultiTableMetaMsg(SMnodeMsg *pMsg) {
if (pTable == NULL) continue;
if (pMsg->pDb == NULL) pMsg->pDb = mnodeGetDbByTableId(tableId);
if (pMsg->pDb == NULL) {
if (pMsg->pDb == NULL || pMsg->pDb->status != TSDB_DB_STATUS_READY) {
mnodeDecTableRef(pTable);
continue;
}
......@@ -2324,6 +2368,11 @@ static int32_t mnodeGetShowTableMeta(STableMetaMsg *pMeta, SShowObj *pShow, void
SDbObj *pDb = mnodeGetDb(pShow->db);
if (pDb == NULL) return TSDB_CODE_MND_DB_NOT_SELECTED;
if (pDb->status != TSDB_DB_STATUS_READY) {
mError("db:%s, status:%d, in dropping", pDb->name, pDb->status);
return TSDB_CODE_MND_DB_IN_DROPPING;
}
int32_t cols = 0;
SSchema *pSchema = pMeta->schema;
......@@ -2372,6 +2421,11 @@ static int32_t mnodeRetrieveShowTables(SShowObj *pShow, char *data, int32_t rows
SDbObj *pDb = mnodeGetDb(pShow->db);
if (pDb == NULL) return 0;
if (pDb->status != TSDB_DB_STATUS_READY) {
mError("db:%s, status:%d, in dropping", pDb->name, pDb->status);
return 0;
}
int32_t numOfRows = 0;
SChildTableObj *pTable = NULL;
SPatternCompareInfo info = PATTERN_COMPARE_INFO_INITIALIZER;
......@@ -2462,11 +2516,16 @@ static int32_t mnodeProcessAlterTableMsg(SMnodeMsg *pMsg) {
pAlter->tableId, pMsg->rpcMsg.handle);
if (pMsg->pDb == NULL) pMsg->pDb = mnodeGetDbByTableId(pAlter->tableId);
if (pMsg->pDb == NULL || pMsg->pDb->status != TSDB_DB_STATUS_READY) {
if (pMsg->pDb == NULL) {
mError("app:%p:%p, table:%s, failed to alter table, db not selected", pMsg->rpcMsg.ahandle, pMsg, pAlter->tableId);
return TSDB_CODE_MND_DB_NOT_SELECTED;
}
if (pMsg->pDb->status != TSDB_DB_STATUS_READY) {
mError("db:%s, status:%d, in dropping", pMsg->pDb->name, pMsg->pDb->status);
return TSDB_CODE_MND_DB_IN_DROPPING;
}
if (mnodeCheckIsMonitorDB(pMsg->pDb->name, tsMonitorDbName)) {
mError("app:%p:%p, table:%s, failed to alter table, its log db", pMsg->rpcMsg.ahandle, pMsg, pAlter->tableId);
return TSDB_CODE_MND_MONITOR_DB_FORBIDDEN;
......@@ -2526,6 +2585,11 @@ static int32_t mnodeGetStreamTableMeta(STableMetaMsg *pMeta, SShowObj *pShow, vo
SDbObj *pDb = mnodeGetDb(pShow->db);
if (pDb == NULL) return TSDB_CODE_MND_DB_NOT_SELECTED;
if (pDb->status != TSDB_DB_STATUS_READY) {
mError("db:%s, status:%d, in dropping", pDb->name, pDb->status);
return TSDB_CODE_MND_DB_IN_DROPPING;
}
int32_t cols = 0;
SSchema *pSchema = pMeta->schema;
......@@ -2573,6 +2637,10 @@ static int32_t mnodeRetrieveStreamTables(SShowObj *pShow, char *data, int32_t ro
SDbObj *pDb = mnodeGetDb(pShow->db);
if (pDb == NULL) return 0;
if (pDb->status != TSDB_DB_STATUS_READY) {
mError("db:%s, status:%d, in dropping", pDb->name, pDb->status);
return 0;
}
int32_t numOfRows = 0;
SChildTableObj *pTable = NULL;
......
......@@ -154,6 +154,7 @@ int32_t mnodeInitUsers() {
void mnodeCleanupUsers() {
sdbCloseTable(tsUserSdb);
tsUserSdb = NULL;
}
SUserObj *mnodeGetUser(char *name) {
......
......@@ -76,6 +76,11 @@ static int32_t mnodeVgroupActionInsert(SSdbOper *pOper) {
return TSDB_CODE_MND_INVALID_DB;
}
if (pDb->status != TSDB_DB_STATUS_READY) {
mError("db:%s, status:%d, in dropping", pDb->name, pDb->status);
return TSDB_CODE_MND_DB_IN_DROPPING;
}
pVgroup->pDb = pDb;
pVgroup->prev = NULL;
pVgroup->next = NULL;
......@@ -171,6 +176,12 @@ static int32_t mnodeVgroupActionUpdate(SSdbOper *pOper) {
mnodeVgroupUpdateIdPool(pVgroup);
// reset vgid status on vgroup changed
mDebug("vgId:%d, reset sync status to unsynced", pVgroup->vgId);
for (int32_t v = 0; v < pVgroup->numOfVnodes; ++v) {
pVgroup->vnodeGid[v].role = TAOS_SYNC_ROLE_UNSYNCED;
}
mnodeDecVgroupRef(pVgroup);
mDebug("vgId:%d, is updated, numOfVnode:%d", pVgroup->vgId, pVgroup->numOfVnodes);
......@@ -302,6 +313,7 @@ void mnodeUpdateVgroupStatus(SVgObj *pVgroup, SDnodeObj *pDnode, SVnodeLoad *pVl
for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) {
SVnodeGid *pVgid = &pVgroup->vnodeGid[i];
if (pVgid->pDnode == pDnode) {
mTrace("dnode:%d, receive status from dnode, vgId:%d status is %d", pVgroup->vgId, pDnode->dnodeId, pVgid->role);
pVgid->role = pVload->role;
if (pVload->role == TAOS_SYNC_ROLE_MASTER) {
pVgroup->inUse = i;
......@@ -341,17 +353,23 @@ void *mnodeGetNextVgroup(void *pIter, SVgObj **pVgroup) {
}
static int32_t mnodeCreateVgroupCb(SMnodeMsg *pMsg, int32_t code) {
SVgObj *pVgroup = pMsg->pVgroup;
SDbObj *pDb = pMsg->pDb;
assert(pVgroup);
if (code != TSDB_CODE_SUCCESS) {
pMsg->pVgroup = NULL;
mError("app:%p:%p, vgId:%d, failed to create in sdb, reason:%s", pMsg->rpcMsg.ahandle, pMsg, pVgroup->vgId,
tstrerror(code));
SSdbOper desc = {.type = SDB_OPER_GLOBAL, .pObj = pVgroup, .table = tsVgroupSdb};
sdbDeleteRow(&desc);
return code;
}
SVgObj *pVgroup = pMsg->pVgroup;
SDbObj *pDb = pMsg->pDb;
mInfo("vgId:%d, is created in mnode, db:%s replica:%d", pVgroup->vgId, pDb->name, pVgroup->numOfVnodes);
mInfo("app:%p:%p, vgId:%d, is created in mnode, db:%s replica:%d", pMsg->rpcMsg.ahandle, pMsg, pVgroup->vgId,
pDb->name, pVgroup->numOfVnodes);
for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) {
mInfo("vgId:%d, index:%d, dnode:%d", pVgroup->vgId, i, pVgroup->vnodeGid[i].dnodeId);
mInfo("app:%p:%p, vgId:%d, index:%d, dnode:%d", pMsg->rpcMsg.ahandle, pMsg, pVgroup->vgId, i,
pVgroup->vnodeGid[i].dnodeId);
}
mnodeIncVgroupRef(pVgroup);
......@@ -414,6 +432,7 @@ void mnodeDropVgroup(SVgObj *pVgroup, void *ahandle) {
void mnodeCleanupVgroups() {
sdbCloseTable(tsVgroupSdb);
tsVgroupSdb = NULL;
}
int32_t mnodeGetVgroupMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) {
......@@ -422,6 +441,11 @@ int32_t mnodeGetVgroupMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) {
return TSDB_CODE_MND_DB_NOT_SELECTED;
}
if (pDb->status != TSDB_DB_STATUS_READY) {
mError("db:%s, status:%d, in dropping", pDb->name, pDb->status);
return TSDB_CODE_MND_DB_IN_DROPPING;
}
int32_t cols = 0;
SSchema *pSchema = pMeta->schema;
......@@ -510,6 +534,11 @@ int32_t mnodeRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, void *pC
SDbObj *pDb = mnodeGetDb(pShow->db);
if (pDb == NULL) return 0;
if (pDb->status != TSDB_DB_STATUS_READY) {
mError("db:%s, status:%d, in dropping", pDb->name, pDb->status);
return 0;
}
pVgroup = pDb->pHead;
while (pVgroup != NULL) {
maxReplica = pVgroup->numOfVnodes > maxReplica ? pVgroup->numOfVnodes : maxReplica;
......@@ -683,9 +712,9 @@ static void mnodeProcessCreateVnodeRsp(SRpcMsg *rpcMsg) {
if (rpcMsg->ahandle == NULL) return;
SMnodeMsg *mnodeMsg = rpcMsg->ahandle;
mnodeMsg->received++;
atomic_add_fetch_8(&mnodeMsg->received, 1);
if (rpcMsg->code == TSDB_CODE_SUCCESS) {
mnodeMsg->successed++;
atomic_add_fetch_8(&mnodeMsg->successed, 1);
} else {
mnodeMsg->code = rpcMsg->code;
}
......
......@@ -16,16 +16,16 @@
#ifndef TDENGINE_TAST_H
#define TDENGINE_TAST_H
#include <tbuffer.h>
#ifdef __cplusplus
extern "C" {
#endif
#include <tskiplist.h>
#include "os.h"
#include "taosmsg.h"
#include "taosdef.h"
#include "tskiplist.h"
#include "tbuffer.h"
#include "tvariant.h"
struct tExprNode;
......@@ -75,10 +75,6 @@ typedef struct tExprNode {
};
} tExprNode;
void tSQLBinaryExprFromString(tExprNode **pExpr, SSchema *pSchema, int32_t numOfCols, char *src, int32_t len);
void tSQLBinaryExprToString(tExprNode *pExpr, char *dst, int32_t *len);
void tExprTreeDestroy(tExprNode **pExprs, void (*fp)(void*));
void tExprTreeTraverse(tExprNode *pExpr, SSkipList *pSkipList, SArray *result, SExprTraverseSupp *param);
......@@ -86,9 +82,6 @@ void tExprTreeTraverse(tExprNode *pExpr, SSkipList *pSkipList, SArray *result, S
void tExprTreeCalcTraverse(tExprNode *pExprs, int32_t numOfRows, char *pOutput, void *param, int32_t order,
char *(*cb)(void *, const char*, int32_t));
// todo refactor: remove it
void tSQLBinaryExprTrv(tExprNode *pExprs, SArray* res);
uint8_t getBinaryExprOptr(SSQLToken *pToken);
void tExprNodeDestroy(tExprNode *pNode, void (*fp)(void *));
......
此差异已折叠。
......@@ -190,8 +190,7 @@ void removeRedundantWindow(SWindowResInfo *pWindowResInfo, TSKEY lastKey, int32_
}
// get the result order
int32_t resultOrder = (pWindowResInfo->pResult[0].window.skey < pWindowResInfo->pResult[1].window.skey)?
TSDB_ORDER_ASC:TSDB_ORDER_DESC;
int32_t resultOrder = (pWindowResInfo->pResult[0].window.skey < pWindowResInfo->pResult[1].window.skey)? 1:-1;
if (order != resultOrder) {
return;
......
此差异已折叠。
......@@ -174,7 +174,7 @@ int32_t taosNumOfRemainRows(SFillInfo* pFillInfo) {
return 0;
}
return FILL_IS_ASC_FILL(pFillInfo) ? (pFillInfo->numOfRows - pFillInfo->rowIdx) : pFillInfo->rowIdx + 1;
return pFillInfo->numOfRows - pFillInfo->rowIdx;
}
// todo: refactor
......
此差异已折叠。
......@@ -156,6 +156,7 @@ int main(int argc, char *argv[]) {
}
tInfo("client is initialized");
tInfo("threads:%d msgSize:%d requests:%d", appThreads, msgSize, numOfReqs);
gettimeofday(&systemTime, NULL);
startTime = systemTime.tv_sec*1000000 + systemTime.tv_usec;
......
此差异已折叠。
......@@ -195,7 +195,6 @@ typedef struct {
typedef struct {
uint32_t len;
uint32_t offset;
// uint32_t padding;
uint32_t hasLast : 2;
uint32_t numOfBlocks : 30;
uint64_t uid;
......@@ -224,7 +223,7 @@ typedef struct {
typedef struct {
int16_t colId;
int16_t len;
int32_t len;
int32_t type : 8;
int32_t offset : 24;
int64_t sum;
......@@ -438,8 +437,9 @@ int tsdbLoadCompIdx(SRWHelper* pHelper, void* target);
int tsdbLoadCompInfo(SRWHelper* pHelper, void* target);
int tsdbLoadCompData(SRWHelper* phelper, SCompBlock* pcompblock, void* target);
void tsdbGetDataStatis(SRWHelper* pHelper, SDataStatis* pStatis, int numOfCols);
int tsdbLoadBlockDataCols(SRWHelper* pHelper, SCompBlock* pCompBlock, int16_t* colIds, int numOfColIds);
int tsdbLoadBlockData(SRWHelper* pHelper, SCompBlock* pCompBlock);
int tsdbLoadBlockDataCols(SRWHelper* pHelper, SCompBlock* pCompBlock, SCompInfo* pCompInfo, int16_t* colIds,
int numOfColIds);
int tsdbLoadBlockData(SRWHelper* pHelper, SCompBlock* pCompBlock, SCompInfo* pCompInfo);
// ------------------ tsdbMain.c
#define REPO_ID(r) (r)->config.tsdbId
......
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册