diff --git a/include/common/tcommon.h b/include/common/tcommon.h index 18c1bd5e9a3f5323ea92366914e3eea580c0b136..d0ec5c92960b3b2515c44a41b91372e05914af22 100644 --- a/include/common/tcommon.h +++ b/include/common/tcommon.h @@ -32,7 +32,8 @@ enum { }; enum { - TMQ_MSG_TYPE__POLL_RSP = 0, + TMQ_MSG_TYPE__DUMMY = 0, + TMQ_MSG_TYPE__POLL_RSP, TMQ_MSG_TYPE__EP_RSP, }; @@ -267,4 +268,4 @@ typedef struct SSessionWindow { } #endif -#endif /*_TD_COMMON_DEF_H_*/ +#endif /*_TD_COMMON_DEF_H_*/ diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 25d668e788357b596588f1282d0b8747242ac02f..1b08d6f2410df34ce8975b547f1242eccb4c8e94 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -70,7 +70,7 @@ typedef uint16_t tmsg_t; typedef enum { HEARTBEAT_TYPE_MQ = 0, - HEARTBEAT_TYPE_QUERY = 1, + HEARTBEAT_TYPE_QUERY, // types can be added here // HEARTBEAT_TYPE_MAX diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index 158bccf8ccbe477b2ae26bde6bb0f19fc9d2159e..0e1b564b03ea70f656800e0bfc4443f7a6620da1 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -47,12 +47,12 @@ extern "C" { typedef struct SAppInstInfo SAppInstInfo; -typedef struct SHbConnInfo { +typedef struct { void* param; SClientHbReq* req; } SHbConnInfo; -typedef struct SAppHbMgr { +typedef struct { char* key; // statistics int32_t reportCnt; @@ -68,11 +68,11 @@ typedef struct SAppHbMgr { SHashObj* connInfo; // hash } SAppHbMgr; -typedef int32_t (*FHbRspHandle)(struct SAppHbMgr* pAppHbMgr, SClientHbRsp* pRsp); +typedef int32_t (*FHbRspHandle)(SAppHbMgr* pAppHbMgr, SClientHbRsp* pRsp); typedef int32_t (*FHbReqHandle)(SClientHbKey* connKey, void* param, SClientHbReq* req); -typedef struct SClientHbMgr { +typedef struct { int8_t inited; // ctl int8_t threadStop; @@ -108,13 +108,13 @@ typedef struct SHeartBeatInfo { } SHeartBeatInfo; struct SAppInstInfo { - int64_t numOfConns; - SCorEpSet mgmtEp; - SInstanceSummary summary; - SList* pConnList; // STscObj linked list - int64_t clusterId; - void* pTransporter; - struct SAppHbMgr* pAppHbMgr; + int64_t numOfConns; + SCorEpSet mgmtEp; + SInstanceSummary summary; + SList* pConnList; // STscObj linked list + int64_t clusterId; + void* pTransporter; + SAppHbMgr* pAppHbMgr; }; typedef struct SAppInfo { @@ -141,10 +141,6 @@ typedef struct STscObj { SAppInstInfo* pAppInfo; } STscObj; -typedef struct SMqConsumer { - STscObj* pTscObj; -} SMqConsumer; - typedef struct SReqResultInfo { const char* pRspMsg; const char* pData; @@ -172,7 +168,7 @@ typedef struct SRequestSendRecvBody { SShowReqInfo showInfo; // todo this attribute will be removed after the query framework being completed. SDataBuf requestMsg; int64_t queryJob; // query job, created according to sql query DAG. - struct SQueryDag* pDag; // the query dag, generated according to the sql statement. + struct SQueryDag* pDag; // the query dag, generated according to the sql statement. SReqResultInfo resInfo; } SRequestSendRecvBody; diff --git a/source/client/src/clientHb.c b/source/client/src/clientHb.c index ed5b5f9bbe524daf033f92195d16f20267a2d881..b1e9de8000cbdbceed961d9d91a59dccda560d26 100644 --- a/source/client/src/clientHb.c +++ b/source/client/src/clientHb.c @@ -23,7 +23,7 @@ static SClientHbMgr clientHbMgr = {0}; static int32_t hbCreateThread(); static void hbStopThread(); -static int32_t hbMqHbRspHandle(struct SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) { return 0; } +static int32_t hbMqHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) { return 0; } static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog *pCatalog) { int32_t code = 0; @@ -104,7 +104,7 @@ static int32_t hbProcessStbInfoRsp(void *value, int32_t valueLen, struct SCatalo return TSDB_CODE_SUCCESS; } -static int32_t hbQueryHbRspHandle(struct SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) { +static int32_t hbQueryHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) { SHbConnInfo *info = taosHashGet(pAppHbMgr->connInfo, &pRsp->connKey, sizeof(SClientHbKey)); if (NULL == info) { tscWarn("fail to get connInfo, may be dropped, connId:%d, type:%d", pRsp->connKey.connId, pRsp->connKey.hbType); @@ -163,7 +163,7 @@ static int32_t hbQueryHbRspHandle(struct SAppHbMgr *pAppHbMgr, SClientHbRsp *pRs return TSDB_CODE_SUCCESS; } -static int32_t hbMqAsyncCallBack(void *param, const SDataBuf *pMsg, int32_t code) { +static int32_t hbAsyncCallBack(void *param, const SDataBuf *pMsg, int32_t code) { static int32_t emptyRspNum = 0; if (code != 0) { tfree(param); @@ -226,7 +226,11 @@ int32_t hbGetExpiredDBInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SCl db->vgVersion = htonl(db->vgVersion); } - SKv kv = {.key = HEARTBEAT_KEY_DBINFO, .valueLen = sizeof(SDbVgVersion) * dbNum, .value = dbs}; + SKv kv = { + .key = HEARTBEAT_KEY_DBINFO, + .valueLen = sizeof(SDbVgVersion) * dbNum, + .value = dbs, + }; tscDebug("hb got %d expired db, valueLen:%d", dbNum, kv.valueLen); @@ -256,7 +260,11 @@ int32_t hbGetExpiredStbInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SC stb->tversion = htons(stb->tversion); } - SKv kv = {.key = HEARTBEAT_KEY_STBINFO, .valueLen = sizeof(SSTableMetaVersion) * stbNum, .value = stbs}; + SKv kv = { + .key = HEARTBEAT_KEY_STBINFO, + .valueLen = sizeof(SSTableMetaVersion) * stbNum, + .value = stbs, + }; tscDebug("hb got %d expired stb, valueLen:%d", stbNum, kv.valueLen); @@ -288,7 +296,7 @@ int32_t hbQueryHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req return TSDB_CODE_SUCCESS; } -int32_t hbMqHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req) {} +int32_t hbMqHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req) { return 0; } void hbMgrInitMqHbHandle() { clientHbMgr.reqHandle[HEARTBEAT_TYPE_QUERY] = hbQueryHbReqHandle; @@ -396,7 +404,7 @@ static void *hbThreadFunc(void *param) { free(buf); break; } - pInfo->fp = hbMqAsyncCallBack; + pInfo->fp = hbAsyncCallBack; pInfo->msgInfo.pData = buf; pInfo->msgInfo.len = tlen; pInfo->msgType = TDMT_MND_HEARTBEAT; @@ -448,7 +456,6 @@ static void hbStopThread() { } SAppHbMgr *appHbMgrInit(SAppInstInfo *pAppInstInfo, char *key) { - /*return NULL;*/ hbMgrInit(); SAppHbMgr *pAppHbMgr = malloc(sizeof(SAppHbMgr)); if (pAppHbMgr == NULL) { @@ -507,7 +514,6 @@ void appHbMgrCleanup(void) { } int hbMgrInit() { - /*return 0;*/ // init once int8_t old = atomic_val_compare_exchange_8(&clientHbMgr.inited, 0, 1); if (old == 1) return 0; @@ -525,7 +531,7 @@ int hbMgrInit() { } void hbMgrCleanUp() { - return; +#if 0 hbStopThread(); // destroy all appHbMgr @@ -538,6 +544,7 @@ void hbMgrCleanUp() { pthread_mutex_unlock(&clientHbMgr.lock); clientHbMgr.appHbMgrs = NULL; +#endif } int hbRegisterConnImpl(SAppHbMgr *pAppHbMgr, SClientHbKey connKey, SHbConnInfo *info) { @@ -564,9 +571,11 @@ int hbRegisterConnImpl(SAppHbMgr *pAppHbMgr, SClientHbKey connKey, SHbConnInfo * } int hbRegisterConn(SAppHbMgr *pAppHbMgr, int32_t connId, int64_t clusterId, int32_t hbType) { - /*return 0;*/ - SClientHbKey connKey = {.connId = connId, .hbType = HEARTBEAT_TYPE_QUERY}; - SHbConnInfo info = {0}; + SClientHbKey connKey = { + .connId = connId, + .hbType = HEARTBEAT_TYPE_QUERY, + }; + SHbConnInfo info = {0}; switch (hbType) { case HEARTBEAT_TYPE_QUERY: { @@ -587,7 +596,6 @@ int hbRegisterConn(SAppHbMgr *pAppHbMgr, int32_t connId, int64_t clusterId, int3 } void hbDeregisterConn(SAppHbMgr *pAppHbMgr, SClientHbKey connKey) { - /*return;*/ int32_t code = 0; code = taosHashRemove(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey)); code = taosHashRemove(pAppHbMgr->connInfo, &connKey, sizeof(SClientHbKey)); @@ -599,7 +607,6 @@ void hbDeregisterConn(SAppHbMgr *pAppHbMgr, SClientHbKey connKey) { int hbAddConnInfo(SAppHbMgr *pAppHbMgr, SClientHbKey connKey, void *key, void *value, int32_t keyLen, int32_t valueLen) { - return 0; // find req by connection id SClientHbReq *pReq = taosHashGet(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey)); ASSERT(pReq != NULL); diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index 490c352e8ec0ee075f6cd30b0872acdf348bce50..f3bafd73fc2373f563e0e6b1758e273ea1b586b6 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -59,6 +59,7 @@ struct tmq_t { char groupId[256]; char clientId[256]; int8_t autoCommit; + int8_t inWaiting; int64_t consumerId; int32_t epoch; int32_t resetOffsetCfg; @@ -66,9 +67,12 @@ struct tmq_t { STscObj* pTscObj; tmq_commit_cb* commit_cb; int32_t nextTopicIdx; + int32_t waitingRequest; + int32_t readyRequest; SArray* clientTopics; // SArray STaosQueue* mqueue; // queue of tmq_message_t STaosQall* qall; + tsem_t rspSem; // stat int64_t pollCnt; }; @@ -117,10 +121,12 @@ typedef struct { } SMqAskEpCbParam; typedef struct { - tmq_t* tmq; - SMqClientVg* pVg; - int32_t epoch; - tsem_t rspSem; + tmq_t* tmq; + SMqClientVg* pVg; + int32_t epoch; + tsem_t rspSem; + tmq_message_t** msg; + int32_t sync; } SMqPollCbParam; typedef struct { @@ -205,16 +211,11 @@ int32_t tmqSubscribeCb(void* param, const SDataBuf* pMsg, int32_t code) { int32_t tmqCommitCb(void* param, const SDataBuf* pMsg, int32_t code) { SMqCommitCbParam* pParam = (SMqCommitCbParam*)param; - tmq_resp_err_t rspErr = code == 0 ? TMQ_RESP_ERR__SUCCESS : TMQ_RESP_ERR__FAIL; + pParam->rspErr = code == 0 ? TMQ_RESP_ERR__SUCCESS : TMQ_RESP_ERR__FAIL; if (pParam->tmq->commit_cb) { - pParam->tmq->commit_cb(pParam->tmq, rspErr, NULL, NULL); - } - if (!pParam->async) - tsem_post(&pParam->rspSem); - else { - tsem_destroy(&pParam->rspSem); - free(param); + pParam->tmq->commit_cb(pParam->tmq, pParam->rspErr, NULL, NULL); } + if (!pParam->async) tsem_post(&pParam->rspSem); return 0; } @@ -240,9 +241,12 @@ tmq_t* tmq_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t errs return NULL; } pTmq->pTscObj = (STscObj*)conn; + pTmq->inWaiting = 0; pTmq->status = 0; pTmq->pollCnt = 0; pTmq->epoch = 0; + pTmq->waitingRequest = 0; + pTmq->readyRequest = 0; // set conf strcpy(pTmq->clientId, conf->clientId); strcpy(pTmq->groupId, conf->groupId); @@ -250,6 +254,8 @@ tmq_t* tmq_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t errs pTmq->commit_cb = conf->commit_cb; pTmq->resetOffsetCfg = conf->resetOffset; + tsem_init(&pTmq->rspSem, 0, 0); + pTmq->consumerId = generateRequestId() & (((uint64_t)-1) >> 1); pTmq->clientTopics = taosArrayInit(0, sizeof(SMqClientTopic)); @@ -315,6 +321,7 @@ tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, in } pParam->tmq = tmq; tsem_init(&pParam->rspSem, 0, 0); + pParam->async = async; pRequest->body.requestMsg = (SDataBuf){ .pData = buf, @@ -335,6 +342,9 @@ tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, in resp = pParam->rspErr; } + tsem_destroy(&pParam->rspSem); + free(pParam); + if (pArray) { taosArrayDestroy(pArray); } @@ -576,7 +586,7 @@ static char* formatTimestamp(char* buf, int64_t val, int precision) { int32_t tmqGetSkipLogNum(tmq_message_t* tmq_message) { if (tmq_message == NULL) return 0; - SMqConsumeRsp* pRsp = (SMqConsumeRsp*)tmq_message; + SMqConsumeRsp* pRsp = &tmq_message->consumeRsp; return pRsp->skipLogNum; } @@ -625,56 +635,74 @@ void tmqShowMsg(tmq_message_t* tmq_message) { } int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) { - printf("recv poll\n"); + /*printf("recv poll\n");*/ SMqPollCbParam* pParam = (SMqPollCbParam*)param; SMqClientVg* pVg = pParam->pVg; tmq_t* tmq = pParam->tmq; if (code != 0) { printf("msg discard\n"); - if (pParam->epoch == tmq->epoch) { - atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); - } - return 0; + goto WRITE_QUEUE_FAIL; } int32_t msgEpoch = ((SMqRspHead*)pMsg->pData)->epoch; int32_t tmqEpoch = atomic_load_32(&tmq->epoch); if (msgEpoch < tmqEpoch) { + tsem_post(&tmq->rspSem); printf("discard rsp epoch %d, current epoch %d\n", msgEpoch, tmqEpoch); return 0; } if (msgEpoch != tmqEpoch) { printf("mismatch rsp epoch %d, current epoch %d\n", msgEpoch, tmqEpoch); + } else { + atomic_sub_fetch_32(&tmq->waitingRequest, 1); } +#if 0 + if (pParam->sync == 1) { + /**pParam->msg = malloc(sizeof(tmq_message_t));*/ + *pParam->msg = taosAllocateQitem(sizeof(tmq_message_t)); + if (*pParam->msg) { + memcpy(*pParam->msg, pMsg->pData, sizeof(SMqRspHead)); + tDecodeSMqConsumeRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &((*pParam->msg)->consumeRsp)); + if ((*pParam->msg)->consumeRsp.numOfTopics != 0) { + pVg->currentOffset = (*pParam->msg)->consumeRsp.rspOffset; + } + taosWriteQitem(tmq->mqueue, *pParam->msg); + tsem_post(&pParam->rspSem); + return 0; + } + tsem_post(&pParam->rspSem); + return -1; + } +#endif + /*SMqConsumeRsp* pRsp = calloc(1, sizeof(SMqConsumeRsp));*/ tmq_message_t* pRsp = taosAllocateQitem(sizeof(tmq_message_t)); if (pRsp == NULL) { - printf("fail\n"); - return -1; + goto WRITE_QUEUE_FAIL; } memcpy(pRsp, pMsg->pData, sizeof(SMqRspHead)); tDecodeSMqConsumeRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pRsp->consumeRsp); /*printf("rsp commit off:%ld rsp off:%ld has data:%d\n", pRsp->committedOffset, pRsp->rspOffset, pRsp->numOfTopics);*/ if (pRsp->consumeRsp.numOfTopics == 0) { - printf("no data\n"); - if (pParam->epoch == tmq->epoch) { - atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); - } + /*printf("no data\n");*/ taosFreeQitem(pRsp); - return 0; + goto WRITE_QUEUE_FAIL; } + pRsp->extra = pParam->pVg; taosWriteQitem(tmq->mqueue, pRsp); - printf("poll in queue\n"); - /*pParam->rspMsg = (tmq_message_t*)pRsp;*/ - /*pVg->currentOffset = pRsp->consumeRsp.rspOffset;*/ - - /*printf("rsp offset: %ld\n", rsp.rspOffset);*/ - /*printf("-----msg begin----\n");*/ - /*printf("\n-----msg end------\n");*/ + atomic_add_fetch_32(&tmq->readyRequest, 1); + tsem_post(&tmq->rspSem); return 0; + +WRITE_QUEUE_FAIL: + if (pParam->epoch == tmq->epoch) { + atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); + } + tsem_post(&tmq->rspSem); + return code; } bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqCMGetSubEpRsp* pRsp) { @@ -711,81 +739,94 @@ int32_t tmqAskEpCb(void* param, const SDataBuf* pMsg, int32_t code) { tmq_t* tmq = pParam->tmq; if (code != 0) { printf("get topic endpoint error, not ready, wait:%d\n", pParam->sync); - if (pParam->sync) { - tsem_post(&pParam->rspSem); - } - return 0; + goto END; } - tscDebug("tmq ask ep cb called"); + + // tmq's epoch is monotomically increase, + // so it's safe to discard any old epoch msg. + // epoch will only increase when received newer epoch ep msg + SMqRspHead* head = pMsg->pData; + int32_t epoch = atomic_load_32(&tmq->epoch); + if (head->epoch <= epoch) { + goto END; + } + if (pParam->sync) { - SMqRspHead* head = pMsg->pData; SMqCMGetSubEpRsp rsp; tDecodeSMqCMGetSubEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &rsp); /*printf("rsp epoch %ld sz %ld\n", rsp.epoch, rsp.topics->size);*/ /*printf("tmq epoch %ld sz %ld\n", tmq->epoch, tmq->clientTopics->size);*/ - int32_t epoch = atomic_load_32(&tmq->epoch); - if (head->epoch > epoch && tmqUpdateEp(tmq, head->epoch, &rsp)) { + if (tmqUpdateEp(tmq, head->epoch, &rsp)) { atomic_store_64(&tmq->status, TMQ_CONSUMER_STATUS__READY); } - tsem_post(&pParam->rspSem); tDeleteSMqCMGetSubEpRsp(&rsp); } else { tmq_message_t* pRsp = taosAllocateQitem(sizeof(tmq_message_t)); if (pRsp == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; + code = -1; + goto END; } memcpy(pRsp, pMsg->pData, sizeof(SMqRspHead)); tDecodeSMqCMGetSubEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pRsp->getEpRsp); + taosWriteQitem(tmq->mqueue, pRsp); + tsem_post(&tmq->rspSem); } - return 0; + +END: + if (pParam->sync) { + tsem_post(&pParam->rspSem); + } + return code; } int32_t tmqAskEp(tmq_t* tmq, bool sync) { - printf("ask ep sync %d\n", sync); int32_t tlen = sizeof(SMqCMGetSubEpReq); - SMqCMGetSubEpReq* buf = malloc(tlen); - if (buf == NULL) { + SMqCMGetSubEpReq* req = malloc(tlen); + if (req == NULL) { tscError("failed to malloc get subscribe ep buf"); - goto END; - } - buf->consumerId = htobe64(tmq->consumerId); - buf->epoch = htonl(tmq->epoch); - strcpy(buf->cgroup, tmq->groupId); - - SRequestObj* pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_MND_GET_SUB_EP); - if (pRequest == NULL) { - tscError("failed to malloc subscribe ep request"); - goto END; + return -1; } - - pRequest->body.requestMsg = (SDataBuf){ - .pData = buf, - .len = tlen, - .handle = NULL, - }; + req->consumerId = htobe64(tmq->consumerId); + req->epoch = htonl(tmq->epoch); + strcpy(req->cgroup, tmq->groupId); SMqAskEpCbParam* pParam = malloc(sizeof(SMqAskEpCbParam)); if (pParam == NULL) { tscError("failed to malloc subscribe param"); - goto END; + free(req); + return -1; } pParam->tmq = tmq; pParam->sync = sync; tsem_init(&pParam->rspSem, 0, 0); - SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest); + SMsgSendInfo* sendInfo = malloc(sizeof(SMsgSendInfo)); + if (sendInfo == NULL) { + tsem_destroy(&pParam->rspSem); + free(pParam); + free(req); + return -1; + } + + sendInfo->msgInfo = (SDataBuf){ + .pData = req, + .len = tlen, + .handle = NULL, + }; + + sendInfo->requestId = generateRequestId(); sendInfo->requestObjRefId = 0; sendInfo->param = pParam; sendInfo->fp = tmqAskEpCb; + sendInfo->msgType = TDMT_MND_GET_SUB_EP; SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp); int64_t transporterId = 0; asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo); -END: if (sync) tsem_wait(&pParam->rspSem); return 0; } @@ -812,7 +853,7 @@ tmq_resp_err_t tmq_seek(tmq_t* tmq, const tmq_topic_vgroup_t* offset) { return TMQ_RESP_ERR__FAIL; } -SMqConsumeReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t blocking_time, SMqClientTopic* pTopic, SMqClientVg* pVg) { +SMqConsumeReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t blockingTime, SMqClientTopic* pTopic, SMqClientVg* pVg) { int64_t reqOffset; if (pVg->currentOffset >= 0) { reqOffset = pVg->currentOffset; @@ -832,7 +873,7 @@ SMqConsumeReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t blocking_time, SMqClie strcpy(pReq->topic, pTopic->topicName); strcpy(pReq->cgroup, tmq->groupId); - pReq->blockingTime = blocking_time; + pReq->blockingTime = blockingTime; pReq->consumerId = tmq->consumerId; pReq->epoch = tmq->epoch; pReq->currentOffset = reqOffset; @@ -862,8 +903,70 @@ void tmqClearUnhandleMsg(tmq_t* tmq) { } } +tmq_message_t* tmqSyncPollImpl(tmq_t* tmq, int64_t blockingTime) { + tmq_message_t* msg = NULL; + for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) { + SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i); + for (int j = 0; j < taosArrayGetSize(pTopic->vgs); j++) { + SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j); + int32_t vgStatus = atomic_val_compare_exchange_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE, TMQ_VG_STATUS__WAIT); + /*if (vgStatus != TMQ_VG_STATUS__IDLE) {*/ + /*continue;*/ + /*}*/ + SMqConsumeReq* pReq = tmqBuildConsumeReqImpl(tmq, blockingTime, pTopic, pVg); + if (pReq == NULL) { + atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); + // TODO: out of mem + return NULL; + } + SMqPollCbParam* param = malloc(sizeof(SMqPollCbParam)); + if (param == NULL) { + atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); + // TODO: out of mem + return NULL; + } + param->tmq = tmq; + param->pVg = pVg; + param->epoch = tmq->epoch; + param->sync = 1; + param->msg = &msg; + tsem_init(¶m->rspSem, 0, 0); + SRequestObj* pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_VND_CONSUME); + pRequest->body.requestMsg = (SDataBuf){ + .pData = pReq, + .len = sizeof(SMqConsumeReq), + .handle = NULL, + }; + + SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest); + sendInfo->requestObjRefId = 0; + sendInfo->param = param; + sendInfo->fp = tmqPollCb; + + int64_t transporterId = 0; + /*printf("send poll\n");*/ + atomic_add_fetch_32(&tmq->waitingRequest, 1); + asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo); + pVg->pollCnt++; + tmq->pollCnt++; + + tsem_wait(¶m->rspSem); + tmq_message_t* nmsg = NULL; + while (1) { + taosReadQitem(tmq->mqueue, (void**)&nmsg); + if (nmsg == NULL) continue; + while (nmsg->head.mqMsgType != TMQ_MSG_TYPE__POLL_RSP) { + taosReadQitem(tmq->mqueue, (void**)&nmsg); + } + return nmsg; + } + } + } + return NULL; +} + int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) { - printf("call poll\n"); + /*printf("call poll\n");*/ for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) { SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i); for (int j = 0; j < taosArrayGetSize(pTopic->vgs); j++) { @@ -876,17 +979,20 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) { if (pReq == NULL) { atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); // TODO: out of mem + tsem_post(&tmq->rspSem); return -1; } SMqPollCbParam* param = malloc(sizeof(SMqPollCbParam)); if (param == NULL) { atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); // TODO: out of mem + tsem_post(&tmq->rspSem); return -1; } param->tmq = tmq; param->pVg = pVg; param->epoch = tmq->epoch; + param->sync = 0; SRequestObj* pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_VND_CONSUME); pRequest->body.requestMsg = (SDataBuf){ .pData = pReq, @@ -900,7 +1006,8 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) { sendInfo->fp = tmqPollCb; int64_t transporterId = 0; - printf("send poll\n"); + /*printf("send poll\n");*/ + atomic_add_fetch_32(&tmq->waitingRequest, 1); asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo); pVg->pollCnt++; tmq->pollCnt++; @@ -912,7 +1019,7 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) { // return int32_t tmqHandleRes(tmq_t* tmq, tmq_message_t* rspMsg, bool* pReset) { if (rspMsg->head.mqMsgType == TMQ_MSG_TYPE__EP_RSP) { - printf("ep %d %d\n", rspMsg->head.epoch, tmq->epoch); + /*printf("ep %d %d\n", rspMsg->head.epoch, tmq->epoch);*/ if (rspMsg->head.epoch > atomic_load_32(&tmq->epoch)) { tmqUpdateEp(tmq, rspMsg->head.epoch, &rspMsg->getEpRsp); tmqClearUnhandleMsg(tmq); @@ -931,13 +1038,16 @@ tmq_message_t* tmqHandleAllRsp(tmq_t* tmq, int64_t blockingTime, bool pollIfRese tmq_message_t* rspMsg = NULL; taosGetQitem(tmq->qall, (void**)&rspMsg); if (rspMsg == NULL) { - break; + taosReadAllQitems(tmq->mqueue, tmq->qall); + taosGetQitem(tmq->qall, (void**)&rspMsg); + if (rspMsg == NULL) return NULL; } if (rspMsg->head.mqMsgType == TMQ_MSG_TYPE__POLL_RSP) { - printf("handle poll rsp %d\n", rspMsg->head.mqMsgType); + atomic_sub_fetch_32(&tmq->readyRequest, 1); + /*printf("handle poll rsp %d\n", rspMsg->head.mqMsgType);*/ if (rspMsg->head.epoch == atomic_load_32(&tmq->epoch)) { - printf("epoch match\n"); + /*printf("epoch match\n");*/ SMqClientVg* pVg = rspMsg->extra; pVg->currentOffset = rspMsg->consumeRsp.rspOffset; atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); @@ -947,7 +1057,7 @@ tmq_message_t* tmqHandleAllRsp(tmq_t* tmq, int64_t blockingTime, bool pollIfRese taosFreeQitem(rspMsg); } } else { - printf("handle ep rsp %d\n", rspMsg->head.mqMsgType); + /*printf("handle ep rsp %d\n", rspMsg->head.mqMsgType);*/ bool reset = false; tmqHandleRes(tmq, rspMsg, &reset); taosFreeQitem(rspMsg); @@ -957,36 +1067,59 @@ tmq_message_t* tmqHandleAllRsp(tmq_t* tmq, int64_t blockingTime, bool pollIfRese } } } - return NULL; } -tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) { +tmq_message_t* tmq_consumer_poll_v1(tmq_t* tmq, int64_t blocking_time) { tmq_message_t* rspMsg = NULL; int64_t startTime = taosGetTimestampMs(); - // TODO: put into another thread or delayed queue int64_t status = atomic_load_64(&tmq->status); tmqAskEp(tmq, status == TMQ_CONSUMER_STATUS__INIT); - taosGetQitem(tmq->qall, (void**)&rspMsg); - if (rspMsg == NULL) { - taosReadAllQitems(tmq->mqueue, tmq->qall); + while (1) { + rspMsg = tmqSyncPollImpl(tmq, blocking_time); + if (rspMsg && rspMsg->consumeRsp.numOfTopics) { + return rspMsg; + } + + if (blocking_time != 0) { + int64_t endTime = taosGetTimestampMs(); + if (endTime - startTime > blocking_time) { + return NULL; + } + } else + return NULL; } - tmqHandleAllRsp(tmq, blocking_time, false); +} - tmqPollImpl(tmq, blocking_time); +tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) { + tmq_message_t* rspMsg; + int64_t startTime = taosGetTimestampMs(); + + // TODO: put into another thread or delayed queue + int64_t status = atomic_load_64(&tmq->status); + tmqAskEp(tmq, status == TMQ_CONSUMER_STATUS__INIT); + + rspMsg = tmqHandleAllRsp(tmq, blocking_time, false); + if (rspMsg) { + return rspMsg; + } while (1) { /*printf("cycle\n");*/ - taosReadAllQitems(tmq->mqueue, tmq->qall); - rspMsg = tmqHandleAllRsp(tmq, blocking_time, true); + if (atomic_load_32(&tmq->waitingRequest) == 0) { + tmqPollImpl(tmq, blocking_time); + } + + tsem_wait(&tmq->rspSem); + + rspMsg = tmqHandleAllRsp(tmq, blocking_time, false); if (rspMsg) { return rspMsg; } if (blocking_time != 0) { int64_t endTime = taosGetTimestampMs(); if (endTime - startTime > blocking_time) { - printf("normal exit\n"); return NULL; } } @@ -1127,6 +1260,7 @@ void tmq_message_destroy(tmq_message_t* tmq_message) { if (tmq_message == NULL) return; SMqConsumeRsp* pRsp = &tmq_message->consumeRsp; tDeleteSMqConsumeRsp(pRsp); + /*free(tmq_message);*/ taosFreeQitem(tmq_message); } @@ -1138,6 +1272,7 @@ const char* tmq_err2str(tmq_resp_err_t err) { } return "fail"; } + #if 0 tmq_t* tmqCreateConsumerImpl(TAOS* conn, tmq_conf_t* conf) { tmq_t* pTmq = malloc(sizeof(tmq_t)); diff --git a/source/dnode/vnode/inc/meta.h b/source/dnode/vnode/inc/meta.h index c98a97d0dc19d669980a0baa2370243b22aebdf3..fd079b8f3233c821b59b0bf16ff841ac0e2954a0 100644 --- a/source/dnode/vnode/inc/meta.h +++ b/source/dnode/vnode/inc/meta.h @@ -24,8 +24,8 @@ extern "C" { #endif -#define META_SUPER_TABLE TD_SUPER_TABLE -#define META_CHILD_TABLE TD_CHILD_TABLE +#define META_SUPER_TABLE TD_SUPER_TABLE +#define META_CHILD_TABLE TD_CHILD_TABLE #define META_NORMAL_TABLE TD_NORMAL_TABLE // Types exported @@ -50,14 +50,14 @@ int metaDropTable(SMeta *pMeta, tb_uid_t uid); int metaCommit(SMeta *pMeta); // For Query -STbCfg * metaGetTbInfoByUid(SMeta *pMeta, tb_uid_t uid); -STbCfg * metaGetTbInfoByName(SMeta *pMeta, char *tbname, tb_uid_t *uid); +STbCfg *metaGetTbInfoByUid(SMeta *pMeta, tb_uid_t uid); +STbCfg *metaGetTbInfoByName(SMeta *pMeta, char *tbname, tb_uid_t *uid); SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, bool isinline); -STSchema * metaGetTbTSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver); +STSchema *metaGetTbTSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver); SMTbCursor *metaOpenTbCursor(SMeta *pMeta); void metaCloseTbCursor(SMTbCursor *pTbCur); -char * metaTbCursorNext(SMTbCursor *pTbCur); +char *metaTbCursorNext(SMTbCursor *pTbCur); SMCtbCursor *metaOpenCtbCursor(SMeta *pMeta, tb_uid_t uid); void metaCloseCtbCurosr(SMCtbCursor *pCtbCur); diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 7718e0886da0fc72418a1cf6569f65def57719ac..319bd9fde67b49112c968c75c7177b8453a2b66c 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -213,6 +213,10 @@ static FORCE_INLINE void tqReadHandleSetColIdList(STqReadHandle *pReadHandle, SA //} static FORCE_INLINE int tqReadHandleSetTbUidList(STqReadHandle *pHandle, const SArray *tbUidList) { + if (pHandle->tbIdHash) { + taosHashClear(pHandle->tbIdHash); + } + pHandle->tbIdHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK); if (pHandle->tbIdHash == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -227,6 +231,23 @@ static FORCE_INLINE int tqReadHandleSetTbUidList(STqReadHandle *pHandle, const S return 0; } +static FORCE_INLINE int tqReadHandleAddTbUidList(STqReadHandle *pHandle, const SArray *tbUidList) { + if (pHandle->tbIdHash == NULL) { + pHandle->tbIdHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK); + if (pHandle->tbIdHash == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + } + + for (int i = 0; i < taosArrayGetSize(tbUidList); i++) { + int64_t *pKey = (int64_t *)taosArrayGet(tbUidList, i); + taosHashPut(pHandle->tbIdHash, pKey, sizeof(int64_t), NULL, 0); + } + + return 0; +} + int32_t tqReadHandleSetMsg(STqReadHandle *pHandle, SSubmitReq *pMsg, int64_t ver); bool tqNextDataBlock(STqReadHandle *pHandle); int tqRetrieveDataBlockInfo(STqReadHandle *pHandle, SDataBlockInfo *pBlockInfo); diff --git a/source/dnode/vnode/src/inc/metaDef.h b/source/dnode/vnode/src/inc/metaDef.h index f4128f7170675633664f1cb6f6685563eaccf70c..71bfd913560974d86f62d07224dcd219ba07445e 100644 --- a/source/dnode/vnode/src/inc/metaDef.h +++ b/source/dnode/vnode/src/inc/metaDef.h @@ -76,4 +76,4 @@ struct SMeta { } #endif -#endif /*_TD_META_DEF_H_*/ \ No newline at end of file +#endif /*_TD_META_DEF_H_*/ diff --git a/source/dnode/vnode/src/meta/metaBDBImpl.c b/source/dnode/vnode/src/meta/metaBDBImpl.c index 3c464391d2f6aa88f62e060152b5f6b182e7456e..f75d9d3db07f48c39d51453f738df3eb963343cf 100644 --- a/source/dnode/vnode/src/meta/metaBDBImpl.c +++ b/source/dnode/vnode/src/meta/metaBDBImpl.c @@ -62,10 +62,10 @@ static int metaStbIdxCb(DB *pIdx, const DBT *pKey, const DBT *pValue, DBT * static int metaNtbIdxCb(DB *pIdx, const DBT *pKey, const DBT *pValue, DBT *pSKey); static int metaCtbIdxCb(DB *pIdx, const DBT *pKey, const DBT *pValue, DBT *pSKey); static int metaEncodeTbInfo(void **buf, STbCfg *pTbCfg); -static void * metaDecodeTbInfo(void *buf, STbCfg *pTbCfg); +static void *metaDecodeTbInfo(void *buf, STbCfg *pTbCfg); static void metaClearTbCfg(STbCfg *pTbCfg); static int metaEncodeSchema(void **buf, SSchemaWrapper *pSW); -static void * metaDecodeSchema(void *buf, SSchemaWrapper *pSW); +static void *metaDecodeSchema(void *buf, SSchemaWrapper *pSW); static void metaDBWLock(SMetaDB *pDB); static void metaDBRLock(SMetaDB *pDB); static void metaDBULock(SMetaDB *pDB); @@ -142,7 +142,7 @@ int metaSaveTableToDB(SMeta *pMeta, STbCfg *pTbCfg) { tb_uid_t uid; char buf[512]; char buf1[512]; - void * pBuf; + void *pBuf; DBT key1, value1; DBT key2, value2; SSchema *pSchema = NULL; @@ -394,7 +394,7 @@ static int metaNtbIdxCb(DB *pIdx, const DBT *pKey, const DBT *pValue, DBT *pSKey static int metaCtbIdxCb(DB *pIdx, const DBT *pKey, const DBT *pValue, DBT *pSKey) { STbCfg *pTbCfg = (STbCfg *)(pValue->app_data); - DBT * pDbt; + DBT *pDbt; if (pTbCfg->type == META_CHILD_TABLE) { // pDbt = calloc(2, sizeof(DBT)); @@ -479,7 +479,7 @@ static void metaClearTbCfg(STbCfg *pTbCfg) { /* ------------------------ FOR QUERY ------------------------ */ STbCfg *metaGetTbInfoByUid(SMeta *pMeta, tb_uid_t uid) { - STbCfg * pTbCfg = NULL; + STbCfg *pTbCfg = NULL; SMetaDB *pDB = pMeta->pDB; DBT key = {0}; DBT value = {0}; @@ -509,7 +509,7 @@ STbCfg *metaGetTbInfoByUid(SMeta *pMeta, tb_uid_t uid) { } STbCfg *metaGetTbInfoByName(SMeta *pMeta, char *tbname, tb_uid_t *uid) { - STbCfg * pTbCfg = NULL; + STbCfg *pTbCfg = NULL; SMetaDB *pDB = pMeta->pDB; DBT key = {0}; DBT pkey = {0}; @@ -543,10 +543,10 @@ STbCfg *metaGetTbInfoByName(SMeta *pMeta, char *tbname, tb_uid_t *uid) { SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, bool isinline) { uint32_t nCols; SSchemaWrapper *pSW = NULL; - SMetaDB * pDB = pMeta->pDB; + SMetaDB *pDB = pMeta->pDB; int ret; - void * pBuf; - SSchema * pSchema; + void *pBuf; + SSchema *pSchema; SSchemaKey schemaKey = {uid, sver, 0}; DBT key = {0}; DBT value = {0}; @@ -578,7 +578,7 @@ struct SMTbCursor { SMTbCursor *metaOpenTbCursor(SMeta *pMeta) { SMTbCursor *pTbCur = NULL; - SMetaDB * pDB = pMeta->pDB; + SMetaDB *pDB = pMeta->pDB; pTbCur = (SMTbCursor *)calloc(1, sizeof(*pTbCur)); if (pTbCur == NULL) { @@ -609,7 +609,7 @@ char *metaTbCursorNext(SMTbCursor *pTbCur) { DBT key = {0}; DBT value = {0}; STbCfg tbCfg; - void * pBuf; + void *pBuf; for (;;) { if (pTbCur->pCur->get(pTbCur->pCur, &key, &value, DB_NEXT) == 0) { @@ -631,10 +631,10 @@ char *metaTbCursorNext(SMTbCursor *pTbCur) { STSchema *metaGetTbTSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver) { STSchemaBuilder sb; - STSchema * pTSchema = NULL; - SSchema * pSchema; + STSchema *pTSchema = NULL; + SSchema *pSchema; SSchemaWrapper *pSW; - STbCfg * pTbCfg; + STbCfg *pTbCfg; tb_uid_t quid; pTbCfg = metaGetTbInfoByUid(pMeta, uid); @@ -662,13 +662,13 @@ STSchema *metaGetTbTSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver) { } struct SMCtbCursor { - DBC * pCur; + DBC *pCur; tb_uid_t suid; }; SMCtbCursor *metaOpenCtbCursor(SMeta *pMeta, tb_uid_t uid) { SMCtbCursor *pCtbCur = NULL; - SMetaDB * pDB = pMeta->pDB; + SMetaDB *pDB = pMeta->pDB; int ret; pCtbCur = (SMCtbCursor *)calloc(1, sizeof(*pCtbCur)); @@ -700,7 +700,7 @@ tb_uid_t metaCtbCursorNext(SMCtbCursor *pCtbCur) { DBT skey = {0}; DBT pkey = {0}; DBT pval = {0}; - void * pBuf; + void *pBuf; STbCfg tbCfg; // Set key diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index aa198d080699a814171a4984cd8d8228671eeecd..ccad006657772400fe7173a8a5a53fcc44362fd4 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -72,6 +72,8 @@ void tqClose(STQ* pTq) { } int tqPushMsg(STQ* pTq, void* msg, tmsg_t msgType, int64_t version) { + // if waiting + // memcpy and send msg to fetch thread // TODO: add reference // if handle waiting, launch query and response to consumer // @@ -210,7 +212,7 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) { SMqConsumeReq* pReq = pMsg->pCont; int64_t consumerId = pReq->consumerId; int64_t fetchOffset; - /*int64_t blockingTime = pReq->blockingTime;*/ + int64_t blockingTime = pReq->blockingTime; if (pReq->currentOffset == TMQ_CONF__RESET_OFFSET__EARLIEAST) { fetchOffset = 0; diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index b90ce275d21ae3645d4ea5cb966b6eed215ae716..19a3874f7592f17d61cd589652fc1f34a9cd0261 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -95,17 +95,17 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, void* streamReadHandle) { } int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, SArray* tableIdList, bool isAdd) { - SExecTaskInfo* pTaskInfo = (SExecTaskInfo* )tinfo; + SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; // traverse to the streamscan node to add this table id SOperatorInfo* pInfo = pTaskInfo->pRoot; - while(pInfo->operatorType != OP_StreamScan) { + while (pInfo->operatorType != OP_StreamScan) { pInfo = pInfo->pDownstream[0]; } SStreamBlockScanInfo* pScanInfo = pInfo->info; if (isAdd) { - int32_t code = tqReadHandleSetTbUidList(pScanInfo->readerHandle, tableIdList); + int32_t code = tqReadHandleAddTbUidList(pScanInfo->readerHandle, tableIdList); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -114,4 +114,4 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, SArray* tableIdList, bool isA } return TSDB_CODE_SUCCESS; -} \ No newline at end of file +} diff --git a/source/libs/qcom/src/queryUtil.c b/source/libs/qcom/src/queryUtil.c index a2165453d55820078292eedb7085f7877f459092..63fbf59c064a91a883a0ae52ad7fb9e1c56b1c55 100644 --- a/source/libs/qcom/src/queryUtil.c +++ b/source/libs/qcom/src/queryUtil.c @@ -1,23 +1,36 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + #include "os.h" -#include "tmsg.h" #include "query.h" #include "tglobal.h" -#include "tsched.h" +#include "tmsg.h" #include "trpc.h" +#include "tsched.h" -#define VALIDNUMOFCOLS(x) ((x) >= TSDB_MIN_COLUMNS && (x) <= TSDB_MAX_COLUMNS) -#define VALIDNUMOFTAGS(x) ((x) >= 0 && (x) <= TSDB_MAX_TAGS) +#define VALIDNUMOFCOLS(x) ((x) >= TSDB_MIN_COLUMNS && (x) <= TSDB_MAX_COLUMNS) +#define VALIDNUMOFTAGS(x) ((x) >= 0 && (x) <= TSDB_MAX_TAGS) static struct SSchema _s = { .colId = TSDB_TBNAME_COLUMN_INDEX, - .type = TSDB_DATA_TYPE_BINARY, + .type = TSDB_DATA_TYPE_BINARY, .bytes = TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE, .name = "tbname", }; -const SSchema* tGetTbnameColumnSchema() { - return &_s; -} +const SSchema* tGetTbnameColumnSchema() { return &_s; } static bool doValidateSchema(SSchema* pSchema, int32_t numOfCols, int32_t maxLen) { int32_t rowLen = 0; @@ -87,7 +100,7 @@ int32_t initTaskQueue() { double factor = 4.0; int32_t numOfThreads = TMAX((int)(tsNumOfCores * tsNumOfThreadsPerCore / factor), 2); - + int32_t queueSize = tsMaxConnections * 2; pTaskQueue = taosInitScheduler(queueSize, numOfThreads, "tsc"); if (NULL == pTaskQueue) { @@ -96,19 +109,21 @@ int32_t initTaskQueue() { } qDebug("task queue is initialized, numOfThreads: %d", numOfThreads); + return 0; } int32_t cleanupTaskQueue() { taosCleanUpScheduler(pTaskQueue); + return 0; } static void execHelper(struct SSchedMsg* pSchedMsg) { assert(pSchedMsg != NULL && pSchedMsg->ahandle != NULL); - __async_exec_fn_t execFn = (__async_exec_fn_t) pSchedMsg->ahandle; - int32_t code = execFn(pSchedMsg->thandle); + __async_exec_fn_t execFn = (__async_exec_fn_t)pSchedMsg->ahandle; + int32_t code = execFn(pSchedMsg->thandle); if (code != 0 && pSchedMsg->msg != NULL) { - *(int32_t*) pSchedMsg->msg = code; + *(int32_t*)pSchedMsg->msg = code; } } @@ -116,34 +131,33 @@ int32_t taosAsyncExec(__async_exec_fn_t execFn, void* execParam, int32_t* code) assert(execFn != NULL); SSchedMsg schedMsg = {0}; - schedMsg.fp = execHelper; + schedMsg.fp = execHelper; schedMsg.ahandle = execFn; schedMsg.thandle = execParam; - schedMsg.msg = code; + schedMsg.msg = code; taosScheduleTask(pTaskQueue, &schedMsg); + return 0; } -int32_t asyncSendMsgToServer(void *pTransporter, SEpSet* epSet, int64_t* pTransporterId, const SMsgSendInfo* pInfo) { - char *pMsg = rpcMallocCont(pInfo->msgInfo.len); +int32_t asyncSendMsgToServer(void* pTransporter, SEpSet* epSet, int64_t* pTransporterId, const SMsgSendInfo* pInfo) { + char* pMsg = rpcMallocCont(pInfo->msgInfo.len); if (NULL == pMsg) { - qError("0x%"PRIx64" msg:%s malloc failed", pInfo->requestId, TMSG_INFO(pInfo->msgType)); + qError("0x%" PRIx64 " msg:%s malloc failed", pInfo->requestId, TMSG_INFO(pInfo->msgType)); terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; return terrno; } memcpy(pMsg, pInfo->msgInfo.pData, pInfo->msgInfo.len); - SRpcMsg rpcMsg = { - .msgType = pInfo->msgType, - .pCont = pMsg, - .contLen = pInfo->msgInfo.len, - .ahandle = (void*) pInfo, - .handle = pInfo->msgInfo.handle, - .code = 0 - }; + SRpcMsg rpcMsg = {.msgType = pInfo->msgType, + .pCont = pMsg, + .contLen = pInfo->msgInfo.len, + .ahandle = (void*)pInfo, + .handle = pInfo->msgInfo.handle, + .code = 0}; assert(pInfo->fp != NULL); rpcSendRequest(pTransporter, epSet, &rpcMsg, pTransporterId); return TSDB_CODE_SUCCESS; -} \ No newline at end of file +} diff --git a/source/libs/wal/src/walWrite.c b/source/libs/wal/src/walWrite.c index 4ed3c63c183565d133d38d7179fab0c2cb05caf5..699da7579028dfd690a24a4136ea59617c398d00 100644 --- a/source/libs/wal/src/walWrite.c +++ b/source/libs/wal/src/walWrite.c @@ -162,8 +162,8 @@ int32_t walEndSnapshot(SWal *pWal) { } // iterate files, until the searched result for (SWalFileInfo *iter = pWal->fileInfoSet->pData; iter < pInfo; iter++) { - if ((pWal->cfg.retentionSize != -1 && pWal->totSize > pWal->cfg.retentionSize) - || (pWal->cfg.retentionPeriod != -1 && iter->closeTs + pWal->cfg.retentionPeriod > ts)) { + if ((pWal->cfg.retentionSize != -1 && pWal->totSize > pWal->cfg.retentionSize) || + (pWal->cfg.retentionPeriod != -1 && iter->closeTs + pWal->cfg.retentionPeriod > ts)) { // delete according to file size or close time deleteCnt++; newTotSize -= iter->fileSize; @@ -279,6 +279,7 @@ int64_t walWrite(SWal *pWal, int64_t index, tmsg_t msgType, const void *body, in } else { // reject skip log or rewrite log // must truncate explicitly first + terrno = TSDB_CODE_WAL_INVALID_VER; return -1; } /*if (!tfValid(pWal->pWriteLogTFile)) return -1;*/ @@ -303,16 +304,18 @@ int64_t walWrite(SWal *pWal, int64_t index, tmsg_t msgType, const void *body, in if (taosWriteFile(pWal->pWriteLogTFile, &pWal->writeHead, sizeof(SWalHead)) != sizeof(SWalHead)) { // ftruncate - code = TAOS_SYSTEM_ERROR(errno); + terrno = TAOS_SYSTEM_ERROR(errno); wError("vgId:%d, file:%" PRId64 ".log, failed to write since %s", pWal->cfg.vgId, walGetLastFileFirstVer(pWal), strerror(errno)); + return -1; } if (taosWriteFile(pWal->pWriteLogTFile, (char *)body, bodyLen) != bodyLen) { // ftruncate - code = TAOS_SYSTEM_ERROR(errno); + terrno = TAOS_SYSTEM_ERROR(errno); wError("vgId:%d, file:%" PRId64 ".log, failed to write since %s", pWal->cfg.vgId, walGetLastFileFirstVer(pWal), strerror(errno)); + return -1; } code = walWriteIndex(pWal, index, offset); @@ -329,7 +332,7 @@ int64_t walWrite(SWal *pWal, int64_t index, tmsg_t msgType, const void *body, in pthread_mutex_unlock(&pWal->mutex); - return code; + return 0; } void walFsync(SWal *pWal, bool forceFsync) { diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 176290ba38db5043ed4c4cfb07acdef0181a171e..9c2fef5b47578c0fab395c0c136c6ee18433db10 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -13,6 +13,8 @@ * along with this program. If not, see . */ +// clang-format off + #define _DEFAULT_SOURCE #include "os.h" #include "taoserror.h" @@ -408,6 +410,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INVALID_MSGTYPE, "Invalid msg type") TAOS_DEFINE_ERROR(TSDB_CODE_WAL_APP_ERROR, "Unexpected generic error in wal") TAOS_DEFINE_ERROR(TSDB_CODE_WAL_FILE_CORRUPTED, "WAL file is corrupted") TAOS_DEFINE_ERROR(TSDB_CODE_WAL_SIZE_LIMIT, "WAL size exceeds limit") +TAOS_DEFINE_ERROR(TSDB_CODE_WAL_INVALID_VER, "WAL use invalid version") // tfs TAOS_DEFINE_ERROR(TSDB_CODE_FS_APP_ERROR, "tfs out of memory")