提交 abc92880 编写于 作者: H Hongze Cheng

Merge branch '3.0' of https://github.com/taosdata/TDengine into feature/tkv

...@@ -163,6 +163,12 @@ typedef struct { ...@@ -163,6 +163,12 @@ typedef struct {
int32_t vgVersion; int32_t vgVersion;
} SBuildUseDBInput; } SBuildUseDBInput;
typedef struct SField {
char name[TSDB_COL_NAME_LEN];
uint8_t type;
int32_t bytes;
} SField;
#pragma pack(push, 1) #pragma pack(push, 1)
// null-terminated string instead of char array to avoid too many memory consumption in case of more than 1M tableMeta // null-terminated string instead of char array to avoid too many memory consumption in case of more than 1M tableMeta
...@@ -252,16 +258,23 @@ typedef struct SSchema { ...@@ -252,16 +258,23 @@ typedef struct SSchema {
typedef struct { typedef struct {
char name[TSDB_TABLE_FNAME_LEN]; char name[TSDB_TABLE_FNAME_LEN];
int8_t igExists; int8_t igExists;
int32_t numOfTags;
int32_t numOfColumns; int32_t numOfColumns;
SSchema pSchemas[]; int32_t numOfTags;
SArray* pColumns;
SArray* pTags;
} SMCreateStbReq; } SMCreateStbReq;
int32_t tSerializeSMCreateStbReq(void** buf, SMCreateStbReq* pReq);
void* tDeserializeSMCreateStbReq(void* buf, SMCreateStbReq* pReq);
typedef struct { typedef struct {
char name[TSDB_TABLE_FNAME_LEN]; char name[TSDB_TABLE_FNAME_LEN];
int8_t igNotExists; int8_t igNotExists;
} SMDropStbReq; } SMDropStbReq;
int32_t tSerializeSMDropStbReq(void** buf, SMDropStbReq* pReq);
void* tDeserializeSMDropStbReq(void* buf, SMDropStbReq* pReq);
typedef struct { typedef struct {
char name[TSDB_TABLE_FNAME_LEN]; char name[TSDB_TABLE_FNAME_LEN];
int8_t alterType; int8_t alterType;
......
...@@ -37,12 +37,6 @@ typedef struct SQueryNode { ...@@ -37,12 +37,6 @@ typedef struct SQueryNode {
#define queryNodeType(nodeptr) (((const SQueryNode*)(nodeptr))->type) #define queryNodeType(nodeptr) (((const SQueryNode*)(nodeptr))->type)
typedef struct SField {
char name[TSDB_COL_NAME_LEN];
uint8_t type;
int32_t bytes;
} SField;
typedef struct SFieldInfo { typedef struct SFieldInfo {
int16_t numOfOutput; // number of column in result int16_t numOfOutput; // number of column in result
SField *final; SField *final;
......
...@@ -336,3 +336,82 @@ void *tDeserializeSVDropTbReq(void *buf, SVDropTbReq *pReq) { ...@@ -336,3 +336,82 @@ void *tDeserializeSVDropTbReq(void *buf, SVDropTbReq *pReq) {
buf = taosDecodeFixedU8(buf, &pReq->type); buf = taosDecodeFixedU8(buf, &pReq->type);
return buf; return buf;
} }
int32_t tSerializeSMCreateStbReq(void **buf, SMCreateStbReq *pReq) {
int32_t tlen = 0;
tlen += taosEncodeString(buf, pReq->name);
tlen += taosEncodeFixedI8(buf, pReq->igExists);
tlen += taosEncodeFixedI32(buf, pReq->numOfColumns);
tlen += taosEncodeFixedI32(buf, pReq->numOfTags);
for (int32_t i = 0; i < pReq->numOfColumns; ++i) {
SField *pField = taosArrayGet(pReq->pColumns, i);
tlen += taosEncodeFixedI8(buf, pField->type);
tlen += taosEncodeFixedI32(buf, pField->bytes);
tlen += taosEncodeString(buf, pField->name);
}
for (int32_t i = 0; i < pReq->numOfTags; ++i) {
SField *pField = taosArrayGet(pReq->pTags, i);
tlen += taosEncodeFixedI8(buf, pField->type);
tlen += taosEncodeFixedI32(buf, pField->bytes);
tlen += taosEncodeString(buf, pField->name);
}
return tlen;
}
void *tDeserializeSMCreateStbReq(void *buf, SMCreateStbReq *pReq) {
buf = taosDecodeStringTo(buf, pReq->name);
buf = taosDecodeFixedI8(buf, &pReq->igExists);
buf = taosDecodeFixedI32(buf, &pReq->numOfColumns);
buf = taosDecodeFixedI32(buf, &pReq->numOfTags);
pReq->pColumns = taosArrayInit(pReq->numOfColumns, sizeof(SField));
pReq->pTags = taosArrayInit(pReq->numOfTags, sizeof(SField));
if (pReq->pColumns == NULL || pReq->pTags == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
for (int32_t i = 0; i < pReq->numOfColumns; ++i) {
SField field = {0};
buf = taosDecodeFixedI8(buf, &field.type);
buf = taosDecodeFixedI32(buf, &field.bytes);
buf = taosDecodeStringTo(buf, field.name);
if (taosArrayPush(pReq->pColumns, &field) == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
}
for (int32_t i = 0; i < pReq->numOfTags; ++i) {
SField field = {0};
buf = taosDecodeFixedI8(buf, &field.type);
buf = taosDecodeFixedI32(buf, &field.bytes);
buf = taosDecodeStringTo(buf, field.name);
if (taosArrayPush(pReq->pTags, &field) == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
}
return buf;
}
int32_t tSerializeSMDropStbReq(void **buf, SMDropStbReq *pReq) {
int32_t tlen = 0;
tlen += taosEncodeString(buf, pReq->name);
tlen += taosEncodeFixedI8(buf, pReq->igNotExists);
return tlen;
}
void *tDeserializeSMDropStbReq(void *buf, SMDropStbReq *pReq) {
buf = taosDecodeStringTo(buf, pReq->name);
buf = taosDecodeFixedI8(buf, &pReq->igNotExists);
return buf;
}
...@@ -304,8 +304,8 @@ typedef struct { ...@@ -304,8 +304,8 @@ typedef struct {
int32_t nextColId; int32_t nextColId;
int32_t numOfColumns; int32_t numOfColumns;
int32_t numOfTags; int32_t numOfTags;
SSchema* pTags;
SSchema* pColumns; SSchema* pColumns;
SSchema* pTags;
SRWLatch lock; SRWLatch lock;
} SStbObj; } SStbObj;
......
...@@ -262,7 +262,7 @@ static SDbObj *mndAcquireDbByStb(SMnode *pMnode, const char *stbName) { ...@@ -262,7 +262,7 @@ static SDbObj *mndAcquireDbByStb(SMnode *pMnode, const char *stbName) {
return mndAcquireDb(pMnode, db); return mndAcquireDb(pMnode, db);
} }
static void *mndBuildCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb, int32_t *pContLen) { static void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb, int32_t *pContLen) {
SName name = {0}; SName name = {0};
tNameFromString(&name, pStb->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); tNameFromString(&name, pStb->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
...@@ -295,7 +295,7 @@ static void *mndBuildCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb ...@@ -295,7 +295,7 @@ static void *mndBuildCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb
return pHead; return pHead;
} }
static void *mndBuildDropStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb, int32_t *pContLen) { static void *mndBuildVDropStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb, int32_t *pContLen) {
SName name = {0}; SName name = {0};
tNameFromString(&name, pStb->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); tNameFromString(&name, pStb->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
...@@ -323,14 +323,6 @@ static void *mndBuildDropStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb, ...@@ -323,14 +323,6 @@ static void *mndBuildDropStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb,
} }
static int32_t mndCheckCreateStbReq(SMCreateStbReq *pCreate) { static int32_t mndCheckCreateStbReq(SMCreateStbReq *pCreate) {
pCreate->numOfColumns = htonl(pCreate->numOfColumns);
pCreate->numOfTags = htonl(pCreate->numOfTags);
int32_t totalCols = pCreate->numOfColumns + pCreate->numOfTags;
for (int32_t i = 0; i < totalCols; ++i) {
SSchema *pSchema = &pCreate->pSchemas[i];
pSchema->bytes = htonl(pSchema->bytes);
}
if (pCreate->igExists < 0 || pCreate->igExists > 1) { if (pCreate->igExists < 0 || pCreate->igExists > 1) {
terrno = TSDB_CODE_MND_INVALID_STB_OPTION; terrno = TSDB_CODE_MND_INVALID_STB_OPTION;
return -1; return -1;
...@@ -346,18 +338,39 @@ static int32_t mndCheckCreateStbReq(SMCreateStbReq *pCreate) { ...@@ -346,18 +338,39 @@ static int32_t mndCheckCreateStbReq(SMCreateStbReq *pCreate) {
return -1; return -1;
} }
int32_t maxColId = (TSDB_MAX_COLUMNS + TSDB_MAX_TAGS); SField *pField = taosArrayGet(pCreate->pColumns, 0) ;
for (int32_t i = 0; i < totalCols; ++i) { if (pField->type != TSDB_DATA_TYPE_TIMESTAMP) {
SSchema *pSchema = &pCreate->pSchemas[i]; terrno = TSDB_CODE_MND_INVALID_STB_OPTION;
if (pSchema->type < 0) { return -1;
}
for (int32_t i = 0; i < pCreate->numOfColumns; ++i) {
SField *pField = taosArrayGet(pCreate->pColumns, i);
if (pField->type < 0) {
terrno = TSDB_CODE_MND_INVALID_STB_OPTION; terrno = TSDB_CODE_MND_INVALID_STB_OPTION;
return -1; return -1;
} }
if (pSchema->bytes <= 0) { if (pField->bytes <= 0) {
terrno = TSDB_CODE_MND_INVALID_STB_OPTION; terrno = TSDB_CODE_MND_INVALID_STB_OPTION;
return -1; return -1;
} }
if (pSchema->name[0] == 0) { if (pField->name[0] == 0) {
terrno = TSDB_CODE_MND_INVALID_STB_OPTION;
return -1;
}
}
for (int32_t i = 0; i < pCreate->numOfTags; ++i) {
SField *pField = taosArrayGet(pCreate->pTags, i);
if (pField->type < 0) {
terrno = TSDB_CODE_MND_INVALID_STB_OPTION;
return -1;
}
if (pField->bytes <= 0) {
terrno = TSDB_CODE_MND_INVALID_STB_OPTION;
return -1;
}
if (pField->name[0] == 0) {
terrno = TSDB_CODE_MND_INVALID_STB_OPTION; terrno = TSDB_CODE_MND_INVALID_STB_OPTION;
return -1; return -1;
} }
...@@ -404,7 +417,7 @@ static int32_t mndSetCreateStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj ...@@ -404,7 +417,7 @@ static int32_t mndSetCreateStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj
if (pIter == NULL) break; if (pIter == NULL) break;
if (pVgroup->dbUid != pDb->uid) continue; if (pVgroup->dbUid != pDb->uid) continue;
void *pReq = mndBuildCreateStbReq(pMnode, pVgroup, pStb, &contLen); void *pReq = mndBuildVCreateStbReq(pMnode, pVgroup, pStb, &contLen);
if (pReq == NULL) { if (pReq == NULL) {
sdbCancelFetch(pSdb, pIter); sdbCancelFetch(pSdb, pIter);
sdbRelease(pSdb, pVgroup); sdbRelease(pSdb, pVgroup);
...@@ -440,7 +453,7 @@ static int32_t mndSetCreateStbUndoActions(SMnode *pMnode, STrans *pTrans, SDbObj ...@@ -440,7 +453,7 @@ static int32_t mndSetCreateStbUndoActions(SMnode *pMnode, STrans *pTrans, SDbObj
if (pVgroup->dbUid != pDb->uid) continue; if (pVgroup->dbUid != pDb->uid) continue;
int32_t contLen = 0; int32_t contLen = 0;
void *pReq = mndBuildDropStbReq(pMnode, pVgroup, pStb, &contLen); void *pReq = mndBuildVDropStbReq(pMnode, pVgroup, pStb, &contLen);
if (pReq == NULL) { if (pReq == NULL) {
sdbCancelFetch(pSdb, pIter); sdbCancelFetch(pSdb, pIter);
sdbRelease(pSdb, pVgroup); sdbRelease(pSdb, pVgroup);
...@@ -485,16 +498,23 @@ static int32_t mndCreateStb(SMnode *pMnode, SMnodeMsg *pReq, SMCreateStbReq *pCr ...@@ -485,16 +498,23 @@ static int32_t mndCreateStb(SMnode *pMnode, SMnodeMsg *pReq, SMCreateStbReq *pCr
return -1; return -1;
} }
memcpy(stbObj.pColumns, pCreate->pSchemas, stbObj.numOfColumns * sizeof(SSchema));
memcpy(stbObj.pTags, pCreate->pSchemas + stbObj.numOfColumns, stbObj.numOfTags * sizeof(SSchema));
for (int32_t i = 0; i < stbObj.numOfColumns; ++i) { for (int32_t i = 0; i < stbObj.numOfColumns; ++i) {
stbObj.pColumns[i].colId = stbObj.nextColId; SField *pField = taosArrayGet(pCreate->pColumns, i);
SSchema *pSchema = &stbObj.pColumns[i];
pSchema->type = pField->type;
pSchema->bytes = pField->bytes;
memcpy(pSchema->name, pField->name, TSDB_COL_NAME_LEN);
pSchema->colId = stbObj.nextColId;
stbObj.nextColId++; stbObj.nextColId++;
} }
for (int32_t i = 0; i < stbObj.numOfTags; ++i) { for (int32_t i = 0; i < stbObj.numOfTags; ++i) {
stbObj.pTags[i].colId = stbObj.nextColId; SField *pField = taosArrayGet(pCreate->pTags, i);
SSchema *pSchema = &stbObj.pTags[i];
pSchema->type = pField->type;
pSchema->bytes = pField->bytes;
memcpy(pSchema->name, pField->name, TSDB_COL_NAME_LEN);
pSchema->colId = stbObj.nextColId;
stbObj.nextColId++; stbObj.nextColId++;
} }
...@@ -519,57 +539,60 @@ CREATE_STB_OVER: ...@@ -519,57 +539,60 @@ CREATE_STB_OVER:
} }
static int32_t mndProcessMCreateStbReq(SMnodeMsg *pReq) { static int32_t mndProcessMCreateStbReq(SMnodeMsg *pReq) {
SMnode *pMnode = pReq->pMnode; SMnode *pMnode = pReq->pMnode;
SMCreateStbReq *pCreate = pReq->rpcMsg.pCont; int32_t code = -1;
SStbObj *pTopicStb = NULL;
SStbObj *pStb = NULL;
SDbObj *pDb = NULL;
SMCreateStbReq createReq = {0};
mDebug("stb:%s, start to create", pCreate->name); if (tDeserializeSMCreateStbReq(pReq->rpcMsg.pCont, &createReq) == NULL) goto CREATE_STB_OVER;
if (mndCheckCreateStbReq(pCreate) != 0) { mDebug("stb:%s, start to create", createReq.name);
mError("stb:%s, failed to create since %s", pCreate->name, terrstr()); if (mndCheckCreateStbReq(&createReq) != 0) goto CREATE_STB_OVER;
return -1;
}
SStbObj *pStb = mndAcquireStb(pMnode, pCreate->name); pStb = mndAcquireStb(pMnode, createReq.name);
if (pStb != NULL) { if (pStb != NULL) {
mndReleaseStb(pMnode, pStb); if (createReq.igExists) {
if (pCreate->igExists) { mDebug("stb:%s, already exist, ignore exist is set", createReq.name);
mDebug("stb:%s, already exist, ignore exist is set", pCreate->name); code = 0;
return 0; goto CREATE_STB_OVER;
} else { } else {
terrno = TSDB_CODE_MND_STB_ALREADY_EXIST; terrno = TSDB_CODE_MND_STB_ALREADY_EXIST;
mError("stb:%s, failed to create since %s", pCreate->name, terrstr()); goto CREATE_STB_OVER;
return -1;
} }
} else if (terrno != TSDB_CODE_MND_STB_NOT_EXIST) { } else if (terrno != TSDB_CODE_MND_STB_NOT_EXIST) {
mError("stb:%s, failed to create since %s", pCreate->name, terrstr()); goto CREATE_STB_OVER;
return -1;
} }
// topic should have different name with stb pTopicStb = mndAcquireStb(pMnode, createReq.name);
SStbObj *pTopicStb = mndAcquireStb(pMnode, pCreate->name);
if (pTopicStb != NULL) { if (pTopicStb != NULL) {
mndReleaseStb(pMnode, pTopicStb);
terrno = TSDB_CODE_MND_NAME_CONFLICT_WITH_TOPIC; terrno = TSDB_CODE_MND_NAME_CONFLICT_WITH_TOPIC;
mError("stb:%s, failed to create since %s", pCreate->name, terrstr()); goto CREATE_STB_OVER;
return -1;
} }
SDbObj *pDb = mndAcquireDbByStb(pMnode, pCreate->name); pDb = mndAcquireDbByStb(pMnode, createReq.name);
if (pDb == NULL) { if (pDb == NULL) {
terrno = TSDB_CODE_MND_DB_NOT_SELECTED; terrno = TSDB_CODE_MND_DB_NOT_SELECTED;
mError("stb:%s, failed to create since %s", pCreate->name, terrstr()); goto CREATE_STB_OVER;
return -1;
} }
int32_t code = mndCreateStb(pMnode, pReq, pCreate, pDb); code = mndCreateStb(pMnode, pReq, &createReq, pDb);
mndReleaseDb(pMnode, pDb);
CREATE_STB_OVER:
if (code != 0) { if (code != 0) {
mError("stb:%s, failed to create since %s", pCreate->name, terrstr()); mError("stb:%s, failed to create since %s", createReq.name, terrstr());
return -1; } else {
code = TSDB_CODE_MND_ACTION_IN_PROGRESS;
} }
return TSDB_CODE_MND_ACTION_IN_PROGRESS; mndReleaseStb(pMnode, pStb);
mndReleaseStb(pMnode, pTopicStb);
mndReleaseDb(pMnode, pDb);
taosArrayClear(createReq.pColumns);
taosArrayClear(createReq.pTags);
return code;
} }
static int32_t mndProcessVCreateStbRsp(SMnodeMsg *pRsp) { static int32_t mndProcessVCreateStbRsp(SMnodeMsg *pRsp) {
...@@ -891,7 +914,7 @@ static int32_t mndSetAlterStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj ...@@ -891,7 +914,7 @@ static int32_t mndSetAlterStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj
if (pIter == NULL) break; if (pIter == NULL) break;
if (pVgroup->dbUid != pDb->uid) continue; if (pVgroup->dbUid != pDb->uid) continue;
void *pReq = mndBuildCreateStbReq(pMnode, pVgroup, pStb, &contLen); void *pReq = mndBuildVCreateStbReq(pMnode, pVgroup, pStb, &contLen);
if (pReq == NULL) { if (pReq == NULL) {
sdbCancelFetch(pSdb, pIter); sdbCancelFetch(pSdb, pIter);
sdbRelease(pSdb, pVgroup); sdbRelease(pSdb, pVgroup);
...@@ -1049,7 +1072,7 @@ static int32_t mndSetDropStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj * ...@@ -1049,7 +1072,7 @@ static int32_t mndSetDropStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *
if (pVgroup->dbUid != pDb->uid) continue; if (pVgroup->dbUid != pDb->uid) continue;
int32_t contLen = 0; int32_t contLen = 0;
void *pReq = mndBuildDropStbReq(pMnode, pVgroup, pStb, &contLen); void *pReq = mndBuildVDropStbReq(pMnode, pVgroup, pStb, &contLen);
if (pReq == NULL) { if (pReq == NULL) {
sdbCancelFetch(pSdb, pIter); sdbCancelFetch(pSdb, pIter);
sdbRelease(pSdb, pVgroup); sdbRelease(pSdb, pVgroup);
...@@ -1095,28 +1118,30 @@ DROP_STB_OVER: ...@@ -1095,28 +1118,30 @@ DROP_STB_OVER:
} }
static int32_t mndProcessMDropStbReq(SMnodeMsg *pReq) { static int32_t mndProcessMDropStbReq(SMnodeMsg *pReq) {
SMnode *pMnode = pReq->pMnode; SMnode *pMnode = pReq->pMnode;
SMDropStbReq *pDrop = pReq->rpcMsg.pCont;
SMDropStbReq dropReq = {0};
tDeserializeSMDropStbReq(pReq->rpcMsg.pCont, &dropReq);
mDebug("stb:%s, start to drop", pDrop->name); mDebug("stb:%s, start to drop", dropReq.name);
SStbObj *pStb = mndAcquireStb(pMnode, pDrop->name); SStbObj *pStb = mndAcquireStb(pMnode, dropReq.name);
if (pStb == NULL) { if (pStb == NULL) {
if (pDrop->igNotExists) { if (dropReq.igNotExists) {
mDebug("stb:%s, not exist, ignore not exist is set", pDrop->name); mDebug("stb:%s, not exist, ignore not exist is set", dropReq.name);
return 0; return 0;
} else { } else {
terrno = TSDB_CODE_MND_STB_NOT_EXIST; terrno = TSDB_CODE_MND_STB_NOT_EXIST;
mError("stb:%s, failed to drop since %s", pDrop->name, terrstr()); mError("stb:%s, failed to drop since %s", dropReq.name, terrstr());
return -1; return -1;
} }
} }
SDbObj *pDb = mndAcquireDbByStb(pMnode, pDrop->name); SDbObj *pDb = mndAcquireDbByStb(pMnode, dropReq.name);
if (pDb == NULL) { if (pDb == NULL) {
mndReleaseStb(pMnode, pStb); mndReleaseStb(pMnode, pStb);
terrno = TSDB_CODE_MND_DB_NOT_SELECTED; terrno = TSDB_CODE_MND_DB_NOT_SELECTED;
mError("stb:%s, failed to drop since %s", pDrop->name, terrstr()); mError("stb:%s, failed to drop since %s", dropReq.name, terrstr());
return -1; return -1;
} }
...@@ -1125,7 +1150,7 @@ static int32_t mndProcessMDropStbReq(SMnodeMsg *pReq) { ...@@ -1125,7 +1150,7 @@ static int32_t mndProcessMDropStbReq(SMnodeMsg *pReq) {
mndReleaseStb(pMnode, pStb); mndReleaseStb(pMnode, pStb);
if (code != 0) { if (code != 0) {
mError("stb:%s, failed to drop since %s", pDrop->name, terrstr()); mError("stb:%s, failed to drop since %s", dropReq.name, terrstr());
return -1; return -1;
} }
......
...@@ -22,19 +22,19 @@ class MndTestStb : public ::testing::Test { ...@@ -22,19 +22,19 @@ class MndTestStb : public ::testing::Test {
void SetUp() override {} void SetUp() override {}
void TearDown() override {} void TearDown() override {}
SCreateDbReq* BuildCreateDbReq(const char* dbname, int32_t* pContLen); SCreateDbReq* BuildCreateDbReq(const char* dbname, int32_t* pContLen);
SDropDbReq* BuildDropDbReq(const char* dbname, int32_t* pContLen); SDropDbReq* BuildDropDbReq(const char* dbname, int32_t* pContLen);
SMCreateStbReq* BuildCreateStbReq(const char* stbname, int32_t* pContLen); void* BuildCreateStbReq(const char* stbname, int32_t* pContLen);
SMAltertbReq* BuildAlterStbAddTagReq(const char* stbname, const char* tagname, int32_t* pContLen); SMAltertbReq* BuildAlterStbAddTagReq(const char* stbname, const char* tagname, int32_t* pContLen);
SMAltertbReq* BuildAlterStbDropTagReq(const char* stbname, const char* tagname, int32_t* pContLen); SMAltertbReq* BuildAlterStbDropTagReq(const char* stbname, const char* tagname, int32_t* pContLen);
SMAltertbReq* BuildAlterStbUpdateTagNameReq(const char* stbname, const char* tagname, const char* newtagname, SMAltertbReq* BuildAlterStbUpdateTagNameReq(const char* stbname, const char* tagname, const char* newtagname,
int32_t* pContLen); int32_t* pContLen);
SMAltertbReq* BuildAlterStbUpdateTagBytesReq(const char* stbname, const char* tagname, int32_t bytes, SMAltertbReq* BuildAlterStbUpdateTagBytesReq(const char* stbname, const char* tagname, int32_t bytes,
int32_t* pContLen); int32_t* pContLen);
SMAltertbReq* BuildAlterStbAddColumnReq(const char* stbname, const char* colname, int32_t* pContLen); SMAltertbReq* BuildAlterStbAddColumnReq(const char* stbname, const char* colname, int32_t* pContLen);
SMAltertbReq* BuildAlterStbDropColumnReq(const char* stbname, const char* colname, int32_t* pContLen); SMAltertbReq* BuildAlterStbDropColumnReq(const char* stbname, const char* colname, int32_t* pContLen);
SMAltertbReq* BuildAlterStbUpdateColumnBytesReq(const char* stbname, const char* colname, int32_t bytes, SMAltertbReq* BuildAlterStbUpdateColumnBytesReq(const char* stbname, const char* colname, int32_t bytes,
int32_t* pContLen); int32_t* pContLen);
}; };
Testbase MndTestStb::test; Testbase MndTestStb::test;
...@@ -78,53 +78,62 @@ SDropDbReq* MndTestStb::BuildDropDbReq(const char* dbname, int32_t* pContLen) { ...@@ -78,53 +78,62 @@ SDropDbReq* MndTestStb::BuildDropDbReq(const char* dbname, int32_t* pContLen) {
return pReq; return pReq;
} }
SMCreateStbReq* MndTestStb::BuildCreateStbReq(const char* stbname, int32_t* pContLen) { void* MndTestStb::BuildCreateStbReq(const char* stbname, int32_t* pContLen) {
int32_t cols = 2; SMCreateStbReq createReq = {0};
int32_t tags = 3; createReq.numOfColumns = 2;
int32_t contLen = (tags + cols) * sizeof(SSchema) + sizeof(SMCreateStbReq); createReq.numOfTags = 3;
createReq.igExists = 0;
SMCreateStbReq* pReq = (SMCreateStbReq*)rpcMallocCont(contLen); createReq.pColumns = taosArrayInit(createReq.numOfColumns, sizeof(SField));
strcpy(pReq->name, stbname); createReq.pTags = taosArrayInit(createReq.numOfTags, sizeof(SField));
pReq->numOfTags = htonl(tags); strcpy(createReq.name, stbname);
pReq->numOfColumns = htonl(cols);
{ {
SSchema* pSchema = &pReq->pSchemas[0]; SField field = {0};
pSchema->bytes = htonl(8); field.bytes = 8;
pSchema->type = TSDB_DATA_TYPE_TIMESTAMP; field.type = TSDB_DATA_TYPE_TIMESTAMP;
strcpy(pSchema->name, "ts"); strcpy(field.name, "ts");
taosArrayPush(createReq.pColumns, &field);
} }
{ {
SSchema* pSchema = &pReq->pSchemas[1]; SField field = {0};
pSchema->bytes = htonl(12); field.bytes = 12;
pSchema->type = TSDB_DATA_TYPE_BINARY; field.type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema->name, "col1"); strcpy(field.name, "col1");
taosArrayPush(createReq.pColumns, &field);
} }
{ {
SSchema* pSchema = &pReq->pSchemas[2]; SField field = {0};
pSchema->bytes = htonl(2); field.bytes = 2;
pSchema->type = TSDB_DATA_TYPE_TINYINT; field.type = TSDB_DATA_TYPE_TINYINT;
strcpy(pSchema->name, "tag1"); strcpy(field.name, "tag1");
taosArrayPush(createReq.pTags, &field);
} }
{ {
SSchema* pSchema = &pReq->pSchemas[3]; SField field = {0};
pSchema->bytes = htonl(8); field.bytes = 8;
pSchema->type = TSDB_DATA_TYPE_BIGINT; field.type = TSDB_DATA_TYPE_BIGINT;
strcpy(pSchema->name, "tag2"); strcpy(field.name, "tag2");
taosArrayPush(createReq.pTags, &field);
} }
{ {
SSchema* pSchema = &pReq->pSchemas[4]; SField field = {0};
pSchema->bytes = htonl(16); field.bytes = 16;
pSchema->type = TSDB_DATA_TYPE_BINARY; field.type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema->name, "tag3"); strcpy(field.name, "tag3");
taosArrayPush(createReq.pTags, &field);
} }
*pContLen = contLen; int32_t tlen = tSerializeSMCreateStbReq(NULL, &createReq);
return pReq; void* pHead = rpcMallocCont(tlen);
void* pBuf = pHead;
tSerializeSMCreateStbReq(&pBuf, &createReq);
*pContLen = tlen;
return pHead;
} }
SMAltertbReq* MndTestStb::BuildAlterStbAddTagReq(const char* stbname, const char* tagname, int32_t* pContLen) { SMAltertbReq* MndTestStb::BuildAlterStbAddTagReq(const char* stbname, const char* tagname, int32_t* pContLen) {
...@@ -260,9 +269,9 @@ TEST_F(MndTestStb, 01_Create_Show_Meta_Drop_Restart_Stb) { ...@@ -260,9 +269,9 @@ TEST_F(MndTestStb, 01_Create_Show_Meta_Drop_Restart_Stb) {
} }
{ {
int32_t contLen = 0; int32_t contLen = 0;
SMCreateStbReq* pReq = BuildCreateStbReq(stbname, &contLen); void* pReq = BuildCreateStbReq(stbname, &contLen);
SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_STB, pReq, contLen); SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_STB, pReq, contLen);
ASSERT_NE(pRsp, nullptr); ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, 0); ASSERT_EQ(pRsp->code, 0);
} }
...@@ -379,12 +388,15 @@ TEST_F(MndTestStb, 01_Create_Show_Meta_Drop_Restart_Stb) { ...@@ -379,12 +388,15 @@ TEST_F(MndTestStb, 01_Create_Show_Meta_Drop_Restart_Stb) {
} }
{ {
int32_t contLen = sizeof(SMDropStbReq); SMDropStbReq dropReq = {0};
strcpy(dropReq.name, stbname);
SMDropStbReq* pReq = (SMDropStbReq*)rpcMallocCont(contLen); int32_t contLen = tSerializeSMDropStbReq(NULL, &dropReq);
strcpy(pReq->name, stbname); void* pHead = rpcMallocCont(contLen);
void* pBuf = pHead;
tSerializeSMDropStbReq(&pBuf, &dropReq);
SRpcMsg* pRsp = test.SendReq(TDMT_MND_DROP_STB, pReq, contLen); SRpcMsg* pRsp = test.SendReq(TDMT_MND_DROP_STB, pHead, contLen);
ASSERT_NE(pRsp, nullptr); ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, 0); ASSERT_EQ(pRsp->code, 0);
} }
...@@ -418,8 +430,8 @@ TEST_F(MndTestStb, 02_Alter_Stb_AddTag) { ...@@ -418,8 +430,8 @@ TEST_F(MndTestStb, 02_Alter_Stb_AddTag) {
} }
{ {
SMCreateStbReq* pReq = BuildCreateStbReq(stbname, &contLen); void* pReq = BuildCreateStbReq(stbname, &contLen);
SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_STB, pReq, contLen); SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_STB, pReq, contLen);
ASSERT_NE(pRsp, nullptr); ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, 0); ASSERT_EQ(pRsp->code, 0);
} }
...@@ -483,8 +495,8 @@ TEST_F(MndTestStb, 03_Alter_Stb_DropTag) { ...@@ -483,8 +495,8 @@ TEST_F(MndTestStb, 03_Alter_Stb_DropTag) {
} }
{ {
SMCreateStbReq* pReq = BuildCreateStbReq(stbname, &contLen); void* pReq = BuildCreateStbReq(stbname, &contLen);
SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_STB, pReq, contLen); SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_STB, pReq, contLen);
ASSERT_EQ(pRsp->code, 0); ASSERT_EQ(pRsp->code, 0);
} }
...@@ -529,8 +541,8 @@ TEST_F(MndTestStb, 04_Alter_Stb_AlterTagName) { ...@@ -529,8 +541,8 @@ TEST_F(MndTestStb, 04_Alter_Stb_AlterTagName) {
} }
{ {
SMCreateStbReq* pReq = BuildCreateStbReq(stbname, &contLen); void* pReq = BuildCreateStbReq(stbname, &contLen);
SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_STB, pReq, contLen); SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_STB, pReq, contLen);
ASSERT_EQ(pRsp->code, 0); ASSERT_EQ(pRsp->code, 0);
} }
...@@ -598,8 +610,8 @@ TEST_F(MndTestStb, 05_Alter_Stb_AlterTagBytes) { ...@@ -598,8 +610,8 @@ TEST_F(MndTestStb, 05_Alter_Stb_AlterTagBytes) {
} }
{ {
SMCreateStbReq* pReq = BuildCreateStbReq(stbname, &contLen); void* pReq = BuildCreateStbReq(stbname, &contLen);
SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_STB, pReq, contLen); SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_STB, pReq, contLen);
ASSERT_EQ(pRsp->code, 0); ASSERT_EQ(pRsp->code, 0);
} }
...@@ -656,8 +668,8 @@ TEST_F(MndTestStb, 06_Alter_Stb_AddColumn) { ...@@ -656,8 +668,8 @@ TEST_F(MndTestStb, 06_Alter_Stb_AddColumn) {
} }
{ {
SMCreateStbReq* pReq = BuildCreateStbReq(stbname, &contLen); void* pReq = BuildCreateStbReq(stbname, &contLen);
SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_STB, pReq, contLen); SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_STB, pReq, contLen);
ASSERT_NE(pRsp, nullptr); ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, 0); ASSERT_EQ(pRsp->code, 0);
} }
...@@ -721,8 +733,8 @@ TEST_F(MndTestStb, 07_Alter_Stb_DropColumn) { ...@@ -721,8 +733,8 @@ TEST_F(MndTestStb, 07_Alter_Stb_DropColumn) {
} }
{ {
SMCreateStbReq* pReq = BuildCreateStbReq(stbname, &contLen); void* pReq = BuildCreateStbReq(stbname, &contLen);
SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_STB, pReq, contLen); SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_STB, pReq, contLen);
ASSERT_EQ(pRsp->code, 0); ASSERT_EQ(pRsp->code, 0);
} }
...@@ -786,8 +798,8 @@ TEST_F(MndTestStb, 08_Alter_Stb_AlterTagBytes) { ...@@ -786,8 +798,8 @@ TEST_F(MndTestStb, 08_Alter_Stb_AlterTagBytes) {
} }
{ {
SMCreateStbReq* pReq = BuildCreateStbReq(stbname, &contLen); void* pReq = BuildCreateStbReq(stbname, &contLen);
SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_STB, pReq, contLen); SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_STB, pReq, contLen);
ASSERT_EQ(pRsp->code, 0); ASSERT_EQ(pRsp->code, 0);
} }
......
...@@ -10,8 +10,8 @@ SCreateAcctReq* buildAcctManipulationMsg(SSqlInfo* pInfo, int32_t* outputLen, in ...@@ -10,8 +10,8 @@ SCreateAcctReq* buildAcctManipulationMsg(SSqlInfo* pInfo, int32_t* outputLen, in
SDropUserReq* buildDropUserMsg(SSqlInfo* pInfo, int32_t* outputLen, int64_t id, char* msgBuf, int32_t msgLen); SDropUserReq* buildDropUserMsg(SSqlInfo* pInfo, int32_t* outputLen, int64_t id, char* msgBuf, int32_t msgLen);
SShowReq* buildShowMsg(SShowInfo* pShowInfo, SParseContext* pParseCtx, SMsgBuf* pMsgBuf); SShowReq* buildShowMsg(SShowInfo* pShowInfo, SParseContext* pParseCtx, SMsgBuf* pMsgBuf);
SCreateDbReq* buildCreateDbMsg(SCreateDbInfo* pCreateDbInfo, SParseContext *pCtx, SMsgBuf* pMsgBuf); SCreateDbReq* buildCreateDbMsg(SCreateDbInfo* pCreateDbInfo, SParseContext *pCtx, SMsgBuf* pMsgBuf);
SMCreateStbReq* buildCreateStbMsg(SCreateTableSql* pCreateTableSql, int32_t* len, SParseContext* pParseCtx, SMsgBuf* pMsgBuf); char* buildCreateStbReq(SCreateTableSql* pCreateTableSql, int32_t* len, SParseContext* pParseCtx, SMsgBuf* pMsgBuf);
SMDropStbReq* buildDropStableMsg(SSqlInfo* pInfo, int32_t* len, SParseContext* pParseCtx, SMsgBuf* pMsgBuf); char* buildDropStableReq(SSqlInfo* pInfo, int32_t* len, SParseContext* pParseCtx, SMsgBuf* pMsgBuf);
SCreateDnodeReq *buildCreateDnodeMsg(SSqlInfo* pInfo, int32_t* len, SMsgBuf* pMsgBuf); SCreateDnodeReq *buildCreateDnodeMsg(SSqlInfo* pInfo, int32_t* len, SMsgBuf* pMsgBuf);
SDropDnodeReq *buildDropDnodeMsg(SSqlInfo* pInfo, int32_t* len, SMsgBuf* pMsgBuf); SDropDnodeReq *buildDropDnodeMsg(SSqlInfo* pInfo, int32_t* len, SMsgBuf* pMsgBuf);
......
...@@ -249,112 +249,64 @@ SCreateDbReq* buildCreateDbMsg(SCreateDbInfo* pCreateDbInfo, SParseContext *pCtx ...@@ -249,112 +249,64 @@ SCreateDbReq* buildCreateDbMsg(SCreateDbInfo* pCreateDbInfo, SParseContext *pCtx
return pCreateMsg; return pCreateMsg;
} }
SMCreateStbReq* buildCreateStbMsg(SCreateTableSql* pCreateTableSql, int32_t* len, SParseContext* pParseCtx, SMsgBuf* pMsgBuf) { char* buildCreateStbReq(SCreateTableSql* pCreateTableSql, int32_t* len, SParseContext* pParseCtx, SMsgBuf* pMsgBuf) {
SSchema* pSchema; SMCreateStbReq createReq = {0};
createReq.igExists = pCreateTableSql->existCheck ? 1 : 0;
int32_t numOfTags = 0; createReq.pColumns = pCreateTableSql->colInfo.pColumns;
int32_t numOfCols = (int32_t) taosArrayGetSize(pCreateTableSql->colInfo.pColumns); createReq.pTags = pCreateTableSql->colInfo.pTagColumns;
if (pCreateTableSql->colInfo.pTagColumns != NULL) { createReq.numOfColumns = (int32_t)taosArrayGetSize(pCreateTableSql->colInfo.pColumns);
numOfTags = (int32_t) taosArrayGetSize(pCreateTableSql->colInfo.pTagColumns); createReq.numOfTags = (int32_t)taosArrayGetSize(pCreateTableSql->colInfo.pTagColumns);
SName n = {0};
if (createSName(&n, &pCreateTableSql->name, pParseCtx, pMsgBuf) != 0) {
return NULL;
} }
SMCreateStbReq* pCreateStbMsg = (SMCreateStbReq*)calloc(1, sizeof(SMCreateStbReq) + (numOfCols + numOfTags) * sizeof(SSchema)); if (tNameExtractFullName(&n, createReq.name) != 0) {
if (pCreateStbMsg == NULL) { buildInvalidOperationMsg(pMsgBuf, "invalid table name or database not specified");
return NULL; return NULL;
} }
char* pMsg = NULL; int32_t tlen = tSerializeSMCreateStbReq(NULL, &createReq);
#if 0 void* req = malloc(tlen);
int32_t tableType = pCreateTableSql->type; if (req == NULL) {
if (tableType != TSQL_CREATE_TABLE && tableType != TSQL_CREATE_STABLE) { // create by using super table, tags value terrno = TSDB_CODE_OUT_OF_MEMORY;
SArray* list = pInfo->pCreateTableInfo->childTableInfo; return NULL;
}
int32_t numOfTables = (int32_t)taosArrayGetSize(list);
pCreateStbMsg->numOfTables = htonl(numOfTables);
pMsg = (char*)pCreateMsg;
for (int32_t i = 0; i < numOfTables; ++i) {
SCreateTableMsg* pCreate = (SCreateTableMsg*)pMsg;
pCreate->numOfColumns = htons(pCmd->numOfCols);
pCreate->numOfTags = htons(pCmd->count);
pMsg += sizeof(SCreateTableMsg);
SCreatedTableInfo* p = taosArrayGet(list, i);
strcpy(pCreate->tableName, p->fullname);
pCreate->igExists = (p->igExist) ? 1 : 0;
// use dbinfo from table id without modifying current db info
pMsg = serializeTagData(&p->tagdata, pMsg);
int32_t len = (int32_t)(pMsg - (char*)pCreate);
pCreate->len = htonl(len);
}
} else {
#endif
// create (super) table
SName n = {0};
int32_t code = createSName(&n, &pCreateTableSql->name, pParseCtx, pMsgBuf);
if (code != 0) {
return NULL;
}
code = tNameExtractFullName(&n, pCreateStbMsg->name);
if (code != 0) {
buildInvalidOperationMsg(pMsgBuf, "invalid table name or database not specified");
return NULL;
}
pCreateStbMsg->igExists = pCreateTableSql->existCheck ? 1 : 0;
pCreateStbMsg->numOfColumns = htonl(numOfCols);
pCreateStbMsg->numOfTags = htonl(numOfTags);
pSchema = (SSchema*)pCreateStbMsg->pSchemas;
for (int i = 0; i < numOfCols; ++i) {
SField* pField = taosArrayGet(pCreateTableSql->colInfo.pColumns, i);
pSchema->type = pField->type;
pSchema->bytes = htonl(pField->bytes);
strcpy(pSchema->name, pField->name);
pSchema++;
}
for(int32_t i = 0; i < numOfTags; ++i) {
SField* pField = taosArrayGet(pCreateTableSql->colInfo.pTagColumns, i);
pSchema->type = pField->type;
pSchema->bytes = htonl(pField->bytes);
strcpy(pSchema->name, pField->name);
pSchema++;
}
pMsg = (char*)pSchema;
int32_t msgLen = (int32_t)(pMsg - (char*)pCreateStbMsg);
*len = msgLen;
return pCreateStbMsg; void* buf = req;
tSerializeSMCreateStbReq(&buf, &createReq);
*len = tlen;
return req;
} }
SMDropStbReq* buildDropStableMsg(SSqlInfo* pInfo, int32_t* len, SParseContext* pParseCtx, SMsgBuf* pMsgBuf) { char* buildDropStableReq(SSqlInfo* pInfo, int32_t* len, SParseContext* pParseCtx, SMsgBuf* pMsgBuf) {
SToken* tableName = taosArrayGet(pInfo->pMiscInfo->a, 0); SToken* tableName = taosArrayGet(pInfo->pMiscInfo->a, 0);
SName name = {0}; SName name = {0};
int32_t code = createSName(&name, tableName, pParseCtx, pMsgBuf); int32_t code = createSName(&name, tableName, pParseCtx, pMsgBuf);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
terrno = buildInvalidOperationMsg(pMsgBuf, "invalid table name"); terrno = buildInvalidOperationMsg(pMsgBuf, "invalid table name");
return NULL; return NULL;
} }
SMDropStbReq *pDropTableMsg = (SMDropStbReq*) calloc(1, sizeof(SMDropStbReq)); SMDropStbReq dropReq = {0};
code = tNameExtractFullName(&name, dropReq.name);
code = tNameExtractFullName(&name, pDropTableMsg->name);
assert(code == TSDB_CODE_SUCCESS && name.type == TSDB_TABLE_NAME_T); assert(code == TSDB_CODE_SUCCESS && name.type == TSDB_TABLE_NAME_T);
dropReq.igNotExists = pInfo->pMiscInfo->existsCheck ? 1 : 0;
int32_t tlen = tSerializeSMDropStbReq(NULL, &dropReq);
void* req = malloc(tlen);
if (req == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
pDropTableMsg->igNotExists = pInfo->pMiscInfo->existsCheck ? 1 : 0; void* buf = req;
*len = sizeof(SMDropStbReq); tSerializeSMDropStbReq(&buf, &dropReq);
return pDropTableMsg; *len = tlen;
return req;
} }
SCreateDnodeReq *buildCreateDnodeMsg(SSqlInfo* pInfo, int32_t* len, SMsgBuf* pMsgBuf) { SCreateDnodeReq *buildCreateDnodeMsg(SSqlInfo* pInfo, int32_t* len, SMsgBuf* pMsgBuf) {
......
...@@ -924,13 +924,13 @@ SDclStmtInfo* qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseContext* pCtx, ch ...@@ -924,13 +924,13 @@ SDclStmtInfo* qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseContext* pCtx, ch
goto _error; goto _error;
} }
pDcl->pMsg = (char*)buildCreateStbMsg(pCreateTable, &pDcl->msgLen, pCtx, pMsgBuf); pDcl->pMsg = buildCreateStbReq(pCreateTable, &pDcl->msgLen, pCtx, pMsgBuf);
pDcl->msgType = TDMT_MND_CREATE_STB; pDcl->msgType = TDMT_MND_CREATE_STB;
break; break;
} }
case TSDB_SQL_DROP_TABLE: { case TSDB_SQL_DROP_TABLE: {
pDcl->pMsg = (char*)buildDropStableMsg(pInfo, &pDcl->msgLen, pCtx, pMsgBuf); pDcl->pMsg = buildDropStableReq(pInfo, &pDcl->msgLen, pCtx, pMsgBuf);
if (pDcl->pMsg == NULL) { if (pDcl->pMsg == NULL) {
goto _error; goto _error;
} }
......
...@@ -35,6 +35,7 @@ void* rpcOpen(const SRpcInit* pInit) { ...@@ -35,6 +35,7 @@ void* rpcOpen(const SRpcInit* pInit) {
pRpc->connType = pInit->connType; pRpc->connType = pInit->connType;
pRpc->idleTime = pInit->idleTime; pRpc->idleTime = pInit->idleTime;
pRpc->tcphandle = (*taosInitHandle[pRpc->connType])(0, pInit->localPort, pRpc->label, pRpc->numOfThreads, NULL, pRpc); pRpc->tcphandle = (*taosInitHandle[pRpc->connType])(0, pInit->localPort, pRpc->label, pRpc->numOfThreads, NULL, pRpc);
pRpc->parent = pInit->parent;
return pRpc; return pRpc;
} }
......
...@@ -124,10 +124,10 @@ static void clientHandleResp(SCliConn* conn) { ...@@ -124,10 +124,10 @@ static void clientHandleResp(SCliConn* conn) {
rpcMsg.msgType = pHead->msgType; rpcMsg.msgType = pHead->msgType;
rpcMsg.ahandle = pCtx->ahandle; rpcMsg.ahandle = pCtx->ahandle;
if (pCtx->pSem == NULL) { if (pCtx->pSem == NULL) {
tDebug("conn %p handle resp", conn); tDebug("client conn %p handle resp, ", conn);
(pRpc->cfp)(NULL, &rpcMsg, NULL); (pRpc->cfp)(pRpc->parent, &rpcMsg, NULL);
} else { } else {
tDebug("conn %p handle resp", conn); tDebug("client conn(sync) %p handle resp", conn);
memcpy((char*)pCtx->pRsp, (char*)&rpcMsg, sizeof(rpcMsg)); memcpy((char*)pCtx->pRsp, (char*)&rpcMsg, sizeof(rpcMsg));
tsem_post(pCtx->pSem); tsem_post(pCtx->pSem);
} }
...@@ -154,7 +154,7 @@ static void clientHandleExcept(SCliConn* pConn) { ...@@ -154,7 +154,7 @@ static void clientHandleExcept(SCliConn* pConn) {
clientConnDestroy(pConn, true); clientConnDestroy(pConn, true);
return; return;
} }
tDebug("conn %p start to destroy", pConn); tDebug("client conn %p start to destroy", pConn);
SCliMsg* pMsg = pConn->data; SCliMsg* pMsg = pConn->data;
destroyUserdata(&pMsg->msg); destroyUserdata(&pMsg->msg);
...@@ -166,7 +166,7 @@ static void clientHandleExcept(SCliConn* pConn) { ...@@ -166,7 +166,7 @@ static void clientHandleExcept(SCliConn* pConn) {
rpcMsg.code = TSDB_CODE_RPC_NETWORK_UNAVAIL; rpcMsg.code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
if (pCtx->pSem == NULL) { if (pCtx->pSem == NULL) {
// SRpcInfo* pRpc = pMsg->ctx->pRpc; // SRpcInfo* pRpc = pMsg->ctx->pRpc;
(pCtx->pTransInst->cfp)(NULL, &rpcMsg, NULL); (pCtx->pTransInst->cfp)(pCtx->pTransInst->parent, &rpcMsg, NULL);
} else { } else {
memcpy((char*)(pCtx->pRsp), (char*)(&rpcMsg), sizeof(rpcMsg)); memcpy((char*)(pCtx->pRsp), (char*)(&rpcMsg), sizeof(rpcMsg));
// SRpcMsg rpcMsg // SRpcMsg rpcMsg
...@@ -184,7 +184,7 @@ static void clientTimeoutCb(uv_timer_t* handle) { ...@@ -184,7 +184,7 @@ static void clientTimeoutCb(uv_timer_t* handle) {
SCliThrdObj* pThrd = handle->data; SCliThrdObj* pThrd = handle->data;
SRpcInfo* pRpc = pThrd->pTransInst; SRpcInfo* pRpc = pThrd->pTransInst;
int64_t currentTime = pThrd->nextTimeout; int64_t currentTime = pThrd->nextTimeout;
tDebug("timeout, try to remove expire conn from conn pool"); tDebug("client conn timeout, try to remove expire conn from conn pool");
SConnList* p = taosHashIterate((SHashObj*)pThrd->pool, NULL); SConnList* p = taosHashIterate((SHashObj*)pThrd->pool, NULL);
while (p != NULL) { while (p != NULL) {
...@@ -253,7 +253,7 @@ static void addConnToPool(void* pool, char* ip, uint32_t port, SCliConn* conn) { ...@@ -253,7 +253,7 @@ static void addConnToPool(void* pool, char* ip, uint32_t port, SCliConn* conn) {
tstrncpy(key, ip, strlen(ip)); tstrncpy(key, ip, strlen(ip));
tstrncpy(key + strlen(key), (char*)(&port), sizeof(port)); tstrncpy(key + strlen(key), (char*)(&port), sizeof(port));
tDebug("conn %p added to conn pool, read buf cap: %d", conn, conn->readBuf.cap); tDebug("client conn %p added to conn pool, read buf cap: %d", conn, conn->readBuf.cap);
SRpcInfo* pRpc = ((SCliThrdObj*)conn->hostThrd)->pTransInst; SRpcInfo* pRpc = ((SCliThrdObj*)conn->hostThrd)->pTransInst;
...@@ -294,10 +294,10 @@ static void clientReadCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf ...@@ -294,10 +294,10 @@ static void clientReadCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf
pBuf->len += nread; pBuf->len += nread;
if (clientReadComplete(pBuf)) { if (clientReadComplete(pBuf)) {
uv_read_stop((uv_stream_t*)conn->stream); uv_read_stop((uv_stream_t*)conn->stream);
tDebug("conn %p read complete", conn); tDebug("client conn %p read complete", conn);
clientHandleResp(conn); clientHandleResp(conn);
} else { } else {
tDebug("conn %p read partial packet, continue to read", conn); tDebug("client conn %p read partial packet, continue to read", conn);
} }
return; return;
} }
...@@ -309,7 +309,7 @@ static void clientReadCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf ...@@ -309,7 +309,7 @@ static void clientReadCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf
return; return;
} }
if (nread < 0 || nread == UV_EOF) { if (nread < 0 || nread == UV_EOF) {
tError("conn %p read error: %s", conn, uv_err_name(nread)); tError("client conn %p read error: %s", conn, uv_err_name(nread));
clientHandleExcept(conn); clientHandleExcept(conn);
} }
// tDebug("Read error %s\n", uv_err_name(nread)); // tDebug("Read error %s\n", uv_err_name(nread));
...@@ -320,9 +320,9 @@ static void clientConnDestroy(SCliConn* conn, bool clear) { ...@@ -320,9 +320,9 @@ static void clientConnDestroy(SCliConn* conn, bool clear) {
// //
conn->ref--; conn->ref--;
if (conn->ref == 0) { if (conn->ref == 0) {
tDebug("conn %p remove from conn pool", conn); tDebug("client conn %p remove from conn pool", conn);
QUEUE_REMOVE(&conn->conn); QUEUE_REMOVE(&conn->conn);
tDebug("conn %p remove from conn pool successfully", conn); tDebug("client conn %p remove from conn pool successfully", conn);
if (clear) { if (clear) {
uv_close((uv_handle_t*)conn->stream, clientDestroy); uv_close((uv_handle_t*)conn->stream, clientDestroy);
} }
...@@ -334,7 +334,7 @@ static void clientDestroy(uv_handle_t* handle) { ...@@ -334,7 +334,7 @@ static void clientDestroy(uv_handle_t* handle) {
free(conn->stream); free(conn->stream);
free(conn->writeReq); free(conn->writeReq);
tDebug("conn %p destroy successfully", conn); tDebug("client conn %p destroy successfully", conn);
free(conn); free(conn);
// clientConnDestroy(conn, false); // clientConnDestroy(conn, false);
...@@ -343,7 +343,7 @@ static void clientDestroy(uv_handle_t* handle) { ...@@ -343,7 +343,7 @@ static void clientDestroy(uv_handle_t* handle) {
static void clientWriteCb(uv_write_t* req, int status) { static void clientWriteCb(uv_write_t* req, int status) {
SCliConn* pConn = req->data; SCliConn* pConn = req->data;
if (status == 0) { if (status == 0) {
tDebug("conn %p data already was written out", pConn); tDebug("client conn %p data already was written out", pConn);
SCliMsg* pMsg = pConn->data; SCliMsg* pMsg = pConn->data;
if (pMsg == NULL) { if (pMsg == NULL) {
// handle // handle
...@@ -351,7 +351,7 @@ static void clientWriteCb(uv_write_t* req, int status) { ...@@ -351,7 +351,7 @@ static void clientWriteCb(uv_write_t* req, int status) {
} }
destroyUserdata(&pMsg->msg); destroyUserdata(&pMsg->msg);
} else { } else {
tError("conn %p failed to write: %s", pConn, uv_err_name(status)); tError("client conn %p failed to write: %s", pConn, uv_err_name(status));
clientHandleExcept(pConn); clientHandleExcept(pConn);
return; return;
} }
...@@ -370,7 +370,7 @@ static void clientWrite(SCliConn* pConn) { ...@@ -370,7 +370,7 @@ static void clientWrite(SCliConn* pConn) {
pHead->msgLen = (int32_t)htonl((uint32_t)msgLen); pHead->msgLen = (int32_t)htonl((uint32_t)msgLen);
uv_buf_t wb = uv_buf_init((char*)pHead, msgLen); uv_buf_t wb = uv_buf_init((char*)pHead, msgLen);
tDebug("conn %p data write out, msgType : %d, len: %d", pConn, pHead->msgType, msgLen); tDebug("client conn %p data write out, msgType : %s, len: %d", pConn, TMSG_INFO(pHead->msgType), msgLen);
uv_write(pConn->writeReq, (uv_stream_t*)pConn->stream, &wb, 1, clientWriteCb); uv_write(pConn->writeReq, (uv_stream_t*)pConn->stream, &wb, 1, clientWriteCb);
} }
static void clientConnCb(uv_connect_t* req, int status) { static void clientConnCb(uv_connect_t* req, int status) {
...@@ -378,11 +378,11 @@ static void clientConnCb(uv_connect_t* req, int status) { ...@@ -378,11 +378,11 @@ static void clientConnCb(uv_connect_t* req, int status) {
SCliConn* pConn = req->data; SCliConn* pConn = req->data;
if (status != 0) { if (status != 0) {
// tError("failed to connect server(%s, %d), errmsg: %s", pCtx->ip, pCtx->port, uv_strerror(status)); // tError("failed to connect server(%s, %d), errmsg: %s", pCtx->ip, pCtx->port, uv_strerror(status));
tError("conn %p failed to connect server: %s", pConn, uv_strerror(status)); tError("client conn %p failed to connect server: %s", pConn, uv_strerror(status));
clientHandleExcept(pConn); clientHandleExcept(pConn);
return; return;
} }
tDebug("conn %p create", pConn); tDebug("client conn %p create", pConn);
assert(pConn->stream == req->handle); assert(pConn->stream == req->handle);
clientWrite(pConn); clientWrite(pConn);
...@@ -400,14 +400,14 @@ static void clientHandleQuit(SCliMsg* pMsg, SCliThrdObj* pThrd) { ...@@ -400,14 +400,14 @@ static void clientHandleQuit(SCliMsg* pMsg, SCliThrdObj* pThrd) {
static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
uint64_t et = taosGetTimestampUs(); uint64_t et = taosGetTimestampUs();
uint64_t el = et - pMsg->st; uint64_t el = et - pMsg->st;
tDebug("msg tran time cost: %" PRIu64 "", el); tDebug("client msg tran time cost: %" PRIu64 "", el);
et = taosGetTimestampUs(); et = taosGetTimestampUs();
STransConnCtx* pCtx = pMsg->ctx; STransConnCtx* pCtx = pMsg->ctx;
SCliConn* conn = getConnFromPool(pThrd->pool, pCtx->ip, pCtx->port); SCliConn* conn = getConnFromPool(pThrd->pool, pCtx->ip, pCtx->port);
if (conn != NULL) { if (conn != NULL) {
// impl later // impl later
tDebug("conn %p get from conn pool", conn); tDebug("client get conn %p from pool", conn);
conn->data = pMsg; conn->data = pMsg;
conn->writeReq->data = conn; conn->writeReq->data = conn;
transDestroyBuffer(&conn->readBuf); transDestroyBuffer(&conn->readBuf);
......
...@@ -266,6 +266,7 @@ static void uvHandleReq(SSrvConn* pConn) { ...@@ -266,6 +266,7 @@ static void uvHandleReq(SSrvConn* pConn) {
transClearBuffer(&pConn->readBuf); transClearBuffer(&pConn->readBuf);
pConn->ref++; pConn->ref++;
tDebug("%s received on %p", TMSG_INFO(rpcMsg.msgType), pConn);
(*(pRpc->cfp))(pRpc->parent, &rpcMsg, NULL); (*(pRpc->cfp))(pRpc->parent, &rpcMsg, NULL);
// uv_timer_start(pConn->pTimer, uvHandleActivityTimeout, pRpc->idleTime * 10000, 0); // uv_timer_start(pConn->pTimer, uvHandleActivityTimeout, pRpc->idleTime * 10000, 0);
// auth // auth
...@@ -278,7 +279,7 @@ void uvOnReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { ...@@ -278,7 +279,7 @@ void uvOnReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) {
SConnBuffer* pBuf = &conn->readBuf; SConnBuffer* pBuf = &conn->readBuf;
if (nread > 0) { if (nread > 0) {
pBuf->len += nread; pBuf->len += nread;
tDebug("conn %p read summroy, total read: %d, current read: %d", conn, pBuf->len, (int)nread); tDebug("conn %p read summary, total read: %d, current read: %d", conn, pBuf->len, (int)nread);
if (readComplete(pBuf)) { if (readComplete(pBuf)) {
tDebug("conn %p alread read complete packet", conn); tDebug("conn %p alread read complete packet", conn);
uvHandleReq(conn); uvHandleReq(conn);
...@@ -717,6 +718,9 @@ void taosCloseServer(void* arg) { ...@@ -717,6 +718,9 @@ void taosCloseServer(void* arg) {
} }
void rpcSendResponse(const SRpcMsg* pMsg) { void rpcSendResponse(const SRpcMsg* pMsg) {
if (pMsg->handle == NULL) {
return;
}
SSrvConn* pConn = pMsg->handle; SSrvConn* pConn = pMsg->handle;
SWorkThrdObj* pThrd = pConn->hostThrd; SWorkThrdObj* pThrd = pConn->hostThrd;
......
...@@ -204,7 +204,12 @@ void* taosArrayGetLast(const SArray* pArray) { ...@@ -204,7 +204,12 @@ void* taosArrayGetLast(const SArray* pArray) {
return TARRAY_GET_ELEM(pArray, pArray->size - 1); return TARRAY_GET_ELEM(pArray, pArray->size - 1);
} }
size_t taosArrayGetSize(const SArray* pArray) { return pArray->size; } size_t taosArrayGetSize(const SArray* pArray) {
if (pArray == NULL) {
return 0;
}
return pArray->size;
}
void taosArraySetSize(SArray* pArray, size_t size) { void taosArraySetSize(SArray* pArray, size_t size) {
assert(size <= pArray->capacity); assert(size <= pArray->capacity);
...@@ -296,7 +301,7 @@ SArray* taosArrayDup(const SArray* pSrc) { ...@@ -296,7 +301,7 @@ SArray* taosArrayDup(const SArray* pSrc) {
} }
void taosArrayClear(SArray* pArray) { void taosArrayClear(SArray* pArray) {
assert( pArray != NULL ); if (pArray == NULL) return;
pArray->size = 0; pArray->size = 0;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册