提交 2adcce55 编写于 作者: S Shengliang Guan

Merge remote-tracking branch 'origin/3.0' into feature/db

......@@ -1280,8 +1280,9 @@ typedef struct {
#define STREAM_TRIGGER_WINDOW_CLOSE 2
typedef struct {
char name[TSDB_TOPIC_FNAME_LEN];
char outputSTbName[TSDB_TABLE_FNAME_LEN];
char name[TSDB_TABLE_FNAME_LEN];
char sourceDB[TSDB_DB_FNAME_LEN];
char targetStbFullName[TSDB_TABLE_FNAME_LEN];
int8_t igExists;
char* sql;
char* ast;
......
......@@ -331,7 +331,6 @@ int32_t sdbGetRawSoftVer(SSdbRaw *pRaw, int8_t *sver);
int32_t sdbGetRawTotalSize(SSdbRaw *pRaw);
SSdbRow *sdbAllocRow(int32_t objSize);
void sdbFreeRow(SSdb *pSdb, SSdbRow *pRow, bool callFunc);
void *sdbGetRowObj(SSdbRow *pRow);
typedef struct SSdb {
......
......@@ -207,7 +207,7 @@ typedef struct SqlFunctionCtx {
struct SSDataBlock *pSrcBlock;
int32_t curBufPage;
char* udfName[TSDB_FUNC_NAME_LEN];
char udfName[TSDB_FUNC_NAME_LEN];
} SqlFunctionCtx;
enum {
......
......@@ -162,6 +162,7 @@ EFuncDataRequired fmFuncDataRequired(SFunctionNode* pFunc, STimeWindow* pTimeWin
int32_t fmGetFuncExecFuncs(int32_t funcId, SFuncExecFuncs* pFpSet);
int32_t fmGetScalarFuncExecFuncs(int32_t funcId, SScalarFuncExecFuncs* pFpSet);
int32_t fmGetUdafExecFuncs(int32_t funcId, SFuncExecFuncs* pFpSet);
#ifdef __cplusplus
}
......
......@@ -140,7 +140,7 @@ typedef int32_t (*TUdfFreeUdfColumnFunc)(SUdfColumn* column);
typedef int32_t (*TUdfScalarProcFunc)(SUdfDataBlock* block, SUdfColumn *resultCol);
typedef int32_t (*TUdfAggStartFunc)(SUdfInterBuf *buf);
typedef int32_t (*TUdfAggProcessFunc)(SUdfDataBlock* block, SUdfInterBuf *interBuf);
typedef int32_t (*TUdfAggProcessFunc)(SUdfDataBlock* block, SUdfInterBuf *interBuf, SUdfInterBuf *newInterBuf);
typedef int32_t (*TUdfAggFinishFunc)(SUdfInterBuf* buf, SUdfInterBuf *resultData);
......
......@@ -132,6 +132,7 @@ extern const int32_t TYPE_BYTES[15];
#define TSDB_PERFS_TABLE_CONSUMERS "consumers"
#define TSDB_PERFS_TABLE_SUBSCRIPTIONS "subscriptions"
#define TSDB_PERFS_TABLE_OFFSETS "offsets"
#define TSDB_PERFS_TABLE_STREAMS "streams"
#define TSDB_INDEX_TYPE_SMA "SMA"
#define TSDB_INDEX_TYPE_FULLTEXT "FULLTEXT"
......
......@@ -188,7 +188,6 @@ typedef struct SRequestSendRecvBody {
typedef struct {
int8_t resType;
int32_t code;
char topic[TSDB_TOPIC_FNAME_LEN];
int32_t vgId;
SSchemaWrapper schema;
......
......@@ -739,7 +739,7 @@ TAOS_RES* tmq_create_stream(TAOS* taos, const char* streamName, const char* tbNa
.sql = (char*)sql,
};
tNameExtractFullName(&name, req.name);
strcpy(req.outputSTbName, tbName);
strcpy(req.targetStbFullName, tbName);
int tlen = tSerializeSCMCreateStreamReq(NULL, 0, &req);
void* buf = taosMemoryMalloc(tlen);
......
......@@ -3571,10 +3571,12 @@ int32_t tSerializeSCMCreateStreamReq(void *buf, int32_t bufLen, const SCMCreateS
if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->name) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->outputSTbName) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->targetStbFullName) < 0) return -1;
if (tEncodeI8(&encoder, pReq->igExists) < 0) return -1;
if (tEncodeI32(&encoder, sqlLen) < 0) return -1;
if (tEncodeI32(&encoder, astLen) < 0) return -1;
if (tEncodeI8(&encoder, pReq->triggerType) < 0) return -1;
if (tEncodeI64(&encoder, pReq->watermark) < 0) return -1;
if (sqlLen > 0 && tEncodeCStr(&encoder, pReq->sql) < 0) return -1;
if (astLen > 0 && tEncodeCStr(&encoder, pReq->ast) < 0) return -1;
......@@ -3594,7 +3596,7 @@ int32_t tDeserializeSCMCreateStreamReq(void *buf, int32_t bufLen, SCMCreateStrea
if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->name) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->outputSTbName) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->targetStbFullName) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->igExists) < 0) return -1;
if (tDecodeI32(&decoder, &sqlLen) < 0) return -1;
if (tDecodeI32(&decoder, &astLen) < 0) return -1;
......@@ -3808,4 +3810,4 @@ int tDecodeSVCreateTbRsp(SCoder *pCoder, SVCreateTbRsp *pRsp) {
tEndDecode(pCoder);
return 0;
}
\ No newline at end of file
}
......@@ -578,8 +578,9 @@ typedef struct {
typedef struct {
char name[TSDB_TOPIC_FNAME_LEN];
char db[TSDB_DB_FNAME_LEN];
char outputSTbName[TSDB_TABLE_FNAME_LEN];
char sourceDb[TSDB_DB_FNAME_LEN];
char targetDb[TSDB_DB_FNAME_LEN];
char targetSTbName[TSDB_TABLE_FNAME_LEN];
int64_t createTime;
int64_t updateTime;
int64_t uid;
......
......@@ -812,6 +812,7 @@ static int32_t mndRetrieveConsumer(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock
colDataAppend(pColInfo, numOfRows, (const char *)status, false);
// subscribed topics
// TODO: split into multiple rows
char topics[TSDB_SHOW_LIST_LEN + VARSTR_HEADER_SIZE] = {0};
char *showStr = taosShowStrArray(pConsumer->assignedTopics);
tstrncpy(varDataVal(topics), showStr, TSDB_SHOW_LIST_LEN);
......
......@@ -410,7 +410,7 @@ int32_t tEncodeSStreamObj(SCoder *pEncoder, const SStreamObj *pObj) {
int32_t sz = 0;
/*int32_t outputNameSz = 0;*/
if (tEncodeCStr(pEncoder, pObj->name) < 0) return -1;
if (tEncodeCStr(pEncoder, pObj->db) < 0) return -1;
if (tEncodeCStr(pEncoder, pObj->sourceDb) < 0) return -1;
if (tEncodeI64(pEncoder, pObj->createTime) < 0) return -1;
if (tEncodeI64(pEncoder, pObj->updateTime) < 0) return -1;
if (tEncodeI64(pEncoder, pObj->uid) < 0) return -1;
......@@ -456,7 +456,7 @@ int32_t tEncodeSStreamObj(SCoder *pEncoder, const SStreamObj *pObj) {
int32_t tDecodeSStreamObj(SCoder *pDecoder, SStreamObj *pObj) {
if (tDecodeCStrTo(pDecoder, pObj->name) < 0) return -1;
if (tDecodeCStrTo(pDecoder, pObj->db) < 0) return -1;
if (tDecodeCStrTo(pDecoder, pObj->sourceDb) < 0) return -1;
if (tDecodeI64(pDecoder, &pObj->createTime) < 0) return -1;
if (tDecodeI64(pDecoder, &pObj->updateTime) < 0) return -1;
if (tDecodeI64(pDecoder, &pObj->uid) < 0) return -1;
......
......@@ -123,14 +123,6 @@ static const SInfosTableSchema userStbsSchema[] = {
{.name = "table_comment", .bytes = 1024 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
};
static const SInfosTableSchema userStreamsSchema[] = {
{.name = "stream_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR},
{.name = "user_name", .bytes = 23, .type = TSDB_DATA_TYPE_VARCHAR},
{.name = "dest_table", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR},
{.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP},
{.name = "sql", .bytes = 1024, .type = TSDB_DATA_TYPE_VARCHAR},
};
static const SInfosTableSchema userTblsSchema[] = {
{.name = "table_name", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR},
{.name = "db_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR},
......@@ -258,7 +250,6 @@ static const SInfosTableMeta infosMeta[] = {
{TSDB_INS_TABLE_USER_FUNCTIONS, userFuncSchema, tListLen(userFuncSchema)},
{TSDB_INS_TABLE_USER_INDEXES, userIdxSchema, tListLen(userIdxSchema)},
{TSDB_INS_TABLE_USER_STABLES, userStbsSchema, tListLen(userStbsSchema)},
{TSDB_INS_TABLE_USER_STREAMS, userStreamsSchema, tListLen(userStreamsSchema)},
{TSDB_INS_TABLE_USER_TABLES, userTblsSchema, tListLen(userTblsSchema)},
{TSDB_INS_TABLE_USER_TABLE_DISTRIBUTED, userTblDistSchema, tListLen(userTblDistSchema)},
{TSDB_INS_TABLE_USER_USERS, userUsersSchema, tListLen(userUsersSchema)},
......
......@@ -76,6 +76,18 @@ static const SPerfsTableSchema offsetSchema[] = {
{.name = "skip_log_cnt", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT},
};
static const SPerfsTableSchema streamSchema[] = {
{.name = "stream_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR},
{.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP},
{.name = "sql", .bytes = TSDB_SHOW_SQL_LEN, .type = TSDB_DATA_TYPE_VARCHAR},
{.name = "status", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY},
{.name = "source_db", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR},
{.name = "target_db", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR},
{.name = "target_table", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR},
{.name = "watermark", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT},
{.name = "trigger", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
};
static const SPerfsTableMeta perfsMeta[] = {
{TSDB_PERFS_TABLE_CONNECTIONS, connectionsSchema, tListLen(connectionsSchema)},
{TSDB_PERFS_TABLE_QUERIES, queriesSchema, tListLen(queriesSchema)},
......@@ -83,6 +95,7 @@ static const SPerfsTableMeta perfsMeta[] = {
{TSDB_PERFS_TABLE_CONSUMERS, consumerSchema, tListLen(consumerSchema)},
{TSDB_PERFS_TABLE_SUBSCRIPTIONS, subscriptionSchema, tListLen(subscriptionSchema)},
{TSDB_PERFS_TABLE_OFFSETS, offsetSchema, tListLen(offsetSchema)},
{TSDB_PERFS_TABLE_STREAMS, streamSchema, tListLen(streamSchema)},
};
// connection/application/
......
......@@ -382,7 +382,7 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
pTask->dispatchType = TASK_DISPATCH__SHUFFLE;
pTask->dispatchMsgType = TDMT_VND_TASK_WRITE_EXEC;
SDbObj* pDb = mndAcquireDb(pMnode, pStream->db);
SDbObj* pDb = mndAcquireDb(pMnode, pStream->sourceDb);
ASSERT(pDb);
if (mndExtractDbInfo(pMnode, pDb, &pTask->shuffleDispatcher.dbInfo, NULL) < 0) {
sdbRelease(pSdb, pDb);
......
......@@ -40,7 +40,7 @@ static int32_t mndProcessMCreateSmaReq(SNodeMsg *pReq);
static int32_t mndProcessMDropSmaReq(SNodeMsg *pReq);
static int32_t mndProcessVCreateSmaRsp(SNodeMsg *pRsp);
static int32_t mndProcessVDropSmaRsp(SNodeMsg *pRsp);
static int32_t mndRetrieveSma(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock* pBlock, int32_t rows);
static int32_t mndRetrieveSma(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
static void mndCancelGetNextSma(SMnode *pMnode, void *pIter);
int32_t mndInitSma(SMnode *pMnode) {
......@@ -406,7 +406,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SNodeMsg *pReq, SMCreateSmaReq *pCre
SStreamObj streamObj = {0};
tstrncpy(streamObj.name, pCreate->name, TSDB_STREAM_FNAME_LEN);
tstrncpy(streamObj.db, pDb->name, TSDB_DB_FNAME_LEN);
tstrncpy(streamObj.sourceDb, pDb->name, TSDB_DB_FNAME_LEN);
streamObj.createTime = taosGetTimestampMs();
streamObj.updateTime = streamObj.createTime;
streamObj.uid = mndGenerateUid(pCreate->name, strlen(pCreate->name));
......@@ -686,9 +686,9 @@ _OVER:
return code;
}
int32_t mndProcessGetSmaReq(SMnode *pMnode, SUserIndexReq *indexReq, SUserIndexRsp *rsp, bool *exist) {
int32_t code = -1;
SSmaObj *pSma = NULL;
int32_t mndProcessGetSmaReq(SMnode *pMnode, SUserIndexReq *indexReq, SUserIndexRsp *rsp, bool *exist) {
int32_t code = -1;
SSmaObj *pSma = NULL;
pSma = mndAcquireSma(pMnode, indexReq->indexFName);
if (pSma == NULL) {
......@@ -701,13 +701,14 @@ int32_t mndProcessGetSmaReq(SMnode *pMnode, SUserIndexReq *indexReq, SUserI
strcpy(rsp->indexType, TSDB_INDEX_TYPE_SMA);
SNodeList *pList = NULL;
int32_t extOffset = 0;
int32_t extOffset = 0;
code = nodesStringToList(pSma->expr, &pList);
if (0 == code) {
SNode *node = NULL;
FOREACH(node, pList) {
SFunctionNode *pFunc = (SFunctionNode *)node;
extOffset += snprintf(rsp->indexExts + extOffset, sizeof(rsp->indexExts) - extOffset - 1, "%s%s", (extOffset ? ",":""), pFunc->functionName);
extOffset += snprintf(rsp->indexExts + extOffset, sizeof(rsp->indexExts) - extOffset - 1, "%s%s",
(extOffset ? "," : ""), pFunc->functionName);
}
*exist = true;
......@@ -718,13 +719,12 @@ int32_t mndProcessGetSmaReq(SMnode *pMnode, SUserIndexReq *indexReq, SUserI
return code;
}
static int32_t mndProcessVDropSmaRsp(SNodeMsg *pRsp) {
mndTransProcessRsp(pRsp);
return 0;
}
static int32_t mndRetrieveSma(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock* pBlock, int32_t rows) {
static int32_t mndRetrieveSma(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
SMnode *pMnode = pReq->pNode;
SSdb *pSdb = pMnode->pSdb;
int32_t numOfRows = 0;
......@@ -758,8 +758,8 @@ static int32_t mndRetrieveSma(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock* pBlo
char n1[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
STR_TO_VARSTR(n1, (char *)tNameGetTableName(&stbName));
SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char*) n, false);
SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)n, false);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)&pSma->createdTime, false);
......
......@@ -40,7 +40,7 @@ static int32_t mndProcessTaskDeployInternalRsp(SNodeMsg *pRsp);
/*static int32_t mndProcessDropStreamInRsp(SNodeMsg *pRsp);*/
static int32_t mndProcessStreamMetaReq(SNodeMsg *pReq);
static int32_t mndGetStreamMeta(SNodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta);
static int32_t mndRetrieveStream(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows);
static int32_t mndRetrieveStream(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
static void mndCancelGetNextStream(SMnode *pMnode, void *pIter);
int32_t mndInitStream(SMnode *pMnode) {
......@@ -58,8 +58,8 @@ int32_t mndInitStream(SMnode *pMnode) {
/*mndSetMsgHandle(pMnode, TDMT_MND_DROP_STREAM, mndProcessDropStreamReq);*/
/*mndSetMsgHandle(pMnode, TDMT_MND_DROP_STREAM_RSP, mndProcessDropStreamInRsp);*/
// mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_TOPICS, mndRetrieveStream);
/*mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_TOPICS, mndCancelGetNextStream);*/
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_STREAMS, mndRetrieveStream);
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_STREAMS, mndCancelGetNextStream);
return sdbSetTable(pMnode->pSdb, table);
}
......@@ -294,8 +294,8 @@ static int32_t mndCreateStream(SMnode *pMnode, SNodeMsg *pReq, SCMCreateStreamRe
mDebug("stream:%s to create", pCreate->name);
SStreamObj streamObj = {0};
tstrncpy(streamObj.name, pCreate->name, TSDB_STREAM_FNAME_LEN);
tstrncpy(streamObj.db, pDb->name, TSDB_DB_FNAME_LEN);
tstrncpy(streamObj.outputSTbName, pCreate->outputSTbName, TSDB_TABLE_FNAME_LEN);
tstrncpy(streamObj.sourceDb, pDb->name, TSDB_DB_FNAME_LEN);
tstrncpy(streamObj.targetSTbName, pCreate->targetStbFullName, TSDB_TABLE_FNAME_LEN);
streamObj.createTime = taosGetTimestampMs();
streamObj.updateTime = streamObj.createTime;
streamObj.uid = mndGenerateUid(pCreate->name, strlen(pCreate->name));
......@@ -424,58 +424,55 @@ static int32_t mndGetNumOfStreams(SMnode *pMnode, char *dbName, int32_t *pNumOfS
return 0;
}
static int32_t mndRetrieveStream(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) {
static int32_t mndRetrieveStream(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
SMnode *pMnode = pReq->pNode;
SSdb *pSdb = pMnode->pSdb;
int32_t numOfRows = 0;
SStreamObj *pStream = NULL;
int32_t cols = 0;
char *pWrite;
char prefix[TSDB_DB_FNAME_LEN] = {0};
SDbObj *pDb = mndAcquireDb(pMnode, pShow->db);
if (pDb == NULL) return 0;
tstrncpy(prefix, pShow->db, TSDB_DB_FNAME_LEN);
strcat(prefix, TS_PATH_DELIMITER);
int32_t prefixLen = (int32_t)strlen(prefix);
while (numOfRows < rows) {
pShow->pIter = sdbFetch(pSdb, SDB_STREAM, pShow->pIter, (void **)&pStream);
pShow->pIter = sdbFetch(pSdb, SDB_TOPIC, pShow->pIter, (void **)&pStream);
if (pShow->pIter == NULL) break;
if (pStream->dbUid != pDb->uid) {
if (strncmp(pStream->name, prefix, prefixLen) != 0) {
mError("Inconsistent stream data, name:%s, db:%s, dbUid:%" PRIu64, pStream->name, pDb->name, pDb->uid);
}
SColumnInfoData *pColInfo;
SName n;
int32_t cols = 0;
sdbRelease(pSdb, pStream);
continue;
}
char streamName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
tNameFromString(&n, pStream->name, T_NAME_ACCT | T_NAME_DB);
tNameGetDbName(&n, varDataVal(streamName));
varDataSetLen(streamName, strlen(varDataVal(streamName)));
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)streamName, false);
cols = 0;
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)&pStream->createTime, false);
char streamName[TSDB_TABLE_NAME_LEN] = {0};
tstrncpy(streamName, pStream->name + prefixLen, TSDB_TABLE_NAME_LEN);
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
STR_TO_VARSTR(pWrite, streamName);
cols++;
char sql[TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE] = {0};
tstrncpy(&sql[VARSTR_HEADER_SIZE], pStream->sql, TSDB_SHOW_SQL_LEN);
varDataSetLen(sql, strlen(&sql[VARSTR_HEADER_SIZE]));
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)sql, false);
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int64_t *)pWrite = pStream->createTime;
cols++;
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)&pStream->status, true);
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pStream->sql, pShow->bytes[cols]);
cols++;
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)&pStream->sourceDb, true);
numOfRows++;
sdbRelease(pSdb, pStream);
}
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)&pStream->targetDb, true);
mndReleaseDb(pMnode, pDb);
pShow->numOfRows += numOfRows;
return numOfRows;
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)&pStream->targetSTbName, true);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)&pStream->waterMark, false);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)&pStream->trigger, false);
}
return 0;
}
static void mndCancelGetNextStream(SMnode *pMnode, void *pIter) {
......
......@@ -309,9 +309,6 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
newConsumerEp.consumerId = consumerId;
newConsumerEp.vgs = taosArrayInit(0, sizeof(void *));
taosHashPut(pOutput->pSub->consumerHash, &consumerId, sizeof(int64_t), &newConsumerEp, sizeof(SMqConsumerEp));
/*SMqConsumer* pTestNew = taosHashGet(pOutput->pSub->consumerHash, &consumerId, sizeof(int64_t));*/
/*ASSERT(pTestNew->consumerId == consumerId);*/
/*ASSERT(pTestNew->vgs == newConsumerEp.vgs);*/
taosArrayPush(pOutput->newConsumers, &consumerId);
}
}
......@@ -369,7 +366,13 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
}
}
// 8. generate logs
// 8. TODO generate logs
mInfo("rebalance calculation completed, rebalanced vg:");
for (int32_t i = 0; i < taosArrayGetSize(pOutput->rebVgs); i++) {
SMqRebOutputVg *pOutputRebVg = taosArrayGet(pOutput->rebVgs, i);
mInfo("vg: %d moved from consumer %ld to consumer %ld", pOutputRebVg->pVgEp->vgId, pOutputRebVg->oldConsumerId,
pOutputRebVg->newConsumerId);
}
// 9. clear
taosHashCleanup(pHash);
......@@ -447,7 +450,9 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SNodeMsg *pMsg, const SMqRebO
goto REB_FAIL;
}
}
// 4. commit log: modification log
// 4. TODO commit log: modification log
// 5. execution
if (mndTransPrepare(pMnode, pTrans) != 0) goto REB_FAIL;
mndTransDrop(pTrans);
......
......@@ -39,14 +39,16 @@ static int32_t mndRetrieveUsers(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock *p
static void mndCancelGetNextUser(SMnode *pMnode, void *pIter);
int32_t mndInitUser(SMnode *pMnode) {
SSdbTable table = {.sdbType = SDB_USER,
.keyType = SDB_KEY_BINARY,
.deployFp = (SdbDeployFp)mndCreateDefaultUsers,
.encodeFp = (SdbEncodeFp)mndUserActionEncode,
.decodeFp = (SdbDecodeFp)mndUserActionDecode,
.insertFp = (SdbInsertFp)mndUserActionInsert,
.updateFp = (SdbUpdateFp)mndUserActionUpdate,
.deleteFp = (SdbDeleteFp)mndUserActionDelete};
SSdbTable table = {
.sdbType = SDB_USER,
.keyType = SDB_KEY_BINARY,
.deployFp = (SdbDeployFp)mndCreateDefaultUsers,
.encodeFp = (SdbEncodeFp)mndUserActionEncode,
.decodeFp = (SdbDecodeFp)mndUserActionDecode,
.insertFp = (SdbInsertFp)mndUserActionInsert,
.updateFp = (SdbUpdateFp)mndUserActionUpdate,
.deleteFp = (SdbDeleteFp)mndUserActionDelete,
};
mndSetMsgHandle(pMnode, TDMT_MND_CREATE_USER, mndProcessCreateUserReq);
mndSetMsgHandle(pMnode, TDMT_MND_ALTER_USER, mndProcessAlterUserReq);
......
......@@ -9,75 +9,314 @@
*
*/
#include "sut.h"
#include <gtest/gtest.h>
class MndTestShow : public ::testing::Test {
protected:
static void SetUpTestSuite() { test.Init("/tmp/mnode_test_show", 9021); }
static void TearDownTestSuite() { test.Cleanup(); }
#include "sdb.h"
static Testbase test;
class MndTestSdb : public ::testing::Test {
protected:
static void SetUpTestSuite() {}
static void TearDownTestSuite() {}
public:
void SetUp() override {}
void TearDown() override {}
};
Testbase MndTestShow::test;
typedef struct SMnode {
int32_t v100;
int32_t v200;
SSdb *pSdb;
} SMnode;
typedef struct SStrObj {
char key[24];
int8_t v8;
int16_t v16;
int32_t v32;
int64_t v64;
char vstr[32];
char unused[48];
} SStrObj;
TEST_F(MndTestShow, 01_ShowMsg_InvalidMsgMax) {
SShowReq showReq = {0};
showReq.type = TSDB_MGMT_TABLE_MAX;
typedef struct SI32Obj {
int32_t key;
int8_t v8;
int16_t v16;
int32_t v32;
int64_t v64;
char vstr[32];
char unused[48];
} SI32Obj;
int32_t contLen = tSerializeSShowReq(NULL, 0, &showReq);
void* pReq = rpcMallocCont(contLen);
tSerializeSShowReq(pReq, contLen, &showReq);
tFreeSShowReq(&showReq);
typedef struct SI64Obj {
int64_t key;
int8_t v8;
int16_t v16;
int32_t v32;
int64_t v64;
char vstr[32];
char unused[48];
} SI64Obj;
SRpcMsg* pRsp = test.SendReq(TDMT_MND_SYSTABLE_RETRIEVE, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
ASSERT_NE(pRsp->code, 0);
SSdbRaw *strEncode(SStrObj *pObj) {
int32_t dataPos = 0;
SSdbRaw *pRaw = sdbAllocRaw(SDB_USER, 1, sizeof(SStrObj));
sdbSetRawBinary(pRaw, dataPos, pObj->key, sizeof(pObj->key));
dataPos += sizeof(pObj->key);
sdbSetRawInt8(pRaw, dataPos, pObj->v8);
dataPos += sizeof(pObj->v8);
sdbSetRawInt16(pRaw, dataPos, pObj->v16);
dataPos += sizeof(pObj->v16);
sdbSetRawInt32(pRaw, dataPos, pObj->v32);
dataPos += sizeof(pObj->v32);
sdbSetRawInt64(pRaw, dataPos, pObj->v64);
dataPos += sizeof(pObj->v64);
sdbSetRawBinary(pRaw, dataPos, pObj->vstr, sizeof(pObj->vstr));
dataPos += sizeof(pObj->vstr);
sdbSetRawDataLen(pRaw, dataPos);
return pRaw;
}
TEST_F(MndTestShow, 02_ShowMsg_InvalidMsgStart) {
SShowReq showReq = {0};
showReq.type = TSDB_MGMT_TABLE_START;
SSdbRow *strDecode(SSdbRaw *pRaw) {
int8_t sver = 0;
if (sdbGetRawSoftVer(pRaw, &sver) != 0) return NULL;
if (sver != 1) return NULL;
SSdbRow *pRow = sdbAllocRow(sizeof(SStrObj));
if (pRow == NULL) return NULL;
int32_t contLen = tSerializeSShowReq(NULL, 0, &showReq);
void* pReq = rpcMallocCont(contLen);
tSerializeSShowReq(pReq, contLen, &showReq);
tFreeSShowReq(&showReq);
SStrObj *pObj = (SStrObj *)sdbGetRowObj(pRow);
if (pObj == NULL) return NULL;
SRpcMsg* pRsp = test.SendReq(TDMT_MND_SYSTABLE_RETRIEVE, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
ASSERT_NE(pRsp->code, 0);
int32_t dataPos = 0;
sdbGetRawBinary(pRaw, dataPos, pObj->key, sizeof(pObj->key));
dataPos += sizeof(pObj->key);
sdbGetRawInt8(pRaw, dataPos, &pObj->v8);
dataPos += sizeof(pObj->v8);
sdbGetRawInt16(pRaw, dataPos, &pObj->v16);
dataPos += sizeof(pObj->v16);
sdbGetRawInt32(pRaw, dataPos, &pObj->v32);
dataPos += sizeof(pObj->v32);
sdbGetRawInt64(pRaw, dataPos, &pObj->v64);
dataPos += sizeof(pObj->v64);
sdbGetRawBinary(pRaw, dataPos, pObj->vstr, sizeof(pObj->vstr));
dataPos += sizeof(pObj->vstr);
return pRow;
}
TEST_F(MndTestShow, 03_ShowMsg_Conn) {
char passwd[] = "taosdata";
char secretEncrypt[TSDB_PASSWORD_LEN] = {0};
taosEncryptPass_c((uint8_t*)passwd, strlen(passwd), secretEncrypt);
int32_t strInsert(SSdb *pSdb, SStrObj *pObj) { return 0; }
int32_t strDelete(SSdb *pSdb, SStrObj *pObj, bool callFunc) { return 0; }
SConnectReq connectReq = {0};
connectReq.pid = 1234;
strcpy(connectReq.app, "mnode_test_show");
strcpy(connectReq.db, "");
strcpy(connectReq.user, "root");
strcpy(connectReq.passwd, secretEncrypt);
int32_t strUpdate(SSdb *pSdb, SStrObj *pOld, SStrObj *pNew) {
pOld->v8 = pNew->v8;
pOld->v16 = pNew->v16;
pOld->v32 = pNew->v32;
pOld->v64 = pNew->v64;
strcpy(pOld->vstr, pNew->vstr);
return 0;
}
void strSetDefault(SStrObj *pObj, int32_t index) {
memset(pObj, 0, sizeof(SStrObj));
snprintf(pObj->key, sizeof(pObj->key), "k%d", index * 1000);
pObj->v8 = index;
pObj->v16 = index;
pObj->v32 = index * 1000;
pObj->v64 = index * 1000;
snprintf(pObj->vstr, sizeof(pObj->vstr), "v%d", index * 1000);
}
int32_t contLen = tSerializeSConnectReq(NULL, 0, &connectReq);
void* pReq = rpcMallocCont(contLen);
tSerializeSConnectReq(pReq, contLen, &connectReq);
int32_t strDefault(SMnode *pMnode) {
SStrObj strObj;
SSdbRaw *pRaw = NULL;
SRpcMsg* pRsp = test.SendReq(TDMT_MND_CONNECT, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, 0);
strSetDefault(&strObj, 1);
pRaw = strEncode(&strObj);
sdbSetRawStatus(pRaw, SDB_STATUS_READY);
if (sdbWrite(pMnode->pSdb, pRaw) != 0) return -1;
test.SendShowReq(TSDB_MGMT_TABLE_CONNS, "connections", "");
// EXPECT_EQ(test.GetShowRows(), 1);
strSetDefault(&strObj, 2);
pRaw = strEncode(&strObj);
sdbSetRawStatus(pRaw, SDB_STATUS_READY);
if (sdbWriteWithoutFree(pMnode->pSdb, pRaw) != 0) return -1;
sdbFreeRaw(pRaw);
return 0;
}
bool sdbTraverseSucc1(SMnode *pMnode, SStrObj *pObj, int32_t *p1, int32_t *p2, int32_t *p3) {
if (pObj->v8 == 1) {
*p1 = *p2 + *p3 + pObj->v8;
}
return true;
}
bool sdbTraverseSucc2(SMnode *pMnode, SStrObj *pObj, int32_t *p1, int32_t *p2, int32_t *p3) {
*p1 = *p2 + *p3 + pObj->v8;
return true;
}
TEST_F(MndTestShow, 04_ShowMsg_Cluster) {
test.SendShowReq(TSDB_MGMT_TABLE_CLUSTER, "cluster", "");
EXPECT_EQ(test.GetShowRows(), 1);
bool sdbTraverseFail(SMnode *pMnode, SStrObj *pObj, int32_t *p1, int32_t *p2, int32_t *p3) {
*p1 = *p2 + *p3;
return false;
}
TEST_F(MndTestSdb, 01_Write) {
void *pIter;
int32_t num;
SStrObj *pObj;
SMnode mnode;
SSdb *pSdb;
SSdbOpt opt = {0};
int32_t p1 = 0;
int32_t p2 = 111;
int32_t p3 = 222;
mnode.v100 = 100;
mnode.v200 = 200;
opt.pMnode = &mnode;
opt.path = "/tmp/mnode_test_sdb";
taosRemoveDir(opt.path);
SSdbTable strTable = {
.sdbType = SDB_USER,
.keyType = SDB_KEY_BINARY,
.deployFp = (SdbDeployFp)strDefault,
.encodeFp = (SdbEncodeFp)strEncode,
.decodeFp = (SdbDecodeFp)strDecode,
.insertFp = (SdbInsertFp)strInsert,
.updateFp = (SdbUpdateFp)strUpdate,
.deleteFp = (SdbDeleteFp)strDelete,
};
pSdb = sdbInit(&opt);
mnode.pSdb = pSdb;
ASSERT_NE(pSdb, nullptr);
ASSERT_EQ(sdbSetTable(pSdb, strTable), 0);
ASSERT_EQ(sdbDeploy(pSdb), 0);
#if 0
pObj = (SStrObj *)sdbAcquire(pSdb, SDB_USER, "k1000");
ASSERT_NE(pObj, nullptr);
EXPECT_STREQ(pObj->key, "k1000");
EXPECT_STREQ(pObj->vstr, "v1000");
EXPECT_EQ(pObj->v8, 1);
EXPECT_EQ(pObj->v16, 1);
EXPECT_EQ(pObj->v32, 1000);
EXPECT_EQ(pObj->v64, 1000);
sdbRelease(pSdb, pObj);
pObj = (SStrObj *)sdbAcquire(pSdb, SDB_USER, "k2000");
ASSERT_NE(pObj, nullptr);
EXPECT_STREQ(pObj->key, "k2000");
EXPECT_STREQ(pObj->vstr, "v2000");
EXPECT_EQ(pObj->v8, 2);
EXPECT_EQ(pObj->v16, 2);
EXPECT_EQ(pObj->v32, 2000);
EXPECT_EQ(pObj->v64, 2000);
sdbRelease(pSdb, pObj);
pObj = (SStrObj *)sdbAcquire(pSdb, SDB_USER, "k200");
ASSERT_EQ(pObj, nullptr);
pIter = NULL;
num = 0;
do {
pIter = sdbFetch(pSdb, SDB_USER, pIter, (void **)&pObj);
if (pIter == NULL) break;
ASSERT_NE(pObj, nullptr);
num++;
sdbRelease(pSdb, pObj);
} while (1);
EXPECT_EQ(num, 2);
do {
pIter = sdbFetch(pSdb, SDB_USER, pIter, (void **)&pObj);
if (pIter == NULL) break;
if (strcmp(pObj->key, "k1000") == 0) {
sdbCancelFetch(pSdb, pIter);
break;
}
} while (1);
EXPECT_STREQ(pObj->key, "k1000");
p1 = 0;
p2 = 111;
p3 = 222;
sdbTraverse(pSdb, SDB_USER, (sdbTraverseFp)sdbTraverseSucc2, &p1, &p2, &p3);
EXPECT_EQ(p1, 334);
p1 = 0;
p2 = 111;
p3 = 222;
sdbTraverse(pSdb, SDB_USER, (sdbTraverseFp)sdbTraverseSucc2, &p1, &p2, &p3);
EXPECT_EQ(p1, 669);
p1 = 0;
p2 = 111;
p3 = 222;
sdbTraverse(pSdb, SDB_USER, (sdbTraverseFp)sdbTraverseFail, &p1, &p2, &p3);
EXPECT_EQ(p1, 333);
EXPECT_EQ(sdbGetSize(pSdb, SDB_USER), 2);
EXPECT_EQ(sdbGetMaxId(pSdb, SDB_USER), -1);
EXPECT_EQ(sdbGetTableVer(pSdb, SDB_USER), 2);
EXPECT_EQ(sdbUpdateVer(pSdb, 0), 2);
EXPECT_EQ(sdbUpdateVer(pSdb, 1), 3);
EXPECT_EQ(sdbUpdateVer(pSdb, -1), 2);
// insert, call func
// update, call func
// delete, call func 2
// write version
// sdb Write ver
// sdbRead
#endif
ASSERT_EQ(sdbWriteFile(pSdb), 0);
sdbCleanup(pSdb);
}
TEST_F(MndTestSdb, 01_Read) {
void *pIter;
int32_t num;
SStrObj *pObj;
SMnode mnode;
SSdb *pSdb;
SSdbOpt opt = {0};
int32_t p1 = 0;
int32_t p2 = 111;
int32_t p3 = 222;
mnode.v100 = 100;
mnode.v200 = 200;
opt.pMnode = &mnode;
opt.path = "/tmp/mnode_test_sdb";
taosRemoveDir(opt.path);
SSdbTable strTable = {
.sdbType = SDB_USER,
.keyType = SDB_KEY_BINARY,
.deployFp = (SdbDeployFp)strDefault,
.encodeFp = (SdbEncodeFp)strEncode,
.decodeFp = (SdbDecodeFp)strDecode,
.insertFp = (SdbInsertFp)strInsert,
.updateFp = (SdbUpdateFp)strDelete,
.deleteFp = (SdbDeleteFp)strUpdate,
};
pSdb = sdbInit(&opt);
mnode.pSdb = pSdb;
ASSERT_EQ(sdbReadFile(pSdb), 0);
sdbCleanup(pSdb);
}
\ No newline at end of file
......@@ -52,6 +52,8 @@ typedef struct SSdbRow {
const char *sdbTableName(ESdbType type);
void sdbPrintOper(SSdb *pSdb, SSdbRow *pRow, const char *oper);
void sdbFreeRow(SSdb *pSdb, SSdbRow *pRow, bool callFunc);
#ifdef __cplusplus
}
#endif
......
......@@ -141,7 +141,7 @@ int32_t sdbSetTable(SSdb *pSdb, SSdbTable table) {
}
static int32_t sdbCreateDir(SSdb *pSdb) {
if (taosMkDir(pSdb->currDir) != 0) {
if (taosMulMkDir(pSdb->currDir) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
mError("failed to create dir:%s since %s", pSdb->currDir, terrstr());
return -1;
......
......@@ -61,7 +61,6 @@ int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg
pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
len = pMsg->contLen - sizeof(SMsgHead);
// todo: change the interface here
if (tqPushMsg(pVnode->pTq, pMsg->pCont, pMsg->contLen, pMsg->msgType, version) < 0) {
vError("vgId: %d failed to push msg to TQ since %s", TD_VID(pVnode), tstrerror(terrno));
return -1;
......
......@@ -1897,7 +1897,14 @@ SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput,
pCtx->functionId = pExpr->pExpr->_function.pFunctNode->funcId;
if (fmIsAggFunc(pCtx->functionId) || fmIsNonstandardSQLFunc(pCtx->functionId)) {
fmGetFuncExecFuncs(pCtx->functionId, &pCtx->fpSet);
bool isUdaf = fmIsUserDefinedFunc(pCtx->functionId);
if (!isUdaf) {
fmGetFuncExecFuncs(pCtx->functionId, &pCtx->fpSet);
} else {
char *udfName = pExpr->pExpr->_function.pFunctNode->functionName;
strncpy(pCtx->udfName, udfName, strlen(udfName));
fmGetUdafExecFuncs(pCtx->functionId, &pCtx->fpSet);
}
pCtx->fpSet.getEnv(pExpr->pExpr->_function.pFunctNode, &env);
} else {
fmGetScalarFuncExecFuncs(pCtx->functionId, &pCtx->sfp);
......
......@@ -124,7 +124,10 @@ int32_t fmGetFuncExecFuncs(int32_t funcId, SFuncExecFuncs* pFpSet) {
return TSDB_CODE_SUCCESS;
}
int32_t fmGetUdafExecFuncs(SFuncExecFuncs* pFpSet) {
int32_t fmGetUdafExecFuncs(int32_t funcId, SFuncExecFuncs* pFpSet) {
if (!fmIsUserDefinedFunc(funcId)) {
return TSDB_CODE_FAILED;
}
pFpSet->getEnv = udfAggGetEnv;
pFpSet->init = udfAggInit;
pFpSet->process = udfAggProcess;
......
......@@ -232,7 +232,7 @@ void udfdProcessRequest(uv_work_t *req) {
SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize),
.bufLen= udf->bufSize,
.numOfResult = 0};
udf->aggProcFunc(&input, &outBuf);
udf->aggProcFunc(&input, &call->interBuf, &outBuf);
subRsp->resultBuf = outBuf;
break;
......
......@@ -24,7 +24,7 @@ int32_t udf2_start(SUdfInterBuf *buf) {
return 0;
}
int32_t udf2(SUdfDataBlock* block, SUdfInterBuf *interBuf) {
int32_t udf2(SUdfDataBlock* block, SUdfInterBuf *interBuf, SUdfInterBuf *newInterBuf) {
int64_t sumSquares = *(int64_t*)interBuf->buf;
for (int32_t i = 0; i < block->numOfCols; ++i) {
for (int32_t j = 0; j < block->numOfRows; ++i) {
......@@ -35,10 +35,10 @@ int32_t udf2(SUdfDataBlock* block, SUdfInterBuf *interBuf) {
}
}
*(int64_t*)interBuf = sumSquares;
interBuf->bufLen = sizeof(int64_t);
*(int64_t*)newInterBuf = sumSquares;
newInterBuf->bufLen = sizeof(int64_t);
//TODO: if all null value, numOfResult = 0;
interBuf->numOfResult = 1;
newInterBuf->numOfResult = 1;
return 0;
}
......
......@@ -2659,7 +2659,7 @@ static int32_t translateCreateStream(STranslateContext* pCxt, SCreateStreamStmt*
if ('\0' != pStmt->targetTabName[0]) {
strcpy(name.dbname, pStmt->targetDbName);
strcpy(name.tname, pStmt->targetTabName);
tNameExtractFullName(&name, createReq.outputSTbName);
tNameExtractFullName(&name, createReq.targetStbFullName);
}
int32_t code = translateQuery(pCxt, pStmt->pQuery);
......
......@@ -72,8 +72,8 @@ void taosRemoveDir(const char *dirname) {
while ((de = taosReadDir(pDir)) != NULL) {
if (strcmp(taosGetDirEntryName(de), ".") == 0 || strcmp(taosGetDirEntryName(de), "..") == 0) continue;
char filename[1024];
snprintf(filename, sizeof(filename), "%s/%s", dirname, taosGetDirEntryName(de));
char filename[1024] = {0};
snprintf(filename, sizeof(filename), "%s%s%s", dirname, TD_DIRSEP, taosGetDirEntryName(de));
if (taosDirEntryIsDir(de)) {
taosRemoveDir(filename);
} else {
......
......@@ -81,7 +81,7 @@
./test.sh -f tsim/insert/backquote.sim -m
./test.sh -f tsim/parser/fourArithmetic-basic.sim -m
./test.sh -f tsim/query/interval-offset.sim -m
./test.sh -f tsim/tmq/basic1.sim -m
./test.sh -f tsim/tmq/basic3.sim -m
./test.sh -f tsim/stable/vnode3.sim -m
./test.sh -f tsim/qnode/basic1.sim -m
./test.sh -f tsim/mnode/basic1.sim -m
......
......@@ -344,8 +344,8 @@ sql drop database db
sql_error create database db PRECISION 'as'
sql_error create database db PRECISION -1
print ====> QUORUM value [1 | 2, default: 1]
#sql create database db QUORUM 2
print ====> QUORUM value [1 | 2, default: 1] 3.0 not support this item
#sql_error create database db QUORUM 2
#sql show databases
#print $data0_db $data1_db $data2_db $data3_db $data4_db $data5_db $data6_db $data7_db $data8_db $data9_db $data10_db $data11_db $data12_db $data13_db $data14_db $data15_db $data16_db $data17_db
#if $data5_db != 2 then
......@@ -360,9 +360,11 @@ print ====> QUORUM value [1 | 2, default: 1]
# return -1
#endi
#sql drop database db
#sql_error create database db QUORUM 3
#sql_error create database db QUORUM 0
#sql_error create database db QUORUM -1
sql_error create database db QUORUM 1
sql_error create database db QUORUM 2
sql_error create database db QUORUM 3
sql_error create database db QUORUM 0
sql_error create database db QUORUM -1
print ====> REPLICA value [1 | 3, default: 1]
sql create database db REPLICA 3
......
......@@ -57,12 +57,12 @@ def taos_command (buildPath, key, value, expectString, cfgDir, sqlString='', key
else:
return "TAOS_FAIL"
else:
if key == 'A' or key1 == 'A' or key == 'C' or key1 == 'C':
if key == 'A' or key1 == 'A' or key == 'C' or key1 == 'C' or key == 'V' or key1 == 'V':
return "TAOS_OK", retResult
else:
return "TAOS_OK"
else:
if key == 'A' or key1 == 'A' or key == 'C' or key1 == 'C':
if key == 'A' or key1 == 'A' or key == 'C' or key1 == 'C' or key == 'V' or key1 == 'V':
return "TAOS_OK", retResult
else:
return "TAOS_FAIL"
......@@ -311,7 +311,7 @@ class TDTestCase:
tdSql.query('drop database %s'%newDbName)
tdLog.printNoPrefix("================================ parameter: -C")
newDbName="dbcc"
#newDbName="dbcc"
retCode, retVal = taos_command(buildPath, "C", keyDict['C'], "buildinfo", keyDict['c'], '', '', '')
if retCode != "TAOS_OK":
tdLog.exit("taos -C fail")
......@@ -336,6 +336,86 @@ class TDTestCase:
if (totalCfgItem["numOfCores"][2] != count) and (totalCfgItem["numOfCores"][0] != 'default'):
tdLog.exit("taos -C return numOfCores error!")
version = totalCfgItem["version"][2]
tdLog.printNoPrefix("================================ parameter: -V")
#newDbName="dbvv"
retCode, retVal = taos_command(buildPath, "V", keyDict['V'], "", keyDict['c'], '', '', '')
if retCode != "TAOS_OK":
tdLog.exit("taos -V fail")
version = 'version: ' + version
retVal = retVal.replace("\n", "")
retVal = retVal.replace("\r", "")
if retVal != version:
print ("return version: [%s]"%retVal)
print ("dict version: [%s]"%version)
tdLog.exit("taos -V version not match")
tdLog.printNoPrefix("================================ parameter: -d")
newDbName="dbd"
sqlString = 'create database ' + newDbName + ';'
retCode = taos_command(buildPath, "d", keyDict['d'], "taos>", keyDict['c'], sqlString, '', '')
if retCode != "TAOS_OK":
tdLog.exit("taos -d %s fail"%(keyDict['d']))
else:
tdSql.query("show databases")
for i in range(tdSql.queryRows):
if tdSql.getData(i, 0) == newDbName:
break
else:
tdLog.exit("create db fail after taos -d %s fail"%(keyDict['d']))
tdSql.query('drop database %s'%newDbName)
retCode = taos_command(buildPath, "d", 'dbno', "taos>", keyDict['c'], sqlString, '', '')
if retCode != "TAOS_FAIL":
tdLog.exit("taos -d dbno fail")
tdLog.printNoPrefix("================================ parameter: -w")
newDbName="dbw"
keyDict['s'] = "\"create database " + newDbName + "\""
retCode = taos_command(buildPath, "s", keyDict['s'], "Query OK", keyDict['c'], '', '', '')
if retCode != "TAOS_OK":
tdLog.exit("taos -w fail")
keyDict['s'] = "\"create table " + newDbName + ".ntb (ts timestamp, c binary(128))\""
retCode = taos_command(buildPath, "s", keyDict['s'], "Query OK", keyDict['c'], '', '', '')
if retCode != "TAOS_OK":
tdLog.exit("taos -w create table fail")
keyDict['s'] = "\"insert into " + newDbName + ".ntb values('2021-04-01 08:00:00.001', 'abcd0123456789')('2021-04-01 08:00:00.002', 'abcd012345678901234567890123456789') \""
retCode = taos_command(buildPath, "s", keyDict['s'], "Query OK", keyDict['c'], '', '', '')
if retCode != "TAOS_OK":
tdLog.exit("taos -w insert data fail")
keyDict['s'] = "\"insert into " + newDbName + ".ntb values('2021-04-01 08:00:00.003', 'aaaaaaaaaaaaaaaaaaaa')('2021-04-01 08:00:01.004', 'bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb') \""
retCode = taos_command(buildPath, "s", keyDict['s'], "Query OK", keyDict['c'], '', '', '')
if retCode != "TAOS_OK":
tdLog.exit("taos -w insert data fail")
keyDict['s'] = "\"insert into " + newDbName + ".ntb values('2021-04-01 08:00:00.005', 'cccccccccccccccccccc')('2021-04-01 08:00:01.006', 'dddddddddddddddddddddddddddddddddddddddd') \""
retCode = taos_command(buildPath, "s", keyDict['s'], "Query OK", keyDict['c'], '', '', '')
if retCode != "TAOS_OK":
tdLog.exit("taos -w insert data fail")
keyDict['s'] = "\"select * from " + newDbName + ".ntb \""
retCode = taos_command(buildPath, "s", keyDict['s'], "aaaaaaaaaaaaaaaaaaaa", keyDict['c'], '', '', '')
if retCode != "TAOS_OK":
tdLog.exit("taos -w insert data fail")
keyDict['s'] = "\"select * from " + newDbName + ".ntb \""
retCode = taos_command(buildPath, "s", keyDict['s'], "dddddddddddddddddddddddddddddddddddddddd", keyDict['c'], '', '', '')
if retCode != "TAOS_FAIL":
tdLog.exit("taos -w insert data fail")
keyDict['s'] = "\"select * from " + newDbName + ".ntb \""
retCode = taos_command(buildPath, "s", keyDict['s'], "dddddddddddddddddddddddddddddddddddddddd", keyDict['c'], '', 'w', '60')
if retCode != "TAOS_OK":
tdLog.exit("taos -w insert data fail")
tdSql.query('drop database %s'%newDbName)
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册