提交 d5133638 编写于 作者: L Liu Jicong

feat(tmq): add db subscribe

上级 ef05b1bc
...@@ -239,7 +239,6 @@ typedef struct tmq_topic_vgroup_list_t tmq_topic_vgroup_list_t; ...@@ -239,7 +239,6 @@ typedef struct tmq_topic_vgroup_list_t tmq_topic_vgroup_list_t;
typedef struct tmq_conf_t tmq_conf_t; typedef struct tmq_conf_t tmq_conf_t;
typedef struct tmq_list_t tmq_list_t; typedef struct tmq_list_t tmq_list_t;
// typedef struct tmq_message_t tmq_message_t;
typedef void(tmq_commit_cb(tmq_t *, tmq_resp_err_t, tmq_topic_vgroup_list_t *)); typedef void(tmq_commit_cb(tmq_t *, tmq_resp_err_t, tmq_topic_vgroup_list_t *));
...@@ -285,12 +284,6 @@ DLL_EXPORT tmq_conf_res_t tmq_conf_set(tmq_conf_t *conf, const char *key, const ...@@ -285,12 +284,6 @@ DLL_EXPORT tmq_conf_res_t tmq_conf_set(tmq_conf_t *conf, const char *key, const
DLL_EXPORT void tmq_conf_destroy(tmq_conf_t *conf); DLL_EXPORT void tmq_conf_destroy(tmq_conf_t *conf);
DLL_EXPORT void tmq_conf_set_offset_commit_cb(tmq_conf_t *conf, tmq_commit_cb *cb); DLL_EXPORT void tmq_conf_set_offset_commit_cb(tmq_conf_t *conf, tmq_commit_cb *cb);
#if 0
// temporary used function for demo only
void tmqShowMsg(tmq_message_t *tmq_message);
int32_t tmqGetSkipLogNum(tmq_message_t *tmq_message);
#endif
/* -------------------------TMQ MSG HANDLE INTERFACE---------------------- */ /* -------------------------TMQ MSG HANDLE INTERFACE---------------------- */
DLL_EXPORT char *tmq_get_topic_name(TAOS_RES *res); DLL_EXPORT char *tmq_get_topic_name(TAOS_RES *res);
...@@ -301,12 +294,8 @@ DLL_EXPORT char *tmq_get_block_table_name(TAOS_RES *res); ...@@ -301,12 +294,8 @@ DLL_EXPORT char *tmq_get_block_table_name(TAOS_RES *res);
#endif #endif
#if 0 #if 0
DLL_EXPORT TAOS_ROW tmq_get_row(tmq_message_t *message);
DLL_EXPORT int64_t tmq_get_request_offset(tmq_message_t *message); DLL_EXPORT int64_t tmq_get_request_offset(tmq_message_t *message);
DLL_EXPORT int64_t tmq_get_response_offset(tmq_message_t *message); DLL_EXPORT int64_t tmq_get_response_offset(tmq_message_t *message);
DLL_EXPORT TAOS_FIELD *tmq_get_fields(tmq_t *tmq, const char *topic);
DLL_EXPORT int32_t tmq_field_count(tmq_t *tmq, const char *topic);
DLL_EXPORT void tmq_message_destroy(TAOS_RES *res);
#endif #endif
/* --------------------TMPORARY INTERFACE FOR TESTING--------------------- */ /* --------------------TMPORARY INTERFACE FOR TESTING--------------------- */
#if 0 #if 0
......
...@@ -24,16 +24,6 @@ ...@@ -24,16 +24,6 @@
#include "tqueue.h" #include "tqueue.h"
#include "tref.h" #include "tref.h"
#if 0
struct tmq_message_t {
SMqPollRsp msg;
char* topic;
SArray* res; // SArray<SReqResultInfo>
int32_t vgId;
int32_t resIter;
};
#endif
typedef struct { typedef struct {
int8_t tmqRspType; int8_t tmqRspType;
int32_t epoch; int32_t epoch;
...@@ -770,105 +760,12 @@ _return: ...@@ -770,105 +760,12 @@ _return:
} }
#endif #endif
static char* formatTimestamp(char* buf, int64_t val, int precision) {
time_t tt;
int32_t ms = 0;
if (precision == TSDB_TIME_PRECISION_NANO) {
tt = (time_t)(val / 1000000000);
ms = val % 1000000000;
} else if (precision == TSDB_TIME_PRECISION_MICRO) {
tt = (time_t)(val / 1000000);
ms = val % 1000000;
} else {
tt = (time_t)(val / 1000);
ms = val % 1000;
}
/* comment out as it make testcases like select_with_tags.sim fail.
but in windows, this may cause the call to localtime crash if tt < 0,
need to find a better solution.
if (tt < 0) {
tt = 0;
}
*/
#ifdef WINDOWS
if (tt < 0) tt = 0;
#endif
if (tt <= 0 && ms < 0) {
tt--;
if (precision == TSDB_TIME_PRECISION_NANO) {
ms += 1000000000;
} else if (precision == TSDB_TIME_PRECISION_MICRO) {
ms += 1000000;
} else {
ms += 1000;
}
}
struct tm* ptm = taosLocalTime(&tt, NULL);
size_t pos = strftime(buf, 35, "%Y-%m-%d %H:%M:%S", ptm);
if (precision == TSDB_TIME_PRECISION_NANO) {
sprintf(buf + pos, ".%09d", ms);
} else if (precision == TSDB_TIME_PRECISION_MICRO) {
sprintf(buf + pos, ".%06d", ms);
} else {
sprintf(buf + pos, ".%03d", ms);
}
return buf;
}
#if 0 #if 0
int32_t tmqGetSkipLogNum(tmq_message_t* tmq_message) { int32_t tmqGetSkipLogNum(tmq_message_t* tmq_message) {
if (tmq_message == NULL) return 0; if (tmq_message == NULL) return 0;
SMqPollRsp* pRsp = &tmq_message->msg; SMqPollRsp* pRsp = &tmq_message->msg;
return pRsp->skipLogNum; return pRsp->skipLogNum;
} }
void tmqShowMsg(tmq_message_t* tmq_message) {
if (tmq_message == NULL) return;
static bool noPrintSchema;
char pBuf[128];
SMqPollRsp* pRsp = &tmq_message->msg;
int32_t colNum = 2;
if (!noPrintSchema) {
printf("|");
for (int32_t i = 0; i < colNum; i++) {
if (i == 0)
printf(" %25s |", pRsp->schema->pSchema[i].name);
else
printf(" %15s |", pRsp->schema->pSchema[i].name);
}
printf("\n");
printf("===============================================\n");
noPrintSchema = true;
}
int32_t sz = taosArrayGetSize(pRsp->pBlockData);
for (int32_t i = 0; i < sz; i++) {
SSDataBlock* pDataBlock = taosArrayGet(pRsp->pBlockData, i);
int32_t rows = pDataBlock->info.rows;
for (int32_t j = 0; j < rows; j++) {
printf("|");
for (int32_t k = 0; k < colNum; k++) {
SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, k);
void* var = POINTER_SHIFT(pColInfoData->pData, j * pColInfoData->info.bytes);
switch (pColInfoData->info.type) {
case TSDB_DATA_TYPE_TIMESTAMP:
formatTimestamp(pBuf, *(uint64_t*)var, TSDB_TIME_PRECISION_MILLI);
printf(" %25s |", pBuf);
break;
case TSDB_DATA_TYPE_INT:
case TSDB_DATA_TYPE_UINT:
printf(" %15u |", *(uint32_t*)var);
break;
}
}
printf("\n");
}
}
}
#endif #endif
int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) { int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) {
...@@ -1049,7 +946,6 @@ int32_t tmqAskEpCb(void* param, const SDataBuf* pMsg, int32_t code) { ...@@ -1049,7 +946,6 @@ int32_t tmqAskEpCb(void* param, const SDataBuf* pMsg, int32_t code) {
} }
tDeleteSMqCMGetSubEpRsp(&rsp); tDeleteSMqCMGetSubEpRsp(&rsp);
} else { } else {
/*SMqCMGetSubEpRsp* pRsp = taosAllocateQitem(sizeof(SMqCMGetSubEpRsp));*/
SMqAskEpRspWrapper* pWrapper = taosAllocateQitem(sizeof(SMqAskEpRspWrapper)); SMqAskEpRspWrapper* pWrapper = taosAllocateQitem(sizeof(SMqAskEpRspWrapper));
if (pWrapper == NULL) { if (pWrapper == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
...@@ -1208,7 +1104,6 @@ SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper) { ...@@ -1208,7 +1104,6 @@ SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper) {
pRspObj->resIter = -1; pRspObj->resIter = -1;
memcpy(&pRspObj->rsp, &pWrapper->msg, sizeof(SMqDataBlkRsp)); memcpy(&pRspObj->rsp, &pWrapper->msg, sizeof(SMqDataBlkRsp));
/*SRetrieveTableRsp* pRetrieve = taosArrayGetP(pWrapper->msg.blockData, 0);*/
pRspObj->resInfo.totalRows = 0; pRspObj->resInfo.totalRows = 0;
pRspObj->resInfo.precision = TSDB_TIME_PRECISION_MILLI; pRspObj->resInfo.precision = TSDB_TIME_PRECISION_MILLI;
setResSchemaInfo(&pRspObj->resInfo, pWrapper->topicHandle->schema.pSchema, pWrapper->topicHandle->schema.nCols); setResSchemaInfo(&pRspObj->resInfo, pWrapper->topicHandle->schema.pSchema, pWrapper->topicHandle->schema.nCols);
...@@ -1355,31 +1250,6 @@ SMqRspObj* tmqHandleAllRsp(tmq_t* tmq, int64_t blockingTime, bool pollIfReset) { ...@@ -1355,31 +1250,6 @@ SMqRspObj* tmqHandleAllRsp(tmq_t* tmq, int64_t blockingTime, bool pollIfReset) {
} }
} }
#if 0
tmq_message_t* tmq_consumer_poll_v1(tmq_t* tmq, int64_t blocking_time) {
tmq_message_t* rspMsg = NULL;
int64_t startTime = taosGetTimestampMs();
int64_t status = atomic_load_64(&tmq->status);
tmqAskEp(tmq, status == TMQ_CONSUMER_STATUS__INIT);
while (1) {
rspMsg = tmqSyncPollImpl(tmq, blocking_time);
if (rspMsg && rspMsg->consumeRsp.numOfTopics) {
return rspMsg;
}
if (blocking_time != 0) {
int64_t endTime = taosGetTimestampMs();
if (endTime - startTime > blocking_time) {
return NULL;
}
} else
return NULL;
}
}
#endif
TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) { TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) {
SMqRspObj* rspObj; SMqRspObj* rspObj;
int64_t startTime = taosGetTimestampMs(); int64_t startTime = taosGetTimestampMs();
...@@ -1417,137 +1287,10 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) { ...@@ -1417,137 +1287,10 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) {
} }
} }
#if 0 tmq_resp_err_t tmq_consumer_close(tmq_t* tmq) {
// TODO
if (blocking_time <= 0) blocking_time = 1; return TMQ_RESP_ERR__SUCCESS;
if (blocking_time > 1000) blocking_time = 1000;
/*blocking_time = 1;*/
if (taosArrayGetSize(tmq->clientTopics) == 0) {
tscDebug("consumer:%ld poll but not assigned", tmq->consumerId);
/*printf("over1\n");*/
taosMsleep(blocking_time);
return NULL;
}
SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, tmq->nextTopicIdx);
if (taosArrayGetSize(pTopic->vgs) == 0) {
/*printf("over2\n");*/
taosMsleep(blocking_time);
return NULL;
}
tmq->nextTopicIdx = (tmq->nextTopicIdx + 1) % taosArrayGetSize(tmq->clientTopics);
int32_t beginVgIdx = pTopic->nextVgIdx;
while (1) {
pTopic->nextVgIdx = (pTopic->nextVgIdx + 1) % taosArrayGetSize(pTopic->vgs);
SMqClientVg* pVg = taosArrayGet(pTopic->vgs, pTopic->nextVgIdx);
/*printf("consume vg %d, offset %ld\n", pVg->vgId, pVg->currentOffset);*/
SMqConsumeReq* pReq = tmqBuildConsumeReqImpl(tmq, blocking_time, pTopic, pVg);
if (pReq == NULL) {
ASSERT(false);
taosMsleep(blocking_time);
return NULL;
}
SMqPollCbParam* param = taosMemoryMalloc(sizeof(SMqPollCbParam));
if (param == NULL) {
ASSERT(false);
taosMsleep(blocking_time);
return NULL;
}
param->tmq = tmq;
param->retMsg = &tmq_message;
param->pVg = pVg;
tsem_init(&param->rspSem, 0, 0);
SRequestObj* pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_VND_CONSUME);
pRequest->body.requestMsg = (SDataBuf){
.pData = pReq,
.len = sizeof(SMqConsumeReq),
.handle = NULL,
};
SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest);
sendInfo->requestObjRefId = 0;
sendInfo->param = param;
sendInfo->fp = tmqPollCb;
/*printf("req offset: %ld\n", pReq->offset);*/
int64_t transporterId = 0;
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo);
tmq->pollCnt++;
tsem_wait(&param->rspSem);
tsem_destroy(&param->rspSem);
taosMemoryFree(param);
if (tmq_message == NULL) {
if (beginVgIdx == pTopic->nextVgIdx) {
taosMsleep(blocking_time);
} else {
continue;
}
}
return tmq_message;
}
/*tsem_wait(&pRequest->body.rspSem);*/
/*if (body != NULL) {*/
/*destroySendMsgInfo(body);*/
/*}*/
/*if (pRequest != NULL && terrno != TSDB_CODE_SUCCESS) {*/
/*pRequest->code = terrno;*/
/*}*/
/*return pRequest;*/
}
#endif
#if 0
tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* tmq_topic_vgroup_list, int32_t async) {
if (tmq_topic_vgroup_list != NULL) {
// TODO
}
// TODO: change semaphore to gate
for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
for (int j = 0; j < taosArrayGetSize(pTopic->vgs); j++) {
SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
SMqConsumeReq* pReq = tmqBuildConsumeReqImpl(tmq, 0, pTopic, pVg);
SRequestObj* pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_VND_CONSUME);
pRequest->body.requestMsg = (SDataBuf){.pData = pReq, .len = sizeof(SMqConsumeReq)};
SMqCommitCbParam* pParam = taosMemoryMalloc(sizeof(SMqCommitCbParam));
if (pParam == NULL) {
continue;
}
pParam->tmq = tmq;
pParam->pVg = pVg;
pParam->async = async;
if (!async) tsem_init(&pParam->rspSem, 0, 0);
SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest);
sendInfo->requestObjRefId = 0;
sendInfo->param = pParam;
sendInfo->fp = tmqCommitCb;
int64_t transporterId = 0;
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo);
if (!async) tsem_wait(&pParam->rspSem);
}
}
return 0;
} }
#endif
tmq_resp_err_t tmq_consumer_close(tmq_t* tmq) { return TMQ_RESP_ERR__SUCCESS; }
const char* tmq_err2str(tmq_resp_err_t err) { const char* tmq_err2str(tmq_resp_err_t err) {
if (err == TMQ_RESP_ERR__SUCCESS) { if (err == TMQ_RESP_ERR__SUCCESS) {
...@@ -1573,10 +1316,3 @@ int32_t tmq_get_vgroup_id(TAOS_RES* res) { ...@@ -1573,10 +1316,3 @@ int32_t tmq_get_vgroup_id(TAOS_RES* res) {
return -1; return -1;
} }
} }
void tmq_message_destroy(TAOS_RES* res) {
if (res == NULL) return;
if (TD_RES_TMQ(res)) {
SMqRspObj* pRspObj = (SMqRspObj*)res;
}
}
...@@ -109,7 +109,8 @@ int tqReadHandleSetTbUidList(STqReadHandle *pHandle, const SArray *tbUidList ...@@ -109,7 +109,8 @@ int tqReadHandleSetTbUidList(STqReadHandle *pHandle, const SArray *tbUidList
int tqReadHandleAddTbUidList(STqReadHandle *pHandle, const SArray *tbUidList); int tqReadHandleAddTbUidList(STqReadHandle *pHandle, const SArray *tbUidList);
int32_t tqReadHandleSetMsg(STqReadHandle *pHandle, SSubmitReq *pMsg, int64_t ver); int32_t tqReadHandleSetMsg(STqReadHandle *pHandle, SSubmitReq *pMsg, int64_t ver);
bool tqNextDataBlock(STqReadHandle *pHandle); bool tqNextDataBlock(STqReadHandle *pHandle);
int32_t tqRetrieveDataBlock(SArray **ppCols, STqReadHandle *pHandle, uint64_t *pGroupId, int32_t *pNumOfRows); int32_t tqRetrieveDataBlock(SArray **ppCols, STqReadHandle *pHandle, uint64_t *pGroupId, int32_t *pNumOfRows,
int32_t *pNumOfCols);
// need to reposition // need to reposition
......
...@@ -88,22 +88,12 @@ struct STqReadHandle { ...@@ -88,22 +88,12 @@ struct STqReadHandle {
SSubmitMsgIter msgIter; SSubmitMsgIter msgIter;
SSubmitBlkIter blkIter; SSubmitBlkIter blkIter;
SMeta* pVnodeMeta; SMeta* pVnodeMeta;
SArray* pColIdList; // SArray<int32_t> SArray* pColIdList; // SArray<int16_t>
int32_t sver; int32_t sver;
SSchemaWrapper* pSchemaWrapper; SSchemaWrapper* pSchemaWrapper;
STSchema* pSchema; STSchema* pSchema;
}; };
typedef struct {
int8_t type;
int8_t reserved[7];
union {
void* data;
int64_t wmTs;
int64_t checkpointId;
};
} STqStreamToken;
typedef struct { typedef struct {
int16_t ver; int16_t ver;
int16_t action; int16_t action;
...@@ -155,24 +145,26 @@ typedef struct { ...@@ -155,24 +145,26 @@ typedef struct {
char subKey[TSDB_SUBSCRIBE_KEY_LEN]; char subKey[TSDB_SUBSCRIBE_KEY_LEN];
int64_t consumerId; int64_t consumerId;
int32_t epoch; int32_t epoch;
int8_t subType;
int8_t withTbName;
int8_t withSchema;
int8_t withTag;
int8_t withTagSchema;
char* qmsg; char* qmsg;
// SRWLatch lock; // SRWLatch lock;
SWalReadHandle* pReadHandle; SWalReadHandle* pWalReader;
// number should be identical to fetch thread num // number should be identical to fetch thread num
qTaskInfo_t task[4]; STqReadHandle* pStreamReader[4];
qTaskInfo_t task[4];
} STqExec; } STqExec;
struct STQ { struct STQ {
// the collection of groups char* path;
// the handle of meta kvstore // STqMetaStore* tqMeta;
bool writeTrigger; SHashObj* execs; // subKey -> tqExec
char* path; SHashObj* pStreamTasks;
STqMetaStore* tqMeta; SVnode* pVnode;
SHashObj* tqMetaNew; // subKey -> tqExec SWal* pWal;
SHashObj* pStreamTasks;
SVnode* pVnode;
SWal* pWal;
SMeta* pVnodeMeta;
}; };
typedef struct { typedef struct {
...@@ -252,7 +244,7 @@ int tqInit(); ...@@ -252,7 +244,7 @@ int tqInit();
void tqCleanUp(); void tqCleanUp();
// open in each vnode // open in each vnode
STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal, SMeta* pMeta, SMemAllocatorFactory* allocFac); STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal);
void tqClose(STQ*); void tqClose(STQ*);
// required by vnode // required by vnode
int tqPushMsg(STQ*, void* msg, int32_t msgLen, tmsg_t msgType, int64_t version); int tqPushMsg(STQ*, void* msg, int32_t msgLen, tmsg_t msgType, int64_t version);
......
...@@ -19,7 +19,7 @@ int32_t tqInit() { return tqPushMgrInit(); } ...@@ -19,7 +19,7 @@ int32_t tqInit() { return tqPushMgrInit(); }
void tqCleanUp() { tqPushMgrCleanUp(); } void tqCleanUp() { tqPushMgrCleanUp(); }
STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal, SMeta* pVnodeMeta, SMemAllocatorFactory* allocFac) { STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal) {
STQ* pTq = taosMemoryMalloc(sizeof(STQ)); STQ* pTq = taosMemoryMalloc(sizeof(STQ));
if (pTq == NULL) { if (pTq == NULL) {
terrno = TSDB_CODE_TQ_OUT_OF_MEMORY; terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
...@@ -28,15 +28,16 @@ STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal, SMeta* pVnodeMeta, SMe ...@@ -28,15 +28,16 @@ STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal, SMeta* pVnodeMeta, SMe
pTq->path = strdup(path); pTq->path = strdup(path);
pTq->pVnode = pVnode; pTq->pVnode = pVnode;
pTq->pWal = pWal; pTq->pWal = pWal;
pTq->pVnodeMeta = pVnodeMeta; #if 0
pTq->tqMeta = tqStoreOpen(pTq, path, (FTqSerialize)tqSerializeConsumer, (FTqDeserialize)tqDeserializeConsumer, pTq->tqMeta = tqStoreOpen(pTq, path, (FTqSerialize)tqSerializeConsumer, (FTqDeserialize)tqDeserializeConsumer,
(FTqDelete)taosMemoryFree, 0); (FTqDelete)taosMemoryFree, 0);
if (pTq->tqMeta == NULL) { if (pTq->tqMeta == NULL) {
taosMemoryFree(pTq); taosMemoryFree(pTq);
return NULL; return NULL;
} }
#endif
pTq->tqMetaNew = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK); pTq->execs = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);
pTq->pStreamTasks = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); pTq->pStreamTasks = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
...@@ -104,7 +105,11 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t versi ...@@ -104,7 +105,11 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t versi
return 0; return 0;
} }
int tqCommit(STQ* pTq) { return tqStorePersist(pTq->tqMeta); } int tqCommit(STQ* pTq) {
// do nothing
/*return tqStorePersist(pTq->tqMeta);*/
return 0;
}
int32_t tqGetTopicHandleSize(const STqTopic* pTopic) { int32_t tqGetTopicHandleSize(const STqTopic* pTopic) {
return strlen(pTopic->topicName) + strlen(pTopic->sql) + strlen(pTopic->physicalPlan) + strlen(pTopic->qmsg) + return strlen(pTopic->topicName) + strlen(pTopic->sql) + strlen(pTopic->physicalPlan) + strlen(pTopic->qmsg) +
...@@ -219,10 +224,10 @@ int32_t tqDeserializeConsumer(STQ* pTq, const STqSerializedHead* pHead, STqConsu ...@@ -219,10 +224,10 @@ int32_t tqDeserializeConsumer(STQ* pTq, const STqSerializedHead* pHead, STqConsu
} }
for (int j = 0; j < TQ_BUFFER_SIZE; j++) { for (int j = 0; j < TQ_BUFFER_SIZE; j++) {
pTopic->buffer.output[j].status = 0; pTopic->buffer.output[j].status = 0;
STqReadHandle* pReadHandle = tqInitSubmitMsgScanner(pTq->pVnodeMeta); STqReadHandle* pReadHandle = tqInitSubmitMsgScanner(pTq->pVnode->pMeta);
SReadHandle handle = { SReadHandle handle = {
.reader = pReadHandle, .reader = pReadHandle,
.meta = pTq->pVnodeMeta, .meta = pTq->pVnode->pMeta,
}; };
pTopic->buffer.output[j].pReadHandle = pReadHandle; pTopic->buffer.output[j].pReadHandle = pReadHandle;
pTopic->buffer.output[j].task = qCreateStreamExecTaskInfo(pTopic->qmsg, &handle); pTopic->buffer.output[j].task = qCreateStreamExecTaskInfo(pTopic->qmsg, &handle);
...@@ -238,6 +243,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { ...@@ -238,6 +243,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
int32_t reqEpoch = pReq->epoch; int32_t reqEpoch = pReq->epoch;
int64_t fetchOffset; int64_t fetchOffset;
// get offset to fetch message
if (pReq->currentOffset == TMQ_CONF__RESET_OFFSET__EARLIEAST) { if (pReq->currentOffset == TMQ_CONF__RESET_OFFSET__EARLIEAST) {
fetchOffset = walGetFirstVer(pTq->pWal); fetchOffset = walGetFirstVer(pTq->pWal);
} else if (pReq->currentOffset == TMQ_CONF__RESET_OFFSET__LATEST) { } else if (pReq->currentOffset == TMQ_CONF__RESET_OFFSET__LATEST) {
...@@ -249,7 +255,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { ...@@ -249,7 +255,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
vDebug("tmq poll: consumer %ld (epoch %d) recv poll req in vg %d, req %ld %ld", consumerId, pReq->epoch, vDebug("tmq poll: consumer %ld (epoch %d) recv poll req in vg %d, req %ld %ld", consumerId, pReq->epoch,
TD_VID(pTq->pVnode), pReq->currentOffset, fetchOffset); TD_VID(pTq->pVnode), pReq->currentOffset, fetchOffset);
STqExec* pExec = taosHashGet(pTq->tqMetaNew, pReq->subKey, strlen(pReq->subKey)); STqExec* pExec = taosHashGet(pTq->execs, pReq->subKey, strlen(pReq->subKey));
ASSERT(pExec); ASSERT(pExec);
int32_t consumerEpoch = atomic_load_32(&pExec->epoch); int32_t consumerEpoch = atomic_load_32(&pExec->epoch);
...@@ -271,7 +277,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { ...@@ -271,7 +277,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
} }
SWalReadHead* pHead; SWalReadHead* pHead;
if (walReadWithHandle_s(pExec->pReadHandle, fetchOffset, &pHead) < 0) { if (walReadWithHandle_s(pExec->pWalReader, fetchOffset, &pHead) < 0) {
// TODO: no more log, set timer to wait blocking time // TODO: no more log, set timer to wait blocking time
// if data inserted during waiting, launch query and // if data inserted during waiting, launch query and
// response to user // response to user
...@@ -285,41 +291,73 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { ...@@ -285,41 +291,73 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
if (pHead->msgType == TDMT_VND_SUBMIT) { if (pHead->msgType == TDMT_VND_SUBMIT) {
SSubmitReq* pCont = (SSubmitReq*)&pHead->body; SSubmitReq* pCont = (SSubmitReq*)&pHead->body;
qTaskInfo_t task = pExec->task[workerId]; if (pExec->subType == TOPIC_SUB_TYPE__TABLE) {
ASSERT(task); qTaskInfo_t task = pExec->task[workerId];
qSetStreamInput(task, pCont, STREAM_DATA_TYPE_SUBMIT_BLOCK); ASSERT(task);
while (1) { qSetStreamInput(task, pCont, STREAM_DATA_TYPE_SUBMIT_BLOCK);
SSDataBlock* pDataBlock = NULL; while (1) {
uint64_t ts = 0; SSDataBlock* pDataBlock = NULL;
if (qExecTask(task, &pDataBlock, &ts) < 0) { uint64_t ts = 0;
ASSERT(0); if (qExecTask(task, &pDataBlock, &ts) < 0) {
ASSERT(0);
}
if (pDataBlock == NULL) break;
ASSERT(pDataBlock->info.rows != 0);
ASSERT(pDataBlock->info.numOfCols != 0);
int32_t dataStrLen = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pDataBlock);
void* buf = taosMemoryCalloc(1, dataStrLen);
SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)buf;
pRetrieve->useconds = ts;
pRetrieve->precision = TSDB_DEFAULT_PRECISION;
pRetrieve->compressed = 0;
pRetrieve->completed = 1;
pRetrieve->numOfRows = htonl(pDataBlock->info.rows);
// TODO enable compress
int32_t actualLen = 0;
blockCompressEncode(pDataBlock, pRetrieve->data, &actualLen, pDataBlock->info.numOfCols, false);
actualLen += sizeof(SRetrieveTableRsp);
ASSERT(actualLen <= dataStrLen);
taosArrayPush(rsp.blockDataLen, &actualLen);
taosArrayPush(rsp.blockData, &buf);
rsp.blockNum++;
} }
if (pDataBlock == NULL) break; } else if (pExec->subType == TOPIC_SUB_TYPE__DB) {
STqReadHandle* pReader = pExec->pStreamReader[workerId];
ASSERT(pDataBlock->info.rows != 0); tqReadHandleSetMsg(pReader, pCont, 0);
ASSERT(pDataBlock->info.numOfCols != 0); while (tqNextDataBlock(pReader)) {
SSDataBlock block = {0};
int32_t dataStrLen = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pDataBlock); if (tqRetrieveDataBlock(&block.pDataBlock, pReader, &block.info.groupId, &block.info.rows,
void* buf = taosMemoryCalloc(1, dataStrLen); &block.info.numOfCols) < 0) {
SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)buf; ASSERT(0);
pRetrieve->useconds = ts; }
pRetrieve->precision = TSDB_DEFAULT_PRECISION; int32_t dataStrLen = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(&block);
pRetrieve->compressed = 0; void* buf = taosMemoryCalloc(1, dataStrLen);
pRetrieve->completed = 1; SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)buf;
pRetrieve->numOfRows = htonl(pDataBlock->info.rows); /*pRetrieve->useconds = 0;*/
pRetrieve->precision = TSDB_DEFAULT_PRECISION;
// TODO enable compress pRetrieve->compressed = 0;
int32_t actualLen = 0; pRetrieve->completed = 1;
blockCompressEncode(pDataBlock, pRetrieve->data, &actualLen, pDataBlock->info.numOfCols, false); pRetrieve->numOfRows = htonl(block.info.rows);
actualLen += sizeof(SRetrieveTableRsp);
ASSERT(actualLen <= dataStrLen); // TODO enable compress
taosArrayPush(rsp.blockDataLen, &actualLen); int32_t actualLen = 0;
taosArrayPush(rsp.blockData, &buf); blockCompressEncode(&block, pRetrieve->data, &actualLen, block.info.numOfCols, false);
rsp.blockNum++; actualLen += sizeof(SRetrieveTableRsp);
ASSERT(actualLen <= dataStrLen);
taosArrayPush(rsp.blockDataLen, &actualLen);
taosArrayPush(rsp.blockData, &buf);
rsp.blockNum++;
}
} else {
ASSERT(0);
} }
} }
// TODO batch optimization // TODO batch optimization:
// TODO continue scan until meeting batch requirement
if (rsp.blockNum != 0) break; if (rsp.blockNum != 0) break;
rsp.skipLogNum++; rsp.skipLogNum++;
fetchOffset++; fetchOffset++;
...@@ -572,10 +610,10 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { ...@@ -572,10 +610,10 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
// TODO: persist meta into tdb // TODO: persist meta into tdb
int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) { int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
SMqRebVgReq req; SMqRebVgReq req = {0};
tDecodeSMqRebVgReq(msg, &req); tDecodeSMqRebVgReq(msg, &req);
// todo lock // todo lock
STqExec* pExec = taosHashGet(pTq->tqMetaNew, req.subKey, strlen(req.subKey)); STqExec* pExec = taosHashGet(pTq->execs, req.subKey, strlen(req.subKey));
if (pExec == NULL) { if (pExec == NULL) {
ASSERT(req.oldConsumerId == -1); ASSERT(req.oldConsumerId == -1);
ASSERT(req.newConsumerId != -1); ASSERT(req.newConsumerId != -1);
...@@ -586,19 +624,27 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) { ...@@ -586,19 +624,27 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
memcpy(pExec->subKey, req.subKey, TSDB_SUBSCRIBE_KEY_LEN); memcpy(pExec->subKey, req.subKey, TSDB_SUBSCRIBE_KEY_LEN);
pExec->consumerId = req.newConsumerId; pExec->consumerId = req.newConsumerId;
pExec->epoch = -1; pExec->epoch = -1;
pExec->subType = req.subType;
pExec->withTbName = req.withTbName;
pExec->withSchema = req.withSchema;
pExec->withTag = req.withTag;
pExec->withTagSchema = req.withTagSchema;
pExec->qmsg = req.qmsg; pExec->qmsg = req.qmsg;
req.qmsg = NULL; req.qmsg = NULL;
pExec->pReadHandle = walOpenReadHandle(pTq->pVnode->pWal);
pExec->pWalReader = walOpenReadHandle(pTq->pVnode->pWal);
for (int32_t i = 0; i < 4; i++) { for (int32_t i = 0; i < 4; i++) {
STqReadHandle* pReadHandle = tqInitSubmitMsgScanner(pTq->pVnodeMeta); pExec->pStreamReader[i] = tqInitSubmitMsgScanner(pTq->pVnode->pMeta);
SReadHandle handle = { SReadHandle handle = {
.reader = pReadHandle, .reader = pExec->pStreamReader[i],
.meta = pTq->pVnodeMeta, .meta = pTq->pVnode->pMeta,
}; };
pExec->task[i] = qCreateStreamExecTaskInfo(pExec->qmsg, &handle); pExec->task[i] = qCreateStreamExecTaskInfo(pExec->qmsg, &handle);
ASSERT(pExec->task[i]); ASSERT(pExec->task[i]);
} }
taosHashPut(pTq->tqMetaNew, req.subKey, strlen(req.subKey), pExec, sizeof(STqExec)); taosHashPut(pTq->execs, req.subKey, strlen(req.subKey), pExec, sizeof(STqExec));
return 0; return 0;
} else { } else {
/*if (req.newConsumerId != -1) {*/ /*if (req.newConsumerId != -1) {*/
...@@ -627,12 +673,12 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int32_t parallel) { ...@@ -627,12 +673,12 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int32_t parallel) {
return -1; return -1;
} }
for (int32_t i = 0; i < parallel; i++) { for (int32_t i = 0; i < parallel; i++) {
STqReadHandle* pReadHandle = tqInitSubmitMsgScanner(pTq->pVnodeMeta); STqReadHandle* pStreamReader = tqInitSubmitMsgScanner(pTq->pVnode->pMeta);
SReadHandle handle = { SReadHandle handle = {
.reader = pReadHandle, .reader = pStreamReader,
.meta = pTq->pVnodeMeta, .meta = pTq->pVnode->pMeta,
}; };
pTask->exec.runners[i].inputHandle = pReadHandle; pTask->exec.runners[i].inputHandle = pStreamReader;
pTask->exec.runners[i].executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle); pTask->exec.runners[i].executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle);
ASSERT(pTask->exec.runners[i].executor); ASSERT(pTask->exec.runners[i].executor);
} }
......
...@@ -82,7 +82,8 @@ bool tqNextDataBlock(STqReadHandle* pHandle) { ...@@ -82,7 +82,8 @@ bool tqNextDataBlock(STqReadHandle* pHandle) {
return false; return false;
} }
int32_t tqRetrieveDataBlock(SArray** ppCols, STqReadHandle* pHandle, uint64_t* pGroupId, int32_t* pNumOfRows) { int32_t tqRetrieveDataBlock(SArray** ppCols, STqReadHandle* pHandle, uint64_t* pGroupId, int32_t* pNumOfRows,
int32_t* pNumOfCols) {
/*int32_t sversion = pHandle->pBlock->sversion;*/ /*int32_t sversion = pHandle->pBlock->sversion;*/
// TODO set to real sversion // TODO set to real sversion
int32_t sversion = 0; int32_t sversion = 0;
...@@ -104,7 +105,6 @@ int32_t tqRetrieveDataBlock(SArray** ppCols, STqReadHandle* pHandle, uint64_t* p ...@@ -104,7 +105,6 @@ int32_t tqRetrieveDataBlock(SArray** ppCols, STqReadHandle* pHandle, uint64_t* p
SSchemaWrapper* pSchemaWrapper = pHandle->pSchemaWrapper; SSchemaWrapper* pSchemaWrapper = pHandle->pSchemaWrapper;
*pNumOfRows = pHandle->pBlock->numOfRows; *pNumOfRows = pHandle->pBlock->numOfRows;
/*int32_t numOfCols = pHandle->pSchema->numOfCols;*/
int32_t colNumNeed = taosArrayGetSize(pHandle->pColIdList); int32_t colNumNeed = taosArrayGetSize(pHandle->pColIdList);
if (colNumNeed > pSchemaWrapper->nCols) { if (colNumNeed > pSchemaWrapper->nCols) {
...@@ -142,6 +142,7 @@ int32_t tqRetrieveDataBlock(SArray** ppCols, STqReadHandle* pHandle, uint64_t* p ...@@ -142,6 +142,7 @@ int32_t tqRetrieveDataBlock(SArray** ppCols, STqReadHandle* pHandle, uint64_t* p
} }
int32_t colActual = taosArrayGetSize(*ppCols); int32_t colActual = taosArrayGetSize(*ppCols);
*pNumOfCols = colActual;
// TODO in stream shuffle case, fetch groupId // TODO in stream shuffle case, fetch groupId
*pGroupId = 0; *pGroupId = 0;
......
...@@ -112,7 +112,7 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) { ...@@ -112,7 +112,7 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) {
// open tq // open tq
sprintf(tdir, "%s%s%s", dir, TD_DIRSEP, VNODE_TQ_DIR); sprintf(tdir, "%s%s%s", dir, TD_DIRSEP, VNODE_TQ_DIR);
pVnode->pTq = tqOpen(tdir, pVnode, pVnode->pWal, pVnode->pMeta, vBufPoolGetMAF(pVnode)); pVnode->pTq = tqOpen(tdir, pVnode, pVnode->pWal);
if (pVnode->pTq == NULL) { if (pVnode->pTq == NULL) {
vError("vgId: %d failed to open vnode tq since %s", TD_VID(pVnode), tstrerror(terrno)); vError("vgId: %d failed to open vnode tq since %s", TD_VID(pVnode), tstrerror(terrno));
goto _err; goto _err;
......
...@@ -561,7 +561,8 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator, bool* newgroup) ...@@ -561,7 +561,8 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator, bool* newgroup)
SArray* pCols = NULL; SArray* pCols = NULL;
uint64_t groupId; uint64_t groupId;
int32_t numOfRows; int32_t numOfRows;
int32_t code = tqRetrieveDataBlock(&pCols, pInfo->readerHandle, &groupId, &numOfRows); int32_t numOfCols;
int32_t code = tqRetrieveDataBlock(&pCols, pInfo->readerHandle, &groupId, &numOfRows, &numOfCols);
if (code != TSDB_CODE_SUCCESS || numOfRows == 0) { if (code != TSDB_CODE_SUCCESS || numOfRows == 0) {
pTaskInfo->code = code; pTaskInfo->code = code;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册