提交 71097ea5 编写于 作者: L Liu Jicong

put schema into stream

上级 96eb6a9b
......@@ -25,7 +25,7 @@ int32_t init_env() {
return -1;
}
TAOS_RES* pRes = taos_query(pConn, "create database if not exists abc1 vgroups 2");
TAOS_RES* pRes = taos_query(pConn, "create database if not exists abc1 vgroups 1");
if (taos_errno(pRes) != 0) {
printf("error in create db, reason:%s\n", taos_errstr(pRes));
return -1;
......
......@@ -127,7 +127,7 @@ static FORCE_INLINE int32_t tEncodeSMqPollRsp(void** buf, const SMqPollRsp* pRsp
tlen += taosEncodeFixedI32(buf, pRsp->skipLogNum);
tlen += taosEncodeFixedI32(buf, pRsp->numOfTopics);
if (pRsp->numOfTopics == 0) return tlen;
tlen += tEncodeSSchemaWrapper(buf, pRsp->schema);
tlen += taosEncodeSSchemaWrapper(buf, pRsp->schema);
if (pRsp->pBlockData) {
sz = taosArrayGetSize(pRsp->pBlockData);
}
......@@ -149,7 +149,7 @@ static FORCE_INLINE void* tDecodeSMqPollRsp(void* buf, SMqPollRsp* pRsp) {
if (pRsp->numOfTopics == 0) return buf;
pRsp->schema = (SSchemaWrapper*)taosMemoryCalloc(1, sizeof(SSchemaWrapper));
if (pRsp->schema == NULL) return NULL;
buf = tDecodeSSchemaWrapper(buf, pRsp->schema);
buf = taosDecodeSSchemaWrapper(buf, pRsp->schema);
buf = taosDecodeFixedI32(buf, &sz);
pRsp->pBlockData = taosArrayInit(sz, sizeof(SSDataBlock));
for (int32_t i = 0; i < sz; i++) {
......
......@@ -1931,7 +1931,7 @@ static FORCE_INLINE int32_t tDecodeSSchema(SCoder* pDecoder, SSchema* pSchema) {
return 0;
}
static FORCE_INLINE int32_t tEncodeSSchemaWrapper(void** buf, const SSchemaWrapper* pSW) {
static FORCE_INLINE int32_t taosEncodeSSchemaWrapper(void** buf, const SSchemaWrapper* pSW) {
int32_t tlen = 0;
tlen += taosEncodeFixedU32(buf, pSW->nCols);
for (int32_t i = 0; i < pSW->nCols; i++) {
......@@ -1940,7 +1940,7 @@ static FORCE_INLINE int32_t tEncodeSSchemaWrapper(void** buf, const SSchemaWrapp
return tlen;
}
static FORCE_INLINE void* tDecodeSSchemaWrapper(void* buf, SSchemaWrapper* pSW) {
static FORCE_INLINE void* taosDecodeSSchemaWrapper(void* buf, SSchemaWrapper* pSW) {
buf = taosDecodeFixedU32(buf, &pSW->nCols);
pSW->pSchema = (SSchema*)taosMemoryCalloc(pSW->nCols, sizeof(SSchema));
if (pSW->pSchema == NULL) {
......@@ -1953,6 +1953,27 @@ static FORCE_INLINE void* tDecodeSSchemaWrapper(void* buf, SSchemaWrapper* pSW)
return buf;
}
static FORCE_INLINE int32_t tEncodeSSchemaWrapper(SCoder* pEncoder, const SSchemaWrapper* pSW) {
if (tEncodeU32(pEncoder, pSW->nCols) < 0) return -1;
for (int32_t i = 0; i < pSW->nCols; i++) {
if (tEncodeSSchema(pEncoder, &pSW->pSchema[i]) < 0) return -1;
}
return pEncoder->pos;
}
static FORCE_INLINE int32_t tDecodeSSchemaWrapper(SCoder* pDecoder, SSchemaWrapper* pSW) {
if (tDecodeU32(pDecoder, &pSW->nCols) < 0) return -1;
void* ptr = taosMemoryRealloc(pSW->pSchema, pSW->nCols * sizeof(SSchema));
if (ptr == NULL) {
return -1;
}
pSW->pSchema = (SSchema*)ptr;
for (int32_t i = 0; i < pSW->nCols; i++) {
if (tDecodeSSchema(pDecoder, &pSW->pSchema[i]) < 0) return -1;
}
return 0;
}
typedef struct {
char name[TSDB_TABLE_FNAME_LEN];
char stb[TSDB_TABLE_FNAME_LEN];
......
......@@ -16,6 +16,8 @@
#ifndef _TD_UTIL_TAOS_ERROR_H_
#define _TD_UTIL_TAOS_ERROR_H_
#include "os.h"
#ifdef __cplusplus
extern "C" {
#endif
......
......@@ -733,12 +733,12 @@ typedef struct {
int8_t sourceType;
int8_t sinkType;
// int32_t sqlLen;
int32_t sinkVgId; // 0 for automatic
char* sql;
char* logicalPlan;
char* physicalPlan;
SArray* tasks; // SArray<SArray<SStreamTask>>
SArray* ColAlias; // SArray<char*>
int32_t sinkVgId; // 0 for automatic
char* sql;
char* logicalPlan;
char* physicalPlan;
SArray* tasks; // SArray<SArray<SStreamTask>>
SSchemaWrapper outputSchema;
} SStreamObj;
int32_t tEncodeSStreamObj(SCoder* pEncoder, const SStreamObj* pObj);
......
......@@ -17,7 +17,7 @@
int32_t tEncodeSStreamObj(SCoder *pEncoder, const SStreamObj *pObj) {
int32_t sz = 0;
int32_t outputNameSz = 0;
/*int32_t outputNameSz = 0;*/
if (tEncodeCStr(pEncoder, pObj->name) < 0) return -1;
if (tEncodeCStr(pEncoder, pObj->db) < 0) return -1;
if (tEncodeI64(pEncoder, pObj->createTime) < 0) return -1;
......@@ -45,6 +45,9 @@ int32_t tEncodeSStreamObj(SCoder *pEncoder, const SStreamObj *pObj) {
}
}
if (tEncodeSSchemaWrapper(pEncoder, &pObj->outputSchema) < 0) return -1;
#if 0
if (pObj->ColAlias != NULL) {
outputNameSz = taosArrayGetSize(pObj->ColAlias);
}
......@@ -53,6 +56,7 @@ int32_t tEncodeSStreamObj(SCoder *pEncoder, const SStreamObj *pObj) {
char *name = taosArrayGetP(pObj->ColAlias, i);
if (tEncodeCStr(pEncoder, name) < 0) return -1;
}
#endif
return pEncoder->pos;
}
......@@ -85,6 +89,9 @@ int32_t tDecodeSStreamObj(SCoder *pDecoder, SStreamObj *pObj) {
taosArrayPush(pObj->tasks, pArray);
}
}
if (tDecodeSSchemaWrapper(pDecoder, &pObj->outputSchema) < 0) return -1;
#if 0
int32_t outputNameSz;
if (tDecodeI32(pDecoder, &outputNameSz) < 0) return -1;
if (outputNameSz != 0) {
......@@ -98,5 +105,6 @@ int32_t tDecodeSStreamObj(SCoder *pDecoder, SStreamObj *pObj) {
if (tDecodeCStrAlloc(pDecoder, &name) < 0) return -1;
taosArrayPush(pObj->ColAlias, &name);
}
#endif
return 0;
}
......@@ -14,7 +14,6 @@
*/
#include "mndStream.h"
#include "parser.h"
#include "mndAuth.h"
#include "mndDb.h"
#include "mndDnode.h"
......@@ -26,6 +25,7 @@
#include "mndTrans.h"
#include "mndUser.h"
#include "mndVgroup.h"
#include "parser.h"
#include "tname.h"
#define MND_STREAM_VER_NUMBER 1
......@@ -248,23 +248,21 @@ static int32_t mndStreamGetPlanString(const char *ast, char **pStr) {
int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast, STrans *pTrans) {
SNode *pAst = NULL;
#if 1 // TODO: remove debug info later
printf("ast = %s\n", ast);
#if 1 // TODO: remove debug info later
printf("ast = %s\n", ast);
#endif
if (nodesStringToNode(ast, &pAst) < 0) {
return -1;
}
#if 1
SSchemaWrapper sw = {0};
qExtractResultSchema(pAst, (int32_t*)&sw.nCols, &sw.pSchema);
qExtractResultSchema(pAst, (int32_t *)&pStream->outputSchema.nCols, &pStream->outputSchema.pSchema);
printf("|");
for (int i = 0; i < sw.nCols; i++) {
printf(" %15s |", (char *)sw.pSchema[i].name);
for (int i = 0; i < pStream->outputSchema.nCols; i++) {
printf(" %15s |", (char *)pStream->outputSchema.pSchema[i].name);
}
printf("\n=======================================================\n");
pStream->ColAlias = NULL;
#endif
if (TSDB_CODE_SUCCESS != mndStreamGetPlanString(ast, &pStream->physicalPlan)) {
......
......@@ -37,8 +37,7 @@ void* tmemmem(char* haystack, int hlen, char* needle, int nlen) {
}
limit = haystack + hlen - nlen + 1;
while ((haystack = (char*)memchr(
haystack, needle[0], limit - haystack)) != NULL) {
while ((haystack = (char*)memchr(haystack, needle[0], limit - haystack)) != NULL) {
if (memcmp(haystack, needle, nlen) == 0) {
return haystack;
}
......@@ -57,8 +56,8 @@ static inline int64_t walScanLogGetLastVer(SWal* pWal) {
}
#endif
SWalFileInfo *pLastFileInfo = taosArrayGet(pWal->fileInfoSet, sz-1);
char fnameStr[WAL_FILE_LEN];
SWalFileInfo* pLastFileInfo = taosArrayGet(pWal->fileInfoSet, sz - 1);
char fnameStr[WAL_FILE_LEN];
walBuildLogName(pWal, pLastFileInfo->firstVer, fnameStr);
int64_t file_size = 0;
......@@ -88,20 +87,20 @@ static inline int64_t walScanLogGetLastVer(SWal* pWal) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
char* haystack = buf;
char* found = NULL;
char *candidate;
while((candidate = tmemmem(haystack, readSize - (haystack - buf), (char*)&magic, sizeof(uint64_t))) != NULL) {
char* candidate;
while ((candidate = tmemmem(haystack, readSize - (haystack - buf), (char*)&magic, sizeof(uint64_t))) != NULL) {
// read and validate
SWalHead *logContent = (SWalHead*)candidate;
SWalHead* logContent = (SWalHead*)candidate;
if (walValidHeadCksum(logContent) == 0 && walValidBodyCksum(logContent) == 0) {
found = candidate;
}
haystack = candidate + 1;
}
if (found == buf) {
SWalHead *logContent = (SWalHead*)found;
SWalHead* logContent = (SWalHead*)found;
if (walValidHeadCksum(logContent) != 0 || walValidBodyCksum(logContent) != 0) {
// file has to be deleted
taosMemoryFree(buf);
......@@ -111,7 +110,7 @@ static inline int64_t walScanLogGetLastVer(SWal* pWal) {
}
}
taosCloseFile(&pFile);
SWalHead *lastEntry = (SWalHead*)found;
SWalHead* lastEntry = (SWalHead*)found;
return lastEntry->head.version;
}
......@@ -158,10 +157,10 @@ int walCheckAndRepairMeta(SWal* pWal) {
int newSz = taosArrayGetSize(pLogInfoArray);
if (oldSz > newSz) {
taosArrayPopFrontBatch(pWal->fileInfoSet, oldSz - newSz);
taosArrayPopFrontBatch(pWal->fileInfoSet, oldSz - newSz);
} else if (oldSz < newSz) {
for (int i = oldSz; i < newSz; i++) {
SWalFileInfo *pFileInfo = taosArrayGet(pLogInfoArray, i);
SWalFileInfo* pFileInfo = taosArrayGet(pLogInfoArray, i);
taosArrayPush(pWal->fileInfoSet, pFileInfo);
}
}
......@@ -171,8 +170,8 @@ int walCheckAndRepairMeta(SWal* pWal) {
if (newSz > 0) {
pWal->vers.firstVer = ((SWalFileInfo*)taosArrayGet(pWal->fileInfoSet, 0))->firstVer;
SWalFileInfo *pLastFileInfo = taosArrayGet(pWal->fileInfoSet, newSz-1);
char fnameStr[WAL_FILE_LEN];
SWalFileInfo* pLastFileInfo = taosArrayGet(pWal->fileInfoSet, newSz - 1);
char fnameStr[WAL_FILE_LEN];
walBuildLogName(pWal, pLastFileInfo->firstVer, fnameStr);
int64_t file_size = 0;
taosStatFile(fnameStr, &file_size, NULL);
......@@ -191,8 +190,8 @@ int walCheckAndRepairMeta(SWal* pWal) {
}
}
//TODO: set fileSize and lastVer if necessary
// TODO: set fileSize and lastVer if necessary
return 0;
}
......@@ -239,13 +238,13 @@ char* walMetaSerialize(SWal* pWal) {
cJSON* pFiles = cJSON_CreateArray();
cJSON* pField;
if (pRoot == NULL || pMeta == NULL || pFiles == NULL) {
if(pRoot) {
if (pRoot) {
cJSON_Delete(pRoot);
}
if(pMeta) {
if (pMeta) {
cJSON_Delete(pMeta);
}
if(pFiles) {
if (pFiles) {
cJSON_Delete(pFiles);
}
terrno = TSDB_CODE_WAL_OUT_OF_MEMORY;
......
......@@ -13,8 +13,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "walInt.h"
#include "taoserror.h"
#include "walInt.h"
SWalReadHandle *walOpenReadHandle(SWal *pWal) {
SWalReadHandle *pRead = taosMemoryMalloc(sizeof(SWalReadHandle));
......@@ -92,6 +92,7 @@ static int32_t walReadChangeFile(SWalReadHandle *pRead, int64_t fileFirstVer) {
walBuildIdxName(pRead->pWal, fileFirstVer, fnameStr);
TdFilePtr pIdxTFile = taosOpenFile(fnameStr, TD_FILE_READ);
if (pIdxTFile == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
......@@ -169,7 +170,8 @@ int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver) {
}
if (pRead->pHead->head.version != ver) {
wError("unexpected wal log version: %" PRId64 ", read request version:%" PRId64 "", pRead->pHead->head.version, ver);
wError("unexpected wal log version: %" PRId64 ", read request version:%" PRId64 "", pRead->pHead->head.version,
ver);
pRead->curVersion = -1;
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
return -1;
......@@ -177,7 +179,7 @@ int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver) {
code = walValidBodyCksum(pRead->pHead);
if (code != 0) {
wError("unexpected wal log version: checksum not passed");
wError("unexpected wal log version: % " PRId64 "checksum not passed", ver);
pRead->curVersion = -1;
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
return -1;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册