提交 5b5b5648 编写于 作者: D dapan1121

Merge remote-tracking branch 'origin/3.0' into feat/TD-24700

...@@ -81,10 +81,6 @@ Set<String> subscription() throws SQLException; ...@@ -81,10 +81,6 @@ Set<String> subscription() throws SQLException;
ConsumerRecords<V> poll(Duration timeout) throws SQLException; ConsumerRecords<V> poll(Duration timeout) throws SQLException;
void commitAsync();
void commitAsync(OffsetCommitCallback callback);
void commitSync() throws SQLException; void commitSync() throws SQLException;
void close() throws SQLException; void close() throws SQLException;
......
...@@ -283,6 +283,8 @@ Provides dnode configuration information. ...@@ -283,6 +283,8 @@ Provides dnode configuration information.
| 2 | consumer_group | BINARY(193) | Subscribed consumer group | | 2 | consumer_group | BINARY(193) | Subscribed consumer group |
| 3 | vgroup_id | INT | Vgroup ID for the consumer | | 3 | vgroup_id | INT | Vgroup ID for the consumer |
| 4 | consumer_id | BIGINT | Consumer ID | | 4 | consumer_id | BIGINT | Consumer ID |
| 5 | offset | BINARY(64) | Consumption progress |
| 6 | rows | BIGINT | Number of consumption items |
## INS_STREAMS ## INS_STREAMS
......
...@@ -36,15 +36,16 @@ REST connection supports all platforms that can run Java. ...@@ -36,15 +36,16 @@ REST connection supports all platforms that can run Java.
| taos-jdbcdriver version | major changes | TDengine version | | taos-jdbcdriver version | major changes | TDengine version |
| :---------------------: | :------------------------------------------------------------------------------------------------------------------------------------------------: | :--------------: | | :---------------------: | :------------------------------------------------------------------------------------------------------------------------------------------------: | :--------------: |
| 3.2.4 | Subscription add the enable.auto.commit parameter and the unsubscribe() method in the WebSocket connection | 3.0.5.0 or later |
| 3.2.3 | Fixed resultSet data parsing failure in some cases | 3.0.5.0 or later | | 3.2.3 | Fixed resultSet data parsing failure in some cases | 3.0.5.0 or later |
| 3.2.2 | subscription add seek function | 3.0.5.0 or later | | 3.2.2 | Subscription add seek function | 3.0.5.0 or later |
| 3.2.1 | JDBC REST connection supports schemaless/prepareStatement over WebSocket | 3.0.3.0 or later | | 3.2.1 | JDBC REST connection supports schemaless/prepareStatement over WebSocket | 3.0.3.0 or later |
| 3.2.0 | This version has been deprecated | - | | 3.2.0 | This version has been deprecated | - |
| 3.1.0 | JDBC REST connection supports subscription over WebSocket | - | | 3.1.0 | JDBC REST connection supports subscription over WebSocket | - |
| 3.0.1 - 3.0.4 | fix the resultSet data is parsed incorrectly sometimes. 3.0.1 is compiled on JDK 11, you are advised to use other version in the JDK 8 environment | - | | 3.0.1 - 3.0.4 | fix the resultSet data is parsed incorrectly sometimes. 3.0.1 is compiled on JDK 11, you are advised to use other version in the JDK 8 environment | - |
| 3.0.0 | Support for TDengine 3.0 | 3.0.0.0 or later | | 3.0.0 | Support for TDengine 3.0 | 3.0.0.0 or later |
| 2.0.42 | fix wasNull interface return value in WebSocket connection | - | | 2.0.42 | Fix wasNull interface return value in WebSocket connection | - |
| 2.0.41 | fix decode method of username and password in REST connection | - | | 2.0.41 | Fix decode method of username and password in REST connection | - |
| 2.0.39 - 2.0.40 | Add REST connection/request timeout parameters | - | | 2.0.39 - 2.0.40 | Add REST connection/request timeout parameters | - |
| 2.0.38 | JDBC REST connections add bulk pull function | - | | 2.0.38 | JDBC REST connections add bulk pull function | - |
| 2.0.37 | Support json tags | - | | 2.0.37 | Support json tags | - |
......
...@@ -81,10 +81,6 @@ Set<String> subscription() throws SQLException; ...@@ -81,10 +81,6 @@ Set<String> subscription() throws SQLException;
ConsumerRecords<V> poll(Duration timeout) throws SQLException; ConsumerRecords<V> poll(Duration timeout) throws SQLException;
void commitAsync();
void commitAsync(OffsetCommitCallback callback);
void commitSync() throws SQLException; void commitSync() throws SQLException;
void close() throws SQLException; void close() throws SQLException;
......
...@@ -36,6 +36,7 @@ REST 连接支持所有能运行 Java 的平台。 ...@@ -36,6 +36,7 @@ REST 连接支持所有能运行 Java 的平台。
| taos-jdbcdriver 版本 | 主要变化 | TDengine 版本 | | taos-jdbcdriver 版本 | 主要变化 | TDengine 版本 |
| :------------------: | :----------------------------------------------------------------------------------------------------------------------------------------------------: | :----------------: | | :------------------: | :----------------------------------------------------------------------------------------------------------------------------------------------------: | :----------------: |
| 3.2.4 | 数据订阅在 WebSocket 连接下增加 enable.auto.commit 参数,以及 unsubscribe() 方法。 | - |
| 3.2.3 | 修复 ResultSet 在一些情况数据解析失败 | - | | 3.2.3 | 修复 ResultSet 在一些情况数据解析失败 | - |
| 3.2.2 | 新增功能:数据订阅支持 seek 功能。 | 3.0.5.0 及更高版本 | | 3.2.2 | 新增功能:数据订阅支持 seek 功能。 | 3.0.5.0 及更高版本 |
| 3.2.1 | 新增功能:WebSocket 连接支持 schemaless 与 prepareStatement 写入。变更:consumer poll 返回结果集为 ConsumerRecord,可通过 value() 获取指定结果集数据。 | 3.0.3.0 及更高版本 | | 3.2.1 | 新增功能:WebSocket 连接支持 schemaless 与 prepareStatement 写入。变更:consumer poll 返回结果集为 ConsumerRecord,可通过 value() 获取指定结果集数据。 | 3.0.3.0 及更高版本 |
......
...@@ -284,6 +284,8 @@ TDengine 内置了一个名为 `INFORMATION_SCHEMA` 的数据库,提供对数 ...@@ -284,6 +284,8 @@ TDengine 内置了一个名为 `INFORMATION_SCHEMA` 的数据库,提供对数
| 2 | consumer_group | BINARY(193) | 订阅者的消费者组 | | 2 | consumer_group | BINARY(193) | 订阅者的消费者组 |
| 3 | vgroup_id | INT | 消费者被分配的 vgroup id | | 3 | vgroup_id | INT | 消费者被分配的 vgroup id |
| 4 | consumer_id | BIGINT | 消费者的唯一 id | | 4 | consumer_id | BIGINT | 消费者的唯一 id |
| 5 | offset | BINARY(64) | 消费者的消费进度 |
| 6 | rows | BIGINT | 消费者的消费的数据条数 |
## INS_STREAMS ## INS_STREAMS
......
...@@ -3246,6 +3246,7 @@ typedef struct { ...@@ -3246,6 +3246,7 @@ typedef struct {
char* sql; char* sql;
char* ast; char* ast;
int64_t deleteMark; int64_t deleteMark;
int64_t lastTs;
} SMCreateSmaReq; } SMCreateSmaReq;
int32_t tSerializeSMCreateSmaReq(void* buf, int32_t bufLen, SMCreateSmaReq* pReq); int32_t tSerializeSMCreateSmaReq(void* buf, int32_t bufLen, SMCreateSmaReq* pReq);
......
...@@ -319,19 +319,22 @@ typedef struct SIndexOptions { ...@@ -319,19 +319,22 @@ typedef struct SIndexOptions {
SNode* pInterval; SNode* pInterval;
SNode* pOffset; SNode* pOffset;
SNode* pSliding; SNode* pSliding;
int8_t tsPrecision;
SNode* pStreamOptions; SNode* pStreamOptions;
} SIndexOptions; } SIndexOptions;
typedef struct SCreateIndexStmt { typedef struct SCreateIndexStmt {
ENodeType type; ENodeType type;
EIndexType indexType; EIndexType indexType;
bool ignoreExists; bool ignoreExists;
char indexDbName[TSDB_DB_NAME_LEN]; char indexDbName[TSDB_DB_NAME_LEN];
char indexName[TSDB_INDEX_NAME_LEN]; char indexName[TSDB_INDEX_NAME_LEN];
char dbName[TSDB_DB_NAME_LEN]; char dbName[TSDB_DB_NAME_LEN];
char tableName[TSDB_TABLE_NAME_LEN]; char tableName[TSDB_TABLE_NAME_LEN];
SNodeList* pCols; SNodeList* pCols;
SIndexOptions* pOptions; SIndexOptions* pOptions;
SNode* pPrevQuery;
SMCreateSmaReq* pReq;
} SCreateIndexStmt; } SCreateIndexStmt;
typedef struct SDropIndexStmt { typedef struct SDropIndexStmt {
......
...@@ -706,6 +706,7 @@ int32_t* taosGetErrno(); ...@@ -706,6 +706,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_PAR_INVALID_TIMELINE_QUERY TAOS_DEF_ERROR_CODE(0, 0x2666) #define TSDB_CODE_PAR_INVALID_TIMELINE_QUERY TAOS_DEF_ERROR_CODE(0, 0x2666)
#define TSDB_CODE_PAR_INVALID_OPTR_USAGE TAOS_DEF_ERROR_CODE(0, 0x2667) #define TSDB_CODE_PAR_INVALID_OPTR_USAGE TAOS_DEF_ERROR_CODE(0, 0x2667)
#define TSDB_CODE_PAR_SYSTABLE_NOT_ALLOWED_FUNC TAOS_DEF_ERROR_CODE(0, 0x2668) #define TSDB_CODE_PAR_SYSTABLE_NOT_ALLOWED_FUNC TAOS_DEF_ERROR_CODE(0, 0x2668)
#define TSDB_CODE_PAR_SYSTABLE_NOT_ALLOWED TAOS_DEF_ERROR_CODE(0, 0x2669)
#define TSDB_CODE_PAR_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x26FF) #define TSDB_CODE_PAR_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x26FF)
//planner //planner
......
...@@ -835,6 +835,7 @@ int32_t tSerializeSMCreateSmaReq(void *buf, int32_t bufLen, SMCreateSmaReq *pReq ...@@ -835,6 +835,7 @@ int32_t tSerializeSMCreateSmaReq(void *buf, int32_t bufLen, SMCreateSmaReq *pReq
if (tEncodeBinary(&encoder, pReq->ast, pReq->astLen) < 0) return -1; if (tEncodeBinary(&encoder, pReq->ast, pReq->astLen) < 0) return -1;
} }
if (tEncodeI64(&encoder, pReq->deleteMark) < 0) return -1; if (tEncodeI64(&encoder, pReq->deleteMark) < 0) return -1;
if (tEncodeI64(&encoder, pReq->lastTs) < 0) return -1;
tEndEncode(&encoder); tEndEncode(&encoder);
int32_t tlen = encoder.pos; int32_t tlen = encoder.pos;
...@@ -884,6 +885,7 @@ int32_t tDeserializeSMCreateSmaReq(void *buf, int32_t bufLen, SMCreateSmaReq *pR ...@@ -884,6 +885,7 @@ int32_t tDeserializeSMCreateSmaReq(void *buf, int32_t bufLen, SMCreateSmaReq *pR
if (tDecodeCStrTo(&decoder, pReq->ast) < 0) return -1; if (tDecodeCStrTo(&decoder, pReq->ast) < 0) return -1;
} }
if (tDecodeI64(&decoder, &pReq->deleteMark) < 0) return -1; if (tDecodeI64(&decoder, &pReq->deleteMark) < 0) return -1;
if (tDecodeI64(&decoder, &pReq->lastTs) < 0) return -1;
tEndDecode(&decoder); tEndDecode(&decoder);
tDecoderClear(&decoder); tDecoderClear(&decoder);
return 0; return 0;
......
...@@ -917,6 +917,10 @@ void nodesDestroyNode(SNode* pNode) { ...@@ -917,6 +917,10 @@ void nodesDestroyNode(SNode* pNode) {
SCreateIndexStmt* pStmt = (SCreateIndexStmt*)pNode; SCreateIndexStmt* pStmt = (SCreateIndexStmt*)pNode;
nodesDestroyNode((SNode*)pStmt->pOptions); nodesDestroyNode((SNode*)pStmt->pOptions);
nodesDestroyList(pStmt->pCols); nodesDestroyList(pStmt->pCols);
if (pStmt->pReq) {
tFreeSMCreateSmaReq(pStmt->pReq);
taosMemoryFreeClear(pStmt->pReq);
}
break; break;
} }
case QUERY_NODE_DROP_INDEX_STMT: // no pointer field case QUERY_NODE_DROP_INDEX_STMT: // no pointer field
...@@ -1063,6 +1067,7 @@ void nodesDestroyNode(SNode* pNode) { ...@@ -1063,6 +1067,7 @@ void nodesDestroyNode(SNode* pNode) {
} }
case QUERY_NODE_QUERY: { case QUERY_NODE_QUERY: {
SQuery* pQuery = (SQuery*)pNode; SQuery* pQuery = (SQuery*)pNode;
nodesDestroyNode(pQuery->pPrevRoot);
nodesDestroyNode(pQuery->pRoot); nodesDestroyNode(pQuery->pRoot);
nodesDestroyNode(pQuery->pPostRoot); nodesDestroyNode(pQuery->pPostRoot);
taosMemoryFreeClear(pQuery->pResSchema); taosMemoryFreeClear(pQuery->pResSchema);
......
...@@ -35,6 +35,7 @@ int32_t translate(SParseContext* pParseCxt, SQuery* pQuery, SParseMetaCache* pMe ...@@ -35,6 +35,7 @@ int32_t translate(SParseContext* pParseCxt, SQuery* pQuery, SParseMetaCache* pMe
int32_t extractResultSchema(const SNode* pRoot, int32_t* numOfCols, SSchema** pSchema); int32_t extractResultSchema(const SNode* pRoot, int32_t* numOfCols, SSchema** pSchema);
int32_t calculateConstant(SParseContext* pParseCxt, SQuery* pQuery); int32_t calculateConstant(SParseContext* pParseCxt, SQuery* pQuery);
int32_t translatePostCreateStream(SParseContext* pParseCxt, SQuery* pQuery, void** pResRow); int32_t translatePostCreateStream(SParseContext* pParseCxt, SQuery* pQuery, void** pResRow);
int32_t translatePostCreateSmaIndex(SParseContext* pParseCxt, SQuery* pQuery, void** pResRow);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -3521,6 +3521,10 @@ static int32_t translateWindow(STranslateContext* pCxt, SSelectStmt* pSelect) { ...@@ -3521,6 +3521,10 @@ static int32_t translateWindow(STranslateContext* pCxt, SSelectStmt* pSelect) {
if (NULL == pSelect->pWindow) { if (NULL == pSelect->pWindow) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
if (pSelect->pFromTable->type == QUERY_NODE_REAL_TABLE &&
((SRealTableNode*)pSelect->pFromTable)->pMeta->tableType == TSDB_SYSTEM_TABLE) {
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_SYSTABLE_NOT_ALLOWED, "WINDOW");
}
pCxt->currClause = SQL_CLAUSE_WINDOW; pCxt->currClause = SQL_CLAUSE_WINDOW;
int32_t code = translateExpr(pCxt, &pSelect->pWindow); int32_t code = translateExpr(pCxt, &pSelect->pWindow);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
...@@ -5804,6 +5808,15 @@ static int32_t buildCreateSmaReq(STranslateContext* pCxt, SCreateIndexStmt* pStm ...@@ -5804,6 +5808,15 @@ static int32_t buildCreateSmaReq(STranslateContext* pCxt, SCreateIndexStmt* pStm
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = getSmaIndexAst(pCxt, pStmt, &pReq->ast, &pReq->astLen, &pReq->expr, &pReq->exprLen); code = getSmaIndexAst(pCxt, pStmt, &pReq->ast, &pReq->astLen, &pReq->expr, &pReq->exprLen);
} }
if (TSDB_CODE_SUCCESS == code) {
STableMeta* pMetaCache = NULL;
code = getTableMeta(pCxt, pStmt->dbName, pStmt->tableName, &pMetaCache);
if (TSDB_CODE_SUCCESS == code) {
pStmt->pOptions->tsPrecision = pMetaCache->tableInfo.precision;
code = createLastTsSelectStmt(pStmt->dbName, pStmt->tableName, pMetaCache, &pStmt->pPrevQuery);
}
taosMemoryFreeClear(pMetaCache);
}
return code; return code;
} }
...@@ -5829,15 +5842,60 @@ static int32_t checkCreateSmaIndex(STranslateContext* pCxt, SCreateIndexStmt* pS ...@@ -5829,15 +5842,60 @@ static int32_t checkCreateSmaIndex(STranslateContext* pCxt, SCreateIndexStmt* pS
} }
static int32_t translateCreateSmaIndex(STranslateContext* pCxt, SCreateIndexStmt* pStmt) { static int32_t translateCreateSmaIndex(STranslateContext* pCxt, SCreateIndexStmt* pStmt) {
SMCreateSmaReq createSmaReq = {0};
int32_t code = checkCreateSmaIndex(pCxt, pStmt); int32_t code = checkCreateSmaIndex(pCxt, pStmt);
pStmt->pReq = taosMemoryCalloc(1, sizeof(SMCreateSmaReq));
if (pStmt->pReq == NULL) code = TSDB_CODE_OUT_OF_MEMORY;
if (TSDB_CODE_SUCCESS == code) {
code = buildCreateSmaReq(pCxt, pStmt, pStmt->pReq);
}
TSWAP(pCxt->pPrevRoot, pStmt->pPrevQuery);
return code;
}
int32_t createIntervalFromCreateSmaIndexStmt(SCreateIndexStmt* pStmt, SInterval* pInterval) {
pInterval->interval = ((SValueNode*)pStmt->pOptions->pInterval)->datum.i;
pInterval->intervalUnit = ((SValueNode*)pStmt->pOptions->pInterval)->unit;
pInterval->offset = NULL != pStmt->pOptions->pOffset ? ((SValueNode*)pStmt->pOptions->pOffset)->datum.i : 0;
pInterval->sliding = NULL != pStmt->pOptions->pSliding ? ((SValueNode*)pStmt->pOptions->pSliding)->datum.i : pInterval->interval;
pInterval->slidingUnit = NULL != pStmt->pOptions->pSliding ? ((SValueNode*)pStmt->pOptions->pSliding)->unit : pInterval->intervalUnit;
pInterval->precision = pStmt->pOptions->tsPrecision;
return TSDB_CODE_SUCCESS;
}
int32_t translatePostCreateSmaIndex(SParseContext* pParseCxt, SQuery* pQuery, void ** pResRow) {
int32_t code = TSDB_CODE_SUCCESS;
SCreateIndexStmt* pStmt = (SCreateIndexStmt*)pQuery->pRoot;
int64_t lastTs = 0;
SInterval interval = {0};
STranslateContext pCxt = {0};
code = initTranslateContext(pParseCxt, NULL, &pCxt);
if (TSDB_CODE_SUCCESS == code) {
code = createIntervalFromCreateSmaIndexStmt(pStmt, &interval);
}
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = buildCreateSmaReq(pCxt, pStmt, &createSmaReq); if (pResRow && pResRow[0]) {
lastTs = *(int64_t*)pResRow[0];
} else if (interval.interval > 0) {
lastTs = convertTimePrecision(taosGetTimestampMs(), TSDB_TIME_PRECISION_MILLI, interval.precision);
} else {
lastTs = taosGetTimestampMs();
}
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = buildCmdMsg(pCxt, TDMT_MND_CREATE_SMA, (FSerializeFunc)tSerializeSMCreateSmaReq, &createSmaReq); if (interval.interval > 0) {
pStmt->pReq->lastTs = taosTimeTruncate(lastTs, &interval);
} else {
pStmt->pReq->lastTs = lastTs;
}
code = buildCmdMsg(&pCxt, TDMT_MND_CREATE_SMA, (FSerializeFunc)tSerializeSMCreateSmaReq, pStmt->pReq);
} }
tFreeSMCreateSmaReq(&createSmaReq); if (TSDB_CODE_SUCCESS == code) {
code = setQuery(&pCxt, pQuery);
}
setRefreshMate(&pCxt, pQuery);
destroyTranslateContext(&pCxt);
tFreeSMCreateSmaReq(pStmt->pReq);
taosMemoryFreeClear(pStmt->pReq);
return code; return code;
} }
...@@ -6990,7 +7048,7 @@ static int32_t translateCreateStream(STranslateContext* pCxt, SCreateStreamStmt* ...@@ -6990,7 +7048,7 @@ static int32_t translateCreateStream(STranslateContext* pCxt, SCreateStreamStmt*
return code; return code;
} }
int32_t buildIntervalForCreateStream(SCreateStreamStmt* pStmt, SInterval* pInterval) { static int32_t buildIntervalForCreateStream(SCreateStreamStmt* pStmt, SInterval* pInterval) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
if (QUERY_NODE_SELECT_STMT != nodeType(pStmt->pQuery)) { if (QUERY_NODE_SELECT_STMT != nodeType(pStmt->pQuery)) {
return code; return code;
......
...@@ -172,6 +172,8 @@ static char* getSyntaxErrFormat(int32_t errCode) { ...@@ -172,6 +172,8 @@ static char* getSyntaxErrFormat(int32_t errCode) {
return "%s function is not supported in group query"; return "%s function is not supported in group query";
case TSDB_CODE_PAR_SYSTABLE_NOT_ALLOWED_FUNC: case TSDB_CODE_PAR_SYSTABLE_NOT_ALLOWED_FUNC:
return "%s function is not supported in system table query"; return "%s function is not supported in system table query";
case TSDB_CODE_PAR_SYSTABLE_NOT_ALLOWED:
return "%s is not supported in system table query";
case TSDB_CODE_PAR_INVALID_INTERP_CLAUSE: case TSDB_CODE_PAR_INVALID_INTERP_CLAUSE:
return "Invalid usage of RANGE clause, EVERY clause or FILL clause"; return "Invalid usage of RANGE clause, EVERY clause or FILL clause";
case TSDB_CODE_PAR_NO_VALID_FUNC_IN_WIN: case TSDB_CODE_PAR_NO_VALID_FUNC_IN_WIN:
......
...@@ -227,6 +227,8 @@ int32_t qContinueParsePostQuery(SParseContext* pCxt, SQuery* pQuery, void** pRes ...@@ -227,6 +227,8 @@ int32_t qContinueParsePostQuery(SParseContext* pCxt, SQuery* pQuery, void** pRes
case QUERY_NODE_CREATE_STREAM_STMT: case QUERY_NODE_CREATE_STREAM_STMT:
code = translatePostCreateStream(pCxt, pQuery, pResRow); code = translatePostCreateStream(pCxt, pQuery, pResRow);
break; break;
case QUERY_NODE_CREATE_INDEX_STMT:
code = translatePostCreateSmaIndex(pCxt, pQuery, pResRow);
default: default:
break; break;
} }
......
...@@ -542,6 +542,18 @@ TEST_F(ParserInitialCTest, createSmaIndex) { ...@@ -542,6 +542,18 @@ TEST_F(ParserInitialCTest, createSmaIndex) {
setCheckDdlFunc([&](const SQuery* pQuery, ParserStage stage) { setCheckDdlFunc([&](const SQuery* pQuery, ParserStage stage) {
ASSERT_EQ(nodeType(pQuery->pRoot), QUERY_NODE_CREATE_INDEX_STMT); ASSERT_EQ(nodeType(pQuery->pRoot), QUERY_NODE_CREATE_INDEX_STMT);
SMCreateSmaReq req = {0}; SMCreateSmaReq req = {0};
ASSERT_TRUE(pQuery->pPrevRoot);
ASSERT_EQ(QUERY_NODE_SELECT_STMT, nodeType(pQuery->pPrevRoot));
SCreateIndexStmt* pStmt = (SCreateIndexStmt*)pQuery->pRoot;
SCmdMsgInfo* pCmdMsg = (SCmdMsgInfo*)taosMemoryMalloc(sizeof(SCmdMsgInfo));
if (NULL == pCmdMsg) FAIL();
pCmdMsg->msgType = TDMT_MND_CREATE_SMA;
pCmdMsg->msgLen = tSerializeSMCreateSmaReq(NULL, 0, pStmt->pReq);
pCmdMsg->pMsg = taosMemoryMalloc(pCmdMsg->msgLen);
if (!pCmdMsg->pMsg) FAIL();
tSerializeSMCreateSmaReq(pCmdMsg->pMsg, pCmdMsg->msgLen, pStmt->pReq);
((SQuery*)pQuery)->pCmdMsg = pCmdMsg;
ASSERT_TRUE(TSDB_CODE_SUCCESS == tDeserializeSMCreateSmaReq(pQuery->pCmdMsg->pMsg, pQuery->pCmdMsg->msgLen, &req)); ASSERT_TRUE(TSDB_CODE_SUCCESS == tDeserializeSMCreateSmaReq(pQuery->pCmdMsg->pMsg, pQuery->pCmdMsg->msgLen, &req));
ASSERT_EQ(std::string(req.name), std::string(expect.name)); ASSERT_EQ(std::string(req.name), std::string(expect.name));
......
...@@ -291,4 +291,13 @@ TEST_F(ParserInitialDTest, dropUser) { ...@@ -291,4 +291,13 @@ TEST_F(ParserInitialDTest, dropUser) {
run("DROP USER wxy"); run("DROP USER wxy");
} }
TEST_F(ParserInitialDTest, IntervalOnSysTable) {
login("root");
run("SELECT count('reboot_time') FROM information_schema.ins_dnodes interval(14m) sliding(9m)",
TSDB_CODE_PAR_SYSTABLE_NOT_ALLOWED, PARSER_STAGE_TRANSLATE);
run("SELECT count('create_time') FROM information_schema.ins_qnodes interval(14m) sliding(9m)",
TSDB_CODE_PAR_SYSTABLE_NOT_ALLOWED, PARSER_STAGE_TRANSLATE);
}
} // namespace ParserTest } // namespace ParserTest
...@@ -441,6 +441,16 @@ class PlannerTestBaseImpl { ...@@ -441,6 +441,16 @@ class PlannerTestBaseImpl {
pCxt->topicQuery = true; pCxt->topicQuery = true;
} else if (QUERY_NODE_CREATE_INDEX_STMT == nodeType(pQuery->pRoot)) { } else if (QUERY_NODE_CREATE_INDEX_STMT == nodeType(pQuery->pRoot)) {
SMCreateSmaReq req = {0}; SMCreateSmaReq req = {0};
SCreateIndexStmt* pStmt = (SCreateIndexStmt*)pQuery->pRoot;
SCmdMsgInfo* pCmdMsg = (SCmdMsgInfo*)taosMemoryMalloc(sizeof(SCmdMsgInfo));
if (NULL == pCmdMsg) FAIL();
pCmdMsg->msgType = TDMT_MND_CREATE_SMA;
pCmdMsg->msgLen = tSerializeSMCreateSmaReq(NULL, 0, pStmt->pReq);
pCmdMsg->pMsg = taosMemoryMalloc(pCmdMsg->msgLen);
if (!pCmdMsg->pMsg) FAIL();
tSerializeSMCreateSmaReq(pCmdMsg->pMsg, pCmdMsg->msgLen, pStmt->pReq);
((SQuery*)pQuery)->pCmdMsg = pCmdMsg;
tDeserializeSMCreateSmaReq(pQuery->pCmdMsg->pMsg, pQuery->pCmdMsg->msgLen, &req); tDeserializeSMCreateSmaReq(pQuery->pCmdMsg->pMsg, pQuery->pCmdMsg->msgLen, &req);
g_mockCatalogService->createSmaIndex(&req); g_mockCatalogService->createSmaIndex(&req);
nodesStringToNode(req.ast, &pCxt->pAstRoot); nodesStringToNode(req.ast, &pCxt->pAstRoot);
......
...@@ -351,9 +351,13 @@ static void waitForTaskIdle(SStreamTask* pTask, SStreamTask* pStreamTask) { ...@@ -351,9 +351,13 @@ static void waitForTaskIdle(SStreamTask* pTask, SStreamTask* pStreamTask) {
static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) { static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) {
SStreamTask* pStreamTask = streamMetaAcquireTask(pTask->pMeta, pTask->streamTaskId.taskId); SStreamTask* pStreamTask = streamMetaAcquireTask(pTask->pMeta, pTask->streamTaskId.taskId);
qDebug("s-task:%s scan history task end, update stream task:%s info, transfer exec state", pTask->id.idStr, pStreamTask->id.idStr); if (pStreamTask == NULL) {
qError("s-task:%s failed to find related stream task:0x%x, it may have been destoryed or closed",
// todo handle stream task is dropped here pTask->id.idStr, pTask->streamTaskId.taskId);
return TSDB_CODE_STREAM_TASK_NOT_EXIST;
} else {
qDebug("s-task:%s scan history task end, update stream task:%s info, transfer exec state", pTask->id.idStr, pStreamTask->id.idStr);
}
ASSERT(pStreamTask != NULL && pStreamTask->historyTaskId.taskId == pTask->id.taskId); ASSERT(pStreamTask != NULL && pStreamTask->historyTaskId.taskId == pTask->id.taskId);
STimeWindow* pTimeWindow = &pStreamTask->dataRange.window; STimeWindow* pTimeWindow = &pStreamTask->dataRange.window;
...@@ -377,7 +381,7 @@ static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) { ...@@ -377,7 +381,7 @@ static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) {
if (pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE) { if (pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE) {
// update the scan data range for source task. // update the scan data range for source task.
qDebug("s-task:%s level:%d stream task window %" PRId64 " - %" PRId64 " update to %" PRId64 " - %" PRId64 qDebug("s-task:%s level:%d stream task window %" PRId64 " - %" PRId64 " update to %" PRId64 " - %" PRId64
", status:%s, sched-status:%d", ", status:%s, sched-status:%d",
pStreamTask->id.idStr, TASK_LEVEL__SOURCE, pTimeWindow->skey, pTimeWindow->ekey, INT64_MIN, pStreamTask->id.idStr, TASK_LEVEL__SOURCE, pTimeWindow->skey, pTimeWindow->ekey, INT64_MIN,
pTimeWindow->ekey, streamGetTaskStatusStr(TASK_STATUS__NORMAL), pStreamTask->status.schedStatus); pTimeWindow->ekey, streamGetTaskStatusStr(TASK_STATUS__NORMAL), pStreamTask->status.schedStatus);
} else { } else {
...@@ -473,6 +477,9 @@ int32_t streamExecForAll(SStreamTask* pTask) { ...@@ -473,6 +477,9 @@ int32_t streamExecForAll(SStreamTask* pTask) {
ASSERT(batchSize == 0); ASSERT(batchSize == 0);
if (pTask->info.fillHistory && pTask->status.transferState) { if (pTask->info.fillHistory && pTask->status.transferState) {
int32_t code = streamTransferStateToStreamTask(pTask); int32_t code = streamTransferStateToStreamTask(pTask);
if (code != TSDB_CODE_SUCCESS) { // todo handle this
return 0;
}
} }
break; break;
...@@ -564,7 +571,7 @@ int32_t streamTryExec(SStreamTask* pTask) { ...@@ -564,7 +571,7 @@ int32_t streamTryExec(SStreamTask* pTask) {
if (schedStatus == TASK_SCHED_STATUS__WAITING) { if (schedStatus == TASK_SCHED_STATUS__WAITING) {
int32_t code = streamExecForAll(pTask); int32_t code = streamExecForAll(pTask);
if (code < 0) { if (code < 0) { // todo this status shoudl be removed
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__FAILED); atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__FAILED);
return -1; return -1;
} }
......
...@@ -720,15 +720,39 @@ int tdbPagerInsertFreePage(SPager *pPager, SPage *pPage, TXN *pTxn) { ...@@ -720,15 +720,39 @@ int tdbPagerInsertFreePage(SPager *pPager, SPage *pPage, TXN *pTxn) {
int code = 0; int code = 0;
SPgno pgno = TDB_PAGE_PGNO(pPage); SPgno pgno = TDB_PAGE_PGNO(pPage);
if (pPager->frps) {
taosArrayPush(pPager->frps, &pgno);
pPage->pPager = NULL;
return code;
}
pPager->frps = taosArrayInit(8, sizeof(SPgno));
// memset(pPage->pData, 0, pPage->pageSize); // memset(pPage->pData, 0, pPage->pageSize);
tdbTrace("tdb/insert-free-page: tbc recycle page: %d.", pgno); tdbTrace("tdb/insert-free-page: tbc recycle page: %d.", pgno);
// printf("tdb/insert-free-page: tbc recycle page: %d.\n", pgno); // printf("tdb/insert-free-page: tbc recycle page: %d.\n", pgno);
code = tdbTbInsert(pPager->pEnv->pFreeDb, &pgno, sizeof(pgno), NULL, 0, pTxn); code = tdbTbInsert(pPager->pEnv->pFreeDb, &pgno, sizeof(pgno), NULL, 0, pTxn);
if (code < 0) { if (code < 0) {
tdbError("tdb/insert-free-page: tb insert failed with ret: %d.", code); tdbError("tdb/insert-free-page: tb insert failed with ret: %d.", code);
taosArrayDestroy(pPager->frps);
pPager->frps = NULL;
return -1; return -1;
} }
while (TARRAY_SIZE(pPager->frps) > 0) {
pgno = *(SPgno *)taosArrayPop(pPager->frps);
code = tdbTbInsert(pPager->pEnv->pFreeDb, &pgno, sizeof(pgno), NULL, 0, pTxn);
if (code < 0) {
tdbError("tdb/insert-free-page: tb insert failed with ret: %d.", code);
taosArrayDestroy(pPager->frps);
pPager->frps = NULL;
return -1;
}
}
taosArrayDestroy(pPager->frps);
pPager->frps = NULL;
pPage->pPager = NULL; pPage->pPager = NULL;
return code; return code;
...@@ -739,7 +763,11 @@ static int tdbPagerRemoveFreePage(SPager *pPager, SPgno *pPgno, TXN *pTxn) { ...@@ -739,7 +763,11 @@ static int tdbPagerRemoveFreePage(SPager *pPager, SPgno *pPgno, TXN *pTxn) {
TBC *pCur; TBC *pCur;
if (!pPager->pEnv->pFreeDb) { if (!pPager->pEnv->pFreeDb) {
return 0; return code;
}
if (pPager->frps) {
return code;
} }
code = tdbTbcOpen(pPager->pEnv->pFreeDb, &pCur, pTxn); code = tdbTbcOpen(pPager->pEnv->pFreeDb, &pCur, pTxn);
......
...@@ -408,6 +408,7 @@ struct SPager { ...@@ -408,6 +408,7 @@ struct SPager {
// u8 inTran; // u8 inTran;
TXN *pActiveTxn; TXN *pActiveTxn;
SArray *ofps; SArray *ofps;
SArray *frps;
SPager *pNext; // used by TDB SPager *pNext; // used by TDB
SPager *pHashNext; // used by TDB SPager *pHashNext; // used by TDB
#ifdef USE_MAINDB #ifdef USE_MAINDB
......
...@@ -568,6 +568,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_SELECTED_EXPR, "Invalid SELECTed ex ...@@ -568,6 +568,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_SELECTED_EXPR, "Invalid SELECTed ex
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_GET_META_ERROR, "Fail to get table info") TAOS_DEFINE_ERROR(TSDB_CODE_PAR_GET_META_ERROR, "Fail to get table info")
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_NOT_UNIQUE_TABLE_ALIAS, "Not unique table/alias") TAOS_DEFINE_ERROR(TSDB_CODE_PAR_NOT_UNIQUE_TABLE_ALIAS, "Not unique table/alias")
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_SYSTABLE_NOT_ALLOWED_FUNC, "System table not allowed") TAOS_DEFINE_ERROR(TSDB_CODE_PAR_SYSTABLE_NOT_ALLOWED_FUNC, "System table not allowed")
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_SYSTABLE_NOT_ALLOWED, "System table not allowed")
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INTERNAL_ERROR, "Parser internal error") TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INTERNAL_ERROR, "Parser internal error")
//planner //planner
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册