未验证 提交 f65da854 编写于 作者: H Haojun Liao 提交者: GitHub

Merge pull request #9613 from taosdata/feature/3.0_liaohj

Feature/3.0 liaohj
......@@ -43,7 +43,7 @@ int32_t qParseQuerySql(SParseContext* pContext, SQueryNode** pQuery);
bool qIsDdlQuery(const SQueryNode* pQuery);
void qDestroyQuery(SQueryNode* pQuery);
void qDestroyQuery(SQueryNode* pQueryNode);
/**
* Convert a normal sql statement to only query tags information to enable that the subscribe client can be aware quickly of the true vgroup ids that
......
......@@ -180,6 +180,14 @@ void* createRequest(STscObj* pObj, __taos_async_fn_t fp, void* param, int32_t ty
return pRequest;
}
static void doFreeReqResultInfo(SReqResultInfo* pResInfo) {
tfree(pResInfo->pRspMsg);
tfree(pResInfo->length);
tfree(pResInfo->row);
tfree(pResInfo->pCol);
tfree(pResInfo->fields);
}
static void doDestroyRequest(void* p) {
assert(p != NULL);
SRequestObj* pRequest = (SRequestObj*)p;
......@@ -190,7 +198,7 @@ static void doDestroyRequest(void* p) {
tfree(pRequest->sqlstr);
tfree(pRequest->pInfo);
tfree(pRequest->body.resInfo.pRspMsg);
doFreeReqResultInfo(&pRequest->body.resInfo);
deregisterRequest(pRequest);
tfree(pRequest);
......@@ -415,7 +423,7 @@ int taos_options_imp(TSDB_OPTION option, const char *str) {
*+------------+-----+-----------+---------------+
*| uid|localIp| PId | timestamp | serial number |
*+------------+-----+-----------+---------------+
*| 16bit |12bit|20bit |16bit |
*| 12bit |12bit|24bit |16bit |
*+------------+-----+-----------+---------------+
* @return
*/
......@@ -435,11 +443,11 @@ uint64_t generateRequestId() {
}
}
int64_t ts = taosGetTimestampUs();
int64_t ts = taosGetTimestampMs();
uint64_t pid = taosGetPId();
int32_t val = atomic_add_fetch_32(&requestSerialId, 1);
uint64_t id = ((hashId & 0xFFFF) << 48) | ((pid & 0x0FFF) << 36) | ((ts & 0xFFFFF) << 16) | (val & 0xFFFF);
uint64_t id = ((hashId & 0x0FFF) << 52) | ((pid & 0x0FFF) << 40) | ((ts & 0xFFFFFF) << 16) | (val & 0xFFFF);
return id;
}
......
......@@ -192,13 +192,12 @@ int32_t execDdlQuery(SRequestObj* pRequest, SQueryNode* pQuery) {
}
tsem_wait(&pRequest->body.rspSem);
destroySendMsgInfo(pSendMsg);
return TSDB_CODE_SUCCESS;
}
int32_t getPlan(SRequestObj* pRequest, SQueryNode* pQuery, SQueryDag** pDag) {
pRequest->type = pQuery->type;
return qCreateQueryDag(pQuery, pDag, pRequest->requestId);
int32_t getPlan(SRequestObj* pRequest, SQueryNode* pQueryNode, SQueryDag** pDag) {
pRequest->type = pQueryNode->type;
return qCreateQueryDag(pQueryNode, pDag, pRequest->requestId);
}
int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag, void** pJob) {
......@@ -364,8 +363,6 @@ STscObj* taosConnectImpl(const char *ip, const char *user, const char *auth, con
asyncSendMsgToServer(pTscObj->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &transporterId, body);
tsem_wait(&pRequest->body.rspSem);
destroySendMsgInfo(body);
if (pRequest->code != TSDB_CODE_SUCCESS) {
const char *errorMsg = (pRequest->code == TSDB_CODE_RPC_FQDN_ERROR) ? taos_errstr(pRequest) : tstrerror(terrno);
printf("failed to connect to server, reason: %s\n\n", errorMsg);
......@@ -456,17 +453,21 @@ void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
taosReleaseRef(clientReqRefPool, pSendInfo->requestObjRefId);
}
SDataBuf buf = {.len = pMsg->contLen};
buf.pData = calloc(1, pMsg->contLen);
if (buf.pData == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
pMsg->code = TSDB_CODE_OUT_OF_MEMORY;
} else {
memcpy(buf.pData, pMsg->pCont, pMsg->contLen);
SDataBuf buf = {.len = pMsg->contLen, .pData = NULL};
if (pMsg->contLen > 0) {
buf.pData = calloc(1, pMsg->contLen);
if (buf.pData == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
pMsg->code = TSDB_CODE_OUT_OF_MEMORY;
} else {
memcpy(buf.pData, pMsg->pCont, pMsg->contLen);
}
}
pSendInfo->fp(pSendInfo->param, &buf, pMsg->code);
rpcFreeCont(pMsg->pCont);
destroySendMsgInfo(pSendInfo);
}
TAOS *taos_connect_auth(const char *ip, const char *user, const char *auth, const char *db, uint16_t port) {
......@@ -539,7 +540,6 @@ void* doFetchRow(SRequestObj* pRequest) {
asyncSendMsgToServer(pTscObj->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &transporterId, body);
tsem_wait(&pRequest->body.rspSem);
destroySendMsgInfo(body);
pResultInfo->current = 0;
if (pResultInfo->numOfRows <= pResultInfo->current) {
......
......@@ -30,6 +30,7 @@ int genericRspCallback(void* param, const SDataBuf* pMsg, int32_t code) {
SRequestObj* pRequest = param;
setErrno(pRequest, code);
free(pMsg->pData);
sem_post(&pRequest->body.rspSem);
return code;
}
......@@ -37,6 +38,7 @@ int genericRspCallback(void* param, const SDataBuf* pMsg, int32_t code) {
int processConnectRsp(void* param, const SDataBuf* pMsg, int32_t code) {
SRequestObj* pRequest = param;
if (code != TSDB_CODE_SUCCESS) {
free(pMsg->pData);
setErrno(pRequest, code);
sem_post(&pRequest->body.rspSem);
return code;
......@@ -73,6 +75,7 @@ int processConnectRsp(void* param, const SDataBuf* pMsg, int32_t code) {
tscDebug("0x%" PRIx64 " clusterId:%" PRId64 ", totalConn:%" PRId64, pRequest->requestId, pConnect->clusterId,
pTscObj->pAppInfo->numOfConns);
free(pMsg->pData);
sem_post(&pRequest->body.rspSem);
return 0;
}
......@@ -238,6 +241,7 @@ int32_t processRetrieveVndRsp(void* param, const SDataBuf* pMsg, int32_t code) {
int32_t processCreateDbRsp(void* param, const SDataBuf* pMsg, int32_t code) {
// todo rsp with the vnode id list
SRequestObj* pRequest = param;
free(pMsg->pData);
tsem_post(&pRequest->body.rspSem);
}
......@@ -245,6 +249,7 @@ int32_t processUseDbRsp(void* param, const SDataBuf* pMsg, int32_t code) {
SRequestObj* pRequest = param;
if (code != TSDB_CODE_SUCCESS) {
free(pMsg->pData);
setErrno(pRequest, code);
tsem_post(&pRequest->body.rspSem);
return code;
......@@ -258,6 +263,7 @@ int32_t processUseDbRsp(void* param, const SDataBuf* pMsg, int32_t code) {
tNameGetDbName(&name, db);
setConnectionDB(pRequest->pTscObj, db);
free(pMsg->pData);
tsem_post(&pRequest->body.rspSem);
return 0;
}
......@@ -266,6 +272,7 @@ int32_t processCreateTableRsp(void* param, const SDataBuf* pMsg, int32_t code) {
assert(pMsg != NULL && param != NULL);
SRequestObj* pRequest = param;
free(pMsg->pData);
if (code != TSDB_CODE_SUCCESS) {
setErrno(pRequest, code);
tsem_post(&pRequest->body.rspSem);
......
......@@ -112,6 +112,7 @@ TEST(testCase, show_user_Test) {
printf("%s\n", str);
}
taos_free_result(pRes);
taos_close(pConn);
}
......@@ -130,7 +131,7 @@ TEST(testCase, drop_user_Test) {
TEST(testCase, show_db_Test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
// assert(pConn != NULL);
assert(pConn != NULL);
TAOS_RES* pRes = taos_query(pConn, "show databases");
TAOS_ROW pRow = NULL;
......@@ -170,62 +171,62 @@ TEST(testCase, create_db_Test) {
}
taos_close(pConn);
}
//
//TEST(testCase, create_dnode_Test) {
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
// assert(pConn != NULL);
//
// TAOS_RES* pRes = taos_query(pConn, "create dnode abc1 port 7000");
// if (taos_errno(pRes) != 0) {
// printf("error in create dnode, reason:%s\n", taos_errstr(pRes));
// }
// taos_free_result(pRes);
//
// pRes = taos_query(pConn, "create dnode 1.1.1.1 port 9000");
// if (taos_errno(pRes) != 0) {
// printf("failed to create dnode, reason:%s\n", taos_errstr(pRes));
// }
// taos_free_result(pRes);
//
// taos_close(pConn);
//}
//
//TEST(testCase, drop_dnode_Test) {
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
// assert(pConn != NULL);
//
// TAOS_RES* pRes = taos_query(pConn, "drop dnode 2");
// if (taos_errno(pRes) != 0) {
// printf("error in drop dnode, reason:%s\n", taos_errstr(pRes));
// }
//
// TAOS_FIELD* pFields = taos_fetch_fields(pRes);
// ASSERT_TRUE(pFields == NULL);
//
// int32_t numOfFields = taos_num_fields(pRes);
// ASSERT_EQ(numOfFields, 0);
//
// taos_free_result(pRes);
// taos_close(pConn);
//}
//
//TEST(testCase, use_db_test) {
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
// assert(pConn != NULL);
//
// TAOS_RES* pRes = taos_query(pConn, "use abc1");
// if (taos_errno(pRes) != 0) {
// printf("error in use db, reason:%s\n", taos_errstr(pRes));
// }
//
// TAOS_FIELD* pFields = taos_fetch_fields(pRes);
// ASSERT_TRUE(pFields == NULL);
//
// int32_t numOfFields = taos_num_fields(pRes);
// ASSERT_EQ(numOfFields, 0);
//
// taos_close(pConn);
//}
TEST(testCase, create_dnode_Test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL);
TAOS_RES* pRes = taos_query(pConn, "create dnode abc1 port 7000");
if (taos_errno(pRes) != 0) {
printf("error in create dnode, reason:%s\n", taos_errstr(pRes));
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create dnode 1.1.1.1 port 9000");
if (taos_errno(pRes) != 0) {
printf("failed to create dnode, reason:%s\n", taos_errstr(pRes));
}
taos_free_result(pRes);
taos_close(pConn);
}
TEST(testCase, drop_dnode_Test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL);
TAOS_RES* pRes = taos_query(pConn, "drop dnode 2");
if (taos_errno(pRes) != 0) {
printf("error in drop dnode, reason:%s\n", taos_errstr(pRes));
}
TAOS_FIELD* pFields = taos_fetch_fields(pRes);
ASSERT_TRUE(pFields == NULL);
int32_t numOfFields = taos_num_fields(pRes);
ASSERT_EQ(numOfFields, 0);
taos_free_result(pRes);
taos_close(pConn);
}
TEST(testCase, use_db_test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL);
TAOS_RES* pRes = taos_query(pConn, "use abc1");
if (taos_errno(pRes) != 0) {
printf("error in use db, reason:%s\n", taos_errstr(pRes));
}
TAOS_FIELD* pFields = taos_fetch_fields(pRes);
ASSERT_TRUE(pFields == NULL);
int32_t numOfFields = taos_num_fields(pRes);
ASSERT_EQ(numOfFields, 0);
taos_close(pConn);
}
//TEST(testCase, drop_db_test) {
//// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
......@@ -280,19 +281,19 @@ TEST(testCase, create_db_Test) {
taos_close(pConn);
}
//TEST(testCase, create_table_Test) {
// // TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
// // assert(pConn != NULL);
// //
// // TAOS_RES* pRes = taos_query(pConn, "use abc1");
// // taos_free_result(pRes);
// //
// // pRes = taos_query(pConn, "create table tm0(ts timestamp, k int)");
// // taos_free_result(pRes);
// //
// // taos_close(pConn);
//}
//
TEST(testCase, create_table_Test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL);
TAOS_RES* pRes = taos_query(pConn, "use abc1");
taos_free_result(pRes);
pRes = taos_query(pConn, "create table tm0(ts timestamp, k int)");
taos_free_result(pRes);
taos_close(pConn);
}
//TEST(testCase, create_ctable_Test) {
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
// assert(pConn != NULL);
......@@ -303,12 +304,12 @@ TEST(testCase, create_db_Test) {
// }
// taos_free_result(pRes);
//
//// pRes = taos_query(pConn, "create table tm0 using st1 tags(1)");
//// if (taos_errno(pRes) != 0) {
//// printf("failed to create child table tm0, reason:%s\n", taos_errstr(pRes));
//// }
////
//// taos_free_result(pRes);
// pRes = taos_query(pConn, "create table tm0 using st1 tags(1)");
// if (taos_errno(pRes) != 0) {
// printf("failed to create child table tm0, reason:%s\n", taos_errstr(pRes));
// }
//
// taos_free_result(pRes);
// taos_close(pConn);
//}
//
......@@ -342,7 +343,7 @@ TEST(testCase, create_db_Test) {
// taos_free_result(pRes);
// taos_close(pConn);
//}
//
//TEST(testCase, show_vgroup_Test) {
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
// assert(pConn != NULL);
......@@ -375,7 +376,7 @@ TEST(testCase, create_db_Test) {
//
// taos_close(pConn);
//}
//
//TEST(testCase, drop_stable_Test) {
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
// assert(pConn != NULL);
......@@ -468,6 +469,13 @@ TEST(testCase, create_multiple_tables) {
ASSERT_NE(pConn, nullptr);
TAOS_RES* pRes = taos_query(pConn, "use abc1");
if (taos_errno(pRes) != 0) {
printf("failed to use db, reason:%s", taos_errstr(pRes));
taos_free_result(pRes);
taos_close(pConn);
return;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create table t_2 using st1 tags(1)");
......@@ -513,14 +521,17 @@ TEST(testCase, create_multiple_tables) {
TEST(testCase, generated_request_id_test) {
SHashObj *phash = taosHashInit(10000, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
for(int32_t i = 0; i < 1000000; ++i) {
for(int32_t i = 0; i < 50000000; ++i) {
uint64_t v = generateRequestId();
void* result = taosHashGet(phash, &v, sizeof(v));
ASSERT_EQ(result, nullptr);
if (result != nullptr) {
printf("0x%"PRIx64", index:%d\n", v, i);
}
assert(result == nullptr);
taosHashPut(phash, &v, sizeof(v), NULL, 0);
}
taosHashClear(phash);
taosHashCleanup(phash);
}
//TEST(testCase, projection_query_tables) {
......
......@@ -245,7 +245,6 @@ int32_t ctgGetTableMetaFromVnode(struct SCatalog* pCatalog, void *pRpc, const SE
SEpSet epSet;
ctgGenEpSet(&epSet, vgroupInfo);
rpcSendRecv(pRpc, &epSet, &rpcMsg, &rpcRsp);
if (TSDB_CODE_SUCCESS != rpcRsp.code) {
......
......@@ -10,7 +10,7 @@ SCreateAcctReq* buildAcctManipulationMsg(SSqlInfo* pInfo, int32_t* outputLen, in
SDropUserReq* buildDropUserMsg(SSqlInfo* pInfo, int32_t* outputLen, int64_t id, char* msgBuf, int32_t msgLen);
SShowMsg* buildShowMsg(SShowInfo* pShowInfo, SParseBasicCtx* pParseCtx, char* msgBuf, int32_t msgLen);
SCreateDbMsg* buildCreateDbMsg(SCreateDbInfo* pCreateDbInfo, SParseBasicCtx *pCtx, SMsgBuf* pMsgBuf);
SCreateStbMsg* buildCreateTableMsg(SCreateTableSql* pCreateTableSql, int32_t* len, SParseBasicCtx* pParseCtx, SMsgBuf* pMsgBuf);
SCreateStbMsg* buildCreateStbMsg(SCreateTableSql* pCreateTableSql, int32_t* len, SParseBasicCtx* pParseCtx, SMsgBuf* pMsgBuf);
SDropStbMsg* buildDropStableMsg(SSqlInfo* pInfo, int32_t* len, SParseBasicCtx* pParseCtx, SMsgBuf* pMsgBuf);
SCreateDnodeMsg *buildCreateDnodeMsg(SSqlInfo* pInfo, int32_t* len, SMsgBuf* pMsgBuf);
SDropDnodeMsg *buildDropDnodeMsg(SSqlInfo* pInfo, int32_t* len, SMsgBuf* pMsgBuf);
......
......@@ -38,14 +38,6 @@ typedef struct SMsgBuf {
char *buf;
} SMsgBuf;
// create table operation type
enum TSQL_CREATE_TABLE_TYPE {
TSQL_CREATE_TABLE = 0x1,
TSQL_CREATE_STABLE = 0x2,
TSQL_CREATE_CTABLE = 0x3,
TSQL_CREATE_STREAM = 0x4,
};
void clearTableMetaInfo(STableMetaInfo* pTableMetaInfo);
void clearAllTableMetaInfo(SQueryStmtInfo* pQueryInfo, bool removeMeta, uint64_t id);
......
......@@ -370,7 +370,7 @@ create_table_list(A) ::= create_from_stable(Z). {
pCreateTable->childTableInfo = taosArrayInit(4, sizeof(SCreatedTableInfo));
taosArrayPush(pCreateTable->childTableInfo, &Z);
pCreateTable->type = TSQL_CREATE_CTABLE;
pCreateTable->type = TSDB_SQL_CREATE_TABLE;
A = pCreateTable;
}
......@@ -381,7 +381,7 @@ create_table_list(A) ::= create_table_list(X) create_from_stable(Z). {
%type create_table_args{SCreateTableSql*}
create_table_args(A) ::= ifnotexists(U) ids(V) cpxName(Z) LP columnlist(X) RP. {
A = tSetCreateTableInfo(X, NULL, NULL, TSQL_CREATE_TABLE);
A = tSetCreateTableInfo(X, NULL, NULL, TSDB_SQL_CREATE_TABLE);
setSqlInfo(pInfo, A, NULL, TSDB_SQL_CREATE_TABLE);
V.n += Z.n;
......@@ -391,7 +391,7 @@ create_table_args(A) ::= ifnotexists(U) ids(V) cpxName(Z) LP columnlist(X) RP. {
// create super table
%type create_stable_args{SCreateTableSql*}
create_stable_args(A) ::= ifnotexists(U) ids(V) cpxName(Z) LP columnlist(X) RP TAGS LP columnlist(Y) RP. {
A = tSetCreateTableInfo(X, Y, NULL, TSQL_CREATE_STABLE);
A = tSetCreateTableInfo(X, Y, NULL, TSDB_SQL_CREATE_STABLE);
setSqlInfo(pInfo, A, NULL, TSDB_SQL_CREATE_STABLE);
V.n += Z.n;
......@@ -421,11 +421,11 @@ tagNamelist(A) ::= ids(X). {A = taosArrayInit(4, sizeof(STo
// create stream
// create table table_name as select count(*) from super_table_name interval(time)
create_table_args(A) ::= ifnotexists(U) ids(V) cpxName(Z) AS select(S). {
A = tSetCreateTableInfo(NULL, NULL, S, TSQL_CREATE_STREAM);
setSqlInfo(pInfo, A, NULL, TSDB_SQL_CREATE_TABLE);
V.n += Z.n;
setCreatedTableName(pInfo, &V, &U);
// A = tSetCreateTableInfo(NULL, NULL, S, TSQL_CREATE_STREAM);
// setSqlInfo(pInfo, A, NULL, TSDB_SQL_CREATE_TABLE);
//
// V.n += Z.n;
// setCreatedTableName(pInfo, &V, &U);
}
%type column{SField}
......
......@@ -579,25 +579,21 @@ SCreateTableSql *tSetCreateTableInfo(SArray *pCols, SArray *pTags, SSqlNode *pSe
SCreateTableSql *pCreate = calloc(1, sizeof(SCreateTableSql));
switch (type) {
case TSQL_CREATE_TABLE: {
case TSDB_SQL_CREATE_TABLE: {
pCreate->colInfo.pColumns = pCols;
assert(pTags == NULL);
break;
}
case TSQL_CREATE_STABLE: {
case TSDB_SQL_CREATE_STABLE: {
pCreate->colInfo.pColumns = pCols;
pCreate->colInfo.pTagColumns = pTags;
assert(pTags != NULL && pCols != NULL);
break;
}
case TSQL_CREATE_STREAM: {
pCreate->pSelect = pSelect;
break;
}
case TSQL_CREATE_CTABLE: {
assert(0);
}
// case TSQL_CREATE_STREAM: {
// pCreate->pSelect = pSelect;
// break;
// }
default:
assert(false);
......@@ -785,7 +781,7 @@ void destroySqlInfo(SSqlInfo *pInfo) {
taosArrayDestroy(pInfo->funcs);
if (pInfo->type == TSDB_SQL_SELECT) {
destroyAllSqlNode(&pInfo->sub);
} else if (pInfo->type == TSDB_SQL_CREATE_STABLE) {
} else if (pInfo->type == TSDB_SQL_CREATE_STABLE || pInfo->type == TSDB_SQL_CREATE_TABLE) {
pInfo->pCreateTableInfo = destroyCreateTableSql(pInfo->pCreateTableInfo);
} else if (pInfo->type == TSDB_SQL_ALTER_TABLE) {
taosArrayDestroyEx(pInfo->pAlterInfo->varList, freeItem);
......
......@@ -230,7 +230,7 @@ SCreateDbMsg* buildCreateDbMsg(SCreateDbInfo* pCreateDbInfo, SParseBasicCtx *pCt
return pCreateMsg;
}
SCreateStbMsg* buildCreateTableMsg(SCreateTableSql* pCreateTableSql, int32_t* len, SParseBasicCtx* pParseCtx, SMsgBuf* pMsgBuf) {
SCreateStbMsg* buildCreateStbMsg(SCreateTableSql* pCreateTableSql, int32_t* len, SParseBasicCtx* pParseCtx, SMsgBuf* pMsgBuf) {
SSchema* pSchema;
int32_t numOfTags = 0;
......@@ -239,16 +239,16 @@ SCreateStbMsg* buildCreateTableMsg(SCreateTableSql* pCreateTableSql, int32_t* le
numOfTags = (int32_t) taosArrayGetSize(pCreateTableSql->colInfo.pTagColumns);
}
SCreateStbMsg* pCreateTableMsg = (SCreateStbMsg*)calloc(1, sizeof(SCreateStbMsg) + (numOfCols + numOfTags) * sizeof(SSchema));
SCreateStbMsg* pCreateStbMsg = (SCreateStbMsg*)calloc(1, sizeof(SCreateStbMsg) + (numOfCols + numOfTags) * sizeof(SSchema));
char* pMsg = NULL;
#if 0
int32_t tableType = pCreateTableSql->type;
if (tableType != TSQL_CREATE_TABLE && tableType != TSQL_CREATE_STABLE) { // create by using super table, tags value
#if 0
SArray* list = pInfo->pCreateTableInfo->childTableInfo;
int32_t numOfTables = (int32_t)taosArrayGetSize(list);
pCreateTableMsg->numOfTables = htonl(numOfTables);
pCreateStbMsg->numOfTables = htonl(numOfTables);
pMsg = (char*)pCreateMsg;
for (int32_t i = 0; i < numOfTables; ++i) {
......@@ -268,25 +268,27 @@ SCreateStbMsg* buildCreateTableMsg(SCreateTableSql* pCreateTableSql, int32_t* le
int32_t len = (int32_t)(pMsg - (char*)pCreate);
pCreate->len = htonl(len);
}
} else {
#endif
} else { // create (super) table
// create (super) table
SName n = {0};
int32_t code = createSName(&n, &pCreateTableSql->name, pParseCtx, pMsgBuf);
if (code != 0) {
return NULL;
}
code = tNameExtractFullName(&n, pCreateTableMsg->name);
code = tNameExtractFullName(&n, pCreateStbMsg->name);
if (code != 0) {
buildInvalidOperationMsg(pMsgBuf, "invalid table name or database not specified");
return NULL;
}
pCreateTableMsg->igExists = pCreateTableSql->existCheck ? 1 : 0;
pCreateTableMsg->numOfColumns = htonl(numOfCols);
pCreateTableMsg->numOfTags = htonl(numOfTags);
pCreateStbMsg->igExists = pCreateTableSql->existCheck ? 1 : 0;
pCreateStbMsg->numOfColumns = htonl(numOfCols);
pCreateStbMsg->numOfTags = htonl(numOfTags);
pSchema = (SSchema*) pCreateTableMsg->pSchema;
pSchema = (SSchema*)pCreateStbMsg->pSchema;
for (int i = 0; i < numOfCols; ++i) {
SField* pField = taosArrayGet(pCreateTableSql->colInfo.pColumns, i);
pSchema->type = pField->type;
......@@ -306,12 +308,11 @@ SCreateStbMsg* buildCreateTableMsg(SCreateTableSql* pCreateTableSql, int32_t* le
}
pMsg = (char*)pSchema;
}
int32_t msgLen = (int32_t)(pMsg - (char*)pCreateTableMsg);
int32_t msgLen = (int32_t)(pMsg - (char*)pCreateStbMsg);
*len = msgLen;
return pCreateTableMsg;
return pCreateStbMsg;
}
SDropStbMsg* buildDropStableMsg(SSqlInfo* pInfo, int32_t* len, SParseBasicCtx* pParseCtx, SMsgBuf* pMsgBuf) {
......@@ -330,7 +331,7 @@ SDropStbMsg* buildDropStableMsg(SSqlInfo* pInfo, int32_t* len, SParseBasicCtx* p
assert(code == TSDB_CODE_SUCCESS && name.type == TSDB_TABLE_NAME_T);
pDropTableMsg->igNotExists = pInfo->pMiscInfo->existsCheck ? 1 : 0;
*len = sizeof(SDropTableMsg);
*len = sizeof(SDropStbMsg);
return pDropTableMsg;
}
......
#include <astGenerator.h>
#include <tmsg.h>
#include <ttime.h>
#include "astToMsg.h"
#include "parserInt.h"
#include "parserUtil.h"
#include "queryInfoUtil.h"
#include "tglobal.h"
#include "tmsg.h"
#include "ttime.h"
/* is contained in pFieldList or not */
static bool has(SArray* pFieldList, int32_t startIndex, const char* name) {
......@@ -248,16 +250,10 @@ static int32_t validateTableColumns(SArray* pFieldList, int32_t maxRowLength, in
}
static int32_t validateTableColumnInfo(SArray* pFieldList, SMsgBuf* pMsgBuf) {
assert(pFieldList != NULL);
assert(pFieldList != NULL && pMsgBuf != NULL);
const char* msg1 = "first column must be timestamp";
const char* msg2 = "row length exceeds max length";
const char* msg3 = "duplicated column names";
const char* msg4 = "invalid data type";
const char* msg5 = "invalid binary/nchar column length";
const char* msg6 = "invalid column name";
const char* msg7 = "too many columns";
const char* msg8 = "illegal number of columns";
const char* msg2 = "illegal number of columns";
// first column must be timestamp
SField* pField = taosArrayGet(pFieldList, 0);
......@@ -268,7 +264,7 @@ static int32_t validateTableColumnInfo(SArray* pFieldList, SMsgBuf* pMsgBuf) {
// number of fields no less than 2
size_t numOfCols = taosArrayGetSize(pFieldList);
if (numOfCols <= 1) {
return buildInvalidOperationMsg(pMsgBuf, msg8);
return buildInvalidOperationMsg(pMsgBuf, msg2);
}
return validateTableColumns(pFieldList, TSDB_MAX_BYTES_PER_ROW, TSDB_MAX_COLUMNS, pMsgBuf);
......@@ -297,11 +293,9 @@ static int32_t validateTagParams(SArray* pTagsList, SArray* pFieldList, SMsgBuf*
return validateTableColumns(pFieldList, TSDB_MAX_TAGS_LEN, TSDB_MAX_TAGS, pMsgBuf);
}
int32_t doCheckForCreateTable(SSqlInfo* pInfo, SMsgBuf* pMsgBuf) {
int32_t doCheckForCreateTable(SCreateTableSql* pCreateTable, SMsgBuf* pMsgBuf) {
const char* msg1 = "invalid table name";
SCreateTableSql* pCreateTable = pInfo->pCreateTableInfo;
SArray* pFieldList = pCreateTable->colInfo.pColumns;
SArray* pTagList = pCreateTable->colInfo.pTagColumns;
assert(pFieldList != NULL);
......@@ -326,6 +320,8 @@ typedef struct SVgroupTablesBatch {
SVgroupInfo info;
} SVgroupTablesBatch;
static SArray* doSerializeVgroupCreateTableInfo(SHashObj* pVgroupHashmap);
static int32_t doParseSerializeTagValue(SSchema* pTagSchema, int32_t numOfInputTag, SKVRowBuilder* pKvRowBuilder,
SArray* pTagValList, int32_t tsPrecision, SMsgBuf* pMsgBuf) {
const char* msg1 = "illegal value or data overflow";
......@@ -351,14 +347,52 @@ static int32_t doParseSerializeTagValue(SSchema* pTagSchema, int32_t numOfInputT
return code;
}
int32_t doCheckForCreateCTable(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SMsgBuf* pMsgBuf, char** pOutput, int32_t* len) {
static void addCreateTbReqIntoVgroup(SHashObj* pVgroupHashmap, const SName* pTableName, SKVRow row, uint64_t suid, SVgroupInfo* pVgInfo) {
struct SVCreateTbReq req = {0};
req.type = TD_CHILD_TABLE;
req.name = strdup(tNameGetTableName(pTableName));
req.ctbCfg.suid = suid;
req.ctbCfg.pTag = row;
SVgroupTablesBatch* pTableBatch = taosHashGet(pVgroupHashmap, &pVgInfo->vgId, sizeof(pVgInfo->vgId));
if (pTableBatch == NULL) {
SVgroupTablesBatch tBatch = {0};
tBatch.info = *pVgInfo;
tBatch.req.pArray = taosArrayInit(4, sizeof(struct SVCreateTbReq));
taosArrayPush(tBatch.req.pArray, &req);
taosHashPut(pVgroupHashmap, &pVgInfo->vgId, sizeof(pVgInfo->vgId), &tBatch, sizeof(tBatch));
} else { // add to the correct vgroup
assert(pVgInfo->vgId == pTableBatch->info.vgId);
taosArrayPush(pTableBatch->req.pArray, &req);
}
}
static void destroyCreateTbReqBatch(SVgroupTablesBatch* pTbBatch) {
size_t size = taosArrayGetSize(pTbBatch->req.pArray);
for(int32_t i = 0; i < size; ++i) {
SVCreateTbReq* pTableReq = taosArrayGet(pTbBatch->req.pArray, i);
tfree(pTableReq->name);
if (pTableReq->type == TSDB_NORMAL_TABLE) {
tfree(pTableReq->ntbCfg.pSchema);
} else if (pTableReq->type == TSDB_CHILD_TABLE) {
tfree(pTableReq->ctbCfg.pTag);
} else {
assert(0);
}
}
taosArrayDestroy(pTbBatch->req.pArray);
}
static int32_t doCheckAndBuildCreateCTableReq(SCreateTableSql* pCreateTable, SParseBasicCtx* pCtx, SMsgBuf* pMsgBuf, SArray** pBufArray) {
const char* msg1 = "invalid table name";
const char* msg2 = "tags number not matched";
const char* msg3 = "tag value too long";
const char* msg4 = "illegal value or data overflow";
SCreateTableSql* pCreateTable = pInfo->pCreateTableInfo;
SHashObj* pVgroupHashmap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
// super table name, create table by using dst
......@@ -378,6 +412,11 @@ int32_t doCheckForCreateCTable(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SMsgBuf* p
return code;
}
SKVRowBuilder kvRowBuilder = {0};
if (tdInitKVRowBuilder(&kvRowBuilder) < 0) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
SArray* pValList = pCreateTableInfo->pTagVals;
size_t numOfInputTag = taosArrayGetSize(pValList);
......@@ -393,26 +432,22 @@ int32_t doCheckForCreateCTable(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SMsgBuf* p
SSchema* pTagSchema = getTableTagSchema(pSuperTableMeta);
STableComInfo tinfo = getTableInfo(pSuperTableMeta);
SKVRowBuilder kvRowBuilder = {0};
if (tdInitKVRowBuilder(&kvRowBuilder) < 0) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
SArray* pNameList = NULL;
size_t nameSize = 0;
size_t numOfBoundTags = 0;
int32_t schemaSize = getNumOfTags(pSuperTableMeta);
if (pCreateTableInfo->pTagNames) {
pNameList = pCreateTableInfo->pTagNames;
nameSize = taosArrayGetSize(pNameList);
numOfBoundTags = taosArrayGetSize(pNameList);
if (numOfInputTag != nameSize || schemaSize < numOfInputTag) {
if (numOfInputTag != numOfBoundTags || schemaSize < numOfInputTag) {
tdDestroyKVRowBuilder(&kvRowBuilder);
tfree(pSuperTableMeta);
return buildInvalidOperationMsg(pMsgBuf, msg2);
}
bool findColumnIndex = false;
for (int32_t i = 0; i < nameSize; ++i) {
for (int32_t i = 0; i < numOfBoundTags; ++i) {
SToken* sToken = taosArrayGet(pNameList, i);
char tmpTokenBuf[TSDB_MAX_BYTES_PER_ROW] = {0}; // create tmp buf to avoid alter orginal sqlstr
......@@ -440,6 +475,7 @@ int32_t doCheckForCreateCTable(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SMsgBuf* p
if (pSchema->type == TSDB_DATA_TYPE_BINARY || pSchema->type == TSDB_DATA_TYPE_NCHAR) {
if (pItem->pVar.nLen > pSchema->bytes) {
tdDestroyKVRowBuilder(&kvRowBuilder);
tfree(pSuperTableMeta);
return buildInvalidOperationMsg(pMsgBuf, msg3);
}
} else if (pSchema->type == TSDB_DATA_TYPE_TIMESTAMP) {
......@@ -460,12 +496,14 @@ int32_t doCheckForCreateCTable(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SMsgBuf* p
int16_t len = varDataTLen(tagVal);
if (len > pSchema->bytes) {
tdDestroyKVRowBuilder(&kvRowBuilder);
tfree(pSuperTableMeta);
return buildInvalidOperationMsg(pMsgBuf, msg3);
}
}
if (code != TSDB_CODE_SUCCESS) {
tdDestroyKVRowBuilder(&kvRowBuilder);
tfree(pSuperTableMeta);
return buildInvalidOperationMsg(pMsgBuf, msg4);
}
......@@ -484,11 +522,14 @@ int32_t doCheckForCreateCTable(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SMsgBuf* p
} else {
if (schemaSize != numOfInputTag) {
tdDestroyKVRowBuilder(&kvRowBuilder);
tfree(pSuperTableMeta);
return buildInvalidOperationMsg(pMsgBuf, msg2);
}
code = doParseSerializeTagValue(pTagSchema, numOfInputTag, &kvRowBuilder, pValList, tinfo.precision, pMsgBuf);
if (code != TSDB_CODE_SUCCESS) {
tdDestroyKVRowBuilder(&kvRowBuilder);
tfree(pSuperTableMeta);
return code;
}
}
......@@ -496,6 +537,7 @@ int32_t doCheckForCreateCTable(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SMsgBuf* p
SKVRow row = tdGetKVRowFromBuilder(&kvRowBuilder);
tdDestroyKVRowBuilder(&kvRowBuilder);
if (row == NULL) {
tfree(pSuperTableMeta);
return TSDB_CODE_QRY_OUT_OF_MEMORY;
}
......@@ -504,72 +546,135 @@ int32_t doCheckForCreateCTable(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SMsgBuf* p
SName tableName = {0};
code = createSName(&tableName, &pCreateTableInfo->name, pCtx, pMsgBuf);
if (code != TSDB_CODE_SUCCESS) {
tfree(pSuperTableMeta);
return code;
}
// Find a appropriate vgroup to accommodate this table , according to the table name
SVgroupInfo info = {0};
catalogGetTableHashVgroup(pCtx->pCatalog, pCtx->pTransporter, &pCtx->mgmtEpSet, &tableName, &info);
struct SVCreateTbReq req = {0};
req.type = TD_CHILD_TABLE;
req.name = strdup(tNameGetTableName(&tableName));
req.ctbCfg.suid = pSuperTableMeta->uid;
req.ctbCfg.pTag = row;
addCreateTbReqIntoVgroup(pVgroupHashmap, &tableName, row, pSuperTableMeta->uid, &info);
tfree(pSuperTableMeta);
}
SVgroupTablesBatch* pTableBatch = taosHashGet(pVgroupHashmap, &info.vgId, sizeof(info.vgId));
if (pTableBatch == NULL) {
SVgroupTablesBatch tBatch = {0};
tBatch.info = info;
*pBufArray = doSerializeVgroupCreateTableInfo(pVgroupHashmap);
tBatch.req.pArray = taosArrayInit(4, sizeof(struct SVCreateTbReq));
taosArrayPush(tBatch.req.pArray, &req);
taosHashCleanup(pVgroupHashmap);
return TSDB_CODE_SUCCESS;
}
taosHashPut(pVgroupHashmap, &info.vgId, sizeof(info.vgId), &tBatch, sizeof(tBatch));
} else { // add to the correct vgroup
assert(info.vgId == pTableBatch->info.vgId);
taosArrayPush(pTableBatch->req.pArray, &req);
}
static int32_t serializeVgroupTablesBatchImpl(SVgroupTablesBatch* pTbBatch, SArray* pBufArray) {
int tlen = sizeof(SMsgHead) + tSVCreateTbBatchReqSerialize(NULL, &(pTbBatch->req));
void* buf = malloc(tlen);
if (buf == NULL) {
// TODO: handle error
}
// TODO: serialize and
SArray* pBufArray = taosArrayInit(taosHashGetSize(pVgroupHashmap), sizeof(void*));
((SMsgHead*)buf)->vgId = htonl(pTbBatch->info.vgId);
((SMsgHead*)buf)->contLen = htonl(tlen);
SVgroupTablesBatch* pTbBatch = NULL;
do {
pTbBatch = taosHashIterate(pVgroupHashmap, pTbBatch);
if (pTbBatch == NULL) break;
void* pBuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
tSVCreateTbBatchReqSerialize(&pBuf, &(pTbBatch->req));
SVgDataBlocks* pVgData = calloc(1, sizeof(SVgDataBlocks));
pVgData->vg = pTbBatch->info;
pVgData->pData = buf;
pVgData->size = tlen;
pVgData->numOfTables = (int32_t) taosArrayGetSize(pTbBatch->req.pArray);
taosArrayPush(pBufArray, &pVgData);
}
int tlen = sizeof(SMsgHead) + tSVCreateTbBatchReqSerialize(NULL, &(pTbBatch->req));
void* buf = malloc(tlen);
if (buf == NULL) {
// TODO: handle error
static int32_t doBuildSingleTableBatchReq(SName* pTableName, SArray* pColumns, SVgroupInfo* pVgroupInfo, SVgroupTablesBatch* pBatch) {
struct SVCreateTbReq req = {0};
req.type = TD_NORMAL_TABLE;
req.name = strdup(tNameGetTableName(pTableName));
req.ntbCfg.nCols = taosArrayGetSize(pColumns);
int32_t num = req.ntbCfg.nCols;
req.ntbCfg.pSchema = calloc(num, sizeof(SSchema));
for(int32_t i = 0; i < num; ++i) {
SSchema* pSchema = taosArrayGet(pColumns, i);
memcpy(&req.ntbCfg.pSchema[i], pSchema, sizeof(SSchema));
}
pBatch->info = *pVgroupInfo;
pBatch->req.pArray = taosArrayInit(1, sizeof(struct SVCreateTbReq));
if (pBatch->req.pArray == NULL) {
return TSDB_CODE_QRY_OUT_OF_MEMORY;
}
taosArrayPush(pBatch->req.pArray, &req);
return TSDB_CODE_SUCCESS;
}
int32_t doCheckAndBuildCreateTableReq(SCreateTableSql* pCreateTable, SParseBasicCtx* pCtx, SMsgBuf* pMsgBuf, char** pOutput, int32_t* len) {
SArray* pBufArray = NULL;
// it is a sql statement to create a normal table
if (pCreateTable->childTableInfo == NULL) {
assert(taosArrayGetSize(pCreateTable->colInfo.pColumns) > 0 && pCreateTable->colInfo.pTagColumns == NULL);
int32_t code = doCheckForCreateTable(pCreateTable, pMsgBuf);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
((SMsgHead*)buf)->vgId = htonl(pTbBatch->info.vgId);
((SMsgHead*)buf)->contLen = htonl(tlen);
SName tableName = {0};
code = createSName(&tableName, &pCreateTable->name, pCtx, pMsgBuf);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
void* pBuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
tSVCreateTbBatchReqSerialize(&pBuf, &(pTbBatch->req));
SVgroupInfo info = {0};
catalogGetTableHashVgroup(pCtx->pCatalog, pCtx->pTransporter, &pCtx->mgmtEpSet, &tableName, &info);
SVgDataBlocks* pVgData = calloc(1, sizeof(SVgDataBlocks));
pVgData->vg = pTbBatch->info;
pVgData->pData = buf;
pVgData->size = tlen;
pVgData->numOfTables = (int32_t) taosArrayGetSize(pTbBatch->req.pArray);
SVgroupTablesBatch tbatch = {0};
code = doBuildSingleTableBatchReq(&tableName, pCreateTable->colInfo.pColumns, &info, &tbatch);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
taosArrayPush(pBufArray, &pVgData);
} while (true);
pBufArray = taosArrayInit(1, POINTER_BYTES);
if (pBufArray == NULL) {
return TSDB_CODE_QRY_OUT_OF_MEMORY;
}
serializeVgroupTablesBatchImpl(&tbatch, pBufArray);
destroyCreateTbReqBatch(&tbatch);
} else { // it is a child table, created according to a super table
doCheckAndBuildCreateCTableReq(pCreateTable, pCtx, pMsgBuf, &pBufArray);
}
SVnodeModifOpStmtInfo* pStmtInfo = calloc(1, sizeof(SVnodeModifOpStmtInfo));
pStmtInfo->nodeType = TSDB_SQL_CREATE_TABLE;
pStmtInfo->pDataBlocks = pBufArray;
*pOutput = (char*) pStmtInfo;
*len = sizeof(SVnodeModifOpStmtInfo);
return TSDB_CODE_SUCCESS;
}
SArray* doSerializeVgroupCreateTableInfo(SHashObj* pVgroupHashmap) {
SArray* pBufArray = taosArrayInit(taosHashGetSize(pVgroupHashmap), sizeof(void*));
SVgroupTablesBatch* pTbBatch = NULL;
do {
pTbBatch = taosHashIterate(pVgroupHashmap, pTbBatch);
if (pTbBatch == NULL) {
break;
}
/*int32_t code = */serializeVgroupTablesBatchImpl(pTbBatch, pBufArray);
destroyCreateTbReqBatch(pTbBatch);
} while (true);
return pBufArray;
}
SDclStmtInfo* qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, char* msgBuf, int32_t msgBufLen) {
int32_t code = 0;
......@@ -776,21 +881,13 @@ SDclStmtInfo* qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, c
case TSDB_SQL_CREATE_STABLE: {
SCreateTableSql* pCreateTable = pInfo->pCreateTableInfo;
assert(pCreateTable->type != TSQL_CREATE_CTABLE);
if (pCreateTable->type == TSQL_CREATE_TABLE || pCreateTable->type == TSQL_CREATE_STABLE) {
if ((code = doCheckForCreateTable(pInfo, pMsgBuf)) != TSDB_CODE_SUCCESS) {
terrno = code;
goto _error;
}
pDcl->pMsg = (char*)buildCreateTableMsg(pCreateTable, &pDcl->msgLen, pCtx, pMsgBuf);
pDcl->msgType = (pCreateTable->type == TSQL_CREATE_TABLE) ? TDMT_VND_CREATE_TABLE : TDMT_MND_CREATE_STB;
} else if (pCreateTable->type == TSQL_CREATE_STREAM) {
// if ((code = doCheckForStream(pSql, pInfo)) != TSDB_CODE_SUCCESS) {
// return code;
if ((code = doCheckForCreateTable(pCreateTable, pMsgBuf)) != TSDB_CODE_SUCCESS) {
terrno = code;
goto _error;
}
pDcl->pMsg = (char*)buildCreateStbMsg(pCreateTable, &pDcl->msgLen, pCtx, pMsgBuf);
pDcl->msgType = TDMT_MND_CREATE_STB;
break;
}
......@@ -838,19 +935,19 @@ SDclStmtInfo* qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, c
SVnodeModifOpStmtInfo* qParserValidateCreateTbSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, char* msgBuf, int32_t msgBufLen) {
SCreateTableSql* pCreateTable = pInfo->pCreateTableInfo;
assert(pCreateTable->type == TSQL_CREATE_CTABLE);
assert(pCreateTable->type == TSDB_SQL_CREATE_TABLE);
SMsgBuf m = {.buf = msgBuf, .len = msgBufLen};
SMsgBuf* pMsgBuf = &m;
SVnodeModifOpStmtInfo* pInsertStmt = NULL;
SVnodeModifOpStmtInfo* pModifSqlStmt = NULL;
int32_t msgLen = 0;
int32_t code = doCheckForCreateCTable(pInfo, pCtx, pMsgBuf, (char**) &pInsertStmt, &msgLen);
int32_t code = doCheckAndBuildCreateTableReq(pCreateTable, pCtx, pMsgBuf, (char**) &pModifSqlStmt, &msgLen);
if (code != TSDB_CODE_SUCCESS) {
tfree(pInsertStmt);
tfree(pModifSqlStmt);
return NULL;
}
return pInsertStmt;
return pModifSqlStmt;
}
\ No newline at end of file
......@@ -44,21 +44,21 @@ int32_t parseQuerySql(SParseContext* pCxt, SQueryNode** pQuery) {
}
if (!isDqlSqlStatement(&info)) {
bool toVnode = false;
// bool toVnode = false;
if (info.type == TSDB_SQL_CREATE_TABLE) {
SCreateTableSql* pCreateSql = info.pCreateTableInfo;
if (pCreateSql->type == TSQL_CREATE_CTABLE || pCreateSql->type == TSQL_CREATE_TABLE) {
toVnode = true;
}
}
if (toVnode) {
SVnodeModifOpStmtInfo *pInsertInfo = qParserValidateCreateTbSqlNode(&info, &pCxt->ctx, pCxt->pMsg, pCxt->msgLen);
if (pInsertInfo == NULL) {
// SCreateTableSql* pCreateSql = info.pCreateTableInfo;
// if (pCreateSql->type == TSQL_CREATE_CTABLE || pCreateSql->type == TSQL_CREATE_TABLE) {
// toVnode = true;
// }
// }
// if (toVnode) {
SVnodeModifOpStmtInfo * pModifStmtInfo = qParserValidateCreateTbSqlNode(&info, &pCxt->ctx, pCxt->pMsg, pCxt->msgLen);
if (pModifStmtInfo == NULL) {
return terrno;
}
*pQuery = (SQueryNode*) pInsertInfo;
*pQuery = (SQueryNode*)pModifStmtInfo;
} else {
SDclStmtInfo* pDcl = qParserValidateDclSqlNode(&info, &pCxt->ctx, pCxt->pMsg, pCxt->msgLen);
if (pDcl == NULL) {
......@@ -240,6 +240,10 @@ void qParserCleanupMetaRequestInfo(SCatalogReq* pMetaReq) {
taosArrayDestroy(pMetaReq->pUdf);
}
void qDestroyQuery(SQueryNode* pQuery) {
// todo
void qDestroyQuery(SQueryNode* pQueryNode) {
if (nodeType(pQueryNode) == TSDB_SQL_INSERT || nodeType(pQueryNode) == TSDB_SQL_CREATE_TABLE) {
SVnodeModifOpStmtInfo* pModifInfo = (SVnodeModifOpStmtInfo*)pQueryNode;
taosArrayDestroy(pModifInfo->pDataBlocks);
}
tfree(pQueryNode);
}
......@@ -2642,7 +2642,7 @@ static void yy_reduce(
pCreateTable->childTableInfo = taosArrayInit(4, sizeof(SCreatedTableInfo));
taosArrayPush(pCreateTable->childTableInfo, &yymsp[0].minor.yy150);
pCreateTable->type = TSQL_CREATE_CTABLE;
pCreateTable->type = TSDB_SQL_CREATE_TABLE;
yylhsminor.yy326 = pCreateTable;
}
yymsp[0].minor.yy326 = yylhsminor.yy326;
......@@ -2656,7 +2656,7 @@ static void yy_reduce(
break;
case 140: /* create_table_args ::= ifnotexists ids cpxName LP columnlist RP */
{
yylhsminor.yy326 = tSetCreateTableInfo(yymsp[-1].minor.yy165, NULL, NULL, TSQL_CREATE_TABLE);
yylhsminor.yy326 = tSetCreateTableInfo(yymsp[-1].minor.yy165, NULL, NULL, TSDB_SQL_CREATE_TABLE);
setSqlInfo(pInfo, yylhsminor.yy326, NULL, TSDB_SQL_CREATE_TABLE);
yymsp[-4].minor.yy0.n += yymsp[-3].minor.yy0.n;
......@@ -2666,7 +2666,7 @@ static void yy_reduce(
break;
case 141: /* create_stable_args ::= ifnotexists ids cpxName LP columnlist RP TAGS LP columnlist RP */
{
yylhsminor.yy326 = tSetCreateTableInfo(yymsp[-5].minor.yy165, yymsp[-1].minor.yy165, NULL, TSQL_CREATE_STABLE);
yylhsminor.yy326 = tSetCreateTableInfo(yymsp[-5].minor.yy165, yymsp[-1].minor.yy165, NULL, TSDB_SQL_CREATE_STABLE);
setSqlInfo(pInfo, yylhsminor.yy326, NULL, TSDB_SQL_CREATE_STABLE);
yymsp[-8].minor.yy0.n += yymsp[-7].minor.yy0.n;
......@@ -2700,11 +2700,11 @@ static void yy_reduce(
break;
case 146: /* create_table_args ::= ifnotexists ids cpxName AS select */
{
yylhsminor.yy326 = tSetCreateTableInfo(NULL, NULL, yymsp[0].minor.yy278, TSQL_CREATE_STREAM);
setSqlInfo(pInfo, yylhsminor.yy326, NULL, TSDB_SQL_CREATE_TABLE);
yymsp[-3].minor.yy0.n += yymsp[-2].minor.yy0.n;
setCreatedTableName(pInfo, &yymsp[-3].minor.yy0, &yymsp[-4].minor.yy0);
// yylhsminor.yy326 = tSetCreateTableInfo(NULL, NULL, yymsp[0].minor.yy278, TSQL_CREATE_STREAM);
// setSqlInfo(pInfo, yylhsminor.yy326, NULL, TSDB_SQL_CREATE_TABLE);
//
// yymsp[-3].minor.yy0.n += yymsp[-2].minor.yy0.n;
// setCreatedTableName(pInfo, &yymsp[-3].minor.yy0, &yymsp[-4].minor.yy0);
}
yymsp[-4].minor.yy326 = yylhsminor.yy326;
break;
......
......@@ -392,6 +392,18 @@ SArray* createQueryPlanImpl(const SQueryStmtInfo* pQueryInfo) {
}
static void doDestroyQueryNode(SQueryPlanNode* pQueryNode) {
if (pQueryNode->info.type == QNODE_MODIFY) {
SDataPayloadInfo* pInfo = pQueryNode->pExtInfo;
size_t size = taosArrayGetSize(pInfo->payload);
for (int32_t i = 0; i < size; ++i) {
SVgDataBlocks* pBlock = taosArrayGetP(pInfo->payload, i);
tfree(pBlock);
}
taosArrayDestroy(pInfo->payload);
}
tfree(pQueryNode->pExtInfo);
tfree(pQueryNode->pSchema);
tfree(pQueryNode->info.name);
......
......@@ -305,10 +305,10 @@ static void splitModificationOpSubPlan(SPlanContext* pCxt, SQueryPlanNode* pPlan
SVgDataBlocks* blocks = (SVgDataBlocks*)taosArrayGetP(pPayload->payload, i);
vgroupInfoToEpSet(&blocks->vg, &subplan->execNode);
subplan->pDataSink = createDataInserter(pCxt, blocks);
subplan->pNode = NULL;
subplan->type = QUERY_TYPE_MODIFY;
subplan->msgType = pPayload->msgType;
subplan->pDataSink = createDataInserter(pCxt, blocks);
subplan->pNode = NULL;
subplan->type = QUERY_TYPE_MODIFY;
subplan->msgType = pPayload->msgType;
subplan->id.queryId = pCxt->pDag->queryId;
RECOVERY_CURRENT_SUBPLAN(pCxt);
......
......@@ -16,12 +16,44 @@
#include "parser.h"
#include "plannerInt.h"
static void destroyDataSinkNode(SDataSink* pSinkNode) {
if (pSinkNode == NULL) {
return;
}
tfree(pSinkNode);
}
void qDestroySubplan(SSubplan* pSubplan) {
// todo
if (pSubplan == NULL) {
return;
}
taosArrayDestroy(pSubplan->pChildren);
taosArrayDestroy(pSubplan->pParents);
destroyDataSinkNode(pSubplan->pDataSink);
// todo destroy pNode
tfree(pSubplan);
}
void qDestroyQueryDag(struct SQueryDag* pDag) {
// todo
if (pDag == NULL) {
return;
}
size_t size = taosArrayGetSize(pDag->pSubplans);
for(size_t i = 0; i < size; ++i) {
SArray* pa = taosArrayGetP(pDag->pSubplans, i);
size_t t = taosArrayGetSize(pa);
for(int32_t j = 0; j < t; ++j) {
SSubplan* pSubplan = taosArrayGetP(pa, j);
qDestroySubplan(pSubplan);
}
taosArrayDestroy(pa);
}
taosArrayDestroy(pDag->pSubplans);
tfree(pDag);
}
int32_t qCreateQueryDag(const struct SQueryNode* pNode, struct SQueryDag** pDag, uint64_t requestId) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册