未验证 提交 7ed9da56 编写于 作者: L Liu Jicong 提交者: GitHub

Merge pull request #13653 from taosdata/feature/stream

feat(stream): sink table can be other db
......@@ -1752,7 +1752,7 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, boo
taosArrayClear(tagArray);
taosArrayPush(tagArray, &tagVal);
tTagNew(tagArray, 1, false, &pTag);
if (!pTag) {
if (pTag == NULL) {
tdDestroySVCreateTbReq(&createTbReq);
taosArrayDestroy(tagArray);
return NULL;
......@@ -1763,9 +1763,7 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, boo
tEncodeSize(tEncodeSVCreateTbReq, &createTbReq, schemaLen, code);
tdDestroySVCreateTbReq(&createTbReq);
if (code < 0) {
tdDestroySVCreateTbReq(&createTbReq);
taosArrayDestroy(tagArray);
return NULL;
}
......@@ -1804,8 +1802,7 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, boo
int32_t schemaLen = 0;
if (createTb) {
SVCreateTbReq createTbReq = {0};
char* cname = taosMemoryCalloc(1, TSDB_TABLE_FNAME_LEN);
snprintf(cname, TSDB_TABLE_FNAME_LEN, "%s:%ld", stbFullName, pDataBlock->info.groupId);
char* cname = buildCtbNameByGroupId(stbFullName, pDataBlock->info.groupId);
createTbReq.name = cname;
createTbReq.flags = 0;
createTbReq.type = TSDB_CHILD_TABLE;
......@@ -1819,7 +1816,7 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, boo
taosArrayPush(tagArray, &tagVal);
STag* pTag = NULL;
tTagNew(tagArray, 1, false, &pTag);
if (!pTag) {
if (pTag == NULL) {
tdDestroySVCreateTbReq(&createTbReq);
taosArrayDestroy(tagArray);
taosMemoryFreeClear(ret);
......@@ -1945,7 +1942,6 @@ void blockCompressEncode(const SSDataBlock* pBlock, char* data, int32_t* dataLen
const char* blockCompressDecode(SSDataBlock* pBlock, int32_t numOfCols, int32_t numOfRows, const char* pData) {
blockDataEnsureCapacity(pBlock, numOfRows);
pBlock->info.rows = numOfRows;
const char* pStart = pData;
......@@ -2019,6 +2015,7 @@ const char* blockCompressDecode(SSDataBlock* pBlock, int32_t numOfCols, int32_t
pStart += colLen[i];
}
pBlock->info.rows = numOfRows;
ASSERT(pStart - pData == dataLen);
return pStart;
}
......@@ -186,7 +186,7 @@ int32_t mndAddShuffledSinkToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* p
SVgObj* pVgroup;
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
if (pIter == NULL) break;
if (pVgroup->dbUid != pStream->dbUid) {
if (strcmp(pVgroup->dbName, pStream->targetDb) != 0) {
sdbRelease(pSdb, pVgroup);
continue;
}
......@@ -286,7 +286,8 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
pStream->tasks = taosArrayInit(totLevel, sizeof(void*));
bool hasExtraSink = false;
if (totLevel == 2 || strcmp(pStream->sourceDb, pStream->targetDb) != 0) {
bool externalTargetDB = strcmp(pStream->sourceDb, pStream->targetDb) != 0;
if (totLevel == 2 || externalTargetDB) {
SArray* taskOneLevel = taosArrayInit(0, sizeof(void*));
taosArrayPush(pStream->tasks, &taskOneLevel);
// add extra sink
......@@ -405,7 +406,6 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
if (pStream->fixedSinkVgId == 0) {
pTask->dispatchType = TASK_DISPATCH__SHUFFLE;
/*pTask->dispatchMsgType = TDMT_VND_TASK_WRITE_EXEC;*/
pTask->dispatchMsgType = TDMT_STREAM_TASK_DISPATCH;
SDbObj* pDb = mndAcquireDb(pMnode, pStream->targetDb);
ASSERT(pDb);
......@@ -426,10 +426,8 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
SVgroupInfo* pVgInfo = taosArrayGet(pVgs, i);
for (int32_t j = 0; j < sinkLvSize; j++) {
SStreamTask* pLastLevelTask = taosArrayGetP(sinkLv, j);
/*printf("vgid %d node id %d\n", pVgInfo->vgId, pTask->nodeId);*/
if (pLastLevelTask->nodeId == pVgInfo->vgId) {
pVgInfo->taskId = pLastLevelTask->taskId;
/*printf("taskid %d set to %d\n", pVgInfo->taskId, pTask->taskId);*/
break;
}
}
......
......@@ -253,7 +253,9 @@ int walRoll(SWal *pWal) {
static int walWriteIndex(SWal *pWal, int64_t ver, int64_t offset) {
SWalIdxEntry entry = {.ver = ver, .offset = offset};
int size = taosWriteFile(pWal->pWriteIdxTFile, &entry, sizeof(SWalIdxEntry));
/*int64_t idxOffset = taosLSeekFile(pWal->pWriteIdxTFile, 0, SEEK_CUR);*/
/*wDebug("write index: ver: %ld, offset: %ld, at %ld", ver, offset, idxOffset);*/
int size = taosWriteFile(pWal->pWriteIdxTFile, &entry, sizeof(SWalIdxEntry));
if (size != sizeof(SWalIdxEntry)) {
terrno = TAOS_SYSTEM_ERROR(errno);
// TODO truncate
......
......@@ -174,9 +174,9 @@ int32_t taosRenameFile(const char *oldName, const char *newName) {
int32_t taosStatFile(const char *path, int64_t *size, int32_t *mtime) {
struct stat fileStat;
#ifdef WINDOWS
int32_t code = _stat(path, &fileStat);
int32_t code = _stat(path, &fileStat);
#else
int32_t code = stat(path, &fileStat);
int32_t code = stat(path, &fileStat);
#endif
if (code < 0) {
return code;
......@@ -201,7 +201,7 @@ int32_t taosDevInoFile(TdFilePtr pFile, int64_t *stDev, int64_t *stIno) {
#ifdef WINDOWS
BY_HANDLE_FILE_INFORMATION bhfi;
HANDLE handle = (HANDLE)_get_osfhandle(pFile->fd);
HANDLE handle = (HANDLE)_get_osfhandle(pFile->fd);
if (GetFileInformationByHandle(handle, &bhfi) == FALSE) {
printf("taosFStatFile get file info fail.");
return -1;
......@@ -216,7 +216,7 @@ int32_t taosDevInoFile(TdFilePtr pFile, int64_t *stDev, int64_t *stIno) {
}
#else
struct stat fileStat;
int32_t code = fstat(pFile->fd, &fileStat);
if (code < 0) {
......@@ -238,7 +238,7 @@ int32_t taosDevInoFile(TdFilePtr pFile, int64_t *stDev, int64_t *stIno) {
void autoDelFileListAdd(const char *path) { return; }
TdFilePtr taosOpenFile(const char *path, int32_t tdFileOptions) {
TdFilePtr taosOpenFile(const char *path, int32_t tdFileOptions) {
int fd = -1;
FILE *fp = NULL;
if (tdFileOptions & TD_FILE_STREAM) {
......@@ -316,12 +316,12 @@ int64_t taosCloseFile(TdFilePtr *ppFile) {
(*ppFile)->fp = NULL;
}
if ((*ppFile)->fd >= 0) {
#ifdef WINDOWS
#ifdef WINDOWS
HANDLE h = (HANDLE)_get_osfhandle((*ppFile)->fd);
!FlushFileBuffers(h);
#else
#else
fsync((*ppFile)->fd);
#endif
#endif
close((*ppFile)->fd);
(*ppFile)->fd = -1;
}
......@@ -345,11 +345,11 @@ int64_t taosReadFile(TdFilePtr pFile, void *buf, int64_t count) {
char *tbuf = (char *)buf;
while (leftbytes > 0) {
#ifdef WINDOWS
#ifdef WINDOWS
readbytes = _read(pFile->fd, (void *)tbuf, (uint32_t)leftbytes);
#else
#else
readbytes = read(pFile->fd, (void *)tbuf, (uint32_t)leftbytes);
#endif
#endif
if (readbytes < 0) {
if (errno == EINTR) {
continue;
......@@ -433,9 +433,6 @@ int64_t taosWriteFile(TdFilePtr pFile, const void *buf, int64_t count) {
}
int64_t taosLSeekFile(TdFilePtr pFile, int64_t offset, int32_t whence) {
if (pFile == NULL) {
return 0;
}
#if FILE_WITH_LOCK
taosThreadRwlockRdlock(&(pFile->rwlock));
#endif
......@@ -459,9 +456,9 @@ int32_t taosFStatFile(TdFilePtr pFile, int64_t *size, int32_t *mtime) {
struct stat fileStat;
#ifdef WINDOWS
int32_t code = _fstat(pFile->fd, &fileStat);
int32_t code = _fstat(pFile->fd, &fileStat);
#else
int32_t code = fstat(pFile->fd, &fileStat);
int32_t code = fstat(pFile->fd, &fileStat);
#endif
if (code < 0) {
return code;
......@@ -565,12 +562,12 @@ int32_t taosFsyncFile(TdFilePtr pFile) {
if (pFile->fp != NULL) return fflush(pFile->fp);
if (pFile->fd >= 0) {
#ifdef WINDOWS
#ifdef WINDOWS
HANDLE h = (HANDLE)_get_osfhandle(pFile->fd);
return !FlushFileBuffers(h);
#else
#else
return fsync(pFile->fd);
#endif
#endif
}
return 0;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册