提交 2fd72500 编写于 作者: H Haojun Liao

Merge remote-tracking branch 'origin/enh/triggerCheckPoint2' into enh/triggerCheckPoint2

...@@ -3052,7 +3052,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, ...@@ -3052,7 +3052,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
int32_t funResSize= getMaxFunResSize(&pOperator->exprSupp, numOfCols); int32_t funResSize= getMaxFunResSize(&pOperator->exprSupp, numOfCols);
pInfo->pState->pFileState = pAPI->stateStore.streamFileStateInit( pInfo->pState->pFileState = pAPI->stateStore.streamFileStateInit(
tsStreamBufferSize, sizeof(SWinKey), pInfo->aggSup.resultRowSize, funResSize, compareTs, pInfo->pState, tsStreamBufferSize, sizeof(SWinKey), pInfo->aggSup.resultRowSize, funResSize, compareTs, pInfo->pState,
pInfo->twAggSup.deleteMark, GET_TASKID(pTaskInfo), pTaskInfo->streamInfo.snapshotVer); pInfo->twAggSup.deleteMark, GET_TASKID(pTaskInfo), pHandle->checkpointId);
pInfo->dataVersion = 0; pInfo->dataVersion = 0;
pInfo->stateStore = pTaskInfo->storageAPI.stateStore; pInfo->stateStore = pTaskInfo->storageAPI.stateStore;
pInfo->recvGetAll = false; pInfo->recvGetAll = false;
...@@ -5704,7 +5704,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys ...@@ -5704,7 +5704,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys
pInfo->pState->pFileState = pTaskInfo->storageAPI.stateStore.streamFileStateInit( pInfo->pState->pFileState = pTaskInfo->storageAPI.stateStore.streamFileStateInit(
tsStreamBufferSize, sizeof(SWinKey), pInfo->aggSup.resultRowSize, funResSize, compareTs, pInfo->pState, tsStreamBufferSize, sizeof(SWinKey), pInfo->aggSup.resultRowSize, funResSize, compareTs, pInfo->pState,
pInfo->twAggSup.deleteMark, GET_TASKID(pTaskInfo), pTaskInfo->streamInfo.snapshotVer); pInfo->twAggSup.deleteMark, GET_TASKID(pTaskInfo), pHandle->checkpointId);
setOperatorInfo(pOperator, "StreamIntervalOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL, true, OP_NOT_OPENED, setOperatorInfo(pOperator, "StreamIntervalOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL, true, OP_NOT_OPENED,
pInfo, pTaskInfo); pInfo, pTaskInfo);
......
...@@ -883,7 +883,7 @@ int32_t decodeValueFunc(void* value, int32_t vlen, int64_t* ttl, char** dest) { ...@@ -883,7 +883,7 @@ int32_t decodeValueFunc(void* value, int32_t vlen, int64_t* ttl, char** dest) {
SStreamValue key = {0}; SStreamValue key = {0};
char* p = value; char* p = value;
if (streamStateValueIsStale(p)) { if (streamStateValueIsStale(p)) {
*dest = NULL; if (dest != NULL) *dest = NULL;
return -1; return -1;
} }
p = taosDecodeFixedI64(p, &key.unixTimestamp); p = taosDecodeFixedI64(p, &key.unixTimestamp);
...@@ -1490,9 +1490,13 @@ int32_t streamStateGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, cons ...@@ -1490,9 +1490,13 @@ int32_t streamStateGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, cons
if (pKtmp->opNum != pCur->number) { if (pKtmp->opNum != pCur->number) {
return -1; return -1;
} }
size_t vlen = 0;
if (pVal != NULL) *pVal = (char*)rocksdb_iter_value(pCur->iter, &vlen); if (pVLen != NULL) {
if (pVLen != NULL) *pVLen = vlen; size_t vlen = 0;
const char* valStr = rocksdb_iter_value(pCur->iter, &vlen);
*pVLen = decodeValueFunc((void*)valStr, vlen, NULL, (char**)pVal);
}
*pKey = pKtmp->key; *pKey = pKtmp->key;
return 0; return 0;
} }
...@@ -1555,19 +1559,18 @@ SStreamStateCur* streamStateSeekToLast_rocksdb(SStreamState* pState, const SWinK ...@@ -1555,19 +1559,18 @@ SStreamStateCur* streamStateSeekToLast_rocksdb(SStreamState* pState, const SWinK
const SStateKey maxStateKey = {.key = {.groupId = UINT64_MAX, .ts = INT64_MAX}, .opNum = INT64_MAX}; const SStateKey maxStateKey = {.key = {.groupId = UINT64_MAX, .ts = INT64_MAX}, .opNum = INT64_MAX};
STREAM_STATE_PUT_ROCKSDB(pState, "state", &maxStateKey, "", 0); STREAM_STATE_PUT_ROCKSDB(pState, "state", &maxStateKey, "", 0);
char buf[128] = {0}; char buf[128] = {0};
int32_t klen = stateKeyEncode((void*)&maxStateKey, buf); int32_t klen = stateKeyEncode((void*)&maxStateKey, buf);
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
if (pCur == NULL) return NULL; if (pCur == NULL) return NULL;
SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper; pCur->db = ((SBackendCfWrapper*)pState->pTdbState->pBackendCfWrapper)->rocksdb;
pCur->db = wrapper->rocksdb;
pCur->iter = streamStateIterCreate(pState, "state", (rocksdb_snapshot_t**)&pCur->snapshot, pCur->iter = streamStateIterCreate(pState, "state", (rocksdb_snapshot_t**)&pCur->snapshot,
(rocksdb_readoptions_t**)&pCur->readOpt); (rocksdb_readoptions_t**)&pCur->readOpt);
pCur->number = pState->number; pCur->number = pState->number;
rocksdb_iter_seek(pCur->iter, buf, (size_t)klen); rocksdb_iter_seek(pCur->iter, buf, (size_t)klen);
rocksdb_iter_prev(pCur->iter); rocksdb_iter_prev(pCur->iter);
while (rocksdb_iter_valid(pCur->iter) && iterValueIsStale(pCur->iter)) { while (rocksdb_iter_valid(pCur->iter) && iterValueIsStale(pCur->iter)) {
rocksdb_iter_prev(pCur->iter); rocksdb_iter_prev(pCur->iter);
...@@ -1600,18 +1603,14 @@ SStreamStateCur* streamStateGetCur_rocksdb(SStreamState* pState, const SWinKey* ...@@ -1600,18 +1603,14 @@ SStreamStateCur* streamStateGetCur_rocksdb(SStreamState* pState, const SWinKey*
rocksdb_iter_seek(pCur->iter, buf, len); rocksdb_iter_seek(pCur->iter, buf, len);
if (rocksdb_iter_valid(pCur->iter) && !iterValueIsStale(pCur->iter)) { if (rocksdb_iter_valid(pCur->iter) && !iterValueIsStale(pCur->iter)) {
size_t vlen; SStateKey curKey;
char* val = (char*)rocksdb_iter_value(pCur->iter, &vlen); size_t kLen = 0;
if (!streamStateValueIsStale(val)) { char* keyStr = (char*)rocksdb_iter_key(pCur->iter, &kLen);
SStateKey curKey; stateKeyDecode((void*)&curKey, keyStr);
size_t kLen = 0;
char* keyStr = (char*)rocksdb_iter_key(pCur->iter, &kLen); if (stateKeyCmpr(&sKey, sizeof(sKey), &curKey, sizeof(curKey)) == 0) {
stateKeyDecode((void*)&curKey, keyStr); pCur->number = pState->number;
return pCur;
if (stateKeyCmpr(&sKey, sizeof(sKey), &curKey, sizeof(curKey)) == 0) {
pCur->number = pState->number;
return pCur;
}
} }
} }
streamStateFreeCur(pCur); streamStateFreeCur(pCur);
...@@ -1900,8 +1899,7 @@ int32_t streamStateFillGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, ...@@ -1900,8 +1899,7 @@ int32_t streamStateFillGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey,
winKeyDecode(&winKey, keyStr); winKeyDecode(&winKey, keyStr);
const char* valStr = rocksdb_iter_value(pCur->iter, &vlen); const char* valStr = rocksdb_iter_value(pCur->iter, &vlen);
// char* dst = NULL; int32_t len = decodeValueFunc((void*)valStr, vlen, NULL, (char**)pVal);
int32_t len = decodeValueFunc((void*)valStr, vlen, NULL, (char**)pVal);
if (len < 0) { if (len < 0) {
return -1; return -1;
} }
......
...@@ -443,9 +443,9 @@ static int32_t doDispatchCheckpointMsg(SStreamTask* pTask, const SStreamCheckpoi ...@@ -443,9 +443,9 @@ static int32_t doDispatchCheckpointMsg(SStreamTask* pTask, const SStreamCheckpoi
} }
tEncoderClear(&encoder); tEncoderClear(&encoder);
initRpcMsg(&msg,TDMT_STREAM_TASK_CHECKPOINT, buf, tlen + sizeof(SMsgHead)); initRpcMsg(&msg, TDMT_STREAM_TASK_CHECKPOINT, buf, tlen + sizeof(SMsgHead));
qDebug("s-task:%s (level:%d, vgId:%d) dispatch checkpoint msg to s-task:0x%" PRIx64 ":0x%x (vgId:%d)", pTask->id.idStr, qDebug("s-task:%s (level:%d, vgId:%d) dispatch checkpoint msg to s-task:0x%" PRIx64 ":0x%x (vgId:%d)",
pTask->info.taskLevel, pTask->info.nodeId, pReq->streamId, pReq->downstreamTaskId, nodeId); pTask->id.idStr, pTask->info.taskLevel, pTask->info.nodeId, pReq->streamId, pReq->downstreamTaskId, nodeId);
tmsgSendReq(pEpSet, &msg); tmsgSendReq(pEpSet, &msg);
return 0; return 0;
...@@ -518,16 +518,16 @@ int32_t streamTaskSendCheckpointRsp(SStreamTask* pTask) { ...@@ -518,16 +518,16 @@ int32_t streamTaskSendCheckpointRsp(SStreamTask* pTask) {
int32_t num = taosArrayGetSize(pTask->pRpcMsgList); int32_t num = taosArrayGetSize(pTask->pRpcMsgList);
ASSERT(taosArrayGetSize(pTask->pUpstreamInfoList) == num); ASSERT(taosArrayGetSize(pTask->pUpstreamInfoList) == num);
qDebug("s-task:%s level:%d checkpoint completed msg sent to %d upstream tasks", pTask->id.idStr, pTask->info.taskLevel, qDebug("s-task:%s level:%d checkpoint completed msg sent to %d upstream tasks", pTask->id.idStr,
num); pTask->info.taskLevel, num);
for(int32_t i = 0; i < num; ++i) { for (int32_t i = 0; i < num; ++i) {
SRpcMsg* pMsg = taosArrayGet(pTask->pRpcMsgList, i); SRpcMsg* pMsg = taosArrayGet(pTask->pRpcMsgList, i);
tmsgSendRsp(pMsg); tmsgSendRsp(pMsg);
} }
taosArrayClear(pTask->pRpcMsgList); taosArrayClear(pTask->pRpcMsgList);
qDebug("s-task:%s level:%d source checkpoint completed msg sent to upstream", pTask->id.idStr); qDebug("s-task:%s level:%d source checkpoint completed msg sent to upstream", pTask->id.idStr, pTask->info.taskLevel);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -540,7 +540,7 @@ int32_t streamTaskSendCheckpointSourceRsp(SStreamTask* pTask) { ...@@ -540,7 +540,7 @@ int32_t streamTaskSendCheckpointSourceRsp(SStreamTask* pTask) {
tmsgSendRsp(pMsg); tmsgSendRsp(pMsg);
taosArrayClear(pTask->pRpcMsgList); taosArrayClear(pTask->pRpcMsgList);
qDebug("s-task:%s level:%d source checkpoint completed msg sent to mnode", pTask->id.idStr); qDebug("s-task:%s level:%d source checkpoint completed msg sent to mnode", pTask->id.idStr, pTask->info.taskLevel);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -631,7 +631,7 @@ int32_t doDispatchScanHistoryFinishMsg(SStreamTask* pTask, const SStreamScanHist ...@@ -631,7 +631,7 @@ int32_t doDispatchScanHistoryFinishMsg(SStreamTask* pTask, const SStreamScanHist
tEncoderClear(&encoder); tEncoderClear(&encoder);
msg.info.noResp = 1; msg.info.noResp = 1;
initRpcMsg(&msg,TDMT_STREAM_SCAN_HISTORY_FINISH, buf, tlen + sizeof(SMsgHead)); initRpcMsg(&msg, TDMT_STREAM_SCAN_HISTORY_FINISH, buf, tlen + sizeof(SMsgHead));
tmsgSendReq(pEpSet, &msg); tmsgSendReq(pEpSet, &msg);
const char* pStatus = streamGetTaskStatusStr(pTask->status.taskStatus); const char* pStatus = streamGetTaskStatusStr(pTask->status.taskStatus);
...@@ -797,7 +797,7 @@ int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHa ...@@ -797,7 +797,7 @@ int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHa
SRpcMsg rspMsg = {.code = 0, .pCont = pBuf, .contLen = sizeof(SMsgHead) + len, .info = *pRpcInfo}; SRpcMsg rspMsg = {.code = 0, .pCont = pBuf, .contLen = sizeof(SMsgHead) + len, .info = *pRpcInfo};
taosArrayPush(pTask->pRpcMsgList, &rspMsg); taosArrayPush(pTask->pRpcMsgList, &rspMsg);
qDebug("s-task:%s add checkpoint rsp msg, total:%d", pTask->id.idStr, (int32_t) taosArrayGetSize(pTask->pRpcMsgList)); qDebug("s-task:%s add checkpoint rsp msg, total:%d", pTask->id.idStr, (int32_t)taosArrayGetSize(pTask->pRpcMsgList));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
......
...@@ -386,7 +386,7 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot, ...@@ -386,7 +386,7 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot,
0, buf); 0, buf);
// todo handle failure // todo handle failure
memset(buf, 0, len); memset(buf, 0, len);
// qDebug("===stream===put %" PRId64 " to disc, res %d", sKey.key.ts, code); // qDebug("===stream===put %" PRId64 " to disc, res %d", sKey.key.ts, code);
} }
taosMemoryFree(buf); taosMemoryFree(buf);
...@@ -397,8 +397,8 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot, ...@@ -397,8 +397,8 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot,
streamStateClearBatch(batch); streamStateClearBatch(batch);
int64_t elapsed = taosGetTimestampMs() - st; int64_t elapsed = taosGetTimestampMs() - st;
qDebug("%s flush to disk in batch model completed, rows:%d, batch size:%d, elapsed time:%"PRId64"ms", pFileState->id, numOfElems, qDebug("%s flush to disk in batch model completed, rows:%d, batch size:%d, elapsed time:%" PRId64 "ms",
BATCH_LIMIT, elapsed); pFileState->id, numOfElems, BATCH_LIMIT, elapsed);
if (flushState) { if (flushState) {
const char* taskKey = "streamFileState"; const char* taskKey = "streamFileState";
...@@ -488,8 +488,6 @@ int32_t recoverSnapshot(SStreamFileState* pFileState, int64_t ckId) { ...@@ -488,8 +488,6 @@ int32_t recoverSnapshot(SStreamFileState* pFileState, int64_t ckId) {
: pFileState->maxTs - pFileState->deleteMark; : pFileState->maxTs - pFileState->deleteMark;
deleteExpiredCheckPoint(pFileState, mark); deleteExpiredCheckPoint(pFileState, mark);
} }
void* pStVal = NULL;
int32_t len = 0;
SWinKey key = {.groupId = 0, .ts = 0}; SWinKey key = {.groupId = 0, .ts = 0};
SStreamStateCur* pCur = streamStateSeekToLast_rocksdb(pFileState->pFileStore, &key); SStreamStateCur* pCur = streamStateSeekToLast_rocksdb(pFileState->pFileStore, &key);
...@@ -511,6 +509,7 @@ int32_t recoverSnapshot(SStreamFileState* pFileState, int64_t ckId) { ...@@ -511,6 +509,7 @@ int32_t recoverSnapshot(SStreamFileState* pFileState, int64_t ckId) {
taosMemoryFreeClear(pNode); taosMemoryFreeClear(pNode);
break; break;
} }
ASSERT(pVLen == pFileState->rowSize);
memcpy(pNewPos->pRowBuff, pVal, pVLen); memcpy(pNewPos->pRowBuff, pVal, pVLen);
code = tSimpleHashPut(pFileState->rowBuffMap, pNewPos->pKey, pFileState->keyLen, &pNewPos, POINTER_BYTES); code = tSimpleHashPut(pFileState->rowBuffMap, pNewPos->pKey, pFileState->keyLen, &pNewPos, POINTER_BYTES);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
......
...@@ -49,6 +49,12 @@ if $data02 != 3 then ...@@ -49,6 +49,12 @@ if $data02 != 3 then
goto loop0 goto loop0
endi endi
print waiting for checkpoint generation ......
sleep 25000
print restart taosd
system sh/exec.sh -n dnode1 -s stop -x SIGINT system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode1 -s start system sh/exec.sh -n dnode1 -s start
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册