diff --git a/include/common/common.h b/include/common/common.h index 9b8a4654428eb19efdc727e228eff401fac848ed..0299a29eb4fc1cad9c6b1e5f8a594289e75d6346 100644 --- a/include/common/common.h +++ b/include/common/common.h @@ -80,6 +80,80 @@ typedef struct SColumnInfoData { char *pData; // the corresponding block data in memory } SColumnInfoData; +static FORCE_INLINE int32_t tEncodeDataBlock(void** buf, const SSDataBlock* pBlock) { + int64_t tbUid = pBlock->info.uid; + int32_t numOfCols = pBlock->info.numOfCols; + int32_t rows = pBlock->info.rows; + int32_t sz = taosArrayGetSize(pBlock->pDataBlock); + + int32_t tlen = 0; + tlen += taosEncodeFixedI64(buf, tbUid); + tlen += taosEncodeFixedI32(buf, numOfCols); + tlen += taosEncodeFixedI32(buf, rows); + tlen += taosEncodeFixedI32(buf, sz); + for (int32_t i = 0; i < sz; i++) { + SColumnInfoData* pColData = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, i); + tlen += taosEncodeFixedI16(buf, pColData->info.colId); + tlen += taosEncodeFixedI16(buf, pColData->info.type); + tlen += taosEncodeFixedI16(buf, pColData->info.bytes); + int32_t colSz = rows * pColData->info.bytes; + tlen += taosEncodeBinary(buf, pColData->pData, colSz); + } + return tlen; +} + +static FORCE_INLINE void* tDecodeDataBlock(void* buf, SSDataBlock* pBlock) { + int32_t sz; + + buf = taosDecodeFixedI64(buf, &pBlock->info.uid); + buf = taosDecodeFixedI32(buf, &pBlock->info.numOfCols); + buf = taosDecodeFixedI32(buf, &pBlock->info.rows); + buf = taosDecodeFixedI32(buf, &sz); + pBlock->pDataBlock = taosArrayInit(sz, sizeof(SColumnInfoData)); + for (int32_t i = 0; i < sz; i++) { + SColumnInfoData data; + buf = taosDecodeFixedI16(buf, &data.info.colId); + buf = taosDecodeFixedI16(buf, &data.info.type); + buf = taosDecodeFixedI16(buf, &data.info.bytes); + int32_t colSz = pBlock->info.rows * data.info.bytes; + buf = taosDecodeBinary(buf, (void**)&data.pData, colSz); + taosArrayPush(pBlock->pDataBlock, &data); + } + return buf; +} + +static FORCE_INLINE int32_t tEncodeSMqConsumeRsp(void** buf, const SMqConsumeRsp* pRsp) { + int32_t tlen = 0; + int32_t sz = 0; + tlen += taosEncodeFixedI64(buf, pRsp->consumerId); + tlen += tEncodeSSchemaWrapper(buf, pRsp->schemas); + if (pRsp->pBlockData) { + sz = taosArrayGetSize(pRsp->pBlockData); + } + tlen += taosEncodeFixedI32(buf, sz); + for (int32_t i = 0; i < sz; i++) { + SSDataBlock* pBlock = (SSDataBlock*) taosArrayGet(pRsp->pBlockData, i); + tlen += tEncodeDataBlock(buf, pBlock); + } + return tlen; +} + +static FORCE_INLINE void* tDecodeSMqConsumeRsp(void* buf, SMqConsumeRsp* pRsp) { + int32_t sz; + buf = taosDecodeFixedI64(buf, &pRsp->consumerId); + pRsp->schemas = (SSchemaWrapper*)calloc(1, sizeof(SSchemaWrapper)); + if (pRsp->schemas == NULL) return NULL; + buf = tDecodeSSchemaWrapper(buf, pRsp->schemas); + buf = taosDecodeFixedI32(buf, &sz); + pRsp->pBlockData = taosArrayInit(sz, sizeof(SSDataBlock)); + for (int32_t i = 0; i < sz; i++) { + SSDataBlock block; + tDecodeDataBlock(buf, &block); + taosArrayPush(pRsp->pBlockData, &block); + } + return buf; +} + //====================================================================================================================== // the following structure shared by parser and executor typedef struct SColumn { diff --git a/include/common/tmsg.h b/include/common/tmsg.h index ef5a854d20333ecf8a05ef50e44bf62c59eb440d..289f2143ab39be6e0a646d05fa30f10d34d0fbae 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1592,16 +1592,53 @@ typedef struct SMqSetCVgRsp { char cGroup[TSDB_CONSUMER_GROUP_LEN]; } SMqSetCVgRsp; -typedef struct SMqColData { - int16_t colId; - int16_t type; - int16_t bytes; -} SMqColMeta; +typedef struct { + uint32_t nCols; + SSchema *pSchema; +} SSchemaWrapper; + +static FORCE_INLINE int32_t tEncodeSSchema(void** buf, const SSchema* pSchema) { + int32_t tlen = 0; + tlen += taosEncodeFixedI8(buf, pSchema->type); + tlen += taosEncodeFixedI32(buf, pSchema->bytes); + tlen += taosEncodeFixedI32(buf, pSchema->colId); + tlen += taosEncodeString(buf, pSchema->name); + return tlen; +} + +static FORCE_INLINE void* tDecodeSSchema(void* buf, SSchema* pSchema) { + buf = taosDecodeFixedI8(buf, &pSchema->type); + buf = taosDecodeFixedI32(buf, &pSchema->bytes); + buf = taosDecodeFixedI32(buf, &pSchema->colId); + buf = taosDecodeStringTo(buf, pSchema->name); + return buf; +} + +static FORCE_INLINE int32_t tEncodeSSchemaWrapper(void** buf, const SSchemaWrapper* pSW) { + int32_t tlen = 0; + tlen += taosEncodeFixedU32(buf, pSW->nCols); + for (int32_t i = 0; i < pSW->nCols; i ++) { + tlen += tEncodeSSchema(buf, &pSW->pSchema[i]); + } + return tlen; +} + +static FORCE_INLINE void* tDecodeSSchemaWrapper(void* buf, SSchemaWrapper* pSW) { + buf = taosDecodeFixedU32(buf, &pSW->nCols); + pSW->pSchema = (SSchema*) calloc(pSW->nCols, sizeof(SSchema)); + if (pSW->pSchema == NULL) { + return NULL; + } + for (int32_t i = 0; i < pSW->nCols; i ++) { + buf = tDecodeSSchema(buf, &pSW->pSchema[i]); + } + return buf; +} typedef struct SMqTbData { int64_t uid; int32_t numOfRows; - char colData[]; + char* colData; } SMqTbData; typedef struct SMqTopicBlk { @@ -1616,18 +1653,12 @@ typedef struct SMqTopicBlk { } SMqTopicData; typedef struct SMqConsumeRsp { - int64_t consumerId; - int32_t numOfCols; - SMqColMeta* meta; - int32_t numOfTopics; - SMqTopicData* data; + int64_t consumerId; + SSchemaWrapper* schemas; + int32_t numOfTopics; + SArray* pBlockData; //SArray } SMqConsumeRsp; -static FORCE_INLINE int32_t tEncodeSMqConsumeRsp(void** buf, const SMqConsumeRsp* pRsp) { - int32_t tlen = 0; - return tlen; -} - // one req for one vg+topic typedef struct SMqConsumeReq { SMsgHead head; diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index df086463dbd98848493b212b0c4070dd515f6122..afac8acc7a4f11244e4f7dae84253b523c9c18e1 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -119,7 +119,7 @@ TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass, SAppInstInfo* p = calloc(1, sizeof(struct SAppInstInfo)); p->mgmtEp = epSet; p->pTransporter = openTransporter(user, secretEncrypt, tsNumOfCores); - p->pAppHbMgr = appHbMgrInit(p); + /*p->pAppHbMgr = appHbMgrInit(p);*/ taosHashPut(appInfo.pInstMap, key, strlen(key), &p, POINTER_BYTES); pInst = &p; @@ -621,6 +621,27 @@ struct tmq_message_t { }; int32_t tmq_poll_cb_inner(void* param, const SDataBuf* pMsg, int32_t code) { + SMqConsumeRsp rsp; + tDecodeSMqConsumeRsp(pMsg->pData, &rsp); + int32_t colNum = rsp.schemas->nCols; + for (int32_t i = 0; i < colNum; i++) { + printf("| %s |", rsp.schemas->pSchema[i].name); + } + printf("\n"); + int32_t sz = taosArrayGetSize(rsp.pBlockData); + for (int32_t i = 0; i < sz; i++) { + SSDataBlock* pDataBlock = taosArrayGet(rsp.pBlockData, i); + int32_t rows = pDataBlock->info.rows; + for (int32_t j = 0; j < colNum; j++) { + SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, j); + for (int32_t k = 0; k < rows; k++) { + void* var = POINTER_SHIFT(pColInfoData->pData, k * pColInfoData->info.bytes); + if (j == 0) printf(" %ld ", *(int64_t*)var); + if (j == 1) printf(" %d ", *(int32_t*)var); + } + } + /*pDataBlock->*/ + } return 0; } @@ -721,9 +742,9 @@ tmq_message_t* tmq_consume_poll(tmq_t* tmq, int64_t blocking_time) { pRequest->body.requestMsg = (SDataBuf){ .pData = pReq, .len = sizeof(SMqConsumeReq) }; SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest); - /*sendInfo->requestObjRefId = 0;*/ + sendInfo->requestObjRefId = 0; /*sendInfo->param = &tmq_message;*/ - /*sendInfo->fp = tmq_poll_cb_inner;*/ + sendInfo->fp = tmq_poll_cb_inner; int64_t transporterId = 0; asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo); diff --git a/source/client/src/clientMsgHandler.c b/source/client/src/clientMsgHandler.c index ec088eb0735824f3348287ec54fd476f00dc7adb..1e056293bec87b7c20e7f9f855d6479bb0318d44 100644 --- a/source/client/src/clientMsgHandler.c +++ b/source/client/src/clientMsgHandler.c @@ -71,8 +71,8 @@ int processConnectRsp(void* param, const SDataBuf* pMsg, int32_t code) { pTscObj->pAppInfo->clusterId = pConnect->clusterId; atomic_add_fetch_64(&pTscObj->pAppInfo->numOfConns, 1); - SClientHbKey connKey = {.connId = pConnect->connId, .hbType = HEARTBEAT_TYPE_QUERY}; - hbRegisterConn(pTscObj->pAppInfo->pAppHbMgr, connKey, NULL); + /*SClientHbKey connKey = {.connId = pConnect->connId, .hbType = HEARTBEAT_TYPE_QUERY};*/ + /*hbRegisterConn(pTscObj->pAppInfo->pAppHbMgr, connKey, NULL);*/ // pRequest->body.resInfo.pRspMsg = pMsg->pData; tscDebug("0x%" PRIx64 " clusterId:%" PRId64 ", totalConn:%" PRId64, pRequest->requestId, pConnect->clusterId, diff --git a/source/dnode/vnode/inc/meta.h b/source/dnode/vnode/inc/meta.h index 383073871e6eb46e75d1a0dfe48c09fc50e14cad..44a352ec54966af9a1cb13c767cc8a23988395c6 100644 --- a/source/dnode/vnode/inc/meta.h +++ b/source/dnode/vnode/inc/meta.h @@ -37,11 +37,6 @@ typedef struct SMetaCfg { uint64_t lruSize; } SMetaCfg; -typedef struct { - uint32_t nCols; - SSchema *pSchema; -} SSchemaWrapper; - typedef struct SMTbCursor SMTbCursor; typedef struct SMCtbCursor SMCtbCursor; diff --git a/source/dnode/vnode/inc/tq.h b/source/dnode/vnode/inc/tq.h index 3a1e5b9c95737304d3460b0d173a8d6a6123fdf6..faaf769e1ab16931eb4944b51ea7e791a26a226b 100644 --- a/source/dnode/vnode/inc/tq.h +++ b/source/dnode/vnode/inc/tq.h @@ -149,11 +149,12 @@ typedef struct STqGroup { } STqGroup; typedef struct STqTaskItem { - int8_t status; - int64_t offset; - void* dst; - qTaskInfo_t task; - SSubQueryMsg* pQueryMsg; + int8_t status; + int64_t offset; + void* dst; + qTaskInfo_t task; + STqReadHandle* pReadHandle; + SSubQueryMsg* pQueryMsg; } STqTaskItem; // new version diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index b56c5b30fa4fc3ce8584bdb2273a134f177420c2..c8b47bf4a6b8635d2c494ea4aa8fd6622f5e264d 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -69,14 +69,17 @@ typedef struct { } SVnodeOpt; typedef struct STqReadHandle { - int64_t ver; - uint64_t tbUid; - SSubmitMsg* pMsg; - SSubmitBlk* pBlock; - SSubmitMsgIter msgIter; - SSubmitBlkIter blkIter; - SMeta* pMeta; - SArray* pColIdList; + int64_t ver; + uint64_t tbUid; + SSubmitMsg* pMsg; + SSubmitBlk* pBlock; + SSubmitMsgIter msgIter; + SSubmitBlkIter blkIter; + SMeta* pVnodeMeta; + SArray* pColIdList; //SArray + int32_t sver; + SSchemaWrapper* pSchemaWrapper; + STSchema* pSchema; } STqReadHandle; /* ------------------------ SVnode ------------------------ */ diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index e953d595272aac25187fb873c0860a37c0d67d2a..3195691a13ad4295a412f3ebb6ca40267419e1e9 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -13,9 +13,9 @@ * along with this program. If not, see . */ +#include "tcompare.h" #include "tqInt.h" #include "tqMetaStore.h" -#include "tcompare.h" // static // read next version data @@ -484,7 +484,8 @@ int tqConsume(STQ* pTq, STqConsumeReq* pMsg) { int tqSerializeConsumer(const STqConsumerHandle* pConsumer, STqSerializedHead** ppHead) { int32_t num = taosArrayGetSize(pConsumer->topics); - int32_t sz = sizeof(STqSerializedHead) + sizeof(int64_t) * 2 + TSDB_TOPIC_FNAME_LEN + num * (sizeof(int64_t) + TSDB_TOPIC_FNAME_LEN); + int32_t sz = sizeof(STqSerializedHead) + sizeof(int64_t) * 2 + TSDB_TOPIC_FNAME_LEN + + num * (sizeof(int64_t) + TSDB_TOPIC_FNAME_LEN); if (sz > (*ppHead)->ssize) { void* tmpPtr = realloc(*ppHead, sz); if (tmpPtr == NULL) { @@ -511,13 +512,13 @@ int tqSerializeConsumer(const STqConsumerHandle* pConsumer, STqSerializedHead** *(int64_t*)ptr = pTopic->committedOffset; POINTER_SHIFT(ptr, sizeof(int64_t)); } - + return 0; } const void* tqDeserializeConsumer(const STqSerializedHead* pHead, STqConsumerHandle** ppConsumer) { STqConsumerHandle* pConsumer = *ppConsumer; - const void* ptr = pHead->content; + const void* ptr = pHead->content; pConsumer->consumerId = *(int64_t*)ptr; ptr = POINTER_SHIFT(ptr, sizeof(int64_t)); pConsumer->epoch = *(int64_t*)ptr; @@ -668,32 +669,33 @@ int tqItemSSize() { #endif int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) { - SMqConsumeReq* pReq = pMsg->pCont; - SRpcMsg rpcMsg; - int64_t reqId = pReq->reqId; - int64_t consumerId = pReq->consumerId; - int64_t reqOffset = pReq->offset; - int64_t fetchOffset = reqOffset; - int64_t blockingTime = pReq->blockingTime; + SMqConsumeReq* pReq = pMsg->pCont; + SRpcMsg rpcMsg; + int64_t reqId = pReq->reqId; + int64_t consumerId = pReq->consumerId; + int64_t reqOffset = pReq->offset; + int64_t fetchOffset = reqOffset; + int64_t blockingTime = pReq->blockingTime; - int rspLen = 0; + int rspLen = 0; + SMqConsumeRsp rsp = {.consumerId = consumerId, .numOfTopics = 1, .pBlockData = NULL}; STqConsumerHandle* pConsumer = tqHandleGet(pTq->tqMeta, consumerId); ASSERT(pConsumer); - int sz = taosArrayGetSize(pConsumer->topics); + int sz = taosArrayGetSize(pConsumer->topics); for (int i = 0; i < sz; i++) { STqTopicHandle* pTopic = taosArrayGet(pConsumer->topics, i); - //TODO: support multiple topic in one req + // TODO: support multiple topic in one req if (strcmp(pTopic->topicName, pReq->topic) != 0) { continue; } - if (fetchOffset == -1) { - fetchOffset = pTopic->committedOffset + 1; - } - int8_t pos; - int8_t skip = 0; + if (fetchOffset == -1) { + fetchOffset = pTopic->committedOffset + 1; + } + int8_t pos; + int8_t skip = 0; SWalHead* pHead; while (1) { pos = fetchOffset % TQ_BUFFER_SIZE; @@ -727,7 +729,7 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) { qSetStreamInput(task, pCont); - //SArray + // SArray SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock)); while (1) { SSDataBlock* pDataBlock; @@ -741,6 +743,8 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) { break; } } + //TODO copy + rsp.schemas = pTopic->buffer.output[pos].pReadHandle->pSchemaWrapper; atomic_store_8(&pTopic->buffer.output[pos].status, 0); @@ -750,6 +754,9 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) { continue; } + rsp.pBlockData = pRes; + +#if 0 pTopic->buffer.output[pos].dst = pRes; if (pTopic->buffer.firstOffset == -1 || pReq->offset < pTopic->buffer.firstOffset) { pTopic->buffer.firstOffset = pReq->offset; @@ -757,13 +764,20 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) { if (pTopic->buffer.lastOffset == -1 || pReq->offset > pTopic->buffer.lastOffset) { pTopic->buffer.lastOffset = pReq->offset; } +#endif } - // put output into rsp - SMqConsumeRsp rsp = { - .consumerId = consumerId, - .numOfTopics = 1 - }; - + int32_t tlen = tEncodeSMqConsumeRsp(NULL, &rsp); + void* buf = rpcMallocCont(tlen); + if (buf == NULL) { + pMsg->code = -1; + return -1; + } + void* abuf = buf; + tEncodeSMqConsumeRsp(&abuf, &rsp); + pMsg->pCont = buf; + pMsg->contLen = tlen; + pMsg->code = 0; + rpcSendResponse(pMsg); return 0; } @@ -799,6 +813,7 @@ int32_t tqProcessSetConnReq(STQ* pTq, char* msg) { for (int i = 0; i < TQ_BUFFER_SIZE; i++) { pTopic->buffer.output[i].status = 0; STqReadHandle* pReadHandle = tqInitSubmitMsgScanner(pTq->pMeta); + pTopic->buffer.output[i].pReadHandle = pReadHandle; pTopic->buffer.output[i].task = qCreateStreamExecTaskInfo(req.qmsg, pReadHandle); } taosArrayPush(pConsumer->topics, pTopic); @@ -813,10 +828,13 @@ STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta) { if (pReadHandle == NULL) { return NULL; } - pReadHandle->pMeta = pMeta; + pReadHandle->pVnodeMeta = pMeta; pReadHandle->pMsg = NULL; pReadHandle->ver = -1; pReadHandle->pColIdList = NULL; + pReadHandle->sver = -1; + pReadHandle->pSchema = NULL; + pReadHandle->pSchemaWrapper = NULL; return pReadHandle; } @@ -837,13 +855,13 @@ bool tqNextDataBlock(STqReadHandle* pHandle) { if (pHandle->pBlock == NULL) return false; pHandle->pBlock->uid = htobe64(pHandle->pBlock->uid); - if (pHandle->tbUid == pHandle->pBlock->uid){ + if (pHandle->tbUid == pHandle->pBlock->uid) { pHandle->pBlock->tid = htonl(pHandle->pBlock->tid); pHandle->pBlock->sversion = htonl(pHandle->pBlock->sversion); pHandle->pBlock->dataLen = htonl(pHandle->pBlock->dataLen); pHandle->pBlock->schemaLen = htonl(pHandle->pBlock->schemaLen); pHandle->pBlock->numOfRows = htons(pHandle->pBlock->numOfRows); - return true; + return true; } } return false; @@ -859,41 +877,71 @@ int tqRetrieveDataBlockInfo(STqReadHandle* pHandle, SDataBlockInfo* pBlockInfo) } SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) { - int32_t sversion = pHandle->pBlock->sversion; - //TODO : change sversion - STSchema* pTschema = metaGetTbTSchema(pHandle->pMeta, pHandle->pBlock->uid, 0); - - tb_uid_t quid; - STbCfg* pTbCfg = metaGetTbInfoByUid(pHandle->pMeta, pHandle->pBlock->uid); - if (pTbCfg->type == META_CHILD_TABLE) { - quid = pTbCfg->ctbCfg.suid; - } else { - quid = pHandle->pBlock->uid; + /*int32_t sversion = pHandle->pBlock->sversion;*/ + // TODO set to real sversion + int32_t sversion = 0; + if (pHandle->sver != sversion) { + pHandle->pSchema = metaGetTbTSchema(pHandle->pVnodeMeta, pHandle->pBlock->uid, sversion); + + tb_uid_t quid; + STbCfg* pTbCfg = metaGetTbInfoByUid(pHandle->pVnodeMeta, pHandle->pBlock->uid); + if (pTbCfg->type == META_CHILD_TABLE) { + quid = pTbCfg->ctbCfg.suid; + } else { + quid = pHandle->pBlock->uid; + } + pHandle->pSchemaWrapper = metaGetTableSchema(pHandle->pVnodeMeta, quid, sversion, true); + pHandle->sver = sversion; } - SSchemaWrapper* pSchemaWrapper = metaGetTableSchema(pHandle->pMeta, quid, 0, true); - SArray* pArray = taosArrayInit(pSchemaWrapper->nCols, sizeof(SColumnInfoData)); + STSchema* pTschema = pHandle->pSchema; + SSchemaWrapper* pSchemaWrapper = pHandle->pSchemaWrapper; + + int32_t numOfRows = pHandle->pBlock->numOfRows; + int32_t numOfCols = pHandle->pSchema->numOfCols; + int32_t colNumNeed = taosArrayGetSize(pHandle->pColIdList); + + SArray* pArray = taosArrayInit(colNumNeed, sizeof(SColumnInfoData)); if (pArray == NULL) { return NULL; } - SColumnInfoData colInfo; - int sz = pSchemaWrapper->nCols * pSchemaWrapper->pSchema->bytes; - colInfo.pData = malloc(sz); - if (colInfo.pData == NULL) { - return NULL; + + int j = 0; + for (int32_t i = 0; i < colNumNeed; i++) { + int32_t colId = *(int32_t*)taosArrayGet(pHandle->pColIdList, i); + while (j < pSchemaWrapper->nCols && pSchemaWrapper->pSchema[j].colId < colId) { + j++; + } + SSchema* pColSchema = &pSchemaWrapper->pSchema[j]; + ASSERT(pColSchema->colId == colId); + SColumnInfoData colInfo = {0}; + int sz = numOfRows * pColSchema->bytes; + colInfo.info.bytes = pColSchema->bytes; + colInfo.info.colId = colId; + colInfo.info.type = pColSchema->type; + + colInfo.pData = calloc(1, sz); + if (colInfo.pData == NULL) { + // TODO free + taosArrayDestroy(pArray); + return NULL; + } + taosArrayPush(pArray, &colInfo); } SMemRow row; - int32_t kvIdx; + int32_t kvIdx = 0; + tInitSubmitBlkIter(pHandle->pBlock, &pHandle->blkIter); while ((row = tGetSubmitBlkNext(&pHandle->blkIter)) != NULL) { - for (int i = 0; i < pTschema->numOfCols && kvIdx < pTschema->numOfCols; i++) { - // TODO: filter out unused column - STColumn* pCol = schemaColAt(pTschema, i); + // get all wanted col of that block + for (int32_t i = 0; i < colNumNeed; i++) { + SColumnInfoData* pColData = taosArrayGet(pArray, i); + STColumn* pCol = schemaColAt(pTschema, i); + // TODO + ASSERT(pCol->colId == pColData->info.colId); void* val = tdGetMemRowDataOfColEx(row, pCol->colId, pCol->type, TD_DATA_ROW_HEAD_SIZE + pCol->offset, &kvIdx); - // TODO: handle varlen - memcpy(POINTER_SHIFT(colInfo.pData, pCol->offset), val, pCol->bytes); + memcpy(pColData->pData, val, pCol->bytes); } } - taosArrayPush(pArray, &colInfo); return pArray; } diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 00ce6f0a74322758d9680417ba3029466ecc8045..05da921526580bd5cb69f74499871b4044df6aad 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -5070,6 +5070,7 @@ static SSDataBlock* doStreamBlockScan(void* param, bool* newgroup) { SStreamBlockScanInfo* pInfo = pOperator->info; SDataBlockInfo* pBlockInfo = &pInfo->pRes->info; + pBlockInfo->rows = 0; while (tqNextDataBlock(pInfo->readerHandle)) { pTaskInfo->code = tqRetrieveDataBlockInfo(pInfo->readerHandle, pBlockInfo); if (pTaskInfo->code != TSDB_CODE_SUCCESS) {