提交 5c61e5da 编写于 作者: C Cary Xu

Merge branch '3.0' into feature/TD-18581-D

此差异已折叠。
此差异已折叠。
......@@ -98,6 +98,7 @@ extern int32_t tsQueryRsmaTolerance;
extern bool tsQueryPlannerTrace;
extern int32_t tsQueryNodeChunkSize;
extern bool tsQueryUseNodeAllocator;
extern bool tsKeepColumnName;
// client
extern int32_t tsMinSlidingTime;
......
......@@ -226,110 +226,111 @@
#define TK_WSTART 208
#define TK_WEND 209
#define TK_WDURATION 210
#define TK_CAST 211
#define TK_NOW 212
#define TK_TODAY 213
#define TK_TIMEZONE 214
#define TK_CLIENT_VERSION 215
#define TK_SERVER_VERSION 216
#define TK_SERVER_STATUS 217
#define TK_CURRENT_USER 218
#define TK_COUNT 219
#define TK_LAST_ROW 220
#define TK_CASE 221
#define TK_END 222
#define TK_WHEN 223
#define TK_THEN 224
#define TK_ELSE 225
#define TK_BETWEEN 226
#define TK_IS 227
#define TK_NK_LT 228
#define TK_NK_GT 229
#define TK_NK_LE 230
#define TK_NK_GE 231
#define TK_NK_NE 232
#define TK_MATCH 233
#define TK_NMATCH 234
#define TK_CONTAINS 235
#define TK_IN 236
#define TK_JOIN 237
#define TK_INNER 238
#define TK_SELECT 239
#define TK_DISTINCT 240
#define TK_WHERE 241
#define TK_PARTITION 242
#define TK_BY 243
#define TK_SESSION 244
#define TK_STATE_WINDOW 245
#define TK_SLIDING 246
#define TK_FILL 247
#define TK_VALUE 248
#define TK_NONE 249
#define TK_PREV 250
#define TK_LINEAR 251
#define TK_NEXT 252
#define TK_HAVING 253
#define TK_RANGE 254
#define TK_EVERY 255
#define TK_ORDER 256
#define TK_SLIMIT 257
#define TK_SOFFSET 258
#define TK_LIMIT 259
#define TK_OFFSET 260
#define TK_ASC 261
#define TK_NULLS 262
#define TK_ABORT 263
#define TK_AFTER 264
#define TK_ATTACH 265
#define TK_BEFORE 266
#define TK_BEGIN 267
#define TK_BITAND 268
#define TK_BITNOT 269
#define TK_BITOR 270
#define TK_BLOCKS 271
#define TK_CHANGE 272
#define TK_COMMA 273
#define TK_COMPACT 274
#define TK_CONCAT 275
#define TK_CONFLICT 276
#define TK_COPY 277
#define TK_DEFERRED 278
#define TK_DELIMITERS 279
#define TK_DETACH 280
#define TK_DIVIDE 281
#define TK_DOT 282
#define TK_EACH 283
#define TK_FAIL 284
#define TK_FILE 285
#define TK_FOR 286
#define TK_GLOB 287
#define TK_ID 288
#define TK_IMMEDIATE 289
#define TK_IMPORT 290
#define TK_INITIALLY 291
#define TK_INSTEAD 292
#define TK_ISNULL 293
#define TK_KEY 294
#define TK_NK_BITNOT 295
#define TK_NK_SEMI 296
#define TK_NOTNULL 297
#define TK_OF 298
#define TK_PLUS 299
#define TK_PRIVILEGE 300
#define TK_RAISE 301
#define TK_REPLACE 302
#define TK_RESTRICT 303
#define TK_ROW 304
#define TK_SEMI 305
#define TK_STAR 306
#define TK_STATEMENT 307
#define TK_STRING 308
#define TK_TIMES 309
#define TK_UPDATE 310
#define TK_VALUES 311
#define TK_VARIABLE 312
#define TK_VIEW 313
#define TK_WAL 314
#define TK_IROWTS 211
#define TK_CAST 212
#define TK_NOW 213
#define TK_TODAY 214
#define TK_TIMEZONE 215
#define TK_CLIENT_VERSION 216
#define TK_SERVER_VERSION 217
#define TK_SERVER_STATUS 218
#define TK_CURRENT_USER 219
#define TK_COUNT 220
#define TK_LAST_ROW 221
#define TK_CASE 222
#define TK_END 223
#define TK_WHEN 224
#define TK_THEN 225
#define TK_ELSE 226
#define TK_BETWEEN 227
#define TK_IS 228
#define TK_NK_LT 229
#define TK_NK_GT 230
#define TK_NK_LE 231
#define TK_NK_GE 232
#define TK_NK_NE 233
#define TK_MATCH 234
#define TK_NMATCH 235
#define TK_CONTAINS 236
#define TK_IN 237
#define TK_JOIN 238
#define TK_INNER 239
#define TK_SELECT 240
#define TK_DISTINCT 241
#define TK_WHERE 242
#define TK_PARTITION 243
#define TK_BY 244
#define TK_SESSION 245
#define TK_STATE_WINDOW 246
#define TK_SLIDING 247
#define TK_FILL 248
#define TK_VALUE 249
#define TK_NONE 250
#define TK_PREV 251
#define TK_LINEAR 252
#define TK_NEXT 253
#define TK_HAVING 254
#define TK_RANGE 255
#define TK_EVERY 256
#define TK_ORDER 257
#define TK_SLIMIT 258
#define TK_SOFFSET 259
#define TK_LIMIT 260
#define TK_OFFSET 261
#define TK_ASC 262
#define TK_NULLS 263
#define TK_ABORT 264
#define TK_AFTER 265
#define TK_ATTACH 266
#define TK_BEFORE 267
#define TK_BEGIN 268
#define TK_BITAND 269
#define TK_BITNOT 270
#define TK_BITOR 271
#define TK_BLOCKS 272
#define TK_CHANGE 273
#define TK_COMMA 274
#define TK_COMPACT 275
#define TK_CONCAT 276
#define TK_CONFLICT 277
#define TK_COPY 278
#define TK_DEFERRED 279
#define TK_DELIMITERS 280
#define TK_DETACH 281
#define TK_DIVIDE 282
#define TK_DOT 283
#define TK_EACH 284
#define TK_FAIL 285
#define TK_FILE 286
#define TK_FOR 287
#define TK_GLOB 288
#define TK_ID 289
#define TK_IMMEDIATE 290
#define TK_IMPORT 291
#define TK_INITIALLY 292
#define TK_INSTEAD 293
#define TK_ISNULL 294
#define TK_KEY 295
#define TK_NK_BITNOT 296
#define TK_NK_SEMI 297
#define TK_NOTNULL 298
#define TK_OF 299
#define TK_PLUS 300
#define TK_PRIVILEGE 301
#define TK_RAISE 302
#define TK_REPLACE 303
#define TK_RESTRICT 304
#define TK_ROW 305
#define TK_SEMI 306
#define TK_STAR 307
#define TK_STATEMENT 308
#define TK_STRING 309
#define TK_TIMES 310
#define TK_UPDATE 311
#define TK_VALUES 312
#define TK_VARIABLE 313
#define TK_VIEW 314
#define TK_WAL 315
#define TK_NK_SPACE 300
#define TK_NK_COMMENT 301
......
......@@ -119,9 +119,10 @@ typedef enum EFunctionType {
FUNCTION_TYPE_WSTART,
FUNCTION_TYPE_WEND,
FUNCTION_TYPE_WDURATION,
FUNCTION_TYPE_IROWTS,
// internal function
FUNCTION_TYPE_SELECT_VALUE,
FUNCTION_TYPE_SELECT_VALUE = 3750,
FUNCTION_TYPE_BLOCK_DIST, // block distribution aggregate function
FUNCTION_TYPE_BLOCK_DIST_INFO, // block distribution pseudo column function
FUNCTION_TYPE_TO_COLUMN,
......@@ -212,6 +213,7 @@ bool fmIsClientPseudoColumnFunc(int32_t funcId);
bool fmIsMultiRowsFunc(int32_t funcId);
bool fmIsKeepOrderFunc(int32_t funcId);
bool fmIsCumulativeFunc(int32_t funcId);
bool fmIsInterpPseudoColumnFunc(int32_t funcId);
int32_t fmGetDistMethod(const SFunctionNode* pFunc, SFunctionNode** pPartialFunc, SFunctionNode** pMergeFunc);
......
......@@ -95,6 +95,7 @@ int32_t tsQueryRsmaTolerance = 1000; // the tolerance time (ms) to judge from w
bool tsQueryPlannerTrace = false;
int32_t tsQueryNodeChunkSize = 32 * 1024;
bool tsQueryUseNodeAllocator = true;
bool tsKeepColumnName = false;
/*
* denote if the server needs to compress response message at the application layer to client, including query rsp,
......@@ -290,6 +291,7 @@ static int32_t taosAddClientCfg(SConfig *pCfg) {
if (cfgAddBool(pCfg, "queryPlannerTrace", tsQueryPlannerTrace, true) != 0) return -1;
if (cfgAddInt32(pCfg, "queryNodeChunkSize", tsQueryNodeChunkSize, 1024, 128 * 1024, true) != 0) return -1;
if (cfgAddBool(pCfg, "queryUseNodeAllocator", tsQueryUseNodeAllocator, true) != 0) return -1;
if (cfgAddBool(pCfg, "keepColumnName", tsKeepColumnName, true) != 0) return -1;
if (cfgAddString(pCfg, "smlChildTableName", "", 1) != 0) return -1;
if (cfgAddString(pCfg, "smlTagName", tsSmlTagName, 1) != 0) return -1;
if (cfgAddBool(pCfg, "smlDataFormat", tsSmlDataFormat, 1) != 0) return -1;
......@@ -652,6 +654,7 @@ static int32_t taosSetClientCfg(SConfig *pCfg) {
tsQueryPlannerTrace = cfgGetItem(pCfg, "queryPlannerTrace")->bval;
tsQueryNodeChunkSize = cfgGetItem(pCfg, "queryNodeChunkSize")->i32;
tsQueryUseNodeAllocator = cfgGetItem(pCfg, "queryUseNodeAllocator")->bval;
tsKeepColumnName = cfgGetItem(pCfg, "keepColumnName")->bval;
return 0;
}
......@@ -845,6 +848,9 @@ int32_t taosSetCfg(SConfig *pCfg, char *name) {
break;
}
case 'k': {
if (strcasecmp("keepColumnName", name) == 0) {
tsKeepColumnName = cfgGetItem(pCfg, "keepColumnName")->bval;
}
break;
}
case 'l': {
......@@ -921,9 +927,9 @@ int32_t taosSetCfg(SConfig *pCfg, char *name) {
}
case 'u': {
if (strcasecmp("multiProcess", name) == 0) {
#if !defined(WINDOWS) && !defined(DARWIN)
#if !defined(WINDOWS) && !defined(DARWIN)
tsMultiProcess = cfgGetItem(pCfg, "multiProcess")->bval;
#endif
#endif
} else if (strcasecmp("udfDebugFlag", name) == 0) {
udfDebugFlag = cfgGetItem(pCfg, "udfDebugFlag")->i32;
}
......
......@@ -315,7 +315,7 @@ static int32_t mndProcessUptimeTimer(SRpcMsg *pReq) {
return 0;
}
mTrace("update cluster uptime to %" PRId64, clusterObj.upTime);
mInfo("update cluster uptime to %" PRId64, clusterObj.upTime);
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pReq, "update-uptime");
if (pTrans == NULL) return -1;
......
......@@ -178,6 +178,8 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) {
SMqConsumerObj *pConsumer;
void *pIter = NULL;
mTrace("start to process mq timer");
// rebalance cannot be parallel
if (!mndRebTryStart()) {
mInfo("mq rebalance already in progress, do nothing");
......
......@@ -119,28 +119,30 @@ static void *mndThreadFp(void *param) {
lastTime++;
taosMsleep(100);
if (mndGetStop(pMnode)) break;
if (lastTime % 10 != 0) continue;
if (lastTime % (tsTtlPushInterval * 10) == 1) {
int64_t sec = lastTime / 10;
if (sec % tsTtlPushInterval == 0) {
mndPullupTtl(pMnode);
}
if (lastTime % (tsTransPullupInterval * 10) == 0) {
if (sec % tsTransPullupInterval == 0) {
mndPullupTrans(pMnode);
}
if (lastTime % (tsMqRebalanceInterval * 10) == 0) {
if (sec % tsMqRebalanceInterval == 0) {
mndCalMqRebalance(pMnode);
}
if (lastTime % (tsTelemInterval * 10) == ((tsTelemInterval - 1) * 10)) {
if (sec % tsTelemInterval == (TMIN(60, (tsTelemInterval - 1)))) {
mndPullupTelem(pMnode);
}
if (lastTime % (tsGrantHBInterval * 10) == 0) {
if (sec % tsGrantHBInterval == 0) {
mndPullupGrant(pMnode);
}
if ((lastTime % (tsUptimeInterval * 10)) == ((tsUptimeInterval - 1) * 10)) {
if (sec % tsUptimeInterval == 0) {
mndIncreaseUpTime(pMnode);
}
}
......@@ -399,15 +401,15 @@ void mndPreClose(SMnode *pMnode) {
atomic_store_8(&(pMnode->syncMgmt.leaderTransferFinish), 0);
syncLeaderTransfer(pMnode->syncMgmt.sync);
/*
mInfo("vgId:1, mnode start leader transfer");
// wait for leader transfer finish
while (!atomic_load_8(&(pMnode->syncMgmt.leaderTransferFinish))) {
taosMsleep(10);
mInfo("vgId:1, mnode waiting for leader transfer");
}
mInfo("vgId:1, mnode finish leader transfer");
*/
#if 0
mInfo("vgId:1, mnode start leader transfer");
// wait for leader transfer finish
while (!atomic_load_8(&(pMnode->syncMgmt.leaderTransferFinish))) {
taosMsleep(10);
mInfo("vgId:1, mnode waiting for leader transfer");
}
mInfo("vgId:1, mnode finish leader transfer");
#endif
}
}
......
......@@ -834,6 +834,8 @@ static int32_t mndProcessTtlTimer(SRpcMsg *pReq) {
int32_t reqLen = tSerializeSVDropTtlTableReq(NULL, 0, &ttlReq);
int32_t contLen = reqLen + sizeof(SMsgHead);
mInfo("start to process ttl timer");
while (1) {
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
if (pIter == NULL) break;
......
......@@ -133,7 +133,7 @@ static int32_t mndProcessTelemTimer(SRpcMsg* pReq) {
if (taosSendHttpReport(tsTelemServer, tsTelemPort, pCont, strlen(pCont), HTTP_FLAT) != 0) {
mError("failed to send telemetry report");
} else {
mTrace("succeed to send telemetry report");
mInfo("succeed to send telemetry report");
}
taosMemoryFree(pCont);
}
......
......@@ -1478,6 +1478,7 @@ void mndTransExecute(SMnode *pMnode, STrans *pTrans) {
}
static int32_t mndProcessTransTimer(SRpcMsg *pReq) {
mTrace("start to process trans timer");
mndTransPullup(pReq->info.node);
return 0;
}
......
......@@ -221,14 +221,15 @@ static int32_t sdbDeleteRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *
return terrno;
}
SSdbRow *pOldRow = *ppOldRow;
pOldRow->status = pRaw->status;
atomic_add_fetch_32(&pOldRow->refCount, 1);
sdbPrintOper(pSdb, pOldRow, "delete");
taosHashRemove(hash, pOldRow->pObj, keySize);
pSdb->tableVer[pOldRow->type]++;
taosThreadRwlockUnlock(pLock);
pSdb->tableVer[pOldRow->type]++;
sdbFreeRow(pSdb, pRow, false);
sdbCheckRow(pSdb, pOldRow);
......@@ -317,7 +318,7 @@ static void sdbCheckRow(SSdb *pSdb, SSdbRow *pRow) {
TdThreadRwlock *pLock = &pSdb->locks[pRow->type];
taosThreadRwlockWrlock(pLock);
int32_t ref = atomic_load_32(&pRow->refCount);
int32_t ref = atomic_sub_fetch_32(&pRow->refCount, 1);
sdbPrintOper(pSdb, pRow, "check");
if (ref <= 0 && pRow->status == SDB_STATUS_DROPPED) {
sdbFreeRow(pSdb, pRow, true);
......
......@@ -688,6 +688,8 @@ static void vnodeRestoreFinish(struct SSyncFSM *pFsm) {
}
} while (true);
walApplyVer(pVnode->pWal, pVnode->state.applied);
pVnode->restored = true;
vDebug("vgId:%d, sync restore finished", pVnode->config.vgId);
}
......
......@@ -50,6 +50,7 @@ extern "C" {
#define FUNC_MGT_KEEP_ORDER_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(21)
#define FUNC_MGT_CUMULATIVE_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(22)
#define FUNC_MGT_FORBID_STABLE_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(23)
#define FUNC_MGT_INTERP_PC_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(24)
#define FUNC_MGT_TEST_MASK(val, mask) (((val) & (mask)) != 0)
......
......@@ -3146,6 +3146,16 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.classification = FUNC_MGT_SYSTEM_INFO_FUNC | FUNC_MGT_SCALAR_FUNC,
.translateFunc = translateUserFunc,
},
{
.name = "_irowts",
.type = FUNCTION_TYPE_IROWTS,
.classification = FUNC_MGT_PSEUDO_COLUMN_FUNC | FUNC_MGT_INTERP_PC_FUNC,
.translateFunc = translateTimePseudoColumn,
.getEnvFunc = getTimePseudoFuncEnv,
.initFunc = NULL,
.sprocessFunc = NULL,
.finalizeFunc = NULL
},
};
// clang-format on
......
......@@ -224,6 +224,8 @@ bool fmIsInterpFunc(int32_t funcId) {
return FUNCTION_TYPE_INTERP == funcMgtBuiltins[funcId].type;
}
bool fmIsInterpPseudoColumnFunc(int32_t funcId) { return isSpecificClassifyFunc(funcId, FUNC_MGT_INTERP_PC_FUNC); }
bool fmIsLastRowFunc(int32_t funcId) {
if (funcId < 0 || funcId >= funcMgtBuiltinsNum) {
return false;
......
......@@ -707,6 +707,7 @@ pseudo_column(A) ::= QDURATION(B).
pseudo_column(A) ::= WSTART(B). { A = createRawExprNode(pCxt, &B, createFunctionNode(pCxt, &B, NULL)); }
pseudo_column(A) ::= WEND(B). { A = createRawExprNode(pCxt, &B, createFunctionNode(pCxt, &B, NULL)); }
pseudo_column(A) ::= WDURATION(B). { A = createRawExprNode(pCxt, &B, createFunctionNode(pCxt, &B, NULL)); }
pseudo_column(A) ::= IROWTS(B). { A = createRawExprNode(pCxt, &B, createFunctionNode(pCxt, &B, NULL)); }
function_expression(A) ::= function_name(B) NK_LP expression_list(C) NK_RP(D). { A = createRawExprNodeExt(pCxt, &B, &D, createFunctionNode(pCxt, &B, C)); }
function_expression(A) ::= star_func(B) NK_LP star_func_para_list(C) NK_RP(D). { A = createRawExprNodeExt(pCxt, &B, &D, createFunctionNode(pCxt, &B, C)); }
......
......@@ -252,6 +252,7 @@ static SKeyword keywordTable[] = {
{"WITH", TK_WITH},
{"WRITE", TK_WRITE},
{"_C0", TK_ROWTS},
{"_IROWTS", TK_IROWTS},
{"_QDURATION", TK_QDURATION},
{"_QEND", TK_QEND},
{"_QSTART", TK_QSTART},
......
......@@ -1556,6 +1556,9 @@ static int32_t translateMultiResFunc(STranslateContext* pCxt, SFunctionNode* pFu
"%s(*) is only supported in SELECTed list", pFunc->functionName);
}
}
if (tsKeepColumnName) {
strcpy(pFunc->node.userAlias, ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->userAlias);
}
return TSDB_CODE_SUCCESS;
}
......@@ -2388,8 +2391,12 @@ static SNode* createMultiResFunc(SFunctionNode* pSrcFunc, SExprNode* pExpr) {
SColumnNode* pCol = (SColumnNode*)pExpr;
len = snprintf(buf, sizeof(buf), "%s(%s.%s)", pSrcFunc->functionName, pCol->tableAlias, pCol->colName);
strncpy(pFunc->node.aliasName, buf, TMIN(len, sizeof(pFunc->node.aliasName) - 1));
len = snprintf(buf, sizeof(buf), "%s(%s)", pSrcFunc->functionName, pCol->colName);
strncpy(pFunc->node.userAlias, buf, TMIN(len, sizeof(pFunc->node.userAlias) - 1));
if (tsKeepColumnName) {
strcpy(pFunc->node.userAlias, pCol->colName);
} else {
len = snprintf(buf, sizeof(buf), "%s(%s)", pSrcFunc->functionName, pCol->colName);
strncpy(pFunc->node.userAlias, buf, TMIN(len, sizeof(pFunc->node.userAlias) - 1));
}
} else {
len = snprintf(buf, sizeof(buf), "%s(%s)", pSrcFunc->functionName, pExpr->aliasName);
strncpy(pFunc->node.aliasName, buf, TMIN(len, sizeof(pFunc->node.aliasName) - 1));
......
此差异已折叠。
......@@ -611,6 +611,8 @@ static int32_t createIndefRowsFuncLogicNode(SLogicPlanContext* pCxt, SSelectStmt
return code;
}
static bool isInterpFunc(int32_t funcId) { return fmIsInterpFunc(funcId) || fmIsInterpPseudoColumnFunc(funcId); }
static int32_t createInterpFuncLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect, SLogicNode** pLogicNode) {
if (!pSelect->hasInterpFunc) {
return TSDB_CODE_SUCCESS;
......@@ -625,7 +627,7 @@ static int32_t createInterpFuncLogicNode(SLogicPlanContext* pCxt, SSelectStmt* p
pInterpFunc->node.requireDataOrder = getRequireDataOrder(true, pSelect);
pInterpFunc->node.resultDataOrder = pInterpFunc->node.requireDataOrder;
int32_t code = nodesCollectFuncs(pSelect, SQL_CLAUSE_SELECT, fmIsInterpFunc, &pInterpFunc->pFuncs);
int32_t code = nodesCollectFuncs(pSelect, SQL_CLAUSE_SELECT, isInterpFunc, &pInterpFunc->pFuncs);
if (TSDB_CODE_SUCCESS == code) {
code = rewriteExprsForSelect(pInterpFunc->pFuncs, pSelect, SQL_CLAUSE_SELECT);
}
......
......@@ -101,6 +101,8 @@ TEST_F(PlanBasicTest, interpFunc) {
useDb("root", "test");
run("SELECT INTERP(c1) FROM t1 RANGE('2017-7-14 18:00:00', '2017-7-14 19:00:00') EVERY(5s) FILL(LINEAR)");
run("SELECT _IROWTS, INTERP(c1) FROM t1 RANGE('2017-7-14 18:00:00', '2017-7-14 19:00:00') EVERY(5s) FILL(LINEAR)");
}
TEST_F(PlanBasicTest, lastRowFunc) {
......
......@@ -201,7 +201,7 @@ void *threadFunc(void *param) {
int64_t t = pInfo->tableBeginIndex;
for (; t <= pInfo->tableEndIndex;) {
// int64_t batch = (pInfo->tableEndIndex - t);
// batch = MIN(batch, batchNum);
// batch = TMIN(batch, batchNum);
int32_t len = sprintf(qstr, "create table");
for (int32_t i = 0; i < batchNumOfTbl;) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册