diff --git a/source/client/test/clientTests.cpp b/source/client/test/clientTests.cpp index 0ca294129701a614128741c37931bf035a0a7b91..415d6a57ce4e1d1cdfcba2510d29cc75aa1607af 100644 --- a/source/client/test/clientTests.cpp +++ b/source/client/test/clientTests.cpp @@ -521,30 +521,30 @@ TEST(testCase, show_stable_Test) { // taosHashCleanup(phash); //} // -//TEST(testCase, create_topic_Test) { -// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); -// assert(pConn != NULL); -// -// TAOS_RES* pRes = taos_query(pConn, "use abc1"); -// if (taos_errno(pRes) != 0) { -// printf("error in use db, reason:%s\n", taos_errstr(pRes)); -// } -// taos_free_result(pRes); -// -// TAOS_FIELD* pFields = taos_fetch_fields(pRes); -// ASSERT_TRUE(pFields == nullptr); -// -// int32_t numOfFields = taos_num_fields(pRes); -// ASSERT_EQ(numOfFields, 0); -// -// taos_free_result(pRes); -// -// char* sql = "select * from tu"; -// pRes = taos_create_topic(pConn, "test_topic_1", sql, strlen(sql)); -// taos_free_result(pRes); -// taos_close(pConn); -//} -// +TEST(testCase, create_topic_Test) { + TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); + assert(pConn != NULL); + + TAOS_RES* pRes = taos_query(pConn, "use abc1"); + if (taos_errno(pRes) != 0) { + printf("error in use db, reason:%s\n", taos_errstr(pRes)); + } + taos_free_result(pRes); + + TAOS_FIELD* pFields = taos_fetch_fields(pRes); + ASSERT_TRUE(pFields == nullptr); + + int32_t numOfFields = taos_num_fields(pRes); + ASSERT_EQ(numOfFields, 0); + + taos_free_result(pRes); + + char* sql = "select * from tu"; + pRes = taos_create_topic(pConn, "test_topic_1", sql, strlen(sql)); + taos_free_result(pRes); + taos_close(pConn); +} + //TEST(testCase, insert_test) { // TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); // ASSERT_NE(pConn, nullptr); diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c index 67aeae0d4c3f35b1637b1d1ec51f2809e28bc4dc..16a9828e714f3bdbf5c6c75767cbc597e9f679af 100644 --- a/source/dnode/mnode/impl/src/mndTopic.c +++ b/source/dnode/mnode/impl/src/mndTopic.c @@ -74,6 +74,15 @@ SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) { SDB_SET_INT32(pRaw, dataPos, pTopic->version, TOPIC_ENCODE_OVER); SDB_SET_INT32(pRaw, dataPos, pTopic->sqlLen, TOPIC_ENCODE_OVER); SDB_SET_BINARY(pRaw, dataPos, pTopic->sql, pTopic->sqlLen, TOPIC_ENCODE_OVER); + int32_t logicalPlanLen = strlen(pTopic->logicalPlan) + 1; + SDB_SET_INT32(pRaw, dataPos, strlen(pTopic->logicalPlan)+1, TOPIC_ENCODE_OVER); + SDB_SET_BINARY(pRaw, dataPos, pTopic->logicalPlan, logicalPlanLen, TOPIC_ENCODE_OVER); + + int32_t physicalPlanLen = strlen(pTopic->physicalPlan) + 1; + pTopic->physicalPlan = calloc(physicalPlanLen, sizeof(char)); + if (pTopic->physicalPlan == NULL) goto TOPIC_ENCODE_OVER; + SDB_SET_INT32(pRaw, dataPos, strlen(pTopic->physicalPlan)+1, TOPIC_ENCODE_OVER); + SDB_SET_BINARY(pRaw, dataPos, pTopic->physicalPlan, physicalPlanLen, TOPIC_ENCODE_OVER); SDB_SET_RESERVE(pRaw, dataPos, MND_TOPIC_RESERVE_SIZE, TOPIC_ENCODE_OVER); SDB_SET_DATALEN(pRaw, dataPos, TOPIC_ENCODE_OVER); @@ -83,6 +92,12 @@ SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) { TOPIC_ENCODE_OVER: if (terrno != TSDB_CODE_SUCCESS) { mError("topic:%s, failed to encode to raw:%p since %s", pTopic->name, pRaw, terrstr()); + /*if (pTopic->logicalPlan) {*/ + /*free(pTopic->logicalPlan);*/ + /*}*/ + /*if (pTopic->physicalPlan) {*/ + /*free(pTopic->physicalPlan);*/ + /*}*/ sdbFreeRaw(pRaw); return NULL; } @@ -121,10 +136,23 @@ SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) { pTopic->sql = calloc(pTopic->sqlLen + 1, sizeof(char)); SDB_GET_BINARY(pRaw, dataPos, pTopic->sql, pTopic->sqlLen, TOPIC_DECODE_OVER); -// SDB_GET_INT32(pRaw, dataPos, &len, TOPIC_DECODE_OVER); -// SDB_GET_BINARY(pRaw, dataPos, pTopic->logicalPlan, len, TOPIC_DECODE_OVER); -// SDB_GET_INT32(pRaw, dataPos, &len, TOPIC_DECODE_OVER); -// SDB_GET_BINARY(pRaw, dataPos, pTopic->physicalPlan, len, TOPIC_DECODE_OVER); + + SDB_GET_INT32(pRaw, dataPos, &len, TOPIC_DECODE_OVER); + pTopic->logicalPlan = calloc(len+1, sizeof(char)); + if (pTopic->logicalPlan == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto TOPIC_DECODE_OVER; + } + SDB_GET_BINARY(pRaw, dataPos, pTopic->logicalPlan, len+1, TOPIC_DECODE_OVER); + + SDB_GET_INT32(pRaw, dataPos, &len, TOPIC_DECODE_OVER); + pTopic->logicalPlan = calloc(len + 1, sizeof(char)); + if (pTopic->physicalPlan == NULL) { + free(pTopic->logicalPlan); + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto TOPIC_DECODE_OVER; + } + SDB_GET_BINARY(pRaw, dataPos, pTopic->physicalPlan, len+1, TOPIC_DECODE_OVER); SDB_GET_RESERVE(pRaw, dataPos, MND_TOPIC_RESERVE_SIZE, TOPIC_DECODE_OVER) diff --git a/source/dnode/vnode/inc/tq.h b/source/dnode/vnode/inc/tq.h index 5774131377e3f17ebea76ff70eee767fa876b43d..8089826a80ed945b1716cb19b8489d58c4fc366d 100644 --- a/source/dnode/vnode/inc/tq.h +++ b/source/dnode/vnode/inc/tq.h @@ -25,6 +25,7 @@ #include "trpc.h" #include "ttimer.h" #include "tutil.h" +#include "meta.h" #ifdef __cplusplus extern "C" { @@ -314,6 +315,26 @@ const void* tqDeserializeGroup(const STqSerializedHead*, STqGroup**); static int tqQueryExecuting(int32_t status) { return status; } +typedef struct STqReadHandle { + int64_t ver; + SSubmitMsg* pMsg; + SSubmitBlk* pBlock; + SSubmitMsgIter msgIter; + SSubmitBlkIter blkIter; + SMeta* pMeta; +} STqReadHandle; + +typedef struct SSubmitBlkScanInfo { + +} SSubmitBlkScanInfo; + +STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta, SSubmitMsg *pMsg); +bool tqNextDataBlock(STqReadHandle* pHandle); +int tqRetrieveDataBlockInfo(STqReadHandle* pHandle, SDataBlockInfo *pBlockInfo); +//return SArray +SArray *tqRetrieveDataBlock(STqReadHandle* pHandle, SArray* pColumnIdList); +//int tqLoadDataBlock(SExecTaskInfo* pTaskInfo, SSubmitBlkScanInfo* pSubmitBlkScanInfo, SSDataBlock* pBlock, uint32_t status); + #ifdef __cplusplus } #endif diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 5ceb062bf2e5efc94c3b324a26a0ac80c92c5dec..a5be0ec29a97934c14612b3b9f187b58f3753ecb 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -607,3 +607,70 @@ int tqItemSSize() { // mainly for executor return 0; } + +STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta, SSubmitMsg *pMsg) { + STqReadHandle* pReadHandle = malloc(sizeof(STqReadHandle)); + if (pReadHandle == NULL) { + return NULL; + } + pReadHandle->pMeta = pMeta; + pReadHandle->pMsg = pMsg; + tInitSubmitMsgIter(pMsg, &pReadHandle->msgIter); + pReadHandle->ver = -1; + return NULL; +} + +bool tqNextDataBlock(STqReadHandle* pHandle) { + if(tGetSubmitMsgNext(&pHandle->msgIter, &pHandle->pBlock) < 0) { + return false; + } + return true; +} + +int tqRetrieveDataBlockInfo(STqReadHandle* pHandle, SDataBlockInfo* pBlockInfo) { + SMemRow row; + int32_t sversion = pHandle->pBlock->sversion; + SSchemaWrapper* pSchema = metaGetTableSchema(pHandle->pMeta, pHandle->pBlock->uid, sversion, false); + pBlockInfo->numOfCols = pSchema->nCols; + pBlockInfo->rows = pHandle->pBlock->numOfRows; + pBlockInfo->uid = pHandle->pBlock->uid; + //TODO: filter out unused column + return 0; +} +SArray *tqRetrieveDataBlock(STqReadHandle* pHandle, SArray* pColumnIdList) { + int32_t sversion = pHandle->pBlock->sversion; + SSchemaWrapper* pSchemaWrapper = metaGetTableSchema(pHandle->pMeta, pHandle->pBlock->uid, sversion, true); + STSchema* pTschema = metaGetTbTSchema(pHandle->pMeta, pHandle->pBlock->uid, sversion); + SArray *pArray = taosArrayInit(pSchemaWrapper->nCols, sizeof(SColumnInfoData)); + if (pArray == NULL) { + return NULL; + } + SColumnInfoData colInfo; + int sz = pSchemaWrapper->nCols * pSchemaWrapper->pSchema->bytes; + colInfo.pData = malloc(sz); + if (colInfo.pData == NULL) { + return NULL; + } + + for (int i = 0; i < pTschema->numOfCols; i++) { + //TODO: filter out unused column + taosArrayPush(pColumnIdList, &(schemaColAt(pTschema, i)->colId)); + } + + SMemRow row; + int32_t kvIdx; + while ((row = tGetSubmitBlkNext(&pHandle->blkIter)) != NULL) { + for (int i = 0; i < pTschema->numOfCols && kvIdx < pTschema->numOfCols; i++) { + //TODO: filter out unused column + STColumn *pCol = schemaColAt(pTschema, i); + void* val = tdGetMemRowDataOfColEx(row, pCol->colId, pCol->type, TD_DATA_ROW_HEAD_SIZE + pCol->offset, &kvIdx); + //TODO: handle varlen + memcpy(POINTER_SHIFT(colInfo.pData, pCol->offset), val, pCol->bytes); + } + } + taosArrayPush(pArray, &colInfo); + return pArray; +} +/*int tqLoadDataBlock(SExecTaskInfo* pTaskInfo, SSubmitBlkScanInfo* pSubmitBlkScanInfo, SSDataBlock* pBlock, uint32_t status) {*/ + /*return 0;*/ +/*}*/