diff --git a/.devcontainer/Dockerfile b/.devcontainer/Dockerfile index a23ebdecfd527b55c72bc1e042b84de51b25885b..4e6e47f4f98f76dfcb3be09e21b07fbb59630763 100644 --- a/.devcontainer/Dockerfile +++ b/.devcontainer/Dockerfile @@ -7,4 +7,4 @@ FROM mcr.microsoft.com/vscode/devcontainers/cpp:0-${VARIANT} # [Optional] Uncomment this section to install additional packages. # RUN apt-get update && export DEBIAN_FRONTEND=noninteractive \ # && apt-get -y install --no-install-recommends -RUN apt-get update && apt-get -y install tree vim tmux +RUN apt-get update && apt-get -y install tree vim tmux python3-pip diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 7e543fb7709da33c9831c50bd00163bf7d78e2fc..a6dd51b035e02c6aef9db1c0c0732178063f3dee 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -2383,9 +2383,9 @@ typedef struct { int32_t epoch; uint64_t reqId; int64_t consumerId; - int64_t blockingTime; + int64_t waitTime; int64_t currentOffset; -} SMqPollReqV2; +} SMqPollReq; typedef struct { int32_t vgId; @@ -2400,53 +2400,6 @@ typedef struct { SSchemaWrapper schema; } SMqSubTopicEp; -typedef struct { - SMqRspHead head; - int64_t reqOffset; - int64_t rspOffset; - int32_t skipLogNum; - int32_t dataLen; - SArray* blockPos; // beginning pos for each SRetrieveTableRsp - void* blockData; // serialized batched SRetrieveTableRsp -} SMqPollRspV2; - -static FORCE_INLINE int32_t tEncodeSMqPollRspV2(void** buf, const SMqPollRspV2* pRsp) { - int32_t tlen = 0; - tlen += taosEncodeFixedI64(buf, pRsp->reqOffset); - tlen += taosEncodeFixedI64(buf, pRsp->rspOffset); - tlen += taosEncodeFixedI32(buf, pRsp->skipLogNum); - tlen += taosEncodeFixedI32(buf, pRsp->dataLen); - if (pRsp->dataLen != 0) { - int32_t sz = taosArrayGetSize(pRsp->blockPos); - tlen += taosEncodeFixedI32(buf, sz); - for (int32_t i = 0; i < sz; i++) { - int32_t blockPos = *(int32_t*)taosArrayGet(pRsp->blockPos, i); - tlen += taosEncodeFixedI32(buf, blockPos); - } - tlen += taosEncodeBinary(buf, pRsp->blockData, pRsp->dataLen); - } - return tlen; -} - -static FORCE_INLINE void* tDecodeSMqPollRspV2(const void* buf, SMqPollRspV2* pRsp) { - buf = taosDecodeFixedI64(buf, &pRsp->reqOffset); - buf = taosDecodeFixedI64(buf, &pRsp->rspOffset); - buf = taosDecodeFixedI32(buf, &pRsp->skipLogNum); - buf = taosDecodeFixedI32(buf, &pRsp->dataLen); - if (pRsp->dataLen != 0) { - int32_t sz; - buf = taosDecodeFixedI32(buf, &sz); - pRsp->blockPos = taosArrayInit(sz, sizeof(int32_t)); - for (int32_t i = 0; i < sz; i++) { - int32_t blockPos; - buf = taosDecodeFixedI32(buf, &blockPos); - taosArrayPush(pRsp->blockPos, &blockPos); - } - buf = taosDecodeBinary(buf, &pRsp->blockData, pRsp->dataLen); - } - return (void*)buf; -} - typedef struct { SMqRspHead head; int64_t reqOffset; diff --git a/include/libs/transport/trpc.h b/include/libs/transport/trpc.h index 7b9f68103d1d6243615a17e686486e1e9cd8cb08..0e7d486eab698db56f3d5ca3191ac7cba3bf37a9 100644 --- a/include/libs/transport/trpc.h +++ b/include/libs/transport/trpc.h @@ -59,9 +59,13 @@ typedef struct { void * pNode; } SNodeMsg; -typedef void (*RpcCfp)(void *parent, SRpcMsg *, SEpSet *); +typedef void (*RpcCfp)(void *parent, SRpcMsg *, SEpSet *rf); typedef int (*RpcAfp)(void *parent, char *tableId, char *spi, char *encrypt, char *secret, char *ckey); -typedef int (*RpcRfp)(void *parent, SRpcMsg *, SEpSet *); +/// +// // SRpcMsg code +// REDIERE, +// NOT READY, EpSet +typedef bool (*RpcRfp)(int32_t code); typedef struct SRpcInit { uint16_t localPort; // local port diff --git a/include/libs/wal/wal.h b/include/libs/wal/wal.h index 7b0d70b7695e5ad190ef491b2e9c16d32c61fbb0..93a950466b9164f4aa342a86976ce9e38b20fa1c 100644 --- a/include/libs/wal/wal.h +++ b/include/libs/wal/wal.h @@ -72,8 +72,6 @@ extern "C" { #define WAL_FILE_LEN (WAL_PATH_LEN + 32) #define WAL_MAGIC 0xFAFBFCFDULL -#define WAL_CUR_FAILED 1 - #pragma pack(push, 1) typedef enum { TAOS_WAL_NOLOG = 0, diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index a00fd4714eece7a57a74739c3d08bc2b5035d1b6..7c873acadbb349af07533764ac86e126d9910940 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -246,6 +246,12 @@ void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t pResInfo->numOfCols = numOfCols; // TODO handle memory leak + if (pResInfo->fields != NULL) { + taosMemoryFree(pResInfo->fields); + } + if (pResInfo->userFields != NULL) { + taosMemoryFree(pResInfo->userFields); + } pResInfo->fields = taosMemoryCalloc(numOfCols, sizeof(TAOS_FIELD)); pResInfo->userFields = taosMemoryCalloc(numOfCols, sizeof(TAOS_FIELD)); diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index e0fb46e122cabad1903ca6c7874f8e9259ee3dbb..d0f6a296d96484d2ef99c79238e1df4dc136a598 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -16,7 +16,6 @@ #include "clientInt.h" #include "clientLog.h" #include "parser.h" -#include "planner.h" #include "tdatablock.h" #include "tdef.h" #include "tglobal.h" @@ -68,7 +67,7 @@ struct tmq_conf_t { char* user; char* pass; char* db; - tmq_commit_cb* commit_cb; + tmq_commit_cb* commitCb; }; struct tmq_t { @@ -115,8 +114,8 @@ enum { enum { TMQ_CONSUMER_STATUS__INIT = 0, - TMQ_CONSUMER_STATUS__SUBSCRIBED, TMQ_CONSUMER_STATUS__READY, + TMQ_CONSUMER_STATUS__NO_TOPIC, }; enum { @@ -175,7 +174,6 @@ typedef struct { int32_t epoch; int32_t vgId; tsem_t rspSem; - int32_t sync; } SMqPollCbParam; typedef struct { @@ -300,6 +298,7 @@ void tmqAssignDelayedHbTask(void* param, void* tmrId) { int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t)); *pTaskType = TMQ_DELAYED_TASK__HB; taosWriteQitem(tmq->delayedTask, pTaskType); + tsem_post(&tmq->rspSem); } void tmqAssignDelayedCommitTask(void* param, void* tmrId) { @@ -307,6 +306,7 @@ void tmqAssignDelayedCommitTask(void* param, void* tmrId) { int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t)); *pTaskType = TMQ_DELAYED_TASK__COMMIT; taosWriteQitem(tmq->delayedTask, pTaskType); + tsem_post(&tmq->rspSem); } void tmqAssignDelayedReportTask(void* param, void* tmrId) { @@ -314,6 +314,7 @@ void tmqAssignDelayedReportTask(void* param, void* tmrId) { int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t)); *pTaskType = TMQ_DELAYED_TASK__REPORT; taosWriteQitem(tmq->delayedTask, pTaskType); + tsem_post(&tmq->rspSem); } int32_t tmqHandleAllDelayedTask(tmq_t* tmq) { @@ -364,7 +365,6 @@ int32_t tmqSubscribeCb(void* param, const SDataBuf* pMsg, int32_t code) { SMqSubscribeCbParam* pParam = (SMqSubscribeCbParam*)param; pParam->rspErr = code; tmq_t* tmq = pParam->tmq; - atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__SUBSCRIBED); tsem_post(&pParam->rspSem); return 0; } @@ -385,14 +385,16 @@ tmq_resp_err_t tmq_subscription(tmq_t* tmq, tmq_list_t** topics) { } for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) { SMqClientTopic* topic = taosArrayGetP(tmq->clientTopics, i); - tmq_list_append(*topics, strdup(topic->topicName)); + tmq_list_append(*topics, topic->topicName); } return TMQ_RESP_ERR__SUCCESS; } tmq_resp_err_t tmq_unsubscribe(tmq_t* tmq) { - tmq_list_t* lst = tmq_list_new(); - return tmq_subscribe(tmq, lst); + tmq_list_t* lst = tmq_list_new(); + tmq_resp_err_t rsp = tmq_subscribe(tmq, lst); + tmq_list_destroy(lst); + return rsp; } #if 0 @@ -475,7 +477,7 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) { strcpy(pTmq->groupId, conf->groupId); pTmq->autoCommit = conf->autoCommit; pTmq->autoCommitInterval = conf->autoCommitInterval; - pTmq->commit_cb = conf->commit_cb; + pTmq->commit_cb = conf->commitCb; pTmq->resetOffsetCfg = conf->resetOffset; // assign consumerId @@ -655,6 +657,9 @@ tmq_resp_err_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { int64_t transporterId = 0; asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo); + // avoid double free if msg is sent + buf = NULL; + tsem_wait(¶m.rspSem); tsem_destroy(¶m.rspSem); @@ -686,7 +691,7 @@ FAIL: void tmq_conf_set_offset_commit_cb(tmq_conf_t* conf, tmq_commit_cb* cb) { // - conf->commit_cb = cb; + conf->commitCb = cb; } TAOS_RES* tmq_create_stream(TAOS* taos, const char* streamName, const char* tbName, const char* sql) { @@ -798,7 +803,7 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) { // do not write into queue since updating epoch reset tscWarn("msg discard from vg %d since from earlier epoch, rsp epoch %d, current epoch %d", pParam->vgId, msgEpoch, tmqEpoch); - /*tsem_post(&tmq->rspSem);*/ + tsem_post(&tmq->rspSem); return 0; } @@ -806,25 +811,6 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) { tscWarn("mismatch rsp from vg %d, epoch %d, current epoch %d", pParam->vgId, msgEpoch, tmqEpoch); } -#if 0 - if (pParam->sync == 1) { - /**pParam->msg = taosMemoryMalloc(sizeof(tmq_message_t));*/ - *pParam->msg = taosAllocateQitem(sizeof(tmq_message_t)); - if (*pParam->msg) { - memcpy(*pParam->msg, pMsg->pData, sizeof(SMqRspHead)); - tDecodeSMqConsumeRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &((*pParam->msg)->consumeRsp)); - if ((*pParam->msg)->consumeRsp.numOfTopics != 0) { - pVg->currentOffset = (*pParam->msg)->consumeRsp.rspOffset; - } - taosWriteQitem(tmq->mqueue, *pParam->msg); - tsem_post(&pParam->rspSem); - return 0; - } - tsem_post(&pParam->rspSem); - return -1; - } -#endif - SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper)); if (pRspWrapper == NULL) { tscWarn("msg discard from vg %d, epoch %d since out of memory", pParam->vgId, pParam->epoch); @@ -843,14 +829,14 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) { pRspWrapper->msg.reqOffset, pRspWrapper->msg.rspOffset); taosWriteQitem(tmq->mqueue, pRspWrapper); - /*tsem_post(&tmq->rspSem);*/ + tsem_post(&tmq->rspSem); return 0; CREATE_MSG_FAIL: if (pParam->epoch == tmq->epoch) { atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); } - /*tsem_post(&tmq->rspSem);*/ + tsem_post(&tmq->rspSem); return -1; } @@ -927,6 +913,12 @@ bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqAskEpRsp* pRsp) { if (tmq->clientTopics) taosArrayDestroy(tmq->clientTopics); taosHashCleanup(pHash); tmq->clientTopics = newTopics; + + if (taosArrayGetSize(tmq->clientTopics) == 0) + atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__NO_TOPIC); + else + atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__READY); + atomic_store_32(&tmq->epoch, epoch); return set; } @@ -955,9 +947,7 @@ int32_t tmqAskEpCb(void* param, const SDataBuf* pMsg, int32_t code) { tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &rsp); /*printf("rsp epoch %ld sz %ld\n", rsp.epoch, rsp.topics->size);*/ /*printf("tmq epoch %ld sz %ld\n", tmq->epoch, tmq->clientTopics->size);*/ - if (tmqUpdateEp(tmq, head->epoch, &rsp)) { - atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__READY); - } + tmqUpdateEp(tmq, head->epoch, &rsp); tDeleteSMqAskEpRsp(&rsp); } else { SMqAskEpRspWrapper* pWrapper = taosAllocateQitem(sizeof(SMqAskEpRspWrapper)); @@ -972,7 +962,7 @@ int32_t tmqAskEpCb(void* param, const SDataBuf* pMsg, int32_t code) { tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pWrapper->msg); taosWriteQitem(tmq->mqueue, pWrapper); - /*tsem_post(&tmq->rspSem);*/ + tsem_post(&tmq->rspSem); taosMemoryFree(pParam); } @@ -1076,7 +1066,7 @@ tmq_resp_err_t tmq_seek(tmq_t* tmq, const tmq_topic_vgroup_t* offset) { return TMQ_RESP_ERR__FAIL; } -SMqPollReqV2* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t blockingTime, SMqClientTopic* pTopic, SMqClientVg* pVg) { +SMqPollReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t waitTime, SMqClientTopic* pTopic, SMqClientVg* pVg) { int64_t reqOffset; if (pVg->currentOffset >= 0) { reqOffset = pVg->currentOffset; @@ -1088,7 +1078,7 @@ SMqPollReqV2* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t blockingTime, SMqClient reqOffset = tmq->resetOffsetCfg; } - SMqPollReqV2* pReq = taosMemoryMalloc(sizeof(SMqPollReqV2)); + SMqPollReq* pReq = taosMemoryMalloc(sizeof(SMqPollReq)); if (pReq == NULL) { return NULL; } @@ -1101,14 +1091,14 @@ SMqPollReqV2* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t blockingTime, SMqClient pReq->subKey[tlen] = TMQ_SEPARATOR; strcpy(pReq->subKey + tlen + 1, pTopic->topicName); - pReq->blockingTime = blockingTime; + pReq->waitTime = waitTime; pReq->consumerId = tmq->consumerId; pReq->epoch = tmq->epoch; pReq->currentOffset = reqOffset; pReq->reqId = generateRequestId(); pReq->head.vgId = htonl(pVg->vgId); - pReq->head.contLen = htonl(sizeof(SMqPollReqV2)); + pReq->head.contLen = htonl(sizeof(SMqPollReq)); return pReq; } @@ -1130,7 +1120,7 @@ SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper) { return pRspObj; } -int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) { +int32_t tmqPollImpl(tmq_t* tmq, int64_t waitTime) { /*printf("call poll\n");*/ for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) { SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i); @@ -1151,17 +1141,17 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) { #endif } atomic_store_32(&pVg->vgSkipCnt, 0); - SMqPollReqV2* pReq = tmqBuildConsumeReqImpl(tmq, blockingTime, pTopic, pVg); + SMqPollReq* pReq = tmqBuildConsumeReqImpl(tmq, waitTime, pTopic, pVg); if (pReq == NULL) { atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); - /*tsem_post(&tmq->rspSem);*/ + tsem_post(&tmq->rspSem); return -1; } SMqPollCbParam* pParam = taosMemoryMalloc(sizeof(SMqPollCbParam)); if (pParam == NULL) { taosMemoryFree(pReq); atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); - /*tsem_post(&tmq->rspSem);*/ + tsem_post(&tmq->rspSem); return -1; } pParam->tmq = tmq; @@ -1169,20 +1159,19 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) { pParam->pTopic = pTopic; pParam->vgId = pVg->vgId; pParam->epoch = tmq->epoch; - pParam->sync = 0; SMsgSendInfo* sendInfo = taosMemoryMalloc(sizeof(SMsgSendInfo)); if (sendInfo == NULL) { taosMemoryFree(pReq); taosMemoryFree(pParam); atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); - /*tsem_post(&tmq->rspSem);*/ + tsem_post(&tmq->rspSem); return -1; } sendInfo->msgInfo = (SDataBuf){ .pData = pReq, - .len = sizeof(SMqPollReqV2), + .len = sizeof(SMqPollReq), .handle = NULL, }; sendInfo->requestId = pReq->reqId; @@ -1222,7 +1211,7 @@ int32_t tmqHandleNoPollRsp(tmq_t* tmq, SMqRspWrapper* rspWrapper, bool* pReset) return 0; } -SMqRspObj* tmqHandleAllRsp(tmq_t* tmq, int64_t blockingTime, bool pollIfReset) { +SMqRspObj* tmqHandleAllRsp(tmq_t* tmq, int64_t waitTime, bool pollIfReset) { while (1) { SMqRspWrapper* rspWrapper = NULL; taosGetQitem(tmq->qall, (void**)&rspWrapper); @@ -1261,37 +1250,41 @@ SMqRspObj* tmqHandleAllRsp(tmq_t* tmq, int64_t blockingTime, bool pollIfReset) { taosFreeQitem(rspWrapper); if (pollIfReset && reset) { tscDebug("consumer %ld reset and repoll", tmq->consumerId); - tmqPollImpl(tmq, blockingTime); + tmqPollImpl(tmq, waitTime); } } } } -TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) { +TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t wait_time) { SMqRspObj* rspObj; int64_t startTime = taosGetTimestampMs(); - rspObj = tmqHandleAllRsp(tmq, blocking_time, false); + rspObj = tmqHandleAllRsp(tmq, wait_time, false); if (rspObj) { return (TAOS_RES*)rspObj; } + if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__INIT) { + return NULL; + } + while (1) { tmqHandleAllDelayedTask(tmq); - tmqPollImpl(tmq, blocking_time); - - /*tsem_wait(&tmq->rspSem);*/ + tmqPollImpl(tmq, wait_time); - rspObj = tmqHandleAllRsp(tmq, blocking_time, false); + rspObj = tmqHandleAllRsp(tmq, wait_time, false); if (rspObj) { return (TAOS_RES*)rspObj; } - if (blocking_time != 0) { + if (wait_time != 0) { int64_t endTime = taosGetTimestampMs(); - if (endTime - startTime > blocking_time) { + int64_t leftTime = endTime - startTime; + if (leftTime > wait_time) { tscDebug("consumer %ld (epoch %d) timeout, no rsp", tmq->consumerId, tmq->epoch); return NULL; } + tsem_timewait(&tmq->rspSem, leftTime * 1000); } } } diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index b0461067e12e59e83e8a2a1ec672931959d26a2e..5eb89e8bb7c81f107d1c8db1eb3e31410fd5eb8e 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -33,12 +33,12 @@ extern "C" { // tqDebug =================== // clang-format off -#define tqFatal(...) do { if (tqDebugFlag & DEBUG_FATAL) { taosPrintLog("TQ FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }} while(0) -#define tqError(...) do { if (tqDebugFlag & DEBUG_ERROR) { taosPrintLog("TQ ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }} while(0) -#define tqWarn(...) do { if (tqDebugFlag & DEBUG_WARN) { taosPrintLog("TQ WARN ", DEBUG_WARN, 255, __VA_ARGS__); }} while(0) -#define tqInfo(...) do { if (tqDebugFlag & DEBUG_INFO) { taosPrintLog("TQ ", DEBUG_INFO, 255, __VA_ARGS__); }} while(0) -#define tqDebug(...) do { if (tqDebugFlag & DEBUG_DEBUG) { taosPrintLog("TQ ", DEBUG_DEBUG, tqDebugFlag, __VA_ARGS__); }} while(0) -#define tqTrace(...) do { if (tqDebugFlag & DEBUG_TRACE) { taosPrintLog("TQ ", DEBUG_TRACE, tqDebugFlag, __VA_ARGS__); }} while(0) +#define tqFatal(...) do { if (tqDebugFlag & DEBUG_FATAL) { taosPrintLog("TQ FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }} while(0) +#define tqError(...) do { if (tqDebugFlag & DEBUG_ERROR) { taosPrintLog("TQ ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }} while(0) +#define tqWarn(...) do { if (tqDebugFlag & DEBUG_WARN) { taosPrintLog("TQ WARN ", DEBUG_WARN, 255, __VA_ARGS__); }} while(0) +#define tqInfo(...) do { if (tqDebugFlag & DEBUG_INFO) { taosPrintLog("TQ ", DEBUG_INFO, 255, __VA_ARGS__); }} while(0) +#define tqDebug(...) do { if (tqDebugFlag & DEBUG_DEBUG) { taosPrintLog("TQ ", DEBUG_DEBUG, tqDebugFlag, __VA_ARGS__); }} while(0) +#define tqTrace(...) do { if (tqDebugFlag & DEBUG_TRACE) { taosPrintLog("TQ ", DEBUG_TRACE, tqDebugFlag, __VA_ARGS__); }} while(0) // clang-format on #define TQ_BUFFER_SIZE 4 diff --git a/source/dnode/vnode/src/meta/metaQuery.c b/source/dnode/vnode/src/meta/metaQuery.c index dbfa8fe9edcb27e6721eb522c7a8bbb9f0941e45..46a39e72f91b392f1a84fd13bedcccc799b05eda 100644 --- a/source/dnode/vnode/src/meta/metaQuery.c +++ b/source/dnode/vnode/src/meta/metaQuery.c @@ -32,6 +32,7 @@ int metaGetTableEntryByVersion(SMetaReader *pReader, int64_t version, tb_uid_t u // query table.db if (tdbDbGet(pMeta->pTbDb, &tbDbKey, sizeof(tbDbKey), &pReader->pBuf, &pReader->szBuf) < 0) { + terrno = TSDB_CODE_PAR_TABLE_NOT_EXIST; goto _err; } @@ -54,6 +55,7 @@ int metaGetTableEntryByUid(SMetaReader *pReader, tb_uid_t uid) { // query uid.idx if (tdbDbGet(pMeta->pUidIdx, &uid, sizeof(uid), &pReader->pBuf, &pReader->szBuf) < 0) { + terrno = TSDB_CODE_PAR_TABLE_NOT_EXIST; return -1; } @@ -67,6 +69,7 @@ int metaGetTableEntryByName(SMetaReader *pReader, const char *name) { // query name.idx if (tdbDbGet(pMeta->pNameIdx, name, strlen(name) + 1, &pReader->pBuf, &pReader->szBuf) < 0) { + terrno = TSDB_CODE_PAR_TABLE_NOT_EXIST; return -1; } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 28a489752a1c6181c5733dd535ef1a38e5b16377..7cee82f660d92ce0b64e3c6e491ae13da7030c78 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -78,7 +78,7 @@ int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_ SMqDataBlkRsp rsp = {0}; rsp.reqOffset = pExec->pushHandle.reqOffset; - rsp.blockData = taosArrayInit(0, sizeof(int32_t)); + rsp.blockData = taosArrayInit(0, sizeof(void*)); rsp.blockDataLen = taosArrayInit(0, sizeof(int32_t)); if (pExec->subType == TOPIC_SUB_TYPE__TABLE) { @@ -210,35 +210,6 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) tmsgPutToQueue(&pTq->pVnode->msgCb, FETCH_QUEUE, &req); -#if 0 - void* pIter = taosHashIterate(pTq->tqPushMgr->pHash, NULL); - while (pIter != NULL) { - STqPusher* pusher = *(STqPusher**)pIter; - if (pusher->type == TQ_PUSHER_TYPE__STREAM) { - STqStreamPusher* streamPusher = (STqStreamPusher*)pusher; - // repack - STqStreamToken* token = taosMemoryMalloc(sizeof(STqStreamToken)); - if (token == NULL) { - taosHashCancelIterate(pTq->tqPushMgr->pHash, pIter); - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - token->type = TQ_STREAM_TOKEN__DATA; - token->data = msg; - // set input - // exec - } - // send msg to ep - } - // iterate hash - // process all msg - // if waiting - // memcpy and send msg to fetch thread - // TODO: add reference - // if handle waiting, launch query and response to consumer - // - // if no waiting handle, return -#endif return 0; } @@ -375,11 +346,11 @@ int32_t tqDeserializeConsumer(STQ* pTq, const STqSerializedHead* pHead, STqConsu } int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { - SMqPollReqV2* pReq = pMsg->pCont; - int64_t consumerId = pReq->consumerId; - int64_t waitTime = pReq->blockingTime; - int32_t reqEpoch = pReq->epoch; - int64_t fetchOffset; + SMqPollReq* pReq = pMsg->pCont; + int64_t consumerId = pReq->consumerId; + int64_t waitTime = pReq->waitTime; + int32_t reqEpoch = pReq->epoch; + int64_t fetchOffset; // get offset to fetch message if (pReq->currentOffset == TMQ_CONF__RESET_OFFSET__EARLIEAST) { diff --git a/source/dnode/vnode/src/vnd/vnodeQuery.c b/source/dnode/vnode/src/vnd/vnodeQuery.c index 5d11494d578ecaced1889ebf5d479b7c9bd7517f..ef7b9c97b501d6b0536c71b1c34d43f848c3cfea 100644 --- a/source/dnode/vnode/src/vnd/vnodeQuery.c +++ b/source/dnode/vnode/src/vnd/vnodeQuery.c @@ -54,6 +54,7 @@ int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg) { metaReaderInit(&mer1, pVnode->pMeta, 0); if (metaGetTableEntryByName(&mer1, infoReq.tbName) < 0) { + code = terrno; goto _exit; } @@ -105,6 +106,7 @@ int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg) { } tSerializeSTableMetaRsp(pRsp, rspLen, &metaRsp); +_exit: rpcMsg.handle = pMsg->handle; rpcMsg.ahandle = pMsg->ahandle; rpcMsg.refId = pMsg->refId; @@ -114,7 +116,6 @@ int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg) { tmsgSendRsp(&rpcMsg); -_exit: taosMemoryFree(metaRsp.pSchemas); metaReaderClear(&mer2); metaReaderClear(&mer1); @@ -123,8 +124,8 @@ _exit: int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad) { pLoad->vgId = TD_VID(pVnode); - //pLoad->syncState = TAOS_SYNC_STATE_LEADER; - pLoad->syncState = syncGetMyRole(pVnode->sync); // sync integration + // pLoad->syncState = TAOS_SYNC_STATE_LEADER; + pLoad->syncState = syncGetMyRole(pVnode->sync); // sync integration pLoad->numOfTables = metaGetTbNum(pVnode->pMeta); pLoad->numOfTimeSeries = 400; pLoad->totalStorage = 300; diff --git a/source/libs/command/src/command.c b/source/libs/command/src/command.c index 4d4ac6c1e4e5cbd5294d3eca8c12a1c7bc6a96ac..621ea7b7fc7483dd6b073cf6301bf373e154e634 100644 --- a/source/libs/command/src/command.c +++ b/source/libs/command/src/command.c @@ -27,9 +27,14 @@ static int32_t getSchemaBytes(const SSchema* pSchema) { } } +// todo : to convert data according to SSDatablock static void buildRspData(const STableMeta* pMeta, char* pData) { - int32_t* pColSizes = (int32_t*)pData; - pData += DESCRIBE_RESULT_COLS * sizeof(int32_t); + int32_t* payloadLen = (int32_t*) pData; + uint64_t* groupId = (uint64_t*)(pData + sizeof(int32_t)); + + int32_t* pColSizes = (int32_t*)(pData + sizeof(int32_t) + sizeof(uint64_t)); + pData = (char*) pColSizes + DESCRIBE_RESULT_COLS * sizeof(int32_t); + int32_t numOfRows = TABLE_TOTAL_COL_NUM(pMeta); // Field @@ -79,6 +84,9 @@ static void buildRspData(const STableMeta* pMeta, char* pData) { for (int32_t i = 0; i < DESCRIBE_RESULT_COLS; ++i) { pColSizes[i] = htonl(pColSizes[i]); } + + + *payloadLen = (int32_t)(pData - (char*)payloadLen); } static int32_t calcRspSize(const STableMeta* pMeta) { @@ -87,7 +95,8 @@ static int32_t calcRspSize(const STableMeta* pMeta) { (numOfRows * sizeof(int32_t) + numOfRows * DESCRIBE_RESULT_FIELD_LEN) + (numOfRows * sizeof(int32_t) + numOfRows * DESCRIBE_RESULT_TYPE_LEN) + (BitmapLen(numOfRows) + numOfRows * sizeof(int32_t)) + - (numOfRows * sizeof(int32_t) + numOfRows * DESCRIBE_RESULT_NOTE_LEN); + (numOfRows * sizeof(int32_t) + numOfRows * DESCRIBE_RESULT_NOTE_LEN) + + sizeof(int32_t) + sizeof(uint64_t); } static int32_t execDescribe(SNode* pStmt, SRetrieveTableRsp** pRsp) { diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 00ec92465f25366c5461f5c9532d859ff9c25474..b841224fe4047c48a52cc0d9c1c8c5de2cf70bfd 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -254,6 +254,17 @@ enum { OP_EXEC_DONE = 0x9, }; +typedef struct SOperatorFpSet { + __optr_open_fn_t _openFn; // DO NOT invoke this function directly + __optr_fn_t getNextFn; + __optr_fn_t getStreamResFn; // execute the aggregate in the stream model, todo remove it + __optr_fn_t cleanupFn; // call this function to release the allocated resources ASAP + __optr_close_fn_t closeFn; + __optr_encode_fn_t encodeResultRow; + __optr_decode_fn_t decodeResultRow; + __optr_get_explain_fn_t getExplainFn; +} SOperatorFpSet; + typedef struct SOperatorInfo { uint8_t operatorType; bool blockingOptr; // block operator or not @@ -267,15 +278,7 @@ typedef struct SOperatorInfo { SResultInfo resultInfo; struct SOperatorInfo** pDownstream; // downstram pointer list int32_t numOfDownstream; // number of downstream. The value is always ONE expect for join operator - // todo extract struct - __optr_open_fn_t _openFn; // DO NOT invoke this function directly - __optr_fn_t getNextFn; - __optr_fn_t getStreamResFn; // execute the aggregate in the stream model. - __optr_fn_t cleanupFn; // call this function to release the allocated resources ASAP - __optr_close_fn_t closeFn; - __optr_encode_fn_t encodeResultRow; - __optr_decode_fn_t decodeResultRow; - __optr_get_explain_fn_t getExplainFn; + SOperatorFpSet fpSet; } SOperatorInfo; typedef struct { @@ -609,6 +612,10 @@ typedef struct SJoinOperatorInfo { SNode *pOnCondition; } SJoinOperatorInfo; +SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn, __optr_fn_t streamFn, + __optr_fn_t cleanup, __optr_close_fn_t closeFn, __optr_encode_fn_t encode, + __optr_decode_fn_t decode, __optr_get_explain_fn_t explain); + int32_t operatorDummyOpenFn(SOperatorInfo* pOperator); void operatorDummyCloseFn(void* param, int32_t numOfCols); int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t num); @@ -653,6 +660,9 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, SSDataB SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId, STimeWindowAggSupp *pTwAggSupp, const STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo); +SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, + SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId, + STimeWindowAggSupp *pTwAggSupp, const STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo); SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, int64_t gap, STimeWindowAggSupp *pTwAggSupp, SExecTaskInfo* pTaskInfo); SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, diff --git a/source/libs/executor/src/executorMain.c b/source/libs/executor/src/executorMain.c index 5cbda9073316e4fb82ae881e50f22fbf35a6cf46..516afe5553e3a4d2827d162f80a7f18118147472 100644 --- a/source/libs/executor/src/executorMain.c +++ b/source/libs/executor/src/executorMain.c @@ -159,7 +159,7 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) { int64_t st = 0; st = taosGetTimestampUs(); - *pRes = pTaskInfo->pRoot->getNextFn(pTaskInfo->pRoot, &newgroup); + *pRes = pTaskInfo->pRoot->fpSet.getNextFn(pTaskInfo->pRoot, &newgroup); uint64_t el = (taosGetTimestampUs() - st); pTaskInfo->cost.elapsedTime += el; diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 35410130154e5904f13d122e0c74f6050af68511..318d87e100bf933126a69871b1508bed9bc08d6f 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -226,6 +226,23 @@ int32_t operatorDummyOpenFn(SOperatorInfo* pOperator) { return TSDB_CODE_SUCCESS; } +SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn, __optr_fn_t streamFn, + __optr_fn_t cleanup, __optr_close_fn_t closeFn, __optr_encode_fn_t encode, + __optr_decode_fn_t decode, __optr_get_explain_fn_t explain) { + SOperatorFpSet fpSet = { + ._openFn = openFn, + .getNextFn = nextFn, + .getStreamResFn = streamFn, + .cleanupFn = cleanup, + .closeFn = closeFn, + .encodeResultRow = encode, + .decodeResultRow = decode, + .getExplainFn = explain, + }; + + return fpSet; +} + void operatorDummyCloseFn(void* param, int32_t numOfCols) {} static int32_t doCopyToSDataBlock(SSDataBlock* pBlock, SExprInfo* pExprInfo, SDiskbasedBuf* pBuf, @@ -4081,7 +4098,7 @@ static SSDataBlock* doLoadRemoteData(SOperatorInfo* pOperator, bool* newgroup) { SExchangeInfo* pExchangeInfo = pOperator->info; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - pTaskInfo->code = pOperator->_openFn(pOperator); + pTaskInfo->code = pOperator->fpSet._openFn(pOperator); if (pTaskInfo->code != TSDB_CODE_SUCCESS) { return NULL; } @@ -4176,9 +4193,9 @@ SOperatorInfo* createExchangeOperatorInfo(const SNodeList* pSources, SSDataBlock pOperator->info = pInfo; pOperator->numOfOutput = pBlock->info.numOfCols; pOperator->pTaskInfo = pTaskInfo; - pOperator->_openFn = prepareLoadRemoteData; // assign a dummy function. - pOperator->getNextFn = doLoadRemoteData; - pOperator->closeFn = destroyExchangeOperatorInfo; + + pOperator->fpSet = createOperatorFpSet(prepareLoadRemoteData, doLoadRemoteData, NULL, NULL, destroyExchangeOperatorInfo, + NULL, NULL, NULL); #if 1 { // todo refactor @@ -4289,7 +4306,7 @@ SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, i SSDataBlock* loadNextDataBlock(void* param) { SOperatorInfo* pOperator = (SOperatorInfo*)param; bool newgroup = false; - return pOperator->getNextFn(pOperator, &newgroup); + return pOperator->fpSet.getNextFn(pOperator, &newgroup); } static bool needToMerge(SSDataBlock* pBlock, SArray* groupInfo, char** buf, int32_t rowIndex) { @@ -4586,9 +4603,9 @@ SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t pOperator->pExpr = pExprInfo; pOperator->pTaskInfo = pTaskInfo; - pOperator->getNextFn = doSortedMerge; - pOperator->closeFn = destroySortedMergeOperatorInfo; + pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doSortedMerge, NULL, NULL, destroySortedMergeOperatorInfo, + NULL, NULL, NULL); code = appendDownstream(pOperator, downstream, numOfDownstream); if (code != TSDB_CODE_SUCCESS) { goto _error; @@ -4667,8 +4684,8 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pR pOperator->info = pInfo; pOperator->pTaskInfo = pTaskInfo; - pOperator->getNextFn = doSort; - pOperator->closeFn = destroyOrderOperatorInfo; + pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doSort, NULL, NULL, destroyOrderOperatorInfo, + NULL, NULL, NULL); int32_t code = appendDownstream(pOperator, &downstream, 1); return pOperator; @@ -4710,7 +4727,7 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) { bool newgroup = true; while (1) { publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); - SSDataBlock* pBlock = downstream->getNextFn(downstream, &newgroup); + SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream, &newgroup); publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC); if (pBlock == NULL) { @@ -4765,7 +4782,7 @@ static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator, bool* newgroup) } SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - pTaskInfo->code = pOperator->_openFn(pOperator); + pTaskInfo->code = pOperator->fpSet._openFn(pOperator); if (pTaskInfo->code != TSDB_CODE_SUCCESS) { return NULL; } @@ -5007,7 +5024,7 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator, bool* newgroup) // The downstream exec may change the value of the newgroup, so use a local variable instead. publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); - SSDataBlock* pBlock = downstream->getNextFn(downstream, newgroup); + SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream, newgroup); publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC); if (pBlock == NULL) { @@ -5070,7 +5087,7 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) { while (1) { publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); - SSDataBlock* pBlock = downstream->getNextFn(downstream, &newgroup); + SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream, &newgroup); publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC); if (pBlock == NULL) { @@ -5121,9 +5138,9 @@ static SSDataBlock* doBuildIntervalResult(SOperatorInfo* pOperator, bool* newgro SSDataBlock* pBlock = pInfo->binfo.pRes; if (pInfo->execModel == OPTR_EXEC_MODEL_STREAM) { - return pOperator->getStreamResFn(pOperator, newgroup); + return pOperator->fpSet.getStreamResFn(pOperator, newgroup); } else { - pTaskInfo->code = pOperator->_openFn(pOperator); + pTaskInfo->code = pOperator->fpSet._openFn(pOperator); if (pTaskInfo->code != TSDB_CODE_SUCCESS) { return NULL; } @@ -5165,7 +5182,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator, bool* newgroup while (1) { publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); - SSDataBlock* pBlock = downstream->getNextFn(downstream, newgroup); + SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream, newgroup); publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC); if (pBlock == NULL) { @@ -5216,7 +5233,7 @@ static SSDataBlock* doAllIntervalAgg(SOperatorInfo* pOperator, bool* newgroup) { while (1) { publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); - SSDataBlock* pBlock = downstream->getNextFn(downstream, newgroup); + SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream, newgroup); publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC); if (pBlock == NULL) { break; @@ -5265,7 +5282,7 @@ static SSDataBlock* doSTableIntervalAgg(SOperatorInfo* pOperator, bool* newgroup while (1) { publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); - SSDataBlock* pBlock = downstream->getNextFn(downstream, newgroup); + SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream, newgroup); publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC); if (pBlock == NULL) { @@ -5400,7 +5417,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator, bool* newgroup) { SOperatorInfo* downstream = pOperator->pDownstream[0]; while (1) { publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); - SSDataBlock* pBlock = downstream->getNextFn(downstream, newgroup); + SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream, newgroup); publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC); if (pBlock == NULL) { @@ -5451,7 +5468,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator, bool* newgroup) while (1) { publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); - SSDataBlock* pBlock = downstream->getNextFn(downstream, newgroup); + SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream, newgroup); publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC); if (pBlock == NULL) { break; @@ -5531,7 +5548,7 @@ static SSDataBlock* doFill(SOperatorInfo* pOperator, bool* newgroup) { SOperatorInfo* pDownstream = pOperator->pDownstream[0]; while (1) { publishOperatorProfEvent(pDownstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); - SSDataBlock* pBlock = pDownstream->getNextFn(pDownstream, newgroup); + SSDataBlock* pBlock = pDownstream->fpSet.getNextFn(pDownstream, newgroup); publishOperatorProfEvent(pDownstream, QUERY_PROF_AFTER_OPERATOR_EXEC); if (*newgroup) { @@ -5603,8 +5620,8 @@ static void destroyOperatorInfo(SOperatorInfo* pOperator) { return; } - if (pOperator->closeFn != NULL) { - pOperator->closeFn(pOperator->info, pOperator->numOfOutput); + if (pOperator->fpSet.closeFn != NULL) { + pOperator->fpSet.closeFn(pOperator->info, pOperator->numOfOutput); } if (pOperator->pDownstream != NULL) { @@ -5739,12 +5756,9 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pOperator->pExpr = pExprInfo; pOperator->numOfOutput = numOfCols; pOperator->pTaskInfo = pTaskInfo; - pOperator->_openFn = doOpenAggregateOptr; - pOperator->getNextFn = getAggregateResult; - pOperator->closeFn = destroyAggOperatorInfo; - pOperator->encodeResultRow = aggEncodeResultRow; - pOperator->decodeResultRow = aggDecodeResultRow; + pOperator->fpSet = createOperatorFpSet(doOpenAggregateOptr, getAggregateResult, NULL, NULL, destroyAggOperatorInfo, + aggEncodeResultRow, aggDecodeResultRow, NULL); code = appendDownstream(pOperator, &downstream, 1); if (code != TSDB_CODE_SUCCESS) { @@ -5871,9 +5885,9 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SExprInfo* p pOperator->info = pInfo; pOperator->pExpr = pExprInfo; pOperator->numOfOutput = num; - pOperator->_openFn = operatorDummyOpenFn; - pOperator->getNextFn = doProjectOperation; - pOperator->closeFn = destroyProjectOperatorInfo; + + pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doProjectOperation, NULL, NULL, destroyProjectOperatorInfo, + NULL, NULL, NULL); pOperator->pTaskInfo = pTaskInfo; int32_t code = appendDownstream(pOperator, &downstream, 1); @@ -5929,12 +5943,9 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pOperator->pTaskInfo = pTaskInfo; pOperator->numOfOutput = numOfCols; pOperator->info = pInfo; - pOperator->_openFn = doOpenIntervalAgg; - pOperator->getNextFn = doBuildIntervalResult; - pOperator->getStreamResFn = doStreamIntervalAgg; - pOperator->closeFn = destroyIntervalOperatorInfo; - pOperator->encodeResultRow = aggEncodeResultRow; - pOperator->decodeResultRow = aggDecodeResultRow; + + pOperator->fpSet = createOperatorFpSet(doOpenIntervalAgg, doBuildIntervalResult, doStreamIntervalAgg, NULL, destroyIntervalOperatorInfo, + aggEncodeResultRow, aggDecodeResultRow, NULL); code = appendDownstream(pOperator, &downstream, 1); if (code != TSDB_CODE_SUCCESS) { @@ -5951,6 +5962,65 @@ _error: return NULL; } +SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, + SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId, + STimeWindowAggSupp *pTwAggSupp, const STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo) { + STableIntervalOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(STableIntervalOperatorInfo)); + SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); + if (pInfo == NULL || pOperator == NULL) { + goto _error; + } + + pInfo->order = TSDB_ORDER_ASC; + pInfo->interval = *pInterval; + pInfo->execModel = OPTR_EXEC_MODEL_STREAM; + pInfo->win = pTaskInfo->window; + pInfo->twAggSup = *pTwAggSupp; + pInfo->primaryTsIndex = primaryTsSlotId; + + int32_t numOfRows = 4096; + size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; + + initResultSizeInfo(pOperator, numOfRows); + int32_t code = + initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, pResBlock, keyBufSize, pTaskInfo->id.str); + initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pInfo->win); + + // pInfo->pTableQueryInfo = initTableQueryInfo(pTableGroupInfo); + if (code != TSDB_CODE_SUCCESS /* || pInfo->pTableQueryInfo == NULL*/) { + goto _error; + } + + initResultRowInfo(&pInfo->binfo.resultRowInfo, (int32_t)1); + + pOperator->name = "StreamTimeIntervalAggOperator"; + pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_INTERVAL; + pOperator->blockingOptr = true; + pOperator->status = OP_NOT_OPENED; + pOperator->pExpr = pExprInfo; + pOperator->pTaskInfo = pTaskInfo; + pOperator->numOfOutput = numOfCols; + pOperator->info = pInfo; + + pOperator->fpSet = createOperatorFpSet(doOpenIntervalAgg, doStreamIntervalAgg, doStreamIntervalAgg, NULL, destroyIntervalOperatorInfo, + aggEncodeResultRow, aggDecodeResultRow, NULL); + + code = appendDownstream(pOperator, &downstream, 1); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + + return pOperator; + + _error: + destroyIntervalOperatorInfo(pInfo, numOfCols); + taosMemoryFreeClear(pInfo); + taosMemoryFreeClear(pOperator); + pTaskInfo->code = code; + return NULL; + +} + SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SExecTaskInfo* pTaskInfo) { STimeSliceOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(STimeSliceOperatorInfo)); @@ -5969,8 +6039,9 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SExprInfo* pOperator->numOfOutput = numOfCols; pOperator->info = pInfo; pOperator->pTaskInfo = pTaskInfo; - pOperator->getNextFn = doAllIntervalAgg; - pOperator->closeFn = destroyBasicOperatorInfo; + + pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doAllIntervalAgg, NULL, NULL, destroyBasicOperatorInfo, + NULL, NULL, NULL); int32_t code = appendDownstream(pOperator, &downstream, 1); return pOperator; @@ -6010,10 +6081,9 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInf pOperator->pTaskInfo = pTaskInfo; pOperator->info = pInfo; - pOperator->getNextFn = doStateWindowAgg; - pOperator->closeFn = destroyStateWindowOperatorInfo; - pOperator->encodeResultRow = aggEncodeResultRow; - pOperator->decodeResultRow = aggDecodeResultRow; + + pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doStateWindowAgg, NULL, NULL, destroyStateWindowOperatorInfo, + aggEncodeResultRow, aggDecodeResultRow, NULL); int32_t code = appendDownstream(pOperator, &downstream, 1); return pOperator; @@ -6057,10 +6127,9 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo pOperator->pExpr = pExprInfo; pOperator->numOfOutput = numOfCols; pOperator->info = pInfo; - pOperator->getNextFn = doSessionWindowAgg; - pOperator->closeFn = destroySWindowOperatorInfo; - pOperator->encodeResultRow = aggEncodeResultRow; - pOperator->decodeResultRow = aggDecodeResultRow; + + pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doSessionWindowAgg, NULL, NULL, destroySWindowOperatorInfo, + aggEncodeResultRow, aggDecodeResultRow, NULL); pOperator->pTaskInfo = pTaskInfo; code = appendDownstream(pOperator, &downstream, 1); @@ -6147,12 +6216,10 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExp pOperator->pExpr = pExpr; pOperator->numOfOutput = numOfCols; pOperator->info = pInfo; - pOperator->_openFn = operatorDummyOpenFn; - pOperator->getNextFn = doFill; - pOperator->pTaskInfo = pTaskInfo; - - pOperator->closeFn = destroySFillOperatorInfo; + pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doFill, NULL, NULL, destroySFillOperatorInfo, + NULL, NULL, NULL); + pOperator->pTaskInfo = pTaskInfo; code = appendDownstream(pOperator, &downstream, 1); return pOperator; @@ -7017,8 +7084,8 @@ int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SExplainExecInfo (*pRes)[*resNum].startupCost = operatorInfo->cost.openCost; (*pRes)[*resNum].totalCost = operatorInfo->cost.totalCost; - if (operatorInfo->getExplainFn) { - int32_t code = (*operatorInfo->getExplainFn)(operatorInfo, &(*pRes)->verboseInfo); + if (operatorInfo->fpSet.getExplainFn) { + int32_t code = (*operatorInfo->fpSet.getExplainFn)(operatorInfo, &(*pRes)->verboseInfo); if (code) { qError("operator getExplainFn failed, error:%s", tstrerror(code)); return code; @@ -7055,7 +7122,7 @@ static SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator, bool* newgroup) if (pJoinInfo->pLeft == NULL || pJoinInfo->leftPos >= pJoinInfo->pLeft->info.rows) { SOperatorInfo* ds1 = pOperator->pDownstream[0]; publishOperatorProfEvent(ds1, QUERY_PROF_BEFORE_OPERATOR_EXEC); - pJoinInfo->pLeft = ds1->getNextFn(ds1, newgroup); + pJoinInfo->pLeft = ds1->fpSet.getNextFn(ds1, newgroup); publishOperatorProfEvent(ds1, QUERY_PROF_AFTER_OPERATOR_EXEC); pJoinInfo->leftPos = 0; @@ -7068,7 +7135,7 @@ static SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator, bool* newgroup) if (pJoinInfo->pRight == NULL || pJoinInfo->rightPos >= pJoinInfo->pRight->info.rows) { SOperatorInfo* ds2 = pOperator->pDownstream[1]; publishOperatorProfEvent(ds2, QUERY_PROF_BEFORE_OPERATOR_EXEC); - pJoinInfo->pRight = ds2->getNextFn(ds2, newgroup); + pJoinInfo->pRight = ds2->fpSet.getNextFn(ds2, newgroup); publishOperatorProfEvent(ds2, QUERY_PROF_AFTER_OPERATOR_EXEC); pJoinInfo->rightPos = 0; @@ -7161,9 +7228,9 @@ SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOf pOperator->numOfOutput = numOfCols; pOperator->info = pInfo; pOperator->pTaskInfo = pTaskInfo; - pOperator->getNextFn = doMergeJoin; - pOperator->closeFn = destroyBasicOperatorInfo; + pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doMergeJoin, NULL, NULL, destroyBasicOperatorInfo, + NULL, NULL, NULL); int32_t code = appendDownstream(pOperator, pDownstream, numOfDownstream); return pOperator; diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index 27c616498e7468e714d667b0e8e0939340dc8355..beee11ec189680adaf5388d44e56bc523e5ca91a 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -277,7 +277,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator, bool* newgrou while (1) { publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); - SSDataBlock* pBlock = downstream->getNextFn(downstream, newgroup); + SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream, newgroup); publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC); if (pBlock == NULL) { break; @@ -360,12 +360,8 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx pOperator->numOfOutput = numOfCols; pOperator->info = pInfo; pOperator->pTaskInfo = pTaskInfo; - pOperator->_openFn = operatorDummyOpenFn; - pOperator->getNextFn = hashGroupbyAggregate; - pOperator->closeFn = destroyGroupOperatorInfo; - pOperator->encodeResultRow = aggEncodeResultRow; - pOperator->decodeResultRow = aggDecodeResultRow; + pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, hashGroupbyAggregate, NULL, NULL, destroyGroupOperatorInfo, aggEncodeResultRow, aggDecodeResultRow, NULL); code = appendDownstream(pOperator, &downstream, 1); return pOperator; @@ -562,7 +558,7 @@ static SSDataBlock* hashPartition(SOperatorInfo* pOperator, bool* newgroup) { while (1) { publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); - SSDataBlock* pBlock = downstream->getNextFn(downstream, newgroup); + SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream, newgroup); publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC); if (pBlock == NULL) { break; @@ -618,14 +614,13 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SExprInfo* pOperator->blockingOptr = true; pOperator->status = OP_NOT_OPENED; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_PARTITION; - pInfo->binfo.pRes = pResultBlock; pOperator->numOfOutput = numOfCols; pOperator->pExpr = pExprInfo; pOperator->info = pInfo; - pOperator->_openFn = operatorDummyOpenFn; - pOperator->getNextFn = hashPartition; - pOperator->closeFn = destroyPartitionOperatorInfo; + + pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, hashPartition, NULL, NULL, destroyPartitionOperatorInfo, + NULL, NULL, NULL); code = appendDownstream(pOperator, &downstream, 1); return pOperator; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 277ab9bc6ffb81e693fdd0585643a266db60d539..f4eee01007fa353afc59d4ba0e9bb2ef3aeaf7f9 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -405,7 +405,7 @@ SOperatorInfo* createTableScanOperatorInfo(void* pDataReader, SQueryTableDataCon pOperator->status = OP_NOT_OPENED; pOperator->info = pInfo; pOperator->numOfOutput = numOfOutput; - pOperator->getNextFn = doTableScan; + pOperator->fpSet.getNextFn = doTableScan; pOperator->pTaskInfo = pTaskInfo; static int32_t cost = 0; @@ -429,7 +429,7 @@ SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle) { pOperator->blockingOptr = false; pOperator->status = OP_NOT_OPENED; pOperator->info = pInfo; - pOperator->getNextFn = doTableScanImpl; + pOperator->fpSet.getNextFn = doTableScanImpl; return pOperator; } @@ -502,8 +502,8 @@ SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SExecTaskInfo* // pOperator->operatorType = OP_TableBlockInfoScan; pOperator->blockingOptr = false; pOperator->status = OP_NOT_OPENED; - pOperator->_openFn = operatorDummyOpenFn; - pOperator->getNextFn = doBlockInfoScan; + pOperator->fpSet._openFn = operatorDummyOpenFn; + pOperator->fpSet.getNextFn = doBlockInfoScan; pOperator->info = pInfo; pOperator->pTaskInfo = pTaskInfo; @@ -532,7 +532,7 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator, bool* newgroup) SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SStreamBlockScanInfo* pInfo = pOperator->info; - pTaskInfo->code = pOperator->_openFn(pOperator); + pTaskInfo->code = pOperator->fpSet._openFn(pOperator); if (pTaskInfo->code != TSDB_CODE_SUCCESS || pOperator->status == OP_EXEC_DONE) { return NULL; } @@ -659,9 +659,9 @@ SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, SSDataBlock* pOperator->status = OP_NOT_OPENED; pOperator->info = pInfo; pOperator->numOfOutput = pResBlock->info.numOfCols; - pOperator->_openFn = operatorDummyOpenFn; - pOperator->getNextFn = doStreamBlockScan; - pOperator->closeFn = operatorDummyCloseFn; + pOperator->fpSet._openFn = operatorDummyOpenFn; + pOperator->fpSet.getNextFn = doStreamBlockScan; + pOperator->fpSet.closeFn = operatorDummyCloseFn; pOperator->pTaskInfo = pTaskInfo; return pOperator; @@ -981,8 +981,8 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, SSDataB pOperator->status = OP_NOT_OPENED; pOperator->info = pInfo; pOperator->numOfOutput = pResBlock->info.numOfCols; - pOperator->getNextFn = doSysTableScan; - pOperator->closeFn = destroySysScanOperator; + pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doSysTableScan, NULL, NULL, destroySysScanOperator, + NULL, NULL, NULL); pOperator->pTaskInfo = pTaskInfo; return pOperator; @@ -1139,11 +1139,12 @@ SOperatorInfo* createTagScanOperatorInfo(void* pReaderHandle, SExprInfo* pExpr, pOperator->blockingOptr = false; pOperator->status = OP_NOT_OPENED; pOperator->info = pInfo; - pOperator->getNextFn = doTagScan; + + pOperator->fpSet = + createOperatorFpSet(operatorDummyOpenFn, doTagScan, NULL, NULL, destroyTagScanOperatorInfo, NULL, NULL, NULL); pOperator->pExpr = pExpr; pOperator->numOfOutput = numOfOutput; pOperator->pTaskInfo = pTaskInfo; - pOperator->closeFn = destroyTagScanOperatorInfo; return pOperator; _error: diff --git a/source/libs/executor/test/executorTests.cpp b/source/libs/executor/test/executorTests.cpp index 543df30686caf8f331fa935a72ef00275264407d..de8df7b91641f7ffe21a059793b90fed519e082a 100644 --- a/source/libs/executor/test/executorTests.cpp +++ b/source/libs/executor/test/executorTests.cpp @@ -199,9 +199,9 @@ SOperatorInfo* createDummyOperator(int32_t startVal, int32_t numOfBlocks, int32_ pOperator->name = "dummyInputOpertor4Test"; if (numOfCols == 1) { - pOperator->getNextFn = getDummyBlock; + pOperator->fpSet.getNextFn = getDummyBlock; } else { - pOperator->getNextFn = get2ColsDummyBlock; + pOperator->fpSet.getNextFn = get2ColsDummyBlock; } SDummyInputInfo *pInfo = (SDummyInputInfo*) taosMemoryCalloc(1, sizeof(SDummyInputInfo)); diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index 5dab6f0a978679a65a9254f8fe25c37b14ef5dfb..aa8a03f3d2a0e867bee12e9c632cc656b7bad965 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -103,6 +103,9 @@ typedef void* queue[2]; /* Return the structure holding the given element. */ #define QUEUE_DATA(e, type, field) ((type*)((void*)((char*)(e)-offsetof(type, field)))) +#define TRANS_RETRY_COUNT_LIMIT 10 // retry count limit +#define TRANS_RETRY_INTERVAL 5 // ms retry interval + typedef struct { SRpcInfo* pRpc; // associated SRpcInfo SEpSet epSet; // ip list provided by app @@ -137,14 +140,12 @@ typedef struct { int8_t connType; // connection type cli/srv int64_t rid; // refId returned by taosAddRef + int8_t retryCount; STransCtx appCtx; // STransMsg* pRsp; // for synchronous API tsem_t* pSem; // for synchronous API - int hThrdIdx; - char* ip; - uint32_t port; - // SEpSet* pSet; // for synchronous API + int hThrdIdx; } STransConnCtx; #pragma pack(push, 1) @@ -215,8 +216,6 @@ void transBuildAuthHead(void* pMsg, int msgLen, void* pAuth, void* pKey); bool transCompressMsg(char* msg, int32_t len, int32_t* flen); bool transDecompressMsg(char* msg, int32_t len, int32_t* flen); -void transConnCtxDestroy(STransConnCtx* ctx); - void transFreeMsg(void* msg); // @@ -262,8 +261,8 @@ void transUnrefCliHandle(void* handle); void transReleaseCliHandle(void* handle); void transReleaseSrvHandle(void* handle); -void transSendRequest(void* shandle, const char* ip, uint32_t port, STransMsg* pMsg, STransCtx* pCtx); -void transSendRecv(void* shandle, const char* ip, uint32_t port, STransMsg* pMsg, STransMsg* pRsp); +void transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pMsg, STransCtx* pCtx); +void transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pMsg, STransMsg* pRsp); void transSendResponse(const STransMsg* msg); void transRegisterMsg(const STransMsg* msg); int transGetConnInfo(void* thandle, STransHandleInfo* pInfo); diff --git a/source/libs/transport/inc/transportInt.h b/source/libs/transport/inc/transportInt.h index eaca9b0fc765ddee699db235733d9508dd45dc36..56f38a7a553cbc24e85d11d2f8b7fc50e93d6e63 100644 --- a/source/libs/transport/inc/transportInt.h +++ b/source/libs/transport/inc/transportInt.h @@ -62,8 +62,7 @@ typedef struct { char ckey[TSDB_PASSWORD_LEN]; // ciphering key void (*cfp)(void* parent, SRpcMsg*, SEpSet*); - int (*afp)(void* parent, char* user, char* spi, char* encrypt, char* secret, char* ckey); - int (*retry)(void* parent, SRpcMsg*, SEpSet*); + bool (*retry)(int32_t code); int32_t refCount; void* parent; diff --git a/source/libs/transport/src/trans.c b/source/libs/transport/src/trans.c index fa517d6d61cca34395dd5648a4146a446b9e2e64..c0da3f9c1fc7e66e96d7295cc2576c56fd181492 100644 --- a/source/libs/transport/src/trans.c +++ b/source/libs/transport/src/trans.c @@ -38,7 +38,6 @@ void* rpcOpen(const SRpcInit* pInit) { // register callback handle pRpc->cfp = pInit->cfp; - pRpc->afp = pInit->afp; pRpc->retry = pInit->rfp; if (pInit->connType == TAOS_CONN_SERVER) { @@ -116,19 +115,13 @@ int rpcReportProgress(void* pConn, char* pCont, int contLen) { return -1; } void rpcCancelRequest(int64_t rid) { return; } void rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* pRid) { - char* ip = (char*)(pEpSet->eps[pEpSet->inUse].fqdn); - uint32_t port = pEpSet->eps[pEpSet->inUse].port; - transSendRequest(shandle, ip, port, pMsg, NULL); + transSendRequest(shandle, pEpSet, pMsg, NULL); } void rpcSendRequestWithCtx(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* pRid, SRpcCtx* pCtx) { - char* ip = (char*)(pEpSet->eps[pEpSet->inUse].fqdn); - uint32_t port = pEpSet->eps[pEpSet->inUse].port; - transSendRequest(shandle, ip, port, pMsg, pCtx); + transSendRequest(shandle, pEpSet, pMsg, pCtx); } void rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pMsg, SRpcMsg* pRsp) { - char* ip = (char*)(pEpSet->eps[pEpSet->inUse].fqdn); - uint32_t port = pEpSet->eps[pEpSet->inUse].port; - transSendRecv(shandle, ip, port, pMsg, pRsp); + transSendRecv(shandle, pEpSet, pMsg, pRsp); } void rpcSendResponse(const SRpcMsg* pMsg) { transSendResponse(pMsg); } diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index b43b8a1e0c30cabd1eb50e302c604b75ba5436c3..ab57f7d01738da4ed8afc1a34ffb0bc5e5736194 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -1,5 +1,4 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. +/* * Copyright (c) 2019 TAOS Data, Inc. * * This program is free software: you can use, redistribute, and/or modify * it under the terms of the GNU Affero General Public License, version 3 @@ -97,7 +96,7 @@ static void cliSendCb(uv_write_t* req, int status); static void cliConnCb(uv_connect_t* req, int status); static void cliAsyncCb(uv_async_t* handle); -static void cliAppCb(SCliConn* pConn, STransMsg* pMsg); +static int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg); static SCliConn* cliCreateConn(SCliThrdObj* thrd); static void cliDestroyConn(SCliConn* pConn, bool clear /*clear tcp handle or not*/); @@ -227,6 +226,9 @@ static void cliWalkCb(uv_handle_t* handle, void* arg); #define REQUEST_PERSIS_HANDLE(msg) ((msg)->persistHandle == 1) #define REQUEST_RELEASE_HANDLE(cmsg) ((cmsg)->type == Release) +#define EPSET_GET_INUSE_IP(epSet) ((epSet)->eps[(epSet)->inUse].fqdn) +#define EPSET_GET_INUSE_PORT(epSet) ((epSet)->eps[(epSet)->inUse].port) + static void* cliWorkThread(void* arg); bool cliMaySendCachedMsg(SCliConn* conn) { @@ -311,14 +313,10 @@ void cliHandleResp(SCliConn* conn) { return; } - if (pCtx == NULL || pCtx->pSem == NULL) { - tTrace("%s cli conn %p handle resp", pTransInst->label, conn); - cliAppCb(conn, &transMsg); - //(pTransInst->cfp)(pTransInst->parent, &transMsg, NULL); - } else { - tTrace("%s cli conn(sync) %p handle resp", pTransInst->label, conn); - memcpy((char*)pCtx->pRsp, (char*)&transMsg, sizeof(transMsg)); - tsem_post(pCtx->pSem); + int ret = cliAppCb(conn, &transMsg, pMsg); + if (ret != 0) { + tTrace("try to send req to next node"); + return; } destroyCmsg(pMsg); @@ -375,17 +373,15 @@ void cliHandleExcept(SCliConn* pConn) { } if (pCtx == NULL || pCtx->pSem == NULL) { - tTrace("%s cli conn %p handle except", pTransInst->label, pConn); if (transMsg.ahandle == NULL) { once = true; continue; } - cliAppCb(pConn, &transMsg); - //(pTransInst->cfp)(pTransInst->parent, &transMsg, NULL); - } else { - tTrace("%s cli conn(sync) %p handle except", pTransInst->label, pConn); - memcpy((char*)(pCtx->pRsp), (char*)(&transMsg), sizeof(transMsg)); - tsem_post(pCtx->pSem); + } + int ret = cliAppCb(pConn, &transMsg, pMsg); + if (ret != 0) { + tTrace("try to send req to next node"); + return; } destroyCmsg(pMsg); tTrace("%s cli conn %p start to destroy", CONN_GET_INST_LABEL(pConn), pConn); @@ -695,7 +691,7 @@ SCliConn* cliGetConn(SCliMsg* pMsg, SCliThrdObj* pThrd) { } } else { STransConnCtx* pCtx = pMsg->ctx; - conn = getConnFromPool(pThrd->pool, pCtx->ip, pCtx->port); + conn = getConnFromPool(pThrd->pool, EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet)); if (conn != NULL) { tTrace("%s cli conn %p get from conn pool", CONN_GET_INST_LABEL(conn), conn); } else { @@ -719,10 +715,6 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { transCtxMerge(&conn->ctx, &pCtx->appCtx); transQueuePush(&conn->cliMsgs, pMsg); - // tTrace("%s cli conn %p queue msg size %d", ((STrans*)pThrd->pTransInst)->label, conn, 2); - // return; - //} - // transDestroyBuffer(&conn->readBuf); cliSend(conn); } else { conn = cliCreateConn(pThrd); @@ -730,8 +722,8 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { transQueuePush(&conn->cliMsgs, pMsg); conn->hThrdIdx = pCtx->hThrdIdx; - conn->ip = strdup(pMsg->ctx->ip); - conn->port = pMsg->ctx->port; + conn->ip = strdup(EPSET_GET_INUSE_IP(&pCtx->epSet)); + conn->port = EPSET_GET_INUSE_PORT(&pCtx->epSet); int ret = transSetConnOption((uv_tcp_t*)conn->stream); if (ret) { @@ -743,10 +735,14 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { addr.sin_family = AF_INET; addr.sin_addr.s_addr = taosGetIpv4FromFqdn(conn->ip); addr.sin_port = (uint16_t)htons((uint16_t)conn->port); - // uv_ip4_addr(pMsg->ctx->ip, pMsg->ctx->port, &addr); - // handle error in callback if fail to connect - tTrace("%s cli conn %p try to connect to %s:%d", pTransInst->label, conn, pMsg->ctx->ip, pMsg->ctx->port); - uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, cliConnCb); + tTrace("%s cli conn %p try to connect to %s:%d", pTransInst->label, conn, conn->ip, conn->port); + ret = uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, cliConnCb); + if (ret != 0) { + tTrace("%s cli conn %p failed to connect to %s:%d, reason: %s", pTransInst->label, conn, conn->ip, conn->port, + uv_err_name(ret)); + cliHandleExcept(conn); + return; + } } } static void cliAsyncCb(uv_async_t* handle) { @@ -856,12 +852,10 @@ static void destroyThrdObj(SCliThrdObj* pThrd) { } static void transDestroyConnCtx(STransConnCtx* ctx) { - if (ctx != NULL) { - taosMemoryFree(ctx->ip); - } + // taosMemoryFree(ctx); } -// + void cliSendQuit(SCliThrdObj* thrd) { // cli can stop gracefully SCliMsg* msg = taosMemoryCalloc(1, sizeof(SCliMsg)); @@ -881,17 +875,58 @@ int cliRBChoseIdx(STrans* pTransInst) { } return index % pTransInst->numOfThreads; } -void cliAppCb(SCliConn* pConn, STransMsg* transMsg) { +int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) { SCliThrdObj* pThrd = pConn->hostThrd; STrans* pTransInst = pThrd->pTransInst; - if (transMsg->code == TSDB_CODE_RPC_REDIRECT && pTransInst->retry != NULL) { - SMEpSet emsg = {0}; - tDeserializeSMEpSet(transMsg->pCont, transMsg->contLen, &emsg); - pTransInst->retry(pTransInst, transMsg, &(emsg.epSet)); + if (pMsg == NULL || pMsg->ctx == NULL) { + tTrace("%s cli conn %p handle resp", pTransInst->label, pConn); + pTransInst->cfp(pTransInst->parent, pResp, NULL); + return 0; + } + + STransConnCtx* pCtx = pMsg->ctx; + SEpSet* pEpSet = &pCtx->epSet; + /* + * upper layer handle retry if code equal TSDB_CODE_RPC_NETWORK_UNAVAIL + */ + tmsg_t msgType = pCtx->msgType; + if ((pTransInst->retry != NULL && (pTransInst->retry(pResp->code))) || + ((pResp->code == TSDB_CODE_RPC_NETWORK_UNAVAIL) && msgType == TDMT_MND_CONNECT)) { + pCtx->retryCount += 1; + pMsg->st = taosGetTimestampUs(); + if (msgType == TDMT_MND_CONNECT && pResp->code == TSDB_CODE_RPC_NETWORK_UNAVAIL) { + if (pCtx->retryCount < pEpSet->numOfEps) { + pEpSet->inUse = (++pEpSet->inUse) % pEpSet->numOfEps; + cliHandleReq(pMsg, pThrd); + cliDestroy((uv_handle_t*)pConn->stream); + return -1; + } + + } else if (pCtx->retryCount < TRANS_RETRY_COUNT_LIMIT) { + if (pResp->contLen == 0) { + pEpSet->inUse = (pEpSet->inUse++) % pEpSet->numOfEps; + } else { + SMEpSet emsg = {0}; + tDeserializeSMEpSet(pResp->pCont, pResp->contLen, &emsg); + pCtx->epSet = emsg.epSet; + } + cliHandleReq(pMsg, pThrd); + // release pConn + addConnToPool(pThrd, pConn); + return -1; + } + } + + if (pCtx->pSem != NULL) { + tTrace("%s cli conn %p handle resp", pTransInst->label, pConn); + memcpy((char*)pCtx->pRsp, (char*)pResp, sizeof(*pResp)); + tsem_post(pCtx->pSem); } else { - pTransInst->cfp(pTransInst->parent, transMsg, NULL); + tTrace("%s cli conn %p handle resp", pTransInst->label, pConn); + pTransInst->cfp(pTransInst->parent, pResp, pEpSet); } + return 0; } void transCloseClient(void* arg) { @@ -934,18 +969,17 @@ void transReleaseCliHandle(void* handle) { transSendAsync(thrd->asyncPool, &cmsg->q); } -void transSendRequest(void* shandle, const char* ip, uint32_t port, STransMsg* pMsg, STransCtx* ctx) { +void transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransCtx* ctx) { STrans* pTransInst = (STrans*)shandle; - int index = CONN_HOST_THREAD_INDEX((SCliConn*)pMsg->handle); + int index = CONN_HOST_THREAD_INDEX((SCliConn*)pReq->handle); if (index == -1) { index = cliRBChoseIdx(pTransInst); } STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx)); - pCtx->ahandle = pMsg->ahandle; - pCtx->msgType = pMsg->msgType; - pCtx->ip = strdup(ip); - pCtx->port = port; + pCtx->epSet = *pEpSet; + pCtx->ahandle = pReq->ahandle; + pCtx->msgType = pReq->msgType; pCtx->hThrdIdx = index; if (ctx != NULL) { @@ -955,17 +989,18 @@ void transSendRequest(void* shandle, const char* ip, uint32_t port, STransMsg* p SCliMsg* cliMsg = taosMemoryCalloc(1, sizeof(SCliMsg)); cliMsg->ctx = pCtx; - cliMsg->msg = *pMsg; + cliMsg->msg = *pReq; cliMsg->st = taosGetTimestampUs(); cliMsg->type = Normal; SCliThrdObj* thrd = ((SCliObj*)pTransInst->tcphandle)->pThreadObj[index]; - tDebug("send request at thread:%d %p, dst: %s:%d, app:%p", index, pMsg, ip, port, pMsg->ahandle); + tDebug("send request at thread:%d %p, dst: %s:%d, app:%p", index, pReq, EPSET_GET_INUSE_IP(&pCtx->epSet), + EPSET_GET_INUSE_PORT(&pCtx->epSet), pReq->ahandle); ASSERT(transSendAsync(thrd->asyncPool, &(cliMsg->q)) == 0); } -void transSendRecv(void* shandle, const char* ip, uint32_t port, STransMsg* pReq, STransMsg* pRsp) { +void transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransMsg* pRsp) { STrans* pTransInst = (STrans*)shandle; int index = CONN_HOST_THREAD_INDEX(pReq->handle); if (index == -1) { @@ -973,10 +1008,9 @@ void transSendRecv(void* shandle, const char* ip, uint32_t port, STransMsg* pReq } STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx)); + pCtx->epSet = *pEpSet; pCtx->ahandle = pReq->ahandle; pCtx->msgType = pReq->msgType; - pCtx->ip = strdup(ip); - pCtx->port = port; pCtx->hThrdIdx = index; pCtx->pSem = taosMemoryCalloc(1, sizeof(tsem_t)); pCtx->pRsp = pRsp; @@ -989,6 +1023,9 @@ void transSendRecv(void* shandle, const char* ip, uint32_t port, STransMsg* pReq cliMsg->type = Normal; SCliThrdObj* thrd = ((SCliObj*)pTransInst->tcphandle)->pThreadObj[index]; + tDebug("send request at thread:%d %p, dst: %s:%d, app:%p", index, pReq, EPSET_GET_INUSE_IP(&pCtx->epSet), + EPSET_GET_INUSE_PORT(&pCtx->epSet), pReq->ahandle); + transSendAsync(thrd->asyncPool, &(cliMsg->q)); tsem_t* pSem = pCtx->pSem; tsem_wait(pSem); diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index ef595fb0ecd4f5f7f2b2f1a38eb5b993373c3f0f..eb42029090dadb5618457661f5c26b1f4885cc9c 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -93,11 +93,6 @@ bool transDecompressMsg(char* msg, int32_t len, int32_t* flen) { return false; } -void transConnCtxDestroy(STransConnCtx* ctx) { - taosMemoryFree(ctx->ip); - taosMemoryFree(ctx); -} - void transFreeMsg(void* msg) { if (msg == NULL) { return; @@ -363,10 +358,4 @@ void transQueueDestroy(STransQueue* queue) { transQueueClear(queue); taosArrayDestroy(queue->q); } -// int32_t transGetExHandle() { -// static -//} -// void transThreadOnce() { -// taosThreadOnce(&transModuleInit, ); -//} #endif diff --git a/source/libs/transport/src/transSrv.c b/source/libs/transport/src/transSrv.c index c941cace3bc1bd4207e90e5742f25857c9dfe144..c02cb071018c297c757580a3d315951db2c0061f 100644 --- a/source/libs/transport/src/transSrv.c +++ b/source/libs/transport/src/transSrv.c @@ -802,7 +802,6 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, taosThreadOnce(&transModuleInit, uvInitExHandleMgt); transSrvInst++; - // uvOpenExHandleMgt(10000); for (int i = 0; i < srv->numOfThreads; i++) { SWorkThrdObj* thrd = (SWorkThrdObj*)taosMemoryCalloc(1, sizeof(SWorkThrdObj)); @@ -831,6 +830,7 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, } else { // TODO: clear all other resource later tError("failed to create worker-thread %d", i); + goto End; } } if (false == addHandleToAcceptloop(srv)) { @@ -840,6 +840,8 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, if (err == 0) { tDebug("success to create accept-thread"); } else { + tError("failed to create accept-thread"); + goto End; // clear all resource later } @@ -1078,6 +1080,7 @@ void transRegisterMsg(const STransMsg* msg) { transSendAsync(pThrd->asyncPool, &srvMsg->q); uvReleaseExHandle(refId); return; + _return1: tTrace("server handle %p failed to send to register brokenlink", exh); rpcFreeCont(msg->pCont); diff --git a/source/os/src/osMemory.c b/source/os/src/osMemory.c index 5b733daec2f32a9e058115e6db51ff5ee192f8ba..73c37c28f74a0789dd5794d8e414315f76c3fda4 100644 --- a/source/os/src/osMemory.c +++ b/source/os/src/osMemory.c @@ -19,7 +19,7 @@ #ifdef USE_TD_MEMORY -#define TD_MEMORY_SYMBOL ('T'<<24|'A'<<16|'O'<<8|'S') +#define TD_MEMORY_SYMBOL ('T' << 24 | 'A' << 16 | 'O' << 8 | 'S') #define TD_MEMORY_STACK_TRACE_DEPTH 10 @@ -28,7 +28,7 @@ typedef struct TdMemoryInfo *TdMemoryInfoPtr; typedef struct TdMemoryInfo { int32_t symbol; int32_t memorySize; - void *stackTrace[TD_MEMORY_STACK_TRACE_DEPTH]; // gdb: disassemble /m 0xXXX + void *stackTrace[TD_MEMORY_STACK_TRACE_DEPTH]; // gdb: disassemble /m 0xXXX // TdMemoryInfoPtr pNext; // TdMemoryInfoPtr pPrev; } TdMemoryInfo; @@ -36,11 +36,11 @@ typedef struct TdMemoryInfo { // static TdMemoryInfoPtr GlobalMemoryPtr = NULL; #ifdef WINDOWS - #define tstrdup(str) _strdup(str) +#define tstrdup(str) _strdup(str) #else - #define tstrdup(str) strdup(str) +#define tstrdup(str) strdup(str) -#include +#include #define STACKCALL __attribute__((regparm(1), noinline)) void **STACKCALL taosGetEbp(void) { @@ -54,9 +54,9 @@ void **STACKCALL taosGetEbp(void) { int32_t taosBackTrace(void **buffer, int32_t size) { int32_t frame = 0; - void **ebp; - void **ret = NULL; - size_t func_frame_distance = 0; + void **ebp; + void **ret = NULL; + size_t func_frame_distance = 0; if (buffer != NULL && size > 0) { ebp = taosGetEbp(); func_frame_distance = (size_t)*ebp - (size_t)ebp; @@ -89,9 +89,9 @@ void *taosMemoryMalloc(int32_t size) { TdMemoryInfoPtr pTdMemoryInfo = (TdMemoryInfoPtr)tmp; pTdMemoryInfo->memorySize = size; pTdMemoryInfo->symbol = TD_MEMORY_SYMBOL; - taosBackTrace(pTdMemoryInfo->stackTrace,TD_MEMORY_STACK_TRACE_DEPTH); + taosBackTrace(pTdMemoryInfo->stackTrace, TD_MEMORY_STACK_TRACE_DEPTH); - return (char*)tmp + sizeof(TdMemoryInfo); + return (char *)tmp + sizeof(TdMemoryInfo); #else return malloc(size); #endif @@ -100,15 +100,15 @@ void *taosMemoryMalloc(int32_t size) { void *taosMemoryCalloc(int32_t num, int32_t size) { #ifdef USE_TD_MEMORY int32_t memorySize = num * size; - char *tmp = calloc(memorySize + sizeof(TdMemoryInfo), 1); + char *tmp = calloc(memorySize + sizeof(TdMemoryInfo), 1); if (tmp == NULL) return NULL; TdMemoryInfoPtr pTdMemoryInfo = (TdMemoryInfoPtr)tmp; pTdMemoryInfo->memorySize = memorySize; pTdMemoryInfo->symbol = TD_MEMORY_SYMBOL; - taosBackTrace(pTdMemoryInfo->stackTrace,TD_MEMORY_STACK_TRACE_DEPTH); + taosBackTrace(pTdMemoryInfo->stackTrace, TD_MEMORY_STACK_TRACE_DEPTH); - return (char*)tmp + sizeof(TdMemoryInfo); + return (char *)tmp + sizeof(TdMemoryInfo); #else return calloc(num, size); #endif @@ -117,8 +117,8 @@ void *taosMemoryCalloc(int32_t num, int32_t size) { void *taosMemoryRealloc(void *ptr, int32_t size) { #ifdef USE_TD_MEMORY if (ptr == NULL) return taosMemoryMalloc(size); - - TdMemoryInfoPtr pTdMemoryInfo = (TdMemoryInfoPtr)((char*)ptr - sizeof(TdMemoryInfo)); + + TdMemoryInfoPtr pTdMemoryInfo = (TdMemoryInfoPtr)((char *)ptr - sizeof(TdMemoryInfo)); assert(pTdMemoryInfo->symbol == TD_MEMORY_SYMBOL); TdMemoryInfo tdMemoryInfo; @@ -126,11 +126,11 @@ void *taosMemoryRealloc(void *ptr, int32_t size) { void *tmp = realloc(pTdMemoryInfo, size + sizeof(TdMemoryInfo)); if (tmp == NULL) return NULL; - + memcpy(tmp, &tdMemoryInfo, sizeof(TdMemoryInfo)); ((TdMemoryInfoPtr)tmp)->memorySize = size; - return (char*)tmp + sizeof(TdMemoryInfo); + return (char *)tmp + sizeof(TdMemoryInfo); #else return realloc(ptr, size); #endif @@ -139,29 +139,26 @@ void *taosMemoryRealloc(void *ptr, int32_t size) { void *taosMemoryStrDup(void *ptr) { #ifdef USE_TD_MEMORY if (ptr == NULL) return NULL; - - TdMemoryInfoPtr pTdMemoryInfo = (TdMemoryInfoPtr)((char*)ptr - sizeof(TdMemoryInfo)); + + TdMemoryInfoPtr pTdMemoryInfo = (TdMemoryInfoPtr)((char *)ptr - sizeof(TdMemoryInfo)); assert(pTdMemoryInfo->symbol == TD_MEMORY_SYMBOL); void *tmp = tstrdup((const char *)pTdMemoryInfo); if (tmp == NULL) return NULL; - + memcpy(tmp, pTdMemoryInfo, sizeof(TdMemoryInfo)); - taosBackTrace(((TdMemoryInfoPtr)tmp)->stackTrace,TD_MEMORY_STACK_TRACE_DEPTH); + taosBackTrace(((TdMemoryInfoPtr)tmp)->stackTrace, TD_MEMORY_STACK_TRACE_DEPTH); - return (char*)tmp + sizeof(TdMemoryInfo); + return (char *)tmp + sizeof(TdMemoryInfo); #else return tstrdup((const char *)ptr); #endif } - void taosMemoryFree(void *ptr) { - if (ptr == NULL) return; - #ifdef USE_TD_MEMORY - TdMemoryInfoPtr pTdMemoryInfo = (TdMemoryInfoPtr)((char*)ptr - sizeof(TdMemoryInfo)); - if(pTdMemoryInfo->symbol == TD_MEMORY_SYMBOL) { + TdMemoryInfoPtr pTdMemoryInfo = (TdMemoryInfoPtr)((char *)ptr - sizeof(TdMemoryInfo)); + if (pTdMemoryInfo->symbol == TD_MEMORY_SYMBOL) { pTdMemoryInfo->memorySize = 0; // memset(pTdMemoryInfo, 0, sizeof(TdMemoryInfo)); free(pTdMemoryInfo); @@ -177,7 +174,7 @@ int32_t taosMemorySize(void *ptr) { if (ptr == NULL) return 0; #ifdef USE_TD_MEMORY - TdMemoryInfoPtr pTdMemoryInfo = (TdMemoryInfoPtr)((char*)ptr - sizeof(TdMemoryInfo)); + TdMemoryInfoPtr pTdMemoryInfo = (TdMemoryInfoPtr)((char *)ptr - sizeof(TdMemoryInfo)); assert(pTdMemoryInfo->symbol == TD_MEMORY_SYMBOL); return pTdMemoryInfo->memorySize; diff --git a/tests/script/jenkins/basic.txt b/tests/script/jenkins/basic.txt index 6ec6d15028f9e904dc9a4eab75f93d12d3ec73e4..72ff6179db16a41f8aa6bbd52da023118406446d 100644 --- a/tests/script/jenkins/basic.txt +++ b/tests/script/jenkins/basic.txt @@ -13,6 +13,7 @@ ./test.sh -f tsim/db/basic6.sim ./test.sh -f tsim/db/basic7.sim ./test.sh -f tsim/db/error1.sim +./test.sh -f tsim/db/taosdlog.sim # ---- dnode ./test.sh -f tsim/dnode/basic1.sim diff --git a/tests/script/sh/deploy.sh b/tests/script/sh/deploy.sh index ec847dedbbc91ca66ff95d3ab5aa1ae9c1bffe03..da295f640e01cbf5cab4919aafc6cf56f1a268fc 100755 --- a/tests/script/sh/deploy.sh +++ b/tests/script/sh/deploy.sh @@ -128,6 +128,7 @@ echo "debugFlag 0" >> $TAOS_CFG echo "mDebugFlag 143" >> $TAOS_CFG echo "dDebugFlag 143" >> $TAOS_CFG echo "vDebugFlag 143" >> $TAOS_CFG +echo "tqDebugFlag 143" >> $TAOS_CFG echo "tsdbDebugFlag 143" >> $TAOS_CFG echo "cDebugFlag 143" >> $TAOS_CFG echo "jniDebugFlag 143" >> $TAOS_CFG diff --git a/tests/script/tsim/db/taosdlog.sim b/tests/script/tsim/db/taosdlog.sim new file mode 100644 index 0000000000000000000000000000000000000000..c0a0c2b519caa0f7b433029f622b9195230c6bf6 --- /dev/null +++ b/tests/script/tsim/db/taosdlog.sim @@ -0,0 +1,31 @@ +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -i 1 + +system rm -rf ../../sim/dnode1/log + +system sh/exec.sh -n dnode1 -s start +sql connect + +print =============== create database +sql create database d1 vgroups 2 +sql show databases +if $rows != 3 then + return -1 +endi + +print =============== restart + +system sh/exec.sh -n dnode1 -s stop -x SIGKILL +sleep 2000 +system rm -rf ../../sim/dnode1/log +system sh/exec.sh -n dnode1 -s start +sleep 2000 + +print =============== show databases +sql create database d2 vgroups 6 +sql show databases +if $rows != 4 then + return -1 +endi + +system sh/exec.sh -n dnode1 -s stop -x SIGINT diff --git a/tests/system-test/0-others/taosShell.py b/tests/system-test/0-others/taosShell.py new file mode 100644 index 0000000000000000000000000000000000000000..a946437fedb12fa01cd6c602cb402421832150a2 --- /dev/null +++ b/tests/system-test/0-others/taosShell.py @@ -0,0 +1,233 @@ + +import taos +import sys +import time +import socket +import pexpect +import os + +from util.log import * +from util.sql import * +from util.cases import * +from util.dnodes import * + +def taos_command (key, value, expectString, cfgDir, dbName, key1='', value1=''): + if len(key) == 0: + tdLog.exit("taos test key is null!") + + if len(cfgDir) != 0: + taosCmd = 'taos -c ' + cfgDir + ' -' + key + + if len(value) != 0: + if key == 'p': + taosCmd = taosCmd + value + else: + taosCmd = taosCmd + ' ' + value + + if len(key1) != 0: + taosCmd = taosCmd + ' -' + key1 + if key1 == 'p': + taosCmd = taosCmd + value1 + else: + if len(value1) != 0: + taosCmd = taosCmd + ' ' + value1 + + tdLog.info ("taos cmd: %s" % taosCmd) + + child = pexpect.spawn(taosCmd, timeout=3) + #output = child.readline() + #print (output.decode()) + i = child.expect([expectString, pexpect.TIMEOUT, pexpect.EOF], timeout=1) + retResult = child.before.decode() + print(retResult) + #print(child.after.decode()) + if i == 0: + print ('taos login success! Here can run sql, taos> ') + if len(dbName) != 0: + child.sendline ('create database %s;'%(dbName)) + w = child.expect(["Query OK", pexpect.TIMEOUT, pexpect.EOF], timeout=1) + if w == 0: + return "TAOS_OK" + else: + return "TAOS_FAIL" + else: + return "TAOS_OK" + else: + if key == 'A' or key1 == 'A': + return "TAOS_OK", retResult + else: + return "TAOS_FAIL" + +class TDTestCase: + + #updatecfgDict = {'serverPort': 7080, 'firstEp': 'localhost:7080'} + + def init(self, conn, logSql): + tdLog.debug(f"start to excute {__file__}") + tdSql.init(conn.cursor()) + + def getBuildPath(self): + selfPath = os.path.dirname(os.path.realpath(__file__)) + + if ("community" in selfPath): + projPath = selfPath[:selfPath.find("community")] + else: + projPath = selfPath[:selfPath.find("tests")] + + for root, dirs, files in os.walk(projPath): + if ("taosd" in files): + rootRealPath = os.path.dirname(os.path.realpath(root)) + if ("packaging" not in rootRealPath): + buildPath = root[:len(root) - len("/build/bin")] + break + return buildPath + + def run(self): # sourcery skip: extract-duplicate-method, remove-redundant-fstring + tdSql.prepare() + # time.sleep(2) + tdSql.query("create user testpy pass 'testpy'") + + hostname = socket.gethostname() + tdLog.info ("hostname: %s" % hostname) + + buildPath = self.getBuildPath() + if (buildPath == ""): + tdLog.exit("taosd not found!") + else: + tdLog.info("taosd found in %s" % buildPath) + cfgPath = buildPath + "/../sim/psim/cfg" + tdLog.info("cfgPath: %s" % cfgPath) + + checkNetworkStatus = ['0: unavailable', '1: network ok', '2: service ok', '3: service degraded', '4: exiting'] + netrole = ['client', 'server'] + + keyDict = {'h':'', 'P':'6030', 'p':'testpy', 'u':'testpy', 'a':'', 'A':'', 'c':'', 'C':'', 's':'', 'r':'', 'f':'', \ + 'k':'', 't':'', 'n':'', 'l':'1024', 'N':'100', 'V':'', 'd':'db', 'w':'30', '-help':'', '-usage':'', '?':''} + + keyDict['h'] = hostname + keyDict['c'] = cfgPath + + tdLog.printNoPrefix("================================ parameter: -h") + newDbName="dbh" + retCode = taos_command("h", keyDict['h'], "taos>", keyDict['c'], newDbName) + if retCode != "TAOS_OK": + tdLog.exit("taos -h %s fail"%keyDict['h']) + else: + #dataDbName = ["information_schema", "performance_schema", "db", newDbName] + tdSql.query("show databases") + #tdSql.getResult("show databases") + for i in range(tdSql.queryRows): + if tdSql.getData(i, 0) == newDbName: + break + else: + tdLog.exit("create db fail after taos -h %s fail"%keyDict['h']) + + tdSql.query('drop database %s'%newDbName) + + tdLog.printNoPrefix("================================ parameter: -P") + #tdDnodes.stop(1) + #sleep(3) + #tdDnodes.start(1) + #sleep(3) + #keyDict['P'] = 6030 + newDbName = "dbpp" + retCode = taos_command("P", keyDict['P'], "taos>", keyDict['c'], newDbName) + if retCode != "TAOS_OK": + tdLog.exit("taos -P %s fail"%keyDict['P']) + else: + tdSql.query("show databases") + for i in range(tdSql.queryRows): + if tdSql.getData(i, 0) == newDbName: + break + else: + tdLog.exit("create db fail after taos -P %s fail"%keyDict['P']) + + tdSql.query('drop database %s'%newDbName) + + tdLog.printNoPrefix("================================ parameter: -u") + newDbName="dbu" + retCode = taos_command("u", keyDict['u'], "taos>", keyDict['c'], newDbName, "p", keyDict['p']) + if retCode != "TAOS_OK": + tdLog.exit("taos -u %s -p%s fail"%(keyDict['u'], keyDict['p'])) + else: + tdSql.query("show databases") + for i in range(tdSql.queryRows): + if tdSql.getData(i, 0) == newDbName: + break + else: + tdLog.exit("create db fail after taos -u %s -p%s fail"%(keyDict['u'], keyDict['p'])) + + tdSql.query('drop database %s'%newDbName) + + tdLog.printNoPrefix("================================ parameter: -A") + newDbName="dbaa" + retCode, retVal = taos_command("p", keyDict['p'], "taos>", keyDict['c'], '', "A", '') + if retCode != "TAOS_OK": + tdLog.exit("taos -A fail") + + retCode = taos_command("u", keyDict['u'], "taos>", keyDict['c'], newDbName, 'a', retVal) + if retCode != "TAOS_OK": + tdLog.exit("taos -u %s -a %s"%(keyDict['u'], retVal)) + + tdSql.query("show databases") + for i in range(tdSql.queryRows): + if tdSql.getData(i, 0) == newDbName: + break + else: + tdLog.exit("create db fail after taos -u %s -a %s fail"%(keyDict['u'], retVal)) + + tdSql.query('drop database %s'%newDbName) + + tdLog.printNoPrefix("================================ parameter: -s") + newDbName="dbss" + keyDict['s'] = "\"create database " + newDbName + "\"" + retCode = taos_command("s", keyDict['s'], "Query OK", keyDict['c'], '', '', '') + if retCode != "TAOS_OK": + tdLog.exit("taos -s fail") + + print ("========== check new db ==========") + tdSql.query("show databases") + for i in range(tdSql.queryRows): + if tdSql.getData(i, 0) == newDbName: + break + else: + tdLog.exit("create db fail after taos -s %s fail"%(keyDict['s'])) + + keyDict['s'] = "\"create table " + newDbName + ".stb (ts timestamp, c int) tags (t int)\"" + retCode = taos_command("s", keyDict['s'], "Query OK", keyDict['c'], '', '', '') + if retCode != "TAOS_OK": + tdLog.exit("taos -s create table fail") + + keyDict['s'] = "\"create table " + newDbName + ".ctb0 using " + newDbName + ".stb tags (0) " + newDbName + ".ctb1 using " + newDbName + ".stb tags (1)\"" + retCode = taos_command("s", keyDict['s'], "Query OK", keyDict['c'], '', '', '') + if retCode != "TAOS_OK": + tdLog.exit("taos -s create table fail") + + keyDict['s'] = "\"insert into " + newDbName + ".ctb0 values('2021-04-01 08:00:00.000', 10)('2021-04-01 08:00:01.000', 20) " + newDbName + ".ctb1 values('2021-04-01 08:00:00.000', 11)('2021-04-01 08:00:01.000', 21)\"" + retCode = taos_command("s", keyDict['s'], "Query OK", keyDict['c'], '', '', '') + if retCode != "TAOS_OK": + tdLog.exit("taos -s insert data fail") + + sqlString = "select * from " + newDbName + ".ctb0" + tdSql.query(sqlString) + tdSql.checkData(0, 0, '2021-04-01 08:00:00.000') + tdSql.checkData(0, 1, 10) + tdSql.checkData(1, 0, '2021-04-01 08:00:01.000') + tdSql.checkData(1, 1, 20) + sqlString = "select * from " + newDbName + ".ctb1" + tdSql.query(sqlString) + tdSql.checkData(0, 0, '2021-04-01 08:00:00.000') + tdSql.checkData(0, 1, 11) + tdSql.checkData(1, 0, '2021-04-01 08:00:01.000') + tdSql.checkData(1, 1, 21) + + #tdSql.query('drop database %s'%newDbName) + + + + def stop(self): + tdSql.close() + tdLog.success(f"{__file__} successfully executed") + +tdCases.addLinux(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase()) diff --git a/tests/system-test/fulltest.sh b/tests/system-test/fulltest.sh index 084716230893c97698969ba0fae2c668203bb3ec..79e1411cd1ee94bf5dec7939df594572b950eabd 100755 --- a/tests/system-test/fulltest.sh +++ b/tests/system-test/fulltest.sh @@ -1,8 +1,6 @@ #!/bin/bash set -e -python3 ./test.py -f 0-others/taosdlog.py - #python3 ./test.py -f 2-query/between.py #python3 ./test.py -f 2-query/distinct.py python3 ./test.py -f 2-query/varchar.py