diff --git a/include/client/taos.h b/include/client/taos.h index 0fd2fd8df9c49f0352dbe89f50e678b9f7fbcf20..d3856d432e76a5a9667b0b22f5710509b0c01ba8 100644 --- a/include/client/taos.h +++ b/include/client/taos.h @@ -152,34 +152,34 @@ DLL_EXPORT void taos_close(TAOS *taos); const char *taos_data_type(int type); -DLL_EXPORT TAOS_STMT *taos_stmt_init(TAOS *taos); -DLL_EXPORT int taos_stmt_prepare(TAOS_STMT *stmt, const char *sql, unsigned long length); -DLL_EXPORT int taos_stmt_set_tbname_tags(TAOS_STMT *stmt, const char *name, TAOS_BIND *tags); -DLL_EXPORT int taos_stmt_set_tbname(TAOS_STMT *stmt, const char *name); -DLL_EXPORT int taos_stmt_set_sub_tbname(TAOS_STMT *stmt, const char *name); - -DLL_EXPORT int taos_stmt_is_insert(TAOS_STMT *stmt, int *insert); -DLL_EXPORT int taos_stmt_num_params(TAOS_STMT *stmt, int *nums); -DLL_EXPORT int taos_stmt_get_param(TAOS_STMT *stmt, int idx, int *type, int *bytes); -DLL_EXPORT int taos_stmt_bind_param(TAOS_STMT *stmt, TAOS_BIND *bind); -DLL_EXPORT int taos_stmt_bind_param_batch(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind); -DLL_EXPORT int taos_stmt_bind_single_param_batch(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind, int colIdx); -DLL_EXPORT int taos_stmt_add_batch(TAOS_STMT *stmt); -DLL_EXPORT int taos_stmt_execute(TAOS_STMT *stmt); -DLL_EXPORT TAOS_RES *taos_stmt_use_result(TAOS_STMT *stmt); -DLL_EXPORT int taos_stmt_close(TAOS_STMT *stmt); -DLL_EXPORT char *taos_stmt_errstr(TAOS_STMT *stmt); -DLL_EXPORT int taos_stmt_affected_rows(TAOS_STMT *stmt); - -DLL_EXPORT TAOS_RES *taos_query(TAOS *taos, const char *sql); -DLL_EXPORT TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen); - -DLL_EXPORT TAOS_ROW taos_fetch_row(TAOS_RES *res); -DLL_EXPORT int taos_result_precision(TAOS_RES *res); // get the time precision of result -DLL_EXPORT void taos_free_result(TAOS_RES *res); -DLL_EXPORT int taos_field_count(TAOS_RES *res); -DLL_EXPORT int taos_num_fields(TAOS_RES *res); -DLL_EXPORT int taos_affected_rows(TAOS_RES *res); +DLL_EXPORT TAOS_STMT *taos_stmt_init(TAOS *taos); +DLL_EXPORT int taos_stmt_prepare(TAOS_STMT *stmt, const char *sql, unsigned long length); +DLL_EXPORT int taos_stmt_set_tbname_tags(TAOS_STMT *stmt, const char *name, TAOS_BIND *tags); +DLL_EXPORT int taos_stmt_set_tbname(TAOS_STMT *stmt, const char *name); +DLL_EXPORT int taos_stmt_set_sub_tbname(TAOS_STMT *stmt, const char *name); + +DLL_EXPORT int taos_stmt_is_insert(TAOS_STMT *stmt, int *insert); +DLL_EXPORT int taos_stmt_num_params(TAOS_STMT *stmt, int *nums); +DLL_EXPORT int taos_stmt_get_param(TAOS_STMT *stmt, int idx, int *type, int *bytes); +DLL_EXPORT int taos_stmt_bind_param(TAOS_STMT *stmt, TAOS_BIND *bind); +DLL_EXPORT int taos_stmt_bind_param_batch(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind); +DLL_EXPORT int taos_stmt_bind_single_param_batch(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind, int colIdx); +DLL_EXPORT int taos_stmt_add_batch(TAOS_STMT *stmt); +DLL_EXPORT int taos_stmt_execute(TAOS_STMT *stmt); +DLL_EXPORT TAOS_RES *taos_stmt_use_result(TAOS_STMT *stmt); +DLL_EXPORT int taos_stmt_close(TAOS_STMT *stmt); +DLL_EXPORT char *taos_stmt_errstr(TAOS_STMT *stmt); +DLL_EXPORT int taos_stmt_affected_rows(TAOS_STMT *stmt); + +DLL_EXPORT TAOS_RES *taos_query(TAOS *taos, const char *sql); +DLL_EXPORT TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen); + +DLL_EXPORT TAOS_ROW taos_fetch_row(TAOS_RES *res); +DLL_EXPORT int taos_result_precision(TAOS_RES *res); // get the time precision of result +DLL_EXPORT void taos_free_result(TAOS_RES *res); +DLL_EXPORT int taos_field_count(TAOS_RES *res); +DLL_EXPORT int taos_num_fields(TAOS_RES *res); +DLL_EXPORT int taos_affected_rows(TAOS_RES *res); DLL_EXPORT TAOS_FIELD *taos_fetch_fields(TAOS_RES *res); DLL_EXPORT int taos_select_db(TAOS *taos, const char *db); @@ -188,11 +188,14 @@ DLL_EXPORT void taos_stop_query(TAOS_RES *res); DLL_EXPORT bool taos_is_null(TAOS_RES *res, int32_t row, int32_t col); DLL_EXPORT bool taos_is_update_query(TAOS_RES *res); DLL_EXPORT int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows); +DLL_EXPORT int taos_fetch_block_s(TAOS_RES *res, int* numOfRows, TAOS_ROW *rows); +DLL_EXPORT int taos_fetch_raw_block(TAOS_RES *res, int* numOfRows, void** pData); +DLL_EXPORT int *taos_get_column_data_offset(TAOS_RES *res, int columnIndex); DLL_EXPORT int taos_validate_sql(TAOS *taos, const char *sql); DLL_EXPORT void taos_reset_current_db(TAOS *taos); -DLL_EXPORT int *taos_fetch_lengths(TAOS_RES *res); -DLL_EXPORT TAOS_ROW *taos_result_block(TAOS_RES *res); +DLL_EXPORT int *taos_fetch_lengths(TAOS_RES *res); +DLL_EXPORT TAOS_ROW *taos_result_block(TAOS_RES *res); DLL_EXPORT const char *taos_get_server_info(TAOS *taos); DLL_EXPORT const char *taos_get_client_info(); diff --git a/include/common/tmsg.h b/include/common/tmsg.h index fc7e994cbb84a1761641cd16a3e1aaf68e3a1045..a8361582bac41278e958106977cbb3f6bf6ef9f0 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1827,6 +1827,7 @@ static FORCE_INLINE void* taosDecodeSMqMsg(void* buf, SMqHbMsg* pMsg) { typedef struct { int64_t leftForVer; int32_t vgId; + int32_t epoch; int64_t consumerId; char topicName[TSDB_TOPIC_FNAME_LEN]; char cgroup[TSDB_CGROUP_LEN]; @@ -1840,6 +1841,7 @@ static FORCE_INLINE int32_t tEncodeSMqSetCVgReq(void** buf, const SMqSetCVgReq* int32_t tlen = 0; tlen += taosEncodeFixedI64(buf, pReq->leftForVer); tlen += taosEncodeFixedI32(buf, pReq->vgId); + tlen += taosEncodeFixedI32(buf, pReq->epoch); tlen += taosEncodeFixedI64(buf, pReq->consumerId); tlen += taosEncodeString(buf, pReq->topicName); tlen += taosEncodeString(buf, pReq->cgroup); @@ -1853,6 +1855,7 @@ static FORCE_INLINE int32_t tEncodeSMqSetCVgReq(void** buf, const SMqSetCVgReq* static FORCE_INLINE void* tDecodeSMqSetCVgReq(void* buf, SMqSetCVgReq* pReq) { buf = taosDecodeFixedI64(buf, &pReq->leftForVer); buf = taosDecodeFixedI32(buf, &pReq->vgId); + buf = taosDecodeFixedI32(buf, &pReq->epoch); buf = taosDecodeFixedI64(buf, &pReq->consumerId); buf = taosDecodeStringTo(buf, pReq->topicName); buf = taosDecodeStringTo(buf, pReq->cgroup); @@ -1868,6 +1871,7 @@ typedef struct { int32_t vgId; int64_t oldConsumerId; int64_t newConsumerId; + char* topic; } SMqMVRebReq; static FORCE_INLINE int32_t tEncodeSMqMVRebReq(void** buf, const SMqMVRebReq* pReq) { @@ -1876,6 +1880,7 @@ static FORCE_INLINE int32_t tEncodeSMqMVRebReq(void** buf, const SMqMVRebReq* pR tlen += taosEncodeFixedI32(buf, pReq->vgId); tlen += taosEncodeFixedI64(buf, pReq->oldConsumerId); tlen += taosEncodeFixedI64(buf, pReq->newConsumerId); + tlen += taosEncodeString(buf, pReq->topic); return tlen; } @@ -1884,6 +1889,7 @@ static FORCE_INLINE void* tDecodeSMqMVRebReq(void* buf, SMqMVRebReq* pReq) { buf = taosDecodeFixedI32(buf, &pReq->vgId); buf = taosDecodeFixedI64(buf, &pReq->oldConsumerId); buf = taosDecodeFixedI64(buf, &pReq->newConsumerId); + buf = taosDecodeString(buf, &pReq->topic); return buf; } diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index b2c32a4366f473da0f43d9f1392837f827209b78..e10cf5179e4da9af768141ee8b37ca48545f4895 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -385,11 +385,20 @@ bool taos_is_update_query(TAOS_RES *res) { } int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows) { - if (res == NULL) { + int32_t numOfRows = 0; + /*int32_t code = */taos_fetch_block_s(res, &numOfRows, rows); + return numOfRows; +} + +int taos_fetch_block_s(TAOS_RES *res, int* numOfRows, TAOS_ROW *rows) { + SRequestObj *pRequest = (SRequestObj *)res; + if (pRequest == NULL) { return 0; } - SRequestObj *pRequest = (SRequestObj *)res; + (*rows) = NULL; + (*numOfRows) = 0; + if (pRequest->type == TSDB_SQL_RETRIEVE_EMPTY_RESULT || pRequest->type == TSDB_SQL_INSERT || pRequest->code != TSDB_CODE_SUCCESS || taos_num_fields(res) == 0) { return 0; @@ -400,9 +409,51 @@ int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows) { // TODO refactor SReqResultInfo *pResultInfo = &pRequest->body.resInfo; pResultInfo->current = pResultInfo->numOfRows; - *rows = pResultInfo->row; - return pResultInfo->numOfRows; + (*rows) = pResultInfo->row; + (*numOfRows) = pResultInfo->numOfRows; + return pRequest->code; +} + +int taos_fetch_raw_block(TAOS_RES *res, int* numOfRows, void** pData) { + SRequestObj *pRequest = (SRequestObj *)res; + if (pRequest == NULL) { + return 0; + } + + if (pRequest->type == TSDB_SQL_RETRIEVE_EMPTY_RESULT || pRequest->type == TSDB_SQL_INSERT || + pRequest->code != TSDB_CODE_SUCCESS || taos_num_fields(res) == 0) { + return 0; + } + + doFetchRow(pRequest, false); + + SReqResultInfo *pResultInfo = &pRequest->body.resInfo; + + pResultInfo->current = pResultInfo->numOfRows; + (*numOfRows) = pResultInfo->numOfRows; + (*pData) = (void*) pResultInfo->pData; + + return 0; +} + +int *taos_get_column_data_offset(TAOS_RES *res, int columnIndex) { + SRequestObj *pRequest = (SRequestObj *)res; + if (pRequest == NULL) { + return 0; + } + + int32_t numOfFields = taos_num_fields(pRequest); + if (columnIndex < 0 || columnIndex >= numOfFields || numOfFields == 0) { + return 0; + } + + TAOS_FIELD* pField = &pRequest->body.resInfo.userFields[columnIndex]; + if (!IS_VAR_DATA_TYPE(pField->type)) { + return 0; + } + + return pRequest->body.resInfo.pCol[columnIndex].offset; } int taos_validate_sql(TAOS *taos, const char *sql) { return true; } diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index 602ecdf6ab50981b87355d05f6030d7613579a4f..a6e5fee2d17fc5fdab7e2f1f80e98e4b650dab4c 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -108,7 +108,7 @@ typedef struct { // connection info int32_t vgId; int32_t vgStatus; - int64_t skipCnt; + int32_t vgSkipCnt; SEpSet epSet; } SMqClientVg; @@ -849,7 +849,7 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) { if (msgEpoch < tmqEpoch) { /*printf("discard rsp epoch %d, current epoch %d\n", msgEpoch, tmqEpoch);*/ /*tsem_post(&tmq->rspSem);*/ - tscWarn("discard rsp from vg %d, epoch %d, current epoch %d", pParam->vgId, msgEpoch, tmqEpoch); + tscWarn("msg discard from vg %d since from earlier epoch, rsp epoch %d, current epoch %d", pParam->vgId, msgEpoch, tmqEpoch); return 0; } @@ -881,6 +881,7 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) { /*SMqConsumeRsp* pRsp = taosMemoryCalloc(1, sizeof(SMqConsumeRsp));*/ tmq_message_t* pRsp = taosAllocateQitem(sizeof(tmq_message_t)); if (pRsp == NULL) { + tscWarn("msg discard from vg %d, epoch %d since out of memory", pParam->vgId, pParam->epoch); goto CREATE_MSG_FAIL; } memcpy(pRsp, pMsg->pData, sizeof(SMqRspHead)); @@ -969,14 +970,14 @@ bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqCMGetSubEpRsp* pRsp) { offset = *pOffset; tscDebug("consumer %ld epoch %d vg %d found %s", tmq->consumerId, epoch, pVgEp->vgId, vgKey); } - tscDebug("consumer %ld epoch %d vg %d offset set to %ld\n", tmq->consumerId, epoch, pVgEp->vgId, offset); + tscDebug("consumer %ld epoch %d vg %d offset set to %ld", tmq->consumerId, epoch, pVgEp->vgId, offset); SMqClientVg clientVg = { .pollCnt = 0, .currentOffset = offset, .vgId = pVgEp->vgId, .epSet = pVgEp->epSet, .vgStatus = TMQ_VG_STATUS__IDLE, - .skipCnt = 0, + .vgSkipCnt = 0, }; taosArrayPush(topic.vgs, &clientVg); set = true; @@ -1232,9 +1233,10 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) { SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j); int32_t vgStatus = atomic_val_compare_exchange_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE, TMQ_VG_STATUS__WAIT); if (vgStatus != TMQ_VG_STATUS__IDLE) { - int64_t skipCnt = atomic_add_fetch_64(&pVg->skipCnt, 1); - tscDebug("consumer %ld epoch %d skip vg %d skip cnt %ld", tmq->consumerId, tmq->epoch, pVg->vgId, skipCnt); + int32_t vgSkipCnt = atomic_add_fetch_32(&pVg->vgSkipCnt, 1); + tscDebug("consumer %ld epoch %d skip vg %d skip cnt %d", tmq->consumerId, tmq->epoch, pVg->vgId, vgSkipCnt); continue; + /*if (vgSkipCnt < 10000) continue;*/ #if 0 if (skipCnt < 30000) { continue; @@ -1243,7 +1245,7 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) { } #endif } - atomic_store_64(&pVg->skipCnt, 0); + atomic_store_32(&pVg->vgSkipCnt, 0); SMqPollReq* pReq = tmqBuildConsumeReqImpl(tmq, blockingTime, pTopic, pVg); if (pReq == NULL) { atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); @@ -1409,6 +1411,7 @@ tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) { if (blocking_time != 0) { int64_t endTime = taosGetTimestampMs(); if (endTime - startTime > blocking_time) { + tscDebug("consumer %ld (epoch %d) timeout, no rsp", tmq->consumerId, tmq->epoch); return NULL; } } diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index f418ec22904a5e5ddd9150fd330bb0bdf02cfd67..38ef1185e845eb806441a4f249518207d4b3ebfc 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -413,6 +413,7 @@ typedef struct { typedef struct { int32_t vgId; // -1 for unassigned int32_t status; + int32_t epoch; SEpSet epSet; int64_t oldConsumerId; int64_t consumerId; // -1 for unassigned @@ -423,6 +424,7 @@ static FORCE_INLINE int32_t tEncodeSMqConsumerEp(void** buf, const SMqConsumerEp int32_t tlen = 0; tlen += taosEncodeFixedI32(buf, pConsumerEp->vgId); tlen += taosEncodeFixedI32(buf, pConsumerEp->status); + tlen += taosEncodeFixedI32(buf, pConsumerEp->epoch); tlen += taosEncodeSEpSet(buf, &pConsumerEp->epSet); tlen += taosEncodeFixedI64(buf, pConsumerEp->oldConsumerId); tlen += taosEncodeFixedI64(buf, pConsumerEp->consumerId); @@ -433,6 +435,7 @@ static FORCE_INLINE int32_t tEncodeSMqConsumerEp(void** buf, const SMqConsumerEp static FORCE_INLINE void* tDecodeSMqConsumerEp(void** buf, SMqConsumerEp* pConsumerEp) { buf = taosDecodeFixedI32(buf, &pConsumerEp->vgId); buf = taosDecodeFixedI32(buf, &pConsumerEp->status); + buf = taosDecodeFixedI32(buf, &pConsumerEp->epoch); buf = taosDecodeSEpSet(buf, &pConsumerEp->epSet); buf = taosDecodeFixedI64(buf, &pConsumerEp->oldConsumerId); buf = taosDecodeFixedI64(buf, &pConsumerEp->consumerId); diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 57d98396eabfdfae67a9be748d2a07ea68a53387..2e297865a0ab8cc6efc264b4ccdb3ff4ee6611df 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -493,6 +493,8 @@ static int32_t mndProcessDoRebalanceMsg(SNodeMsg *pMsg) { pConsumerEp->oldConsumerId = pConsumerEp->consumerId; pConsumerEp->consumerId = pSubConsumer->consumerId; + //TODO + pConsumerEp->epoch = 0; taosArrayPush(pSubConsumer->vgInfo, pConsumerEp); if (pConsumerEp->oldConsumerId == -1) { diff --git a/source/dnode/vnode/src/inc/tqInt.h b/source/dnode/vnode/src/inc/tqInt.h index f0c3f6801a189acd7b06039f9474a7527c895931..bb42151cf39375f217f717cf9e090e297f9f5f87 100644 --- a/source/dnode/vnode/src/inc/tqInt.h +++ b/source/dnode/vnode/src/inc/tqInt.h @@ -206,7 +206,7 @@ typedef struct { typedef struct { int64_t consumerId; - int64_t epoch; + int32_t epoch; char cgroup[TSDB_TOPIC_FNAME_LEN]; SArray* topics; // SArray } STqConsumer; diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 68ade46efdb06d6de6a25fdd017594da8769d029..602e9047b39fd049ac7d555d5b17d113dfbd3fb7 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -167,7 +167,7 @@ static FORCE_INLINE int32_t tEncodeSTqConsumer(void** buf, const STqConsumer* pC int32_t tlen = 0; tlen += taosEncodeFixedI64(buf, pConsumer->consumerId); - tlen += taosEncodeFixedI64(buf, pConsumer->epoch); + tlen += taosEncodeFixedI32(buf, pConsumer->epoch); tlen += taosEncodeString(buf, pConsumer->cgroup); sz = taosArrayGetSize(pConsumer->topics); tlen += taosEncodeFixedI32(buf, sz); @@ -182,7 +182,7 @@ static FORCE_INLINE const void* tDecodeSTqConsumer(const void* buf, STqConsumer* int32_t sz; buf = taosDecodeFixedI64(buf, &pConsumer->consumerId); - buf = taosDecodeFixedI64(buf, &pConsumer->epoch); + buf = taosDecodeFixedI32(buf, &pConsumer->epoch); buf = taosDecodeStringTo(buf, pConsumer->cgroup); buf = taosDecodeFixedI32(buf, &sz); pConsumer->topics = taosArrayInit(sz, sizeof(STqTopic)); @@ -255,6 +255,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { int64_t consumerId = pReq->consumerId; int64_t fetchOffset; int64_t blockingTime = pReq->blockingTime; + int32_t reqEpoch = pReq->epoch; if (pReq->currentOffset == TMQ_CONF__RESET_OFFSET__EARLIEAST) { fetchOffset = 0; @@ -264,7 +265,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { fetchOffset = pReq->currentOffset + 1; } - /*printf("tmq poll vg %d req %ld %ld\n", pTq->pVnode->vgId, pReq->currentOffset, fetchOffset);*/ + vDebug("tmq poll: consumer %ld (epoch %d) recv poll req in vg %d, req %ld %ld", consumerId, pReq->epoch, pTq->pVnode->vgId, pReq->currentOffset, fetchOffset); SMqPollRsp rsp = { /*.consumerId = consumerId,*/ @@ -274,6 +275,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { STqConsumer* pConsumer = tqHandleGet(pTq->tqMeta, consumerId); if (pConsumer == NULL) { + vWarn("tmq poll: consumer %ld (epoch %d) not found in vg %d", consumerId, pReq->epoch, pTq->pVnode->vgId); pMsg->pCont = NULL; pMsg->contLen = 0; pMsg->code = -1; @@ -281,30 +283,57 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { return 0; } + int32_t consumerEpoch = atomic_load_32(&pConsumer->epoch); + while (consumerEpoch < reqEpoch) { + consumerEpoch = atomic_val_compare_exchange_32(&pConsumer->epoch, consumerEpoch, reqEpoch); + } + + STqTopic* pTopic = NULL; int sz = taosArrayGetSize(pConsumer->topics); - ASSERT(sz == 1); - STqTopic* pTopic = taosArrayGet(pConsumer->topics, 0); - ASSERT(strcmp(pTopic->topicName, pReq->topic) == 0); - ASSERT(pConsumer->consumerId == consumerId); + for (int32_t i = 0; i < sz; i++) { + STqTopic* topic = taosArrayGet(pConsumer->topics, i); + //TODO race condition + ASSERT(pConsumer->consumerId == consumerId); + if (strcmp(topic->topicName, pReq->topic) == 0) { + pTopic = topic; + break; + } + } + if (pTopic == NULL) { + vWarn("tmq poll: consumer %ld (epoch %d) topic %s not found in vg %d", consumerId, pReq->epoch, pReq->topic, pTq->pVnode->vgId); + pMsg->pCont = NULL; + pMsg->contLen = 0; + pMsg->code = -1; + tmsgSendRsp(pMsg); + return 0; + } + + vDebug("poll topic %s from consumer %ld (epoch %d)", pTopic->topicName, consumerId, pReq->epoch); rsp.reqOffset = pReq->currentOffset; rsp.skipLogNum = 0; while (1) { /*if (fetchOffset > walGetLastVer(pTq->pWal) || walReadWithHandle(pTopic->pReadhandle, fetchOffset) < 0) {*/ + //TODO + consumerEpoch = atomic_load_32(&pConsumer->epoch); + if (consumerEpoch > pReq->epoch) { + //TODO: return + break; + } SWalReadHead* pHead; if (walReadWithHandle_s(pTopic->pReadhandle, fetchOffset, &pHead) < 0) { // TODO: no more log, set timer to wait blocking time // if data inserted during waiting, launch query and // response to user + vDebug("tmq poll: consumer %ld (epoch %d) vg %d offset %ld, no more log to return", consumerId, pReq->epoch, pTq->pVnode->vgId, fetchOffset); break; } - /*printf("vg %d offset %ld msgType %d from epoch %d\n", pTq->pVnode->vgId, fetchOffset, pHead->msgType, pReq->epoch);*/ + vDebug("tmq poll: consumer %ld (epoch %d) iter log, vg %d offset %ld msgType %d", consumerId, pReq->epoch, pTq->pVnode->vgId, fetchOffset, pHead->msgType); /*int8_t pos = fetchOffset % TQ_BUFFER_SIZE;*/ /*pHead = pTopic->pReadhandle->pHead;*/ if (pHead->msgType == TDMT_VND_SUBMIT) { SSubmitReq* pCont = (SSubmitReq*)&pHead->body; - /*printf("from topic %s from consumer\n", pTopic->topicName, consumerId);*/ qTaskInfo_t task = pTopic->buffer.output[workerId].task; ASSERT(task); qSetStreamInput(task, pCont, STREAM_DATA_TYPE_SUBMIT_BLOCK); @@ -324,6 +353,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { } if (taosArrayGetSize(pRes) == 0) { + vDebug("tmq poll: consumer %ld (epoch %d) iter log, vg %d skip log %ld since not wanted", consumerId, pReq->epoch, pTq->pVnode->vgId, fetchOffset); fetchOffset++; rsp.skipLogNum++; taosArrayDestroy(pRes); @@ -352,7 +382,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { pMsg->pCont = buf; pMsg->contLen = tlen; pMsg->code = 0; - /*printf("vg %d offset %ld msgType %d from epoch %d actual rsp\n", pTq->pVnode->vgId, fetchOffset, pHead->msgType, pReq->epoch);*/ + vDebug("vg %d offset %ld msgType %d from consumer %ld (epoch %d) actual rsp", pTq->pVnode->vgId, fetchOffset, pHead->msgType, consumerId, pReq->epoch); tmsgSendRsp(pMsg); taosMemoryFree(pHead); return 0; @@ -383,7 +413,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { pMsg->contLen = tlen; pMsg->code = 0; tmsgSendRsp(pMsg); - /*printf("vg %d offset %ld from epoch %d not rsp\n", pTq->pVnode->vgId, fetchOffset, pReq->epoch);*/ + vDebug("vg %d offset %ld from consumer %ld (epoch %d) not rsp", pTq->pVnode->vgId, fetchOffset, consumerId, pReq->epoch); /*}*/ return 0; @@ -393,6 +423,7 @@ int32_t tqProcessRebReq(STQ* pTq, char* msg) { SMqMVRebReq req = {0}; tDecodeSMqMVRebReq(msg, &req); + vDebug("vg %d set from consumer %ld to consumer %ld", req.vgId, req.oldConsumerId ,req.newConsumerId); STqConsumer* pConsumer = tqHandleGet(pTq->tqMeta, req.oldConsumerId); ASSERT(pConsumer); pConsumer->consumerId = req.newConsumerId; @@ -407,18 +438,20 @@ int32_t tqProcessSetConnReq(STQ* pTq, char* msg) { SMqSetCVgReq req = {0}; tDecodeSMqSetCVgReq(msg, &req); - /*printf("vg %d set to consumer from %ld to %ld\n", req.vgId, req.oldConsumerId, req.newConsumerId);*/ - STqConsumer* pConsumer = taosMemoryCalloc(1, sizeof(STqConsumer)); + vDebug("vg %d set to consumer %ld", req.vgId, req.consumerId); + STqConsumer* pConsumer = tqHandleGet(pTq->tqMeta, req.consumerId); if (pConsumer == NULL) { - terrno = TSDB_CODE_TQ_OUT_OF_MEMORY; - return -1; + pConsumer = taosMemoryCalloc(1, sizeof(STqConsumer)); + if (pConsumer == NULL) { + terrno = TSDB_CODE_TQ_OUT_OF_MEMORY; + return -1; + } + strcpy(pConsumer->cgroup, req.cgroup); + pConsumer->topics = taosArrayInit(0, sizeof(STqTopic)); + pConsumer->consumerId = req.consumerId; + pConsumer->epoch = 0; } - strcpy(pConsumer->cgroup, req.cgroup); - pConsumer->topics = taosArrayInit(0, sizeof(STqTopic)); - pConsumer->consumerId = req.consumerId; - pConsumer->epoch = 0; - STqTopic* pTopic = taosMemoryCalloc(1, sizeof(STqTopic)); if (pTopic == NULL) { taosArrayDestroy(pConsumer->topics); diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index aa018aa84fad283e1a558460e10cd4dcba8431cf..57edc4000723e90164d67a6a7ae8500fff7472ee 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -625,24 +625,6 @@ typedef struct SSortOperatorInfo { uint64_t totalElapsed; // total elapsed time } SSortOperatorInfo; -typedef struct SDistinctDataInfo { - int32_t index; - int32_t type; - int32_t bytes; -} SDistinctDataInfo; - -typedef struct SDistinctOperatorInfo { - SHashObj* pSet; - SSDataBlock* pRes; - bool recordNullVal; // has already record the null value, no need to try again -// int64_t threshold; // todo remove it -// int64_t outputCapacity;// todo remove it -// int32_t totalBytes; // todo remove it - SResultInfo resInfo; - char* buf; - SArray* pDistinctDataInfo; -} SDistinctOperatorInfo; - int32_t operatorDummyOpenFn(SOperatorInfo* pOperator); void operatorDummyCloseFn(void* param, int32_t numOfCols); int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t num); @@ -659,6 +641,8 @@ int32_t setSDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadI uint64_t* total, SArray* pColList); void doSetOperatorCompleted(SOperatorInfo* pOperator); void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock); +SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity); +SSDataBlock* loadNextDataBlock(void* param); SOperatorInfo* createExchangeOperatorInfo(const SNodeList* pSources, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo); SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, int32_t numOfCols, int32_t repeatTime, @@ -682,8 +666,10 @@ SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, SSDataBlock* SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, SInterval* pInterval, SSDataBlock* pResBlock, int32_t fillType, char* fillVal, bool multigroupResult, SExecTaskInfo* pTaskInfo); SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createDistinctOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo); +SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pResultBlock, SArray* pSortInfo, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo); + +#if 0 SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv); SOperatorInfo* createAllTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput); @@ -705,6 +691,7 @@ SOperatorInfo* createSLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorI SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pdownstream, int32_t numOfDownstream, SSchema* pSchema, int32_t numOfOutput); +#endif void setInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 630e6d71a38f4093e563237f22a94650969a1eb7..10fa5a23b40ca7cf33823a52136edbadabd7317e 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -1629,7 +1629,7 @@ static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe // window start(end) key interpolation doWindowBorderInterpolation(pOperatorInfo, pSDataBlock, pInfo->binfo.pCtx, pResult, &nextWin, startPos, forwardStep, pInfo->order, false); - updateTimeWindowInfo(&pInfo->timeWindowData, &win, true); + updateTimeWindowInfo(&pInfo->timeWindowData, &nextWin, true); doApplyFunctions(pInfo->binfo.pCtx, &nextWin, &pInfo->timeWindowData, startPos, forwardStep, tsCols, pSDataBlock->info.rows, numOfOutput, TSDB_ORDER_ASC); } @@ -3295,6 +3295,7 @@ void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock) { SFilterInfo* filter = NULL; + // todo move to the initialization function int32_t code = filterInitFromNode((SNode*)pFilterNode, &filter, 0); SFilterColumnParam param1 = {.numOfCols = pBlock->info.numOfCols, .pDataBlock = pBlock->pDataBlock}; @@ -4755,8 +4756,7 @@ static void appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHan pBlock->info.rows += 1; } -static SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, bool hasVarCol, - int32_t capacity) { +SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity) { blockDataCleanup(pDataBlock); while (1) { @@ -4777,7 +4777,6 @@ static SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataB SSDataBlock* loadNextDataBlock(void* param) { SOperatorInfo* pOperator = (SOperatorInfo*)param; bool newgroup = false; - return pOperator->getNextFn(pOperator, &newgroup); } @@ -4957,7 +4956,7 @@ static SSDataBlock* doSortedMerge(SOperatorInfo* pOperator, bool* newgroup) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SSortedMergeOperatorInfo* pInfo = pOperator->info; if (pOperator->status == OP_RES_TO_RETURN) { - return getSortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pInfo->hasVarCol, pInfo->binfo.capacity); + return getSortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pInfo->binfo.capacity); } int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize; @@ -5102,15 +5101,14 @@ static SSDataBlock* doSort(SOperatorInfo* pOperator, bool* newgroup) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SSortOperatorInfo* pInfo = pOperator->info; - bool hasVarCol = pInfo->pDataBlock->info.hasVarCol; if (pOperator->status == OP_RES_TO_RETURN) { - return getSortedBlockData(pInfo->pSortHandle, pInfo->pDataBlock, hasVarCol, pInfo->numOfRowsInRes); + return getSortedBlockData(pInfo->pSortHandle, pInfo->pDataBlock, pInfo->numOfRowsInRes); } int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize; pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_SINGLESOURCE_SORT, pInfo->bufPageSize, numOfBufPage, - pInfo->pDataBlock, "GET_TASKID(pTaskInfo)"); + pInfo->pDataBlock, pTaskInfo->id.str); tsortSetFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock); @@ -5124,38 +5122,40 @@ static SSDataBlock* doSort(SOperatorInfo* pOperator, bool* newgroup) { } pOperator->status = OP_RES_TO_RETURN; - return getSortedBlockData(pInfo->pSortHandle, pInfo->pDataBlock, hasVarCol, pInfo->numOfRowsInRes); + return getSortedBlockData(pInfo->pSortHandle, pInfo->pDataBlock, pInfo->numOfRowsInRes); } -SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pResBlock, SArray* pSortInfo, - SExecTaskInfo* pTaskInfo) { +SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pResBlock, SArray* pSortInfo, SExecTaskInfo* pTaskInfo) { SSortOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SSortOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { - taosMemoryFreeClear(pInfo); - taosMemoryFreeClear(pOperator); - terrno = TSDB_CODE_QRY_OUT_OF_MEMORY; - return NULL; + goto _error; } - pInfo->sortBufSize = 1024 * 16; // 1MB, TODO dynamic set the available sort buffer - pInfo->bufPageSize = 1024; - pInfo->numOfRowsInRes = 1024; - pInfo->pDataBlock = pResBlock; - pInfo->pSortInfo = pSortInfo; + pInfo->sortBufSize = 1024 * 16; // TODO dynamic set the available sort buffer + pInfo->bufPageSize = 1024; + pInfo->numOfRowsInRes = 1024; + pInfo->pDataBlock = pResBlock; + pInfo->pSortInfo = pSortInfo; - pOperator->name = "Sort"; + pOperator->name = "SortOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_SORT; pOperator->blockingOptr = true; - pOperator->status = OP_NOT_OPENED; - pOperator->info = pInfo; + pOperator->status = OP_NOT_OPENED; + pOperator->info = pInfo; - pOperator->pTaskInfo = pTaskInfo; - pOperator->getNextFn = doSort; - pOperator->closeFn = destroyOrderOperatorInfo; + pOperator->pTaskInfo = pTaskInfo; + pOperator->getNextFn = doSort; + pOperator->closeFn = destroyOrderOperatorInfo; int32_t code = appendDownstream(pOperator, &downstream, 1); return pOperator; + + _error: + pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY; + taosMemoryFree(pInfo); + taosMemoryFree(pOperator); + return NULL; } static int32_t getTableScanOrder(STableScanInfo* pTableScanInfo) { return pTableScanInfo->order; } diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index a33a240328214a1013f18ff7bbf86b8b069743e3..b3a8e09f166e05a490fb865e298cdf361473faa1 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -131,7 +131,7 @@ static void recordNewGroupKeys(SGroupbyOperatorInfo* pInfo, SSDataBlock* pBlock, } } -static int32_t buildGroupValKey(void* pKey, int32_t* length, SArray* pGroupColVals) { +static int32_t buildGroupKeys(void* pKey, const SArray* pGroupColVals) { ASSERT(pKey != NULL); size_t numOfGroupCols = taosArrayGetSize(pGroupColVals); @@ -155,8 +155,7 @@ static int32_t buildGroupValKey(void* pKey, int32_t* length, SArray* pGroupColVa } } - *length = (pStart - (char*)pKey); - return 0; + return (int32_t) (pStart - (char*)pKey); } // assign the group keys or user input constant values if required @@ -217,7 +216,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) { continue; } - /*int32_t ret = */ buildGroupValKey(pInfo->keyBuf, &len, pInfo->pGroupColVals); + len = buildGroupKeys(pInfo->keyBuf, pInfo->pGroupColVals); int32_t ret = setGroupResultOutputBuf_rv(&(pInfo->binfo), pOperator->numOfOutput, pInfo->keyBuf, TSDB_DATA_TYPE_VARCHAR, len, 0, pInfo->aggSup.pResultBuf, pTaskInfo, &pInfo->aggSup); if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR); @@ -233,7 +232,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) { } if (num > 0) { - /*int32_t ret = */ buildGroupValKey(pInfo->keyBuf, &len, pInfo->pGroupColVals); + len = buildGroupKeys(pInfo->keyBuf, pInfo->pGroupColVals); int32_t ret = setGroupResultOutputBuf_rv(&(pInfo->binfo), pOperator->numOfOutput, pInfo->keyBuf, TSDB_DATA_TYPE_VARCHAR, len, 0, pInfo->aggSup.pResultBuf, pTaskInfo, &pInfo->aggSup); @@ -346,162 +345,66 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx return pOperator; _error: + pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY; taosMemoryFreeClear(pInfo); taosMemoryFreeClear(pOperator); return NULL; } -#define MULTI_KEY_DELIM "-" - -static void destroyDistinctOperatorInfo(void* param, int32_t numOfOutput) { - SDistinctOperatorInfo* pInfo = (SDistinctOperatorInfo*)param; - taosHashCleanup(pInfo->pSet); - taosMemoryFreeClear(pInfo->buf); - taosArrayDestroy(pInfo->pDistinctDataInfo); - pInfo->pRes = blockDataDestroy(pInfo->pRes); -} - -static void buildMultiDistinctKey(SDistinctOperatorInfo* pInfo, SSDataBlock* pBlock, int32_t rowId) { - char* p = pInfo->buf; -// memset(p, 0, pInfo->totalBytes); - - for (int i = 0; i < taosArrayGetSize(pInfo->pDistinctDataInfo); i++) { - SDistinctDataInfo* pDistDataInfo = (SDistinctDataInfo*)taosArrayGet(pInfo->pDistinctDataInfo, i); - SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, pDistDataInfo->index); - - char* val = ((char*)pColDataInfo->pData) + pColDataInfo->info.bytes * rowId; - if (isNull(val, pDistDataInfo->type)) { - p += pDistDataInfo->bytes; - continue; - } - if (IS_VAR_DATA_TYPE(pDistDataInfo->type)) { - memcpy(p, varDataVal(val), varDataLen(val)); - p += varDataLen(val); - } else { - memcpy(p, val, pDistDataInfo->bytes); - p += pDistDataInfo->bytes; - } - memcpy(p, MULTI_KEY_DELIM, strlen(MULTI_KEY_DELIM)); - p += strlen(MULTI_KEY_DELIM); - } -} - -static bool initMultiDistinctInfo(SDistinctOperatorInfo* pInfo, SOperatorInfo* pOperator) { - for (int i = 0; i < pOperator->numOfOutput; i++) { - // pInfo->totalBytes += pOperator->pExpr[i].base.colBytes; - } -#if 0 - for (int i = 0; i < pOperator->numOfOutput; i++) { - int numOfCols = (int)(taosArrayGetSize(pBlock->pDataBlock)); - assert(i < numOfCols); - - for (int j = 0; j < numOfCols; j++) { - SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, j); - if (pColDataInfo->info.colId == pOperator->pExpr[i].base.resSchema.colId) { - SDistinctDataInfo item = {.index = j, .type = pColDataInfo->info.type, .bytes = pColDataInfo->info.bytes}; - taosArrayInsert(pInfo->pDistinctDataInfo, i, &item); - } - } - } -#endif - -// pInfo->totalBytes += (int32_t)strlen(MULTI_KEY_DELIM) * (pOperator->numOfOutput); -// pInfo->buf = taosMemoryCalloc(1, pInfo->totalBytes); - return taosArrayGetSize(pInfo->pDistinctDataInfo) == pOperator->numOfOutput ? true : false; -} - -static SSDataBlock* hashDistinct(SOperatorInfo* pOperator, bool* newgroup) { +static SSDataBlock* doPartitionData(SOperatorInfo* pOperator, bool* newgroup) { if (pOperator->status == OP_EXEC_DONE) { return NULL; } - SDistinctOperatorInfo* pInfo = pOperator->info; - SSDataBlock* pRes = pInfo->pRes; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + SSortOperatorInfo* pInfo = pOperator->info; + bool hasVarCol = pInfo->pDataBlock->info.hasVarCol; - pRes->info.rows = 0; - SSDataBlock* pBlock = NULL; - - SOperatorInfo* pDownstream = pOperator->pDownstream[0]; - while (1) { - publishOperatorProfEvent(pDownstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); - pBlock = pDownstream->getNextFn(pDownstream, newgroup); - publishOperatorProfEvent(pDownstream, QUERY_PROF_AFTER_OPERATOR_EXEC); - - if (pBlock == NULL) { - doSetOperatorCompleted(pOperator); - break; - } - - // ensure result output buf - if (pRes->info.rows + pBlock->info.rows > pInfo->resInfo.capacity) { - int32_t newSize = pRes->info.rows + pBlock->info.rows; - for (int i = 0; i < taosArrayGetSize(pRes->pDataBlock); i++) { - SColumnInfoData* pResultColInfoData = taosArrayGet(pRes->pDataBlock, i); - SDistinctDataInfo* pDistDataInfo = taosArrayGet(pInfo->pDistinctDataInfo, i); - -// char* tmp = taosMemoryRealloc(pResultColInfoData->pData, newSize * pDistDataInfo->bytes); -// if (tmp == NULL) { -// return NULL; -// } else { -// pResultColInfoData->pData = tmp; -// } - } - pInfo->resInfo.capacity = newSize; - } + if (pOperator->status == OP_RES_TO_RETURN) { + return getSortedBlockData(pInfo->pSortHandle, pInfo->pDataBlock, pInfo->numOfRowsInRes); + } - for (int32_t i = 0; i < pBlock->info.rows; i++) { - buildMultiDistinctKey(pInfo, pBlock, i); - if (taosHashGet(pInfo->pSet, pInfo->buf, 0) == NULL) { - taosHashPut(pInfo->pSet, pInfo->buf, 0, NULL, 0); + int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize; + pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_SINGLESOURCE_SORT, pInfo->bufPageSize, numOfBufPage, + pInfo->pDataBlock, pTaskInfo->id.str); - for (int j = 0; j < taosArrayGetSize(pRes->pDataBlock); j++) { - SDistinctDataInfo* pDistDataInfo = taosArrayGet(pInfo->pDistinctDataInfo, j); // distinct meta info - SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pDistDataInfo->index); // src - SColumnInfoData* pResultColInfoData = taosArrayGet(pRes->pDataBlock, j); // dist + tsortSetFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock); - char* val = ((char*)pColInfoData->pData) + pDistDataInfo->bytes * i; - char* start = pResultColInfoData->pData + pDistDataInfo->bytes * pInfo->pRes->info.rows; - memcpy(start, val, pDistDataInfo->bytes); - } + SGenericSource* ps = taosMemoryCalloc(1, sizeof(SGenericSource)); + ps->param = pOperator->pDownstream[0]; + tsortAddSource(pInfo->pSortHandle, ps); - pRes->info.rows += 1; - } - } - - if (pRes->info.rows >= pInfo->resInfo.threshold) { - break; - } + int32_t code = tsortOpen(pInfo->pSortHandle); + if (code != TSDB_CODE_SUCCESS) { + longjmp(pTaskInfo->env, terrno); } - return (pInfo->pRes->info.rows > 0) ? pInfo->pRes : NULL; + pOperator->status = OP_RES_TO_RETURN; + return getSortedBlockData(pInfo->pSortHandle, pInfo->pDataBlock, pInfo->numOfRowsInRes); } -SOperatorInfo* createDistinctOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo) { - SDistinctOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SDistinctOperatorInfo)); - SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); +SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pResultBlock, SArray* pSortInfo, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo) { + SSortOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SSortOperatorInfo)); + SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { goto _error; } - pOperator->resultInfo.capacity = 4096; // todo extract function. + pInfo->sortBufSize = 1024 * 16; // TODO dynamic set the available sort buffer + pInfo->bufPageSize = 1024; + pInfo->numOfRowsInRes = 1024; + pInfo->pDataBlock = pResultBlock; + pInfo->pSortInfo = pSortInfo; -// pInfo->totalBytes = 0; - pInfo->buf = NULL; - - pInfo->pDistinctDataInfo = taosArrayInit(numOfCols, sizeof(SDistinctDataInfo)); - initMultiDistinctInfo(pInfo, pOperator); - - pInfo->pSet = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); - - pOperator->name = "DistinctOperator"; + pOperator->name = "PartitionOperator"; + pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_PARTITION; pOperator->blockingOptr = true; pOperator->status = OP_NOT_OPENED; -// pOperator->operatorType = DISTINCT; - pOperator->pExpr = pExpr; - pOperator->numOfOutput = numOfCols; pOperator->info = pInfo; - pOperator->getNextFn = hashDistinct; - pOperator->closeFn = destroyDistinctOperatorInfo; + + pOperator->pTaskInfo = pTaskInfo; + pOperator->getNextFn = doPartitionData; +// pOperator->closeFn = destroyOrderOperatorInfo; int32_t code = appendDownstream(pOperator, &downstream, 1); return pOperator; @@ -511,4 +414,4 @@ SOperatorInfo* createDistinctOperatorInfo(SOperatorInfo* downstream, SExprInfo* taosMemoryFree(pInfo); taosMemoryFree(pOperator); return NULL; -} +} \ No newline at end of file diff --git a/source/os/src/osShm.c b/source/os/src/osShm.c index 35aec30f3921e2e50b51fd3cb84860e5dfb4a3b4..b276b48d0e86193da888d545fa6dc4cf1cf6694d 100644 --- a/source/os/src/osShm.c +++ b/source/os/src/osShm.c @@ -17,14 +17,29 @@ #define _DEFAULT_SOURCE #include "os.h" +#define MAX_SHMIDS 6 + +static int32_t shmids[MAX_SHMIDS] = {0}; + +static void taosDeleteCreatedShms() { + for (int32_t i = 0; i < MAX_SHMIDS; ++i) { + int32_t shmid = shmids[i] - 1; + if (shmid >= 0) { + shmctl(shmid, IPC_RMID, NULL); + } + } +} + int32_t taosCreateShm(SShm* pShm, int32_t key, int32_t shmsize) { pShm->id = -1; - // key_t shkey = IPC_PRIVATE; - // int32_t __shmflag = IPC_CREAT | IPC_EXCL | 0600; - +#if 1 + key_t __shkey = IPC_PRIVATE; + int32_t __shmflag = IPC_CREAT | IPC_EXCL | 0600; +#else key_t __shkey = 0X95270000 + key; int32_t __shmflag = IPC_CREAT | 0600; +#endif int32_t shmid = shmget(__shkey, shmsize, __shmflag); if (shmid < 0) { @@ -39,6 +54,16 @@ int32_t taosCreateShm(SShm* pShm, int32_t key, int32_t shmsize) { pShm->id = shmid; pShm->size = shmsize; pShm->ptr = shmptr; + +#if 0 + if (key >= 0 && key < MAX_SHMIDS) { + shmids[key] = pShm->id + 1; + } + atexit(taosDeleteCreatedShms); +#else + shmctl(pShm->id, IPC_RMID, NULL); +#endif + return 0; } diff --git a/tests/script/jenkins/basic.txt b/tests/script/jenkins/basic.txt index 4e04543fd8ab0b0b8712cd1a97cda969b10ed1fd..5a3ee003f095340db873b4a7b9c9b7ab5f3242f7 100644 --- a/tests/script/jenkins/basic.txt +++ b/tests/script/jenkins/basic.txt @@ -55,8 +55,8 @@ # --- for multi process mode -# ./test.sh -f tsim/user/basic1.sim -m -# ./test.sh -f tsim/stable/vnode3.sim -m -# ./test.sh -f tsim/tmq/basic.sim -m +./test.sh -f tsim/user/basic1.sim -m +./test.sh -f tsim/stable/vnode3.sim -m +./test.sh -f tsim/tmq/basic.sim -m #======================b1-end===============