提交 051217da 编写于 作者: H Hongze Cheng

Merge branch 'feature/tdb' of https://github.com/taosdata/TDengine into feature/meta

...@@ -192,7 +192,6 @@ enum { ...@@ -192,7 +192,6 @@ enum {
TD_DEF_MSG_TYPE(TDMT_VND_SUBSCRIBE, "vnode-subscribe", SMVSubscribeReq, SMVSubscribeRsp) TD_DEF_MSG_TYPE(TDMT_VND_SUBSCRIBE, "vnode-subscribe", SMVSubscribeReq, SMVSubscribeRsp)
TD_DEF_MSG_TYPE(TDMT_VND_CONSUME, "vnode-consume", SMqCVConsumeReq, SMqCVConsumeRsp) TD_DEF_MSG_TYPE(TDMT_VND_CONSUME, "vnode-consume", SMqCVConsumeReq, SMqCVConsumeRsp)
TD_DEF_MSG_TYPE(TDMT_VND_TASK_DEPLOY, "vnode-task-deploy", SStreamTaskDeployReq, SStreamTaskDeployRsp) TD_DEF_MSG_TYPE(TDMT_VND_TASK_DEPLOY, "vnode-task-deploy", SStreamTaskDeployReq, SStreamTaskDeployRsp)
TD_DEF_MSG_TYPE(TDMT_VND_TASK_EXEC, "vnode-task-exec", SStreamTaskExecReq, SStreamTaskExecRsp)
TD_DEF_MSG_TYPE(TDMT_VND_TASK_PIPE_EXEC, "vnode-task-pipe-exec", SStreamTaskExecReq, SStreamTaskExecRsp) TD_DEF_MSG_TYPE(TDMT_VND_TASK_PIPE_EXEC, "vnode-task-pipe-exec", SStreamTaskExecReq, SStreamTaskExecRsp)
TD_DEF_MSG_TYPE(TDMT_VND_TASK_MERGE_EXEC, "vnode-task-merge-exec", SStreamTaskExecReq, SStreamTaskExecRsp) TD_DEF_MSG_TYPE(TDMT_VND_TASK_MERGE_EXEC, "vnode-task-merge-exec", SStreamTaskExecReq, SStreamTaskExecRsp)
TD_DEF_MSG_TYPE(TDMT_VND_TASK_WRITE_EXEC, "vnode-task-write-exec", SStreamTaskExecReq, SStreamTaskExecRsp) TD_DEF_MSG_TYPE(TDMT_VND_TASK_WRITE_EXEC, "vnode-task-write-exec", SStreamTaskExecReq, SStreamTaskExecRsp)
......
...@@ -82,8 +82,12 @@ typedef struct { ...@@ -82,8 +82,12 @@ typedef struct {
SHashObj* pHash; // groupId to tbuid SHashObj* pHash; // groupId to tbuid
} STaskSinkTb; } STaskSinkTb;
typedef void FSmaHandle(void* vnode, int64_t smaId, const SArray* data);
typedef struct { typedef struct {
int8_t reserved; int64_t smaId;
// following are not applicable to encoder and decoder
FSmaHandle* smaHandle;
} STaskSinkSma; } STaskSinkSma;
typedef struct { typedef struct {
...@@ -156,7 +160,8 @@ typedef struct { ...@@ -156,7 +160,8 @@ typedef struct {
STaskDispatcherShuffle shuffleDispatcher; STaskDispatcherShuffle shuffleDispatcher;
}; };
// state storage // application storage
void* ahandle;
} SStreamTask; } SStreamTask;
......
...@@ -877,7 +877,7 @@ WRITE_QUEUE_FAIL: ...@@ -877,7 +877,7 @@ WRITE_QUEUE_FAIL:
} }
bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqCMGetSubEpRsp* pRsp) { bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqCMGetSubEpRsp* pRsp) {
printf("call update ep %d\n", epoch); /*printf("call update ep %d\n", epoch);*/
bool set = false; bool set = false;
int32_t sz = taosArrayGetSize(pRsp->topics); int32_t sz = taosArrayGetSize(pRsp->topics);
SArray* newTopics = taosArrayInit(sz, sizeof(SMqClientTopic)); SArray* newTopics = taosArrayInit(sz, sizeof(SMqClientTopic));
......
...@@ -279,7 +279,6 @@ void vmInitMsgHandles(SMgmtWrapper *pWrapper) { ...@@ -279,7 +279,6 @@ void vmInitMsgHandles(SMgmtWrapper *pWrapper) {
dndSetMsgHandle(pWrapper, TDMT_VND_CONSUME, (NodeMsgFp)vmProcessFetchMsg, VND_VGID); dndSetMsgHandle(pWrapper, TDMT_VND_CONSUME, (NodeMsgFp)vmProcessFetchMsg, VND_VGID);
dndSetMsgHandle(pWrapper, TDMT_VND_TASK_DEPLOY, (NodeMsgFp)vmProcessWriteMsg, VND_VGID); dndSetMsgHandle(pWrapper, TDMT_VND_TASK_DEPLOY, (NodeMsgFp)vmProcessWriteMsg, VND_VGID);
dndSetMsgHandle(pWrapper, TDMT_VND_QUERY_HEARTBEAT, (NodeMsgFp)vmProcessFetchMsg, VND_VGID); dndSetMsgHandle(pWrapper, TDMT_VND_QUERY_HEARTBEAT, (NodeMsgFp)vmProcessFetchMsg, VND_VGID);
dndSetMsgHandle(pWrapper, TDMT_VND_TASK_EXEC, (NodeMsgFp)vmProcessFetchMsg, VND_VGID);
dndSetMsgHandle(pWrapper, TDMT_VND_TASK_PIPE_EXEC, (NodeMsgFp)vmProcessFetchMsg, VND_VGID); dndSetMsgHandle(pWrapper, TDMT_VND_TASK_PIPE_EXEC, (NodeMsgFp)vmProcessFetchMsg, VND_VGID);
dndSetMsgHandle(pWrapper, TDMT_VND_TASK_MERGE_EXEC, (NodeMsgFp)vmProcessMergeMsg, VND_VGID); dndSetMsgHandle(pWrapper, TDMT_VND_TASK_MERGE_EXEC, (NodeMsgFp)vmProcessMergeMsg, VND_VGID);
dndSetMsgHandle(pWrapper, TDMT_VND_STREAM_TRIGGER, (NodeMsgFp)vmProcessFetchMsg, VND_VGID); dndSetMsgHandle(pWrapper, TDMT_VND_STREAM_TRIGGER, (NodeMsgFp)vmProcessFetchMsg, VND_VGID);
......
...@@ -27,7 +27,7 @@ void mndCleanupScheduler(SMnode* pMnode); ...@@ -27,7 +27,7 @@ void mndCleanupScheduler(SMnode* pMnode);
int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscribeObj* pSub); int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscribeObj* pSub);
int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream); int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream, int64_t smaId);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -31,7 +31,7 @@ void mndReleaseStream(SMnode *pMnode, SStreamObj *pStream); ...@@ -31,7 +31,7 @@ void mndReleaseStream(SMnode *pMnode, SStreamObj *pStream);
SSdbRaw *mndStreamActionEncode(SStreamObj *pStream); SSdbRaw *mndStreamActionEncode(SStreamObj *pStream);
SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw); SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw);
int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast, STrans *pTrans); int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast, STrans *pTrans, int64_t smaId);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -36,11 +36,11 @@ int32_t tEncodeSStreamObj(SCoder *pEncoder, const SStreamObj *pObj) { ...@@ -36,11 +36,11 @@ int32_t tEncodeSStreamObj(SCoder *pEncoder, const SStreamObj *pObj) {
if (tEncodeI32(pEncoder, sz) < 0) return -1; if (tEncodeI32(pEncoder, sz) < 0) return -1;
for (int32_t i = 0; i < sz; i++) { for (int32_t i = 0; i < sz; i++) {
SArray *pArray = taosArrayGet(pObj->tasks, i); SArray *pArray = taosArrayGetP(pObj->tasks, i);
int32_t innerSz = taosArrayGetSize(pArray); int32_t innerSz = taosArrayGetSize(pArray);
if (tEncodeI32(pEncoder, innerSz) < 0) return -1; if (tEncodeI32(pEncoder, innerSz) < 0) return -1;
for (int32_t j = 0; j < innerSz; j++) { for (int32_t j = 0; j < innerSz; j++) {
SStreamTask *pTask = taosArrayGet(pArray, j); SStreamTask *pTask = taosArrayGetP(pArray, j);
if (tEncodeSStreamTask(pEncoder, pTask) < 0) return -1; if (tEncodeSStreamTask(pEncoder, pTask) < 0) return -1;
} }
} }
...@@ -76,17 +76,18 @@ int32_t tDecodeSStreamObj(SCoder *pDecoder, SStreamObj *pObj) { ...@@ -76,17 +76,18 @@ int32_t tDecodeSStreamObj(SCoder *pDecoder, SStreamObj *pObj) {
int32_t sz; int32_t sz;
if (tDecodeI32(pDecoder, &sz) < 0) return -1; if (tDecodeI32(pDecoder, &sz) < 0) return -1;
if (sz != 0) { if (sz != 0) {
pObj->tasks = taosArrayInit(sz, sizeof(SArray)); pObj->tasks = taosArrayInit(sz, sizeof(void *));
for (int32_t i = 0; i < sz; i++) { for (int32_t i = 0; i < sz; i++) {
int32_t innerSz; int32_t innerSz;
if (tDecodeI32(pDecoder, &innerSz) < 0) return -1; if (tDecodeI32(pDecoder, &innerSz) < 0) return -1;
SArray *pArray = taosArrayInit(innerSz, sizeof(SStreamTask)); SArray *pArray = taosArrayInit(innerSz, sizeof(void *));
for (int32_t j = 0; j < innerSz; j++) { for (int32_t j = 0; j < innerSz; j++) {
SStreamTask task; SStreamTask *pTask = taosMemoryCalloc(1, sizeof(SStreamTask));
if (tDecodeSStreamTask(pDecoder, &task) < 0) return -1; if (pTask == NULL) return -1;
taosArrayPush(pArray, &task); if (tDecodeSStreamTask(pDecoder, pTask) < 0) return -1;
taosArrayPush(pArray, &pTask);
} }
taosArrayPush(pObj->tasks, pArray); taosArrayPush(pObj->tasks, &pArray);
} }
} }
......
...@@ -119,7 +119,7 @@ SVgObj* mndSchedFetchOneVg(SMnode* pMnode, int64_t dbUid) { ...@@ -119,7 +119,7 @@ SVgObj* mndSchedFetchOneVg(SMnode* pMnode, int64_t dbUid) {
return pVgroup; return pVgroup;
} }
int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream, int64_t smaId) {
SSdb* pSdb = pMnode->pSdb; SSdb* pSdb = pMnode->pSdb;
SQueryPlan* pPlan = qStringToQueryPlan(pStream->physicalPlan); SQueryPlan* pPlan = qStringToQueryPlan(pStream->physicalPlan);
if (pPlan == NULL) { if (pPlan == NULL) {
...@@ -164,6 +164,10 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { ...@@ -164,6 +164,10 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
// only for inplace // only for inplace
pTask->sinkType = TASK_SINK__SHOW; pTask->sinkType = TASK_SINK__SHOW;
pTask->showSink.reserved = 0; pTask->showSink.reserved = 0;
if (smaId != -1) {
pTask->sinkType = TASK_SINK__SMA;
pTask->smaSink.smaId = smaId;
}
} else { } else {
pTask->sinkType = TASK_SINK__NONE; pTask->sinkType = TASK_SINK__NONE;
} }
......
...@@ -69,7 +69,8 @@ void mndCleanupSma(SMnode *pMnode) {} ...@@ -69,7 +69,8 @@ void mndCleanupSma(SMnode *pMnode) {}
static SSdbRaw *mndSmaActionEncode(SSmaObj *pSma) { static SSdbRaw *mndSmaActionEncode(SSmaObj *pSma) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
int32_t size = sizeof(SSmaObj) + pSma->exprLen + pSma->tagsFilterLen + pSma->sqlLen + pSma->astLen + TSDB_SMA_RESERVE_SIZE; int32_t size =
sizeof(SSmaObj) + pSma->exprLen + pSma->tagsFilterLen + pSma->sqlLen + pSma->astLen + TSDB_SMA_RESERVE_SIZE;
SSdbRaw *pRaw = sdbAllocRaw(SDB_SMA, TSDB_SMA_VER_NUMBER, size); SSdbRaw *pRaw = sdbAllocRaw(SDB_SMA, TSDB_SMA_VER_NUMBER, size);
if (pRaw == NULL) goto _OVER; if (pRaw == NULL) goto _OVER;
...@@ -427,7 +428,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SNodeMsg *pReq, SMCreateSmaReq *pCre ...@@ -427,7 +428,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SNodeMsg *pReq, SMCreateSmaReq *pCre
if (mndSetCreateSmaRedoLogs(pMnode, pTrans, &smaObj) != 0) goto _OVER; if (mndSetCreateSmaRedoLogs(pMnode, pTrans, &smaObj) != 0) goto _OVER;
if (mndSetCreateSmaCommitLogs(pMnode, pTrans, &smaObj) != 0) goto _OVER; if (mndSetCreateSmaCommitLogs(pMnode, pTrans, &smaObj) != 0) goto _OVER;
if (mndSetCreateSmaRedoActions(pMnode, pTrans, pDb, &smaObj) != 0) goto _OVER; if (mndSetCreateSmaRedoActions(pMnode, pTrans, pDb, &smaObj) != 0) goto _OVER;
if (mndAddStreamToTrans(pMnode, &streamObj, pCreate->ast, pTrans) != 0) goto _OVER; if (mndAddStreamToTrans(pMnode, &streamObj, pCreate->ast, pTrans, smaObj.uid) != 0) goto _OVER;
if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER; if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
code = 0; code = 0;
......
...@@ -246,7 +246,7 @@ static int32_t mndStreamGetPlanString(const char *ast, char **pStr) { ...@@ -246,7 +246,7 @@ static int32_t mndStreamGetPlanString(const char *ast, char **pStr) {
return code; return code;
} }
int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast, STrans *pTrans) { int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast, STrans *pTrans, int64_t smaId) {
SNode *pAst = NULL; SNode *pAst = NULL;
if (nodesStringToNode(ast, &pAst) < 0) { if (nodesStringToNode(ast, &pAst) < 0) {
...@@ -271,7 +271,7 @@ int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast ...@@ -271,7 +271,7 @@ int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast
return -1; return -1;
} }
if (mndScheduleStream(pMnode, pTrans, pStream) < 0) { if (mndScheduleStream(pMnode, pTrans, pStream, smaId) < 0) {
mError("stream:%ld, schedule stream since %s", pStream->uid, terrstr()); mError("stream:%ld, schedule stream since %s", pStream->uid, terrstr());
return -1; return -1;
} }
...@@ -310,7 +310,7 @@ static int32_t mndCreateStream(SMnode *pMnode, SNodeMsg *pReq, SCMCreateStreamRe ...@@ -310,7 +310,7 @@ static int32_t mndCreateStream(SMnode *pMnode, SNodeMsg *pReq, SCMCreateStreamRe
} }
mDebug("trans:%d, used to create stream:%s", pTrans->id, pCreate->name); mDebug("trans:%d, used to create stream:%s", pTrans->id, pCreate->name);
if (mndAddStreamToTrans(pMnode, &streamObj, pCreate->ast, pTrans) != 0) { if (mndAddStreamToTrans(pMnode, &streamObj, pCreate->ast, pTrans, -1) != 0) {
mError("trans:%d, failed to add stream since %s", pTrans->id, terrstr()); mError("trans:%d, failed to add stream since %s", pTrans->id, terrstr());
mndTransDrop(pTrans); mndTransDrop(pTrans);
return -1; return -1;
......
...@@ -198,10 +198,13 @@ int tqCommit(STQ*); ...@@ -198,10 +198,13 @@ int tqCommit(STQ*);
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessSetConnReq(STQ* pTq, char* msg); int32_t tqProcessSetConnReq(STQ* pTq, char* msg);
int32_t tqProcessRebReq(STQ* pTq, char* msg); int32_t tqProcessRebReq(STQ* pTq, char* msg);
int32_t tqProcessTaskExec(STQ* pTq, SRpcMsg* msg); int32_t tqProcessTaskExec(STQ* pTq, char* msg, int32_t msgLen);
int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen); int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen);
int32_t tqProcessStreamTrigger(STQ* pTq, void* data, int32_t dataLen); int32_t tqProcessStreamTrigger(STQ* pTq, void* data, int32_t dataLen);
// sma
void smaHandleRes(SVnode* pVnode, int64_t smaId, const SArray* data);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -476,6 +476,7 @@ int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen) { ...@@ -476,6 +476,7 @@ int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen) {
if (tqExpandTask(pTq, pTask, 4) < 0) { if (tqExpandTask(pTq, pTask, 4) < 0) {
ASSERT(0); ASSERT(0);
} }
pTask->ahandle = pTq->pVnode;
taosHashPut(pTq->pStreamTasks, &pTask->taskId, sizeof(int32_t), pTask, sizeof(SStreamTask)); taosHashPut(pTq->pStreamTasks, &pTask->taskId, sizeof(int32_t), pTask, sizeof(SStreamTask));
...@@ -497,11 +498,9 @@ int32_t tqProcessStreamTrigger(STQ* pTq, void* data, int32_t dataLen) { ...@@ -497,11 +498,9 @@ int32_t tqProcessStreamTrigger(STQ* pTq, void* data, int32_t dataLen) {
return 0; return 0;
} }
int32_t tqProcessTaskExec(STQ* pTq, SRpcMsg* msg) { int32_t tqProcessTaskExec(STQ* pTq, char* msg, int32_t msgLen) {
char* msgstr = POINTER_SHIFT(msg->pCont, sizeof(SMsgHead));
SStreamTaskExecReq req; SStreamTaskExecReq req;
tDecodeSStreamTaskExecReq(msgstr, &req); tDecodeSStreamTaskExecReq(msg, &req);
int32_t taskId = req.taskId; int32_t taskId = req.taskId;
SStreamTask* pTask = taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t)); SStreamTask* pTask = taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
......
...@@ -43,6 +43,8 @@ int vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) { ...@@ -43,6 +43,8 @@ int vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
int vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg) { int vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg) {
vTrace("message in fetch queue is processing"); vTrace("message in fetch queue is processing");
char *msgstr = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
switch (pMsg->msgType) { switch (pMsg->msgType) {
case TDMT_VND_FETCH: case TDMT_VND_FETCH:
return qWorkerProcessFetchMsg(pVnode, pVnode->pQuery, pMsg); return qWorkerProcessFetchMsg(pVnode, pVnode->pQuery, pMsg);
...@@ -65,10 +67,9 @@ int vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg) { ...@@ -65,10 +67,9 @@ int vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg) {
return vnodeGetTableMeta(pVnode, pMsg); return vnodeGetTableMeta(pVnode, pMsg);
case TDMT_VND_CONSUME: case TDMT_VND_CONSUME:
return tqProcessPollReq(pVnode->pTq, pMsg); return tqProcessPollReq(pVnode->pTq, pMsg);
case TDMT_VND_TASK_EXEC:
case TDMT_VND_TASK_PIPE_EXEC: case TDMT_VND_TASK_PIPE_EXEC:
case TDMT_VND_TASK_MERGE_EXEC: case TDMT_VND_TASK_MERGE_EXEC:
return tqProcessTaskExec(pVnode->pTq, pMsg); return tqProcessTaskExec(pVnode->pTq, msgstr, msgLen);
case TDMT_VND_STREAM_TRIGGER: case TDMT_VND_STREAM_TRIGGER:
return tqProcessStreamTrigger(pVnode->pTq, pMsg->pCont, pMsg->contLen); return tqProcessStreamTrigger(pVnode->pTq, pMsg->pCont, pMsg->contLen);
case TDMT_VND_QUERY_HEARTBEAT: case TDMT_VND_QUERY_HEARTBEAT:
......
...@@ -178,6 +178,11 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { ...@@ -178,6 +178,11 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
pMsg->contLen - sizeof(SMsgHead)) < 0) { pMsg->contLen - sizeof(SMsgHead)) < 0) {
} }
} break; } break;
case TDMT_VND_TASK_WRITE_EXEC: {
if (tqProcessTaskExec(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)),
pMsg->contLen - sizeof(SMsgHead)) < 0) {
}
} break;
case TDMT_VND_CREATE_SMA: { // timeRangeSMA case TDMT_VND_CREATE_SMA: { // timeRangeSMA
#if 0 #if 0
SSmaCfg vCreateSmaReq = {0}; SSmaCfg vCreateSmaReq = {0};
......
...@@ -72,6 +72,7 @@ int32_t streamExecTask(SStreamTask* pTask, SMsgCb* pMsgCb, const void* input, in ...@@ -72,6 +72,7 @@ int32_t streamExecTask(SStreamTask* pTask, SMsgCb* pMsgCb, const void* input, in
if (pTask->sinkType == TASK_SINK__TABLE) { if (pTask->sinkType == TASK_SINK__TABLE) {
// //
} else if (pTask->sinkType == TASK_SINK__SMA) { } else if (pTask->sinkType == TASK_SINK__SMA) {
pTask->smaSink.smaHandle(pTask->ahandle, pTask->smaSink.smaId, pRes);
// //
} else if (pTask->sinkType == TASK_SINK__FETCH) { } else if (pTask->sinkType == TASK_SINK__FETCH) {
// //
...@@ -205,9 +206,16 @@ int32_t tEncodeSStreamTask(SCoder* pEncoder, const SStreamTask* pTask) { ...@@ -205,9 +206,16 @@ int32_t tEncodeSStreamTask(SCoder* pEncoder, const SStreamTask* pTask) {
if (tEncodeCStr(pEncoder, pTask->exec.qmsg) < 0) return -1; if (tEncodeCStr(pEncoder, pTask->exec.qmsg) < 0) return -1;
} }
if (pTask->sinkType != TASK_SINK__NONE) { if (pTask->sinkType == TASK_SINK__TABLE) {
// TODO: wrap
if (tEncodeI8(pEncoder, pTask->tbSink.reserved) < 0) return -1; if (tEncodeI8(pEncoder, pTask->tbSink.reserved) < 0) return -1;
} else if (pTask->sinkType == TASK_SINK__SMA) {
if (tEncodeI64(pEncoder, pTask->smaSink.smaId) < 0) return -1;
} else if (pTask->sinkType == TASK_SINK__FETCH) {
if (tEncodeI8(pEncoder, pTask->fetchSink.reserved) < 0) return -1;
} else if (pTask->sinkType == TASK_SINK__SHOW) {
if (tEncodeI8(pEncoder, pTask->showSink.reserved) < 0) return -1;
} else {
ASSERT(pTask->sinkType == TASK_SINK__NONE);
} }
if (pTask->dispatchType == TASK_DISPATCH__INPLACE) { if (pTask->dispatchType == TASK_DISPATCH__INPLACE) {
...@@ -244,8 +252,16 @@ int32_t tDecodeSStreamTask(SCoder* pDecoder, SStreamTask* pTask) { ...@@ -244,8 +252,16 @@ int32_t tDecodeSStreamTask(SCoder* pDecoder, SStreamTask* pTask) {
if (tDecodeCStrAlloc(pDecoder, &pTask->exec.qmsg) < 0) return -1; if (tDecodeCStrAlloc(pDecoder, &pTask->exec.qmsg) < 0) return -1;
} }
if (pTask->sinkType != TASK_SINK__NONE) { if (pTask->sinkType == TASK_SINK__TABLE) {
if (tDecodeI8(pDecoder, &pTask->tbSink.reserved) < 0) return -1; if (tDecodeI8(pDecoder, &pTask->tbSink.reserved) < 0) return -1;
} else if (pTask->sinkType == TASK_SINK__SMA) {
if (tDecodeI64(pDecoder, &pTask->smaSink.smaId) < 0) return -1;
} else if (pTask->sinkType == TASK_SINK__FETCH) {
if (tDecodeI8(pDecoder, &pTask->fetchSink.reserved) < 0) return -1;
} else if (pTask->sinkType == TASK_SINK__SHOW) {
if (tDecodeI8(pDecoder, &pTask->showSink.reserved) < 0) return -1;
} else {
ASSERT(pTask->sinkType == TASK_SINK__NONE);
} }
if (pTask->dispatchType == TASK_DISPATCH__INPLACE) { if (pTask->dispatchType == TASK_DISPATCH__INPLACE) {
......
...@@ -122,7 +122,7 @@ static void tdbPCacheClearLock(SPCache *pCache) { tdbMutexDestroy(&(pCache->mute ...@@ -122,7 +122,7 @@ static void tdbPCacheClearLock(SPCache *pCache) { tdbMutexDestroy(&(pCache->mute
static void tdbPCacheLock(SPCache *pCache) { tdbMutexLock(&(pCache->mutex)); } static void tdbPCacheLock(SPCache *pCache) { tdbMutexLock(&(pCache->mutex)); }
static void tdbPCacheUnlock(SPCache *pCache) { tdbMutexDestroy(&(pCache->mutex)); } static void tdbPCacheUnlock(SPCache *pCache) { tdbMutexUnlock(&(pCache->mutex)); }
static bool tdbPCacheLocked(SPCache *pCache) { static bool tdbPCacheLocked(SPCache *pCache) {
assert(0); assert(0);
......
...@@ -80,7 +80,7 @@ int tdbPagerOpen(SPCache *pCache, const char *fileName, SPager **ppPager) { ...@@ -80,7 +80,7 @@ int tdbPagerOpen(SPCache *pCache, const char *fileName, SPager **ppPager) {
// pPager->pCache // pPager->pCache
pPager->pCache = pCache; pPager->pCache = pCache;
pPager->fd = tdbOsOpen(pPager->dbFileName, O_RDWR | O_CREAT); pPager->fd = tdbOsOpen(pPager->dbFileName, TDB_O_CREAT | TDB_O_RDWR, 0755);
if (pPager->fd < 0) { if (pPager->fd < 0) {
return -1; return -1;
} }
...@@ -168,7 +168,7 @@ int tdbPagerBegin(SPager *pPager) { ...@@ -168,7 +168,7 @@ int tdbPagerBegin(SPager *pPager) {
} }
// Open the journal // Open the journal
pPager->jfd = tdbOsOpen(pPager->jFileName, O_RDWR | O_CREAT); pPager->jfd = tdbOsOpen(pPager->jFileName, TDB_O_CREAT | TDB_O_RDWR, 0755);
if (pPager->jfd < 0) { if (pPager->jfd < 0) {
return -1; return -1;
} }
......
...@@ -23,30 +23,27 @@ extern "C" { ...@@ -23,30 +23,27 @@ extern "C" {
// TODO: use cmake to control the option // TODO: use cmake to control the option
#define TDB_FOR_TDENGINE #define TDB_FOR_TDENGINE
// For memory -----------------
#ifdef TDB_FOR_TDENGINE #ifdef TDB_FOR_TDENGINE
// For memory -----------------
#define tdbOsMalloc taosMemoryMalloc #define tdbOsMalloc taosMemoryMalloc
#define tdbOsCalloc taosMemoryCalloc #define tdbOsCalloc taosMemoryCalloc
#define tdbOsRealloc taosMemoryRealloc #define tdbOsRealloc taosMemoryRealloc
#define tdbOsFree taosMemoryFree #define tdbOsFree taosMemoryFree
#else
#define tdbOsMalloc malloc
#define tdbOsCalloc calloc
#define tdbOsRealloc realloc
#define tdbOsFree free
#endif
// For file and directory ----------------- // For file and directory -----------------
#ifdef TDB_FOR_TDENGINE
/* file */ /* file */
typedef TdFilePtr tdb_fd_t; typedef TdFilePtr tdb_fd_t;
#define tdbOsOpen taosOpenFile #define TDB_O_CREAT TD_FILE_CTEATE
#define TDB_O_WRITE TD_FILE_WRITE
#define TDB_O_READ TD_FILE_READ
#define TDB_O_TRUNC TD_FILE_TRUNC
#define TDB_O_APPEND TD_FILE_APPEND
#define TDB_O_RDWR (TD_FILE_WRITE) | (TD_FILE_READ)
#define tdbOsOpen(PATH, OPTION, MODE) taosOpenFile((PATH), (OPTION))
#define tdbOsClose(FD) taosCloseFile(&(FD)) #define tdbOsClose(FD) taosCloseFile(&(FD))
#define tdbOsRead taosReadFile #define tdbOsRead taosReadFile
#define tdbOsPRead taosPReadFile #define tdbOsPRead taosPReadFile
...@@ -59,31 +56,7 @@ typedef TdFilePtr tdb_fd_t; ...@@ -59,31 +56,7 @@ typedef TdFilePtr tdb_fd_t;
#define tdbOsMkdir taosMkDir #define tdbOsMkdir taosMkDir
#define tdbOsRmdir taosRemoveDir #define tdbOsRmdir taosRemoveDir
#else
/* file */
typedef int tdb_fd_t;
#define tdbOsOpen open
#define tdbOsClose close
i64 tdbOsRead(tdb_fd_t fd, void *pData, i64 nBytes);
i64 tdbOsPRead(tdb_fd_t fd, void *pData, i64 nBytes, i64 offset);
i64 tdbOsWrite(tdb_fd_t fd, const void *pData, i64 nBytes);
#define tdbOsFSync fsync
#define tdbOsLSeek lseek
#define tdbOsRemove remove
/* directory */
#define tdbOsMkdir mkdir
#define tdbOsRmdir rmdir
#endif
// For threads and lock ----------------- // For threads and lock -----------------
#ifdef TDB_FOR_TDENGINE
/* spin lock */ /* spin lock */
typedef TdThreadSpinlock tdb_spinlock_t; typedef TdThreadSpinlock tdb_spinlock_t;
...@@ -103,6 +76,40 @@ typedef TdThreadMutex tdb_mutex_t; ...@@ -103,6 +76,40 @@ typedef TdThreadMutex tdb_mutex_t;
#else #else
// For memory -----------------
#define tdbOsMalloc malloc
#define tdbOsCalloc calloc
#define tdbOsRealloc realloc
#define tdbOsFree free
// For file and directory -----------------
/* file */
typedef int tdb_fd_t;
#define TDB_O_CREAT O_CREAT
#define TDB_O_WRITE O_WRONLY
#define TDB_O_READ O_RDONLY
#define TDB_O_TRUNC O_TRUNC
#define TDB_O_APPEND O_APPEND
#define TDB_O_RDWR O_RDWR
#define tdbOsOpen(PATH, OPTION, MODE) open((PATH), (OPTION), (MODE))
#define tdbOsClose close
i64 tdbOsRead(tdb_fd_t fd, void *pData, i64 nBytes);
i64 tdbOsPRead(tdb_fd_t fd, void *pData, i64 nBytes, i64 offset);
i64 tdbOsWrite(tdb_fd_t fd, const void *pData, i64 nBytes);
#define tdbOsFSync fsync
#define tdbOsLSeek lseek
#define tdbOsRemove remove
/* directory */
#define tdbOsMkdir mkdir
#define tdbOsRmdir rmdir
// For threads and lock -----------------
/* spin lock */ /* spin lock */
typedef pthread_spinlock_t tdb_spinlock_t; typedef pthread_spinlock_t tdb_spinlock_t;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册