未验证 提交 16bf52bf 编写于 作者: L Liu Jicong 提交者: GitHub

Merge pull request #15611 from taosdata/feature/stream

refactor(stream): use vnode meta
...@@ -56,7 +56,6 @@ enum { ...@@ -56,7 +56,6 @@ enum {
STREAM_INPUT__DATA_SUBMIT = 1, STREAM_INPUT__DATA_SUBMIT = 1,
STREAM_INPUT__DATA_BLOCK, STREAM_INPUT__DATA_BLOCK,
STREAM_INPUT__MERGED_SUBMIT, STREAM_INPUT__MERGED_SUBMIT,
// STREAM_INPUT__TABLE_SCAN,
STREAM_INPUT__TQ_SCAN, STREAM_INPUT__TQ_SCAN,
STREAM_INPUT__DATA_RETRIEVE, STREAM_INPUT__DATA_RETRIEVE,
STREAM_INPUT__GET_RES, STREAM_INPUT__GET_RES,
...@@ -154,7 +153,7 @@ typedef struct SQueryTableDataCond { ...@@ -154,7 +153,7 @@ typedef struct SQueryTableDataCond {
int32_t order; // desc|asc order to iterate the data block int32_t order; // desc|asc order to iterate the data block
int32_t numOfCols; int32_t numOfCols;
SColumnInfo* colList; SColumnInfo* colList;
int32_t type; // data block load type: int32_t type; // data block load type:
STimeWindow twindows; STimeWindow twindows;
int64_t startVersion; int64_t startVersion;
int64_t endVersion; int64_t endVersion;
......
...@@ -41,7 +41,7 @@ extern "C" { ...@@ -41,7 +41,7 @@ extern "C" {
#define WAL_REFRESH_MS 1000 #define WAL_REFRESH_MS 1000
#define WAL_PATH_LEN (TSDB_FILENAME_LEN + 12) #define WAL_PATH_LEN (TSDB_FILENAME_LEN + 12)
#define WAL_FILE_LEN (WAL_PATH_LEN + 32) #define WAL_FILE_LEN (WAL_PATH_LEN + 32)
#define WAL_MAGIC 0xFAFBFCFDULL #define WAL_MAGIC 0xFAFBFCFDF4F3F2F1ULL
#define WAL_SCAN_BUF_SIZE (1024 * 1024 * 3) #define WAL_SCAN_BUF_SIZE (1024 * 1024 * 3)
typedef enum { typedef enum {
......
...@@ -78,7 +78,10 @@ struct SMeta { ...@@ -78,7 +78,10 @@ struct SMeta {
TTB* pTagIdx; TTB* pTagIdx;
TTB* pTtlIdx; TTB* pTtlIdx;
TTB* pSmaIdx; TTB* pSmaIdx;
TTB* pTaskIdx;
SMetaIdx* pIdx; SMetaIdx* pIdx;
}; };
......
...@@ -153,7 +153,7 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, char* msg, int32_t msgLen); ...@@ -153,7 +153,7 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, char* msg, int32_t msgLen);
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskDeployReq(STQ* pTq, char* msg, int32_t msgLen); int32_t tqProcessTaskDeployReq(STQ* pTq, char* msg, int32_t msgLen);
int32_t tqProcessTaskDropReq(STQ* pTq, char* msg, int32_t msgLen); int32_t tqProcessTaskDropReq(STQ* pTq, char* msg, int32_t msgLen);
int32_t tqProcessStreamTrigger(STQ* pTq, SSubmitReq* data); int32_t tqProcessStreamTrigger(STQ* pTq, SSubmitReq* data, int64_t ver);
int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskRecoverReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskRecoverReq(STQ* pTq, SRpcMsg* pMsg);
......
...@@ -22,6 +22,7 @@ static int tagIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kL ...@@ -22,6 +22,7 @@ static int tagIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kL
static int ttlIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2); static int ttlIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2);
static int uidIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2); static int uidIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2);
static int smaIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2); static int smaIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2);
static int taskIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2);
static int32_t metaInitLock(SMeta *pMeta) { return taosThreadRwlockInit(&pMeta->lock, NULL); } static int32_t metaInitLock(SMeta *pMeta) { return taosThreadRwlockInit(&pMeta->lock, NULL); }
static int32_t metaDestroyLock(SMeta *pMeta) { return taosThreadRwlockDestroy(&pMeta->lock); } static int32_t metaDestroyLock(SMeta *pMeta) { return taosThreadRwlockDestroy(&pMeta->lock); }
...@@ -130,6 +131,12 @@ int metaOpen(SVnode *pVnode, SMeta **ppMeta) { ...@@ -130,6 +131,12 @@ int metaOpen(SVnode *pVnode, SMeta **ppMeta) {
goto _err; goto _err;
} }
ret = tdbTbOpen("stream.task.db", sizeof(int64_t), -1, taskIdxKeyCmpr, pMeta->pEnv, &pMeta->pTaskIdx);
if (ret < 0) {
metaError("vgId: %d, failed to open meta stream task index since %s", TD_VID(pVnode), tstrerror(terrno));
goto _err;
}
// open index // open index
if (metaOpenIdx(pMeta) < 0) { if (metaOpenIdx(pMeta) < 0) {
metaError("vgId:%d, failed to open meta index since %s", TD_VID(pVnode), tstrerror(terrno)); metaError("vgId:%d, failed to open meta index since %s", TD_VID(pVnode), tstrerror(terrno));
...@@ -143,6 +150,7 @@ int metaOpen(SVnode *pVnode, SMeta **ppMeta) { ...@@ -143,6 +150,7 @@ int metaOpen(SVnode *pVnode, SMeta **ppMeta) {
_err: _err:
if (pMeta->pIdx) metaCloseIdx(pMeta); if (pMeta->pIdx) metaCloseIdx(pMeta);
if (pMeta->pTaskIdx) tdbTbClose(pMeta->pTaskIdx);
if (pMeta->pSmaIdx) tdbTbClose(pMeta->pSmaIdx); if (pMeta->pSmaIdx) tdbTbClose(pMeta->pSmaIdx);
if (pMeta->pTtlIdx) tdbTbClose(pMeta->pTtlIdx); if (pMeta->pTtlIdx) tdbTbClose(pMeta->pTtlIdx);
if (pMeta->pTagIvtIdx) indexClose(pMeta->pTagIvtIdx); if (pMeta->pTagIvtIdx) indexClose(pMeta->pTagIvtIdx);
...@@ -162,6 +170,7 @@ _err: ...@@ -162,6 +170,7 @@ _err:
int metaClose(SMeta *pMeta) { int metaClose(SMeta *pMeta) {
if (pMeta) { if (pMeta) {
if (pMeta->pIdx) metaCloseIdx(pMeta); if (pMeta->pIdx) metaCloseIdx(pMeta);
if (pMeta->pTaskIdx) tdbTbClose(pMeta->pTaskIdx);
if (pMeta->pSmaIdx) tdbTbClose(pMeta->pSmaIdx); if (pMeta->pSmaIdx) tdbTbClose(pMeta->pSmaIdx);
if (pMeta->pTtlIdx) tdbTbClose(pMeta->pTtlIdx); if (pMeta->pTtlIdx) tdbTbClose(pMeta->pTtlIdx);
if (pMeta->pTagIvtIdx) indexClose(pMeta->pTagIvtIdx); if (pMeta->pTagIvtIdx) indexClose(pMeta->pTagIvtIdx);
...@@ -378,3 +387,16 @@ static int smaIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kL ...@@ -378,3 +387,16 @@ static int smaIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kL
return 0; return 0;
} }
static int taskIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2) {
int32_t uid1 = *(int32_t *)pKey1;
int32_t uid2 = *(int32_t *)pKey2;
if (uid1 > uid2) {
return 1;
} else if (uid1 < uid2) {
return -1;
}
return 0;
}
...@@ -695,7 +695,7 @@ FAIL: ...@@ -695,7 +695,7 @@ FAIL:
return -1; return -1;
} }
int32_t tqProcessStreamTrigger(STQ* pTq, SSubmitReq* pReq) { int32_t tqProcessStreamTrigger(STQ* pTq, SSubmitReq* pReq, int64_t ver) {
void* pIter = NULL; void* pIter = NULL;
bool failed = false; bool failed = false;
SStreamDataSubmit* pSubmit = NULL; SStreamDataSubmit* pSubmit = NULL;
...@@ -713,7 +713,7 @@ int32_t tqProcessStreamTrigger(STQ* pTq, SSubmitReq* pReq) { ...@@ -713,7 +713,7 @@ int32_t tqProcessStreamTrigger(STQ* pTq, SSubmitReq* pReq) {
SStreamTask* pTask = *(SStreamTask**)pIter; SStreamTask* pTask = *(SStreamTask**)pIter;
if (!pTask->isDataScan) continue; if (!pTask->isDataScan) continue;
qDebug("data submit enqueue stream task: %d", pTask->taskId); qDebug("data submit enqueue stream task: %d, ver: %ld", pTask->taskId, ver);
if (!failed) { if (!failed) {
if (streamTaskInput(pTask, (SStreamQueueItem*)pSubmit) < 0) { if (streamTaskInput(pTask, (SStreamQueueItem*)pSubmit) < 0) {
......
...@@ -48,7 +48,7 @@ int32_t tqMetaOpen(STQ* pTq) { ...@@ -48,7 +48,7 @@ int32_t tqMetaOpen(STQ* pTq) {
ASSERT(0); ASSERT(0);
} }
if (tdbTbOpen("handles", -1, -1, 0, pTq->pMetaStore, &pTq->pExecStore) < 0) { if (tdbTbOpen("handles", -1, -1, NULL, pTq->pMetaStore, &pTq->pExecStore) < 0) {
ASSERT(0); ASSERT(0);
} }
......
...@@ -252,7 +252,7 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) ...@@ -252,7 +252,7 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver)
SSubmitReq* pReq = (SSubmitReq*)data; SSubmitReq* pReq = (SSubmitReq*)data;
pReq->version = ver; pReq->version = ver;
tqProcessStreamTrigger(pTq, data); tqProcessStreamTrigger(pTq, data, ver);
} }
return 0; return 0;
......
...@@ -116,15 +116,15 @@ int32_t walRollback(SWal *pWal, int64_t ver) { ...@@ -116,15 +116,15 @@ int32_t walRollback(SWal *pWal, int64_t ver) {
} }
walBuildIdxName(pWal, walGetCurFileFirstVer(pWal), fnameStr); walBuildIdxName(pWal, walGetCurFileFirstVer(pWal), fnameStr);
TdFilePtr pIdxTFile = taosOpenFile(fnameStr, TD_FILE_WRITE | TD_FILE_READ | TD_FILE_APPEND); TdFilePtr pIdxFile = taosOpenFile(fnameStr, TD_FILE_WRITE | TD_FILE_READ | TD_FILE_APPEND);
if (pIdxTFile == NULL) { if (pIdxFile == NULL) {
ASSERT(0); ASSERT(0);
taosThreadMutexUnlock(&pWal->mutex); taosThreadMutexUnlock(&pWal->mutex);
return -1; return -1;
} }
int64_t idxOff = walGetVerIdxOffset(pWal, ver); int64_t idxOff = walGetVerIdxOffset(pWal, ver);
code = taosLSeekFile(pIdxTFile, idxOff, SEEK_SET); code = taosLSeekFile(pIdxFile, idxOff, SEEK_SET);
if (code < 0) { if (code < 0) {
ASSERT(0); ASSERT(0);
taosThreadMutexUnlock(&pWal->mutex); taosThreadMutexUnlock(&pWal->mutex);
...@@ -132,7 +132,7 @@ int32_t walRollback(SWal *pWal, int64_t ver) { ...@@ -132,7 +132,7 @@ int32_t walRollback(SWal *pWal, int64_t ver) {
} }
// read idx file and get log file pos // read idx file and get log file pos
SWalIdxEntry entry; SWalIdxEntry entry;
if (taosReadFile(pIdxTFile, &entry, sizeof(SWalIdxEntry)) != sizeof(SWalIdxEntry)) { if (taosReadFile(pIdxFile, &entry, sizeof(SWalIdxEntry)) != sizeof(SWalIdxEntry)) {
ASSERT(0); ASSERT(0);
taosThreadMutexUnlock(&pWal->mutex); taosThreadMutexUnlock(&pWal->mutex);
return -1; return -1;
...@@ -140,24 +140,24 @@ int32_t walRollback(SWal *pWal, int64_t ver) { ...@@ -140,24 +140,24 @@ int32_t walRollback(SWal *pWal, int64_t ver) {
ASSERT(entry.ver == ver); ASSERT(entry.ver == ver);
walBuildLogName(pWal, walGetCurFileFirstVer(pWal), fnameStr); walBuildLogName(pWal, walGetCurFileFirstVer(pWal), fnameStr);
TdFilePtr pLogTFile = taosOpenFile(fnameStr, TD_FILE_WRITE | TD_FILE_READ | TD_FILE_APPEND); TdFilePtr pLogFile = taosOpenFile(fnameStr, TD_FILE_WRITE | TD_FILE_READ | TD_FILE_APPEND);
if (pLogTFile == NULL) { if (pLogFile == NULL) {
ASSERT(0);
// TODO // TODO
terrno = TAOS_SYSTEM_ERROR(errno);
taosThreadMutexUnlock(&pWal->mutex); taosThreadMutexUnlock(&pWal->mutex);
return -1; return -1;
} }
code = taosLSeekFile(pLogTFile, entry.offset, SEEK_SET); code = taosLSeekFile(pLogFile, entry.offset, SEEK_SET);
if (code < 0) { if (code < 0) {
ASSERT(0);
// TODO // TODO
terrno = TAOS_SYSTEM_ERROR(errno);
taosThreadMutexUnlock(&pWal->mutex); taosThreadMutexUnlock(&pWal->mutex);
return -1; return -1;
} }
// validate offset // validate offset
SWalCkHead head; SWalCkHead head;
ASSERT(taosValidFile(pLogTFile)); ASSERT(taosValidFile(pLogFile));
int64_t size = taosReadFile(pLogTFile, &head, sizeof(SWalCkHead)); int64_t size = taosReadFile(pLogFile, &head, sizeof(SWalCkHead));
if (size != sizeof(SWalCkHead)) { if (size != sizeof(SWalCkHead)) {
ASSERT(0); ASSERT(0);
taosThreadMutexUnlock(&pWal->mutex); taosThreadMutexUnlock(&pWal->mutex);
...@@ -180,14 +180,14 @@ int32_t walRollback(SWal *pWal, int64_t ver) { ...@@ -180,14 +180,14 @@ int32_t walRollback(SWal *pWal, int64_t ver) {
} }
// truncate old files // truncate old files
code = taosFtruncateFile(pLogTFile, entry.offset); code = taosFtruncateFile(pLogFile, entry.offset);
if (code < 0) { if (code < 0) {
ASSERT(0); ASSERT(0);
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
taosThreadMutexUnlock(&pWal->mutex); taosThreadMutexUnlock(&pWal->mutex);
return -1; return -1;
} }
code = taosFtruncateFile(pIdxTFile, idxOff); code = taosFtruncateFile(pIdxFile, idxOff);
if (code < 0) { if (code < 0) {
ASSERT(0); ASSERT(0);
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
...@@ -205,8 +205,10 @@ int32_t walRollback(SWal *pWal, int64_t ver) { ...@@ -205,8 +205,10 @@ int32_t walRollback(SWal *pWal, int64_t ver) {
ASSERT(((SWalFileInfo *)taosArrayGetLast(pWal->fileInfoSet))->fileSize == 0); ASSERT(((SWalFileInfo *)taosArrayGetLast(pWal->fileInfoSet))->fileSize == 0);
((SWalFileInfo *)taosArrayGetLast(pWal->fileInfoSet))->firstVer = -1; ((SWalFileInfo *)taosArrayGetLast(pWal->fileInfoSet))->firstVer = -1;
} }
taosCloseFile(&pIdxTFile); taosCloseFile(&pIdxFile);
taosCloseFile(&pLogTFile); taosCloseFile(&pLogFile);
walSaveMeta(pWal);
// unlock // unlock
taosThreadMutexUnlock(&pWal->mutex); taosThreadMutexUnlock(&pWal->mutex);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册