提交 5040cb95 编写于 作者: L Liu Jicong

enh(query): window included in serialized retrieve table rsp

上级 f0b50add
...@@ -99,8 +99,8 @@ int32_t create_stream() { ...@@ -99,8 +99,8 @@ int32_t create_stream() {
/*const char* sql = "select sum(k) from tu1 interval(10m)";*/ /*const char* sql = "select sum(k) from tu1 interval(10m)";*/
/*pRes = tmq_create_stream(pConn, "stream1", "out1", sql);*/ /*pRes = tmq_create_stream(pConn, "stream1", "out1", sql);*/
pRes = taos_query(pConn, pRes = taos_query(pConn,
"create stream stream1 trigger at_once into outstb as select _wstartts, sum(k) from st1 " "create stream stream1 trigger at_once into outstb as select _wstartts, sum(k) from st1 partition "
"partition by tbname interval(10s) "); "by tbname interval(10s) ");
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
printf("failed to create stream stream1, reason:%s\n", taos_errstr(pRes)); printf("failed to create stream stream1, reason:%s\n", taos_errstr(pRes));
return -1; return -1;
......
...@@ -851,7 +851,6 @@ typedef struct { ...@@ -851,7 +851,6 @@ typedef struct {
int32_t tSerializeSServerVerRsp(void* buf, int32_t bufLen, SServerVerRsp* pRsp); int32_t tSerializeSServerVerRsp(void* buf, int32_t bufLen, SServerVerRsp* pRsp);
int32_t tDeserializeSServerVerRsp(void* buf, int32_t bufLen, SServerVerRsp* pRsp); int32_t tDeserializeSServerVerRsp(void* buf, int32_t bufLen, SServerVerRsp* pRsp);
typedef struct SQueryNodeAddr { typedef struct SQueryNodeAddr {
int32_t nodeId; // vgId or qnodeId int32_t nodeId; // vgId or qnodeId
SEpSet epSet; SEpSet epSet;
...@@ -878,7 +877,6 @@ int32_t tSerializeSDnodeListRsp(void* buf, int32_t bufLen, SDnodeListRsp* pRsp); ...@@ -878,7 +877,6 @@ int32_t tSerializeSDnodeListRsp(void* buf, int32_t bufLen, SDnodeListRsp* pRsp);
int32_t tDeserializeSDnodeListRsp(void* buf, int32_t bufLen, SDnodeListRsp* pRsp); int32_t tDeserializeSDnodeListRsp(void* buf, int32_t bufLen, SDnodeListRsp* pRsp);
void tFreeSDnodeListRsp(SDnodeListRsp* pRsp); void tFreeSDnodeListRsp(SDnodeListRsp* pRsp);
typedef struct { typedef struct {
SArray* pArray; // Array of SUseDbRsp SArray* pArray; // Array of SUseDbRsp
} SUseDbBatchRsp; } SUseDbBatchRsp;
...@@ -1245,12 +1243,12 @@ int32_t tSerializeSShowVariablesReq(void* buf, int32_t bufLen, SShowVariablesReq ...@@ -1245,12 +1243,12 @@ int32_t tSerializeSShowVariablesReq(void* buf, int32_t bufLen, SShowVariablesReq
int32_t tDeserializeSShowVariablesReq(void* buf, int32_t bufLen, SShowVariablesReq* pReq); int32_t tDeserializeSShowVariablesReq(void* buf, int32_t bufLen, SShowVariablesReq* pReq);
typedef struct { typedef struct {
char name[TSDB_CONFIG_OPTION_LEN + 1]; char name[TSDB_CONFIG_OPTION_LEN + 1];
char value[TSDB_CONFIG_VALUE_LEN + 1]; char value[TSDB_CONFIG_VALUE_LEN + 1];
} SVariablesInfo; } SVariablesInfo;
typedef struct { typedef struct {
SArray *variables; //SArray<SVariablesInfo> SArray* variables; // SArray<SVariablesInfo>
} SShowVariablesRsp; } SShowVariablesRsp;
int32_t tSerializeSShowVariablesRsp(void* buf, int32_t bufLen, SShowVariablesRsp* pReq); int32_t tSerializeSShowVariablesRsp(void* buf, int32_t bufLen, SShowVariablesRsp* pReq);
...@@ -1258,7 +1256,6 @@ int32_t tDeserializeSShowVariablesRsp(void* buf, int32_t bufLen, SShowVariablesR ...@@ -1258,7 +1256,6 @@ int32_t tDeserializeSShowVariablesRsp(void* buf, int32_t bufLen, SShowVariablesR
void tFreeSShowVariablesRsp(SShowVariablesRsp* pRsp); void tFreeSShowVariablesRsp(SShowVariablesRsp* pRsp);
/* /*
* sql: show tables like '%a_%' * sql: show tables like '%a_%'
* payload is the query condition, e.g., '%a_%' * payload is the query condition, e.g., '%a_%'
...@@ -1308,6 +1305,8 @@ typedef struct { ...@@ -1308,6 +1305,8 @@ typedef struct {
int32_t compLen; int32_t compLen;
int32_t numOfRows; int32_t numOfRows;
int32_t numOfCols; int32_t numOfCols;
int64_t skey;
int64_t ekey;
char data[]; char data[];
} SRetrieveTableRsp; } SRetrieveTableRsp;
......
...@@ -634,7 +634,7 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) { ...@@ -634,7 +634,7 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
if (dropReq.igNotExists) { if (dropReq.igNotExists) {
mDebug("stream:%s, not exist, ignore not exist is set", dropReq.name); mDebug("stream:%s, not exist, ignore not exist is set", dropReq.name);
sdbRelease(pMnode->pSdb, pStream); sdbRelease(pMnode->pSdb, pStream);
return -1; return 0;
} else { } else {
terrno = TSDB_CODE_MND_STREAM_NOT_EXIST; terrno = TSDB_CODE_MND_STREAM_NOT_EXIST;
return -1; return -1;
......
...@@ -27,11 +27,14 @@ int32_t streamDispatchReqToData(const SStreamDispatchReq* pReq, SStreamDataBlock ...@@ -27,11 +27,14 @@ int32_t streamDispatchReqToData(const SStreamDispatchReq* pReq, SStreamDataBlock
ASSERT(pReq->blockNum == taosArrayGetSize(pReq->dataLen)); ASSERT(pReq->blockNum == taosArrayGetSize(pReq->dataLen));
for (int32_t i = 0; i < blockNum; i++) { for (int32_t i = 0; i < blockNum; i++) {
int32_t len = *(int32_t*)taosArrayGet(pReq->dataLen, i); /*int32_t len = *(int32_t*)taosArrayGet(pReq->dataLen, i);*/
SRetrieveTableRsp* pRetrieve = taosArrayGetP(pReq->data, i); SRetrieveTableRsp* pRetrieve = taosArrayGetP(pReq->data, i);
SSDataBlock* pDataBlock = taosArrayGet(pArray, i); SSDataBlock* pDataBlock = taosArrayGet(pArray, i);
blockCompressDecode(pDataBlock, htonl(pRetrieve->numOfCols), htonl(pRetrieve->numOfRows), pRetrieve->data); blockCompressDecode(pDataBlock, htonl(pRetrieve->numOfCols), htonl(pRetrieve->numOfRows), pRetrieve->data);
// TODO: refactor // TODO: refactor
pDataBlock->info.window.skey = be64toh(pRetrieve->skey);
pDataBlock->info.window.ekey = be64toh(pRetrieve->ekey);
pDataBlock->info.type = pRetrieve->streamBlockType; pDataBlock->info.type = pRetrieve->streamBlockType;
pDataBlock->info.childId = pReq->upstreamChildId; pDataBlock->info.childId = pReq->upstreamChildId;
} }
...@@ -46,8 +49,14 @@ int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock ...@@ -46,8 +49,14 @@ int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock
} }
taosArraySetSize(pArray, 1); taosArraySetSize(pArray, 1);
SRetrieveTableRsp* pRetrieve = pReq->pRetrieve; SRetrieveTableRsp* pRetrieve = pReq->pRetrieve;
SSDataBlock* pBlock = taosArrayGet(pArray, 0); SSDataBlock* pDataBlock = taosArrayGet(pArray, 0);
blockCompressDecode(pBlock, htonl(pRetrieve->numOfCols), htonl(pRetrieve->numOfRows), pRetrieve->data); blockCompressDecode(pDataBlock, htonl(pRetrieve->numOfCols), htonl(pRetrieve->numOfRows), pRetrieve->data);
// TODO: refactor
pDataBlock->info.window.skey = be64toh(pRetrieve->skey);
pDataBlock->info.window.ekey = be64toh(pRetrieve->ekey);
pDataBlock->info.type = pRetrieve->streamBlockType;
pData->blocks = pArray; pData->blocks = pArray;
return 0; return 0;
} }
......
...@@ -104,6 +104,8 @@ int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock) ...@@ -104,6 +104,8 @@ int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock)
pRetrieve->streamBlockType = pBlock->info.type; pRetrieve->streamBlockType = pBlock->info.type;
pRetrieve->numOfRows = htonl(pBlock->info.rows); pRetrieve->numOfRows = htonl(pBlock->info.rows);
pRetrieve->numOfCols = htonl(numOfCols); pRetrieve->numOfCols = htonl(numOfCols);
pRetrieve->skey = htobe64(pBlock->info.window.skey);
pRetrieve->ekey = htobe64(pBlock->info.window.ekey);
int32_t actualLen = 0; int32_t actualLen = 0;
blockCompressEncode(pBlock, pRetrieve->data, &actualLen, numOfCols, false); blockCompressEncode(pBlock, pRetrieve->data, &actualLen, numOfCols, false);
...@@ -171,6 +173,8 @@ static int32_t streamAddBlockToDispatchMsg(const SSDataBlock* pBlock, SStreamDis ...@@ -171,6 +173,8 @@ static int32_t streamAddBlockToDispatchMsg(const SSDataBlock* pBlock, SStreamDis
pRetrieve->completed = 1; pRetrieve->completed = 1;
pRetrieve->streamBlockType = pBlock->info.type; pRetrieve->streamBlockType = pBlock->info.type;
pRetrieve->numOfRows = htonl(pBlock->info.rows); pRetrieve->numOfRows = htonl(pBlock->info.rows);
pRetrieve->skey = htobe64(pBlock->info.window.skey);
pRetrieve->ekey = htobe64(pBlock->info.window.ekey);
int32_t numOfCols = (int32_t)taosArrayGetSize(pBlock->pDataBlock); int32_t numOfCols = (int32_t)taosArrayGetSize(pBlock->pDataBlock);
pRetrieve->numOfCols = htonl(numOfCols); pRetrieve->numOfCols = htonl(numOfCols);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册