提交 8efaa79b 编写于 作者: P Ping Xiao

Merge branch 'develop' into xiaoping/add_test_case2

...@@ -4,6 +4,9 @@ ...@@ -4,6 +4,9 @@
[submodule "src/connector/grafanaplugin"] [submodule "src/connector/grafanaplugin"]
path = src/connector/grafanaplugin path = src/connector/grafanaplugin
url = https://github.com/taosdata/grafanaplugin url = https://github.com/taosdata/grafanaplugin
[submodule "tests/examples/rust"]
path = tests/examples/rust
url = https://github.com/songtianyi/tdengine-rust-bindings.git
[submodule "src/connector/hivemq-tdengine-extension"] [submodule "src/connector/hivemq-tdengine-extension"]
path = src/connector/hivemq-tdengine-extension path = src/connector/hivemq-tdengine-extension
url = https://github.com/huskar-t/hivemq-tdengine-extension.git url = https://github.com/huskar-t/hivemq-tdengine-extension.git
\ No newline at end of file
...@@ -24,8 +24,8 @@ int32_t dnodeInitVWrite(); ...@@ -24,8 +24,8 @@ int32_t dnodeInitVWrite();
void dnodeCleanupVWrite(); void dnodeCleanupVWrite();
void dnodeDispatchToVWriteQueue(SRpcMsg *pMsg); void dnodeDispatchToVWriteQueue(SRpcMsg *pMsg);
void * dnodeAllocVWriteQueue(void *pVnode); void * dnodeAllocVWriteQueue(void *pVnode);
void dnodeFreeVWriteQueue(void *wqueue); void dnodeFreeVWriteQueue(void *pWqueue);
void dnodeSendRpcVWriteRsp(void *pVnode, void *param, int32_t code); void dnodeSendRpcVWriteRsp(void *pVnode, void *pWrite, int32_t code);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -38,11 +38,11 @@ typedef struct { ...@@ -38,11 +38,11 @@ typedef struct {
} SVWriteWorkerPool; } SVWriteWorkerPool;
static SVWriteWorkerPool tsVWriteWP; static SVWriteWorkerPool tsVWriteWP;
static void *dnodeProcessVWriteQueue(void *param); static void *dnodeProcessVWriteQueue(void *pWorker);
int32_t dnodeInitVWrite() { int32_t dnodeInitVWrite() {
tsVWriteWP.max = tsNumOfCores; tsVWriteWP.max = tsNumOfCores;
tsVWriteWP.worker = (SVWriteWorker *)tcalloc(sizeof(SVWriteWorker), tsVWriteWP.max); tsVWriteWP.worker = tcalloc(sizeof(SVWriteWorker), tsVWriteWP.max);
if (tsVWriteWP.worker == NULL) return -1; if (tsVWriteWP.worker == NULL) return -1;
pthread_mutex_init(&tsVWriteWP.mutex, NULL); pthread_mutex_init(&tsVWriteWP.mutex, NULL);
...@@ -162,13 +162,13 @@ void *dnodeAllocVWriteQueue(void *pVnode) { ...@@ -162,13 +162,13 @@ void *dnodeAllocVWriteQueue(void *pVnode) {
return queue; return queue;
} }
void dnodeFreeVWriteQueue(void *wqueue) { void dnodeFreeVWriteQueue(void *pWqueue) {
taosCloseQueue(wqueue); taosCloseQueue(pWqueue);
} }
void dnodeSendRpcVWriteRsp(void *pVnode, void *param, int32_t code) { void dnodeSendRpcVWriteRsp(void *pVnode, void *wparam, int32_t code) {
if (param == NULL) return; if (wparam == NULL) return;
SVWriteMsg *pWrite = param; SVWriteMsg *pWrite = wparam;
if (code < 0) pWrite->code = code; if (code < 0) pWrite->code = code;
int32_t count = atomic_add_fetch_32(&pWrite->processedCount, 1); int32_t count = atomic_add_fetch_32(&pWrite->processedCount, 1);
...@@ -183,13 +183,11 @@ void dnodeSendRpcVWriteRsp(void *pVnode, void *param, int32_t code) { ...@@ -183,13 +183,11 @@ void dnodeSendRpcVWriteRsp(void *pVnode, void *param, int32_t code) {
}; };
rpcSendResponse(&rpcRsp); rpcSendResponse(&rpcRsp);
taosFreeQitem(pWrite); vnodeFreeFromWQueue(pVnode, pWrite);
vnodeRelease(pVnode);
} }
static void *dnodeProcessVWriteQueue(void *param) { static void *dnodeProcessVWriteQueue(void *wparam) {
SVWriteWorker *pWorker = param; SVWriteWorker *pWorker = wparam;
SVWriteMsg * pWrite; SVWriteMsg * pWrite;
void * pVnode; void * pVnode;
int32_t numOfMsgs; int32_t numOfMsgs;
...@@ -232,8 +230,7 @@ static void *dnodeProcessVWriteQueue(void *param) { ...@@ -232,8 +230,7 @@ static void *dnodeProcessVWriteQueue(void *param) {
if (pWrite->rspRet.rsp) { if (pWrite->rspRet.rsp) {
rpcFreeCont(pWrite->rspRet.rsp); rpcFreeCont(pWrite->rspRet.rsp);
} }
taosFreeQitem(pWrite); vnodeFreeFromWQueue(pVnode, pWrite);
vnodeRelease(pVnode);
} }
} }
} }
......
...@@ -54,8 +54,8 @@ void dnodeSendMsgToDnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp, SRpcEpSet *epSet ...@@ -54,8 +54,8 @@ void dnodeSendMsgToDnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp, SRpcEpSet *epSet
void *dnodeSendCfgTableToRecv(int32_t vgId, int32_t tid); void *dnodeSendCfgTableToRecv(int32_t vgId, int32_t tid);
void *dnodeAllocVWriteQueue(void *pVnode); void *dnodeAllocVWriteQueue(void *pVnode);
void dnodeFreeVWriteQueue(void *wqueue); void dnodeFreeVWriteQueue(void *pWqueue);
void dnodeSendRpcVWriteRsp(void *pVnode, void *param, int32_t code); void dnodeSendRpcVWriteRsp(void *pVnode, void *pWrite, int32_t code);
void *dnodeAllocVReadQueue(void *pVnode); void *dnodeAllocVReadQueue(void *pVnode);
void dnodeFreeVReadQueue(void *rqueue); void dnodeFreeVReadQueue(void *rqueue);
......
...@@ -201,7 +201,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_VND_NO_SUCH_FILE_OR_DIR, 0, 0x0507, "Missing da ...@@ -201,7 +201,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_VND_NO_SUCH_FILE_OR_DIR, 0, 0x0507, "Missing da
TAOS_DEFINE_ERROR(TSDB_CODE_VND_OUT_OF_MEMORY, 0, 0x0508, "Out of memory") TAOS_DEFINE_ERROR(TSDB_CODE_VND_OUT_OF_MEMORY, 0, 0x0508, "Out of memory")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_APP_ERROR, 0, 0x0509, "Unexpected generic error in vnode") TAOS_DEFINE_ERROR(TSDB_CODE_VND_APP_ERROR, 0, 0x0509, "Unexpected generic error in vnode")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_INVALID_VRESION_FILE, 0, 0x050A, "Invalid version file") TAOS_DEFINE_ERROR(TSDB_CODE_VND_INVALID_VRESION_FILE, 0, 0x050A, "Invalid version file")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_IS_FULL, 0, 0x050B, "Vnode memory is full for commit is failed") TAOS_DEFINE_ERROR(TSDB_CODE_VND_IS_FULL, 0, 0x050B, "Vnode memory is full because commit failed")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_NOT_SYNCED, 0, 0x0511, "Database suspended") TAOS_DEFINE_ERROR(TSDB_CODE_VND_NOT_SYNCED, 0, 0x0511, "Database suspended")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_NO_WRITE_AUTH, 0, 0x0512, "Write operation denied") TAOS_DEFINE_ERROR(TSDB_CODE_VND_NO_WRITE_AUTH, 0, 0x0512, "Write operation denied")
......
...@@ -70,11 +70,12 @@ void* vnodeAcquire(int32_t vgId); // add refcount ...@@ -70,11 +70,12 @@ void* vnodeAcquire(int32_t vgId); // add refcount
void vnodeRelease(void *pVnode); // dec refCount void vnodeRelease(void *pVnode); // dec refCount
void* vnodeGetWal(void *pVnode); void* vnodeGetWal(void *pVnode);
int32_t vnodeWriteToWQueue(void *vparam, void *wparam, int32_t qtype, void *rparam); int32_t vnodeWriteToWQueue(void *pVnode, void *pHead, int32_t qtype, void *pRpcMsg);
int32_t vnodeProcessWrite(void *vparam, void *wparam, int32_t qtype, void *rparam); void vnodeFreeFromWQueue(void *pVnode, SVWriteMsg *pWrite);
int32_t vnodeProcessWrite(void *pVnode, void *pHead, int32_t qtype, void *pRspRet);
int32_t vnodeGetVnodeList(int32_t vnodeList[], int32_t *numOfVnodes); int32_t vnodeGetVnodeList(int32_t vnodeList[], int32_t *numOfVnodes);
void vnodeBuildStatusMsg(void *param); void vnodeBuildStatusMsg(void *pStatus);
void vnodeConfirmForward(void *param, uint64_t version, int32_t code); void vnodeConfirmForward(void *pVnode, uint64_t version, int32_t code);
void vnodeSetAccess(SVgroupAccess *pAccess, int32_t numOfVnodes); void vnodeSetAccess(SVgroupAccess *pAccess, int32_t numOfVnodes);
int32_t vnodeInitResources(); int32_t vnodeInitResources();
......
...@@ -111,6 +111,9 @@ void taosUninitTimer() { ...@@ -111,6 +111,9 @@ void taosUninitTimer() {
pthread_sigmask(SIG_BLOCK, &set, NULL); pthread_sigmask(SIG_BLOCK, &set, NULL);
*/ */
void taosMsleep(int mseconds) { void taosMsleep(int mseconds) {
#if 1
usleep(mseconds * 1000);
#else
struct timeval timeout; struct timeval timeout;
int seconds, useconds; int seconds, useconds;
...@@ -126,7 +129,8 @@ void taosMsleep(int mseconds) { ...@@ -126,7 +129,8 @@ void taosMsleep(int mseconds) {
select(0, NULL, NULL, NULL, &timeout); select(0, NULL, NULL, NULL, &timeout);
/* pthread_sigmask(SIG_UNBLOCK, &set, NULL); */ /* pthread_sigmask(SIG_UNBLOCK, &set, NULL); */
#endif
} }
#endif #endif
\ No newline at end of file
...@@ -85,7 +85,7 @@ static void httpProcessHttpData(void *param) { ...@@ -85,7 +85,7 @@ static void httpProcessHttpData(void *param) {
while (1) { while (1) {
struct epoll_event events[HTTP_MAX_EVENTS]; struct epoll_event events[HTTP_MAX_EVENTS];
//-1 means uncertainty, 0-nowait, 1-wait 1 ms, set it from -1 to 1 //-1 means uncertainty, 0-nowait, 1-wait 1 ms, set it from -1 to 1
fdNum = epoll_wait(pThread->pollFd, events, HTTP_MAX_EVENTS, 1); fdNum = epoll_wait(pThread->pollFd, events, HTTP_MAX_EVENTS, TAOS_EPOLL_WAIT_TIME);
if (pThread->stop) { if (pThread->stop) {
httpDebug("%p, http thread get stop event, exiting...", pThread); httpDebug("%p, http thread get stop event, exiting...", pThread);
break; break;
......
...@@ -36,6 +36,7 @@ extern "C" { ...@@ -36,6 +36,7 @@ extern "C" {
#define TAOS_SMSG_STATUS 7 #define TAOS_SMSG_STATUS 7
#define SYNC_MAX_SIZE (TSDB_MAX_WAL_SIZE + sizeof(SWalHead) + sizeof(SSyncHead) + 16) #define SYNC_MAX_SIZE (TSDB_MAX_WAL_SIZE + sizeof(SWalHead) + sizeof(SSyncHead) + 16)
#define SYNC_RECV_BUFFER_SIZE (5*1024*1024)
#define nodeRole pNode->peerInfo[pNode->selfIndex]->role #define nodeRole pNode->peerInfo[pNode->selfIndex]->role
#define nodeVersion pNode->peerInfo[pNode->selfIndex]->version #define nodeVersion pNode->peerInfo[pNode->selfIndex]->version
......
...@@ -179,6 +179,13 @@ int64_t syncStart(const SSyncInfo *pInfo) { ...@@ -179,6 +179,13 @@ int64_t syncStart(const SSyncInfo *pInfo) {
for (int32_t i = 0; i < pCfg->replica; ++i) { for (int32_t i = 0; i < pCfg->replica; ++i) {
const SNodeInfo *pNodeInfo = pCfg->nodeInfo + i; const SNodeInfo *pNodeInfo = pCfg->nodeInfo + i;
pNode->peerInfo[i] = syncAddPeer(pNode, pNodeInfo); pNode->peerInfo[i] = syncAddPeer(pNode, pNodeInfo);
if (pNode->peerInfo[i] == NULL) {
sError("vgId:%d, node:%d fqdn:%s port:%u is not configured, stop taosd", pNode->vgId, pNodeInfo->nodeId, pNodeInfo->nodeFqdn,
pNodeInfo->nodePort);
syncStop(pNode->rid);
exit(1);
}
if ((strcmp(pNodeInfo->nodeFqdn, tsNodeFqdn) == 0) && (pNodeInfo->nodePort == tsSyncPort)) { if ((strcmp(pNodeInfo->nodeFqdn, tsNodeFqdn) == 0) && (pNodeInfo->nodePort == tsSyncPort)) {
pNode->selfIndex = i; pNode->selfIndex = i;
} }
...@@ -476,7 +483,11 @@ static void syncRemovePeer(SSyncPeer *pPeer) { ...@@ -476,7 +483,11 @@ static void syncRemovePeer(SSyncPeer *pPeer) {
static SSyncPeer *syncAddPeer(SSyncNode *pNode, const SNodeInfo *pInfo) { static SSyncPeer *syncAddPeer(SSyncNode *pNode, const SNodeInfo *pInfo) {
uint32_t ip = taosGetIpFromFqdn(pInfo->nodeFqdn); uint32_t ip = taosGetIpFromFqdn(pInfo->nodeFqdn);
if (ip == -1) return NULL; if (ip == 0xFFFFFFFF) {
sError("failed to add peer, can resolve fqdn:%s since %s", pInfo->nodeFqdn, strerror(errno));
terrno = TSDB_CODE_RPC_FQDN_ERROR;
return NULL;
}
SSyncPeer *pPeer = calloc(1, sizeof(SSyncPeer)); SSyncPeer *pPeer = calloc(1, sizeof(SSyncPeer));
if (pPeer == NULL) return NULL; if (pPeer == NULL) return NULL;
......
...@@ -136,7 +136,7 @@ static int32_t syncRestoreWal(SSyncPeer *pPeer) { ...@@ -136,7 +136,7 @@ static int32_t syncRestoreWal(SSyncPeer *pPeer) {
SSyncNode *pNode = pPeer->pSyncNode; SSyncNode *pNode = pPeer->pSyncNode;
int32_t ret, code = -1; int32_t ret, code = -1;
void *buffer = calloc(1024000, 1); // size for one record void *buffer = calloc(SYNC_MAX_SIZE, 1); // size for one record
if (buffer == NULL) return -1; if (buffer == NULL) return -1;
SWalHead *pHead = (SWalHead *)buffer; SWalHead *pHead = (SWalHead *)buffer;
...@@ -237,7 +237,7 @@ static int32_t syncOpenRecvBuffer(SSyncNode *pNode) { ...@@ -237,7 +237,7 @@ static int32_t syncOpenRecvBuffer(SSyncNode *pNode) {
SRecvBuffer *pRecv = calloc(sizeof(SRecvBuffer), 1); SRecvBuffer *pRecv = calloc(sizeof(SRecvBuffer), 1);
if (pRecv == NULL) return -1; if (pRecv == NULL) return -1;
pRecv->bufferSize = 5000000; pRecv->bufferSize = SYNC_RECV_BUFFER_SIZE;
pRecv->buffer = malloc(pRecv->bufferSize); pRecv->buffer = malloc(pRecv->bufferSize);
if (pRecv->buffer == NULL) { if (pRecv->buffer == NULL) {
free(pRecv); free(pRecv);
......
...@@ -282,7 +282,7 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHe ...@@ -282,7 +282,7 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHe
_err: _err:
tfree(dataDir); tfree(dataDir);
tsdbCloseHelperFile(pHelper, 1, NULL); tsdbCloseHelperFile(pHelper, 1, pGroup);
return -1; return -1;
} }
......
...@@ -145,6 +145,7 @@ void tSkipListPutBatch(SSkipList *pSkipList, void **ppData, int ndata) { ...@@ -145,6 +145,7 @@ void tSkipListPutBatch(SSkipList *pSkipList, void **ppData, int ndata) {
// forward to put the rest of data // forward to put the rest of data
for (int idata = 1; idata < ndata; idata++) { for (int idata = 1; idata < ndata; idata++) {
pDataKey = pSkipList->keyFn(ppData[idata]); pDataKey = pSkipList->keyFn(ppData[idata]);
hasDup = false;
// Compare max key // Compare max key
pKey = SL_GET_MAX_KEY(pSkipList); pKey = SL_GET_MAX_KEY(pSkipList);
...@@ -153,8 +154,6 @@ void tSkipListPutBatch(SSkipList *pSkipList, void **ppData, int ndata) { ...@@ -153,8 +154,6 @@ void tSkipListPutBatch(SSkipList *pSkipList, void **ppData, int ndata) {
for (int i = 0; i < pSkipList->maxLevel; i++) { for (int i = 0; i < pSkipList->maxLevel; i++) {
forward[i] = SL_NODE_GET_BACKWARD_POINTER(pSkipList->pTail, i); forward[i] = SL_NODE_GET_BACKWARD_POINTER(pSkipList->pTail, i);
} }
hasDup = false;
} else { } else {
SSkipListNode *px = pSkipList->pHead; SSkipListNode *px = pSkipList->pHead;
for (int i = pSkipList->maxLevel - 1; i >= 0; --i) { for (int i = pSkipList->maxLevel - 1; i >= 0; --i) {
...@@ -173,7 +172,7 @@ void tSkipListPutBatch(SSkipList *pSkipList, void **ppData, int ndata) { ...@@ -173,7 +172,7 @@ void tSkipListPutBatch(SSkipList *pSkipList, void **ppData, int ndata) {
compare = pSkipList->comparFn(pKey, pDataKey); compare = pSkipList->comparFn(pKey, pDataKey);
if (compare >= 0) { if (compare >= 0) {
if (compare == 0) hasDup = true; if (compare == 0 && !hasDup) hasDup = true;
break; break;
} else { } else {
px = p; px = p;
......
...@@ -58,13 +58,13 @@ uint32_t taosGetIpFromFqdn(const char *fqdn) { ...@@ -58,13 +58,13 @@ uint32_t taosGetIpFromFqdn(const char *fqdn) {
} else { } else {
#ifdef EAI_SYSTEM #ifdef EAI_SYSTEM
if (ret == EAI_SYSTEM) { if (ret == EAI_SYSTEM) {
uError("failed to get the ip address, fqdn:%s, code:%d, reason:%s", fqdn, ret, strerror(errno)); uError("failed to get the ip address, fqdn:%s, since:%s", fqdn, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
} else { } else {
uError("failed to get the ip address, fqdn:%s, code:%d, reason:%s", fqdn, ret, gai_strerror(ret)); uError("failed to get the ip address, fqdn:%s, since:%s", fqdn, gai_strerror(ret));
} }
#else #else
uError("failed to get the ip address, fqdn:%s, code:%d, reason:%s", fqdn, ret, gai_strerror(ret)); uError("failed to get the ip address, fqdn:%s, since:%s", fqdn, gai_strerror(ret));
#endif #endif
return 0xFFFFFFFF; return 0xFFFFFFFF;
} }
......
...@@ -37,6 +37,7 @@ extern int32_t vDebugFlag; ...@@ -37,6 +37,7 @@ extern int32_t vDebugFlag;
typedef struct { typedef struct {
int32_t vgId; // global vnode group ID int32_t vgId; // global vnode group ID
int32_t refCount; // reference count int32_t refCount; // reference count
int32_t queuedMsg;
int32_t delay; int32_t delay;
int8_t status; int8_t status;
int8_t role; int8_t role;
......
...@@ -419,7 +419,11 @@ void vnodeRelease(void *pVnodeRaw) { ...@@ -419,7 +419,11 @@ void vnodeRelease(void *pVnodeRaw) {
} }
if (pVnode->wal) { if (pVnode->wal) {
if (code == 0) walRemoveAllOldFiles(pVnode->wal); if (code != 0) {
vError("vgId:%d, failed to commit while close tsdb repo, keep wal", pVnode->vgId);
} else {
walRemoveAllOldFiles(pVnode->wal);
}
walClose(pVnode->wal); walClose(pVnode->wal);
pVnode->wal = NULL; pVnode->wal = NULL;
} }
......
...@@ -28,13 +28,15 @@ ...@@ -28,13 +28,15 @@
#include "syncInt.h" #include "syncInt.h"
#include "tcq.h" #include "tcq.h"
static int32_t (*vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MAX])(SVnodeObj *, void *, SRspRet *); #define MAX_QUEUED_MSG_NUM 10000
static int32_t vnodeProcessSubmitMsg(SVnodeObj *pVnode, void *pMsg, SRspRet *);
static int32_t vnodeProcessCreateTableMsg(SVnodeObj *pVnode, void *pMsg, SRspRet *); static int32_t (*vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MAX])(SVnodeObj *, void *pCont, SRspRet *);
static int32_t vnodeProcessDropTableMsg(SVnodeObj *pVnode, void *pMsg, SRspRet *); static int32_t vnodeProcessSubmitMsg(SVnodeObj *pVnode, void *pCont, SRspRet *);
static int32_t vnodeProcessAlterTableMsg(SVnodeObj *pVnode, void *pMsg, SRspRet *); static int32_t vnodeProcessCreateTableMsg(SVnodeObj *pVnode, void *pCont, SRspRet *);
static int32_t vnodeProcessDropStableMsg(SVnodeObj *pVnode, void *pMsg, SRspRet *); static int32_t vnodeProcessDropTableMsg(SVnodeObj *pVnode, void *pCont, SRspRet *);
static int32_t vnodeProcessUpdateTagValMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pRet); static int32_t vnodeProcessAlterTableMsg(SVnodeObj *pVnode, void *pCont, SRspRet *);
static int32_t vnodeProcessDropStableMsg(SVnodeObj *pVnode, void *pCont, SRspRet *);
static int32_t vnodeProcessUpdateTagValMsg(SVnodeObj *pVnode, void *pCont, SRspRet *);
void vnodeInitWriteFp(void) { void vnodeInitWriteFp(void) {
vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_SUBMIT] = vnodeProcessSubmitMsg; vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_SUBMIT] = vnodeProcessSubmitMsg;
...@@ -100,8 +102,8 @@ int32_t vnodeProcessWrite(void *vparam, void *wparam, int32_t qtype, void *rpara ...@@ -100,8 +102,8 @@ int32_t vnodeProcessWrite(void *vparam, void *wparam, int32_t qtype, void *rpara
return syncCode; return syncCode;
} }
static int32_t vnodeCheckWrite(void *param) { static int32_t vnodeCheckWrite(void *vparam) {
SVnodeObj *pVnode = param; SVnodeObj *pVnode = vparam;
if (!(pVnode->accessState & TSDB_VN_WRITE_ACCCESS)) { if (!(pVnode->accessState & TSDB_VN_WRITE_ACCCESS)) {
vDebug("vgId:%d, no write auth, refCount:%d pVnode:%p", pVnode->vgId, pVnode->refCount, pVnode); vDebug("vgId:%d, no write auth, refCount:%d pVnode:%p", pVnode->vgId, pVnode->refCount, pVnode);
return TSDB_CODE_VND_NO_WRITE_AUTH; return TSDB_CODE_VND_NO_WRITE_AUTH;
...@@ -127,8 +129,8 @@ static int32_t vnodeCheckWrite(void *param) { ...@@ -127,8 +129,8 @@ static int32_t vnodeCheckWrite(void *param) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
void vnodeConfirmForward(void *param, uint64_t version, int32_t code) { void vnodeConfirmForward(void *vparam, uint64_t version, int32_t code) {
SVnodeObj *pVnode = (SVnodeObj *)param; SVnodeObj *pVnode = vparam;
syncConfirmForward(pVnode->sync, version, code); syncConfirmForward(pVnode->sync, version, code);
} }
...@@ -242,8 +244,25 @@ int32_t vnodeWriteToWQueue(void *vparam, void *wparam, int32_t qtype, void *rpar ...@@ -242,8 +244,25 @@ int32_t vnodeWriteToWQueue(void *vparam, void *wparam, int32_t qtype, void *rpar
memcpy(pWrite->pHead, pHead, sizeof(SWalHead) + pHead->len); memcpy(pWrite->pHead, pHead, sizeof(SWalHead) + pHead->len);
atomic_add_fetch_32(&pVnode->refCount, 1); atomic_add_fetch_32(&pVnode->refCount, 1);
vTrace("vgId:%d, get vnode wqueue, refCount:%d pVnode:%p", pVnode->vgId, pVnode->refCount, pVnode);
int32_t queued = atomic_add_fetch_32(&pVnode->queuedMsg, 1);
if (queued > MAX_QUEUED_MSG_NUM) {
vDebug("vgId:%d, too many msg:%d in vwqueue, flow control", pVnode->vgId, queued);
taosMsleep(1);
}
vTrace("vgId:%d, write into vwqueue, refCount:%d queued:%d", pVnode->vgId, pVnode->refCount, pVnode->queuedMsg);
taosWriteQitem(pVnode->wqueue, qtype, pWrite); taosWriteQitem(pVnode->wqueue, qtype, pWrite);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
void vnodeFreeFromWQueue(void *vparam, SVWriteMsg *pWrite) {
SVnodeObj *pVnode = vparam;
atomic_sub_fetch_32(&pVnode->queuedMsg, 1);
vTrace("vgId:%d, free from vwqueue, refCount:%d queued:%d", pVnode->vgId, pVnode->refCount, pVnode->queuedMsg);
taosFreeQitem(pWrite);
vnodeRelease(pVnode);
}
...@@ -132,7 +132,7 @@ https://www.taosdata.com/cn/all-downloads/ ...@@ -132,7 +132,7 @@ https://www.taosdata.com/cn/all-downloads/
配置完成后,在命令行内使用taos shell连接server端 配置完成后,在命令行内使用taos shell连接server端
```shell ```shell
C:\TDengine>taos C:\TDengine>taos -h td01
Welcome to the TDengine shell from Linux, Client Version:2.0.1.1 Welcome to the TDengine shell from Linux, Client Version:2.0.1.1
Copyright (c) 2017 by TAOS Data, Inc. All rights reserved. Copyright (c) 2017 by TAOS Data, Inc. All rights reserved.
......
Subproject commit f2ffd30521b8e8afbc9d25c75f8eeeb6a48bd030
[package]
name = "tdengine"
version = "0.1.0"
authors = ["Chunhua Jiang <jiangch@3reality.com>"]
edition = "2018"
[dependencies]
# TDengine driver connector for Rust
It's a rust implementation for [TDengine](https://github.com/taosdata/TDengine), an open-sourced big data platform designed and optimized for the Internet of Things (IoT), Connected Cars, Industrial IoT, and IT Infrastructure and Application Monitoring.
## Dependencies
- Rust:
```
curl https://sh.rustup.rs -sSf | sh
```
## Run with Sample
Build and run basic sample:
```
cargo run --example demo
```
Build and run subscribe sample:
```
cargo run --example subscribe
```
// build.rs
use std::env;
fn main() {
let project_dir = env::var("CARGO_MANIFEST_DIR").unwrap();
println!("cargo:rustc-link-search={}", project_dir); // the "-L" flag
println!("cargo:rustc-link-lib=taos"); // the "-l" flag
}
use std::process;
use tdengine::Tdengine;
fn main() {
let tde = Tdengine::new("127.0.0.1", "root", "taosdata", "demo", 0)
.unwrap_or_else(|err| {
eprintln!("Can't create Tdengine: {}", err);
process::exit(1)
});
tde.query("drop database demo");
tde.query("create database demo");
tde.query("use demo");
tde.query("create table m1 (ts timestamp, speed int)");
for i in 0..10 {
tde.query(format!("insert into m1 values (now+{}s, {})", i, i).as_str());
}
}
use std::process;
use tdengine::Subscriber;
fn main() {
let subscriber = Subscriber::new("127.0.0.1", "root", "taosdata", "demo", "m1", 0, 1000)
.unwrap_or_else(|err| {
eprintln!("Can't create Subscriber: {}", err);
process::exit(1)
});
loop {
let row = subscriber.consume().unwrap_or_else(|err| {
eprintln!("consume exit: {}", err);
process::exit(1)
});
subscriber.print_row(&row);
}
}
/* automatically generated by rust-bindgen */
#![allow(unused)]
#![allow(non_camel_case_types)]
pub const _STDINT_H: u32 = 1;
pub const _FEATURES_H: u32 = 1;
pub const _DEFAULT_SOURCE: u32 = 1;
pub const __USE_ISOC11: u32 = 1;
pub const __USE_ISOC99: u32 = 1;
pub const __USE_ISOC95: u32 = 1;
pub const __USE_POSIX_IMPLICITLY: u32 = 1;
pub const _POSIX_SOURCE: u32 = 1;
pub const _POSIX_C_SOURCE: u32 = 200809;
pub const __USE_POSIX: u32 = 1;
pub const __USE_POSIX2: u32 = 1;
pub const __USE_POSIX199309: u32 = 1;
pub const __USE_POSIX199506: u32 = 1;
pub const __USE_XOPEN2K: u32 = 1;
pub const __USE_XOPEN2K8: u32 = 1;
pub const _ATFILE_SOURCE: u32 = 1;
pub const __USE_MISC: u32 = 1;
pub const __USE_ATFILE: u32 = 1;
pub const __USE_FORTIFY_LEVEL: u32 = 0;
pub const _STDC_PREDEF_H: u32 = 1;
pub const __STDC_IEC_559__: u32 = 1;
pub const __STDC_IEC_559_COMPLEX__: u32 = 1;
pub const __STDC_ISO_10646__: u32 = 201505;
pub const __STDC_NO_THREADS__: u32 = 1;
pub const __GNU_LIBRARY__: u32 = 6;
pub const __GLIBC__: u32 = 2;
pub const __GLIBC_MINOR__: u32 = 23;
pub const _SYS_CDEFS_H: u32 = 1;
pub const __WORDSIZE: u32 = 64;
pub const __WORDSIZE_TIME64_COMPAT32: u32 = 1;
pub const __SYSCALL_WORDSIZE: u32 = 64;
pub const _BITS_WCHAR_H: u32 = 1;
pub const INT8_MIN: i32 = -128;
pub const INT16_MIN: i32 = -32768;
pub const INT32_MIN: i32 = -2147483648;
pub const INT8_MAX: u32 = 127;
pub const INT16_MAX: u32 = 32767;
pub const INT32_MAX: u32 = 2147483647;
pub const UINT8_MAX: u32 = 255;
pub const UINT16_MAX: u32 = 65535;
pub const UINT32_MAX: u32 = 4294967295;
pub const INT_LEAST8_MIN: i32 = -128;
pub const INT_LEAST16_MIN: i32 = -32768;
pub const INT_LEAST32_MIN: i32 = -2147483648;
pub const INT_LEAST8_MAX: u32 = 127;
pub const INT_LEAST16_MAX: u32 = 32767;
pub const INT_LEAST32_MAX: u32 = 2147483647;
pub const UINT_LEAST8_MAX: u32 = 255;
pub const UINT_LEAST16_MAX: u32 = 65535;
pub const UINT_LEAST32_MAX: u32 = 4294967295;
pub const INT_FAST8_MIN: i32 = -128;
pub const INT_FAST16_MIN: i64 = -9223372036854775808;
pub const INT_FAST32_MIN: i64 = -9223372036854775808;
pub const INT_FAST8_MAX: u32 = 127;
pub const INT_FAST16_MAX: u64 = 9223372036854775807;
pub const INT_FAST32_MAX: u64 = 9223372036854775807;
pub const UINT_FAST8_MAX: u32 = 255;
pub const UINT_FAST16_MAX: i32 = -1;
pub const UINT_FAST32_MAX: i32 = -1;
pub const INTPTR_MIN: i64 = -9223372036854775808;
pub const INTPTR_MAX: u64 = 9223372036854775807;
pub const UINTPTR_MAX: i32 = -1;
pub const PTRDIFF_MIN: i64 = -9223372036854775808;
pub const PTRDIFF_MAX: u64 = 9223372036854775807;
pub const SIG_ATOMIC_MIN: i32 = -2147483648;
pub const SIG_ATOMIC_MAX: u32 = 2147483647;
pub const SIZE_MAX: i32 = -1;
pub const WINT_MIN: u32 = 0;
pub const WINT_MAX: u32 = 4294967295;
pub const TSDB_DATA_TYPE_NULL: u32 = 0;
pub const TSDB_DATA_TYPE_BOOL: u32 = 1;
pub const TSDB_DATA_TYPE_TINYINT: u32 = 2;
pub const TSDB_DATA_TYPE_SMALLINT: u32 = 3;
pub const TSDB_DATA_TYPE_INT: u32 = 4;
pub const TSDB_DATA_TYPE_BIGINT: u32 = 5;
pub const TSDB_DATA_TYPE_FLOAT: u32 = 6;
pub const TSDB_DATA_TYPE_DOUBLE: u32 = 7;
pub const TSDB_DATA_TYPE_BINARY: u32 = 8;
pub const TSDB_DATA_TYPE_TIMESTAMP: u32 = 9;
pub const TSDB_DATA_TYPE_NCHAR: u32 = 10;
pub type int_least8_t = ::std::os::raw::c_schar;
pub type int_least16_t = ::std::os::raw::c_short;
pub type int_least32_t = ::std::os::raw::c_int;
pub type int_least64_t = ::std::os::raw::c_long;
pub type uint_least8_t = ::std::os::raw::c_uchar;
pub type uint_least16_t = ::std::os::raw::c_ushort;
pub type uint_least32_t = ::std::os::raw::c_uint;
pub type uint_least64_t = ::std::os::raw::c_ulong;
pub type int_fast8_t = ::std::os::raw::c_schar;
pub type int_fast16_t = ::std::os::raw::c_long;
pub type int_fast32_t = ::std::os::raw::c_long;
pub type int_fast64_t = ::std::os::raw::c_long;
pub type uint_fast8_t = ::std::os::raw::c_uchar;
pub type uint_fast16_t = ::std::os::raw::c_ulong;
pub type uint_fast32_t = ::std::os::raw::c_ulong;
pub type uint_fast64_t = ::std::os::raw::c_ulong;
pub type intmax_t = ::std::os::raw::c_long;
pub type uintmax_t = ::std::os::raw::c_ulong;
pub const TSDB_OPTION_TSDB_OPTION_LOCALE: TSDB_OPTION = 0;
pub const TSDB_OPTION_TSDB_OPTION_CHARSET: TSDB_OPTION = 1;
pub const TSDB_OPTION_TSDB_OPTION_TIMEZONE: TSDB_OPTION = 2;
pub const TSDB_OPTION_TSDB_OPTION_CONFIGDIR: TSDB_OPTION = 3;
pub const TSDB_OPTION_TSDB_OPTION_SHELL_ACTIVITY_TIMER: TSDB_OPTION = 4;
pub const TSDB_OPTION_TSDB_MAX_OPTIONS: TSDB_OPTION = 5;
pub type TSDB_OPTION = u32;
#[repr(C)]
#[derive(Copy, Clone)]
pub struct taosField {
pub name: [::std::os::raw::c_char; 64usize],
pub bytes: ::std::os::raw::c_short,
pub type_: ::std::os::raw::c_char,
}
#[test]
fn bindgen_test_layout_taosField() {
assert_eq!(
::std::mem::size_of::<taosField>(),
68usize,
concat!("Size of: ", stringify!(taosField))
);
assert_eq!(
::std::mem::align_of::<taosField>(),
2usize,
concat!("Alignment of ", stringify!(taosField))
);
assert_eq!(
unsafe { &(*(::std::ptr::null::<taosField>())).name as *const _ as usize },
0usize,
concat!(
"Offset of field: ",
stringify!(taosField),
"::",
stringify!(name)
)
);
assert_eq!(
unsafe { &(*(::std::ptr::null::<taosField>())).bytes as *const _ as usize },
64usize,
concat!(
"Offset of field: ",
stringify!(taosField),
"::",
stringify!(bytes)
)
);
assert_eq!(
unsafe { &(*(::std::ptr::null::<taosField>())).type_ as *const _ as usize },
66usize,
concat!(
"Offset of field: ",
stringify!(taosField),
"::",
stringify!(type_)
)
);
}
pub type TAOS_FIELD = taosField;
extern "C" {
pub fn taos_init();
}
extern "C" {
pub fn taos_options(
option: TSDB_OPTION,
arg: *const ::std::os::raw::c_void,
...
) -> ::std::os::raw::c_int;
}
extern "C" {
pub fn taos_connect(
ip: *mut ::std::os::raw::c_char,
user: *mut ::std::os::raw::c_char,
pass: *mut ::std::os::raw::c_char,
db: *mut ::std::os::raw::c_char,
port: ::std::os::raw::c_int,
) -> *mut ::std::os::raw::c_void;
}
extern "C" {
pub fn taos_close(taos: *mut ::std::os::raw::c_void);
}
extern "C" {
pub fn taos_query(
taos: *mut ::std::os::raw::c_void,
sqlstr: *mut ::std::os::raw::c_char,
) -> ::std::os::raw::c_int;
}
extern "C" {
pub fn taos_use_result(taos: *mut ::std::os::raw::c_void) -> *mut ::std::os::raw::c_void;
}
extern "C" {
pub fn taos_fetch_row(res: *mut ::std::os::raw::c_void) -> *mut *mut ::std::os::raw::c_void;
}
extern "C" {
pub fn taos_result_precision(res: *mut ::std::os::raw::c_void) -> ::std::os::raw::c_int;
}
extern "C" {
pub fn taos_free_result(res: *mut ::std::os::raw::c_void);
}
extern "C" {
pub fn taos_field_count(taos: *mut ::std::os::raw::c_void) -> ::std::os::raw::c_int;
}
extern "C" {
pub fn taos_num_fields(res: *mut ::std::os::raw::c_void) -> ::std::os::raw::c_int;
}
extern "C" {
pub fn taos_affected_rows(taos: *mut ::std::os::raw::c_void) -> ::std::os::raw::c_int;
}
extern "C" {
pub fn taos_fetch_fields(res: *mut ::std::os::raw::c_void) -> *mut TAOS_FIELD;
}
extern "C" {
pub fn taos_select_db(
taos: *mut ::std::os::raw::c_void,
db: *mut ::std::os::raw::c_char,
) -> ::std::os::raw::c_int;
}
extern "C" {
pub fn taos_print_row(
str: *mut ::std::os::raw::c_char,
row: *mut *mut ::std::os::raw::c_void,
fields: *mut TAOS_FIELD,
num_fields: ::std::os::raw::c_int,
) -> ::std::os::raw::c_int;
}
extern "C" {
pub fn taos_stop_query(res: *mut ::std::os::raw::c_void);
}
extern "C" {
pub fn taos_fetch_block(
res: *mut ::std::os::raw::c_void,
rows: *mut *mut *mut ::std::os::raw::c_void,
) -> ::std::os::raw::c_int;
}
extern "C" {
pub fn taos_validate_sql(
taos: *mut ::std::os::raw::c_void,
sql: *mut ::std::os::raw::c_char,
) -> ::std::os::raw::c_int;
}
extern "C" {
pub fn taos_get_server_info(taos: *mut ::std::os::raw::c_void) -> *mut ::std::os::raw::c_char;
}
extern "C" {
pub fn taos_get_client_info() -> *mut ::std::os::raw::c_char;
}
extern "C" {
pub fn taos_errstr(taos: *mut ::std::os::raw::c_void) -> *mut ::std::os::raw::c_char;
}
extern "C" {
pub fn taos_errno(taos: *mut ::std::os::raw::c_void) -> ::std::os::raw::c_int;
}
extern "C" {
pub fn taos_query_a(
taos: *mut ::std::os::raw::c_void,
sqlstr: *mut ::std::os::raw::c_char,
fp: ::std::option::Option<
unsafe extern "C" fn(
param: *mut ::std::os::raw::c_void,
arg1: *mut ::std::os::raw::c_void,
code: ::std::os::raw::c_int,
),
>,
param: *mut ::std::os::raw::c_void,
);
}
extern "C" {
pub fn taos_fetch_rows_a(
res: *mut ::std::os::raw::c_void,
fp: ::std::option::Option<
unsafe extern "C" fn(
param: *mut ::std::os::raw::c_void,
arg1: *mut ::std::os::raw::c_void,
numOfRows: ::std::os::raw::c_int,
),
>,
param: *mut ::std::os::raw::c_void,
);
}
extern "C" {
pub fn taos_fetch_row_a(
res: *mut ::std::os::raw::c_void,
fp: ::std::option::Option<
unsafe extern "C" fn(
param: *mut ::std::os::raw::c_void,
arg1: *mut ::std::os::raw::c_void,
row: *mut *mut ::std::os::raw::c_void,
),
>,
param: *mut ::std::os::raw::c_void,
);
}
extern "C" {
pub fn taos_subscribe(
host: *mut ::std::os::raw::c_char,
user: *mut ::std::os::raw::c_char,
pass: *mut ::std::os::raw::c_char,
db: *mut ::std::os::raw::c_char,
table: *mut ::std::os::raw::c_char,
time: i64,
mseconds: ::std::os::raw::c_int,
) -> *mut ::std::os::raw::c_void;
}
extern "C" {
pub fn taos_consume(tsub: *mut ::std::os::raw::c_void) -> *mut *mut ::std::os::raw::c_void;
}
extern "C" {
pub fn taos_unsubscribe(tsub: *mut ::std::os::raw::c_void);
}
extern "C" {
pub fn taos_open_stream(
taos: *mut ::std::os::raw::c_void,
sqlstr: *mut ::std::os::raw::c_char,
fp: ::std::option::Option<
unsafe extern "C" fn(
param: *mut ::std::os::raw::c_void,
arg1: *mut ::std::os::raw::c_void,
row: *mut *mut ::std::os::raw::c_void,
),
>,
stime: i64,
param: *mut ::std::os::raw::c_void,
callback: ::std::option::Option<unsafe extern "C" fn(arg1: *mut ::std::os::raw::c_void)>,
) -> *mut ::std::os::raw::c_void;
}
extern "C" {
pub fn taos_close_stream(tstr: *mut ::std::os::raw::c_void);
}
extern "C" {
pub static mut configDir: [::std::os::raw::c_char; 0usize];
}
#![allow(unused)]
#![allow(non_camel_case_types)]
pub mod subscriber;
pub use subscriber::*;
pub mod tdengine;
pub use tdengine::*;
pub mod utils;
\ No newline at end of file
#![allow(non_camel_case_types)]
#![allow(non_snake_case)]
#[path = "utils.rs"]
mod utils;
use utils::*;
use utils::bindings::*;
use std::os::raw::{c_void, c_char, c_int, c_long};
pub struct Subscriber {
tsub: *mut c_void,
fields: *mut taosField,
fcount: c_int,
}
impl Subscriber {
pub fn new(host: &str,
username: &str,
passwd: &str,
db: &str,
table:&str,
time: i64,
mseconds: i32
) -> Result<Subscriber, &'static str> {
unsafe {
let mut tsub = taos_subscribe(str_into_raw(host),
str_into_raw(username),
str_into_raw(passwd),
str_into_raw(db),
str_into_raw(table),
time as c_long,
mseconds as c_int);
if tsub.is_null() {
return Err("subscribe error")
}
println!("subscribed to {} user:{}, db:{}, tb:{}, time:{}, mseconds:{}",
host, username, db, table, time, mseconds);
let mut fields = taos_fetch_fields(tsub);
if fields.is_null() {
taos_unsubscribe(tsub);
return Err("fetch fields error")
}
let fcount = taos_field_count(tsub);
if fcount == 0 {
taos_unsubscribe(tsub);
return Err("fields count is 0")
}
Ok(Subscriber{tsub, fields, fcount})
}
}
pub fn consume(self: &Subscriber) -> Result<Row, &'static str> {
unsafe {
let taosRow = taos_consume(self.tsub);
if taosRow.is_null() {
return Err("consume error")
}
let taosRow= std::slice::from_raw_parts(taosRow, self.fcount as usize);
let row = raw_into_row(self.fields, self.fcount, &taosRow);
Ok(row)
}
}
pub fn print_row(self: &Subscriber, row: &Row) {
println!("{}", format_row(row));
}
}
impl Drop for Subscriber {
fn drop(&mut self) {
unsafe {taos_unsubscribe(self.tsub);}
}
}
#[path = "bindings.rs"]
mod bindings;
use bindings::*;
#[path = "utils.rs"]
mod utils;
use utils::*;
use std::os::raw::c_void;
use std::os::raw::c_char;
use std::os::raw::c_int;
use std::os::raw::c_long;
pub struct Tdengine {
conn: *mut c_void,
}
/// - **TODO**: doc
impl Tdengine {
//! - **TODO**: implement default param.
//!
//! > refer to https://stackoverflow.com/questions/24047686/default-function-arguments-in-rust
pub fn new(ip: &str, username: &str, passwd: &str, db: &str, port: i32) -> Result<Tdengine, &'static str> {
unsafe {
taos_init();
let mut conn = taos_connect(str_into_raw(ip),
str_into_raw(username),
str_into_raw(passwd),
str_into_raw(db),
port as c_int);
if conn.is_null() {
Err("connect error")
} else {
println!("connected to {}:{} user:{}, db:{}", ip, port, username, db);
Ok(Tdengine {conn})
}
}
}
// - **TODO**: check error code
pub fn query(self: &Tdengine, s: &str) {
unsafe {
if taos_query(self.conn, str_into_raw(s)) == 0 {
println!("query '{}' ok", s);
} else {
println!("query '{}' error: {}", s, raw_into_str(taos_errstr(self.conn)));
}
}
}
}
impl Drop for Tdengine {
fn drop(&mut self) {
unsafe {taos_close(self.conn);}
}
}
#[cfg(test)]
mod tests {
#[test]
fn it_works() {
assert_eq!(2 + 2, 4);
}
}
\ No newline at end of file
#[path = "bindings.rs"]
pub mod bindings;
use bindings::*;
use std::fmt;
use std::fmt::Display;
use std::os::raw::{c_void, c_char, c_int};
use std::ffi::{CString, CStr};
// #[derive(Debug)]
pub enum Field {
tinyInt(i8),
smallInt(i16),
normalInt(i32),
bigInt(i64),
float(f32),
double(f64),
binary(String),
timeStamp(i64),
boolType(bool),
}
impl fmt::Display for Field {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match &*self {
Field::tinyInt(v) => write!(f, "{}", v),
Field::smallInt(v) => write!(f, "{}", v),
Field::normalInt(v) => write!(f, "{}", v),
Field::bigInt(v) => write!(f, "{}", v),
Field::float(v) => write!(f, "{}", v),
Field::double(v) => write!(f, "{}", v),
Field::binary(v) => write!(f, "{}", v),
Field::tinyInt(v) => write!(f, "{}", v),
Field::timeStamp(v) => write!(f, "{}", v),
Field::boolType(v) => write!(f, "{}", v),
}
}
}
// pub type Fields = Vec<Field>;
pub type Row = Vec<Field>;
pub fn format_row(row: &Row) -> String {
let mut s = String::new();
for field in row {
s.push_str(format!("{} ", field).as_str());
// println!("{}", field);
}
s
}
pub fn str_into_raw(s: &str) -> *mut c_char {
if s.is_empty() {
0 as *mut c_char
} else {
CString::new(s).unwrap().into_raw()
}
}
pub fn raw_into_str<'a>(raw: *mut c_char) -> &'static str {
unsafe {CStr::from_ptr(raw).to_str().unwrap()}
}
pub fn raw_into_field(raw: *mut TAOS_FIELD, fcount: c_int) -> Vec<taosField> {
let mut fields: Vec<taosField> = Vec::new();
for i in 0..fcount as isize {
fields.push(
taosField {
name: unsafe {(*raw.offset(i as isize))}.name,
bytes: unsafe {(*raw.offset(i as isize))}.bytes,
type_: unsafe {(*raw.offset(i as isize))}.type_,
}
);
}
/// TODO: error[E0382]: use of moved value: `fields`
// for field in &fields {
// println!("type: {}, bytes: {}", field.type_, field.bytes);
// }
fields
}
pub fn raw_into_row(fields: *mut TAOS_FIELD, fcount: c_int, raw_row: &[*mut c_void]) -> Row {
let mut row: Row= Vec::new();
let fields = raw_into_field(fields, fcount);
for (i, field) in fields.iter().enumerate() {
// println!("index: {}, type: {}, bytes: {}", i, field.type_, field.bytes);
unsafe {
match field.type_ as u32 {
TSDB_DATA_TYPE_TINYINT => {
row.push(Field::tinyInt(*(raw_row[i] as *mut i8)));
}
TSDB_DATA_TYPE_SMALLINT => {
row.push(Field::smallInt(*(raw_row[i] as *mut i16)));
}
TSDB_DATA_TYPE_INT => {
row.push(Field::normalInt(*(raw_row[i] as *mut i32)));
}
TSDB_DATA_TYPE_BIGINT => {
row.push(Field::bigInt(*(raw_row[i] as *mut i64)));
}
TSDB_DATA_TYPE_FLOAT => {
row.push(Field::float(*(raw_row[i] as *mut f32)));
}
TSDB_DATA_TYPE_DOUBLE => {
row.push(Field::double(*(raw_row[i] as *mut f64)));
}
TSDB_DATA_TYPE_BINARY | TSDB_DATA_TYPE_NCHAR => {
// row.push(Field::binary(*(raw_row[i] as *mut f64)));
}
TSDB_DATA_TYPE_TIMESTAMP => {
row.push(Field::timeStamp(*(raw_row[i] as *mut i64)));
}
TSDB_DATA_TYPE_BOOL => {
// row.push(Field::boolType(*(raw_row[i] as *mut i8) as bool));
}
_ => println!(""),
}
}
}
row
}
\ No newline at end of file
...@@ -5,7 +5,7 @@ GREEN='\033[1;32m' ...@@ -5,7 +5,7 @@ GREEN='\033[1;32m'
GREEN_DARK='\033[0;32m' GREEN_DARK='\033[0;32m'
GREEN_UNDERLINE='\033[4;32m' GREEN_UNDERLINE='\033[4;32m'
NC='\033[0m' NC='\033[0m'
nohup /root/TDinternal/debug/build/bin/taosd -c /root/TDinternal/community/sim/dnode1/cfg >/dev/null & nohup /var/lib/jenkins/workspace/TDinternal/debug/build/bin/taosd -c /var/lib/jenkins/workspace/TDinternal/community/sim/dnode1/cfg >/dev/null &
./crash_gen.sh --valgrind -p -t 10 -s 100 -b 4 ./crash_gen.sh --valgrind -p -t 10 -s 100 -b 4
pidof taosd|xargs kill pidof taosd|xargs kill
grep 'start to execute\|ERROR SUMMARY' valgrind.err|grep -v 'grep'|uniq|tee crash_gen_mem_err.log grep 'start to execute\|ERROR SUMMARY' valgrind.err|grep -v 'grep'|uniq|tee crash_gen_mem_err.log
......
...@@ -16,6 +16,10 @@ function runSimCaseOneByOne { ...@@ -16,6 +16,10 @@ function runSimCaseOneByOne {
./test.sh -f $case > /dev/null 2>&1 && \ ./test.sh -f $case > /dev/null 2>&1 && \
echo -e "${GREEN}$case success${NC}" | tee -a out.log || \ echo -e "${GREEN}$case success${NC}" | tee -a out.log || \
echo -e "${RED}$case failed${NC}" | tee -a out.log echo -e "${RED}$case failed${NC}" | tee -a out.log
out_log=`tail -1 out.log `
if [[ $out_log =~ 'failed' ]];then
exit 8
fi
end_time=`date +%s` end_time=`date +%s`
echo execution time of $case was `expr $end_time - $start_time`s. | tee -a out.log echo execution time of $case was `expr $end_time - $start_time`s. | tee -a out.log
fi fi
...@@ -37,6 +41,10 @@ function runPyCaseOneByOne { ...@@ -37,6 +41,10 @@ function runPyCaseOneByOne {
echo -e "${GREEN}$case success${NC}" | tee -a pytest-out.log || \ echo -e "${GREEN}$case success${NC}" | tee -a pytest-out.log || \
echo -e "${RED}$case failed${NC}" | tee -a pytest-out.log echo -e "${RED}$case failed${NC}" | tee -a pytest-out.log
end_time=`date +%s` end_time=`date +%s`
out_log=`tail -1 pytest-out.log `
if [[ $out_log =~ 'failed' ]];then
exit 8
fi
echo execution time of $case was `expr $end_time - $start_time`s. | tee -a pytest-out.log echo execution time of $case was `expr $end_time - $start_time`s. | tee -a pytest-out.log
else else
$line > /dev/null 2>&1 $line > /dev/null 2>&1
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册