提交 6dc93a8d 编写于 作者: Y yihaoDeng

add stream ver

上级 f7dc9497
...@@ -30,6 +30,7 @@ extern "C" { ...@@ -30,6 +30,7 @@ extern "C" {
typedef struct SStreamTask SStreamTask; typedef struct SStreamTask SStreamTask;
#define SSTREAM_TASK_VER 1
enum { enum {
STREAM_STATUS__NORMAL = 0, STREAM_STATUS__NORMAL = 0,
STREAM_STATUS__STOP, STREAM_STATUS__STOP,
...@@ -266,13 +267,13 @@ typedef struct SCheckpointInfo { ...@@ -266,13 +267,13 @@ typedef struct SCheckpointInfo {
} SCheckpointInfo; } SCheckpointInfo;
typedef struct SStreamStatus { typedef struct SStreamStatus {
int8_t taskStatus; int8_t taskStatus;
int8_t downstreamReady; // downstream tasks are all ready now, if this flag is set int8_t downstreamReady; // downstream tasks are all ready now, if this flag is set
int8_t schedStatus; int8_t schedStatus;
int8_t keepTaskStatus; int8_t keepTaskStatus;
bool transferState; bool transferState;
int8_t timerActive; // timer is active int8_t timerActive; // timer is active
int8_t pauseAllowed; // allowed task status to be set to be paused int8_t pauseAllowed; // allowed task status to be set to be paused
} SStreamStatus; } SStreamStatus;
typedef struct SHistDataRange { typedef struct SHistDataRange {
...@@ -309,6 +310,7 @@ typedef struct { ...@@ -309,6 +310,7 @@ typedef struct {
} STaskTimestamp; } STaskTimestamp;
struct SStreamTask { struct SStreamTask {
int64_t ver;
SStreamId id; SStreamId id;
SSTaskBasicInfo info; SSTaskBasicInfo info;
STaskOutputInfo outputInfo; STaskOutputInfo outputInfo;
...@@ -589,10 +591,10 @@ bool streamTaskShouldPause(const SStreamStatus* pStatus); ...@@ -589,10 +591,10 @@ bool streamTaskShouldPause(const SStreamStatus* pStatus);
bool streamTaskIsIdle(const SStreamTask* pTask); bool streamTaskIsIdle(const SStreamTask* pTask);
int32_t streamTaskEndScanWAL(SStreamTask* pTask); int32_t streamTaskEndScanWAL(SStreamTask* pTask);
SStreamChildEpInfo * streamTaskGetUpstreamTaskEpInfo(SStreamTask* pTask, int32_t taskId); SStreamChildEpInfo* streamTaskGetUpstreamTaskEpInfo(SStreamTask* pTask, int32_t taskId);
int32_t streamScanExec(SStreamTask* pTask, int32_t batchSize); int32_t streamScanExec(SStreamTask* pTask, int32_t batchSize);
char* createStreamTaskIdStr(int64_t streamId, int32_t taskId); char* createStreamTaskIdStr(int64_t streamId, int32_t taskId);
// recover and fill history // recover and fill history
void streamTaskCheckDownstreamTasks(SStreamTask* pTask); void streamTaskCheckDownstreamTasks(SStreamTask* pTask);
...@@ -628,7 +630,8 @@ int32_t streamDispatchTransferStateMsg(SStreamTask* pTask); ...@@ -628,7 +630,8 @@ int32_t streamDispatchTransferStateMsg(SStreamTask* pTask);
// agg level // agg level
int32_t streamTaskScanHistoryPrepare(SStreamTask* pTask); int32_t streamTaskScanHistoryPrepare(SStreamTask* pTask);
int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, SStreamScanHistoryFinishReq *pReq, SRpcHandleInfo* pRpcInfo); int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, SStreamScanHistoryFinishReq* pReq,
SRpcHandleInfo* pRpcInfo);
int32_t streamProcessScanHistoryFinishRsp(SStreamTask* pTask); int32_t streamProcessScanHistoryFinishRsp(SStreamTask* pTask);
// stream task meta // stream task meta
...@@ -642,7 +645,7 @@ int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask); ...@@ -642,7 +645,7 @@ int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask);
int32_t streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId); int32_t streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId);
int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask, bool* pAdded); int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask, bool* pAdded);
int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int32_t taskId); int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int32_t taskId);
int32_t streamMetaGetNumOfTasks(SStreamMeta* pMeta); // todo remove it int32_t streamMetaGetNumOfTasks(SStreamMeta* pMeta); // todo remove it
SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int32_t taskId); SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int32_t taskId);
void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask); void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask);
...@@ -659,7 +662,6 @@ int32_t streamTaskReleaseState(SStreamTask* pTask); ...@@ -659,7 +662,6 @@ int32_t streamTaskReleaseState(SStreamTask* pTask);
int32_t streamTaskReloadState(SStreamTask* pTask); int32_t streamTaskReloadState(SStreamTask* pTask);
int32_t streamAlignTransferState(SStreamTask* pTask); int32_t streamAlignTransferState(SStreamTask* pTask);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -70,6 +70,7 @@ int32_t tEncodeSStreamObj(SEncoder *pEncoder, const SStreamObj *pObj) { ...@@ -70,6 +70,7 @@ int32_t tEncodeSStreamObj(SEncoder *pEncoder, const SStreamObj *pObj) {
if (tEncodeI32(pEncoder, innerSz) < 0) return -1; if (tEncodeI32(pEncoder, innerSz) < 0) return -1;
for (int32_t j = 0; j < innerSz; j++) { for (int32_t j = 0; j < innerSz; j++) {
SStreamTask *pTask = taosArrayGetP(pArray, j); SStreamTask *pTask = taosArrayGetP(pArray, j);
pTask->ver = SSTREAM_TASK_VER;
if (tEncodeStreamTask(pEncoder, pTask) < 0) return -1; if (tEncodeStreamTask(pEncoder, pTask) < 0) return -1;
} }
} }
...@@ -154,7 +155,7 @@ int32_t tDecodeSStreamObj(SDecoder *pDecoder, SStreamObj *pObj, int32_t sver) { ...@@ -154,7 +155,7 @@ int32_t tDecodeSStreamObj(SDecoder *pDecoder, SStreamObj *pObj, int32_t sver) {
return 0; return 0;
} }
static void* freeStreamTasks(SArray* pTaskLevel) { static void *freeStreamTasks(SArray *pTaskLevel) {
int32_t numOfLevel = taosArrayGetSize(pTaskLevel); int32_t numOfLevel = taosArrayGetSize(pTaskLevel);
for (int32_t i = 0; i < numOfLevel; i++) { for (int32_t i = 0; i < numOfLevel; i++) {
SArray *pLevel = taosArrayGetP(pTaskLevel, i); SArray *pLevel = taosArrayGetP(pTaskLevel, i);
...@@ -192,14 +193,14 @@ SMqVgEp *tCloneSMqVgEp(const SMqVgEp *pVgEp) { ...@@ -192,14 +193,14 @@ SMqVgEp *tCloneSMqVgEp(const SMqVgEp *pVgEp) {
SMqVgEp *pVgEpNew = taosMemoryMalloc(sizeof(SMqVgEp)); SMqVgEp *pVgEpNew = taosMemoryMalloc(sizeof(SMqVgEp));
if (pVgEpNew == NULL) return NULL; if (pVgEpNew == NULL) return NULL;
pVgEpNew->vgId = pVgEp->vgId; pVgEpNew->vgId = pVgEp->vgId;
// pVgEpNew->qmsg = taosStrdup(pVgEp->qmsg); // pVgEpNew->qmsg = taosStrdup(pVgEp->qmsg);
pVgEpNew->epSet = pVgEp->epSet; pVgEpNew->epSet = pVgEp->epSet;
return pVgEpNew; return pVgEpNew;
} }
void tDeleteSMqVgEp(SMqVgEp *pVgEp) { void tDeleteSMqVgEp(SMqVgEp *pVgEp) {
if (pVgEp) { if (pVgEp) {
// taosMemoryFreeClear(pVgEp->qmsg); // taosMemoryFreeClear(pVgEp->qmsg);
taosMemoryFree(pVgEp); taosMemoryFree(pVgEp);
} }
} }
...@@ -207,14 +208,14 @@ void tDeleteSMqVgEp(SMqVgEp *pVgEp) { ...@@ -207,14 +208,14 @@ void tDeleteSMqVgEp(SMqVgEp *pVgEp) {
int32_t tEncodeSMqVgEp(void **buf, const SMqVgEp *pVgEp) { int32_t tEncodeSMqVgEp(void **buf, const SMqVgEp *pVgEp) {
int32_t tlen = 0; int32_t tlen = 0;
tlen += taosEncodeFixedI32(buf, pVgEp->vgId); tlen += taosEncodeFixedI32(buf, pVgEp->vgId);
// tlen += taosEncodeString(buf, pVgEp->qmsg); // tlen += taosEncodeString(buf, pVgEp->qmsg);
tlen += taosEncodeSEpSet(buf, &pVgEp->epSet); tlen += taosEncodeSEpSet(buf, &pVgEp->epSet);
return tlen; return tlen;
} }
void *tDecodeSMqVgEp(const void *buf, SMqVgEp *pVgEp, int8_t sver) { void *tDecodeSMqVgEp(const void *buf, SMqVgEp *pVgEp, int8_t sver) {
buf = taosDecodeFixedI32(buf, &pVgEp->vgId); buf = taosDecodeFixedI32(buf, &pVgEp->vgId);
if(sver == 1){ if (sver == 1) {
uint64_t size = 0; uint64_t size = 0;
buf = taosDecodeVariantU64(buf, &size); buf = taosDecodeVariantU64(buf, &size);
buf = POINTER_SHIFT(buf, size); buf = POINTER_SHIFT(buf, size);
...@@ -223,7 +224,7 @@ void *tDecodeSMqVgEp(const void *buf, SMqVgEp *pVgEp, int8_t sver) { ...@@ -223,7 +224,7 @@ void *tDecodeSMqVgEp(const void *buf, SMqVgEp *pVgEp, int8_t sver) {
return (void *)buf; return (void *)buf;
} }
SMqConsumerObj *tNewSMqConsumerObj(int64_t consumerId, char* cgroup) { SMqConsumerObj *tNewSMqConsumerObj(int64_t consumerId, char *cgroup) {
SMqConsumerObj *pConsumer = taosMemoryCalloc(1, sizeof(SMqConsumerObj)); SMqConsumerObj *pConsumer = taosMemoryCalloc(1, sizeof(SMqConsumerObj));
if (pConsumer == NULL) { if (pConsumer == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
...@@ -260,12 +261,12 @@ SMqConsumerObj *tNewSMqConsumerObj(int64_t consumerId, char* cgroup) { ...@@ -260,12 +261,12 @@ SMqConsumerObj *tNewSMqConsumerObj(int64_t consumerId, char* cgroup) {
} }
void tDeleteSMqConsumerObj(SMqConsumerObj *pConsumer, bool delete) { void tDeleteSMqConsumerObj(SMqConsumerObj *pConsumer, bool delete) {
if(pConsumer == NULL) return; if (pConsumer == NULL) return;
taosArrayDestroyP(pConsumer->currentTopics, (FDelete)taosMemoryFree); taosArrayDestroyP(pConsumer->currentTopics, (FDelete)taosMemoryFree);
taosArrayDestroyP(pConsumer->rebNewTopics, (FDelete)taosMemoryFree); taosArrayDestroyP(pConsumer->rebNewTopics, (FDelete)taosMemoryFree);
taosArrayDestroyP(pConsumer->rebRemovedTopics, (FDelete)taosMemoryFree); taosArrayDestroyP(pConsumer->rebRemovedTopics, (FDelete)taosMemoryFree);
taosArrayDestroyP(pConsumer->assignedTopics, (FDelete)taosMemoryFree); taosArrayDestroyP(pConsumer->assignedTopics, (FDelete)taosMemoryFree);
if(delete){ if (delete) {
taosMemoryFree(pConsumer); taosMemoryFree(pConsumer);
} }
} }
...@@ -392,7 +393,7 @@ void *tDecodeSMqConsumerObj(const void *buf, SMqConsumerObj *pConsumer, int8_t s ...@@ -392,7 +393,7 @@ void *tDecodeSMqConsumerObj(const void *buf, SMqConsumerObj *pConsumer, int8_t s
taosArrayPush(pConsumer->assignedTopics, &topic); taosArrayPush(pConsumer->assignedTopics, &topic);
} }
if(sver > 1){ if (sver > 1) {
buf = taosDecodeFixedI8(buf, &pConsumer->withTbName); buf = taosDecodeFixedI8(buf, &pConsumer->withTbName);
buf = taosDecodeFixedI8(buf, &pConsumer->autoCommit); buf = taosDecodeFixedI8(buf, &pConsumer->autoCommit);
buf = taosDecodeFixedI32(buf, &pConsumer->autoCommitInterval); buf = taosDecodeFixedI32(buf, &pConsumer->autoCommitInterval);
...@@ -401,18 +402,18 @@ void *tDecodeSMqConsumerObj(const void *buf, SMqConsumerObj *pConsumer, int8_t s ...@@ -401,18 +402,18 @@ void *tDecodeSMqConsumerObj(const void *buf, SMqConsumerObj *pConsumer, int8_t s
return (void *)buf; return (void *)buf;
} }
//SMqConsumerEp *tCloneSMqConsumerEp(const SMqConsumerEp *pConsumerEpOld) { // SMqConsumerEp *tCloneSMqConsumerEp(const SMqConsumerEp *pConsumerEpOld) {
// SMqConsumerEp *pConsumerEpNew = taosMemoryMalloc(sizeof(SMqConsumerEp)); // SMqConsumerEp *pConsumerEpNew = taosMemoryMalloc(sizeof(SMqConsumerEp));
// if (pConsumerEpNew == NULL) return NULL; // if (pConsumerEpNew == NULL) return NULL;
// pConsumerEpNew->consumerId = pConsumerEpOld->consumerId; // pConsumerEpNew->consumerId = pConsumerEpOld->consumerId;
// pConsumerEpNew->vgs = taosArrayDup(pConsumerEpOld->vgs, NULL); // pConsumerEpNew->vgs = taosArrayDup(pConsumerEpOld->vgs, NULL);
// return pConsumerEpNew; // return pConsumerEpNew;
//} // }
// //
//void tDeleteSMqConsumerEp(void *data) { // void tDeleteSMqConsumerEp(void *data) {
// SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)data; // SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)data;
// taosArrayDestroy(pConsumerEp->vgs); // taosArrayDestroy(pConsumerEp->vgs);
//} // }
int32_t tEncodeSMqConsumerEp(void **buf, const SMqConsumerEp *pConsumerEp) { int32_t tEncodeSMqConsumerEp(void **buf, const SMqConsumerEp *pConsumerEp) {
int32_t tlen = 0; int32_t tlen = 0;
...@@ -420,7 +421,7 @@ int32_t tEncodeSMqConsumerEp(void **buf, const SMqConsumerEp *pConsumerEp) { ...@@ -420,7 +421,7 @@ int32_t tEncodeSMqConsumerEp(void **buf, const SMqConsumerEp *pConsumerEp) {
tlen += taosEncodeArray(buf, pConsumerEp->vgs, (FEncode)tEncodeSMqVgEp); tlen += taosEncodeArray(buf, pConsumerEp->vgs, (FEncode)tEncodeSMqVgEp);
int32_t szVgs = taosArrayGetSize(pConsumerEp->offsetRows); int32_t szVgs = taosArrayGetSize(pConsumerEp->offsetRows);
tlen += taosEncodeFixedI32(buf, szVgs); tlen += taosEncodeFixedI32(buf, szVgs);
for (int32_t j= 0; j < szVgs; ++j) { for (int32_t j = 0; j < szVgs; ++j) {
OffsetRows *offRows = taosArrayGet(pConsumerEp->offsetRows, j); OffsetRows *offRows = taosArrayGet(pConsumerEp->offsetRows, j);
tlen += taosEncodeFixedI32(buf, offRows->vgId); tlen += taosEncodeFixedI32(buf, offRows->vgId);
tlen += taosEncodeFixedI64(buf, offRows->rows); tlen += taosEncodeFixedI64(buf, offRows->rows);
...@@ -434,28 +435,28 @@ int32_t tEncodeSMqConsumerEp(void **buf, const SMqConsumerEp *pConsumerEp) { ...@@ -434,28 +435,28 @@ int32_t tEncodeSMqConsumerEp(void **buf, const SMqConsumerEp *pConsumerEp) {
// do nothing // do nothing
} }
} }
//#if 0 // #if 0
// int32_t sz = taosArrayGetSize(pConsumerEp->vgs); // int32_t sz = taosArrayGetSize(pConsumerEp->vgs);
// tlen += taosEncodeFixedI32(buf, sz); // tlen += taosEncodeFixedI32(buf, sz);
// for (int32_t i = 0; i < sz; i++) { // for (int32_t i = 0; i < sz; i++) {
// SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, i); // SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, i);
// tlen += tEncodeSMqVgEp(buf, pVgEp); // tlen += tEncodeSMqVgEp(buf, pVgEp);
// } // }
//#endif // #endif
return tlen; return tlen;
} }
void *tDecodeSMqConsumerEp(const void *buf, SMqConsumerEp *pConsumerEp, int8_t sver) { void *tDecodeSMqConsumerEp(const void *buf, SMqConsumerEp *pConsumerEp, int8_t sver) {
buf = taosDecodeFixedI64(buf, &pConsumerEp->consumerId); buf = taosDecodeFixedI64(buf, &pConsumerEp->consumerId);
buf = taosDecodeArray(buf, &pConsumerEp->vgs, (FDecode)tDecodeSMqVgEp, sizeof(SMqVgEp), sver); buf = taosDecodeArray(buf, &pConsumerEp->vgs, (FDecode)tDecodeSMqVgEp, sizeof(SMqVgEp), sver);
if (sver > 1){ if (sver > 1) {
int32_t szVgs = 0; int32_t szVgs = 0;
buf = taosDecodeFixedI32(buf, &szVgs); buf = taosDecodeFixedI32(buf, &szVgs);
if(szVgs > 0){ if (szVgs > 0) {
pConsumerEp->offsetRows = taosArrayInit(szVgs, sizeof(OffsetRows)); pConsumerEp->offsetRows = taosArrayInit(szVgs, sizeof(OffsetRows));
if (NULL == pConsumerEp->offsetRows) return NULL; if (NULL == pConsumerEp->offsetRows) return NULL;
for (int32_t j= 0; j < szVgs; ++j) { for (int32_t j = 0; j < szVgs; ++j) {
OffsetRows* offRows = taosArrayReserve(pConsumerEp->offsetRows, 1); OffsetRows *offRows = taosArrayReserve(pConsumerEp->offsetRows, 1);
buf = taosDecodeFixedI32(buf, &offRows->vgId); buf = taosDecodeFixedI32(buf, &offRows->vgId);
buf = taosDecodeFixedI64(buf, &offRows->rows); buf = taosDecodeFixedI64(buf, &offRows->rows);
buf = taosDecodeFixedI8(buf, &offRows->offset.type); buf = taosDecodeFixedI8(buf, &offRows->offset.type);
...@@ -470,21 +471,21 @@ void *tDecodeSMqConsumerEp(const void *buf, SMqConsumerEp *pConsumerEp, int8_t s ...@@ -470,21 +471,21 @@ void *tDecodeSMqConsumerEp(const void *buf, SMqConsumerEp *pConsumerEp, int8_t s
} }
} }
} }
//#if 0 // #if 0
// int32_t sz; // int32_t sz;
// buf = taosDecodeFixedI32(buf, &sz); // buf = taosDecodeFixedI32(buf, &sz);
// pConsumerEp->vgs = taosArrayInit(sz, sizeof(void *)); // pConsumerEp->vgs = taosArrayInit(sz, sizeof(void *));
// for (int32_t i = 0; i < sz; i++) { // for (int32_t i = 0; i < sz; i++) {
// SMqVgEp *pVgEp = taosMemoryMalloc(sizeof(SMqVgEp)); // SMqVgEp *pVgEp = taosMemoryMalloc(sizeof(SMqVgEp));
// buf = tDecodeSMqVgEp(buf, pVgEp); // buf = tDecodeSMqVgEp(buf, pVgEp);
// taosArrayPush(pConsumerEp->vgs, &pVgEp); // taosArrayPush(pConsumerEp->vgs, &pVgEp);
// } // }
//#endif // #endif
return (void *)buf; return (void *)buf;
} }
SMqSubscribeObj *tNewSubscribeObj(const char* key) { SMqSubscribeObj *tNewSubscribeObj(const char *key) {
SMqSubscribeObj *pSubObj = taosMemoryCalloc(1, sizeof(SMqSubscribeObj)); SMqSubscribeObj *pSubObj = taosMemoryCalloc(1, sizeof(SMqSubscribeObj));
if (pSubObj == NULL) { if (pSubObj == NULL) {
return NULL; return NULL;
...@@ -577,7 +578,7 @@ int32_t tEncodeSubscribeObj(void **buf, const SMqSubscribeObj *pSub) { ...@@ -577,7 +578,7 @@ int32_t tEncodeSubscribeObj(void **buf, const SMqSubscribeObj *pSub) {
int32_t szVgs = taosArrayGetSize(pSub->offsetRows); int32_t szVgs = taosArrayGetSize(pSub->offsetRows);
tlen += taosEncodeFixedI32(buf, szVgs); tlen += taosEncodeFixedI32(buf, szVgs);
for (int32_t j= 0; j < szVgs; ++j) { for (int32_t j = 0; j < szVgs; ++j) {
OffsetRows *offRows = taosArrayGet(pSub->offsetRows, j); OffsetRows *offRows = taosArrayGet(pSub->offsetRows, j);
tlen += taosEncodeFixedI32(buf, offRows->vgId); tlen += taosEncodeFixedI32(buf, offRows->vgId);
tlen += taosEncodeFixedI64(buf, offRows->rows); tlen += taosEncodeFixedI64(buf, offRows->rows);
...@@ -617,14 +618,14 @@ void *tDecodeSubscribeObj(const void *buf, SMqSubscribeObj *pSub, int8_t sver) { ...@@ -617,14 +618,14 @@ void *tDecodeSubscribeObj(const void *buf, SMqSubscribeObj *pSub, int8_t sver) {
buf = taosDecodeArray(buf, &pSub->unassignedVgs, (FDecode)tDecodeSMqVgEp, sizeof(SMqVgEp), sver); buf = taosDecodeArray(buf, &pSub->unassignedVgs, (FDecode)tDecodeSMqVgEp, sizeof(SMqVgEp), sver);
buf = taosDecodeStringTo(buf, pSub->dbName); buf = taosDecodeStringTo(buf, pSub->dbName);
if (sver > 1){ if (sver > 1) {
int32_t szVgs = 0; int32_t szVgs = 0;
buf = taosDecodeFixedI32(buf, &szVgs); buf = taosDecodeFixedI32(buf, &szVgs);
if(szVgs > 0){ if (szVgs > 0) {
pSub->offsetRows = taosArrayInit(szVgs, sizeof(OffsetRows)); pSub->offsetRows = taosArrayInit(szVgs, sizeof(OffsetRows));
if (NULL == pSub->offsetRows) return NULL; if (NULL == pSub->offsetRows) return NULL;
for (int32_t j= 0; j < szVgs; ++j) { for (int32_t j = 0; j < szVgs; ++j) {
OffsetRows* offRows = taosArrayReserve(pSub->offsetRows, 1); OffsetRows *offRows = taosArrayReserve(pSub->offsetRows, 1);
buf = taosDecodeFixedI32(buf, &offRows->vgId); buf = taosDecodeFixedI32(buf, &offRows->vgId);
buf = taosDecodeFixedI64(buf, &offRows->rows); buf = taosDecodeFixedI64(buf, &offRows->rows);
buf = taosDecodeFixedI8(buf, &offRows->offset.type); buf = taosDecodeFixedI8(buf, &offRows->offset.type);
...@@ -639,71 +640,71 @@ void *tDecodeSubscribeObj(const void *buf, SMqSubscribeObj *pSub, int8_t sver) { ...@@ -639,71 +640,71 @@ void *tDecodeSubscribeObj(const void *buf, SMqSubscribeObj *pSub, int8_t sver) {
} }
} }
buf = taosDecodeString(buf, &pSub->qmsg); buf = taosDecodeString(buf, &pSub->qmsg);
}else{ } else {
pSub->qmsg = taosStrdup(""); pSub->qmsg = taosStrdup("");
} }
return (void *)buf; return (void *)buf;
} }
//SMqSubActionLogEntry *tCloneSMqSubActionLogEntry(SMqSubActionLogEntry *pEntry) { // SMqSubActionLogEntry *tCloneSMqSubActionLogEntry(SMqSubActionLogEntry *pEntry) {
// SMqSubActionLogEntry *pEntryNew = taosMemoryMalloc(sizeof(SMqSubActionLogEntry)); // SMqSubActionLogEntry *pEntryNew = taosMemoryMalloc(sizeof(SMqSubActionLogEntry));
// if (pEntryNew == NULL) return NULL; // if (pEntryNew == NULL) return NULL;
// pEntryNew->epoch = pEntry->epoch; // pEntryNew->epoch = pEntry->epoch;
// pEntryNew->consumers = taosArrayDup(pEntry->consumers, (__array_item_dup_fn_t)tCloneSMqConsumerEp); // pEntryNew->consumers = taosArrayDup(pEntry->consumers, (__array_item_dup_fn_t)tCloneSMqConsumerEp);
// return pEntryNew; // return pEntryNew;
//} // }
// //
//void tDeleteSMqSubActionLogEntry(SMqSubActionLogEntry *pEntry) { // void tDeleteSMqSubActionLogEntry(SMqSubActionLogEntry *pEntry) {
// taosArrayDestroyEx(pEntry->consumers, (FDelete)tDeleteSMqConsumerEp); // taosArrayDestroyEx(pEntry->consumers, (FDelete)tDeleteSMqConsumerEp);
//} // }
//int32_t tEncodeSMqSubActionLogEntry(void **buf, const SMqSubActionLogEntry *pEntry) { // int32_t tEncodeSMqSubActionLogEntry(void **buf, const SMqSubActionLogEntry *pEntry) {
// int32_t tlen = 0; // int32_t tlen = 0;
// tlen += taosEncodeFixedI32(buf, pEntry->epoch); // tlen += taosEncodeFixedI32(buf, pEntry->epoch);
// tlen += taosEncodeArray(buf, pEntry->consumers, (FEncode)tEncodeSMqSubActionLogEntry); // tlen += taosEncodeArray(buf, pEntry->consumers, (FEncode)tEncodeSMqSubActionLogEntry);
// return tlen; // return tlen;
//} // }
// //
//void *tDecodeSMqSubActionLogEntry(const void *buf, SMqSubActionLogEntry *pEntry) { // void *tDecodeSMqSubActionLogEntry(const void *buf, SMqSubActionLogEntry *pEntry) {
// buf = taosDecodeFixedI32(buf, &pEntry->epoch); // buf = taosDecodeFixedI32(buf, &pEntry->epoch);
// buf = taosDecodeArray(buf, &pEntry->consumers, (FDecode)tDecodeSMqSubActionLogEntry, sizeof(SMqSubActionLogEntry)); // buf = taosDecodeArray(buf, &pEntry->consumers, (FDecode)tDecodeSMqSubActionLogEntry, sizeof(SMqSubActionLogEntry));
// return (void *)buf; // return (void *)buf;
//} // }
//SMqSubActionLogObj *tCloneSMqSubActionLogObj(SMqSubActionLogObj *pLog) { // SMqSubActionLogObj *tCloneSMqSubActionLogObj(SMqSubActionLogObj *pLog) {
// SMqSubActionLogObj *pLogNew = taosMemoryMalloc(sizeof(SMqSubActionLogObj)); // SMqSubActionLogObj *pLogNew = taosMemoryMalloc(sizeof(SMqSubActionLogObj));
// if (pLogNew == NULL) return pLogNew; // if (pLogNew == NULL) return pLogNew;
// memcpy(pLogNew->key, pLog->key, TSDB_SUBSCRIBE_KEY_LEN); // memcpy(pLogNew->key, pLog->key, TSDB_SUBSCRIBE_KEY_LEN);
// pLogNew->logs = taosArrayDup(pLog->logs, (__array_item_dup_fn_t)tCloneSMqConsumerEp); // pLogNew->logs = taosArrayDup(pLog->logs, (__array_item_dup_fn_t)tCloneSMqConsumerEp);
// return pLogNew; // return pLogNew;
//} // }
// //
//void tDeleteSMqSubActionLogObj(SMqSubActionLogObj *pLog) { // void tDeleteSMqSubActionLogObj(SMqSubActionLogObj *pLog) {
// taosArrayDestroyEx(pLog->logs, (FDelete)tDeleteSMqConsumerEp); // taosArrayDestroyEx(pLog->logs, (FDelete)tDeleteSMqConsumerEp);
//} // }
//int32_t tEncodeSMqSubActionLogObj(void **buf, const SMqSubActionLogObj *pLog) { // int32_t tEncodeSMqSubActionLogObj(void **buf, const SMqSubActionLogObj *pLog) {
// int32_t tlen = 0; // int32_t tlen = 0;
// tlen += taosEncodeString(buf, pLog->key); // tlen += taosEncodeString(buf, pLog->key);
// tlen += taosEncodeArray(buf, pLog->logs, (FEncode)tEncodeSMqSubActionLogEntry); // tlen += taosEncodeArray(buf, pLog->logs, (FEncode)tEncodeSMqSubActionLogEntry);
// return tlen; // return tlen;
//} // }
// //
//void *tDecodeSMqSubActionLogObj(const void *buf, SMqSubActionLogObj *pLog) { // void *tDecodeSMqSubActionLogObj(const void *buf, SMqSubActionLogObj *pLog) {
// buf = taosDecodeStringTo(buf, pLog->key); // buf = taosDecodeStringTo(buf, pLog->key);
// buf = taosDecodeArray(buf, &pLog->logs, (FDecode)tDecodeSMqSubActionLogEntry, sizeof(SMqSubActionLogEntry)); // buf = taosDecodeArray(buf, &pLog->logs, (FDecode)tDecodeSMqSubActionLogEntry, sizeof(SMqSubActionLogEntry));
// return (void *)buf; // return (void *)buf;
//} // }
// //
//int32_t tEncodeSMqOffsetObj(void **buf, const SMqOffsetObj *pOffset) { // int32_t tEncodeSMqOffsetObj(void **buf, const SMqOffsetObj *pOffset) {
// int32_t tlen = 0; // int32_t tlen = 0;
// tlen += taosEncodeString(buf, pOffset->key); // tlen += taosEncodeString(buf, pOffset->key);
// tlen += taosEncodeFixedI64(buf, pOffset->offset); // tlen += taosEncodeFixedI64(buf, pOffset->offset);
// return tlen; // return tlen;
//} // }
// //
//void *tDecodeSMqOffsetObj(void *buf, SMqOffsetObj *pOffset) { // void *tDecodeSMqOffsetObj(void *buf, SMqOffsetObj *pOffset) {
// buf = taosDecodeStringTo(buf, pOffset->key); // buf = taosDecodeStringTo(buf, pOffset->key);
// buf = taosDecodeFixedI64(buf, &pOffset->offset); // buf = taosDecodeFixedI64(buf, &pOffset->offset);
// return buf; // return buf;
//} // }
...@@ -140,7 +140,11 @@ SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw) { ...@@ -140,7 +140,11 @@ SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw) {
void *buf = NULL; void *buf = NULL;
int8_t sver = 0; int8_t sver = 0;
if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto STREAM_DECODE_OVER; if (sdbGetRawSoftVer(pRaw, &sver) != 0) {
mError("stream read invalid data, rm %s/vnode/vnode*/tq/stream if taosd cannot start, and rebuild stream manually",
tsDataDir);
goto STREAM_DECODE_OVER;
}
if (sver != MND_STREAM_VER_NUMBER) { if (sver != MND_STREAM_VER_NUMBER) {
terrno = 0; terrno = 0;
...@@ -429,9 +433,11 @@ FAIL: ...@@ -429,9 +433,11 @@ FAIL:
return 0; return 0;
} }
int32_t mndPersistTaskDeployReq(STrans *pTrans, const SStreamTask *pTask) { int32_t mndPersistTaskDeployReq(STrans *pTrans, SStreamTask *pTask) {
SEncoder encoder; SEncoder encoder;
tEncoderInit(&encoder, NULL, 0); tEncoderInit(&encoder, NULL, 0);
pTask->ver = SSTREAM_TASK_VER;
tEncodeStreamTask(&encoder, pTask); tEncodeStreamTask(&encoder, pTask);
int32_t size = encoder.pos; int32_t size = encoder.pos;
......
...@@ -218,11 +218,11 @@ _EXIT: ...@@ -218,11 +218,11 @@ _EXIT:
} }
void streamBackendCleanup(void* arg) { void streamBackendCleanup(void* arg) {
SBackendWrapper* pHandle = (SBackendWrapper*)arg; SBackendWrapper* pHandle = (SBackendWrapper*)arg;
RocksdbCfInst** pIter = (RocksdbCfInst**)taosHashIterate(pHandle->cfInst, NULL); void* pIter = taosHashIterate(pHandle->cfInst, NULL);
while (pIter != NULL) { while (pIter != NULL) {
RocksdbCfInst* inst = *pIter; RocksdbCfInst* inst = *(RocksdbCfInst**)pIter;
destroyRocksdbCfInst(inst); destroyRocksdbCfInst(inst);
taosHashIterate(pHandle->cfInst, pIter); pIter = taosHashIterate(pHandle->cfInst, pIter);
} }
taosHashCleanup(pHandle->cfInst); taosHashCleanup(pHandle->cfInst);
...@@ -833,7 +833,10 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t ...@@ -833,7 +833,10 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t
qDebug("succ to open rocksdb cf"); qDebug("succ to open rocksdb cf");
} }
// close default cf // close default cf
if (((rocksdb_column_family_handle_t**)cfHandle)[0] != 0) rocksdb_column_family_handle_destroy(cfHandle[0]); if (((rocksdb_column_family_handle_t**)cfHandle)[0] != 0) {
rocksdb_column_family_handle_destroy(cfHandle[0]);
cfHandle[0] = NULL;
}
rocksdb_options_destroy(cfOpts[0]); rocksdb_options_destroy(cfOpts[0]);
handle->db = db; handle->db = db;
......
...@@ -202,6 +202,7 @@ int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask) { ...@@ -202,6 +202,7 @@ int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask) {
void* buf = NULL; void* buf = NULL;
int32_t len; int32_t len;
int32_t code; int32_t code;
pTask->ver = SSTREAM_TASK_VER;
tEncodeSize(tEncodeStreamTask, pTask, len, code); tEncodeSize(tEncodeStreamTask, pTask, len, code);
if (code < 0) { if (code < 0) {
return -1; return -1;
...@@ -331,7 +332,7 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int32_t taskId) { ...@@ -331,7 +332,7 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int32_t taskId) {
qDebug("s-task:0x%x set task status:%s", taskId, streamGetTaskStatusStr(TASK_STATUS__DROPPING)); qDebug("s-task:0x%x set task status:%s", taskId, streamGetTaskStatusStr(TASK_STATUS__DROPPING));
while(1) { while (1) {
taosRLockLatch(&pMeta->lock); taosRLockLatch(&pMeta->lock);
ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t)); ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t));
...@@ -443,10 +444,16 @@ int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) { ...@@ -443,10 +444,16 @@ int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) {
taosArrayDestroy(pRecycleList); taosArrayDestroy(pRecycleList);
return -1; return -1;
} }
tDecoderInit(&decoder, (uint8_t*)pVal, vLen); tDecoderInit(&decoder, (uint8_t*)pVal, vLen);
tDecodeStreamTask(&decoder, pTask); if (tDecodeStreamTask(&decoder, pTask) < 0) {
tDecoderClear(&decoder); tDecoderClear(&decoder);
tdbFree(pKey);
tdbFree(pVal);
tdbTbcClose(pCur);
taosArrayDestroy(pRecycleList);
tFreeStreamTask(pTask);
return -1;
}
if (pTask->status.taskStatus == TASK_STATUS__DROPPING) { if (pTask->status.taskStatus == TASK_STATUS__DROPPING) {
int32_t taskId = pTask->id.taskId; int32_t taskId = pTask->id.taskId;
...@@ -500,13 +507,13 @@ int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) { ...@@ -500,13 +507,13 @@ int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) {
} }
if (taosArrayGetSize(pRecycleList) > 0) { if (taosArrayGetSize(pRecycleList) > 0) {
for(int32_t i = 0; i < taosArrayGetSize(pRecycleList); ++i) { for (int32_t i = 0; i < taosArrayGetSize(pRecycleList); ++i) {
int32_t taskId = *(int32_t*) taosArrayGet(pRecycleList, i); int32_t taskId = *(int32_t*)taosArrayGet(pRecycleList, i);
streamMetaRemoveTask(pMeta, taskId); streamMetaRemoveTask(pMeta, taskId);
} }
} }
qDebug("vgId:%d load %d task from disk", pMeta->vgId, (int32_t) taosArrayGetSize(pMeta->pTaskList)); qDebug("vgId:%d load %d task from disk", pMeta->vgId, (int32_t)taosArrayGetSize(pMeta->pTaskList));
taosArrayDestroy(pRecycleList); taosArrayDestroy(pRecycleList);
return 0; return 0;
} }
...@@ -26,13 +26,14 @@ static int32_t addToTaskset(SArray* pArray, SStreamTask* pTask) { ...@@ -26,13 +26,14 @@ static int32_t addToTaskset(SArray* pArray, SStreamTask* pTask) {
return 0; return 0;
} }
SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, int8_t fillHistory, int64_t triggerParam, SArray* pTaskList) { SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, int8_t fillHistory, int64_t triggerParam,
SArray* pTaskList) {
SStreamTask* pTask = (SStreamTask*)taosMemoryCalloc(1, sizeof(SStreamTask)); SStreamTask* pTask = (SStreamTask*)taosMemoryCalloc(1, sizeof(SStreamTask));
if (pTask == NULL) { if (pTask == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL; return NULL;
} }
pTask->ver = SSTREAM_TASK_VER;
pTask->id.taskId = tGenIdPI32(); pTask->id.taskId = tGenIdPI32();
pTask->id.streamId = streamId; pTask->id.streamId = streamId;
pTask->info.taskLevel = taskLevel; pTask->info.taskLevel = taskLevel;
...@@ -72,6 +73,7 @@ int32_t tDecodeStreamEpInfo(SDecoder* pDecoder, SStreamChildEpInfo* pInfo) { ...@@ -72,6 +73,7 @@ int32_t tDecodeStreamEpInfo(SDecoder* pDecoder, SStreamChildEpInfo* pInfo) {
int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) { int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
if (tStartEncode(pEncoder) < 0) return -1; if (tStartEncode(pEncoder) < 0) return -1;
if (tEncodeI64(pEncoder, pTask->ver) < 0) return -1;
if (tEncodeI64(pEncoder, pTask->id.streamId) < 0) return -1; if (tEncodeI64(pEncoder, pTask->id.streamId) < 0) return -1;
if (tEncodeI32(pEncoder, pTask->id.taskId) < 0) return -1; if (tEncodeI32(pEncoder, pTask->id.taskId) < 0) return -1;
if (tEncodeI32(pEncoder, pTask->info.totalLevel) < 0) return -1; if (tEncodeI32(pEncoder, pTask->info.totalLevel) < 0) return -1;
...@@ -135,6 +137,9 @@ int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) { ...@@ -135,6 +137,9 @@ int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) { int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
if (tStartDecode(pDecoder) < 0) return -1; if (tStartDecode(pDecoder) < 0) return -1;
if (tDecodeI64(pDecoder, &pTask->ver) < 0) return -1;
if (pTask->ver != SSTREAM_TASK_VER) return -1;
if (tDecodeI64(pDecoder, &pTask->id.streamId) < 0) return -1; if (tDecodeI64(pDecoder, &pTask->id.streamId) < 0) return -1;
if (tDecodeI32(pDecoder, &pTask->id.taskId) < 0) return -1; if (tDecodeI32(pDecoder, &pTask->id.taskId) < 0) return -1;
if (tDecodeI32(pDecoder, &pTask->info.totalLevel) < 0) return -1; if (tDecodeI32(pDecoder, &pTask->info.totalLevel) < 0) return -1;
...@@ -163,7 +168,7 @@ int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) { ...@@ -163,7 +168,7 @@ int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
if (tDecodeI64(pDecoder, &pTask->dataRange.window.skey)) return -1; if (tDecodeI64(pDecoder, &pTask->dataRange.window.skey)) return -1;
if (tDecodeI64(pDecoder, &pTask->dataRange.window.ekey)) return -1; if (tDecodeI64(pDecoder, &pTask->dataRange.window.ekey)) return -1;
int32_t epSz; int32_t epSz = -1;
if (tDecodeI32(pDecoder, &epSz) < 0) return -1; if (tDecodeI32(pDecoder, &epSz) < 0) return -1;
pTask->pUpstreamEpInfoList = taosArrayInit(epSz, POINTER_BYTES); pTask->pUpstreamEpInfoList = taosArrayInit(epSz, POINTER_BYTES);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册