未验证 提交 86c841f1 编写于 作者: L Liu Jicong 提交者: GitHub

Merge pull request #12438 from taosdata/feature/tq

fix(tmq): set config
...@@ -61,7 +61,7 @@ int32_t init_env() { ...@@ -61,7 +61,7 @@ int32_t init_env() {
taos_free_result(pRes); taos_free_result(pRes);
pRes = pRes =
taos_query(pConn, "create stable if not exists st1 (ts timestamp, c1 int, c2 float, c3 binary(10)) tags(t1 int)"); taos_query(pConn, "create stable if not exists st1 (ts timestamp, c1 int, c2 float, c3 binary(16)) tags(t1 int)");
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
printf("failed to create super table st1, reason:%s\n", taos_errstr(pRes)); printf("failed to create super table st1, reason:%s\n", taos_errstr(pRes));
return -1; return -1;
...@@ -106,8 +106,8 @@ int32_t create_topic() { ...@@ -106,8 +106,8 @@ int32_t create_topic() {
} }
taos_free_result(pRes); taos_free_result(pRes);
/*pRes = taos_query(pConn, "create topic topic_ctb_column as abc1");*/ pRes = taos_query(pConn, "create topic topic_ctb_column as abc1");
pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1, c2, c3 from st1"); /*pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1, c2, c3 from st1");*/
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
printf("failed to create topic topic_ctb_column, reason:%s\n", taos_errstr(pRes)); printf("failed to create topic topic_ctb_column, reason:%s\n", taos_errstr(pRes));
return -1; return -1;
......
...@@ -185,6 +185,7 @@ typedef struct { ...@@ -185,6 +185,7 @@ typedef struct {
int32_t async; int32_t async;
tsem_t rspSem; tsem_t rspSem;
tmq_resp_err_t rspErr; tmq_resp_err_t rspErr;
SArray* offsets;
} SMqCommitCbParam; } SMqCommitCbParam;
tmq_conf_t* tmq_conf_new() { tmq_conf_t* tmq_conf_new() {
...@@ -246,10 +247,13 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value ...@@ -246,10 +247,13 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value
if (strcmp(key, "msg.with.table.name") == 0) { if (strcmp(key, "msg.with.table.name") == 0) {
if (strcmp(value, "true") == 0) { if (strcmp(value, "true") == 0) {
conf->withTbName = 1; conf->withTbName = 1;
return TMQ_CONF_OK;
} else if (strcmp(value, "false") == 0) { } else if (strcmp(value, "false") == 0) {
conf->withTbName = 0; conf->withTbName = 0;
return TMQ_CONF_OK;
} else if (strcmp(value, "none") == 0) { } else if (strcmp(value, "none") == 0) {
conf->withTbName = -1; conf->withTbName = -1;
return TMQ_CONF_OK;
} else { } else {
return TMQ_CONF_INVALID; return TMQ_CONF_INVALID;
} }
...@@ -395,6 +399,9 @@ int32_t tmqCommitCb(void* param, const SDataBuf* pMsg, int32_t code) { ...@@ -395,6 +399,9 @@ int32_t tmqCommitCb(void* param, const SDataBuf* pMsg, int32_t code) {
if (!pParam->async) if (!pParam->async)
tsem_post(&pParam->rspSem); tsem_post(&pParam->rspSem);
else { else {
if (pParam->offsets) {
taosArrayDestroy(pParam->offsets);
}
tsem_destroy(&pParam->rspSem); tsem_destroy(&pParam->rspSem);
/*if (pParam->pArray) {*/ /*if (pParam->pArray) {*/
/*taosArrayDestroy(pParam->pArray);*/ /*taosArrayDestroy(pParam->pArray);*/
...@@ -540,10 +547,10 @@ tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, in ...@@ -540,10 +547,10 @@ tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, in
// build msg // build msg
// send to mnode // send to mnode
SMqCMCommitOffsetReq req; SMqCMCommitOffsetReq req;
SArray* pArray = NULL; SArray* pOffsets = NULL;
if (offsets == NULL) { if (offsets == NULL) {
pArray = taosArrayInit(0, sizeof(SMqOffset)); pOffsets = taosArrayInit(0, sizeof(SMqOffset));
for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) { for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i); SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
for (int j = 0; j < taosArrayGetSize(pTopic->vgs); j++) { for (int j = 0; j < taosArrayGetSize(pTopic->vgs); j++) {
...@@ -553,11 +560,11 @@ tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, in ...@@ -553,11 +560,11 @@ tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, in
strcpy(offset.cgroup, tmq->groupId); strcpy(offset.cgroup, tmq->groupId);
offset.vgId = pVg->vgId; offset.vgId = pVg->vgId;
offset.offset = pVg->currentOffset; offset.offset = pVg->currentOffset;
taosArrayPush(pArray, &offset); taosArrayPush(pOffsets, &offset);
} }
} }
req.num = pArray->size; req.num = pOffsets->size;
req.offsets = pArray->pData; req.offsets = pOffsets->pData;
} else { } else {
req.num = taosArrayGetSize(&offsets->container); req.num = taosArrayGetSize(&offsets->container);
req.offsets = (SMqOffset*)offsets->container.pData; req.offsets = (SMqOffset*)offsets->container.pData;
...@@ -591,6 +598,7 @@ tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, in ...@@ -591,6 +598,7 @@ tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, in
pParam->tmq = tmq; pParam->tmq = tmq;
tsem_init(&pParam->rspSem, 0, 0); tsem_init(&pParam->rspSem, 0, 0);
pParam->async = async; pParam->async = async;
pParam->offsets = pOffsets;
pRequest->body.requestMsg = (SDataBuf){ pRequest->body.requestMsg = (SDataBuf){
.pData = buf, .pData = buf,
...@@ -613,8 +621,8 @@ tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, in ...@@ -613,8 +621,8 @@ tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, in
tsem_destroy(&pParam->rspSem); tsem_destroy(&pParam->rspSem);
taosMemoryFree(pParam); taosMemoryFree(pParam);
if (pArray) { if (pOffsets) {
taosArrayDestroy(pArray); taosArrayDestroy(pOffsets);
} }
} }
...@@ -1015,7 +1023,7 @@ int32_t tmqAskEp(tmq_t* tmq, bool async) { ...@@ -1015,7 +1023,7 @@ int32_t tmqAskEp(tmq_t* tmq, bool async) {
atomic_store_32(&tmq->epSkipCnt, 0); atomic_store_32(&tmq->epSkipCnt, 0);
#endif #endif
int32_t tlen = sizeof(SMqAskEpReq); int32_t tlen = sizeof(SMqAskEpReq);
SMqAskEpReq* req = taosMemoryMalloc(tlen); SMqAskEpReq* req = taosMemoryCalloc(1, tlen);
if (req == NULL) { if (req == NULL) {
tscError("failed to malloc get subscribe ep buf"); tscError("failed to malloc get subscribe ep buf");
/*atomic_store_8(&tmq->epStatus, 0);*/ /*atomic_store_8(&tmq->epStatus, 0);*/
...@@ -1025,7 +1033,7 @@ int32_t tmqAskEp(tmq_t* tmq, bool async) { ...@@ -1025,7 +1033,7 @@ int32_t tmqAskEp(tmq_t* tmq, bool async) {
req->epoch = htonl(tmq->epoch); req->epoch = htonl(tmq->epoch);
strcpy(req->cgroup, tmq->groupId); strcpy(req->cgroup, tmq->groupId);
SMqAskEpCbParam* pParam = taosMemoryMalloc(sizeof(SMqAskEpCbParam)); SMqAskEpCbParam* pParam = taosMemoryCalloc(1, sizeof(SMqAskEpCbParam));
if (pParam == NULL) { if (pParam == NULL) {
tscError("failed to malloc subscribe param"); tscError("failed to malloc subscribe param");
taosMemoryFree(req); taosMemoryFree(req);
...@@ -1107,7 +1115,7 @@ SMqPollReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t waitTime, SMqClientTopic* ...@@ -1107,7 +1115,7 @@ SMqPollReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t waitTime, SMqClientTopic*
reqOffset = tmq->resetOffsetCfg; reqOffset = tmq->resetOffsetCfg;
} }
SMqPollReq* pReq = taosMemoryMalloc(sizeof(SMqPollReq)); SMqPollReq* pReq = taosMemoryCalloc(1, sizeof(SMqPollReq));
if (pReq == NULL) { if (pReq == NULL) {
return NULL; return NULL;
} }
......
...@@ -93,6 +93,7 @@ struct STqReadHandle { ...@@ -93,6 +93,7 @@ struct STqReadHandle {
SMeta* pVnodeMeta; SMeta* pVnodeMeta;
SArray* pColIdList; // SArray<int16_t> SArray* pColIdList; // SArray<int16_t>
int32_t sver; int32_t sver;
int64_t cachedSchemaUid;
SSchemaWrapper* pSchemaWrapper; SSchemaWrapper* pSchemaWrapper;
STSchema* pSchema; STSchema* pSchema;
}; };
......
...@@ -559,6 +559,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { ...@@ -559,6 +559,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
} }
// db subscribe // db subscribe
} else if (pExec->subType == TOPIC_SUB_TYPE__DB) { } else if (pExec->subType == TOPIC_SUB_TYPE__DB) {
rsp.withSchema = 1;
STqReadHandle* pReader = pExec->pExecReader[workerId]; STqReadHandle* pReader = pExec->pExecReader[workerId];
tqReadHandleSetMsg(pReader, pCont, 0); tqReadHandleSetMsg(pReader, pCont, 0);
while (tqNextDataBlock(pReader)) { while (tqNextDataBlock(pReader)) {
......
...@@ -25,6 +25,7 @@ STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta) { ...@@ -25,6 +25,7 @@ STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta) {
pReadHandle->ver = -1; pReadHandle->ver = -1;
pReadHandle->pColIdList = NULL; pReadHandle->pColIdList = NULL;
pReadHandle->sver = -1; pReadHandle->sver = -1;
pReadHandle->cachedSchemaUid = -1;
pReadHandle->pSchema = NULL; pReadHandle->pSchema = NULL;
pReadHandle->pSchemaWrapper = NULL; pReadHandle->pSchemaWrapper = NULL;
pReadHandle->tbIdHash = NULL; pReadHandle->tbIdHash = NULL;
...@@ -84,19 +85,20 @@ bool tqNextDataBlock(STqReadHandle* pHandle) { ...@@ -84,19 +85,20 @@ bool tqNextDataBlock(STqReadHandle* pHandle) {
return false; return false;
} }
int32_t tqRetrieveDataBlock(SArray** ppCols, STqReadHandle* pHandle, uint64_t* pGroupId, uint64_t* pUid, int32_t* pNumOfRows, int32_t tqRetrieveDataBlock(SArray** ppCols, STqReadHandle* pHandle, uint64_t* pGroupId, uint64_t* pUid,
int16_t* pNumOfCols) { int32_t* pNumOfRows, int16_t* pNumOfCols) {
/*int32_t sversion = pHandle->pBlock->sversion;*/ /*int32_t sversion = pHandle->pBlock->sversion;*/
// TODO set to real sversion // TODO set to real sversion
*pUid = 0; *pUid = 0;
int32_t sversion = 0; int32_t sversion = 0;
if (pHandle->sver != sversion) { if (pHandle->sver != sversion || pHandle->cachedSchemaUid != pHandle->msgIter.suid) {
pHandle->pSchema = metaGetTbTSchema(pHandle->pVnodeMeta, pHandle->msgIter.uid, sversion); pHandle->pSchema = metaGetTbTSchema(pHandle->pVnodeMeta, pHandle->msgIter.uid, sversion);
// this interface use suid instead of uid // this interface use suid instead of uid
pHandle->pSchemaWrapper = metaGetTableSchema(pHandle->pVnodeMeta, pHandle->msgIter.suid, sversion, true); pHandle->pSchemaWrapper = metaGetTableSchema(pHandle->pVnodeMeta, pHandle->msgIter.suid, sversion, true);
pHandle->sver = sversion; pHandle->sver = sversion;
pHandle->cachedSchemaUid = pHandle->msgIter.suid;
} }
STSchema* pTschema = pHandle->pSchema; STSchema* pTschema = pHandle->pSchema;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册