diff --git a/include/client/taos.h b/include/client/taos.h index fc71a2dad2583a1a9b66ee7973d3da212d06eba9..3d139ce6d20eebda2f57b2ae4d7362ea65d869c0 100644 --- a/include/client/taos.h +++ b/include/client/taos.h @@ -239,7 +239,6 @@ typedef struct tmq_topic_vgroup_list_t tmq_topic_vgroup_list_t; typedef struct tmq_conf_t tmq_conf_t; typedef struct tmq_list_t tmq_list_t; -// typedef struct tmq_message_t tmq_message_t; typedef void(tmq_commit_cb(tmq_t *, tmq_resp_err_t, tmq_topic_vgroup_list_t *)); @@ -285,12 +284,6 @@ DLL_EXPORT tmq_conf_res_t tmq_conf_set(tmq_conf_t *conf, const char *key, const DLL_EXPORT void tmq_conf_destroy(tmq_conf_t *conf); DLL_EXPORT void tmq_conf_set_offset_commit_cb(tmq_conf_t *conf, tmq_commit_cb *cb); -#if 0 -// temporary used function for demo only -void tmqShowMsg(tmq_message_t *tmq_message); -int32_t tmqGetSkipLogNum(tmq_message_t *tmq_message); -#endif - /* -------------------------TMQ MSG HANDLE INTERFACE---------------------- */ DLL_EXPORT char *tmq_get_topic_name(TAOS_RES *res); @@ -301,12 +294,8 @@ DLL_EXPORT char *tmq_get_block_table_name(TAOS_RES *res); #endif #if 0 -DLL_EXPORT TAOS_ROW tmq_get_row(tmq_message_t *message); DLL_EXPORT int64_t tmq_get_request_offset(tmq_message_t *message); DLL_EXPORT int64_t tmq_get_response_offset(tmq_message_t *message); -DLL_EXPORT TAOS_FIELD *tmq_get_fields(tmq_t *tmq, const char *topic); -DLL_EXPORT int32_t tmq_field_count(tmq_t *tmq, const char *topic); -DLL_EXPORT void tmq_message_destroy(TAOS_RES *res); #endif /* --------------------TMPORARY INTERFACE FOR TESTING--------------------- */ #if 0 diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index 8a3990e2c335683d9870921901646ab571e26481..51e422f5c2442d3145b82ca2b55dda734ef8234f 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -24,16 +24,6 @@ #include "tqueue.h" #include "tref.h" -#if 0 -struct tmq_message_t { - SMqPollRsp msg; - char* topic; - SArray* res; // SArray - int32_t vgId; - int32_t resIter; -}; -#endif - typedef struct { int8_t tmqRspType; int32_t epoch; @@ -770,105 +760,12 @@ _return: } #endif -static char* formatTimestamp(char* buf, int64_t val, int precision) { - time_t tt; - int32_t ms = 0; - if (precision == TSDB_TIME_PRECISION_NANO) { - tt = (time_t)(val / 1000000000); - ms = val % 1000000000; - } else if (precision == TSDB_TIME_PRECISION_MICRO) { - tt = (time_t)(val / 1000000); - ms = val % 1000000; - } else { - tt = (time_t)(val / 1000); - ms = val % 1000; - } - - /* comment out as it make testcases like select_with_tags.sim fail. - but in windows, this may cause the call to localtime crash if tt < 0, - need to find a better solution. - if (tt < 0) { - tt = 0; - } - */ - -#ifdef WINDOWS - if (tt < 0) tt = 0; -#endif - if (tt <= 0 && ms < 0) { - tt--; - if (precision == TSDB_TIME_PRECISION_NANO) { - ms += 1000000000; - } else if (precision == TSDB_TIME_PRECISION_MICRO) { - ms += 1000000; - } else { - ms += 1000; - } - } - - struct tm* ptm = taosLocalTime(&tt, NULL); - size_t pos = strftime(buf, 35, "%Y-%m-%d %H:%M:%S", ptm); - - if (precision == TSDB_TIME_PRECISION_NANO) { - sprintf(buf + pos, ".%09d", ms); - } else if (precision == TSDB_TIME_PRECISION_MICRO) { - sprintf(buf + pos, ".%06d", ms); - } else { - sprintf(buf + pos, ".%03d", ms); - } - - return buf; -} #if 0 int32_t tmqGetSkipLogNum(tmq_message_t* tmq_message) { if (tmq_message == NULL) return 0; SMqPollRsp* pRsp = &tmq_message->msg; return pRsp->skipLogNum; } - -void tmqShowMsg(tmq_message_t* tmq_message) { - if (tmq_message == NULL) return; - - static bool noPrintSchema; - char pBuf[128]; - SMqPollRsp* pRsp = &tmq_message->msg; - int32_t colNum = 2; - if (!noPrintSchema) { - printf("|"); - for (int32_t i = 0; i < colNum; i++) { - if (i == 0) - printf(" %25s |", pRsp->schema->pSchema[i].name); - else - printf(" %15s |", pRsp->schema->pSchema[i].name); - } - printf("\n"); - printf("===============================================\n"); - noPrintSchema = true; - } - int32_t sz = taosArrayGetSize(pRsp->pBlockData); - for (int32_t i = 0; i < sz; i++) { - SSDataBlock* pDataBlock = taosArrayGet(pRsp->pBlockData, i); - int32_t rows = pDataBlock->info.rows; - for (int32_t j = 0; j < rows; j++) { - printf("|"); - for (int32_t k = 0; k < colNum; k++) { - SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, k); - void* var = POINTER_SHIFT(pColInfoData->pData, j * pColInfoData->info.bytes); - switch (pColInfoData->info.type) { - case TSDB_DATA_TYPE_TIMESTAMP: - formatTimestamp(pBuf, *(uint64_t*)var, TSDB_TIME_PRECISION_MILLI); - printf(" %25s |", pBuf); - break; - case TSDB_DATA_TYPE_INT: - case TSDB_DATA_TYPE_UINT: - printf(" %15u |", *(uint32_t*)var); - break; - } - } - printf("\n"); - } - } -} #endif int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) { @@ -1049,7 +946,6 @@ int32_t tmqAskEpCb(void* param, const SDataBuf* pMsg, int32_t code) { } tDeleteSMqCMGetSubEpRsp(&rsp); } else { - /*SMqCMGetSubEpRsp* pRsp = taosAllocateQitem(sizeof(SMqCMGetSubEpRsp));*/ SMqAskEpRspWrapper* pWrapper = taosAllocateQitem(sizeof(SMqAskEpRspWrapper)); if (pWrapper == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -1208,7 +1104,6 @@ SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper) { pRspObj->resIter = -1; memcpy(&pRspObj->rsp, &pWrapper->msg, sizeof(SMqDataBlkRsp)); - /*SRetrieveTableRsp* pRetrieve = taosArrayGetP(pWrapper->msg.blockData, 0);*/ pRspObj->resInfo.totalRows = 0; pRspObj->resInfo.precision = TSDB_TIME_PRECISION_MILLI; setResSchemaInfo(&pRspObj->resInfo, pWrapper->topicHandle->schema.pSchema, pWrapper->topicHandle->schema.nCols); @@ -1355,31 +1250,6 @@ SMqRspObj* tmqHandleAllRsp(tmq_t* tmq, int64_t blockingTime, bool pollIfReset) { } } -#if 0 -tmq_message_t* tmq_consumer_poll_v1(tmq_t* tmq, int64_t blocking_time) { - tmq_message_t* rspMsg = NULL; - int64_t startTime = taosGetTimestampMs(); - - int64_t status = atomic_load_64(&tmq->status); - tmqAskEp(tmq, status == TMQ_CONSUMER_STATUS__INIT); - - while (1) { - rspMsg = tmqSyncPollImpl(tmq, blocking_time); - if (rspMsg && rspMsg->consumeRsp.numOfTopics) { - return rspMsg; - } - - if (blocking_time != 0) { - int64_t endTime = taosGetTimestampMs(); - if (endTime - startTime > blocking_time) { - return NULL; - } - } else - return NULL; - } -} -#endif - TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) { SMqRspObj* rspObj; int64_t startTime = taosGetTimestampMs(); @@ -1417,137 +1287,10 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) { } } -#if 0 - - if (blocking_time <= 0) blocking_time = 1; - if (blocking_time > 1000) blocking_time = 1000; - /*blocking_time = 1;*/ - - if (taosArrayGetSize(tmq->clientTopics) == 0) { - tscDebug("consumer:%ld poll but not assigned", tmq->consumerId); - /*printf("over1\n");*/ - taosMsleep(blocking_time); - return NULL; - } - SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, tmq->nextTopicIdx); - if (taosArrayGetSize(pTopic->vgs) == 0) { - /*printf("over2\n");*/ - taosMsleep(blocking_time); - return NULL; - } - - tmq->nextTopicIdx = (tmq->nextTopicIdx + 1) % taosArrayGetSize(tmq->clientTopics); - int32_t beginVgIdx = pTopic->nextVgIdx; - while (1) { - pTopic->nextVgIdx = (pTopic->nextVgIdx + 1) % taosArrayGetSize(pTopic->vgs); - SMqClientVg* pVg = taosArrayGet(pTopic->vgs, pTopic->nextVgIdx); - /*printf("consume vg %d, offset %ld\n", pVg->vgId, pVg->currentOffset);*/ - SMqConsumeReq* pReq = tmqBuildConsumeReqImpl(tmq, blocking_time, pTopic, pVg); - if (pReq == NULL) { - ASSERT(false); - taosMsleep(blocking_time); - return NULL; - } - - SMqPollCbParam* param = taosMemoryMalloc(sizeof(SMqPollCbParam)); - if (param == NULL) { - ASSERT(false); - taosMsleep(blocking_time); - return NULL; - } - param->tmq = tmq; - param->retMsg = &tmq_message; - param->pVg = pVg; - tsem_init(¶m->rspSem, 0, 0); - - SRequestObj* pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_VND_CONSUME); - pRequest->body.requestMsg = (SDataBuf){ - .pData = pReq, - .len = sizeof(SMqConsumeReq), - .handle = NULL, - }; - - SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest); - sendInfo->requestObjRefId = 0; - sendInfo->param = param; - sendInfo->fp = tmqPollCb; - - /*printf("req offset: %ld\n", pReq->offset);*/ - - int64_t transporterId = 0; - asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo); - tmq->pollCnt++; - - tsem_wait(¶m->rspSem); - tsem_destroy(¶m->rspSem); - taosMemoryFree(param); - - if (tmq_message == NULL) { - if (beginVgIdx == pTopic->nextVgIdx) { - taosMsleep(blocking_time); - } else { - continue; - } - } - - return tmq_message; - } - - /*tsem_wait(&pRequest->body.rspSem);*/ - - /*if (body != NULL) {*/ - /*destroySendMsgInfo(body);*/ - /*}*/ - - /*if (pRequest != NULL && terrno != TSDB_CODE_SUCCESS) {*/ - /*pRequest->code = terrno;*/ - /*}*/ - - /*return pRequest;*/ -} -#endif - -#if 0 -tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* tmq_topic_vgroup_list, int32_t async) { - if (tmq_topic_vgroup_list != NULL) { - // TODO - } - - // TODO: change semaphore to gate - for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) { - SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i); - for (int j = 0; j < taosArrayGetSize(pTopic->vgs); j++) { - SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j); - SMqConsumeReq* pReq = tmqBuildConsumeReqImpl(tmq, 0, pTopic, pVg); - - SRequestObj* pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_VND_CONSUME); - pRequest->body.requestMsg = (SDataBuf){.pData = pReq, .len = sizeof(SMqConsumeReq)}; - SMqCommitCbParam* pParam = taosMemoryMalloc(sizeof(SMqCommitCbParam)); - if (pParam == NULL) { - continue; - } - pParam->tmq = tmq; - pParam->pVg = pVg; - pParam->async = async; - if (!async) tsem_init(&pParam->rspSem, 0, 0); - - SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest); - sendInfo->requestObjRefId = 0; - sendInfo->param = pParam; - sendInfo->fp = tmqCommitCb; - - int64_t transporterId = 0; - asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo); - - if (!async) tsem_wait(&pParam->rspSem); - } - } - - return 0; +tmq_resp_err_t tmq_consumer_close(tmq_t* tmq) { + // TODO + return TMQ_RESP_ERR__SUCCESS; } -#endif - -tmq_resp_err_t tmq_consumer_close(tmq_t* tmq) { return TMQ_RESP_ERR__SUCCESS; } const char* tmq_err2str(tmq_resp_err_t err) { if (err == TMQ_RESP_ERR__SUCCESS) { @@ -1573,10 +1316,3 @@ int32_t tmq_get_vgroup_id(TAOS_RES* res) { return -1; } } - -void tmq_message_destroy(TAOS_RES* res) { - if (res == NULL) return; - if (TD_RES_TMQ(res)) { - SMqRspObj* pRspObj = (SMqRspObj*)res; - } -} diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 2b52d333dad3193841ee107c36ccf2e5a68438f3..51ee00d823d31ed503e190f85436057d4a5922b8 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -109,7 +109,8 @@ int tqReadHandleSetTbUidList(STqReadHandle *pHandle, const SArray *tbUidList int tqReadHandleAddTbUidList(STqReadHandle *pHandle, const SArray *tbUidList); int32_t tqReadHandleSetMsg(STqReadHandle *pHandle, SSubmitReq *pMsg, int64_t ver); bool tqNextDataBlock(STqReadHandle *pHandle); -int32_t tqRetrieveDataBlock(SArray **ppCols, STqReadHandle *pHandle, uint64_t *pGroupId, int32_t *pNumOfRows); +int32_t tqRetrieveDataBlock(SArray **ppCols, STqReadHandle *pHandle, uint64_t *pGroupId, int32_t *pNumOfRows, + int32_t *pNumOfCols); // need to reposition diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index 5e074eeb12826a65ee01298ac96c681563c9f8c2..6f929ce829f3d00715a128cfb208ab1af7a0ea0a 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -88,22 +88,12 @@ struct STqReadHandle { SSubmitMsgIter msgIter; SSubmitBlkIter blkIter; SMeta* pVnodeMeta; - SArray* pColIdList; // SArray + SArray* pColIdList; // SArray int32_t sver; SSchemaWrapper* pSchemaWrapper; STSchema* pSchema; }; -typedef struct { - int8_t type; - int8_t reserved[7]; - union { - void* data; - int64_t wmTs; - int64_t checkpointId; - }; -} STqStreamToken; - typedef struct { int16_t ver; int16_t action; @@ -155,24 +145,26 @@ typedef struct { char subKey[TSDB_SUBSCRIBE_KEY_LEN]; int64_t consumerId; int32_t epoch; + int8_t subType; + int8_t withTbName; + int8_t withSchema; + int8_t withTag; + int8_t withTagSchema; char* qmsg; // SRWLatch lock; - SWalReadHandle* pReadHandle; + SWalReadHandle* pWalReader; // number should be identical to fetch thread num - qTaskInfo_t task[4]; + STqReadHandle* pStreamReader[4]; + qTaskInfo_t task[4]; } STqExec; struct STQ { - // the collection of groups - // the handle of meta kvstore - bool writeTrigger; - char* path; - STqMetaStore* tqMeta; - SHashObj* tqMetaNew; // subKey -> tqExec - SHashObj* pStreamTasks; - SVnode* pVnode; - SWal* pWal; - SMeta* pVnodeMeta; + char* path; + // STqMetaStore* tqMeta; + SHashObj* execs; // subKey -> tqExec + SHashObj* pStreamTasks; + SVnode* pVnode; + SWal* pWal; }; typedef struct { @@ -252,7 +244,7 @@ int tqInit(); void tqCleanUp(); // open in each vnode -STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal, SMeta* pMeta, SMemAllocatorFactory* allocFac); +STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal); void tqClose(STQ*); // required by vnode int tqPushMsg(STQ*, void* msg, int32_t msgLen, tmsg_t msgType, int64_t version); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 510dd324599e44858304c3707f5b1005d1b77a2b..30f75218b42252bca478a9e30c6ad996d273ea00 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -19,7 +19,7 @@ int32_t tqInit() { return tqPushMgrInit(); } void tqCleanUp() { tqPushMgrCleanUp(); } -STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal, SMeta* pVnodeMeta, SMemAllocatorFactory* allocFac) { +STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal) { STQ* pTq = taosMemoryMalloc(sizeof(STQ)); if (pTq == NULL) { terrno = TSDB_CODE_TQ_OUT_OF_MEMORY; @@ -28,15 +28,16 @@ STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal, SMeta* pVnodeMeta, SMe pTq->path = strdup(path); pTq->pVnode = pVnode; pTq->pWal = pWal; - pTq->pVnodeMeta = pVnodeMeta; +#if 0 pTq->tqMeta = tqStoreOpen(pTq, path, (FTqSerialize)tqSerializeConsumer, (FTqDeserialize)tqDeserializeConsumer, (FTqDelete)taosMemoryFree, 0); if (pTq->tqMeta == NULL) { taosMemoryFree(pTq); return NULL; } +#endif - pTq->tqMetaNew = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK); + pTq->execs = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK); pTq->pStreamTasks = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); @@ -104,7 +105,11 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t versi return 0; } -int tqCommit(STQ* pTq) { return tqStorePersist(pTq->tqMeta); } +int tqCommit(STQ* pTq) { + // do nothing + /*return tqStorePersist(pTq->tqMeta);*/ + return 0; +} int32_t tqGetTopicHandleSize(const STqTopic* pTopic) { return strlen(pTopic->topicName) + strlen(pTopic->sql) + strlen(pTopic->physicalPlan) + strlen(pTopic->qmsg) + @@ -219,10 +224,10 @@ int32_t tqDeserializeConsumer(STQ* pTq, const STqSerializedHead* pHead, STqConsu } for (int j = 0; j < TQ_BUFFER_SIZE; j++) { pTopic->buffer.output[j].status = 0; - STqReadHandle* pReadHandle = tqInitSubmitMsgScanner(pTq->pVnodeMeta); + STqReadHandle* pReadHandle = tqInitSubmitMsgScanner(pTq->pVnode->pMeta); SReadHandle handle = { .reader = pReadHandle, - .meta = pTq->pVnodeMeta, + .meta = pTq->pVnode->pMeta, }; pTopic->buffer.output[j].pReadHandle = pReadHandle; pTopic->buffer.output[j].task = qCreateStreamExecTaskInfo(pTopic->qmsg, &handle); @@ -238,6 +243,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { int32_t reqEpoch = pReq->epoch; int64_t fetchOffset; + // get offset to fetch message if (pReq->currentOffset == TMQ_CONF__RESET_OFFSET__EARLIEAST) { fetchOffset = walGetFirstVer(pTq->pWal); } else if (pReq->currentOffset == TMQ_CONF__RESET_OFFSET__LATEST) { @@ -249,7 +255,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { vDebug("tmq poll: consumer %ld (epoch %d) recv poll req in vg %d, req %ld %ld", consumerId, pReq->epoch, TD_VID(pTq->pVnode), pReq->currentOffset, fetchOffset); - STqExec* pExec = taosHashGet(pTq->tqMetaNew, pReq->subKey, strlen(pReq->subKey)); + STqExec* pExec = taosHashGet(pTq->execs, pReq->subKey, strlen(pReq->subKey)); ASSERT(pExec); int32_t consumerEpoch = atomic_load_32(&pExec->epoch); @@ -271,7 +277,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { } SWalReadHead* pHead; - if (walReadWithHandle_s(pExec->pReadHandle, fetchOffset, &pHead) < 0) { + if (walReadWithHandle_s(pExec->pWalReader, fetchOffset, &pHead) < 0) { // TODO: no more log, set timer to wait blocking time // if data inserted during waiting, launch query and // response to user @@ -285,41 +291,73 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { if (pHead->msgType == TDMT_VND_SUBMIT) { SSubmitReq* pCont = (SSubmitReq*)&pHead->body; - qTaskInfo_t task = pExec->task[workerId]; - ASSERT(task); - qSetStreamInput(task, pCont, STREAM_DATA_TYPE_SUBMIT_BLOCK); - while (1) { - SSDataBlock* pDataBlock = NULL; - uint64_t ts = 0; - if (qExecTask(task, &pDataBlock, &ts) < 0) { - ASSERT(0); + if (pExec->subType == TOPIC_SUB_TYPE__TABLE) { + qTaskInfo_t task = pExec->task[workerId]; + ASSERT(task); + qSetStreamInput(task, pCont, STREAM_DATA_TYPE_SUBMIT_BLOCK); + while (1) { + SSDataBlock* pDataBlock = NULL; + uint64_t ts = 0; + if (qExecTask(task, &pDataBlock, &ts) < 0) { + ASSERT(0); + } + if (pDataBlock == NULL) break; + + ASSERT(pDataBlock->info.rows != 0); + ASSERT(pDataBlock->info.numOfCols != 0); + + int32_t dataStrLen = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pDataBlock); + void* buf = taosMemoryCalloc(1, dataStrLen); + SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)buf; + pRetrieve->useconds = ts; + pRetrieve->precision = TSDB_DEFAULT_PRECISION; + pRetrieve->compressed = 0; + pRetrieve->completed = 1; + pRetrieve->numOfRows = htonl(pDataBlock->info.rows); + + // TODO enable compress + int32_t actualLen = 0; + blockCompressEncode(pDataBlock, pRetrieve->data, &actualLen, pDataBlock->info.numOfCols, false); + actualLen += sizeof(SRetrieveTableRsp); + ASSERT(actualLen <= dataStrLen); + taosArrayPush(rsp.blockDataLen, &actualLen); + taosArrayPush(rsp.blockData, &buf); + rsp.blockNum++; } - if (pDataBlock == NULL) break; - - ASSERT(pDataBlock->info.rows != 0); - ASSERT(pDataBlock->info.numOfCols != 0); - - int32_t dataStrLen = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pDataBlock); - void* buf = taosMemoryCalloc(1, dataStrLen); - SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)buf; - pRetrieve->useconds = ts; - pRetrieve->precision = TSDB_DEFAULT_PRECISION; - pRetrieve->compressed = 0; - pRetrieve->completed = 1; - pRetrieve->numOfRows = htonl(pDataBlock->info.rows); - - // TODO enable compress - int32_t actualLen = 0; - blockCompressEncode(pDataBlock, pRetrieve->data, &actualLen, pDataBlock->info.numOfCols, false); - actualLen += sizeof(SRetrieveTableRsp); - ASSERT(actualLen <= dataStrLen); - taosArrayPush(rsp.blockDataLen, &actualLen); - taosArrayPush(rsp.blockData, &buf); - rsp.blockNum++; + } else if (pExec->subType == TOPIC_SUB_TYPE__DB) { + STqReadHandle* pReader = pExec->pStreamReader[workerId]; + tqReadHandleSetMsg(pReader, pCont, 0); + while (tqNextDataBlock(pReader)) { + SSDataBlock block = {0}; + if (tqRetrieveDataBlock(&block.pDataBlock, pReader, &block.info.groupId, &block.info.rows, + &block.info.numOfCols) < 0) { + ASSERT(0); + } + int32_t dataStrLen = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(&block); + void* buf = taosMemoryCalloc(1, dataStrLen); + SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)buf; + /*pRetrieve->useconds = 0;*/ + pRetrieve->precision = TSDB_DEFAULT_PRECISION; + pRetrieve->compressed = 0; + pRetrieve->completed = 1; + pRetrieve->numOfRows = htonl(block.info.rows); + + // TODO enable compress + int32_t actualLen = 0; + blockCompressEncode(&block, pRetrieve->data, &actualLen, block.info.numOfCols, false); + actualLen += sizeof(SRetrieveTableRsp); + ASSERT(actualLen <= dataStrLen); + taosArrayPush(rsp.blockDataLen, &actualLen); + taosArrayPush(rsp.blockData, &buf); + rsp.blockNum++; + } + } else { + ASSERT(0); } } - // TODO batch optimization + // TODO batch optimization: + // TODO continue scan until meeting batch requirement if (rsp.blockNum != 0) break; rsp.skipLogNum++; fetchOffset++; @@ -572,10 +610,10 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { // TODO: persist meta into tdb int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) { - SMqRebVgReq req; + SMqRebVgReq req = {0}; tDecodeSMqRebVgReq(msg, &req); // todo lock - STqExec* pExec = taosHashGet(pTq->tqMetaNew, req.subKey, strlen(req.subKey)); + STqExec* pExec = taosHashGet(pTq->execs, req.subKey, strlen(req.subKey)); if (pExec == NULL) { ASSERT(req.oldConsumerId == -1); ASSERT(req.newConsumerId != -1); @@ -586,19 +624,27 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) { memcpy(pExec->subKey, req.subKey, TSDB_SUBSCRIBE_KEY_LEN); pExec->consumerId = req.newConsumerId; pExec->epoch = -1; + + pExec->subType = req.subType; + pExec->withTbName = req.withTbName; + pExec->withSchema = req.withSchema; + pExec->withTag = req.withTag; + pExec->withTagSchema = req.withTagSchema; + pExec->qmsg = req.qmsg; req.qmsg = NULL; - pExec->pReadHandle = walOpenReadHandle(pTq->pVnode->pWal); + + pExec->pWalReader = walOpenReadHandle(pTq->pVnode->pWal); for (int32_t i = 0; i < 4; i++) { - STqReadHandle* pReadHandle = tqInitSubmitMsgScanner(pTq->pVnodeMeta); - SReadHandle handle = { - .reader = pReadHandle, - .meta = pTq->pVnodeMeta, + pExec->pStreamReader[i] = tqInitSubmitMsgScanner(pTq->pVnode->pMeta); + SReadHandle handle = { + .reader = pExec->pStreamReader[i], + .meta = pTq->pVnode->pMeta, }; pExec->task[i] = qCreateStreamExecTaskInfo(pExec->qmsg, &handle); ASSERT(pExec->task[i]); } - taosHashPut(pTq->tqMetaNew, req.subKey, strlen(req.subKey), pExec, sizeof(STqExec)); + taosHashPut(pTq->execs, req.subKey, strlen(req.subKey), pExec, sizeof(STqExec)); return 0; } else { /*if (req.newConsumerId != -1) {*/ @@ -627,12 +673,12 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int32_t parallel) { return -1; } for (int32_t i = 0; i < parallel; i++) { - STqReadHandle* pReadHandle = tqInitSubmitMsgScanner(pTq->pVnodeMeta); + STqReadHandle* pStreamReader = tqInitSubmitMsgScanner(pTq->pVnode->pMeta); SReadHandle handle = { - .reader = pReadHandle, - .meta = pTq->pVnodeMeta, + .reader = pStreamReader, + .meta = pTq->pVnode->pMeta, }; - pTask->exec.runners[i].inputHandle = pReadHandle; + pTask->exec.runners[i].inputHandle = pStreamReader; pTask->exec.runners[i].executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle); ASSERT(pTask->exec.runners[i].executor); } diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 02ce6c4aad299c1c569b76a581691bbb91bd1a49..7b21d3342de0171f7643e5e04809dac4e088845a 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -82,7 +82,8 @@ bool tqNextDataBlock(STqReadHandle* pHandle) { return false; } -int32_t tqRetrieveDataBlock(SArray** ppCols, STqReadHandle* pHandle, uint64_t* pGroupId, int32_t* pNumOfRows) { +int32_t tqRetrieveDataBlock(SArray** ppCols, STqReadHandle* pHandle, uint64_t* pGroupId, int32_t* pNumOfRows, + int32_t* pNumOfCols) { /*int32_t sversion = pHandle->pBlock->sversion;*/ // TODO set to real sversion int32_t sversion = 0; @@ -104,7 +105,6 @@ int32_t tqRetrieveDataBlock(SArray** ppCols, STqReadHandle* pHandle, uint64_t* p SSchemaWrapper* pSchemaWrapper = pHandle->pSchemaWrapper; *pNumOfRows = pHandle->pBlock->numOfRows; - /*int32_t numOfCols = pHandle->pSchema->numOfCols;*/ int32_t colNumNeed = taosArrayGetSize(pHandle->pColIdList); if (colNumNeed > pSchemaWrapper->nCols) { @@ -142,6 +142,7 @@ int32_t tqRetrieveDataBlock(SArray** ppCols, STqReadHandle* pHandle, uint64_t* p } int32_t colActual = taosArrayGetSize(*ppCols); + *pNumOfCols = colActual; // TODO in stream shuffle case, fetch groupId *pGroupId = 0; diff --git a/source/dnode/vnode/src/vnd/vnodeOpen.c b/source/dnode/vnode/src/vnd/vnodeOpen.c index 9e4aa714e29ded8854c0d2602561e14162acbb0b..44af83791a642ce31b4541119671626f71ce8625 100644 --- a/source/dnode/vnode/src/vnd/vnodeOpen.c +++ b/source/dnode/vnode/src/vnd/vnodeOpen.c @@ -112,7 +112,7 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) { // open tq sprintf(tdir, "%s%s%s", dir, TD_DIRSEP, VNODE_TQ_DIR); - pVnode->pTq = tqOpen(tdir, pVnode, pVnode->pWal, pVnode->pMeta, vBufPoolGetMAF(pVnode)); + pVnode->pTq = tqOpen(tdir, pVnode, pVnode->pWal); if (pVnode->pTq == NULL) { vError("vgId: %d failed to open vnode tq since %s", TD_VID(pVnode), tstrerror(terrno)); goto _err; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index b054bbfcb6eb77dd250510b82843e998c8bdb765..228b42f7f6f61911bd82513e3a19fdc24220fed2 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -561,7 +561,8 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator, bool* newgroup) SArray* pCols = NULL; uint64_t groupId; int32_t numOfRows; - int32_t code = tqRetrieveDataBlock(&pCols, pInfo->readerHandle, &groupId, &numOfRows); + int32_t numOfCols; + int32_t code = tqRetrieveDataBlock(&pCols, pInfo->readerHandle, &groupId, &numOfRows, &numOfCols); if (code != TSDB_CODE_SUCCESS || numOfRows == 0) { pTaskInfo->code = code;