提交 29fcd1b8 编写于 作者: H Haojun Liao

refactor: do some internal refactor.

上级 50ba5556
......@@ -255,14 +255,13 @@ int32_t tqReaderAddTbUidList(STqReader *pReader, const SArray *pTableUidList);
int32_t tqReaderRemoveTbUidList(STqReader *pReader, const SArray *tbUidList);
int32_t tqSeekVer(STqReader *pReader, int64_t ver, const char *id);
int32_t tqNextBlock(STqReader *pReader, SSDataBlock* pBlock);
int32_t tqNextBlockInWal(STqReader* pReader);
int32_t extractSubmitMsgFromWal(SWalReader *pReader, SPackedData *pPackedData);
bool tqNextBlockImpl(STqReader *pReader);
int32_t extractSubmitMsgFromWal(SWalReader *pReader, SPackedData *pPackedData);
int32_t tqReaderSetSubmitMsg(STqReader *pReader, void *msgStr, int32_t msgLen, int64_t ver);
bool tqNextBlockImpl(STqReader *pReader);
bool tqNextDataBlockFilterOut(STqReader *pReader, SHashObj *filterOutUids);
int32_t tqRetrieveDataBlock(SSDataBlock *pBlock, STqReader *pReader, SSubmitTbData **pSubmitTbDataRet);
int32_t tqRetrieveDataBlock(STqReader *pReader, SSubmitTbData **pSubmitTbDataRet);
int32_t tqRetrieveTaosxBlock(STqReader *pReader, SArray *blocks, SArray *schemas, SSubmitTbData **pSubmitTbDataRet);
int32_t vnodeEnqueueStreamMsg(SVnode *pVnode, SRpcMsg *pMsg);
......
......@@ -212,7 +212,6 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t version, char* msg, int32_t msg
int32_t tqProcessTaskDropReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen);
int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t version, char* msg, int32_t msgLen);
int32_t tqProcessSubmitReq(STQ* pTq, SPackedData submit);
int32_t tqProcessSubmitReqForSubscribe(STQ* pTq);
int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver);
int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg);
......
......@@ -1069,12 +1069,15 @@ int32_t tqProcessSubmitReqForSubscribe(STQ* pTq) {
int32_t vgId = TD_VID(pTq->pVnode);
taosWLockLatch(&pTq->lock);
if(taosHashGetSize(pTq->pPushMgr) > 0){
void *pIter = taosHashIterate(pTq->pPushMgr, NULL);
while(pIter){
if (taosHashGetSize(pTq->pPushMgr) > 0) {
void* pIter = taosHashIterate(pTq->pPushMgr, NULL);
while (pIter) {
STqHandle* pHandle = *(STqHandle**)pIter;
tqDebug("vgId:%d start set submit for pHandle:%p, consume id:0x%"PRIx64, vgId, pHandle, pHandle->consumerId);
if(ASSERT(pHandle->msg != NULL)){
tqDebug("vgId:%d start set submit for pHandle:%p, consumer:0x%" PRIx64, vgId, pHandle, pHandle->consumerId);
if (ASSERT(pHandle->msg != NULL)) {
tqError("pHandle->msg should not be null");
break;
}else{
......@@ -1083,77 +1086,15 @@ int32_t tqProcessSubmitReqForSubscribe(STQ* pTq) {
taosMemoryFree(pHandle->msg);
pHandle->msg = NULL;
}
pIter = taosHashIterate(pTq->pPushMgr, pIter);
}
taosHashClear(pTq->pPushMgr);
}
// unlock
taosWUnLockLatch(&pTq->lock);
return 0;
}
int32_t tqProcessSubmitReq(STQ* pTq, SPackedData submit) {
#if 0
void* pIter = NULL;
SStreamDataSubmit2* pSubmit = streamDataSubmitNew(submit, STREAM_INPUT__DATA_SUBMIT);
if (pSubmit == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
tqError("failed to create data submit for stream since out of memory");
saveOffsetForAllTasks(pTq, submit.ver);
return -1;
}
SArray* pInputQueueFullTasks = taosArrayInit(4, POINTER_BYTES);
while (1) {
pIter = taosHashIterate(pTq->pStreamMeta->pTasks, pIter);
if (pIter == NULL) {
break;
}
SStreamTask* pTask = *(SStreamTask**)pIter;
if (pTask->taskLevel != TASK_LEVEL__SOURCE) {
continue;
}
if (pTask->status.taskStatus == TASK_STATUS__RECOVER_PREPARE || pTask->status.taskStatus == TASK_STATUS__WAIT_DOWNSTREAM) {
tqDebug("stream task:%d skip push data, not ready for processing, status %d", pTask->id.taskId,
pTask->status.taskStatus);
continue;
}
// check if offset value exists
char key[128] = {0};
createStreamTaskOffsetKey(key, pTask->id.streamId, pTask->id.taskId);
if (tInputQueueIsFull(pTask)) {
STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, key);
int64_t ver = submit.ver;
if (pOffset == NULL) {
doSaveTaskOffset(pTq->pOffsetStore, key, submit.ver);
} else {
ver = pOffset->val.version;
}
tqDebug("s-task:%s input queue is full, discard submit block, ver:%" PRId64, pTask->id.idStr, ver);
taosArrayPush(pInputQueueFullTasks, &pTask);
continue;
}
// check if offset value exists
STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, key);
ASSERT(pOffset == NULL);
addSubmitBlockNLaunchTask(pTq->pOffsetStore, pTask, pSubmit, key, submit.ver);
}
streamDataSubmitDestroy(pSubmit);
taosFreeQitem(pSubmit);
#endif
tqStartStreamTasks(pTq);
return 0;
}
......@@ -1324,8 +1265,8 @@ int32_t tqCheckLogInWal(STQ* pTq, int64_t sversion) { return sversion <= pTq->wa
int32_t tqStartStreamTasks(STQ* pTq) {
int32_t vgId = TD_VID(pTq->pVnode);
SStreamMeta* pMeta = pTq->pStreamMeta;
taosWLockLatch(&pMeta->lock);
int32_t numOfTasks = taosHashGetSize(pTq->pStreamMeta->pTasks);
if (numOfTasks == 0) {
......
......@@ -16,250 +16,10 @@
#include "tq.h"
#include "vnd.h"
#if 0
void tqTmrRspFunc(void* param, void* tmrId) {
STqHandle* pHandle = (STqHandle*)param;
atomic_store_8(&pHandle->pushHandle.tmrStopped, 1);
}
static int32_t tqLoopExecFromQueue(STQ* pTq, STqHandle* pHandle, SStreamDataSubmit** ppSubmit, SMqDataRsp* pRsp) {
SStreamDataSubmit* pSubmit = *ppSubmit;
while (pSubmit != NULL) {
if (tqLogScanExec(pTq, &pHandle->execHandle, pSubmit->data, pRsp, 0) < 0) {
}
// update processed
atomic_store_64(&pHandle->pushHandle.processedVer, pSubmit->ver);
streamQueueProcessSuccess(&pHandle->pushHandle.inputQ);
streamDataSubmitDestroy(pSubmit);
if (pRsp->blockNum > 0) {
*ppSubmit = pSubmit;
return 0;
} else {
pSubmit = streamQueueNextItem(&pHandle->pushHandle.inputQ);
}
}
*ppSubmit = pSubmit;
return -1;
}
int32_t tqExecFromInputQ(STQ* pTq, STqHandle* pHandle) {
SMqDataRsp rsp = {0};
// 1. guard and set status executing
int8_t execStatus = atomic_val_compare_exchange_8(&pHandle->pushHandle.execStatus, TASK_EXEC_STATUS__IDLE,
TASK_EXEC_STATUS__EXECUTING);
if (execStatus == TASK_EXEC_STATUS__IDLE) {
SStreamDataSubmit* pSubmit = NULL;
// 2. check processedVer
// 2.1. if not missed, get msg from queue
// 2.2. if missed, scan wal
pSubmit = streamQueueNextItem(&pHandle->pushHandle.inputQ);
while (pHandle->pushHandle.processedVer <= pSubmit->ver) {
// read from wal
}
while (pHandle->pushHandle.processedVer > pSubmit->ver + 1) {
streamQueueProcessSuccess(&pHandle->pushHandle.inputQ);
streamDataSubmitDestroy(pSubmit);
pSubmit = streamQueueNextItem(&pHandle->pushHandle.inputQ);
if (pSubmit == NULL) break;
}
// 3. exec, after each success, update processed ver
// first run
if (tqLoopExecFromQueue(pTq, pHandle, &pSubmit, &rsp) == 0) {
goto SEND_RSP;
}
// set exec status closing
atomic_store_8(&pHandle->pushHandle.execStatus, TASK_EXEC_STATUS__CLOSING);
// second run
if (tqLoopExecFromQueue(pTq, pHandle, &pSubmit, &rsp) == 0) {
goto SEND_RSP;
}
// set exec status idle
atomic_store_8(&pHandle->pushHandle.execStatus, TASK_EXEC_STATUS__IDLE);
}
SEND_RSP:
// 4. if get result
// 4.1 set exec input status blocked and exec status idle
atomic_store_8(&pHandle->pushHandle.execStatus, TASK_EXEC_STATUS__IDLE);
// 4.2 rpc send
rsp.rspOffset = pHandle->pushHandle.processedVer;
/*if (tqSendPollRsp(pTq, pMsg, pReq, &rsp) < 0) {*/
/*return -1;*/
/*}*/
// 4.3 clear rpc info
memset(&pHandle->pushHandle.rpcInfo, 0, sizeof(SRpcHandleInfo));
return 0;
}
int32_t tqOpenPushHandle(STQ* pTq, STqHandle* pHandle) {
memset(&pHandle->pushHandle, 0, sizeof(STqPushHandle));
pHandle->pushHandle.inputQ.queue = taosOpenQueue();
pHandle->pushHandle.inputQ.qall = taosAllocateQall();
if (pHandle->pushHandle.inputQ.queue == NULL || pHandle->pushHandle.inputQ.qall == NULL) {
if (pHandle->pushHandle.inputQ.queue) {
taosCloseQueue(pHandle->pushHandle.inputQ.queue);
}
if (pHandle->pushHandle.inputQ.qall) {
taosFreeQall(pHandle->pushHandle.inputQ.qall);
}
return -1;
}
return 0;
}
int32_t tqPreparePush(STQ* pTq, STqHandle* pHandle, int64_t reqId, const SRpcHandleInfo* pInfo, int64_t processedVer,
int64_t timeout) {
memcpy(&pHandle->pushHandle.rpcInfo, pInfo, sizeof(SRpcHandleInfo));
atomic_store_64(&pHandle->pushHandle.reqId, reqId);
atomic_store_64(&pHandle->pushHandle.processedVer, processedVer);
atomic_store_8(&pHandle->pushHandle.inputStatus, TASK_INPUT_STATUS__NORMAL);
atomic_store_8(&pHandle->pushHandle.tmrStopped, 0);
taosTmrReset(tqTmrRspFunc, (int32_t)timeout, pHandle, tqMgmt.timer, &pHandle->pushHandle.timerId);
return 0;
}
int32_t tqEnqueue(STqHandle* pHandle, SStreamDataSubmit* pSubmit) {
int8_t inputStatus = atomic_load_8(&pHandle->pushHandle.inputStatus);
if (inputStatus == TASK_INPUT_STATUS__NORMAL) {
SStreamDataSubmit* pSubmitClone = streamSubmitBlockClone(pSubmit);
if (pSubmitClone == NULL) {
return -1;
}
taosWriteQitem(pHandle->pushHandle.inputQ.queue, pSubmitClone);
return 0;
}
return -1;
}
int32_t tqSendExecReq(STQ* pTq, STqHandle* pHandle) {
//
return 0;
}
int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver, SRpcHandleInfo handleInfo) {
if (msgType != TDMT_VND_SUBMIT) return 0;
void* pIter = NULL;
STqHandle* pHandle = NULL;
SSubmitReq* pReq = (SSubmitReq*)msg;
int32_t workerId = 4;
int64_t fetchOffset = ver;
while (1) {
pIter = taosHashIterate(pTq->pushMgr, pIter);
if (pIter == NULL) break;
pHandle = *(STqHandle**)pIter;
taosWLockLatch(&pHandle->pushHandle.lock);
SMqDataRsp rsp = {0};
rsp.reqOffset = pHandle->pushHandle.reqOffset;
rsp.blockData = taosArrayInit(0, sizeof(void*));
rsp.blockDataLen = taosArrayInit(0, sizeof(int32_t));
if (msgType == TDMT_VND_SUBMIT) {
tqLogScanExec(pTq, &pHandle->execHandle, pReq, &rsp, workerId);
} else {
tqError("tq push unexpected msg type %d", msgType);
}
if (rsp.blockNum == 0) {
taosWUnLockLatch(&pHandle->pushHandle.lock);
continue;
}
rsp.rspOffset = fetchOffset;
int32_t tlen = sizeof(SMqRspHead) + tEncodeSMqDataBlkRsp(NULL, &rsp);
void* buf = rpcMallocCont(tlen);
if (buf == NULL) {
// todo free
return -1;
}
((SMqRspHead*)buf)->mqMsgType = TMQ_MSG_TYPE__POLL_RSP;
((SMqRspHead*)buf)->epoch = pHandle->pushHandle.epoch;
((SMqRspHead*)buf)->consumerId = pHandle->pushHandle.consumerId;
void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
tEncodeSMqDataBlkRsp(&abuf, &rsp);
SRpcMsg resp = {
.info = pHandle->pushHandle.rpcInfo,
.pCont = buf,
.contLen = tlen,
.code = 0,
};
tmsgSendRsp(&resp);
memset(&pHandle->pushHandle.rpcInfo, 0, sizeof(SRpcHandleInfo));
taosWUnLockLatch(&pHandle->pushHandle.lock);
tqDebug("vgId:%d offset %" PRId64 " from consumer:%" PRId64 ", (epoch %d) send rsp, block num: %d, req:%" PRId64 ", rsp:%" PRId64,
TD_VID(pTq->pVnode), fetchOffset, pHandle->pushHandle.consumerId, pHandle->pushHandle.epoch, rsp.blockNum,
rsp.reqOffset, rsp.rspOffset);
// TODO destroy
taosArrayDestroy(rsp.blockData);
taosArrayDestroy(rsp.blockDataLen);
}
return 0;
}
#endif
int32_t tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) {
// void* pReq = POINTER_SHIFT(msg, sizeof(SSubmitReq2Msg));
// int32_t len = msgLen - sizeof(SSubmitReq2Msg);
// int32_t vgId = TD_VID(pTq->pVnode);
if (msgType == TDMT_VND_SUBMIT) {
tqProcessSubmitReqForSubscribe(pTq);
// lock push mgr to avoid potential msg lost
// taosWLockLatch(&pTq->lock);
//
// int32_t numOfRegisteredPush = taosHashGetSize(pTq->pPushMgr);
// if (numOfRegisteredPush > 0) {
// tqDebug("vgId:%d tq push msg version:%" PRId64 " type:%s, head:%p, body:%p len:%d, numOfPushed consumers:%d",
// vgId, ver, TMSG_INFO(msgType), msg, pReq, len, numOfRegisteredPush);
//
// void* data = taosMemoryMalloc(len);
// if (data == NULL) {
// terrno = TSDB_CODE_OUT_OF_MEMORY;
// tqError("failed to copy data for stream since out of memory, vgId:%d", vgId);
// taosWUnLockLatch(&pTq->lock);
// return -1;
// }
//
// memcpy(data, pReq, len);
//
// SArray* cachedKey = taosArrayInit(0, sizeof(SItem));
// void* pIter = NULL;
//
// while (1) {
// pIter = taosHashIterate(pTq->pPushMgr, pIter);
// if (pIter == NULL) {
// break;
// }
//
// STqPushEntry* pPushEntry = *(STqPushEntry**)pIter;
//
// STqHandle* pHandle = taosHashGet(pTq->pHandle, pPushEntry->subKey, strlen(pPushEntry->subKey));
// if (pHandle == NULL) {
// tqDebug("vgId:%d, failed to find handle %s in pushing data to consumer, ignore", pTq->pVnode->config.vgId,
// pPushEntry->subKey);
// continue;
// }
//
// STqExecHandle* pExec = &pHandle->execHandle;
// doPushDataForEntry(pIter, pExec, pTq, ver, vgId, data, len, cachedKey);
// }
//
// doRemovePushedEntry(cachedKey, pTq);
// taosArrayDestroyEx(cachedKey, freeItem);
// taosMemoryFree(data);
// }
//
// // unlock
// taosWUnLockLatch(&pTq->lock);
}
tqDebug("handle submit, restore:%d, size:%d", pTq->pVnode->restored, (int)taosHashGetSize(pTq->pStreamMeta->pTasks));
......@@ -274,8 +34,7 @@ int32_t tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t v
}
if (msgType == TDMT_VND_SUBMIT) {
SPackedData submit = {0};
tqProcessSubmitReq(pTq, submit);
tqStartStreamTasks(pTq);
}
if (msgType == TDMT_VND_DELETE) {
......@@ -286,16 +45,16 @@ int32_t tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t v
return 0;
}
int32_t tqRegisterPushHandle(STQ* pTq, void* handle, SRpcMsg* pMsg) {
int32_t vgId = TD_VID(pTq->pVnode);
STqHandle* pHandle = (STqHandle*) handle;
if(pHandle->msg == NULL){
STqHandle* pHandle = (STqHandle*)handle;
if (pHandle->msg == NULL) {
pHandle->msg = taosMemoryCalloc(1, sizeof(SRpcMsg));
memcpy(pHandle->msg, pMsg, sizeof(SRpcMsg));
pHandle->msg->pCont = rpcMallocCont(pMsg->contLen);
}else{
void *tmp = pHandle->msg->pCont;
} else {
void* tmp = pHandle->msg->pCont;
memcpy(pHandle->msg, pMsg, sizeof(SRpcMsg));
pHandle->msg->pCont = tmp;
}
......@@ -303,7 +62,8 @@ int32_t tqRegisterPushHandle(STQ* pTq, void* handle, SRpcMsg* pMsg) {
memcpy(pHandle->msg->pCont, pMsg->pCont, pMsg->contLen);
pHandle->msg->contLen = pMsg->contLen;
int32_t ret = taosHashPut(pTq->pPushMgr, pHandle->subKey, strlen(pHandle->subKey), &pHandle, POINTER_BYTES);
tqDebug("vgId:%d data is over, ret:%d, consumerId:0x%" PRIx64", register to pHandle:%p, pCont:%p, len:%d", vgId, ret, pHandle->consumerId, pHandle, pHandle->msg->pCont, pHandle->msg->contLen);
tqDebug("vgId:%d data is over, ret:%d, consumerId:0x%" PRIx64 ", register to pHandle:%p, pCont:%p, len:%d", vgId, ret,
pHandle->consumerId, pHandle, pHandle->msg->pCont, pHandle->msg->contLen);
return 0;
}
......@@ -313,6 +73,7 @@ int32_t tqUnregisterPushHandle(STQ* pTq, void *handle) {
int32_t ret = taosHashRemove(pTq->pPushMgr, pHandle->subKey, strlen(pHandle->subKey));
tqError("vgId:%d remove pHandle:%p,ret:%d consumer Id:0x%" PRIx64, vgId, pHandle, ret, pHandle->consumerId);
if(pHandle->msg != NULL) {
tqPushDataRsp(pTq, pHandle);
......@@ -320,5 +81,6 @@ int32_t tqUnregisterPushHandle(STQ* pTq, void *handle) {
taosMemoryFree(pHandle->msg);
pHandle->msg = NULL;
}
return 0;
}
......@@ -332,6 +332,7 @@ int32_t tqNextBlockInWal(STqReader* pReader) {
if (pBlockList == NULL || pReader->nextBlk >= taosArrayGetSize(pBlockList)) {
// try next message in wal file
// todo always retry to avoid read failure caused by wal file deletion
if (walNextValidMsg(pWalReader) < 0) {
return FETCH_TYPE__NONE;
}
......@@ -374,7 +375,7 @@ int32_t tqNextBlockInWal(STqReader* pReader) {
SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
if (pReader->tbIdHash == NULL) {
int32_t code = tqRetrieveDataBlock(pReader->pResBlock, pReader, NULL);
int32_t code = tqRetrieveDataBlock(pReader, NULL);
if (code == TSDB_CODE_SUCCESS && pReader->pResBlock->info.rows > 0) {
return FETCH_TYPE__DATA;
}
......@@ -384,7 +385,7 @@ int32_t tqNextBlockInWal(STqReader* pReader) {
if (ret != NULL) {
tqDebug("tq reader return submit block, uid:%"PRId64", ver:%"PRId64, pSubmitTbData->uid, pReader->msg.ver);
int32_t code = tqRetrieveDataBlock(pReader->pResBlock, pReader, NULL);
int32_t code = tqRetrieveDataBlock(pReader, NULL);
if (code == TSDB_CODE_SUCCESS && pReader->pResBlock->info.rows > 0) {
return FETCH_TYPE__DATA;
}
......@@ -399,31 +400,6 @@ int32_t tqNextBlockInWal(STqReader* pReader) {
}
}
int32_t tqNextBlock(STqReader* pReader, SSDataBlock* pBlock) {
while (1) {
if (pReader->msg.msgStr == NULL) {
if (walNextValidMsg(pReader->pWalReader) < 0) {
return FETCH_TYPE__NONE;
}
void* pBody = POINTER_SHIFT(pReader->pWalReader->pHead->head.body, sizeof(SSubmitReq2Msg));
int32_t bodyLen = pReader->pWalReader->pHead->head.bodyLen - sizeof(SSubmitReq2Msg);
int64_t ver = pReader->pWalReader->pHead->head.version;
tqReaderSetSubmitMsg(pReader, pBody, bodyLen, ver);
}
while (tqNextBlockImpl(pReader)) {
int32_t code = tqRetrieveDataBlock(pReader->pResBlock, pReader, NULL);
if (code != TSDB_CODE_SUCCESS || pBlock->info.rows == 0) {
continue;
}
return FETCH_TYPE__DATA;
}
}
}
int32_t tqReaderSetSubmitMsg(STqReader* pReader, void* msgStr, int32_t msgLen, int64_t ver) {
pReader->msg.msgStr = msgStr;
pReader->msg.msgLen = msgLen;
......@@ -527,7 +503,7 @@ int32_t tqMaskBlock(SSchemaWrapper* pDst, SSDataBlock* pBlock, const SSchemaWrap
return 0;
}
int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, STqReader* pReader, SSubmitTbData** pSubmitTbDataRet) {
int32_t tqRetrieveDataBlock(STqReader* pReader, SSubmitTbData** pSubmitTbDataRet) {
tqDebug("tq reader retrieve data block %p, index:%d", pReader->msg.msgStr, pReader->nextBlk);
SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk++);
......@@ -535,6 +511,7 @@ int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, STqReader* pReader, SSubmitTbDa
*pSubmitTbDataRet = pSubmitTbData;
}
SSDataBlock* pBlock = pReader->pResBlock;
blockDataCleanup(pBlock);
int32_t sversion = pSubmitTbData->sver;
......@@ -603,7 +580,7 @@ int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, STqReader* pReader, SSubmitTbDa
SColumnInfoData colInfo = createColumnInfoData(pColSchema->type, pColSchema->bytes, pColSchema->colId);
int32_t code = blockDataAppendColInfo(pBlock, &colInfo);
if (code != TSDB_CODE_SUCCESS) {
goto FAIL;
return -1;
}
i++;
j++;
......@@ -622,7 +599,7 @@ int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, STqReader* pReader, SSubmitTbDa
if (blockDataEnsureCapacity(pBlock, numOfRows) < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto FAIL;
return -1;
}
pBlock->info.rows = numOfRows;
......@@ -638,7 +615,7 @@ int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, STqReader* pReader, SSubmitTbDa
while (targetIdx < colActual) {
if (sourceIdx >= numOfCols) {
tqError("tqRetrieveDataBlock sourceIdx:%d >= numOfCols:%d", sourceIdx, numOfCols);
goto FAIL;
return -1;
}
SColData* pCol = taosArrayGet(pCols, sourceIdx);
......@@ -647,7 +624,7 @@ int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, STqReader* pReader, SSubmitTbDa
if (pCol->nVal != numOfRows) {
tqError("tqRetrieveDataBlock pCol->nVal:%d != numOfRows:%d", pCol->nVal, numOfRows);
goto FAIL;
return -1;
}
if (pCol->cid < pColData->info.colId) {
......@@ -661,14 +638,14 @@ int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, STqReader* pReader, SSubmitTbDa
memcpy(varDataVal(val), colVal.value.pData, colVal.value.nData);
varDataSetLen(val, colVal.value.nData);
if (colDataAppend(pColData, i, val, !COL_VAL_IS_VALUE(&colVal)) < 0) {
goto FAIL;
return -1;
}
} else {
colDataSetNULL(pColData, i);
}
} else {
if (colDataAppend(pColData, i, (void*)&colVal.value.val, !COL_VAL_IS_VALUE(&colVal)) < 0) {
goto FAIL;
return -1;
}
}
}
......@@ -710,14 +687,14 @@ int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, STqReader* pReader, SSubmitTbDa
memcpy(varDataVal(val), colVal.value.pData, colVal.value.nData);
varDataSetLen(val, colVal.value.nData);
if (colDataAppend(pColData, i, val, !COL_VAL_IS_VALUE(&colVal)) < 0) {
goto FAIL;
return -1;
}
} else {
colDataSetNULL(pColData, i);
}
} else {
if (colDataAppend(pColData, i, (void*)&colVal.value.val, !COL_VAL_IS_VALUE(&colVal)) < 0) {
goto FAIL;
return -1;
}
}
......@@ -735,10 +712,6 @@ int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, STqReader* pReader, SSubmitTbDa
}
return 0;
FAIL:
blockDataFreeRes(pBlock);
return -1;
}
int32_t tqRetrieveTaosxBlock(STqReader* pReader, SArray* blocks, SArray* schemas, SSubmitTbData** pSubmitTbDataRet) {
......
......@@ -66,6 +66,7 @@ static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, STaosxRsp* pRsp, in
int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* pOffset) {
const int32_t MAX_ROWS_TO_RETURN = 4096;
int32_t vgId = TD_VID(pTq->pVnode);
int32_t code = 0;
int32_t totalRows = 0;
......
......@@ -175,7 +175,7 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle,
goto end;
}
// till now, all data has been transferred to consumer, new data needs to push client once arrived.
// till now, all data has been transferred to consumer, new data needs to push client once arrived.
if (dataRsp.blockNum == 0 && dataRsp.reqOffset.type == TMQ_OFFSET__LOG &&
dataRsp.reqOffset.version == dataRsp.rspOffset.version && pHandle->consumerId == pRequest->consumerId) {
// lock
......@@ -361,11 +361,10 @@ int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequ
// this is a normal subscribe requirement
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
return extractDataAndRspForNormalSubscribe(pTq, pHandle, pRequest, pMsg, &offset);
}
// todo handle the case where re-balance occurs.
} else { // todo handle the case where re-balance occurs.
// for taosx
return extractDataAndRspForDbStbSubscribe(pTq, pHandle, pRequest, pMsg, &offset);
}
}
int32_t tqSendMetaPollRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqMetaRsp* pRsp) {
......
......@@ -448,7 +448,6 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp
walApplyVer(pVnode->pWal, version);
if (tqPushMsg(pVnode->pTq, pMsg->pCont, pMsg->contLen, pMsg->msgType, version) < 0) {
// /*vInfo("vgId:%d, push msg end", pVnode->config.vgId);*/
vError("vgId:%d, failed to push msg to TQ since %s", TD_VID(pVnode), tstrerror(terrno));
return -1;
}
......
......@@ -59,7 +59,7 @@ typedef struct {
STqOffsetVal currentOffset; // for tmq
SMqMetaRsp metaRsp; // for tmq fetching meta
int64_t snapshotVer;
SPackedData submit; // todo remove it
// SPackedData submit; // todo remove it
SSchemaWrapper* schema;
char tbName[TSDB_TABLE_NAME_LEN]; // this is the current scan table: todo refactor
int8_t recoverStep;
......
......@@ -1080,6 +1080,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
STableListInfo* pTableListInfo = pScanBaseInfo->pTableListInfo;
if (pOffset->type == TMQ_OFFSET__LOG) {
// todo refactor: move away
tsdbReaderClose(pScanBaseInfo->dataReader);
pScanBaseInfo->dataReader = NULL;
......
......@@ -1636,6 +1636,7 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
qDebug("start to exec queue scan, %s", id);
#if 0
if (pTaskInfo->streamInfo.submit.msgStr != NULL) {
if (pInfo->tqReader->msg.msgStr == NULL) {
SPackedData submit = pTaskInfo->streamInfo.submit;
......@@ -1649,7 +1650,7 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
SDataBlockInfo* pBlockInfo = &pInfo->pRes->info;
while (tqNextBlockImpl(pInfo->tqReader)) {
int32_t code = tqRetrieveDataBlock(pInfo->tqReader->pResBlock, pInfo->tqReader, NULL);
int32_t code = tqRetrieveDataBlock(pInfo->tqReader, NULL);
if (code != TSDB_CODE_SUCCESS || pInfo->tqReader->pResBlock->info.rows == 0) {
continue;
}
......@@ -1665,6 +1666,7 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
pTaskInfo->streamInfo.submit = (SPackedData){0};
return NULL;
}
#endif
if (pTaskInfo->streamInfo.currentOffset.type == TMQ_OFFSET__SNAPSHOT_DATA) {
SSDataBlock* pResult = doTableScan(pInfo->pTableScanOp);
......@@ -1682,10 +1684,12 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
if (tqSeekVer(pInfo->tqReader, pTaskInfo->streamInfo.snapshotVer + 1, pTaskInfo->id.str) < 0) {
return NULL;
}
tqOffsetResetToLog(&pTaskInfo->streamInfo.currentOffset, pTaskInfo->streamInfo.snapshotVer);
}
if (pTaskInfo->streamInfo.currentOffset.type == TMQ_OFFSET__LOG) {
while (1) {
int32_t type = tqNextBlockInWal(pInfo->tqReader);
SSDataBlock* pRes = pInfo->tqReader->pResBlock;
......@@ -2071,7 +2075,7 @@ FETCH_NEXT_BLOCK:
blockDataCleanup(pInfo->pRes);
while (tqNextBlockImpl(pInfo->tqReader)) {
int32_t code = tqRetrieveDataBlock(pInfo->tqReader->pResBlock, pInfo->tqReader, NULL);
int32_t code = tqRetrieveDataBlock(pInfo->tqReader, NULL);
if (code != TSDB_CODE_SUCCESS || pInfo->tqReader->pResBlock->info.rows == 0) {
continue;
}
......@@ -2109,7 +2113,6 @@ FETCH_NEXT_BLOCK:
// record the scan action.
pInfo->numOfExec++;
pOperator->resultInfo.totalRows += pBlockInfo->rows;
// printDataBlock(pInfo->pRes, "stream scan");
qDebug("scan rows: %" PRId64, pBlockInfo->rows);
if (pBlockInfo->rows > 0) {
......
......@@ -253,7 +253,6 @@ int32_t streamExecForAll(SStreamTask* pTask) {
while (1) {
SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputQueue);
if (qItem == NULL) {
// qDebug("s-task:%s extract data from input queue, queue is empty, abort", pTask->id.idStr);
break;
}
......@@ -298,7 +297,7 @@ int32_t streamExecForAll(SStreamTask* pTask) {
}
SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock));
qDebug("s-task:%s exec begin, numOfBlocks:%d", pTask->id.idStr, batchSize);
qDebug("s-task:%s start to execute, numOfBlocks:%d", pTask->id.idStr, batchSize);
streamTaskExecImpl(pTask, pInput, pRes);
......
......@@ -237,6 +237,7 @@ static int32_t walFetchHeadNew(SWalReader *pRead, int64_t fetchVer) {
}
seeked = true;
}
while (1) {
contLen = taosReadFile(pRead->pLogFile, pRead->pHead, sizeof(SWalCkHead));
if (contLen == sizeof(SWalCkHead)) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册