提交 a3ac8b64 编写于 作者: H Haojun Liao

refactor: do some internal refactor.

上级 482c319b
...@@ -356,7 +356,7 @@ SStreamTask* tNewSStreamTask(int64_t streamId); ...@@ -356,7 +356,7 @@ SStreamTask* tNewSStreamTask(int64_t streamId);
int32_t tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask); int32_t tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask);
int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask); int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask);
void tFreeSStreamTask(SStreamTask* pTask); void tFreeSStreamTask(SStreamTask* pTask);
int32_t streamTaskInput(SStreamTask* pTask, SStreamQueueItem* pItem); int32_t tAppendDataForStream(SStreamTask* pTask, SStreamQueueItem* pItem);
static FORCE_INLINE void streamTaskInputFail(SStreamTask* pTask) { static FORCE_INLINE void streamTaskInputFail(SStreamTask* pTask) {
atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__FAILED); atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__FAILED);
......
...@@ -61,7 +61,7 @@ typedef void (*FItems)(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfItems); ...@@ -61,7 +61,7 @@ typedef void (*FItems)(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfItems);
typedef struct STaosQnode STaosQnode; typedef struct STaosQnode STaosQnode;
typedef struct STaosQnode { struct STaosQnode {
STaosQnode *next; STaosQnode *next;
STaosQueue *queue; STaosQueue *queue;
int64_t timestamp; int64_t timestamp;
...@@ -70,9 +70,9 @@ typedef struct STaosQnode { ...@@ -70,9 +70,9 @@ typedef struct STaosQnode {
int8_t itype; int8_t itype;
int8_t reserved[3]; int8_t reserved[3];
char item[]; char item[];
} STaosQnode; };
typedef struct STaosQueue { struct STaosQueue {
STaosQnode *head; STaosQnode *head;
STaosQnode *tail; STaosQnode *tail;
STaosQueue *next; // for queue set STaosQueue *next; // for queue set
...@@ -84,22 +84,22 @@ typedef struct STaosQueue { ...@@ -84,22 +84,22 @@ typedef struct STaosQueue {
int64_t memOfItems; int64_t memOfItems;
int32_t numOfItems; int32_t numOfItems;
int64_t threadId; int64_t threadId;
} STaosQueue; };
typedef struct STaosQset { struct STaosQset {
STaosQueue *head; STaosQueue *head;
STaosQueue *current; STaosQueue *current;
TdThreadMutex mutex; TdThreadMutex mutex;
tsem_t sem; tsem_t sem;
int32_t numOfQueues; int32_t numOfQueues;
int32_t numOfItems; int32_t numOfItems;
} STaosQset; };
typedef struct STaosQall { struct STaosQall {
STaosQnode *current; STaosQnode *current;
STaosQnode *start; STaosQnode *start;
int32_t numOfItems; int32_t numOfItems;
} STaosQall; };
STaosQueue *taosOpenQueue(); STaosQueue *taosOpenQueue();
void taosCloseQueue(STaosQueue *queue); void taosCloseQueue(STaosQueue *queue);
......
...@@ -1338,8 +1338,9 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { ...@@ -1338,8 +1338,9 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
taosMemoryFree(pMsg->pData); taosMemoryFree(pMsg->pData);
taosWriteQitem(tmq->mqueue, pRspWrapper); taosWriteQitem(tmq->mqueue, pRspWrapper);
int32_t total = taosQueueItemSize(tmq->mqueue);
tscDebug("consumer:0x%" PRIx64 " put poll res into mqueue, type:%d, vgId:%d, total in queue:%d, reqId:0x%" PRIx64, tscDebug("consumer:0x%" PRIx64 " put poll res into mqueue, type:%d, vgId:%d, total in queue:%d, reqId:0x%" PRIx64,
tmq->consumerId, rspType, vgId, tmq->mqueue->numOfItems, requestId); tmq->consumerId, rspType, vgId, total, requestId);
tsem_post(&tmq->rspSem); tsem_post(&tmq->rspSem);
taosReleaseRef(tmqMgmt.rsetId, refId); taosReleaseRef(tmqMgmt.rsetId, refId);
......
...@@ -161,10 +161,10 @@ void *queryThread(void *arg) { ...@@ -161,10 +161,10 @@ void *queryThread(void *arg) {
return NULL; return NULL;
} }
static int32_t numOfThreads = 1; int32_t numOfThreads = 1;
void tmq_commit_cb_print(tmq_t *pTmq, int32_t code, void *param) { void tmq_commit_cb_print(tmq_t *pTmq, int32_t code, void *param) {
printf("success, code:%d\n", code); printf("auto commit success, code:%d\n\n\n\n", code);
} }
void* doConsumeData(void* param) { void* doConsumeData(void* param) {
...@@ -173,7 +173,7 @@ void* doConsumeData(void* param) { ...@@ -173,7 +173,7 @@ void* doConsumeData(void* param) {
tmq_conf_t* conf = tmq_conf_new(); tmq_conf_t* conf = tmq_conf_new();
tmq_conf_set(conf, "enable.auto.commit", "true"); tmq_conf_set(conf, "enable.auto.commit", "true");
tmq_conf_set(conf, "auto.commit.interval.ms", "1000"); tmq_conf_set(conf, "auto.commit.interval.ms", "1000");
tmq_conf_set(conf, "group.id", "cgrpName12"); tmq_conf_set(conf, "group.id", "cgrpName41");
tmq_conf_set(conf, "td.connect.user", "root"); tmq_conf_set(conf, "td.connect.user", "root");
tmq_conf_set(conf, "td.connect.pass", "taosdata"); tmq_conf_set(conf, "td.connect.pass", "taosdata");
tmq_conf_set(conf, "auto.offset.reset", "earliest"); tmq_conf_set(conf, "auto.offset.reset", "earliest");
...@@ -1060,7 +1060,7 @@ TEST(clientCase, sub_tb_test) { ...@@ -1060,7 +1060,7 @@ TEST(clientCase, sub_tb_test) {
tmq_conf_t* conf = tmq_conf_new(); tmq_conf_t* conf = tmq_conf_new();
tmq_conf_set(conf, "enable.auto.commit", "true"); tmq_conf_set(conf, "enable.auto.commit", "true");
tmq_conf_set(conf, "auto.commit.interval.ms", "1000"); tmq_conf_set(conf, "auto.commit.interval.ms", "1000");
tmq_conf_set(conf, "group.id", "cgrpName27"); tmq_conf_set(conf, "group.id", "cgrpName45");
tmq_conf_set(conf, "td.connect.user", "root"); tmq_conf_set(conf, "td.connect.user", "root");
tmq_conf_set(conf, "td.connect.pass", "taosdata"); tmq_conf_set(conf, "td.connect.pass", "taosdata");
tmq_conf_set(conf, "auto.offset.reset", "earliest"); tmq_conf_set(conf, "auto.offset.reset", "earliest");
......
...@@ -1332,7 +1332,7 @@ int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) { ...@@ -1332,7 +1332,7 @@ int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) {
pRefBlock->dataRef = pRef; pRefBlock->dataRef = pRef;
atomic_add_fetch_32(pRefBlock->dataRef, 1); atomic_add_fetch_32(pRefBlock->dataRef, 1);
if (streamTaskInput(pTask, (SStreamQueueItem*)pRefBlock) < 0) { if (tAppendDataForStream(pTask, (SStreamQueueItem*)pRefBlock) < 0) {
qError("stream task input del failed, task id %d", pTask->taskId); qError("stream task input del failed, task id %d", pTask->taskId);
atomic_sub_fetch_32(pRef, 1); atomic_sub_fetch_32(pRef, 1);
...@@ -1367,7 +1367,7 @@ int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) { ...@@ -1367,7 +1367,7 @@ int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) {
taosArrayPush(pStreamBlock->blocks, &block); taosArrayPush(pStreamBlock->blocks, &block);
if (!failed) { if (!failed) {
if (streamTaskInput(pTask, (SStreamQueueItem*)pStreamBlock) < 0) { if (tAppendDataForStream(pTask, (SStreamQueueItem*)pStreamBlock) < 0) {
qError("stream task input del failed, task id %d", pTask->taskId); qError("stream task input del failed, task id %d", pTask->taskId);
continue; continue;
} }
...@@ -1388,13 +1388,13 @@ int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) { ...@@ -1388,13 +1388,13 @@ int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) {
int32_t tqProcessSubmitReq(STQ* pTq, SPackedData submit) { int32_t tqProcessSubmitReq(STQ* pTq, SPackedData submit) {
void* pIter = NULL; void* pIter = NULL;
bool failed = false; bool succ = true;
SStreamDataSubmit2* pSubmit = streamDataSubmitNew(submit); SStreamDataSubmit2* pSubmit = streamDataSubmitNew(submit);
if (pSubmit == NULL) { if (pSubmit == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
tqError("failed to create data submit for stream since out of memory"); tqError("failed to create data submit for stream since out of memory");
failed = true; succ = false;
} }
while (1) { while (1) {
...@@ -1409,20 +1409,19 @@ int32_t tqProcessSubmitReq(STQ* pTq, SPackedData submit) { ...@@ -1409,20 +1409,19 @@ int32_t tqProcessSubmitReq(STQ* pTq, SPackedData submit) {
} }
if (pTask->taskStatus == TASK_STATUS__RECOVER_PREPARE || pTask->taskStatus == TASK_STATUS__WAIT_DOWNSTREAM) { if (pTask->taskStatus == TASK_STATUS__RECOVER_PREPARE || pTask->taskStatus == TASK_STATUS__WAIT_DOWNSTREAM) {
tqDebug("skip push task %d, task status %d", pTask->taskId, pTask->taskStatus); tqDebug("stream task:%d skip push data, not ready for processing, status %d", pTask->taskId, pTask->taskStatus);
continue; continue;
} }
tqDebug("data submit enqueue stream task: %d, ver: %" PRId64, pTask->taskId, submit.ver); tqDebug("data submit enqueue stream task:%d, ver: %" PRId64, pTask->taskId, submit.ver);
if (succ) {
if (!failed) { if (tAppendDataForStream(pTask, (SStreamQueueItem*)pSubmit) < 0) {
if (streamTaskInput(pTask, (SStreamQueueItem*)pSubmit) < 0) { tqError("stream task:%d failed to put into queue for, too many", pTask->taskId);
tqError("stream task input failed, task id %d", pTask->taskId);
continue; continue;
} }
if (streamSchedExec(pTask) < 0) { if (streamSchedExec(pTask) < 0) {
tqError("stream task launch failed, task id %d", pTask->taskId); tqError("stream task:%d launch failed, code:%s", pTask->taskId, tstrerror(terrno));
continue; continue;
} }
} else { } else {
...@@ -1430,12 +1429,12 @@ int32_t tqProcessSubmitReq(STQ* pTq, SPackedData submit) { ...@@ -1430,12 +1429,12 @@ int32_t tqProcessSubmitReq(STQ* pTq, SPackedData submit) {
} }
} }
if (pSubmit) { if (pSubmit != NULL) {
streamDataSubmitDestroy(pSubmit); streamDataSubmitDestroy(pSubmit);
taosFreeQitem(pSubmit); taosFreeQitem(pSubmit);
} }
return failed ? -1 : 0; return succ ? 0 : -1;
} }
int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) { int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
......
...@@ -68,7 +68,7 @@ void streamSchedByTimer(void* param, void* tmrId) { ...@@ -68,7 +68,7 @@ void streamSchedByTimer(void* param, void* tmrId) {
atomic_store_8(&pTask->triggerStatus, TASK_TRIGGER_STATUS__INACTIVE); atomic_store_8(&pTask->triggerStatus, TASK_TRIGGER_STATUS__INACTIVE);
if (streamTaskInput(pTask, (SStreamQueueItem*)trigger) < 0) { if (tAppendDataForStream(pTask, (SStreamQueueItem*)trigger) < 0) {
taosFreeQitem(trigger); taosFreeQitem(trigger);
taosTmrReset(streamSchedByTimer, (int32_t)pTask->triggerParam, pTask, streamEnv.timer, &pTask->timer); taosTmrReset(streamSchedByTimer, (int32_t)pTask->triggerParam, pTask, streamEnv.timer, &pTask->timer);
return; return;
...@@ -123,7 +123,7 @@ int32_t streamTaskEnqueue(SStreamTask* pTask, const SStreamDispatchReq* pReq, SR ...@@ -123,7 +123,7 @@ int32_t streamTaskEnqueue(SStreamTask* pTask, const SStreamDispatchReq* pReq, SR
/*pData->blocks = pReq->data;*/ /*pData->blocks = pReq->data;*/
/*pBlock->sourceVer = pReq->sourceVer;*/ /*pBlock->sourceVer = pReq->sourceVer;*/
streamDispatchReqToData(pReq, pData); streamDispatchReqToData(pReq, pData);
if (streamTaskInput(pTask, (SStreamQueueItem*)pData) == 0) { if (tAppendDataForStream(pTask, (SStreamQueueItem*)pData) == 0) {
status = TASK_INPUT_STATUS__NORMAL; status = TASK_INPUT_STATUS__NORMAL;
} else { } else {
status = TASK_INPUT_STATUS__FAILED; status = TASK_INPUT_STATUS__FAILED;
...@@ -164,7 +164,7 @@ int32_t streamTaskEnqueueRetrieve(SStreamTask* pTask, SStreamRetrieveReq* pReq, ...@@ -164,7 +164,7 @@ int32_t streamTaskEnqueueRetrieve(SStreamTask* pTask, SStreamRetrieveReq* pReq,
/*pData->blocks = pReq->data;*/ /*pData->blocks = pReq->data;*/
/*pBlock->sourceVer = pReq->sourceVer;*/ /*pBlock->sourceVer = pReq->sourceVer;*/
streamRetrieveReqToData(pReq, pData); streamRetrieveReqToData(pReq, pData);
if (streamTaskInput(pTask, (SStreamQueueItem*)pData) == 0) { if (tAppendDataForStream(pTask, (SStreamQueueItem*)pData) == 0) {
status = TASK_INPUT_STATUS__NORMAL; status = TASK_INPUT_STATUS__NORMAL;
} else { } else {
status = TASK_INPUT_STATUS__FAILED; status = TASK_INPUT_STATUS__FAILED;
...@@ -275,7 +275,7 @@ int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, S ...@@ -275,7 +275,7 @@ int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, S
return 0; return 0;
} }
int32_t streamTaskInput(SStreamTask* pTask, SStreamQueueItem* pItem) { int32_t tAppendDataForStream(SStreamTask* pTask, SStreamQueueItem* pItem) {
int8_t type = pItem->type; int8_t type = pItem->type;
if (type == STREAM_INPUT__DATA_SUBMIT) { if (type == STREAM_INPUT__DATA_SUBMIT) {
...@@ -288,9 +288,11 @@ int32_t streamTaskInput(SStreamTask* pTask, SStreamQueueItem* pItem) { ...@@ -288,9 +288,11 @@ int32_t streamTaskInput(SStreamTask* pTask, SStreamQueueItem* pItem) {
} }
taosWriteQitem(pTask->inputQueue->queue, pSubmitBlock); taosWriteQitem(pTask->inputQueue->queue, pSubmitBlock);
int32_t total = taosQueueItemSize(pTask->inputQueue->queue);
qDebug("stream task:%d %p submit enqueue %p %p %p msgLen:%d ver:%" PRId64 ", total in queue:%d", pTask->taskId, qDebug("stream task:%d %p submit enqueue %p %p %p msgLen:%d ver:%" PRId64 ", total in queue:%d", pTask->taskId,
pTask, pItem, pSubmitBlock, pSubmitBlock->submit.msgStr, pSubmitBlock->submit.msgLen, pTask, pItem, pSubmitBlock, pSubmitBlock->submit.msgStr, pSubmitBlock->submit.msgLen,
pSubmitBlock->submit.ver, pTask->inputQueue->queue->numOfItems); pSubmitBlock->submit.ver, total);
} else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE || } else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE ||
type == STREAM_INPUT__REF_DATA_BLOCK) { type == STREAM_INPUT__REF_DATA_BLOCK) {
taosWriteQitem(pTask->inputQueue->queue, pItem); taosWriteQitem(pTask->inputQueue->queue, pItem);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册