提交 9e854d64 编写于 作者: D dapan1121

Merge branch 'feature/scheduler' of github.com:taosdata/TDengine into feature/scheduler

......@@ -199,6 +199,7 @@ typedef struct SIndexOptions {
typedef struct SCreateIndexStmt {
ENodeType type;
EIndexType indexType;
bool ignoreExists;
char indexName[TSDB_INDEX_NAME_LEN];
char tableName[TSDB_TABLE_NAME_LEN];
SNodeList* pCols;
......@@ -207,6 +208,7 @@ typedef struct SCreateIndexStmt {
typedef struct SDropIndexStmt {
ENodeType type;
bool ignoreNotExists;
char indexName[TSDB_INDEX_NAME_LEN];
char tableName[TSDB_TABLE_NAME_LEN];
} SDropIndexStmt;
......
......@@ -180,6 +180,10 @@ static int32_t taosSetTfsCfg(SConfig *pCfg) {
memcpy(&tsDiskCfg[index], pCfg, sizeof(SDiskCfg));
if (pCfg->level == 0 && pCfg->primary == 1) {
tstrncpy(tsDataDir, pCfg->dir, PATH_MAX);
if (taosMkDir(tsDataDir) != 0) {
uError("failed to create dataDir:%s since %s", tsDataDir, terrstr());
return -1;
}
}
if (taosMkDir(pCfg->dir) != 0) {
uError("failed to create tfsDir:%s since %s", tsDataDir, terrstr());
......
......@@ -77,7 +77,11 @@ int32_t bmStartWorker(SBnodeMgmt *pMgmt) {
return -1;
}
dDebug("bnode workers are initialized");
return 0;
}
void bmStopWorker(SBnodeMgmt *pMgmt) { tMultiWorkerCleanup(&pMgmt->writeWorker); }
void bmStopWorker(SBnodeMgmt *pMgmt) {
tMultiWorkerCleanup(&pMgmt->writeWorker);
dDebug("bnode workers are closed");
}
......@@ -19,7 +19,6 @@
#include "os.h"
#include "cJSON.h"
#include "monitor.h"
#include "tcache.h"
#include "tcrc32c.h"
#include "tdatablock.h"
......@@ -36,8 +35,7 @@
#include "tworker.h"
#include "dnode.h"
#include "tfs.h"
#include "wal.h"
#include "monitor.h"
#ifdef __cplusplus
extern "C" {
......@@ -53,7 +51,6 @@ extern "C" {
typedef enum { DNODE, VNODES, QNODE, SNODE, MNODE, BNODE, NODE_MAX } ENodeType;
typedef enum { DND_STAT_INIT, DND_STAT_RUNNING, DND_STAT_STOPPED } EDndStatus;
typedef enum { DND_ENV_INIT, DND_ENV_READY, DND_ENV_CLEANUP } EEnvStatus;
typedef enum { DND_WORKER_SINGLE, DND_WORKER_MULTI } EWorkerType;
typedef enum { PROC_SINGLE, PROC_CHILD, PROC_PARENT } EProcType;
typedef struct SMgmtFp SMgmtFp;
......@@ -127,29 +124,30 @@ typedef struct SDnode {
bool dropped;
EDndStatus status;
EDndEvent event;
EProcType procType;
SStartupReq startup;
TdFilePtr pLockFile;
STransMgmt trans;
SMgmtWrapper wrappers[NODE_MAX];
} SDnode;
EDndStatus dndGetStatus(SDnode *pDnode);
void dndSetStatus(SDnode *pDnode, EDndStatus stat);
SMgmtWrapper *dndAcquireWrapper(SDnode *pDnode, ENodeType nodeType);
void dndSetMsgHandle(SMgmtWrapper *pWrapper, int32_t msgType, NodeMsgFp nodeMsgFp, int32_t vgId);
void dndReportStartup(SDnode *pDnode, char *pName, char *pDesc);
void dndSendMonitorReport(SDnode *pDnode);
EDndStatus dndGetStatus(SDnode *pDnode);
void dndSetStatus(SDnode *pDnode, EDndStatus stat);
void dndSetMsgHandle(SMgmtWrapper *pWrapper, int32_t msgType, NodeMsgFp nodeMsgFp, int32_t vgId);
void dndReportStartup(SDnode *pDnode, const char *pName, const char *pDesc);
void dndSendMonitorReport(SDnode *pDnode);
int32_t dndSendReqToMnode(SMgmtWrapper *pWrapper, SRpcMsg *pMsg);
int32_t dndSendReqToDnode(SMgmtWrapper *pWrapper, SEpSet *pEpSet, SRpcMsg *pMsg);
void dndSendRsp(SMgmtWrapper *pWrapper, SRpcMsg *pRsp);
int32_t dndProcessNodeMsg(SDnode *pDnode, SNodeMsg *pMsg);
int32_t dndReadFile(SMgmtWrapper *pWrapper, bool *pDeployed);
int32_t dndWriteFile(SMgmtWrapper *pWrapper, bool deployed);
SMgmtWrapper *dndAcquireWrapper(SDnode *pDnode, ENodeType nodeType);
int32_t dndMarkWrapper(SMgmtWrapper *pWrapper);
void dndReleaseWrapper(SMgmtWrapper *pWrapper);
#ifdef __cplusplus
}
#endif
......
......@@ -34,7 +34,7 @@ int32_t dndInit();
void dndCleanup();
const char *dndStatStr(EDndStatus stat);
void dndGetStartup(SDnode *pDnode, SStartupReq *pStartup);
TdFilePtr dndCheckRunning(char *dataDir);
TdFilePtr dndCheckRunning(const char *dataDir);
void dndProcessStartupReq(SDnode *pDnode, SRpcMsg *pMsg);
// dndMsg.c
......@@ -50,10 +50,6 @@ SDnode *dndCreate(const SDnodeOpt *pOption);
void dndClose(SDnode *pDnode);
void dndHandleEvent(SDnode *pDnode, EDndEvent event);
SMgmtWrapper *dndAcquireWrapper(SDnode *pDnode, ENodeType nodeType);
int32_t dndMarkWrapper(SMgmtWrapper *pWrapper);
void dndReleaseWrapper(SMgmtWrapper *pWrapper);
// dndTransport.c
int32_t dndInitServer(SDnode *pDnode);
void dndCleanupServer(SDnode *pDnode);
......
......@@ -50,16 +50,23 @@ int32_t dndOpenNode(SMgmtWrapper *pWrapper) {
}
void dndCloseNode(SMgmtWrapper *pWrapper) {
dDebug("node:%s, start to close", pWrapper->name);
taosWLockLatch(&pWrapper->latch);
if (pWrapper->deployed) {
(*pWrapper->fp.closeFp)(pWrapper);
pWrapper->deployed = false;
}
taosWUnLockLatch(&pWrapper->latch);
while (pWrapper->refCount > 0) {
taosMsleep(10);
}
if (pWrapper->pProc) {
taosProcCleanup(pWrapper->pProc);
pWrapper->pProc = NULL;
}
taosWUnLockLatch(&pWrapper->latch);
dDebug("node:%s, has been closed", pWrapper->name);
}
static int32_t dndRunInSingleProcess(SDnode *pDnode) {
......
......@@ -15,11 +15,12 @@
#define _DEFAULT_SOURCE
#include "dndInt.h"
#include "wal.h"
static int8_t once = DND_ENV_INIT;
int32_t dndInit() {
dDebug("start to init dnode env");
dInfo("start to init dnode env");
if (atomic_val_compare_exchange_8(&once, DND_ENV_INIT, DND_ENV_READY) != DND_ENV_INIT) {
terrno = TSDB_CODE_REPEAT_INIT;
dError("failed to init dnode env since %s", terrstr());
......@@ -52,7 +53,7 @@ int32_t dndInit() {
}
void dndCleanup() {
dDebug("start to cleanup dnode env");
dInfo("start to cleanup dnode env");
if (atomic_val_compare_exchange_8(&once, DND_ENV_READY, DND_ENV_CLEANUP) != DND_ENV_READY) {
dError("dnode env is already cleaned up");
return;
......@@ -92,7 +93,7 @@ const char *dndStatStr(EDndStatus status) {
}
}
void dndReportStartup(SDnode *pDnode, char *pName, char *pDesc) {
void dndReportStartup(SDnode *pDnode, const char *pName, const char *pDesc) {
SStartupReq *pStartup = &pDnode->startup;
tstrncpy(pStartup->name, pName, TSDB_STEP_NAME_LEN);
tstrncpy(pStartup->desc, pDesc, TSDB_STEP_DESC_LEN);
......@@ -104,21 +105,21 @@ void dndGetStartup(SDnode *pDnode, SStartupReq *pStartup) {
pStartup->finished = (dndGetStatus(pDnode) == DND_STAT_RUNNING);
}
TdFilePtr dndCheckRunning(char *dataDir) {
TdFilePtr dndCheckRunning(const char *dataDir) {
char filepath[PATH_MAX] = {0};
snprintf(filepath, sizeof(filepath), "%s/.running", dataDir);
TdFilePtr pFile = taosOpenFile(filepath, TD_FILE_CTEATE | TD_FILE_WRITE | TD_FILE_TRUNC);
if (pFile == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno);
dError("failed to lock file:%s since %s, quit", filepath, terrstr());
dError("failed to lock file:%s since %s", filepath, terrstr());
return NULL;
}
int32_t ret = taosLockFile(pFile);
if (ret != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
dError("failed to lock file:%s since %s, quit", filepath, terrstr());
dError("failed to lock file:%s since %s", filepath, terrstr());
taosCloseFile(&pFile);
return NULL;
}
......@@ -129,12 +130,10 @@ TdFilePtr dndCheckRunning(char *dataDir) {
void dndProcessStartupReq(SDnode *pDnode, SRpcMsg *pReq) {
dDebug("startup req is received");
SStartupReq *pStartup = rpcMallocCont(sizeof(SStartupReq));
dndGetStartup(pDnode, pStartup);
dDebug("startup req is sent, step:%s desc:%s finished:%d", pStartup->name, pStartup->desc, pStartup->finished);
SRpcMsg rpcRsp = {
.handle = pReq->handle, .pCont = pStartup, .contLen = sizeof(SStartupReq), .ahandle = pReq->ahandle};
rpcSendResponse(&rpcRsp);
......
......@@ -22,7 +22,12 @@ static int32_t dndGetMonitorDiskInfo(SDnode *pDnode, SMonDiskInfo *pInfo) {
tstrncpy(pInfo->tempdir.name, tsTempDir, sizeof(pInfo->tempdir.name));
pInfo->tempdir.size = tsTempSpace.size;
return vmMonitorTfsInfo(dndAcquireWrapper(pDnode, VNODES), pInfo);
SMgmtWrapper *pWrapper = dndAcquireWrapper(pDnode, VNODES);
if (pWrapper != NULL) {
vmMonitorTfsInfo(pWrapper, pInfo);
dndReleaseWrapper(pWrapper);
}
return 0;
}
static void dndGetMonitorBasicInfo(SDnode *pDnode, SMonBasicInfo *pInfo) {
......@@ -45,8 +50,17 @@ static void dndGetMonitorDnodeInfo(SDnode *pDnode, SMonDnodeInfo *pInfo) {
taosGetCardInfo(&pInfo->net_in, &pInfo->net_out);
taosGetProcIO(&pInfo->io_read, &pInfo->io_write, &pInfo->io_read_disk, &pInfo->io_write_disk);
vmMonitorVnodeReqs(dndAcquireWrapper(pDnode, VNODES), pInfo);
pInfo->has_mnode = (dndAcquireWrapper(pDnode, MNODE)->required);
SMgmtWrapper *pWrapper = dndAcquireWrapper(pDnode, VNODES);
if (pWrapper != NULL) {
vmMonitorVnodeReqs(pWrapper, pInfo);
dndReleaseWrapper(pWrapper);
}
pWrapper = dndAcquireWrapper(pDnode, MNODE);
if (pWrapper != NULL) {
pInfo->has_mnode = pWrapper->required;
dndReleaseWrapper(pWrapper);
}
}
void dndSendMonitorReport(SDnode *pDnode) {
......@@ -63,10 +77,15 @@ void dndSendMonitorReport(SDnode *pDnode) {
SMonClusterInfo clusterInfo = {0};
SMonVgroupInfo vgroupInfo = {0};
SMonGrantInfo grantInfo = {0};
if (mmMonitorMnodeInfo(dndAcquireWrapper(pDnode, MNODE), &clusterInfo, &vgroupInfo, &grantInfo) == 0) {
monSetClusterInfo(pMonitor, &clusterInfo);
monSetVgroupInfo(pMonitor, &vgroupInfo);
monSetGrantInfo(pMonitor, &grantInfo);
SMgmtWrapper *pWrapper = dndAcquireWrapper(pDnode, MNODE);
if (pWrapper != NULL) {
if (mmMonitorMnodeInfo(pWrapper, &clusterInfo, &vgroupInfo, &grantInfo) == 0) {
monSetClusterInfo(pMonitor, &clusterInfo);
monSetVgroupInfo(pMonitor, &vgroupInfo);
monSetGrantInfo(pMonitor, &grantInfo);
}
dndReleaseWrapper(pWrapper);
}
SMonDnodeInfo dnodeInfo = {0};
......
......@@ -20,8 +20,8 @@ static void dndUpdateMnodeEpSet(SDnode *pDnode, SEpSet *pEpSet) {
SMgmtWrapper *pWrapper = dndAcquireWrapper(pDnode, DNODE);
if (pWrapper != NULL) {
dmUpdateMnodeEpSet(pWrapper->pMgmt, pEpSet);
dndReleaseWrapper(pWrapper);
}
dndReleaseWrapper(pWrapper);
}
static inline NodeMsgFp dndGetMsgFp(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) {
......
......@@ -58,7 +58,7 @@ static void dndClearMemory(SDnode *pDnode) {
SDnode *dndCreate(const SDnodeOpt *pOption) {
dInfo("start to create dnode object");
int32_t code = -1;
char path[PATH_MAX];
char path[PATH_MAX] = {0};
SDnode *pDnode = NULL;
pDnode = calloc(1, sizeof(SDnode));
......
......@@ -146,9 +146,14 @@ static void dndProcessRequest(void *param, SRpcMsg *pReq, SEpSet *pEpSet) {
static void dndSendMsgToMnodeRecv(SDnode *pDnode, SRpcMsg *pRpcMsg, SRpcMsg *pRpcRsp) {
STransMgmt *pMgmt = &pDnode->trans;
SEpSet epSet = {0};
SMgmtWrapper *pWrapper = dndAcquireWrapper(pDnode, DNODE);
if (pWrapper != NULL) {
dmGetMnodeEpSet(pWrapper->pMgmt, &epSet);
dndReleaseWrapper(pWrapper);
}
SEpSet epSet = {0};
dmGetMnodeEpSet(dndAcquireWrapper(pDnode, DNODE)->pMgmt, &epSet);
rpcSendRecv(pMgmt->clientRpc, &epSet, pRpcMsg, pRpcRsp);
}
......@@ -182,9 +187,14 @@ static int32_t dndRetrieveUserAuthInfo(void *parent, char *user, char *spi, char
return 0;
}
if (mmGetUserAuth(dndAcquireWrapper(pDnode, MNODE), user, spi, encrypt, secret, ckey) == 0) {
dTrace("user:%s, get auth from mnode, spi:%d encrypt:%d", user, *spi, *encrypt);
return 0;
SMgmtWrapper *pWrapper = dndAcquireWrapper(pDnode, MNODE);
if (pWrapper != NULL) {
if (mmGetUserAuth(pWrapper, user, spi, encrypt, secret, ckey) == 0) {
dndReleaseWrapper(pWrapper);
dTrace("user:%s, get auth from mnode, spi:%d encrypt:%d", user, *spi, *encrypt);
return 0;
}
dndReleaseWrapper(pWrapper);
}
if (terrno != TSDB_CODE_APP_NOT_READY) {
......@@ -271,7 +281,7 @@ int32_t dndInitMsgHandle(SDnode *pDnode) {
int32_t vgId = pWrapper->msgVgIds[msgIndex];
if (msgFp == NULL) continue;
dTrace("msg:%s will be processed by %s, vgId:%d", tMsgInfo[msgIndex], pWrapper->name, vgId);
// dTrace("msg:%s will be processed by %s, vgId:%d", tMsgInfo[msgIndex], pWrapper->name, vgId);
SMsgHandle *pHandle = &pMgmt->msgHandles[msgIndex];
if (vgId == QND_VGID) {
......@@ -328,7 +338,12 @@ int32_t dndSendReqToMnode(SMgmtWrapper *pWrapper, SRpcMsg *pReq) {
SDnode *pDnode = pWrapper->pDnode;
STransMgmt *pTrans = &pDnode->trans;
SEpSet epSet = {0};
dmGetMnodeEpSet(dndAcquireWrapper(pDnode, DNODE)->pMgmt, &epSet);
SMgmtWrapper *pWrapper = dndAcquireWrapper(pDnode, DNODE);
if (pWrapper != NULL) {
dmGetMnodeEpSet(pWrapper->pMgmt, &epSet);
dndReleaseWrapper(pWrapper);
}
return dndSendRpcReq(pTrans, &epSet, pReq);
}
}
......@@ -336,7 +351,12 @@ int32_t dndSendReqToMnode(SMgmtWrapper *pWrapper, SRpcMsg *pReq) {
void dndSendRpcRsp(SMgmtWrapper *pWrapper, SRpcMsg *pRsp) {
if (pRsp->code == TSDB_CODE_APP_NOT_READY) {
SMgmtWrapper *pDnodeWrapper = dndAcquireWrapper(pWrapper->pDnode, DNODE);
dmSendRedirectRsp(pDnodeWrapper->pMgmt, pRsp);
if (pDnodeWrapper != NULL) {
dmSendRedirectRsp(pDnodeWrapper->pMgmt, pRsp);
dndReleaseWrapper(pDnodeWrapper);
} else {
rpcSendResponse(pRsp);
}
} else {
rpcSendResponse(pRsp);
}
......
......@@ -209,7 +209,7 @@ int32_t dmWriteFile(SDnodeMgmt *pMgmt) {
}
pMgmt->updateTime = taosGetTimestampMs();
dDebug("successed to write %s", file);
dDebug("successed to write %s", realfile);
return 0;
}
......
......@@ -74,14 +74,14 @@ void dmSendRedirectRsp(SDnodeMgmt *pMgmt, SRpcMsg *pReq) {
}
static int32_t dmStart(SMgmtWrapper *pWrapper) {
dDebug("dnode mgmt start to run");
dDebug("dnode-mgmt start to run");
return dmStartThread(pWrapper->pMgmt);
}
int32_t dmInit(SMgmtWrapper *pWrapper) {
SDnode *pDnode = pWrapper->pDnode;
SDnodeMgmt *pMgmt = calloc(1, sizeof(SDnodeMgmt));
dInfo("dnode-mgmt is initialized");
dInfo("dnode-mgmt start to init");
pDnode->dnodeId = 0;
pDnode->dropped = 0;
......
......@@ -41,8 +41,12 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) {
memcpy(req.clusterCfg.charset, tsCharset, TD_LOCALE_LEN);
taosRUnLockLatch(&pMgmt->latch);
req.pVloads = taosArrayInit(TSDB_MAX_VNODES, sizeof(SVnodeLoad));
vmMonitorVnodeLoads(dndAcquireWrapper(pDnode, VNODES), req.pVloads);
SMgmtWrapper *pWrapper = dndAcquireWrapper(pDnode, VNODES);
if (pWrapper != NULL) {
req.pVloads = taosArrayInit(TSDB_MAX_VNODES, sizeof(SVnodeLoad));
vmMonitorVnodeLoads(pWrapper, req.pVloads);
dndReleaseWrapper(pWrapper);
}
int32_t contLen = tSerializeSStatusReq(NULL, 0, &req);
void *pHead = rpcMallocCont(contLen);
......
......@@ -114,6 +114,7 @@ int32_t dmStartWorker(SDnodeMgmt *pMgmt) {
return -1;
}
dDebug("dnode workers are initialized");
return 0;
}
......@@ -136,6 +137,7 @@ void dmStopWorker(SDnodeMgmt *pMgmt) {
taosDestoryThread(pMgmt->threadId);
pMgmt->threadId = NULL;
}
dDebug("dnode workers are closed");
}
int32_t dmProcessMgmtMsg(SDnodeMgmt *pMgmt, SNodeMsg *pMsg) {
......@@ -144,6 +146,6 @@ int32_t dmProcessMgmtMsg(SDnodeMgmt *pMgmt, SNodeMsg *pMsg) {
pWorker = &pMgmt->statusWorker;
}
dTrace("msg:%p, will be written to worker %s", pMsg, pWorker->name);
dTrace("msg:%p, put into worker %s", pMsg, pWorker->name);
return taosWriteQitem(pWorker->queue, pMsg);
}
......@@ -15,6 +15,7 @@
#define _DEFAULT_SOURCE
#include "mmInt.h"
#include "wal.h"
static bool mmDeployRequired(SDnode *pDnode) {
if (pDnode->dnodeId > 0) return false;
......@@ -226,7 +227,7 @@ static int32_t mmOpen(SMgmtWrapper *pWrapper) {
}
static int32_t mmStart(SMgmtWrapper *pWrapper) {
dDebug("mnode mgmt start to run");
dDebug("mnode-mgmt start to run");
SMnodeMgmt *pMgmt = pWrapper->pMgmt;
return mndStart(pMgmt->pMnode);
}
......
......@@ -108,6 +108,7 @@ int32_t mmStartWorker(SMnodeMgmt *pMgmt) {
return -1;
}
dDebug("mnode workers are initialized");
return 0;
}
......@@ -115,4 +116,5 @@ void mmStopWorker(SMnodeMgmt *pMgmt) {
tSingleWorkerCleanup(&pMgmt->readWorker);
tSingleWorkerCleanup(&pMgmt->writeWorker);
tSingleWorkerCleanup(&pMgmt->syncWorker);
dDebug("mnode workers are closed");
}
......@@ -132,10 +132,12 @@ int32_t qmStartWorker(SQnodeMgmt *pMgmt) {
return -1;
}
dDebug("qnode workers are initialized");
return 0;
}
void qmStopWorker(SQnodeMgmt *pMgmt) {
tSingleWorkerCleanup(&pMgmt->queryWorker);
tSingleWorkerCleanup(&pMgmt->fetchWorker);
dDebug("qnode workers are closed");
}
......@@ -80,6 +80,7 @@ int32_t smStartWorker(SSnodeMgmt *pMgmt) {
return -1;
}
dDebug("snode workers are initialized");
return 0;
}
......@@ -90,6 +91,7 @@ void smStopWorker(SSnodeMgmt *pMgmt) {
}
taosArrayDestroy(pMgmt->uniqueWorkers);
tSingleWorkerCleanup(&pMgmt->sharedWorker);
dDebug("snode workers are closed");
}
static FORCE_INLINE int32_t smGetSWIdFromMsg(SRpcMsg *pMsg) {
......
......@@ -257,14 +257,14 @@ static void vmCleanup(SMgmtWrapper *pWrapper) {
SVnodesMgmt *pMgmt = pWrapper->pMgmt;
if (pMgmt == NULL) return;
dInfo("vnodes-mgmt start to cleanup");
dInfo("vnode-mgmt start to cleanup");
vmCloseVnodes(pMgmt);
vmStopWorker(pMgmt);
vnodeCleanup();
// walCleanUp();
free(pMgmt);
pWrapper->pMgmt = NULL;
dInfo("vnodes-mgmt is cleaned up");
dInfo("vnode-mgmt is cleaned up");
}
static int32_t vmInit(SMgmtWrapper *pWrapper) {
......@@ -272,7 +272,7 @@ static int32_t vmInit(SMgmtWrapper *pWrapper) {
SVnodesMgmt *pMgmt = calloc(1, sizeof(SVnodesMgmt));
int32_t code = -1;
dInfo("vnodes-mgmt start to init");
dInfo("vnode-mgmt start to init");
if (pMgmt == NULL) goto _OVER;
pMgmt->path = pWrapper->path;
......@@ -312,7 +312,7 @@ static int32_t vmInit(SMgmtWrapper *pWrapper) {
}
if (vmOpenVnodes(pMgmt) != 0) {
dError("failed to open vnodes since %s", terrstr());
dError("failed to open vnode since %s", terrstr());
return -1;
}
......
......@@ -356,7 +356,7 @@ int32_t vmStartWorker(SVnodesMgmt *pMgmt) {
return -1;
}
dDebug("vnode workers is initialized");
dDebug("vnode workers are initialized");
return 0;
}
......@@ -366,5 +366,5 @@ void vmStopWorker(SVnodesMgmt *pMgmt) {
tQWorkerCleanup(&pMgmt->queryPool);
tWWorkerCleanup(&pMgmt->writePool);
tWWorkerCleanup(&pMgmt->syncPool);
dDebug("vnode workers is closed");
dDebug("vnode workers are closed");
}
aux_source_directory(src FUNCTION_SRC)
list(REMOVE_ITEM FUNCTION_SRC src/udfd.c)
add_library(function STATIC ${FUNCTION_SRC})
target_include_directories(
function
PUBLIC "${CMAKE_SOURCE_DIR}/include/libs/function"
PUBLIC
"${CMAKE_SOURCE_DIR}/include/libs/function"
"${CMAKE_SOURCE_DIR}/contrib/libuv/include"
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
)
target_link_libraries(
function
PUBLIC uv_a
PRIVATE os util common nodes
)
\ No newline at end of file
)
add_executable(runUdf test/runUdf.c)
target_include_directories(
runUdf
PUBLIC
"${CMAKE_SOURCE_DIR}/include/libs/function"
"${CMAKE_SOURCE_DIR}/contrib/libuv/include"
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
)
target_link_libraries(
runUdf
PUBLIC uv_a
PRIVATE os util common nodes function
)
add_library(udf1 MODULE test/udf1.c)
target_include_directories(
udf1
PUBLIC
"${CMAKE_SOURCE_DIR}/include/libs/function"
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
)
#SET(EXECUTABLE_OUTPUT_PATH ${CMAKE_BINARY_DIR}/build/bin)
add_executable(udfd src/udfd.c)
target_include_directories(
udfd
PUBLIC
"${CMAKE_SOURCE_DIR}/include/libs/function"
"${CMAKE_SOURCE_DIR}/contrib/libuv/include"
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
)
target_link_libraries(
udfd
PUBLIC uv_a
PRIVATE os util common nodes function
)
......@@ -20,68 +20,116 @@
extern "C" {
#endif
#include "os.h"
#include "taoserror.h"
//======================================================================================
//begin API to taosd and qworker
/**
* start udf dameon service
* @return error code
*/
int32_t startUdfService();
/**
* stop udf dameon service
* @return error code
*/
int32_t stopUdfService();
enum {
TSDB_UDF_FUNC_NORMAL = 0,
TSDB_UDF_FUNC_INIT,
TSDB_UDF_FUNC_FINALIZE,
TSDB_UDF_FUNC_MERGE,
TSDB_UDF_FUNC_DESTROY,
TSDB_UDF_FUNC_MAX_NUM
TSDB_UDF_TYPE_SCALAR = 0,
TSDB_UDF_TYPE_AGGREGATE = 1
};
typedef struct SUdfInit {
int32_t maybe_null; /* 1 if function can return NULL */
uint32_t decimals; /* for real functions */
uint64_t length; /* For string functions */
char* ptr; /* free pointer for function data */
int32_t const_item; /* 0 if result is independent of arguments */
// script like lua/javascript
void* script_ctx;
void (*destroyCtxFunc)(void* script_ctx);
} SUdfInit;
enum {
TSDB_UDF_SCRIPT_BIN_LIB = 0,
TSDB_UDF_SCRIPT_LUA = 1,
};
typedef struct SUdfInfo {
int32_t functionId; // system assigned function id
int32_t funcType; // scalar function or aggregate function
int8_t resType; // result type
int16_t resBytes; // result byte
int32_t contLen; // content length
int32_t bufSize; // interbuf size
char* name; // function name
void* handle; // handle loaded in mem
void* funcs[TSDB_UDF_FUNC_MAX_NUM]; // function ptr
// for script like lua/javascript only
int isScript;
void* pScriptCtx;
SUdfInit init;
char* content;
char* path;
char *udfName; // function name
int32_t udfType; // scalar function or aggregate function
int8_t scriptType;
char *path;
int8_t resType; // result type
int16_t resBytes; // result byte
int32_t bufSize; //interbuf size
} SUdfInfo;
typedef void *UdfHandle;
/**
* setup udf
* @param udf, in
* @param handle, out
* @return error code
*/
int32_t setupUdf(SUdfInfo* udf, UdfHandle *handle);
enum {
TSDB_UDF_STEP_NORMAL = 0,
TSDB_UDF_STEP_MERGE,
TSDb_UDF_STEP_FINALIZE,
TSDB_UDF_STEP_MAX_NUM
};
/**
* call udf
* @param handle udf handle
* @param step
* @param state
* @param stateSize
* @param input
* @param newstate
* @param newStateSize
* @param output
* @return error code
*/
//TODO: must change the following after metadata flow and data flow between qworker and udfd is well defined
typedef struct SUdfDataBlock {
char* data;
int32_t size;
} SUdfDataBlock;
int32_t callUdf(UdfHandle handle, int8_t step, char *state, int32_t stateSize, SUdfDataBlock input, char **newstate,
int32_t *newStateSize, SUdfDataBlock *output);
/**
* tearn down udf
* @param handle
* @return
*/
int32_t teardownUdf(UdfHandle handle);
// end API to taosd and qworker
//=============================================================================================================================
// TODO: Must change
// begin API to UDF writer.
// script
typedef int32_t (*scriptInitFunc)(void* pCtx);
typedef void (*scriptNormalFunc)(void* pCtx, char* data, int16_t iType, int16_t iBytes, int32_t numOfRows,
int64_t* ptList, int64_t key, char* dataOutput, char* tsOutput, int32_t* numOfOutput,
int16_t oType, int16_t oBytes);
typedef void (*scriptFinalizeFunc)(void* pCtx, int64_t key, char* dataOutput, int32_t* numOfOutput);
typedef void (*scriptMergeFunc)(void* pCtx, char* data, int32_t numOfRows, char* dataOutput, int32_t* numOfOutput);
typedef void (*scriptDestroyFunc)(void* pCtx);
//typedef int32_t (*scriptInitFunc)(void* pCtx);
//typedef void (*scriptNormalFunc)(void* pCtx, char* data, int16_t iType, int16_t iBytes, int32_t numOfRows,
// int64_t* ptList, int64_t key, char* dataOutput, char* tsOutput, int32_t* numOfOutput,
// int16_t oType, int16_t oBytes);
//typedef void (*scriptFinalizeFunc)(void* pCtx, int64_t key, char* dataOutput, int32_t* numOfOutput);
//typedef void (*scriptMergeFunc)(void* pCtx, char* data, int32_t numOfRows, char* dataOutput, int32_t* numOfOutput);
//typedef void (*scriptDestroyFunc)(void* pCtx);
// dynamic lib
typedef void (*udfNormalFunc)(char* data, int16_t itype, int16_t iBytes, int32_t numOfRows, int64_t* ts,
char* dataOutput, char* interBuf, char* tsOutput, int32_t* numOfOutput, int16_t oType,
int16_t oBytes, SUdfInit* buf);
typedef int32_t (*udfInitFunc)(SUdfInit* data);
typedef void (*udfFinalizeFunc)(char* dataOutput, char* interBuf, int32_t* numOfOutput, SUdfInit* buf);
typedef void (*udfMergeFunc)(char* data, int32_t numOfRows, char* dataOutput, int32_t* numOfOutput, SUdfInit* buf);
typedef void (*udfDestroyFunc)(SUdfInit* buf);
typedef int32_t (*TUdfInitFunc)();
typedef void (*TUdfDestroyFunc)();
typedef void (*TUdfFunc)(int8_t step,
char *state, int32_t stateSize, SUdfDataBlock input,
char **newstate, int32_t *newStateSize, SUdfDataBlock *output);
//typedef void (*udfMergeFunc)(char *data, int32_t numOfRows, char *dataOutput, int32_t* numOfOutput);
//typedef void (*udfFinalizeFunc)(char* state, int32_t stateSize, SUdfDataBlock *output);
// end API to UDF writer
//=======================================================================================================================
#ifdef __cplusplus
}
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_TUDF_INT_H
#define TDENGINE_TUDF_INT_H
#ifdef __cplusplus
extern "C" {
#endif
//TODO replaces them with fnDebug
//#define debugPrint(...) taosPrintLog("Function", DEBUG_INFO, 135, __VA_ARGS__)
#define debugPrint(...) {fprintf(stderr, __VA_ARGS__);fprintf(stderr, "\n");}
enum {
UDF_TASK_SETUP = 0,
UDF_TASK_CALL = 1,
UDF_TASK_TEARDOWN = 2
};
typedef struct SUdfSetupRequest {
char udfName[16]; //
int8_t scriptType; // 0:c, 1: lua, 2:js
int8_t udfType; //udaf, udf
int16_t pathSize;
char *path;
} SUdfSetupRequest;
typedef struct SUdfSetupResponse {
int64_t udfHandle;
} SUdfSetupResponse;
typedef struct SUdfCallRequest {
int64_t udfHandle;
int8_t step;
int32_t inputBytes;
char *input;
int32_t stateBytes;
char *state;
} SUdfCallRequest;
typedef struct SUdfCallResponse {
int32_t outputBytes;
char *output;
int32_t newStateBytes;
char *newState;
} SUdfCallResponse;
typedef struct SUdfTeardownRequest {
int64_t udfHandle;
} SUdfTeardownRequest;
typedef struct SUdfTeardownResponse {
} SUdfTeardownResponse;
typedef struct SUdfRequest {
int32_t msgLen;
int64_t seqNum;
int8_t type;
void *subReq;
} SUdfRequest;
typedef struct SUdfResponse {
int32_t msgLen;
int64_t seqNum;
int8_t type;
int32_t code;
void *subRsp;
} SUdfResponse;
int32_t decodeRequest(char *buf, int32_t bufLen, SUdfRequest **pRequest);
int32_t encodeResponse(char **buf, int32_t *bufLen, SUdfResponse *response);
int32_t encodeRequest(char **buf, int32_t *bufLen, SUdfRequest *request);
int32_t decodeResponse(char *buf, int32_t bufLen, SUdfResponse **pResponse);
#ifdef __cplusplus
}
#endif
#endif // TDENGINE_TUDF_INT_H
//
// Created by shenglian on 28/02/22.
//
#ifndef UDF_UDF_H
#define UDF_UDF_H
#include <stdlib.h>
#define DEBUG
#ifdef DEBUG
#define debugPrint(...) fprintf(__VA_ARGS__)
#else
#define debugPrint(...) /**/
#endif
enum {
UDF_TASK_SETUP = 0,
UDF_TASK_CALL = 1,
UDF_TASK_TEARDOWN = 2
};
typedef struct SSDataBlock{
char *data;
int32_t size;
} SSDataBlock;
typedef struct SUdfInfo {
char *udfName;
char *path;
} SUdfInfo;
typedef void *UdfHandle;
int32_t startUdfService();
int32_t stopUdfService();
//int32_t setupUdf(SUdfInfo *udf, int32_t numOfUdfs, UdfHandle *handles);
int32_t setupUdf(SUdfInfo* udf, UdfHandle* handle);
int32_t callUdf(UdfHandle handle, int8_t step, char *state, int32_t stateSize, SSDataBlock input, char **newstate,
int32_t *newStateSize, SSDataBlock *output);
int32_t teardownUdf(UdfHandle handle);
typedef struct SUdfSetupRequest {
char udfName[16]; //
int8_t scriptType; // 0:c, 1: lua, 2:js
int8_t udfType; //udaf, udf, udtf
int16_t pathSize;
char *path;
} SUdfSetupRequest;
typedef struct SUdfSetupResponse {
int64_t udfHandle;
} SUdfSetupResponse;
typedef struct SUdfCallRequest {
int64_t udfHandle;
int8_t step;
int32_t inputBytes;
char *input;
int32_t stateBytes;
char *state;
} SUdfCallRequest;
typedef struct SUdfCallResponse {
int32_t outputBytes;
char *output;
int32_t newStateBytes;
char *newState;
} SUdfCallResponse;
typedef struct SUdfTeardownRequest {
int64_t udfHandle;
} SUdfTeardownRequest;
typedef struct SUdfTeardownResponse {
} SUdfTeardownResponse;
typedef struct SUdfRequest {
int32_t msgLen;
int64_t seqNum;
int8_t type;
void *subReq;
} SUdfRequest;
typedef struct SUdfResponse {
int32_t msgLen;
int64_t seqNum;
int8_t type;
int32_t code;
void *subRsp;
} SUdfResponse;
int32_t decodeRequest(char *buf, int32_t bufLen, SUdfRequest **pRequest);
int32_t encodeResponse(char **buf, int32_t *bufLen, SUdfResponse *response);
int32_t encodeRequest(char **buf, int32_t *bufLen, SUdfRequest *request);
int32_t decodeResponse(char *buf, int32_t bufLen, SUdfResponse **pResponse);
#endif //UDF_UDF_H
此差异已折叠。
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "uv.h"
#include "os.h"
#include "tlog.h"
#include "tudf.h"
#include "tudfInt.h"
static uv_loop_t *loop;
typedef struct SUdfdUvConn {
uv_stream_t *client;
char *inputBuf;
int32_t inputLen;
int32_t inputCap;
int32_t inputTotal;
} SUdfdUvConn;
typedef struct SUvUdfWork {
uv_stream_t *client;
uv_buf_t input;
uv_buf_t output;
} SUvUdfWork;
typedef struct SUdf {
int32_t refCount;
char name[16];
int8_t type;
uv_lib_t lib;
TUdfFunc normalFunc;
} SUdf;
//TODO: low priority: change name onxxx to xxxCb, and udfc or udfd as prefix
//TODO: add private udf structure.
typedef struct SUdfHandle {
SUdf *udf;
} SUdfHandle;
void udfdProcessRequest(uv_work_t *req) {
SUvUdfWork *uvUdf = (SUvUdfWork *) (req->data);
SUdfRequest *request = NULL;
decodeRequest(uvUdf->input.base, uvUdf->input.len, &request);
switch (request->type) {
case UDF_TASK_SETUP: {
debugPrint("%s", "process setup request");
SUdf *udf = malloc(sizeof(SUdf));
udf->refCount = 0;
SUdfSetupRequest *setup = request->subReq;
strcpy(udf->name, setup->udfName);
int err = uv_dlopen(setup->path, &udf->lib);
if (err != 0) {
debugPrint("can not load library %s. error: %s", setup->path, uv_strerror(err));
//TODO set error
}
char normalFuncName[32] = {0};
strcpy(normalFuncName, setup->udfName);
//TODO error,
//TODO find all functions normal, init, destroy, normal, merge, finalize
uv_dlsym(&udf->lib, normalFuncName, (void **) (&udf->normalFunc));
SUdfHandle *handle = malloc(sizeof(SUdfHandle));
handle->udf = udf;
udf->refCount++;
//TODO: allocate private structure and call init function and set it to handle
SUdfResponse *rsp = malloc(sizeof(SUdfResponse));
rsp->seqNum = request->seqNum;
rsp->type = request->type;
rsp->code = 0;
SUdfSetupResponse *subRsp = malloc(sizeof(SUdfSetupResponse));
subRsp->udfHandle = (int64_t) (handle);
rsp->subRsp = subRsp;
char *buf;
int32_t len;
encodeResponse(&buf, &len, rsp);
uvUdf->output = uv_buf_init(buf, len);
free(rsp->subRsp);
free(rsp);
free(request->subReq);
free(request);
free(uvUdf->input.base);
break;
}
case UDF_TASK_CALL: {
debugPrint("%s", "process call request");
SUdfCallRequest *call = request->subReq;
SUdfHandle *handle = (SUdfHandle *) (call->udfHandle);
SUdf *udf = handle->udf;
char *newState;
int32_t newStateSize;
SUdfDataBlock input = {.data = call->input, .size= call->inputBytes};
SUdfDataBlock output;
//TODO: call different functions according to the step
udf->normalFunc(call->step, call->state, call->stateBytes, input, &newState, &newStateSize, &output);
SUdfResponse *rsp = malloc(sizeof(SUdfResponse));
rsp->seqNum = request->seqNum;
rsp->type = request->type;
rsp->code = 0;
SUdfCallResponse *subRsp = malloc(sizeof(SUdfCallResponse));
subRsp->outputBytes = output.size;
subRsp->output = output.data;
subRsp->newStateBytes = newStateSize;
subRsp->newState = newState;
rsp->subRsp = subRsp;
char *buf;
int32_t len;
encodeResponse(&buf, &len, rsp);
uvUdf->output = uv_buf_init(buf, len);
free(rsp->subRsp);
free(rsp);
free(newState);
free(output.data);
free(request->subReq);
free(request);
free(uvUdf->input.base);
break;
}
case UDF_TASK_TEARDOWN: {
debugPrint("%s", "process teardown request");
SUdfTeardownRequest *teardown = request->subReq;
SUdfHandle *handle = (SUdfHandle *) (teardown->udfHandle);
SUdf *udf = handle->udf;
udf->refCount--;
if (udf->refCount == 0) {
uv_dlclose(&udf->lib);
}
free(udf);
//TODO: call destroy and free udf private
free(handle);
SUdfResponse *rsp = malloc(sizeof(SUdfResponse));
rsp->seqNum = request->seqNum;
rsp->type = request->type;
rsp->code = 0;
SUdfTeardownResponse *subRsp = malloc(sizeof(SUdfTeardownResponse));
rsp->subRsp = subRsp;
char *buf;
int32_t len;
encodeResponse(&buf, &len, rsp);
uvUdf->output = uv_buf_init(buf, len);
free(rsp->subRsp);
free(rsp);
free(request->subReq);
free(request);
free(uvUdf->input.base);
break;
}
default: {
break;
}
}
}
void udfdOnWrite(uv_write_t *req, int status) {
debugPrint("%s", "after writing to pipe");
if (status < 0) {
debugPrint("Write error %s", uv_err_name(status));
}
SUvUdfWork *work = (SUvUdfWork *) req->data;
debugPrint("\tlength: %zu", work->output.len);
free(work->output.base);
free(work);
free(req);
}
void udfdSendResponse(uv_work_t *work, int status) {
debugPrint("%s", "send response");
SUvUdfWork *udfWork = (SUvUdfWork *) (work->data);
uv_write_t *write_req = malloc(sizeof(uv_write_t));
write_req->data = udfWork;
uv_write(write_req, udfWork->client, &udfWork->output, 1, udfdOnWrite);
free(work);
}
void udfdAllocBuffer(uv_handle_t *handle, size_t suggestedSize, uv_buf_t *buf) {
debugPrint("%s", "allocate buffer for read");
SUdfdUvConn *ctx = handle->data;
int32_t msgHeadSize = sizeof(int32_t) + sizeof(int64_t);
if (ctx->inputCap == 0) {
ctx->inputBuf = malloc(msgHeadSize);
if (ctx->inputBuf) {
ctx->inputLen = 0;
ctx->inputCap = msgHeadSize;
ctx->inputTotal = -1;
buf->base = ctx->inputBuf;
buf->len = ctx->inputCap;
} else {
//TODO: log error
buf->base = NULL;
buf->len = 0;
}
} else {
ctx->inputCap = ctx->inputTotal > ctx->inputCap ? ctx->inputTotal : ctx->inputCap;
void *inputBuf = realloc(ctx->inputBuf, ctx->inputCap);
if (inputBuf) {
ctx->inputBuf = inputBuf;
buf->base = ctx->inputBuf + ctx->inputLen;
buf->len = ctx->inputCap - ctx->inputLen;
} else {
//TODO: log error
buf->base = NULL;
buf->len = 0;
}
}
debugPrint("\tinput buf cap - len - total : %d - %d - %d", ctx->inputCap, ctx->inputLen, ctx->inputTotal);
}
bool isUdfdUvMsgComplete(SUdfdUvConn *pipe) {
if (pipe->inputTotal == -1 && pipe->inputLen >= sizeof(int32_t)) {
pipe->inputTotal = *(int32_t *) (pipe->inputBuf);
}
if (pipe->inputLen == pipe->inputCap && pipe->inputTotal == pipe->inputCap) {
return true;
}
return false;
}
void udfdHandleRequest(SUdfdUvConn *conn) {
uv_work_t *work = malloc(sizeof(uv_work_t));
SUvUdfWork *udfWork = malloc(sizeof(SUvUdfWork));
udfWork->client = conn->client;
udfWork->input = uv_buf_init(conn->inputBuf, conn->inputLen);
conn->inputBuf = NULL;
conn->inputLen = 0;
conn->inputCap = 0;
conn->inputTotal = -1;
work->data = udfWork;
uv_queue_work(loop, work, udfdProcessRequest, udfdSendResponse);
}
void udfdPipeCloseCb(uv_handle_t *pipe) {
SUdfdUvConn *conn = pipe->data;
free(conn->client);
free(conn->inputBuf);
free(conn);
}
void udfdUvHandleError(SUdfdUvConn *conn) {
uv_close((uv_handle_t *) conn->client, udfdPipeCloseCb);
}
void udfdPipeRead(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf) {
debugPrint("%s, nread: %zd", "read from pipe", nread);
if (nread == 0) return;
SUdfdUvConn *conn = client->data;
if (nread > 0) {
conn->inputLen += nread;
if (isUdfdUvMsgComplete(conn)) {
udfdHandleRequest(conn);
} else {
//log error or continue;
}
return;
}
if (nread < 0) {
debugPrint("Read error %s", uv_err_name(nread));
if (nread == UV_EOF) {
//TODO check more when close
} else {
}
udfdUvHandleError(conn);
}
}
void udfdOnNewConnection(uv_stream_t *server, int status) {
debugPrint("%s", "on new connection");
if (status < 0) {
// TODO
return;
}
uv_pipe_t *client = (uv_pipe_t *) malloc(sizeof(uv_pipe_t));
uv_pipe_init(loop, client, 0);
if (uv_accept(server, (uv_stream_t *) client) == 0) {
SUdfdUvConn *ctx = malloc(sizeof(SUdfdUvConn));
ctx->client = (uv_stream_t *) client;
ctx->inputBuf = 0;
ctx->inputLen = 0;
ctx->inputCap = 0;
client->data = ctx;
ctx->client = (uv_stream_t *) client;
uv_read_start((uv_stream_t *) client, udfdAllocBuffer, udfdPipeRead);
} else {
uv_close((uv_handle_t *) client, NULL);
}
}
void removeListeningPipe(int sig) {
uv_fs_t req;
uv_fs_unlink(loop, &req, "udf.sock", NULL);
exit(0);
}
int main() {
debugPrint("libuv version: %x", UV_VERSION_HEX);
loop = uv_default_loop();
uv_fs_t req;
uv_fs_unlink(loop, &req, "udf.sock", NULL);
uv_pipe_t server;
uv_pipe_init(loop, &server, 0);
signal(SIGINT, removeListeningPipe);
int r;
if ((r = uv_pipe_bind(&server, "udf.sock"))) {
debugPrint("Bind error %s\n", uv_err_name(r));
removeListeningPipe(0);
return 1;
}
if ((r = uv_listen((uv_stream_t *) &server, 128, udfdOnNewConnection))) {
debugPrint("Listen error %s", uv_err_name(r));
return 2;
}
uv_run(loop, UV_RUN_DEFAULT);
uv_loop_close(loop);
}
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "uv.h"
#include "tudf.h"
int main(int argc, char *argv[]) {
startUdfService();
uv_sleep(1000);
char path[256] = {0};
size_t cwdSize = 256;
int err = uv_cwd(path, &cwdSize);
if (err != 0) {
fprintf(stderr, "err cwd: %s\n", uv_strerror(err));
return err;
}
fprintf(stdout, "current working directory:%s\n", path);
strcat(path, "/libudf1.so");
SUdfInfo udfInfo = {.udfName="udf1", .path=path};
UdfHandle handle;
setupUdf(&udfInfo, &handle);
//char state[5000000] = "state";
//char input[5000000] = "input";
int dataSize = 500;
int callCount = 2;
if (argc > 1) dataSize = atoi(argv[1]);
if (argc > 2) callCount = atoi(argv[2]);
char *state = malloc(dataSize);
char *input = malloc(dataSize);
SUdfDataBlock blockInput = {.data = input, .size = dataSize};
SUdfDataBlock blockOutput;
char* newState;
int32_t newStateSize;
for (int l = 0; l < callCount; ++l) {
callUdf(handle, 0, state, dataSize, blockInput, &newState, &newStateSize, &blockOutput);
}
free(state);
free(input);
teardownUdf(handle);
stopUdfService();
}
#include <string.h>
#include <stdlib.h>
#include <stdio.h>
#include "tudf.h"
void udf1(int8_t step, char *state, int32_t stateSize, SUdfDataBlock input,
char **newState, int32_t *newStateSize, SUdfDataBlock *output) {
fprintf(stdout, "%s, step:%d\n", "udf function called", step);
char *newStateBuf = malloc(stateSize);
memcpy(newStateBuf, state, stateSize);
*newState = newStateBuf;
*newStateSize = stateSize;
char *outputBuf = malloc(input.size);
memcpy(outputBuf, input.data, input.size);
output->data = outputBuf;
output->size = input.size;
return;
}
......@@ -1481,7 +1481,7 @@ static int32_t jsonToDatum(const SJson* pJson, void* pObj) {
case TSDB_DATA_TYPE_NCHAR:
case TSDB_DATA_TYPE_VARCHAR:
case TSDB_DATA_TYPE_VARBINARY: {
pNode->datum.p = calloc(1, pNode->node.resType.bytes + VARSTR_HEADER_SIZE + 1 + 100);
pNode->datum.p = calloc(1, pNode->node.resType.bytes + VARSTR_HEADER_SIZE + 1);
if (NULL == pNode->datum.p) {
code = TSDB_CODE_OUT_OF_MEMORY;
break;
......
......@@ -150,9 +150,9 @@ SNode* createDropUserStmt(SAstCreateContext* pCxt, SToken* pUserName);
SNode* createCreateDnodeStmt(SAstCreateContext* pCxt, const SToken* pFqdn, const SToken* pPort);
SNode* createDropDnodeStmt(SAstCreateContext* pCxt, const SToken* pDnode);
SNode* createAlterDnodeStmt(SAstCreateContext* pCxt, const SToken* pDnode, const SToken* pConfig, const SToken* pValue);
SNode* createCreateIndexStmt(SAstCreateContext* pCxt, EIndexType type, SToken* pIndexName, SToken* pTableName, SNodeList* pCols, SNode* pOptions);
SNode* createCreateIndexStmt(SAstCreateContext* pCxt, EIndexType type, bool ignoreExists, SToken* pIndexName, SToken* pTableName, SNodeList* pCols, SNode* pOptions);
SNode* createIndexOption(SAstCreateContext* pCxt, SNodeList* pFuncs, SNode* pInterval, SNode* pOffset, SNode* pSliding);
SNode* createDropIndexStmt(SAstCreateContext* pCxt, SToken* pIndexName, SToken* pTableName);
SNode* createDropIndexStmt(SAstCreateContext* pCxt, bool ignoreNotExists, SToken* pIndexName, SToken* pTableName);
SNode* createCreateQnodeStmt(SAstCreateContext* pCxt, const SToken* pDnodeId);
SNode* createDropQnodeStmt(SAstCreateContext* pCxt, const SToken* pDnodeId);
SNode* createCreateTopicStmt(SAstCreateContext* pCxt, bool ignoreExists, const SToken* pTopicName, SNode* pQuery, const SToken* pSubscribeDbName);
......
......@@ -313,10 +313,11 @@ func_name_list(A) ::= func_name_list(B) NK_COMMA col_name(C).
func_name(A) ::= function_name(B). { A = createFunctionNode(pCxt, &B, NULL); }
/************************************************ create index ********************************************************/
cmd ::= CREATE SMA INDEX index_name(A) ON table_name(B) index_options(C). { pCxt->pRootNode = createCreateIndexStmt(pCxt, INDEX_TYPE_SMA, &A, &B, NULL, C); }
cmd ::= CREATE FULLTEXT INDEX
index_name(A) ON table_name(B) NK_LP col_name_list(C) NK_RP. { pCxt->pRootNode = createCreateIndexStmt(pCxt, INDEX_TYPE_FULLTEXT, &A, &B, C, NULL); }
cmd ::= DROP INDEX index_name(A) ON table_name(B). { pCxt->pRootNode = createDropIndexStmt(pCxt, &A, &B); }
cmd ::= CREATE SMA INDEX not_exists_opt(D)
index_name(A) ON table_name(B) index_options(C). { pCxt->pRootNode = createCreateIndexStmt(pCxt, INDEX_TYPE_SMA, D, &A, &B, NULL, C); }
cmd ::= CREATE FULLTEXT INDEX not_exists_opt(D)
index_name(A) ON table_name(B) NK_LP col_name_list(C) NK_RP. { pCxt->pRootNode = createCreateIndexStmt(pCxt, INDEX_TYPE_FULLTEXT, D, &A, &B, C, NULL); }
cmd ::= DROP INDEX exists_opt(C) index_name(A) ON table_name(B). { pCxt->pRootNode = createDropIndexStmt(pCxt, C, &A, &B); }
index_options(A) ::= . { A = NULL; }
index_options(A) ::= FUNCTION NK_LP func_list(B) NK_RP INTERVAL
......
......@@ -1160,13 +1160,14 @@ SNode* createAlterDnodeStmt(SAstCreateContext* pCxt, const SToken* pDnode, const
return (SNode*)pStmt;
}
SNode* createCreateIndexStmt(SAstCreateContext* pCxt, EIndexType type, SToken* pIndexName, SToken* pTableName, SNodeList* pCols, SNode* pOptions) {
SNode* createCreateIndexStmt(SAstCreateContext* pCxt, EIndexType type, bool ignoreExists, SToken* pIndexName, SToken* pTableName, SNodeList* pCols, SNode* pOptions) {
if (!checkIndexName(pCxt, pIndexName) || !checkTableName(pCxt, pTableName)) {
return NULL;
}
SCreateIndexStmt* pStmt = nodesMakeNode(QUERY_NODE_CREATE_INDEX_STMT);
CHECK_OUT_OF_MEM(pStmt);
pStmt->indexType = type;
pStmt->ignoreExists = ignoreExists;
strncpy(pStmt->indexName, pIndexName->z, pIndexName->n);
strncpy(pStmt->tableName, pTableName->z, pTableName->n);
pStmt->pCols = pCols;
......@@ -1184,12 +1185,13 @@ SNode* createIndexOption(SAstCreateContext* pCxt, SNodeList* pFuncs, SNode* pInt
return (SNode*)pOptions;
}
SNode* createDropIndexStmt(SAstCreateContext* pCxt, SToken* pIndexName, SToken* pTableName) {
SNode* createDropIndexStmt(SAstCreateContext* pCxt, bool ignoreNotExists, SToken* pIndexName, SToken* pTableName) {
if (!checkIndexName(pCxt, pIndexName) || !checkTableName(pCxt, pTableName)) {
return NULL;
}
SDropIndexStmt* pStmt = nodesMakeNode(QUERY_NODE_DROP_INDEX_STMT);
CHECK_OUT_OF_MEM(pStmt);
pStmt->ignoreNotExists = ignoreNotExists;
strncpy(pStmt->indexName, pIndexName->z, pIndexName->n);
strncpy(pStmt->tableName, pTableName->z, pTableName->n);
return (SNode*)pStmt;
......
......@@ -1422,34 +1422,110 @@ static int32_t translateShowTables(STranslateContext* pCxt) {
return TSDB_CODE_SUCCESS;
}
static int32_t translateCreateSmaIndex(STranslateContext* pCxt, SCreateIndexStmt* pStmt) {
SVCreateTSmaReq createSmaReq = {0};
static int32_t getSmaIndexDstVgId(STranslateContext* pCxt, char* pTableName, int32_t* pVgId) {
SVgroupInfo vg = {0};
int32_t code = getTableHashVgroup(pCxt, pCxt->pParseCxt->db, pTableName, &vg);
if (TSDB_CODE_SUCCESS == code) {
*pVgId = vg.vgId;
}
return code;
}
if (DEAL_RES_ERROR == translateValue(pCxt, (SValueNode*)pStmt->pOptions->pInterval) ||
(NULL != pStmt->pOptions->pOffset && DEAL_RES_ERROR == translateValue(pCxt, (SValueNode*)pStmt->pOptions->pOffset)) ||
(NULL != pStmt->pOptions->pSliding && DEAL_RES_ERROR == translateValue(pCxt, (SValueNode*)pStmt->pOptions->pSliding))) {
return pCxt->errCode;
static int32_t getSmaIndexSql(STranslateContext* pCxt, char** pSql, int32_t* pLen) {
*pSql = strdup(pCxt->pParseCxt->pSql);
if (NULL == *pSql) {
return TSDB_CODE_OUT_OF_MEMORY;
}
*pLen = pCxt->pParseCxt->sqlLen + 1;
return TSDB_CODE_SUCCESS;
}
createSmaReq.tSma.intervalUnit = ((SValueNode*)pStmt->pOptions->pInterval)->unit;
createSmaReq.tSma.slidingUnit = (NULL != pStmt->pOptions->pSliding ? ((SValueNode*)pStmt->pOptions->pSliding)->unit : 0);
strcpy(createSmaReq.tSma.indexName, pStmt->indexName);
static int32_t getSmaIndexExpr(STranslateContext* pCxt, SCreateIndexStmt* pStmt, char** pExpr, int32_t* pLen) {
return nodesListToString(pStmt->pOptions->pFuncs, false, pExpr, pLen);
}
SName name;
name.type = TSDB_TABLE_NAME_T;
name.acctId = pCxt->pParseCxt->acctId;
static int32_t getSmaIndexBuildAst(STranslateContext* pCxt, SCreateIndexStmt* pStmt, char** pAst, int32_t* pLen) {
SSelectStmt* pSelect = nodesMakeNode(QUERY_NODE_SELECT_STMT);
if (NULL == pSelect) {
return TSDB_CODE_OUT_OF_MEMORY;
}
SRealTableNode* pTable = nodesMakeNode(QUERY_NODE_REAL_TABLE);
if (NULL == pTable) {
nodesDestroyNode(pSelect);
return TSDB_CODE_OUT_OF_MEMORY;
}
strcpy(pTable->table.dbName, pCxt->pParseCxt->db);
strcpy(pTable->table.tableName, pStmt->tableName);
pSelect->pFromTable = (SNode*)pTable;
pSelect->pProjectionList = nodesCloneList(pStmt->pOptions->pFuncs);
if (NULL == pTable) {
nodesDestroyNode(pSelect);
return TSDB_CODE_OUT_OF_MEMORY;
}
SIntervalWindowNode* pInterval = nodesMakeNode(QUERY_NODE_INTERVAL_WINDOW);
if (NULL == pInterval) {
nodesDestroyNode(pSelect);
return TSDB_CODE_OUT_OF_MEMORY;
}
pSelect->pWindow = (SNode*)pInterval;
pInterval->pInterval = nodesCloneNode(pStmt->pOptions->pInterval);
pInterval->pOffset = nodesCloneNode(pStmt->pOptions->pOffset);
pInterval->pSliding = nodesCloneNode(pStmt->pOptions->pSliding);
if (NULL == pInterval->pInterval || (NULL != pStmt->pOptions->pOffset && NULL == pInterval->pOffset) ||
(NULL != pStmt->pOptions->pSliding && NULL == pInterval->pSliding)) {
nodesDestroyNode(pSelect);
return TSDB_CODE_OUT_OF_MEMORY;
}
int32_t code = translateQuery(pCxt, (SNode*)pSelect);
if (TSDB_CODE_SUCCESS == code) {
code = nodesNodeToString(pSelect, false, pAst, pLen);
}
nodesDestroyNode(pSelect);
return code;
}
static int32_t buildCreateSmaReq(STranslateContext* pCxt, SCreateIndexStmt* pStmt, SMCreateSmaReq* pReq) {
SName name = { .type = TSDB_TABLE_NAME_T, .acctId = pCxt->pParseCxt->acctId };
strcpy(name.dbname, pCxt->pParseCxt->db);
strcpy(name.tname, pStmt->indexName);
tNameExtractFullName(&name, pReq->name);
strcpy(name.tname, pStmt->tableName);
STableMeta* pMeta = NULL;
int32_t code = catalogGetTableMeta(pCxt->pParseCxt->pCatalog, pCxt->pParseCxt->pTransporter, &pCxt->pParseCxt->mgmtEpSet, &name, &pMeta);
if (TSDB_CODE_SUCCESS != code) {
return code;
name.tname[strlen(pStmt->tableName)] = '\0';
tNameExtractFullName(&name, pReq->stb);
pReq->igExists = pStmt->ignoreExists;
pReq->interval = ((SValueNode*)pStmt->pOptions->pInterval)->datum.i;
pReq->intervalUnit = ((SValueNode*)pStmt->pOptions->pInterval)->unit;
pReq->offset = (NULL != pStmt->pOptions->pOffset ? ((SValueNode*)pStmt->pOptions->pOffset)->datum.i : 0);
pReq->sliding = (NULL != pStmt->pOptions->pSliding ? ((SValueNode*)pStmt->pOptions->pSliding)->datum.i : pReq->interval);
pReq->slidingUnit = (NULL != pStmt->pOptions->pSliding ? ((SValueNode*)pStmt->pOptions->pSliding)->unit : pReq->intervalUnit);
int32_t code = getSmaIndexDstVgId(pCxt, pStmt->tableName, &pReq->dstVgId);
if (TSDB_CODE_SUCCESS == code) {
code = getSmaIndexSql(pCxt, &pReq->sql, &pReq->sqlLen);
}
if (TSDB_CODE_SUCCESS == code) {
code = getSmaIndexExpr(pCxt, pStmt, &pReq->expr, &pReq->exprLen);
}
if (TSDB_CODE_SUCCESS == code) {
code = getSmaIndexBuildAst(pCxt, pStmt, &pReq->ast, &pReq->astLen);
}
createSmaReq.tSma.tableUid = pMeta->uid;
createSmaReq.tSma.interval = ((SValueNode*)pStmt->pOptions->pInterval)->datum.i;
createSmaReq.tSma.sliding = (NULL != pStmt->pOptions->pSliding ? ((SValueNode*)pStmt->pOptions->pSliding)->datum.i : 0);
code = nodesListToString(pStmt->pOptions->pFuncs, false, &createSmaReq.tSma.expr, &createSmaReq.tSma.exprLen);
return code;
}
static int32_t translateCreateSmaIndex(STranslateContext* pCxt, SCreateIndexStmt* pStmt) {
if (DEAL_RES_ERROR == translateValue(pCxt, (SValueNode*)pStmt->pOptions->pInterval) ||
(NULL != pStmt->pOptions->pOffset && DEAL_RES_ERROR == translateValue(pCxt, (SValueNode*)pStmt->pOptions->pOffset)) ||
(NULL != pStmt->pOptions->pSliding && DEAL_RES_ERROR == translateValue(pCxt, (SValueNode*)pStmt->pOptions->pSliding))) {
return pCxt->errCode;
}
SMCreateSmaReq createSmaReq = {0};
int32_t code = buildCreateSmaReq(pCxt, pStmt, &createSmaReq);
if (TSDB_CODE_SUCCESS != code) {
return code;
}
......@@ -1460,14 +1536,13 @@ static int32_t translateCreateSmaIndex(STranslateContext* pCxt, SCreateIndexStmt
}
pCxt->pCmdMsg->epSet = pCxt->pParseCxt->mgmtEpSet;
pCxt->pCmdMsg->msgType = TDMT_VND_CREATE_SMA;
pCxt->pCmdMsg->msgLen = tSerializeSVCreateTSmaReq(NULL, &createSmaReq);
pCxt->pCmdMsg->msgLen = tSerializeSMCreateSmaReq(NULL, 0, &createSmaReq);
pCxt->pCmdMsg->pMsg = malloc(pCxt->pCmdMsg->msgLen);
if (NULL == pCxt->pCmdMsg->pMsg) {
return TSDB_CODE_OUT_OF_MEMORY;
}
void* pBuf = pCxt->pCmdMsg->pMsg;
tSerializeSVCreateTSmaReq(&pBuf, &createSmaReq);
tdDestroyTSma(&createSmaReq.tSma);
tSerializeSMCreateSmaReq(pCxt->pCmdMsg->pMsg, pCxt->pCmdMsg->msgLen, &createSmaReq);
tFreeSMCreateSmaReq(&createSmaReq);
return TSDB_CODE_SUCCESS;
}
......
此差异已折叠。
......@@ -951,7 +951,6 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType) {
atomic_store_ptr(&ctx->connInfo.ahandle, qwMsg->connInfo.ahandle);
QW_TASK_DLOGL("subplan json string, len:%d, %s", qwMsg->msgLen, qwMsg->msg);
code = qStringToSubplan(qwMsg->msg, &plan);
if (TSDB_CODE_SUCCESS != code) {
QW_TASK_ELOG("task string to subplan failed, code:%x - %s", code, tstrerror(code));
......
......@@ -27,6 +27,12 @@ extern "C" {
#include "syncMessage.h"
#include "taosdef.h"
typedef enum EntryType {
SYNC_RAFT_ENTRY_NOOP = 0,
SYNC_RAFT_ENTRY_DATA = 1,
SYNC_RAFT_ENTRY_CONFIG = 2,
} EntryType;
typedef struct SSyncRaftEntry {
uint32_t bytes;
uint32_t msgType;
......@@ -35,12 +41,15 @@ typedef struct SSyncRaftEntry {
bool isWeak;
SyncTerm term;
SyncIndex index;
EntryType entryType;
uint32_t dataLen;
char data[];
} SSyncRaftEntry;
SSyncRaftEntry* syncEntryBuild(uint32_t dataLen);
SSyncRaftEntry* syncEntryBuild2(SyncClientRequest* pMsg, SyncTerm term, SyncIndex index); // step 4
SSyncRaftEntry* syncEntryBuild3(SyncClientRequest* pMsg, SyncTerm term, SyncIndex index, EntryType entryType);
SSyncRaftEntry* syncEntryBuildNoop(SyncTerm term, SyncIndex index);
void syncEntryDestory(SSyncRaftEntry* pEntry);
char* syncEntrySerialize(const SSyncRaftEntry* pEntry, uint32_t* len); // step 5
SSyncRaftEntry* syncEntryDeserialize(const char* buf, uint32_t len); // step 6
......
......@@ -199,6 +199,9 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
SSyncRaftEntry* pRollBackEntry = logStoreGetEntry(ths->pLogStore, index);
assert(pRollBackEntry != NULL);
// maybe is a NOOP ENTRY
// assert(pRollBackEntry->entryType == SYNC_RAFT_ENTRY_DATA);
SRpcMsg rpcMsg;
syncEntry2OriginalRpc(pRollBackEntry, &rpcMsg);
ths->pFsm->FpRollBackCb(ths->pFsm, &rpcMsg, pRollBackEntry->index, pRollBackEntry->isWeak, 0, ths->state);
......@@ -217,7 +220,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
SRpcMsg rpcMsg;
syncEntry2OriginalRpc(pAppendEntry, &rpcMsg);
if (ths->pFsm != NULL) {
if (ths->pFsm->FpPreCommitCb != NULL) {
if (ths->pFsm->FpPreCommitCb != NULL && pAppendEntry->entryType == SYNC_RAFT_ENTRY_DATA) {
ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, pAppendEntry->index, pAppendEntry->isWeak, 2, ths->state);
}
}
......@@ -242,7 +245,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
SRpcMsg rpcMsg;
syncEntry2OriginalRpc(pAppendEntry, &rpcMsg);
if (ths->pFsm != NULL) {
if (ths->pFsm->FpPreCommitCb != NULL) {
if (ths->pFsm->FpPreCommitCb != NULL && pAppendEntry->entryType == SYNC_RAFT_ENTRY_DATA) {
ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, pAppendEntry->index, pAppendEntry->isWeak, 3, ths->state);
}
}
......@@ -298,7 +301,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
SRpcMsg rpcMsg;
syncEntry2OriginalRpc(pEntry, &rpcMsg);
if (ths->pFsm->FpCommitCb != NULL) {
if (ths->pFsm->FpCommitCb != NULL && pEntry->entryType == SYNC_RAFT_ENTRY_DATA) {
ths->pFsm->FpCommitCb(ths->pFsm, &rpcMsg, pEntry->index, pEntry->isWeak, 0, ths->state);
}
......
......@@ -97,7 +97,7 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
SRpcMsg rpcMsg;
syncEntry2OriginalRpc(pEntry, &rpcMsg);
if (pSyncNode->pFsm->FpCommitCb != NULL) {
if (pSyncNode->pFsm->FpCommitCb != NULL && pEntry->entryType == SYNC_RAFT_ENTRY_DATA) {
pSyncNode->pFsm->FpCommitCb(pSyncNode->pFsm, &rpcMsg, pEntry->index, pEntry->isWeak, 0, pSyncNode->state);
}
......
......@@ -29,7 +29,7 @@ static int32_t syncIODestroy(SSyncIO *io);
static int32_t syncIOStartInternal(SSyncIO *io);
static int32_t syncIOStopInternal(SSyncIO *io);
static void * syncIOConsumerFunc(void *param);
static void *syncIOConsumerFunc(void *param);
static void syncIOProcessRequest(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet);
static void syncIOProcessReply(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet);
static int32_t syncIOAuth(void *parent, char *meterId, char *spi, char *encrypt, char *secret, char *ckey);
......@@ -75,6 +75,7 @@ int32_t syncIOSendMsg(void *clientRpc, const SEpSet *pEpSet, SRpcMsg *pMsg) {
syncRpcMsgPrint2(logBuf, pMsg);
pMsg->handle = NULL;
pMsg->noResp = 1;
rpcSendRequest(clientRpc, pEpSet, pMsg, NULL);
return ret;
}
......@@ -234,9 +235,9 @@ static int32_t syncIOStopInternal(SSyncIO *io) {
}
static void *syncIOConsumerFunc(void *param) {
SSyncIO * io = param;
SSyncIO *io = param;
STaosQall *qall;
SRpcMsg * pRpcMsg, rpcMsg;
SRpcMsg *pRpcMsg, rpcMsg;
qall = taosAllocateQall();
while (1) {
......@@ -324,19 +325,21 @@ static void *syncIOConsumerFunc(void *param) {
taosGetQitem(qall, (void **)&pRpcMsg);
rpcFreeCont(pRpcMsg->pCont);
if (pRpcMsg->handle != NULL) {
int msgSize = 32;
memset(&rpcMsg, 0, sizeof(rpcMsg));
rpcMsg.msgType = SYNC_RESPONSE;
rpcMsg.pCont = rpcMallocCont(msgSize);
rpcMsg.contLen = msgSize;
snprintf(rpcMsg.pCont, rpcMsg.contLen, "%s", "give a reply");
rpcMsg.handle = pRpcMsg->handle;
rpcMsg.code = 0;
syncRpcMsgPrint2((char *)"syncIOConsumerFunc rpcSendResponse --> ", &rpcMsg);
rpcSendResponse(&rpcMsg);
}
/*
if (pRpcMsg->handle != NULL) {
int msgSize = 32;
memset(&rpcMsg, 0, sizeof(rpcMsg));
rpcMsg.msgType = SYNC_RESPONSE;
rpcMsg.pCont = rpcMallocCont(msgSize);
rpcMsg.contLen = msgSize;
snprintf(rpcMsg.pCont, rpcMsg.contLen, "%s", "give a reply");
rpcMsg.handle = pRpcMsg->handle;
rpcMsg.code = 0;
syncRpcMsgPrint2((char *)"syncIOConsumerFunc rpcSendResponse --> ", &rpcMsg);
rpcSendResponse(&rpcMsg);
}
*/
taosFreeQitem(pRpcMsg);
}
......
......@@ -37,11 +37,13 @@ static int32_t tsNodeRefId = -1;
// ------ local funciton ---------
// enqueue message ----
static void syncNodeEqPingTimer(void* param, void* tmrId);
static void syncNodeEqElectTimer(void* param, void* tmrId);
static void syncNodeEqHeartbeatTimer(void* param, void* tmrId);
static void syncNodeEqPingTimer(void* param, void* tmrId);
static void syncNodeEqElectTimer(void* param, void* tmrId);
static void syncNodeEqHeartbeatTimer(void* param, void* tmrId);
static int32_t syncNodeEqNoop(SSyncNode* ths);
static int32_t syncNodeAppendNoop(SSyncNode* ths);
// on message ----
// process message ----
static int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg);
static int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg);
static int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg);
......@@ -669,6 +671,10 @@ void syncNodeCandidate2Leader(SSyncNode* pSyncNode) {
assert(pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE);
assert(voteGrantedMajority(pSyncNode->pVotesGranted));
syncNodeBecomeLeader(pSyncNode);
// Raft 3.6.2 Committing entries from previous terms
syncNodeAppendNoop(pSyncNode);
// syncNodeEqNoop(pSyncNode);
}
void syncNodeFollower2Candidate(SSyncNode* pSyncNode) {
......@@ -803,6 +809,47 @@ static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) {
}
}
static int32_t syncNodeEqNoop(SSyncNode* ths) {
int32_t ret = 0;
assert(ths->state == TAOS_SYNC_STATE_LEADER);
SyncIndex index = ths->pLogStore->getLastIndex(ths->pLogStore) + 1;
SyncTerm term = ths->pRaftStore->currentTerm;
SSyncRaftEntry* pEntry = syncEntryBuildNoop(term, index);
assert(pEntry != NULL);
uint32_t entryLen;
char* serialized = syncEntrySerialize(pEntry, &entryLen);
SyncClientRequest* pSyncMsg = syncClientRequestBuild(entryLen);
assert(pSyncMsg->dataLen == entryLen);
memcpy(pSyncMsg->data, serialized, entryLen);
SRpcMsg rpcMsg;
syncClientRequest2RpcMsg(pSyncMsg, &rpcMsg);
ths->FpEqMsg(ths->queue, &rpcMsg);
free(serialized);
syncClientRequestDestroy(pSyncMsg);
return ret;
}
static int32_t syncNodeAppendNoop(SSyncNode* ths) {
int32_t ret = 0;
SyncIndex index = ths->pLogStore->getLastIndex(ths->pLogStore) + 1;
SyncTerm term = ths->pRaftStore->currentTerm;
SSyncRaftEntry* pEntry = syncEntryBuildNoop(term, index);
assert(pEntry != NULL);
if (ths->state == TAOS_SYNC_STATE_LEADER) {
ths->pLogStore->appendEntry(ths->pLogStore, pEntry);
syncNodeReplicate(ths);
}
return ret;
}
// on message ----
static int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg) {
int32_t ret = 0;
......@@ -851,7 +898,7 @@ static int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg
syncEntry2OriginalRpc(pEntry, &rpcMsg);
if (ths->pFsm != NULL) {
if (ths->pFsm->FpPreCommitCb != NULL) {
if (ths->pFsm->FpPreCommitCb != NULL && pEntry->entryType == SYNC_RAFT_ENTRY_DATA) {
ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, pEntry->index, pEntry->isWeak, 0, ths->state);
}
}
......@@ -866,7 +913,7 @@ static int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg
syncEntry2OriginalRpc(pEntry, &rpcMsg);
if (ths->pFsm != NULL) {
if (ths->pFsm->FpPreCommitCb != NULL) {
if (ths->pFsm->FpPreCommitCb != NULL && pEntry->entryType == SYNC_RAFT_ENTRY_DATA) {
ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, pEntry->index, pEntry->isWeak, 1, ths->state);
}
}
......
......@@ -28,6 +28,13 @@ SSyncRaftEntry* syncEntryBuild(uint32_t dataLen) {
// step 4. SyncClientRequest => SSyncRaftEntry, add term, index
SSyncRaftEntry* syncEntryBuild2(SyncClientRequest* pMsg, SyncTerm term, SyncIndex index) {
SSyncRaftEntry* pEntry = syncEntryBuild3(pMsg, term, index, SYNC_RAFT_ENTRY_DATA);
assert(pEntry != NULL);
return pEntry;
}
SSyncRaftEntry* syncEntryBuild3(SyncClientRequest* pMsg, SyncTerm term, SyncIndex index, EntryType entryType) {
SSyncRaftEntry* pEntry = syncEntryBuild(pMsg->dataLen);
assert(pEntry != NULL);
......@@ -37,12 +44,23 @@ SSyncRaftEntry* syncEntryBuild2(SyncClientRequest* pMsg, SyncTerm term, SyncInde
pEntry->isWeak = pMsg->isWeak;
pEntry->term = term;
pEntry->index = index;
pEntry->entryType = entryType;
pEntry->dataLen = pMsg->dataLen;
memcpy(pEntry->data, pMsg->data, pMsg->dataLen);
return pEntry;
}
SSyncRaftEntry* syncEntryBuildNoop(SyncTerm term, SyncIndex index) {
SSyncRaftEntry* pEntry = syncEntryBuild(0);
assert(pEntry != NULL);
pEntry->term = term;
pEntry->index = index;
pEntry->entryType = SYNC_RAFT_ENTRY_NOOP;
return pEntry;
}
void syncEntryDestory(SSyncRaftEntry* pEntry) {
if (pEntry != NULL) {
free(pEntry);
......@@ -85,6 +103,7 @@ cJSON* syncEntry2Json(const SSyncRaftEntry* pEntry) {
cJSON_AddStringToObject(pRoot, "term", u64buf);
snprintf(u64buf, sizeof(u64buf), "%lu", pEntry->index);
cJSON_AddStringToObject(pRoot, "index", u64buf);
cJSON_AddNumberToObject(pRoot, "entryType", pEntry->entryType);
cJSON_AddNumberToObject(pRoot, "dataLen", pEntry->dataLen);
char* s;
......
......@@ -34,7 +34,7 @@ SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode) {
pLogStore->getLastTerm = logStoreLastTerm;
pLogStore->updateCommitIndex = logStoreUpdateCommitIndex;
pLogStore->getCommitIndex = logStoreGetCommitIndex;
return pLogStore; // to avoid compiler error
return pLogStore;
}
void logStoreDestory(SSyncLogStore* pLogStore) {
......@@ -48,18 +48,22 @@ int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) {
SSyncLogStoreData* pData = pLogStore->data;
SWal* pWal = pData->pWal;
assert(pEntry->index == logStoreLastIndex(pLogStore) + 1);
SyncIndex lastIndex = logStoreLastIndex(pLogStore);
assert(pEntry->index == lastIndex + 1);
uint32_t len;
char* serialized = syncEntrySerialize(pEntry, &len);
assert(serialized != NULL);
int code;
code = walWrite(pWal, pEntry->index, pEntry->msgType, serialized, len);
assert(code == 0);
int code = 0;
/*
code = walWrite(pWal, pEntry->index, pEntry->entryType, serialized, len);
assert(code == 0);
*/
assert(walWrite(pWal, pEntry->index, pEntry->entryType, serialized, len) == 0);
walFsync(pWal, true);
free(serialized);
return code; // to avoid compiler error
return code;
}
SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) {
......@@ -69,7 +73,7 @@ SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) {
if (index >= SYNC_INDEX_BEGIN && index <= logStoreLastIndex(pLogStore)) {
SWalReadHandle* pWalHandle = walOpenReadHandle(pWal);
walReadWithHandle(pWalHandle, index);
assert(walReadWithHandle(pWalHandle, index) == 0);
pEntry = syncEntryDeserialize(pWalHandle->pHead->head.body, pWalHandle->pHead->head.len);
assert(pEntry != NULL);
......@@ -83,7 +87,7 @@ SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) {
int32_t logStoreTruncate(SSyncLogStore* pLogStore, SyncIndex fromIndex) {
SSyncLogStoreData* pData = pLogStore->data;
SWal* pWal = pData->pWal;
walRollback(pWal, fromIndex);
assert(walRollback(pWal, fromIndex) == 0);
return 0; // to avoid compiler error
}
......@@ -107,7 +111,7 @@ SyncTerm logStoreLastTerm(SSyncLogStore* pLogStore) {
int32_t logStoreUpdateCommitIndex(SSyncLogStore* pLogStore, SyncIndex index) {
SSyncLogStoreData* pData = pLogStore->data;
SWal* pWal = pData->pWal;
walCommit(pWal, index);
assert(walCommit(pWal, index) == 0);
return 0; // to avoid compiler error
}
......
......@@ -34,6 +34,7 @@ add_executable(syncEncodeTest "")
add_executable(syncWriteTest "")
add_executable(syncReplicateTest "")
add_executable(syncReplicateTest2 "")
add_executable(syncReplicateTest3 "")
add_executable(syncReplicateLoadTest "")
add_executable(syncRefTest "")
add_executable(syncLogStoreCheck "")
......@@ -183,6 +184,10 @@ target_sources(syncReplicateTest2
PRIVATE
"syncReplicateTest2.cpp"
)
target_sources(syncReplicateTest3
PRIVATE
"syncReplicateTest3.cpp"
)
target_sources(syncReplicateLoadTest
PRIVATE
"syncReplicateLoadTest.cpp"
......@@ -377,6 +382,11 @@ target_include_directories(syncReplicateTest2
"${CMAKE_SOURCE_DIR}/include/libs/sync"
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
)
target_include_directories(syncReplicateTest3
PUBLIC
"${CMAKE_SOURCE_DIR}/include/libs/sync"
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
)
target_include_directories(syncReplicateLoadTest
PUBLIC
"${CMAKE_SOURCE_DIR}/include/libs/sync"
......@@ -538,6 +548,10 @@ target_link_libraries(syncReplicateTest2
sync
gtest_main
)
target_link_libraries(syncReplicateTest3
sync
gtest_main
)
target_link_libraries(syncReplicateLoadTest
sync
gtest_main
......
......@@ -46,6 +46,20 @@ void test2() {
}
void test3() {
SyncClientRequest* pSyncMsg = syncClientRequestBuild(10);
pSyncMsg->originalRpcType = 33;
pSyncMsg->seqNum = 11;
pSyncMsg->isWeak = 1;
strcpy(pSyncMsg->data, "test3");
SSyncRaftEntry* pEntry = syncEntryBuild3(pSyncMsg, 100, 200, SYNC_RAFT_ENTRY_NOOP);
syncEntryPrint(pEntry);
syncClientRequestDestroy(pSyncMsg);
syncEntryDestory(pEntry);
}
void test4() {
SSyncRaftEntry* pEntry = syncEntryBuild(10);
assert(pEntry != NULL);
pEntry->msgType = 11;
......@@ -54,7 +68,8 @@ void test3() {
pEntry->isWeak = true;
pEntry->term = 44;
pEntry->index = 55;
strcpy(pEntry->data, "test3");
pEntry->entryType = SYNC_RAFT_ENTRY_CONFIG;
strcpy(pEntry->data, "test4");
syncEntryPrint(pEntry);
uint32_t len;
......@@ -76,6 +91,7 @@ int main(int argc, char** argv) {
test1();
test2();
test3();
test4();
return 0;
}
......@@ -178,9 +178,10 @@ int main(int argc, char **argv) {
while (1) {
sTrace(
"replicate sleep, state: %d, %s, term:%lu electTimerLogicClock:%lu, electTimerLogicClockUser:%lu, "
"electTimerMS:%d",
"electTimerMS:%d, commitIndex:%ld",
pSyncNode->state, syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm,
pSyncNode->electTimerLogicClock, pSyncNode->electTimerLogicClockUser, pSyncNode->electTimerMS);
pSyncNode->electTimerLogicClock, pSyncNode->electTimerLogicClockUser, pSyncNode->electTimerMS,
pSyncNode->commitIndex);
taosMsleep(1000);
}
......
......@@ -183,18 +183,20 @@ int main(int argc, char **argv) {
taosMsleep(1000);
sTrace(
"replicate sleep, state: %d, %s, term:%lu electTimerLogicClock:%lu, electTimerLogicClockUser:%lu, "
"electTimerMS:%d",
"syncPropose sleep, state: %d, %s, term:%lu electTimerLogicClock:%lu, electTimerLogicClockUser:%lu, "
"electTimerMS:%d, commitIndex:%ld",
gSyncNode->state, syncUtilState2String(gSyncNode->state), gSyncNode->pRaftStore->currentTerm,
gSyncNode->electTimerLogicClock, gSyncNode->electTimerLogicClockUser, gSyncNode->electTimerMS);
gSyncNode->electTimerLogicClock, gSyncNode->electTimerLogicClockUser, gSyncNode->electTimerMS,
gSyncNode->commitIndex);
}
while (1) {
sTrace(
"replicate sleep, state: %d, %s, term:%lu electTimerLogicClock:%lu, electTimerLogicClockUser:%lu, "
"electTimerMS:%d",
"electTimerMS:%d, commitIndex:%ld",
gSyncNode->state, syncUtilState2String(gSyncNode->state), gSyncNode->pRaftStore->currentTerm,
gSyncNode->electTimerLogicClock, gSyncNode->electTimerLogicClockUser, gSyncNode->electTimerMS);
gSyncNode->electTimerLogicClock, gSyncNode->electTimerLogicClockUser, gSyncNode->electTimerMS,
gSyncNode->commitIndex);
taosMsleep(1000);
}
......
......@@ -185,17 +185,19 @@ int main(int argc, char **argv) {
sTrace(
"syncPropose sleep, state: %d, %s, term:%lu electTimerLogicClock:%lu, electTimerLogicClockUser:%lu, "
"electTimerMS:%d",
"electTimerMS:%d, commitIndex:%ld",
pSyncNode->state, syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm,
pSyncNode->electTimerLogicClock, pSyncNode->electTimerLogicClockUser, pSyncNode->electTimerMS);
pSyncNode->electTimerLogicClock, pSyncNode->electTimerLogicClockUser, pSyncNode->electTimerMS,
pSyncNode->commitIndex);
}
while (1) {
sTrace(
"replicate sleep, state: %d, %s, term:%lu electTimerLogicClock:%lu, electTimerLogicClockUser:%lu, "
"electTimerMS:%d",
"electTimerMS:%d, commitIndex:%ld",
pSyncNode->state, syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm,
pSyncNode->electTimerLogicClock, pSyncNode->electTimerLogicClockUser, pSyncNode->electTimerMS);
pSyncNode->electTimerLogicClock, pSyncNode->electTimerLogicClockUser, pSyncNode->electTimerMS,
pSyncNode->commitIndex);
taosMsleep(1000);
}
......
#define ALLOW_FORBID_FUNC
#include <gtest/gtest.h>
#include <stdio.h>
#include "syncEnv.h"
#include "syncIO.h"
#include "syncInt.h"
#include "syncMessage.h"
#include "syncRaftEntry.h"
#include "syncRaftLog.h"
#include "syncRaftStore.h"
#include "syncUtil.h"
void logTest() {
sTrace("--- sync log test: trace");
sDebug("--- sync log test: debug");
sInfo("--- sync log test: info");
sWarn("--- sync log test: warn");
sError("--- sync log test: error");
sFatal("--- sync log test: fatal");
}
uint16_t ports[] = {7010, 7110, 7210, 7310, 7410};
int32_t replicaNum = 3;
int32_t myIndex = 0;
SRaftId ids[TSDB_MAX_REPLICA];
SSyncInfo syncInfo;
SSyncFSM *pFsm;
SWal * pWal;
void CommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SyncIndex index, bool isWeak, int32_t code,
ESyncState state) {
char logBuf[256];
snprintf(logBuf, sizeof(logBuf), "==callback== ==CommitCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n",
pFsm, index, isWeak, code, state, syncUtilState2String(state));
syncRpcMsgPrint2(logBuf, (SRpcMsg *)pMsg);
}
void PreCommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SyncIndex index, bool isWeak, int32_t code,
ESyncState state) {
char logBuf[256];
snprintf(logBuf, sizeof(logBuf),
"==callback== ==PreCommitCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n", pFsm, index, isWeak,
code, state, syncUtilState2String(state));
syncRpcMsgPrint2(logBuf, (SRpcMsg *)pMsg);
}
void RollBackCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SyncIndex index, bool isWeak, int32_t code,
ESyncState state) {
char logBuf[256];
snprintf(logBuf, sizeof(logBuf), "==callback== ==RollBackCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n",
pFsm, index, isWeak, code, state, syncUtilState2String(state));
syncRpcMsgPrint2(logBuf, (SRpcMsg *)pMsg);
}
void initFsm() {
pFsm = (SSyncFSM *)malloc(sizeof(SSyncFSM));
pFsm->FpCommitCb = CommitCb;
pFsm->FpPreCommitCb = PreCommitCb;
pFsm->FpRollBackCb = RollBackCb;
}
int64_t syncNodeInit() {
syncInfo.vgId = 1234;
syncInfo.rpcClient = gSyncIO->clientRpc;
syncInfo.FpSendMsg = syncIOSendMsg;
syncInfo.queue = gSyncIO->pMsgQ;
syncInfo.FpEqMsg = syncIOEqMsg;
syncInfo.pFsm = pFsm;
snprintf(syncInfo.path, sizeof(syncInfo.path), "./replicate2_test_%d", myIndex);
int code = walInit();
assert(code == 0);
SWalCfg walCfg;
memset(&walCfg, 0, sizeof(SWalCfg));
walCfg.vgId = syncInfo.vgId;
walCfg.fsyncPeriod = 1000;
walCfg.retentionPeriod = 1000;
walCfg.rollPeriod = 1000;
walCfg.retentionSize = 1000;
walCfg.segSize = 1000;
walCfg.level = TAOS_WAL_FSYNC;
char tmpdir[128];
snprintf(tmpdir, sizeof(tmpdir), "./replicate2_test_wal_%d", myIndex);
pWal = walOpen(tmpdir, &walCfg);
assert(pWal != NULL);
syncInfo.pWal = pWal;
SSyncCfg *pCfg = &syncInfo.syncCfg;
pCfg->myIndex = myIndex;
pCfg->replicaNum = replicaNum;
for (int i = 0; i < replicaNum; ++i) {
pCfg->nodeInfo[i].nodePort = ports[i];
snprintf(pCfg->nodeInfo[i].nodeFqdn, sizeof(pCfg->nodeInfo[i].nodeFqdn), "%s", "127.0.0.1");
// taosGetFqdn(pCfg->nodeInfo[0].nodeFqdn);
}
int64_t rid = syncStart(&syncInfo);
assert(rid > 0);
SSyncNode *pSyncNode = (SSyncNode *)syncNodeAcquire(rid);
assert(pSyncNode != NULL);
// pSyncNode->hbBaseLine = 500;
// pSyncNode->electBaseLine = 1500;
gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing;
gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply;
gSyncIO->FpOnSyncRequestVote = pSyncNode->FpOnRequestVote;
gSyncIO->FpOnSyncRequestVoteReply = pSyncNode->FpOnRequestVoteReply;
gSyncIO->FpOnSyncAppendEntries = pSyncNode->FpOnAppendEntries;
gSyncIO->FpOnSyncAppendEntriesReply = pSyncNode->FpOnAppendEntriesReply;
gSyncIO->FpOnSyncTimeout = pSyncNode->FpOnTimeout;
gSyncIO->FpOnSyncClientRequest = pSyncNode->FpOnClientRequest;
gSyncIO->pSyncNode = pSyncNode;
syncNodeRelease(pSyncNode);
return rid;
}
void initRaftId(SSyncNode *pSyncNode) {
for (int i = 0; i < replicaNum; ++i) {
ids[i] = pSyncNode->replicasId[i];
char *s = syncUtilRaftId2Str(&ids[i]);
printf("raftId[%d] : %s\n", i, s);
free(s);
}
}
SRpcMsg *step0(int i) {
SRpcMsg *pMsg = (SRpcMsg *)malloc(sizeof(SRpcMsg));
memset(pMsg, 0, sizeof(SRpcMsg));
pMsg->msgType = 9999;
pMsg->contLen = 128;
pMsg->pCont = malloc(pMsg->contLen);
snprintf((char *)(pMsg->pCont), pMsg->contLen, "value-%u-%d", ports[myIndex], i);
return pMsg;
}
SyncClientRequest *step1(const SRpcMsg *pMsg) {
SyncClientRequest *pRetMsg = syncClientRequestBuild2(pMsg, 123, true);
return pRetMsg;
}
int main(int argc, char **argv) {
// taosInitLog((char *)"syncTest.log", 100000, 10);
tsAsyncLog = 0;
sDebugFlag = 143 + 64;
void logTest();
myIndex = 0;
if (argc >= 2) {
myIndex = atoi(argv[1]);
}
int recordCount = 100;
if (argc >= 3) {
recordCount = atoi(argv[2]);
}
int sleepMS = 10;
if (argc >= 4) {
sleepMS = atoi(argv[3]);
}
int32_t ret = syncIOStart((char *)"127.0.0.1", ports[myIndex]);
assert(ret == 0);
initFsm();
ret = syncInit();
assert(ret == 0);
int64_t rid = syncNodeInit();
assert(rid > 0);
SSyncNode *pSyncNode = (SSyncNode *)syncNodeAcquire(rid);
assert(pSyncNode != NULL);
syncNodePrint2((char *)"", pSyncNode);
initRaftId(pSyncNode);
for (int i = 0; i < recordCount; ++i) {
// step0
SRpcMsg *pMsg0 = step0(i);
syncRpcMsgPrint2((char *)"==step0==", pMsg0);
syncPropose(rid, pMsg0, true);
taosMsleep(sleepMS);
free(pMsg0);
sTrace(
"syncPropose sleep, state: %d, %s, term:%lu electTimerLogicClock:%lu, electTimerLogicClockUser:%lu, "
"electTimerMS:%d, commitIndex:%ld",
pSyncNode->state, syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm,
pSyncNode->electTimerLogicClock, pSyncNode->electTimerLogicClockUser, pSyncNode->electTimerMS,
pSyncNode->commitIndex);
}
while (1) {
sTrace(
"replicate sleep, state: %d, %s, term:%lu electTimerLogicClock:%lu, electTimerLogicClockUser:%lu, "
"electTimerMS:%d, commitIndex:%ld",
pSyncNode->state, syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm,
pSyncNode->electTimerLogicClock, pSyncNode->electTimerLogicClockUser, pSyncNode->electTimerMS,
pSyncNode->commitIndex);
taosMsleep(1000);
}
return 0;
}
......@@ -174,10 +174,6 @@ while $dbCnt < 2
#if $rows != 4 then
# return -1
#endi
##sql select * from st
##if $rows != 4 then
## return -1
##endi
endw
......
......@@ -24,9 +24,10 @@ endi
sql create table ct1 using stb tags ( 1 )
sql create table ct2 using stb tags ( 2 )
sql create table ct3 using stb tags ( 3 )
sql create table ct4 using stb tags ( 4 )
sql show tables
print $rows $data00 $data10 $data20
if $rows != 3 then
if $rows != 4 then
return -1
endi
......@@ -165,7 +166,7 @@ print ===> rows7: $data70 $data71 $data72 $data73 $data74 $data75
#endi
print =============== insert data into child table ct3 (n)
sql insert into ct3 values ( '2021-12-21 01:01:01.000', 1 )
sql insert into ct3 values ( '2021-12-21 01:01:01.000', NULL )
sql insert into ct3 values ( '2021-12-31 01:01:01.000', 1 )
sql insert into ct3 values ( '2022-01-01 01:01:06.000', 2 )
sql insert into ct3 values ( '2022-01-07 01:01:10.000', 3 )
......@@ -233,8 +234,73 @@ print ===> rows7: $data70 $data71 $data72 $data73 $data74
#endi
print =============== insert data into child table ct4 (y)
sql insert into ct4 values ( '2020-10-21 01:01:01.000', 1 )
sql insert into ct4 values ( '2020-12-31 01:01:01.000', 2 )
sql insert into ct4 values ( '2021-01-01 01:01:06.000', 3 )
sql insert into ct4 values ( '2021-05-07 01:01:10.000', 4 )
sql insert into ct4 values ( '2021-09-30 01:01:16.000', 5 )
sql insert into ct4 values ( '2022-02-01 01:01:20.000', 6 )
sql insert into ct4 values ( '2022-10-28 01:01:26.000', 7 )
sql insert into ct4 values ( '2022-12-01 01:01:30.000', 8 )
sql insert into ct4 values ( '2022-12-31 01:01:36.000', 9 )
sql select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(*) from ct4 interval(1y, 6n)
print ===> select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(*) from ct4 interval(1y, 6n)
print ===> rows: $rows
print ===> rows0: $data00 $data01 $data02 $data03 $data04
print ===> rows1: $data10 $data11 $data12 $data13 $data14
print ===> rows2: $data20 $data21 $data22 $data23 $data24
print ===> rows3: $data30 $data31 $data32 $data33 $data34
print ===> rows4: $data40 $data41 $data42 $data43 $data44
#if $rows != 5 then
# return -1
#endi
#if $data00 != 1 then
# return -1
#endi
#if $data40 != 1 then
# return -1
#endi
sql select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(*) from ct4 interval(1y, 6n) sliding(6n)
print ===> select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(*) from ct4 interval(1y, 6n) sliding(6n)
print ===> rows: $rows
print ===> rows0: $data00 $data01 $data02 $data03 $data04
print ===> rows1: $data10 $data11 $data12 $data13 $data14
print ===> rows2: $data20 $data21 $data22 $data23 $data24
print ===> rows3: $data30 $data31 $data32 $data33 $data34
print ===> rows4: $data40 $data41 $data42 $data43 $data44
#if $rows != 5 then
# return -1
#endi
#if $data00 != 1 then
# return -1
#endi
#if $data40 != 1 then
# return -1
#endi
sql select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(*) from ct4 interval(1y, 6n) sliding(12n)
print ===> select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(*) from ct4 interval(1y, 6n) sliding(12n)
print ===> rows: $rows
print ===> rows0: $data00 $data01 $data02 $data03 $data04
print ===> rows1: $data10 $data11 $data12 $data13 $data14
print ===> rows2: $data20 $data21 $data22 $data23 $data24
print ===> rows3: $data30 $data31 $data32 $data33 $data34
print ===> rows4: $data40 $data41 $data42 $data43 $data44
print ===> rows5: $data50 $data51 $data52 $data53 $data54
print ===> rows6: $data60 $data61 $data62 $data63 $data64
print ===> rows7: $data70 $data71 $data72 $data73 $data74
#if $rows != 8 then
# return -1
#endi
#if $data00 != 2 then
# return -1
#endi
#if $data70 != 1 then
# return -1
#endi
......
......@@ -231,6 +231,10 @@ endi
print =============== query data
sql select * from c1
print rows: $rows
print $data00 $data01
print $data10 $data11
print $data20 $data21
if $rows != 3 then
print $rows
return -1
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册