未验证 提交 17a26c47 编写于 作者: L Liu Jicong 提交者: GitHub

Merge pull request #9865 from taosdata/feature/tq

add tq support for submitblk scanner
...@@ -521,30 +521,30 @@ TEST(testCase, show_stable_Test) { ...@@ -521,30 +521,30 @@ TEST(testCase, show_stable_Test) {
// taosHashCleanup(phash); // taosHashCleanup(phash);
//} //}
// //
//TEST(testCase, create_topic_Test) { TEST(testCase, create_topic_Test) {
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
// assert(pConn != NULL); assert(pConn != NULL);
//
// TAOS_RES* pRes = taos_query(pConn, "use abc1"); TAOS_RES* pRes = taos_query(pConn, "use abc1");
// if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
// printf("error in use db, reason:%s\n", taos_errstr(pRes)); printf("error in use db, reason:%s\n", taos_errstr(pRes));
// } }
// taos_free_result(pRes); taos_free_result(pRes);
//
// TAOS_FIELD* pFields = taos_fetch_fields(pRes); TAOS_FIELD* pFields = taos_fetch_fields(pRes);
// ASSERT_TRUE(pFields == nullptr); ASSERT_TRUE(pFields == nullptr);
//
// int32_t numOfFields = taos_num_fields(pRes); int32_t numOfFields = taos_num_fields(pRes);
// ASSERT_EQ(numOfFields, 0); ASSERT_EQ(numOfFields, 0);
//
// taos_free_result(pRes); taos_free_result(pRes);
//
// char* sql = "select * from tu"; char* sql = "select * from tu";
// pRes = taos_create_topic(pConn, "test_topic_1", sql, strlen(sql)); pRes = taos_create_topic(pConn, "test_topic_1", sql, strlen(sql));
// taos_free_result(pRes); taos_free_result(pRes);
// taos_close(pConn); taos_close(pConn);
//} }
//
//TEST(testCase, insert_test) { //TEST(testCase, insert_test) {
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); // TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
// ASSERT_NE(pConn, nullptr); // ASSERT_NE(pConn, nullptr);
......
...@@ -74,6 +74,15 @@ SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) { ...@@ -74,6 +74,15 @@ SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) {
SDB_SET_INT32(pRaw, dataPos, pTopic->version, TOPIC_ENCODE_OVER); SDB_SET_INT32(pRaw, dataPos, pTopic->version, TOPIC_ENCODE_OVER);
SDB_SET_INT32(pRaw, dataPos, pTopic->sqlLen, TOPIC_ENCODE_OVER); SDB_SET_INT32(pRaw, dataPos, pTopic->sqlLen, TOPIC_ENCODE_OVER);
SDB_SET_BINARY(pRaw, dataPos, pTopic->sql, pTopic->sqlLen, TOPIC_ENCODE_OVER); SDB_SET_BINARY(pRaw, dataPos, pTopic->sql, pTopic->sqlLen, TOPIC_ENCODE_OVER);
int32_t logicalPlanLen = strlen(pTopic->logicalPlan) + 1;
SDB_SET_INT32(pRaw, dataPos, strlen(pTopic->logicalPlan)+1, TOPIC_ENCODE_OVER);
SDB_SET_BINARY(pRaw, dataPos, pTopic->logicalPlan, logicalPlanLen, TOPIC_ENCODE_OVER);
int32_t physicalPlanLen = strlen(pTopic->physicalPlan) + 1;
pTopic->physicalPlan = calloc(physicalPlanLen, sizeof(char));
if (pTopic->physicalPlan == NULL) goto TOPIC_ENCODE_OVER;
SDB_SET_INT32(pRaw, dataPos, strlen(pTopic->physicalPlan)+1, TOPIC_ENCODE_OVER);
SDB_SET_BINARY(pRaw, dataPos, pTopic->physicalPlan, physicalPlanLen, TOPIC_ENCODE_OVER);
SDB_SET_RESERVE(pRaw, dataPos, MND_TOPIC_RESERVE_SIZE, TOPIC_ENCODE_OVER); SDB_SET_RESERVE(pRaw, dataPos, MND_TOPIC_RESERVE_SIZE, TOPIC_ENCODE_OVER);
SDB_SET_DATALEN(pRaw, dataPos, TOPIC_ENCODE_OVER); SDB_SET_DATALEN(pRaw, dataPos, TOPIC_ENCODE_OVER);
...@@ -83,6 +92,12 @@ SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) { ...@@ -83,6 +92,12 @@ SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) {
TOPIC_ENCODE_OVER: TOPIC_ENCODE_OVER:
if (terrno != TSDB_CODE_SUCCESS) { if (terrno != TSDB_CODE_SUCCESS) {
mError("topic:%s, failed to encode to raw:%p since %s", pTopic->name, pRaw, terrstr()); mError("topic:%s, failed to encode to raw:%p since %s", pTopic->name, pRaw, terrstr());
/*if (pTopic->logicalPlan) {*/
/*free(pTopic->logicalPlan);*/
/*}*/
/*if (pTopic->physicalPlan) {*/
/*free(pTopic->physicalPlan);*/
/*}*/
sdbFreeRaw(pRaw); sdbFreeRaw(pRaw);
return NULL; return NULL;
} }
...@@ -121,10 +136,23 @@ SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) { ...@@ -121,10 +136,23 @@ SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) {
pTopic->sql = calloc(pTopic->sqlLen + 1, sizeof(char)); pTopic->sql = calloc(pTopic->sqlLen + 1, sizeof(char));
SDB_GET_BINARY(pRaw, dataPos, pTopic->sql, pTopic->sqlLen, TOPIC_DECODE_OVER); SDB_GET_BINARY(pRaw, dataPos, pTopic->sql, pTopic->sqlLen, TOPIC_DECODE_OVER);
// SDB_GET_INT32(pRaw, dataPos, &len, TOPIC_DECODE_OVER);
// SDB_GET_BINARY(pRaw, dataPos, pTopic->logicalPlan, len, TOPIC_DECODE_OVER); SDB_GET_INT32(pRaw, dataPos, &len, TOPIC_DECODE_OVER);
// SDB_GET_INT32(pRaw, dataPos, &len, TOPIC_DECODE_OVER); pTopic->logicalPlan = calloc(len+1, sizeof(char));
// SDB_GET_BINARY(pRaw, dataPos, pTopic->physicalPlan, len, TOPIC_DECODE_OVER); if (pTopic->logicalPlan == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto TOPIC_DECODE_OVER;
}
SDB_GET_BINARY(pRaw, dataPos, pTopic->logicalPlan, len+1, TOPIC_DECODE_OVER);
SDB_GET_INT32(pRaw, dataPos, &len, TOPIC_DECODE_OVER);
pTopic->logicalPlan = calloc(len + 1, sizeof(char));
if (pTopic->physicalPlan == NULL) {
free(pTopic->logicalPlan);
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto TOPIC_DECODE_OVER;
}
SDB_GET_BINARY(pRaw, dataPos, pTopic->physicalPlan, len+1, TOPIC_DECODE_OVER);
SDB_GET_RESERVE(pRaw, dataPos, MND_TOPIC_RESERVE_SIZE, TOPIC_DECODE_OVER) SDB_GET_RESERVE(pRaw, dataPos, MND_TOPIC_RESERVE_SIZE, TOPIC_DECODE_OVER)
......
...@@ -25,6 +25,7 @@ ...@@ -25,6 +25,7 @@
#include "trpc.h" #include "trpc.h"
#include "ttimer.h" #include "ttimer.h"
#include "tutil.h" #include "tutil.h"
#include "meta.h"
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
...@@ -314,6 +315,26 @@ const void* tqDeserializeGroup(const STqSerializedHead*, STqGroup**); ...@@ -314,6 +315,26 @@ const void* tqDeserializeGroup(const STqSerializedHead*, STqGroup**);
static int tqQueryExecuting(int32_t status) { return status; } static int tqQueryExecuting(int32_t status) { return status; }
typedef struct STqReadHandle {
int64_t ver;
SSubmitMsg* pMsg;
SSubmitBlk* pBlock;
SSubmitMsgIter msgIter;
SSubmitBlkIter blkIter;
SMeta* pMeta;
} STqReadHandle;
typedef struct SSubmitBlkScanInfo {
} SSubmitBlkScanInfo;
STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta, SSubmitMsg *pMsg);
bool tqNextDataBlock(STqReadHandle* pHandle);
int tqRetrieveDataBlockInfo(STqReadHandle* pHandle, SDataBlockInfo *pBlockInfo);
//return SArray<SColumnInfoData>
SArray *tqRetrieveDataBlock(STqReadHandle* pHandle, SArray* pColumnIdList);
//int tqLoadDataBlock(SExecTaskInfo* pTaskInfo, SSubmitBlkScanInfo* pSubmitBlkScanInfo, SSDataBlock* pBlock, uint32_t status);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -607,3 +607,70 @@ int tqItemSSize() { ...@@ -607,3 +607,70 @@ int tqItemSSize() {
// mainly for executor // mainly for executor
return 0; return 0;
} }
STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta, SSubmitMsg *pMsg) {
STqReadHandle* pReadHandle = malloc(sizeof(STqReadHandle));
if (pReadHandle == NULL) {
return NULL;
}
pReadHandle->pMeta = pMeta;
pReadHandle->pMsg = pMsg;
tInitSubmitMsgIter(pMsg, &pReadHandle->msgIter);
pReadHandle->ver = -1;
return NULL;
}
bool tqNextDataBlock(STqReadHandle* pHandle) {
if(tGetSubmitMsgNext(&pHandle->msgIter, &pHandle->pBlock) < 0) {
return false;
}
return true;
}
int tqRetrieveDataBlockInfo(STqReadHandle* pHandle, SDataBlockInfo* pBlockInfo) {
SMemRow row;
int32_t sversion = pHandle->pBlock->sversion;
SSchemaWrapper* pSchema = metaGetTableSchema(pHandle->pMeta, pHandle->pBlock->uid, sversion, false);
pBlockInfo->numOfCols = pSchema->nCols;
pBlockInfo->rows = pHandle->pBlock->numOfRows;
pBlockInfo->uid = pHandle->pBlock->uid;
//TODO: filter out unused column
return 0;
}
SArray *tqRetrieveDataBlock(STqReadHandle* pHandle, SArray* pColumnIdList) {
int32_t sversion = pHandle->pBlock->sversion;
SSchemaWrapper* pSchemaWrapper = metaGetTableSchema(pHandle->pMeta, pHandle->pBlock->uid, sversion, true);
STSchema* pTschema = metaGetTbTSchema(pHandle->pMeta, pHandle->pBlock->uid, sversion);
SArray *pArray = taosArrayInit(pSchemaWrapper->nCols, sizeof(SColumnInfoData));
if (pArray == NULL) {
return NULL;
}
SColumnInfoData colInfo;
int sz = pSchemaWrapper->nCols * pSchemaWrapper->pSchema->bytes;
colInfo.pData = malloc(sz);
if (colInfo.pData == NULL) {
return NULL;
}
for (int i = 0; i < pTschema->numOfCols; i++) {
//TODO: filter out unused column
taosArrayPush(pColumnIdList, &(schemaColAt(pTschema, i)->colId));
}
SMemRow row;
int32_t kvIdx;
while ((row = tGetSubmitBlkNext(&pHandle->blkIter)) != NULL) {
for (int i = 0; i < pTschema->numOfCols && kvIdx < pTschema->numOfCols; i++) {
//TODO: filter out unused column
STColumn *pCol = schemaColAt(pTschema, i);
void* val = tdGetMemRowDataOfColEx(row, pCol->colId, pCol->type, TD_DATA_ROW_HEAD_SIZE + pCol->offset, &kvIdx);
//TODO: handle varlen
memcpy(POINTER_SHIFT(colInfo.pData, pCol->offset), val, pCol->bytes);
}
}
taosArrayPush(pArray, &colInfo);
return pArray;
}
/*int tqLoadDataBlock(SExecTaskInfo* pTaskInfo, SSubmitBlkScanInfo* pSubmitBlkScanInfo, SSDataBlock* pBlock, uint32_t status) {*/
/*return 0;*/
/*}*/
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册