提交 5ea4eba3 编写于 作者: L Liu Jicong

fix query crash

上级 3520e464
......@@ -80,6 +80,80 @@ typedef struct SColumnInfoData {
char *pData; // the corresponding block data in memory
} SColumnInfoData;
static FORCE_INLINE int32_t tEncodeDataBlock(void** buf, const SSDataBlock* pBlock) {
int64_t tbUid = pBlock->info.uid;
int32_t numOfCols = pBlock->info.numOfCols;
int32_t rows = pBlock->info.rows;
int32_t sz = taosArrayGetSize(pBlock->pDataBlock);
int32_t tlen = 0;
tlen += taosEncodeFixedI64(buf, tbUid);
tlen += taosEncodeFixedI32(buf, numOfCols);
tlen += taosEncodeFixedI32(buf, rows);
tlen += taosEncodeFixedI32(buf, sz);
for (int32_t i = 0; i < sz; i++) {
SColumnInfoData* pColData = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, i);
tlen += taosEncodeFixedI16(buf, pColData->info.colId);
tlen += taosEncodeFixedI16(buf, pColData->info.type);
tlen += taosEncodeFixedI16(buf, pColData->info.bytes);
int32_t colSz = rows * pColData->info.bytes;
tlen += taosEncodeBinary(buf, pColData->pData, colSz);
}
return tlen;
}
static FORCE_INLINE void* tDecodeDataBlock(void* buf, SSDataBlock* pBlock) {
int32_t sz;
buf = taosDecodeFixedI64(buf, &pBlock->info.uid);
buf = taosDecodeFixedI32(buf, &pBlock->info.numOfCols);
buf = taosDecodeFixedI32(buf, &pBlock->info.rows);
buf = taosDecodeFixedI32(buf, &sz);
pBlock->pDataBlock = taosArrayInit(sz, sizeof(SColumnInfoData));
for (int32_t i = 0; i < sz; i++) {
SColumnInfoData data;
buf = taosDecodeFixedI16(buf, &data.info.colId);
buf = taosDecodeFixedI16(buf, &data.info.type);
buf = taosDecodeFixedI16(buf, &data.info.bytes);
int32_t colSz = pBlock->info.rows * data.info.bytes;
buf = taosDecodeBinary(buf, (void**)&data.pData, colSz);
taosArrayPush(pBlock->pDataBlock, &data);
}
return buf;
}
static FORCE_INLINE int32_t tEncodeSMqConsumeRsp(void** buf, const SMqConsumeRsp* pRsp) {
int32_t tlen = 0;
int32_t sz = 0;
tlen += taosEncodeFixedI64(buf, pRsp->consumerId);
tlen += tEncodeSSchemaWrapper(buf, pRsp->schemas);
if (pRsp->pBlockData) {
sz = taosArrayGetSize(pRsp->pBlockData);
}
tlen += taosEncodeFixedI32(buf, sz);
for (int32_t i = 0; i < sz; i++) {
SSDataBlock* pBlock = (SSDataBlock*) taosArrayGet(pRsp->pBlockData, i);
tlen += tEncodeDataBlock(buf, pBlock);
}
return tlen;
}
static FORCE_INLINE void* tDecodeSMqConsumeRsp(void* buf, SMqConsumeRsp* pRsp) {
int32_t sz;
buf = taosDecodeFixedI64(buf, &pRsp->consumerId);
pRsp->schemas = (SSchemaWrapper*)calloc(1, sizeof(SSchemaWrapper));
if (pRsp->schemas == NULL) return NULL;
buf = tDecodeSSchemaWrapper(buf, pRsp->schemas);
buf = taosDecodeFixedI32(buf, &sz);
pRsp->pBlockData = taosArrayInit(sz, sizeof(SSDataBlock));
for (int32_t i = 0; i < sz; i++) {
SSDataBlock block;
tDecodeDataBlock(buf, &block);
taosArrayPush(pRsp->pBlockData, &block);
}
return buf;
}
//======================================================================================================================
// the following structure shared by parser and executor
typedef struct SColumn {
......
......@@ -1592,16 +1592,53 @@ typedef struct SMqSetCVgRsp {
char cGroup[TSDB_CONSUMER_GROUP_LEN];
} SMqSetCVgRsp;
typedef struct SMqColData {
int16_t colId;
int16_t type;
int16_t bytes;
} SMqColMeta;
typedef struct {
uint32_t nCols;
SSchema *pSchema;
} SSchemaWrapper;
static FORCE_INLINE int32_t tEncodeSSchema(void** buf, const SSchema* pSchema) {
int32_t tlen = 0;
tlen += taosEncodeFixedI8(buf, pSchema->type);
tlen += taosEncodeFixedI32(buf, pSchema->bytes);
tlen += taosEncodeFixedI32(buf, pSchema->colId);
tlen += taosEncodeString(buf, pSchema->name);
return tlen;
}
static FORCE_INLINE void* tDecodeSSchema(void* buf, SSchema* pSchema) {
buf = taosDecodeFixedI8(buf, &pSchema->type);
buf = taosDecodeFixedI32(buf, &pSchema->bytes);
buf = taosDecodeFixedI32(buf, &pSchema->colId);
buf = taosDecodeStringTo(buf, pSchema->name);
return buf;
}
static FORCE_INLINE int32_t tEncodeSSchemaWrapper(void** buf, const SSchemaWrapper* pSW) {
int32_t tlen = 0;
tlen += taosEncodeFixedU32(buf, pSW->nCols);
for (int32_t i = 0; i < pSW->nCols; i ++) {
tlen += tEncodeSSchema(buf, &pSW->pSchema[i]);
}
return tlen;
}
static FORCE_INLINE void* tDecodeSSchemaWrapper(void* buf, SSchemaWrapper* pSW) {
buf = taosDecodeFixedU32(buf, &pSW->nCols);
pSW->pSchema = (SSchema*) calloc(pSW->nCols, sizeof(SSchema));
if (pSW->pSchema == NULL) {
return NULL;
}
for (int32_t i = 0; i < pSW->nCols; i ++) {
buf = tDecodeSSchema(buf, &pSW->pSchema[i]);
}
return buf;
}
typedef struct SMqTbData {
int64_t uid;
int32_t numOfRows;
char colData[];
char* colData;
} SMqTbData;
typedef struct SMqTopicBlk {
......@@ -1617,17 +1654,11 @@ typedef struct SMqTopicBlk {
typedef struct SMqConsumeRsp {
int64_t consumerId;
int32_t numOfCols;
SMqColMeta* meta;
SSchemaWrapper* schemas;
int32_t numOfTopics;
SMqTopicData* data;
SArray* pBlockData; //SArray<SSDataBlock>
} SMqConsumeRsp;
static FORCE_INLINE int32_t tEncodeSMqConsumeRsp(void** buf, const SMqConsumeRsp* pRsp) {
int32_t tlen = 0;
return tlen;
}
// one req for one vg+topic
typedef struct SMqConsumeReq {
SMsgHead head;
......
......@@ -119,7 +119,7 @@ TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass,
SAppInstInfo* p = calloc(1, sizeof(struct SAppInstInfo));
p->mgmtEp = epSet;
p->pTransporter = openTransporter(user, secretEncrypt, tsNumOfCores);
p->pAppHbMgr = appHbMgrInit(p);
/*p->pAppHbMgr = appHbMgrInit(p);*/
taosHashPut(appInfo.pInstMap, key, strlen(key), &p, POINTER_BYTES);
pInst = &p;
......@@ -621,6 +621,27 @@ struct tmq_message_t {
};
int32_t tmq_poll_cb_inner(void* param, const SDataBuf* pMsg, int32_t code) {
SMqConsumeRsp rsp;
tDecodeSMqConsumeRsp(pMsg->pData, &rsp);
int32_t colNum = rsp.schemas->nCols;
for (int32_t i = 0; i < colNum; i++) {
printf("| %s |", rsp.schemas->pSchema[i].name);
}
printf("\n");
int32_t sz = taosArrayGetSize(rsp.pBlockData);
for (int32_t i = 0; i < sz; i++) {
SSDataBlock* pDataBlock = taosArrayGet(rsp.pBlockData, i);
int32_t rows = pDataBlock->info.rows;
for (int32_t j = 0; j < colNum; j++) {
SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, j);
for (int32_t k = 0; k < rows; k++) {
void* var = POINTER_SHIFT(pColInfoData->pData, k * pColInfoData->info.bytes);
if (j == 0) printf(" %ld ", *(int64_t*)var);
if (j == 1) printf(" %d ", *(int32_t*)var);
}
}
/*pDataBlock->*/
}
return 0;
}
......@@ -721,9 +742,9 @@ tmq_message_t* tmq_consume_poll(tmq_t* tmq, int64_t blocking_time) {
pRequest->body.requestMsg = (SDataBuf){ .pData = pReq, .len = sizeof(SMqConsumeReq) };
SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest);
/*sendInfo->requestObjRefId = 0;*/
sendInfo->requestObjRefId = 0;
/*sendInfo->param = &tmq_message;*/
/*sendInfo->fp = tmq_poll_cb_inner;*/
sendInfo->fp = tmq_poll_cb_inner;
int64_t transporterId = 0;
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo);
......
......@@ -71,8 +71,8 @@ int processConnectRsp(void* param, const SDataBuf* pMsg, int32_t code) {
pTscObj->pAppInfo->clusterId = pConnect->clusterId;
atomic_add_fetch_64(&pTscObj->pAppInfo->numOfConns, 1);
SClientHbKey connKey = {.connId = pConnect->connId, .hbType = HEARTBEAT_TYPE_QUERY};
hbRegisterConn(pTscObj->pAppInfo->pAppHbMgr, connKey, NULL);
/*SClientHbKey connKey = {.connId = pConnect->connId, .hbType = HEARTBEAT_TYPE_QUERY};*/
/*hbRegisterConn(pTscObj->pAppInfo->pAppHbMgr, connKey, NULL);*/
// pRequest->body.resInfo.pRspMsg = pMsg->pData;
tscDebug("0x%" PRIx64 " clusterId:%" PRId64 ", totalConn:%" PRId64, pRequest->requestId, pConnect->clusterId,
......
......@@ -37,11 +37,6 @@ typedef struct SMetaCfg {
uint64_t lruSize;
} SMetaCfg;
typedef struct {
uint32_t nCols;
SSchema *pSchema;
} SSchemaWrapper;
typedef struct SMTbCursor SMTbCursor;
typedef struct SMCtbCursor SMCtbCursor;
......
......@@ -153,6 +153,7 @@ typedef struct STqTaskItem {
int64_t offset;
void* dst;
qTaskInfo_t task;
STqReadHandle* pReadHandle;
SSubQueryMsg* pQueryMsg;
} STqTaskItem;
......
......@@ -75,8 +75,11 @@ typedef struct STqReadHandle {
SSubmitBlk* pBlock;
SSubmitMsgIter msgIter;
SSubmitBlkIter blkIter;
SMeta* pMeta;
SArray* pColIdList;
SMeta* pVnodeMeta;
SArray* pColIdList; //SArray<int32_t>
int32_t sver;
SSchemaWrapper* pSchemaWrapper;
STSchema* pSchema;
} STqReadHandle;
/* ------------------------ SVnode ------------------------ */
......
......@@ -13,9 +13,9 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tcompare.h"
#include "tqInt.h"
#include "tqMetaStore.h"
#include "tcompare.h"
// static
// read next version data
......@@ -484,7 +484,8 @@ int tqConsume(STQ* pTq, STqConsumeReq* pMsg) {
int tqSerializeConsumer(const STqConsumerHandle* pConsumer, STqSerializedHead** ppHead) {
int32_t num = taosArrayGetSize(pConsumer->topics);
int32_t sz = sizeof(STqSerializedHead) + sizeof(int64_t) * 2 + TSDB_TOPIC_FNAME_LEN + num * (sizeof(int64_t) + TSDB_TOPIC_FNAME_LEN);
int32_t sz = sizeof(STqSerializedHead) + sizeof(int64_t) * 2 + TSDB_TOPIC_FNAME_LEN +
num * (sizeof(int64_t) + TSDB_TOPIC_FNAME_LEN);
if (sz > (*ppHead)->ssize) {
void* tmpPtr = realloc(*ppHead, sz);
if (tmpPtr == NULL) {
......@@ -677,6 +678,7 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) {
int64_t blockingTime = pReq->blockingTime;
int rspLen = 0;
SMqConsumeRsp rsp = {.consumerId = consumerId, .numOfTopics = 1, .pBlockData = NULL};
STqConsumerHandle* pConsumer = tqHandleGet(pTq->tqMeta, consumerId);
ASSERT(pConsumer);
......@@ -684,7 +686,7 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) {
for (int i = 0; i < sz; i++) {
STqTopicHandle* pTopic = taosArrayGet(pConsumer->topics, i);
//TODO: support multiple topic in one req
// TODO: support multiple topic in one req
if (strcmp(pTopic->topicName, pReq->topic) != 0) {
continue;
}
......@@ -727,7 +729,7 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) {
qSetStreamInput(task, pCont);
//SArray<SSDataBlock>
// SArray<SSDataBlock>
SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock));
while (1) {
SSDataBlock* pDataBlock;
......@@ -741,6 +743,8 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) {
break;
}
}
//TODO copy
rsp.schemas = pTopic->buffer.output[pos].pReadHandle->pSchemaWrapper;
atomic_store_8(&pTopic->buffer.output[pos].status, 0);
......@@ -750,6 +754,9 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) {
continue;
}
rsp.pBlockData = pRes;
#if 0
pTopic->buffer.output[pos].dst = pRes;
if (pTopic->buffer.firstOffset == -1 || pReq->offset < pTopic->buffer.firstOffset) {
pTopic->buffer.firstOffset = pReq->offset;
......@@ -757,13 +764,20 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) {
if (pTopic->buffer.lastOffset == -1 || pReq->offset > pTopic->buffer.lastOffset) {
pTopic->buffer.lastOffset = pReq->offset;
}
#endif
}
int32_t tlen = tEncodeSMqConsumeRsp(NULL, &rsp);
void* buf = rpcMallocCont(tlen);
if (buf == NULL) {
pMsg->code = -1;
return -1;
}
// put output into rsp
SMqConsumeRsp rsp = {
.consumerId = consumerId,
.numOfTopics = 1
};
void* abuf = buf;
tEncodeSMqConsumeRsp(&abuf, &rsp);
pMsg->pCont = buf;
pMsg->contLen = tlen;
pMsg->code = 0;
rpcSendResponse(pMsg);
return 0;
}
......@@ -799,6 +813,7 @@ int32_t tqProcessSetConnReq(STQ* pTq, char* msg) {
for (int i = 0; i < TQ_BUFFER_SIZE; i++) {
pTopic->buffer.output[i].status = 0;
STqReadHandle* pReadHandle = tqInitSubmitMsgScanner(pTq->pMeta);
pTopic->buffer.output[i].pReadHandle = pReadHandle;
pTopic->buffer.output[i].task = qCreateStreamExecTaskInfo(req.qmsg, pReadHandle);
}
taosArrayPush(pConsumer->topics, pTopic);
......@@ -813,10 +828,13 @@ STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta) {
if (pReadHandle == NULL) {
return NULL;
}
pReadHandle->pMeta = pMeta;
pReadHandle->pVnodeMeta = pMeta;
pReadHandle->pMsg = NULL;
pReadHandle->ver = -1;
pReadHandle->pColIdList = NULL;
pReadHandle->sver = -1;
pReadHandle->pSchema = NULL;
pReadHandle->pSchemaWrapper = NULL;
return pReadHandle;
}
......@@ -837,7 +855,7 @@ bool tqNextDataBlock(STqReadHandle* pHandle) {
if (pHandle->pBlock == NULL) return false;
pHandle->pBlock->uid = htobe64(pHandle->pBlock->uid);
if (pHandle->tbUid == pHandle->pBlock->uid){
if (pHandle->tbUid == pHandle->pBlock->uid) {
pHandle->pBlock->tid = htonl(pHandle->pBlock->tid);
pHandle->pBlock->sversion = htonl(pHandle->pBlock->sversion);
pHandle->pBlock->dataLen = htonl(pHandle->pBlock->dataLen);
......@@ -859,41 +877,71 @@ int tqRetrieveDataBlockInfo(STqReadHandle* pHandle, SDataBlockInfo* pBlockInfo)
}
SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) {
int32_t sversion = pHandle->pBlock->sversion;
//TODO : change sversion
STSchema* pTschema = metaGetTbTSchema(pHandle->pMeta, pHandle->pBlock->uid, 0);
/*int32_t sversion = pHandle->pBlock->sversion;*/
// TODO set to real sversion
int32_t sversion = 0;
if (pHandle->sver != sversion) {
pHandle->pSchema = metaGetTbTSchema(pHandle->pVnodeMeta, pHandle->pBlock->uid, sversion);
tb_uid_t quid;
STbCfg* pTbCfg = metaGetTbInfoByUid(pHandle->pMeta, pHandle->pBlock->uid);
STbCfg* pTbCfg = metaGetTbInfoByUid(pHandle->pVnodeMeta, pHandle->pBlock->uid);
if (pTbCfg->type == META_CHILD_TABLE) {
quid = pTbCfg->ctbCfg.suid;
} else {
quid = pHandle->pBlock->uid;
}
pHandle->pSchemaWrapper = metaGetTableSchema(pHandle->pVnodeMeta, quid, sversion, true);
pHandle->sver = sversion;
}
SSchemaWrapper* pSchemaWrapper = metaGetTableSchema(pHandle->pMeta, quid, 0, true);
SArray* pArray = taosArrayInit(pSchemaWrapper->nCols, sizeof(SColumnInfoData));
STSchema* pTschema = pHandle->pSchema;
SSchemaWrapper* pSchemaWrapper = pHandle->pSchemaWrapper;
int32_t numOfRows = pHandle->pBlock->numOfRows;
int32_t numOfCols = pHandle->pSchema->numOfCols;
int32_t colNumNeed = taosArrayGetSize(pHandle->pColIdList);
SArray* pArray = taosArrayInit(colNumNeed, sizeof(SColumnInfoData));
if (pArray == NULL) {
return NULL;
}
SColumnInfoData colInfo;
int sz = pSchemaWrapper->nCols * pSchemaWrapper->pSchema->bytes;
colInfo.pData = malloc(sz);
int j = 0;
for (int32_t i = 0; i < colNumNeed; i++) {
int32_t colId = *(int32_t*)taosArrayGet(pHandle->pColIdList, i);
while (j < pSchemaWrapper->nCols && pSchemaWrapper->pSchema[j].colId < colId) {
j++;
}
SSchema* pColSchema = &pSchemaWrapper->pSchema[j];
ASSERT(pColSchema->colId == colId);
SColumnInfoData colInfo = {0};
int sz = numOfRows * pColSchema->bytes;
colInfo.info.bytes = pColSchema->bytes;
colInfo.info.colId = colId;
colInfo.info.type = pColSchema->type;
colInfo.pData = calloc(1, sz);
if (colInfo.pData == NULL) {
// TODO free
taosArrayDestroy(pArray);
return NULL;
}
taosArrayPush(pArray, &colInfo);
}
SMemRow row;
int32_t kvIdx;
int32_t kvIdx = 0;
tInitSubmitBlkIter(pHandle->pBlock, &pHandle->blkIter);
while ((row = tGetSubmitBlkNext(&pHandle->blkIter)) != NULL) {
for (int i = 0; i < pTschema->numOfCols && kvIdx < pTschema->numOfCols; i++) {
// TODO: filter out unused column
// get all wanted col of that block
for (int32_t i = 0; i < colNumNeed; i++) {
SColumnInfoData* pColData = taosArrayGet(pArray, i);
STColumn* pCol = schemaColAt(pTschema, i);
// TODO
ASSERT(pCol->colId == pColData->info.colId);
void* val = tdGetMemRowDataOfColEx(row, pCol->colId, pCol->type, TD_DATA_ROW_HEAD_SIZE + pCol->offset, &kvIdx);
// TODO: handle varlen
memcpy(POINTER_SHIFT(colInfo.pData, pCol->offset), val, pCol->bytes);
memcpy(pColData->pData, val, pCol->bytes);
}
}
taosArrayPush(pArray, &colInfo);
return pArray;
}
......@@ -5070,6 +5070,7 @@ static SSDataBlock* doStreamBlockScan(void* param, bool* newgroup) {
SStreamBlockScanInfo* pInfo = pOperator->info;
SDataBlockInfo* pBlockInfo = &pInfo->pRes->info;
pBlockInfo->rows = 0;
while (tqNextDataBlock(pInfo->readerHandle)) {
pTaskInfo->code = tqRetrieveDataBlockInfo(pInfo->readerHandle, pBlockInfo);
if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册