提交 d862dcae 编写于 作者: H Haojun Liao

[td-11818] refactor, fix bug in repeat create table, add request id for each query.

上级 961ffa77
......@@ -141,7 +141,7 @@ struct SQueryNode;
/**
* Create the physical plan for the query, according to the AST.
*/
int32_t qCreateQueryDag(const struct SQueryNode* pQueryInfo, struct SQueryDag** pDag);
int32_t qCreateQueryDag(const struct SQueryNode* pQueryInfo, struct SQueryDag** pDag, uint64_t requestId);
// Set datasource of this subplan, multiple calls may be made to a subplan.
// @subplan subplan to be schedule
......
......@@ -68,8 +68,6 @@ typedef struct {
SysNameInfo taosGetSysNameInfo();
int64_t taosGetPid();
#ifdef __cplusplus
}
#endif
......
......@@ -146,6 +146,8 @@ int taos_init();
void* createTscObj(const char* user, const char* auth, const char *db, SAppInstInfo* pAppInfo);
void destroyTscObj(void*pObj);
uint64_t generateRequestId();
void *createRequest(STscObj* pObj, __taos_async_fn_t fp, void* param, int32_t type);
void destroyRequest(SRequestObj* pRequest);
......
......@@ -87,6 +87,7 @@ static void tscInitLogFile() {
}
}
// todo close the transporter properly
void closeTransporter(STscObj* pTscObj) {
if (pTscObj == NULL || pTscObj->pTransporter == NULL) {
return;
......@@ -166,8 +167,7 @@ void* createRequest(STscObj* pObj, __taos_async_fn_t fp, void* param, int32_t ty
return NULL;
}
// TODO generated request uuid
pRequest->requestId = 0;
pRequest->requestId = generateRequestId();
pRequest->metric.start = taosGetTimestampMs();
pRequest->type = type;
......@@ -410,6 +410,36 @@ int taos_options_imp(TSDB_OPTION option, const char *str) {
return 0;
}
/**
* The request id is an unsigned integer format of 64bit.
*+------------+-----+-----------+---------------+
*| uid|localIp| PId | timestamp | serial number |
*+------------+-----+-----------+---------------+
*| 16bit |12bit|20bit |16bit |
*+------------+-----+-----------+---------------+
* @return
*/
static int32_t requestSerialId = 0;
uint64_t generateRequestId() {
uint64_t hashId = 0;
char uid[64] = {0};
int32_t code = taosGetSystemUid(uid, tListLen(uid));
if (code != TSDB_CODE_SUCCESS) {
tscError("Failed to get the system uid to generated request id, reason:%s. use ip address instead", tstrerror(TAOS_SYSTEM_ERROR(errno)));
} else {
hashId = MurmurHash3_32(uid, strlen(uid));
}
int64_t ts = taosGetTimestampUs();
uint64_t pid = taosGetPId();
int32_t val = atomic_add_fetch_32(&requestSerialId, 1);
uint64_t id = ((hashId & 0xFFFF) << 48) | ((pid & 0x0FFF) << 36) | ((ts & 0xFFFFF) << 16) | (val & 0xFFFF);
return id;
}
#if 0
#include "cJSON.h"
static setConfRet taos_set_config_imp(const char *config){
......
......@@ -198,13 +198,14 @@ int32_t execDdlQuery(SRequestObj* pRequest, SQueryNode* pQuery) {
int32_t getPlan(SRequestObj* pRequest, SQueryNode* pQuery, SQueryDag** pDag) {
pRequest->type = pQuery->type;
return qCreateQueryDag(pQuery, pDag);
return qCreateQueryDag(pQuery, pDag, pRequest->requestId);
}
int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag, void** pJob) {
if (TSDB_SQL_INSERT == pRequest->type || TSDB_SQL_CREATE_TABLE == pRequest->type) {
return scheduleExecJob(pRequest->pTscObj->pTransporter, NULL/*todo appInfo.xxx*/, pDag, pJob, &pRequest->affectedRows);
}
return scheduleAsyncExecJob(pRequest->pTscObj->pTransporter, NULL/*todo appInfo.xxx*/, pDag, pJob);
}
......
......@@ -147,29 +147,29 @@ TEST(testCase, connect_Test) {
// taos_close(pConn);
//}
//TEST(testCase, create_db_Test) {
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
// assert(pConn != NULL);
//
// TAOS_RES* pRes = taos_query(pConn, "create database abc1");
// if (taos_errno(pRes) != 0) {
// printf("error in create db, reason:%s\n", taos_errstr(pRes));
// }
//
// TAOS_FIELD* pFields = taos_fetch_fields(pRes);
// ASSERT_TRUE(pFields == NULL);
//
// int32_t numOfFields = taos_num_fields(pRes);
// ASSERT_EQ(numOfFields, 0);
//
// taos_free_result(pRes);
//
// pRes = taos_query(pConn, "create database abc1 vgroups 4");
// if (taos_errno(pRes) != 0) {
// printf("error in create db, reason:%s\n", taos_errstr(pRes));
// }
// taos_close(pConn);
//}
TEST(testCase, create_db_Test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL);
TAOS_RES* pRes = taos_query(pConn, "create database abc1");
if (taos_errno(pRes) != 0) {
printf("error in create db, reason:%s\n", taos_errstr(pRes));
}
TAOS_FIELD* pFields = taos_fetch_fields(pRes);
ASSERT_TRUE(pFields == NULL);
int32_t numOfFields = taos_num_fields(pRes);
ASSERT_EQ(numOfFields, 0);
taos_free_result(pRes);
pRes = taos_query(pConn, "create database abc1 vgroups 4");
if (taos_errno(pRes) != 0) {
printf("error in create db, reason:%s\n", taos_errstr(pRes));
}
taos_close(pConn);
}
//
//TEST(testCase, create_dnode_Test) {
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
......@@ -249,36 +249,36 @@ TEST(testCase, connect_Test) {
//// taos_close(pConn);
//}
// TEST(testCase, create_stable_Test) {
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
// assert(pConn != NULL);
//
// TAOS_RES* pRes = taos_query(pConn, "create database abc1");
// if (taos_errno(pRes) != 0) {
// printf("error in create db, reason:%s\n", taos_errstr(pRes));
// }
// taos_free_result(pRes);
//
// pRes = taos_query(pConn, "use abc1");
// if (taos_errno(pRes) != 0) {
// printf("error in use db, reason:%s\n", taos_errstr(pRes));
// }
// taos_free_result(pRes);
//
// pRes = taos_query(pConn, "create stable st1(ts timestamp, k int) tags(a int)");
// if (taos_errno(pRes) != 0) {
// printf("error in create stable, reason:%s\n", taos_errstr(pRes));
// }
//
// TAOS_FIELD* pFields = taos_fetch_fields(pRes);
// ASSERT_TRUE(pFields == NULL);
//
// int32_t numOfFields = taos_num_fields(pRes);
// ASSERT_EQ(numOfFields, 0);
//
// taos_free_result(pRes);
// taos_close(pConn);
//}
TEST(testCase, create_stable_Test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL);
TAOS_RES* pRes = taos_query(pConn, "create database abc1");
if (taos_errno(pRes) != 0) {
printf("error in create db, reason:%s\n", taos_errstr(pRes));
}
taos_free_result(pRes);
pRes = taos_query(pConn, "use abc1");
if (taos_errno(pRes) != 0) {
printf("error in use db, reason:%s\n", taos_errstr(pRes));
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create stable st1(ts timestamp, k int) tags(a int)");
if (taos_errno(pRes) != 0) {
printf("error in create stable, reason:%s\n", taos_errstr(pRes));
}
TAOS_FIELD* pFields = taos_fetch_fields(pRes);
ASSERT_TRUE(pFields == NULL);
int32_t numOfFields = taos_num_fields(pRes);
ASSERT_EQ(numOfFields, 0);
taos_free_result(pRes);
taos_close(pConn);
}
//TEST(testCase, create_table_Test) {
// // TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
......@@ -470,7 +470,15 @@ TEST(testCase, create_multiple_tables) {
TAOS_RES* pRes = taos_query(pConn, "use abc1");
taos_free_result(pRes);
pRes = taos_query(pConn, "create table t_2 using st1 tags(1) t_3 using st1 tags(2)");
pRes = taos_query(pConn, "create table t_2 using st1 tags(1)");
if (taos_errno(pRes) != 0) {
printf("failed to create multiple tables, reason:%s\n", taos_errstr(pRes));
taos_free_result(pRes);
ASSERT_TRUE(false);
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create table t_3 using st1 tags(2)");
if (taos_errno(pRes) != 0) {
printf("failed to create multiple tables, reason:%s\n", taos_errstr(pRes));
taos_free_result(pRes);
......@@ -491,6 +499,25 @@ TEST(testCase, create_multiple_tables) {
taos_close(pConn);
}
TEST(testCase, generated_request_id_test) {
uint64_t id0 = generateRequestId();
uint64_t id1 = generateRequestId();
uint64_t id2 = generateRequestId();
uint64_t id3 = generateRequestId();
uint64_t id4 = generateRequestId();
ASSERT_NE(id0, id1);
ASSERT_NE(id1, id2);
ASSERT_NE(id2, id3);
ASSERT_NE(id4, id3);
ASSERT_NE(id0, id2);
ASSERT_NE(id0, id4);
ASSERT_NE(id0, id3);
// SHashObj *phash = taosHashInit()
}
//TEST(testCase, projection_query_tables) {
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
// ASSERT_EQ(pConn, nullptr);
......
......@@ -105,7 +105,7 @@ int32_t queryPlanToString(struct SQueryPlanNode* pQueryNode, char** str);
*/
int32_t queryPlanToSql(struct SQueryPlanNode* pQueryNode, char** sql);
int32_t createDag(SQueryPlanNode* pQueryNode, struct SCatalog* pCatalog, SQueryDag** pDag);
int32_t createDag(SQueryPlanNode* pQueryNode, struct SCatalog* pCatalog, SQueryDag** pDag, uint64_t requestId);
int32_t setSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SEpAddr* ep);
int32_t subPlanToString(const SSubplan *pPhyNode, char** str, int32_t* len);
int32_t stringToSubplan(const char* str, SSubplan** subplan);
......
......@@ -309,6 +309,7 @@ static void splitModificationOpSubPlan(SPlanContext* pCxt, SQueryPlanNode* pPlan
subplan->pNode = NULL;
subplan->type = QUERY_TYPE_MODIFY;
subplan->msgType = pPayload->msgType;
subplan->id.queryId = pCxt->pDag->queryId;
RECOVERY_CURRENT_SUBPLAN(pCxt);
}
......@@ -328,7 +329,7 @@ static void createSubplanByLevel(SPlanContext* pCxt, SQueryPlanNode* pRoot) {
// todo deal subquery
}
int32_t createDag(SQueryPlanNode* pQueryNode, struct SCatalog* pCatalog, SQueryDag** pDag) {
int32_t createDag(SQueryPlanNode* pQueryNode, struct SCatalog* pCatalog, SQueryDag** pDag, uint64_t requestId) {
TRY(TSDB_MAX_TAG_CONDITIONS) {
SPlanContext context = {
.pCatalog = pCatalog,
......@@ -338,6 +339,8 @@ int32_t createDag(SQueryPlanNode* pQueryNode, struct SCatalog* pCatalog, SQueryD
};
*pDag = context.pDag;
context.pDag->queryId = requestId;
context.pDag->pSubplans = validPointer(taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES));
createSubplanByLevel(&context, pQueryNode);
} CATCH(code) {
......
......@@ -24,7 +24,7 @@ void qDestroyQueryDag(struct SQueryDag* pDag) {
// todo
}
int32_t qCreateQueryDag(const struct SQueryNode* pNode, struct SQueryDag** pDag) {
int32_t qCreateQueryDag(const struct SQueryNode* pNode, struct SQueryDag** pDag, uint64_t requestId) {
SQueryPlanNode* logicPlan;
int32_t code = createQueryPlan(pNode, &logicPlan);
if (TSDB_CODE_SUCCESS != code) {
......@@ -38,7 +38,7 @@ int32_t qCreateQueryDag(const struct SQueryNode* pNode, struct SQueryDag** pDag)
return code;
}
code = createDag(logicPlan, NULL, pDag);
code = createDag(logicPlan, NULL, pDag, requestId);
if (TSDB_CODE_SUCCESS != code) {
destroyQueryPlan(logicPlan);
qDestroyQueryDag(*pDag);
......
......@@ -700,7 +700,7 @@ static SRpcConn *rpcAllocateClientConn(SRpcInfo *pRpc) {
pConn->sid = sid;
pConn->tranId = (uint16_t)(taosRand() & 0xFFFF);
pConn->ownId = htonl(pConn->sid);
pConn->linkUid = (uint32_t)((int64_t)pConn + taosGetPid() + (int64_t)pConn->tranId);
pConn->linkUid = (uint32_t)((int64_t)pConn + taosGetPId() + (int64_t)pConn->tranId);
pConn->spi = pRpc->spi;
pConn->encrypt = pRpc->encrypt;
if (pConn->spi) memcpy(pConn->secret, pRpc->secret, TSDB_PASSWORD_LEN);
......
......@@ -1130,8 +1130,4 @@ SysNameInfo taosGetSysNameInfo() {
return info;
}
int64_t taosGetPid() {
getpid();
}
#endif
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册