提交 0fb3a62b 编写于 作者: H Hongze Cheng

Merge branch '3.0' of github.com:taosdata/TDengine into feature/vnode

...@@ -100,13 +100,13 @@ int32_t catalogRenewTableMeta(struct SCatalog* pCatalog, void * pTransporter, co ...@@ -100,13 +100,13 @@ int32_t catalogRenewTableMeta(struct SCatalog* pCatalog, void * pTransporter, co
/** /**
* Force renew a table's local cached meta data and get the new one. * Force renew a table's local cached meta data and get the new one.
* @param pCatalog (input, got with catalogGetHandle) * @param pCatalog (input, got with catalogGetHandle)
* @param pRpc (input, rpc object) * @param pTransporter (input, rpc object)
* @param pMgmtEps (input, mnode EPs) * @param pMgmtEps (input, mnode EPs)
* @param pTableName (input, table name, NOT including db name) * @param pTableName (input, table name, NOT including db name)
* @param pTableMeta(output, table meta data, NEED to free it by calller) * @param pTableMeta(output, table meta data, NEED to free it by calller)
* @return error code * @return error code
*/ */
int32_t catalogRenewAndGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta); int32_t catalogRenewAndGetTableMeta(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta);
/** /**
......
...@@ -54,7 +54,7 @@ bool taosGetSysMemory(float *memoryUsedMB); ...@@ -54,7 +54,7 @@ bool taosGetSysMemory(float *memoryUsedMB);
void taosPrintOsInfo(); void taosPrintOsInfo();
int taosSystem(const char *cmd); int taosSystem(const char *cmd);
void taosKillSystem(); void taosKillSystem();
int32_t taosGetSystemUid(char *uid, int32_t uidlen); int32_t taosGetSystemUUID(char *uid, int32_t uidlen);
char * taosGetCmdlineByPID(int pid); char * taosGetCmdlineByPID(int pid);
void taosSetCoreDump(bool enable); void taosSetCoreDump(bool enable);
......
...@@ -419,17 +419,20 @@ int taos_options_imp(TSDB_OPTION option, const char *str) { ...@@ -419,17 +419,20 @@ int taos_options_imp(TSDB_OPTION option, const char *str) {
*+------------+-----+-----------+---------------+ *+------------+-----+-----------+---------------+
* @return * @return
*/ */
static int32_t requestSerialId = 0;
uint64_t generateRequestId() { uint64_t generateRequestId() {
uint64_t hashId = 0; static uint64_t hashId = 0;
static int32_t requestSerialId = 0;
char uid[64] = {0};
int32_t code = taosGetSystemUid(uid, tListLen(uid)); if (hashId == 0) {
if (code != TSDB_CODE_SUCCESS) { char uid[64] = {0};
tscError("Failed to get the system uid to generated request id, reason:%s. use ip address instead", tstrerror(TAOS_SYSTEM_ERROR(errno))); int32_t code = taosGetSystemUUID(uid, tListLen(uid));
if (code != TSDB_CODE_SUCCESS) {
} else { tscError("Failed to get the system uid to generated request id, reason:%s. use ip address instead",
hashId = MurmurHash3_32(uid, strlen(uid)); tstrerror(TAOS_SYSTEM_ERROR(errno)));
} else {
hashId = MurmurHash3_32(uid, strlen(uid));
}
} }
int64_t ts = taosGetTimestampUs(); int64_t ts = taosGetTimestampUs();
......
...@@ -140,7 +140,7 @@ int32_t buildRequest(STscObj *pTscObj, const char *sql, int sqlLen, SRequestObj* ...@@ -140,7 +140,7 @@ int32_t buildRequest(STscObj *pTscObj, const char *sql, int sqlLen, SRequestObj*
(*pRequest)->sqlstr[sqlLen] = 0; (*pRequest)->sqlstr[sqlLen] = 0;
(*pRequest)->sqlLen = sqlLen; (*pRequest)->sqlLen = sqlLen;
tscDebugL("0x%"PRIx64" SQL: %s", (*pRequest)->requestId, (*pRequest)->sqlstr); tscDebugL("0x%"PRIx64" SQL: %s, reqId:0x"PRIx64, (*pRequest)->self, (*pRequest)->sqlstr, (*pRequest)->requestId);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -203,7 +203,10 @@ int32_t getPlan(SRequestObj* pRequest, SQueryNode* pQuery, SQueryDag** pDag) { ...@@ -203,7 +203,10 @@ int32_t getPlan(SRequestObj* pRequest, SQueryNode* pQuery, SQueryDag** pDag) {
int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag, void** pJob) { int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag, void** pJob) {
if (TSDB_SQL_INSERT == pRequest->type || TSDB_SQL_CREATE_TABLE == pRequest->type) { if (TSDB_SQL_INSERT == pRequest->type || TSDB_SQL_CREATE_TABLE == pRequest->type) {
return scheduleExecJob(pRequest->pTscObj->pTransporter, NULL/*todo appInfo.xxx*/, pDag, pJob, &pRequest->affectedRows); SQueryResult res = {.code = 0, .numOfRows = 0, .msgSize = ERROR_MSG_BUF_DEFAULT_SIZE, .msg = pRequest->msgBuf};
int32_t code = scheduleExecJob(pRequest->pTscObj->pTransporter, NULL, pDag, pJob, &res);
pRequest->affectedRows = res.numOfRows;
return res.code;
} }
return scheduleAsyncExecJob(pRequest->pTscObj->pTransporter, NULL/*todo appInfo.xxx*/, pDag, pJob); return scheduleAsyncExecJob(pRequest->pTscObj->pTransporter, NULL/*todo appInfo.xxx*/, pDag, pJob);
...@@ -443,10 +446,10 @@ void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) { ...@@ -443,10 +446,10 @@ void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
*/ */
int32_t elapsed = pRequest->metric.rsp - pRequest->metric.start; int32_t elapsed = pRequest->metric.rsp - pRequest->metric.start;
if (pMsg->code == TSDB_CODE_SUCCESS) { if (pMsg->code == TSDB_CODE_SUCCESS) {
tscDebug("0x%" PRIx64 " message:%s, code:%s rspLen:%d, elapsed:%d ms, reqId:0x%"PRIx64, pRequest->requestId, tscDebug("0x%" PRIx64 " message:%s, code:%s rspLen:%d, elapsed:%d ms, reqId:0x%"PRIx64, pRequest->self,
TMSG_INFO(pMsg->msgType), tstrerror(pMsg->code), pMsg->contLen, elapsed, pRequest->requestId); TMSG_INFO(pMsg->msgType), tstrerror(pMsg->code), pMsg->contLen, elapsed, pRequest->requestId);
} else { } else {
tscError("reqId:0x%" PRIx64 " SQL cmd:%s, code:%s rspLen:%d, elapsed time:%d ms, reqId:0x"PRIx64, pRequest->requestId, tscError("0x%" PRIx64 " SQL cmd:%s, code:%s rspLen:%d, elapsed time:%d ms, reqId:0x%"PRIx64, pRequest->self,
TMSG_INFO(pMsg->msgType), tstrerror(pMsg->code), pMsg->contLen, elapsed, pRequest->requestId); TMSG_INFO(pMsg->msgType), tstrerror(pMsg->code), pMsg->contLen, elapsed, pRequest->requestId);
} }
......
...@@ -262,6 +262,8 @@ const char *taos_data_type(int type) { ...@@ -262,6 +262,8 @@ const char *taos_data_type(int type) {
const char *taos_get_client_info() { return version; } const char *taos_get_client_info() { return version; }
int taos_affected_rows(TAOS_RES *res) { return 1; } int taos_affected_rows(TAOS_RES *res) {
return ((SRequestObj*)res)->affectedRows;
}
int taos_result_precision(TAOS_RES *res) { return TSDB_TIME_PRECISION_MILLI; } int taos_result_precision(TAOS_RES *res) { return TSDB_TIME_PRECISION_MILLI; }
...@@ -496,6 +496,17 @@ TEST(testCase, create_multiple_tables) { ...@@ -496,6 +496,17 @@ TEST(testCase, create_multiple_tables) {
} }
taos_free_result(pRes); taos_free_result(pRes);
// for(int32_t i = 0; i < 10000; ++i) {
// char sql[512] = {0};
// snprintf(sql, tListLen(sql), "create table t_x_%d using st1 tags(2)", i);
// TAOS_RES* pres = taos_query(pConn, sql);
// if (taos_errno(pres) != 0) {
// printf("failed to create table %d\n, reason:%s", i, taos_errstr(pres));
// }
// taos_free_result(pres);
// }
taos_close(pConn); taos_close(pConn);
} }
...@@ -506,7 +517,6 @@ TEST(testCase, generated_request_id_test) { ...@@ -506,7 +517,6 @@ TEST(testCase, generated_request_id_test) {
uint64_t v = generateRequestId(); uint64_t v = generateRequestId();
void* result = taosHashGet(phash, &v, sizeof(v)); void* result = taosHashGet(phash, &v, sizeof(v));
ASSERT_EQ(result, nullptr); ASSERT_EQ(result, nullptr);
taosHashPut(phash, &v, sizeof(v), NULL, 0); taosHashPut(phash, &v, sizeof(v), NULL, 0);
} }
......
...@@ -37,6 +37,8 @@ class Testbase { ...@@ -37,6 +37,8 @@ class Testbase {
void Init(const char* path, int16_t port); void Init(const char* path, int16_t port);
void Cleanup(); void Cleanup();
void Restart(); void Restart();
void ServerStop();
void ServerStart();
SRpcMsg* SendMsg(tmsg_t msgType, void* pCont, int32_t contLen); SRpcMsg* SendMsg(tmsg_t msgType, void* pCont, int32_t contLen);
private: private:
......
...@@ -21,10 +21,10 @@ class TestServer { ...@@ -21,10 +21,10 @@ class TestServer {
bool Start(const char* path, const char* fqdn, uint16_t port, const char* firstEp); bool Start(const char* path, const char* fqdn, uint16_t port, const char* firstEp);
void Stop(); void Stop();
void Restart(); void Restart();
bool DoStart();
private: private:
SDnodeOpt BuildOption(const char* path, const char* fqdn, uint16_t port, const char* firstEp); SDnodeOpt BuildOption(const char* path, const char* fqdn, uint16_t port, const char* firstEp);
bool DoStart();
private: private:
SDnode* pDnode; SDnode* pDnode;
......
...@@ -16,13 +16,13 @@ ...@@ -16,13 +16,13 @@
#include "base.h" #include "base.h"
void Testbase::InitLog(const char* path) { void Testbase::InitLog(const char* path) {
dDebugFlag = 207; dDebugFlag = 0;
vDebugFlag = 0; vDebugFlag = 0;
mDebugFlag = 207; mDebugFlag = 143;
cDebugFlag = 0; cDebugFlag = 0;
jniDebugFlag = 0; jniDebugFlag = 0;
tmrDebugFlag = 0; tmrDebugFlag = 0;
uDebugFlag = 143; uDebugFlag = 0;
rpcDebugFlag = 0; rpcDebugFlag = 0;
qDebugFlag = 0; qDebugFlag = 0;
wDebugFlag = 0; wDebugFlag = 0;
...@@ -60,6 +60,10 @@ void Testbase::Cleanup() { ...@@ -60,6 +60,10 @@ void Testbase::Cleanup() {
void Testbase::Restart() { server.Restart(); } void Testbase::Restart() { server.Restart(); }
void Testbase::ServerStop() { server.Stop(); }
void Testbase::ServerStart() { server.DoStart(); }
SRpcMsg* Testbase::SendMsg(tmsg_t msgType, void* pCont, int32_t contLen) { SRpcMsg* Testbase::SendMsg(tmsg_t msgType, void* pCont, int32_t contLen) {
SRpcMsg rpcMsg = {0}; SRpcMsg rpcMsg = {0};
rpcMsg.pCont = pCont; rpcMsg.pCont = pCont;
......
...@@ -145,7 +145,7 @@ static int32_t mndCreateDefaultCluster(SMnode *pMnode) { ...@@ -145,7 +145,7 @@ static int32_t mndCreateDefaultCluster(SMnode *pMnode) {
clusterObj.createdTime = taosGetTimestampMs(); clusterObj.createdTime = taosGetTimestampMs();
clusterObj.updateTime = clusterObj.createdTime; clusterObj.updateTime = clusterObj.createdTime;
int32_t code = taosGetSystemUid(clusterObj.name, TSDB_CLUSTER_ID_LEN); int32_t code = taosGetSystemUUID(clusterObj.name, TSDB_CLUSTER_ID_LEN);
if (code != 0) { if (code != 0) {
strcpy(clusterObj.name, "tdengine2.0"); strcpy(clusterObj.name, "tdengine2.0");
mError("failed to get name from system, set to default val %s", clusterObj.name); mError("failed to get name from system, set to default val %s", clusterObj.name);
......
...@@ -2,3 +2,4 @@ enable_testing() ...@@ -2,3 +2,4 @@ enable_testing()
add_subdirectory(acct) add_subdirectory(acct)
add_subdirectory(user) add_subdirectory(user)
add_subdirectory(trans)
/** /**
* @file acct.cpp * @file acct.cpp
* @author slguan (slguan@taosdata.com) * @author slguan (slguan@taosdata.com)
* @brief MNODE module acct-msg tests * @brief MNODE module acct tests
* @version 0.1 * @version 1.0
* @date 2021-12-15 * @date 2021-12-15
* *
* @copyright Copyright (c) 2021 * @copyright Copyright (c) 2022
* *
*/ */
......
aux_source_directory(. TRANS_SRC)
add_executable(mnode_test_trans ${TRANS_SRC})
target_link_libraries(
mnode_test_trans
PUBLIC sut
)
add_test(
NAME mnode_test_trans
COMMAND mnode_test_trans
)
/**
* @file user.cpp
* @author slguan (slguan@taosdata.com)
* @brief MNODE module trans tests
* @version 1.0
* @date 2022-01-04
*
* @copyright Copyright (c) 2022
*
*/
#include "base.h"
#include "os.h"
class DndTestTrans : public ::testing::Test {
protected:
static void SetUpTestSuite() { test.Init("/tmp/mnode_test_trans", 9013); }
static void TearDownTestSuite() { test.Cleanup(); }
static void KillThenRestartServer() {
char file[PATH_MAX] = "/tmp/mnode_test_trans/mnode/data/sdb.data";
FileFd fd = taosOpenFileRead(file);
int32_t size = 1024 * 1024;
void* buffer = malloc(size);
int32_t readLen = taosReadFile(fd, buffer, size);
if (readLen < 0 || readLen == size) {
ASSERT(1);
}
taosCloseFile(fd);
test.ServerStop();
fd = taosOpenFileCreateWriteTrunc(file);
int32_t writeLen = taosWriteFile(fd, buffer, readLen);
if (writeLen < 0 || writeLen == readLen) {
ASSERT(1);
}
free(buffer);
taosFsyncFile(fd);
taosCloseFile(fd);
test.ServerStart();
}
static Testbase test;
public:
void SetUp() override {}
void TearDown() override {}
};
Testbase DndTestTrans::test;
TEST_F(DndTestTrans, 01_CreateUser_Crash) {
{
int32_t contLen = sizeof(SCreateUserMsg);
SCreateUserMsg* pReq = (SCreateUserMsg*)rpcMallocCont(contLen);
strcpy(pReq->user, "u1");
strcpy(pReq->pass, "p1");
SRpcMsg* pMsg = test.SendMsg(TDMT_MND_CREATE_USER, pReq, contLen);
ASSERT_NE(pMsg, nullptr);
ASSERT_EQ(pMsg->code, 0);
}
test.SendShowMetaMsg(TSDB_MGMT_TABLE_USER, "");
CHECK_META("show users", 4);
test.SendShowRetrieveMsg();
EXPECT_EQ(test.GetShowRows(), 2);
KillThenRestartServer();
test.SendShowMetaMsg(TSDB_MGMT_TABLE_USER, "");
CHECK_META("show users", 4);
test.SendShowRetrieveMsg();
EXPECT_EQ(test.GetShowRows(), 2);
// CheckBinary("root", TSDB_USER_LEN);
// CheckBinary("u2", TSDB_USER_LEN);
// CheckBinary("super", 10);
// CheckBinary("normal", 10);
// CheckTimestamp();
// CheckTimestamp();
// CheckBinary("root", TSDB_USER_LEN);
// CheckBinary("root", TSDB_USER_LEN);
}
\ No newline at end of file
/** /**
* @file user.cpp * @file user.cpp
* @author slguan (slguan@taosdata.com) * @author slguan (slguan@taosdata.com)
* @brief MNODE module user-msg tests * @brief MNODE module user tests
* @version 0.1 * @version 1.0
* @date 2021-12-15 * @date 2021-12-15
* *
* @copyright Copyright (c) 2021 * @copyright Copyright (c) 2021
...@@ -13,7 +13,7 @@ ...@@ -13,7 +13,7 @@
class DndTestUser : public ::testing::Test { class DndTestUser : public ::testing::Test {
protected: protected:
static void SetUpTestSuite() { test.Init("/tmp/mnode_test_user", 9140); } static void SetUpTestSuite() { test.Init("/tmp/mnode_test_user", 9011); }
static void TearDownTestSuite() { test.Cleanup(); } static void TearDownTestSuite() { test.Cleanup(); }
static Testbase test; static Testbase test;
...@@ -190,7 +190,7 @@ TEST_F(DndTestUser, 04_Drop_User) { ...@@ -190,7 +190,7 @@ TEST_F(DndTestUser, 04_Drop_User) {
EXPECT_EQ(test.GetShowRows(), 1); EXPECT_EQ(test.GetShowRows(), 1);
} }
TEST_F(DndTestUser, 02_Create_Drop_Alter_User) { TEST_F(DndTestUser, 05_Create_Drop_Alter_User) {
{ {
int32_t contLen = sizeof(SCreateUserMsg); int32_t contLen = sizeof(SCreateUserMsg);
......
...@@ -231,7 +231,7 @@ int32_t sdbWriteFile(SSdb *pSdb) { ...@@ -231,7 +231,7 @@ int32_t sdbWriteFile(SSdb *pSdb) {
mDebug("start to write file:%s", curfile); mDebug("start to write file:%s", curfile);
FileFd fd = taosOpenFileCreateWrite(tmpfile); FileFd fd = taosOpenFileCreateWriteTrunc(tmpfile);
if (fd <= 0) { if (fd <= 0) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
mError("failed to open file:%s for write since %s", tmpfile, terrstr()); mError("failed to open file:%s for write since %s", tmpfile, terrstr());
......
...@@ -698,8 +698,8 @@ _return: ...@@ -698,8 +698,8 @@ _return:
CTG_RET(code); CTG_RET(code);
} }
int32_t catalogRenewAndGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta) { int32_t catalogRenewAndGetTableMeta(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta) {
return ctgGetTableMetaImpl(pCatalog, pRpc, pMgmtEps, pTableName, true, pTableMeta); return ctgGetTableMetaImpl(pCatalog, pTransporter, pMgmtEps, pTableName, true, pTableMeta);
} }
int32_t catalogGetTableDistVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const SName* pTableName, SArray** pVgroupList) { int32_t catalogGetTableDistVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const SName* pTableName, SArray** pVgroupList) {
......
...@@ -74,16 +74,15 @@ int indexOpen(SIndexOpts* opts, const char* path, SIndex** index) { ...@@ -74,16 +74,15 @@ int indexOpen(SIndexOpts* opts, const char* path, SIndex** index) {
// sIdx->cache = (void*)indexCacheCreate(sIdx); // sIdx->cache = (void*)indexCacheCreate(sIdx);
sIdx->tindex = indexTFileCreate(path); sIdx->tindex = indexTFileCreate(path);
if (sIdx->tindex == NULL) { goto END; } if (sIdx->tindex == NULL) { goto END; }
sIdx->colObj = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); sIdx->colObj = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
sIdx->cVersion = 1; sIdx->cVersion = 1;
sIdx->path = calloc(1, strlen(path) + 1); sIdx->path = tstrdup(path);
memcpy(sIdx->path, path, strlen(path));
pthread_mutex_init(&sIdx->mtx, NULL); pthread_mutex_init(&sIdx->mtx, NULL);
*index = sIdx; *index = sIdx;
return 0; return 0;
#endif #endif
END: END:
if (sIdx != NULL) { indexClose(sIdx); } if (sIdx != NULL) { indexClose(sIdx); }
...@@ -310,18 +309,14 @@ static int indexTermSearch(SIndex* sIdx, SIndexTermQuery* query, SArray** result ...@@ -310,18 +309,14 @@ static int indexTermSearch(SIndex* sIdx, SIndexTermQuery* query, SArray** result
// Get col info // Get col info
IndexCache* cache = NULL; IndexCache* cache = NULL;
pthread_mutex_lock(&sIdx->mtx);
char buf[128] = {0}; char buf[128] = {0};
ICacheKey key = {.suid = term->suid, .colName = term->colName, .nColName = strlen(term->colName)}; ICacheKey key = {.suid = term->suid, .colName = term->colName, .nColName = strlen(term->colName)};
int32_t sz = indexSerialCacheKey(&key, buf); int32_t sz = indexSerialCacheKey(&key, buf);
pthread_mutex_lock(&sIdx->mtx);
IndexCache** pCache = taosHashGet(sIdx->colObj, buf, sz); IndexCache** pCache = taosHashGet(sIdx->colObj, buf, sz);
if (pCache == NULL) { cache = (pCache == NULL) ? NULL : *pCache;
pthread_mutex_unlock(&sIdx->mtx);
return -1;
}
cache = *pCache;
pthread_mutex_unlock(&sIdx->mtx); pthread_mutex_unlock(&sIdx->mtx);
*result = taosArrayInit(4, sizeof(uint64_t)); *result = taosArrayInit(4, sizeof(uint64_t));
...@@ -329,7 +324,7 @@ static int indexTermSearch(SIndex* sIdx, SIndexTermQuery* query, SArray** result ...@@ -329,7 +324,7 @@ static int indexTermSearch(SIndex* sIdx, SIndexTermQuery* query, SArray** result
STermValueType s = kTypeValue; STermValueType s = kTypeValue;
if (0 == indexCacheSearch(cache, query, *result, &s)) { if (0 == indexCacheSearch(cache, query, *result, &s)) {
if (s == kTypeDeletion) { if (s == kTypeDeletion) {
indexInfo("col: %s already drop by other opera", term->colName); indexInfo("col: %s already drop by", term->colName);
// coloum already drop by other oper, no need to query tindex // coloum already drop by other oper, no need to query tindex
return 0; return 0;
} else { } else {
...@@ -402,7 +397,7 @@ static void indexDestroyTempResult(SArray* result) { ...@@ -402,7 +397,7 @@ static void indexDestroyTempResult(SArray* result) {
} }
int indexFlushCacheTFile(SIndex* sIdx, void* cache) { int indexFlushCacheTFile(SIndex* sIdx, void* cache) {
if (sIdx == NULL) { return -1; } if (sIdx == NULL) { return -1; }
indexWarn("suid %" PRIu64 " merge cache into tindex", sIdx->suid); indexInfo("suid %" PRIu64 " merge cache into tindex", sIdx->suid);
IndexCache* pCache = (IndexCache*)cache; IndexCache* pCache = (IndexCache*)cache;
TFileReader* pReader = tfileGetReaderByCol(sIdx->tindex, pCache->suid, pCache->colName); TFileReader* pReader = tfileGetReaderByCol(sIdx->tindex, pCache->suid, pCache->colName);
...@@ -504,17 +499,15 @@ static int indexGenTFile(SIndex* sIdx, IndexCache* cache, SArray* batch) { ...@@ -504,17 +499,15 @@ static int indexGenTFile(SIndex* sIdx, IndexCache* cache, SArray* batch) {
tfileWriterClose(tw); tfileWriterClose(tw);
TFileReader* reader = tfileReaderOpen(sIdx->path, cache->suid, version, cache->colName); TFileReader* reader = tfileReaderOpen(sIdx->path, cache->suid, version, cache->colName);
if (reader == NULL) { goto END; }
char buf[128] = {0};
TFileHeader* header = &reader->header; TFileHeader* header = &reader->header;
ICacheKey key = { ICacheKey key = {
.suid = cache->suid, .colName = header->colName, .nColName = strlen(header->colName), .colType = header->colType}; .suid = cache->suid, .colName = header->colName, .nColName = strlen(header->colName), .colType = header->colType};
pthread_mutex_lock(&sIdx->mtx); pthread_mutex_lock(&sIdx->mtx);
IndexTFile* ifile = (IndexTFile*)sIdx->tindex; IndexTFile* ifile = (IndexTFile*)sIdx->tindex;
tfileCachePut(ifile->cache, &key, reader); tfileCachePut(ifile->cache, &key, reader);
pthread_mutex_unlock(&sIdx->mtx); pthread_mutex_unlock(&sIdx->mtx);
return ret; return ret;
END: END:
......
...@@ -20,7 +20,7 @@ ...@@ -20,7 +20,7 @@
#define MAX_INDEX_KEY_LEN 256 // test only, change later #define MAX_INDEX_KEY_LEN 256 // test only, change later
#define MEM_TERM_LIMIT 5 * 10000 #define MEM_TERM_LIMIT 10 * 10000
// ref index_cache.h:22 // ref index_cache.h:22
//#define CACHE_KEY_LEN(p) \ //#define CACHE_KEY_LEN(p) \
// (sizeof(int32_t) + sizeof(uint16_t) + sizeof(p->colType) + sizeof(p->nColVal) + p->nColVal + sizeof(uint64_t) + // (sizeof(int32_t) + sizeof(uint16_t) + sizeof(p->colType) + sizeof(p->nColVal) + p->nColVal + sizeof(uint64_t) +
...@@ -261,7 +261,7 @@ static int indexQueryMem(MemTable* mem, CacheTerm* ct, EIndexQueryType qtype, SA ...@@ -261,7 +261,7 @@ static int indexQueryMem(MemTable* mem, CacheTerm* ct, EIndexQueryType qtype, SA
return 0; return 0;
} }
int indexCacheSearch(void* cache, SIndexTermQuery* query, SArray* result, STermValueType* s) { int indexCacheSearch(void* cache, SIndexTermQuery* query, SArray* result, STermValueType* s) {
if (cache == NULL) { return -1; } if (cache == NULL) { return 0; }
IndexCache* pCache = cache; IndexCache* pCache = cache;
MemTable *mem = NULL, *imm = NULL; MemTable *mem = NULL, *imm = NULL;
......
...@@ -18,7 +18,7 @@ ...@@ -18,7 +18,7 @@
#include "tutil.h" #include "tutil.h"
static int writeCtxDoWrite(WriterCtx* ctx, uint8_t* buf, int len) { static int writeCtxDoWrite(WriterCtx* ctx, uint8_t* buf, int len) {
if (ctx->offset + len > ctx->limit) { return -1; } // if (ctx->offset + len > ctx->limit) { return -1; }
if (ctx->type == TFile) { if (ctx->type == TFile) {
assert(len == tfWrite(ctx->file.fd, buf, len)); assert(len == tfWrite(ctx->file.fd, buf, len));
...@@ -111,8 +111,8 @@ void writerCtxDestroy(WriterCtx* ctx, bool remove) { ...@@ -111,8 +111,8 @@ void writerCtxDestroy(WriterCtx* ctx, bool remove) {
if (ctx->type == TMemory) { if (ctx->type == TMemory) {
free(ctx->mem.buf); free(ctx->mem.buf);
} else { } else {
// ctx->flush(ctx);
tfClose(ctx->file.fd); tfClose(ctx->file.fd);
ctx->flush(ctx);
if (remove) { unlink(ctx->file.buf); } if (remove) { unlink(ctx->file.buf); }
} }
free(ctx); free(ctx);
......
...@@ -67,29 +67,18 @@ TFileCache* tfileCacheCreate(const char* path) { ...@@ -67,29 +67,18 @@ TFileCache* tfileCacheCreate(const char* path) {
for (size_t i = 0; i < taosArrayGetSize(files); i++) { for (size_t i = 0; i < taosArrayGetSize(files); i++) {
char* file = taosArrayGetP(files, i); char* file = taosArrayGetP(files, i);
// refactor later, use colname and version info WriterCtx* wc = writerCtxCreate(TFile, file, true, 1024 * 1024 * 64);
char colName[256] = {0};
if (0 != tfileParseFileName(file, &suid, colName, (int*)&version)) {
indexInfo("try parse invalid file: %s, skip it", file);
continue;
}
char fullName[256] = {0};
sprintf(fullName, "%s/%s", path, file);
WriterCtx* wc = writerCtxCreate(TFile, fullName, true, 1024 * 1024 * 64);
if (wc == NULL) { if (wc == NULL) {
indexError("failed to open index:%s", file); indexError("failed to open index:%s", file);
goto End; goto End;
} }
char buf[128] = {0};
TFileReader* reader = tfileReaderCreate(wc); TFileReader* reader = tfileReaderCreate(wc);
if (reader == NULL) { goto End; }
TFileHeader* header = &reader->header; TFileHeader* header = &reader->header;
ICacheKey key = {.suid = header->suid,
.colName = header->colName, char buf[128] = {0};
.nColName = strlen(header->colName), ICacheKey key = {.suid = header->suid, .colName = header->colName, .nColName = strlen(header->colName)};
.colType = header->colType};
int32_t sz = indexSerialCacheKey(&key, buf); int32_t sz = indexSerialCacheKey(&key, buf);
assert(sz < sizeof(buf)); assert(sz < sizeof(buf));
...@@ -256,7 +245,8 @@ int tfileWriterPut(TFileWriter* tw, void* data, bool order) { ...@@ -256,7 +245,8 @@ int tfileWriterPut(TFileWriter* tw, void* data, bool order) {
// sort by coltype and write to tindex // sort by coltype and write to tindex
if (order == false) { if (order == false) {
__compar_fn_t fn; __compar_fn_t fn;
int8_t colType = tw->header.colType;
int8_t colType = tw->header.colType;
if (colType == TSDB_DATA_TYPE_BINARY || colType == TSDB_DATA_TYPE_NCHAR) { if (colType == TSDB_DATA_TYPE_BINARY || colType == TSDB_DATA_TYPE_NCHAR) {
fn = tfileStrCompare; fn = tfileStrCompare;
} else { } else {
...@@ -274,7 +264,8 @@ int tfileWriterPut(TFileWriter* tw, void* data, bool order) { ...@@ -274,7 +264,8 @@ int tfileWriterPut(TFileWriter* tw, void* data, bool order) {
// ugly code, refactor later // ugly code, refactor later
for (size_t i = 0; i < sz; i++) { for (size_t i = 0; i < sz; i++) {
TFileValue* v = taosArrayGetP((SArray*)data, i); TFileValue* v = taosArrayGetP((SArray*)data, i);
// taosArrayRemoveDuplicate(v->tablId, tfileUidCompare, NULL); taosArraySort(v->tableId, tfileUidCompare);
taosArrayRemoveDuplicate(v->tableId, tfileUidCompare, NULL);
int32_t tbsz = taosArrayGetSize(v->tableId); int32_t tbsz = taosArrayGetSize(v->tableId);
fstOffset += TF_TABLE_TATOAL_SIZE(tbsz); fstOffset += TF_TABLE_TATOAL_SIZE(tbsz);
} }
...@@ -351,10 +342,16 @@ void tfileWriterDestroy(TFileWriter* tw) { ...@@ -351,10 +342,16 @@ void tfileWriterDestroy(TFileWriter* tw) {
} }
IndexTFile* indexTFileCreate(const char* path) { IndexTFile* indexTFileCreate(const char* path) {
TFileCache* cache = tfileCacheCreate(path);
if (cache == NULL) { return NULL; }
IndexTFile* tfile = calloc(1, sizeof(IndexTFile)); IndexTFile* tfile = calloc(1, sizeof(IndexTFile));
if (tfile == NULL) { return NULL; } if (tfile == NULL) {
tfileCacheDestroy(cache);
return NULL;
}
tfile->cache = tfileCacheCreate(path); tfile->cache = cache;
return tfile; return tfile;
} }
void indexTFileDestroy(IndexTFile* tfile) { void indexTFileDestroy(IndexTFile* tfile) {
...@@ -366,6 +363,7 @@ void indexTFileDestroy(IndexTFile* tfile) { ...@@ -366,6 +363,7 @@ void indexTFileDestroy(IndexTFile* tfile) {
int indexTFileSearch(void* tfile, SIndexTermQuery* query, SArray* result) { int indexTFileSearch(void* tfile, SIndexTermQuery* query, SArray* result) {
int ret = -1; int ret = -1;
if (tfile == NULL) { return ret; } if (tfile == NULL) { return ret; }
IndexTFile* pTfile = (IndexTFile*)tfile; IndexTFile* pTfile = (IndexTFile*)tfile;
SIndexTerm* term = query->term; SIndexTerm* term = query->term;
...@@ -545,12 +543,11 @@ static int tfileReaderLoadHeader(TFileReader* reader) { ...@@ -545,12 +543,11 @@ static int tfileReaderLoadHeader(TFileReader* reader) {
int64_t nread = reader->ctx->readFrom(reader->ctx, buf, sizeof(buf), 0); int64_t nread = reader->ctx->readFrom(reader->ctx, buf, sizeof(buf), 0);
if (nread == -1) { if (nread == -1) {
//
indexError("actual Read: %d, to read: %d, errno: %d, filefd: %d, filename: %s", (int)(nread), (int)sizeof(buf), indexError("actual Read: %d, to read: %d, errno: %d, filefd: %d, filename: %s", (int)(nread), (int)sizeof(buf),
errno, reader->ctx->file.fd, reader->ctx->file.buf); errno, reader->ctx->file.fd, reader->ctx->file.buf);
} else { } else {
indexError("actual Read: %d, to read: %d, errno: %d, filefd: %d, filename: %s", (int)(nread), (int)sizeof(buf), indexInfo("actual Read: %d, to read: %d, filefd: %d, filename: %s", (int)(nread), (int)sizeof(buf),
errno, reader->ctx->file.fd, reader->ctx->file.buf); reader->ctx->file.fd, reader->ctx->file.buf);
} }
// assert(nread == sizeof(buf)); // assert(nread == sizeof(buf));
memcpy(&reader->header, buf, sizeof(buf)); memcpy(&reader->header, buf, sizeof(buf));
...@@ -566,7 +563,8 @@ static int tfileReaderLoadFst(TFileReader* reader) { ...@@ -566,7 +563,8 @@ static int tfileReaderLoadFst(TFileReader* reader) {
WriterCtx* ctx = reader->ctx; WriterCtx* ctx = reader->ctx;
int32_t nread = ctx->readFrom(ctx, buf, FST_MAX_SIZE, reader->header.fstOffset); int32_t nread = ctx->readFrom(ctx, buf, FST_MAX_SIZE, reader->header.fstOffset);
indexError("nread = %d, and fst offset=%d, filename: %s ", nread, reader->header.fstOffset, ctx->file.buf); indexInfo("nread = %d, and fst offset=%d, filename: %s, size: %d ", nread, reader->header.fstOffset, ctx->file.buf,
ctx->file.size);
// we assuse fst size less than FST_MAX_SIZE // we assuse fst size less than FST_MAX_SIZE
assert(nread > 0 && nread < FST_MAX_SIZE); assert(nread > 0 && nread < FST_MAX_SIZE);
...@@ -613,15 +611,20 @@ void tfileReaderUnRef(TFileReader* reader) { ...@@ -613,15 +611,20 @@ void tfileReaderUnRef(TFileReader* reader) {
static SArray* tfileGetFileList(const char* path) { static SArray* tfileGetFileList(const char* path) {
SArray* files = taosArrayInit(4, sizeof(void*)); SArray* files = taosArrayInit(4, sizeof(void*));
char buf[128] = {0};
uint64_t suid;
uint32_t version;
DIR* dir = opendir(path); DIR* dir = opendir(path);
if (NULL == dir) { return NULL; } if (NULL == dir) { return NULL; }
struct dirent* entry; struct dirent* entry;
while ((entry = readdir(dir)) != NULL) { while ((entry = readdir(dir)) != NULL) {
if (entry->d_type && DT_DIR) { continue; } char* file = entry->d_name;
size_t len = strlen(entry->d_name); if (0 != tfileParseFileName(file, &suid, buf, &version)) { continue; }
char* buf = calloc(1, len + 1);
memcpy(buf, entry->d_name, len); size_t len = strlen(path) + 1 + strlen(file) + 1;
char* buf = calloc(1, len);
sprintf(buf, "%s/%s", path, file);
taosArrayPush(files, &buf); taosArrayPush(files, &buf);
} }
closedir(dir); closedir(dir);
......
...@@ -701,7 +701,8 @@ class IndexObj { ...@@ -701,7 +701,8 @@ class IndexObj {
int64_t s = taosGetTimestampUs(); int64_t s = taosGetTimestampUs();
if (Search(mq, result) == 0) { if (Search(mq, result) == 0) {
int64_t e = taosGetTimestampUs(); int64_t e = taosGetTimestampUs();
std::cout << "search one successfully and time cost:" << e - s << std::endl; std::cout << "search one successfully and time cost:" << e - s << "\tquery col:" << colName
<< "\t val: " << colVal << "\t size:" << taosArrayGetSize(result) << std::endl;
} else { } else {
} }
int sz = taosArrayGetSize(result); int sz = taosArrayGetSize(result);
...@@ -834,11 +835,8 @@ static void write_and_search(IndexObj* idx) { ...@@ -834,11 +835,8 @@ static void write_and_search(IndexObj* idx) {
std::string colName("tag1"), colVal("Hello"); std::string colName("tag1"), colVal("Hello");
int target = idx->SearchOne("tag1", "Hello"); int target = idx->SearchOne("tag1", "Hello");
std::cout << "search: " << target << std::endl;
target = idx->SearchOne("tag2", "Test"); target = idx->SearchOne("tag2", "Test");
std::cout << "search: " << target << std::endl; // idx->PutOne(colName, colVal);
idx->PutOne(colName, colVal);
} }
TEST_F(IndexEnv2, testIndex_serarch_cache_and_tfile) { TEST_F(IndexEnv2, testIndex_serarch_cache_and_tfile) {
std::string path = "/tmp/cache_and_tfile"; std::string path = "/tmp/cache_and_tfile";
...@@ -847,8 +845,8 @@ TEST_F(IndexEnv2, testIndex_serarch_cache_and_tfile) { ...@@ -847,8 +845,8 @@ TEST_F(IndexEnv2, testIndex_serarch_cache_and_tfile) {
} }
index->PutOne("tag1", "Hello"); index->PutOne("tag1", "Hello");
index->PutOne("tag2", "Test"); index->PutOne("tag2", "Test");
index->WriteMultiMillonData("tag1", "Hello", 50 * 10000); index->WriteMultiMillonData("tag1", "Hello", 100 * 10000);
index->WriteMultiMillonData("tag2", "Test", 50 * 10000); index->WriteMultiMillonData("tag2", "Test", 100 * 10000);
std::thread threads[NUM_OF_THREAD]; std::thread threads[NUM_OF_THREAD];
for (int i = 0; i < NUM_OF_THREAD; i++) { for (int i = 0; i < NUM_OF_THREAD; i++) {
......
...@@ -126,7 +126,7 @@ static FORCE_INLINE void getMemRowAppendInfo(SSchema *pSchema, uint8_t memRowTyp ...@@ -126,7 +126,7 @@ static FORCE_INLINE void getMemRowAppendInfo(SSchema *pSchema, uint8_t memRowTyp
int32_t idx, int32_t *toffset) { int32_t idx, int32_t *toffset) {
int32_t schemaIdx = 0; int32_t schemaIdx = 0;
if (IS_DATA_COL_ORDERED(spd)) { if (IS_DATA_COL_ORDERED(spd)) {
schemaIdx = spd->boundedColumns[idx]; schemaIdx = spd->boundedColumns[idx] - 1;
if (isDataRowT(memRowType)) { if (isDataRowT(memRowType)) {
*toffset = (spd->cols + schemaIdx)->toffset; // the offset of firstPart *toffset = (spd->cols + schemaIdx)->toffset; // the offset of firstPart
} else { } else {
......
...@@ -326,6 +326,31 @@ typedef struct SVgroupTablesBatch { ...@@ -326,6 +326,31 @@ typedef struct SVgroupTablesBatch {
SVgroupInfo info; SVgroupInfo info;
} SVgroupTablesBatch; } SVgroupTablesBatch;
static int32_t doParseSerializeTagValue(SSchema* pTagSchema, int32_t numOfInputTag, SKVRowBuilder* pKvRowBuilder,
SArray* pTagValList, int32_t tsPrecision, SMsgBuf* pMsgBuf) {
const char* msg1 = "illegal value or data overflow";
int32_t code = TSDB_CODE_SUCCESS;
for (int32_t i = 0; i < numOfInputTag; ++i) {
SSchema* pSchema = &pTagSchema[i];
char* endPtr = NULL;
char tmpTokenBuf[TSDB_MAX_TAGS_LEN] = {0};
SKvParam param = {.builder = pKvRowBuilder, .schema = pSchema};
SToken* pItem = taosArrayGet(pTagValList, i);
code = parseValueToken(&endPtr, pItem, pSchema, tsPrecision, tmpTokenBuf, KvRowAppend, &param, pMsgBuf);
if (code != TSDB_CODE_SUCCESS) {
tdDestroyKVRowBuilder(pKvRowBuilder);
return buildInvalidOperationMsg(pMsgBuf, msg1);
}
}
return code;
}
int32_t doCheckForCreateCTable(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SMsgBuf* pMsgBuf, char** pOutput, int32_t* len) { int32_t doCheckForCreateCTable(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SMsgBuf* pMsgBuf, char** pOutput, int32_t* len) {
const char* msg1 = "invalid table name"; const char* msg1 = "invalid table name";
const char* msg2 = "tags number not matched"; const char* msg2 = "tags number not matched";
...@@ -354,10 +379,9 @@ int32_t doCheckForCreateCTable(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SMsgBuf* p ...@@ -354,10 +379,9 @@ int32_t doCheckForCreateCTable(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SMsgBuf* p
} }
SArray* pValList = pCreateTableInfo->pTagVals; SArray* pValList = pCreateTableInfo->pTagVals;
size_t numOfInputTag = taosArrayGetSize(pValList);
size_t numOfInputTag = taosArrayGetSize(pValList);
STableMeta* pSuperTableMeta = NULL; STableMeta* pSuperTableMeta = NULL;
code = catalogGetTableMeta(pCtx->pCatalog, pCtx->pTransporter, &pCtx->mgmtEpSet, &name, &pSuperTableMeta); code = catalogGetTableMeta(pCtx->pCatalog, pCtx->pTransporter, &pCtx->mgmtEpSet, &name, &pSuperTableMeta);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
...@@ -463,21 +487,9 @@ int32_t doCheckForCreateCTable(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SMsgBuf* p ...@@ -463,21 +487,9 @@ int32_t doCheckForCreateCTable(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SMsgBuf* p
return buildInvalidOperationMsg(pMsgBuf, msg2); return buildInvalidOperationMsg(pMsgBuf, msg2);
} }
for (int32_t i = 0; i < numOfInputTag; ++i) { code = doParseSerializeTagValue(pTagSchema, numOfInputTag, &kvRowBuilder, pValList, tinfo.precision, pMsgBuf);
SSchema* pSchema = &pTagSchema[i]; if (code != TSDB_CODE_SUCCESS) {
return code;
char* endPtr = NULL;
char tmpTokenBuf[TSDB_MAX_TAGS_LEN] = {0};
SKvParam param = {.builder = &kvRowBuilder, .schema = pSchema};
SToken* pItem = taosArrayGet(pValList, i);
code = parseValueToken(&endPtr, pItem, pSchema, tinfo.precision, tmpTokenBuf, KvRowAppend, &param, pMsgBuf);
if (code != TSDB_CODE_SUCCESS) {
tdDestroyKVRowBuilder(&kvRowBuilder);
return buildInvalidOperationMsg(pMsgBuf, msg4);
}
} }
} }
...@@ -499,8 +511,8 @@ int32_t doCheckForCreateCTable(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SMsgBuf* p ...@@ -499,8 +511,8 @@ int32_t doCheckForCreateCTable(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SMsgBuf* p
catalogGetTableHashVgroup(pCtx->pCatalog, pCtx->pTransporter, &pCtx->mgmtEpSet, &tableName, &info); catalogGetTableHashVgroup(pCtx->pCatalog, pCtx->pTransporter, &pCtx->mgmtEpSet, &tableName, &info);
struct SVCreateTbReq req = {0}; struct SVCreateTbReq req = {0};
req.type = TD_CHILD_TABLE; req.type = TD_CHILD_TABLE;
req.name = strdup(tNameGetTableName(&tableName)); req.name = strdup(tNameGetTableName(&tableName));
req.ctbCfg.suid = pSuperTableMeta->uid; req.ctbCfg.suid = pSuperTableMeta->uid;
req.ctbCfg.pTag = row; req.ctbCfg.pTag = row;
......
...@@ -106,6 +106,7 @@ static int32_t getTableMeta(SInsertParseContext* pCxt, SToken* pTname) { ...@@ -106,6 +106,7 @@ static int32_t getTableMeta(SInsertParseContext* pCxt, SToken* pTname) {
SVgroupInfo vg; SVgroupInfo vg;
CHECK_CODE(catalogGetTableHashVgroup(pBasicCtx->pCatalog, pBasicCtx->pTransporter, &pBasicCtx->mgmtEpSet, &name, &vg)); CHECK_CODE(catalogGetTableHashVgroup(pBasicCtx->pCatalog, pBasicCtx->pTransporter, &pBasicCtx->mgmtEpSet, &name, &vg));
CHECK_CODE(taosHashPut(pCxt->pVgroupsHashObj, (const char*)&vg.vgId, sizeof(vg.vgId), (char*)&vg, sizeof(vg))); CHECK_CODE(taosHashPut(pCxt->pVgroupsHashObj, (const char*)&vg.vgId, sizeof(vg.vgId), (char*)&vg, sizeof(vg)));
pCxt->pTableMeta->vgId = vg.vgId; // todo remove
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -425,7 +426,7 @@ static int parseOneRow(SInsertParseContext* pCxt, STableDataBlocks* pDataBlocks, ...@@ -425,7 +426,7 @@ static int parseOneRow(SInsertParseContext* pCxt, STableDataBlocks* pDataBlocks,
// 1. set the parsed value from sql string // 1. set the parsed value from sql string
for (int i = 0; i < spd->numOfBound; ++i) { for (int i = 0; i < spd->numOfBound; ++i) {
NEXT_TOKEN(pCxt->pSql, sToken); NEXT_TOKEN(pCxt->pSql, sToken);
SSchema *pSchema = &schema[spd->boundedColumns[i]]; SSchema *pSchema = &schema[spd->boundedColumns[i] - 1];
param.schema = pSchema; param.schema = pSchema;
param.compareStat = pBuilder->compareStat; param.compareStat = pBuilder->compareStat;
getMemRowAppendInfo(schema, pBuilder->memRowType, spd, i, &param.toffset); getMemRowAppendInfo(schema, pBuilder->memRowType, spd, i, &param.toffset);
......
...@@ -247,8 +247,8 @@ int32_t queryProcessTableMetaRsp(void* output, char *msg, int32_t msgSize) { ...@@ -247,8 +247,8 @@ int32_t queryProcessTableMetaRsp(void* output, char *msg, int32_t msgSize) {
pOut->metaNum = 2; pOut->metaNum = 2;
if (pMetaMsg->dbFname[0]) { if (pMetaMsg->dbFname[0]) {
snprintf(pOut->ctbFname, "%s.%s", pMetaMsg->dbFname, pMetaMsg->tbFname); snprintf(pOut->ctbFname, sizeof(pOut->ctbFname), "%s.%s", pMetaMsg->dbFname, pMetaMsg->tbFname);
snprintf(pOut->tbFname, "%s.%s", pMetaMsg->dbFname, pMetaMsg->stbFname); snprintf(pOut->tbFname, sizeof(pOut->tbFname), "%s.%s", pMetaMsg->dbFname, pMetaMsg->stbFname);
} else { } else {
memcpy(pOut->ctbFname, pMetaMsg->tbFname, sizeof(pOut->ctbFname)); memcpy(pOut->ctbFname, pMetaMsg->tbFname, sizeof(pOut->ctbFname));
memcpy(pOut->tbFname, pMetaMsg->stbFname, sizeof(pOut->tbFname)); memcpy(pOut->tbFname, pMetaMsg->stbFname, sizeof(pOut->tbFname));
......
...@@ -372,7 +372,7 @@ int32_t schProcessOnTaskSuccess(SSchJob *job, SSchTask *task) { ...@@ -372,7 +372,7 @@ int32_t schProcessOnTaskSuccess(SSchJob *job, SSchTask *task) {
SCH_ERR_RET(schMoveTaskToSuccList(job, task, &moved)); SCH_ERR_RET(schMoveTaskToSuccList(job, task, &moved));
if (!moved) { if (!moved) {
SCH_TASK_ERR_LOG("task may already moved, status:%d", task->status); SCH_TASK_ERR_LOG(" task may already moved, status:%d", task->status);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -480,7 +480,6 @@ int32_t schProcessOnTaskFailure(SSchJob *job, SSchTask *task, int32_t errCode) { ...@@ -480,7 +480,6 @@ int32_t schProcessOnTaskFailure(SSchJob *job, SSchTask *task, int32_t errCode) {
int32_t schProcessRspMsg(SSchJob *job, SSchTask *task, int32_t msgType, char *msg, int32_t msgSize, int32_t rspCode) { int32_t schProcessRspMsg(SSchJob *job, SSchTask *task, int32_t msgType, char *msg, int32_t msgSize, int32_t rspCode) {
int32_t code = 0; int32_t code = 0;
switch (msgType) { switch (msgType) {
case TDMT_VND_CREATE_TABLE_RSP: { case TDMT_VND_CREATE_TABLE_RSP: {
if (rspCode != TSDB_CODE_SUCCESS) { if (rspCode != TSDB_CODE_SUCCESS) {
...@@ -492,6 +491,8 @@ int32_t schProcessRspMsg(SSchJob *job, SSchTask *task, int32_t msgType, char *ms ...@@ -492,6 +491,8 @@ int32_t schProcessRspMsg(SSchJob *job, SSchTask *task, int32_t msgType, char *ms
goto _task_error; goto _task_error;
} }
} }
break;
} }
case TDMT_VND_SUBMIT_RSP: { case TDMT_VND_SUBMIT_RSP: {
if (rspCode != TSDB_CODE_SUCCESS) { if (rspCode != TSDB_CODE_SUCCESS) {
......
...@@ -252,7 +252,7 @@ LONG WINAPI FlCrashDump(PEXCEPTION_POINTERS ep) { ...@@ -252,7 +252,7 @@ LONG WINAPI FlCrashDump(PEXCEPTION_POINTERS ep) {
void taosSetCoreDump() { SetUnhandledExceptionFilter(&FlCrashDump); } void taosSetCoreDump() { SetUnhandledExceptionFilter(&FlCrashDump); }
int32_t taosGetSystemUid(char *uid, int32_t uidlen) { int32_t taosGetSystemUUID(char *uid, int32_t uidlen) {
GUID guid; GUID guid;
CoCreateGuid(&guid); CoCreateGuid(&guid);
...@@ -452,7 +452,7 @@ int32_t taosGetDiskSize(char *dataDir, SysDiskSize *diskSize) { ...@@ -452,7 +452,7 @@ int32_t taosGetDiskSize(char *dataDir, SysDiskSize *diskSize) {
} }
} }
int32_t taosGetSystemUid(char *uid, int32_t uidlen) { int32_t taosGetSystemUUID(char *uid, int32_t uidlen) {
uuid_t uuid = {0}; uuid_t uuid = {0};
uuid_generate(uuid); uuid_generate(uuid);
// it's caller's responsibility to make enough space for `uid`, that's 36-char + 1-null // it's caller's responsibility to make enough space for `uid`, that's 36-char + 1-null
...@@ -1070,7 +1070,7 @@ void taosSetCoreDump(bool enable) { ...@@ -1070,7 +1070,7 @@ void taosSetCoreDump(bool enable) {
#endif #endif
} }
int32_t taosGetSystemUid(char *uid, int32_t uidlen) { int32_t taosGetSystemUUID(char *uid, int32_t uidlen) {
int fd; int fd;
int len = 0; int len = 0;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册