提交 92b6782d 编写于 作者: S Shengliang Guan

Merge remote-tracking branch 'origin/3.0' into feature/dnode3

......@@ -6,13 +6,17 @@
"dockerfile": "Dockerfile",
// Update 'VARIANT' to pick an Debian / Ubuntu OS version: debian-11, debian-10, debian-9, ubuntu-21.04, ubuntu-20.04, ubuntu-18.04
// Use Debian 11, Debian 9, Ubuntu 18.04 or Ubuntu 21.04 on local arm64/Apple Silicon
"args": { "VARIANT": "ubuntu-21.04" }
"args": {
"VARIANT": "ubuntu-21.04"
}
},
"runArgs": ["--cap-add=SYS_PTRACE", "--security-opt", "seccomp=unconfined"],
"runArgs": [
"--cap-add=SYS_PTRACE",
"--security-opt",
"seccomp=unconfined"
],
// Set *default* container specific settings.json values on container create.
"settings": {},
// Add the IDs of extensions you want installed when the container is created.
"extensions": [
"ms-vscode.cpptools",
......@@ -21,15 +25,13 @@
"visualstudioexptteam.vscodeintel",
"eamodio.gitlens",
"matepek.vscode-catch2-test-adapter",
"spmeesseman.vscode-taskexplorer"
"spmeesseman.vscode-taskexplorer",
"cschlosser.doxdocgen"
],
// Use 'forwardPorts' to make a list of ports inside the container available locally.
// "forwardPorts": [],
// Use 'postCreateCommand' to run commands after the container is created.
// "postCreateCommand": "gcc -v",
// Comment out connect as root instead. More info: https://aka.ms/vscode-remote/containers/non-root.
"remoteUser": "root"
}
}
\ No newline at end of file
......@@ -156,10 +156,6 @@ typedef struct {
uint16_t port;
} SEpAddr;
typedef struct {
int32_t numOfVnodes;
} SMsgDesc;
typedef struct SMsgHead {
int32_t contLen;
int32_t vgId;
......
......@@ -90,23 +90,23 @@ int32_t catalogGetTableMeta(struct SCatalog* pCatalog, void * pTransporter, cons
/**
* Force renew a table's local cached meta data.
* @param pCatalog (input, got with catalogGetHandle)
* @param pRpc (input, rpc object)
* @param pTransporter (input, rpc object)
* @param pMgmtEps (input, mnode EPs)
* @param pTableName (input, table name, NOT including db name)
* @return error code
*/
int32_t catalogRenewTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const SName* pTableName);
int32_t catalogRenewTableMeta(struct SCatalog* pCatalog, void * pTransporter, const SEpSet* pMgmtEps, const SName* pTableName);
/**
* Force renew a table's local cached meta data and get the new one.
* @param pCatalog (input, got with catalogGetHandle)
* @param pRpc (input, rpc object)
* @param pTransporter (input, rpc object)
* @param pMgmtEps (input, mnode EPs)
* @param pTableName (input, table name, NOT including db name)
* @param pTableMeta(output, table meta data, NEED to free it by calller)
* @return error code
*/
int32_t catalogRenewAndGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta);
int32_t catalogRenewAndGetTableMeta(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta);
/**
......
......@@ -154,14 +154,14 @@ typedef struct SVgDataBlocks {
char *pData; // SMsgDesc + SSubmitMsg + SSubmitBlk + ...
} SVgDataBlocks;
typedef struct SInsertStmtInfo {
typedef struct SVnodeModifOpStmtInfo {
int16_t nodeType;
SArray* pDataBlocks; // data block for each vgroup, SArray<SVgDataBlocks*>.
int8_t schemaAttache; // denote if submit block is built with table schema or not
uint8_t payloadType; // EPayloadType. 0: K-V payload for non-prepare insert, 1: rawPayload for prepare insert
uint32_t insertType; // insert data from [file|sql statement| bound statement]
const char* sql; // current sql statement position
} SInsertStmtInfo;
} SVnodeModifOpStmtInfo;
typedef struct SDclStmtInfo {
int16_t nodeType;
......
......@@ -22,11 +22,13 @@ extern "C" {
#include <assert.h>
#include <ctype.h>
#include <dirent.h>
#include <errno.h>
#include <float.h>
#include <inttypes.h>
#include <locale.h>
#include <math.h>
#include <sched.h>
#include <setjmp.h>
#include <signal.h>
#include <stdarg.h>
......@@ -36,19 +38,14 @@ extern "C" {
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <wctype.h>
#include <wchar.h>
#include <sched.h>
#include <ctype.h>
#include <errno.h>
#include <float.h>
#include <math.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <sys/utsname.h>
#include <dirent.h>
#include <unistd.h>
#include <wchar.h>
#include <wctype.h>
#include <sys/mman.h>
#include "osAtomic.h"
#include "osDef.h"
......
......@@ -54,7 +54,7 @@ bool taosGetSysMemory(float *memoryUsedMB);
void taosPrintOsInfo();
int taosSystem(const char *cmd);
void taosKillSystem();
int32_t taosGetSystemUid(char *uid, int32_t uidlen);
int32_t taosGetSystemUUID(char *uid, int32_t uidlen);
char * taosGetCmdlineByPID(int pid);
void taosSetCoreDump(bool enable);
......
......@@ -43,7 +43,7 @@ int32_t tfFsync(int64_t tfd);
bool tfValid(int64_t tfd);
int64_t tfLseek(int64_t tfd, int64_t offset, int32_t whence);
int32_t tfFtruncate(int64_t tfd, int64_t length);
void * tfMmapReadOnly(int64_t tfd, int64_t length);
#ifdef __cplusplus
}
#endif
......
......@@ -24,7 +24,7 @@ extern "C" {
#include "tlockfree.h"
typedef uint32_t (*_hash_fn_t)(const char *, uint32_t);
typedef int32_t (*_equal_fn_t)(const void*, const void*, uint32_t len);
typedef int32_t (*_equal_fn_t)(const void*, const void*, size_t len);
typedef void (*_hash_before_fn_t)(void *);
typedef void (*_hash_free_fn_t)(void *);
......
......@@ -53,8 +53,8 @@ static void registerRequest(SRequestObj* pRequest) {
int32_t total = atomic_add_fetch_32(&pSummary->totalRequests, 1);
int32_t currentInst = atomic_add_fetch_32(&pSummary->currentRequests, 1);
tscDebug("0x%" PRIx64 " new Request from connObj:0x%" PRIx64 ", current:%d, app current:%d, total:%d", pRequest->self,
pRequest->pTscObj->id, num, currentInst, total);
tscDebug("0x%" PRIx64 " new Request from connObj:0x%" PRIx64 ", current:%d, app current:%d, total:%d, reqId:0x%"PRIx64, pRequest->self,
pRequest->pTscObj->id, num, currentInst, total, pRequest->requestId);
}
}
......@@ -419,17 +419,20 @@ int taos_options_imp(TSDB_OPTION option, const char *str) {
*+------------+-----+-----------+---------------+
* @return
*/
static int32_t requestSerialId = 0;
uint64_t generateRequestId() {
uint64_t hashId = 0;
char uid[64] = {0};
int32_t code = taosGetSystemUid(uid, tListLen(uid));
if (code != TSDB_CODE_SUCCESS) {
tscError("Failed to get the system uid to generated request id, reason:%s. use ip address instead", tstrerror(TAOS_SYSTEM_ERROR(errno)));
} else {
hashId = MurmurHash3_32(uid, strlen(uid));
static uint64_t hashId = 0;
static int32_t requestSerialId = 0;
if (hashId == 0) {
char uid[64] = {0};
int32_t code = taosGetSystemUUID(uid, tListLen(uid));
if (code != TSDB_CODE_SUCCESS) {
tscError("Failed to get the system uid to generated request id, reason:%s. use ip address instead",
tstrerror(TAOS_SYSTEM_ERROR(errno)));
} else {
hashId = MurmurHash3_32(uid, strlen(uid));
}
}
int64_t ts = taosGetTimestampUs();
......
......@@ -140,7 +140,7 @@ int32_t buildRequest(STscObj *pTscObj, const char *sql, int sqlLen, SRequestObj*
(*pRequest)->sqlstr[sqlLen] = 0;
(*pRequest)->sqlLen = sqlLen;
tscDebugL("0x%"PRIx64" SQL: %s", (*pRequest)->requestId, (*pRequest)->sqlstr);
tscDebugL("0x%"PRIx64" SQL: %s, reqId:0x"PRIx64, (*pRequest)->self, (*pRequest)->sqlstr, (*pRequest)->requestId);
return TSDB_CODE_SUCCESS;
}
......@@ -203,7 +203,10 @@ int32_t getPlan(SRequestObj* pRequest, SQueryNode* pQuery, SQueryDag** pDag) {
int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag, void** pJob) {
if (TSDB_SQL_INSERT == pRequest->type || TSDB_SQL_CREATE_TABLE == pRequest->type) {
return scheduleExecJob(pRequest->pTscObj->pTransporter, NULL/*todo appInfo.xxx*/, pDag, pJob, &pRequest->affectedRows);
SQueryResult res = {.code = 0, .numOfRows = 0, .msgSize = ERROR_MSG_BUF_DEFAULT_SIZE, .msg = pRequest->msgBuf};
int32_t code = scheduleExecJob(pRequest->pTscObj->pTransporter, NULL, pDag, pJob, &res);
pRequest->affectedRows = res.numOfRows;
return res.code;
}
return scheduleAsyncExecJob(pRequest->pTscObj->pTransporter, NULL/*todo appInfo.xxx*/, pDag, pJob);
......@@ -371,7 +374,7 @@ STscObj* taosConnectImpl(const char *ip, const char *user, const char *auth, con
taos_close(pTscObj);
pTscObj = NULL;
} else {
tscDebug("0x%"PRIx64" connection is opening, connId:%d, dnodeConn:%p", pTscObj->id, pTscObj->connId, pTscObj->pTransporter);
tscDebug("0x%"PRIx64" connection is opening, connId:%d, dnodeConn:%p, reqId:0x%"PRIx64, pTscObj->id, pTscObj->connId, pTscObj->pTransporter, pRequest->requestId);
destroyRequest(pRequest);
}
......@@ -441,14 +444,13 @@ void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
* There is not response callback function for submit response.
* The actual inserted number of points is the first number.
*/
int32_t elapsed = pRequest->metric.rsp - pRequest->metric.start;
if (pMsg->code == TSDB_CODE_SUCCESS) {
tscDebug("0x%" PRIx64 " message:%s, code:%s rspLen:%d, elapsed:%" PRId64 " ms", pRequest->requestId,
TMSG_INFO(pMsg->msgType), tstrerror(pMsg->code), pMsg->contLen,
pRequest->metric.rsp - pRequest->metric.start);
tscDebug("0x%" PRIx64 " message:%s, code:%s rspLen:%d, elapsed:%d ms, reqId:0x%"PRIx64, pRequest->self,
TMSG_INFO(pMsg->msgType), tstrerror(pMsg->code), pMsg->contLen, elapsed, pRequest->requestId);
} else {
tscError("0x%" PRIx64 " SQL cmd:%s, code:%s rspLen:%d, elapsed time:%" PRId64 " ms", pRequest->requestId,
TMSG_INFO(pMsg->msgType), tstrerror(pMsg->code), pMsg->contLen,
pRequest->metric.rsp - pRequest->metric.start);
tscError("0x%" PRIx64 " SQL cmd:%s, code:%s rspLen:%d, elapsed time:%d ms, reqId:0x%"PRIx64, pRequest->self,
TMSG_INFO(pMsg->msgType), tstrerror(pMsg->code), pMsg->contLen, elapsed, pRequest->requestId);
}
taosReleaseRef(clientReqRefPool, pSendInfo->requestObjRefId);
......
......@@ -262,6 +262,8 @@ const char *taos_data_type(int type) {
const char *taos_get_client_info() { return version; }
int taos_affected_rows(TAOS_RES *res) { return 1; }
int taos_affected_rows(TAOS_RES *res) {
return ((SRequestObj*)res)->affectedRows;
}
int taos_result_precision(TAOS_RES *res) { return TSDB_TIME_PRECISION_MILLI; }
......@@ -57,95 +57,95 @@ TEST(testCase, connect_Test) {
taos_close(pConn);
}
//TEST(testCase, create_user_Test) {
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
// assert(pConn != NULL);
//
// TAOS_RES* pRes = taos_query(pConn, "create user abc pass 'abc'");
// if (taos_errno(pRes) != TSDB_CODE_SUCCESS) {
// printf("failed to create user, reason:%s\n", taos_errstr(pRes));
// }
//
// taos_free_result(pRes);
// taos_close(pConn);
//}
//
//TEST(testCase, create_account_Test) {
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
// assert(pConn != NULL);
//
// TAOS_RES* pRes = taos_query(pConn, "create account aabc pass 'abc'");
// if (taos_errno(pRes) != TSDB_CODE_SUCCESS) {
// printf("failed to create user, reason:%s\n", taos_errstr(pRes));
// }
//
// taos_free_result(pRes);
// taos_close(pConn);
//}
//
//TEST(testCase, drop_account_Test) {
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
// assert(pConn != NULL);
//
// TAOS_RES* pRes = taos_query(pConn, "drop account aabc");
// if (taos_errno(pRes) != TSDB_CODE_SUCCESS) {
// printf("failed to create user, reason:%s\n", taos_errstr(pRes));
// }
//
// taos_free_result(pRes);
// taos_close(pConn);
//}
TEST(testCase, create_user_Test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL);
//TEST(testCase, show_user_Test) {
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
// assert(pConn != NULL);
//
// TAOS_RES* pRes = taos_query(pConn, "show users");
// TAOS_ROW pRow = NULL;
//
// TAOS_FIELD* pFields = taos_fetch_fields(pRes);
// int32_t numOfFields = taos_num_fields(pRes);
//
// char str[512] = {0};
// while((pRow = taos_fetch_row(pRes)) != NULL) {
// int32_t code = taos_print_row(str, pRow, pFields, numOfFields);
// printf("%s\n", str);
// }
//
// taos_close(pConn);
//}
TAOS_RES* pRes = taos_query(pConn, "create user abc pass 'abc'");
if (taos_errno(pRes) != TSDB_CODE_SUCCESS) {
printf("failed to create user, reason:%s\n", taos_errstr(pRes));
}
//TEST(testCase, drop_user_Test) {
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
taos_free_result(pRes);
taos_close(pConn);
}
TEST(testCase, create_account_Test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL);
TAOS_RES* pRes = taos_query(pConn, "create account aabc pass 'abc'");
if (taos_errno(pRes) != TSDB_CODE_SUCCESS) {
printf("failed to create user, reason:%s\n", taos_errstr(pRes));
}
taos_free_result(pRes);
taos_close(pConn);
}
TEST(testCase, drop_account_Test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL);
TAOS_RES* pRes = taos_query(pConn, "drop account aabc");
if (taos_errno(pRes) != TSDB_CODE_SUCCESS) {
printf("failed to create user, reason:%s\n", taos_errstr(pRes));
}
taos_free_result(pRes);
taos_close(pConn);
}
TEST(testCase, show_user_Test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL);
TAOS_RES* pRes = taos_query(pConn, "show users");
TAOS_ROW pRow = NULL;
TAOS_FIELD* pFields = taos_fetch_fields(pRes);
int32_t numOfFields = taos_num_fields(pRes);
char str[512] = {0};
while((pRow = taos_fetch_row(pRes)) != NULL) {
int32_t code = taos_print_row(str, pRow, pFields, numOfFields);
printf("%s\n", str);
}
taos_close(pConn);
}
TEST(testCase, drop_user_Test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL);
TAOS_RES* pRes = taos_query(pConn, "drop user abc");
if (taos_errno(pRes) != TSDB_CODE_SUCCESS) {
printf("failed to create user, reason:%s\n", taos_errstr(pRes));
}
taos_free_result(pRes);
taos_close(pConn);
}
TEST(testCase, show_db_Test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
// assert(pConn != NULL);
//
// TAOS_RES* pRes = taos_query(pConn, "drop user abc");
// if (taos_errno(pRes) != TSDB_CODE_SUCCESS) {
// printf("failed to create user, reason:%s\n", taos_errstr(pRes));
// }
//
// taos_free_result(pRes);
// taos_close(pConn);
//}
//TEST(testCase, show_db_Test) {
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
//// assert(pConn != NULL);
//
// TAOS_RES* pRes = taos_query(pConn, "show databases");
// TAOS_ROW pRow = NULL;
//
// TAOS_FIELD* pFields = taos_fetch_fields(pRes);
// int32_t numOfFields = taos_num_fields(pRes);
//
// char str[512] = {0};
// while((pRow = taos_fetch_row(pRes)) != NULL) {
// int32_t code = taos_print_row(str, pRow, pFields, numOfFields);
// printf("%s\n", str);
// }
//
// taos_close(pConn);
//}
TAOS_RES* pRes = taos_query(pConn, "show databases");
TAOS_ROW pRow = NULL;
TAOS_FIELD* pFields = taos_fetch_fields(pRes);
int32_t numOfFields = taos_num_fields(pRes);
char str[512] = {0};
while((pRow = taos_fetch_row(pRes)) != NULL) {
int32_t code = taos_print_row(str, pRow, pFields, numOfFields);
printf("%s\n", str);
}
taos_close(pConn);
}
TEST(testCase, create_db_Test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
......@@ -496,26 +496,31 @@ TEST(testCase, create_multiple_tables) {
}
taos_free_result(pRes);
// for(int32_t i = 0; i < 10000; ++i) {
// char sql[512] = {0};
// snprintf(sql, tListLen(sql), "create table t_x_%d using st1 tags(2)", i);
// TAOS_RES* pres = taos_query(pConn, sql);
// if (taos_errno(pres) != 0) {
// printf("failed to create table %d\n, reason:%s", i, taos_errstr(pres));
// }
// taos_free_result(pres);
// }
taos_close(pConn);
}
TEST(testCase, generated_request_id_test) {
uint64_t id0 = generateRequestId();
uint64_t id1 = generateRequestId();
uint64_t id2 = generateRequestId();
uint64_t id3 = generateRequestId();
uint64_t id4 = generateRequestId();
ASSERT_NE(id0, id1);
ASSERT_NE(id1, id2);
ASSERT_NE(id2, id3);
ASSERT_NE(id4, id3);
ASSERT_NE(id0, id2);
ASSERT_NE(id0, id4);
ASSERT_NE(id0, id3);
// SHashObj *phash = taosHashInit()
SHashObj *phash = taosHashInit(10000, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
for(int32_t i = 0; i < 1000000; ++i) {
uint64_t v = generateRequestId();
void* result = taosHashGet(phash, &v, sizeof(v));
ASSERT_EQ(result, nullptr);
taosHashPut(phash, &v, sizeof(v), NULL, 0);
}
taosHashClear(phash);
}
//TEST(testCase, projection_query_tables) {
......
......@@ -145,7 +145,7 @@ static int32_t mndCreateDefaultCluster(SMnode *pMnode) {
clusterObj.createdTime = taosGetTimestampMs();
clusterObj.updateTime = clusterObj.createdTime;
int32_t code = taosGetSystemUid(clusterObj.name, TSDB_CLUSTER_ID_LEN);
int32_t code = taosGetSystemUUID(clusterObj.name, TSDB_CLUSTER_ID_LEN);
if (code != 0) {
strcpy(clusterObj.name, "tdengine2.0");
mError("failed to get name from system, set to default val %s", clusterObj.name);
......
......@@ -86,7 +86,7 @@ static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
if (pTbCfg->type == META_SUPER_TABLE) {
nTagCols = pTbCfg->stbCfg.nTagCols;
pTagSchema = pTbCfg->stbCfg.pTagSchema;
} else if (pTbCfg->type == META_SUPER_TABLE) {
} else if (pTbCfg->type == META_CHILD_TABLE) {
nTagCols = pStbCfg->stbCfg.nTagCols;
pTagSchema = pStbCfg->stbCfg.pTagSchema;
} else {
......
......@@ -255,7 +255,7 @@ static int metaOpenBDBEnv(DB_ENV **ppEnv, const char *path) {
return -1;
}
ret = pEnv->open(pEnv, path, DB_CREATE | DB_INIT_MPOOL, 0);
ret = pEnv->open(pEnv, path, DB_CREATE | DB_INIT_CDB | DB_INIT_MPOOL, 0);
if (ret != 0) {
BDB_PERR("Failed to open META env", ret);
return -1;
......@@ -382,8 +382,8 @@ static int metaCtbIdxCb(DB *pIdx, const DBT *pKey, const DBT *pValue, DBT *pSKey
// Second key is the first tag
void *pTagVal = tdGetKVRowValOfCol(pTbCfg->ctbCfg.pTag, (kvRowColIdx(pTbCfg->ctbCfg.pTag))[0].colId);
pDbt[1].data = varDataVal(pTagVal);
pDbt[1].size = varDataLen(pTagVal);
pDbt[1].data = pTagVal;
pDbt[1].size = sizeof(int32_t);
// Set index key
memset(pSKey, 0, sizeof(*pSKey));
......
......@@ -675,33 +675,31 @@ int32_t catalogGetTableMeta(struct SCatalog* pCatalog, void *pTransporter, const
return ctgGetTableMetaImpl(pCatalog, pTransporter, pMgmtEps, pTableName, false, pTableMeta);
}
int32_t catalogRenewTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const SName* pTableName) {
if (NULL == pCatalog || NULL == pRpc || NULL == pMgmtEps || NULL == pTableName) {
int32_t catalogRenewTableMeta(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName) {
if (NULL == pCatalog || NULL == pTransporter || NULL == pMgmtEps || NULL == pTableName) {
CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
}
SVgroupInfo vgroupInfo = {0};
int32_t code = 0;
CTG_ERR_RET(catalogGetTableHashVgroup(pCatalog, pRpc, pMgmtEps, pTableName, &vgroupInfo));
CTG_ERR_RET(catalogGetTableHashVgroup(pCatalog, pTransporter, pMgmtEps, pTableName, &vgroupInfo));
STableMetaOutput output = {0};
CTG_ERR_RET(ctgGetTableMetaFromVnode(pCatalog, pRpc, pMgmtEps, pTableName, &vgroupInfo, &output));
CTG_ERR_RET(ctgGetTableMetaFromVnode(pCatalog, pTransporter, pMgmtEps, pTableName, &vgroupInfo, &output));
//CTG_ERR_RET(ctgGetTableMetaFromMnode(pCatalog, pRpc, pMgmtEps, pTableName, &output));
CTG_ERR_JRET(ctgUpdateTableMetaCache(pCatalog, &output));
_return:
tfree(output.tbMeta);
CTG_RET(code);
}
int32_t catalogRenewAndGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta) {
return ctgGetTableMetaImpl(pCatalog, pRpc, pMgmtEps, pTableName, true, pTableMeta);
int32_t catalogRenewAndGetTableMeta(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta) {
return ctgGetTableMetaImpl(pCatalog, pTransporter, pMgmtEps, pTableName, true, pTableMeta);
}
int32_t catalogGetTableDistVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const SName* pTableName, SArray** pVgroupList) {
......
......@@ -22,6 +22,8 @@ extern "C" {
#include "tfile.h"
//#define USE_MMAP 1
#define DefaultMem 1024 * 1024
static char tmpFile[] = "./index";
......@@ -39,6 +41,9 @@ typedef struct WriterCtx {
bool readOnly;
char buf[256];
int size;
#ifdef USE_MMAP
char* ptr;
#endif
} file;
struct {
int32_t capa;
......
......@@ -74,16 +74,15 @@ int indexOpen(SIndexOpts* opts, const char* path, SIndex** index) {
// sIdx->cache = (void*)indexCacheCreate(sIdx);
sIdx->tindex = indexTFileCreate(path);
if (sIdx->tindex == NULL) { goto END; }
sIdx->colObj = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
sIdx->cVersion = 1;
sIdx->path = calloc(1, strlen(path) + 1);
memcpy(sIdx->path, path, strlen(path));
sIdx->path = tstrdup(path);
pthread_mutex_init(&sIdx->mtx, NULL);
*index = sIdx;
return 0;
#endif
END:
if (sIdx != NULL) { indexClose(sIdx); }
......@@ -310,18 +309,14 @@ static int indexTermSearch(SIndex* sIdx, SIndexTermQuery* query, SArray** result
// Get col info
IndexCache* cache = NULL;
pthread_mutex_lock(&sIdx->mtx);
char buf[128] = {0};
ICacheKey key = {.suid = term->suid, .colName = term->colName, .nColName = strlen(term->colName)};
int32_t sz = indexSerialCacheKey(&key, buf);
pthread_mutex_lock(&sIdx->mtx);
IndexCache** pCache = taosHashGet(sIdx->colObj, buf, sz);
if (pCache == NULL) {
pthread_mutex_unlock(&sIdx->mtx);
return -1;
}
cache = *pCache;
cache = (pCache == NULL) ? NULL : *pCache;
pthread_mutex_unlock(&sIdx->mtx);
*result = taosArrayInit(4, sizeof(uint64_t));
......@@ -329,7 +324,7 @@ static int indexTermSearch(SIndex* sIdx, SIndexTermQuery* query, SArray** result
STermValueType s = kTypeValue;
if (0 == indexCacheSearch(cache, query, *result, &s)) {
if (s == kTypeDeletion) {
indexInfo("col: %s already drop by other opera", term->colName);
indexInfo("col: %s already drop by", term->colName);
// coloum already drop by other oper, no need to query tindex
return 0;
} else {
......@@ -402,11 +397,11 @@ static void indexDestroyTempResult(SArray* result) {
}
int indexFlushCacheTFile(SIndex* sIdx, void* cache) {
if (sIdx == NULL) { return -1; }
indexWarn("suid %" PRIu64 " merge cache into tindex", sIdx->suid);
indexInfo("suid %" PRIu64 " merge cache into tindex", sIdx->suid);
IndexCache* pCache = (IndexCache*)cache;
TFileReader* pReader = tfileGetReaderByCol(sIdx->tindex, pCache->suid, pCache->colName);
if (pReader == NULL) { indexWarn("empty pReader found"); }
if (pReader == NULL) { indexWarn("empty tfile reader found"); }
// handle flush
Iterate* cacheIter = indexCacheIteratorCreate(pCache);
Iterate* tfileIter = tfileIteratorCreate(pReader);
......@@ -504,21 +499,20 @@ static int indexGenTFile(SIndex* sIdx, IndexCache* cache, SArray* batch) {
tfileWriterClose(tw);
TFileReader* reader = tfileReaderOpen(sIdx->path, cache->suid, version, cache->colName);
if (reader == NULL) { goto END; }
char buf[128] = {0};
TFileHeader* header = &reader->header;
ICacheKey key = {
.suid = cache->suid, .colName = header->colName, .nColName = strlen(header->colName), .colType = header->colType};
pthread_mutex_lock(&sIdx->mtx);
IndexTFile* ifile = (IndexTFile*)sIdx->tindex;
tfileCachePut(ifile->cache, &key, reader);
pthread_mutex_unlock(&sIdx->mtx);
return ret;
END:
tfileWriterClose(tw);
return -1;
}
int32_t indexSerialCacheKey(ICacheKey* key, char* buf) {
......
......@@ -20,7 +20,7 @@
#define MAX_INDEX_KEY_LEN 256 // test only, change later
#define MEM_TERM_LIMIT 5 * 10000
#define MEM_TERM_LIMIT 10 * 10000
// ref index_cache.h:22
//#define CACHE_KEY_LEN(p) \
// (sizeof(int32_t) + sizeof(uint16_t) + sizeof(p->colType) + sizeof(p->nColVal) + p->nColVal + sizeof(uint64_t) +
......@@ -261,7 +261,7 @@ static int indexQueryMem(MemTable* mem, CacheTerm* ct, EIndexQueryType qtype, SA
return 0;
}
int indexCacheSearch(void* cache, SIndexTermQuery* query, SArray* result, STermValueType* s) {
if (cache == NULL) { return -1; }
if (cache == NULL) { return 0; }
IndexCache* pCache = cache;
MemTable *mem = NULL, *imm = NULL;
......
......@@ -18,7 +18,7 @@
#include "tutil.h"
static int writeCtxDoWrite(WriterCtx* ctx, uint8_t* buf, int len) {
if (ctx->offset + len > ctx->limit) { return -1; }
// if (ctx->offset + len > ctx->limit) { return -1; }
if (ctx->type == TFile) {
assert(len == tfWrite(ctx->file.fd, buf, len));
......@@ -31,7 +31,12 @@ static int writeCtxDoWrite(WriterCtx* ctx, uint8_t* buf, int len) {
static int writeCtxDoRead(WriterCtx* ctx, uint8_t* buf, int len) {
int nRead = 0;
if (ctx->type == TFile) {
#ifdef USE_MMAP
nRead = len < ctx->file.size ? len : ctx->file.size;
memcpy(buf, ctx->file.ptr, nRead);
#else
nRead = tfRead(ctx->file.fd, buf, len);
#endif
} else {
memcpy(buf, ctx->mem.buf + ctx->offset, len);
}
......@@ -43,7 +48,13 @@ static int writeCtxDoReadFrom(WriterCtx* ctx, uint8_t* buf, int len, int32_t off
int nRead = 0;
if (ctx->type == TFile) {
// tfLseek(ctx->file.fd, offset, 0);
#ifdef USE_MMAP
int32_t last = ctx->file.size - offset;
nRead = last >= len ? len : last;
memcpy(buf, ctx->file.ptr + offset, nRead);
#else
nRead = tfPread(ctx->file.fd, buf, len, offset);
#endif
} else {
// refactor later
assert(0);
......@@ -83,6 +94,9 @@ WriterCtx* writerCtxCreate(WriterType type, const char* path, bool readOnly, int
struct stat fstat;
stat(path, &fstat);
ctx->file.size = fstat.st_size;
#ifdef USE_MMAP
ctx->file.ptr = (char*)tfMmapReadOnly(ctx->file.fd, ctx->file.size);
#endif
}
memcpy(ctx->file.buf, path, strlen(path));
if (ctx->file.fd < 0) {
......@@ -112,7 +126,11 @@ void writerCtxDestroy(WriterCtx* ctx, bool remove) {
free(ctx->mem.buf);
} else {
tfClose(ctx->file.fd);
ctx->flush(ctx);
if (ctx->file.readOnly) {
#ifdef USE_MMAP
munmap(ctx->file.ptr, ctx->file.size);
#endif
}
if (remove) { unlink(ctx->file.buf); }
}
free(ctx);
......
......@@ -67,29 +67,18 @@ TFileCache* tfileCacheCreate(const char* path) {
for (size_t i = 0; i < taosArrayGetSize(files); i++) {
char* file = taosArrayGetP(files, i);
// refactor later, use colname and version info
char colName[256] = {0};
if (0 != tfileParseFileName(file, &suid, colName, (int*)&version)) {
indexInfo("try parse invalid file: %s, skip it", file);
continue;
}
char fullName[256] = {0};
sprintf(fullName, "%s/%s", path, file);
WriterCtx* wc = writerCtxCreate(TFile, fullName, true, 1024 * 1024 * 64);
WriterCtx* wc = writerCtxCreate(TFile, file, true, 1024 * 1024 * 64);
if (wc == NULL) {
indexError("failed to open index:%s", file);
goto End;
}
char buf[128] = {0};
TFileReader* reader = tfileReaderCreate(wc);
if (reader == NULL) { goto End; }
TFileHeader* header = &reader->header;
ICacheKey key = {.suid = header->suid,
.colName = header->colName,
.nColName = strlen(header->colName),
.colType = header->colType};
char buf[128] = {0};
ICacheKey key = {.suid = header->suid, .colName = header->colName, .nColName = strlen(header->colName)};
int32_t sz = indexSerialCacheKey(&key, buf);
assert(sz < sizeof(buf));
......@@ -256,7 +245,8 @@ int tfileWriterPut(TFileWriter* tw, void* data, bool order) {
// sort by coltype and write to tindex
if (order == false) {
__compar_fn_t fn;
int8_t colType = tw->header.colType;
int8_t colType = tw->header.colType;
if (colType == TSDB_DATA_TYPE_BINARY || colType == TSDB_DATA_TYPE_NCHAR) {
fn = tfileStrCompare;
} else {
......@@ -274,7 +264,8 @@ int tfileWriterPut(TFileWriter* tw, void* data, bool order) {
// ugly code, refactor later
for (size_t i = 0; i < sz; i++) {
TFileValue* v = taosArrayGetP((SArray*)data, i);
// taosArrayRemoveDuplicate(v->tablId, tfileUidCompare, NULL);
taosArraySort(v->tableId, tfileUidCompare);
taosArrayRemoveDuplicate(v->tableId, tfileUidCompare, NULL);
int32_t tbsz = taosArrayGetSize(v->tableId);
fstOffset += TF_TABLE_TATOAL_SIZE(tbsz);
}
......@@ -351,10 +342,16 @@ void tfileWriterDestroy(TFileWriter* tw) {
}
IndexTFile* indexTFileCreate(const char* path) {
TFileCache* cache = tfileCacheCreate(path);
if (cache == NULL) { return NULL; }
IndexTFile* tfile = calloc(1, sizeof(IndexTFile));
if (tfile == NULL) { return NULL; }
if (tfile == NULL) {
tfileCacheDestroy(cache);
return NULL;
}
tfile->cache = tfileCacheCreate(path);
tfile->cache = cache;
return tfile;
}
void indexTFileDestroy(IndexTFile* tfile) {
......@@ -366,6 +363,7 @@ void indexTFileDestroy(IndexTFile* tfile) {
int indexTFileSearch(void* tfile, SIndexTermQuery* query, SArray* result) {
int ret = -1;
if (tfile == NULL) { return ret; }
IndexTFile* pTfile = (IndexTFile*)tfile;
SIndexTerm* term = query->term;
......@@ -545,12 +543,11 @@ static int tfileReaderLoadHeader(TFileReader* reader) {
int64_t nread = reader->ctx->readFrom(reader->ctx, buf, sizeof(buf), 0);
if (nread == -1) {
//
indexError("actual Read: %d, to read: %d, errno: %d, filefd: %d, filename: %s", (int)(nread), (int)sizeof(buf),
errno, reader->ctx->file.fd, reader->ctx->file.buf);
} else {
indexError("actual Read: %d, to read: %d, errno: %d, filefd: %d, filename: %s", (int)(nread), (int)sizeof(buf),
errno, reader->ctx->file.fd, reader->ctx->file.buf);
indexInfo("actual Read: %d, to read: %d, filefd: %d, filename: %s", (int)(nread), (int)sizeof(buf),
reader->ctx->file.fd, reader->ctx->file.buf);
}
// assert(nread == sizeof(buf));
memcpy(&reader->header, buf, sizeof(buf));
......@@ -566,7 +563,8 @@ static int tfileReaderLoadFst(TFileReader* reader) {
WriterCtx* ctx = reader->ctx;
int32_t nread = ctx->readFrom(ctx, buf, FST_MAX_SIZE, reader->header.fstOffset);
indexError("nread = %d, and fst offset=%d, filename: %s ", nread, reader->header.fstOffset, ctx->file.buf);
indexInfo("nread = %d, and fst offset=%d, filename: %s, size: %d ", nread, reader->header.fstOffset, ctx->file.buf,
ctx->file.size);
// we assuse fst size less than FST_MAX_SIZE
assert(nread > 0 && nread < FST_MAX_SIZE);
......@@ -613,15 +611,20 @@ void tfileReaderUnRef(TFileReader* reader) {
static SArray* tfileGetFileList(const char* path) {
SArray* files = taosArrayInit(4, sizeof(void*));
char buf[128] = {0};
uint64_t suid;
uint32_t version;
DIR* dir = opendir(path);
if (NULL == dir) { return NULL; }
struct dirent* entry;
while ((entry = readdir(dir)) != NULL) {
if (entry->d_type && DT_DIR) { continue; }
size_t len = strlen(entry->d_name);
char* buf = calloc(1, len + 1);
memcpy(buf, entry->d_name, len);
char* file = entry->d_name;
if (0 != tfileParseFileName(file, &suid, buf, &version)) { continue; }
size_t len = strlen(path) + 1 + strlen(file) + 1;
char* buf = calloc(1, len);
sprintf(buf, "%s/%s", path, file);
taosArrayPush(files, &buf);
}
closedir(dir);
......
......@@ -28,7 +28,7 @@
#include "tutil.h"
using namespace std;
#define NUM_OF_THREAD 10
#define NUM_OF_THREAD 5
class DebugInfo {
public:
......@@ -679,6 +679,17 @@ class IndexObj {
}
return numOfTable;
}
int ReadMultiMillonData(const std::string& colName, const std::string& colVal = "Hello world",
size_t numOfTable = 100 * 10000) {
std::string tColVal = colVal;
int colValSize = tColVal.size();
for (int i = 0; i < numOfTable; i++) {
tColVal[i % colValSize] = 'a' + i % 26;
SearchOne(colName, tColVal);
}
return 0;
}
int Put(SIndexMultiTerm* fvs, uint64_t uid) {
numOfWrite += taosArrayGetSize(fvs);
......@@ -701,7 +712,8 @@ class IndexObj {
int64_t s = taosGetTimestampUs();
if (Search(mq, result) == 0) {
int64_t e = taosGetTimestampUs();
std::cout << "search one successfully and time cost:" << e - s << std::endl;
std::cout << "search and time cost:" << e - s << "\tquery col:" << colName << "\t val: " << colVal
<< "\t size:" << taosArrayGetSize(result) << std::endl;
} else {
}
int sz = taosArrayGetSize(result);
......@@ -710,6 +722,31 @@ class IndexObj {
return sz;
// assert(taosArrayGetSize(result) == targetSize);
}
int SearchOneTarget(const std::string& colName, const std::string& colVal, uint64_t val) {
SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST);
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
colVal.c_str(), colVal.size());
indexMultiTermQueryAdd(mq, term, QUERY_TERM);
SArray* result = (SArray*)taosArrayInit(1, sizeof(uint64_t));
int64_t s = taosGetTimestampUs();
if (Search(mq, result) == 0) {
int64_t e = taosGetTimestampUs();
std::cout << "search one successfully and time cost:" << e - s << "\tquery col:" << colName
<< "\t val: " << colVal << "\t size:" << taosArrayGetSize(result) << std::endl;
} else {
}
int sz = taosArrayGetSize(result);
indexMultiTermQueryDestroy(mq);
taosArrayDestroy(result);
assert(sz == 1);
uint64_t* ret = (uint64_t*)taosArrayGet(result, 0);
assert(val = *ret);
return sz;
}
void PutOne(const std::string& colName, const std::string& colVal) {
SIndexMultiTerm* terms = indexMultiTermCreate();
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
......@@ -718,6 +755,14 @@ class IndexObj {
Put(terms, 10);
indexMultiTermDestroy(terms);
}
void PutOneTarge(const std::string& colName, const std::string& colVal, uint64_t val) {
SIndexMultiTerm* terms = indexMultiTermCreate();
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
colVal.c_str(), colVal.size());
indexMultiTermAdd(terms, term);
Put(terms, val);
indexMultiTermDestroy(terms);
}
void Debug() {
std::cout << "numOfWrite:" << numOfWrite << std::endl;
std::cout << "numOfRead:" << numOfRead << std::endl;
......@@ -830,15 +875,15 @@ TEST_F(IndexEnv2, testIndex_TrigeFlush) {
assert(numOfTable == target);
}
static void write_and_search(IndexObj* idx) {
std::string colName("tag1"), colVal("Hello");
static void single_write_and_search(IndexObj* idx) {
int target = idx->SearchOne("tag1", "Hello");
std::cout << "search: " << target << std::endl;
target = idx->SearchOne("tag2", "Test");
std::cout << "search: " << target << std::endl;
idx->PutOne(colName, colVal);
}
static void multi_write_and_search(IndexObj* idx) {
int target = idx->SearchOne("tag1", "Hello");
target = idx->SearchOne("tag2", "Test");
idx->WriteMultiMillonData("tag1", "Hello", 100 * 10000);
idx->WriteMultiMillonData("tag2", "Test", 100 * 10000);
}
TEST_F(IndexEnv2, testIndex_serarch_cache_and_tfile) {
std::string path = "/tmp/cache_and_tfile";
......@@ -847,13 +892,27 @@ TEST_F(IndexEnv2, testIndex_serarch_cache_and_tfile) {
}
index->PutOne("tag1", "Hello");
index->PutOne("tag2", "Test");
index->WriteMultiMillonData("tag1", "Hello", 50 * 10000);
index->WriteMultiMillonData("tag2", "Test", 50 * 10000);
index->WriteMultiMillonData("tag1", "Hello", 100 * 10000);
index->WriteMultiMillonData("tag2", "Test", 100 * 10000);
std::thread threads[NUM_OF_THREAD];
for (int i = 0; i < NUM_OF_THREAD; i++) {
//
threads[i] = std::thread(write_and_search, index);
threads[i] = std::thread(single_write_and_search, index);
}
for (int i = 0; i < NUM_OF_THREAD; i++) {
// TOD
threads[i].join();
}
}
TEST_F(IndexEnv2, testIndex_MultiWrite_and_MultiRead) {
std::string path = "/tmp/cache_and_tfile";
if (index->Init(path) != 0) {}
std::thread threads[NUM_OF_THREAD];
for (int i = 0; i < NUM_OF_THREAD; i++) {
//
threads[i] = std::thread(multi_write_and_search, index);
}
for (int i = 0; i < NUM_OF_THREAD; i++) {
// TOD
......@@ -862,15 +921,33 @@ TEST_F(IndexEnv2, testIndex_serarch_cache_and_tfile) {
}
TEST_F(IndexEnv2, testIndex_restart) {
std::string path = "/tmp/test1";
std::string path = "/tmp/cache_and_tfile";
if (index->Init(path) != 0) {}
index->SearchOneTarget("tag1", "Hello", 10);
index->SearchOneTarget("tag2", "Test", 10);
}
TEST_F(IndexEnv2, testIndex_performance) {
std::string path = "/tmp/test2";
TEST_F(IndexEnv2, testIndex_read_performance) {
std::string path = "/tmp/cache_and_tfile";
if (index->Init(path) != 0) {}
index->PutOneTarge("tag1", "Hello", 12);
index->PutOneTarge("tag1", "Hello", 15);
index->ReadMultiMillonData("tag1", "Hello");
std::cout << "reader sz: " << index->SearchOne("tag1", "Hello") << std::endl;
assert(3 == index->SearchOne("tag1", "Hello"));
}
TEST_F(IndexEnv2, testIndexMultiTag) {
std::string path = "/tmp/test3";
std::string path = "/tmp/multi_tag";
if (index->Init(path) != 0) {}
index->WriteMultiMillonData("tag1", "Hello", 100 * 10000);
index->WriteMultiMillonData("tag2", "Test", 100 * 10000);
index->WriteMultiMillonData("tag3", "Test", 100 * 10000);
index->WriteMultiMillonData("tag4", "Test", 100 * 10000);
}
TEST_F(IndexEnv2, testLongComVal) {
std::string path = "/tmp/long_colVal";
if (index->Init(path) != 0) {}
// gen colVal by randstr
std::string randstr = "xxxxxxxxxxxxxxxxx";
index->WriteMultiMillonData("tag1", randstr, 100 * 10000);
}
......@@ -126,7 +126,7 @@ static FORCE_INLINE void getMemRowAppendInfo(SSchema *pSchema, uint8_t memRowTyp
int32_t idx, int32_t *toffset) {
int32_t schemaIdx = 0;
if (IS_DATA_COL_ORDERED(spd)) {
schemaIdx = spd->boundedColumns[idx];
schemaIdx = spd->boundedColumns[idx] - 1;
if (isDataRowT(memRowType)) {
*toffset = (spd->cols + schemaIdx)->toffset; // the offset of firstPart
} else {
......
......@@ -22,7 +22,7 @@ extern "C" {
#include "parser.h"
int32_t parseInsertSql(SParseContext* pContext, SInsertStmtInfo** pInfo);
int32_t parseInsertSql(SParseContext* pContext, SVnodeModifOpStmtInfo** pInfo);
#ifdef __cplusplus
}
......
......@@ -70,7 +70,7 @@ int32_t qParserValidateSqlNode(struct SCatalog* pCatalog, SSqlInfo* pSqlInfo, SQ
*/
SDclStmtInfo* qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, char* msgBuf, int32_t msgBufLen);
SInsertStmtInfo* qParserValidateCreateTbSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, char* msgBuf, int32_t msgBufLen);
SVnodeModifOpStmtInfo* qParserValidateCreateTbSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, char* msgBuf, int32_t msgBufLen);
/**
* Evaluate the numeric and timestamp arithmetic expression in the WHERE clause.
......
......@@ -326,6 +326,31 @@ typedef struct SVgroupTablesBatch {
SVgroupInfo info;
} SVgroupTablesBatch;
static int32_t doParseSerializeTagValue(SSchema* pTagSchema, int32_t numOfInputTag, SKVRowBuilder* pKvRowBuilder,
SArray* pTagValList, int32_t tsPrecision, SMsgBuf* pMsgBuf) {
const char* msg1 = "illegal value or data overflow";
int32_t code = TSDB_CODE_SUCCESS;
for (int32_t i = 0; i < numOfInputTag; ++i) {
SSchema* pSchema = &pTagSchema[i];
char* endPtr = NULL;
char tmpTokenBuf[TSDB_MAX_TAGS_LEN] = {0};
SKvParam param = {.builder = pKvRowBuilder, .schema = pSchema};
SToken* pItem = taosArrayGet(pTagValList, i);
code = parseValueToken(&endPtr, pItem, pSchema, tsPrecision, tmpTokenBuf, KvRowAppend, &param, pMsgBuf);
if (code != TSDB_CODE_SUCCESS) {
tdDestroyKVRowBuilder(pKvRowBuilder);
return buildInvalidOperationMsg(pMsgBuf, msg1);
}
}
return code;
}
int32_t doCheckForCreateCTable(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SMsgBuf* pMsgBuf, char** pOutput, int32_t* len) {
const char* msg1 = "invalid table name";
const char* msg2 = "tags number not matched";
......@@ -354,10 +379,9 @@ int32_t doCheckForCreateCTable(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SMsgBuf* p
}
SArray* pValList = pCreateTableInfo->pTagVals;
size_t numOfInputTag = taosArrayGetSize(pValList);
size_t numOfInputTag = taosArrayGetSize(pValList);
STableMeta* pSuperTableMeta = NULL;
code = catalogGetTableMeta(pCtx->pCatalog, pCtx->pTransporter, &pCtx->mgmtEpSet, &name, &pSuperTableMeta);
if (code != TSDB_CODE_SUCCESS) {
return code;
......@@ -463,21 +487,9 @@ int32_t doCheckForCreateCTable(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SMsgBuf* p
return buildInvalidOperationMsg(pMsgBuf, msg2);
}
for (int32_t i = 0; i < numOfInputTag; ++i) {
SSchema* pSchema = &pTagSchema[i];
char* endPtr = NULL;
char tmpTokenBuf[TSDB_MAX_TAGS_LEN] = {0};
SKvParam param = {.builder = &kvRowBuilder, .schema = pSchema};
SToken* pItem = taosArrayGet(pValList, i);
code = parseValueToken(&endPtr, pItem, pSchema, tinfo.precision, tmpTokenBuf, KvRowAppend, &param, pMsgBuf);
if (code != TSDB_CODE_SUCCESS) {
tdDestroyKVRowBuilder(&kvRowBuilder);
return buildInvalidOperationMsg(pMsgBuf, msg4);
}
code = doParseSerializeTagValue(pTagSchema, numOfInputTag, &kvRowBuilder, pValList, tinfo.precision, pMsgBuf);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
......@@ -499,9 +511,9 @@ int32_t doCheckForCreateCTable(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SMsgBuf* p
catalogGetTableHashVgroup(pCtx->pCatalog, pCtx->pTransporter, &pCtx->mgmtEpSet, &tableName, &info);
struct SVCreateTbReq req = {0};
req.type = TD_CHILD_TABLE;
req.name = strdup(tNameGetTableName(&tableName));
req.ctbCfg.suid = pSuperTableMeta->suid;
req.type = TD_CHILD_TABLE;
req.name = strdup(tNameGetTableName(&tableName));
req.ctbCfg.suid = pSuperTableMeta->uid;
req.ctbCfg.pTag = row;
SVgroupTablesBatch* pTableBatch = taosHashGet(pVgroupHashmap, &info.vgId, sizeof(info.vgId));
......@@ -548,11 +560,12 @@ int32_t doCheckForCreateCTable(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SMsgBuf* p
taosArrayPush(pBufArray, &pVgData);
} while (true);
SInsertStmtInfo* pStmtInfo = calloc(1, sizeof(SInsertStmtInfo));
pStmtInfo->nodeType = TSDB_SQL_CREATE_TABLE;
SVnodeModifOpStmtInfo* pStmtInfo = calloc(1, sizeof(SVnodeModifOpStmtInfo));
pStmtInfo->nodeType = TSDB_SQL_CREATE_TABLE;
pStmtInfo->pDataBlocks = pBufArray;
*pOutput = pStmtInfo;
*len = sizeof(SInsertStmtInfo);
*pOutput = (char*) pStmtInfo;
*len = sizeof(SVnodeModifOpStmtInfo);
return TSDB_CODE_SUCCESS;
}
......@@ -823,14 +836,14 @@ SDclStmtInfo* qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, c
return NULL;
}
SInsertStmtInfo* qParserValidateCreateTbSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, char* msgBuf, int32_t msgBufLen) {
SVnodeModifOpStmtInfo* qParserValidateCreateTbSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, char* msgBuf, int32_t msgBufLen) {
SCreateTableSql* pCreateTable = pInfo->pCreateTableInfo;
assert(pCreateTable->type == TSQL_CREATE_CTABLE);
SMsgBuf m = {.buf = msgBuf, .len = msgBufLen};
SMsgBuf* pMsgBuf = &m;
SInsertStmtInfo* pInsertStmt = NULL;
SVnodeModifOpStmtInfo* pInsertStmt = NULL;
int32_t msgLen = 0;
int32_t code = doCheckForCreateCTable(pInfo, pCtx, pMsgBuf, (char**) &pInsertStmt, &msgLen);
......
......@@ -467,7 +467,7 @@ static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock, SB
}
int32_t mergeTableDataBlocks(SHashObj* pHashObj, int8_t schemaAttached, uint8_t payloadType, SArray** pVgDataBlocks) {
const int INSERT_HEAD_SIZE = sizeof(SMsgDesc) + sizeof(SSubmitMsg);
const int INSERT_HEAD_SIZE = sizeof(SSubmitMsg);
int code = 0;
bool isRawPayload = IS_RAW_PAYLOAD(payloadType);
SHashObj* pVnodeDataBlockHashList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false);
......
......@@ -61,7 +61,7 @@ typedef struct SInsertParseContext {
SArray* pTableDataBlocks; // global
SArray* pVgDataBlocks; // global
int32_t totalNum;
SInsertStmtInfo* pOutput;
SVnodeModifOpStmtInfo* pOutput;
} SInsertParseContext;
static int32_t skipInsertInto(SInsertParseContext* pCxt) {
......@@ -106,6 +106,7 @@ static int32_t getTableMeta(SInsertParseContext* pCxt, SToken* pTname) {
SVgroupInfo vg;
CHECK_CODE(catalogGetTableHashVgroup(pBasicCtx->pCatalog, pBasicCtx->pTransporter, &pBasicCtx->mgmtEpSet, &name, &vg));
CHECK_CODE(taosHashPut(pCxt->pVgroupsHashObj, (const char*)&vg.vgId, sizeof(vg.vgId), (char*)&vg, sizeof(vg)));
pCxt->pTableMeta->vgId = vg.vgId; // todo remove
return TSDB_CODE_SUCCESS;
}
......@@ -120,11 +121,9 @@ static int32_t findCol(SToken* pColname, int32_t start, int32_t end, SSchema* pS
}
static void buildMsgHeader(SVgDataBlocks* blocks) {
SMsgDesc* desc = (SMsgDesc*)blocks->pData;
desc->numOfVnodes = htonl(1);
SSubmitMsg* submit = (SSubmitMsg*)(desc + 1);
SSubmitMsg* submit = (SSubmitMsg*)blocks->pData;
submit->header.vgId = htonl(blocks->vg.vgId);
submit->header.contLen = htonl(blocks->size - sizeof(SMsgDesc));
submit->header.contLen = htonl(blocks->size);
submit->length = submit->header.contLen;
submit->numOfBlocks = htonl(blocks->numOfTables);
SSubmitBlk* blk = (SSubmitBlk*)(submit + 1);
......@@ -425,7 +424,7 @@ static int parseOneRow(SInsertParseContext* pCxt, STableDataBlocks* pDataBlocks,
// 1. set the parsed value from sql string
for (int i = 0; i < spd->numOfBound; ++i) {
NEXT_TOKEN(pCxt->pSql, sToken);
SSchema *pSchema = &schema[spd->boundedColumns[i]];
SSchema *pSchema = &schema[spd->boundedColumns[i] - 1];
param.schema = pSchema;
param.compareStat = pBuilder->compareStat;
getMemRowAppendInfo(schema, pBuilder->memRowType, spd, i, &param.toffset);
......@@ -611,7 +610,7 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) {
// [(field1_name, ...)]
// VALUES (field1_value, ...) [(field1_value2, ...) ...] | FILE csv_file_path
// [...];
int32_t parseInsertSql(SParseContext* pContext, SInsertStmtInfo** pInfo) {
int32_t parseInsertSql(SParseContext* pContext, SVnodeModifOpStmtInfo** pInfo) {
SInsertParseContext context = {
.pComCxt = pContext,
.pSql = (char*) pContext->pSql,
......@@ -620,7 +619,7 @@ int32_t parseInsertSql(SParseContext* pContext, SInsertStmtInfo** pInfo) {
.pVgroupsHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, false),
.pTableBlockHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false),
.totalNum = 0,
.pOutput = calloc(1, sizeof(SInsertStmtInfo))
.pOutput = calloc(1, sizeof(SVnodeModifOpStmtInfo))
};
if (NULL == context.pVgroupsHashObj || NULL == context.pTableBlockHashObj || NULL == context.pOutput) {
......
......@@ -53,7 +53,7 @@ int32_t parseQuerySql(SParseContext* pCxt, SQueryNode** pQuery) {
}
if (toVnode) {
SInsertStmtInfo *pInsertInfo = qParserValidateCreateTbSqlNode(&info, &pCxt->ctx, pCxt->pMsg, pCxt->msgLen);
SVnodeModifOpStmtInfo *pInsertInfo = qParserValidateCreateTbSqlNode(&info, &pCxt->ctx, pCxt->pMsg, pCxt->msgLen);
if (pInsertInfo == NULL) {
return terrno;
}
......@@ -87,7 +87,7 @@ int32_t parseQuerySql(SParseContext* pCxt, SQueryNode** pQuery) {
int32_t qParseQuerySql(SParseContext* pCxt, SQueryNode** pQuery) {
if (isInsertSql(pCxt->pSql, pCxt->sqlLen)) {
return parseInsertSql(pCxt, (SInsertStmtInfo**)pQuery);
return parseInsertSql(pCxt, (SVnodeModifOpStmtInfo**)pQuery);
} else {
return parseQuerySql(pCxt, pQuery);
}
......
......@@ -60,7 +60,7 @@ protected:
return code_;
}
SInsertStmtInfo* reslut() {
SVnodeModifOpStmtInfo* reslut() {
return res_;
}
......@@ -70,9 +70,7 @@ protected:
for (size_t i = 0; i < num; ++i) {
SVgDataBlocks* vg = (SVgDataBlocks*)taosArrayGetP(res_->pDataBlocks, i);
cout << "vgId:" << vg->vg.vgId << ", numOfTables:" << vg->numOfTables << ", dataSize:" << vg->size << endl;
SMsgDesc* desc = (SMsgDesc*)(vg->pData);
cout << "numOfVnodes:" << ntohl(desc->numOfVnodes) << endl;
SSubmitMsg* submit = (SSubmitMsg*)(desc + 1);
SSubmitMsg* submit = (SSubmitMsg*)vg->pData;
cout << "length:" << ntohl(submit->length) << ", numOfBlocks:" << ntohl(submit->numOfBlocks) << endl;
int32_t numOfBlocks = ntohl(submit->numOfBlocks);
SSubmitBlk* blk = (SSubmitBlk*)(submit + 1);
......@@ -95,9 +93,7 @@ protected:
SVgDataBlocks* vg = (SVgDataBlocks*)taosArrayGetP(res_->pDataBlocks, i);
ASSERT_EQ(vg->numOfTables, numOfTables);
ASSERT_GE(vg->size, 0);
SMsgDesc* desc = (SMsgDesc*)(vg->pData);
ASSERT_EQ(ntohl(desc->numOfVnodes), 1);
SSubmitMsg* submit = (SSubmitMsg*)(desc + 1);
SSubmitMsg* submit = (SSubmitMsg*)vg->pData;
ASSERT_GE(ntohl(submit->length), 0);
ASSERT_GE(ntohl(submit->numOfBlocks), 0);
int32_t numOfBlocks = ntohl(submit->numOfBlocks);
......@@ -128,7 +124,7 @@ private:
char sqlBuf_[max_sql_len];
SParseContext cxt_;
int32_t code_;
SInsertStmtInfo* res_;
SVnodeModifOpStmtInfo* res_;
};
// INSERT INTO tb_name VALUES (field1_value, ...)
......
......@@ -27,8 +27,8 @@ std::unique_ptr<MockCatalogService> mockCatalogService;
class TableBuilder : public ITableBuilder {
public:
virtual TableBuilder& addColumn(const std::string& name, int8_t type, int32_t bytes) {
assert(colId_ < schema()->tableInfo.numOfTags + schema()->tableInfo.numOfColumns);
SSchema* col = schema()->schema + colId_;
assert(colId_ <= schema()->tableInfo.numOfTags + schema()->tableInfo.numOfColumns);
SSchema* col = schema()->schema + (colId_ - 1);
col->type = type;
col->colId = colId_++;
col->bytes = bytes;
......@@ -66,7 +66,7 @@ private:
return std::unique_ptr<TableBuilder>(new TableBuilder(meta));
}
TableBuilder(STableMeta* schemaMeta) : colId_(0), rowsize_(0), meta_(new MockTableMeta()) {
TableBuilder(STableMeta* schemaMeta) : colId_(1), rowsize_(0), meta_(new MockTableMeta()) {
meta_->schema.reset(schemaMeta);
}
......
......@@ -38,7 +38,7 @@ int32_t optimizeQueryPlan(struct SQueryPlanNode* pQueryNode) {
}
static int32_t createModificationOpPlan(const SQueryNode* pNode, SQueryPlanNode** pQueryPlan) {
SInsertStmtInfo* pInsert = (SInsertStmtInfo*)pNode;
SVnodeModifOpStmtInfo* pInsert = (SVnodeModifOpStmtInfo*)pNode;
*pQueryPlan = calloc(1, sizeof(SQueryPlanNode));
SArray* blocks = taosArrayInit(taosArrayGetSize(pInsert->pDataBlocks), POINTER_BYTES);
......
......@@ -33,7 +33,7 @@ protected:
void pushScan(const string& db, const string& table, int32_t scanOp) {
shared_ptr<MockTableMeta> meta = mockCatalogService->getTableMeta(db, table);
EXPECT_TRUE(meta);
unique_ptr<SQueryPlanNode> scan((SQueryPlanNode*)calloc(1, sizeof(SQueryPlanNode)));
unique_ptr<SQueryPlanNode> scan((SQueryPlanNode*)myCalloc(1, sizeof(SQueryPlanNode)));
scan->info.type = scanOp;
scan->numOfCols = meta->schema->tableInfo.numOfColumns;
scan->pSchema = (SSchema*)myCalloc(1, sizeof(SSchema) * scan->numOfCols);
......
......@@ -43,15 +43,12 @@ int32_t queryBuildTableMetaReqMsg(void* input, char **msg, int32_t msgSize, int3
bMsg->header.vgId = htonl(bInput->vgId);
if (bInput->dbName) {
strncpy(bMsg->dbFname, bInput->dbName, sizeof(bMsg->dbFname));
bMsg->dbFname[sizeof(bMsg->dbFname) - 1] = 0;
tstrncpy(bMsg->dbFname, bInput->dbName, tListLen(bMsg->dbFname));
}
strncpy(bMsg->tableFname, bInput->tableFullName, sizeof(bMsg->tableFname));
bMsg->tableFname[sizeof(bMsg->tableFname) - 1] = 0;
tstrncpy(bMsg->tableFname, bInput->tableFullName, tListLen(bMsg->tableFname));
*msgLen = (int32_t)sizeof(*bMsg);
return TSDB_CODE_SUCCESS;
}
......@@ -211,7 +208,7 @@ int32_t queryCreateTableMetaFromMsg(STableMetaMsg* msg, bool isSuperTable, STabl
pTableMeta->vgId = isSuperTable ? 0 : msg->vgId;
pTableMeta->tableType = isSuperTable ? TSDB_SUPER_TABLE : msg->tableType;
pTableMeta->uid = msg->suid;
pTableMeta->uid = msg->tuid;
pTableMeta->suid = msg->suid;
pTableMeta->sversion = msg->sversion;
pTableMeta->tversion = msg->tversion;
......@@ -250,8 +247,8 @@ int32_t queryProcessTableMetaRsp(void* output, char *msg, int32_t msgSize) {
pOut->metaNum = 2;
if (pMetaMsg->dbFname[0]) {
snprintf(pOut->ctbFname, "%s.%s", pMetaMsg->dbFname, pMetaMsg->tbFname);
snprintf(pOut->tbFname, "%s.%s", pMetaMsg->dbFname, pMetaMsg->stbFname);
snprintf(pOut->ctbFname, sizeof(pOut->ctbFname), "%s.%s", pMetaMsg->dbFname, pMetaMsg->tbFname);
snprintf(pOut->tbFname, sizeof(pOut->tbFname), "%s.%s", pMetaMsg->dbFname, pMetaMsg->stbFname);
} else {
memcpy(pOut->ctbFname, pMetaMsg->tbFname, sizeof(pOut->ctbFname));
memcpy(pOut->tbFname, pMetaMsg->stbFname, sizeof(pOut->tbFname));
......@@ -272,7 +269,7 @@ int32_t queryProcessTableMetaRsp(void* output, char *msg, int32_t msgSize) {
memcpy(pOut->tbFname, pMetaMsg->tbFname, sizeof(pOut->tbFname));
}
code = queryCreateTableMetaFromMsg(pMetaMsg, false, &pOut->tbMeta);
code = queryCreateTableMetaFromMsg(pMetaMsg, (pMetaMsg->tableType == TSDB_SUPER_TABLE), &pOut->tbMeta);
}
return code;
......
......@@ -372,7 +372,7 @@ int32_t schProcessOnTaskSuccess(SSchJob *job, SSchTask *task) {
SCH_ERR_RET(schMoveTaskToSuccList(job, task, &moved));
if (!moved) {
SCH_TASK_ERR_LOG("task may already moved, status:%d", task->status);
SCH_TASK_ERR_LOG(" task may already moved, status:%d", task->status);
return TSDB_CODE_SUCCESS;
}
......@@ -480,7 +480,6 @@ int32_t schProcessOnTaskFailure(SSchJob *job, SSchTask *task, int32_t errCode) {
int32_t schProcessRspMsg(SSchJob *job, SSchTask *task, int32_t msgType, char *msg, int32_t msgSize, int32_t rspCode) {
int32_t code = 0;
switch (msgType) {
case TDMT_VND_CREATE_TABLE_RSP: {
if (rspCode != TSDB_CODE_SUCCESS) {
......@@ -492,6 +491,8 @@ int32_t schProcessRspMsg(SSchJob *job, SSchTask *task, int32_t msgType, char *ms
goto _task_error;
}
}
break;
}
case TDMT_VND_SUBMIT_RSP: {
if (rspCode != TSDB_CODE_SUCCESS) {
......
......@@ -1096,13 +1096,16 @@ static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) {
SRpcReqContext *pContext;
pConn = rpcProcessMsgHead(pRpc, pRecv, &pContext);
char ipstr[24] = {0};
taosIpPort2String(pRecv->ip, pRecv->port, ipstr);
if (TMSG_INDEX(pHead->msgType) >= 1 && TMSG_INDEX(pHead->msgType) < TDMT_MAX) {
tDebug("%s %p %p, %s received from 0x%x:%hu, parse code:0x%x len:%d sig:0x%08x:0x%08x:%d code:0x%x", pRpc->label,
pConn, (void *)pHead->ahandle, TMSG_INFO(pHead->msgType), pRecv->ip, pRecv->port, terrno, pRecv->msgLen,
tDebug("%s %p %p, %s received from %s, parse code:0x%x len:%d sig:0x%08x:0x%08x:%d code:0x%x", pRpc->label,
pConn, (void *)pHead->ahandle, TMSG_INFO(pHead->msgType), ipstr, terrno, pRecv->msgLen,
pHead->sourceId, pHead->destId, pHead->tranId, pHead->code);
} else {
tDebug("%s %p %p, %d received from 0x%x:%hu, parse code:0x%x len:%d sig:0x%08x:0x%08x:%d code:0x%x", pRpc->label,
pConn, (void *)pHead->ahandle, pHead->msgType, pRecv->ip, pRecv->port, terrno, pRecv->msgLen,
tDebug("%s %p %p, %d received from %s, parse code:0x%x len:%d sig:0x%08x:0x%08x:%d code:0x%x", pRpc->label,
pConn, (void *)pHead->ahandle, pHead->msgType, ipstr, terrno, pRecv->msgLen,
pHead->sourceId, pHead->destId, pHead->tranId, pHead->code);
}
......
......@@ -252,7 +252,7 @@ LONG WINAPI FlCrashDump(PEXCEPTION_POINTERS ep) {
void taosSetCoreDump() { SetUnhandledExceptionFilter(&FlCrashDump); }
int32_t taosGetSystemUid(char *uid, int32_t uidlen) {
int32_t taosGetSystemUUID(char *uid, int32_t uidlen) {
GUID guid;
CoCreateGuid(&guid);
......@@ -452,7 +452,7 @@ int32_t taosGetDiskSize(char *dataDir, SysDiskSize *diskSize) {
}
}
int32_t taosGetSystemUid(char *uid, int32_t uidlen) {
int32_t taosGetSystemUUID(char *uid, int32_t uidlen) {
uuid_t uuid = {0};
uuid_generate(uuid);
// it's caller's responsibility to make enough space for `uid`, that's 36-char + 1-null
......@@ -1070,7 +1070,7 @@ void taosSetCoreDump(bool enable) {
#endif
}
int32_t taosGetSystemUid(char *uid, int32_t uidlen) {
int32_t taosGetSystemUUID(char *uid, int32_t uidlen) {
int fd;
int len = 0;
......
......@@ -158,3 +158,13 @@ int32_t tfFtruncate(int64_t tfd, int64_t length) {
taosReleaseRef(tsFileRsetId, tfd);
return code;
}
void *tfMmapReadOnly(int64_t tfd, int64_t length) {
void *p = taosAcquireRef(tsFileRsetId, tfd);
if (p == NULL) return NULL;
int32_t fd = (int32_t)(uintptr_t)p;
void *ptr = mmap(NULL, length, PROT_READ, MAP_SHARED, fd, 0);
taosReleaseRef(tsFileRsetId, tfd);
return ptr;
}
......@@ -28,13 +28,13 @@
tfree(_n); \
} while (0)
#define FREE_HASH_NODE(_h, _n) \
do { \
if ((_h)->freeFp) { \
#define FREE_HASH_NODE(_h, _n) \
do { \
if ((_h)->freeFp) { \
(_h)->freeFp(GET_HASH_NODE_DATA(_n)); \
} \
\
DO_FREE_HASH_NODE(_n); \
} \
\
DO_FREE_HASH_NODE(_n); \
} while (0);
static FORCE_INLINE void __wr_lock(void *lock, int32_t type) {
......@@ -55,7 +55,6 @@ static FORCE_INLINE void __rd_unlock(void *lock, int32_t type) {
if (type == HASH_NO_LOCK) {
return;
}
taosRUnLockLatch(lock);
}
......@@ -63,7 +62,6 @@ static FORCE_INLINE void __wr_unlock(void *lock, int32_t type) {
if (type == HASH_NO_LOCK) {
return;
}
taosWUnLockLatch(lock);
}
......@@ -225,12 +223,12 @@ int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, void *da
// need the resize process, write lock applied
if (HASH_NEED_RESIZE(pHashObj)) {
__wr_lock(&pHashObj->lock, pHashObj->type);
__wr_lock((void*) &pHashObj->lock, pHashObj->type);
taosHashTableResize(pHashObj);
__wr_unlock(&pHashObj->lock, pHashObj->type);
__wr_unlock((void*) &pHashObj->lock, pHashObj->type);
}
__rd_lock(&pHashObj->lock, pHashObj->type);
__rd_lock((void*) &pHashObj->lock, pHashObj->type);
int32_t slot = HASH_INDEX(hashVal, pHashObj->capacity);
SHashEntry *pe = pHashObj->hashList[slot];
......@@ -272,7 +270,7 @@ int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, void *da
}
// enable resize
__rd_unlock(&pHashObj->lock, pHashObj->type);
__rd_unlock((void*) &pHashObj->lock, pHashObj->type);
atomic_add_fetch_32(&pHashObj->size, 1);
return 0;
......@@ -289,7 +287,7 @@ int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, void *da
}
// enable resize
__rd_unlock(&pHashObj->lock, pHashObj->type);
__rd_unlock((void*) &pHashObj->lock, pHashObj->type);
return pHashObj->enableUpdate ? 0 : -2;
}
......@@ -308,14 +306,14 @@ void* taosHashGetCloneExt(SHashObj *pHashObj, const void *key, size_t keyLen, vo
uint32_t hashVal = (*pHashObj->hashFp)(key, (uint32_t)keyLen);
// only add the read lock to disable the resize process
__rd_lock(&pHashObj->lock, pHashObj->type);
__rd_lock((void*) &pHashObj->lock, pHashObj->type);
int32_t slot = HASH_INDEX(hashVal, pHashObj->capacity);
SHashEntry *pe = pHashObj->hashList[slot];
// no data, return directly
if (atomic_load_32(&pe->num) == 0) {
__rd_unlock(&pHashObj->lock, pHashObj->type);
__rd_unlock((void*) &pHashObj->lock, pHashObj->type);
return NULL;
}
......@@ -358,7 +356,7 @@ void* taosHashGetCloneExt(SHashObj *pHashObj, const void *key, size_t keyLen, vo
taosRUnLockLatch(&pe->latch);
}
__rd_unlock(&pHashObj->lock, pHashObj->type);
__rd_unlock((void*) &pHashObj->lock, pHashObj->type);
return data;
}
......@@ -370,14 +368,14 @@ void* taosHashGetCloneImpl(SHashObj *pHashObj, const void *key, size_t keyLen, v
uint32_t hashVal = (*pHashObj->hashFp)(key, (uint32_t)keyLen);
// only add the read lock to disable the resize process
__rd_lock(&pHashObj->lock, pHashObj->type);
__rd_lock((void*) &pHashObj->lock, pHashObj->type);
int32_t slot = HASH_INDEX(hashVal, pHashObj->capacity);
SHashEntry *pe = pHashObj->hashList[slot];
// no data, return directly
if (atomic_load_32(&pe->num) == 0) {
__rd_unlock(&pHashObj->lock, pHashObj->type);
__rd_unlock((void*) &pHashObj->lock, pHashObj->type);
return NULL;
}
......@@ -415,7 +413,7 @@ void* taosHashGetCloneImpl(SHashObj *pHashObj, const void *key, size_t keyLen, v
taosRUnLockLatch(&pe->latch);
}
__rd_unlock(&pHashObj->lock, pHashObj->type);
__rd_unlock((void*) &pHashObj->lock, pHashObj->type);
return data;
}
......@@ -436,7 +434,7 @@ int32_t taosHashRemove(SHashObj *pHashObj, const void *key, size_t keyLen/*, voi
uint32_t hashVal = (*pHashObj->hashFp)(key, (uint32_t)keyLen);
// disable the resize process
__rd_lock(&pHashObj->lock, pHashObj->type);
__rd_lock((void*) &pHashObj->lock, pHashObj->type);
int32_t slot = HASH_INDEX(hashVal, pHashObj->capacity);
SHashEntry *pe = pHashObj->hashList[slot];
......@@ -450,7 +448,7 @@ int32_t taosHashRemove(SHashObj *pHashObj, const void *key, size_t keyLen/*, voi
assert(pe->next == NULL);
taosWUnLockLatch(&pe->latch);
__rd_unlock(&pHashObj->lock, pHashObj->type);
__rd_unlock((void*) &pHashObj->lock, pHashObj->type);
return -1;
}
......@@ -491,7 +489,7 @@ int32_t taosHashRemove(SHashObj *pHashObj, const void *key, size_t keyLen/*, voi
taosWUnLockLatch(&pe->latch);
}
__rd_unlock(&pHashObj->lock, pHashObj->type);
__rd_unlock((void*) &pHashObj->lock, pHashObj->type);
return code;
}
......@@ -502,7 +500,7 @@ int32_t taosHashCondTraverse(SHashObj *pHashObj, bool (*fp)(void *, void *), voi
}
// disable the resize process
__rd_lock(&pHashObj->lock, pHashObj->type);
__rd_lock((void*) &pHashObj->lock, pHashObj->type);
int32_t numOfEntries = (int32_t)pHashObj->capacity;
for (int32_t i = 0; i < numOfEntries; ++i) {
......@@ -566,7 +564,7 @@ int32_t taosHashCondTraverse(SHashObj *pHashObj, bool (*fp)(void *, void *), voi
}
}
__rd_unlock(&pHashObj->lock, pHashObj->type);
__rd_unlock((void*) &pHashObj->lock, pHashObj->type);
return 0;
}
......@@ -577,7 +575,7 @@ void taosHashClear(SHashObj *pHashObj) {
SHashNode *pNode, *pNext;
__wr_lock(&pHashObj->lock, pHashObj->type);
__wr_lock((void*) &pHashObj->lock, pHashObj->type);
for (int32_t i = 0; i < pHashObj->capacity; ++i) {
SHashEntry *pEntry = pHashObj->hashList[i];
......@@ -601,7 +599,7 @@ void taosHashClear(SHashObj *pHashObj) {
}
atomic_store_32(&pHashObj->size, 0);
__wr_unlock(&pHashObj->lock, pHashObj->type);
__wr_unlock((void*) &pHashObj->lock, pHashObj->type);
}
void taosHashCleanup(SHashObj *pHashObj) {
......@@ -864,7 +862,7 @@ void *taosHashIterate(SHashObj *pHashObj, void *p) {
char *data = NULL;
// only add the read lock to disable the resize process
__rd_lock(&pHashObj->lock, pHashObj->type);
__rd_lock((void*) &pHashObj->lock, pHashObj->type);
SHashNode *pNode = NULL;
if (p) {
......@@ -911,7 +909,7 @@ void *taosHashIterate(SHashObj *pHashObj, void *p) {
}
}
__rd_unlock(&pHashObj->lock, pHashObj->type);
__rd_unlock((void*) &pHashObj->lock, pHashObj->type);
return data;
}
......@@ -920,7 +918,7 @@ void taosHashCancelIterate(SHashObj *pHashObj, void *p) {
if (pHashObj == NULL || p == NULL) return;
// only add the read lock to disable the resize process
__rd_lock(&pHashObj->lock, pHashObj->type);
__rd_lock((void*) &pHashObj->lock, pHashObj->type);
int slot;
taosHashReleaseNode(pHashObj, p, &slot);
......@@ -930,7 +928,7 @@ void taosHashCancelIterate(SHashObj *pHashObj, void *p) {
taosWUnLockLatch(&pe->latch);
}
__rd_unlock(&pHashObj->lock, pHashObj->type);
__rd_unlock((void*) &pHashObj->lock, pHashObj->type);
}
void taosHashRelease(SHashObj *pHashObj, void *p) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册