提交 35fb7bbd 编写于 作者: H Hongze Cheng

Merge branch '3.0' of https://github.com/taosdata/TDengine into feat/tsdb_snapshot

......@@ -128,10 +128,12 @@ typedef struct SVnodeModifyLogicNode {
SVgDataBlocks* pVgDataBlocks;
SNode* pAffectedRows; // SColumnNode
uint64_t tableId;
uint64_t stableId;
int8_t tableType; // table type
char tableFName[TSDB_TABLE_FNAME_LEN];
STimeWindow deleteTimeRange;
SVgroupsInfo* pVgroupList;
SNodeList* pInsertCols;
} SVnodeModifyLogicNode;
typedef struct SExchangeLogicNode {
......@@ -459,7 +461,9 @@ typedef struct SDataInserterNode {
typedef struct SQueryInserterNode {
SDataSinkNode sink;
SNodeList* pCols;
uint64_t tableId;
uint64_t stableId;
int8_t tableType; // table type
char tableFName[TSDB_TABLE_FNAME_LEN];
int32_t vgId;
......
......@@ -215,6 +215,7 @@ int32_t syncProposeBatch(int64_t rid, SRpcMsg* pMsgArr, bool* pIsWeakArr, in
bool syncEnvIsStart();
const char* syncStr(ESyncState state);
bool syncIsRestoreFinish(int64_t rid);
int32_t syncGetSnapshotByIndex(int64_t rid, SyncIndex index, SSnapshot* pSnapshot);
int32_t syncReconfig(int64_t rid, const SSyncCfg* pNewCfg);
......
......@@ -27,7 +27,7 @@ extern "C" {
#define TAOS_CONN_SERVER 0
#define TAOS_CONN_CLIENT 1
#define IsReq(pMsg) (pMsg->msgType & 1U)
#define IsReq(pMsg) (pMsg->msgType & 1U)
extern int32_t tsRpcHeadSize;
......@@ -35,12 +35,13 @@ typedef struct {
uint32_t clientIp;
uint16_t clientPort;
int64_t applyIndex;
uint64_t applyTerm;
char user[TSDB_USER_LEN];
} SRpcConnInfo;
typedef struct SRpcHandleInfo {
// rpc info
void * handle; // rpc handle returned to app
void *handle; // rpc handle returned to app
int64_t refId; // refid, used by server
int32_t noResp; // has response or not(default 0, 0: resp, 1: no resp);
int32_t persistHandle; // persist handle or not
......@@ -53,7 +54,7 @@ typedef struct SRpcHandleInfo {
void *node; // node mgmt handle
// resp info
void * rsp;
void *rsp;
int32_t rspLen;
// conn info
......@@ -62,7 +63,7 @@ typedef struct SRpcHandleInfo {
typedef struct SRpcMsg {
tmsg_t msgType;
void * pCont;
void *pCont;
int32_t contLen;
int32_t code;
SRpcHandleInfo info;
......@@ -74,7 +75,7 @@ typedef bool (*RpcRfp)(int32_t code, tmsg_t msgType);
typedef struct SRpcInit {
char localFqdn[TSDB_FQDN_LEN];
uint16_t localPort; // local port
char * label; // for debug purpose
char *label; // for debug purpose
int32_t numOfThreads; // number of threads to handle connections
int32_t sessions; // number of sessions allowed
int8_t connType; // TAOS_CONN_UDP, TAOS_CONN_TCPC, TAOS_CONN_TCPS
......@@ -99,12 +100,12 @@ typedef struct {
typedef struct {
int32_t msgType;
void * val;
void *val;
int32_t (*clone)(void *src, void **dst);
} SRpcBrokenlinkVal;
typedef struct {
SHashObj * args;
SHashObj *args;
SRpcBrokenlinkVal brokenVal;
void (*freeFunc)(const void *arg);
} SRpcCtx;
......
......@@ -234,17 +234,14 @@ int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
}
#if 1
char *syncNodeStr = sync2SimpleStr(pVnode->sync);
static int64_t vndTick = 0;
if (++vndTick % 10 == 1) {
vGTrace("vgId:%d, sync trace msg:%s, %s", syncGetVgId(pVnode->sync), TMSG_INFO(pMsg->msgType), syncNodeStr);
}
if (gRaftDetailLog) {
char logBuf[512] = {0};
snprintf(logBuf, sizeof(logBuf), "vnode process syncmsg, msgType:%d, syncNode:%s", pMsg->msgType, syncNodeStr);
syncRpcMsgLog2(logBuf, pMsg);
}
taosMemoryFree(syncNodeStr);
do {
char *syncNodeStr = sync2SimpleStr(pVnode->sync);
static int64_t vndTick = 0;
if (++vndTick % 10 == 1) {
vGTrace("vgId:%d, sync trace msg:%s, %s", syncGetVgId(pVnode->sync), TMSG_INFO(pMsg->msgType), syncNodeStr);
}
taosMemoryFree(syncNodeStr);
} while (0);
#endif
if (syncNodeStrategy(pSyncNode) == SYNC_STRATEGY_NO_SNAPSHOT) {
......@@ -297,7 +294,8 @@ int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
vGError("vgId:%d, msg:%p failed to process since error msg type:%d", pVnode->config.vgId, pMsg->msgType);
code = -1;
}
} else {
} else if (syncNodeStrategy(pSyncNode) == SYNC_STRATEGY_WAL_FIRST) {
// use wal first strategy
if (pMsg->msgType == TDMT_SYNC_TIMEOUT) {
SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pMsg);
......@@ -403,43 +401,53 @@ static void vnodeSyncReconfig(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SReCon
}
static void vnodeSyncCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
SVnode *pVnode = pFsm->data;
SSnapshot snapshot = {0};
SyncIndex beginIndex = SYNC_INDEX_INVALID;
char logBuf[256] = {0};
snprintf(logBuf, sizeof(logBuf),
"commitCb execute, pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s, beginIndex :%ld\n", pFsm,
cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state), beginIndex);
syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg);
SVnode *pVnode = pFsm->data;
vTrace("vgId:%d, commit-cb is excuted, fsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s, msgtype:%d %s",
syncGetVgId(pVnode->sync), pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state,
syncUtilState2String(cbMeta.state), pMsg->msgType, TMSG_INFO(pMsg->msgType));
SRpcMsg rpcMsg = {.msgType = pMsg->msgType, .contLen = pMsg->contLen};
rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen);
memcpy(rpcMsg.pCont, pMsg->pCont, pMsg->contLen);
syncGetAndDelRespRpc(pVnode->sync, cbMeta.seqNum, &rpcMsg.info);
rpcMsg.info.conn.applyIndex = cbMeta.index;
rpcMsg.info.conn.applyTerm = cbMeta.term;
tmsgPutToQueue(&pVnode->msgCb, APPLY_QUEUE, &rpcMsg);
}
static void vnodeSyncPreCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
char logBuf[256] = {0};
snprintf(logBuf, sizeof(logBuf), "preCommitCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n", pFsm,
cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state));
syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg);
SVnode *pVnode = pFsm->data;
vTrace("vgId:%d, pre-commit-cb is excuted, fsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s, msgtype:%d %s",
syncGetVgId(pVnode->sync), pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state,
syncUtilState2String(cbMeta.state), pMsg->msgType, TMSG_INFO(pMsg->msgType));
}
static void vnodeSyncRollBackMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
char logBuf[256] = {0};
snprintf(logBuf, sizeof(logBuf), "rollBackCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n", pFsm,
cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state));
syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg);
SVnode *pVnode = pFsm->data;
vTrace("vgId:%d, rollback-cb is excuted, fsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s, msgtype:%d %s",
syncGetVgId(pVnode->sync), pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state,
syncUtilState2String(cbMeta.state), pMsg->msgType, TMSG_INFO(pMsg->msgType));
}
static int32_t vnodeSnapshotStartRead(struct SSyncFSM *pFsm, void *pParam, void **ppReader) { return 0; }
static int32_t vnodeSnapshotStartRead(struct SSyncFSM *pFsm, void *pParam, void **ppReader) {
SVnode *pVnode = pFsm->data;
SSnapshotParam *pSnapshotParam = pParam;
int32_t code =
vnodeSnapshotReaderOpen(pVnode, (SVSnapshotReader **)ppReader, pSnapshotParam->start, pSnapshotParam->end);
return code;
}
static int32_t vnodeSnapshotStopRead(struct SSyncFSM *pFsm, void *pReader) { return 0; }
static int32_t vnodeSnapshotStopRead(struct SSyncFSM *pFsm, void *pReader) {
SVnode *pVnode = pFsm->data;
int32_t code = vnodeSnapshotReaderClose(pReader);
return code;
}
static int32_t vnodeSnapshotDoRead(struct SSyncFSM *pFsm, void *pReader, void **ppBuf, int32_t *len) { return 0; }
static int32_t vnodeSnapshotDoRead(struct SSyncFSM *pFsm, void *pReader, void **ppBuf, int32_t *len) {
SVnode *pVnode = pFsm->data;
int32_t code = vnodeSnapshotRead(pReader, (const void **)ppBuf, len);
return code;
}
static int32_t vnodeSnapshotStartWrite(struct SSyncFSM *pFsm, void *pParam, void **ppWriter) { return 0; }
......
......@@ -394,10 +394,12 @@ static int32_t logicVnodeModifCopy(const SVnodeModifyLogicNode* pSrc, SVnodeModi
COPY_SCALAR_FIELD(msgType);
CLONE_NODE_FIELD(pAffectedRows);
COPY_SCALAR_FIELD(tableId);
COPY_SCALAR_FIELD(stableId);
COPY_SCALAR_FIELD(tableType);
COPY_CHAR_ARRAY_FIELD(tableFName);
COPY_OBJECT_FIELD(deleteTimeRange, sizeof(STimeWindow));
CLONE_OBJECT_FIELD(pVgroupList, vgroupsInfoClone);
CLONE_NODE_LIST_FIELD(pInsertCols);
return TSDB_CODE_SUCCESS;
}
......
......@@ -2214,6 +2214,8 @@ static int32_t physiDispatchNodeToJson(const void* pObj, SJson* pJson) { return
static int32_t jsonToPhysiDispatchNode(const SJson* pJson, void* pObj) { return jsonToPhysicDataSinkNode(pJson, pObj); }
static const char* jkQueryInsertPhysiPlanInsertCols = "InsertCols";
static const char* jkQueryInsertPhysiPlanStableId = "StableId";
static const char* jkQueryInsertPhysiPlanTableId = "TableId";
static const char* jkQueryInsertPhysiPlanTableType = "TableType";
static const char* jkQueryInsertPhysiPlanTableFName = "TableFName";
......@@ -2224,6 +2226,12 @@ static int32_t physiQueryInsertNodeToJson(const void* pObj, SJson* pJson) {
const SQueryInserterNode* pNode = (const SQueryInserterNode*)pObj;
int32_t code = physicDataSinkNodeToJson(pObj, pJson);
if (TSDB_CODE_SUCCESS == code) {
code = nodeListToJson(pJson, jkQueryInsertPhysiPlanInsertCols, pNode->pCols);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkQueryInsertPhysiPlanStableId, pNode->stableId);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkQueryInsertPhysiPlanTableId, pNode->tableId);
}
......@@ -2247,6 +2255,12 @@ static int32_t jsonToPhysiQueryInsertNode(const SJson* pJson, void* pObj) {
SQueryInserterNode* pNode = (SQueryInserterNode*)pObj;
int32_t code = jsonToPhysicDataSinkNode(pJson, pObj);
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeList(pJson, jkQueryInsertPhysiPlanInsertCols, &pNode->pCols);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetUBigIntValue(pJson, jkQueryInsertPhysiPlanStableId, &pNode->stableId);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetUBigIntValue(pJson, jkQueryInsertPhysiPlanTableId, &pNode->tableId);
}
......
......@@ -1681,5 +1681,10 @@ SNode* createInsertStmt(SAstCreateContext* pCxt, SNode* pTable, SNodeList* pCols
pStmt->pTable = pTable;
pStmt->pCols = pCols;
pStmt->pQuery = pQuery;
if (QUERY_NODE_SELECT_STMT == nodeType(pQuery)) {
strcpy(((SSelectStmt*)pQuery)->stmtName, ((STableNode*)pTable)->tableAlias);
} else if (QUERY_NODE_SET_OPERATOR == nodeType(pQuery)) {
strcpy(((SSetOperator*)pQuery)->stmtName, ((STableNode*)pTable)->tableAlias);
}
return (SNode*)pStmt;
}
......@@ -2839,17 +2839,87 @@ static int32_t translateDelete(STranslateContext* pCxt, SDeleteStmt* pDelete) {
return code;
}
static int32_t translateInsertCols(STranslateContext* pCxt, SInsertStmt* pInsert) {
if (NULL == pInsert->pCols) {
return createAllColumns(pCxt, false, &pInsert->pCols);
}
return translateExprList(pCxt, pInsert->pCols);
}
static int32_t translateInsertQuery(STranslateContext* pCxt, SInsertStmt* pInsert) {
int32_t code = resetTranslateNamespace(pCxt);
if (TSDB_CODE_SUCCESS == code) {
code = translateQuery(pCxt, pInsert->pQuery);
}
return code;
}
static int32_t addOrderByPrimaryKeyToQueryImpl(STranslateContext* pCxt, SNode* pPrimaryKeyExpr,
SNodeList** pOrderByList) {
SOrderByExprNode* pOrderByExpr = (SOrderByExprNode*)nodesMakeNode(QUERY_NODE_ORDER_BY_EXPR);
if (NULL == pOrderByExpr) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pOrderByExpr->nullOrder = NULL_ORDER_FIRST;
pOrderByExpr->order = ORDER_ASC;
pOrderByExpr->pExpr = nodesCloneNode(pPrimaryKeyExpr);
if (NULL == pOrderByExpr->pExpr) {
nodesDestroyNode((SNode*)pOrderByExpr);
return TSDB_CODE_OUT_OF_MEMORY;
}
((SExprNode*)pOrderByExpr->pExpr)->orderAlias = true;
NODES_DESTORY_LIST(*pOrderByList);
return nodesListMakeStrictAppend(pOrderByList, (SNode*)pOrderByExpr);
}
static int32_t addOrderByPrimaryKeyToQuery(STranslateContext* pCxt, SNode* pPrimaryKeyExpr, SNode* pStmt) {
if (QUERY_NODE_SELECT_STMT == nodeType(pStmt)) {
return addOrderByPrimaryKeyToQueryImpl(pCxt, pPrimaryKeyExpr, &((SSelectStmt*)pStmt)->pOrderByList);
}
return addOrderByPrimaryKeyToQueryImpl(pCxt, pPrimaryKeyExpr, &((SSetOperator*)pStmt)->pOrderByList);
}
static int32_t translateInsertProject(STranslateContext* pCxt, SInsertStmt* pInsert) {
SNodeList* pProjects = getProjectList(pInsert->pQuery);
if (LIST_LENGTH(pInsert->pCols) != LIST_LENGTH(pProjects)) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_COLUMNS_NUM, "Illegal number of columns");
}
SNode* pPrimaryKeyExpr = NULL;
SNode* pBoundCol = NULL;
SNode* pProj = NULL;
FORBOTH(pBoundCol, pInsert->pCols, pProj, pProjects) {
SColumnNode* pCol = (SColumnNode*)pBoundCol;
SExprNode* pExpr = (SExprNode*)pProj;
if (!dataTypeEqual(&pCol->node.resType, &pExpr->resType)) {
SNode* pFunc = NULL;
int32_t code = createCastFunc(pCxt, pProj, pCol->node.resType, &pFunc);
if (TSDB_CODE_SUCCESS != code) {
return code;
}
REPLACE_LIST2_NODE(pFunc);
pExpr = (SExprNode*)pFunc;
}
snprintf(pExpr->aliasName, sizeof(pExpr->aliasName), "%s", pCol->colName);
if (PRIMARYKEY_TIMESTAMP_COL_ID == pCol->colId) {
pPrimaryKeyExpr = pProj;
}
}
return addOrderByPrimaryKeyToQuery(pCxt, pPrimaryKeyExpr, pInsert->pQuery);
}
static int32_t translateInsert(STranslateContext* pCxt, SInsertStmt* pInsert) {
pCxt->pCurrStmt = (SNode*)pInsert;
int32_t code = translateFrom(pCxt, pInsert->pTable);
if (TSDB_CODE_SUCCESS == code) {
code = translateExprList(pCxt, pInsert->pCols);
code = translateInsertCols(pCxt, pInsert);
}
if (TSDB_CODE_SUCCESS == code) {
code = resetTranslateNamespace(pCxt);
code = translateInsertQuery(pCxt, pInsert);
}
if (TSDB_CODE_SUCCESS == code) {
code = translateQuery(pCxt, pInsert->pQuery);
code = translateInsertProject(pCxt, pInsert);
}
return code;
}
......
......@@ -1281,10 +1281,16 @@ static int32_t createVnodeModifLogicNodeByInsert(SLogicPlanContext* pCxt, SInser
pModify->modifyType = MODIFY_TABLE_TYPE_INSERT;
pModify->tableId = pRealTable->pMeta->uid;
pModify->stableId = pRealTable->pMeta->suid;
pModify->tableType = pRealTable->pMeta->tableType;
snprintf(pModify->tableFName, sizeof(pModify->tableFName), "%d.%s.%s", pCxt->pPlanCxt->acctId,
pRealTable->table.dbName, pRealTable->table.tableName);
TSWAP(pModify->pVgroupList, pRealTable->pVgroupList);
pModify->pInsertCols = nodesCloneList(pInsert->pCols);
if (NULL == pModify->pInsertCols) {
nodesDestroyNode((SNode*)pModify);
return TSDB_CODE_OUT_OF_MEMORY;
}
*pLogicNode = (SLogicNode*)pModify;
return TSDB_CODE_SUCCESS;
......
......@@ -1510,18 +1510,21 @@ static int32_t createQueryInserter(SPhysiPlanContext* pCxt, SVnodeModifyLogicNod
}
pInserter->tableId = pModify->tableId;
pInserter->stableId = pModify->stableId;
pInserter->tableType = pModify->tableType;
strcpy(pInserter->tableFName, pModify->tableFName);
pInserter->vgId = pModify->pVgroupList->vgroups[0].vgId;
pInserter->epSet = pModify->pVgroupList->vgroups[0].epSet;
vgroupInfoToNodeAddr(pModify->pVgroupList->vgroups, &pSubplan->execNode);
int32_t code = TSDB_CODE_SUCCESS;
pInserter->sink.pInputDataBlockDesc =
(SDataBlockDescNode*)nodesCloneNode((SNode*)pSubplan->pNode->pOutputDataBlockDesc);
if (NULL == pInserter->sink.pInputDataBlockDesc) {
code = TSDB_CODE_OUT_OF_MEMORY;
int32_t code = setListSlotId(pCxt, pSubplan->pNode->pOutputDataBlockDesc->dataBlockId, -1, pModify->pInsertCols,
&pInserter->pCols);
if (TSDB_CODE_SUCCESS == code) {
pInserter->sink.pInputDataBlockDesc =
(SDataBlockDescNode*)nodesCloneNode((SNode*)pSubplan->pNode->pOutputDataBlockDesc);
if (NULL == pInserter->sink.pInputDataBlockDesc) {
code = TSDB_CODE_OUT_OF_MEMORY;
}
}
if (TSDB_CODE_SUCCESS == code) {
......@@ -1530,7 +1533,7 @@ static int32_t createQueryInserter(SPhysiPlanContext* pCxt, SVnodeModifyLogicNod
nodesDestroyNode((SNode*)pInserter);
}
return TSDB_CODE_SUCCESS;
return code;
}
static int32_t buildInsertSelectSubplan(SPhysiPlanContext* pCxt, SVnodeModifyLogicNode* pModify, SSubplan* pSubplan) {
......
......@@ -96,4 +96,8 @@ TEST_F(PlanOtherTest, insert) {
useDb("root", "test");
run("INSERT INTO t1 SELECT * FROM t1");
run("INSERT INTO t1 (ts, c1, c2) SELECT ts, c1, c2 FROM st1");
run("INSERT INTO t1 (ts, c1, c2) SELECT ts, c1, c2 FROM st1s1 UNION ALL SELECT ts, c1, c2 FROM st2");
}
......@@ -36,6 +36,7 @@ typedef struct SRaftCfg {
TdFilePtr pFile;
char path[TSDB_FILENAME_LEN * 2];
int8_t isStandBy;
int32_t batchSize;
int8_t snapshotStrategy;
SyncIndex lastConfigIndex;
......@@ -49,19 +50,20 @@ int32_t raftCfgClose(SRaftCfg *pRaftCfg);
int32_t raftCfgPersist(SRaftCfg *pRaftCfg);
int32_t raftCfgAddConfigIndex(SRaftCfg *pRaftCfg, SyncIndex configIndex);
cJSON *syncCfg2Json(SSyncCfg *pSyncCfg);
char *syncCfg2Str(SSyncCfg *pSyncCfg);
char *syncCfg2SimpleStr(SSyncCfg *pSyncCfg);
cJSON * syncCfg2Json(SSyncCfg *pSyncCfg);
char * syncCfg2Str(SSyncCfg *pSyncCfg);
char * syncCfg2SimpleStr(SSyncCfg *pSyncCfg);
int32_t syncCfgFromJson(const cJSON *pRoot, SSyncCfg *pSyncCfg);
int32_t syncCfgFromStr(const char *s, SSyncCfg *pSyncCfg);
cJSON *raftCfg2Json(SRaftCfg *pRaftCfg);
char *raftCfg2Str(SRaftCfg *pRaftCfg);
cJSON * raftCfg2Json(SRaftCfg *pRaftCfg);
char * raftCfg2Str(SRaftCfg *pRaftCfg);
int32_t raftCfgFromJson(const cJSON *pRoot, SRaftCfg *pRaftCfg);
int32_t raftCfgFromStr(const char *s, SRaftCfg *pRaftCfg);
typedef struct SRaftCfgMeta {
int8_t isStandBy;
int32_t batchSize;
int8_t snapshotStrategy;
SyncIndex lastConfigIndex;
} SRaftCfgMeta;
......
......@@ -40,14 +40,14 @@ typedef struct SSyncSnapshotSender {
bool start;
int32_t seq;
int32_t ack;
void *pReader;
void *pCurrentBlock;
void * pReader;
void * pCurrentBlock;
int32_t blockLen;
SSnapshotParam snapshotParam;
SSnapshot snapshot;
SSyncCfg lastConfig;
int64_t sendingMS;
SSyncNode *pSyncNode;
SSyncNode * pSyncNode;
int32_t replicaIndex;
SyncTerm term;
SyncTerm privateTerm;
......@@ -64,20 +64,20 @@ int32_t snapshotSend(SSyncSnapshotSender *pSender);
int32_t snapshotReSend(SSyncSnapshotSender *pSender);
cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender);
char *snapshotSender2Str(SSyncSnapshotSender *pSender);
char *snapshotSender2SimpleStr(SSyncSnapshotSender *pSender, char *event);
char * snapshotSender2Str(SSyncSnapshotSender *pSender);
char * snapshotSender2SimpleStr(SSyncSnapshotSender *pSender, char *event);
//---------------------------------------------------
typedef struct SSyncSnapshotReceiver {
bool start;
int32_t ack;
void *pWriter;
void * pWriter;
SyncTerm term;
SyncTerm privateTerm;
SSnapshotParam snapshotParam;
SSnapshot snapshot;
SRaftId fromId;
SSyncNode *pSyncNode;
SSyncNode * pSyncNode;
} SSyncSnapshotReceiver;
......@@ -88,8 +88,8 @@ int32_t snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver);
bool snapshotReceiverIsStart(SSyncSnapshotReceiver *pReceiver);
cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver);
char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver);
char *snapshotReceiver2SimpleStr(SSyncSnapshotReceiver *pReceiver, char *event);
char * snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver);
char * snapshotReceiver2SimpleStr(SSyncSnapshotReceiver *pReceiver, char *event);
//---------------------------------------------------
// on message
......
......@@ -834,7 +834,7 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc
//
// operation:
// if hasAppendEntries && pMsg->prevLogIndex == ths->commitIndex, append entry
// match my-commit-index or my-commit-index + 1
// match my-commit-index or my-commit-index + batchSize
do {
bool condition = (pMsg->term == ths->pRaftStore->currentTerm) && (ths->state == TAOS_SYNC_STATE_FOLLOWER) &&
(pMsg->prevLogIndex <= ths->commitIndex);
......@@ -928,11 +928,13 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc
bool condition = condition1 || condition2;
if (condition) {
char logBuf[128];
snprintf(logBuf, sizeof(logBuf),
"recv sync-append-entries-batch, not match, pre-index:%ld, pre-term:%lu, datalen:%d", pMsg->prevLogIndex,
pMsg->prevLogTerm, pMsg->dataLen);
syncNodeEventLog(ths, logBuf);
do {
char logBuf[128];
snprintf(logBuf, sizeof(logBuf),
"recv sync-append-entries-batch, not match, pre-index:%ld, pre-term:%lu, datalen:%d",
pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->dataLen);
syncNodeEventLog(ths, logBuf);
} while (0);
// prepare response msg
SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild(ths->vgId);
......
......@@ -109,19 +109,30 @@ int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* p
}
// only start once
static void syncNodeStartSnapshot(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, SyncTerm lastApplyTerm,
SyncAppendEntriesReply* pMsg) {
static void syncNodeStartSnapshotOnce(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, SyncTerm lastApplyTerm,
SyncAppendEntriesReply* pMsg) {
// get sender
SSyncSnapshotSender* pSender = syncNodeGetSnapshotSender(ths, &(pMsg->srcId));
ASSERT(pSender != NULL);
if (snapshotSenderIsStart(pSender)) {
do {
char* eventLog = snapshotSender2SimpleStr(pSender, "snapshot sender already start");
syncNodeErrorLog(ths, eventLog);
taosMemoryFree(eventLog);
} while (0);
return;
}
SSnapshot snapshot = {
.data = NULL, .lastApplyIndex = endIndex, .lastApplyTerm = lastApplyTerm, .lastConfigIndex = SYNC_INDEX_INVALID};
void* pReader = NULL;
SSnapshotParam readerParam = {.start = beginIndex, .end = endIndex};
ths->pFsm->FpSnapshotStartRead(ths->pFsm, &readerParam, &pReader);
if (!snapshotSenderIsStart(pSender) && pMsg->privateTerm < pSender->privateTerm) {
int32_t code = ths->pFsm->FpSnapshotStartRead(ths->pFsm, &readerParam, &pReader);
ASSERT(code == 0);
if (pMsg->privateTerm < pSender->privateTerm) {
ASSERT(pReader != NULL);
snapshotSenderStart(pSender, readerParam, snapshot, pReader);
......@@ -178,7 +189,9 @@ int32_t syncNodeOnAppendEntriesReplySnapshot2Cb(SSyncNode* ths, SyncAppendEntrie
// start snapshot <match+1, old snapshot.end>
SSnapshot oldSnapshot;
ths->pFsm->FpGetSnapshotInfo(ths->pFsm, &oldSnapshot);
syncNodeStartSnapshot(ths, newMatchIndex + 1, oldSnapshot.lastApplyIndex, oldSnapshot.lastApplyTerm, pMsg);
ASSERT(oldSnapshot.lastApplyIndex >= newMatchIndex + 1);
syncNodeStartSnapshotOnce(ths, newMatchIndex + 1, oldSnapshot.lastApplyIndex, oldSnapshot.lastApplyTerm,
pMsg); // term maybe not ok?
syncIndexMgrSetIndex(ths->pNextIndex, &(pMsg->srcId), oldSnapshot.lastApplyIndex + 1);
syncIndexMgrSetIndex(ths->pMatchIndex, &(pMsg->srcId), newMatchIndex);
......@@ -187,7 +200,6 @@ int32_t syncNodeOnAppendEntriesReplySnapshot2Cb(SSyncNode* ths, SyncAppendEntrie
} else {
SyncIndex nextIndex = syncIndexMgrGetIndex(ths->pNextIndex, &(pMsg->srcId));
// notice! int64, uint64
if (nextIndex > SYNC_INDEX_BEGIN) {
--nextIndex;
......@@ -198,7 +210,7 @@ int32_t syncNodeOnAppendEntriesReplySnapshot2Cb(SSyncNode* ths, SyncAppendEntrie
SSyncRaftEntry* pEntry;
int32_t code = ths->pLogStore->syncLogGetEntry(ths->pLogStore, nextIndex, &pEntry);
ASSERT(code == 0);
syncNodeStartSnapshot(ths, SYNC_INDEX_BEGIN, nextIndex, pEntry->term, pMsg);
syncNodeStartSnapshotOnce(ths, SYNC_INDEX_BEGIN, nextIndex, pEntry->term, pMsg);
// get sender
SSyncSnapshotSender* pSender = syncNodeGetSnapshotSender(ths, &(pMsg->srcId));
......
......@@ -397,6 +397,38 @@ bool syncIsRestoreFinish(int64_t rid) {
return b;
}
int32_t syncGetSnapshotByIndex(int64_t rid, SyncIndex index, SSnapshot* pSnapshot) {
if (index < SYNC_INDEX_BEGIN) {
return -1;
}
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
if (pSyncNode == NULL) {
return -1;
}
ASSERT(rid == pSyncNode->rid);
SSyncRaftEntry* pEntry = NULL;
int32_t code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, index, &pEntry);
if (code != 0) {
if (pEntry != NULL) {
syncEntryDestory(pEntry);
}
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
return -1;
}
ASSERT(pEntry != NULL);
pSnapshot->data = NULL;
pSnapshot->lastApplyIndex = index;
pSnapshot->lastApplyTerm = pEntry->term;
pSnapshot->lastConfigIndex = syncNodeGetSnapshotConfigIndex(pSyncNode, index);
syncEntryDestory(pEntry);
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
return 0;
}
int32_t syncGetSnapshotMeta(int64_t rid, struct SSnapshotMeta* sMeta) {
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
if (pSyncNode == NULL) {
......@@ -786,6 +818,7 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak) {
int32_t code = syncNodeOnClientRequestCb(pSyncNode, pSyncMsg, &retIndex);
if (code == 0) {
pMsg->info.conn.applyIndex = retIndex;
pMsg->info.conn.applyTerm = pSyncNode->pRaftStore->currentTerm;
rpcFreeCont(rpcMsg.pCont);
syncRespMgrDel(pSyncNode->pSyncRespMgr, seqNum);
ret = 1;
......@@ -846,6 +879,7 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) {
meta.isStandBy = pSyncInfo->isStandBy;
meta.snapshotStrategy = pSyncInfo->snapshotStrategy;
meta.lastConfigIndex = SYNC_INDEX_INVALID;
meta.batchSize = pSyncInfo->batchSize;
ret = raftCfgCreateFile((SSyncCfg*)&(pSyncInfo->syncCfg), meta, pSyncNode->configPath);
ASSERT(ret == 0);
......
......@@ -101,7 +101,7 @@ cJSON *syncCfg2Json(SSyncCfg *pSyncCfg) {
char *syncCfg2Str(SSyncCfg *pSyncCfg) {
cJSON *pJson = syncCfg2Json(pSyncCfg);
char *serialized = cJSON_Print(pJson);
char * serialized = cJSON_Print(pJson);
cJSON_Delete(pJson);
return serialized;
}
......@@ -109,7 +109,7 @@ char *syncCfg2Str(SSyncCfg *pSyncCfg) {
char *syncCfg2SimpleStr(SSyncCfg *pSyncCfg) {
if (pSyncCfg != NULL) {
int32_t len = 512;
char *s = taosMemoryMalloc(len);
char * s = taosMemoryMalloc(len);
memset(s, 0, len);
snprintf(s, len, "{replica-num:%d, my-index:%d, ", pSyncCfg->replicaNum, pSyncCfg->myIndex);
......@@ -183,6 +183,7 @@ cJSON *raftCfg2Json(SRaftCfg *pRaftCfg) {
cJSON_AddItemToObject(pRoot, "SSyncCfg", syncCfg2Json(&(pRaftCfg->cfg)));
cJSON_AddNumberToObject(pRoot, "isStandBy", pRaftCfg->isStandBy);
cJSON_AddNumberToObject(pRoot, "snapshotStrategy", pRaftCfg->snapshotStrategy);
cJSON_AddNumberToObject(pRoot, "batchSize", pRaftCfg->batchSize);
char buf64[128];
snprintf(buf64, sizeof(buf64), "%ld", pRaftCfg->lastConfigIndex);
......@@ -205,7 +206,7 @@ cJSON *raftCfg2Json(SRaftCfg *pRaftCfg) {
char *raftCfg2Str(SRaftCfg *pRaftCfg) {
cJSON *pJson = raftCfg2Json(pRaftCfg);
char *serialized = cJSON_Print(pJson);
char * serialized = cJSON_Print(pJson);
cJSON_Delete(pJson);
return serialized;
}
......@@ -228,6 +229,7 @@ int32_t raftCfgCreateFile(SSyncCfg *pCfg, SRaftCfgMeta meta, const char *path) {
SRaftCfg raftCfg;
raftCfg.cfg = *pCfg;
raftCfg.isStandBy = meta.isStandBy;
raftCfg.batchSize = meta.batchSize;
raftCfg.snapshotStrategy = meta.snapshotStrategy;
raftCfg.lastConfigIndex = meta.lastConfigIndex;
raftCfg.configIndexCount = 1;
......@@ -257,6 +259,9 @@ int32_t raftCfgFromJson(const cJSON *pRoot, SRaftCfg *pRaftCfg) {
cJSON *pJsonIsStandBy = cJSON_GetObjectItem(pJson, "isStandBy");
pRaftCfg->isStandBy = cJSON_GetNumberValue(pJsonIsStandBy);
cJSON *pJsonBatchSize = cJSON_GetObjectItem(pJson, "batchSize");
pRaftCfg->batchSize = cJSON_GetNumberValue(pJsonBatchSize);
cJSON *pJsonSnapshotStrategy = cJSON_GetObjectItem(pJson, "snapshotStrategy");
pRaftCfg->snapshotStrategy = cJSON_GetNumberValue(pJsonSnapshotStrategy);
......@@ -280,7 +285,7 @@ int32_t raftCfgFromJson(const cJSON *pRoot, SRaftCfg *pRaftCfg) {
(pRaftCfg->configIndexArr)[i] = atoll(pIndex->valuestring);
}
cJSON *pJsonSyncCfg = cJSON_GetObjectItem(pJson, "SSyncCfg");
cJSON * pJsonSyncCfg = cJSON_GetObjectItem(pJson, "SSyncCfg");
int32_t code = syncCfgFromJson(pJsonSyncCfg, &(pRaftCfg->cfg));
ASSERT(code == 0);
......
......@@ -127,7 +127,7 @@ void syncRespCleanByTTL(SSyncRespMgr *pObj, int64_t ttl) {
while (pStub) {
size_t len;
void *key = taosHashGetKey(pStub, &len);
void * key = taosHashGetKey(pStub, &len);
SyncIndex *pIndex = (SyncIndex *)key;
int64_t nowMS = taosGetTimestampMs();
......
......@@ -374,14 +374,14 @@ cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender) {
char *snapshotSender2Str(SSyncSnapshotSender *pSender) {
cJSON *pJson = snapshotSender2Json(pSender);
char *serialized = cJSON_Print(pJson);
char * serialized = cJSON_Print(pJson);
cJSON_Delete(pJson);
return serialized;
}
char *snapshotSender2SimpleStr(SSyncSnapshotSender *pSender, char *event) {
int32_t len = 256;
char *s = taosMemoryMalloc(len);
char * s = taosMemoryMalloc(len);
SRaftId destId = pSender->pSyncNode->replicasId[pSender->replicaIndex];
char host[64];
......@@ -644,7 +644,7 @@ cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) {
cJSON_AddStringToObject(pFromId, "addr", u64buf);
{
uint64_t u64 = pReceiver->fromId.addr;
cJSON *pTmp = pFromId;
cJSON * pTmp = pFromId;
char host[128] = {0};
uint16_t port;
syncUtilU642Addr(u64, host, sizeof(host), &port);
......@@ -677,14 +677,14 @@ cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) {
char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver) {
cJSON *pJson = snapshotReceiver2Json(pReceiver);
char *serialized = cJSON_Print(pJson);
char * serialized = cJSON_Print(pJson);
cJSON_Delete(pJson);
return serialized;
}
char *snapshotReceiver2SimpleStr(SSyncSnapshotReceiver *pReceiver, char *event) {
int32_t len = 256;
char *s = taosMemoryMalloc(len);
char * s = taosMemoryMalloc(len);
SRaftId fromId = pReceiver->fromId;
char host[128];
......
......@@ -54,15 +54,15 @@ void test1() {
SyncAppendEntriesBatch *pMsg = createMsg();
syncAppendEntriesBatchLog2((char *)"==test1==", pMsg);
/*
SOffsetAndContLen *metaArr = syncAppendEntriesBatchMetaTableArray(pMsg);
int32_t retArrSize = pMsg->dataCount;
for (int i = 0; i < retArrSize; ++i) {
SSyncRaftEntry *pEntry = (SSyncRaftEntry*)(pMsg->data + metaArr[i].offset);
ASSERT(pEntry->bytes == metaArr[i].contLen);
syncEntryPrint(pEntry);
}
*/
/*
SOffsetAndContLen *metaArr = syncAppendEntriesBatchMetaTableArray(pMsg);
int32_t retArrSize = pMsg->dataCount;
for (int i = 0; i < retArrSize; ++i) {
SSyncRaftEntry *pEntry = (SSyncRaftEntry*)(pMsg->data + metaArr[i].offset);
ASSERT(pEntry->bytes == metaArr[i].contLen);
syncEntryPrint(pEntry);
}
*/
syncAppendEntriesBatchDestroy(pMsg);
}
......
......@@ -114,7 +114,7 @@ int32_t SnapshotDoRead(struct SSyncFSM* pFsm, void* pReader, void** ppBuf, int32
return 0;
}
int32_t SnapshotStartWrite(struct SSyncFSM* pFsm, void *pParam, void** ppWriter) {
int32_t SnapshotStartWrite(struct SSyncFSM* pFsm, void* pParam, void** ppWriter) {
*ppWriter = (void*)0xCDEF;
char logBuf[256] = {0};
snprintf(logBuf, sizeof(logBuf), "==callback== ==SnapshotStartWrite== pFsm:%p, *ppWriter:%p", pFsm, *ppWriter);
......
......@@ -8,11 +8,10 @@ void print(SHashObj *pNextIndex) {
printf("----------------\n");
uint64_t *p = (uint64_t *)taosHashIterate(pNextIndex, NULL);
while (p) {
size_t len;
void* key = taosHashGetKey(p, &len);
void * key = taosHashGetKey(p, &len);
SRaftId *pRaftId = (SRaftId*)key;
SRaftId *pRaftId = (SRaftId *)key;
printf("key:<%lu, %d>, value:%lu \n", pRaftId->addr, pRaftId->vgId, *p);
p = (uint64_t *)taosHashIterate(pNextIndex, p);
......
......@@ -26,6 +26,7 @@ SRaftCfg* createRaftCfg() {
snprintf(((pCfg->cfg.nodeInfo)[i]).nodeFqdn, sizeof(((pCfg->cfg.nodeInfo)[i]).nodeFqdn), "100.200.300.%d", i);
}
pCfg->isStandBy = taosGetTimestampSec() % 100;
pCfg->batchSize = taosGetTimestampSec() % 100;
pCfg->configIndexCount = 5;
for (int i = 0; i < MAX_CONFIG_INDEX_COUNT; ++i) {
......@@ -84,6 +85,7 @@ void test3() {
SRaftCfgMeta meta;
meta.isStandBy = 7;
meta.snapshotStrategy = 9;
meta.batchSize = 10;
meta.lastConfigIndex = 789;
raftCfgCreateFile(pCfg, meta, s);
printf("%s create json file: %s \n", (char*)__FUNCTION__, s);
......@@ -109,6 +111,7 @@ void test5() {
pCfg->cfg.myIndex = taosGetTimestampSec();
pCfg->isStandBy += 2;
pCfg->snapshotStrategy += 3;
pCfg->batchSize += 4;
pCfg->lastConfigIndex += 1000;
pCfg->configIndexCount = 5;
......
......@@ -74,7 +74,7 @@ void syncRespMgrGetAndDelTest(uint64_t i) {
}
SSyncNode *createSyncNode() {
SSyncNode *pSyncNode = (SSyncNode*)taosMemoryMalloc(sizeof(SSyncNode));
SSyncNode *pSyncNode = (SSyncNode *)taosMemoryMalloc(sizeof(SSyncNode));
memset(pSyncNode, 0, sizeof(SSyncNode));
return pSyncNode;
}
......
......@@ -29,7 +29,7 @@ int32_t SnapshotStartRead(struct SSyncFSM* pFsm, void** ppReader) { return 0; }
int32_t SnapshotStopRead(struct SSyncFSM* pFsm, void* pReader) { return 0; }
int32_t SnapshotDoRead(struct SSyncFSM* pFsm, void* pReader, void** ppBuf, int32_t* len) { return 0; }
int32_t SnapshotStartWrite(struct SSyncFSM* pFsm, void *pParam, void** ppWriter) { return 0; }
int32_t SnapshotStartWrite(struct SSyncFSM* pFsm, void* pParam, void** ppWriter) { return 0; }
int32_t SnapshotStopWrite(struct SSyncFSM* pFsm, void* pWriter, bool isApply) { return 0; }
int32_t SnapshotDoWrite(struct SSyncFSM* pFsm, void* pWriter, void* pBuf, int32_t len) { return 0; }
......
......@@ -111,7 +111,7 @@ int32_t SnapshotDoRead(struct SSyncFSM* pFsm, void* pReader, void** ppBuf, int32
return 0;
}
int32_t SnapshotStartWrite(struct SSyncFSM* pFsm, void *pParam, void** ppWriter) {
int32_t SnapshotStartWrite(struct SSyncFSM* pFsm, void* pParam, void** ppWriter) {
*ppWriter = (void*)0xCDEF;
char logBuf[256] = {0};
......@@ -314,18 +314,18 @@ int main(int argc, char** argv) {
exit(-1);
}
int32_t replicaNum = atoi(argv[1]);
int32_t myIndex = atoi(argv[2]);
ESyncStrategy enableSnapshot = (ESyncStrategy)atoi(argv[3]);
int32_t lastApplyIndex = atoi(argv[4]);
int32_t lastApplyTerm = atoi(argv[5]);
int32_t writeRecordNum = atoi(argv[6]);
bool isStandBy = atoi(argv[7]);
int32_t isConfigChange = atoi(argv[8]);
int32_t iterTimes = atoi(argv[9]);
int32_t finishLastApplyIndex = atoi(argv[10]);
int32_t finishLastApplyTerm = atoi(argv[11]);
int32_t leaderTransfer = atoi(argv[12]);
int32_t replicaNum = atoi(argv[1]);
int32_t myIndex = atoi(argv[2]);
ESyncStrategy enableSnapshot = (ESyncStrategy)atoi(argv[3]);
int32_t lastApplyIndex = atoi(argv[4]);
int32_t lastApplyTerm = atoi(argv[5]);
int32_t writeRecordNum = atoi(argv[6]);
bool isStandBy = atoi(argv[7]);
int32_t isConfigChange = atoi(argv[8]);
int32_t iterTimes = atoi(argv[9]);
int32_t finishLastApplyIndex = atoi(argv[10]);
int32_t finishLastApplyTerm = atoi(argv[11]);
int32_t leaderTransfer = atoi(argv[12]);
sInfo(
"args: replicaNum:%d, myIndex:%d, enableSnapshot:%d, lastApplyIndex:%d, lastApplyTerm:%d, writeRecordNum:%d, "
......
此差异已折叠。
......@@ -419,12 +419,66 @@ class TDTestCase:
tdSql.checkData(3,0,4)
tdSql.query("select csum(abs(c1))+2 from t1 ")
tdSql.checkRows(4)
def csum_support_stable(self):
tdSql.query(" select csum(1) from stb1 ")
tdSql.checkRows(70)
tdSql.query("select csum(c1) from stb1 partition by tbname ")
tdSql.checkRows(40)
# tdSql.query("select csum(st1) from stb1 partition by tbname")
# tdSql.checkRows(70)
tdSql.query("select csum(st1+c1) from stb1 partition by tbname")
tdSql.checkRows(40)
tdSql.query("select csum(st1+c1) from stb1 partition by tbname")
tdSql.checkRows(40)
tdSql.query("select csum(st1+c1) from stb1 partition by tbname")
tdSql.checkRows(40)
# # bug need fix
# tdSql.query("select csum(st1+c1) from stb1 partition by tbname slimit 1 ")
# tdSql.checkRows(4)
# tdSql.error("select csum(st1+c1) from stb1 partition by tbname limit 1 ")
# bug need fix
tdSql.query("select csum(st1+c1) from stb1 partition by tbname")
tdSql.checkRows(40)
# bug need fix
# tdSql.query("select tbname , csum(c1) from stb1 partition by tbname")
# tdSql.checkRows(40)
# tdSql.query("select tbname , csum(st1) from stb1 partition by tbname")
# tdSql.checkRows(70)
# tdSql.query("select tbname , csum(st1) from stb1 partition by tbname slimit 1")
# tdSql.checkRows(7)
# partition by tags
# tdSql.query("select st1 , csum(c1) from stb1 partition by st1")
# tdSql.checkRows(40)
# tdSql.query("select csum(c1) from stb1 partition by st1")
# tdSql.checkRows(40)
# tdSql.query("select st1 , csum(c1) from stb1 partition by st1 slimit 1")
# tdSql.checkRows(4)
# tdSql.query("select csum(c1) from stb1 partition by st1 slimit 1")
# tdSql.checkRows(4)
# partition by col
# tdSql.query("select c1 , csum(c1) from stb1 partition by c1")
# tdSql.checkRows(41)
# tdSql.query("select csum(c1) from stb1 partition by c1")
# tdSql.checkRows(41)
# tdSql.query("select c1 , csum(c1) from stb1 partition by st1 slimit 1")
# tdSql.checkRows(4)
# tdSql.query("select csum(c1) from stb1 partition by st1 slimit 1")
# tdSql.checkRows(4)
def run(self):
import traceback
try:
# run in develop branch
self.csum_test_run()
self.csum_support_stable()
pass
except Exception as e:
traceback.print_exc()
......
......@@ -355,9 +355,63 @@ class TDTestCase:
tdSql.execute(f"create table tt{i} using stb2 tags({i})")
pass
def diff_support_stable(self):
tdSql.query(" select diff(1) from stb1 ")
tdSql.checkRows(229)
tdSql.checkData(0,0,0)
tdSql.query("select diff(c1) from stb1 partition by tbname ")
tdSql.checkRows(199)
# tdSql.query("select diff(st1) from stb1 partition by tbname")
# tdSql.checkRows(229)
tdSql.query("select diff(st1+c1) from stb1 partition by tbname")
tdSql.checkRows(199)
tdSql.query("select diff(st1+c1) from stb1 partition by tbname")
tdSql.checkRows(199)
tdSql.query("select diff(st1+c1) from stb1 partition by tbname")
tdSql.checkRows(199)
# # bug need fix
# tdSql.query("select diff(st1+c1) from stb1 partition by tbname slimit 1 ")
# tdSql.checkRows(19)
# tdSql.error("select diff(st1+c1) from stb1 partition by tbname limit 1 ")
# bug need fix
tdSql.query("select diff(st1+c1) from stb1 partition by tbname")
tdSql.checkRows(199)
# bug need fix
# tdSql.query("select tbname , diff(c1) from stb1 partition by tbname")
# tdSql.checkRows(199)
# tdSql.query("select tbname , diff(st1) from stb1 partition by tbname")
# tdSql.checkRows(199)
# tdSql.query("select tbname , diff(st1) from stb1 partition by tbname slimit 1")
# tdSql.checkRows(19)
# partition by tags
# tdSql.query("select st1 , diff(c1) from stb1 partition by st1")
# tdSql.checkRows(199)
# tdSql.query("select diff(c1) from stb1 partition by st1")
# tdSql.checkRows(199)
# tdSql.query("select st1 , diff(c1) from stb1 partition by st1 slimit 1")
# tdSql.checkRows(19)
# tdSql.query("select diff(c1) from stb1 partition by st1 slimit 1")
# tdSql.checkRows(19)
# partition by col
# tdSql.query("select c1 , diff(c1) from stb1 partition by c1")
# tdSql.checkRows(199)
# tdSql.query("select diff(c1) from stb1 partition by c1")
# tdSql.checkRows(41)
# tdSql.query("select c1 , diff(c1) from stb1 partition by st1 slimit 1")
# tdSql.checkRows(19)
# tdSql.query("select diff(c1) from stb1 partition by st1 slimit 1")
# tdSql.checkRows(19)
def diff_test_run(self) :
tdLog.printNoPrefix("==========TD-10594==========")
tdLog.printNoPrefix("==========run test case for diff function==========")
tbnum = 10
nowtime = int(round(time.time() * 1000))
per_table_rows = 10
......@@ -422,6 +476,7 @@ class TDTestCase:
try:
# run in develop branch
self.diff_test_run()
self.diff_support_stable()
pass
except Exception as e:
traceback.print_exc()
......
......@@ -673,11 +673,65 @@ class TDTestCase:
tdSql.query("select mavg(abs(c1),1) from t1")
tdSql.checkRows(4)
def mavg_support_stable(self):
tdSql.query(" select mavg(1,3) from stb1 ")
tdSql.checkRows(68)
tdSql.checkData(0,0,1.000000000)
tdSql.query("select mavg(c1,3) from stb1 partition by tbname ")
tdSql.checkRows(38)
# tdSql.query("select mavg(st1,3) from stb1 partition by tbname")
# tdSql.checkRows(38)
tdSql.query("select mavg(st1+c1,3) from stb1 partition by tbname")
tdSql.checkRows(38)
tdSql.query("select mavg(st1+c1,3) from stb1 partition by tbname")
tdSql.checkRows(38)
tdSql.query("select mavg(st1+c1,3) from stb1 partition by tbname")
tdSql.checkRows(38)
# # bug need fix
# tdSql.query("select mavg(st1+c1,3) from stb1 partition by tbname slimit 1 ")
# tdSql.checkRows(2)
# tdSql.error("select mavg(st1+c1,3) from stb1 partition by tbname limit 1 ")
# bug need fix
tdSql.query("select mavg(st1+c1,3) from stb1 partition by tbname")
tdSql.checkRows(38)
# bug need fix
# tdSql.query("select tbname , mavg(c1,3) from stb1 partition by tbname")
# tdSql.checkRows(38)
# tdSql.query("select tbname , mavg(st1,3) from stb1 partition by tbname")
# tdSql.checkRows(38)
# tdSql.query("select tbname , mavg(st1,3) from stb1 partition by tbname slimit 1")
# tdSql.checkRows(2)
# partition by tags
# tdSql.query("select st1 , mavg(c1,3) from stb1 partition by st1")
# tdSql.checkRows(38)
# tdSql.query("select mavg(c1,3) from stb1 partition by st1")
# tdSql.checkRows(38)
# tdSql.query("select st1 , mavg(c1,3) from stb1 partition by st1 slimit 1")
# tdSql.checkRows(2)
# tdSql.query("select mavg(c1,3) from stb1 partition by st1 slimit 1")
# tdSql.checkRows(2)
# partition by col
# tdSql.query("select c1 , mavg(c1,3) from stb1 partition by c1")
# tdSql.checkRows(38)
# tdSql.query("select mavg(c1 ,3) from stb1 partition by c1")
# tdSql.checkRows(38)
# tdSql.query("select c1 , mavg(c1,3) from stb1 partition by st1 slimit 1")
# tdSql.checkRows(2)
# tdSql.query("select diff(c1) from stb1 partition by st1 slimit 1")
# tdSql.checkRows(2)
def run(self):
import traceback
try:
# run in develop branch
self.mavg_test_run()
self.mavg_support_stable()
pass
except Exception as e:
traceback.print_exc()
......
......@@ -798,6 +798,36 @@ class TDTestCase:
tdSql.query("select sample(c1,100)+2 from ct1")
tdSql.query("select abs(sample(c1,100)) from ct1")
# support stable and tbname
tdSql.query("select tbname ,sample(c1,2) from stb1 partition by tbname order by tbname")
tdSql.checkRows(4)
tdSql.checkData(0,0,'ct1')
tdSql.checkData(3,0,'ct4')
# # bug need fix
# tdSql.query(" select tbname ,c1 ,t1, sample(c1,2) from stb1 partition by tbname order by tbname ")
# tdSql.checkRows(4)
# tdSql.checkData(0,0,'ct1')
# tdSql.checkData(3,0,'ct4')
# tdSql.checkData(0,2,1)
# tdSql.checkData(3,2,4)
tdSql.query(" select tbname ,c1 ,t1, sample(c1,2) from stb1 partition by t1 order by t1 ")
tdSql.checkRows(4)
tdSql.checkData(0,0,'ct1')
tdSql.checkData(3,0,'ct4')
tdSql.checkData(0,2,1)
tdSql.checkData(3,2,4)
# bug need fix
# tdSql.query(" select tbname ,c1 ,t1, sample(c1,2) from stb1 partition by c1 order by c1 ")
# tdSql.checkRows(21)
# bug need fix
# tdSql.query(" select sample(c1,2) from stb1 partition by c1 ")
# tdSql.checkRows(21)
def sample_test_run(self) :
tdLog.printNoPrefix("==========support sample function==========")
tbnum = 10
......
......@@ -18,6 +18,7 @@ class TDTestCase:
def init(self, conn, logSql):
tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor())
self.ts = 1420041600000 # 2015-01-01 00:00:00 this is begin time for first record
def prepare_datas(self):
tdSql.execute(
......@@ -344,6 +345,8 @@ class TDTestCase:
tdSql.error("select stateduration(c1,'GT',1,1b) from ct1")
tdSql.error("select stateduration(c1,'GT',1,1u) from ct1")
tdSql.error("select stateduration(c1,'GT',1,1000s) from t1")
tdSql.error("select stateduration(c1,'GT',1,10m) from t1")
tdSql.error("select stateduration(c1,'GT',1,10d) from t1")
tdSql.query("select stateduration(c1,'GT',1,1s) from t1")
tdSql.checkData(10,0,63072035)
tdSql.query("select stateduration(c1,'GT',1,1m) from t1")
......@@ -355,6 +358,58 @@ class TDTestCase:
tdSql.query("select stateduration(c1,'GT',1,1w) from t1")
tdSql.checkData(10,0,int(63072035/60/7/24/60))
def query_precision(self):
def generate_data(precision="ms"):
tdSql.execute("create database if not exists db_%s precision '%s';" %(precision, precision))
tdSql.execute("use db_%s;" %precision)
tdSql.execute("create stable db_%s.st (ts timestamp , id int) tags(ind int);"%precision)
tdSql.execute("create table db_%s.tb1 using st tags(1);"%precision)
tdSql.execute("create table db_%s.tb2 using st tags(2);"%precision)
if precision == "ms":
start_ts = self.ts
step = 10000
elif precision == "us":
start_ts = self.ts*1000
step = 10000000
elif precision == "ns":
start_ts = self.ts*1000000
step = 10000000000
else:
pass
for i in range(10):
sql1 = "insert into db_%s.tb1 values (%d,%d)"%(precision ,start_ts+i*step,i)
sql2 = "insert into db_%s.tb1 values (%d,%d)"%(precision, start_ts+i*step,i)
tdSql.execute(sql1)
tdSql.execute(sql2)
time_units = ["1s","1a","1u","1b"]
precision_list = ["ms","us","ns"]
for pres in precision_list:
generate_data(pres)
for index,unit in enumerate(time_units):
if pres == "ms":
if unit in ["1u","1b"]:
tdSql.error("select stateduration(id,'GT',1,%s) from db_%s.tb1 "%(unit,pres))
pass
else:
tdSql.query("select stateduration(id,'GT',1,%s) from db_%s.tb1 "%(unit,pres))
elif pres == "us" and unit in ["1b"]:
if unit in ["1b"]:
tdSql.error("select stateduration(id,'GT',1,%s) from db_%s.tb1 "%(unit,pres))
pass
else:
tdSql.query("select stateduration(id,'GT',1,%s) from db_%s.tb1 "%(unit,pres))
else:
tdSql.query("select stateduration(id,'GT',1,%s) from db_%s.tb1 "%(unit,pres))
basic_result = 70
tdSql.checkData(9,0,basic_result*pow(1000,index))
def check_boundary_values(self):
......@@ -420,6 +475,8 @@ class TDTestCase:
tdLog.printNoPrefix("==========step6: statecount unit time test ============")
self.check_unit_time()
self.query_precision()
def stop(self):
......
......@@ -337,7 +337,7 @@ class TDTestCase:
tdSql.checkData(2,0,5)
# nest query
# tdSql.query("select tail(c1,2) from (select c1 from ct1)")
# tdSql.query("select tail(c1,2) from (select _rowts , c1 from ct1)")
tdSql.query("select c1 from (select tail(c1,2) c1 from ct4) order by 1 nulls first")
tdSql.checkRows(2)
tdSql.checkData(0, 0, None)
......@@ -363,10 +363,59 @@ class TDTestCase:
tdSql.error("select tail(c1,2) from ct1 group by tbname")
# super table
tdSql.error("select tbname , tail(c1,2) from stb1 group by tbname")
tdSql.query("select tail(c1,2) from stb1 partition by tbname")
tdSql.checkRows(4)
# bug need fix
# tdSql.query("select tbname , tail(c1,2) from stb1 partition by tbname")
# tdSql.checkRows(4)
# tdSql.query("select tbname , tail(c1,2) from stb1 partition by tbname order by tbname")
# tdSql.checkRows(4)
# tdSql.query(" select tbname , count(c1) from stb1 partition by tbname order by tbname ")
# tdSql.checkRows(2)
# tdSql.query(" select tbname , max(c1) ,c1 from stb1 partition by tbname order by tbname ")
# tdSql.checkRows(2)
# tdSql.query(" select tbname ,first(c1) from stb1 partition by tbname order by tbname ")
# tdSql.checkRows(2)
tdSql.query("select tail(c1,2) from stb1 partition by tbname")
tdSql.checkRows(4)
# # bug need fix
# tdSql.query(" select tbname , tail(c1,2) from stb1 where t1 = 0 partition by tbname ")
# tdSql.checkRows(2)
# tdSql.query(" select tbname , tail(c1,2) from stb1 where t1 = 0 partition by tbname order by tbname ")
# tdSql.checkRows(2)
# tdSql.query(" select tbname , tail(c1,2) from stb1 where c1 = 0 partition by tbname order by tbname ")
# tdSql.checkRows(3)
# tdSql.query(" select tbname , tail(c1,2) from stb1 where c1 = 0 partition by tbname ")
# tdSql.checkRows(3)
# tdSql.query(" select tbname , tail(c1,2) from stb1 where c1 = 0 partition by tbname ")
# tdSql.checkRows(3)
tdSql.query(" select tail(t1,2) from stb1 ")
tdSql.checkRows(2)
tdSql.query(" select tail(t1+c1,2) from stb1 ")
tdSql.checkRows(2)
tdSql.query(" select tail(t1+c1,2) from stb1 partition by tbname ")
tdSql.checkRows(4)
tdSql.query(" select tail(t1,2) from stb1 partition by tbname ")
tdSql.checkRows(4)
# nest query
tdSql.query(" select tail(c1,2) from (select _rowts , t1 ,c1 , tbname from stb1 ) ")
tdSql.checkRows(2)
tdSql.checkData(0,0,None)
tdSql.checkData(1,0,9)
tdSql.query("select tail(t1,2) from (select _rowts , t1 , tbname from stb1 )")
tdSql.checkRows(2)
tdSql.checkData(0,0,4)
tdSql.checkData(1,0,1)
def check_boundary_values(self):
tdSql.execute("drop database if exists bound_test")
......
......@@ -386,10 +386,60 @@ class TDTestCase:
tdSql.error("select unique(c1) from ct1 group by tbname")
# super table
# super table
tdSql.error("select tbname , tail(c1,2) from stb1 group by tbname")
tdSql.query("select tail(c1,2) from stb1 partition by tbname")
tdSql.checkRows(4)
# bug need fix
# tdSql.query("select tbname , tail(c1,2) from stb1 partition by tbname")
# tdSql.checkRows(4)
# tdSql.query("select tbname , tail(c1,2) from stb1 partition by tbname order by tbname")
# tdSql.checkRows(4)
# tdSql.query(" select tbname , count(c1) from stb1 partition by tbname order by tbname ")
# tdSql.checkRows(2)
# tdSql.query(" select tbname , max(c1) ,c1 from stb1 partition by tbname order by tbname ")
# tdSql.checkRows(2)
# tdSql.query(" select tbname ,first(c1) from stb1 partition by tbname order by tbname ")
# tdSql.checkRows(2)
tdSql.query("select tail(c1,2) from stb1 partition by tbname")
tdSql.checkRows(4)
# # bug need fix
# tdSql.query(" select tbname , unique(c1) from stb1 where t1 = 0 partition by tbname ")
# tdSql.checkRows(2)
# tdSql.query(" select tbname , unique(c1) from stb1 where t1 = 0 partition by tbname order by tbname ")
# tdSql.checkRows(2)
# tdSql.query(" select tbname , unique(c1) from stb1 where c1 = 0 partition by tbname order by tbname ")
# tdSql.checkRows(3)
# tdSql.query(" select tbname , unique(c1) from stb1 where c1 = 0 partition by tbname ")
# tdSql.checkRows(3)
tdSql.query(" select unique(t1) from stb1 ")
tdSql.checkRows(2)
tdSql.query(" select unique(t1+c1) from stb1 ")
tdSql.checkRows(13)
tdSql.query(" select unique(t1+c1) from stb1 partition by tbname ")
tdSql.checkRows(13)
tdSql.query(" select unique(t1) from stb1 partition by tbname ")
tdSql.checkRows(2)
# nest query
tdSql.query(" select unique(c1) from (select _rowts , t1 ,c1 , tbname from stb1 ) ")
tdSql.checkRows(11)
tdSql.checkData(0,0,6)
tdSql.checkData(10,0,3)
tdSql.query("select unique(t1) from (select _rowts , t1 , tbname from stb1 )")
tdSql.checkRows(2)
tdSql.checkData(0,0,4)
tdSql.checkData(1,0,1)
def check_boundary_values(self):
tdSql.execute("drop database if exists bound_test")
......
......@@ -96,7 +96,7 @@ python3 ./test.py -f 2-query/query_cols_tags_and_or.py
# python3 ./test.py -f 2-query/nestedQuery_str.py
python3 ./test.py -f 2-query/avg.py
#python3 ./test.py -f 2-query/elapsed.py
python3 ./test.py -f 2-query/elapsed.py
python3 ./test.py -f 2-query/csum.py
python3 ./test.py -f 2-query/mavg.py
python3 ./test.py -f 2-query/diff.py
......@@ -118,6 +118,7 @@ python3 ./test.py -f 2-query/distribute_agg_avg.py
python3 ./test.py -f 2-query/distribute_agg_stddev.py
python3 ./test.py -f 2-query/twa.py
python3 ./test.py -f 2-query/irate.py
python3 ./test.py -f 2-query/and_or_for_byte.py
python3 ./test.py -f 2-query/function_null.py
python3 ./test.py -f 2-query/queryQnode.py
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册