diff --git a/cmake/cmake.options b/cmake/cmake.options index faa45256fb7ed7395136ecf54d5f24448466720f..e84d02800c5bbf72273f6ef13cd3adfa3b3c0256 100644 --- a/cmake/cmake.options +++ b/cmake/cmake.options @@ -50,6 +50,12 @@ option( OFF ) +option( + BUILD_WITH_UV_TRANS + "If build with libuv_trans " + OFF +) + option( BUILD_WITH_CRAFT "If build with canonical-raft" diff --git a/include/client/taos.h b/include/client/taos.h index 40772e9d2c4e201b261fbc2481a90a1bb48da83b..626264d94ed6a296f380144cf4363003b81737cf 100644 --- a/include/client/taos.h +++ b/include/client/taos.h @@ -92,14 +92,16 @@ typedef struct taosField { typedef void (*__taos_async_fn_t)(void *param, TAOS_RES *, int code); -DLL_EXPORT void taos_cleanup(void); -DLL_EXPORT int taos_options(TSDB_OPTION option, const void *arg, ...); -DLL_EXPORT TAOS *taos_connect(const char *ip, const char *user, const char *pass, const char *db, uint16_t port); -DLL_EXPORT TAOS *taos_connect_l(const char *ip, int ipLen, const char *user, int userLen, const char *pass, int passLen, const char *db, int dbLen, uint16_t port); -DLL_EXPORT TAOS *taos_connect_auth(const char *ip, const char *user, const char *auth, const char *db, uint16_t port); -DLL_EXPORT void taos_close(TAOS *taos); +typedef struct tmq_t tmq_t; +typedef struct tmq_conf_t tmq_conf_t; +typedef struct tmq_list_t tmq_list_t; -const char *taos_data_type(int type); +typedef struct tmq_message_t tmq_message_t; +typedef struct tmq_message_topic_t tmq_message_topic_t; +typedef struct tmq_message_tb_t tmq_message_tb_t; +typedef struct tmq_tb_iter_t tmq_tb_iter_t; +typedef struct tmq_message_col_t tmq_message_col_t; +typedef struct tmq_col_iter_t tmq_col_iter_t; typedef struct TAOS_BIND { int buffer_type; @@ -134,6 +136,15 @@ typedef struct TAOS_MULTI_BIND { int num; } TAOS_MULTI_BIND; +DLL_EXPORT void taos_cleanup(void); +DLL_EXPORT int taos_options(TSDB_OPTION option, const void *arg, ...); +DLL_EXPORT TAOS *taos_connect(const char *ip, const char *user, const char *pass, const char *db, uint16_t port); +DLL_EXPORT TAOS *taos_connect_l(const char *ip, int ipLen, const char *user, int userLen, const char *pass, int passLen, const char *db, int dbLen, uint16_t port); +DLL_EXPORT TAOS *taos_connect_auth(const char *ip, const char *user, const char *auth, const char *db, uint16_t port); +DLL_EXPORT void taos_close(TAOS *taos); + +const char *taos_data_type(int type); + DLL_EXPORT TAOS_STMT *taos_stmt_init(TAOS *taos); DLL_EXPORT int taos_stmt_prepare(TAOS_STMT *stmt, const char *sql, unsigned long length); DLL_EXPORT int taos_stmt_set_tbname_tags(TAOS_STMT* stmt, const char* name, TAOS_BIND* tags); @@ -192,16 +203,6 @@ DLL_EXPORT void taos_close_stream(TAOS_STREAM *tstr); DLL_EXPORT int taos_load_table_info(TAOS *taos, const char* tableNameList); DLL_EXPORT TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int protocol, int precision); -typedef struct tmq_t tmq_t; -typedef struct tmq_conf_t tmq_conf_t; -typedef struct tmq_list_t tmq_list_t; - -typedef struct tmq_message_t tmq_message_t; -typedef struct tmq_message_topic_t tmq_message_topic_t; -typedef struct tmq_message_tb_t tmq_message_tb_t; -typedef struct tmq_tb_iter_t tmq_tb_iter_t; -typedef struct tmq_message_col_t tmq_message_col_t; -typedef struct tmq_col_iter_t tmq_col_iter_t; DLL_EXPORT tmq_list_t* tmq_list_new(); DLL_EXPORT int32_t tmq_list_append(tmq_list_t*, char*); diff --git a/include/common/common.h b/include/common/common.h index 9b8a4654428eb19efdc727e228eff401fac848ed..056d2789fe0b865ee8cc9d824836cfafdadaf4d9 100644 --- a/include/common/common.h +++ b/include/common/common.h @@ -38,6 +38,12 @@ // int16_t bytes; //} SSchema; +typedef struct { + uint32_t numOfTables; + SArray *pGroupList; + SHashObj *map; // speedup acquire the tableQueryInfo by table uid +} STableGroupInfo; + typedef struct SColumnDataAgg { int16_t colId; int64_t sum; @@ -57,17 +63,12 @@ typedef struct SDataBlockInfo { typedef struct SConstantItem { SColumnInfo info; - int32_t startIndex; // run-length-encoding to save the space for multiple rows - int32_t endIndex; + int32_t startRow; // run-length-encoding to save the space for multiple rows + int32_t endRow; SVariant value; } SConstantItem; -typedef struct { - uint32_t numOfTables; - SArray *pGroupList; - SHashObj *map; // speedup acquire the tableQueryInfo by table uid -} STableGroupInfo; - +// info.numOfCols = taosArrayGetSize(pDataBlock) + taosArrayGetSize(pConstantList); typedef struct SSDataBlock { SColumnDataAgg *pBlockAgg; SArray *pDataBlock; // SArray @@ -75,11 +76,88 @@ typedef struct SSDataBlock { SDataBlockInfo info; } SSDataBlock; +// pBlockAgg->numOfNull == info.rows, all data are null +// pBlockAgg->numOfNull == 0, no data are null. typedef struct SColumnInfoData { - SColumnInfo info; // TODO filter info needs to be removed - char *pData; // the corresponding block data in memory + SColumnInfo info; // TODO filter info needs to be removed + char *nullbitmap;// + char *pData; // the corresponding block data in memory } SColumnInfoData; +static FORCE_INLINE int32_t tEncodeDataBlock(void** buf, const SSDataBlock* pBlock) { + int64_t tbUid = pBlock->info.uid; + int32_t numOfCols = pBlock->info.numOfCols; + int32_t rows = pBlock->info.rows; + int32_t sz = taosArrayGetSize(pBlock->pDataBlock); + + int32_t tlen = 0; + tlen += taosEncodeFixedI64(buf, tbUid); + tlen += taosEncodeFixedI32(buf, numOfCols); + tlen += taosEncodeFixedI32(buf, rows); + tlen += taosEncodeFixedI32(buf, sz); + for (int32_t i = 0; i < sz; i++) { + SColumnInfoData* pColData = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, i); + tlen += taosEncodeFixedI16(buf, pColData->info.colId); + tlen += taosEncodeFixedI16(buf, pColData->info.type); + tlen += taosEncodeFixedI16(buf, pColData->info.bytes); + int32_t colSz = rows * pColData->info.bytes; + tlen += taosEncodeBinary(buf, pColData->pData, colSz); + } + return tlen; +} + +static FORCE_INLINE void* tDecodeDataBlock(void* buf, SSDataBlock* pBlock) { + int32_t sz; + + buf = taosDecodeFixedI64(buf, &pBlock->info.uid); + buf = taosDecodeFixedI32(buf, &pBlock->info.numOfCols); + buf = taosDecodeFixedI32(buf, &pBlock->info.rows); + buf = taosDecodeFixedI32(buf, &sz); + pBlock->pDataBlock = taosArrayInit(sz, sizeof(SColumnInfoData)); + for (int32_t i = 0; i < sz; i++) { + SColumnInfoData data; + buf = taosDecodeFixedI16(buf, &data.info.colId); + buf = taosDecodeFixedI16(buf, &data.info.type); + buf = taosDecodeFixedI16(buf, &data.info.bytes); + int32_t colSz = pBlock->info.rows * data.info.bytes; + buf = taosDecodeBinary(buf, (void**)&data.pData, colSz); + taosArrayPush(pBlock->pDataBlock, &data); + } + return buf; +} + +static FORCE_INLINE int32_t tEncodeSMqConsumeRsp(void** buf, const SMqConsumeRsp* pRsp) { + int32_t tlen = 0; + int32_t sz = 0; + tlen += taosEncodeFixedI64(buf, pRsp->consumerId); + tlen += tEncodeSSchemaWrapper(buf, pRsp->schemas); + if (pRsp->pBlockData) { + sz = taosArrayGetSize(pRsp->pBlockData); + } + tlen += taosEncodeFixedI32(buf, sz); + for (int32_t i = 0; i < sz; i++) { + SSDataBlock* pBlock = (SSDataBlock*) taosArrayGet(pRsp->pBlockData, i); + tlen += tEncodeDataBlock(buf, pBlock); + } + return tlen; +} + +static FORCE_INLINE void* tDecodeSMqConsumeRsp(void* buf, SMqConsumeRsp* pRsp) { + int32_t sz; + buf = taosDecodeFixedI64(buf, &pRsp->consumerId); + pRsp->schemas = (SSchemaWrapper*)calloc(1, sizeof(SSchemaWrapper)); + if (pRsp->schemas == NULL) return NULL; + buf = tDecodeSSchemaWrapper(buf, pRsp->schemas); + buf = taosDecodeFixedI32(buf, &sz); + pRsp->pBlockData = taosArrayInit(sz, sizeof(SSDataBlock)); + for (int32_t i = 0; i < sz; i++) { + SSDataBlock block; + tDecodeDataBlock(buf, &block); + taosArrayPush(pRsp->pBlockData, &block); + } + return buf; +} + //====================================================================================================================== // the following structure shared by parser and executor typedef struct SColumn { diff --git a/include/common/tmsg.h b/include/common/tmsg.h index d9d7d9bf9608c57733d9f3b53a77b44776510e29..27a01e4818109b8325ed7145f4c10bf043773c45 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -289,6 +289,7 @@ static FORCE_INLINE void* taosDecodeSEpSet(void* buf, SEpSet* pEp) { } return buf; } + typedef struct { int32_t acctId; int64_t clusterId; @@ -296,6 +297,7 @@ typedef struct { int8_t superUser; int8_t align[3]; SEpSet epSet; + char sVersion[128]; } SConnectRsp; typedef struct { @@ -1588,16 +1590,53 @@ typedef struct SMqSetCVgRsp { char cGroup[TSDB_CONSUMER_GROUP_LEN]; } SMqSetCVgRsp; -typedef struct SMqColData { - int16_t colId; - int16_t type; - int16_t bytes; -} SMqColMeta; +typedef struct { + uint32_t nCols; + SSchema *pSchema; +} SSchemaWrapper; + +static FORCE_INLINE int32_t tEncodeSSchema(void** buf, const SSchema* pSchema) { + int32_t tlen = 0; + tlen += taosEncodeFixedI8(buf, pSchema->type); + tlen += taosEncodeFixedI32(buf, pSchema->bytes); + tlen += taosEncodeFixedI32(buf, pSchema->colId); + tlen += taosEncodeString(buf, pSchema->name); + return tlen; +} + +static FORCE_INLINE void* tDecodeSSchema(void* buf, SSchema* pSchema) { + buf = taosDecodeFixedI8(buf, &pSchema->type); + buf = taosDecodeFixedI32(buf, &pSchema->bytes); + buf = taosDecodeFixedI32(buf, &pSchema->colId); + buf = taosDecodeStringTo(buf, pSchema->name); + return buf; +} + +static FORCE_INLINE int32_t tEncodeSSchemaWrapper(void** buf, const SSchemaWrapper* pSW) { + int32_t tlen = 0; + tlen += taosEncodeFixedU32(buf, pSW->nCols); + for (int32_t i = 0; i < pSW->nCols; i ++) { + tlen += tEncodeSSchema(buf, &pSW->pSchema[i]); + } + return tlen; +} + +static FORCE_INLINE void* tDecodeSSchemaWrapper(void* buf, SSchemaWrapper* pSW) { + buf = taosDecodeFixedU32(buf, &pSW->nCols); + pSW->pSchema = (SSchema*) calloc(pSW->nCols, sizeof(SSchema)); + if (pSW->pSchema == NULL) { + return NULL; + } + for (int32_t i = 0; i < pSW->nCols; i ++) { + buf = tDecodeSSchema(buf, &pSW->pSchema[i]); + } + return buf; +} typedef struct SMqTbData { int64_t uid; int32_t numOfRows; - char colData[]; + char* colData; } SMqTbData; typedef struct SMqTopicBlk { @@ -1612,18 +1651,12 @@ typedef struct SMqTopicBlk { } SMqTopicData; typedef struct SMqConsumeRsp { - int64_t consumerId; - int32_t numOfCols; - SMqColMeta* meta; - int32_t numOfTopics; - SMqTopicData* data; + int64_t consumerId; + SSchemaWrapper* schemas; + int32_t numOfTopics; + SArray* pBlockData; //SArray } SMqConsumeRsp; -static FORCE_INLINE int32_t tEncodeSMqConsumeRsp(void** buf, const SMqConsumeRsp* pRsp) { - int32_t tlen = 0; - return tlen; -} - // one req for one vg+topic typedef struct SMqConsumeReq { SMsgHead head; diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index c61f3da6bd456058e9754918be10907486dd7b77..56fe0421a38f6c696f186cb5d059b36c49d36946 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -114,6 +114,7 @@ typedef struct STscObj { char user[TSDB_USER_LEN]; char pass[TSDB_PASSWORD_LEN]; char db[TSDB_DB_FNAME_LEN]; + char ver[128]; int32_t acctId; uint32_t connId; int32_t connType; diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 102d9d545a4b0be873c880609eca68a26bece2cd..321d9a9a980c7b6775b579d6ba121bb9835f2c2c 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -119,7 +119,7 @@ TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass, SAppInstInfo* p = calloc(1, sizeof(struct SAppInstInfo)); p->mgmtEp = epSet; p->pTransporter = openTransporter(user, secretEncrypt, tsNumOfCores); - p->pAppHbMgr = appHbMgrInit(p); + /*p->pAppHbMgr = appHbMgrInit(p);*/ taosHashPut(appInfo.pInstMap, key, strlen(key), &p, POINTER_BYTES); pInst = &p; @@ -218,12 +218,10 @@ int32_t getPlan(SRequestObj* pRequest, SQueryNode* pQueryNode, SQueryDag** pDag, if (pQueryNode->type == TSDB_SQL_SELECT) { setResSchemaInfo(&pRequest->body.resInfo, pSchema, numOfCols); - tfree(pSchema); pRequest->type = TDMT_VND_QUERY; - } else { - tfree(pSchema); } + tfree(pSchema); return code; } @@ -621,6 +619,27 @@ struct tmq_message_t { }; int32_t tmq_poll_cb_inner(void* param, const SDataBuf* pMsg, int32_t code) { + SMqConsumeRsp rsp; + tDecodeSMqConsumeRsp(pMsg->pData, &rsp); + int32_t colNum = rsp.schemas->nCols; + for (int32_t i = 0; i < colNum; i++) { + printf("| %s |", rsp.schemas->pSchema[i].name); + } + printf("\n"); + int32_t sz = taosArrayGetSize(rsp.pBlockData); + for (int32_t i = 0; i < sz; i++) { + SSDataBlock* pDataBlock = taosArrayGet(rsp.pBlockData, i); + int32_t rows = pDataBlock->info.rows; + for (int32_t j = 0; j < colNum; j++) { + SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, j); + for (int32_t k = 0; k < rows; k++) { + void* var = POINTER_SHIFT(pColInfoData->pData, k * pColInfoData->info.bytes); + if (j == 0) printf(" %ld ", *(int64_t*)var); + if (j == 1) printf(" %d ", *(int32_t*)var); + } + } + /*pDataBlock->*/ + } return 0; } @@ -721,9 +740,9 @@ tmq_message_t* tmq_consume_poll(tmq_t* tmq, int64_t blocking_time) { pRequest->body.requestMsg = (SDataBuf){ .pData = pReq, .len = sizeof(SMqConsumeReq) }; SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest); - /*sendInfo->requestObjRefId = 0;*/ + sendInfo->requestObjRefId = 0; /*sendInfo->param = &tmq_message;*/ - /*sendInfo->fp = tmq_poll_cb_inner;*/ + sendInfo->fp = tmq_poll_cb_inner; int64_t transporterId = 0; asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo); @@ -776,7 +795,6 @@ TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen) { if (qIsDdlQuery(pQueryNode)) { CHECK_CODE_GOTO(execDdlQuery(pRequest, pQueryNode), _return); } else { - CHECK_CODE_GOTO(getPlan(pRequest, pQueryNode, &pRequest->body.pDag, pNodeList), _return); CHECK_CODE_GOTO(scheduleQuery(pRequest, pRequest->body.pDag, pNodeList), _return); pRequest->code = terrno; diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index ba2a21d7eaf420a32c9bfa69e49345e88230d260..61b704355bb18e9c82b790e9b78fb34abf0f00d4 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -275,3 +275,70 @@ int taos_affected_rows(TAOS_RES *res) { } int taos_result_precision(TAOS_RES *res) { return TSDB_TIME_PRECISION_MILLI; } + +int taos_select_db(TAOS *taos, const char *db) { + STscObj *pObj = (STscObj *)taos; + if (pObj == NULL) { + terrno = TSDB_CODE_TSC_DISCONNECTED; + return TSDB_CODE_TSC_DISCONNECTED; + } + + if (db == NULL || strlen(db) == 0) { + terrno = TSDB_CODE_TSC_INVALID_INPUT; + return terrno; + } + + char sql[256] = {0}; + snprintf(sql, tListLen(sql), "use %s", db); + + TAOS_RES* pRequest = taos_query(taos, sql); + int32_t code = taos_errno(pRequest); + + taos_free_result(pRequest); + return code; +} + +void taos_stop_query(TAOS_RES *res) { + if (res == NULL) { + return; + } + + SRequestObj* pRequest = (SRequestObj*) res; + int32_t numOfFields = taos_num_fields(pRequest); + + // It is not a query, no need to stop. + if (numOfFields == 0) { + return; + } + +// scheduleCancelJob(pRequest->body.pQueryJob); +} + +bool taos_is_null(TAOS_RES *res, int32_t row, int32_t col) { + return false; +} + +int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows) { + return 0; +} + +int taos_validate_sql(TAOS *taos, const char *sql) { + return true; +} + +const char *taos_get_server_info(TAOS *taos) { + if (taos == NULL) { + return NULL; + } + + STscObj* pTscObj = (STscObj*) taos; + return pTscObj->ver; +} + +void taos_query_a(TAOS *taos, const char *sql, __taos_async_fn_t fp, void *param) { + // TODO +} + +void taos_fetch_rows_a(TAOS_RES *res, __taos_async_fn_t fp, void *param) { + // TODO +} \ No newline at end of file diff --git a/source/client/src/clientMsgHandler.c b/source/client/src/clientMsgHandler.c index 42d40b6b81ab9dd18c29aad898fe3233d92e7740..70b1fd91ef887e730b4754eaf7e029eb30363f54 100644 --- a/source/client/src/clientMsgHandler.c +++ b/source/client/src/clientMsgHandler.c @@ -67,13 +67,14 @@ int processConnectRsp(void* param, const SDataBuf* pMsg, int32_t code) { pTscObj->connId = pConnect->connId; pTscObj->acctId = pConnect->acctId; + tstrncpy(pTscObj->ver, pConnect->sVersion, tListLen(pTscObj->ver)); // update the appInstInfo pTscObj->pAppInfo->clusterId = pConnect->clusterId; atomic_add_fetch_64(&pTscObj->pAppInfo->numOfConns, 1); - SClientHbKey connKey = {.connId = pConnect->connId, .hbType = HEARTBEAT_TYPE_QUERY}; - hbRegisterConn(pTscObj->pAppInfo->pAppHbMgr, connKey, NULL); + /*SClientHbKey connKey = {.connId = pConnect->connId, .hbType = HEARTBEAT_TYPE_QUERY};*/ + /*hbRegisterConn(pTscObj->pAppInfo->pAppHbMgr, connKey, NULL);*/ // pRequest->body.resInfo.pRspMsg = pMsg->pData; tscDebug("0x%" PRIx64 " clusterId:%" PRId64 ", totalConn:%" PRId64, pRequest->requestId, pConnect->clusterId, diff --git a/source/client/test/clientTests.cpp b/source/client/test/clientTests.cpp index 12e4532b09bd74347b0f62a042df901865047dce..3d57a5de1d02d3671fbb4fdbbefa29b0770ace88 100644 --- a/source/client/test/clientTests.cpp +++ b/source/client/test/clientTests.cpp @@ -237,27 +237,27 @@ TEST(testCase, use_db_test) { taos_close(pConn); } - TEST(testCase, drop_db_test) { - TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); - assert(pConn != NULL); - - showDB(pConn); - - TAOS_RES* pRes = taos_query(pConn, "drop database abc1"); - if (taos_errno(pRes) != 0) { - printf("failed to drop db, reason:%s\n", taos_errstr(pRes)); - } - taos_free_result(pRes); - - showDB(pConn); - - pRes = taos_query(pConn, "create database abc1"); - if (taos_errno(pRes) != 0) { - printf("create to drop db, reason:%s\n", taos_errstr(pRes)); - } - taos_free_result(pRes); - taos_close(pConn); -} +// TEST(testCase, drop_db_test) { +// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); +// assert(pConn != NULL); +// +// showDB(pConn); +// +// TAOS_RES* pRes = taos_query(pConn, "drop database abc1"); +// if (taos_errno(pRes) != 0) { +// printf("failed to drop db, reason:%s\n", taos_errstr(pRes)); +// } +// taos_free_result(pRes); +// +// showDB(pConn); +// +// pRes = taos_query(pConn, "create database abc1"); +// if (taos_errno(pRes) != 0) { +// printf("create to drop db, reason:%s\n", taos_errstr(pRes)); +// } +// taos_free_result(pRes); +// taos_close(pConn); +//} TEST(testCase, create_stable_Test) { TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); @@ -503,30 +503,30 @@ TEST(testCase, show_table_Test) { taos_close(pConn); } -TEST(testCase, drop_stable_Test) { - TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); - assert(pConn != nullptr); - - TAOS_RES* pRes = taos_query(pConn, "create database if not exists abc1"); - if (taos_errno(pRes) != 0) { - printf("error in creating db, reason:%s\n", taos_errstr(pRes)); - } - taos_free_result(pRes); - - pRes = taos_query(pConn, "use abc1"); - if (taos_errno(pRes) != 0) { - printf("error in using db, reason:%s\n", taos_errstr(pRes)); - } - taos_free_result(pRes); - - pRes = taos_query(pConn, "drop stable st1"); - if (taos_errno(pRes) != 0) { - printf("failed to drop stable, reason:%s\n", taos_errstr(pRes)); - } - - taos_free_result(pRes); - taos_close(pConn); -} +//TEST(testCase, drop_stable_Test) { +// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); +// assert(pConn != nullptr); +// +// TAOS_RES* pRes = taos_query(pConn, "create database if not exists abc1"); +// if (taos_errno(pRes) != 0) { +// printf("error in creating db, reason:%s\n", taos_errstr(pRes)); +// } +// taos_free_result(pRes); +// +// pRes = taos_query(pConn, "use abc1"); +// if (taos_errno(pRes) != 0) { +// printf("error in using db, reason:%s\n", taos_errstr(pRes)); +// } +// taos_free_result(pRes); +// +// pRes = taos_query(pConn, "drop stable st1"); +// if (taos_errno(pRes) != 0) { +// printf("failed to drop stable, reason:%s\n", taos_errstr(pRes)); +// } +// +// taos_free_result(pRes); +// taos_close(pConn); +//} TEST(testCase, generated_request_id_test) { SHashObj* phash = taosHashInit(10000, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK); @@ -601,7 +601,7 @@ TEST(testCase, tmq_subscribe_Test) { tmq_conf_t* conf = tmq_conf_new(); tmq_conf_set(conf, "group.id", "tg1"); tmq_t* tmq = taos_consumer_new(pConn, conf, NULL, 0); - + tmq_list_t* topic_list = tmq_list_new(); tmq_list_append(topic_list, "test_topic_1"); tmq_subscribe(tmq, topic_list); @@ -712,7 +712,7 @@ TEST(testCase, agg_query_tables) { } taos_free_result(pRes); - pRes = taos_query(pConn, "select count(*) from tu"); + pRes = taos_query(pConn, "select count(*) from t_x_19"); if (taos_errno(pRes) != 0) { printf("failed to select from table, reason:%s\n", taos_errstr(pRes)); taos_free_result(pRes); diff --git a/source/common/src/tep.c b/source/common/src/tep.c index cf38ab8dd915e21c7458f88767c15c47762e5eb4..45587a88567bf4b5416623e872e0d79120efef67 100644 --- a/source/common/src/tep.c +++ b/source/common/src/tep.c @@ -1,4 +1,5 @@ #include "tep.h" +#include "common.h" #include "tglobal.h" #include "tlockfree.h" @@ -59,3 +60,99 @@ SEpSet getEpSet_s(SCorEpSet *pEpSet) { return ep; } +bool colDataIsNull(const SColumnInfoData* pColumnInfoData, uint32_t totalRows, uint32_t row, SColumnDataAgg* pColAgg) { + if (pColAgg != NULL) { + if (pColAgg->numOfNull == totalRows) { + ASSERT(pColumnInfoData->nullbitmap == NULL); + return true; + } else if (pColAgg->numOfNull == 0) { + ASSERT(pColumnInfoData->nullbitmap == NULL); + return false; + } + } + + if (pColumnInfoData->nullbitmap == NULL) { + return false; + } + + uint8_t v = (pColumnInfoData->nullbitmap[row>>3] & (1<<(8 - (row&0x07)))); + return (v == 1); +} + +bool colDataIsNull_f(const char* bitmap, uint32_t row) { + return (bitmap[row>>3] & (1<<(8 - (row&0x07)))); +} + +void colDataSetNull_f(char* bitmap, uint32_t row) { // TODO + return; +} + +void* colDataGet(const SColumnInfoData* pColumnInfoData, uint32_t row) { + if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) { + uint32_t offset = ((uint32_t*)pColumnInfoData->pData)[row]; + return (char*)(pColumnInfoData->pData) + offset; // the first part is the pointer to the true binary data + } else { + return (char*)(pColumnInfoData->pData) + (row * pColumnInfoData->info.bytes); + } +} + +int32_t colDataAppend(SColumnInfoData* pColumnInfoData, uint32_t currentRow, const char* pData, bool isNull) { + ASSERT(pColumnInfoData != NULL); + + if (isNull) { + // TODO set null value in the nullbitmap + return 0; + } + + int32_t type = pColumnInfoData->info.type; + if (IS_VAR_DATA_TYPE(type)) { + // TODO continue append var_type + } else { + char* p = pColumnInfoData->pData + pColumnInfoData->info.bytes * currentRow; + switch(type) { + case TSDB_DATA_TYPE_TINYINT: + case TSDB_DATA_TYPE_UTINYINT: {*(int8_t*) p = *(int8_t*) pData;break;} + default: + assert(0); + } + + } + + return 0; +} + +size_t colDataGetCols(const SSDataBlock* pBlock) { + ASSERT(pBlock); + + size_t constantCols = (pBlock->pConstantList != NULL)? taosArrayGetSize(pBlock->pConstantList):0; + ASSERT( pBlock->info.numOfCols == taosArrayGetSize(pBlock->pDataBlock) + constantCols); + return pBlock->info.numOfCols; +} + +size_t colDataGetRows(const SSDataBlock* pBlock) { + return pBlock->info.rows; +} + +int32_t colDataUpdateTsWindow(SSDataBlock* pDataBlock) { + if (pDataBlock == NULL || pDataBlock->info.rows <= 0) { + return 0; + } + + if (pDataBlock->info.numOfCols <= 0) { + return -1; + } + + SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, 0); + if (pColInfoData->info.type != TSDB_DATA_TYPE_TIMESTAMP) { + return 0; + } + + ASSERT(pColInfoData->nullbitmap == NULL); + pDataBlock->info.window.skey = *(TSKEY*) colDataGet(pColInfoData, 0); + pDataBlock->info.window.ekey = *(TSKEY*) colDataGet(pColInfoData, (pDataBlock->info.rows - 1)); + return 0; +} + + + + diff --git a/source/dnode/mgmt/impl/src/dndMgmt.c b/source/dnode/mgmt/impl/src/dndMgmt.c index df886109115d22343c87f4895645a11687b15f3b..0674d719b9cb241bac8eea607f111f34832fd983 100644 --- a/source/dnode/mgmt/impl/src/dndMgmt.c +++ b/source/dnode/mgmt/impl/src/dndMgmt.c @@ -398,7 +398,7 @@ void dndSendStatusReq(SDnode *pDnode) { static void dndUpdateDnodeCfg(SDnode *pDnode, SDnodeCfg *pCfg) { SDnodeMgmt *pMgmt = &pDnode->dmgmt; if (pMgmt->dnodeId == 0) { - dInfo("set dnodeId:%d clusterId:%" PRId64, pCfg->dnodeId, pCfg->clusterId); + dInfo("set dnodeId:%d clusterId:0x%" PRId64, pCfg->dnodeId, pCfg->clusterId); taosWLockLatch(&pMgmt->latch); pMgmt->dnodeId = pCfg->dnodeId; pMgmt->clusterId = pCfg->clusterId; diff --git a/source/dnode/mnode/impl/src/mndMnode.c b/source/dnode/mnode/impl/src/mndMnode.c index fbb4e5cef7e1c669c736ebf9e5437a7cefc1d178..a969c0162fc23dbf29ced4ebe2062cb524306d00 100644 --- a/source/dnode/mnode/impl/src/mndMnode.c +++ b/source/dnode/mnode/impl/src/mndMnode.c @@ -237,13 +237,11 @@ void mndGetMnodeEpSet(SMnode *pMnode, SEpSet *pEpSet) { if (pIter == NULL) break; if (pObj->pDnode == NULL) break; - pEpSet->eps[pEpSet->numOfEps].port = htons(pObj->pDnode->port); - memcpy(pEpSet->eps[pEpSet->numOfEps].fqdn, pObj->pDnode->fqdn, TSDB_FQDN_LEN); if (pObj->role == TAOS_SYNC_STATE_LEADER) { pEpSet->inUse = pEpSet->numOfEps; } - pEpSet->numOfEps++; + addEpIntoEpSet(pEpSet, pObj->pDnode->fqdn, htons(pObj->pDnode->port)); sdbRelease(pSdb, pObj); } } diff --git a/source/dnode/mnode/impl/src/mndProfile.c b/source/dnode/mnode/impl/src/mndProfile.c index 76fabc96ceac4a8b89cebc7036d651914bfa06a0..3e2b9314134d2705a6baa1d170e8f11898f9540b 100644 --- a/source/dnode/mnode/impl/src/mndProfile.c +++ b/source/dnode/mnode/impl/src/mndProfile.c @@ -14,14 +14,15 @@ */ #define _DEFAULT_SOURCE +#include "tglobal.h" #include "mndProfile.h" -#include "mndConsumer.h" +//#include "mndConsumer.h" #include "mndDb.h" #include "mndMnode.h" #include "mndShow.h" -#include "mndTopic.h" +//#include "mndTopic.h" #include "mndUser.h" -#include "mndVgroup.h" +//#include "mndVgroup.h" #define QUERY_ID_SIZE 20 #define QUERY_OBJ_ID_SIZE 18 @@ -230,10 +231,12 @@ static int32_t mndProcessConnectReq(SMnodeMsg *pReq) { goto CONN_OVER; } - pRsp->acctId = htonl(pUser->acctId); + pRsp->acctId = htonl(pUser->acctId); pRsp->superUser = pUser->superUser; pRsp->clusterId = htobe64(pMnode->clusterId); - pRsp->connId = htonl(pConn->id); + pRsp->connId = htonl(pConn->id); + + snprintf(pRsp->sVersion, tListLen(pRsp->sVersion), "ver:%s\nbuild:%s\ngitinfo:%s", version, buildinfo, gitinfo); mndGetMnodeEpSet(pMnode, &pRsp->epSet); pReq->contLen = sizeof(SConnectRsp); diff --git a/source/dnode/vnode/inc/meta.h b/source/dnode/vnode/inc/meta.h index 383073871e6eb46e75d1a0dfe48c09fc50e14cad..44a352ec54966af9a1cb13c767cc8a23988395c6 100644 --- a/source/dnode/vnode/inc/meta.h +++ b/source/dnode/vnode/inc/meta.h @@ -37,11 +37,6 @@ typedef struct SMetaCfg { uint64_t lruSize; } SMetaCfg; -typedef struct { - uint32_t nCols; - SSchema *pSchema; -} SSchemaWrapper; - typedef struct SMTbCursor SMTbCursor; typedef struct SMCtbCursor SMCtbCursor; diff --git a/source/dnode/vnode/inc/tq.h b/source/dnode/vnode/inc/tq.h index 3a1e5b9c95737304d3460b0d173a8d6a6123fdf6..faaf769e1ab16931eb4944b51ea7e791a26a226b 100644 --- a/source/dnode/vnode/inc/tq.h +++ b/source/dnode/vnode/inc/tq.h @@ -149,11 +149,12 @@ typedef struct STqGroup { } STqGroup; typedef struct STqTaskItem { - int8_t status; - int64_t offset; - void* dst; - qTaskInfo_t task; - SSubQueryMsg* pQueryMsg; + int8_t status; + int64_t offset; + void* dst; + qTaskInfo_t task; + STqReadHandle* pReadHandle; + SSubQueryMsg* pQueryMsg; } STqTaskItem; // new version diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index b56c5b30fa4fc3ce8584bdb2273a134f177420c2..c8b47bf4a6b8635d2c494ea4aa8fd6622f5e264d 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -69,14 +69,17 @@ typedef struct { } SVnodeOpt; typedef struct STqReadHandle { - int64_t ver; - uint64_t tbUid; - SSubmitMsg* pMsg; - SSubmitBlk* pBlock; - SSubmitMsgIter msgIter; - SSubmitBlkIter blkIter; - SMeta* pMeta; - SArray* pColIdList; + int64_t ver; + uint64_t tbUid; + SSubmitMsg* pMsg; + SSubmitBlk* pBlock; + SSubmitMsgIter msgIter; + SSubmitBlkIter blkIter; + SMeta* pVnodeMeta; + SArray* pColIdList; //SArray + int32_t sver; + SSchemaWrapper* pSchemaWrapper; + STSchema* pSchema; } STqReadHandle; /* ------------------------ SVnode ------------------------ */ diff --git a/source/dnode/vnode/src/meta/metaBDBImpl.c b/source/dnode/vnode/src/meta/metaBDBImpl.c index dfdc144750ab568b62d82d8780982d2f9a5c32d5..e5ccd02e484349aca0f09826a928f6e9dcd59143 100644 --- a/source/dnode/vnode/src/meta/metaBDBImpl.c +++ b/source/dnode/vnode/src/meta/metaBDBImpl.c @@ -20,6 +20,10 @@ #include "tcoding.h" #include "thash.h" +#define IMPL_WITH_LOCK 1 +// #if IMPL_WITH_LOCK +// #endif + typedef struct { tb_uid_t uid; int32_t sver; @@ -27,6 +31,9 @@ typedef struct { } SSchemaKey; struct SMetaDB { +#if IMPL_WITH_LOCK + pthread_rwlock_t rwlock; +#endif // DB DB *pTbDB; DB *pSchemaDB; @@ -58,6 +65,9 @@ static void * metaDecodeTbInfo(void *buf, STbCfg *pTbCfg); static void metaClearTbCfg(STbCfg *pTbCfg); static int metaEncodeSchema(void **buf, SSchemaWrapper *pSW); static void * metaDecodeSchema(void *buf, SSchemaWrapper *pSW); +static void metaDBWLock(SMetaDB *pDB); +static void metaDBRLock(SMetaDB *pDB); +static void metaDBULock(SMetaDB *pDB); #define BDB_PERR(info, code) fprintf(stderr, info " reason: %s", db_strerror(code)) @@ -130,8 +140,10 @@ void metaCloseDB(SMeta *pMeta) { int metaSaveTableToDB(SMeta *pMeta, STbCfg *pTbCfg) { tb_uid_t uid; char buf[512]; + char buf1[512]; void * pBuf; - DBT key, value; + DBT key1, value1; + DBT key2, value2; SSchema *pSchema = NULL; if (pTbCfg->type == META_SUPER_TABLE) { @@ -143,19 +155,17 @@ int metaSaveTableToDB(SMeta *pMeta, STbCfg *pTbCfg) { { // save table info pBuf = buf; - memset(&key, 0, sizeof(key)); - memset(&value, 0, sizeof(key)); + memset(&key1, 0, sizeof(key1)); + memset(&value1, 0, sizeof(key1)); - key.data = &uid; - key.size = sizeof(uid); + key1.data = &uid; + key1.size = sizeof(uid); metaEncodeTbInfo(&pBuf, pTbCfg); - value.data = buf; - value.size = POINTER_DISTANCE(pBuf, buf); - value.app_data = pTbCfg; - - pMeta->pDB->pTbDB->put(pMeta->pDB->pTbDB, NULL, &key, &value, 0); + value1.data = buf; + value1.size = POINTER_DISTANCE(pBuf, buf); + value1.app_data = pTbCfg; } // save schema @@ -169,22 +179,27 @@ int metaSaveTableToDB(SMeta *pMeta, STbCfg *pTbCfg) { } if (pSchema) { - pBuf = buf; - memset(&key, 0, sizeof(key)); - memset(&value, 0, sizeof(key)); + pBuf = buf1; + memset(&key2, 0, sizeof(key2)); + memset(&value2, 0, sizeof(key2)); SSchemaKey schemaKey = {uid, 0 /*TODO*/, 0}; - key.data = &schemaKey; - key.size = sizeof(schemaKey); + key2.data = &schemaKey; + key2.size = sizeof(schemaKey); SSchemaWrapper sw = {.nCols = ncols, .pSchema = pSchema}; metaEncodeSchema(&pBuf, &sw); - value.data = buf; - value.size = POINTER_DISTANCE(pBuf, buf); + value2.data = buf1; + value2.size = POINTER_DISTANCE(pBuf, buf1); + } - pMeta->pDB->pSchemaDB->put(pMeta->pDB->pSchemaDB, NULL, &key, &value, 0); + metaDBWLock(pMeta->pDB); + pMeta->pDB->pTbDB->put(pMeta->pDB->pTbDB, NULL, &key1, &value1, 0); + if (pSchema) { + pMeta->pDB->pSchemaDB->put(pMeta->pDB->pSchemaDB, NULL, &key2, &value2, 0); } + metaDBULock(pMeta->pDB); return 0; } @@ -234,11 +249,18 @@ static SMetaDB *metaNewDB() { return NULL; } +#if IMPL_WITH_LOCK + pthread_rwlock_init(&pDB->rwlock, NULL); +#endif + return pDB; } static void metaFreeDB(SMetaDB *pDB) { if (pDB) { +#if IMPL_WITH_LOCK + pthread_rwlock_destroy(&pDB->rwlock); +#endif free(pDB); } } @@ -467,7 +489,9 @@ STbCfg *metaGetTbInfoByUid(SMeta *pMeta, tb_uid_t uid) { key.size = sizeof(uid); // Query + metaDBRLock(pDB); ret = pDB->pTbDB->get(pDB->pTbDB, NULL, &key, &value, 0); + metaDBULock(pDB); if (ret != 0) { return NULL; } @@ -496,7 +520,9 @@ STbCfg *metaGetTbInfoByName(SMeta *pMeta, char *tbname, tb_uid_t *uid) { key.size = strlen(tbname); // Query + metaDBRLock(pDB); ret = pDB->pNameIdx->pget(pDB->pNameIdx, NULL, &key, &pkey, &pvalue, 0); + metaDBULock(pDB); if (ret != 0) { return NULL; } @@ -529,7 +555,9 @@ SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, boo key.size = sizeof(schemaKey); // Query + metaDBRLock(pDB); ret = pDB->pSchemaDB->get(pDB->pSchemaDB, NULL, &key, &value, 0); + metaDBULock(pDB); if (ret != 0) { printf("failed to query schema DB since %s================\n", db_strerror(ret)); return NULL; @@ -687,4 +715,22 @@ tb_uid_t metaCtbCursorNext(SMCtbCursor *pCtbCur) { } else { return 0; } -} \ No newline at end of file +} + +static void metaDBWLock(SMetaDB *pDB) { +#if IMPL_WITH_LOCK + pthread_rwlock_wrlock(&(pDB->rwlock)); +#endif +} + +static void metaDBRLock(SMetaDB *pDB) { +#if IMPL_WITH_LOCK + pthread_rwlock_rdlock(&(pDB->rwlock)); +#endif +} + +static void metaDBULock(SMetaDB *pDB) { +#if IMPL_WITH_LOCK + pthread_rwlock_unlock(&(pDB->rwlock)); +#endif +} diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index e953d595272aac25187fb873c0860a37c0d67d2a..3195691a13ad4295a412f3ebb6ca40267419e1e9 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -13,9 +13,9 @@ * along with this program. If not, see . */ +#include "tcompare.h" #include "tqInt.h" #include "tqMetaStore.h" -#include "tcompare.h" // static // read next version data @@ -484,7 +484,8 @@ int tqConsume(STQ* pTq, STqConsumeReq* pMsg) { int tqSerializeConsumer(const STqConsumerHandle* pConsumer, STqSerializedHead** ppHead) { int32_t num = taosArrayGetSize(pConsumer->topics); - int32_t sz = sizeof(STqSerializedHead) + sizeof(int64_t) * 2 + TSDB_TOPIC_FNAME_LEN + num * (sizeof(int64_t) + TSDB_TOPIC_FNAME_LEN); + int32_t sz = sizeof(STqSerializedHead) + sizeof(int64_t) * 2 + TSDB_TOPIC_FNAME_LEN + + num * (sizeof(int64_t) + TSDB_TOPIC_FNAME_LEN); if (sz > (*ppHead)->ssize) { void* tmpPtr = realloc(*ppHead, sz); if (tmpPtr == NULL) { @@ -511,13 +512,13 @@ int tqSerializeConsumer(const STqConsumerHandle* pConsumer, STqSerializedHead** *(int64_t*)ptr = pTopic->committedOffset; POINTER_SHIFT(ptr, sizeof(int64_t)); } - + return 0; } const void* tqDeserializeConsumer(const STqSerializedHead* pHead, STqConsumerHandle** ppConsumer) { STqConsumerHandle* pConsumer = *ppConsumer; - const void* ptr = pHead->content; + const void* ptr = pHead->content; pConsumer->consumerId = *(int64_t*)ptr; ptr = POINTER_SHIFT(ptr, sizeof(int64_t)); pConsumer->epoch = *(int64_t*)ptr; @@ -668,32 +669,33 @@ int tqItemSSize() { #endif int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) { - SMqConsumeReq* pReq = pMsg->pCont; - SRpcMsg rpcMsg; - int64_t reqId = pReq->reqId; - int64_t consumerId = pReq->consumerId; - int64_t reqOffset = pReq->offset; - int64_t fetchOffset = reqOffset; - int64_t blockingTime = pReq->blockingTime; + SMqConsumeReq* pReq = pMsg->pCont; + SRpcMsg rpcMsg; + int64_t reqId = pReq->reqId; + int64_t consumerId = pReq->consumerId; + int64_t reqOffset = pReq->offset; + int64_t fetchOffset = reqOffset; + int64_t blockingTime = pReq->blockingTime; - int rspLen = 0; + int rspLen = 0; + SMqConsumeRsp rsp = {.consumerId = consumerId, .numOfTopics = 1, .pBlockData = NULL}; STqConsumerHandle* pConsumer = tqHandleGet(pTq->tqMeta, consumerId); ASSERT(pConsumer); - int sz = taosArrayGetSize(pConsumer->topics); + int sz = taosArrayGetSize(pConsumer->topics); for (int i = 0; i < sz; i++) { STqTopicHandle* pTopic = taosArrayGet(pConsumer->topics, i); - //TODO: support multiple topic in one req + // TODO: support multiple topic in one req if (strcmp(pTopic->topicName, pReq->topic) != 0) { continue; } - if (fetchOffset == -1) { - fetchOffset = pTopic->committedOffset + 1; - } - int8_t pos; - int8_t skip = 0; + if (fetchOffset == -1) { + fetchOffset = pTopic->committedOffset + 1; + } + int8_t pos; + int8_t skip = 0; SWalHead* pHead; while (1) { pos = fetchOffset % TQ_BUFFER_SIZE; @@ -727,7 +729,7 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) { qSetStreamInput(task, pCont); - //SArray + // SArray SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock)); while (1) { SSDataBlock* pDataBlock; @@ -741,6 +743,8 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) { break; } } + //TODO copy + rsp.schemas = pTopic->buffer.output[pos].pReadHandle->pSchemaWrapper; atomic_store_8(&pTopic->buffer.output[pos].status, 0); @@ -750,6 +754,9 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) { continue; } + rsp.pBlockData = pRes; + +#if 0 pTopic->buffer.output[pos].dst = pRes; if (pTopic->buffer.firstOffset == -1 || pReq->offset < pTopic->buffer.firstOffset) { pTopic->buffer.firstOffset = pReq->offset; @@ -757,13 +764,20 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) { if (pTopic->buffer.lastOffset == -1 || pReq->offset > pTopic->buffer.lastOffset) { pTopic->buffer.lastOffset = pReq->offset; } +#endif } - // put output into rsp - SMqConsumeRsp rsp = { - .consumerId = consumerId, - .numOfTopics = 1 - }; - + int32_t tlen = tEncodeSMqConsumeRsp(NULL, &rsp); + void* buf = rpcMallocCont(tlen); + if (buf == NULL) { + pMsg->code = -1; + return -1; + } + void* abuf = buf; + tEncodeSMqConsumeRsp(&abuf, &rsp); + pMsg->pCont = buf; + pMsg->contLen = tlen; + pMsg->code = 0; + rpcSendResponse(pMsg); return 0; } @@ -799,6 +813,7 @@ int32_t tqProcessSetConnReq(STQ* pTq, char* msg) { for (int i = 0; i < TQ_BUFFER_SIZE; i++) { pTopic->buffer.output[i].status = 0; STqReadHandle* pReadHandle = tqInitSubmitMsgScanner(pTq->pMeta); + pTopic->buffer.output[i].pReadHandle = pReadHandle; pTopic->buffer.output[i].task = qCreateStreamExecTaskInfo(req.qmsg, pReadHandle); } taosArrayPush(pConsumer->topics, pTopic); @@ -813,10 +828,13 @@ STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta) { if (pReadHandle == NULL) { return NULL; } - pReadHandle->pMeta = pMeta; + pReadHandle->pVnodeMeta = pMeta; pReadHandle->pMsg = NULL; pReadHandle->ver = -1; pReadHandle->pColIdList = NULL; + pReadHandle->sver = -1; + pReadHandle->pSchema = NULL; + pReadHandle->pSchemaWrapper = NULL; return pReadHandle; } @@ -837,13 +855,13 @@ bool tqNextDataBlock(STqReadHandle* pHandle) { if (pHandle->pBlock == NULL) return false; pHandle->pBlock->uid = htobe64(pHandle->pBlock->uid); - if (pHandle->tbUid == pHandle->pBlock->uid){ + if (pHandle->tbUid == pHandle->pBlock->uid) { pHandle->pBlock->tid = htonl(pHandle->pBlock->tid); pHandle->pBlock->sversion = htonl(pHandle->pBlock->sversion); pHandle->pBlock->dataLen = htonl(pHandle->pBlock->dataLen); pHandle->pBlock->schemaLen = htonl(pHandle->pBlock->schemaLen); pHandle->pBlock->numOfRows = htons(pHandle->pBlock->numOfRows); - return true; + return true; } } return false; @@ -859,41 +877,71 @@ int tqRetrieveDataBlockInfo(STqReadHandle* pHandle, SDataBlockInfo* pBlockInfo) } SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) { - int32_t sversion = pHandle->pBlock->sversion; - //TODO : change sversion - STSchema* pTschema = metaGetTbTSchema(pHandle->pMeta, pHandle->pBlock->uid, 0); - - tb_uid_t quid; - STbCfg* pTbCfg = metaGetTbInfoByUid(pHandle->pMeta, pHandle->pBlock->uid); - if (pTbCfg->type == META_CHILD_TABLE) { - quid = pTbCfg->ctbCfg.suid; - } else { - quid = pHandle->pBlock->uid; + /*int32_t sversion = pHandle->pBlock->sversion;*/ + // TODO set to real sversion + int32_t sversion = 0; + if (pHandle->sver != sversion) { + pHandle->pSchema = metaGetTbTSchema(pHandle->pVnodeMeta, pHandle->pBlock->uid, sversion); + + tb_uid_t quid; + STbCfg* pTbCfg = metaGetTbInfoByUid(pHandle->pVnodeMeta, pHandle->pBlock->uid); + if (pTbCfg->type == META_CHILD_TABLE) { + quid = pTbCfg->ctbCfg.suid; + } else { + quid = pHandle->pBlock->uid; + } + pHandle->pSchemaWrapper = metaGetTableSchema(pHandle->pVnodeMeta, quid, sversion, true); + pHandle->sver = sversion; } - SSchemaWrapper* pSchemaWrapper = metaGetTableSchema(pHandle->pMeta, quid, 0, true); - SArray* pArray = taosArrayInit(pSchemaWrapper->nCols, sizeof(SColumnInfoData)); + STSchema* pTschema = pHandle->pSchema; + SSchemaWrapper* pSchemaWrapper = pHandle->pSchemaWrapper; + + int32_t numOfRows = pHandle->pBlock->numOfRows; + int32_t numOfCols = pHandle->pSchema->numOfCols; + int32_t colNumNeed = taosArrayGetSize(pHandle->pColIdList); + + SArray* pArray = taosArrayInit(colNumNeed, sizeof(SColumnInfoData)); if (pArray == NULL) { return NULL; } - SColumnInfoData colInfo; - int sz = pSchemaWrapper->nCols * pSchemaWrapper->pSchema->bytes; - colInfo.pData = malloc(sz); - if (colInfo.pData == NULL) { - return NULL; + + int j = 0; + for (int32_t i = 0; i < colNumNeed; i++) { + int32_t colId = *(int32_t*)taosArrayGet(pHandle->pColIdList, i); + while (j < pSchemaWrapper->nCols && pSchemaWrapper->pSchema[j].colId < colId) { + j++; + } + SSchema* pColSchema = &pSchemaWrapper->pSchema[j]; + ASSERT(pColSchema->colId == colId); + SColumnInfoData colInfo = {0}; + int sz = numOfRows * pColSchema->bytes; + colInfo.info.bytes = pColSchema->bytes; + colInfo.info.colId = colId; + colInfo.info.type = pColSchema->type; + + colInfo.pData = calloc(1, sz); + if (colInfo.pData == NULL) { + // TODO free + taosArrayDestroy(pArray); + return NULL; + } + taosArrayPush(pArray, &colInfo); } SMemRow row; - int32_t kvIdx; + int32_t kvIdx = 0; + tInitSubmitBlkIter(pHandle->pBlock, &pHandle->blkIter); while ((row = tGetSubmitBlkNext(&pHandle->blkIter)) != NULL) { - for (int i = 0; i < pTschema->numOfCols && kvIdx < pTschema->numOfCols; i++) { - // TODO: filter out unused column - STColumn* pCol = schemaColAt(pTschema, i); + // get all wanted col of that block + for (int32_t i = 0; i < colNumNeed; i++) { + SColumnInfoData* pColData = taosArrayGet(pArray, i); + STColumn* pCol = schemaColAt(pTschema, i); + // TODO + ASSERT(pCol->colId == pColData->info.colId); void* val = tdGetMemRowDataOfColEx(row, pCol->colId, pCol->type, TD_DATA_ROW_HEAD_SIZE + pCol->offset, &kvIdx); - // TODO: handle varlen - memcpy(POINTER_SHIFT(colInfo.pData, pCol->offset), val, pCol->bytes); + memcpy(pColData->pData, val, pCol->bytes); } } - taosArrayPush(pArray, &colInfo); return pArray; } diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index ce3738b17dcf77ca2138ead62954748ffcf60192..6efd7ef87e0ef1d8d67528c8a451b0d142b3478e 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -5070,6 +5070,7 @@ static SSDataBlock* doStreamBlockScan(void* param, bool* newgroup) { SStreamBlockScanInfo* pInfo = pOperator->info; SDataBlockInfo* pBlockInfo = &pInfo->pRes->info; + pBlockInfo->rows = 0; while (tqNextDataBlock(pInfo->readerHandle)) { pTaskInfo->code = tqRetrieveDataBlockInfo(pInfo->readerHandle, pBlockInfo); if (pTaskInfo->code != TSDB_CODE_SUCCESS) { @@ -5896,7 +5897,7 @@ static SSDataBlock* doAggregate(void* param, bool* newgroup) { finalizeQueryResult(pOperator, pInfo->pCtx, &pInfo->resultRowInfo, pInfo->rowCellInfoOffset); pInfo->pRes->info.rows = getNumOfResult(pInfo->pCtx, pOperator->numOfOutput); - return pInfo->pRes; + return (pInfo->pRes->info.rows != 0)? pInfo->pRes:NULL; } static SSDataBlock* doSTableAggregate(void* param, bool* newgroup) { @@ -8825,14 +8826,14 @@ void* freeColumnInfo(SColumnInfo* pColumnInfo, int32_t numOfCols) { } void doDestroyTask(SExecTaskInfo *pTaskInfo) { + qDebug("%s execTask is freed", GET_TASKID(pTaskInfo)); + doDestroyTableQueryInfo(&pTaskInfo->tableqinfoGroupInfo); // taosArrayDestroy(pTaskInfo->summary.queryProfEvents); // taosHashCleanup(pTaskInfo->summary.operatorProfResults); tfree(pTaskInfo->sql); tfree(pTaskInfo->id.str); - qDebug("%s execTask is freed", GET_TASKID(pTaskInfo)); - tfree(pTaskInfo); } diff --git a/source/libs/planner/src/physicalPlanJson.c b/source/libs/planner/src/physicalPlanJson.c index 8d58b262aabdbc67a8ef2224cbeead38394c0524..5d96ab47c39102ab374de28244129968d89dd6db 100644 --- a/source/libs/planner/src/physicalPlanJson.c +++ b/source/libs/planner/src/physicalPlanJson.c @@ -1125,8 +1125,6 @@ int32_t subPlanToString(const SSubplan* subplan, char** str, int32_t* len) { *str = cJSON_Print(json); cJSON_Delete(json); -// printf("====Physical plan:====\n"); -// printf("%s\n", *str); *len = strlen(*str) + 1; return TSDB_CODE_SUCCESS; } diff --git a/source/libs/planner/src/planner.c b/source/libs/planner/src/planner.c index d546925c5f20f4d0cda1850364278d9b6ac913e7..e6b7eaca7f36eaed5b05e2d781fc659f5ad6cbbf 100644 --- a/source/libs/planner/src/planner.c +++ b/source/libs/planner/src/planner.c @@ -40,7 +40,8 @@ void qDestroyQueryDag(struct SQueryDag* pDag) { tfree(pDag); } -int32_t qCreateQueryDag(const struct SQueryNode* pNode, struct SQueryDag** pDag, SSchema** pResSchema, int32_t* numOfCols, SArray* pNodeList, uint64_t requestId) { +int32_t qCreateQueryDag(const struct SQueryNode* pNode, struct SQueryDag** pDag, SSchema** pResSchema, int32_t* numOfCols, SArray* pNodeList, + uint64_t requestId) { SQueryPlanNode* pLogicPlan; int32_t code = createQueryPlan(pNode, &pLogicPlan); if (TSDB_CODE_SUCCESS != code) { @@ -49,9 +50,10 @@ int32_t qCreateQueryDag(const struct SQueryNode* pNode, struct SQueryDag** pDag, } if (pLogicPlan->info.type != QNODE_MODIFY) { -// char* str = NULL; -// queryPlanToString(pLogicPlan, &str); -// printf("%s\n", str); + char* str = NULL; + queryPlanToString(pLogicPlan, &str); + qDebug("reqId:0x%"PRIx64": %s", requestId, str); + tfree(str); } code = optimizeQueryPlan(pLogicPlan); diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 8fc8a783c4f72c6a15bb8c14b49cd061ef970743..9b9b6a73ca1d3c246b1c0ea77de7d3454cf7adcc 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -1100,8 +1100,6 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) { atomic_store_8(&ctx->queryInQueue, 0); atomic_store_8(&ctx->queryContinue, 0); - DataSinkHandle sinkHandle = ctx->sinkHandle; - QW_ERR_JRET(qwExecTask(QW_FPARAMS(), ctx, &queryEnd)); if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) { diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index 92a95edf98abdc8d029bf8649d5578e898e943c6..621ed68bae04c2a281fa1b69a177c541a68d2bf5 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -1194,17 +1194,18 @@ int32_t schLaunchTask(SSchJob *pJob, SSchTask *pTask) { code = atomic_load_32(&pJob->errCode); SCH_ERR_RET(code); - SCH_RET(TSDB_CODE_SCH_STATUS_ERROR); } SSubplan *plan = pTask->plan; - if (NULL == pTask->msg) { + if (NULL == pTask->msg) { // TODO add more detailed reason for failure code = qSubPlanToString(plan, &pTask->msg, &pTask->msgLen); if (TSDB_CODE_SUCCESS != code || NULL == pTask->msg || pTask->msgLen <= 0) { - SCH_TASK_ELOG("subplanToString error, code:%x, msg:%p, len:%d", code, pTask->msg, pTask->msgLen); + SCH_TASK_ELOG("failed to create physical plan, code:%s, msg:%p, len:%d", tstrerror(code), pTask->msg, pTask->msgLen); SCH_ERR_JRET(code); + } else { + SCH_TASK_DLOG(" ===physical plan=== len:%d, %s", pTask->msgLen, pTask->msg); } } @@ -1218,13 +1219,10 @@ int32_t schLaunchTask(SSchJob *pJob, SSchTask *pTask) { } SCH_ERR_JRET(schBuildAndSendMsg(pJob, pTask, NULL, plan->msgType)); - return TSDB_CODE_SUCCESS; _return: - SCH_ERR_RET(schProcessOnTaskFailure(pJob, pTask, code)); - SCH_RET(code); } diff --git a/source/libs/transport/CMakeLists.txt b/source/libs/transport/CMakeLists.txt index 61d781210c7eb0239ce447a757a9a25c54a6775d..5c214b75a11fd9013f923bf67e7ecedb80d6afaa 100644 --- a/source/libs/transport/CMakeLists.txt +++ b/source/libs/transport/CMakeLists.txt @@ -13,7 +13,7 @@ target_link_libraries( PUBLIC util PUBLIC common ) -if (${BUILD_WITH_UV}) +if (${BUILD_WITH_UV_TRANS}) target_include_directories( transport PUBLIC "${CMAKE_SOURCE_DIR}/contrib/libuv/include" @@ -25,7 +25,7 @@ if (${BUILD_WITH_UV}) PUBLIC uv_a ) add_definitions(-DUSE_UV) -endif(${BUILD_WITH_UV}) +endif(${BUILD_WITH_UV_TRANS}) if (${BUILD_TEST}) add_subdirectory(test) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index d64df9b0f3aea486560479f55a02274d3ff47162..069ebaeb8acb427a0a70366fd7a349504d9da767 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -45,13 +45,13 @@ typedef struct SCliThrdObj { pthread_t thread; uv_loop_t* loop; uv_async_t* cliAsync; // - uv_timer_t* pTimer; + uv_timer_t* timer; void* pool; // conn pool queue msg; pthread_mutex_t msgMtx; uint64_t nextTimeout; // next timeout void* pTransInst; // - + bool quit; } SCliThrdObj; typedef struct SClientObj { @@ -94,6 +94,8 @@ static void clientHandleResp(SCliConn* conn); static void clientHandleExcept(SCliConn* conn); // handle req from app static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd); +static void clientHandleQuit(SCliMsg* pMsg, SCliThrdObj* pThrd); +static void clientSendQuit(SCliThrdObj* thrd); static void destroyUserdata(SRpcMsg* userdata); @@ -136,8 +138,8 @@ static void clientHandleResp(SCliConn* conn) { destroyCmsg(pMsg); conn->data = NULL; // start thread's timer of conn pool if not active - if (!uv_is_active((uv_handle_t*)pThrd->pTimer) && pRpc->idleTime > 0) { - uv_timer_start((uv_timer_t*)pThrd->pTimer, clientTimeoutCb, CONN_PERSIST_TIME(pRpc->idleTime) / 2, 0); + if (!uv_is_active((uv_handle_t*)pThrd->timer) && pRpc->idleTime > 0) { + uv_timer_start((uv_timer_t*)pThrd->timer, clientTimeoutCb, CONN_PERSIST_TIME(pRpc->idleTime) / 2, 0); } } static void clientHandleExcept(SCliConn* pConn) { @@ -155,7 +157,7 @@ static void clientHandleExcept(SCliConn* pConn) { SRpcMsg rpcMsg = {0}; rpcMsg.ahandle = pCtx->ahandle; - rpcMsg.code = -1; + rpcMsg.code = TSDB_CODE_RPC_NETWORK_UNAVAIL; // SRpcInfo* pRpc = pMsg->ctx->pRpc; (pCtx->pTransInst->cfp)(NULL, &rpcMsg, NULL); pConn->notifyCount += 1; @@ -332,9 +334,8 @@ static void clientWriteCb(uv_write_t* req, int status) { tDebug("conn %p data already was written out", pConn); SCliMsg* pMsg = pConn->data; if (pMsg == NULL) { - destroy - // handle - return; + // handle + return; } destroyUserdata(&pMsg->msg); } else { @@ -375,6 +376,15 @@ static void clientConnCb(uv_connect_t* req, int status) { clientWrite(pConn); } +static void clientHandleQuit(SCliMsg* pMsg, SCliThrdObj* pThrd) { + tDebug("thread %p start to quit", pThrd); + destroyCmsg(pMsg); + uv_close((uv_handle_t*)pThrd->cliAsync, NULL); + uv_timer_stop(pThrd->timer); + pThrd->quit = true; + // uv__async_stop(pThrd->cliAsync); + uv_stop(pThrd->loop); +} static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { uint64_t et = taosGetTimestampUs(); uint64_t el = et - pMsg->st; @@ -389,7 +399,13 @@ static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { conn->data = pMsg; conn->writeReq->data = conn; transDestroyBuffer(&conn->readBuf); + + if (pThrd->quit) { + clientHandleExcept(conn); + return; + } clientWrite(conn); + } else { SCliConn* conn = calloc(1, sizeof(SCliConn)); conn->ref++; @@ -430,7 +446,12 @@ static void clientAsyncCb(uv_async_t* handle) { QUEUE_REMOVE(h); SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q); - clientHandleReq(pMsg, pThrd); + if (pMsg->ctx == NULL) { + clientHandleQuit(pMsg, pThrd); + } else { + clientHandleReq(pMsg, pThrd); + } + // clientHandleReq(pMsg, pThrd); count++; } if (count >= 2) { @@ -458,7 +479,7 @@ void* taosInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, int err = pthread_create(&pThrd->thread, NULL, clientThread, (void*)(pThrd)); if (err == 0) { - tDebug("sucess to create tranport-client thread %d", i); + tDebug("success to create tranport-client thread %d", i); } cli->pThreadObj[i] = pThrd; } @@ -492,20 +513,24 @@ static SCliThrdObj* createThrdObj() { uv_async_init(pThrd->loop, pThrd->cliAsync, clientAsyncCb); pThrd->cliAsync->data = pThrd; - pThrd->pTimer = malloc(sizeof(uv_timer_t)); - uv_timer_init(pThrd->loop, pThrd->pTimer); - pThrd->pTimer->data = pThrd; + pThrd->timer = malloc(sizeof(uv_timer_t)); + uv_timer_init(pThrd->loop, pThrd->timer); + pThrd->timer->data = pThrd; pThrd->pool = creatConnPool(1); + + pThrd->quit = false; return pThrd; } static void destroyThrdObj(SCliThrdObj* pThrd) { if (pThrd == NULL) { return; } + uv_stop(pThrd->loop); pthread_join(pThrd->thread, NULL); pthread_mutex_destroy(&pThrd->msgMtx); free(pThrd->cliAsync); + free(pThrd->timer); free(pThrd->loop); free(pThrd); } @@ -517,10 +542,22 @@ static void transDestroyConnCtx(STransConnCtx* ctx) { free(ctx); } // +static void clientSendQuit(SCliThrdObj* thrd) { + // cli can stop gracefully + SCliMsg* msg = calloc(1, sizeof(SCliMsg)); + msg->ctx = NULL; // + + pthread_mutex_lock(&thrd->msgMtx); + QUEUE_PUSH(&thrd->msg, &msg->q); + pthread_mutex_unlock(&thrd->msgMtx); + + uv_async_send(thrd->cliAsync); +} void taosCloseClient(void* arg) { // impl later SClientObj* cli = arg; for (int i = 0; i < cli->numOfThreads; i++) { + clientSendQuit(cli->pThreadObj[i]); destroyThrdObj(cli->pThreadObj[i]); } free(cli->pThreadObj); diff --git a/source/libs/transport/src/transSrv.c b/source/libs/transport/src/transSrv.c index a5ee1f1c63a9b49d19d21c65d34d1203b624bc3b..475ef32b46c549d3e631a917cd7fad119feef2fb 100644 --- a/source/libs/transport/src/transSrv.c +++ b/source/libs/transport/src/transSrv.c @@ -70,6 +70,7 @@ typedef struct SServerObj { uv_pipe_t** pipe; uint32_t ip; uint32_t port; + uv_async_t* pAcceptAsync; // just to quit from from accept thread } SServerObj; static const char* notify = "a"; @@ -88,9 +89,11 @@ static void uvOnPipeWriteCb(uv_write_t* req, int status); static void uvOnAcceptCb(uv_stream_t* stream, int status); static void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf); static void uvWorkerAsyncCb(uv_async_t* handle); +static void uvAcceptAsyncCb(uv_async_t* handle); static void uvPrepareSendData(SSrvMsg* msg, uv_buf_t* wb); static void uvStartSendResp(SSrvMsg* msg); + static void destroySmsg(SSrvMsg* smsg); // check whether already read complete packet static bool readComplete(SConnBuffer* buf); @@ -389,7 +392,13 @@ void uvWorkerAsyncCb(uv_async_t* handle) { tError("except occurred, continue"); continue; } - uvStartSendResp(msg); + if (msg->pConn == NULL) { + // + free(msg); + uv_stop(pThrd->loop); + } else { + uvStartSendResp(msg); + } // uv_buf_t wb; // uvPrepareSendData(msg, &wb); // uv_timer_stop(conn->pTimer); @@ -397,6 +406,10 @@ void uvWorkerAsyncCb(uv_async_t* handle) { // uv_write(conn->pWriter, (uv_stream_t*)conn->pTcp, &wb, 1, uvOnWriteCb); } } +static void uvAcceptAsyncCb(uv_async_t* async) { + SServerObj* srv = async->data; + uv_stop(srv->loop); +} void uvOnAcceptCb(uv_stream_t* stream, int status) { if (status == -1) { @@ -517,8 +530,12 @@ static bool addHandleToAcceptloop(void* arg) { return false; } - struct sockaddr_in bind_addr; + // register an async here to quit server gracefully + srv->pAcceptAsync = calloc(1, sizeof(uv_async_t)); + uv_async_init(srv->loop, srv->pAcceptAsync, uvAcceptAsyncCb); + srv->pAcceptAsync->data = srv; + struct sockaddr_in bind_addr; uv_ip4_addr("0.0.0.0", srv->port, &bind_addr); if ((err = uv_tcp_bind(&srv->server, (const struct sockaddr*)&bind_addr, 0)) != 0) { tError("failed to bind: %s", uv_err_name(err)); @@ -647,21 +664,42 @@ void destroyWorkThrd(SWorkThrdObj* pThrd) { return; } pthread_join(pThrd->thread, NULL); - // free(srv->pipe[i]); free(pThrd->loop); - pthread_mutex_destroy(&pThrd->msgMtx); + free(pThrd->workerAsync); free(pThrd); } +void sendQuitToWorkThrd(SWorkThrdObj* pThrd) { + SSrvMsg* srvMsg = calloc(1, sizeof(SSrvMsg)); + + pthread_mutex_lock(&pThrd->msgMtx); + QUEUE_PUSH(&pThrd->msg, &srvMsg->q); + pthread_mutex_unlock(&pThrd->msgMtx); + tDebug("send quit msg to work thread"); + + uv_async_send(pThrd->workerAsync); +} + void taosCloseServer(void* arg) { // impl later SServerObj* srv = arg; for (int i = 0; i < srv->numOfThreads; i++) { + sendQuitToWorkThrd(srv->pThreadObj[i]); destroyWorkThrd(srv->pThreadObj[i]); } + + tDebug("send quit msg to accept thread"); + uv_async_send(srv->pAcceptAsync); + pthread_join(srv->thread, NULL); + + free(srv->pThreadObj); + free(srv->pAcceptAsync); free(srv->loop); + + for (int i = 0; i < srv->numOfThreads; i++) { + free(srv->pipe[i]); + } free(srv->pipe); - free(srv->pThreadObj); - pthread_join(srv->thread, NULL); + free(srv); } diff --git a/source/libs/transport/test/CMakeLists.txt b/source/libs/transport/test/CMakeLists.txt index c61f688060b2e0ec85ddaf2b31534e7830e961bf..3d9c396336a0c8a17cc0e43996f276ba7843c5db 100644 --- a/source/libs/transport/test/CMakeLists.txt +++ b/source/libs/transport/test/CMakeLists.txt @@ -1,6 +1,12 @@ add_executable(transportTest "") add_executable(client "") add_executable(server "") +add_executable(transUT "") + +target_sources(transUT + PRIVATE + "transUT.cc" +) target_sources(transportTest PRIVATE @@ -28,6 +34,13 @@ target_link_libraries (transportTest gtest_main transport ) +target_link_libraries (transUT + os + util + common + gtest_main + transport +) target_include_directories(client PUBLIC @@ -48,6 +61,13 @@ target_include_directories(server "${CMAKE_CURRENT_SOURCE_DIR}/../inc" ) +target_include_directories(transUT + PUBLIC + "${CMAKE_SOURCE_DIR}/include/libs/transport" + "${CMAKE_CURRENT_SOURCE_DIR}/../inc" +) + + target_link_libraries (server os util diff --git a/source/libs/transport/test/transUT.cc b/source/libs/transport/test/transUT.cc new file mode 100644 index 0000000000000000000000000000000000000000..08c683590b56d6c070b28bed5702cac3a5560eec --- /dev/null +++ b/source/libs/transport/test/transUT.cc @@ -0,0 +1,82 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 * or later ("AGPL"), as published by the Free + * Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +#include +#include +#include +#include "trpc.h" +using namespace std; + +class TransObj { + public: + TransObj() { + const char *label = "APP"; + const char *secret = "secret"; + const char *user = "user"; + const char *ckey = "ckey"; + + memset(&rpcInit, 0, sizeof(rpcInit)); + rpcInit.localPort = 0; + rpcInit.label = (char *)label; + rpcInit.numOfThreads = 5; + rpcInit.cfp = NULL; + rpcInit.sessions = 100; + rpcInit.idleTime = 100; + rpcInit.user = (char *)user; + rpcInit.secret = (char *)secret; + rpcInit.ckey = (char *)ckey; + rpcInit.spi = 1; + } + bool startCli() { + trans = NULL; + rpcInit.connType = TAOS_CONN_CLIENT; + trans = rpcOpen(&rpcInit); + return trans != NULL ? true : false; + } + bool startSrv() { + trans = NULL; + rpcInit.connType = TAOS_CONN_SERVER; + trans = rpcOpen(&rpcInit); + return trans != NULL ? true : false; + } + bool stop() { + rpcClose(trans); + trans = NULL; + return true; + } + + private: + void * trans; + SRpcInit rpcInit; +}; +class TransEnv : public ::testing::Test { + protected: + virtual void SetUp() { + // set up trans obj + tr = new TransObj(); + } + virtual void TearDown() { + // tear down + delete tr; + } + + TransObj *tr = NULL; +}; +TEST_F(TransEnv, test_start_stop) { + assert(tr->startCli()); + assert(tr->stop()); + + assert(tr->startSrv()); + assert(tr->stop()); +} diff --git a/tools/shell/src/shellEngine.c b/tools/shell/src/shellEngine.c index c283423fbfbaf587def99fc4738f2b4ed81ba344..b785ed27072cf53253db4c3abf691f3f990afff1 100644 --- a/tools/shell/src/shellEngine.c +++ b/tools/shell/src/shellEngine.c @@ -360,6 +360,7 @@ void shellRunCommandOnServer(TAOS *con, char command[]) { } } else { int num_rows_affacted = taos_affected_rows(pSql); + taos_free_result(pSql); et = taosGetTimestampUs(); printf("Query OK, %d of %d row(s) in database (%.6fs)\n", num_rows_affacted, num_rows_affacted, (et - st) / 1E6); }