提交 daba88d2 编写于 作者: S slzhou

Merge branch '3.0' of github.com:taosdata/TDengine into szhou/udf/udf-outside-tdengine

......@@ -25,7 +25,7 @@ extern "C" {
// clang-format off
#define TAOS_DEF_ERROR_CODE(mod, code) ((int32_t)((0x80000000 | ((mod)<<16) | (code))))
#define TAOS_SYSTEM_ERROR(code) (0x80ff0000 | (code))
#define TAOS_SUCCEEDED(err) ((err) >= 0)
#define TAOS_FAILED(err) ((err) < 0)
......@@ -35,7 +35,7 @@ const char* terrstr();
int32_t* taosGetErrno();
#define terrno (*taosGetErrno())
#define TSDB_CODE_SUCCESS 0
#define TSDB_CODE_FAILED -1 // unknown or needn't tell detail error
......
......@@ -191,6 +191,11 @@ bool getUniqueFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
bool uniqueFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
int32_t uniqueFunction(SqlFunctionCtx *pCtx);
bool getModeFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
bool modeFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
int32_t modeFunction(SqlFunctionCtx *pCtx);
int32_t modeFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
bool getTwaFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
bool twaFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
int32_t twaFunction(SqlFunctionCtx *pCtx);
......
......@@ -1045,20 +1045,28 @@ static int32_t translateFirstLastMerge(SFunctionNode* pFunc, char* pErrBuf, int3
return translateFirstLastImpl(pFunc, pErrBuf, len, false);
}
static int32_t translateUnique(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
static int32_t translateUniqueMode(SFunctionNode* pFunc, char* pErrBuf, int32_t len, bool isUnique) {
if (1 != LIST_LENGTH(pFunc->pParameterList)) {
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
}
SNode* pPara = nodesListGetNode(pFunc->pParameterList, 0);
if (!nodesExprHasColumn(pPara)) {
return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR, "The parameters of UNIQUE must contain columns");
return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR, "The parameters of %s must contain columns", isUnique ? "UNIQUE" : "MODE");
}
pFunc->node.resType = ((SExprNode*)pPara)->resType;
return TSDB_CODE_SUCCESS;
}
static int32_t translateUnique(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
return translateUniqueMode(pFunc, pErrBuf, len, true);
}
static int32_t translateMode(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
return translateUniqueMode(pFunc, pErrBuf, len, false);
}
static int32_t translateDiff(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
int32_t numOfParams = LIST_LENGTH(pFunc->pParameterList);
if (numOfParams == 0 || numOfParams > 2) {
......@@ -2109,7 +2117,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
{
.name = "unique",
.type = FUNCTION_TYPE_UNIQUE,
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_TIMELINE_FUNC |
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_TIMELINE_FUNC |
FUNC_MGT_FORBID_STREAM_FUNC | FUNC_MGT_FORBID_WINDOW_FUNC | FUNC_MGT_FORBID_GROUP_BY_FUNC,
.translateFunc = translateUnique,
.getEnvFunc = getUniqueFuncEnv,
......@@ -2117,6 +2125,16 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.processFunc = uniqueFunction,
.finalizeFunc = NULL
},
{
.name = "mode",
.type = FUNCTION_TYPE_MODE,
.classification = FUNC_MGT_AGG_FUNC,
.translateFunc = translateMode,
.getEnvFunc = getModeFuncEnv,
.initFunc = modeFunctionSetup,
.processFunc = modeFunction,
.finalizeFunc = modeFinalize,
},
{
.name = "abs",
.type = FUNCTION_TYPE_ABS,
......
......@@ -32,6 +32,7 @@
#define TAIL_MAX_OFFSET 100
#define UNIQUE_MAX_RESULT_SIZE (1024 * 1024 * 10)
#define MODE_MAX_RESULT_SIZE UNIQUE_MAX_RESULT_SIZE
#define HLL_BUCKET_BITS 14 // The bits of the bucket
#define HLL_DATA_BITS (64 - HLL_BUCKET_BITS)
......@@ -246,6 +247,19 @@ typedef struct SUniqueInfo {
char pItems[];
} SUniqueInfo;
typedef struct SModeItem {
int64_t count;
char data[];
} SModeItem;
typedef struct SModeInfo {
int32_t numOfPoints;
uint8_t colType;
int16_t colBytes;
SHashObj* pHash;
char pItems[];
} SModeInfo;
typedef struct SDerivInfo {
double prevValue; // previous value
TSKEY prevTs; // previous timestamp
......@@ -4694,21 +4708,100 @@ int32_t uniqueFunction(SqlFunctionCtx* pCtx) {
return pInfo->numOfPoints;
}
int32_t uniqueFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
bool getModeFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
pEnv->calcMemSize = sizeof(SModeInfo) + MODE_MAX_RESULT_SIZE;
return true;
}
bool modeFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResInfo) {
if (!functionSetup(pCtx, pResInfo)) {
return false;
}
SModeInfo* pInfo = GET_ROWCELL_INTERBUF(pResInfo);
pInfo->numOfPoints = 0;
pInfo->colType = pCtx->resDataInfo.type;
pInfo->colBytes = pCtx->resDataInfo.bytes;
if (pInfo->pHash != NULL) {
taosHashClear(pInfo->pHash);
} else {
pInfo->pHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
}
return true;
}
static void doModeAdd(SModeInfo* pInfo, char* data, bool isNull) {
// ignore null elements
if (isNull) {
return;
}
int32_t hashKeyBytes = IS_VAR_DATA_TYPE(pInfo->colType) ? varDataTLen(data) : pInfo->colBytes;
SModeItem** pHashItem = taosHashGet(pInfo->pHash, data, hashKeyBytes);
if (pHashItem == NULL) {
int32_t size = sizeof(SModeItem) + pInfo->colBytes;
SModeItem* pItem = (SModeItem*)(pInfo->pItems + pInfo->numOfPoints * size);
memcpy(pItem->data, data, pInfo->colBytes);
pItem->count += 1;
taosHashPut(pInfo->pHash, data, hashKeyBytes, &pItem, sizeof(SModeItem*));
pInfo->numOfPoints++;
} else {
(*pHashItem)->count += 1;
}
}
int32_t modeFunction(SqlFunctionCtx* pCtx) {
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
SUniqueInfo* pInfo = GET_ROWCELL_INTERBUF(pResInfo);
SModeInfo* pInfo = GET_ROWCELL_INTERBUF(pResInfo);
SInputColumnInfoData* pInput = &pCtx->input;
SColumnInfoData* pInputCol = pInput->pData[0];
SColumnInfoData* pOutput = (SColumnInfoData*)pCtx->pOutput;
int32_t startOffset = pCtx->offset;
for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; ++i) {
char* data = colDataGetData(pInputCol, i);
doModeAdd(pInfo, data, colDataIsNull_s(pInputCol, i));
if (sizeof(SModeInfo) + pInfo->numOfPoints * (sizeof(SModeItem) + pInfo->colBytes) >= MODE_MAX_RESULT_SIZE) {
taosHashCleanup(pInfo->pHash);
return TSDB_CODE_OUT_OF_MEMORY;
}
}
SET_VAL(pResInfo, 1, 1);
return TSDB_CODE_SUCCESS;
}
int32_t modeFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
SModeInfo* pInfo = GET_ROWCELL_INTERBUF(pResInfo);
int32_t slotId = pCtx->pExpr->base.resSchema.slotId;
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId);
int32_t currentRow = pBlock->info.rows;
for (int32_t i = 0; i < pResInfo->numOfRes; ++i) {
SUniqueItem* pItem = (SUniqueItem*)(pInfo->pItems + i * (sizeof(SUniqueItem) + pInfo->colBytes));
colDataAppend(pCol, i, pItem->data, false);
// TODO: handle ts output
int32_t resIndex;
int32_t maxCount = 0;
for (int32_t i = 0; i < pInfo->numOfPoints; ++i) {
SModeItem* pItem = (SModeItem*)(pInfo->pItems + i * (sizeof(SModeItem) + pInfo->colBytes));
if (pItem->count > maxCount) {
maxCount = pItem->count;
resIndex = i;
} else if (pItem->count == maxCount) {
resIndex = -1;
}
}
SModeItem* pResItem = (SModeItem*)(pInfo->pItems + resIndex * (sizeof(SModeItem) + pInfo->colBytes));
colDataAppend(pCol, currentRow, pResItem->data, (resIndex == -1) ? true : false);
return pResInfo->numOfRes;
}
bool getTwaFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
pEnv->calcMemSize = sizeof(STwaInfo);
return true;
......
......@@ -57,7 +57,7 @@ SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaI
void snapshotSenderDestroy(SSyncSnapshotSender *pSender);
bool snapshotSenderIsStart(SSyncSnapshotSender *pSender);
void snapshotSenderStart(SSyncSnapshotSender *pSender, SSnapshot snapshot, void *pReader);
void snapshotSenderStop(SSyncSnapshotSender *pSender);
void snapshotSenderStop(SSyncSnapshotSender *pSender, bool finish);
int32_t snapshotSend(SSyncSnapshotSender *pSender);
int32_t snapshotReSend(SSyncSnapshotSender *pSender);
......@@ -82,7 +82,7 @@ SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode, SRaftId from
void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver);
void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncTerm privateTerm, SyncSnapshotSend *pBeginMsg);
bool snapshotReceiverIsStart(SSyncSnapshotReceiver *pReceiver);
void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver, bool apply);
void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver);
cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver);
char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver);
......
......@@ -240,7 +240,10 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries
SSyncSnapshotSender* pSender = syncNodeGetSnapshotSender(ths, &(pMsg->srcId));
ASSERT(pSender != NULL);
SSnapshot snapshot;
SSnapshot snapshot = {.data = NULL,
.lastApplyIndex = SYNC_INDEX_INVALID,
.lastApplyTerm = 0,
.lastConfigIndex = SYNC_INDEX_INVALID};
void* pReader = NULL;
ths->pFsm->FpGetSnapshot(ths->pFsm, &snapshot, NULL, &pReader);
if (snapshot.lastApplyIndex >= SYNC_INDEX_BEGIN && nextIndex <= snapshot.lastApplyIndex + 1 &&
......@@ -249,10 +252,6 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries
ASSERT(pReader != NULL);
snapshotSenderStart(pSender, snapshot, pReader);
char* eventLog = snapshotSender2SimpleStr(pSender, "snapshot sender start");
syncNodeEventLog(ths, eventLog);
taosMemoryFree(eventLog);
} else {
// no snapshot
if (pReader != NULL) {
......@@ -260,23 +259,6 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries
}
}
/*
bool hasSnapshot = syncNodeHasSnapshot(ths);
SSnapshot snapshot;
ths->pFsm->FpGetSnapshotInfo(ths->pFsm, &snapshot);
// start sending snapshot first time
// start here, stop by receiver
if (hasSnapshot && nextIndex <= snapshot.lastApplyIndex + 1 && !snapshotSenderIsStart(pSender) &&
pMsg->privateTerm < pSender->privateTerm) {
snapshotSenderStart(pSender);
char* eventLog = snapshotSender2SimpleStr(pSender, "snapshot sender start");
syncNodeEventLog(ths, eventLog);
taosMemoryFree(eventLog);
}
*/
SyncIndex sentryIndex = pSender->snapshot.lastApplyIndex + 1;
// update nextIndex to sentryIndex
......@@ -300,5 +282,5 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries
syncIndexMgrLog2("recv sync-append-entries-reply, after pNextIndex:", ths->pNextIndex);
syncIndexMgrLog2("recv sync-append-entries-reply, after pMatchIndex:", ths->pMatchIndex);
return ret;
return 0;
}
\ No newline at end of file
......@@ -24,6 +24,7 @@
//----------------------------------
static void snapshotReceiverDoStart(SSyncSnapshotReceiver *pReceiver, SyncTerm privateTerm,
SyncSnapshotSend *pBeginMsg);
static void snapshotReceiverGotData(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg);
//----------------------------------
SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaIndex) {
......@@ -50,7 +51,7 @@ SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaI
pSender->pSyncNode->pFsm->FpGetSnapshotInfo(pSender->pSyncNode->pFsm, &(pSender->snapshot));
pSender->finish = false;
} else {
sError("vgId:%d cannot create snapshot sender", pSyncNode->vgId);
sError("vgId:%d, cannot create snapshot sender", pSyncNode->vgId);
}
return pSender;
......@@ -127,7 +128,7 @@ void snapshotSenderStart(SSyncSnapshotSender *pSender, SSnapshot snapshot, void
syncEntryDestory(pEntry);
} else {
if (pSender->snapshot.lastConfigIndex == pSender->pSyncNode->pRaftCfg->lastConfigIndex) {
sTrace("vgId:%d sync sender get cfg from local", pSender->pSyncNode->vgId);
sTrace("vgId:%d, sync sender get cfg from local", pSender->pSyncNode->vgId);
pSender->lastConfig = pSender->pSyncNode->pRaftCfg->cfg;
getLastConfig = true;
}
......@@ -176,13 +177,13 @@ void snapshotSenderStart(SSyncSnapshotSender *pSender, SSnapshot snapshot, void
// event log
do {
char *eventLog = snapshotSender2SimpleStr(pSender, "snapshot sender send");
char *eventLog = snapshotSender2SimpleStr(pSender, "snapshot sender start");
syncNodeEventLog(pSender->pSyncNode, eventLog);
taosMemoryFree(eventLog);
} while (0);
}
void snapshotSenderStop(SSyncSnapshotSender *pSender) {
void snapshotSenderStop(SSyncSnapshotSender *pSender, bool finish) {
// close reader
if (pSender->pReader != NULL) {
int32_t ret = pSender->pSyncNode->pFsm->FpSnapshotStopRead(pSender->pSyncNode->pFsm, pSender->pReader);
......@@ -199,6 +200,14 @@ void snapshotSenderStop(SSyncSnapshotSender *pSender) {
// update flag
pSender->start = false;
pSender->finish = finish;
// event log
do {
char *eventLog = snapshotSender2SimpleStr(pSender, "snapshot sender stop");
syncNodeEventLog(pSender->pSyncNode, eventLog);
taosMemoryFree(eventLog);
} while (0);
}
// when sender receive ack, call this function to send msg from seq
......@@ -386,7 +395,7 @@ SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode, SRaftId from
pReceiver->snapshot.lastConfigIndex = SYNC_INDEX_INVALID;
} else {
sError("vgId:%d cannot create snapshot receiver", pSyncNode->vgId);
sError("vgId:%d, cannot create snapshot receiver", pSyncNode->vgId);
}
return pReceiver;
......@@ -409,9 +418,9 @@ void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver) {
bool snapshotReceiverIsStart(SSyncSnapshotReceiver *pReceiver) { return pReceiver->start; }
// static do start
// static do start by privateTerm, pBeginMsg
// receive first snapshot data
// privateTerm, pBeginMsg
// write first block data
static void snapshotReceiverDoStart(SSyncSnapshotReceiver *pReceiver, SyncTerm privateTerm,
SyncSnapshotSend *pBeginMsg) {
// update state
......@@ -419,6 +428,7 @@ static void snapshotReceiverDoStart(SSyncSnapshotReceiver *pReceiver, SyncTerm p
pReceiver->privateTerm = privateTerm;
pReceiver->ack = SYNC_SNAPSHOT_SEQ_BEGIN;
pReceiver->fromId = pBeginMsg->srcId;
pReceiver->start = true;
// update snapshot
pReceiver->snapshot.lastApplyIndex = pBeginMsg->lastIndex;
......@@ -429,8 +439,16 @@ static void snapshotReceiverDoStart(SSyncSnapshotReceiver *pReceiver, SyncTerm p
ASSERT(pReceiver->pWriter == NULL);
int32_t ret = pReceiver->pSyncNode->pFsm->FpSnapshotStartWrite(pReceiver->pSyncNode->pFsm, &(pReceiver->pWriter));
ASSERT(ret == 0);
// event log
do {
char *eventLog = snapshotReceiver2SimpleStr(pReceiver, "snapshot receiver start");
syncNodeEventLog(pReceiver->pSyncNode, eventLog);
taosMemoryFree(eventLog);
} while (0);
}
// force stop
static void snapshotReceiverForceStop(SSyncSnapshotReceiver *pReceiver) {
// force close, abandon incomplete data
if (pReceiver->pWriter != NULL) {
......@@ -441,33 +459,35 @@ static void snapshotReceiverForceStop(SSyncSnapshotReceiver *pReceiver) {
}
pReceiver->start = false;
// event log
do {
char *eventLog = snapshotReceiver2SimpleStr(pReceiver, "snapshot receiver force stop");
syncNodeEventLog(pReceiver->pSyncNode, eventLog);
taosMemoryFree(eventLog);
} while (0);
}
// if receiver receive msg from seq = SYNC_SNAPSHOT_SEQ_BEGIN, start receiver
// if already start, force close, start again
void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncTerm privateTerm, SyncSnapshotSend *pBeginMsg) {
if (!snapshotReceiverIsStart(pReceiver)) {
// start
// first start
snapshotReceiverDoStart(pReceiver, privateTerm, pBeginMsg);
pReceiver->start = true;
} else {
// already start
sInfo("snapshot recv, receiver already start");
sInfo("vgId:%d, snapshot recv, receiver already start", pReceiver->pSyncNode->vgId);
// force close, abandon incomplete data
int32_t ret =
pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, false);
ASSERT(ret == 0);
pReceiver->pWriter = NULL;
snapshotReceiverForceStop(pReceiver);
// start again
snapshotReceiverDoStart(pReceiver, privateTerm, pBeginMsg);
pReceiver->start = true;
}
}
void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver, bool apply) {
void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver) {
if (pReceiver->pWriter != NULL) {
int32_t ret =
pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, false);
......@@ -477,8 +497,69 @@ void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver, bool apply) {
pReceiver->start = false;
if (apply) {
// ++(pReceiver->privateTerm);
// event log
do {
SSnapshot snapshot;
pReceiver->pSyncNode->pFsm->FpGetSnapshotInfo(pReceiver->pSyncNode->pFsm, &snapshot);
char *eventLog = snapshotReceiver2SimpleStr(pReceiver, "snapshot receiver stop");
syncNodeEventLog(pReceiver->pSyncNode, eventLog);
taosMemoryFree(eventLog);
} while (0);
}
static void snapshotReceiverFinish(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg) {
ASSERT(pMsg->seq == SYNC_SNAPSHOT_SEQ_END);
if (pReceiver->pWriter != NULL) {
int32_t code = 0;
if (pMsg->dataLen > 0) {
code = pReceiver->pSyncNode->pFsm->FpSnapshotDoWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, pMsg->data,
pMsg->dataLen);
ASSERT(code == 0);
}
code = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, true);
ASSERT(code == 0);
pReceiver->pWriter = NULL;
}
pReceiver->ack = SYNC_SNAPSHOT_SEQ_END;
// update commit index
if (pReceiver->snapshot.lastApplyIndex > pReceiver->pSyncNode->commitIndex) {
pReceiver->pSyncNode->commitIndex = pReceiver->snapshot.lastApplyIndex;
}
// reset wal
pReceiver->pSyncNode->pLogStore->syncLogRestoreFromSnapshot(pReceiver->pSyncNode->pLogStore, pMsg->lastIndex);
// event log
do {
SSnapshot snapshot;
pReceiver->pSyncNode->pFsm->FpGetSnapshotInfo(pReceiver->pSyncNode->pFsm, &snapshot);
char *eventLog = snapshotReceiver2SimpleStr(pReceiver, "snapshot receiver got last data, finish, apply snapshot");
syncNodeEventLog(pReceiver->pSyncNode, eventLog);
taosMemoryFree(eventLog);
} while (0);
}
static void snapshotReceiverGotData(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg) {
ASSERT(pMsg->seq == pReceiver->ack + 1);
if (pReceiver->pWriter != NULL) {
if (pMsg->dataLen > 0) {
int32_t code = pReceiver->pSyncNode->pFsm->FpSnapshotDoWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter,
pMsg->data, pMsg->dataLen);
ASSERT(code == 0);
}
pReceiver->ack = pMsg->seq;
// event log
do {
char *eventLog = snapshotReceiver2SimpleStr(pReceiver, "snapshot receiver receiving");
syncNodeEventLog(pReceiver->pSyncNode, eventLog);
taosMemoryFree(eventLog);
} while (0);
}
}
......@@ -560,33 +641,20 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
// get receiver
SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver;
bool needRsp = false;
int32_t writeCode = 0;
// state, term, seq/ack
if (pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER) {
if (pMsg->term == pSyncNode->pRaftStore->currentTerm) {
if (pMsg->seq == SYNC_SNAPSHOT_SEQ_BEGIN) {
// begin
// begin, no data
snapshotReceiverStart(pReceiver, pMsg->privateTerm, pMsg);
pReceiver->ack = pMsg->seq;
needRsp = true;
char *eventLog = snapshotReceiver2SimpleStr(pReceiver, "snapshot receiver begin");
syncNodeEventLog(pSyncNode, eventLog);
taosMemoryFree(eventLog);
} else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_END) {
// end, finish FSM
writeCode = pSyncNode->pFsm->FpSnapshotDoWrite(pSyncNode->pFsm, pReceiver->pWriter, pMsg->data, pMsg->dataLen);
ASSERT(writeCode == 0);
pSyncNode->pFsm->FpSnapshotStopWrite(pSyncNode->pFsm, pReceiver->pWriter, true);
if (pReceiver->snapshot.lastApplyIndex > pReceiver->pSyncNode->commitIndex) {
pReceiver->pSyncNode->commitIndex = pReceiver->snapshot.lastApplyIndex;
}
// pSyncNode->pLogStore->syncLogSetBeginIndex(pSyncNode->pLogStore, pMsg->lastIndex + 1);
pSyncNode->pLogStore->syncLogRestoreFromSnapshot(pSyncNode->pLogStore, pMsg->lastIndex);
snapshotReceiverFinish(pReceiver, pMsg);
snapshotReceiverStop(pReceiver);
needRsp = true;
// maybe update lastconfig
if (pMsg->lastConfigIndex >= SYNC_INDEX_BEGIN) {
......@@ -601,89 +669,74 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
syncNodeDoConfigChange(pSyncNode, &newSyncCfg, pMsg->lastConfigIndex);
}
SSnapshot snapshot;
pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
do {
char *eventLog = snapshotReceiver2SimpleStr(pReceiver, "snapshot receiver finish, apply snapshot");
syncNodeEventLog(pSyncNode, eventLog);
taosMemoryFree(eventLog);
} while (0);
pReceiver->pWriter = NULL;
snapshotReceiverStop(pReceiver, true);
pReceiver->ack = pMsg->seq;
needRsp = true;
do {
char *eventLog = snapshotReceiver2SimpleStr(pReceiver, "snapshot receiver stop");
syncNodeEventLog(pSyncNode, eventLog);
taosMemoryFree(eventLog);
} while (0);
} else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_FORCE_CLOSE) {
pSyncNode->pFsm->FpSnapshotStopWrite(pSyncNode->pFsm, pReceiver->pWriter, false);
snapshotReceiverStop(pReceiver, false);
// force close
snapshotReceiverForceStop(pReceiver);
needRsp = false;
do {
char *eventLog = snapshotReceiver2SimpleStr(pReceiver, "snapshot receiver force close");
syncNodeEventLog(pSyncNode, eventLog);
taosMemoryFree(eventLog);
} while (0);
} else if (pMsg->seq > SYNC_SNAPSHOT_SEQ_BEGIN && pMsg->seq < SYNC_SNAPSHOT_SEQ_END) {
// transfering
if (pMsg->seq == pReceiver->ack + 1) {
writeCode =
pSyncNode->pFsm->FpSnapshotDoWrite(pSyncNode->pFsm, pReceiver->pWriter, pMsg->data, pMsg->dataLen);
ASSERT(writeCode == 0);
pReceiver->ack = pMsg->seq;
snapshotReceiverGotData(pReceiver, pMsg);
}
needRsp = true;
do {
char *eventLog = snapshotReceiver2SimpleStr(pReceiver, "snapshot receiver receiving");
syncNodeEventLog(pSyncNode, eventLog);
taosMemoryFree(eventLog);
} while (0);
} else {
ASSERT(0);
}
// send ack
if (needRsp) {
// build msg
SyncSnapshotRsp *pRspMsg = syncSnapshotRspBuild(pSyncNode->vgId);
pRspMsg->srcId = pSyncNode->myRaftId;
pRspMsg->destId = pMsg->srcId;
pRspMsg->term = pSyncNode->pRaftStore->currentTerm;
pRspMsg->lastIndex = pMsg->lastIndex;
pRspMsg->lastTerm = pMsg->lastTerm;
pRspMsg->ack = pReceiver->ack;
pRspMsg->code = writeCode;
pRspMsg->privateTerm = pReceiver->privateTerm;
pRspMsg->ack = pReceiver->ack; // receiver maybe already closed
pRspMsg->code = 0;
pRspMsg->privateTerm = pReceiver->privateTerm; // receiver maybe already closed
// send msg
SRpcMsg rpcMsg;
syncSnapshotRsp2RpcMsg(pRspMsg, &rpcMsg);
syncNodeSendMsgById(&(pRspMsg->destId), pSyncNode, &rpcMsg);
syncSnapshotRspDestroy(pRspMsg);
}
} else {
// error log
do {
char *eventLog = snapshotReceiver2SimpleStr(pReceiver, "snapshot receiver term not equal");
syncNodeErrorLog(pSyncNode, eventLog);
taosMemoryFree(eventLog);
} while (0);
}
} else {
syncNodeLog2("syncNodeOnSnapshotSendCb not follower", pSyncNode);
// error log
do {
char *eventLog = snapshotReceiver2SimpleStr(pReceiver, "snapshot receiver not follower");
syncNodeErrorLog(pSyncNode, eventLog);
taosMemoryFree(eventLog);
} while (0);
}
return 0;
}
static void snapshotSenderUpdateProgress(SSyncSnapshotSender *pSender, SyncSnapshotRsp *pMsg) {
ASSERT(pMsg->ack == pSender->seq);
pSender->ack = pMsg->ack;
++(pSender->seq);
}
// sender receives ack, set seq = ack + 1, send msg from seq
// if ack == SYNC_SNAPSHOT_SEQ_END, stop sender
int32_t syncNodeOnSnapshotRspCb(SSyncNode *pSyncNode, SyncSnapshotRsp *pMsg) {
// if already drop replica, do not process
if (!syncNodeInRaftGroup(pSyncNode, &(pMsg->srcId)) && pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
sInfo("recv SyncSnapshotRsp maybe replica already dropped");
return 0;
sError("vgId:%d, recv sync-snapshot-rsp, maybe replica already dropped", pSyncNode->vgId);
return -1;
}
// get sender
......@@ -695,27 +748,49 @@ int32_t syncNodeOnSnapshotRspCb(SSyncNode *pSyncNode, SyncSnapshotRsp *pMsg) {
if (pMsg->term == pSyncNode->pRaftStore->currentTerm) {
// receiver ack is finish, close sender
if (pMsg->ack == SYNC_SNAPSHOT_SEQ_END) {
pSender->finish = true;
snapshotSenderStop(pSender);
snapshotSenderStop(pSender, true);
return 0;
}
// send next msg
if (pMsg->ack == pSender->seq) {
// update sender ack
pSender->ack = pMsg->ack;
(pSender->seq)++;
snapshotSenderUpdateProgress(pSender, pMsg);
snapshotSend(pSender);
} else if (pMsg->ack == pSender->seq - 1) {
snapshotReSend(pSender);
} else {
ASSERT(0);
do {
char logBuf[64];
snprintf(logBuf, sizeof(logBuf), "error ack, recv ack:%d, my seq:%d", pMsg->ack, pSender->seq);
char *eventLog = snapshotSender2SimpleStr(pSender, logBuf);
syncNodeErrorLog(pSyncNode, eventLog);
taosMemoryFree(eventLog);
} while (0);
return -1;
}
} else {
// error log
do {
char *eventLog = snapshotSender2SimpleStr(pSender, "snapshot sender term not equal");
syncNodeErrorLog(pSyncNode, eventLog);
taosMemoryFree(eventLog);
} while (0);
return -1;
}
} else {
syncNodeLog2("syncNodeOnSnapshotRspCb not leader", pSyncNode);
// error log
do {
char *eventLog = snapshotSender2SimpleStr(pSender, "snapshot sender not leader");
syncNodeErrorLog(pSyncNode, eventLog);
taosMemoryFree(eventLog);
} while (0);
return -1;
}
return 0;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册