diff --git a/include/client/taos.h b/include/client/taos.h index db33dbf8a201b71980730ff613d523bfe70749be..84f625571095e519b3268d42324719461d5ac650 100644 --- a/include/client/taos.h +++ b/include/client/taos.h @@ -193,7 +193,7 @@ DLL_EXPORT void taos_close_stream(TAOS_STREAM *tstr); DLL_EXPORT int taos_load_table_info(TAOS *taos, const char* tableNameList); DLL_EXPORT TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int protocol, int precision); -DLL_EXPORT TAOS_RES* tmq_create_topic(TAOS* taos, const char* name, const char* sql, int sqlLen); +DLL_EXPORT TAOS_RES *taos_create_topic(TAOS* taos, const char* name, const char* sql, int sqlLen); #ifdef __cplusplus } diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 9cf30683144dead4baaf35f627c87547a84c0a08..8527b29a2dae7fb51128c3587094050b6a2bafab 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1093,6 +1093,7 @@ typedef struct { typedef struct { int8_t igExists; char* name; + char* sql; char* physicalPlan; char* logicalPlan; } SCMCreateTopicReq; @@ -1101,6 +1102,7 @@ static FORCE_INLINE int tSerializeSCMCreateTopicReq(void** buf, const SCMCreateT int tlen = 0; tlen += taosEncodeFixedI8(buf, pReq->igExists); tlen += taosEncodeString(buf, pReq->name); + tlen += taosEncodeString(buf, pReq->sql); tlen += taosEncodeString(buf, pReq->physicalPlan); tlen += taosEncodeString(buf, pReq->logicalPlan); return tlen; @@ -1109,6 +1111,7 @@ static FORCE_INLINE int tSerializeSCMCreateTopicReq(void** buf, const SCMCreateT static FORCE_INLINE void* tDeserializeSCMCreateTopicReq(void* buf, SCMCreateTopicReq* pReq) { buf = taosDecodeFixedI8(buf, &(pReq->igExists)); buf = taosDecodeString(buf, &(pReq->name)); + buf = taosDecodeString(buf, &(pReq->sql)); buf = taosDecodeString(buf, &(pReq->physicalPlan)); buf = taosDecodeString(buf, &(pReq->logicalPlan)); return buf; @@ -1231,7 +1234,7 @@ typedef struct { } SMVSubscribeRsp; typedef struct { - char name[TSDB_TOPIC_FNAME_LEN]; + char name[TSDB_TOPIC_NAME_LEN]; int8_t igExists; int32_t execLen; void* executor; diff --git a/include/common/tname.h b/include/common/tname.h index 11d97dac06d7240664f48626cd9d58ad6c555afa..12a0d34cb4fef92d0f65de535808f1150aa2c33d 100644 --- a/include/common/tname.h +++ b/include/common/tname.h @@ -25,14 +25,12 @@ #define T_NAME_ACCT 0x1u #define T_NAME_DB 0x2u #define T_NAME_TABLE 0x4u -#define T_NAME_TOPIC 0x8u typedef struct SName { uint8_t type; //db_name_t, table_name_t int32_t acctId; char dbname[TSDB_DB_NAME_LEN]; char tname[TSDB_TABLE_NAME_LEN]; - char topicName[TSDB_TOPIC_NAME_LEN]; } SName; int32_t tNameExtractFullName(const SName* name, char* dst); diff --git a/include/libs/parser/parser.h b/include/libs/parser/parser.h index 0cdbfdb07f6560b367385772685b925e5a4708e6..418c43fab9728fb28286b8003450f19938eb5468 100644 --- a/include/libs/parser/parser.h +++ b/include/libs/parser/parser.h @@ -44,7 +44,7 @@ typedef struct SParseContext { * @param msg extended error message if exists. * @return error code */ -int32_t qParseQuerySql(SParseContext* pContext, SQueryNode** pQuery); +int32_t qParseQuerySql(SParseContext* pContext, SQueryNode** pQueryNode); /** * Return true if it is a ddl/dcl sql statement diff --git a/include/libs/planner/planner.h b/include/libs/planner/planner.h index 4f1db37f794ac5ba192aae3c0e399558b73c74ce..f069b6828692d0199c9a5c1cd77547d8bfccfdbe 100644 --- a/include/libs/planner/planner.h +++ b/include/libs/planner/planner.h @@ -95,6 +95,7 @@ typedef struct SScanPhyNode { int8_t tableType; int32_t order; // scan order: TSDB_ORDER_ASC|TSDB_ORDER_DESC int32_t count; // repeat count + int32_t reverse; // reverse scan count } SScanPhyNode; typedef SScanPhyNode SSystemTableScanPhyNode; diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 854d16f67d57db105d866a7b2adb0585fde850f2..423477b15fed3498b9ceb0115b08a5a449318471 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -307,6 +307,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_VND_NO_WRITE_AUTH TAOS_DEF_ERROR_CODE(0, 0x0512) //"Database write operation denied") #define TSDB_CODE_VND_IS_SYNCING TAOS_DEF_ERROR_CODE(0, 0x0513) //"Database is syncing") #define TSDB_CODE_VND_INVALID_TSDB_STATE TAOS_DEF_ERROR_CODE(0, 0x0514) //"Invalid tsdb state") +#define TSDB_CODE_VND_TB_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x0515) // "Table not exists") // tsdb #define TSDB_CODE_TDB_INVALID_TABLE_ID TAOS_DEF_ERROR_CODE(0, 0x0600) //"Invalid table ID") diff --git a/include/util/tcoding.h b/include/util/tcoding.h index 226856901f2d20ae6ce50f81989aba032ca38d6c..819878704833db6db9994e36f04fc3741b2461bb 100644 --- a/include/util/tcoding.h +++ b/include/util/tcoding.h @@ -351,6 +351,7 @@ static FORCE_INLINE void *taosDecodeString(void *buf, char **value) { buf = taosDecodeVariantU64(buf, &size); *value = (char *)malloc((size_t)size + 1); + if (*value == NULL) return NULL; memcpy(*value, buf, (size_t)size); diff --git a/include/util/tdef.h b/include/util/tdef.h index 9f16b58e0db30e8357da720f5254605f0d5bec08..428de5d17138a9f869b4ab0b3fc987759c256bba 100644 --- a/include/util/tdef.h +++ b/include/util/tdef.h @@ -181,7 +181,7 @@ do { \ #define TSDB_COL_NAME_LEN 65 #define TSDB_MAX_SAVED_SQL_LEN TSDB_MAX_COLUMNS * 64 #define TSDB_MAX_SQL_LEN TSDB_PAYLOAD_SIZE -#define TSDB_MAX_SQL_SHOW_LEN 512 +#define TSDB_MAX_SQL_SHOW_LEN 1024 #define TSDB_MAX_ALLOWED_SQL_LEN (1*1024*1024u) // sql length should be less than 1mb #define TSDB_APP_NAME_LEN TSDB_UNI_LEN diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index 3d7fc37a6e0b7fb1ed15cb66daabc31b2f57f677..b6f38b12cac7feeaf0da1b2baba90e20f1414c4b 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -118,9 +118,8 @@ typedef struct STscObj { uint32_t connId; int32_t connType; uint64_t id; // ref ID returned by taosAddRef - void *pTransporter; pthread_mutex_t mutex; // used to protect the operation on db - int32_t numOfReqs; // number of sqlObj from this tscObj + int32_t numOfReqs; // number of sqlObj bound to this connection SAppInstInfo *pAppInfo; } STscObj; @@ -149,12 +148,13 @@ typedef struct SShowReqInfo { } SShowReqInfo; typedef struct SRequestSendRecvBody { - tsem_t rspSem; // not used now - void* fp; - SShowReqInfo showInfo; // todo this attribute will be removed after the query framework being completed. - struct SSchJob *pQueryJob; // query job, created according to sql query DAG. - SDataBuf requestMsg; - SReqResultInfo resInfo; + tsem_t rspSem; // not used now + void* fp; + SShowReqInfo showInfo; // todo this attribute will be removed after the query framework being completed. + SDataBuf requestMsg; + struct SSchJob *pQueryJob; // query job, created according to sql query DAG. + struct SQueryDag *pDag; // the query dag, generated according to the sql statement. + SReqResultInfo resInfo; } SRequestSendRecvBody; #define ERROR_MSG_BUF_DEFAULT_SIZE 512 diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index 0e3afb60c0c883dcf171e7dd92f2b07dcf1ef5f8..f747ccf3b636e27bc320b5d9e818b2082cca88fd 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -89,13 +89,12 @@ static void tscInitLogFile() { // todo close the transporter properly void closeTransporter(STscObj* pTscObj) { - if (pTscObj == NULL || pTscObj->pTransporter == NULL) { + if (pTscObj == NULL || pTscObj->pAppInfo->pTransporter == NULL) { return; } - tscDebug("free transporter:%p in connObj: 0x%"PRIx64, pTscObj->pTransporter, pTscObj->id); - rpcClose(pTscObj->pTransporter); - pTscObj->pTransporter = NULL; + tscDebug("free transporter:%p in connObj: 0x%"PRIx64, pTscObj->pAppInfo->pTransporter, pTscObj->id); + rpcClose(pTscObj->pAppInfo->pTransporter); } // TODO refactor @@ -140,10 +139,6 @@ void* createTscObj(const char* user, const char* auth, const char *db, SAppInstI } pObj->pAppInfo = pAppInfo; - if (pAppInfo != NULL) { - pObj->pTransporter = pAppInfo->pTransporter; - } - tstrncpy(pObj->user, user, sizeof(pObj->user)); memcpy(pObj->pass, auth, TSDB_PASSWORD_LEN); @@ -199,6 +194,7 @@ static void doDestroyRequest(void* p) { tfree(pRequest->pInfo); doFreeReqResultInfo(&pRequest->body.resInfo); + qDestroyQueryDag(pRequest->body.pDag); deregisterRequest(pRequest); tfree(pRequest); diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 1a25cdc20c28258af8b1de3003abaad71a8f8f54..87cb6766e377f195ee5c8aa53e6446e790acb566 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -13,12 +13,12 @@ #include "tpagedfile.h" #include "tref.h" -#define CHECK_CODE_GOTO(expr, lable) \ +#define CHECK_CODE_GOTO(expr, label) \ do { \ int32_t code = expr; \ if (TSDB_CODE_SUCCESS != code) { \ terrno = code; \ - goto lable; \ + goto label; \ } \ } while (0) @@ -153,13 +153,13 @@ int32_t parseSql(SRequestObj* pRequest, SQueryNode** pQuery) { SParseContext cxt = { .requestId = pRequest->requestId, - .acctId = pTscObj->acctId, - .db = getConnectionDB(pTscObj), - .pTransporter = pTscObj->pTransporter, - .pSql = pRequest->sqlstr, - .sqlLen = pRequest->sqlLen, - .pMsg = pRequest->msgBuf, - .msgLen = ERROR_MSG_BUF_DEFAULT_SIZE + .acctId = pTscObj->acctId, + .db = getConnectionDB(pTscObj), + .pSql = pRequest->sqlstr, + .sqlLen = pRequest->sqlLen, + .pMsg = pRequest->msgBuf, + .msgLen = ERROR_MSG_BUF_DEFAULT_SIZE, + .pTransporter = pTscObj->pAppInfo->pTransporter, }; cxt.mgmtEpSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp); @@ -192,10 +192,10 @@ int32_t execDdlQuery(SRequestObj* pRequest, SQueryNode* pQuery) { pShowReqInfo->pArray = pDcl->pExtension; } } - asyncSendMsgToServer(pTscObj->pTransporter, &pDcl->epSet, &transporterId, pSendMsg); + asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &pDcl->epSet, &transporterId, pSendMsg); } else { SEpSet* pEpSet = &pTscObj->pAppInfo->mgmtEp.epSet; - asyncSendMsgToServer(pTscObj->pTransporter, pEpSet, &transporterId, pSendMsg); + asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, pEpSet, &transporterId, pSendMsg); } tsem_wait(&pRequest->body.rspSem); @@ -242,7 +242,7 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag) { if (TSDB_SQL_INSERT == pRequest->type || TSDB_SQL_CREATE_TABLE == pRequest->type) { SQueryResult res = {.code = 0, .numOfRows = 0, .msgSize = ERROR_MSG_BUF_DEFAULT_SIZE, .msg = pRequest->msgBuf}; - int32_t code = scheduleExecJob(pRequest->pTscObj->pTransporter, NULL, pDag, &pRequest->body.pQueryJob, &res); + int32_t code = scheduleExecJob(pRequest->pTscObj->pAppInfo->pTransporter, NULL, pDag, &pRequest->body.pQueryJob, &res); if (code != TSDB_CODE_SUCCESS) { // handle error and retry } else { @@ -256,7 +256,7 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag) { return pRequest->code; } - return scheduleAsyncExecJob(pRequest->pTscObj->pTransporter, NULL, pDag, &pRequest->body.pQueryJob); + return scheduleAsyncExecJob(pRequest->pTscObj->pAppInfo->pTransporter, NULL, pDag, &pRequest->body.pQueryJob); } typedef struct tmq_t tmq_t; @@ -362,28 +362,55 @@ TAOS_RES *tmq_create_topic(TAOS* taos, const char* name, const char* sql, int sq char *dagStr = NULL; terrno = TSDB_CODE_SUCCESS; + if (taos == NULL || topicName == NULL || sql == NULL) { + tscError("invalid parameters for creating topic, connObj:%p, topic name:%s, sql:%s", taos, topicName, sql); + terrno = TSDB_CODE_TSC_INVALID_INPUT; + goto _return; + } + + if (strlen(topicName) >= TSDB_TOPIC_NAME_LEN) { + tscError("topic name too long, max length:%d", TSDB_TOPIC_NAME_LEN - 1); + terrno = TSDB_CODE_TSC_INVALID_INPUT; + goto _return; + } + + if (sqlLen > tsMaxSQLStringLen) { + tscError("sql string exceeds max length:%d", tsMaxSQLStringLen); + terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT; + goto _return; + } + + tscDebug("start to create topic, %s", topicName); CHECK_CODE_GOTO(buildRequest(pTscObj, sql, sqlLen, &pRequest), _return); + CHECK_CODE_GOTO(parseSql(pRequest, &pQueryNode), _return); -//temporary disabled until planner ready -#if 0 - CHECK_CODE_GOTO(parseSql(pRequest, &pQuery), _return); - //TODO: check sql valid + // todo check for invalid sql statement and return with error code - CHECK_CODE_GOTO(qCreateQueryDag(pQuery, &pDag), _return); + CHECK_CODE_GOTO(qCreateQueryDag(pQueryNode, &pRequest->body.pDag, pRequest->requestId), _return); - dagStr = qDagToString(pDag); - if(dagStr == NULL) { - //TODO + pStr = qDagToString(pRequest->body.pDag); + if(pStr == NULL) { + goto _return; } -#endif + + // The topic should be related to a database that the queried table is belonged to. + SName name = {0}; + char dbName[TSDB_DB_FNAME_LEN] = {0}; + tNameGetFullDbName(&((SQueryStmtInfo*) pQueryNode)->pTableMetaInfo[0]->name, dbName); + + tNameFromString(&name, dbName, T_NAME_ACCT|T_NAME_DB); + tNameFromString(&name, topicName, T_NAME_TABLE); + + char topicFname[TSDB_TOPIC_FNAME_LEN] = {0}; + tNameExtractFullName(&name, topicFname); SCMCreateTopicReq req = { - .name = (char*)name, - .igExists = 0, - /*.physicalPlan = dagStr,*/ - .physicalPlan = (char*)sql, - .logicalPlan = "", + .name = (char*) topicFname, + .igExists = 0, + .physicalPlan = (char*) pStr, + .sql = (char*) sql, + .logicalPlan = "no logic plan", }; int tlen = tSerializeSCMCreateTopicReq(NULL, &req); @@ -391,27 +418,32 @@ TAOS_RES *tmq_create_topic(TAOS* taos, const char* name, const char* sql, int sq if(buf == NULL) { goto _return; } + void* abuf = buf; tSerializeSCMCreateTopicReq(&abuf, &req); /*printf("formatted: %s\n", dagStr);*/ pRequest->body.requestMsg = (SDataBuf){ .pData = buf, .len = tlen }; + pRequest->type = TDMT_MND_CREATE_TOPIC; SMsgSendInfo* body = buildMsgInfoImpl(pRequest); - SEpSet* pEpSet = &pTscObj->pAppInfo->mgmtEp.epSet; + SEpSet epSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp); int64_t transporterId = 0; - asyncSendMsgToServer(pTscObj->pTransporter, pEpSet, &transporterId, body); + asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, body); tsem_wait(&pRequest->body.rspSem); _return: - qDestroyQuery(pQuery); - qDestroyQueryDag(pDag); - destroySendMsgInfo(body); + qDestroyQuery(pQueryNode); + if (body != NULL) { + destroySendMsgInfo(body); + } + if (pRequest != NULL && terrno != TSDB_CODE_SUCCESS) { pRequest->code = terrno; } + return pRequest; } @@ -445,24 +477,22 @@ TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen) { nPrintTsc("%s", sql) SRequestObj *pRequest = NULL; - SQueryNode *pQuery = NULL; - SQueryDag *pDag = NULL; + SQueryNode *pQueryNode = NULL; terrno = TSDB_CODE_SUCCESS; CHECK_CODE_GOTO(buildRequest(pTscObj, sql, sqlLen, &pRequest), _return); - CHECK_CODE_GOTO(parseSql(pRequest, &pQuery), _return); + CHECK_CODE_GOTO(parseSql(pRequest, &pQueryNode), _return); - if (qIsDdlQuery(pQuery)) { - CHECK_CODE_GOTO(execDdlQuery(pRequest, pQuery), _return); + if (qIsDdlQuery(pQueryNode)) { + CHECK_CODE_GOTO(execDdlQuery(pRequest, pQueryNode), _return); } else { - CHECK_CODE_GOTO(getPlan(pRequest, pQuery, &pDag), _return); - CHECK_CODE_GOTO(scheduleQuery(pRequest, pDag), _return); + CHECK_CODE_GOTO(getPlan(pRequest, pQueryNode, &pRequest->body.pDag), _return); + CHECK_CODE_GOTO(scheduleQuery(pRequest, pRequest->body.pDag), _return); pRequest->code = terrno; } _return: - qDestroyQuery(pQuery); - qDestroyQueryDag(pDag); + qDestroyQuery(pQueryNode); if (NULL != pRequest && TSDB_CODE_SUCCESS != terrno) { pRequest->code = terrno; } @@ -523,7 +553,7 @@ STscObj* taosConnectImpl(const char *ip, const char *user, const char *auth, con SMsgSendInfo* body = buildConnectMsg(pRequest); int64_t transporterId = 0; - asyncSendMsgToServer(pTscObj->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &transporterId, body); + asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &transporterId, body); tsem_wait(&pRequest->body.rspSem); if (pRequest->code != TSDB_CODE_SUCCESS) { @@ -534,7 +564,7 @@ STscObj* taosConnectImpl(const char *ip, const char *user, const char *auth, con taos_close(pTscObj); pTscObj = NULL; } else { - tscDebug("0x%"PRIx64" connection is opening, connId:%d, dnodeConn:%p, reqId:0x%"PRIx64, pTscObj->id, pTscObj->connId, pTscObj->pTransporter, pRequest->requestId); + tscDebug("0x%"PRIx64" connection is opening, connId:%d, dnodeConn:%p, reqId:0x%"PRIx64, pTscObj->id, pTscObj->connId, pTscObj->pAppInfo->pTransporter, pRequest->requestId); destroyRequest(pRequest); } @@ -702,7 +732,7 @@ void* doFetchRow(SRequestObj* pRequest) { int64_t transporterId = 0; STscObj *pTscObj = pRequest->pTscObj; - asyncSendMsgToServer(pTscObj->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &transporterId, body); + asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &transporterId, body); tsem_wait(&pRequest->body.rspSem); pRequest->type = TDMT_VND_SHOW_TABLES_FETCH; @@ -712,7 +742,7 @@ void* doFetchRow(SRequestObj* pRequest) { int64_t transporterId = 0; STscObj *pTscObj = pRequest->pTscObj; - asyncSendMsgToServer(pTscObj->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &transporterId, body); + asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &transporterId, body); tsem_wait(&pRequest->body.rspSem); diff --git a/source/client/test/clientTests.cpp b/source/client/test/clientTests.cpp index d5b149284e7f197c28e8c21b64edf3ae2200f8a3..83a22ef6eb91c399060447c010550747afb5e456 100644 --- a/source/client/test/clientTests.cpp +++ b/source/client/test/clientTests.cpp @@ -149,27 +149,36 @@ TEST(testCase, connect_Test) { //} // //TEST(testCase, create_db_Test) { - //TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); - //assert(pConn != NULL); - - //TAOS_RES* pRes = taos_query(pConn, "create database abc1 vgroups 2"); - //if (taos_errno(pRes) != 0) { - //printf("error in create db, reason:%s\n", taos_errstr(pRes)); - //} - - //TAOS_FIELD* pFields = taos_fetch_fields(pRes); - //ASSERT_TRUE(pFields == NULL); - - //int32_t numOfFields = taos_num_fields(pRes); - //ASSERT_EQ(numOfFields, 0); - - //taos_free_result(pRes); - - //pRes = taos_query(pConn, "create database abc1 vgroups 4"); - //if (taos_errno(pRes) != 0) { - //printf("error in create db, reason:%s\n", taos_errstr(pRes)); - //} - //taos_close(pConn); +// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); +// assert(pConn != NULL); +// +// TAOS_RES* pRes = taos_query(pConn, "drop database abc1"); +// if (taos_errno(pRes) != 0) { +// printf("failed to drop database, reason:%s\n", taos_errstr(pRes)); +// } +// +// taos_free_result(pRes); +// +// pRes = taos_query(pConn, "create database abc1 vgroups 2"); +// if (taos_errno(pRes) != 0) { +// printf("error in create db, reason:%s\n", taos_errstr(pRes)); +// } +// +// TAOS_FIELD* pFields = taos_fetch_fields(pRes); +// ASSERT_TRUE(pFields == NULL); +// +// int32_t numOfFields = taos_num_fields(pRes); +// ASSERT_EQ(numOfFields, 0); +// +// taos_free_result(pRes); +// +// pRes = taos_query(pConn, "create database if not exists abc1 vgroups 4"); +// if (taos_errno(pRes) != 0) { +// printf("failed to create database abc1, reason:%s\n", taos_errstr(pRes)); +// } +// +// taos_free_result(pRes); +// taos_close(pConn); //} // //TEST(testCase, create_dnode_Test) { @@ -195,7 +204,7 @@ TEST(testCase, connect_Test) { // TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); // assert(pConn != NULL); // -// TAOS_RES* pRes = taos_query(pConn, "drop dnode 2"); +// TAOS_RES* pRes = taos_query(pConn, "drop dnode 3"); // if (taos_errno(pRes) != 0) { // printf("error in drop dnode, reason:%s\n", taos_errstr(pRes)); // } @@ -206,6 +215,11 @@ TEST(testCase, connect_Test) { // int32_t numOfFields = taos_num_fields(pRes); // ASSERT_EQ(numOfFields, 0); // +// pRes = taos_query(pConn, "drop dnode 4"); +// if (taos_errno(pRes) != 0) { +// printf("error in drop dnode, reason:%s\n", taos_errstr(pRes)); +// } +// // taos_free_result(pRes); // taos_close(pConn); //} @@ -228,33 +242,33 @@ TEST(testCase, connect_Test) { // taos_close(pConn); //} // -//// TEST(testCase, drop_db_test) { -//// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); -//// assert(pConn != NULL); -//// -//// showDB(pConn); -//// -//// TAOS_RES* pRes = taos_query(pConn, "drop database abc1"); -//// if (taos_errno(pRes) != 0) { -//// printf("failed to drop db, reason:%s\n", taos_errstr(pRes)); -//// } -//// taos_free_result(pRes); -//// -//// showDB(pConn); -//// -//// pRes = taos_query(pConn, "create database abc1"); -//// if (taos_errno(pRes) != 0) { -//// printf("create to drop db, reason:%s\n", taos_errstr(pRes)); -//// } -//// taos_free_result(pRes); -//// taos_close(pConn); -////} +// TEST(testCase, drop_db_test) { +// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); +// assert(pConn != NULL); +// +// showDB(pConn); +// +// TAOS_RES* pRes = taos_query(pConn, "drop database abc1"); +// if (taos_errno(pRes) != 0) { +// printf("failed to drop db, reason:%s\n", taos_errstr(pRes)); +// } +// taos_free_result(pRes); +// +// showDB(pConn); +// +// pRes = taos_query(pConn, "create database abc1"); +// if (taos_errno(pRes) != 0) { +// printf("create to drop db, reason:%s\n", taos_errstr(pRes)); +// } +// taos_free_result(pRes); +// taos_close(pConn); +//} // //TEST(testCase, create_stable_Test) { // TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); // assert(pConn != NULL); // -// TAOS_RES* pRes = taos_query(pConn, "create database abc1 vgroups 2"); +// TAOS_RES* pRes = taos_query(pConn, "create database if not exists abc1 vgroups 2"); // if (taos_errno(pRes) != 0) { // printf("error in create db, reason:%s\n", taos_errstr(pRes)); // } @@ -280,7 +294,7 @@ TEST(testCase, connect_Test) { // taos_free_result(pRes); // taos_close(pConn); //} - +// //TEST(testCase, create_table_Test) { // TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); // assert(pConn != NULL); @@ -293,26 +307,26 @@ TEST(testCase, connect_Test) { // // taos_close(pConn); //} - +// //TEST(testCase, create_ctable_Test) { - //TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); - //assert(pConn != NULL); - - //TAOS_RES* pRes = taos_query(pConn, "use abc1"); - //if (taos_errno(pRes) != 0) { - //printf("failed to use db, reason:%s\n", taos_errstr(pRes)); - //} - //taos_free_result(pRes); - - //pRes = taos_query(pConn, "create table tm0 using st1 tags(1)"); - //if (taos_errno(pRes) != 0) { - //printf("failed to create child table tm0, reason:%s\n", taos_errstr(pRes)); - //} - - //taos_free_result(pRes); - //taos_close(pConn); +// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); +// assert(pConn != NULL); +// +// TAOS_RES* pRes = taos_query(pConn, "use abc1"); +// if (taos_errno(pRes) != 0) { +// printf("failed to use db, reason:%s\n", taos_errstr(pRes)); +// } +// taos_free_result(pRes); +// +// pRes = taos_query(pConn, "create table tm0 using st1 tags(1)"); +// if (taos_errno(pRes) != 0) { +// printf("failed to create child table tm0, reason:%s\n", taos_errstr(pRes)); +// } +// +// taos_free_result(pRes); +// taos_close(pConn); //} - +// //TEST(testCase, show_stable_Test) { // TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); // assert(pConn != NULL); @@ -500,60 +514,101 @@ TEST(testCase, connect_Test) { // // taosHashCleanup(phash); //} +// +TEST(testCase, create_topic_Test) { + TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); + assert(pConn != NULL); -// TEST(testCase, create_topic_Test) { -// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); -// assert(pConn != NULL); + TAOS_RES* pRes = taos_query(pConn, "use abc1"); + if (taos_errno(pRes) != 0) { + printf("error in use db, reason:%s\n", taos_errstr(pRes)); + } + taos_free_result(pRes); + + TAOS_FIELD* pFields = taos_fetch_fields(pRes); + ASSERT_TRUE(pFields == nullptr); + + int32_t numOfFields = taos_num_fields(pRes); + ASSERT_EQ(numOfFields, 0); + + taos_free_result(pRes); + + char* sql = "select * from tu"; + pRes = taos_create_topic(pConn, "test_topic_1", sql, strlen(sql)); + taos_free_result(pRes); + taos_close(pConn); +} // -// TAOS_RES* pRes = taos_query(pConn, "create database abc1"); -// if (taos_errno(pRes) != 0) { -// printf("error in create db, reason:%s\n", taos_errstr(pRes)); -// } -// taos_free_result(pRes); +//TEST(testCase, insert_test) { +// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); +// ASSERT_NE(pConn, nullptr); // -// pRes = taos_query(pConn, "use abc1"); -// if (taos_errno(pRes) != 0) { -// printf("error in use db, reason:%s\n", taos_errstr(pRes)); -// } +// TAOS_RES* pRes = taos_query(pConn, "use abc1"); // taos_free_result(pRes); // -// pRes = taos_query(pConn, "create stable st1(ts timestamp, k int) tags(a int)"); +// pRes = taos_query(pConn, "insert into t_2 values(now, 1)"); // if (taos_errno(pRes) != 0) { -// printf("error in create stable, reason:%s\n", taos_errstr(pRes)); +// printf("failed to create multiple tables, reason:%s\n", taos_errstr(pRes)); +// taos_free_result(pRes); +// ASSERT_TRUE(false); // } // -// TAOS_FIELD* pFields = taos_fetch_fields(pRes); -// ASSERT_TRUE(pFields == NULL); -// -// int32_t numOfFields = taos_num_fields(pRes); -// ASSERT_EQ(numOfFields, 0); -// // taos_free_result(pRes); -// -// char* sql = "select * from st1"; -// tmq_create_topic(pConn, "test_topic_1", sql, strlen(sql)); // taos_close(pConn); //} -//TEST(testCase, insert_test) { +//TEST(testCase, projection_query_tables) { // TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); -// ASSERT_EQ(pConn, nullptr); +// ASSERT_NE(pConn, nullptr); // // TAOS_RES* pRes = taos_query(pConn, "use abc1"); // taos_free_result(pRes); // -// pRes = taos_query(pConn, "insert into t_2 values(now, 1)"); +//// pRes = taos_query(pConn, "create stable st1 (ts timestamp, k int) tags(a int)"); +//// if (taos_errno(pRes) != 0) { +//// printf("failed to create table tu, reason:%s\n", taos_errstr(pRes)); +//// } +//// taos_free_result(pRes); +//// +//// pRes = taos_query(pConn, "create table tu using st1 tags(1)"); +//// if (taos_errno(pRes) != 0) { +//// printf("failed to create table tu, reason:%s\n", taos_errstr(pRes)); +//// } +//// taos_free_result(pRes); +//// +//// for(int32_t i = 0; i < 100; ++i) { +//// char sql[512] = {0}; +//// sprintf(sql, "insert into tu values(now+%da, %d)", i, i); +//// TAOS_RES* p = taos_query(pConn, sql); +//// if (taos_errno(p) != 0) { +//// printf("failed to insert data, reason:%s\n", taos_errstr(p)); +//// } +//// +//// taos_free_result(p); +//// } +// +// pRes = taos_query(pConn, "select * from tu"); // if (taos_errno(pRes) != 0) { -// printf("failed to create multiple tables, reason:%s\n", taos_errstr(pRes)); +// printf("failed to select from table, reason:%s\n", taos_errstr(pRes)); // taos_free_result(pRes); // ASSERT_TRUE(false); // } // +// TAOS_ROW pRow = NULL; +// TAOS_FIELD* pFields = taos_fetch_fields(pRes); +// int32_t numOfFields = taos_num_fields(pRes); +// +// char str[512] = {0}; +// while ((pRow = taos_fetch_row(pRes)) != NULL) { +// int32_t code = taos_print_row(str, pRow, pFields, numOfFields); +// printf("%s\n", str); +// } +// // taos_free_result(pRes); // taos_close(pConn); //} -//TEST(testCase, projection_query_tables) { +//TEST(testCase, projection_query_stables) { // TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); // ASSERT_NE(pConn, nullptr); // @@ -577,7 +632,7 @@ TEST(testCase, connect_Test) { //// pRes = taos_query(pConn, "insert into tu values(now, 1)"); //// taos_free_result(pRes); // -// pRes = taos_query(pConn, "select * from tu"); +// pRes = taos_query(pConn, "select ts,k from m1"); // if (taos_errno(pRes) != 0) { // printf("failed to select from table, reason:%s\n", taos_errstr(pRes)); // taos_free_result(pRes); diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index a55e0dd2b252c328221d7b1e6d644eadf8e5db68..0c2524f48c375b3432d1f7931fefe0e87b302ef0 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -350,7 +350,7 @@ typedef struct SMqTopicObj { // TODO: add cache and change name to id typedef struct SMqConsumerTopic { - char name[TSDB_TOPIC_FNAME_LEN]; + char name[TSDB_TOPIC_NAME_LEN]; SList *vgroups; // SList } SMqConsumerTopic; @@ -409,7 +409,7 @@ typedef struct SMqVGroupHbObj { #if 0 typedef struct SCGroupObj { - char name[TSDB_TOPIC_FNAME_LEN]; + char name[TSDB_TOPIC_NAME_LEN]; int64_t createTime; int64_t updateTime; uint64_t uid; diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c index acdc718f20c3176313bbed172daffc55f7892bba..67aeae0d4c3f35b1637b1d1ec51f2809e28bc4dc 100644 --- a/source/dnode/mnode/impl/src/mndTopic.c +++ b/source/dnode/mnode/impl/src/mndTopic.c @@ -118,11 +118,13 @@ SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) { SDB_GET_INT64(pRaw, dataPos, &pTopic->dbUid, TOPIC_DECODE_OVER); SDB_GET_INT32(pRaw, dataPos, &pTopic->version, TOPIC_DECODE_OVER); SDB_GET_INT32(pRaw, dataPos, &pTopic->sqlLen, TOPIC_DECODE_OVER); + + pTopic->sql = calloc(pTopic->sqlLen + 1, sizeof(char)); SDB_GET_BINARY(pRaw, dataPos, pTopic->sql, pTopic->sqlLen, TOPIC_DECODE_OVER); - SDB_GET_INT32(pRaw, dataPos, &len, TOPIC_DECODE_OVER); - SDB_GET_BINARY(pRaw, dataPos, pTopic->logicalPlan, len, TOPIC_DECODE_OVER); - SDB_GET_INT32(pRaw, dataPos, &len, TOPIC_DECODE_OVER); - SDB_GET_BINARY(pRaw, dataPos, pTopic->physicalPlan, len, TOPIC_DECODE_OVER); +// SDB_GET_INT32(pRaw, dataPos, &len, TOPIC_DECODE_OVER); +// SDB_GET_BINARY(pRaw, dataPos, pTopic->logicalPlan, len, TOPIC_DECODE_OVER); +// SDB_GET_INT32(pRaw, dataPos, &len, TOPIC_DECODE_OVER); +// SDB_GET_BINARY(pRaw, dataPos, pTopic->physicalPlan, len, TOPIC_DECODE_OVER); SDB_GET_RESERVE(pRaw, dataPos, MND_TOPIC_RESERVE_SIZE, TOPIC_DECODE_OVER) @@ -178,7 +180,7 @@ void mndReleaseTopic(SMnode *pMnode, SMqTopicObj *pTopic) { static SDbObj *mndAcquireDbByTopic(SMnode *pMnode, char *topicName) { SName name = {0}; - tNameFromString(&name, topicName, T_NAME_ACCT | T_NAME_DB | T_NAME_TOPIC); + tNameFromString(&name, topicName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); char db[TSDB_TABLE_FNAME_LEN] = {0}; tNameGetFullDbName(&name, db); @@ -203,20 +205,24 @@ static SDDropTopicReq *mndBuildDropTopicMsg(SMnode *pMnode, SVgObj *pVgroup, SMq return pDrop; } -static int32_t mndCheckCreateTopicMsg(SCMCreateTopicReq *pCreate) { +static int32_t mndCheckCreateTopicMsg(SCMCreateTopicReq *creattopReq) { // deserialize and other stuff return 0; } static int32_t mndCreateTopic(SMnode *pMnode, SMnodeMsg *pMsg, SCMCreateTopicReq *pCreate, SDbObj *pDb) { SMqTopicObj topicObj = {0}; - tstrncpy(topicObj.name, pCreate->name, TSDB_TABLE_FNAME_LEN); + tstrncpy(topicObj.name, pCreate->name, TSDB_TOPIC_FNAME_LEN); tstrncpy(topicObj.db, pDb->name, TSDB_DB_FNAME_LEN); topicObj.createTime = taosGetTimestampMs(); topicObj.updateTime = topicObj.createTime; topicObj.uid = mndGenerateUid(pCreate->name, TSDB_TABLE_FNAME_LEN); topicObj.dbUid = pDb->uid; topicObj.version = 1; + topicObj.sql = strdup(pCreate->sql); + topicObj.physicalPlan = strdup(pCreate->physicalPlan); + topicObj.logicalPlan = strdup(pCreate->logicalPlan); + topicObj.sqlLen = strlen(pCreate->sql); SSdbRaw *pTopicRaw = mndTopicActionEncode(&topicObj); if (pTopicRaw == NULL) return -1; @@ -228,46 +234,47 @@ static int32_t mndCreateTopic(SMnode *pMnode, SMnodeMsg *pMsg, SCMCreateTopicReq static int32_t mndProcessCreateTopicMsg(SMnodeMsg *pMsg) { SMnode *pMnode = pMsg->pMnode; char *msgStr = pMsg->rpcMsg.pCont; - SCMCreateTopicReq *pCreate; - tDeserializeSCMCreateTopicReq(msgStr, pCreate); - mDebug("topic:%s, start to create", pCreate->name); + SCMCreateTopicReq createTopicReq = {0}; + tDeserializeSCMCreateTopicReq(msgStr, &createTopicReq); + + mDebug("topic:%s, start to create, sql:%s", createTopicReq.name, createTopicReq.sql); - if (mndCheckCreateTopicMsg(pCreate) != 0) { - mError("topic:%s, failed to create since %s", pCreate->name, terrstr()); + if (mndCheckCreateTopicMsg(&createTopicReq) != 0) { + mError("topic:%s, failed to create since %s", createTopicReq.name, terrstr()); return -1; } - SMqTopicObj *pTopic = mndAcquireTopic(pMnode, pCreate->name); + SMqTopicObj *pTopic = mndAcquireTopic(pMnode, createTopicReq.name); if (pTopic != NULL) { sdbRelease(pMnode->pSdb, pTopic); - if (pCreate->igExists) { - mDebug("topic:%s, already exist, ignore exist is set", pCreate->name); + if (createTopicReq.igExists) { + mDebug("topic:%s, already exist, ignore exist is set", createTopicReq.name); return 0; } else { terrno = TSDB_CODE_MND_TOPIC_ALREADY_EXIST; - mError("db:%s, failed to create since %s", pCreate->name, terrstr()); + mError("db:%s, failed to create since %s", createTopicReq.name, terrstr()); return -1; } } - SDbObj *pDb = mndAcquireDbByTopic(pMnode, pCreate->name); + SDbObj *pDb = mndAcquireDbByTopic(pMnode, createTopicReq.name); if (pDb == NULL) { terrno = TSDB_CODE_MND_DB_NOT_SELECTED; - mError("topic:%s, failed to create since %s", pCreate->name, terrstr()); + mError("topic:%s, failed to create since %s", createTopicReq.name, terrstr()); return -1; } - int32_t code = mndCreateTopic(pMnode, pMsg, pCreate, pDb); + int32_t code = mndCreateTopic(pMnode, pMsg, &createTopicReq, pDb); mndReleaseDb(pMnode, pDb); if (code != 0) { terrno = code; - mError("topic:%s, failed to create since %s", pCreate->name, terrstr()); + mError("topic:%s, failed to create since %s", createTopicReq.name, terrstr()); return -1; } - return TSDB_CODE_MND_ACTION_IN_PROGRESS; + return TSDB_CODE_SUCCESS; } static int32_t mndDropTopic(SMnode *pMnode, SMnodeMsg *pMsg, SMqTopicObj *pTopic) { return 0; } diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index a8296e8f8e5867f8c507e9ab1dbd8bcc3f3c1a88..86ba2d99a30799ae7d8907c928f12d2cd5e77423 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -713,13 +713,6 @@ static bool initTableMemIterator(STsdbReadHandle* pHandle, STableCheckInfo* pChe pCheckInfo->initBuf = true; int32_t order = pHandle->order; - // no data in buffer, abort -// if (pHandle->pMemTable->snapshot.mem == NULL && pHandle->pMemTable->snapshot.imem == NULL) { -// return false; -// } -// -// assert(pCheckInfo->iter == NULL && pCheckInfo->iiter == NULL); -// STbData** pMem = NULL; STbData** pIMem = NULL; @@ -787,8 +780,7 @@ static bool initTableMemIterator(STsdbReadHandle* pHandle, STableCheckInfo* pChe assert(pCheckInfo->lastKey >= key); } } else { - tsdbDebug("%p uid:%"PRId64", no data in imem, 0x%"PRIx64, pHandle, pCheckInfo->tableId, - pHandle->qId); + tsdbDebug("%p uid:%"PRId64", no data in imem, 0x%"PRIx64, pHandle, pCheckInfo->tableId, pHandle->qId); } return true; @@ -2554,9 +2546,6 @@ static bool doHasDataInBuffer(STsdbReadHandle* pTsdbReadHandle) { pTsdbReadHandle->activeIndex += 1; } - // no data in memtable or imemtable, decrease the memory reference. - // TODO !! -// tsdbMayUnTakeMemSnapshot(pTsdbReadHandle); return false; } diff --git a/source/dnode/vnode/src/vnd/vnodeQuery.c b/source/dnode/vnode/src/vnd/vnodeQuery.c index 909b233efb9a6415ecb35175210db20ebfc7847a..2a9b4ed7141e27617f4853fce947c4eb92e64864 100644 --- a/source/dnode/vnode/src/vnd/vnodeQuery.c +++ b/source/dnode/vnode/src/vnd/vnodeQuery.c @@ -68,6 +68,7 @@ static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { pTbCfg = metaGetTbInfoByName(pVnode->pMeta, pReq->tableFname, &uid); if (pTbCfg == NULL) { + code = TSDB_CODE_VND_TB_NOT_EXIST; goto _exit; } diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index 94f34b8e1726dcd234c6e08eb426536f1f669edf..f6b752bc6cbbd89bb3a897c41ac8e9f981aa2ed5 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -285,16 +285,16 @@ int32_t ctgGetTableMetaFromVnode(struct SCatalog* pCatalog, void *pTransporter, char dbFullName[TSDB_DB_FNAME_LEN]; tNameGetFullDbName(pTableName, dbFullName); - ctgDebug("try to get table meta from vnode, db:%s, tbName:%s", dbFullName, pTableName->tname); + ctgDebug("try to get table meta from vnode, db:%s, tbName:%s", dbFullName, tNameGetTableName(pTableName)); - SBuildTableMetaInput bInput = {.vgId = vgroupInfo->vgId, .dbName = dbFullName, .tableFullName = (char *)pTableName->tname}; + SBuildTableMetaInput bInput = {.vgId = vgroupInfo->vgId, .dbName = dbFullName, .tableFullName = (char *)tNameGetTableName(pTableName)}; char *msg = NULL; SEpSet *pVnodeEpSet = NULL; int32_t msgLen = 0; int32_t code = queryBuildMsg[TMSG_INDEX(TDMT_VND_TABLE_META)](&bInput, &msg, 0, &msgLen); if (code) { - ctgError("Build vnode tablemeta msg failed, code:%x, tbName:%s", code, pTableName->tname); + ctgError("Build vnode tablemeta msg failed, code:%x, tbName:%s", code, tNameGetTableName(pTableName)); CTG_ERR_RET(code); } @@ -313,21 +313,21 @@ int32_t ctgGetTableMetaFromVnode(struct SCatalog* pCatalog, void *pTransporter, if (TSDB_CODE_SUCCESS != rpcRsp.code) { if (CTG_TABLE_NOT_EXIST(rpcRsp.code)) { SET_META_TYPE_NONE(output->metaType); - ctgDebug("tablemeta not exist in vnode, tbName:%s", pTableName->tname); + ctgDebug("tablemeta not exist in vnode, tbName:%s", tNameGetTableName(pTableName)); return TSDB_CODE_SUCCESS; } - ctgError("error rsp for table meta from vnode, code:%x, tbName:%s", rpcRsp.code, pTableName->tname); + ctgError("error rsp for table meta from vnode, code:%x, tbName:%s", rpcRsp.code, tNameGetTableName(pTableName)); CTG_ERR_RET(rpcRsp.code); } code = queryProcessMsgRsp[TMSG_INDEX(TDMT_VND_TABLE_META)](output, rpcRsp.pCont, rpcRsp.contLen); if (code) { - ctgError("Process vnode tablemeta rsp failed, code:%x, tbName:%s", code, pTableName->tname); + ctgError("Process vnode tablemeta rsp failed, code:%x, tbName:%s", code, tNameGetTableName(pTableName)); CTG_ERR_RET(code); } - ctgDebug("Got table meta from vnode, db:%s, tbName:%s", dbFullName, pTableName->tname); + ctgDebug("Got table meta from vnode, db:%s, tbName:%s", dbFullName, tNameGetTableName(pTableName)); return TSDB_CODE_SUCCESS; } @@ -776,7 +776,7 @@ int32_t ctgRenewTableMetaImpl(struct SCatalog* pCatalog, void *pTransporter, con STableMetaOutput *output = &voutput; if (CTG_IS_STABLE(isSTable)) { - ctgDebug("will renew table meta, supposed to be stable, tbName:%s", pTableName->tname); + ctgDebug("will renew table meta, supposed to be stable, tbName:%s", tNameGetTableName(pTableName)); // if get from mnode failed, will not try vnode CTG_ERR_JRET(ctgGetTableMetaFromMnode(pCatalog, pTransporter, pMgmtEps, pTableName, &moutput)); @@ -787,13 +787,13 @@ int32_t ctgRenewTableMetaImpl(struct SCatalog* pCatalog, void *pTransporter, con output = &moutput; } } else { - ctgDebug("will renew table meta, not supposed to be stable, tbName:%s, isStable:%d", pTableName->tname, isSTable); + ctgDebug("will renew table meta, not supposed to be stable, tbName:%s, isStable:%d", tNameGetTableName(pTableName), isSTable); // if get from vnode failed or no table meta, will not try mnode CTG_ERR_JRET(ctgGetTableMetaFromVnode(pCatalog, pTransporter, pMgmtEps, pTableName, &vgroupInfo, &voutput)); if (CTG_IS_META_TABLE(voutput.metaType) && TSDB_SUPER_TABLE == voutput.tbMeta->tableType) { - ctgDebug("will continue to renew table meta since got stable, tbName:%s, metaType:%d", pTableName->tname, voutput.metaType); + ctgDebug("will continue to renew table meta since got stable, tbName:%s, metaType:%d", tNameGetTableName(pTableName), voutput.metaType); CTG_ERR_JRET(ctgGetTableMetaFromMnodeImpl(pCatalog, pTransporter, pMgmtEps, voutput.tbFname, &moutput)); @@ -820,7 +820,7 @@ int32_t ctgRenewTableMetaImpl(struct SCatalog* pCatalog, void *pTransporter, con } if (CTG_IS_META_NONE(output->metaType)) { - ctgError("no tablemeta got, tbNmae:%s", pTableName->tname); + ctgError("no tablemeta got, tbNmae:%s", tNameGetTableName(pTableName)); CTG_ERR_JRET(CTG_ERR_CODE_TABLE_NOT_EXIST); } @@ -860,7 +860,7 @@ int32_t ctgGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMg CTG_ERR_RET(ctgGetTableMetaFromCache(pCatalog, pTableName, pTableMeta, &exist)); if (0 == exist) { - ctgError("renew tablemeta succeed but get from cache failed, may be deleted, tbName:%s", pTableName->tname); + ctgError("renew tablemeta succeed but get from cache failed, may be deleted, tbName:%s", tNameGetTableName(pTableName)); CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR); } @@ -1241,7 +1241,7 @@ int32_t catalogGetTableDistVgroup(struct SCatalog* pCatalog, void *pRpc, const S } else { int32_t vgId = tbMeta->vgId; if (NULL == taosHashGetClone(dbVgroup->vgInfo, &vgId, sizeof(vgId), &vgroupInfo)) { - ctgError("table's vgId not found in vgroup list, vgId:%d, tbName:%s", vgId, pTableName->tname); + ctgError("table's vgId not found in vgroup list, vgId:%d, tbName:%s", vgId, tNameGetTableName(pTableName)); CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR); } @@ -1252,7 +1252,7 @@ int32_t catalogGetTableDistVgroup(struct SCatalog* pCatalog, void *pRpc, const S } if (NULL == taosArrayPush(vgList, &vgroupInfo)) { - ctgError("taosArrayPush vgroupInfo to array failed, vgId:%d, tbName:%s", vgId, pTableName->tname); + ctgError("taosArrayPush vgroupInfo to array failed, vgId:%d, tbName:%s", vgId, tNameGetTableName(pTableName)); CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR); } diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index d76f2703924ff51ff4cb2d16986c33dbda29a871..42c0f2a6e42598453ca0360d1c044941b0f173f6 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -542,34 +542,34 @@ typedef struct SOrderOperatorInfo { void appendUpstream(SOperatorInfo* p, SOperatorInfo* pUpstream); -SOperatorInfo* createDataBlocksOptScanInfo(void* pTsdbQueryHandle, STaskRuntimeEnv* pRuntimeEnv, int32_t repeatTime, int32_t reverseTime); -SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, int32_t order, int32_t numOfOutput, int32_t repeatTime, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createTableSeqScanOperator(void* pTsdbQueryHandle, STaskRuntimeEnv* pRuntimeEnv); - -SOperatorInfo* createAggregateOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); -SOperatorInfo* createProjectOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); -SOperatorInfo* createLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream); -SOperatorInfo* createTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); -SOperatorInfo* createAllTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); -SOperatorInfo* createSWindowOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); -SOperatorInfo* createFillOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, bool multigroupResult); -SOperatorInfo* createGroupbyOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); -SOperatorInfo* createMultiTableAggOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); -SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); -SOperatorInfo* createAllMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); +SOperatorInfo* createDataBlocksOptScanInfo(void* pTsdbReadHandle, int32_t order, int32_t numOfOutput, int32_t repeatTime, int32_t reverseTime, SExecTaskInfo* pTaskInfo); +SOperatorInfo* createTableScanOperator(void* pTsdbReadHandle, int32_t order, int32_t numOfOutput, int32_t repeatTime, SExecTaskInfo* pTaskInfo); +SOperatorInfo* createTableSeqScanOperator(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv); + +SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput); +SOperatorInfo* createProjectOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput); +SOperatorInfo* createLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream); +SOperatorInfo* createTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput); +SOperatorInfo* createAllTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput); +SOperatorInfo* createSWindowOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput); +SOperatorInfo* createFillOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput, bool multigroupResult); +SOperatorInfo* createGroupbyOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput); +SOperatorInfo* createMultiTableAggOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput); +SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput); +SOperatorInfo* createAllMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput); SOperatorInfo* createTagScanOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SExprInfo* pExpr, int32_t numOfOutput); -SOperatorInfo* createDistinctOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); -SOperatorInfo* createTableBlockInfoScanOperator(void* pTsdbQueryHandle, STaskRuntimeEnv* pRuntimeEnv); +SOperatorInfo* createDistinctOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput); +SOperatorInfo* createTableBlockInfoScanOperator(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv); SOperatorInfo* createMultiwaySortOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SExprInfo* pExpr, int32_t numOfOutput, int32_t numOfRows, void* merger); -SOperatorInfo* createGlobalAggregateOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, void* param, SArray* pUdfInfo, bool groupResultMixedUp); -SOperatorInfo* createStatewindowOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); -SOperatorInfo* createSLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, void* merger, bool multigroupResult); -SOperatorInfo* createFilterOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, +SOperatorInfo* createGlobalAggregateOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput, void* param, SArray* pUdfInfo, bool groupResultMixedUp); +SOperatorInfo* createStatewindowOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput); +SOperatorInfo* createSLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput, void* merger, bool multigroupResult); +SOperatorInfo* createFilterOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput, SColumnInfo* pCols, int32_t numOfFilter); -SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pUpstream, int32_t numOfUpstream, SSchema* pSchema, int32_t numOfOutput); -SOperatorInfo* createOrderOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, SOrder* pOrderVal); +SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pdownstream, int32_t numOfDownstream, SSchema* pSchema, int32_t numOfOutput); +SOperatorInfo* createOrderOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput, SOrder* pOrderVal); //SSDataBlock* doGlobalAggregate(void* param, bool* newgroup); //SSDataBlock* doMultiwayMergeSort(void* param, bool* newgroup); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 0dfcc9b1d81d226a2d563fbf88087580a1b0a641..031900d2bdb823bf01cd488ba010d27efaba8ccc 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -294,7 +294,6 @@ SSDataBlock* createOutputBuf(SExprInfo* pExpr, int32_t numOfOutput, int32_t numO SSDataBlock *res = calloc(1, sizeof(SSDataBlock)); res->info.numOfCols = numOfOutput; - res->pDataBlock = taosArrayInit(numOfOutput, sizeof(SColumnInfoData)); for (int32_t i = 0; i < numOfOutput; ++i) { SColumnInfoData idata = {{0}}; @@ -1815,7 +1814,7 @@ static int32_t setCtxTagColumnInfo(SQLFunctionCtx *pCtx, int32_t numOfOutput) { return TSDB_CODE_SUCCESS; } -static SQLFunctionCtx* createSQLFunctionCtx(STaskRuntimeEnv* pRuntimeEnv, SExprInfo* pExpr, int32_t numOfOutput, +static SQLFunctionCtx* createSqlFunctionCtx(STaskRuntimeEnv* pRuntimeEnv, SExprInfo* pExpr, int32_t numOfOutput, int32_t** rowCellInfoOffset) { STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; @@ -1919,6 +1918,104 @@ static SQLFunctionCtx* createSQLFunctionCtx(STaskRuntimeEnv* pRuntimeEnv, SExprI return pFuncCtx; } +static SQLFunctionCtx* createSqlFunctionCtx_rv(SExprInfo* pExpr, int32_t numOfOutput, int32_t** rowCellInfoOffset) { + SQLFunctionCtx * pFuncCtx = (SQLFunctionCtx *)calloc(numOfOutput, sizeof(SQLFunctionCtx)); + if (pFuncCtx == NULL) { + return NULL; + } + + *rowCellInfoOffset = calloc(numOfOutput, sizeof(int32_t)); + if (*rowCellInfoOffset == 0) { + tfree(pFuncCtx); + return NULL; + } + + for (int32_t i = 0; i < numOfOutput; ++i) { + SSqlExpr *pSqlExpr = &pExpr[i].base; + SQLFunctionCtx* pCtx = &pFuncCtx[i]; +#if 0 + SColIndex *pIndex = &pSqlExpr->colInfo; + + if (TSDB_COL_REQ_NULL(pIndex->flag)) { + pCtx->requireNull = true; + pIndex->flag &= ~(TSDB_COL_NULL); + } else { + pCtx->requireNull = false; + } +#endif +// pCtx->inputBytes = pSqlExpr->colBytes; +// pCtx->inputType = pSqlExpr->colType; + + pCtx->ptsOutputBuf = NULL; + + pCtx->resDataInfo.bytes = pSqlExpr->resSchema.bytes; + pCtx->resDataInfo.type = pSqlExpr->resSchema.type; + +// pCtx->order = pQueryAttr->order.order; +// pCtx->functionId = pSqlExpr->functionId; +// pCtx->stableQuery = pQueryAttr->stableQuery; + pCtx->resDataInfo.intermediateBytes = pSqlExpr->interBytes; + pCtx->start.key = INT64_MIN; + pCtx->end.key = INT64_MIN; + + pCtx->numOfParams = pSqlExpr->numOfParams; + for (int32_t j = 0; j < pCtx->numOfParams; ++j) { + int16_t type = pSqlExpr->param[j].nType; + int16_t bytes = pSqlExpr->param[j].nLen; + + if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) { + taosVariantCreateFromBinary(&pCtx->param[j], pSqlExpr->param[j].pz, bytes, type); + } else { + taosVariantCreateFromBinary(&pCtx->param[j], (char *)&pSqlExpr->param[j].i, bytes, type); + } + } + + // set the order information for top/bottom query + int32_t functionId = pCtx->functionId; + + if (functionId == FUNCTION_TOP || functionId == FUNCTION_BOTTOM || functionId == FUNCTION_DIFF) { + int32_t f = getExprFunctionId(&pExpr[0]); + assert(f == FUNCTION_TS || f == FUNCTION_TS_DUMMY); + +// pCtx->param[2].i = pQueryAttr->order.order; + pCtx->param[2].nType = TSDB_DATA_TYPE_BIGINT; + pCtx->param[3].i = functionId; + pCtx->param[3].nType = TSDB_DATA_TYPE_BIGINT; + +// pCtx->param[1].i = pQueryAttr->order.col.info.colId; + } else if (functionId == FUNCTION_INTERP) { +// pCtx->param[2].i = (int8_t)pQueryAttr->fillType; +// if (pQueryAttr->fillVal != NULL) { +// if (isNull((const char *)&pQueryAttr->fillVal[i], pCtx->inputType)) { +// pCtx->param[1].nType = TSDB_DATA_TYPE_NULL; +// } else { // todo refactor, taosVariantCreateFromBinary should handle the NULL value +// if (pCtx->inputType != TSDB_DATA_TYPE_BINARY && pCtx->inputType != TSDB_DATA_TYPE_NCHAR) { +// taosVariantCreateFromBinary(&pCtx->param[1], (char *)&pQueryAttr->fillVal[i], pCtx->inputBytes, pCtx->inputType); +// } +// } +// } + } else if (functionId == FUNCTION_TS_COMP) { +// pCtx->param[0].i = pQueryAttr->vgId; //TODO this should be the parameter from client + pCtx->param[0].nType = TSDB_DATA_TYPE_BIGINT; + } else if (functionId == FUNCTION_TWA) { +// pCtx->param[1].i = pQueryAttr->window.skey; + pCtx->param[1].nType = TSDB_DATA_TYPE_BIGINT; +// pCtx->param[2].i = pQueryAttr->window.ekey; + pCtx->param[2].nType = TSDB_DATA_TYPE_BIGINT; + } else if (functionId == FUNCTION_ARITHM) { +// pCtx->param[1].pz = (char*) getScalarFuncSupport(pRuntimeEnv->scalarSup, i); + } + } + +// for(int32_t i = 1; i < numOfOutput; ++i) { +// (*rowCellInfoOffset)[i] = (int32_t)((*rowCellInfoOffset)[i - 1] + sizeof(SResultRowEntryInfo) + pExpr[i - 1].base.interBytes); +// } + + setCtxTagColumnInfo(pFuncCtx, numOfOutput); + + return pFuncCtx; +} + static void* destroySQLFunctionCtx(SQLFunctionCtx* pCtx, int32_t numOfOutput) { if (pCtx == NULL) { return NULL; @@ -4452,7 +4549,7 @@ void queryCostStatis(SQInfo *pQInfo) { // return true; //} -void appendUpstream(SOperatorInfo* p, SOperatorInfo* pUpstream) { +void appendDownstream(SOperatorInfo* p, SOperatorInfo* pUpstream) { if (p->pDownstream == NULL) { assert(p->numOfDownstream == 0); } @@ -4545,28 +4642,6 @@ int32_t doInitQInfo(SQInfo* pQInfo, STSBuf* pTsBuf, void* tsdb, void* sourceOptr pRuntimeEnv->cur.vgroupIndex = -1; setResultBufSize(pQueryAttr, &pRuntimeEnv->resultInfo); - switch(tbScanner) { -// case OP_TableBlockInfoScan: { -// pRuntimeEnv->proot = createTableBlockInfoScanOperator(pRuntimeEnv->pTsdbReadHandle, pRuntimeEnv); -// break; -// } -// case OP_TableSeqScan: { -// pRuntimeEnv->proot = createTableSeqScanOperator(pRuntimeEnv->pTsdbReadHandle, pRuntimeEnv); -// break; -// } -// case OP_DataBlocksOptScan: { -// pRuntimeEnv->proot = createDataBlocksOptScanInfo(pRuntimeEnv->pTsdbReadHandle, pRuntimeEnv, getNumOfScanTimes(pQueryAttr), pQueryAttr->needReverseScan? 1:0); -// break; -// } -// case OP_TableScan: { -// pRuntimeEnv->proot = createTableScanOperator(pRuntimeEnv->pTsdbReadHandle, pRuntimeEnv, getNumOfScanTimes(pQueryAttr)); -// break; -// } - default: { // do nothing - break; - } - } - if (sourceOptr != NULL) { assert(pRuntimeEnv->proot == NULL); pRuntimeEnv->proot = sourceOptr; @@ -4841,18 +4916,26 @@ static SSDataBlock* doBlockInfoScan(void* param, bool* newgroup) { } -SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, int32_t order, int32_t numOfOutput, int32_t repeatTime, SExecTaskInfo* pTaskInfo) { +SOperatorInfo* createTableScanOperator(void* pTsdbReadHandle, int32_t order, int32_t numOfOutput, int32_t repeatTime, SExecTaskInfo* pTaskInfo) { assert(repeatTime > 0 && numOfOutput > 0); - STableScanInfo* pInfo = calloc(1, sizeof(STableScanInfo)); - pInfo->pTsdbReadHandle = pTsdbQueryHandle; + STableScanInfo* pInfo = calloc(1, sizeof(STableScanInfo)); + SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); + + if (pInfo == NULL || pOperator == NULL) { + tfree(pInfo); + tfree(pOperator); + terrno = TSDB_CODE_QRY_OUT_OF_MEMORY; + return NULL; + } + + pInfo->pTsdbReadHandle = pTsdbReadHandle; pInfo->times = repeatTime; pInfo->reverseTimes = 0; pInfo->order = order; pInfo->current = 0; pInfo->scanFlag = MAIN_SCAN; - SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); pOperator->name = "TableScanOperator"; pOperator->operatorType = OP_TableScan; pOperator->blockingOptr = false; @@ -4866,10 +4949,43 @@ SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, int32_t order, in return pOperator; } -SOperatorInfo* createTableSeqScanOperator(void* pTsdbQueryHandle, STaskRuntimeEnv* pRuntimeEnv) { +SOperatorInfo* createDataBlocksOptScanInfo(void* pTsdbReadHandle, int32_t order, int32_t numOfOutput, int32_t repeatTime, int32_t reverseTime, SExecTaskInfo* pTaskInfo) { + assert(repeatTime > 0); + + STableScanInfo* pInfo = calloc(1, sizeof(STableScanInfo)); + SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); + if (pInfo == NULL || pOperator == NULL) { + tfree(pInfo); + tfree(pOperator); + + terrno = TSDB_CODE_QRY_OUT_OF_MEMORY; + return NULL; + } + + pInfo->pTsdbReadHandle = pTsdbReadHandle; + pInfo->times = repeatTime; + pInfo->reverseTimes = reverseTime; + pInfo->order = order; + pInfo->current = 0; + pInfo->scanFlag = MAIN_SCAN; + + pOperator->name = "DataBlocksOptimizedScanOperator"; + pOperator->operatorType = OP_DataBlocksOptScan; + pOperator->blockingOptr = false; + pOperator->status = OP_IN_EXECUTING; + pOperator->info = pInfo; + pOperator->numOfOutput = numOfOutput; + pOperator->pRuntimeEnv = NULL; + pOperator->exec = doTableScan; + pOperator->pTaskInfo = pTaskInfo; + + return pOperator; +} + +SOperatorInfo* createTableSeqScanOperator(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv) { STableScanInfo* pInfo = calloc(1, sizeof(STableScanInfo)); - pInfo->pTsdbReadHandle = pTsdbQueryHandle; + pInfo->pTsdbReadHandle = pTsdbReadHandle; pInfo->times = 1; pInfo->reverseTimes = 0; pInfo->order = pRuntimeEnv->pQueryAttr->order.order; @@ -4890,10 +5006,10 @@ SOperatorInfo* createTableSeqScanOperator(void* pTsdbQueryHandle, STaskRuntimeEn return pOperator; } -SOperatorInfo* createTableBlockInfoScanOperator(void* pTsdbQueryHandle, STaskRuntimeEnv* pRuntimeEnv) { +SOperatorInfo* createTableBlockInfoScanOperator(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv) { STableScanInfo* pInfo = calloc(1, sizeof(STableScanInfo)); - pInfo->pTsdbReadHandle = pTsdbQueryHandle; + pInfo->pTsdbReadHandle = pTsdbReadHandle; pInfo->block.pDataBlock = taosArrayInit(1, sizeof(SColumnInfoData)); SColumnInfoData infoData = {{0}}; @@ -4973,27 +5089,6 @@ void setTableScanFilterOperatorInfo(STableScanInfo* pTableScanInfo, SOperatorInf } -SOperatorInfo* createDataBlocksOptScanInfo(void* pTsdbQueryHandle, STaskRuntimeEnv* pRuntimeEnv, int32_t repeatTime, int32_t reverseTime) { - assert(repeatTime > 0); - - STableScanInfo* pInfo = calloc(1, sizeof(STableScanInfo)); - pInfo->pTsdbReadHandle = pTsdbQueryHandle; - pInfo->times = repeatTime; - pInfo->reverseTimes = reverseTime; - pInfo->current = 0; - pInfo->order = pRuntimeEnv->pQueryAttr->order.order; - - SOperatorInfo* pOptr = calloc(1, sizeof(SOperatorInfo)); - pOptr->name = "DataBlocksOptimizedScanOperator"; -// pOptr->operatorType = OP_DataBlocksOptScan; - pOptr->pRuntimeEnv = pRuntimeEnv; - pOptr->blockingOptr = false; - pOptr->info = pInfo; - pOptr->exec = doTableScan; - - return pOptr; -} - SArray* getOrderCheckColumns(STaskAttr* pQuery) { int32_t numOfCols = (pQuery->pGroupbyExpr == NULL)? 0: taosArrayGetSize(pQuery->pGroupbyExpr->columnInfo); @@ -5098,7 +5193,7 @@ SOperatorInfo* createGlobalAggregateOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, S pInfo->bufCapacity = 4096; pInfo->udfInfo = pUdfInfo; pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, pInfo->bufCapacity * pInfo->resultRowFactor); - pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset); + pInfo->binfo.pCtx = createSqlFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset); pInfo->orderColumnList = getOrderCheckColumns(pRuntimeEnv->pQueryAttr); pInfo->groupColumnList = getResultGroupCheckColumns(pRuntimeEnv->pQueryAttr); @@ -5147,7 +5242,7 @@ SOperatorInfo* createGlobalAggregateOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, S // pOperator->exec = doGlobalAggregate; pOperator->cleanup = destroyGlobalAggOperatorInfo; - appendUpstream(pOperator, downstream); + appendDownstream(pOperator, downstream); return pOperator; } @@ -5296,7 +5391,7 @@ SOperatorInfo *createOrderOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorIn pOperator->cleanup = destroyOrderOperatorInfo; pOperator->pRuntimeEnv = pRuntimeEnv; - appendUpstream(pOperator, downstream); + appendDownstream(pOperator, downstream); return pOperator; } @@ -6228,33 +6323,33 @@ static void destroyOperatorInfo(SOperatorInfo* pOperator) { tfree(pOperator); } -SOperatorInfo* createAggregateOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput) { +SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput) { SAggOperatorInfo* pInfo = calloc(1, sizeof(SAggOperatorInfo)); - STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; - int32_t numOfRows = (int32_t)(getRowNumForMultioutput(pQueryAttr, pQueryAttr->topBotQuery, pQueryAttr->stableQuery)); +// STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; + int32_t numOfRows = 1;//(int32_t)(getRowNumForMultioutput(pQueryAttr, pQueryAttr->topBotQuery, pQueryAttr->stableQuery)); pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, numOfRows); - pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset); + pInfo->binfo.pCtx = createSqlFunctionCtx_rv(pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset); initResultRowInfo(&pInfo->binfo.resultRowInfo, 8, TSDB_DATA_TYPE_INT); pInfo->seed = rand(); - setDefaultOutputBuf(pRuntimeEnv, &pInfo->binfo, pInfo->seed, MAIN_SCAN); +// setDefaultOutputBuf(pRuntimeEnv, &pInfo->binfo, pInfo->seed, MAIN_SCAN); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); pOperator->name = "TableAggregate"; -// pOperator->operatorType = OP_Aggregate; + pOperator->operatorType = OP_Aggregate; pOperator->blockingOptr = true; pOperator->status = OP_IN_EXECUTING; pOperator->info = pInfo; pOperator->pExpr = pExpr; pOperator->numOfOutput = numOfOutput; - pOperator->pRuntimeEnv = pRuntimeEnv; + pOperator->pRuntimeEnv = NULL; pOperator->exec = doAggregate; pOperator->cleanup = destroyAggOperatorInfo; - appendUpstream(pOperator, downstream); + appendDownstream(pOperator, downstream); return pOperator; } @@ -6334,7 +6429,7 @@ SOperatorInfo* createMultiTableAggOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOp size_t tableGroup = GET_NUM_OF_TABLEGROUP(pRuntimeEnv); pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, (int32_t) tableGroup); - pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset); + pInfo->binfo.pCtx = createSqlFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset); initResultRowInfo(&pInfo->binfo.resultRowInfo, (int32_t)tableGroup, TSDB_DATA_TYPE_INT); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); @@ -6349,7 +6444,7 @@ SOperatorInfo* createMultiTableAggOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOp pOperator->exec = doSTableAggregate; pOperator->cleanup = destroyAggOperatorInfo; - appendUpstream(pOperator, downstream); + appendDownstream(pOperator, downstream); return pOperator; } @@ -6362,7 +6457,7 @@ SOperatorInfo* createProjectOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperator SOptrBasicInfo* pBInfo = &pInfo->binfo; pBInfo->pRes = createOutputBuf(pExpr, numOfOutput, pInfo->bufCapacity); - pBInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pBInfo->rowCellInfoOffset); + pBInfo->pCtx = createSqlFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pBInfo->rowCellInfoOffset); initResultRowInfo(&pBInfo->resultRowInfo, 8, TSDB_DATA_TYPE_INT); setDefaultOutputBuf(pRuntimeEnv, pBInfo, pInfo->seed, MAIN_SCAN); @@ -6379,7 +6474,7 @@ SOperatorInfo* createProjectOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperator pOperator->exec = doProjectOperation; pOperator->cleanup = destroyProjectOperatorInfo; - appendUpstream(pOperator, downstream); + appendDownstream(pOperator, downstream); return pOperator; } @@ -6437,7 +6532,7 @@ SOperatorInfo* createFilterOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorI pOperator->info = pInfo; pOperator->pRuntimeEnv = pRuntimeEnv; pOperator->cleanup = destroyConditionOperatorInfo; - appendUpstream(pOperator, downstream); + appendDownstream(pOperator, downstream); return pOperator; } @@ -6455,7 +6550,7 @@ SOperatorInfo* createLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorIn pOperator->exec = doLimit; pOperator->info = pInfo; pOperator->pRuntimeEnv = pRuntimeEnv; - appendUpstream(pOperator, downstream); + appendDownstream(pOperator, downstream); return pOperator; } @@ -6463,7 +6558,7 @@ SOperatorInfo* createLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorIn SOperatorInfo* createTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput) { STableIntervalOperatorInfo* pInfo = calloc(1, sizeof(STableIntervalOperatorInfo)); - pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->rowCellInfoOffset); + pInfo->pCtx = createSqlFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->rowCellInfoOffset); pInfo->pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity); initResultRowInfo(&pInfo->resultRowInfo, 8, TSDB_DATA_TYPE_INT); @@ -6480,7 +6575,7 @@ SOperatorInfo* createTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOpe pOperator->exec = doIntervalAgg; pOperator->cleanup = destroyBasicOperatorInfo; - appendUpstream(pOperator, downstream); + appendDownstream(pOperator, downstream); return pOperator; } @@ -6488,7 +6583,7 @@ SOperatorInfo* createTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOpe SOperatorInfo* createAllTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput) { STableIntervalOperatorInfo* pInfo = calloc(1, sizeof(STableIntervalOperatorInfo)); - pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->rowCellInfoOffset); + pInfo->pCtx = createSqlFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->rowCellInfoOffset); pInfo->pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity); initResultRowInfo(&pInfo->resultRowInfo, 8, TSDB_DATA_TYPE_INT); @@ -6505,7 +6600,7 @@ SOperatorInfo* createAllTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, S pOperator->exec = doAllIntervalAgg; pOperator->cleanup = destroyBasicOperatorInfo; - appendUpstream(pOperator, downstream); + appendDownstream(pOperator, downstream); return pOperator; } @@ -6513,7 +6608,7 @@ SOperatorInfo* createStatewindowOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOper SStateWindowOperatorInfo* pInfo = calloc(1, sizeof(SStateWindowOperatorInfo)); pInfo->colIndex = -1; pInfo->reptScan = false; - pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset); + pInfo->binfo.pCtx = createSqlFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset); pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity); initResultRowInfo(&pInfo->binfo.resultRowInfo, 8, TSDB_DATA_TYPE_INT); @@ -6529,13 +6624,13 @@ SOperatorInfo* createStatewindowOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOper pOperator->exec = doStateWindowAgg; pOperator->cleanup = destroyStateWindowOperatorInfo; - appendUpstream(pOperator, downstream); + appendDownstream(pOperator, downstream); return pOperator; } SOperatorInfo* createSWindowOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput) { SSWindowOperatorInfo* pInfo = calloc(1, sizeof(SSWindowOperatorInfo)); - pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset); + pInfo->binfo.pCtx = createSqlFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset); pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity); initResultRowInfo(&pInfo->binfo.resultRowInfo, 8, TSDB_DATA_TYPE_INT); @@ -6554,14 +6649,14 @@ SOperatorInfo* createSWindowOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperator pOperator->exec = doSessionWindowAgg; pOperator->cleanup = destroySWindowOperatorInfo; - appendUpstream(pOperator, downstream); + appendDownstream(pOperator, downstream); return pOperator; } SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput) { STableIntervalOperatorInfo* pInfo = calloc(1, sizeof(STableIntervalOperatorInfo)); - pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->rowCellInfoOffset); + pInfo->pCtx = createSqlFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->rowCellInfoOffset); pInfo->pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity); initResultRowInfo(&pInfo->resultRowInfo, 8, TSDB_DATA_TYPE_INT); @@ -6578,14 +6673,14 @@ SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntim pOperator->exec = doSTableIntervalAgg; pOperator->cleanup = destroyBasicOperatorInfo; - appendUpstream(pOperator, downstream); + appendDownstream(pOperator, downstream); return pOperator; } SOperatorInfo* createAllMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput) { STableIntervalOperatorInfo* pInfo = calloc(1, sizeof(STableIntervalOperatorInfo)); - pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->rowCellInfoOffset); + pInfo->pCtx = createSqlFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->rowCellInfoOffset); pInfo->pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity); initResultRowInfo(&pInfo->resultRowInfo, 8, TSDB_DATA_TYPE_INT); @@ -6602,7 +6697,7 @@ SOperatorInfo* createAllMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRun pOperator->exec = doAllSTableIntervalAgg; pOperator->cleanup = destroyBasicOperatorInfo; - appendUpstream(pOperator, downstream); + appendDownstream(pOperator, downstream); return pOperator; } @@ -6613,7 +6708,7 @@ SOperatorInfo* createGroupbyOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperator pInfo->colIndex = -1; // group by column index - pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset); + pInfo->binfo.pCtx = createSqlFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset); STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr; @@ -6635,7 +6730,7 @@ SOperatorInfo* createGroupbyOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperator pOperator->exec = hashGroupbyAggregate; pOperator->cleanup = destroyGroupbyOperatorInfo; - appendUpstream(pOperator, downstream); + appendDownstream(pOperator, downstream); return pOperator; } @@ -6674,7 +6769,7 @@ SOperatorInfo* createFillOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInf pOperator->exec = doFill; pOperator->cleanup = destroySFillOperatorInfo; - appendUpstream(pOperator, downstream); + appendDownstream(pOperator, downstream); return pOperator; } @@ -6722,7 +6817,7 @@ SOperatorInfo* createSLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorI pOperator->pRuntimeEnv = pRuntimeEnv; pOperator->cleanup = destroySlimitOperatorInfo; - appendUpstream(pOperator, downstream); + appendDownstream(pOperator, downstream); return pOperator; } @@ -7020,7 +7115,7 @@ SOperatorInfo* createDistinctOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperato pOperator->pExpr = pExpr; pOperator->cleanup = destroyDistinctOperatorInfo; - appendUpstream(pOperator, downstream); + appendDownstream(pOperator, downstream); return pOperator; } @@ -7187,10 +7282,13 @@ static SExecTaskInfo* createExecTaskInfo(uint64_t queryId) { SOperatorInfo* doCreateOperatorTreeNode(SPhyNode* pPhyNode, SExecTaskInfo* pTaskInfo, void* param) { if (pPhyNode->pChildren == NULL || taosArrayGetSize(pPhyNode->pChildren) == 0) { if (pPhyNode->info.type == OP_TableScan) { - SScanPhyNode* pScanPhyNode = (SScanPhyNode*) pPhyNode; - size_t numOfCols = taosArrayGetSize(pPhyNode->pTargets); - SOperatorInfo* pOperatorInfo = createTableScanOperator(param, pScanPhyNode->order, numOfCols, pScanPhyNode->count, pTaskInfo); - pTaskInfo->pRoot = pOperatorInfo; + SScanPhyNode* pScanPhyNode = (SScanPhyNode*)pPhyNode; + size_t numOfCols = taosArrayGetSize(pPhyNode->pTargets); + return createTableScanOperator(param, pScanPhyNode->order, numOfCols, pScanPhyNode->count, pTaskInfo); + } else if (pPhyNode->info.type == OP_DataBlocksOptScan) { + SScanPhyNode* pScanPhyNode = (SScanPhyNode*)pPhyNode; + size_t numOfCols = taosArrayGetSize(pPhyNode->pTargets); + return createDataBlocksOptScanInfo(param, pScanPhyNode->order, numOfCols, pScanPhyNode->count, pScanPhyNode->reverse, pTaskInfo); } else { assert(0); } @@ -7199,21 +7297,20 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhyNode* pPhyNode, SExecTaskInfo* pTask int32_t doCreateExecTaskInfo(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, void* readerHandle) { STsdbQueryCond cond = {.loadExternalRows = false}; - cond.twindow.skey = INT64_MIN; - cond.twindow.ekey = INT64_MAX; uint64_t uid = 0; SPhyNode* pPhyNode = pPlan->pNode; - if (pPhyNode->info.type == OP_TableScan) { + if (pPhyNode->info.type == OP_TableScan || pPhyNode->info.type == OP_DataBlocksOptScan) { - SScanPhyNode* pScanNode = (SScanPhyNode*) pPhyNode; - uid = pScanNode->uid; - cond.order = pScanNode->order; - cond.numOfCols = taosArrayGetSize(pScanNode->node.pTargets); + STableScanPhyNode* pTableScanNode = (STableScanPhyNode*) pPhyNode; + uid = pTableScanNode->scan.uid; + cond.order = pTableScanNode->scan.order; + cond.numOfCols = taosArrayGetSize(pTableScanNode->scan.node.pTargets); cond.colList = calloc(cond.numOfCols, sizeof(SColumnInfo)); + cond.twindow = pTableScanNode->window; for(int32_t i = 0; i < cond.numOfCols; ++i) { - SExprInfo* pExprInfo = taosArrayGetP(pScanNode->node.pTargets, i); + SExprInfo* pExprInfo = taosArrayGetP(pTableScanNode->scan.node.pTargets, i); assert(pExprInfo->pExpr->nodeType == TEXPR_COL_NODE); SSchema* pSchema = pExprInfo->pExpr->pSchema; @@ -7235,7 +7332,11 @@ int32_t doCreateExecTaskInfo(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, void* r *pTaskInfo = createExecTaskInfo((uint64_t)pPlan->id.queryId); tsdbReadHandleT tsdbReadHandle = tsdbQueryTables(readerHandle, &cond, &group, (*pTaskInfo)->id.queryId, NULL); - doCreateOperatorTreeNode(pPlan->pNode, *pTaskInfo, tsdbReadHandle); + (*pTaskInfo)->pRoot = doCreateOperatorTreeNode(pPlan->pNode, *pTaskInfo, tsdbReadHandle); + if ((*pTaskInfo)->pRoot == NULL) { + return terrno; + } + return TSDB_CODE_SUCCESS; } diff --git a/source/libs/function/src/tscalarfunction.c b/source/libs/function/src/tscalarfunction.c index 1e7e3ef1556eaa161ca2311253b656dddc559cce..90d25b3e4a225067a77a66d97f4b103347f38d38 100644 --- a/source/libs/function/src/tscalarfunction.c +++ b/source/libs/function/src/tscalarfunction.c @@ -2,7 +2,6 @@ #include "tbinoperator.h" #include "tunaryoperator.h" - static void assignBasicParaInfo(struct SScalarFuncParam* dst, const struct SScalarFuncParam* src) { dst->type = src->type; dst->bytes = src->bytes; diff --git a/source/libs/parser/inc/astGenerator.h b/source/libs/parser/inc/astGenerator.h index 7f357a2bbdfeff431dcfed899040e2ba264e7a2c..c601c4e3e35a82e6248cc4f24a001dfb2bfb1a56 100644 --- a/source/libs/parser/inc/astGenerator.h +++ b/source/libs/parser/inc/astGenerator.h @@ -170,8 +170,6 @@ typedef struct SCreateDbInfo { int8_t update; int8_t cachelast; SArray *keep; -// int8_t dbType; -// int16_t partitions; } SCreateDbInfo; typedef struct SCreateFuncInfo { diff --git a/source/libs/parser/src/astValidate.c b/source/libs/parser/src/astValidate.c index 5d56a88c14bc9fccc11d973e89a042e030286cfa..512204b40b1e02f858ec8a494630fa721c95b791 100644 --- a/source/libs/parser/src/astValidate.c +++ b/source/libs/parser/src/astValidate.c @@ -917,6 +917,8 @@ int32_t validateLimitNode(SQueryStmtInfo *pQueryInfo, SSqlNode* pSqlNode, SMsgBu return buildInvalidOperationMsg(pMsgBuf, msg1); } } + + return TSDB_CODE_SUCCESS; } int32_t validateOrderbyNode(SQueryStmtInfo *pQueryInfo, SSqlNode* pSqlNode, SMsgBuf* pMsgBuf) { diff --git a/source/libs/parser/src/parser.c b/source/libs/parser/src/parser.c index fea215238c67dda2a89345132e9d6fa0eca3d802..58e368aa0db24caf87f58ecc094ae1d70e977360 100644 --- a/source/libs/parser/src/parser.c +++ b/source/libs/parser/src/parser.c @@ -80,11 +80,11 @@ int32_t parseQuerySql(SParseContext* pCxt, SQueryNode** pQuery) { return TSDB_CODE_SUCCESS; } -int32_t qParseQuerySql(SParseContext* pCxt, SQueryNode** pQuery) { +int32_t qParseQuerySql(SParseContext* pCxt, SQueryNode** pQueryNode) { if (isInsertSql(pCxt->pSql, pCxt->sqlLen)) { - return parseInsertSql(pCxt, (SVnodeModifOpStmtInfo**)pQuery); + return parseInsertSql(pCxt, (SVnodeModifOpStmtInfo**)pQueryNode); } else { - return parseQuerySql(pCxt, pQuery); + return parseQuerySql(pCxt, pQueryNode); } } diff --git a/source/libs/planner/src/physicalPlan.c b/source/libs/planner/src/physicalPlan.c index 65c97110cec7bfeaf91804499acd0f2a10f59840..422233eed74beaf0e322023afbb394736ea8b2c3 100644 --- a/source/libs/planner/src/physicalPlan.c +++ b/source/libs/planner/src/physicalPlan.c @@ -189,6 +189,7 @@ static SPhyNode* createUserTableScanNode(SQueryPlanNode* pPlanNode, SQueryTableI return (SPhyNode*)node; } + static bool isSystemTable(SQueryTableInfo* pTable) { // todo return false; @@ -261,8 +262,8 @@ static void vgroupMsgToEpSet(const SVgroupMsg* vg, SQueryNodeAddr* execNode) { } static uint64_t splitSubplanByTable(SPlanContext* pCxt, SQueryPlanNode* pPlanNode, SQueryTableInfo* pTable) { - SVgroupsInfo* vgroupList = pTable->pMeta->vgroupList; - for (int32_t i = 0; i < pTable->pMeta->vgroupList->numOfVgroups; ++i) { + SVgroupsInfo* pVgroupList = pTable->pMeta->vgroupList; + for (int32_t i = 0; i < pVgroupList->numOfVgroups; ++i) { STORE_CURRENT_SUBPLAN(pCxt); SSubplan* subplan = initSubplan(pCxt, QUERY_TYPE_SCAN); subplan->msgType = TDMT_VND_QUERY; diff --git a/source/libs/planner/src/physicalPlanJson.c b/source/libs/planner/src/physicalPlanJson.c index 123364134d16bf77feb3fe2bc56b5e497bf3fcfe..97eb63ac3129bc8454b6263b7f8aca31f2dde397 100644 --- a/source/libs/planner/src/physicalPlanJson.c +++ b/source/libs/planner/src/physicalPlanJson.c @@ -559,28 +559,35 @@ static const char* jkScanNodeTableId = "TableId"; static const char* jkScanNodeTableType = "TableType"; static const char* jkScanNodeTableOrder = "Order"; static const char* jkScanNodeTableCount = "Count"; +static const char* jkScanNodeTableRevCount = "Reverse"; static bool scanNodeToJson(const void* obj, cJSON* json) { - const SScanPhyNode* scan = (const SScanPhyNode*)obj; - bool res = cJSON_AddNumberToObject(json, jkScanNodeTableId, scan->uid); + const SScanPhyNode* pNode = (const SScanPhyNode*)obj; + bool res = cJSON_AddNumberToObject(json, jkScanNodeTableId, pNode->uid); + + if (res) { + res = cJSON_AddNumberToObject(json, jkScanNodeTableType, pNode->tableType); + } if (res) { - res = cJSON_AddNumberToObject(json, jkScanNodeTableType, scan->tableType); + res = cJSON_AddNumberToObject(json, jkScanNodeTableOrder, pNode->order); } if (res) { - res = cJSON_AddNumberToObject(json, jkScanNodeTableOrder, scan->order); + res = cJSON_AddNumberToObject(json, jkScanNodeTableCount, pNode->count); } + if (res) { - res = cJSON_AddNumberToObject(json, jkScanNodeTableCount, scan->count); + res = cJSON_AddNumberToObject(json, jkScanNodeTableRevCount, pNode->reverse); } return res; } static bool scanNodeFromJson(const cJSON* json, void* obj) { - SScanPhyNode* scan = (SScanPhyNode*)obj; - scan->uid = getNumber(json, jkScanNodeTableId); - scan->tableType = getNumber(json, jkScanNodeTableType); - scan->count = getNumber(json, jkScanNodeTableCount); - scan->order = getNumber(json, jkScanNodeTableOrder); + SScanPhyNode* pNode = (SScanPhyNode*)obj; + pNode->uid = getNumber(json, jkScanNodeTableId); + pNode->tableType = getNumber(json, jkScanNodeTableType); + pNode->count = getNumber(json, jkScanNodeTableCount); + pNode->order = getNumber(json, jkScanNodeTableOrder); + pNode->reverse = getNumber(json, jkScanNodeTableRevCount); return true; } @@ -996,10 +1003,13 @@ static SSubplan* subplanFromJson(const cJSON* json) { if (NULL == subplan) { return NULL; } + bool res = fromObject(json, jkSubplanId, subplanIdFromJson, &subplan->id, true); + if (res) { res = fromPnode(json, jkSubplanNode, phyNodeFromJson, (void**)&subplan->pNode); } + if (res) { res = fromObjectWithAlloc(json, jkSubplanDataSink, dataSinkFromJson, (void**)&subplan->pDataSink, sizeof(SDataSink), false); } @@ -1027,7 +1037,7 @@ int32_t subPlanToString(const SSubplan* subplan, char** str, int32_t* len) { } *str = cJSON_Print(json); -// printf("====Physical plan:====\n") +// printf("====Physical plan:====\n"); // printf("%s\n", *str); *len = strlen(*str) + 1; return TSDB_CODE_SUCCESS; @@ -1047,14 +1057,18 @@ cJSON* qDagToJson(const SQueryDag* pDag) { if(pRoot == NULL) { return NULL; } - cJSON_AddNumberToObject(pRoot, "numOfSubplans", pDag->numOfSubplans); - cJSON_AddNumberToObject(pRoot, "queryId", pDag->queryId); + + cJSON_AddNumberToObject(pRoot, "Number", pDag->numOfSubplans); + cJSON_AddNumberToObject(pRoot, "QueryId", pDag->queryId); + cJSON *pLevels = cJSON_CreateArray(); if(pLevels == NULL) { cJSON_Delete(pRoot); return NULL; } - cJSON_AddItemToObject(pRoot, "pSubplans", pLevels); + + cJSON_AddItemToObject(pRoot, "Subplans", pLevels); + size_t level = taosArrayGetSize(pDag->pSubplans); for(size_t i = 0; i < level; i++) { const SArray* pSubplans = (const SArray*)taosArrayGetP(pDag->pSubplans, i); @@ -1064,6 +1078,7 @@ cJSON* qDagToJson(const SQueryDag* pDag) { cJSON_Delete(pRoot); return NULL; } + cJSON_AddItemToArray(pLevels, plansOneLevel); for(size_t j = 0; j < num; j++) { cJSON* pSubplan = subplanToJson((const SSubplan*)taosArrayGetP(pSubplans, j)); @@ -1071,6 +1086,7 @@ cJSON* qDagToJson(const SQueryDag* pDag) { cJSON_Delete(pRoot); return NULL; } + cJSON_AddItemToArray(plansOneLevel, pSubplan); } } diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index 09e5291c85fd80b7492af974a34506c16adf3407..94574762175cdf0c27718adfbf982e1bfdc163da 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -1029,6 +1029,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, msg = pTask->msg; break; } + case TDMT_VND_QUERY: { msgSize = sizeof(SSubQueryMsg) + pTask->msgLen; msg = calloc(1, msgSize); @@ -1047,7 +1048,8 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, pMsg->contentLen = htonl(pTask->msgLen); memcpy(pMsg->msg, pTask->msg, pTask->msgLen); break; - } + } + case TDMT_VND_RES_READY: { msgSize = sizeof(SResReadyReq); msg = calloc(1, msgSize); diff --git a/source/libs/transport/src/rpcTcp.c b/source/libs/transport/src/rpcTcp.c index 9fa51a6fdce38072a008b1203ccfa0da88802d78..59d223153a20412a90bc21a043df764922b8fcb3 100644 --- a/source/libs/transport/src/rpcTcp.c +++ b/source/libs/transport/src/rpcTcp.c @@ -413,8 +413,11 @@ void *taosOpenTcpClientConnection(void *shandle, void *thandle, uint32_t ip, uin pFdObj->thandle = thandle; pFdObj->port = port; pFdObj->ip = ip; - tDebug("%s %p TCP connection to 0x%x:%hu is created, localPort:%hu FD:%p numOfFds:%d", pThreadObj->label, thandle, - ip, port, localPort, pFdObj, pThreadObj->numOfFds); + + char ipport[40] = {0}; + taosIpPort2String(ip, port, ipport); + tDebug("%s %p TCP connection to %s is created, localPort:%hu FD:%p numOfFds:%d", pThreadObj->label, thandle, + ipport, localPort, pFdObj, pThreadObj->numOfFds); } else { tError("%s failed to malloc client FdObj(%s)", pThreadObj->label, strerror(errno)); taosCloseSocket(fd); diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 8294bca959242198af9838f31397011f23d202cd..fe63ba1999d3a0e2e58277318176d0aea97d2a0d 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -301,12 +301,13 @@ TAOS_DEFINE_ERROR(TSDB_CODE_VND_INVALID_CFG_FILE, "Invalid config file") TAOS_DEFINE_ERROR(TSDB_CODE_VND_INVALID_TERM_FILE, "Invalid term file") TAOS_DEFINE_ERROR(TSDB_CODE_VND_IS_FLOWCTRL, "Database memory is full") TAOS_DEFINE_ERROR(TSDB_CODE_VND_IS_DROPPING, "Database is dropping") -TAOS_DEFINE_ERROR(TSDB_CODE_VND_IS_UPDATING, "Database is updating") +TAOS_DEFINE_ERROR(TSDB_CODE_VND_IS_UPDATING, "Database is updating") TAOS_DEFINE_ERROR(TSDB_CODE_VND_IS_CLOSING, "Database is closing") TAOS_DEFINE_ERROR(TSDB_CODE_VND_NOT_SYNCED, "Database suspended") TAOS_DEFINE_ERROR(TSDB_CODE_VND_NO_WRITE_AUTH, "Database write operation denied") TAOS_DEFINE_ERROR(TSDB_CODE_VND_IS_SYNCING, "Database is syncing") TAOS_DEFINE_ERROR(TSDB_CODE_VND_INVALID_TSDB_STATE, "Invalid tsdb state") +TAOS_DEFINE_ERROR(TSDB_CODE_VND_TB_NOT_EXIST, "Table not exists") // tsdb TAOS_DEFINE_ERROR(TSDB_CODE_TDB_INVALID_TABLE_ID, "Invalid table ID")