提交 cc64e9e4 编写于 作者: L Liu Jicong

merge topic

...@@ -193,7 +193,7 @@ DLL_EXPORT void taos_close_stream(TAOS_STREAM *tstr); ...@@ -193,7 +193,7 @@ DLL_EXPORT void taos_close_stream(TAOS_STREAM *tstr);
DLL_EXPORT int taos_load_table_info(TAOS *taos, const char* tableNameList); DLL_EXPORT int taos_load_table_info(TAOS *taos, const char* tableNameList);
DLL_EXPORT TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int protocol, int precision); DLL_EXPORT TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int protocol, int precision);
DLL_EXPORT TAOS_RES* tmq_create_topic(TAOS* taos, const char* name, const char* sql, int sqlLen); DLL_EXPORT TAOS_RES *taos_create_topic(TAOS* taos, const char* name, const char* sql, int sqlLen);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -1093,6 +1093,7 @@ typedef struct { ...@@ -1093,6 +1093,7 @@ typedef struct {
typedef struct { typedef struct {
int8_t igExists; int8_t igExists;
char* name; char* name;
char* sql;
char* physicalPlan; char* physicalPlan;
char* logicalPlan; char* logicalPlan;
} SCMCreateTopicReq; } SCMCreateTopicReq;
...@@ -1101,6 +1102,7 @@ static FORCE_INLINE int tSerializeSCMCreateTopicReq(void** buf, const SCMCreateT ...@@ -1101,6 +1102,7 @@ static FORCE_INLINE int tSerializeSCMCreateTopicReq(void** buf, const SCMCreateT
int tlen = 0; int tlen = 0;
tlen += taosEncodeFixedI8(buf, pReq->igExists); tlen += taosEncodeFixedI8(buf, pReq->igExists);
tlen += taosEncodeString(buf, pReq->name); tlen += taosEncodeString(buf, pReq->name);
tlen += taosEncodeString(buf, pReq->sql);
tlen += taosEncodeString(buf, pReq->physicalPlan); tlen += taosEncodeString(buf, pReq->physicalPlan);
tlen += taosEncodeString(buf, pReq->logicalPlan); tlen += taosEncodeString(buf, pReq->logicalPlan);
return tlen; return tlen;
...@@ -1109,6 +1111,7 @@ static FORCE_INLINE int tSerializeSCMCreateTopicReq(void** buf, const SCMCreateT ...@@ -1109,6 +1111,7 @@ static FORCE_INLINE int tSerializeSCMCreateTopicReq(void** buf, const SCMCreateT
static FORCE_INLINE void* tDeserializeSCMCreateTopicReq(void* buf, SCMCreateTopicReq* pReq) { static FORCE_INLINE void* tDeserializeSCMCreateTopicReq(void* buf, SCMCreateTopicReq* pReq) {
buf = taosDecodeFixedI8(buf, &(pReq->igExists)); buf = taosDecodeFixedI8(buf, &(pReq->igExists));
buf = taosDecodeString(buf, &(pReq->name)); buf = taosDecodeString(buf, &(pReq->name));
buf = taosDecodeString(buf, &(pReq->sql));
buf = taosDecodeString(buf, &(pReq->physicalPlan)); buf = taosDecodeString(buf, &(pReq->physicalPlan));
buf = taosDecodeString(buf, &(pReq->logicalPlan)); buf = taosDecodeString(buf, &(pReq->logicalPlan));
return buf; return buf;
...@@ -1231,7 +1234,7 @@ typedef struct { ...@@ -1231,7 +1234,7 @@ typedef struct {
} SMVSubscribeRsp; } SMVSubscribeRsp;
typedef struct { typedef struct {
char name[TSDB_TOPIC_FNAME_LEN]; char name[TSDB_TOPIC_NAME_LEN];
int8_t igExists; int8_t igExists;
int32_t execLen; int32_t execLen;
void* executor; void* executor;
......
...@@ -25,14 +25,12 @@ ...@@ -25,14 +25,12 @@
#define T_NAME_ACCT 0x1u #define T_NAME_ACCT 0x1u
#define T_NAME_DB 0x2u #define T_NAME_DB 0x2u
#define T_NAME_TABLE 0x4u #define T_NAME_TABLE 0x4u
#define T_NAME_TOPIC 0x8u
typedef struct SName { typedef struct SName {
uint8_t type; //db_name_t, table_name_t uint8_t type; //db_name_t, table_name_t
int32_t acctId; int32_t acctId;
char dbname[TSDB_DB_NAME_LEN]; char dbname[TSDB_DB_NAME_LEN];
char tname[TSDB_TABLE_NAME_LEN]; char tname[TSDB_TABLE_NAME_LEN];
char topicName[TSDB_TOPIC_NAME_LEN];
} SName; } SName;
int32_t tNameExtractFullName(const SName* name, char* dst); int32_t tNameExtractFullName(const SName* name, char* dst);
......
...@@ -44,7 +44,7 @@ typedef struct SParseContext { ...@@ -44,7 +44,7 @@ typedef struct SParseContext {
* @param msg extended error message if exists. * @param msg extended error message if exists.
* @return error code * @return error code
*/ */
int32_t qParseQuerySql(SParseContext* pContext, SQueryNode** pQuery); int32_t qParseQuerySql(SParseContext* pContext, SQueryNode** pQueryNode);
/** /**
* Return true if it is a ddl/dcl sql statement * Return true if it is a ddl/dcl sql statement
......
...@@ -95,6 +95,7 @@ typedef struct SScanPhyNode { ...@@ -95,6 +95,7 @@ typedef struct SScanPhyNode {
int8_t tableType; int8_t tableType;
int32_t order; // scan order: TSDB_ORDER_ASC|TSDB_ORDER_DESC int32_t order; // scan order: TSDB_ORDER_ASC|TSDB_ORDER_DESC
int32_t count; // repeat count int32_t count; // repeat count
int32_t reverse; // reverse scan count
} SScanPhyNode; } SScanPhyNode;
typedef SScanPhyNode SSystemTableScanPhyNode; typedef SScanPhyNode SSystemTableScanPhyNode;
......
...@@ -307,6 +307,7 @@ int32_t* taosGetErrno(); ...@@ -307,6 +307,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_VND_NO_WRITE_AUTH TAOS_DEF_ERROR_CODE(0, 0x0512) //"Database write operation denied") #define TSDB_CODE_VND_NO_WRITE_AUTH TAOS_DEF_ERROR_CODE(0, 0x0512) //"Database write operation denied")
#define TSDB_CODE_VND_IS_SYNCING TAOS_DEF_ERROR_CODE(0, 0x0513) //"Database is syncing") #define TSDB_CODE_VND_IS_SYNCING TAOS_DEF_ERROR_CODE(0, 0x0513) //"Database is syncing")
#define TSDB_CODE_VND_INVALID_TSDB_STATE TAOS_DEF_ERROR_CODE(0, 0x0514) //"Invalid tsdb state") #define TSDB_CODE_VND_INVALID_TSDB_STATE TAOS_DEF_ERROR_CODE(0, 0x0514) //"Invalid tsdb state")
#define TSDB_CODE_VND_TB_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x0515) // "Table not exists")
// tsdb // tsdb
#define TSDB_CODE_TDB_INVALID_TABLE_ID TAOS_DEF_ERROR_CODE(0, 0x0600) //"Invalid table ID") #define TSDB_CODE_TDB_INVALID_TABLE_ID TAOS_DEF_ERROR_CODE(0, 0x0600) //"Invalid table ID")
......
...@@ -351,6 +351,7 @@ static FORCE_INLINE void *taosDecodeString(void *buf, char **value) { ...@@ -351,6 +351,7 @@ static FORCE_INLINE void *taosDecodeString(void *buf, char **value) {
buf = taosDecodeVariantU64(buf, &size); buf = taosDecodeVariantU64(buf, &size);
*value = (char *)malloc((size_t)size + 1); *value = (char *)malloc((size_t)size + 1);
if (*value == NULL) return NULL; if (*value == NULL) return NULL;
memcpy(*value, buf, (size_t)size); memcpy(*value, buf, (size_t)size);
......
...@@ -181,7 +181,7 @@ do { \ ...@@ -181,7 +181,7 @@ do { \
#define TSDB_COL_NAME_LEN 65 #define TSDB_COL_NAME_LEN 65
#define TSDB_MAX_SAVED_SQL_LEN TSDB_MAX_COLUMNS * 64 #define TSDB_MAX_SAVED_SQL_LEN TSDB_MAX_COLUMNS * 64
#define TSDB_MAX_SQL_LEN TSDB_PAYLOAD_SIZE #define TSDB_MAX_SQL_LEN TSDB_PAYLOAD_SIZE
#define TSDB_MAX_SQL_SHOW_LEN 512 #define TSDB_MAX_SQL_SHOW_LEN 1024
#define TSDB_MAX_ALLOWED_SQL_LEN (1*1024*1024u) // sql length should be less than 1mb #define TSDB_MAX_ALLOWED_SQL_LEN (1*1024*1024u) // sql length should be less than 1mb
#define TSDB_APP_NAME_LEN TSDB_UNI_LEN #define TSDB_APP_NAME_LEN TSDB_UNI_LEN
......
...@@ -118,9 +118,8 @@ typedef struct STscObj { ...@@ -118,9 +118,8 @@ typedef struct STscObj {
uint32_t connId; uint32_t connId;
int32_t connType; int32_t connType;
uint64_t id; // ref ID returned by taosAddRef uint64_t id; // ref ID returned by taosAddRef
void *pTransporter;
pthread_mutex_t mutex; // used to protect the operation on db pthread_mutex_t mutex; // used to protect the operation on db
int32_t numOfReqs; // number of sqlObj from this tscObj int32_t numOfReqs; // number of sqlObj bound to this connection
SAppInstInfo *pAppInfo; SAppInstInfo *pAppInfo;
} STscObj; } STscObj;
...@@ -149,12 +148,13 @@ typedef struct SShowReqInfo { ...@@ -149,12 +148,13 @@ typedef struct SShowReqInfo {
} SShowReqInfo; } SShowReqInfo;
typedef struct SRequestSendRecvBody { typedef struct SRequestSendRecvBody {
tsem_t rspSem; // not used now tsem_t rspSem; // not used now
void* fp; void* fp;
SShowReqInfo showInfo; // todo this attribute will be removed after the query framework being completed. SShowReqInfo showInfo; // todo this attribute will be removed after the query framework being completed.
struct SSchJob *pQueryJob; // query job, created according to sql query DAG. SDataBuf requestMsg;
SDataBuf requestMsg; struct SSchJob *pQueryJob; // query job, created according to sql query DAG.
SReqResultInfo resInfo; struct SQueryDag *pDag; // the query dag, generated according to the sql statement.
SReqResultInfo resInfo;
} SRequestSendRecvBody; } SRequestSendRecvBody;
#define ERROR_MSG_BUF_DEFAULT_SIZE 512 #define ERROR_MSG_BUF_DEFAULT_SIZE 512
......
...@@ -89,13 +89,12 @@ static void tscInitLogFile() { ...@@ -89,13 +89,12 @@ static void tscInitLogFile() {
// todo close the transporter properly // todo close the transporter properly
void closeTransporter(STscObj* pTscObj) { void closeTransporter(STscObj* pTscObj) {
if (pTscObj == NULL || pTscObj->pTransporter == NULL) { if (pTscObj == NULL || pTscObj->pAppInfo->pTransporter == NULL) {
return; return;
} }
tscDebug("free transporter:%p in connObj: 0x%"PRIx64, pTscObj->pTransporter, pTscObj->id); tscDebug("free transporter:%p in connObj: 0x%"PRIx64, pTscObj->pAppInfo->pTransporter, pTscObj->id);
rpcClose(pTscObj->pTransporter); rpcClose(pTscObj->pAppInfo->pTransporter);
pTscObj->pTransporter = NULL;
} }
// TODO refactor // TODO refactor
...@@ -140,10 +139,6 @@ void* createTscObj(const char* user, const char* auth, const char *db, SAppInstI ...@@ -140,10 +139,6 @@ void* createTscObj(const char* user, const char* auth, const char *db, SAppInstI
} }
pObj->pAppInfo = pAppInfo; pObj->pAppInfo = pAppInfo;
if (pAppInfo != NULL) {
pObj->pTransporter = pAppInfo->pTransporter;
}
tstrncpy(pObj->user, user, sizeof(pObj->user)); tstrncpy(pObj->user, user, sizeof(pObj->user));
memcpy(pObj->pass, auth, TSDB_PASSWORD_LEN); memcpy(pObj->pass, auth, TSDB_PASSWORD_LEN);
...@@ -199,6 +194,7 @@ static void doDestroyRequest(void* p) { ...@@ -199,6 +194,7 @@ static void doDestroyRequest(void* p) {
tfree(pRequest->pInfo); tfree(pRequest->pInfo);
doFreeReqResultInfo(&pRequest->body.resInfo); doFreeReqResultInfo(&pRequest->body.resInfo);
qDestroyQueryDag(pRequest->body.pDag);
deregisterRequest(pRequest); deregisterRequest(pRequest);
tfree(pRequest); tfree(pRequest);
......
...@@ -13,12 +13,12 @@ ...@@ -13,12 +13,12 @@
#include "tpagedfile.h" #include "tpagedfile.h"
#include "tref.h" #include "tref.h"
#define CHECK_CODE_GOTO(expr, lable) \ #define CHECK_CODE_GOTO(expr, label) \
do { \ do { \
int32_t code = expr; \ int32_t code = expr; \
if (TSDB_CODE_SUCCESS != code) { \ if (TSDB_CODE_SUCCESS != code) { \
terrno = code; \ terrno = code; \
goto lable; \ goto label; \
} \ } \
} while (0) } while (0)
...@@ -153,13 +153,13 @@ int32_t parseSql(SRequestObj* pRequest, SQueryNode** pQuery) { ...@@ -153,13 +153,13 @@ int32_t parseSql(SRequestObj* pRequest, SQueryNode** pQuery) {
SParseContext cxt = { SParseContext cxt = {
.requestId = pRequest->requestId, .requestId = pRequest->requestId,
.acctId = pTscObj->acctId, .acctId = pTscObj->acctId,
.db = getConnectionDB(pTscObj), .db = getConnectionDB(pTscObj),
.pTransporter = pTscObj->pTransporter, .pSql = pRequest->sqlstr,
.pSql = pRequest->sqlstr, .sqlLen = pRequest->sqlLen,
.sqlLen = pRequest->sqlLen, .pMsg = pRequest->msgBuf,
.pMsg = pRequest->msgBuf, .msgLen = ERROR_MSG_BUF_DEFAULT_SIZE,
.msgLen = ERROR_MSG_BUF_DEFAULT_SIZE .pTransporter = pTscObj->pAppInfo->pTransporter,
}; };
cxt.mgmtEpSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp); cxt.mgmtEpSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
...@@ -192,10 +192,10 @@ int32_t execDdlQuery(SRequestObj* pRequest, SQueryNode* pQuery) { ...@@ -192,10 +192,10 @@ int32_t execDdlQuery(SRequestObj* pRequest, SQueryNode* pQuery) {
pShowReqInfo->pArray = pDcl->pExtension; pShowReqInfo->pArray = pDcl->pExtension;
} }
} }
asyncSendMsgToServer(pTscObj->pTransporter, &pDcl->epSet, &transporterId, pSendMsg); asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &pDcl->epSet, &transporterId, pSendMsg);
} else { } else {
SEpSet* pEpSet = &pTscObj->pAppInfo->mgmtEp.epSet; SEpSet* pEpSet = &pTscObj->pAppInfo->mgmtEp.epSet;
asyncSendMsgToServer(pTscObj->pTransporter, pEpSet, &transporterId, pSendMsg); asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, pEpSet, &transporterId, pSendMsg);
} }
tsem_wait(&pRequest->body.rspSem); tsem_wait(&pRequest->body.rspSem);
...@@ -242,7 +242,7 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag) { ...@@ -242,7 +242,7 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag) {
if (TSDB_SQL_INSERT == pRequest->type || TSDB_SQL_CREATE_TABLE == pRequest->type) { if (TSDB_SQL_INSERT == pRequest->type || TSDB_SQL_CREATE_TABLE == pRequest->type) {
SQueryResult res = {.code = 0, .numOfRows = 0, .msgSize = ERROR_MSG_BUF_DEFAULT_SIZE, .msg = pRequest->msgBuf}; SQueryResult res = {.code = 0, .numOfRows = 0, .msgSize = ERROR_MSG_BUF_DEFAULT_SIZE, .msg = pRequest->msgBuf};
int32_t code = scheduleExecJob(pRequest->pTscObj->pTransporter, NULL, pDag, &pRequest->body.pQueryJob, &res); int32_t code = scheduleExecJob(pRequest->pTscObj->pAppInfo->pTransporter, NULL, pDag, &pRequest->body.pQueryJob, &res);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
// handle error and retry // handle error and retry
} else { } else {
...@@ -256,7 +256,7 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag) { ...@@ -256,7 +256,7 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag) {
return pRequest->code; return pRequest->code;
} }
return scheduleAsyncExecJob(pRequest->pTscObj->pTransporter, NULL, pDag, &pRequest->body.pQueryJob); return scheduleAsyncExecJob(pRequest->pTscObj->pAppInfo->pTransporter, NULL, pDag, &pRequest->body.pQueryJob);
} }
typedef struct tmq_t tmq_t; typedef struct tmq_t tmq_t;
...@@ -362,28 +362,55 @@ TAOS_RES *tmq_create_topic(TAOS* taos, const char* name, const char* sql, int sq ...@@ -362,28 +362,55 @@ TAOS_RES *tmq_create_topic(TAOS* taos, const char* name, const char* sql, int sq
char *dagStr = NULL; char *dagStr = NULL;
terrno = TSDB_CODE_SUCCESS; terrno = TSDB_CODE_SUCCESS;
if (taos == NULL || topicName == NULL || sql == NULL) {
tscError("invalid parameters for creating topic, connObj:%p, topic name:%s, sql:%s", taos, topicName, sql);
terrno = TSDB_CODE_TSC_INVALID_INPUT;
goto _return;
}
if (strlen(topicName) >= TSDB_TOPIC_NAME_LEN) {
tscError("topic name too long, max length:%d", TSDB_TOPIC_NAME_LEN - 1);
terrno = TSDB_CODE_TSC_INVALID_INPUT;
goto _return;
}
if (sqlLen > tsMaxSQLStringLen) {
tscError("sql string exceeds max length:%d", tsMaxSQLStringLen);
terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT;
goto _return;
}
tscDebug("start to create topic, %s", topicName);
CHECK_CODE_GOTO(buildRequest(pTscObj, sql, sqlLen, &pRequest), _return); CHECK_CODE_GOTO(buildRequest(pTscObj, sql, sqlLen, &pRequest), _return);
CHECK_CODE_GOTO(parseSql(pRequest, &pQueryNode), _return);
//temporary disabled until planner ready // todo check for invalid sql statement and return with error code
#if 0
CHECK_CODE_GOTO(parseSql(pRequest, &pQuery), _return);
//TODO: check sql valid
CHECK_CODE_GOTO(qCreateQueryDag(pQuery, &pDag), _return); CHECK_CODE_GOTO(qCreateQueryDag(pQueryNode, &pRequest->body.pDag, pRequest->requestId), _return);
dagStr = qDagToString(pDag); pStr = qDagToString(pRequest->body.pDag);
if(dagStr == NULL) { if(pStr == NULL) {
//TODO goto _return;
} }
#endif
// The topic should be related to a database that the queried table is belonged to.
SName name = {0};
char dbName[TSDB_DB_FNAME_LEN] = {0};
tNameGetFullDbName(&((SQueryStmtInfo*) pQueryNode)->pTableMetaInfo[0]->name, dbName);
tNameFromString(&name, dbName, T_NAME_ACCT|T_NAME_DB);
tNameFromString(&name, topicName, T_NAME_TABLE);
char topicFname[TSDB_TOPIC_FNAME_LEN] = {0};
tNameExtractFullName(&name, topicFname);
SCMCreateTopicReq req = { SCMCreateTopicReq req = {
.name = (char*)name, .name = (char*) topicFname,
.igExists = 0, .igExists = 0,
/*.physicalPlan = dagStr,*/ .physicalPlan = (char*) pStr,
.physicalPlan = (char*)sql, .sql = (char*) sql,
.logicalPlan = "", .logicalPlan = "no logic plan",
}; };
int tlen = tSerializeSCMCreateTopicReq(NULL, &req); int tlen = tSerializeSCMCreateTopicReq(NULL, &req);
...@@ -391,27 +418,32 @@ TAOS_RES *tmq_create_topic(TAOS* taos, const char* name, const char* sql, int sq ...@@ -391,27 +418,32 @@ TAOS_RES *tmq_create_topic(TAOS* taos, const char* name, const char* sql, int sq
if(buf == NULL) { if(buf == NULL) {
goto _return; goto _return;
} }
void* abuf = buf; void* abuf = buf;
tSerializeSCMCreateTopicReq(&abuf, &req); tSerializeSCMCreateTopicReq(&abuf, &req);
/*printf("formatted: %s\n", dagStr);*/ /*printf("formatted: %s\n", dagStr);*/
pRequest->body.requestMsg = (SDataBuf){ .pData = buf, .len = tlen }; pRequest->body.requestMsg = (SDataBuf){ .pData = buf, .len = tlen };
pRequest->type = TDMT_MND_CREATE_TOPIC;
SMsgSendInfo* body = buildMsgInfoImpl(pRequest); SMsgSendInfo* body = buildMsgInfoImpl(pRequest);
SEpSet* pEpSet = &pTscObj->pAppInfo->mgmtEp.epSet; SEpSet epSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
int64_t transporterId = 0; int64_t transporterId = 0;
asyncSendMsgToServer(pTscObj->pTransporter, pEpSet, &transporterId, body); asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, body);
tsem_wait(&pRequest->body.rspSem); tsem_wait(&pRequest->body.rspSem);
_return: _return:
qDestroyQuery(pQuery); qDestroyQuery(pQueryNode);
qDestroyQueryDag(pDag); if (body != NULL) {
destroySendMsgInfo(body); destroySendMsgInfo(body);
}
if (pRequest != NULL && terrno != TSDB_CODE_SUCCESS) { if (pRequest != NULL && terrno != TSDB_CODE_SUCCESS) {
pRequest->code = terrno; pRequest->code = terrno;
} }
return pRequest; return pRequest;
} }
...@@ -445,24 +477,22 @@ TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen) { ...@@ -445,24 +477,22 @@ TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen) {
nPrintTsc("%s", sql) nPrintTsc("%s", sql)
SRequestObj *pRequest = NULL; SRequestObj *pRequest = NULL;
SQueryNode *pQuery = NULL; SQueryNode *pQueryNode = NULL;
SQueryDag *pDag = NULL;
terrno = TSDB_CODE_SUCCESS; terrno = TSDB_CODE_SUCCESS;
CHECK_CODE_GOTO(buildRequest(pTscObj, sql, sqlLen, &pRequest), _return); CHECK_CODE_GOTO(buildRequest(pTscObj, sql, sqlLen, &pRequest), _return);
CHECK_CODE_GOTO(parseSql(pRequest, &pQuery), _return); CHECK_CODE_GOTO(parseSql(pRequest, &pQueryNode), _return);
if (qIsDdlQuery(pQuery)) { if (qIsDdlQuery(pQueryNode)) {
CHECK_CODE_GOTO(execDdlQuery(pRequest, pQuery), _return); CHECK_CODE_GOTO(execDdlQuery(pRequest, pQueryNode), _return);
} else { } else {
CHECK_CODE_GOTO(getPlan(pRequest, pQuery, &pDag), _return); CHECK_CODE_GOTO(getPlan(pRequest, pQueryNode, &pRequest->body.pDag), _return);
CHECK_CODE_GOTO(scheduleQuery(pRequest, pDag), _return); CHECK_CODE_GOTO(scheduleQuery(pRequest, pRequest->body.pDag), _return);
pRequest->code = terrno; pRequest->code = terrno;
} }
_return: _return:
qDestroyQuery(pQuery); qDestroyQuery(pQueryNode);
qDestroyQueryDag(pDag);
if (NULL != pRequest && TSDB_CODE_SUCCESS != terrno) { if (NULL != pRequest && TSDB_CODE_SUCCESS != terrno) {
pRequest->code = terrno; pRequest->code = terrno;
} }
...@@ -523,7 +553,7 @@ STscObj* taosConnectImpl(const char *ip, const char *user, const char *auth, con ...@@ -523,7 +553,7 @@ STscObj* taosConnectImpl(const char *ip, const char *user, const char *auth, con
SMsgSendInfo* body = buildConnectMsg(pRequest); SMsgSendInfo* body = buildConnectMsg(pRequest);
int64_t transporterId = 0; int64_t transporterId = 0;
asyncSendMsgToServer(pTscObj->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &transporterId, body); asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &transporterId, body);
tsem_wait(&pRequest->body.rspSem); tsem_wait(&pRequest->body.rspSem);
if (pRequest->code != TSDB_CODE_SUCCESS) { if (pRequest->code != TSDB_CODE_SUCCESS) {
...@@ -534,7 +564,7 @@ STscObj* taosConnectImpl(const char *ip, const char *user, const char *auth, con ...@@ -534,7 +564,7 @@ STscObj* taosConnectImpl(const char *ip, const char *user, const char *auth, con
taos_close(pTscObj); taos_close(pTscObj);
pTscObj = NULL; pTscObj = NULL;
} else { } else {
tscDebug("0x%"PRIx64" connection is opening, connId:%d, dnodeConn:%p, reqId:0x%"PRIx64, pTscObj->id, pTscObj->connId, pTscObj->pTransporter, pRequest->requestId); tscDebug("0x%"PRIx64" connection is opening, connId:%d, dnodeConn:%p, reqId:0x%"PRIx64, pTscObj->id, pTscObj->connId, pTscObj->pAppInfo->pTransporter, pRequest->requestId);
destroyRequest(pRequest); destroyRequest(pRequest);
} }
...@@ -702,7 +732,7 @@ void* doFetchRow(SRequestObj* pRequest) { ...@@ -702,7 +732,7 @@ void* doFetchRow(SRequestObj* pRequest) {
int64_t transporterId = 0; int64_t transporterId = 0;
STscObj *pTscObj = pRequest->pTscObj; STscObj *pTscObj = pRequest->pTscObj;
asyncSendMsgToServer(pTscObj->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &transporterId, body); asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &transporterId, body);
tsem_wait(&pRequest->body.rspSem); tsem_wait(&pRequest->body.rspSem);
pRequest->type = TDMT_VND_SHOW_TABLES_FETCH; pRequest->type = TDMT_VND_SHOW_TABLES_FETCH;
...@@ -712,7 +742,7 @@ void* doFetchRow(SRequestObj* pRequest) { ...@@ -712,7 +742,7 @@ void* doFetchRow(SRequestObj* pRequest) {
int64_t transporterId = 0; int64_t transporterId = 0;
STscObj *pTscObj = pRequest->pTscObj; STscObj *pTscObj = pRequest->pTscObj;
asyncSendMsgToServer(pTscObj->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &transporterId, body); asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &transporterId, body);
tsem_wait(&pRequest->body.rspSem); tsem_wait(&pRequest->body.rspSem);
......
...@@ -149,27 +149,36 @@ TEST(testCase, connect_Test) { ...@@ -149,27 +149,36 @@ TEST(testCase, connect_Test) {
//} //}
// //
//TEST(testCase, create_db_Test) { //TEST(testCase, create_db_Test) {
//TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); // TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
//assert(pConn != NULL); // assert(pConn != NULL);
//
//TAOS_RES* pRes = taos_query(pConn, "create database abc1 vgroups 2"); // TAOS_RES* pRes = taos_query(pConn, "drop database abc1");
//if (taos_errno(pRes) != 0) { // if (taos_errno(pRes) != 0) {
//printf("error in create db, reason:%s\n", taos_errstr(pRes)); // printf("failed to drop database, reason:%s\n", taos_errstr(pRes));
//} // }
//
//TAOS_FIELD* pFields = taos_fetch_fields(pRes); // taos_free_result(pRes);
//ASSERT_TRUE(pFields == NULL); //
// pRes = taos_query(pConn, "create database abc1 vgroups 2");
//int32_t numOfFields = taos_num_fields(pRes); // if (taos_errno(pRes) != 0) {
//ASSERT_EQ(numOfFields, 0); // printf("error in create db, reason:%s\n", taos_errstr(pRes));
// }
//taos_free_result(pRes); //
// TAOS_FIELD* pFields = taos_fetch_fields(pRes);
//pRes = taos_query(pConn, "create database abc1 vgroups 4"); // ASSERT_TRUE(pFields == NULL);
//if (taos_errno(pRes) != 0) { //
//printf("error in create db, reason:%s\n", taos_errstr(pRes)); // int32_t numOfFields = taos_num_fields(pRes);
//} // ASSERT_EQ(numOfFields, 0);
//taos_close(pConn); //
// taos_free_result(pRes);
//
// pRes = taos_query(pConn, "create database if not exists abc1 vgroups 4");
// if (taos_errno(pRes) != 0) {
// printf("failed to create database abc1, reason:%s\n", taos_errstr(pRes));
// }
//
// taos_free_result(pRes);
// taos_close(pConn);
//} //}
// //
//TEST(testCase, create_dnode_Test) { //TEST(testCase, create_dnode_Test) {
...@@ -195,7 +204,7 @@ TEST(testCase, connect_Test) { ...@@ -195,7 +204,7 @@ TEST(testCase, connect_Test) {
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); // TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
// assert(pConn != NULL); // assert(pConn != NULL);
// //
// TAOS_RES* pRes = taos_query(pConn, "drop dnode 2"); // TAOS_RES* pRes = taos_query(pConn, "drop dnode 3");
// if (taos_errno(pRes) != 0) { // if (taos_errno(pRes) != 0) {
// printf("error in drop dnode, reason:%s\n", taos_errstr(pRes)); // printf("error in drop dnode, reason:%s\n", taos_errstr(pRes));
// } // }
...@@ -206,6 +215,11 @@ TEST(testCase, connect_Test) { ...@@ -206,6 +215,11 @@ TEST(testCase, connect_Test) {
// int32_t numOfFields = taos_num_fields(pRes); // int32_t numOfFields = taos_num_fields(pRes);
// ASSERT_EQ(numOfFields, 0); // ASSERT_EQ(numOfFields, 0);
// //
// pRes = taos_query(pConn, "drop dnode 4");
// if (taos_errno(pRes) != 0) {
// printf("error in drop dnode, reason:%s\n", taos_errstr(pRes));
// }
//
// taos_free_result(pRes); // taos_free_result(pRes);
// taos_close(pConn); // taos_close(pConn);
//} //}
...@@ -228,33 +242,33 @@ TEST(testCase, connect_Test) { ...@@ -228,33 +242,33 @@ TEST(testCase, connect_Test) {
// taos_close(pConn); // taos_close(pConn);
//} //}
// //
//// TEST(testCase, drop_db_test) { // TEST(testCase, drop_db_test) {
//// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); // TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
//// assert(pConn != NULL); // assert(pConn != NULL);
//// //
//// showDB(pConn); // showDB(pConn);
//// //
//// TAOS_RES* pRes = taos_query(pConn, "drop database abc1"); // TAOS_RES* pRes = taos_query(pConn, "drop database abc1");
//// if (taos_errno(pRes) != 0) { // if (taos_errno(pRes) != 0) {
//// printf("failed to drop db, reason:%s\n", taos_errstr(pRes)); // printf("failed to drop db, reason:%s\n", taos_errstr(pRes));
//// } // }
//// taos_free_result(pRes); // taos_free_result(pRes);
//// //
//// showDB(pConn); // showDB(pConn);
//// //
//// pRes = taos_query(pConn, "create database abc1"); // pRes = taos_query(pConn, "create database abc1");
//// if (taos_errno(pRes) != 0) { // if (taos_errno(pRes) != 0) {
//// printf("create to drop db, reason:%s\n", taos_errstr(pRes)); // printf("create to drop db, reason:%s\n", taos_errstr(pRes));
//// } // }
//// taos_free_result(pRes); // taos_free_result(pRes);
//// taos_close(pConn); // taos_close(pConn);
////} //}
// //
//TEST(testCase, create_stable_Test) { //TEST(testCase, create_stable_Test) {
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); // TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
// assert(pConn != NULL); // assert(pConn != NULL);
// //
// TAOS_RES* pRes = taos_query(pConn, "create database abc1 vgroups 2"); // TAOS_RES* pRes = taos_query(pConn, "create database if not exists abc1 vgroups 2");
// if (taos_errno(pRes) != 0) { // if (taos_errno(pRes) != 0) {
// printf("error in create db, reason:%s\n", taos_errstr(pRes)); // printf("error in create db, reason:%s\n", taos_errstr(pRes));
// } // }
...@@ -280,7 +294,7 @@ TEST(testCase, connect_Test) { ...@@ -280,7 +294,7 @@ TEST(testCase, connect_Test) {
// taos_free_result(pRes); // taos_free_result(pRes);
// taos_close(pConn); // taos_close(pConn);
//} //}
//
//TEST(testCase, create_table_Test) { //TEST(testCase, create_table_Test) {
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); // TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
// assert(pConn != NULL); // assert(pConn != NULL);
...@@ -293,26 +307,26 @@ TEST(testCase, connect_Test) { ...@@ -293,26 +307,26 @@ TEST(testCase, connect_Test) {
// //
// taos_close(pConn); // taos_close(pConn);
//} //}
//
//TEST(testCase, create_ctable_Test) { //TEST(testCase, create_ctable_Test) {
//TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); // TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
//assert(pConn != NULL); // assert(pConn != NULL);
//
//TAOS_RES* pRes = taos_query(pConn, "use abc1"); // TAOS_RES* pRes = taos_query(pConn, "use abc1");
//if (taos_errno(pRes) != 0) { // if (taos_errno(pRes) != 0) {
//printf("failed to use db, reason:%s\n", taos_errstr(pRes)); // printf("failed to use db, reason:%s\n", taos_errstr(pRes));
//} // }
//taos_free_result(pRes); // taos_free_result(pRes);
//
//pRes = taos_query(pConn, "create table tm0 using st1 tags(1)"); // pRes = taos_query(pConn, "create table tm0 using st1 tags(1)");
//if (taos_errno(pRes) != 0) { // if (taos_errno(pRes) != 0) {
//printf("failed to create child table tm0, reason:%s\n", taos_errstr(pRes)); // printf("failed to create child table tm0, reason:%s\n", taos_errstr(pRes));
//} // }
//
//taos_free_result(pRes); // taos_free_result(pRes);
//taos_close(pConn); // taos_close(pConn);
//} //}
//
//TEST(testCase, show_stable_Test) { //TEST(testCase, show_stable_Test) {
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); // TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
// assert(pConn != NULL); // assert(pConn != NULL);
...@@ -500,60 +514,101 @@ TEST(testCase, connect_Test) { ...@@ -500,60 +514,101 @@ TEST(testCase, connect_Test) {
// //
// taosHashCleanup(phash); // taosHashCleanup(phash);
//} //}
//
TEST(testCase, create_topic_Test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL);
// TEST(testCase, create_topic_Test) { TAOS_RES* pRes = taos_query(pConn, "use abc1");
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); if (taos_errno(pRes) != 0) {
// assert(pConn != NULL); printf("error in use db, reason:%s\n", taos_errstr(pRes));
}
taos_free_result(pRes);
TAOS_FIELD* pFields = taos_fetch_fields(pRes);
ASSERT_TRUE(pFields == nullptr);
int32_t numOfFields = taos_num_fields(pRes);
ASSERT_EQ(numOfFields, 0);
taos_free_result(pRes);
char* sql = "select * from tu";
pRes = taos_create_topic(pConn, "test_topic_1", sql, strlen(sql));
taos_free_result(pRes);
taos_close(pConn);
}
// //
// TAOS_RES* pRes = taos_query(pConn, "create database abc1"); //TEST(testCase, insert_test) {
// if (taos_errno(pRes) != 0) { // TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
// printf("error in create db, reason:%s\n", taos_errstr(pRes)); // ASSERT_NE(pConn, nullptr);
// }
// taos_free_result(pRes);
// //
// pRes = taos_query(pConn, "use abc1"); // TAOS_RES* pRes = taos_query(pConn, "use abc1");
// if (taos_errno(pRes) != 0) {
// printf("error in use db, reason:%s\n", taos_errstr(pRes));
// }
// taos_free_result(pRes); // taos_free_result(pRes);
// //
// pRes = taos_query(pConn, "create stable st1(ts timestamp, k int) tags(a int)"); // pRes = taos_query(pConn, "insert into t_2 values(now, 1)");
// if (taos_errno(pRes) != 0) { // if (taos_errno(pRes) != 0) {
// printf("error in create stable, reason:%s\n", taos_errstr(pRes)); // printf("failed to create multiple tables, reason:%s\n", taos_errstr(pRes));
// taos_free_result(pRes);
// ASSERT_TRUE(false);
// } // }
// //
// TAOS_FIELD* pFields = taos_fetch_fields(pRes);
// ASSERT_TRUE(pFields == NULL);
//
// int32_t numOfFields = taos_num_fields(pRes);
// ASSERT_EQ(numOfFields, 0);
//
// taos_free_result(pRes); // taos_free_result(pRes);
//
// char* sql = "select * from st1";
// tmq_create_topic(pConn, "test_topic_1", sql, strlen(sql));
// taos_close(pConn); // taos_close(pConn);
//} //}
//TEST(testCase, insert_test) { //TEST(testCase, projection_query_tables) {
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); // TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
// ASSERT_EQ(pConn, nullptr); // ASSERT_NE(pConn, nullptr);
// //
// TAOS_RES* pRes = taos_query(pConn, "use abc1"); // TAOS_RES* pRes = taos_query(pConn, "use abc1");
// taos_free_result(pRes); // taos_free_result(pRes);
// //
// pRes = taos_query(pConn, "insert into t_2 values(now, 1)"); //// pRes = taos_query(pConn, "create stable st1 (ts timestamp, k int) tags(a int)");
//// if (taos_errno(pRes) != 0) {
//// printf("failed to create table tu, reason:%s\n", taos_errstr(pRes));
//// }
//// taos_free_result(pRes);
////
//// pRes = taos_query(pConn, "create table tu using st1 tags(1)");
//// if (taos_errno(pRes) != 0) {
//// printf("failed to create table tu, reason:%s\n", taos_errstr(pRes));
//// }
//// taos_free_result(pRes);
////
//// for(int32_t i = 0; i < 100; ++i) {
//// char sql[512] = {0};
//// sprintf(sql, "insert into tu values(now+%da, %d)", i, i);
//// TAOS_RES* p = taos_query(pConn, sql);
//// if (taos_errno(p) != 0) {
//// printf("failed to insert data, reason:%s\n", taos_errstr(p));
//// }
////
//// taos_free_result(p);
//// }
//
// pRes = taos_query(pConn, "select * from tu");
// if (taos_errno(pRes) != 0) { // if (taos_errno(pRes) != 0) {
// printf("failed to create multiple tables, reason:%s\n", taos_errstr(pRes)); // printf("failed to select from table, reason:%s\n", taos_errstr(pRes));
// taos_free_result(pRes); // taos_free_result(pRes);
// ASSERT_TRUE(false); // ASSERT_TRUE(false);
// } // }
// //
// TAOS_ROW pRow = NULL;
// TAOS_FIELD* pFields = taos_fetch_fields(pRes);
// int32_t numOfFields = taos_num_fields(pRes);
//
// char str[512] = {0};
// while ((pRow = taos_fetch_row(pRes)) != NULL) {
// int32_t code = taos_print_row(str, pRow, pFields, numOfFields);
// printf("%s\n", str);
// }
//
// taos_free_result(pRes); // taos_free_result(pRes);
// taos_close(pConn); // taos_close(pConn);
//} //}
//TEST(testCase, projection_query_tables) { //TEST(testCase, projection_query_stables) {
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); // TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
// ASSERT_NE(pConn, nullptr); // ASSERT_NE(pConn, nullptr);
// //
...@@ -577,7 +632,7 @@ TEST(testCase, connect_Test) { ...@@ -577,7 +632,7 @@ TEST(testCase, connect_Test) {
//// pRes = taos_query(pConn, "insert into tu values(now, 1)"); //// pRes = taos_query(pConn, "insert into tu values(now, 1)");
//// taos_free_result(pRes); //// taos_free_result(pRes);
// //
// pRes = taos_query(pConn, "select * from tu"); // pRes = taos_query(pConn, "select ts,k from m1");
// if (taos_errno(pRes) != 0) { // if (taos_errno(pRes) != 0) {
// printf("failed to select from table, reason:%s\n", taos_errstr(pRes)); // printf("failed to select from table, reason:%s\n", taos_errstr(pRes));
// taos_free_result(pRes); // taos_free_result(pRes);
......
...@@ -350,7 +350,7 @@ typedef struct SMqTopicObj { ...@@ -350,7 +350,7 @@ typedef struct SMqTopicObj {
// TODO: add cache and change name to id // TODO: add cache and change name to id
typedef struct SMqConsumerTopic { typedef struct SMqConsumerTopic {
char name[TSDB_TOPIC_FNAME_LEN]; char name[TSDB_TOPIC_NAME_LEN];
SList *vgroups; // SList<int32_t> SList *vgroups; // SList<int32_t>
} SMqConsumerTopic; } SMqConsumerTopic;
...@@ -409,7 +409,7 @@ typedef struct SMqVGroupHbObj { ...@@ -409,7 +409,7 @@ typedef struct SMqVGroupHbObj {
#if 0 #if 0
typedef struct SCGroupObj { typedef struct SCGroupObj {
char name[TSDB_TOPIC_FNAME_LEN]; char name[TSDB_TOPIC_NAME_LEN];
int64_t createTime; int64_t createTime;
int64_t updateTime; int64_t updateTime;
uint64_t uid; uint64_t uid;
......
...@@ -118,11 +118,13 @@ SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) { ...@@ -118,11 +118,13 @@ SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) {
SDB_GET_INT64(pRaw, dataPos, &pTopic->dbUid, TOPIC_DECODE_OVER); SDB_GET_INT64(pRaw, dataPos, &pTopic->dbUid, TOPIC_DECODE_OVER);
SDB_GET_INT32(pRaw, dataPos, &pTopic->version, TOPIC_DECODE_OVER); SDB_GET_INT32(pRaw, dataPos, &pTopic->version, TOPIC_DECODE_OVER);
SDB_GET_INT32(pRaw, dataPos, &pTopic->sqlLen, TOPIC_DECODE_OVER); SDB_GET_INT32(pRaw, dataPos, &pTopic->sqlLen, TOPIC_DECODE_OVER);
pTopic->sql = calloc(pTopic->sqlLen + 1, sizeof(char));
SDB_GET_BINARY(pRaw, dataPos, pTopic->sql, pTopic->sqlLen, TOPIC_DECODE_OVER); SDB_GET_BINARY(pRaw, dataPos, pTopic->sql, pTopic->sqlLen, TOPIC_DECODE_OVER);
SDB_GET_INT32(pRaw, dataPos, &len, TOPIC_DECODE_OVER); // SDB_GET_INT32(pRaw, dataPos, &len, TOPIC_DECODE_OVER);
SDB_GET_BINARY(pRaw, dataPos, pTopic->logicalPlan, len, TOPIC_DECODE_OVER); // SDB_GET_BINARY(pRaw, dataPos, pTopic->logicalPlan, len, TOPIC_DECODE_OVER);
SDB_GET_INT32(pRaw, dataPos, &len, TOPIC_DECODE_OVER); // SDB_GET_INT32(pRaw, dataPos, &len, TOPIC_DECODE_OVER);
SDB_GET_BINARY(pRaw, dataPos, pTopic->physicalPlan, len, TOPIC_DECODE_OVER); // SDB_GET_BINARY(pRaw, dataPos, pTopic->physicalPlan, len, TOPIC_DECODE_OVER);
SDB_GET_RESERVE(pRaw, dataPos, MND_TOPIC_RESERVE_SIZE, TOPIC_DECODE_OVER) SDB_GET_RESERVE(pRaw, dataPos, MND_TOPIC_RESERVE_SIZE, TOPIC_DECODE_OVER)
...@@ -178,7 +180,7 @@ void mndReleaseTopic(SMnode *pMnode, SMqTopicObj *pTopic) { ...@@ -178,7 +180,7 @@ void mndReleaseTopic(SMnode *pMnode, SMqTopicObj *pTopic) {
static SDbObj *mndAcquireDbByTopic(SMnode *pMnode, char *topicName) { static SDbObj *mndAcquireDbByTopic(SMnode *pMnode, char *topicName) {
SName name = {0}; SName name = {0};
tNameFromString(&name, topicName, T_NAME_ACCT | T_NAME_DB | T_NAME_TOPIC); tNameFromString(&name, topicName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
char db[TSDB_TABLE_FNAME_LEN] = {0}; char db[TSDB_TABLE_FNAME_LEN] = {0};
tNameGetFullDbName(&name, db); tNameGetFullDbName(&name, db);
...@@ -203,20 +205,24 @@ static SDDropTopicReq *mndBuildDropTopicMsg(SMnode *pMnode, SVgObj *pVgroup, SMq ...@@ -203,20 +205,24 @@ static SDDropTopicReq *mndBuildDropTopicMsg(SMnode *pMnode, SVgObj *pVgroup, SMq
return pDrop; return pDrop;
} }
static int32_t mndCheckCreateTopicMsg(SCMCreateTopicReq *pCreate) { static int32_t mndCheckCreateTopicMsg(SCMCreateTopicReq *creattopReq) {
// deserialize and other stuff // deserialize and other stuff
return 0; return 0;
} }
static int32_t mndCreateTopic(SMnode *pMnode, SMnodeMsg *pMsg, SCMCreateTopicReq *pCreate, SDbObj *pDb) { static int32_t mndCreateTopic(SMnode *pMnode, SMnodeMsg *pMsg, SCMCreateTopicReq *pCreate, SDbObj *pDb) {
SMqTopicObj topicObj = {0}; SMqTopicObj topicObj = {0};
tstrncpy(topicObj.name, pCreate->name, TSDB_TABLE_FNAME_LEN); tstrncpy(topicObj.name, pCreate->name, TSDB_TOPIC_FNAME_LEN);
tstrncpy(topicObj.db, pDb->name, TSDB_DB_FNAME_LEN); tstrncpy(topicObj.db, pDb->name, TSDB_DB_FNAME_LEN);
topicObj.createTime = taosGetTimestampMs(); topicObj.createTime = taosGetTimestampMs();
topicObj.updateTime = topicObj.createTime; topicObj.updateTime = topicObj.createTime;
topicObj.uid = mndGenerateUid(pCreate->name, TSDB_TABLE_FNAME_LEN); topicObj.uid = mndGenerateUid(pCreate->name, TSDB_TABLE_FNAME_LEN);
topicObj.dbUid = pDb->uid; topicObj.dbUid = pDb->uid;
topicObj.version = 1; topicObj.version = 1;
topicObj.sql = strdup(pCreate->sql);
topicObj.physicalPlan = strdup(pCreate->physicalPlan);
topicObj.logicalPlan = strdup(pCreate->logicalPlan);
topicObj.sqlLen = strlen(pCreate->sql);
SSdbRaw *pTopicRaw = mndTopicActionEncode(&topicObj); SSdbRaw *pTopicRaw = mndTopicActionEncode(&topicObj);
if (pTopicRaw == NULL) return -1; if (pTopicRaw == NULL) return -1;
...@@ -228,46 +234,47 @@ static int32_t mndCreateTopic(SMnode *pMnode, SMnodeMsg *pMsg, SCMCreateTopicReq ...@@ -228,46 +234,47 @@ static int32_t mndCreateTopic(SMnode *pMnode, SMnodeMsg *pMsg, SCMCreateTopicReq
static int32_t mndProcessCreateTopicMsg(SMnodeMsg *pMsg) { static int32_t mndProcessCreateTopicMsg(SMnodeMsg *pMsg) {
SMnode *pMnode = pMsg->pMnode; SMnode *pMnode = pMsg->pMnode;
char *msgStr = pMsg->rpcMsg.pCont; char *msgStr = pMsg->rpcMsg.pCont;
SCMCreateTopicReq *pCreate;
tDeserializeSCMCreateTopicReq(msgStr, pCreate);
mDebug("topic:%s, start to create", pCreate->name); SCMCreateTopicReq createTopicReq = {0};
tDeserializeSCMCreateTopicReq(msgStr, &createTopicReq);
mDebug("topic:%s, start to create, sql:%s", createTopicReq.name, createTopicReq.sql);
if (mndCheckCreateTopicMsg(pCreate) != 0) { if (mndCheckCreateTopicMsg(&createTopicReq) != 0) {
mError("topic:%s, failed to create since %s", pCreate->name, terrstr()); mError("topic:%s, failed to create since %s", createTopicReq.name, terrstr());
return -1; return -1;
} }
SMqTopicObj *pTopic = mndAcquireTopic(pMnode, pCreate->name); SMqTopicObj *pTopic = mndAcquireTopic(pMnode, createTopicReq.name);
if (pTopic != NULL) { if (pTopic != NULL) {
sdbRelease(pMnode->pSdb, pTopic); sdbRelease(pMnode->pSdb, pTopic);
if (pCreate->igExists) { if (createTopicReq.igExists) {
mDebug("topic:%s, already exist, ignore exist is set", pCreate->name); mDebug("topic:%s, already exist, ignore exist is set", createTopicReq.name);
return 0; return 0;
} else { } else {
terrno = TSDB_CODE_MND_TOPIC_ALREADY_EXIST; terrno = TSDB_CODE_MND_TOPIC_ALREADY_EXIST;
mError("db:%s, failed to create since %s", pCreate->name, terrstr()); mError("db:%s, failed to create since %s", createTopicReq.name, terrstr());
return -1; return -1;
} }
} }
SDbObj *pDb = mndAcquireDbByTopic(pMnode, pCreate->name); SDbObj *pDb = mndAcquireDbByTopic(pMnode, createTopicReq.name);
if (pDb == NULL) { if (pDb == NULL) {
terrno = TSDB_CODE_MND_DB_NOT_SELECTED; terrno = TSDB_CODE_MND_DB_NOT_SELECTED;
mError("topic:%s, failed to create since %s", pCreate->name, terrstr()); mError("topic:%s, failed to create since %s", createTopicReq.name, terrstr());
return -1; return -1;
} }
int32_t code = mndCreateTopic(pMnode, pMsg, pCreate, pDb); int32_t code = mndCreateTopic(pMnode, pMsg, &createTopicReq, pDb);
mndReleaseDb(pMnode, pDb); mndReleaseDb(pMnode, pDb);
if (code != 0) { if (code != 0) {
terrno = code; terrno = code;
mError("topic:%s, failed to create since %s", pCreate->name, terrstr()); mError("topic:%s, failed to create since %s", createTopicReq.name, terrstr());
return -1; return -1;
} }
return TSDB_CODE_MND_ACTION_IN_PROGRESS; return TSDB_CODE_SUCCESS;
} }
static int32_t mndDropTopic(SMnode *pMnode, SMnodeMsg *pMsg, SMqTopicObj *pTopic) { return 0; } static int32_t mndDropTopic(SMnode *pMnode, SMnodeMsg *pMsg, SMqTopicObj *pTopic) { return 0; }
......
...@@ -713,13 +713,6 @@ static bool initTableMemIterator(STsdbReadHandle* pHandle, STableCheckInfo* pChe ...@@ -713,13 +713,6 @@ static bool initTableMemIterator(STsdbReadHandle* pHandle, STableCheckInfo* pChe
pCheckInfo->initBuf = true; pCheckInfo->initBuf = true;
int32_t order = pHandle->order; int32_t order = pHandle->order;
// no data in buffer, abort
// if (pHandle->pMemTable->snapshot.mem == NULL && pHandle->pMemTable->snapshot.imem == NULL) {
// return false;
// }
//
// assert(pCheckInfo->iter == NULL && pCheckInfo->iiter == NULL);
//
STbData** pMem = NULL; STbData** pMem = NULL;
STbData** pIMem = NULL; STbData** pIMem = NULL;
...@@ -787,8 +780,7 @@ static bool initTableMemIterator(STsdbReadHandle* pHandle, STableCheckInfo* pChe ...@@ -787,8 +780,7 @@ static bool initTableMemIterator(STsdbReadHandle* pHandle, STableCheckInfo* pChe
assert(pCheckInfo->lastKey >= key); assert(pCheckInfo->lastKey >= key);
} }
} else { } else {
tsdbDebug("%p uid:%"PRId64", no data in imem, 0x%"PRIx64, pHandle, pCheckInfo->tableId, tsdbDebug("%p uid:%"PRId64", no data in imem, 0x%"PRIx64, pHandle, pCheckInfo->tableId, pHandle->qId);
pHandle->qId);
} }
return true; return true;
...@@ -2554,9 +2546,6 @@ static bool doHasDataInBuffer(STsdbReadHandle* pTsdbReadHandle) { ...@@ -2554,9 +2546,6 @@ static bool doHasDataInBuffer(STsdbReadHandle* pTsdbReadHandle) {
pTsdbReadHandle->activeIndex += 1; pTsdbReadHandle->activeIndex += 1;
} }
// no data in memtable or imemtable, decrease the memory reference.
// TODO !!
// tsdbMayUnTakeMemSnapshot(pTsdbReadHandle);
return false; return false;
} }
......
...@@ -68,6 +68,7 @@ static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { ...@@ -68,6 +68,7 @@ static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
pTbCfg = metaGetTbInfoByName(pVnode->pMeta, pReq->tableFname, &uid); pTbCfg = metaGetTbInfoByName(pVnode->pMeta, pReq->tableFname, &uid);
if (pTbCfg == NULL) { if (pTbCfg == NULL) {
code = TSDB_CODE_VND_TB_NOT_EXIST;
goto _exit; goto _exit;
} }
......
...@@ -285,16 +285,16 @@ int32_t ctgGetTableMetaFromVnode(struct SCatalog* pCatalog, void *pTransporter, ...@@ -285,16 +285,16 @@ int32_t ctgGetTableMetaFromVnode(struct SCatalog* pCatalog, void *pTransporter,
char dbFullName[TSDB_DB_FNAME_LEN]; char dbFullName[TSDB_DB_FNAME_LEN];
tNameGetFullDbName(pTableName, dbFullName); tNameGetFullDbName(pTableName, dbFullName);
ctgDebug("try to get table meta from vnode, db:%s, tbName:%s", dbFullName, pTableName->tname); ctgDebug("try to get table meta from vnode, db:%s, tbName:%s", dbFullName, tNameGetTableName(pTableName));
SBuildTableMetaInput bInput = {.vgId = vgroupInfo->vgId, .dbName = dbFullName, .tableFullName = (char *)pTableName->tname}; SBuildTableMetaInput bInput = {.vgId = vgroupInfo->vgId, .dbName = dbFullName, .tableFullName = (char *)tNameGetTableName(pTableName)};
char *msg = NULL; char *msg = NULL;
SEpSet *pVnodeEpSet = NULL; SEpSet *pVnodeEpSet = NULL;
int32_t msgLen = 0; int32_t msgLen = 0;
int32_t code = queryBuildMsg[TMSG_INDEX(TDMT_VND_TABLE_META)](&bInput, &msg, 0, &msgLen); int32_t code = queryBuildMsg[TMSG_INDEX(TDMT_VND_TABLE_META)](&bInput, &msg, 0, &msgLen);
if (code) { if (code) {
ctgError("Build vnode tablemeta msg failed, code:%x, tbName:%s", code, pTableName->tname); ctgError("Build vnode tablemeta msg failed, code:%x, tbName:%s", code, tNameGetTableName(pTableName));
CTG_ERR_RET(code); CTG_ERR_RET(code);
} }
...@@ -313,21 +313,21 @@ int32_t ctgGetTableMetaFromVnode(struct SCatalog* pCatalog, void *pTransporter, ...@@ -313,21 +313,21 @@ int32_t ctgGetTableMetaFromVnode(struct SCatalog* pCatalog, void *pTransporter,
if (TSDB_CODE_SUCCESS != rpcRsp.code) { if (TSDB_CODE_SUCCESS != rpcRsp.code) {
if (CTG_TABLE_NOT_EXIST(rpcRsp.code)) { if (CTG_TABLE_NOT_EXIST(rpcRsp.code)) {
SET_META_TYPE_NONE(output->metaType); SET_META_TYPE_NONE(output->metaType);
ctgDebug("tablemeta not exist in vnode, tbName:%s", pTableName->tname); ctgDebug("tablemeta not exist in vnode, tbName:%s", tNameGetTableName(pTableName));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
ctgError("error rsp for table meta from vnode, code:%x, tbName:%s", rpcRsp.code, pTableName->tname); ctgError("error rsp for table meta from vnode, code:%x, tbName:%s", rpcRsp.code, tNameGetTableName(pTableName));
CTG_ERR_RET(rpcRsp.code); CTG_ERR_RET(rpcRsp.code);
} }
code = queryProcessMsgRsp[TMSG_INDEX(TDMT_VND_TABLE_META)](output, rpcRsp.pCont, rpcRsp.contLen); code = queryProcessMsgRsp[TMSG_INDEX(TDMT_VND_TABLE_META)](output, rpcRsp.pCont, rpcRsp.contLen);
if (code) { if (code) {
ctgError("Process vnode tablemeta rsp failed, code:%x, tbName:%s", code, pTableName->tname); ctgError("Process vnode tablemeta rsp failed, code:%x, tbName:%s", code, tNameGetTableName(pTableName));
CTG_ERR_RET(code); CTG_ERR_RET(code);
} }
ctgDebug("Got table meta from vnode, db:%s, tbName:%s", dbFullName, pTableName->tname); ctgDebug("Got table meta from vnode, db:%s, tbName:%s", dbFullName, tNameGetTableName(pTableName));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -776,7 +776,7 @@ int32_t ctgRenewTableMetaImpl(struct SCatalog* pCatalog, void *pTransporter, con ...@@ -776,7 +776,7 @@ int32_t ctgRenewTableMetaImpl(struct SCatalog* pCatalog, void *pTransporter, con
STableMetaOutput *output = &voutput; STableMetaOutput *output = &voutput;
if (CTG_IS_STABLE(isSTable)) { if (CTG_IS_STABLE(isSTable)) {
ctgDebug("will renew table meta, supposed to be stable, tbName:%s", pTableName->tname); ctgDebug("will renew table meta, supposed to be stable, tbName:%s", tNameGetTableName(pTableName));
// if get from mnode failed, will not try vnode // if get from mnode failed, will not try vnode
CTG_ERR_JRET(ctgGetTableMetaFromMnode(pCatalog, pTransporter, pMgmtEps, pTableName, &moutput)); CTG_ERR_JRET(ctgGetTableMetaFromMnode(pCatalog, pTransporter, pMgmtEps, pTableName, &moutput));
...@@ -787,13 +787,13 @@ int32_t ctgRenewTableMetaImpl(struct SCatalog* pCatalog, void *pTransporter, con ...@@ -787,13 +787,13 @@ int32_t ctgRenewTableMetaImpl(struct SCatalog* pCatalog, void *pTransporter, con
output = &moutput; output = &moutput;
} }
} else { } else {
ctgDebug("will renew table meta, not supposed to be stable, tbName:%s, isStable:%d", pTableName->tname, isSTable); ctgDebug("will renew table meta, not supposed to be stable, tbName:%s, isStable:%d", tNameGetTableName(pTableName), isSTable);
// if get from vnode failed or no table meta, will not try mnode // if get from vnode failed or no table meta, will not try mnode
CTG_ERR_JRET(ctgGetTableMetaFromVnode(pCatalog, pTransporter, pMgmtEps, pTableName, &vgroupInfo, &voutput)); CTG_ERR_JRET(ctgGetTableMetaFromVnode(pCatalog, pTransporter, pMgmtEps, pTableName, &vgroupInfo, &voutput));
if (CTG_IS_META_TABLE(voutput.metaType) && TSDB_SUPER_TABLE == voutput.tbMeta->tableType) { if (CTG_IS_META_TABLE(voutput.metaType) && TSDB_SUPER_TABLE == voutput.tbMeta->tableType) {
ctgDebug("will continue to renew table meta since got stable, tbName:%s, metaType:%d", pTableName->tname, voutput.metaType); ctgDebug("will continue to renew table meta since got stable, tbName:%s, metaType:%d", tNameGetTableName(pTableName), voutput.metaType);
CTG_ERR_JRET(ctgGetTableMetaFromMnodeImpl(pCatalog, pTransporter, pMgmtEps, voutput.tbFname, &moutput)); CTG_ERR_JRET(ctgGetTableMetaFromMnodeImpl(pCatalog, pTransporter, pMgmtEps, voutput.tbFname, &moutput));
...@@ -820,7 +820,7 @@ int32_t ctgRenewTableMetaImpl(struct SCatalog* pCatalog, void *pTransporter, con ...@@ -820,7 +820,7 @@ int32_t ctgRenewTableMetaImpl(struct SCatalog* pCatalog, void *pTransporter, con
} }
if (CTG_IS_META_NONE(output->metaType)) { if (CTG_IS_META_NONE(output->metaType)) {
ctgError("no tablemeta got, tbNmae:%s", pTableName->tname); ctgError("no tablemeta got, tbNmae:%s", tNameGetTableName(pTableName));
CTG_ERR_JRET(CTG_ERR_CODE_TABLE_NOT_EXIST); CTG_ERR_JRET(CTG_ERR_CODE_TABLE_NOT_EXIST);
} }
...@@ -860,7 +860,7 @@ int32_t ctgGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMg ...@@ -860,7 +860,7 @@ int32_t ctgGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMg
CTG_ERR_RET(ctgGetTableMetaFromCache(pCatalog, pTableName, pTableMeta, &exist)); CTG_ERR_RET(ctgGetTableMetaFromCache(pCatalog, pTableName, pTableMeta, &exist));
if (0 == exist) { if (0 == exist) {
ctgError("renew tablemeta succeed but get from cache failed, may be deleted, tbName:%s", pTableName->tname); ctgError("renew tablemeta succeed but get from cache failed, may be deleted, tbName:%s", tNameGetTableName(pTableName));
CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR); CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
} }
...@@ -1241,7 +1241,7 @@ int32_t catalogGetTableDistVgroup(struct SCatalog* pCatalog, void *pRpc, const S ...@@ -1241,7 +1241,7 @@ int32_t catalogGetTableDistVgroup(struct SCatalog* pCatalog, void *pRpc, const S
} else { } else {
int32_t vgId = tbMeta->vgId; int32_t vgId = tbMeta->vgId;
if (NULL == taosHashGetClone(dbVgroup->vgInfo, &vgId, sizeof(vgId), &vgroupInfo)) { if (NULL == taosHashGetClone(dbVgroup->vgInfo, &vgId, sizeof(vgId), &vgroupInfo)) {
ctgError("table's vgId not found in vgroup list, vgId:%d, tbName:%s", vgId, pTableName->tname); ctgError("table's vgId not found in vgroup list, vgId:%d, tbName:%s", vgId, tNameGetTableName(pTableName));
CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR); CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
} }
...@@ -1252,7 +1252,7 @@ int32_t catalogGetTableDistVgroup(struct SCatalog* pCatalog, void *pRpc, const S ...@@ -1252,7 +1252,7 @@ int32_t catalogGetTableDistVgroup(struct SCatalog* pCatalog, void *pRpc, const S
} }
if (NULL == taosArrayPush(vgList, &vgroupInfo)) { if (NULL == taosArrayPush(vgList, &vgroupInfo)) {
ctgError("taosArrayPush vgroupInfo to array failed, vgId:%d, tbName:%s", vgId, pTableName->tname); ctgError("taosArrayPush vgroupInfo to array failed, vgId:%d, tbName:%s", vgId, tNameGetTableName(pTableName));
CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR); CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
} }
......
...@@ -542,34 +542,34 @@ typedef struct SOrderOperatorInfo { ...@@ -542,34 +542,34 @@ typedef struct SOrderOperatorInfo {
void appendUpstream(SOperatorInfo* p, SOperatorInfo* pUpstream); void appendUpstream(SOperatorInfo* p, SOperatorInfo* pUpstream);
SOperatorInfo* createDataBlocksOptScanInfo(void* pTsdbQueryHandle, STaskRuntimeEnv* pRuntimeEnv, int32_t repeatTime, int32_t reverseTime); SOperatorInfo* createDataBlocksOptScanInfo(void* pTsdbReadHandle, int32_t order, int32_t numOfOutput, int32_t repeatTime, int32_t reverseTime, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, int32_t order, int32_t numOfOutput, int32_t repeatTime, SExecTaskInfo* pTaskInfo); SOperatorInfo* createTableScanOperator(void* pTsdbReadHandle, int32_t order, int32_t numOfOutput, int32_t repeatTime, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createTableSeqScanOperator(void* pTsdbQueryHandle, STaskRuntimeEnv* pRuntimeEnv); SOperatorInfo* createTableSeqScanOperator(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv);
SOperatorInfo* createAggregateOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createProjectOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); SOperatorInfo* createProjectOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream); SOperatorInfo* createLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream);
SOperatorInfo* createTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); SOperatorInfo* createTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createAllTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); SOperatorInfo* createAllTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createSWindowOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); SOperatorInfo* createSWindowOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createFillOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, bool multigroupResult); SOperatorInfo* createFillOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput, bool multigroupResult);
SOperatorInfo* createGroupbyOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); SOperatorInfo* createGroupbyOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createMultiTableAggOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); SOperatorInfo* createMultiTableAggOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createAllMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); SOperatorInfo* createAllMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createTagScanOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SExprInfo* pExpr, int32_t numOfOutput); SOperatorInfo* createTagScanOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createDistinctOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); SOperatorInfo* createDistinctOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createTableBlockInfoScanOperator(void* pTsdbQueryHandle, STaskRuntimeEnv* pRuntimeEnv); SOperatorInfo* createTableBlockInfoScanOperator(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv);
SOperatorInfo* createMultiwaySortOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SExprInfo* pExpr, int32_t numOfOutput, SOperatorInfo* createMultiwaySortOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SExprInfo* pExpr, int32_t numOfOutput,
int32_t numOfRows, void* merger); int32_t numOfRows, void* merger);
SOperatorInfo* createGlobalAggregateOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, void* param, SArray* pUdfInfo, bool groupResultMixedUp); SOperatorInfo* createGlobalAggregateOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput, void* param, SArray* pUdfInfo, bool groupResultMixedUp);
SOperatorInfo* createStatewindowOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); SOperatorInfo* createStatewindowOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createSLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, void* merger, bool multigroupResult); SOperatorInfo* createSLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput, void* merger, bool multigroupResult);
SOperatorInfo* createFilterOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, SOperatorInfo* createFilterOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr,
int32_t numOfOutput, SColumnInfo* pCols, int32_t numOfFilter); int32_t numOfOutput, SColumnInfo* pCols, int32_t numOfFilter);
SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pUpstream, int32_t numOfUpstream, SSchema* pSchema, int32_t numOfOutput); SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pdownstream, int32_t numOfDownstream, SSchema* pSchema, int32_t numOfOutput);
SOperatorInfo* createOrderOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, SOrder* pOrderVal); SOperatorInfo* createOrderOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput, SOrder* pOrderVal);
//SSDataBlock* doGlobalAggregate(void* param, bool* newgroup); //SSDataBlock* doGlobalAggregate(void* param, bool* newgroup);
//SSDataBlock* doMultiwayMergeSort(void* param, bool* newgroup); //SSDataBlock* doMultiwayMergeSort(void* param, bool* newgroup);
......
...@@ -2,7 +2,6 @@ ...@@ -2,7 +2,6 @@
#include "tbinoperator.h" #include "tbinoperator.h"
#include "tunaryoperator.h" #include "tunaryoperator.h"
static void assignBasicParaInfo(struct SScalarFuncParam* dst, const struct SScalarFuncParam* src) { static void assignBasicParaInfo(struct SScalarFuncParam* dst, const struct SScalarFuncParam* src) {
dst->type = src->type; dst->type = src->type;
dst->bytes = src->bytes; dst->bytes = src->bytes;
......
...@@ -170,8 +170,6 @@ typedef struct SCreateDbInfo { ...@@ -170,8 +170,6 @@ typedef struct SCreateDbInfo {
int8_t update; int8_t update;
int8_t cachelast; int8_t cachelast;
SArray *keep; SArray *keep;
// int8_t dbType;
// int16_t partitions;
} SCreateDbInfo; } SCreateDbInfo;
typedef struct SCreateFuncInfo { typedef struct SCreateFuncInfo {
......
...@@ -917,6 +917,8 @@ int32_t validateLimitNode(SQueryStmtInfo *pQueryInfo, SSqlNode* pSqlNode, SMsgBu ...@@ -917,6 +917,8 @@ int32_t validateLimitNode(SQueryStmtInfo *pQueryInfo, SSqlNode* pSqlNode, SMsgBu
return buildInvalidOperationMsg(pMsgBuf, msg1); return buildInvalidOperationMsg(pMsgBuf, msg1);
} }
} }
return TSDB_CODE_SUCCESS;
} }
int32_t validateOrderbyNode(SQueryStmtInfo *pQueryInfo, SSqlNode* pSqlNode, SMsgBuf* pMsgBuf) { int32_t validateOrderbyNode(SQueryStmtInfo *pQueryInfo, SSqlNode* pSqlNode, SMsgBuf* pMsgBuf) {
......
...@@ -80,11 +80,11 @@ int32_t parseQuerySql(SParseContext* pCxt, SQueryNode** pQuery) { ...@@ -80,11 +80,11 @@ int32_t parseQuerySql(SParseContext* pCxt, SQueryNode** pQuery) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t qParseQuerySql(SParseContext* pCxt, SQueryNode** pQuery) { int32_t qParseQuerySql(SParseContext* pCxt, SQueryNode** pQueryNode) {
if (isInsertSql(pCxt->pSql, pCxt->sqlLen)) { if (isInsertSql(pCxt->pSql, pCxt->sqlLen)) {
return parseInsertSql(pCxt, (SVnodeModifOpStmtInfo**)pQuery); return parseInsertSql(pCxt, (SVnodeModifOpStmtInfo**)pQueryNode);
} else { } else {
return parseQuerySql(pCxt, pQuery); return parseQuerySql(pCxt, pQueryNode);
} }
} }
......
...@@ -189,6 +189,7 @@ static SPhyNode* createUserTableScanNode(SQueryPlanNode* pPlanNode, SQueryTableI ...@@ -189,6 +189,7 @@ static SPhyNode* createUserTableScanNode(SQueryPlanNode* pPlanNode, SQueryTableI
return (SPhyNode*)node; return (SPhyNode*)node;
} }
static bool isSystemTable(SQueryTableInfo* pTable) { static bool isSystemTable(SQueryTableInfo* pTable) {
// todo // todo
return false; return false;
...@@ -261,8 +262,8 @@ static void vgroupMsgToEpSet(const SVgroupMsg* vg, SQueryNodeAddr* execNode) { ...@@ -261,8 +262,8 @@ static void vgroupMsgToEpSet(const SVgroupMsg* vg, SQueryNodeAddr* execNode) {
} }
static uint64_t splitSubplanByTable(SPlanContext* pCxt, SQueryPlanNode* pPlanNode, SQueryTableInfo* pTable) { static uint64_t splitSubplanByTable(SPlanContext* pCxt, SQueryPlanNode* pPlanNode, SQueryTableInfo* pTable) {
SVgroupsInfo* vgroupList = pTable->pMeta->vgroupList; SVgroupsInfo* pVgroupList = pTable->pMeta->vgroupList;
for (int32_t i = 0; i < pTable->pMeta->vgroupList->numOfVgroups; ++i) { for (int32_t i = 0; i < pVgroupList->numOfVgroups; ++i) {
STORE_CURRENT_SUBPLAN(pCxt); STORE_CURRENT_SUBPLAN(pCxt);
SSubplan* subplan = initSubplan(pCxt, QUERY_TYPE_SCAN); SSubplan* subplan = initSubplan(pCxt, QUERY_TYPE_SCAN);
subplan->msgType = TDMT_VND_QUERY; subplan->msgType = TDMT_VND_QUERY;
......
...@@ -559,28 +559,35 @@ static const char* jkScanNodeTableId = "TableId"; ...@@ -559,28 +559,35 @@ static const char* jkScanNodeTableId = "TableId";
static const char* jkScanNodeTableType = "TableType"; static const char* jkScanNodeTableType = "TableType";
static const char* jkScanNodeTableOrder = "Order"; static const char* jkScanNodeTableOrder = "Order";
static const char* jkScanNodeTableCount = "Count"; static const char* jkScanNodeTableCount = "Count";
static const char* jkScanNodeTableRevCount = "Reverse";
static bool scanNodeToJson(const void* obj, cJSON* json) { static bool scanNodeToJson(const void* obj, cJSON* json) {
const SScanPhyNode* scan = (const SScanPhyNode*)obj; const SScanPhyNode* pNode = (const SScanPhyNode*)obj;
bool res = cJSON_AddNumberToObject(json, jkScanNodeTableId, scan->uid); bool res = cJSON_AddNumberToObject(json, jkScanNodeTableId, pNode->uid);
if (res) {
res = cJSON_AddNumberToObject(json, jkScanNodeTableType, pNode->tableType);
}
if (res) { if (res) {
res = cJSON_AddNumberToObject(json, jkScanNodeTableType, scan->tableType); res = cJSON_AddNumberToObject(json, jkScanNodeTableOrder, pNode->order);
} }
if (res) { if (res) {
res = cJSON_AddNumberToObject(json, jkScanNodeTableOrder, scan->order); res = cJSON_AddNumberToObject(json, jkScanNodeTableCount, pNode->count);
} }
if (res) { if (res) {
res = cJSON_AddNumberToObject(json, jkScanNodeTableCount, scan->count); res = cJSON_AddNumberToObject(json, jkScanNodeTableRevCount, pNode->reverse);
} }
return res; return res;
} }
static bool scanNodeFromJson(const cJSON* json, void* obj) { static bool scanNodeFromJson(const cJSON* json, void* obj) {
SScanPhyNode* scan = (SScanPhyNode*)obj; SScanPhyNode* pNode = (SScanPhyNode*)obj;
scan->uid = getNumber(json, jkScanNodeTableId); pNode->uid = getNumber(json, jkScanNodeTableId);
scan->tableType = getNumber(json, jkScanNodeTableType); pNode->tableType = getNumber(json, jkScanNodeTableType);
scan->count = getNumber(json, jkScanNodeTableCount); pNode->count = getNumber(json, jkScanNodeTableCount);
scan->order = getNumber(json, jkScanNodeTableOrder); pNode->order = getNumber(json, jkScanNodeTableOrder);
pNode->reverse = getNumber(json, jkScanNodeTableRevCount);
return true; return true;
} }
...@@ -996,10 +1003,13 @@ static SSubplan* subplanFromJson(const cJSON* json) { ...@@ -996,10 +1003,13 @@ static SSubplan* subplanFromJson(const cJSON* json) {
if (NULL == subplan) { if (NULL == subplan) {
return NULL; return NULL;
} }
bool res = fromObject(json, jkSubplanId, subplanIdFromJson, &subplan->id, true); bool res = fromObject(json, jkSubplanId, subplanIdFromJson, &subplan->id, true);
if (res) { if (res) {
res = fromPnode(json, jkSubplanNode, phyNodeFromJson, (void**)&subplan->pNode); res = fromPnode(json, jkSubplanNode, phyNodeFromJson, (void**)&subplan->pNode);
} }
if (res) { if (res) {
res = fromObjectWithAlloc(json, jkSubplanDataSink, dataSinkFromJson, (void**)&subplan->pDataSink, sizeof(SDataSink), false); res = fromObjectWithAlloc(json, jkSubplanDataSink, dataSinkFromJson, (void**)&subplan->pDataSink, sizeof(SDataSink), false);
} }
...@@ -1027,7 +1037,7 @@ int32_t subPlanToString(const SSubplan* subplan, char** str, int32_t* len) { ...@@ -1027,7 +1037,7 @@ int32_t subPlanToString(const SSubplan* subplan, char** str, int32_t* len) {
} }
*str = cJSON_Print(json); *str = cJSON_Print(json);
// printf("====Physical plan:====\n") // printf("====Physical plan:====\n");
// printf("%s\n", *str); // printf("%s\n", *str);
*len = strlen(*str) + 1; *len = strlen(*str) + 1;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -1047,14 +1057,18 @@ cJSON* qDagToJson(const SQueryDag* pDag) { ...@@ -1047,14 +1057,18 @@ cJSON* qDagToJson(const SQueryDag* pDag) {
if(pRoot == NULL) { if(pRoot == NULL) {
return NULL; return NULL;
} }
cJSON_AddNumberToObject(pRoot, "numOfSubplans", pDag->numOfSubplans);
cJSON_AddNumberToObject(pRoot, "queryId", pDag->queryId); cJSON_AddNumberToObject(pRoot, "Number", pDag->numOfSubplans);
cJSON_AddNumberToObject(pRoot, "QueryId", pDag->queryId);
cJSON *pLevels = cJSON_CreateArray(); cJSON *pLevels = cJSON_CreateArray();
if(pLevels == NULL) { if(pLevels == NULL) {
cJSON_Delete(pRoot); cJSON_Delete(pRoot);
return NULL; return NULL;
} }
cJSON_AddItemToObject(pRoot, "pSubplans", pLevels);
cJSON_AddItemToObject(pRoot, "Subplans", pLevels);
size_t level = taosArrayGetSize(pDag->pSubplans); size_t level = taosArrayGetSize(pDag->pSubplans);
for(size_t i = 0; i < level; i++) { for(size_t i = 0; i < level; i++) {
const SArray* pSubplans = (const SArray*)taosArrayGetP(pDag->pSubplans, i); const SArray* pSubplans = (const SArray*)taosArrayGetP(pDag->pSubplans, i);
...@@ -1064,6 +1078,7 @@ cJSON* qDagToJson(const SQueryDag* pDag) { ...@@ -1064,6 +1078,7 @@ cJSON* qDagToJson(const SQueryDag* pDag) {
cJSON_Delete(pRoot); cJSON_Delete(pRoot);
return NULL; return NULL;
} }
cJSON_AddItemToArray(pLevels, plansOneLevel); cJSON_AddItemToArray(pLevels, plansOneLevel);
for(size_t j = 0; j < num; j++) { for(size_t j = 0; j < num; j++) {
cJSON* pSubplan = subplanToJson((const SSubplan*)taosArrayGetP(pSubplans, j)); cJSON* pSubplan = subplanToJson((const SSubplan*)taosArrayGetP(pSubplans, j));
...@@ -1071,6 +1086,7 @@ cJSON* qDagToJson(const SQueryDag* pDag) { ...@@ -1071,6 +1086,7 @@ cJSON* qDagToJson(const SQueryDag* pDag) {
cJSON_Delete(pRoot); cJSON_Delete(pRoot);
return NULL; return NULL;
} }
cJSON_AddItemToArray(plansOneLevel, pSubplan); cJSON_AddItemToArray(plansOneLevel, pSubplan);
} }
} }
......
...@@ -1029,6 +1029,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, ...@@ -1029,6 +1029,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
msg = pTask->msg; msg = pTask->msg;
break; break;
} }
case TDMT_VND_QUERY: { case TDMT_VND_QUERY: {
msgSize = sizeof(SSubQueryMsg) + pTask->msgLen; msgSize = sizeof(SSubQueryMsg) + pTask->msgLen;
msg = calloc(1, msgSize); msg = calloc(1, msgSize);
...@@ -1047,7 +1048,8 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, ...@@ -1047,7 +1048,8 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
pMsg->contentLen = htonl(pTask->msgLen); pMsg->contentLen = htonl(pTask->msgLen);
memcpy(pMsg->msg, pTask->msg, pTask->msgLen); memcpy(pMsg->msg, pTask->msg, pTask->msgLen);
break; break;
} }
case TDMT_VND_RES_READY: { case TDMT_VND_RES_READY: {
msgSize = sizeof(SResReadyReq); msgSize = sizeof(SResReadyReq);
msg = calloc(1, msgSize); msg = calloc(1, msgSize);
......
...@@ -413,8 +413,11 @@ void *taosOpenTcpClientConnection(void *shandle, void *thandle, uint32_t ip, uin ...@@ -413,8 +413,11 @@ void *taosOpenTcpClientConnection(void *shandle, void *thandle, uint32_t ip, uin
pFdObj->thandle = thandle; pFdObj->thandle = thandle;
pFdObj->port = port; pFdObj->port = port;
pFdObj->ip = ip; pFdObj->ip = ip;
tDebug("%s %p TCP connection to 0x%x:%hu is created, localPort:%hu FD:%p numOfFds:%d", pThreadObj->label, thandle,
ip, port, localPort, pFdObj, pThreadObj->numOfFds); char ipport[40] = {0};
taosIpPort2String(ip, port, ipport);
tDebug("%s %p TCP connection to %s is created, localPort:%hu FD:%p numOfFds:%d", pThreadObj->label, thandle,
ipport, localPort, pFdObj, pThreadObj->numOfFds);
} else { } else {
tError("%s failed to malloc client FdObj(%s)", pThreadObj->label, strerror(errno)); tError("%s failed to malloc client FdObj(%s)", pThreadObj->label, strerror(errno));
taosCloseSocket(fd); taosCloseSocket(fd);
......
...@@ -301,12 +301,13 @@ TAOS_DEFINE_ERROR(TSDB_CODE_VND_INVALID_CFG_FILE, "Invalid config file") ...@@ -301,12 +301,13 @@ TAOS_DEFINE_ERROR(TSDB_CODE_VND_INVALID_CFG_FILE, "Invalid config file")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_INVALID_TERM_FILE, "Invalid term file") TAOS_DEFINE_ERROR(TSDB_CODE_VND_INVALID_TERM_FILE, "Invalid term file")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_IS_FLOWCTRL, "Database memory is full") TAOS_DEFINE_ERROR(TSDB_CODE_VND_IS_FLOWCTRL, "Database memory is full")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_IS_DROPPING, "Database is dropping") TAOS_DEFINE_ERROR(TSDB_CODE_VND_IS_DROPPING, "Database is dropping")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_IS_UPDATING, "Database is updating") TAOS_DEFINE_ERROR(TSDB_CODE_VND_IS_UPDATING, "Database is updating")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_IS_CLOSING, "Database is closing") TAOS_DEFINE_ERROR(TSDB_CODE_VND_IS_CLOSING, "Database is closing")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_NOT_SYNCED, "Database suspended") TAOS_DEFINE_ERROR(TSDB_CODE_VND_NOT_SYNCED, "Database suspended")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_NO_WRITE_AUTH, "Database write operation denied") TAOS_DEFINE_ERROR(TSDB_CODE_VND_NO_WRITE_AUTH, "Database write operation denied")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_IS_SYNCING, "Database is syncing") TAOS_DEFINE_ERROR(TSDB_CODE_VND_IS_SYNCING, "Database is syncing")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_INVALID_TSDB_STATE, "Invalid tsdb state") TAOS_DEFINE_ERROR(TSDB_CODE_VND_INVALID_TSDB_STATE, "Invalid tsdb state")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_TB_NOT_EXIST, "Table not exists")
// tsdb // tsdb
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_INVALID_TABLE_ID, "Invalid table ID") TAOS_DEFINE_ERROR(TSDB_CODE_TDB_INVALID_TABLE_ID, "Invalid table ID")
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册