未验证 提交 aa67bb9e 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #9931 from taosdata/feature/tfs

add some testcases to tfs
......@@ -839,14 +839,12 @@ static int32_t mndRetrieveStb(SMnodeMsg *pReq, SShowObj *pShow, char *data, int3
SStbObj *pStb = NULL;
int32_t cols = 0;
char *pWrite;
char prefix[64] = {0};
char prefix[TSDB_DB_FNAME_LEN] = {0};
SDbObj *pDb = mndAcquireDb(pMnode, pShow->db);
if (pDb == NULL) {
return TSDB_CODE_MND_INVALID_DB;
}
if (pDb == NULL) return 0;
tstrncpy(prefix, pShow->db, 64);
tstrncpy(prefix, pShow->db, TSDB_DB_FNAME_LEN);
strcat(prefix, TS_PATH_DELIMITER);
int32_t prefixLen = (int32_t)strlen(prefix);
......
......@@ -92,6 +92,10 @@ static int32_t mndRestoreWal(SMnode *pMnode) {
goto WAL_RESTORE_OVER;
}
if (walCommit(pWal, sdbVer) != 0) {
goto WAL_RESTORE_OVER;
}
if (walBeginSnapshot(pWal, sdbVer) < 0) {
goto WAL_RESTORE_OVER;
}
......@@ -99,7 +103,6 @@ static int32_t mndRestoreWal(SMnode *pMnode) {
if (walEndSnapshot(pWal) < 0) {
goto WAL_RESTORE_OVER;
}
}
code = 0;
......
......@@ -540,35 +540,41 @@ static int32_t mndRetrieveVgroups(SMnodeMsg *pReq, SShowObj *pShow, char *data,
int32_t cols = 0;
char *pWrite;
SDbObj *pDb = mndAcquireDb(pMnode, pShow->db);
if (pDb == NULL) return 0;
while (numOfRows < rows) {
pShow->pIter = sdbFetch(pSdb, SDB_VGROUP, pShow->pIter, (void **)&pVgroup);
if (pShow->pIter == NULL) break;
cols = 0;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int32_t *)pWrite = pVgroup->vgId;
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int32_t *)pWrite = pVgroup->numOfTables;
cols++;
if (pVgroup->dbUid == pDb->uid) {
cols = 0;
for (int32_t i = 0; i < pShow->replica; ++i) {
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int16_t *)pWrite = pVgroup->vnodeGid[i].dnodeId;
*(int32_t *)pWrite = pVgroup->vgId;
cols++;
const char *role = mndGetRoleStr(pVgroup->vnodeGid[i].role);
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
STR_WITH_MAXSIZE_TO_VARSTR(pWrite, role, pShow->bytes[cols]);
*(int32_t *)pWrite = pVgroup->numOfTables;
cols++;
for (int32_t i = 0; i < pShow->replica; ++i) {
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int16_t *)pWrite = pVgroup->vnodeGid[i].dnodeId;
cols++;
const char *role = mndGetRoleStr(pVgroup->vnodeGid[i].role);
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
STR_WITH_MAXSIZE_TO_VARSTR(pWrite, role, pShow->bytes[cols]);
cols++;
}
numOfRows++;
}
sdbRelease(pSdb, pVgroup);
numOfRows++;
}
mndReleaseDb(pMnode, pDb);
mndVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow);
pShow->numOfReads += numOfRows;
return numOfRows;
......
......@@ -6,4 +6,8 @@ target_include_directories(
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
)
target_link_libraries(tfs os util common)
\ No newline at end of file
target_link_libraries(tfs os util common)
if(${BUILD_TEST})
add_subdirectory(test)
endif(${BUILD_TEST})
\ No newline at end of file
......@@ -22,7 +22,6 @@
#include "taoserror.h"
#include "tcoding.h"
#include "tfs.h"
#include "tglobal.h"
#include "thash.h"
#include "tlog.h"
......
......@@ -141,6 +141,7 @@ const char *tfsGetDiskPath(STfs *pTfs, SDiskID diskId) { return TFS_DISK_AT(pTfs
void tfsInitFile(STfs *pTfs, STfsFile *pFile, SDiskID diskId, const char *rname) {
STfsDisk *pDisk = TFS_DISK_AT(pTfs, diskId);
if (pDisk == NULL) return;
pFile->did = diskId;
tstrncpy(pFile->rname, rname, TSDB_FILENAME_LEN);
......@@ -197,9 +198,7 @@ void tfsDirname(const STfsFile *pFile, char *dest) {
tstrncpy(dest, dirname(tname), TSDB_FILENAME_LEN);
}
int32_t tfsRemoveFile(const STfsFile *pFile) {
return remove(pFile->aname);
}
int32_t tfsRemoveFile(const STfsFile *pFile) { return remove(pFile->aname); }
int32_t tfsCopyFile(const STfsFile *pFile1, const STfsFile *pFile2) {
return taosCopyFile(pFile1->aname, pFile2->aname);
......@@ -291,6 +290,8 @@ int32_t tfsRename(STfs *pTfs, char *orname, char *nrname) {
snprintf(oaname, TMPNAME_LEN, "%s%s%s", pDisk->path, TD_DIRSEP, orname);
snprintf(naname, TMPNAME_LEN, "%s%s%s", pDisk->path, TD_DIRSEP, nrname);
if (taosRenameFile(oaname, naname) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
fError("failed to rename %s to %s since %s", oaname, naname, terrstr());
return -1;
}
}
......@@ -330,7 +331,12 @@ const STfsFile *tfsReaddir(STfsDir *pDir) {
// Skip . and ..
if (strcmp(dp->d_name, ".") == 0 || strcmp(dp->d_name, "..") == 0) continue;
snprintf(bname, TMPNAME_LEN * 2, "%s%s%s", pDir->dirname, TD_DIRSEP, dp->d_name);
if (pDir->dirname == NULL || pDir->dirname[0] == 0) {
snprintf(bname, TMPNAME_LEN * 2, "%s", dp->d_name);
} else {
snprintf(bname, TMPNAME_LEN * 2, "%s%s%s", pDir->dirname, TD_DIRSEP, dp->d_name);
}
tfsInitFile(pDir->pTfs, &pDir->tfile, pDir->did, bname);
return &pDir->tfile;
}
......@@ -402,8 +408,7 @@ static int32_t tfsCheckAndFormatCfg(STfs *pTfs, SDiskCfg *pCfg) {
}
if (tfsFormatDir(pCfg->dir, dirName) < 0) {
fError("failed to mount %s to FS since invalid dir format", pCfg->dir);
terrno = TSDB_CODE_FS_INVLD_CFG;
fError("failed to mount %s to FS since %s", pCfg->dir, terrstr());
return -1;
}
......@@ -501,7 +506,11 @@ static int32_t tfsOpendirImpl(STfs *pTfs, STfsDir *pDir) {
pDir->did.level = pDisk->level;
pDir->did.id = pDisk->id;
snprintf(adir, TMPNAME_LEN * 2, "%s%s%s", pDisk->path, TD_DIRSEP, pDir->dirname);
if (pDisk->path == NULL || pDisk->path[0] == 0) {
snprintf(adir, TMPNAME_LEN * 2, "%s", pDir->dirname);
} else {
snprintf(adir, TMPNAME_LEN * 2, "%s%s%s", pDisk->path, TD_DIRSEP, pDir->dirname);
}
pDir->dir = opendir(adir);
if (pDir->dir != NULL) break;
}
......
enable_testing()
aux_source_directory(. TFS_TEST_SRC)
add_executable(tfs_test ${TFS_TEST_SRC})
target_link_libraries(
tfs_test
PUBLIC tfs
PUBLIC gtest_main
)
add_test(
NAME tfs_test
COMMAND tfs_test
)
/**
* @file tfsTest.cpp
* @author slguan (slguan@taosdata.com)
* @brief TFS module tests
* @version 1.0
* @date 2022-01-20
*
* @copyright Copyright (c) 2022
*
*/
#include <gtest/gtest.h>
#include "os.h"
#include "tfs.h"
class TfsTest : public ::testing::Test {
protected:
static void SetUpTestSuite() { root = "/tmp/tfsTest"; }
static void TearDownTestSuite() {}
public:
void SetUp() override {}
void TearDown() override {}
static const char *root;
};
const char *TfsTest::root;
TEST_F(TfsTest, 01_Open_Close) {
SDiskCfg dCfg = {0};
tstrncpy(dCfg.dir, root, TSDB_FILENAME_LEN);
dCfg.level = 0;
dCfg.primary = 1;
taosRemoveDir(root);
STfs *pTfs = tfsOpen(&dCfg, 1);
ASSERT_EQ(pTfs, nullptr);
taosMkDir(root);
pTfs = tfsOpen(&dCfg, 1);
ASSERT_NE(pTfs, nullptr);
tfsUpdateSize(pTfs);
SDiskSize size = tfsGetSize(pTfs);
EXPECT_GT(size.avail, 0);
EXPECT_GT(size.used, 0);
EXPECT_GT(size.total, size.avail);
EXPECT_GT(size.total, size.used);
tfsClose(pTfs);
}
TEST_F(TfsTest, 02_AllocDisk) {
int32_t code = 0;
SDiskCfg dCfg = {0};
tstrncpy(dCfg.dir, root, TSDB_FILENAME_LEN);
dCfg.level = 0;
dCfg.primary = 1;
taosRemoveDir(root);
taosMkDir(root);
STfs *pTfs = tfsOpen(&dCfg, 1);
ASSERT_NE(pTfs, nullptr);
SDiskID did;
did.id = 0;
did.level = 0;
code = tfsAllocDisk(pTfs, 0, &did);
EXPECT_EQ(code, 0);
EXPECT_EQ(did.id, 0);
EXPECT_EQ(did.level, 0);
did.id = 1;
did.level = 1;
code = tfsAllocDisk(pTfs, 0, &did);
EXPECT_EQ(code, 0);
EXPECT_EQ(did.id, 0);
EXPECT_EQ(did.level, 0);
did.id = 1;
did.level = 2;
code = tfsAllocDisk(pTfs, 0, &did);
EXPECT_EQ(code, 0);
EXPECT_EQ(did.id, 0);
EXPECT_EQ(did.level, 0);
did.id = 1;
did.level = 3;
code = tfsAllocDisk(pTfs, 0, &did);
EXPECT_EQ(code, 0);
EXPECT_EQ(did.id, 0);
EXPECT_EQ(did.level, 0);
const char *primary = tfsGetPrimaryPath(pTfs);
EXPECT_STREQ(primary, root);
const char *path = tfsGetDiskPath(pTfs, did);
EXPECT_STREQ(path, root);
tfsClose(pTfs);
}
TEST_F(TfsTest, 03_Dir) {
int32_t code = 0;
SDiskCfg dCfg = {0};
tstrncpy(dCfg.dir, root, TSDB_FILENAME_LEN);
dCfg.level = 0;
dCfg.primary = 1;
taosRemoveDir(root);
taosMkDir(root);
STfs *pTfs = tfsOpen(&dCfg, 1);
ASSERT_NE(pTfs, nullptr);
char p1[] = "p1";
char ap1[128] = {0};
snprintf(ap1, 128, "%s%s%s", root, TD_DIRSEP, p1);
EXPECT_NE(taosDirExist(ap1), 0);
EXPECT_EQ(tfsMkdir(pTfs, p1), 0);
EXPECT_EQ(taosDirExist(ap1), 0);
char p2[] = "p2";
char ap2[128] = {0};
snprintf(ap2, 128, "%s%s%s", root, TD_DIRSEP, p2);
SDiskID did = {0};
EXPECT_NE(taosDirExist(ap2), 0);
EXPECT_EQ(tfsMkdirAt(pTfs, p2, did), 0);
EXPECT_EQ(taosDirExist(ap2), 0);
char p3[] = "p3/p2/p1/p0";
char ap3[128] = {0};
snprintf(ap3, 128, "%s%s%s", root, TD_DIRSEP, p3);
EXPECT_NE(taosDirExist(ap3), 0);
EXPECT_NE(tfsMkdir(pTfs, p3), 0);
EXPECT_NE(tfsMkdirAt(pTfs, p3, did), 0);
EXPECT_EQ(tfsMkdirRecurAt(pTfs, p3, did), 0);
EXPECT_EQ(taosDirExist(ap3), 0);
EXPECT_EQ(tfsRmdir(pTfs, p3), 0);
EXPECT_NE(taosDirExist(ap3), 0);
char p45[] = "p5";
char p44[] = "p4";
char p4[] = "p4/p2/p1/p0";
char ap4[128] = {0};
snprintf(ap4, 128, "%s%s%s", root, TD_DIRSEP, p4);
EXPECT_NE(taosDirExist(ap4), 0);
EXPECT_EQ(tfsMkdirRecurAt(pTfs, p4, did), 0);
EXPECT_EQ(taosDirExist(ap4), 0);
EXPECT_EQ(tfsRename(pTfs, p44, p45), 0);
EXPECT_EQ(tfsRmdir(pTfs, p4), 0);
EXPECT_NE(taosDirExist(ap4), 0);
tfsClose(pTfs);
}
TEST_F(TfsTest, 04_File) {
int32_t code = 0;
SDiskCfg dCfg = {0};
tstrncpy(dCfg.dir, root, TSDB_FILENAME_LEN);
dCfg.level = 0;
dCfg.primary = 1;
taosRemoveDir(root);
taosMkDir(root);
STfs *pTfs = tfsOpen(&dCfg, 1);
ASSERT_NE(pTfs, nullptr);
STfsFile file0;
STfsFile file1;
STfsFile file2;
STfsFile file3;
STfsFile file4;
SDiskID did0 = {0};
SDiskID did1 = {0};
SDiskID did2 = {0};
SDiskID did3 = {0};
SDiskID did4 = {0};
did3.id = 1;
did4.level = 1;
tfsInitFile(pTfs, &file0, did0, "fname");
tfsInitFile(pTfs, &file1, did1, "fname");
tfsInitFile(pTfs, &file2, did2, "fnamex");
tfsInitFile(pTfs, &file3, did3, "fname");
tfsInitFile(pTfs, &file4, did4, "fname");
EXPECT_TRUE(tfsIsSameFile(&file0, &file1));
EXPECT_FALSE(tfsIsSameFile(&file0, &file2));
EXPECT_FALSE(tfsIsSameFile(&file0, &file3));
EXPECT_FALSE(tfsIsSameFile(&file0, &file4));
{
int32_t size = 1024;
void *ret = malloc(size + sizeof(size_t));
*(size_t *)ret = size;
void *buf = (void *)((char *)ret + sizeof(size_t));
file0.did.id = 0;
file0.did.level = 0;
int32_t len = tfsEncodeFile((void **)&buf, &file0);
EXPECT_EQ(len, 8);
STfsFile outfile = {0};
char *outbuf = (char *)tfsDecodeFile(pTfs, (void *)((char *)buf - len), &outfile);
int32_t decodeLen = (outbuf - (char *)buf);
EXPECT_EQ(outfile.did.id, 0);
EXPECT_EQ(outfile.did.level, 0);
EXPECT_STREQ(outfile.aname, file0.aname);
EXPECT_STREQ(outfile.rname, "fname");
EXPECT_EQ(outfile.pTfs, pTfs);
}
{
char n1[] = "t3/t1.json";
char n2[] = "t3/t2.json";
STfsFile f1 = {0};
STfsFile f2 = {0};
SDiskID did;
did.id = 0;
did.level = 0;
tfsInitFile(pTfs, &f1, did, n1);
tfsInitFile(pTfs, &f2, did, n2);
EXPECT_EQ(tfsMkdir(pTfs, "t3"), 0);
FILE *fp = fopen(f1.aname, "w");
ASSERT_NE(fp, nullptr);
fwrite("12345678", 1, 5, fp);
fclose(fp);
char base[128] = {0};
tfsBasename(&f1, base);
char dir[128] = {0};
tfsDirname(&f1, dir);
EXPECT_STREQ(base, "t1.json");
char fulldir[128];
snprintf(fulldir, 128, "%s%s%s", root, TD_DIRSEP, "t3");
EXPECT_STREQ(dir, fulldir);
EXPECT_NE(tfsCopyFile(&f1, &f2), 0);
char af2[128] = {0};
snprintf(af2, 128, "%s%s%s", root, TD_DIRSEP, n2);
EXPECT_EQ(taosDirExist(af2), 0);
tfsRemoveFile(&f2);
EXPECT_NE(taosDirExist(af2), 0);
EXPECT_NE(tfsCopyFile(&f1, &f2), 0);
{
STfsDir *pDir = tfsOpendir(pTfs, "");
const STfsFile *pf1 = tfsReaddir(pDir);
EXPECT_STREQ(pf1->rname, "t3");
EXPECT_EQ(pf1->did.id, 0);
EXPECT_EQ(pf1->did.level, 0);
EXPECT_EQ(pf1->pTfs, pTfs);
const STfsFile *pf2 = tfsReaddir(pDir);
EXPECT_EQ(pf2, nullptr);
tfsClosedir(pDir);
}
{
STfsDir *pDir = tfsOpendir(pTfs, "t3");
const STfsFile *pf1 = tfsReaddir(pDir);
EXPECT_NE(pf1, nullptr);
EXPECT_EQ(pf1->did.id, 0);
EXPECT_EQ(pf1->did.level, 0);
EXPECT_EQ(pf1->pTfs, pTfs);
const STfsFile *pf2 = tfsReaddir(pDir);
EXPECT_NE(pf2, nullptr);
const STfsFile *pf3 = tfsReaddir(pDir);
EXPECT_EQ(pf3, nullptr);
tfsClosedir(pDir);
}
}
tfsClose(pTfs);
}
\ No newline at end of file
# Base components
add_subdirectory(os)
add_subdirectory(util)
add_subdirectory(common)
# Library components
# Service components
# add_subdirectory(mnode)
# add_subdirectory(vnode)
# add_subdirectory(qnode)
add_subdirectory(dnode)
Subproject commit b8f76da4a708d158ec3cc4b844571dc4414e36b4
Subproject commit 4a4d79099b076b8ff12d5b4fdbcba54049a6866d
Subproject commit ce5201014136503d34fecbd56494b67b4961056c
CMAKE_MINIMUM_REQUIRED(VERSION 2.8...3.20)
PROJECT(TDengine)
INCLUDE_DIRECTORIES(inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/client/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/query/inc)
AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR}/src SRC)
ADD_LIBRARY(tcq ${SRC})
IF (TD_SOMODE_STATIC)
TARGET_LINK_LIBRARIES(tcq tutil common taos_static)
ELSE ()
TARGET_LINK_LIBRARIES(tcq tutil common taos)
ENDIF ()
ADD_SUBDIRECTORY(test)
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include <errno.h>
#include <pthread.h>
#include <stdlib.h>
#include <string.h>
#include "../../../include/client/taos.h"
#include "taosdef.h"
#include "tmsg.h"
#include "tcq.h"
#include "tdataformat.h"
#include "tglobal.h"
#include "tlog.h"
#include "tsclient.h"
#include "ttimer.h"
#include "twal.h"
#define cFatal(...) { if (cqDebugFlag & DEBUG_FATAL) { taosPrintLog("CQ FATAL ", 255, __VA_ARGS__); }}
#define cError(...) { if (cqDebugFlag & DEBUG_ERROR) { taosPrintLog("CQ ERROR ", 255, __VA_ARGS__); }}
#define cWarn(...) { if (cqDebugFlag & DEBUG_WARN) { taosPrintLog("CQ WARN ", 255, __VA_ARGS__); }}
#define cInfo(...) { if (cqDebugFlag & DEBUG_INFO) { taosPrintLog("CQ ", 255, __VA_ARGS__); }}
#define cDebug(...) { if (cqDebugFlag & DEBUG_DEBUG) { taosPrintLog("CQ ", cqDebugFlag, __VA_ARGS__); }}
#define cTrace(...) { if (cqDebugFlag & DEBUG_TRACE) { taosPrintLog("CQ ", cqDebugFlag, __VA_ARGS__); }}
typedef struct SCqObj {
tmr_h tmrId;
int64_t rid;
uint64_t uid;
int32_t tid; // table ID
int32_t rowSize; // bytes of a row
char * dstTable;
char * sqlStr; // SQL string
STSchema * pSchema; // pointer to schema array
void * pStream;
struct SCqObj *prev;
struct SCqObj *next;
SCqContext * pContext;
} SCqObj;
static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row);
static void cqCreateStream(SCqContext *pContext, SCqObj *pObj);
int32_t cqObjRef = -1;
int32_t cqVnodeNum = 0;
void cqRmFromList(SCqObj *pObj) {
//LOCK in caller
SCqContext *pContext = pObj->pContext;
if (pObj->prev) {
pObj->prev->next = pObj->next;
} else {
pContext->pHead = pObj->next;
}
if (pObj->next) {
pObj->next->prev = pObj->prev;
}
}
static void freeSCqContext(void *handle) {
if (handle == NULL) {
return;
}
SCqContext *pContext = handle;
pthread_mutex_destroy(&pContext->mutex);
taosTmrCleanUp(pContext->tmrCtrl);
pContext->tmrCtrl = NULL;
cDebug("vgId:%d, CQ is closed", pContext->vgId);
free(pContext);
}
void cqFree(void *handle) {
if (tsEnableStream == 0) {
return;
}
SCqObj *pObj = handle;
SCqContext *pContext = pObj->pContext;
int32_t delete = 0;
pthread_mutex_lock(&pContext->mutex);
// free the resources associated
if (pObj->pStream) {
taos_close_stream(pObj->pStream);
pObj->pStream = NULL;
} else {
taosTmrStop(pObj->tmrId);
pObj->tmrId = 0;
}
cInfo("vgId:%d, id:%d CQ:%s is dropped", pContext->vgId, pObj->tid, pObj->sqlStr);
tdFreeSchema(pObj->pSchema);
free(pObj->dstTable);
free(pObj->sqlStr);
free(pObj);
pContext->cqObjNum--;
if (pContext->cqObjNum <= 0 && pContext->delete) {
delete = 1;
}
pthread_mutex_unlock(&pContext->mutex);
if (delete) {
freeSCqContext(pContext);
}
}
void cqCreateRef() {
int32_t ref = atomic_load_32(&cqObjRef);
if (ref == -1) {
ref = taosOpenRef(4096, cqFree);
if (atomic_val_compare_exchange_32(&cqObjRef, -1, ref) != -1) {
taosCloseRef(ref);
}
}
}
void *cqOpen(void *ahandle, const SCqCfg *pCfg) {
if (tsEnableStream == 0) {
return NULL;
}
SCqContext *pContext = calloc(sizeof(SCqContext), 1);
if (pContext == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno);
return NULL;
}
atomic_add_fetch_32(&cqVnodeNum, 1);
cqCreateRef();
pContext->tmrCtrl = taosTmrInit(0, 0, 0, "CQ");
tstrncpy(pContext->user, pCfg->user, sizeof(pContext->user));
tstrncpy(pContext->pass, pCfg->pass, sizeof(pContext->pass));
const char* db = pCfg->db;
for (const char* p = db; *p != 0; p++) {
if (*p == '.') {
db = p + 1;
break;
}
}
tstrncpy(pContext->db, db, sizeof(pContext->db));
pContext->vgId = pCfg->vgId;
pContext->cqWrite = pCfg->cqWrite;
tscEmbedded = 1;
pthread_mutex_init(&pContext->mutex, NULL);
cDebug("vgId:%d, CQ is opened", pContext->vgId);
return pContext;
}
void cqClose(void *handle) {
if (tsEnableStream == 0) {
return;
}
SCqContext *pContext = handle;
if (handle == NULL) return;
pContext->delete = 1;
int32_t hasCq = 0;
int32_t existLoop = 0;
// stop all CQs
cqStop(pContext);
int64_t rid = 0;
while (1) {
pthread_mutex_lock(&pContext->mutex);
SCqObj *pObj = pContext->pHead;
if (pObj) {
cqRmFromList(pObj);
rid = pObj->rid;
hasCq = 1;
if (pContext->pHead == NULL) {
existLoop = 1;
}
} else {
pthread_mutex_unlock(&pContext->mutex);
break;
}
pthread_mutex_unlock(&pContext->mutex);
taosRemoveRef(cqObjRef, rid);
if (existLoop) {
break;
}
}
if (hasCq == 0) {
freeSCqContext(pContext);
}
int32_t remainn = atomic_sub_fetch_32(&cqVnodeNum, 1);
if (remainn <= 0) {
int32_t ref = cqObjRef;
cqObjRef = -1;
taosCloseRef(ref);
}
}
void cqStart(void *handle) {
if (tsEnableStream == 0) {
return;
}
SCqContext *pContext = handle;
if (pContext->dbConn || pContext->master) return;
cDebug("vgId:%d, start all CQs", pContext->vgId);
pthread_mutex_lock(&pContext->mutex);
pContext->master = 1;
SCqObj *pObj = pContext->pHead;
while (pObj) {
cqCreateStream(pContext, pObj);
pObj = pObj->next;
}
pthread_mutex_unlock(&pContext->mutex);
}
void cqStop(void *handle) {
if (tsEnableStream == 0) {
return;
}
SCqContext *pContext = handle;
cDebug("vgId:%d, stop all CQs", pContext->vgId);
if (pContext->dbConn == NULL || pContext->master == 0) return;
pthread_mutex_lock(&pContext->mutex);
pContext->master = 0;
SCqObj *pObj = pContext->pHead;
while (pObj) {
if (pObj->pStream) {
taos_close_stream(pObj->pStream);
pObj->pStream = NULL;
cInfo("vgId:%d, id:%d CQ:%s is closed", pContext->vgId, pObj->tid, pObj->sqlStr);
} else {
taosTmrStop(pObj->tmrId);
pObj->tmrId = 0;
}
pObj = pObj->next;
}
if (pContext->dbConn) taos_close(pContext->dbConn);
pContext->dbConn = NULL;
pthread_mutex_unlock(&pContext->mutex);
}
void *cqCreate(void *handle, uint64_t uid, int32_t sid, const char* dstTable, char *sqlStr, STSchema *pSchema, int start) {
if (tsEnableStream == 0) {
return NULL;
}
SCqContext *pContext = handle;
int64_t rid = 0;
pthread_mutex_lock(&pContext->mutex);
SCqObj *pObj = pContext->pHead;
while (pObj) {
if (pObj->uid == uid) {
rid = pObj->rid;
pthread_mutex_unlock(&pContext->mutex);
return (void *)rid;
}
pObj = pObj->next;
}
pthread_mutex_unlock(&pContext->mutex);
pObj = calloc(sizeof(SCqObj), 1);
if (pObj == NULL) return NULL;
pObj->uid = uid;
pObj->tid = sid;
if (dstTable != NULL) {
pObj->dstTable = strdup(dstTable);
}
pObj->sqlStr = strdup(sqlStr);
pObj->pSchema = tdDupSchema(pSchema);
pObj->rowSize = schemaTLen(pSchema);
cInfo("vgId:%d, id:%d CQ:%s is created", pContext->vgId, pObj->tid, pObj->sqlStr);
pthread_mutex_lock(&pContext->mutex);
pObj->next = pContext->pHead;
if (pContext->pHead) pContext->pHead->prev = pObj;
pContext->pHead = pObj;
pContext->cqObjNum++;
pObj->rid = taosAddRef(cqObjRef, pObj);
if(start && pContext->master) {
cqCreateStream(pContext, pObj);
} else {
pObj->pContext = pContext;
}
rid = pObj->rid;
pthread_mutex_unlock(&pContext->mutex);
return (void *)rid;
}
void cqDrop(void *handle) {
if (tsEnableStream == 0) {
return;
}
SCqObj* pObj = (SCqObj*)taosAcquireRef(cqObjRef, (int64_t)handle);
if (pObj == NULL) {
return;
}
SCqContext *pContext = pObj->pContext;
pthread_mutex_lock(&pContext->mutex);
cqRmFromList(pObj);
// free the resources associated
if (pObj->pStream) {
taos_close_stream(pObj->pStream);
pObj->pStream = NULL;
} else {
taosTmrStop(pObj->tmrId);
pObj->tmrId = 0;
}
pthread_mutex_unlock(&pContext->mutex);
taosRemoveRef(cqObjRef, (int64_t)handle);
taosReleaseRef(cqObjRef, (int64_t)handle);
}
static void doCreateStream(void *param, TAOS_RES *result, int32_t code) {
SCqObj* pObj = (SCqObj*)taosAcquireRef(cqObjRef, (int64_t)param);
if (pObj == NULL) {
return;
}
SCqContext* pContext = pObj->pContext;
SSqlObj* pSql = (SSqlObj*)result;
if (code == TSDB_CODE_SUCCESS) {
if (atomic_val_compare_exchange_ptr(&(pContext->dbConn), NULL, pSql->pTscObj) != NULL) {
taos_close(pSql->pTscObj);
}
}
pthread_mutex_lock(&pContext->mutex);
cqCreateStream(pContext, pObj);
pthread_mutex_unlock(&pContext->mutex);
taosReleaseRef(cqObjRef, (int64_t)param);
}
static void cqProcessCreateTimer(void *param, void *tmrId) {
SCqObj* pObj = (SCqObj*)taosAcquireRef(cqObjRef, (int64_t)param);
if (pObj == NULL) {
return;
}
SCqContext* pContext = pObj->pContext;
if (pContext->dbConn == NULL) {
cDebug("vgId:%d, try connect to TDengine", pContext->vgId);
taos_connect_a(NULL, pContext->user, pContext->pass, pContext->db, 0, doCreateStream, param, NULL);
} else {
pthread_mutex_lock(&pContext->mutex);
cqCreateStream(pContext, pObj);
pthread_mutex_unlock(&pContext->mutex);
}
taosReleaseRef(cqObjRef, (int64_t)param);
}
// inner implement in tscStream.c
TAOS_STREAM *taos_open_stream_withname(TAOS *taos, const char* desName, const char *sqlstr, void (*fp)(void *param, TAOS_RES *, TAOS_ROW row),
int64_t stime, void *param, void (*callback)(void *), void* cqhandle);
static void cqCreateStream(SCqContext *pContext, SCqObj *pObj) {
pObj->pContext = pContext;
if (pContext->dbConn == NULL) {
cDebug("vgId:%d, create dbConn after 1000 ms", pContext->vgId);
pObj->tmrId = taosTmrStart(cqProcessCreateTimer, 1000, (void *)pObj->rid, pContext->tmrCtrl);
return;
}
pObj->tmrId = 0;
if (pObj->pStream == NULL) {
pObj->pStream = taos_open_stream_withname(pContext->dbConn, pObj->dstTable, pObj->sqlStr, cqProcessStreamRes, \
INT64_MIN, (void *)pObj->rid, NULL, pContext);
// TODO the pObj->pStream may be released if error happens
if (pObj->pStream) {
pContext->num++;
cDebug("vgId:%d, id:%d CQ:%s is opened", pContext->vgId, pObj->tid, pObj->sqlStr);
} else {
cError("vgId:%d, id:%d CQ:%s, failed to open", pContext->vgId, pObj->tid, pObj->sqlStr);
}
}
}
static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row) {
SCqObj* pObj = (SCqObj*)taosAcquireRef(cqObjRef, (int64_t)param);
if (pObj == NULL) {
return;
}
if (tres == NULL && row == NULL) {
taos_close_stream(pObj->pStream);
pObj->pStream = NULL;
taosReleaseRef(cqObjRef, (int64_t)param);
return;
}
SCqContext *pContext = pObj->pContext;
STSchema *pSchema = pObj->pSchema;
if (pObj->pStream == NULL) {
taosReleaseRef(cqObjRef, (int64_t)param);
return;
}
cDebug("vgId:%d, id:%d CQ:%s stream result is ready", pContext->vgId, pObj->tid, pObj->sqlStr);
int32_t size = sizeof(SWalHead) + sizeof(SSubmitMsg) + sizeof(SSubmitBlk) + TD_MEM_ROW_DATA_HEAD_SIZE + pObj->rowSize;
char *buffer = calloc(size, 1);
SWalHead *pHead = (SWalHead *)buffer;
SSubmitMsg *pMsg = (SSubmitMsg *) (buffer + sizeof(SWalHead));
SSubmitBlk *pBlk = (SSubmitBlk *) (buffer + sizeof(SWalHead) + sizeof(SSubmitMsg));
SMemRow trow = (SMemRow)pBlk->data;
SDataRow dataRow = (SDataRow)memRowDataBody(trow);
memRowSetType(trow, SMEM_ROW_DATA);
tdInitDataRow(dataRow, pSchema);
for (int32_t i = 0; i < pSchema->numOfCols; i++) {
STColumn *c = pSchema->columns + i;
void *val = row[i];
if (val == NULL) {
val = (void *)getNullValue(c->type);
} else if (c->type == TSDB_DATA_TYPE_BINARY) {
val = ((char*)val) - sizeof(VarDataLenT);
} else if (c->type == TSDB_DATA_TYPE_NCHAR) {
char buf[TSDB_MAX_NCHAR_LEN];
int32_t len = taos_fetch_lengths(tres)[i];
taosMbsToUcs4(val, len, buf, sizeof(buf), &len);
memcpy((char *)val + sizeof(VarDataLenT), buf, len);
varDataLen(val) = len;
}
tdAppendColVal(dataRow, val, c->type, c->offset);
}
pBlk->dataLen = htonl(memRowDataTLen(trow));
pBlk->schemaLen = 0;
pBlk->uid = htobe64(pObj->uid);
pBlk->tid = htonl(pObj->tid);
pBlk->numOfRows = htons(1);
pBlk->sversion = htonl(pSchema->version);
pBlk->padding = 0;
pHead->len = sizeof(SSubmitMsg) + sizeof(SSubmitBlk) + memRowDataTLen(trow);
pMsg->header.vgId = htonl(pContext->vgId);
pMsg->header.contLen = htonl(pHead->len);
pMsg->length = pMsg->header.contLen;
pMsg->numOfBlocks = htonl(1);
pHead->msgType = TDMT_VND_SUBMIT;
pHead->version = 0;
// write into vnode write queue
pContext->cqWrite(pContext->vgId, pHead, TAOS_QTYPE_CQ, NULL);
free(buffer);
taosReleaseRef(cqObjRef, (int64_t)param);
}
CMAKE_MINIMUM_REQUIRED(VERSION 2.8...3.20)
PROJECT(TDengine)
LIST(APPEND CQTEST_SRC ./cqtest.c)
ADD_EXECUTABLE(cqtest ${CQTEST_SRC})
TARGET_LINK_LIBRARIES(cqtest tcq taos_static)
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
//#define _DEFAULT_SOURCE
#include "os.h"
#include "taosdef.h"
#include "tmsg.h"
#include "tglobal.h"
#include "tlog.h"
#include "tcq.h"
int64_t ver = 0;
void *pCq = NULL;
int writeToQueue(int32_t vgId, void *data, int type, void *pMsg) {
return 0;
}
int main(int argc, char *argv[]) {
int num = 3;
for (int i=1; i<argc; ++i) {
if (strcmp(argv[i], "-d")==0 && i < argc-1) {
dDebugFlag = atoi(argv[++i]);
} else if (strcmp(argv[i], "-n") == 0 && i <argc-1) {
num = atoi(argv[++i]);
} else {
printf("\nusage: %s [options] \n", argv[0]);
printf(" [-n num]: number of streams, default:%d\n", num);
printf(" [-d debugFlag]: debug flag, default:%d\n", dDebugFlag);
printf(" [-h help]: print out this help\n\n");
exit(0);
}
}
taosInitLog("cq.log", 100000, 10);
SCqCfg cqCfg;
strcpy(cqCfg.user, TSDB_DEFAULT_USER);
strcpy(cqCfg.pass, TSDB_DEFAULT_PASS);
cqCfg.vgId = 2;
cqCfg.cqWrite = writeToQueue;
pCq = cqOpen(NULL, &cqCfg);
if (pCq == NULL) {
printf("failed to open CQ\n");
exit(-1);
}
STSchemaBuilder schemaBuilder = {0};
tdInitTSchemaBuilder(&schemaBuilder, 0);
tdAddColToSchema(&schemaBuilder, TSDB_DATA_TYPE_TIMESTAMP, 0, 8);
tdAddColToSchema(&schemaBuilder, TSDB_DATA_TYPE_INT, 1, 4);
STSchema *pSchema = tdGetSchemaFromBuilder(&schemaBuilder);
tdDestroyTSchemaBuilder(&schemaBuilder);
for (int sid =1; sid<10; ++sid) {
cqCreate(pCq, sid, sid, NULL, "select avg(speed) from demo.t1 sliding(1s) interval(5s)", pSchema, 1);
}
tdFreeSchema(pSchema);
while (1) {
char c = (char)getchar();
switch(c) {
case 's':
cqStart(pCq);
break;
case 't':
cqStop(pCq);
break;
case 'c':
// create a CQ
break;
case 'd':
// drop a CQ
break;
case 'q':
break;
default:
printf("invalid command:%c", c);
}
if (c=='q') break;
}
cqClose(pCq);
taosCloseLog();
return 0;
}
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_QUERY_H
#define TDENGINE_QUERY_H
#ifdef __cplusplus
extern "C" {
#endif
typedef void* qinfo_t;
/**
* create the qinfo object according to QueryTableMsg
* @param tsdb
* @param pQueryTableMsg
* @param qinfo
* @return
*/
int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryTableMsg, qinfo_t* qinfo, uint64_t qId);
/**
* the main query execution function, including query on both table and multitables,
* which are decided according to the tag or table name query conditions
*
* @param qinfo
* @return
*/
bool qTableQuery(qinfo_t qinfo, uint64_t *qId);
/**
* Retrieve the produced results information, if current query is not paused or completed,
* this function will be blocked to wait for the query execution completed or paused,
* in which case enough results have been produced already.
*
* @param qinfo
* @return
*/
int32_t qRetrieveQueryResultInfo(qinfo_t qinfo, bool* buildRes, void* pRspContext);
/**
*
* Retrieve the actual results to fill the response message payload.
* Note that this function must be executed after qRetrieveQueryResultInfo is invoked.
*
* @param qinfo qinfo object
* @param pRsp response message
* @param contLen payload length
* @return
*/
int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp** pRsp, int32_t* contLen, bool* continueExec);
/**
*
* @param qinfo
* @return
*/
void* qGetResultRetrieveMsg(qinfo_t qinfo);
/**
* kill current ongoing query and free query handle automatically
* @param qinfo qhandle
* @return
*/
int32_t qKillQuery(qinfo_t qinfo);
//kill by qid
int32_t qKillQueryByQId(void* pMgmt, int64_t qId, int32_t waitMs, int32_t waitCount);
bool qSolveCommitNoBlock(void* pRepo, void* pMgmt);
int32_t qQueryCompleted(qinfo_t qinfo);
/**
* destroy query info structure
* @param qHandle
*/
void qDestroyQueryInfo(qinfo_t qHandle);
void* qOpenQueryMgmt(int32_t vgId);
void qQueryMgmtNotifyClosed(void* pExecutor);
void qQueryMgmtReOpen(void *pExecutor);
void qCleanupQueryMgmt(void* pExecutor);
void** qRegisterQInfo(void* pMgmt, uint64_t qId, void *qInfo);
void** qAcquireQInfo(void* pMgmt, uint64_t key);
void** qReleaseQInfo(void* pMgmt, void* pQInfo, bool freeHandle);
bool checkQIdEqual(void *qHandle, uint64_t qId);
int64_t genQueryId(void);
#ifdef __cplusplus
}
#endif
#endif // TDENGINE_QUERY_H
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_CQ_H_
#define _TD_CQ_H_
#ifdef __cplusplus
extern "C" {
#endif
#include "tdataformat.h"
typedef int32_t (*FCqWrite)(int32_t vgId, void *pHead, int32_t qtype, void *pMsg);
typedef struct {
int32_t vgId;
char user[TSDB_USER_LEN];
char pass[TSDB_PASSWORD_LEN];
char db[TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN]; // size must same with SVnodeObj.db[TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN]
FCqWrite cqWrite;
} SCqCfg;
// SCqContext
typedef struct {
int32_t vgId;
int32_t master;
int32_t num; // number of continuous streams
char user[TSDB_USER_LEN];
char pass[TSDB_PASSWORD_LEN];
char db[TSDB_DB_NAME_LEN];
FCqWrite cqWrite;
struct SCqObj *pHead;
void *dbConn;
void *tmrCtrl;
pthread_mutex_t mutex;
int32_t delete;
int32_t cqObjNum;
} SCqContext;
// the following API shall be called by vnode
void *cqOpen(void *ahandle, const SCqCfg *pCfg);
void cqClose(void *handle);
// if vnode is master, vnode call this API to start CQ
void cqStart(void *handle);
// if vnode is slave/unsynced, vnode shall call this API to stop CQ
void cqStop(void *handle);
// cqCreate is called by TSDB to start an instance of CQ
void *cqCreate(void *handle, uint64_t uid, int32_t sid, const char* dstTable, char *sqlStr, STSchema *pSchema, int start);
// cqDrop is called by TSDB to stop an instance of CQ, handle is the return value of cqCreate
void cqDrop(void *handle);
extern int32_t cqDebugFlag;
#ifdef __cplusplus
}
#endif
#endif // _TD_CQ_H_
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_TSDB_H_
#define _TD_TSDB_H_
#include <pthread.h>
#include <stdbool.h>
#include <stdint.h>
#include "taosdef.h"
#include "tmsg.h"
#include "tarray.h"
#include "tdataformat.h"
#include "tname.h"
#include "hash.h"
#include "tlockfree.h"
#include "tlist.h"
#ifdef __cplusplus
extern "C" {
#endif
#define TSDB_VERSION_MAJOR 1
#define TSDB_VERSION_MINOR 0
#define TSDB_INVALID_SUPER_TABLE_ID -1
#define TSDB_STATUS_COMMIT_START 1
#define TSDB_STATUS_COMMIT_OVER 2
#define TSDB_STATUS_COMMIT_NOBLOCK 3 //commit no block, need to be solved
// TSDB STATE DEFINITION
#define TSDB_STATE_OK 0x0
#define TSDB_STATE_BAD_META 0x1
#define TSDB_STATE_BAD_DATA 0x2
// --------- TSDB APPLICATION HANDLE DEFINITION
typedef struct {
void *appH;
void *cqH;
int (*notifyStatus)(void *, int status, int eno);
int (*eventCallBack)(void *);
void *(*cqCreateFunc)(void *handle, uint64_t uid, int32_t sid, const char *dstTable, char *sqlStr, STSchema *pSchema, int start);
void (*cqDropFunc)(void *handle);
} STsdbAppH;
// --------- TSDB REPOSITORY CONFIGURATION DEFINITION
typedef struct {
int32_t tsdbId;
int32_t cacheBlockSize;
int32_t totalBlocks;
int32_t daysPerFile; // day per file sharding policy
int32_t keep; // day of data to keep
int32_t keep1;
int32_t keep2;
int32_t minRowsPerFileBlock; // minimum rows per file block
int32_t maxRowsPerFileBlock; // maximum rows per file block
int8_t precision;
int8_t compression;
int8_t update;
int8_t cacheLastRow; // 0:no cache, 1: cache last row, 2: cache last NULL column 3: 1&2
} STsdbCfg;
#define CACHE_NO_LAST(c) ((c)->cacheLastRow == 0)
#define CACHE_LAST_ROW(c) (((c)->cacheLastRow & 1) > 0)
#define CACHE_LAST_NULL_COLUMN(c) (((c)->cacheLastRow & 2) > 0)
// --------- TSDB REPOSITORY USAGE STATISTICS
typedef struct {
int64_t totalStorage; // total bytes occupie
int64_t compStorage;
int64_t pointsWritten; // total data points written
} STsdbStat;
typedef struct STsdbRepo STsdbRepo;
STsdbCfg *tsdbGetCfg(const STsdbRepo *repo);
// --------- TSDB REPOSITORY DEFINITION
int32_t tsdbCreateRepo(int repoid);
int32_t tsdbDropRepo(int repoid);
STsdbRepo *tsdbOpenRepo(STsdbCfg *pCfg, STsdbAppH *pAppH);
int tsdbCloseRepo(STsdbRepo *repo, int toCommit);
int32_t tsdbConfigRepo(STsdbRepo *repo, STsdbCfg *pCfg);
int tsdbGetState(STsdbRepo *repo);
int8_t tsdbGetCompactState(STsdbRepo *repo);
// --------- TSDB TABLE DEFINITION
typedef struct {
uint64_t uid; // the unique table ID
int32_t tid; // the table ID in the repository.
} STableId;
// --------- TSDB TABLE configuration
typedef struct {
ETableType type;
char * name;
STableId tableId;
int32_t sversion;
char * sname; // super table name
uint64_t superUid;
STSchema * schema;
STSchema * tagSchema;
SKVRow tagValues;
char * sql;
} STableCfg;
void tsdbClearTableCfg(STableCfg *config);
void *tsdbGetTableTagVal(const void *pTable, int32_t colId, int16_t type, int16_t bytes);
char *tsdbGetTableName(void *pTable);
#define TSDB_TABLEID(_table) ((STableId*) (_table))
#define TSDB_PREV_ROW 0x1
#define TSDB_NEXT_ROW 0x2
STableCfg *tsdbCreateTableCfgFromMsg(SMDCreateTableMsg *pMsg);
int tsdbCreateTable(STsdbRepo *repo, STableCfg *pCfg);
int tsdbDropTable(STsdbRepo *pRepo, STableId tableId);
int tsdbUpdateTableTagValue(STsdbRepo *repo, SUpdateTableTagValMsg *pMsg);
uint32_t tsdbGetFileInfo(STsdbRepo *repo, char *name, uint32_t *index, uint32_t eindex, int64_t *size);
// the TSDB repository info
typedef struct STsdbRepoInfo {
STsdbCfg tsdbCfg;
uint64_t version; // version of the repository
int64_t tsdbTotalDataSize; // the original inserted data size
int64_t tsdbTotalDiskSize; // the total disk size taken by this TSDB repository
// TODO: Other informations to add
} STsdbRepoInfo;
STsdbRepoInfo *tsdbGetStatus(STsdbRepo *pRepo);
// the meter information report structure
typedef struct {
STableCfg tableCfg;
uint64_t version;
int64_t tableTotalDataSize; // In bytes
int64_t tableTotalDiskSize; // In bytes
} STableInfo;
// -- FOR INSERT DATA
/**
* Insert data to a table in a repository
* @param pRepo the TSDB repository handle
* @param pData the data to insert (will give a more specific description)
*
* @return the number of points inserted, -1 for failure and the error number is set
*/
int32_t tsdbInsertData(STsdbRepo *repo, SSubmitMsg *pMsg, SShellSubmitRspMsg *pRsp);
// -- FOR QUERY TIME SERIES DATA
typedef void *TsdbQueryHandleT; // Use void to hide implementation details
#define BLOCK_LOAD_OFFSET_SEQ_ORDER 1
#define BLOCK_LOAD_TABLE_SEQ_ORDER 2
#define BLOCK_LOAD_TABLE_RR_ORDER 3
// query condition to build multi-table data block iterator
typedef struct STsdbQueryCond {
STimeWindow twindow;
int32_t order; // desc|asc order to iterate the data block
int32_t numOfCols;
SColumnInfo *colList;
bool loadExternalRows; // load external rows or not
int32_t type; // data block load type:
} STsdbQueryCond;
typedef struct STableData STableData;
typedef struct {
T_REF_DECLARE()
SRWLatch latch;
TSKEY keyFirst;
TSKEY keyLast;
int64_t numOfRows;
int32_t maxTables;
STableData **tData;
SList * actList;
SList * extraBuffList;
SList * bufBlockList;
int64_t pointsAdd; // TODO
int64_t storageAdd; // TODO
} SMemTable;
typedef struct {
SMemTable* mem;
SMemTable* imem;
SMemTable mtable;
SMemTable* omem;
} SMemSnapshot;
typedef struct SMemRef {
int32_t ref;
SMemSnapshot snapshot;
} SMemRef;
typedef struct SDataBlockInfo {
STimeWindow window;
int32_t rows;
int32_t numOfCols;
int64_t uid;
int32_t tid;
} SDataBlockInfo;
typedef struct SFileBlockInfo {
int32_t numBlocksOfStep;
} SFileBlockInfo;
typedef struct {
void *pTable;
TSKEY lastKey;
} STableKeyInfo;
typedef struct {
uint32_t numOfTables;
SArray *pGroupList;
SHashObj *map; // speedup acquire the tableQueryInfo by table uid
} STableGroupInfo;
#define TSDB_BLOCK_DIST_STEP_ROWS 16
typedef struct {
uint16_t rowSize;
uint16_t numOfFiles;
uint32_t numOfTables;
uint64_t totalSize;
uint64_t totalRows;
int32_t maxRows;
int32_t minRows;
int32_t firstSeekTimeUs;
uint32_t numOfRowsInMemTable;
uint32_t numOfSmallBlocks;
SArray *dataBlockInfos;
} STableBlockDist;
/**
* Get the data block iterator, starting from position according to the query condition
*
* @param tsdb tsdb handle
* @param pCond query condition, including time window, result set order, and basic required columns for each block
* @param tableInfoGroup table object list in the form of set, grouped into different sets according to the
* group by condition
* @param qinfo query info handle from query processor
* @return
*/
TsdbQueryHandleT *tsdbQueryTables(STsdbRepo *tsdb, STsdbQueryCond *pCond, STableGroupInfo *tableInfoGroup, uint64_t qId,
SMemRef *pRef);
/**
* Get the last row of the given query time window for all the tables in STableGroupInfo object.
* Note that only one data block with only row will be returned while invoking retrieve data block function for
* all tables in this group.
*
* @param tsdb tsdb handle
* @param pCond query condition, including time window, result set order, and basic required columns for each block
* @param tableInfo table list.
* @return
*/
TsdbQueryHandleT tsdbQueryLastRow(STsdbRepo *tsdb, STsdbQueryCond *pCond, STableGroupInfo *tableInfo, uint64_t qId,
SMemRef *pRef);
TsdbQueryHandleT tsdbQueryCacheLast(STsdbRepo *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList, uint64_t qId, SMemRef* pMemRef);
bool isTsdbCacheLastRow(TsdbQueryHandleT* pTsdbReadHandle);
/**
* get the queried table object list
* @param pHandle
* @return
*/
SArray *tsdbGetQueriedTableList(TsdbQueryHandleT *pHandle);
/**
* get the group list according to table id from client
* @param tsdb
* @param pCond
* @param groupList
* @param qinfo
* @return
*/
TsdbQueryHandleT tsdbQueryRowsInExternalWindow(STsdbRepo *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList,
uint64_t qId, SMemRef *pRef);
/**
* get num of rows in mem table
*
* @param pHandle
* @return row size
*/
int64_t tsdbGetNumOfRowsInMemTable(TsdbQueryHandleT* pHandle);
/**
* move to next block if exists
*
* @param pTsdbReadHandle
* @return
*/
bool tsdbNextDataBlock(TsdbQueryHandleT pTsdbReadHandle);
/**
* Get current data block information
*
* @param pTsdbReadHandle
* @param pBlockInfo
* @return
*/
void tsdbRetrieveDataBlockInfo(TsdbQueryHandleT *pTsdbReadHandle, SDataBlockInfo *pBlockInfo);
/**
*
* Get the pre-calculated information w.r.t. current data block.
*
* In case of data block in cache, the pBlockStatis will always be NULL.
* If a block is not completed loaded from disk, the pBlockStatis will be NULL.
* @pBlockStatis the pre-calculated value for current data blocks. if the block is a cache block, always return 0
* @return
*/
int32_t tsdbRetrieveDataBlockStatisInfo(TsdbQueryHandleT *pTsdbReadHandle, SDataStatis **pBlockStatis);
/**
*
* The query condition with primary timestamp is passed to iterator during its constructor function,
* the returned data block must be satisfied with the time window condition in any cases,
* which means the SData data block is not actually the completed disk data blocks.
*
* @param pTsdbReadHandle query handle
* @param pColumnIdList required data columns id list
* @return
*/
SArray *tsdbRetrieveDataBlock(TsdbQueryHandleT *pTsdbReadHandle, SArray *pColumnIdList);
/**
* Get the qualified table id for a super table according to the tag query expression.
* @param stableid. super table sid
* @param pTagCond. tag query condition
*/
int32_t tsdbQuerySTableByTagCond(STsdbRepo *tsdb, uint64_t uid, TSKEY key, const char *pTagCond, size_t len,
int16_t tagNameRelType, const char *tbnameCond, STableGroupInfo *pGroupList,
SColIndex *pColIndex, int32_t numOfCols);
/**
* destroy the created table group list, which is generated by tag query
* @param pGroupList
*/
void tsdbDestroyTableGroup(STableGroupInfo *pGroupList);
/**
* create the table group result including only one table, used to handle the normal table query
*
* @param tsdb tsdbHandle
* @param uid table uid
* @param pGroupInfo the generated result
* @return
*/
int32_t tsdbGetOneTableGroup(STsdbRepo *tsdb, uint64_t uid, TSKEY startKey, STableGroupInfo *pGroupInfo);
/**
*
* @param tsdb
* @param pTableIdList
* @param pGroupInfo
* @return
*/
int32_t tsdbGetTableGroupFromIdList(STsdbRepo *tsdb, SArray *pTableIdList, STableGroupInfo *pGroupInfo);
/**
* clean up the query handle
* @param queryHandle
*/
void tsdbCleanupQueryHandle(TsdbQueryHandleT queryHandle);
void tsdbResetQueryHandle(TsdbQueryHandleT queryHandle, STsdbQueryCond *pCond);
void tsdbResetQueryHandleForNewTable(TsdbQueryHandleT queryHandle, STsdbQueryCond *pCond, STableGroupInfo* groupList);
int32_t tsdbGetFileBlocksDistInfo(TsdbQueryHandleT* queryHandle, STableBlockDist* pTableBlockInfo);
/**
* get the statistics of repo usage
* @param repo. point to the tsdbrepo
* @param totalPoints. total data point written
* @param totalStorage. total bytes took by the tsdb
* @param compStorage. total bytes took by the tsdb after compressed
*/
void tsdbReportStat(void *repo, int64_t *totalPoints, int64_t *totalStorage, int64_t *compStorage);
int tsdbInitCommitQueue();
void tsdbDestroyCommitQueue();
int tsdbSyncCommit(STsdbRepo *repo);
void tsdbIncCommitRef(int vgId);
void tsdbDecCommitRef(int vgId);
// For TSDB file sync
int tsdbSyncSend(void *pRepo, SOCKET socketFd);
int tsdbSyncRecv(void *pRepo, SOCKET socketFd);
// For TSDB Compact
int tsdbCompact(STsdbRepo *pRepo);
// For TSDB Health Monitor
// no problem return true
bool tsdbNoProblem(STsdbRepo* pRepo);
#ifdef __cplusplus
}
#endif
#endif // _TD_TSDB_H_
......@@ -4,7 +4,7 @@ system sh/exec.sh -n dnode1 -s start
sql connect
print =============== create database
sql create database d1
sql create database d1 vgroups 2
sql show databases
if $rows != 1 then
return -1
......@@ -22,6 +22,21 @@ if $data03 != 0 then
return -1
endi
print =============== show vgroups1
sql use d1
sql show vgroups
if $rows != 2 then
return -1
endi
if $data00 != 2 then
return -1
endi
if $data10 != 3 then
return -1
endi
print =============== drop database
sql drop database d1
sql show databases
......@@ -30,14 +45,68 @@ if $rows != 0 then
endi
print =============== more databases
sql create database d2
sql create database d3
sql create database d4
sql create database d2 vgroups 2
sql create database d3 vgroups 3
sql create database d4 vgroups 4
sql show databases
if $rows != 3 then
return -1
endi
print =============== show vgroups2
sql show d2.vgroups
if $rows != 2 then
return -1
endi
if $data00 != 4 then
return -1
endi
if $data10 != 5 then
return -1
endi
print =============== show vgroups3
sql show d3.vgroups
if $rows != 3 then
return -1
endi
if $data00 != 6 then
return -1
endi
if $data10 != 7 then
return -1
endi
if $data20 != 8 then
return -1
endi
print =============== show vgroups4
sql show d4.vgroups
if $rows != 4 then
return -1
endi
if $data00 != 9 then
return -1
endi
if $data10 != 10 then
return -1
endi
if $data20 != 11 then
return -1
endi
if $data30 != 12 then
return -1
endi
print =============== drop database
sql drop database d2
sql drop database d3
......@@ -50,7 +119,7 @@ if $data00 != d4 then
return -1
endi
if $data02 != 2 then
if $data02 != 4 then
return -1
endi
......@@ -58,19 +127,12 @@ if $data03 != 0 then
return -1
endi
print =============== show vgroups
sql show databases
if $rows != 1 then
return -1
endi
print =============== show vgroups4 again
sql_error use d1
sql use d4
sql show vgroups
if $rows != 2 then
if $rows != 4 then
return -1
endi
......@@ -81,15 +143,17 @@ if $data00 != 1 then
return -1
endi
if $data02 != 2 then
if $data02 != 4 then
return -1
endi
print =============== restart
system sh/exec.sh -n dnode1 -s stop -x SIGKILL
system sh/exec.sh -n dnode1 -s start
print =============== show databases
sql show databases
if $rows != 1 then
return -1
endi
......@@ -99,7 +163,27 @@ sql_error use d1
sql use d4
sql show vgroups
if $rows != 2 then
if $rows != 4 then
return -1
endi
print =============== create databases
sql create database d5 vgroups 5;
print =============== show vgroups
sql use d5
sql show vgroups
if $rows != 5 then
return -1
endi
sql show d4.vgroups
if $rows != 4 then
return -1
endi
sql show d5.vgroups
if $rows != 5 then
return -1
endi
......
......@@ -7,8 +7,10 @@ print ============================ dnode1 start
$i = 0
$dbPrefix = db
$stPrefix = st
$tbPrefix = tb
$db = $dbPrefix . $i
$st = $stPrefix . $i
$tb = $tbPrefix . $i
print =============== step1
......@@ -68,12 +70,47 @@ if $data06 != 15 then
return -1
endi
return
print =============== step6
$i = $i + 1
while $i < 5
$db = $dbPrefix . $i
$st = $stPrefix . $i
$tb = $tbPrefix . $i
print create database $db
sql create database $db
print use $db
sql use $db
print create table $st (ts timestamp, i int) tags (j int)
sql create table $st (ts timestamp, i int) tags (j int)
print create table $tb using $st tags(1)
sql create table $tb using $st tags(1)
sql show stables
if $rows != 1 then
return -1
endi
print $data00 $data01 $data02 $data03
if $data00 != $st then
return -1
endi
sql show tables
if $rows != 1 then
return -1
endi
print $data00 $data01 $data02 $data03
if $data00 != $tb then
return -1
endi
$i = $i + 1
endw
......@@ -86,93 +123,179 @@ while $i < 5
endw
print =============== step8
$i = 0
$i = 1
$db = $dbPrefix . $i
$st = $stPrefix . $i
$tb = $tbPrefix . $i
sql create database $db
sql use $db
sql create table st (ts timestamp, i int) tags (j int)
sql create table $tb using st tags(1)
sql create table $st (ts timestamp, i int) tags (j int)
sql create table $tb using $st tags(1)
return
system sh/exec.sh -n dnode1 -s stop -x SIGINT
sql show stables
if $rows != 1 then
return -1
endi
if $data00 != $st then
return -1
endi
sql show tables
if $rows != 1 then
return -1
endi
if $data00 != $tb then
return -1
endi
print =============== step9
sql drop database $db
print =============== step10
sql create database $db
sql use $db
sql show stables
if $rows != 0 then
return -1
endi
sql show tables
if $rows != 0 then
return -1
endi
print =============== step11
sql create table st (ts timestamp, i int) tags (j int)
sql create table $tb using st tags(1)
sql create table $st (ts timestamp, i int) tags (j int)
sql create table $tb using $st tags(1)
sql show stables
if $rows != 1 then
return -1
endi
if $data00 != $st then
return -1
endi
sql show tables
if $rows != 1 then
return -1
endi
if $data00 != $tb then
return -1
endi
print =============== step12
sql drop database $db
print =============== step13
sql create database $db
sql use $db
sql show stables
if $rows != 0 then
return -1
endi
sql show tables
if $rows != 0 then
return -1
endi
sql create table st (ts timestamp, i int) tags (j int)
sql create table $tb using st tags(1)
print ============== step14
sql create table $st (ts timestamp, i int) tags (j int)
sql create table $tb using $st tags(1)
sql show stables
if $rows != 1 then
return -1
endi
if $data00 != $st then
return -1
endi
sql show tables
if $rows != 1 then
return -1
endi
if $data00 != $tb then
return -1
endi
sql insert into $tb values (now+1a, 0)
sql insert into $tb values (now+2a, 1)
sql insert into $tb values (now+3a, 2)
sql insert into $tb values (now+4a, 3)
sql insert into $tb values (now+5a, 4)
return
sql select * from $tb
if $rows != 5 then
return -1
endi
sql select * from $stb
if $rows != 5 then
return -1
endi
print =============== step14
sql drop database $db
print =============== step15
sql create database $db
sql use $db
sql show stables
if $rows != 0 then
return -1
endi
sql show tables
if $rows != 0 then
return -1
endi
print =============== step16
sql create table st (ts timestamp, i int) tags (j int)
sql create table $tb using st tags(1)
sql create table $st (ts timestamp, i int) tags (j int)
sql create table $tb using $st tags(1)
sql show stables
if $rows != 1 then
return -1
endi
if $data00 != $st then
return -1
endi
sql show tables
if $rows != 1 then
return -1
endi
if $data00 != $tb then
return -1
endi
sql insert into $tb values (now+1a, 0)
sql insert into $tb values (now+2a, 1)
sql insert into $tb values (now+3a, 2)
sql insert into $tb values (now+4a, 3)
sql insert into $tb values (now+5a, 4)
sql select * from $tb
if $rows != 0 then
if $rows != 5 then
return -1
endi
sql select * from $stb
if $rows != 5 then
return -1
endi
......
......@@ -78,8 +78,8 @@ if $data02 != master then
return -1
endi
print =============== create table
sql create database d1;
print =============== create database
sql create database d1 vgroups 4;
sql create database d2;
sql show databases
......@@ -90,10 +90,98 @@ endi
sql use d1
sql show vgroups;
if $rows != 2 then
if $rows != 4 then
return -1
endi
print =============== create table
sql use d1
sql create table st (ts timestamp, i int) tags (j int)
sql show stables
if $rows != 1 then
return -1
endi
print $data00 $data01 $data02
sql create table c1 using st tags(1)
sql create table c2 using st tags(2)
sql create table c3 using st tags(2)
sql create table c4 using st tags(2)
sql create table c5 using st tags(2)
sql show tables
print $data00 $data01 $data02
#if $rows != 5 then
# return -1
#endi
print =============== insert data
sql insert into c1 values(now+1s, 1)
sql insert into c1 values(now+2s, 2)
sql insert into c1 values(now+3s, 3)
sql insert into c2 values(now+1s, 1)
sql insert into c2 values(now+2s, 2)
sql insert into c2 values(now+3s, 3)
sql insert into c3 values(now+1s, 1)
sql insert into c3 values(now+2s, 2)
sql insert into c3 values(now+3s, 3)
sql insert into c4 values(now+1s, 1)
sql insert into c4 values(now+2s, 2)
sql insert into c4 values(now+3s, 3)
sql insert into c5 values(now+1s, 1)
sql insert into c5 values(now+2s, 2)
sql insert into c5 values(now+3s, 3)
print =============== query data
sql select * from c1
if $rows != 3 then
return -1
endi
print $data00 $data01
print $data10 $data11
print $data20 $data11
if $data01 != 1 then
return -1
endi
if $data11 != 2 then
return -1
endi
if $data21 != 3 then
return -1
endi
sql select * from c2
if $rows != 3 then
return -1
endi
sql select * from c3
if $rows != 3 then
return -1
endi
sql select * from c4
if $rows != 3 then
return -1
endi
sql select * from c5
if $rows != 3 then
return -1
endi
#sql select * from st
#if $rows != 15 then
# return -1
#endi
print =============== drop dnode
sql drop dnode 2;
sql show dnodes;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册